// laminar_db/pipeline_lifecycle.rs
1//! Pipeline lifecycle management: start, close, shutdown.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use laminar_core::streaming;
9use rustc_hash::FxHashMap;
10
11use crate::db::{
12    LaminarDB, SourceWatermarkState, STATE_CREATED, STATE_RUNNING, STATE_SHUTTING_DOWN,
13    STATE_STARTING, STATE_STOPPED,
14};
15use crate::error::DbError;
16
/// Resolves each `CREATE STREAM`'s output Arrow schema by planning its SQL.
/// Windowed streams get the `window_start, window_end` prefix. Temporary
/// `EmptyTable` placeholders let downstream streams plan against upstream
/// streams; they are removed before returning.
///
/// # Errors
///
/// Returns [`DbError::Pipeline`] when the remaining streams cannot make
/// planning progress (unresolvable dependency — e.g. a cycle or a missing
/// upstream), or when a placeholder table cannot be registered.
async fn resolve_stream_output_schemas(
    ctx: &datafusion::prelude::SessionContext,
    stream_regs: &HashMap<String, crate::connector_manager::StreamRegistration>,
) -> Result<HashMap<String, arrow_schema::SchemaRef>, DbError> {
    use arrow_schema::Schema;
    use datafusion::datasource::empty::EmptyTable;

    let mut out: HashMap<String, arrow_schema::SchemaRef> =
        HashMap::with_capacity(stream_regs.len());
    // Fixed-point iteration: each pass plans whatever it can; a stream whose
    // SQL references an upstream stream that has no placeholder yet fails to
    // plan and is retried on the next pass.
    let mut pending: Vec<&crate::connector_manager::StreamRegistration> =
        stream_regs.values().collect();
    // Placeholders we own and must clean up; pre-existing tables are left alone.
    let mut placeholders: Vec<String> = Vec::new();

    // Immediately-awaited async block so the placeholder cleanup below runs
    // on both the success and the error path.
    let result: Result<(), DbError> = async {
        while !pending.is_empty() {
            let mut next: Vec<&crate::connector_manager::StreamRegistration> = Vec::new();
            let mut progressed = false;
            for reg in pending {
                // Planning failure here usually means a dependency is not
                // registered yet — defer this stream to the next pass.
                let Ok(plan) = ctx.state().create_logical_plan(&reg.query_sql).await else {
                    next.push(reg);
                    continue;
                };

                // Windowed streams emit `window_start, window_end` ahead of
                // the query's own output columns.
                let mut fields = if reg.window_config.is_some() {
                    laminar_sql::translator::WindowOperatorConfig::output_prefix_fields()
                } else {
                    Vec::new()
                };
                for f in plan.schema().fields() {
                    fields.push((**f).clone());
                }
                let schema = Arc::new(Schema::new(fields));

                // Register a placeholder so downstream streams can plan
                // against this one; track it for cleanup only if we created
                // it (a pre-existing table is not ours to deregister).
                if !ctx.table_exist(&reg.name).unwrap_or(false) {
                    ctx.register_table(&reg.name, Arc::new(EmptyTable::new(schema.clone())))
                        .map_err(|e| {
                            DbError::Pipeline(format!(
                                "could not register placeholder for stream '{}': {e}",
                                reg.name
                            ))
                        })?;
                    placeholders.push(reg.name.clone());
                }
                out.insert(reg.name.clone(), schema);
                progressed = true;
            }

            // A full pass with zero progress means the remaining streams can
            // never resolve. Re-plan one of them purely to surface a
            // representative planner error message in the returned error.
            if !progressed {
                let mut unresolved: Vec<&str> = next.iter().map(|r| r.name.as_str()).collect();
                unresolved.sort_unstable();
                let err = ctx
                    .state()
                    .create_logical_plan(&next[0].query_sql)
                    .await
                    .err()
                    .map_or_else(|| "unknown error".to_string(), |e| e.to_string());
                return Err(DbError::Pipeline(format!(
                    "unresolvable stream dependency among [{}]: {err}",
                    unresolved.join(", ")
                )));
            }
            pending = next;
        }
        Ok(())
    }
    .await;

    // Remove only the placeholders this function created.
    for name in &placeholders {
        let _ = ctx.deregister_table(name);
    }

    result.map(|()| out)
}
95
/// Derives the checkpoint key prefix from a storage URL.
///
/// `file://` URLs yield an empty prefix (the `LocalFileSystem` store already
/// carries the root); cloud URLs like `s3://bucket/some/prefix` yield the
/// path after the bucket, normalised to end with `/`. A URL with no path
/// after the bucket yields an empty prefix.
pub(crate) fn url_to_checkpoint_prefix(url: &str) -> String {
    // Local filesystem: no prefix needed.
    if url.starts_with("file://") {
        return String::new();
    }

    // Drop the scheme ("s3://", "gs://", …) when present.
    let remainder = match url.find("://") {
        Some(i) => &url[i + 3..],
        None => url,
    };

    // Everything after the bucket's first '/' is the prefix; normalise to
    // either the empty string or a '/'-terminated path.
    remainder
        .split_once('/')
        .map(|(_bucket, prefix)| match prefix {
            "" => String::new(),
            p if p.ends_with('/') => p.to_string(),
            p => format!("{p}/"),
        })
        .unwrap_or_default()
}
119
120impl LaminarDB {
121    /// Shut down the database gracefully.
122    pub fn close(&self) {
123        self.shutdown
124            .store(true, std::sync::atomic::Ordering::Relaxed);
125    }
126
127    /// Returns `true` if the database has been shut down.
128    pub fn is_closed(&self) -> bool {
129        self.shutdown.load(std::sync::atomic::Ordering::Relaxed)
130    }
131
132    pub(crate) fn is_pipeline_running(&self) -> bool {
133        let s = self.state.load(std::sync::atomic::Ordering::Acquire);
134        s == STATE_RUNNING || s == STATE_STARTING || s == STATE_SHUTTING_DOWN
135    }
136
137    /// Start the streaming pipeline.
138    ///
139    /// Activates all registered connectors and begins processing.
140    /// This is a no-op if the pipeline is already running.
141    ///
142    /// When the `kafka` feature is enabled and Kafka sources/sinks are
143    /// registered, this builds `KafkaSource`/`KafkaSink` instances and
144    /// spawns a background task that polls sources, executes stream queries
145    /// via `DataFusion`, and writes results to sinks.
146    ///
147    /// In embedded (in-memory) mode, this simply transitions to `Running`.
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if the pipeline cannot be started. On failure, the
152    /// instance is unwound back to `STATE_CREATED` so the caller can retry
153    /// after fixing the offending config.
154    pub async fn start(&self) -> Result<(), DbError> {
155        let current = self.state.load(std::sync::atomic::Ordering::Acquire);
156        if current == STATE_RUNNING || current == STATE_STARTING {
157            return Ok(());
158        }
159        if current == STATE_STOPPED {
160            return Err(DbError::InvalidOperation(
161                "Cannot start a stopped pipeline. Create a new LaminarDB instance.".into(),
162            ));
163        }
164
165        self.state
166            .store(STATE_STARTING, std::sync::atomic::Ordering::Release);
167
168        // Fallback for embedded use without a server.
169        {
170            let mut guard = self.engine_metrics.lock();
171            if guard.is_none() {
172                *guard = Some(Arc::new(crate::engine_metrics::EngineMetrics::new(
173                    &prometheus::Registry::new(),
174                )));
175            }
176        }
177
178        match self.start_inner().await {
179            Ok(()) => {
180                self.state
181                    .store(STATE_RUNNING, std::sync::atomic::Ordering::Release);
182                Ok(())
183            }
184            Err(e) => {
185                // Any failure between STATE_STARTING and STATE_RUNNING must
186                // unwind — otherwise the instance is wedged and a retry from
187                // the caller silently succeeds without actually starting.
188                self.state
189                    .store(STATE_CREATED, std::sync::atomic::Ordering::Release);
190                Err(e)
191            }
192        }
193    }
194
    /// Runs the actual startup sequence. Called exclusively by
    /// [`LaminarDB::start`], which handles the `STATE_STARTING` ↔
    /// `STATE_RUNNING` / `STATE_CREATED` transitions around it.
    ///
    /// # Errors
    ///
    /// Returns an error if the object store, checkpoint coordinator, or
    /// connector pipeline cannot be built; the caller unwinds state.
    #[allow(clippy::too_many_lines)]
    async fn start_inner(&self) -> Result<(), DbError> {
        // Snapshot connector registrations under the lock
        let (source_regs, sink_regs, stream_regs, table_regs, has_external) = {
            let mgr = self.connector_manager.lock();
            (
                mgr.sources().clone(),
                mgr.sinks().clone(),
                mgr.streams().clone(),
                mgr.tables().clone(),
                mgr.has_external_connectors(),
            )
        };

        // Log which sources have external connectors for debugging.
        for (name, reg) in &source_regs {
            tracing::debug!(source = %name, connector_type = ?reg.connector_type, "Registered source");
        }
        for (name, reg) in &sink_regs {
            tracing::debug!(sink = %name, connector_type = ?reg.connector_type, "Registered sink");
        }

        // Initialize checkpoint coordinator (shared across all pipeline modes)
        if let Some(ref cp_config) = self.config.checkpoint {
            use crate::checkpoint_coordinator::{
                CheckpointConfig as CkpConfig, CheckpointCoordinator,
            };

            // Retain three checkpoints by default when unconfigured.
            let max_retained = cp_config.max_retained.unwrap_or(3);
            // Vnode count must fit the manifest's u16; saturate at u16::MAX.
            let vnode_count = self.vnode_registry.lock().as_ref().map_or(
                laminar_storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
                |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
            );

            // Checkpoint store selection: object store when a URL is
            // configured, local filesystem otherwise.
            let store: Box<dyn laminar_storage::CheckpointStore> =
                if let Some(ref url) = self.config.object_store_url {
                    let obj_store = laminar_storage::object_store_builder::build_object_store(
                        url,
                        &self.config.object_store_options,
                    )
                    .map_err(|e| DbError::Config(format!("object store: {e}")))?;
                    let prefix = url_to_checkpoint_prefix(url);
                    Box::new(
                        laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
                            obj_store,
                            prefix,
                            max_retained,
                        )
                        .with_vnode_count(vnode_count),
                    )
                } else {
                    // Directory resolution order: explicit checkpoint
                    // data_dir → storage_dir → ./data.
                    let data_dir = cp_config
                        .data_dir
                        .clone()
                        .or_else(|| self.config.storage_dir.clone())
                        .unwrap_or_else(|| std::path::PathBuf::from("./data"));
                    Box::new(
                        laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
                            &data_dir,
                            max_retained,
                        )
                        .with_vnode_count(vnode_count),
                    )
                };

            let config = CkpConfig {
                interval: cp_config.interval_ms.map(std::time::Duration::from_millis),
                max_retained,
                ..CkpConfig::default()
            };
            let mut coord = CheckpointCoordinator::new(config, store).await?;
            if let Some(ref prom) = *self.engine_metrics.lock() {
                coord.set_metrics(Arc::clone(prom));
            }

            // Cluster mode: install the controller so the coordinator
            // can consult it for leader / follower role once the
            // barrier-protocol flow change lands.
            #[cfg(feature = "cluster-unstable")]
            if let Some(controller) = self.cluster_controller.lock().clone() {
                coord.set_cluster_controller(controller);
            }

            // Durability gate wiring: if both the state backend and vnode
            // registry are installed, tell the coordinator which vnodes
            // this instance owns. In cluster mode the owner id comes from
            // the cluster controller; in single-instance mode we assume
            // a `single_owner` registry and pass `NodeId(0)`.
            if let (Some(backend), Some(registry)) = (
                self.state_backend.lock().clone(),
                self.vnode_registry.lock().clone(),
            ) {
                let owner = {
                    #[cfg(feature = "cluster-unstable")]
                    {
                        self.cluster_controller
                            .lock()
                            .as_ref()
                            .map_or(laminar_core::state::NodeId(0), |c| {
                                laminar_core::state::NodeId(c.instance_id().0)
                            })
                    }
                    #[cfg(not(feature = "cluster-unstable"))]
                    {
                        laminar_core::state::NodeId(0)
                    }
                };
                // Both sides of the split-brain guard must see the same
                // generation. The coordinator stamps outgoing writes with
                // it (caller side), and the backend rejects incoming
                // writes below it (authority side).
                // Without the backend call the authoritative version
                // stays at 0 and the fence is a silent no-op — the
                // registry's version carries the snapshot generation
                // across a restart, but only in-memory.
                let version = registry.assignment_version();
                backend.set_authoritative_version(version);
                coord.set_state_backend(backend);
                coord.set_assignment_version(version);
                coord.set_vnode_set(laminar_core::state::owned_vnodes(&registry, owner));
                // Leader's gate checks the full registry — across all
                // instances — so the 2PC commit only fires when every
                // follower has also persisted its markers.
                coord.set_gate_vnode_set((0..registry.vnode_count()).collect());
            }

            // Plug in the cluster 2PC decision store before the
            // recovery sweep so `reconcile_prepared_on_init` can
            // consult it. Writes land here before the leader's Commit
            // announcement goes out, so a new leader mid-2PC reads
            // the durable vote instead of guessing.
            #[cfg(feature = "cluster-unstable")]
            if let Some(ds) = self.decision_store.lock().clone() {
                coord.set_decision_store(ds);
            }

            // Cluster recovery: if this instance's last persisted
            // manifest has any sink in Pending state, consult the
            // durable 2PC decision store and drive local sinks to the
            // recorded verdict. Committed → local commit + leader
            // re-announces Commit for stragglers; Aborted or absent →
            // rollback + leader announces Abort.
            #[cfg(feature = "cluster-unstable")]
            coord.reconcile_prepared_on_init().await;

            // Publish the fully wired coordinator (async mutex).
            *self.coordinator.lock().await = Some(coord);
        }

        // With any external connector or at least one stream, launch the
        // full connector pipeline; otherwise stay in embedded mode.
        if has_external || !stream_regs.is_empty() {
            tracing::info!(
                sources = source_regs.len(),
                sinks = sink_regs.len(),
                streams = stream_regs.len(),
                tables = table_regs.len(),
                has_external,
                "Starting pipeline"
            );
            self.start_connector_pipeline(
                source_regs,
                sink_regs,
                stream_regs,
                table_regs,
                has_external,
            )
            .await?;
        } else {
            tracing::info!(
                sources = source_regs.len(),
                sinks = sink_regs.len(),
                "Starting in embedded (in-memory) mode — no streams"
            );
        }

        Ok(())
    }
