Skip to main content

laminar_db/
pipeline_lifecycle.rs

//! Pipeline lifecycle management: start, close, shutdown.
//!
//! Reopened `impl LaminarDB` — split from `db.rs`.
#![allow(clippy::disallowed_types)] // cold path
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10use laminar_core::streaming;
11use rustc_hash::FxHashMap;
12
13use crate::db::{
14    infer_timestamp_format, LaminarDB, SourceWatermarkState, STATE_RUNNING, STATE_SHUTTING_DOWN,
15    STATE_STARTING, STATE_STOPPED,
16};
17use crate::error::DbError;
18
/// Extract a checkpoint path prefix from an object store URL.
///
/// For `s3://bucket/prefix/path` → `"prefix/path/"`.
/// For `file:///some/path` → `""` (local FS uses the path directly).
pub(crate) fn url_to_checkpoint_prefix(url: &str) -> String {
    // Local filesystem: the LocalFileSystem store is already rooted at the
    // path, so no extra key prefix is needed.
    if url.starts_with("file://") {
        return String::new();
    }

    // Drop the scheme ("s3://", "gs://", ...) when present.
    let after_scheme = match url.find("://") {
        Some(i) => &url[i + 3..],
        None => url,
    };

    // Everything after the bucket segment is the prefix; normalize it to
    // either empty or ending in exactly one '/'.
    match after_scheme.split_once('/') {
        None | Some((_, "")) => String::new(),
        Some((_, prefix)) if prefix.ends_with('/') => prefix.to_string(),
        Some((_, prefix)) => format!("{prefix}/"),
    }
}
46
47impl LaminarDB {
48    /// Shut down the database gracefully.
49    pub fn close(&self) {
50        self.shutdown
51            .store(true, std::sync::atomic::Ordering::Relaxed);
52    }
53
54    /// Check if the database is shut down.
55    pub fn is_closed(&self) -> bool {
56        self.shutdown.load(std::sync::atomic::Ordering::Relaxed)
57    }
58
59    /// Start the streaming pipeline.
60    ///
61    /// Activates all registered connectors and begins processing.
62    /// This is a no-op if the pipeline is already running.
63    ///
64    /// When the `kafka` feature is enabled and Kafka sources/sinks are
65    /// registered, this builds `KafkaSource`/`KafkaSink` instances and
66    /// spawns a background task that polls sources, executes stream queries
67    /// via `DataFusion`, and writes results to sinks.
68    ///
69    /// In embedded (in-memory) mode, this simply transitions to `Running`.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the pipeline cannot be started.
74    #[allow(clippy::too_many_lines)]
75    pub async fn start(&self) -> Result<(), DbError> {
76        let current = self.state.load(std::sync::atomic::Ordering::Acquire);
77        if current == STATE_RUNNING || current == STATE_STARTING {
78            return Ok(());
79        }
80        if current == STATE_STOPPED {
81            return Err(DbError::InvalidOperation(
82                "Cannot start a stopped pipeline. Create a new LaminarDB instance.".into(),
83            ));
84        }
85
86        self.state
87            .store(STATE_STARTING, std::sync::atomic::Ordering::Release);
88
89        // Snapshot connector registrations under the lock
90        let (source_regs, sink_regs, stream_regs, table_regs, has_external) = {
91            let mgr = self.connector_manager.lock();
92            (
93                mgr.sources().clone(),
94                mgr.sinks().clone(),
95                mgr.streams().clone(),
96                mgr.tables().clone(),
97                mgr.has_external_connectors(),
98            )
99        };
100
101        // Log which sources have external connectors for debugging.
102        for (name, reg) in &source_regs {
103            tracing::debug!(source = %name, connector_type = ?reg.connector_type, "Registered source");
104        }
105        for (name, reg) in &sink_regs {
106            tracing::debug!(sink = %name, connector_type = ?reg.connector_type, "Registered sink");
107        }
108
109        // Initialize checkpoint coordinator (shared across all pipeline modes)
110        if let Some(ref cp_config) = self.config.checkpoint {
111            use crate::checkpoint_coordinator::{
112                CheckpointConfig as CkpConfig, CheckpointCoordinator,
113            };
114
115            let max_retained = cp_config.max_retained.unwrap_or(3);
116
117            let store: Box<dyn laminar_storage::CheckpointStore> =
118                if let Some(ref url) = self.config.object_store_url {
119                    let obj_store = laminar_storage::object_store_factory::build_object_store(
120                        url,
121                        &self.config.object_store_options,
122                    )
123                    .map_err(|e| DbError::Config(format!("object store: {e}")))?;
124                    let prefix = url_to_checkpoint_prefix(url);
125                    Box::new(
126                        laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
127                            obj_store,
128                            prefix,
129                            max_retained,
130                        )
131                        .map_err(|e| DbError::Config(format!("checkpoint store runtime: {e}")))?,
132                    )
133                } else {
134                    let data_dir = cp_config
135                        .data_dir
136                        .clone()
137                        .or_else(|| self.config.storage_dir.clone())
138                        .unwrap_or_else(|| std::path::PathBuf::from("./data"));
139                    Box::new(
140                        laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
141                            &data_dir,
142                            max_retained,
143                        ),
144                    )
145                };
146
147            let config = CkpConfig {
148                interval: cp_config.interval_ms.map(std::time::Duration::from_millis),
149                max_retained,
150                ..CkpConfig::default()
151            };
152            let mut coord = CheckpointCoordinator::new(config, store);
153            coord.set_counters(Arc::clone(&self.counters));
154
155            // Wire per-core WAL for crash recovery between checkpoints.
156            // Without this, all data since the last checkpoint is lost on crash.
157            let wal_dir = cp_config
158                .data_dir
159                .clone()
160                .or_else(|| self.config.storage_dir.clone())
161                .unwrap_or_else(|| std::path::PathBuf::from("./data"))
162                .join("wal");
163            // Match the TPC runtime's core count logic: explicit config,
164            // available_parallelism for external connectors, or source count.
165            let num_cores = self
166                .config
167                .tpc
168                .as_ref()
169                .and_then(|t| t.num_cores)
170                .unwrap_or_else(|| {
171                    if has_external {
172                        std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
173                    } else {
174                        source_regs.len().max(1)
175                    }
176                });
177            match laminar_storage::per_core_wal::PerCoreWalManager::new(
178                laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, num_cores),
179            ) {
180                Ok(wal) => {
181                    tracing::info!(
182                        wal_dir = %wal_dir.display(),
183                        num_cores,
184                        "WAL manager initialized"
185                    );
186                    coord.register_wal_manager(wal);
187                }
188                Err(e) => {
189                    tracing::warn!(
190                        error = %e,
191                        "WAL initialization failed — running without WAL \
192                         (data between checkpoints may be lost on crash)"
193                    );
194                }
195            }
196
197            *self.coordinator.lock().await = Some(coord);
198        }
199
200        if has_external || !stream_regs.is_empty() {
201            tracing::info!(
202                sources = source_regs.len(),
203                sinks = sink_regs.len(),
204                streams = stream_regs.len(),
205                tables = table_regs.len(),
206                has_external,
207                "Starting pipeline"
208            );
209            self.start_connector_pipeline(
210                source_regs,
211                sink_regs,
212                stream_regs,
213                table_regs,
214                has_external,
215            )
216            .await?;
217        } else {
218            tracing::info!(
219                sources = source_regs.len(),
220                sinks = sink_regs.len(),
221                "Starting in embedded (in-memory) mode — no streams"
222            );
223        }
224
225        self.state
226            .store(STATE_RUNNING, std::sync::atomic::Ordering::Release);
227        Ok(())
228    }
229
230    /// Build and start the unified pipeline with sources, sinks, and streams.
231    ///
232    /// Handles both embedded (in-memory only) and connector-backed sources
233    /// through a single code path. Connector-less sources are wrapped as
234    /// `CatalogSourceConnector` to participate in the pipeline alongside
235    /// external connectors (Kafka, CDC, etc.).
236    ///
237    /// The pipeline uses a thread-per-core architecture: each source
238    /// runs on a dedicated I/O thread, pushing events through SPSC
239    /// queues to CPU-pinned core threads. A `TpcPipelineCoordinator`
240    /// tokio task drains core outboxes and runs SQL cycles.
241    ///
242    /// When `has_external` is false (pure embedded mode), one core thread
243    /// per source is used to minimize overhead while providing unified
244    /// watermark tracking, error handling, and checkpointing.
245    #[allow(clippy::too_many_lines)]
246    async fn start_connector_pipeline(
247        &self,
248        source_regs: HashMap<String, crate::connector_manager::SourceRegistration>,
249        sink_regs: HashMap<String, crate::connector_manager::SinkRegistration>,
250        stream_regs: HashMap<String, crate::connector_manager::StreamRegistration>,
251        table_regs: HashMap<String, crate::connector_manager::TableRegistration>,
252        has_external: bool,
253    ) -> Result<(), DbError> {
254        use crate::connector_manager::{
255            build_sink_config, build_source_config, build_table_config,
256        };
257        use crate::pipeline::{PipelineConfig, SourceRegistration, TpcPipelineCoordinator};
258        use crate::stream_executor::StreamExecutor;
259        use laminar_connectors::reference::{ReferenceTableSource, RefreshMode};
260
261        // Build StreamExecutor
262        let ctx = laminar_sql::create_session_context();
263        laminar_sql::register_streaming_functions(&ctx);
264        let mut executor = StreamExecutor::new(ctx);
265        executor.set_lookup_registry(Arc::clone(&self.lookup_registry));
266
267        // Register source schemas for ALL sources (both external connectors
268        // and catalog-bridge sources) so the executor can create empty
269        // placeholder tables when no data arrives in a given cycle.
270        for name in source_regs.keys() {
271            if let Some(entry) = self.catalog.get_source(name) {
272                executor.register_source_schema(name.clone(), entry.schema.clone());
273            }
274        }
275
276        for reg in stream_regs.values() {
277            executor.add_query(
278                reg.name.clone(),
279                reg.query_sql.clone(),
280                reg.emit_clause.clone(),
281                reg.window_config.clone(),
282                reg.order_config.clone(),
283            );
284        }
285
286        // Register temporal join tables as Versioned in the lookup registry
287        // so that execute_temporal_query() can use persistent versioned state.
288        for tcfg in executor.temporal_join_configs() {
289            if self.lookup_registry.get_entry(&tcfg.table_name).is_none() {
290                // Get initial data. If none exists yet, use an empty batch
291                // with the correct schema from the catalog (not Schema::empty).
292                let initial_batch = self
293                    .table_store
294                    .read()
295                    .to_record_batch(&tcfg.table_name)
296                    .or_else(|| {
297                        self.catalog
298                            .get_source(&tcfg.table_name)
299                            .map(|e| RecordBatch::new_empty(e.schema.clone()))
300                    })
301                    .unwrap_or_else(|| {
302                        RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()))
303                    });
304                let key_columns = vec![tcfg.table_key_column.clone()];
305                let key_indices: Vec<usize> = key_columns
306                    .iter()
307                    .filter_map(|k| initial_batch.schema().index_of(k).ok())
308                    .collect();
309                let Ok(version_col_idx) =
310                    initial_batch.schema().index_of(&tcfg.table_version_column)
311                else {
312                    if !initial_batch.schema().fields().is_empty() {
313                        tracing::warn!(
314                            table=%tcfg.table_name,
315                            version_col=%tcfg.table_version_column,
316                            "Version column not found in temporal table schema; \
317                             will resolve on first CDC batch"
318                        );
319                    }
320                    // Register with empty index — built on first CDC update.
321                    self.lookup_registry.register_versioned(
322                        &tcfg.table_name,
323                        laminar_sql::datafusion::VersionedLookupState {
324                            batch: initial_batch,
325                            index: Arc::new(
326                                laminar_sql::datafusion::lookup_join_exec::VersionedIndex::default(
327                                ),
328                            ),
329                            key_columns,
330                            version_column: tcfg.table_version_column.clone(),
331                            stream_time_column: tcfg.stream_time_column.clone(),
332                        },
333                    );
334                    continue;
335                };
336                let index = Arc::new(
337                    laminar_sql::datafusion::lookup_join_exec::VersionedIndex::build(
338                        &initial_batch,
339                        &key_indices,
340                        version_col_idx,
341                    )
342                    .unwrap_or_default(),
343                );
344                self.lookup_registry.register_versioned(
345                    &tcfg.table_name,
346                    laminar_sql::datafusion::VersionedLookupState {
347                        batch: initial_batch,
348                        index,
349                        key_columns,
350                        version_column: tcfg.table_version_column.clone(),
351                        stream_time_column: tcfg.stream_time_column.clone(),
352                    },
353                );
354            }
355        }
356
357        // Build sources as owned SourceRegistrations (no Arc<Mutex>).
358        let mut sources: Vec<SourceRegistration> = Vec::new();
359        for (name, reg) in &source_regs {
360            if reg.connector_type.is_none() {
361                continue;
362            }
363            let mut config = build_source_config(reg)?;
364
365            // Pass the SQL-defined Arrow schema to the connector so it can
366            // deserialize records with the correct column names and types.
367            if let Some(entry) = self.catalog.get_source(name) {
368                let schema_str = crate::pipeline_callback::encode_arrow_schema(&entry.schema);
369                config.set("_arrow_schema".to_string(), schema_str);
370            }
371
372            let source = self
373                .connector_registry
374                .create_source(&config)
375                .map_err(|e| {
376                    DbError::Connector(format!(
377                        "Cannot create source '{}' (type '{}'): {e}",
378                        name,
379                        config.connector_type()
380                    ))
381                })?;
382            let supports_replay = source.supports_replay();
383            if !supports_replay {
384                tracing::warn!(
385                    source = %name,
386                    "source does not support replay — exactly-once semantics \
387                     are degraded to at-most-once for this source"
388                );
389            }
390            // Wire event.time.column from connector config to the core Source
391            // so SourceWatermarkState can extract watermarks from batch data.
392            if let Some(entry) = self.catalog.get_source(name) {
393                if entry.source.event_time_column().is_none() {
394                    if let Some(col) = config.get("event.time.column") {
395                        entry.source.set_event_time_column(col);
396                    } else if let Some(col) = config.get("event.time.field") {
397                        entry.source.set_event_time_column(col);
398                    }
399                }
400            }
401
402            sources.push(SourceRegistration {
403                name: name.clone(),
404                connector: source,
405                config,
406                supports_replay,
407                restore_checkpoint: None, // Set after recovery below
408            });
409        }
410
411        // Bridge connector-less sources into the pipeline so db.insert()
412        // data flows through the standard source task → coordinator path.
413        // This covers two cases:
414        //   1. Sources in source_regs with connector_type == None (registered
415        //      in connector manager but without a FROM clause).
416        //   2. Sources in the catalog but NOT in source_regs at all (pure
417        //      embedded sources created without any connector specification).
418        let bridged_names: rustc_hash::FxHashSet<String> =
419            sources.iter().map(|s| s.name.clone()).collect();
420        // First: bridge sources in source_regs that have no connector.
421        for (name, reg) in &source_regs {
422            if reg.connector_type.is_some() {
423                continue; // Already created as external connector above
424            }
425            if let Some(entry) = self.catalog.get_source(name) {
426                let subscription = entry.sink.subscribe();
427                let connector = crate::catalog_connector::CatalogSourceConnector::new(
428                    subscription,
429                    entry.schema.clone(),
430                    entry.data_notify(),
431                );
432                sources.push(SourceRegistration {
433                    name: name.clone(),
434                    connector: Box::new(connector),
435                    config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
436                    supports_replay: false,
437                    restore_checkpoint: None,
438                });
439            }
440        }
441        // Second: bridge catalog sources not in source_regs (embedded-only
442        // sources that were never registered with the connector manager).
443        for name in self.catalog.list_sources() {
444            if bridged_names.contains(&name) || source_regs.contains_key(&name) {
445                continue;
446            }
447            if let Some(entry) = self.catalog.get_source(&name) {
448                executor.register_source_schema(name.clone(), entry.schema.clone());
449                let subscription = entry.sink.subscribe();
450                let connector = crate::catalog_connector::CatalogSourceConnector::new(
451                    subscription,
452                    entry.schema.clone(),
453                    entry.data_notify(),
454                );
455                sources.push(SourceRegistration {
456                    name: name.clone(),
457                    connector: Box::new(connector),
458                    config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
459                    supports_replay: false,
460                    restore_checkpoint: None,
461                });
462            }
463        }
464
465        // Build sinks via registry (generic — no connector-specific code).
466        // Each sink runs in its own tokio task with a bounded command channel,
467        // eliminating Arc<Mutex> contention between pipeline writes and
468        // checkpoint operations.
469        #[allow(clippy::type_complexity)]
470        let mut sinks: Vec<(
471            String,
472            crate::sink_task::SinkTaskHandle,
473            Option<String>,
474            String, // input stream name (FROM clause target)
475        )> = Vec::new();
476        for (name, reg) in &sink_regs {
477            if reg.connector_type.is_none() {
478                continue;
479            }
480            let config = build_sink_config(reg)?;
481            let mut sink = self.connector_registry.create_sink(&config).map_err(|e| {
482                DbError::Connector(format!(
483                    "Cannot create sink '{}' (type '{}'): {e}",
484                    name,
485                    config.connector_type()
486                ))
487            })?;
488            // Open the connector before handing it to the task.
489            sink.open(&config)
490                .await
491                .map_err(|e| DbError::Connector(format!("Failed to open sink '{name}': {e}")))?;
492            let exactly_once = sink.capabilities().exactly_once;
493            let handle = crate::sink_task::SinkTaskHandle::spawn(name.clone(), sink, exactly_once);
494            sinks.push((
495                name.clone(),
496                handle,
497                reg.filter_expr.clone(),
498                reg.input.clone(),
499            ));
500        }
501
502        // Build table sources from registrations
503        let mut table_sources: Vec<(String, Box<dyn ReferenceTableSource>, RefreshMode)> =
504            Vec::new();
505        for (name, reg) in &table_regs {
506            if reg.connector_type.is_none() {
507                continue;
508            }
509            let config = build_table_config(reg)?;
510            let source = self
511                .connector_registry
512                .create_table_source(&config)
513                .map_err(|e| {
514                    DbError::Connector(format!("Cannot create table source '{name}': {e}"))
515                })?;
516            let mode = reg.refresh.clone().unwrap_or(RefreshMode::SnapshotPlusCdc);
517            table_sources.push((name.clone(), source, mode));
518        }
519
520        // Register sinks with the checkpoint coordinator.
521        // Sources are owned by the TPC runtime — checkpoint reads go
522        // through lock-free watch channels instead.
523        {
524            let mut guard = self.coordinator.lock().await;
525            if let Some(ref mut coord) = *guard {
526                for (name, handle, _, _) in &sinks {
527                    let exactly_once = handle.exactly_once();
528                    coord.register_sink(name.clone(), handle.clone(), exactly_once);
529                }
530            }
531        }
532
533        // Recovery: restore sink/table state via unified coordinator.
534        // Must run BEFORE begin_initial_epoch so the coordinator's epoch
535        // reflects the recovered state.
536        {
537            let mut guard = self.coordinator.lock().await;
538            if let Some(ref mut coord) = *guard {
539                match coord.recover().await {
540                    Ok(Some(recovered)) => {
541                        for (name, source, _) in &mut table_sources {
542                            if let Some(cp) = recovered.manifest.table_offsets.get(name) {
543                                let restored =
544                                    crate::checkpoint_coordinator::connector_to_source_checkpoint(
545                                        cp,
546                                    );
547                                if let Err(e) = source.restore(&restored).await {
548                                    tracing::warn!(
549                                        table=%name, error=%e,
550                                        "Table source restore failed"
551                                    );
552                                }
553                            }
554                        }
555                        // Attach recovered source offsets to SourceRegistrations.
556                        // These will be passed to the TPC source adapters, which
557                        // call connector.restore() after open() to seek Kafka
558                        // consumers to their checkpoint positions.
559                        for src in &mut sources {
560                            if !src.supports_replay {
561                                continue;
562                            }
563                            if let Some(cp) = recovered.manifest.source_offsets.get(&src.name) {
564                                let restored =
565                                    crate::checkpoint_coordinator::connector_to_source_checkpoint(
566                                        cp,
567                                    );
568                                tracing::info!(
569                                    source = %src.name,
570                                    offsets = cp.offsets.len(),
571                                    "attaching checkpoint offsets for source recovery"
572                                );
573                                src.restore_checkpoint = Some(restored);
574                            }
575                        }
576                        // Restore stream executor aggregate state
577                        if let Some(op) = recovered.manifest.operator_states.get("stream_executor")
578                        {
579                            if let Some(bytes) = op.decode_inline() {
580                                match executor.restore_state(&bytes) {
581                                    Ok(n) => {
582                                        tracing::info!(
583                                            queries = n,
584                                            "Restored stream executor state from checkpoint"
585                                        );
586                                    }
587                                    Err(e) => {
588                                        tracing::warn!(
589                                            error = %e,
590                                            "Stream executor state restore failed, starting fresh"
591                                        );
592                                    }
593                                }
594                            }
595                        }
596                        tracing::info!(
597                            epoch = recovered.epoch(),
598                            sources_restored = recovered.sources_restored,
599                            sinks_rolled_back = recovered.sinks_rolled_back,
600                            "Recovered from unified checkpoint"
601                        );
602                    }
603                    Ok(None) => {
604                        tracing::info!("No checkpoint found, starting fresh");
605                    }
606                    Err(e) => {
607                        tracing::warn!(error = %e, "Checkpoint recovery failed, starting fresh");
608                    }
609                }
610            }
611        }
612
613        // Begin the initial epoch on exactly-once sinks AFTER recovery
614        // so the coordinator's epoch reflects the recovered checkpoint.
615        {
616            let guard = self.coordinator.lock().await;
617            if let Some(ref coord) = *guard {
618                coord.begin_initial_epoch().await?;
619            }
620        }
621
622        // Snapshot phase: populate tables before stream processing begins
623        for (name, source, mode) in &mut table_sources {
624            if matches!(mode, RefreshMode::Manual) {
625                continue;
626            }
627            while let Some(batch) = source
628                .poll_snapshot()
629                .await
630                .map_err(|e| DbError::Connector(format!("Table '{name}' snapshot error: {e}")))?
631            {
632                self.table_store
633                    .write()
634                    .upsert(name, &batch)
635                    .map_err(|e| DbError::Connector(format!("Table '{name}' upsert error: {e}")))?;
636            }
637            self.sync_table_to_datafusion(name)?;
638            {
639                let mut ts = self.table_store.write();
640                ts.rebuild_xor_filter(name);
641                ts.set_ready(name, true);
642            }
643            // Update lookup registry so join queries see fresh data.
644            // Skip if already registered as Versioned (temporal join tables
645            // must keep their version history, not be overwritten as Snapshot).
646            if matches!(
647                self.lookup_registry.get_entry(name),
648                Some(laminar_sql::datafusion::RegisteredLookup::Versioned(_))
649            ) {
650                // Already versioned — don't downgrade to Snapshot.
651            } else if let Some(batch) = self.table_store.read().to_record_batch(name) {
652                self.lookup_registry.register(
653                    name,
654                    laminar_sql::datafusion::LookupSnapshot {
655                        batch,
656                        key_columns: vec![], // already indexed by primary key
657                    },
658                );
659            }
660        }
661
662        // Get stream source handles so results also flow to db.subscribe().
663        let mut stream_sources: Vec<(String, streaming::Source<crate::catalog::ArrowRecord>)> =
664            Vec::new();
665        for reg in stream_regs.values() {
666            if let Some(src) = self.catalog.get_stream_source(&reg.name) {
667                stream_sources.push((reg.name.clone(), src));
668            }
669        }
670
671        // Build per-source watermark tracking state (connector pipeline)
672        let source_names = self.catalog.list_sources();
673        let mut watermark_states: FxHashMap<String, SourceWatermarkState> =
674            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
675        let mut source_entries_for_wm: FxHashMap<String, Arc<crate::catalog::SourceEntry>> =
676            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
677        let mut source_ids: FxHashMap<String, usize> =
678            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
679        for name in source_names {
680            if let Some(entry) = self.catalog.get_source(&name) {
681                if let (Some(col), Some(dur)) =
682                    (&entry.watermark_column, entry.max_out_of_orderness)
683                {
684                    let format = infer_timestamp_format(&entry.schema, col);
685                    let extractor =
686                        laminar_core::time::EventTimeExtractor::from_column(col, format)
687                            .with_mode(laminar_core::time::ExtractionMode::Max);
688                    let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
689                        .is_processing_time
690                        .load(std::sync::atomic::Ordering::Relaxed)
691                    {
692                        Box::new(laminar_core::time::ProcessingTimeGenerator::new())
693                    } else {
694                        Box::new(
695                            laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(dur),
696                        )
697                    };
698                    let id = source_ids.len();
699                    source_ids.insert(name.clone(), id);
700                    watermark_states.insert(
701                        name.clone(),
702                        SourceWatermarkState {
703                            extractor,
704                            generator,
705                            column: col.clone(),
706                            format,
707                        },
708                    );
709                }
710                source_entries_for_wm.insert(name, entry);
711            }
712        }
713
714        // Also create watermark state for sources that declared event_time_column
715        // programmatically (via source.set_event_time_column()) but have no SQL WATERMARK
716        for name in self.catalog.list_sources() {
717            if watermark_states.contains_key(&name) {
718                continue;
719            }
720            if let Some(entry) = self.catalog.get_source(&name) {
721                if let Some(col) = entry.source.event_time_column() {
722                    let format = infer_timestamp_format(&entry.schema, &col);
723                    let extractor =
724                        laminar_core::time::EventTimeExtractor::from_column(&col, format)
725                            .with_mode(laminar_core::time::ExtractionMode::Max);
726                    let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
727                        .is_processing_time
728                        .load(std::sync::atomic::Ordering::Relaxed)
729                    {
730                        Box::new(laminar_core::time::ProcessingTimeGenerator::new())
731                    } else {
732                        Box::new(
733                            laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(
734                                std::time::Duration::ZERO,
735                            ),
736                        )
737                    };
738                    let id = source_ids.len();
739                    source_ids.insert(name.clone(), id);
740                    watermark_states.insert(
741                        name.clone(),
742                        SourceWatermarkState {
743                            extractor,
744                            generator,
745                            column: col,
746                            format,
747                        },
748                    );
749                }
750            }
751        }
752
753        let tracker = if source_ids.is_empty() {
754            None
755        } else {
756            Some(laminar_core::time::WatermarkTracker::new(source_ids.len()))
757        };
758
759        let max_poll = self.config.default_buffer_size.min(1024);
760        let checkpoint_interval = self
761            .config
762            .checkpoint
763            .as_ref()
764            .and_then(|c| c.interval_ms)
765            .map(std::time::Duration::from_millis);
766
767        tracing::info!(
768            sources = sources.len(),
769            sinks = sinks.len(),
770            streams = stream_regs.len(),
771            subscriptions = stream_sources.len(),
772            watermark_sources = source_ids.len(),
773            "Starting event-driven connector pipeline"
774        );
775
776        // Build pipeline config.
777        // Embedded mode (no external connectors): zero batch window for
778        // minimal latency — data is processed as soon as it arrives.
779        // Connector mode: 5ms batch window amortizes SQL overhead across
780        // high-throughput external sources (Kafka, CDC).
781        let pipeline_config = PipelineConfig {
782            max_poll_records: max_poll,
783            channel_capacity: 64,
784            fallback_poll_interval: if has_external {
785                std::time::Duration::from_millis(10)
786            } else {
787                std::time::Duration::from_millis(1)
788            },
789            checkpoint_interval,
790            batch_window: if has_external {
791                std::time::Duration::from_millis(5)
792            } else {
793                std::time::Duration::ZERO
794            },
795            barrier_alignment_timeout: std::time::Duration::from_secs(30),
796            delivery_guarantee: laminar_connectors::connector::DeliveryGuarantee::default(),
797        };
798
799        // Validate delivery guarantee constraints.
800        {
801            use laminar_connectors::connector::DeliveryGuarantee;
802
803            if pipeline_config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
804                for src in &sources {
805                    if !src.supports_replay {
806                        return Err(DbError::Config(format!(
807                            "[LDB-5030] exactly-once requires all sources to support replay, \
808                             but source '{}' does not. Use at-least-once or remove this source.",
809                            src.name
810                        )));
811                    }
812                }
813                for (name, handle, _, _) in &sinks {
814                    if !handle.exactly_once() {
815                        return Err(DbError::Config(format!(
816                            "[LDB-5031] exactly-once requires all sinks to support \
817                             exactly-once semantics, but sink '{name}' does not. \
818                             Use at-least-once or configure a transactional sink."
819                        )));
820                    }
821                }
822                if pipeline_config.checkpoint_interval.is_none() {
823                    return Err(DbError::Config(
824                        "[LDB-5032] exactly-once requires checkpointing to be enabled. \
825                         Set checkpoint.interval.ms in the pipeline configuration."
826                            .into(),
827                    ));
828                }
829            } else if pipeline_config.delivery_guarantee == DeliveryGuarantee::AtLeastOnce {
830                let has_non_replayable = sources.iter().any(|s| !s.supports_replay);
831                let has_eo_sink = sinks.iter().any(|(_, h, _, _)| h.exactly_once());
832                if has_non_replayable && has_eo_sink {
833                    tracing::warn!(
834                        "[LDB-5033] pipeline has exactly-once sinks but some sources \
835                         do not support replay — effective guarantee is at-most-once \
836                         for events from non-replayable sources"
837                    );
838                }
839            }
840        }
841
842        let shutdown = self.shutdown_signal.clone();
843
844        // Build the PipelineCallback implementation that bridges to db.rs internals.
845        let counters = Arc::clone(&self.counters);
846        let pipeline_watermark = Arc::clone(&self.pipeline_watermark);
847        let checkpoint_in_progress = Arc::new(std::sync::atomic::AtomicBool::new(false));
848        let coordinator = Arc::clone(&self.coordinator);
849        let table_store_for_loop = self.table_store.clone();
850        // Compute a pipeline hash for change detection across checkpoints.
851        let pipeline_hash = {
852            use std::hash::{Hash, Hasher};
853            let mut hasher = std::collections::hash_map::DefaultHasher::new();
854            for reg in stream_regs.values() {
855                reg.name.hash(&mut hasher);
856                reg.query_sql.hash(&mut hasher);
857            }
858            for name in source_regs.keys() {
859                name.hash(&mut hasher);
860            }
861            for name in sink_regs.keys() {
862                name.hash(&mut hasher);
863            }
864            Some(hasher.finish())
865        };
866
867        let callback = crate::pipeline_callback::ConnectorPipelineCallback {
868            executor,
869            stream_sources,
870            sinks,
871            watermark_states,
872            source_entries_for_wm,
873            source_ids,
874            tracker,
875            counters,
876            pipeline_watermark,
877            checkpoint_in_progress,
878            coordinator,
879            table_sources,
880            table_store: table_store_for_loop,
881            lookup_registry: Arc::clone(&self.lookup_registry),
882            filter_ctx: laminar_sql::create_session_context(),
883            last_checkpoint: std::time::Instant::now(),
884            checkpoint_interval: self
885                .config
886                .checkpoint
887                .as_ref()
888                .and_then(|c| c.interval_ms)
889                .map(std::time::Duration::from_millis),
890            pipeline_hash,
891            delivery_guarantee: pipeline_config.delivery_guarantee,
892        };
893
894        // Build TPC config (use explicit settings or auto-detect defaults).
895        {
896            use laminar_core::tpc::TpcConfig;
897
898            let tpc_cfg = self.config.tpc.clone().unwrap_or_default();
899            let num_sources = sources.len().max(1);
900            let num_cores = tpc_cfg.num_cores.unwrap_or_else(|| {
901                if has_external {
902                    std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
903                } else {
904                    // Pure embedded mode: one core per source.
905                    num_sources
906                }
907            });
908            // Ensure at least one core per source — SPSC queues require
909            // exactly one producer thread. When num_cores < num_sources,
910            // round-robin routing puts multiple producers on the same inbox.
911            let num_cores = num_cores.max(num_sources);
912            if let Some(configured) = tpc_cfg.num_cores {
913                if configured < num_sources {
914                    tracing::warn!(
915                        configured_cores = configured,
916                        required_cores = num_sources,
917                        "Overriding num_cores to match source count \
918                         (SPSC single-producer invariant)"
919                    );
920                }
921            }
922            let tpc_config = TpcConfig {
923                num_cores,
924                cpu_pinning: tpc_cfg.cpu_pinning,
925                cpu_start: tpc_cfg.cpu_start,
926                numa_aware: tpc_cfg.numa_aware,
927                ..Default::default()
928            };
929
930            let tpc_coordinator = TpcPipelineCoordinator::new(
931                sources,
932                pipeline_config,
933                &tpc_config,
934                Arc::clone(&shutdown),
935            )?;
936
937            let handle = tokio::spawn(async move {
938                tpc_coordinator.run(Box::new(callback)).await;
939            });
940
941            *self.runtime_handle.lock() = Some(handle);
942        }
943        Ok(())
944    }
945
946    /// Shut down the streaming pipeline gracefully.
947    ///
948    /// Signals the processing loop to stop, waits for it to complete
949    /// (with a timeout), then transitions to `Stopped`.
950    /// This is idempotent -- calling it multiple times is safe.
951    ///
952    /// # Errors
953    ///
954    /// Returns an error if shutdown encounters an error.
955    pub async fn shutdown(&self) -> Result<(), DbError> {
956        let current = self.state.load(std::sync::atomic::Ordering::Acquire);
957        if current == STATE_STOPPED || current == STATE_SHUTTING_DOWN {
958            return Ok(());
959        }
960
961        self.state
962            .store(STATE_SHUTTING_DOWN, std::sync::atomic::Ordering::Release);
963
964        // Signal the runtime loop to stop
965        self.shutdown_signal.notify_one();
966
967        // Await the runtime handle (with timeout)
968        let handle = self.runtime_handle.lock().take();
969        if let Some(handle) = handle {
970            match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
971                Ok(Ok(())) => {
972                    tracing::info!("Pipeline shut down cleanly");
973                }
974                Ok(Err(e)) => {
975                    tracing::warn!(error = %e, "Pipeline task panicked during shutdown");
976                }
977                Err(_) => {
978                    tracing::warn!("Pipeline shutdown timed out after 10s");
979                }
980            }
981        }
982
983        self.state
984            .store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
985        self.close();
986        Ok(())
987    }
988}