373
374    /// Build and start the unified pipeline with sources, sinks, and streams.
375    ///
376    /// Handles both embedded (in-memory only) and connector-backed sources
377    /// through a single code path. Connector-less sources are wrapped as
378    /// `CatalogSourceConnector` to participate in the pipeline alongside
379    /// external connectors (Kafka, CDC, etc.).
380    ///
381    /// Each source runs as a tokio task pushing batches via mpsc channel
382    /// to a `StreamingCoordinator` that drives SQL cycles, sink writes,
383    /// and checkpoint coordination.
384    #[allow(clippy::too_many_lines)]
385    async fn start_connector_pipeline(
386        &self,
387        source_regs: HashMap<String, crate::connector_manager::SourceRegistration>,
388        sink_regs: HashMap<String, crate::connector_manager::SinkRegistration>,
389        stream_regs: HashMap<String, crate::connector_manager::StreamRegistration>,
390        table_regs: HashMap<String, crate::connector_manager::TableRegistration>,
391        has_external: bool,
392    ) -> Result<(), DbError> {
393        use crate::connector_manager::{
394            build_sink_config, build_source_config, build_table_config,
395        };
396        use crate::operator_graph::OperatorGraph;
397        use crate::pipeline::{PipelineConfig, SourceRegistration};
398        use laminar_connectors::reference::{ReferenceTableSource, RefreshMode};
399
400        // Build OperatorGraph context mirroring the rules + partition
401        // count installed on `LaminarDB::ctx` via the builder.
402        let ctx = {
403            use datafusion::execution::SessionStateBuilder;
404            let mut session_config = laminar_sql::datafusion::base_session_config();
405            if let Some(n) = self.pipeline_target_partitions {
406                session_config = session_config.with_target_partitions(n);
407            }
408            let mut state_builder = SessionStateBuilder::new()
409                .with_config(session_config)
410                .with_default_features();
411            for rule in self.physical_optimizer_rules.iter() {
412                state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
413            }
414            datafusion::prelude::SessionContext::new_with_state(state_builder.build())
415        };
416        laminar_sql::register_streaming_functions(&ctx);
417
418        // Register lookup/reference tables in the operator graph's
419        // SessionContext so JOIN queries can resolve them.
420        let lookup_tables: Vec<(String, arrow::datatypes::SchemaRef)> = {
421            let ts = self.table_store.read();
422            ts.table_names()
423                .into_iter()
424                .filter_map(|name| {
425                    let schema = ts.table_schema(&name)?;
426                    Some((name, schema))
427                })
428                .collect()
429        };
430        for (name, schema) in lookup_tables {
431            let provider = crate::table_provider::ReferenceTableProvider::new(
432                name.clone(),
433                schema,
434                self.table_store.clone(),
435            );
436            if let Err(e) = ctx.register_table(&name, Arc::new(provider)) {
437                tracing::warn!(
438                    table = %name,
439                    error = %e,
440                    "failed to register lookup table in operator graph context"
441                );
442            }
443        }
444
445        let mut graph = OperatorGraph::new(ctx);
446        graph.set_max_state_bytes(self.config.max_state_bytes_per_operator);
447        graph.set_lookup_registry(Arc::clone(&self.lookup_registry));
448        if let Some(ref prom) = *self.engine_metrics.lock() {
449            graph.set_metrics(Arc::clone(prom));
450        }
451
452        // Install the cluster row-shuffle config if all the pieces are
453        // present (registry, sender, receiver, controller). Without any
454        // one of them, aggregate queries run single-node.
455        #[cfg(feature = "cluster-unstable")]
456        {
457            let sender = self.shuffle_sender.lock().clone();
458            let receiver = self.shuffle_receiver.lock().clone();
459            let registry = self.vnode_registry.lock().clone();
460            let controller = self.cluster_controller.lock().clone();
461            if let (Some(sender), Some(receiver), Some(registry), Some(controller)) =
462                (sender, receiver, registry, controller)
463            {
464                let self_id = laminar_core::state::NodeId(controller.instance_id().0);
465                graph.set_cluster_shuffle(crate::operator::sql_query::ClusterShuffleConfig {
466                    registry,
467                    sender,
468                    receiver,
469                    self_id,
470                });
471            }
472        }
473
474        // Register source schemas for ALL sources (both external connectors
475        // and catalog-bridge sources) so the graph can create empty
476        // placeholder tables when no data arrives in a given cycle.
477        for name in source_regs.keys() {
478            if let Some(entry) = self.catalog.get_source(name) {
479                graph.register_source_schema(name.clone(), entry.schema.clone());
480            }
481        }
482
483        for reg in stream_regs.values() {
484            graph.add_query(
485                reg.name.clone(),
486                reg.query_sql.clone(),
487                reg.emit_clause.clone(),
488                reg.window_config.clone(),
489                reg.order_config.clone(),
490                None,
491            );
492        }
493
494        // Register temporal join tables as Versioned in the lookup registry
495        // so that temporal join operators can use persistent versioned state.
496        for tcfg in graph.temporal_join_configs() {
497            if self.lookup_registry.get_entry(&tcfg.table_name).is_none() {
498                // Get initial data. If none exists yet, use an empty batch
499                // with the correct schema from the catalog (not Schema::empty).
500                let initial_batch = self
501                    .table_store
502                    .read()
503                    .to_record_batch(&tcfg.table_name)
504                    .or_else(|| {
505                        self.catalog
506                            .get_source(&tcfg.table_name)
507                            .map(|e| RecordBatch::new_empty(e.schema.clone()))
508                    })
509                    .unwrap_or_else(|| {
510                        RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()))
511                    });
512                let key_columns = vec![tcfg.table_key_column.clone()];
513                let key_indices: Vec<usize> = key_columns
514                    .iter()
515                    .filter_map(|k| initial_batch.schema().index_of(k).ok())
516                    .collect();
517
518                // When table_version_column is empty (translator couldn't resolve it
519                // from the AS OF clause), pick the first timestamp/int column that
520                // isn't the join key.
521                let resolved_version_col = if tcfg.table_version_column.is_empty() {
522                    let schema = initial_batch.schema();
523                    schema
524                        .fields()
525                        .iter()
526                        .find(|f| {
527                            f.name() != &tcfg.table_key_column
528                                && matches!(
529                                    f.data_type(),
530                                    arrow::datatypes::DataType::Int64
531                                        | arrow::datatypes::DataType::Timestamp(_, _)
532                                )
533                        })
534                        .map(|f| f.name().clone())
535                        .unwrap_or_default()
536                } else {
537                    tcfg.table_version_column.clone()
538                };
539
540                let Ok(version_col_idx) = initial_batch.schema().index_of(&resolved_version_col)
541                else {
542                    if !initial_batch.schema().fields().is_empty() {
543                        tracing::warn!(
544                            table=%tcfg.table_name,
545                            version_col=%resolved_version_col,
546                            "Version column not found in temporal table schema; \
547                             will resolve on first CDC batch"
548                        );
549                    }
550                    // Register with empty index — built on first CDC update.
551                    self.lookup_registry.register_versioned(
552                        &tcfg.table_name,
553                        laminar_sql::datafusion::VersionedLookupState {
554                            batch: initial_batch,
555                            index: Arc::new(
556                                laminar_sql::datafusion::lookup_join_exec::VersionedIndex::default(
557                                ),
558                            ),
559                            key_columns,
560                            version_column: resolved_version_col,
561                            stream_time_column: tcfg.stream_time_column.clone(),
562                            max_versions_per_key: usize::MAX,
563                        },
564                    );
565                    continue;
566                };
567                let index = Arc::new(
568                    laminar_sql::datafusion::lookup_join_exec::VersionedIndex::build(
569                        &initial_batch,
570                        &key_indices,
571                        version_col_idx,
572                        usize::MAX,
573                    )
574                    .unwrap_or_default(),
575                );
576                self.lookup_registry.register_versioned(
577                    &tcfg.table_name,
578                    laminar_sql::datafusion::VersionedLookupState {
579                        batch: initial_batch,
580                        index,
581                        key_columns,
582                        version_column: resolved_version_col,
583                        stream_time_column: tcfg.stream_time_column.clone(),
584                        max_versions_per_key: usize::MAX,
585                    },
586                );
587            }
588        }
589
590        // Grab the shared Prometheus registry (if set) so connectors can
591        // register their metrics on it.
592        let prom_registry = self.prometheus_registry.lock().clone();
593
594        // Build sources as owned SourceRegistrations (no Arc<Mutex>).
595        let mut sources: Vec<SourceRegistration> = Vec::new();
596        for (name, reg) in &source_regs {
597            if reg.connector_type.is_none() {
598                continue;
599            }
600            let mut config = build_source_config(reg)?;
601
602            // Pass the SQL-defined Arrow schema to the connector so it can
603            // deserialize records with the correct column names and types.
604            if let Some(entry) = self.catalog.get_source(name) {
605                let schema_str = crate::pipeline_callback::encode_arrow_schema(&entry.schema);
606                config.set("_arrow_schema".to_string(), schema_str);
607            }
608
609            let source = self
610                .connector_registry
611                .create_source(&config, prom_registry.as_deref())
612                .map_err(|e| {
613                    DbError::Connector(format!(
614                        "Cannot create source '{}' (type '{}'): {e}",
615                        name,
616                        config.connector_type()
617                    ))
618                })?;
619            let supports_replay = source.supports_replay();
620            if !supports_replay {
621                tracing::warn!(
622                    source = %name,
623                    "source does not support replay — exactly-once semantics \
624                     are degraded to at-most-once for this source"
625                );
626            }
627            // Property-driven watermark wiring. `[source.watermark]` in
628            // TOML is the preferred path (honours max_out_of_orderness);
629            // this exists for sources that pass `event.time.column` /
630            // `event.time.field` as a connector property instead. The
631            // second pass below builds the matching SourceWatermarkState.
632            // Kafka uses `column`, WebSocket uses `field` — both spellings
633            // are live.
634            if let Some(entry) = self.catalog.get_source(name) {
635                if entry.source.event_time_column().is_none() {
636                    if let Some(col) = config.get("event.time.column") {
637                        entry.source.set_event_time_column(col);
638                    } else if let Some(col) = config.get("event.time.field") {
639                        entry.source.set_event_time_column(col);
640                    }
641                }
642                // Carry the out-of-orderness bound alongside the column so
643                // the fallback watermark path below uses the configured
644                // tolerance instead of Duration::ZERO.
645                if let Some(ms_str) = config.get("max.out.of.orderness.ms") {
646                    match ms_str.parse::<u64>() {
647                        Ok(ms) => {
648                            entry
649                                .source
650                                .set_max_out_of_orderness(std::time::Duration::from_millis(ms));
651                        }
652                        Err(e) => {
653                            tracing::warn!(
654                                source = %name,
655                                value = %ms_str,
656                                error = %e,
657                                "ignoring unparseable max.out.of.orderness.ms — \
658                                 watermark will use Duration::ZERO"
659                            );
660                        }
661                    }
662                }
663            }
664
665            sources.push(SourceRegistration {
666                name: name.clone(),
667                connector: source,
668                config,
669                supports_replay,
670                restore_checkpoint: None, // Set after recovery below
671            });
672        }
673
674        // Bridge connector-less sources into the pipeline so db.insert()
675        // data flows through the standard source task → coordinator path.
676        // This covers two cases:
677        //   1. Sources in source_regs with connector_type == None (registered
678        //      in connector manager but without a FROM clause).
679        //   2. Sources in the catalog but NOT in source_regs at all (pure
680        //      embedded sources created without any connector specification).
           // Snapshot of names already registered as external connectors.
           // Taken before either bridging loop runs; the first loop's pushes
           // are not added to it, but that loop only handles source_regs
           // entries and the second loop excludes all source_regs keys, so
           // no source can be bridged twice.
681        let bridged_names: rustc_hash::FxHashSet<String> =
682            sources.iter().map(|s| s.name.clone()).collect();
683        // First: bridge sources in source_regs that have no connector.
684        for (name, reg) in &source_regs {
685            if reg.connector_type.is_some() {
686                continue; // Already created as external connector above
687            }
688            if let Some(entry) = self.catalog.get_source(name) {
689                let subscription = entry.sink.subscribe();
690                let connector = crate::catalog_connector::CatalogSourceConnector::new(
691                    subscription,
692                    entry.schema.clone(),
693                    entry.data_notify(),
694                );
695                sources.push(SourceRegistration {
696                    name: name.clone(),
                       // Catalog bridges are marked non-replayable, which
                       // makes them incompatible with exactly-once delivery
                       // (enforced by the validation further below).
697                    connector: Box::new(connector),
698                    config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
699                    supports_replay: false,
700                    restore_checkpoint: None,
701                });
702            }
703        }
704        // Second: bridge catalog sources not in source_regs (embedded-only
705        // sources that were never registered with the connector manager).
706        for name in self.catalog.list_sources() {
707            if bridged_names.contains(&name) || source_regs.contains_key(&name) {
708                continue;
709            }
710            if let Some(entry) = self.catalog.get_source(&name) {
                   // NOTE(review): only this second loop registers the
                   // source's schema with `graph`; the first loop does not —
                   // presumably source_regs entries had their schemas
                   // registered earlier in this function. TODO confirm.
711                graph.register_source_schema(name.clone(), entry.schema.clone());
712                let subscription = entry.sink.subscribe();
713                let connector = crate::catalog_connector::CatalogSourceConnector::new(
714                    subscription,
715                    entry.schema.clone(),
716                    entry.data_notify(),
717                );
718                sources.push(SourceRegistration {
719                    name: name.clone(),
720                    connector: Box::new(connector),
721                    config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
722                    supports_replay: false,
723                    restore_checkpoint: None,
724                });
725            }
726        }
727
728        // Resolve each stream's output schema with DataFusion so sinks
729        // downstream see it via `_arrow_schema` (mirrors the source path
730        // above).
731        let stream_output_schemas = resolve_stream_output_schemas(&self.ctx, &stream_regs).await?;
732
733        // Build sinks. Each runs in its own tokio task with a bounded
734        // command channel and shares one event channel back to the
735        // pipeline callback.
736        let (sink_event_tx, sink_event_rx) =
737            laminar_core::streaming::channel::channel::<crate::sink_task::SinkEvent>(
738                crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY,
739            );
740        #[allow(clippy::type_complexity)]
741        let mut sinks: Vec<(
742            String,
743            crate::sink_task::SinkTaskHandle,
744            Option<String>,
745            String, // input stream name (FROM clause target)
746            bool,   // changelog-capable
747        )> = Vec::new();
748        for (name, reg) in &sink_regs {
749            if reg.connector_type.is_none() {
750                continue;
751            }
752            let mut config = build_sink_config(reg)?;
753            // Resolve the upstream schema from a stream first, then fall back
754            // to a source — `CREATE SINK ... FROM <name>` accepts either.
755            let upstream_schema = stream_output_schemas.get(&reg.input).cloned().or_else(|| {
756                self.catalog
757                    .get_source(&reg.input)
758                    .map(|e| e.schema.clone())
759            });
760            if let Some(schema) = upstream_schema {
761                let schema_str = crate::pipeline_callback::encode_arrow_schema(&schema);
762                config.set("_arrow_schema".to_string(), schema_str);
763            }
764            let mut sink = self
765                .connector_registry
766                .create_sink(&config, prom_registry.as_deref())
767                .map_err(|e| {
768                    DbError::Connector(format!(
769                        "Cannot create sink '{}' (type '{}'): {e}",
770                        name,
771                        config.connector_type()
772                    ))
773                })?;
774            // Open the connector before handing it to the task.
775            sink.open(&config)
776                .await
777                .map_err(|e| DbError::Connector(format!("Failed to open sink '{name}': {e}")))?;
778            let caps = sink.capabilities();
779            // Resolve per-sink write timeout: user override (property)
780            // takes precedence over the sink's declared suggestion.
781            let write_timeout =
782                match config
783                    .get_parsed::<u64>("sink.write.timeout.ms")
784                    .map_err(|e| {
785                        DbError::Connector(format!(
786                            "Invalid 'sink.write.timeout.ms' for sink '{name}': {e}"
787                        ))
788                    })? {
789                    Some(ms) => std::time::Duration::from_millis(ms),
790                    None => caps.suggested_write_timeout,
791                };
792            if write_timeout.is_zero() {
793                return Err(DbError::Connector(format!(
794                    "sink '{name}': write_timeout must be > 0 \
795                     (check 'sink.write.timeout.ms' or the sink's \
796                     suggested_write_timeout)"
797                )));
798            }
799            let sink_id: std::sync::Arc<str> = std::sync::Arc::from(name.as_str());
800            let handle =
801                crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
802                    name: name.clone(),
803                    sink_id,
804                    connector: sink,
805                    exactly_once: caps.exactly_once,
806                    channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
807                    flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
808                    write_timeout,
809                    event_tx: sink_event_tx.clone(),
810                });
811            sinks.push((
812                name.clone(),
813                handle,
814                reg.filter_expr.clone(),
815                reg.input.clone(),
816                caps.changelog,
817            ));
818        }
819        // Drop the local sender so the channel disconnects when all
820        // sink tasks exit.
821        drop(sink_event_tx);
822
823        // Build table sources from registrations
824        let mut table_sources: Vec<(String, Box<dyn ReferenceTableSource>, RefreshMode)> =
825            Vec::new();
826        for (name, reg) in &table_regs {
827            if reg.connector_type.is_none() {
828                continue;
829            }
830            let config = build_table_config(reg)?;
831            let source = self
832                .connector_registry
833                .create_table_source(&config)
834                .map_err(|e| {
835                    DbError::Connector(format!("Cannot create table source '{name}': {e}"))
836                })?;
837            let mode = reg.refresh.clone().unwrap_or(RefreshMode::SnapshotPlusCdc);
838            table_sources.push((name.clone(), source, mode));
839        }
840
841        // Register sinks with the checkpoint coordinator.
842        // Sources are owned by the TPC runtime — checkpoint reads go
843        // through lock-free watch channels instead.
           // The extra scope bounds the coordinator mutex guard so the
           // (async) lock is released before the recovery step below takes
           // it again.
844        {
845            let mut guard = self.coordinator.lock().await;
               // A `None` coordinator presumably means checkpointing is
               // disabled — in that case sink registration is skipped.
846            if let Some(ref mut coord) = *guard {
847                for (name, handle, _, _, _) in &sinks {
848                    let exactly_once = handle.exactly_once();
849                    coord.register_sink(name.clone(), handle.clone(), exactly_once);
850                }
851            }
852        }
853
854        // Recovery: restore sink/table state via unified coordinator.
855        // Must run BEFORE begin_initial_epoch so the coordinator's epoch
856        // reflects the recovered state.
857        {
858            let mut guard = self.coordinator.lock().await;
859            if let Some(ref mut coord) = *guard {
860                match coord.recover().await {
861                    Ok(Some(recovered)) => {
862                        for (name, source, _) in &mut table_sources {
863                            if let Some(cp) = recovered.manifest.table_offsets.get(name) {
864                                let restored =
865                                    crate::checkpoint_coordinator::connector_to_source_checkpoint(
866                                        cp,
867                                    );
868                                if let Err(e) = source.restore(&restored).await {
869                                    tracing::warn!(
870                                        table=%name, error=%e,
871                                        "Table source restore failed"
872                                    );
873                                }
874                            }
875                        }
876                        // Attach recovered source offsets to SourceRegistrations.
877                        // These will be passed to the TPC source adapters, which
878                        // call connector.restore() after open() to seek Kafka
879                        // consumers to their checkpoint positions.
880                        for src in &mut sources {
881                            if !src.supports_replay {
882                                continue;
883                            }
884                            if let Some(cp) = recovered.manifest.source_offsets.get(&src.name) {
885                                let restored =
886                                    crate::checkpoint_coordinator::connector_to_source_checkpoint(
887                                        cp,
888                                    );
889                                tracing::info!(
890                                    source = %src.name,
891                                    offsets = cp.offsets.len(),
892                                    "attaching checkpoint offsets for source recovery"
893                                );
894                                src.restore_checkpoint = Some(restored);
895                            }
896                        }
897                        let mut graph_restore_failed = false;
898                        let op_keys: Vec<&String> =
899                            recovered.manifest.operator_states.keys().collect();
900                        let instance_hint = {
901                            #[cfg(feature = "cluster-unstable")]
902                            {
903                                self.cluster_controller
904                                    .lock()
905                                    .as_ref()
906                                    .map_or(0, |c| c.instance_id().0)
907                            }
908                            #[cfg(not(feature = "cluster-unstable"))]
909                            {
910                                0u64
911                            }
912                        };
913                        tracing::info!(
914                            instance = instance_hint,
915                            count = op_keys.len(),
916                            keys = ?op_keys,
917                            "manifest operator_states summary"
918                        );
919                        if let Some(op) = recovered.manifest.operator_states.get("operator_graph") {
920                            if let Some(bytes) = op.decode_inline() {
921                                match graph.restore_from_bytes(&bytes) {
922                                    Ok(n) => {
923                                        tracing::info!(
924                                            queries = n,
925                                            "Restored operator graph state from checkpoint"
926                                        );
927                                    }
928                                    Err(e) => {
929                                        graph_restore_failed = true;
930                                        tracing::warn!(
931                                            error = %e,
932                                            "Operator graph state restore failed, starting fresh"
933                                        );
934                                    }
935                                }
936                            } else {
937                                tracing::warn!(
938                                    "manifest has 'operator_graph' but decode_inline returned None"
939                                );
940                            }
941                        } else if recovered
942                            .manifest
943                            .operator_states
944                            .contains_key("stream_executor")
945                        {
946                            graph_restore_failed = true;
947                            tracing::warn!(
948                                "Found old stream_executor checkpoint format; \
949                                 skipping restore (clean break). Starting fresh."
950                            );
951                        }
952
953                        // Skip MV restore when operator state failed to load —
954                        // stale MV data with fresh operators is inconsistent.
955                        // No operator state at all (stateless pipeline) is fine.
956                        if !graph_restore_failed {
957                            let prefix = crate::mv_store::CHECKPOINT_KEY_PREFIX;
958                            let mut store = self.mv_store.write();
959                            let mut restored = 0usize;
960                            for (key, op) in &recovered.manifest.operator_states {
961                                if let Some(name) = key.strip_prefix(prefix) {
962                                    if let Some(bytes) = op.decode_inline() {
963                                        match store.restore_from_ipc(name, &bytes) {
964                                            Ok(true) => restored += 1,
965                                            Ok(false) => {} // MV no longer registered
966                                            Err(e) => {
967                                                tracing::warn!(mv = name, error = %e, "MV restore failed");
968                                            }
969                                        }
970                                    }
971                                }
972                            }
973                            if restored > 0 {
974                                tracing::info!(mvs = restored, "Restored MV state from checkpoint");
975                            }
976                        }
977                        tracing::info!(
978                            epoch = recovered.epoch(),
979                            sources_restored = recovered.sources_restored,
980                            sinks_rolled_back = recovered.sinks_rolled_back,
981                            "Recovered from unified checkpoint"
982                        );
983                    }
984                    Ok(None) => {
985                        tracing::info!("No checkpoint found, starting fresh");
986                    }
987                    Err(e) => {
988                        tracing::warn!(error = %e, "Checkpoint recovery failed, starting fresh");
989                    }
990                }
991            }
992        }
993
994        // Begin the initial epoch on exactly-once sinks AFTER recovery
995        // so the coordinator's epoch reflects the recovered checkpoint.
996        {
997            let guard = self.coordinator.lock().await;
998            if let Some(ref coord) = *guard {
                   // `?` propagates the error — pipeline startup aborts if
                   // the initial epoch cannot be begun on the sinks.
999                coord.begin_initial_epoch().await?;
1000            }
1001        }
1002
1003        // Snapshot phase: populate tables before stream processing begins
            // Manual-refresh tables are skipped here; they are wired up as
            // on-demand (partial-cache) lookup tables further below.
1004        for (name, source, mode) in &mut table_sources {
1005            if matches!(mode, RefreshMode::Manual) {
1006                continue;
1007            }
                // Drain the whole snapshot batch-by-batch. The table_store
                // write lock is taken per upsert (the guard is a temporary),
                // so it is not held across the `poll_snapshot().await`.
1008            while let Some(batch) = source
1009                .poll_snapshot()
1010                .await
1011                .map_err(|e| DbError::Connector(format!("Table '{name}' snapshot error: {e}")))?
1012            {
1013                self.table_store
1014                    .write()
1015                    .upsert(name, &batch)
1016                    .map_err(|e| DbError::Connector(format!("Table '{name}' upsert error: {e}")))?;
1017            }
1018            self.sync_table_to_datafusion(name)?;
                // Rebuild the membership filter and flag the table ready
                // under a single write-lock acquisition.
1019            {
1020                let mut ts = self.table_store.write();
1021                ts.rebuild_xor_filter(name);
1022                ts.set_ready(name, true);
1023            }
1024            // Update lookup registry so join queries see fresh data.
1025            // Skip if already registered as Versioned (temporal join tables
1026            // must keep their version history, not be overwritten as Snapshot).
1027            if matches!(
1028                self.lookup_registry.get_entry(name),
1029                Some(laminar_sql::datafusion::RegisteredLookup::Versioned(_))
1030            ) {
1031                // Already versioned — don't downgrade to Snapshot.
1032            } else if let Some(batch) = self.table_store.read().to_record_batch(name) {
1033                self.lookup_registry.register(
1034                    name,
1035                    laminar_sql::datafusion::LookupSnapshot {
1036                        batch,
1037                        key_columns: vec![], // already indexed by primary key
1038                    },
1039                );
1040            }
1041        }
1042
1043        // On-demand (Manual) tables: register as Partial in the lookup
1044        // registry so lookup joins use the foyer cache + source fallback path,
1045        // then promote to SnapshotPlusCdc so poll_tables() calls poll_changes().
1046        for (name, _source, mode) in &mut table_sources {
1047            if !matches!(mode, RefreshMode::Manual) {
1048                continue;
1049            }
1050            let Some(reg) = table_regs.get(name.as_str()) else {
1051                continue;
1052            };
1053            let max_entries = reg.cache_max_entries.unwrap_or(65_536);
1054            let Some(schema) = self.table_store.read().table_schema(name) else {
1055                continue;
1056            };
1057            let pk_csv = &reg.primary_key;
1058            let pk_cols: Vec<String> = pk_csv
1059                .split(',')
1060                .map(|s| s.trim().to_string())
1061                .filter(|s| !s.is_empty())
1062                .collect();
1063            let key_sort_fields: Vec<arrow::row::SortField> = pk_cols
1064                .iter()
1065                .filter_map(|col| {
1066                    schema
1067                        .field_with_name(col)
1068                        .ok()
1069                        .map(|f| arrow::row::SortField::new(f.data_type().clone()))
1070                })
1071                .collect();
1072
1073            let cache = Arc::new(laminar_core::lookup::foyer_cache::FoyerMemoryCache::new(
1074                0,
1075                laminar_core::lookup::foyer_cache::FoyerMemoryCacheConfig {
1076                    capacity: max_entries,
1077                    shards: 16,
1078                },
1079            ));
1080            // Try to create a lookup source for cache-miss fallback via
1081            // the registry factory (no cross-crate type dependency).
1082            let lookup_source = if let Ok(mut config) = build_table_config(reg) {
1083                config.set("_primary_key_columns", pk_csv.as_str());
1084                match self.connector_registry.create_lookup_source(config).await {
1085                    Some(Ok(src)) => Some(src),
1086                    Some(Err(e)) => {
1087                        tracing::warn!(
1088                            table = %name, error = %e,
1089                            "lookup source creation failed; cache-only mode"
1090                        );
1091                        None
1092                    }
1093                    None => None,
1094                }
1095            } else {
1096                None
1097            };
1098
1099            self.lookup_registry.register_partial(
1100                name,
1101                laminar_sql::datafusion::PartialLookupState {
1102                    foyer_cache: cache,
1103                    schema,
1104                    key_columns: pk_cols,
1105                    key_sort_fields,
1106                    source: lookup_source,
1107                    fetch_semaphore: Arc::new(tokio::sync::Semaphore::new(16)),
1108                },
1109            );
1110            *mode = RefreshMode::SnapshotPlusCdc;
1111            tracing::info!(
1112                table = %name,
1113                max_entries,
1114                pk = %pk_csv,
1115                "registered on-demand lookup table (partial cache)"
1116            );
1117        }
1118
1119        // Get stream source handles so results also flow to db.subscribe().
1120        let mut stream_sources: Vec<(String, streaming::Source<crate::catalog::ArrowRecord>)> =
1121            Vec::new();
1122        for reg in stream_regs.values() {
1123            if let Some(src) = self.catalog.get_stream_source(&reg.name) {
1124                stream_sources.push((reg.name.clone(), src));
1125            }
1126        }
1127
1128        // Build per-source watermark tracking state (connector pipeline)
1129        let source_names = self.catalog.list_sources();
1130        let mut watermark_states: FxHashMap<String, SourceWatermarkState> =
1131            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1132        let mut source_entries_for_wm: FxHashMap<String, Arc<crate::catalog::SourceEntry>> =
1133            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1134        let mut source_ids: FxHashMap<String, usize> =
1135            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1136        for name in source_names {
1137            if let Some(entry) = self.catalog.get_source(&name) {
1138                if let (Some(col), Some(dur)) =
1139                    (&entry.watermark_column, entry.max_out_of_orderness)
1140                {
1141                    let extractor = laminar_core::time::EventTimeExtractor::from_column(col)
1142                        .with_mode(laminar_core::time::ExtractionMode::Max);
1143                    let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1144                        .is_processing_time
1145                        .load(std::sync::atomic::Ordering::Relaxed)
1146                    {
1147                        Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1148                    } else {
1149                        Box::new(
1150                            laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(dur),
1151                        )
1152                    };
1153                    let id = source_ids.len();
1154                    source_ids.insert(name.clone(), id);
1155                    watermark_states.insert(
1156                        name.clone(),
1157                        SourceWatermarkState {
1158                            extractor,
1159                            generator,
1160                            column: col.clone(),
1161                        },
1162                    );
1163                }
1164                source_entries_for_wm.insert(name, entry);
1165            }
1166        }
1167
1168        // Fallback watermark path for sources that set `event_time_column`
1169        // without an SQL `WATERMARK FOR` clause — exercised by the
1170        // programmatic API (`handle.set_event_time_column`) and by the
1171        // connector-property wiring above. Uses the bound from
1172        // `source.max_out_of_orderness()` if one was wired from connector
1173        // properties (e.g. Kafka `max.out.of.orderness.ms`), otherwise
1174        // falls back to `Duration::ZERO`.
1175        for name in self.catalog.list_sources() {
1176            if watermark_states.contains_key(&name) {
1177                continue;
1178            }
1179            if let Some(entry) = self.catalog.get_source(&name) {
1180                if let Some(col) = entry.source.event_time_column() {
1181                    let extractor = laminar_core::time::EventTimeExtractor::from_column(&col)
1182                        .with_mode(laminar_core::time::ExtractionMode::Max);
1183                    let ooo_bound = entry
1184                        .source
1185                        .max_out_of_orderness()
1186                        .unwrap_or(std::time::Duration::ZERO);
1187                    let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1188                        .is_processing_time
1189                        .load(std::sync::atomic::Ordering::Relaxed)
1190                    {
1191                        Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1192                    } else {
1193                        Box::new(
1194                            laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(
1195                                ooo_bound,
1196                            ),
1197                        )
1198                    };
1199                    let id = source_ids.len();
1200                    source_ids.insert(name.clone(), id);
1201                    watermark_states.insert(
1202                        name.clone(),
1203                        SourceWatermarkState {
1204                            extractor,
1205                            generator,
1206                            column: col,
1207                        },
1208                    );
1209                }
1210            }
1211        }
1212
            // One watermark-tracker slot per watermark-capable source;
            // `None` when no source was registered for watermarks above.
1213        let tracker = if source_ids.is_empty() {
1214            None
1215        } else {
1216            Some(laminar_core::time::WatermarkTracker::new(source_ids.len()))
1217        };
1218
1219        let max_poll = self.config.default_buffer_size.min(1024);
1220        let checkpoint_interval = self
1221            .config
1222            .checkpoint
1223            .as_ref()
1224            .and_then(|c| c.interval_ms)
1225            .map(std::time::Duration::from_millis);
1226
1227        tracing::info!(
1228            sources = sources.len(),
1229            sinks = sinks.len(),
1230            streams = stream_regs.len(),
1231            subscriptions = stream_sources.len(),
1232            watermark_sources = source_ids.len(),
1233            "Starting event-driven connector pipeline"
1234        );
1235
1236        // Build pipeline config.
1237        // Embedded mode (no external connectors): zero batch window for
1238        // minimal latency — data is processed as soon as it arrives.
1239        // Connector mode: 5ms batch window amortizes SQL overhead across
1240        // high-throughput external sources (Kafka, CDC).
1241        let drain_budget_ns = self.config.pipeline_drain_budget_ns.unwrap_or(1_000_000);
1242        let query_budget_ns = self.config.pipeline_query_budget_ns.unwrap_or(8_000_000);
1243        let pipeline_config = PipelineConfig {
1244            max_poll_records: max_poll,
1245            channel_capacity: self.config.pipeline_channel_capacity.unwrap_or(64),
1246            fallback_poll_interval: if has_external {
1247                std::time::Duration::from_millis(10)
1248            } else {
1249                std::time::Duration::from_millis(1)
1250            },
1251            checkpoint_interval,
1252            batch_window: self
1253                .config
1254                .pipeline_batch_window
1255                .unwrap_or(if has_external {
1256                    std::time::Duration::from_millis(5)
1257                } else {
1258                    std::time::Duration::ZERO
1259                }),
1260            // Tracks CheckpointConfig::default().alignment_timeout.
1261            // TODO: expose alignment_timeout_ms in LaminarDbConfig.checkpoint
1262            // so users can configure this.
1263            barrier_alignment_timeout: std::time::Duration::from_secs(30),
1264            delivery_guarantee: self.config.delivery_guarantee,
1265            // cycle_budget is a soft cap for logging; ensure it's at least
1266            // drain + query so sub-budgets can actually be used.
1267            cycle_budget_ns: 10_000_000_u64.max(drain_budget_ns + query_budget_ns),
1268            drain_budget_ns,
1269            query_budget_ns,
1270            background_budget_ns: 5_000_000, // 5ms
1271            max_input_buf_batches: self.config.pipeline_max_input_buf_batches.unwrap_or(256),
1272            max_input_buf_bytes: self.config.pipeline_max_input_buf_bytes,
1273            backpressure_policy: self.config.pipeline_backpressure_policy,
1274        };
1275
1276        // Validate delivery guarantee constraints.
            // Only ExactlyOnce and AtLeastOnce are checked here; any other
            // DeliveryGuarantee variant falls through with no validation.
            // NOTE(review): the `return Err` paths below fire after the sink
            // tasks were already spawned above — confirm those tasks shut
            // down cleanly on this early exit (e.g. via the dropped event
            // channel or the shutdown signal).
1277        {
1278            use laminar_connectors::connector::DeliveryGuarantee;
1279
1280            if pipeline_config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
                    // Exactly-once needs: replayable sources, transactional
                    // (exactly-once) sinks, and periodic checkpoints.
1281                for src in &sources {
1282                    if !src.supports_replay {
1283                        return Err(DbError::Config(format!(
1284                            "[LDB-5030] exactly-once requires all sources to support replay, \
1285                             but source '{}' does not. Use at-least-once or remove this source.",
1286                            src.name
1287                        )));
1288                    }
1289                }
1290                for (name, handle, _, _, _) in &sinks {
1291                    if !handle.exactly_once() {
1292                        return Err(DbError::Config(format!(
1293                            "[LDB-5031] exactly-once requires all sinks to support \
1294                             exactly-once semantics, but sink '{name}' does not. \
1295                             Use at-least-once or configure a transactional sink."
1296                        )));
1297                    }
1298                }
1299                if pipeline_config.checkpoint_interval.is_none() {
1300                    return Err(DbError::Config(
1301                        "[LDB-5032] exactly-once requires checkpointing to be enabled. \
1302                         Set checkpoint.interval.ms in the pipeline configuration."
1303                            .into(),
1304                    ));
1305                }
1306            } else if pipeline_config.delivery_guarantee == DeliveryGuarantee::AtLeastOnce {
                    // At-least-once is permissive: the degraded combination
                    // (exactly-once sinks fed by non-replayable sources) only
                    // warns instead of failing startup.
1307                let has_non_replayable = sources.iter().any(|s| !s.supports_replay);
1308                let has_eo_sink = sinks.iter().any(|(_, h, _, _, _)| h.exactly_once());
1309                if has_non_replayable && has_eo_sink {
1310                    tracing::warn!(
1311                        "[LDB-5033] pipeline has exactly-once sinks but some sources \
1312                         do not support replay — effective guarantee is at-most-once \
1313                         for events from non-replayable sources"
1314                    );
1315                }
1316            }
1317        }
1318
1319        let shutdown = self.shutdown_signal.clone();
1320
1321        // Build the PipelineCallback implementation that bridges to db.rs internals.
1322        let pipeline_watermark = Arc::clone(&self.pipeline_watermark);
1323        let coordinator = Arc::clone(&self.coordinator);
1324        let table_store_for_loop = self.table_store.clone();
1325        // Compute a pipeline hash for change detection across checkpoints.
1326        let pipeline_hash = {
1327            use std::hash::{Hash, Hasher};
1328            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1329            for reg in stream_regs.values() {
1330                reg.name.hash(&mut hasher);
1331                reg.query_sql.hash(&mut hasher);
1332            }
1333            for name in source_regs.keys() {
1334                name.hash(&mut hasher);
1335            }
1336            for name in sink_regs.keys() {
1337                name.hash(&mut hasher);
1338            }
1339            Some(hasher.finish())
1340        };
1341
1342        // Wire per-query budget and input buffer cap from pipeline config.
1343        graph.set_query_budget_ns(pipeline_config.query_budget_ns);
1344        graph.set_max_input_buf_batches(pipeline_config.max_input_buf_batches);
1345        graph.set_max_input_buf_bytes(pipeline_config.max_input_buf_bytes);
1346        graph.set_backpressure_policy(pipeline_config.backpressure_policy);
1347
1348        let sinks_pending_filter_count = sinks
1349            .iter()
1350            .filter(|(_, _, filter_sql, _, _)| filter_sql.is_some())
1351            .count();
1352
1353        let source_name_arcs: rustc_hash::FxHashMap<usize, Arc<str>> = source_ids
1354            .iter()
1355            .map(|(name, &sid)| (sid, Arc::<str>::from(name.as_str())))
1356            .collect();
1357        let source_wms_buf = rustc_hash::FxHashMap::with_capacity_and_hasher(
1358            source_name_arcs.len(),
1359            rustc_hash::FxBuildHasher,
1360        );
1361
1362        let prom = self
1363            .engine_metrics
1364            .lock()
1365            .clone()
1366            .expect("EngineMetrics must be set before start()");
1367
1368        // Force-checkpoint channel: `db.checkpoint()` on the caller side
1369        // hands a oneshot sender across to the callback, which captures
1370        // operator state and replies. Installed here so both ends exist
1371        // before the compute thread spawns. Crossfire for consistency
1372        // with the rest of the pipeline plumbing (sink_task, coordinator).
1373        let (force_ckpt_tx, force_ckpt_rx) = crossfire::mpsc::bounded_async::<
1374            crate::db::ForceCheckpointReply,
1375        >(crate::db::FORCE_CHECKPOINT_CHANNEL_CAPACITY);
1376        *self.force_ckpt_tx.lock() = Some(force_ckpt_tx);
1377
1378        let callback = crate::pipeline_callback::ConnectorPipelineCallback {
1379            graph,
1380            stream_sources,
1381            sinks,
1382            watermark_states,
1383            source_entries_for_wm,
1384            source_ids,
1385            source_name_arcs,
1386            source_wms_buf,
1387            tracker,
1388            prom,
1389            pipeline_watermark,
1390            coordinator,
1391            table_sources,
1392            table_store: table_store_for_loop,
1393            mv_store_has_any: self.mv_store.read().has_any_handle(),
1394            mv_store: self.mv_store.clone(),
1395            lookup_registry: Arc::clone(&self.lookup_registry),
1396            filter_ctx: laminar_sql::create_session_context(),
1397            compiled_sink_filters: Vec::new(),
1398            pending_sink_filter_compiles: sinks_pending_filter_count,
1399            last_checkpoint: std::time::Instant::now(),
1400            checkpoint_interval: self
1401                .config
1402                .checkpoint
1403                .as_ref()
1404                .and_then(|c| c.interval_ms)
1405                .map(std::time::Duration::from_millis),
1406            pipeline_hash,
1407            delivery_guarantee: pipeline_config.delivery_guarantee,
1408            serialization_timeout: std::time::Duration::from_secs(120),
1409            sink_event_rx,
1410            sink_timed_out: false,
1411            shutdown_signal: Arc::clone(&self.shutdown_signal),
1412            #[cfg(feature = "cluster-unstable")]
1413            cluster_controller: self.cluster_controller.lock().clone(),
1414            #[cfg(feature = "cluster-unstable")]
1415            last_follower_epoch: None,
1416            force_ckpt_rx: Some(force_ckpt_rx),
1417        };
1418
1419        // Start the streaming coordinator on a dedicated compute thread.
1420        // Source tasks were already spawned on the main tokio runtime in
1421        // StreamingCoordinator::new(). The coordinator communicates with
1422        // them via crossfire mpsc which works across runtimes.
1423        {
1424            // Control channel for live DDL (add/drop stream).
1425            let (control_tx, control_rx) =
1426                crossfire::mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1427            *self.control_tx.lock() = Some(control_tx);
1428
1429            let coordinator = crate::pipeline::StreamingCoordinator::new(
1430                sources,
1431                pipeline_config,
1432                Arc::clone(&shutdown),
1433                control_rx,
1434            )
1435            .await?;
1436
1437            let (done_tx, done_rx) = crossfire::oneshot::oneshot::<()>();
1438            let (startup_tx, startup_rx) = crossfire::oneshot::oneshot::<Result<(), String>>();
1439            match std::thread::Builder::new()
1440                .name("laminar-compute".into())
1441                .spawn(move || {
1442                    let rt = match tokio::runtime::Builder::new_current_thread()
1443                        .enable_all()
1444                        .build()
1445                    {
1446                        Ok(rt) => {
1447                            startup_tx.send(Ok(()));
1448                            rt
1449                        }
1450                        Err(e) => {
1451                            startup_tx.send(Err(format!("compute runtime: {e}")));
1452                            return;
1453                        }
1454                    };
1455                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1456                        rt.block_on(async move {
1457                            coordinator.run(callback).await;
1458                        });
1459                    }));
1460                    if let Err(panic) = result {
1461                        let msg = panic
1462                            .downcast_ref::<String>()
1463                            .map(String::as_str)
1464                            .or_else(|| panic.downcast_ref::<&str>().copied())
1465                            .unwrap_or("unknown");
1466                        tracing::error!(panic = msg, "laminar-compute thread panicked");
1467                        // done_tx dropped → done_rx returns Err → logged by watcher task
1468                        return;
1469                    }
1470                    done_tx.send(());
1471                }) {
1472                Ok(_) => {}
1473                Err(e) => {
1474                    return Err(DbError::Config(format!(
1475                        "failed to spawn compute thread: {e}"
1476                    )));
1477                }
1478            }
1479
1480            // Wait for the thread to confirm the runtime started.
1481            match startup_rx.await {
1482                Ok(Ok(())) => {}
1483                Ok(Err(e)) => return Err(DbError::Config(e)),
1484                Err(_) => {
1485                    return Err(DbError::Config(
1486                        "compute thread exited before starting runtime".into(),
1487                    ));
1488                }
1489            }
1490
1491            let watcher_state = Arc::clone(&self.state);
1492            let watcher_shutdown = Arc::clone(&self.shutdown_signal);
1493            let handle = tokio::spawn(async move {
1494                if done_rx.await.is_err() {
1495                    tracing::error!("laminar-compute thread exited unexpectedly");
1496                    watcher_state.store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
1497                    watcher_shutdown.notify_one();
1498                }
1499            });
1500
1501            *self.runtime_handle.lock() = Some(handle);
1502        }
1503        Ok(())
1504    }
1505
1506    /// Shut down the streaming pipeline gracefully.
1507    ///
1508    /// Signals the processing loop to stop, waits for it to complete
1509    /// (with a timeout), then transitions to `Stopped`.
1510    /// This is idempotent -- calling it multiple times is safe.
1511    ///
1512    /// # Errors
1513    ///
1514    /// Returns an error if shutdown encounters an error.
1515    pub async fn shutdown(&self) -> Result<(), DbError> {
1516        let current = self.state.load(std::sync::atomic::Ordering::Acquire);
1517        if current == STATE_STOPPED || current == STATE_SHUTTING_DOWN {
1518            return Ok(());
1519        }
1520
1521        self.state
1522            .store(STATE_SHUTTING_DOWN, std::sync::atomic::Ordering::Release);
1523
1524        // Drop the force-checkpoint sender before signalling shutdown.
1525        // Any subsequent `db.checkpoint()` call falls through to the
1526        // (now-defunct) coordinator path, which errors cleanly instead
1527        // of hanging on a oneshot that will never be answered.
1528        *self.force_ckpt_tx.lock() = None;
1529
1530        // Signal the runtime loop to stop
1531        self.shutdown_signal.notify_one();
1532
1533        // Await the runtime handle (with timeout)
1534        let handle = self.runtime_handle.lock().take();
1535        if let Some(handle) = handle {
1536            match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
1537                Ok(Ok(())) => {
1538                    tracing::info!("Pipeline shut down cleanly");
1539                }
1540                Ok(Err(e)) => {
1541                    tracing::warn!(error = %e, "Pipeline task panicked during shutdown");
1542                }
1543                Err(_) => {
1544                    tracing::warn!("Pipeline shutdown timed out after 10s");
1545                }
1546            }
1547        }
1548
1549        self.state
1550            .store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
1551        self.close();
1552        Ok(())
1553    }
1554}
1555
1556#[cfg(test)]
1557mod resolver_tests {
1558    use super::resolve_stream_output_schemas;
1559    use crate::connector_manager::StreamRegistration;
1560    use arrow_schema::{DataType, Field, Schema};
1561    use datafusion::datasource::empty::EmptyTable;
1562    use datafusion::prelude::SessionContext;
1563    use std::sync::Arc;
1564    use std::time::Duration;
1565
1566    fn ctx_with_payments() -> SessionContext {
1567        let ctx = SessionContext::new();
1568        let schema = Arc::new(Schema::new(vec![
1569            Field::new("region", DataType::Utf8, false),
1570            Field::new("method", DataType::Utf8, false),
1571            Field::new("amount_usd", DataType::Float64, false),
1572            Field::new("status", DataType::Utf8, false),
1573            Field::new(
1574                "event_time",
1575                DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1576                false,
1577            ),
1578        ]));
1579        ctx.register_table("payments", Arc::new(EmptyTable::new(schema)))
1580            .unwrap();
1581        ctx.register_udf(datafusion_expr::ScalarUDF::from(
1582            laminar_sql::datafusion::TumbleWindowStart::new(),
1583        ));
1584        ctx
1585    }
1586
1587    fn reg(name: &str, sql: &str, windowed: bool) -> StreamRegistration {
1588        StreamRegistration {
1589            name: name.to_string(),
1590            query_sql: sql.to_string(),
1591            emit_clause: None,
1592            // Resolver only checks `is_some()`; the size doesn't matter.
1593            window_config: windowed.then(|| {
1594                laminar_sql::translator::WindowOperatorConfig::tumbling(
1595                    "event_time".into(),
1596                    Duration::ZERO,
1597                )
1598            }),
1599            order_config: None,
1600        }
1601    }
1602
1603    #[tokio::test]
1604    async fn windowed_stream_gets_window_start_end_prefix() {
1605        let ctx = ctx_with_payments();
1606        let mut regs = std::collections::HashMap::new();
1607        regs.insert(
1608            "agg".to_string(),
1609            reg(
1610                "agg",
1611                "SELECT region, COUNT(*) AS n FROM payments \
1612                 GROUP BY tumble(event_time, INTERVAL '1' MINUTE), region",
1613                true,
1614            ),
1615        );
1616
1617        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
1618        let names: Vec<&str> = out["agg"]
1619            .fields()
1620            .iter()
1621            .map(|f| f.name().as_str())
1622            .collect();
1623        assert_eq!(&names[..2], &["window_start", "window_end"]);
1624        assert_eq!(out["agg"].field(0).data_type(), &DataType::Int64);
1625        assert!(names.contains(&"region") && names.contains(&"n"));
1626    }
1627
1628    #[tokio::test]
1629    async fn non_windowed_stream_has_no_prefix() {
1630        let ctx = ctx_with_payments();
1631        let mut regs = std::collections::HashMap::new();
1632        regs.insert(
1633            "passthrough".to_string(),
1634            reg(
1635                "passthrough",
1636                "SELECT region, amount_usd FROM payments",
1637                false,
1638            ),
1639        );
1640
1641        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
1642        let names: Vec<&str> = out["passthrough"]
1643            .fields()
1644            .iter()
1645            .map(|f| f.name().as_str())
1646            .collect();
1647        assert_eq!(names, vec!["region", "amount_usd"]);
1648    }
1649
1650    #[tokio::test]
1651    async fn chained_streams_resolve_via_iterative_planning() {
1652        // `b` reads from `a`; iteration order doesn't matter — the loop
1653        // re-tries `b` after `a` is registered.
1654        let ctx = ctx_with_payments();
1655        let mut regs = std::collections::HashMap::new();
1656        regs.insert(
1657            "b".to_string(),
1658            reg("b", "SELECT region, n + 1 AS n_plus_one FROM a", false),
1659        );
1660        regs.insert(
1661            "a".to_string(),
1662            reg(
1663                "a",
1664                "SELECT region, COUNT(*) AS n FROM payments GROUP BY region",
1665                false,
1666            ),
1667        );
1668
1669        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
1670        let b_names: Vec<&str> = out["b"]
1671            .fields()
1672            .iter()
1673            .map(|f| f.name().as_str())
1674            .collect();
1675        assert_eq!(b_names, vec!["region", "n_plus_one"]);
1676
1677        // Placeholders must not leak into the public ctx — `subscribe()`
1678        // is the data path for streams; `SELECT * FROM <stream>` should
1679        // not silently return zero rows from a left-over EmptyTable.
1680        assert!(!ctx.table_exist("a").unwrap_or(false));
1681        assert!(!ctx.table_exist("b").unwrap_or(false));
1682    }
1683
1684    #[tokio::test]
1685    async fn unresolvable_streams_surface_planner_error() {
1686        let ctx = ctx_with_payments();
1687        let mut regs = std::collections::HashMap::new();
1688        // Cycle: a→b, b→a. Planning stalls; we report the unresolved set.
1689        regs.insert("a".to_string(), reg("a", "SELECT * FROM b", false));
1690        regs.insert("b".to_string(), reg("b", "SELECT * FROM a", false));
1691
1692        let err = resolve_stream_output_schemas(&ctx, &regs)
1693            .await
1694            .unwrap_err()
1695            .to_string();
1696        assert!(err.contains("unresolvable stream dependency"), "got: {err}");
1697        assert!(err.contains('a') && err.contains('b'), "got: {err}");
1698    }
1699}