Skip to main content

laminar_db/
pipeline_lifecycle.rs

1//! Pipeline lifecycle management: start, close, shutdown.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use laminar_core::streaming;
9use rustc_hash::FxHashMap;
10
11use crate::db::{DbState, LaminarDB, SourceWatermarkState};
12use crate::error::DbError;
13
14/// Resolves each `CREATE STREAM`'s output Arrow schema by planning its SQL.
15/// Temporary `EmptyTable` placeholders let downstream streams plan against
16/// upstream streams; they are removed before returning.
17/// Resolve a query's output schema by planning it. `DataFusion` can't lower
18/// `ASOF JOIN`, so on failure retry with the schema-equivalent rewrite. `None`
19/// if it still can't be planned (e.g. a dependency isn't registered yet).
20pub(crate) async fn plan_output_schema(
21    ctx: &datafusion::prelude::SessionContext,
22    sql: &str,
23) -> Option<arrow_schema::SchemaRef> {
24    let plan = if let Ok(plan) = ctx.state().create_logical_plan(sql).await {
25        plan
26    } else {
27        let rewritten = crate::sql_analysis::rewrite_asof_joins_for_planning(sql)?;
28        ctx.state().create_logical_plan(&rewritten).await.ok()?
29    };
30    let fields: Vec<_> = plan
31        .schema()
32        .fields()
33        .iter()
34        .map(|f| (**f).clone())
35        .collect();
36    Some(Arc::new(arrow_schema::Schema::new(fields)))
37}
38
39async fn resolve_stream_output_schemas(
40    ctx: &datafusion::prelude::SessionContext,
41    stream_regs: &HashMap<String, crate::connector_manager::StreamRegistration>,
42) -> Result<HashMap<String, arrow_schema::SchemaRef>, DbError> {
43    use datafusion::datasource::empty::EmptyTable;
44
45    let mut out: HashMap<String, arrow_schema::SchemaRef> =
46        HashMap::with_capacity(stream_regs.len());
47    let mut pending: Vec<&crate::connector_manager::StreamRegistration> =
48        stream_regs.values().collect();
49    // Placeholders we own and must clean up; pre-existing tables are left alone.
50    let mut placeholders: Vec<String> = Vec::new();
51
52    let result: Result<(), DbError> = async {
53        while !pending.is_empty() {
54            let mut next: Vec<&crate::connector_manager::StreamRegistration> = Vec::new();
55            let mut progressed = false;
56            for reg in pending {
57                let Some(schema) = plan_output_schema(ctx, &reg.query_sql).await else {
58                    next.push(reg);
59                    continue;
60                };
61
62                if !ctx.table_exist(&reg.name).unwrap_or(false) {
63                    ctx.register_table(&reg.name, Arc::new(EmptyTable::new(schema.clone())))
64                        .map_err(|e| {
65                            DbError::Pipeline(format!(
66                                "could not register placeholder for stream '{}': {e}",
67                                reg.name
68                            ))
69                        })?;
70                    placeholders.push(reg.name.clone());
71                }
72                out.insert(reg.name.clone(), schema);
73                progressed = true;
74            }
75
76            if !progressed {
77                let mut unresolved: Vec<&str> = next.iter().map(|r| r.name.as_str()).collect();
78                unresolved.sort_unstable();
79                // Report the rewritten-plan error when the query is an ASOF join:
80                // the raw plan only ever says "AsOf unsupported", which masks the
81                // real blocker (the rewrite is what we actually plan against).
82                let sql = &next[0].query_sql;
83                let err = match crate::sql_analysis::rewrite_asof_joins_for_planning(sql) {
84                    Some(rewritten) => ctx.state().create_logical_plan(&rewritten).await.err(),
85                    None => ctx.state().create_logical_plan(sql).await.err(),
86                }
87                .map_or_else(|| "unknown error".to_string(), |e| e.to_string());
88                return Err(DbError::Pipeline(format!(
89                    "unresolvable stream dependency among [{}]: {err}",
90                    unresolved.join(", ")
91                )));
92            }
93            pending = next;
94        }
95        Ok(())
96    }
97    .await;
98
99    for name in &placeholders {
100        let _ = ctx.deregister_table(name);
101    }
102
103    result.map(|()| out)
104}
105
106impl LaminarDB {
107    /// Shut down the database gracefully.
108    pub fn close(&self) {
109        self.shutdown
110            .store(true, std::sync::atomic::Ordering::Relaxed);
111    }
112
113    /// Returns `true` if the database has been shut down.
114    pub fn is_closed(&self) -> bool {
115        self.shutdown.load(std::sync::atomic::Ordering::Relaxed)
116    }
117
118    pub(crate) fn is_pipeline_running(&self) -> bool {
119        matches!(
120            DbState::load(&self.state),
121            DbState::Running | DbState::Starting | DbState::ShuttingDown
122        )
123    }
124
125    /// Start the streaming pipeline. Idempotent if already running.
126    ///
127    /// Activates registered connectors and spawns the background processing
128    /// task. On failure the instance is unwound back to `Created` so the
129    /// caller can retry after fixing the offending config.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the pipeline cannot be started.
134    pub async fn start(&self) -> Result<(), DbError> {
135        match DbState::load(&self.state) {
136            DbState::Running | DbState::Starting => return Ok(()),
137            DbState::Stopped => {
138                return Err(DbError::InvalidOperation(
139                    "Cannot start a stopped pipeline. Create a new LaminarDB instance.".into(),
140                ));
141            }
142            DbState::ShuttingDown => {
143                return Err(DbError::InvalidOperation(
144                    "cannot start pipeline: shutdown/stop in progress".into(),
145                ));
146            }
147            DbState::Created => {}
148        }
149
150        DbState::Starting.store(&self.state);
151
152        // Fallback for embedded use without a server.
153        {
154            let mut guard = self.engine_metrics.lock();
155            if guard.is_none() {
156                *guard = Some(Arc::new(crate::engine_metrics::EngineMetrics::new(
157                    &prometheus::Registry::new(),
158                )));
159            }
160        }
161
162        // Rebuild any catalog objects this node lacks from the shared manifest
163        // before wiring the pipeline — replaying DDL here is the same pre-start
164        // path users take when they CREATE before start(). Best-effort/no-op
165        // outside cluster mode or with no manifest stored.
166        #[cfg(feature = "cluster")]
167        self.restore_catalog_from_manifest().await;
168
169        match self.start_inner().await {
170            Ok(()) => {
171                DbState::Running.store(&self.state);
172                Ok(())
173            }
174            Err(e) => {
175                // Unwind so a retry actually re-runs startup instead of
176                // silently succeeding against a wedged Starting state.
177                DbState::Created.store(&self.state);
178                Err(e)
179            }
180        }
181    }
182
183    #[allow(clippy::too_many_lines)]
184    async fn start_inner(&self) -> Result<(), DbError> {
185        // Snapshot connector registrations under the lock
186        let (source_regs, sink_regs, stream_regs, table_regs, has_external) = {
187            let mgr = self.connector_manager.lock();
188            (
189                mgr.sources().clone(),
190                mgr.sinks().clone(),
191                mgr.streams().clone(),
192                mgr.tables().clone(),
193                mgr.has_external_connectors(),
194            )
195        };
196
197        // Log which sources have external connectors for debugging.
198        for (name, reg) in &source_regs {
199            tracing::debug!(source = %name, connector_type = ?reg.connector_type, "Registered source");
200        }
201        for (name, reg) in &sink_regs {
202            tracing::debug!(sink = %name, connector_type = ?reg.connector_type, "Registered sink");
203        }
204
205        // Initialize checkpoint coordinator (shared across all pipeline modes)
206        if let Some(ref cp_config) = self.config.checkpoint {
207            use crate::checkpoint_coordinator::{
208                CheckpointConfig as CkpConfig, CheckpointCoordinator,
209            };
210
211            let max_retained = cp_config.max_retained.unwrap_or(3);
212            let vnode_count = self.vnode_registry.lock().as_ref().map_or(
213                laminar_core::storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
214                |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
215            );
216
217            // Resolve once; both the checkpoint store and the decision
218            // store want the same root.
219            let data_dir = cp_config
220                .data_dir
221                .clone()
222                .or_else(|| self.config.storage_dir.clone())
223                .unwrap_or_else(|| std::path::PathBuf::from("./data"));
224
225            let (store, decision_backing): (
226                Box<dyn laminar_core::storage::CheckpointStore>,
227                Arc<dyn object_store::ObjectStore>,
228            ) = if let Some(ref url) = self.config.object_store_url {
229                // The builder roots the store at the URL's path prefix,
230                // so the checkpoint store (and the decision store
231                // sharing `obj`) need no extra key prefix.
232                let obj = laminar_core::storage::object_store_builder::build_object_store(
233                    url,
234                    &self.config.object_store_options,
235                )
236                .map_err(|e| DbError::Config(format!("object store: {e}")))?;
237                let cs = laminar_core::storage::checkpoint_store::ObjectStoreCheckpointStore::new(
238                    Arc::clone(&obj),
239                    String::new(),
240                    max_retained,
241                )
242                .with_vnode_count(vnode_count);
243                (Box::new(cs), obj)
244            } else {
245                std::fs::create_dir_all(&data_dir).map_err(|e| {
246                    DbError::Config(format!("data dir {}: {e}", data_dir.display()))
247                })?;
248                let obj: Arc<dyn object_store::ObjectStore> = Arc::new(
249                    object_store::local::LocalFileSystem::new_with_prefix(&data_dir)
250                        .map_err(|e| DbError::Config(format!("local fs: {e}")))?,
251                );
252                let cs = laminar_core::storage::checkpoint_store::FileSystemCheckpointStore::new(
253                    &data_dir,
254                    max_retained,
255                )
256                .with_vnode_count(vnode_count);
257                (Box::new(cs), obj)
258            };
259
260            let defaults = CkpConfig::default();
261            let config = CkpConfig {
262                interval: cp_config.interval_ms.map(std::time::Duration::from_millis),
263                max_retained,
264                max_in_flight_epochs: cp_config
265                    .max_in_flight_epochs
266                    .unwrap_or(defaults.max_in_flight_epochs),
267                max_staged_bytes: cp_config
268                    .max_staged_bytes
269                    .unwrap_or(defaults.max_staged_bytes),
270                ..defaults
271            };
272            let mut coord = CheckpointCoordinator::new(config, store).await?;
273            if let Some(ref prom) = *self.engine_metrics.lock() {
274                coord.set_metrics(Arc::clone(prom));
275            }
276
277            // Cluster mode: install the controller so the coordinator
278            // can consult it for leader / follower role once the
279            // barrier-protocol flow change lands.
280            #[cfg(feature = "cluster")]
281            if let Some(controller) = self.cluster_controller.lock().clone() {
282                coord.set_cluster_controller(controller);
283            }
284
285            // Durability gate wiring: if both the state backend and vnode
286            // registry are installed, tell the coordinator which vnodes
287            // this instance owns. In cluster mode the owner id comes from
288            // the cluster controller; in single-instance mode we assume
289            // a `single_owner` registry and pass `NodeId(0)`.
290            if let (Some(backend), Some(registry)) = (
291                self.state_backend.lock().clone(),
292                self.vnode_registry.lock().clone(),
293            ) {
294                let owner = {
295                    #[cfg(feature = "cluster")]
296                    {
297                        self.cluster_controller
298                            .lock()
299                            .as_ref()
300                            .map_or(laminar_core::state::NodeId(0), |c| {
301                                laminar_core::state::NodeId(c.instance_id().0)
302                            })
303                    }
304                    #[cfg(not(feature = "cluster"))]
305                    {
306                        laminar_core::state::NodeId(0)
307                    }
308                };
309                // Both sides of the split-brain guard must see the same
310                // generation. The coordinator stamps outgoing writes with
311                // it (caller side), and the backend rejects incoming
312                // writes below it (authority side).
313                // Without the backend call the authoritative version
314                // stays at 0 and the fence is a silent no-op — the
315                // registry's version carries the snapshot generation
316                // across a restart, but only in-memory.
317                let version = registry.assignment_version();
318                backend.set_authoritative_version(version);
319                coord.set_state_backend(backend);
320                coord.set_assignment_version(version);
321                coord.set_vnode_set(laminar_core::state::owned_vnodes(&registry, owner));
322                // Leader's gate checks the full registry — across all
323                // instances — so the 2PC commit only fires when every
324                // follower has also persisted its markers.
325                coord.set_gate_vnode_set((0..registry.vnode_count()).collect());
326            }
327
328            // Decision store: caller-supplied (cluster) wins, else
329            // auto-wire one on the same backing so recovery can read
330            // the commit marker.
331            let ds = {
332                #[cfg(feature = "cluster")]
333                {
334                    self.decision_store.lock().clone().unwrap_or_else(|| {
335                        Arc::new(
336                            laminar_core::checkpoint_decision::CheckpointDecisionStore::new(
337                                Arc::clone(&decision_backing),
338                            ),
339                        )
340                    })
341                }
342                #[cfg(not(feature = "cluster"))]
343                {
344                    Arc::new(
345                        laminar_core::checkpoint_decision::CheckpointDecisionStore::new(
346                            Arc::clone(&decision_backing),
347                        ),
348                    )
349                }
350            };
351            coord.set_decision_store(ds);
352
353            // Reconcile any Pending sinks from the last manifest against
354            // the durable commit marker before accepting new traffic.
355            coord.reconcile_prepared_on_init().await;
356
357            *self.coordinator.lock().await = Some(coord);
358        }
359
360        if has_external || !stream_regs.is_empty() {
361            tracing::info!(
362                sources = source_regs.len(),
363                sinks = sink_regs.len(),
364                streams = stream_regs.len(),
365                tables = table_regs.len(),
366                has_external,
367                "Starting pipeline"
368            );
369            self.start_connector_pipeline(
370                source_regs,
371                sink_regs,
372                stream_regs,
373                table_regs,
374                has_external,
375            )
376            .await?;
377        } else {
378            tracing::info!(
379                sources = source_regs.len(),
380                sinks = sink_regs.len(),
381                "Starting in embedded (in-memory) mode — no streams"
382            );
383        }
384
385        Ok(())
386    }
387
388    /// Build and start the unified pipeline with sources, sinks, and streams.
389    ///
390    /// Handles both embedded (in-memory only) and connector-backed sources
391    /// through a single code path. Connector-less sources are wrapped as
392    /// `CatalogSourceConnector` to participate in the pipeline alongside
393    /// external connectors (Kafka, CDC, etc.).
394    ///
395    /// Each source runs as a tokio task pushing batches via mpsc channel
396    /// to a `StreamingCoordinator` that drives SQL cycles, sink writes,
397    /// and checkpoint coordination.
398    #[allow(clippy::too_many_lines)]
399    async fn start_connector_pipeline(
400        &self,
401        source_regs: HashMap<String, crate::connector_manager::SourceRegistration>,
402        sink_regs: HashMap<String, crate::connector_manager::SinkRegistration>,
403        stream_regs: HashMap<String, crate::connector_manager::StreamRegistration>,
404        table_regs: HashMap<String, crate::connector_manager::TableRegistration>,
405        has_external: bool,
406    ) -> Result<(), DbError> {
407        use crate::connector_manager::{
408            build_sink_config, build_source_config, build_table_config,
409        };
410        use crate::operator_graph::OperatorGraph;
411        use crate::pipeline::{PipelineConfig, SourceRegistration};
412        use laminar_connectors::reference::{ReferenceTableSource, RefreshMode};
413
414        // Build OperatorGraph context mirroring the rules + partition
415        // count installed on `LaminarDB::ctx` via the builder.
416        let ctx = {
417            use datafusion::execution::SessionStateBuilder;
418            let mut session_config = laminar_sql::datafusion::base_session_config();
419            if let Some(n) = self.pipeline_target_partitions {
420                session_config = session_config.with_target_partitions(n);
421            }
422            let query_planner = Arc::clone(self.ctx.state().query_planner());
423            let mut state_builder = SessionStateBuilder::new()
424                .with_config(session_config)
425                .with_default_features()
426                .with_query_planner(query_planner);
427            for rule in self.physical_optimizer_rules.iter() {
428                state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
429            }
430            let context =
431                datafusion::prelude::SessionContext::new_with_state(state_builder.build());
432            for rule in self.ctx.state().optimizers() {
433                context.add_optimizer_rule(Arc::clone(rule));
434            }
435            context
436        };
437        laminar_sql::register_streaming_functions(&ctx);
438
439        // Register lookup/reference tables in the operator graph's
440        // SessionContext so JOIN queries can resolve them.
441        let lookup_tables: Vec<(String, arrow::datatypes::SchemaRef)> = {
442            let ts = self.table_store.read();
443            ts.table_names()
444                .into_iter()
445                .filter_map(|name| {
446                    let schema = ts.table_schema(&name)?;
447                    Some((name, schema))
448                })
449                .collect()
450        };
451        for (name, schema) in lookup_tables {
452            let provider = crate::table_provider::ReferenceTableProvider::new(
453                name.clone(),
454                schema,
455                self.table_store.clone(),
456            );
457            if let Err(e) = ctx.register_table(&name, Arc::new(provider)) {
458                tracing::warn!(
459                    table = %name,
460                    error = %e,
461                    "failed to register lookup table in operator graph context"
462                );
463            }
464        }
465
466        let mut graph = OperatorGraph::new(ctx);
467        graph.set_max_state_bytes(self.config.max_state_bytes_per_operator);
468        graph.set_lookup_registry(Arc::clone(&self.lookup_registry));
469        if let Some(ref prom) = *self.engine_metrics.lock() {
470            graph.set_metrics(Arc::clone(prom));
471        }
472        if let (Some(runtime), Some(handle)) = (&self.ai_runtime, &self.ai_handle) {
473            graph.set_ai_runtime(Arc::clone(runtime), handle.clone());
474        }
475
476        // Install the cluster row-shuffle config if all the pieces are
477        // present (registry, sender, receiver, controller). Without any
478        // one of them, aggregate queries run single-node.
479        #[cfg(feature = "cluster")]
480        {
481            let sender = self.shuffle_sender.lock().clone();
482            let receiver = self.shuffle_receiver.lock().clone();
483            let registry = self.vnode_registry.lock().clone();
484            let controller = self.cluster_controller.lock().clone();
485            if let (Some(sender), Some(receiver), Some(registry), Some(controller)) =
486                (sender, receiver, registry, controller)
487            {
488                let self_id = laminar_core::state::NodeId(controller.instance_id().0);
489                graph.set_cluster_shuffle(crate::operator::sql_query::ClusterShuffleConfig {
490                    registry,
491                    sender,
492                    receiver,
493                    self_id,
494                });
495                // Share the DB's staged rehydration map so the graph applies
496                // rebalanced-in vnode state into operators each cycle.
497                graph.set_rehydration_handle(Arc::clone(&self.rehydrated_vnode_state));
498            }
499        }
500
501        // Register source schemas for ALL sources (both external connectors
502        // and catalog-bridge sources) so the graph can create empty
503        // placeholder tables when no data arrives in a given cycle.
504        for name in source_regs.keys() {
505            if let Some(entry) = self.catalog.get_source(name) {
506                graph.register_source_schema(name.clone(), entry.schema.clone());
507            }
508        }
509
510        // Partial (on-demand) lookup tables: route their enrichment joins to the
511        // async-decoupled operator. A table is partial iff it refreshes Manually
512        // — the same condition the on-demand phase below uses to register it as
513        // `RegisteredLookup::Partial`. Map each to its column names so the
514        // operator can disambiguate colliding output columns.
515        let partial_lookup_tables: rustc_hash::FxHashMap<String, Vec<String>> = table_regs
516            .values()
517            .filter(|r| matches!(r.refresh, Some(RefreshMode::Manual)))
518            .filter_map(|r| {
519                let schema = self.table_store.read().table_schema(&r.name)?;
520                let cols = schema.fields().iter().map(|f| f.name().clone()).collect();
521                Some((r.name.clone(), cols))
522            })
523            .collect();
524        graph.set_partial_lookup_tables(partial_lookup_tables);
525        // Ring-1 workers (lookup-enrich fetch, AI) spawn on the main runtime.
526        graph.set_runtime_handle(
527            self.ai_handle
528                .clone()
529                .unwrap_or_else(tokio::runtime::Handle::current),
530        );
531
532        for reg in stream_regs.values() {
533            graph.add_query(
534                reg.name.clone(),
535                reg.query_sql.clone(),
536                reg.emit_clause.clone(),
537                reg.window_config.clone(),
538                reg.order_config.clone(),
539                None,
540                reg.join_config.clone(),
541            );
542        }
543        // Surface any plan-time AI routing errors (unknown model, unsupported
544        // task, malformed AI query) collected during `add_query`.
545        graph.take_build_errors()?;
546
547        // Register temporal join tables as Versioned in the lookup registry
548        // so that temporal join operators can use persistent versioned state.
549        for tcfg in graph.temporal_join_configs() {
550            if self.lookup_registry.get_entry(&tcfg.table_name).is_none() {
551                // Get initial data. If none exists yet, use an empty batch
552                // with the correct schema from the catalog (not Schema::empty).
553                let initial_batch = self
554                    .table_store
555                    .read()
556                    .to_record_batch(&tcfg.table_name)
557                    .or_else(|| {
558                        self.catalog
559                            .get_source(&tcfg.table_name)
560                            .map(|e| RecordBatch::new_empty(e.schema.clone()))
561                    })
562                    .unwrap_or_else(|| {
563                        RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()))
564                    });
565                let key_columns = vec![tcfg.table_key_column.clone()];
566                let key_indices: Vec<usize> = key_columns
567                    .iter()
568                    .filter_map(|k| initial_batch.schema().index_of(k).ok())
569                    .collect();
570
571                // When table_version_column is empty (translator couldn't resolve it
572                // from the AS OF clause), pick the first timestamp/int column that
573                // isn't the join key.
574                let resolved_version_col = if tcfg.table_version_column.is_empty() {
575                    let schema = initial_batch.schema();
576                    schema
577                        .fields()
578                        .iter()
579                        .find(|f| {
580                            f.name() != &tcfg.table_key_column
581                                && matches!(
582                                    f.data_type(),
583                                    arrow::datatypes::DataType::Int64
584                                        | arrow::datatypes::DataType::Timestamp(_, _)
585                                )
586                        })
587                        .map(|f| f.name().clone())
588                        .unwrap_or_default()
589                } else {
590                    tcfg.table_version_column.clone()
591                };
592
593                let Ok(version_col_idx) = initial_batch.schema().index_of(&resolved_version_col)
594                else {
595                    if !initial_batch.schema().fields().is_empty() {
596                        tracing::warn!(
597                            table=%tcfg.table_name,
598                            version_col=%resolved_version_col,
599                            "Version column not found in temporal table schema; \
600                             will resolve on first CDC batch"
601                        );
602                    }
603                    // Register with empty index — built on first CDC update.
604                    self.lookup_registry.register_versioned(
605                        &tcfg.table_name,
606                        laminar_sql::datafusion::VersionedLookupState {
607                            batch: initial_batch,
608                            index: Arc::new(
609                                laminar_sql::datafusion::lookup_join_exec::VersionedIndex::default(
610                                ),
611                            ),
612                            key_columns,
613                            version_column: resolved_version_col,
614                            stream_time_column: tcfg.stream_time_column.clone(),
615                            max_versions_per_key: usize::MAX,
616                        },
617                    );
618                    continue;
619                };
620                let index = Arc::new(
621                    laminar_sql::datafusion::lookup_join_exec::VersionedIndex::build(
622                        &initial_batch,
623                        &key_indices,
624                        version_col_idx,
625                        usize::MAX,
626                    )
627                    .unwrap_or_default(),
628                );
629                self.lookup_registry.register_versioned(
630                    &tcfg.table_name,
631                    laminar_sql::datafusion::VersionedLookupState {
632                        batch: initial_batch,
633                        index,
634                        key_columns,
635                        version_column: resolved_version_col,
636                        stream_time_column: tcfg.stream_time_column.clone(),
637                        max_versions_per_key: usize::MAX,
638                    },
639                );
640            }
641        }
642
643        // Grab the shared Prometheus registry (if set) so connectors can
644        // register their metrics on it.
645        let prom_registry = self.prometheus_registry.lock().clone();
646
647        // Build sources as owned SourceRegistrations (no Arc<Mutex>).
648        let mut sources: Vec<SourceRegistration> = Vec::new();
649        for (name, reg) in &source_regs {
650            if reg.connector_type.is_none() {
651                continue;
652            }
653            let mut config = build_source_config(reg)?;
654
655            // Pass the SQL-defined Arrow schema to the connector so it can
656            // deserialize records with the correct column names and types.
657            if let Some(entry) = self.catalog.get_source(name) {
658                let schema_str = crate::pipeline_callback::encode_arrow_schema(&entry.schema);
659                config.set("_arrow_schema".to_string(), schema_str);
660            }
661
662            #[cfg_attr(not(feature = "cluster"), allow(unused_mut))]
663            let mut source = self
664                .connector_registry
665                .create_source(&config, prom_registry.as_deref())
666                .map_err(|e| {
667                    DbError::Connector(format!(
668                        "Cannot create source '{}' (type '{}'): {e}",
669                        name,
670                        config.connector_type()
671                    ))
672                })?;
673            // Cluster mode: hand the source the vnode topology so a partitioned
674            // source (Kafka) consumes only the partitions this node owns and
675            // re-binds on rotation. No-op for non-partitioned sources.
676            #[cfg(feature = "cluster")]
677            if let (Some(registry), Some(self_id)) = (
678                self.vnode_registry.lock().clone(),
679                self.cluster_controller
680                    .lock()
681                    .as_ref()
682                    .map(|c| laminar_core::state::NodeId(c.instance_id().0)),
683            ) {
684                source.set_vnode_assignment(registry, self_id);
685            }
686            let supports_replay = source.supports_replay();
687            if !supports_replay {
688                tracing::warn!(
689                    source = %name,
690                    "source does not support replay — exactly-once semantics \
691                     are degraded to at-most-once for this source"
692                );
693            }
694            // Property-driven watermark wiring. `[source.watermark]` in
695            // TOML is the preferred path (honours max_out_of_orderness);
696            // this exists for sources that pass `event.time.column` /
697            // `event.time.field` as a connector property instead. The
698            // second pass below builds the matching SourceWatermarkState.
699            // Kafka uses `column`, WebSocket uses `field` — both spellings
700            // are live.
701            if let Some(entry) = self.catalog.get_source(name) {
702                if entry.source.event_time_column().is_none() {
703                    if let Some(col) = config.get("event.time.column") {
704                        entry.source.set_event_time_column(col);
705                    } else if let Some(col) = config.get("event.time.field") {
706                        entry.source.set_event_time_column(col);
707                    }
708                }
709                // Carry the out-of-orderness bound alongside the column so
710                // the fallback watermark path below uses the configured
711                // tolerance instead of Duration::ZERO.
712                if let Some(ms_str) = config.get("max.out.of.orderness.ms") {
713                    match ms_str.parse::<u64>() {
714                        Ok(ms) => {
715                            entry
716                                .source
717                                .set_max_out_of_orderness(std::time::Duration::from_millis(ms));
718                        }
719                        Err(e) => {
720                            tracing::warn!(
721                                source = %name,
722                                value = %ms_str,
723                                error = %e,
724                                "ignoring unparseable max.out.of.orderness.ms — \
725                                 watermark will use Duration::ZERO"
726                            );
727                        }
728                    }
729                }
730            }
731
732            sources.push(SourceRegistration {
733                name: name.clone(),
734                connector: source,
735                config,
736                supports_replay,
737                restore_checkpoint: None, // Set after recovery below
738            });
739        }
740
741        // Bridge connector-less sources into the pipeline so db.insert()
742        // data flows through the standard source task → coordinator path.
743        // This covers two cases:
744        //   1. Sources in source_regs with connector_type == None (registered
745        //      in connector manager but without a FROM clause).
746        //   2. Sources in the catalog but NOT in source_regs at all (pure
747        //      embedded sources created without any connector specification).
748        let bridged_names: rustc_hash::FxHashSet<String> =
749            sources.iter().map(|s| s.name.clone()).collect();
750        // First: bridge sources in source_regs that have no connector.
751        for (name, reg) in &source_regs {
752            if reg.connector_type.is_some() {
753                continue; // Already created as external connector above
754            }
755            if let Some(entry) = self.catalog.get_source(name) {
756                let subscription = entry.sink.subscribe();
757                let connector = crate::catalog_connector::CatalogSourceConnector::new(
758                    subscription,
759                    entry.schema.clone(),
760                    entry.data_notify(),
761                );
762                sources.push(SourceRegistration {
763                    name: name.clone(),
764                    connector: Box::new(connector),
765                    config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
766                    supports_replay: false,
767                    restore_checkpoint: None,
768                });
769            }
770        }
771        // Second: bridge catalog sources not in source_regs (embedded-only
772        // sources that were never registered with the connector manager).
773        for name in self.catalog.list_sources() {
774            if bridged_names.contains(&name) || source_regs.contains_key(&name) {
775                continue;
776            }
777            if let Some(entry) = self.catalog.get_source(&name) {
778                graph.register_source_schema(name.clone(), entry.schema.clone());
779                let subscription = entry.sink.subscribe();
780                let connector = crate::catalog_connector::CatalogSourceConnector::new(
781                    subscription,
782                    entry.schema.clone(),
783                    entry.data_notify(),
784                );
785                sources.push(SourceRegistration {
786                    name: name.clone(),
787                    connector: Box::new(connector),
788                    config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
789                    supports_replay: false,
790                    restore_checkpoint: None,
791                });
792            }
793        }
794
795        // Resolve each stream's output schema with DataFusion so sinks
796        // downstream see it via `_arrow_schema` (mirrors the source path
797        // above). Publish the result into `LaminarDB::stream_schemas` so
798        // SUBSCRIBE WHERE can compile predicates against the real schema.
799        let stream_output_schemas = resolve_stream_output_schemas(&self.ctx, &stream_regs).await?;
800        {
801            let mut schemas = self.stream_schemas.write();
802            schemas.clear();
803            schemas.extend(
804                stream_output_schemas
805                    .iter()
806                    .map(|(k, v)| (k.clone(), Arc::clone(v))),
807            );
808        }
809
810        // Build sinks. Each runs in its own tokio task with a bounded
811        // command channel and shares one event channel back to the
812        // pipeline callback.
813        let (sink_event_tx, sink_event_rx) =
814            laminar_core::streaming::channel::channel::<crate::sink_task::SinkEvent>(
815                crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY,
816            );
817        #[allow(clippy::type_complexity)]
818        let mut sinks: Vec<(
819            String,
820            crate::sink_task::SinkTaskHandle,
821            Option<String>,
822            String, // input stream name (FROM clause target)
823            bool,   // changelog-capable
824        )> = Vec::new();
825        for (name, reg) in &sink_regs {
826            if reg.connector_type.is_none() {
827                continue;
828            }
829            let mut config = build_sink_config(reg)?;
830            // Resolve the upstream schema from a stream first, then fall back
831            // to a source — `CREATE SINK ... FROM <name>` accepts either.
832            let upstream_schema = stream_output_schemas.get(&reg.input).cloned().or_else(|| {
833                self.catalog
834                    .get_source(&reg.input)
835                    .map(|e| e.schema.clone())
836            });
837            if let Some(schema) = upstream_schema {
838                let schema_str = crate::pipeline_callback::encode_arrow_schema(&schema);
839                config.set("_arrow_schema".to_string(), schema_str);
840            }
841            let mut sink = self
842                .connector_registry
843                .create_sink(&config, prom_registry.as_deref())
844                .map_err(|e| {
845                    DbError::Connector(format!(
846                        "Cannot create sink '{}' (type '{}'): {e}",
847                        name,
848                        config.connector_type()
849                    ))
850                })?;
851            // Open the connector before handing it to the task.
852            sink.open(&config)
853                .await
854                .map_err(|e| DbError::Connector(format!("Failed to open sink '{name}': {e}")))?;
855            let caps = sink.capabilities();
856            // Resolve per-sink write timeout: user override (property)
857            // takes precedence over the sink's declared suggestion.
858            let write_timeout =
859                match config
860                    .get_parsed::<u64>("sink.write.timeout.ms")
861                    .map_err(|e| {
862                        DbError::Connector(format!(
863                            "Invalid 'sink.write.timeout.ms' for sink '{name}': {e}"
864                        ))
865                    })? {
866                    Some(ms) => std::time::Duration::from_millis(ms),
867                    None => caps.suggested_write_timeout,
868                };
869            if write_timeout.is_zero() {
870                return Err(DbError::Connector(format!(
871                    "sink '{name}': write_timeout must be > 0 \
872                     (check 'sink.write.timeout.ms' or the sink's \
873                     suggested_write_timeout)"
874                )));
875            }
876            let sink_id: std::sync::Arc<str> = std::sync::Arc::from(name.as_str());
877            let handle =
878                crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
879                    name: name.clone(),
880                    sink_id,
881                    connector: sink,
882                    exactly_once: caps.exactly_once,
883                    channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
884                    flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
885                    write_timeout,
886                    event_tx: sink_event_tx.clone(),
887                });
888            sinks.push((
889                name.clone(),
890                handle,
891                reg.filter_expr.clone(),
892                reg.input.clone(),
893                caps.changelog,
894            ));
895        }
896        // Drop the local sender so the channel disconnects when all
897        // sink tasks exit.
898        drop(sink_event_tx);
899
900        // Build table sources from registrations
901        let mut table_sources: Vec<(String, Box<dyn ReferenceTableSource>, RefreshMode)> =
902            Vec::new();
903        for (name, reg) in &table_regs {
904            if reg.connector_type.is_none() {
905                continue;
906            }
907            let config = build_table_config(reg)?;
908            let source = self
909                .connector_registry
910                .create_table_source(&config)
911                .map_err(|e| {
912                    DbError::Connector(format!("Cannot create table source '{name}': {e}"))
913                })?;
914            let mode = reg.refresh.clone().unwrap_or(RefreshMode::SnapshotPlusCdc);
915            table_sources.push((name.clone(), source, mode));
916        }
917
918        // Register sinks with the checkpoint coordinator.
919        // Sources are owned by the TPC runtime — checkpoint reads go
920        // through lock-free watch channels instead.
921        {
922            let mut guard = self.coordinator.lock().await;
923            if let Some(ref mut coord) = *guard {
924                for (name, handle, _, _, _) in &sinks {
925                    let exactly_once = handle.exactly_once();
926                    coord.register_sink(name.clone(), handle.clone(), exactly_once);
927                }
928            }
929        }
930
931        // Recovery: restore sink/table state via unified coordinator.
932        // Must run BEFORE begin_initial_epoch so the coordinator's epoch
933        // reflects the recovered state. We also hoist the recovered
934        // per-source watermarks out of the coordinator lock so the later
935        // watermark-state construction can seed each generator + the
936        // combined tracker. Without this, generators restart at
937        // `i64::MIN` while source offsets resume mid-stream — windowed
938        // operators re-fire and late-drop diverges from the pre-crash
939        // run, breaking deterministic recovery for every event-time
940        // pipeline.
941        let mut recovered_source_wms: rustc_hash::FxHashMap<String, i64> =
942            rustc_hash::FxHashMap::default();
943        {
944            let mut guard = self.coordinator.lock().await;
945            if let Some(ref mut coord) = *guard {
946                match coord.recover().await {
947                    Ok(Some(recovered)) => {
948                        recovered_source_wms = recovered
949                            .manifest
950                            .source_watermarks
951                            .iter()
952                            .filter(|(_, &wm)| wm != i64::MIN)
953                            .map(|(name, &wm)| (name.clone(), wm))
954                            .collect();
955                        for (name, source, _) in &mut table_sources {
956                            if let Some(cp) = recovered.manifest.table_offsets.get(name) {
957                                let restored =
958                                    crate::checkpoint_coordinator::connector_to_source_checkpoint(
959                                        cp,
960                                    );
961                                if let Err(e) = source.restore(&restored).await {
962                                    tracing::warn!(
963                                        table=%name, error=%e,
964                                        "Table source restore failed"
965                                    );
966                                }
967                            }
968                        }
969                        // Attach recovered source offsets to SourceRegistrations.
970                        // These will be passed to the TPC source adapters, which
971                        // call connector.restore() after open() to seek Kafka
972                        // consumers to their checkpoint positions.
973                        for src in &mut sources {
974                            if !src.supports_replay {
975                                continue;
976                            }
977                            if let Some(cp) = recovered.manifest.source_offsets.get(&src.name) {
978                                let restored =
979                                    crate::checkpoint_coordinator::connector_to_source_checkpoint(
980                                        cp,
981                                    );
982                                tracing::info!(
983                                    source = %src.name,
984                                    offsets = cp.offsets.len(),
985                                    "attaching checkpoint offsets for source recovery"
986                                );
987                                src.restore_checkpoint = Some(restored);
988                            }
989                        }
990                        let mut graph_restore_failed = false;
991                        let op_keys: Vec<&String> =
992                            recovered.manifest.operator_states.keys().collect();
993                        let instance_hint = {
994                            #[cfg(feature = "cluster")]
995                            {
996                                self.cluster_controller
997                                    .lock()
998                                    .as_ref()
999                                    .map_or(0, |c| c.instance_id().0)
1000                            }
1001                            #[cfg(not(feature = "cluster"))]
1002                            {
1003                                0u64
1004                            }
1005                        };
1006                        tracing::info!(
1007                            instance = instance_hint,
1008                            count = op_keys.len(),
1009                            keys = ?op_keys,
1010                            "manifest operator_states summary"
1011                        );
1012                        if let Some(op) = recovered.manifest.operator_states.get("operator_graph") {
1013                            if let Some(bytes) = op.decode_inline() {
1014                                match graph.restore_from_bytes(&bytes) {
1015                                    Ok(n) => {
1016                                        tracing::info!(
1017                                            queries = n,
1018                                            "Restored operator graph state from checkpoint"
1019                                        );
1020                                    }
1021                                    Err(e) => {
1022                                        // Source offsets were already staged
1023                                        // for the connectors above. Resuming
1024                                        // with empty operator state would
1025                                        // silently lose every in-flight
1026                                        // window / aggregator. Fail loud
1027                                        // instead — the operator can drop
1028                                        // the checkpoint and restart fresh
1029                                        // explicitly if that's the intent.
1030                                        return Err(DbError::Checkpoint(format!(
1031                                            "[LDB-6029] operator graph restore failed: \
1032                                             {e} — refusing to start with checkpointed \
1033                                             source offsets and empty operator state"
1034                                        )));
1035                                    }
1036                                }
1037                            } else {
1038                                tracing::warn!(
1039                                    "manifest has 'operator_graph' but decode_inline returned None"
1040                                );
1041                            }
1042                        } else if recovered
1043                            .manifest
1044                            .operator_states
1045                            .contains_key("stream_executor")
1046                        {
1047                            graph_restore_failed = true;
1048                            tracing::warn!(
1049                                "Found old stream_executor checkpoint format; \
1050                                 skipping restore (clean break). Starting fresh."
1051                            );
1052                        }
1053
1054                        // Skip MV restore when operator state failed to load —
1055                        // stale MV data with fresh operators is inconsistent.
1056                        // No operator state at all (stateless pipeline) is fine.
1057                        if !graph_restore_failed {
1058                            let prefix = crate::mv_store::CHECKPOINT_KEY_PREFIX;
1059                            let mut store = self.mv_store.write();
1060                            let mut restored = 0usize;
1061                            for (key, op) in &recovered.manifest.operator_states {
1062                                if let Some(name) = key.strip_prefix(prefix) {
1063                                    if let Some(bytes) = op.decode_inline() {
1064                                        match store.restore_from_ipc(name, &bytes) {
1065                                            Ok(true) => restored += 1,
1066                                            Ok(false) => {} // MV no longer registered
1067                                            Err(e) => {
1068                                                tracing::warn!(mv = name, error = %e, "MV restore failed");
1069                                            }
1070                                        }
1071                                    }
1072                                }
1073                            }
1074                            if restored > 0 {
1075                                tracing::info!(mvs = restored, "Restored MV state from checkpoint");
1076                            }
1077                        }
1078                        tracing::info!(
1079                            epoch = recovered.epoch(),
1080                            sources_restored = recovered.sources_restored,
1081                            sinks_rolled_back = recovered.sinks_rolled_back,
1082                            "Recovered from unified checkpoint"
1083                        );
1084                    }
1085                    Ok(None) => {
1086                        tracing::info!("No checkpoint found, starting fresh");
1087                    }
1088                    Err(e) => {
1089                        tracing::warn!(error = %e, "Checkpoint recovery failed, starting fresh");
1090                    }
1091                }
1092            }
1093        }
1094
1095        // Begin the initial epoch on exactly-once sinks AFTER recovery
1096        // so the coordinator's epoch reflects the recovered checkpoint.
1097        {
1098            let guard = self.coordinator.lock().await;
1099            if let Some(ref coord) = *guard {
1100                coord.begin_initial_epoch().await?;
1101            }
1102        }
1103
1104        // Snapshot phase: populate tables before stream processing begins
1105        for (name, source, mode) in &mut table_sources {
1106            if matches!(mode, RefreshMode::Manual) {
1107                continue;
1108            }
1109            while let Some(batch) = source
1110                .poll_snapshot()
1111                .await
1112                .map_err(|e| DbError::Connector(format!("Table '{name}' snapshot error: {e}")))?
1113            {
1114                self.table_store
1115                    .write()
1116                    .upsert(name, &batch)
1117                    .map_err(|e| DbError::Connector(format!("Table '{name}' upsert error: {e}")))?;
1118            }
1119            self.sync_table_to_datafusion(name)?;
1120            {
1121                let mut ts = self.table_store.write();
1122                ts.rebuild_xor_filter(name);
1123                ts.set_ready(name, true);
1124            }
1125            // Update lookup registry so join queries see fresh data.
1126            // Skip if already registered as Versioned (temporal join tables
1127            // must keep their version history, not be overwritten as Snapshot).
1128            if matches!(
1129                self.lookup_registry.get_entry(name),
1130                Some(laminar_sql::datafusion::RegisteredLookup::Versioned(_))
1131            ) {
1132                // Already versioned — don't downgrade to Snapshot.
1133            } else if let Some(batch) = self.table_store.read().to_record_batch(name) {
1134                self.lookup_registry
1135                    .register(name, laminar_sql::datafusion::LookupSnapshot { batch });
1136            }
1137        }
1138
1139        // On-demand (Manual) tables: register as Partial in the lookup
1140        // registry so lookup joins use the lookup cache + source fallback path,
1141        // then promote to SnapshotPlusCdc so poll_tables() calls poll_changes().
1142        for (name, _source, mode) in &mut table_sources {
1143            if !matches!(mode, RefreshMode::Manual) {
1144                continue;
1145            }
1146            let Some(reg) = table_regs.get(name.as_str()) else {
1147                continue;
1148            };
1149            // 64 MiB default budget; the cache is byte-weighted, not entry-counted.
1150            let capacity_bytes = reg.cache_max_bytes.unwrap_or(64 * 1024 * 1024);
1151            let Some(schema) = self.table_store.read().table_schema(name) else {
1152                continue;
1153            };
1154            let pk_csv = &reg.primary_key;
1155            let pk_cols: Vec<String> = pk_csv
1156                .split(',')
1157                .map(|s| s.trim().to_string())
1158                .filter(|s| !s.is_empty())
1159                .collect();
1160            let key_sort_fields: Vec<arrow::row::SortField> = pk_cols
1161                .iter()
1162                .filter_map(|col| {
1163                    schema
1164                        .field_with_name(col)
1165                        .ok()
1166                        .map(|f| arrow::row::SortField::new(f.data_type().clone()))
1167                })
1168                .collect();
1169
1170            let cache = Arc::new(laminar_core::lookup::lookup_cache::LookupMemoryCache::new(
1171                0,
1172                laminar_core::lookup::lookup_cache::LookupMemoryCacheConfig {
1173                    capacity_bytes,
1174                    ttl: reg.cache_ttl,
1175                },
1176            ));
1177            // Try to create a lookup source for cache-miss fallback via
1178            // the registry factory (no cross-crate type dependency).
1179            let lookup_source = if let Ok(mut config) = build_table_config(reg) {
1180                config.set("_primary_key_columns", pk_csv.as_str());
1181                match self
1182                    .connector_registry
1183                    .create_lookup_source(config, Some(Arc::clone(&schema)))
1184                    .await
1185                {
1186                    Some(Ok(src)) => Some(src),
1187                    Some(Err(e)) => {
1188                        tracing::warn!(
1189                            table = %name, error = %e,
1190                            "lookup source creation failed; cache-only mode"
1191                        );
1192                        None
1193                    }
1194                    None => None,
1195                }
1196            } else {
1197                None
1198            };
1199
1200            // Projection pushdown: fetch only the columns any query references
1201            // (a per-table superset, since the cache is shared), plus the key.
1202            let projection = crate::sql_analysis::compute_lookup_projection(
1203                &schema,
1204                &pk_cols,
1205                name.as_str(),
1206                stream_regs.values().map(|r| r.query_sql.as_str()),
1207            );
1208
1209            self.lookup_registry.register_partial(
1210                name,
1211                laminar_sql::datafusion::PartialLookupState {
1212                    lookup_cache: cache,
1213                    schema,
1214                    key_columns: pk_cols,
1215                    key_sort_fields,
1216                    source: lookup_source,
1217                    fetch_semaphore: Arc::new(tokio::sync::Semaphore::new(16)),
1218                    projection,
1219                },
1220            );
1221            *mode = RefreshMode::SnapshotPlusCdc;
1222            tracing::info!(
1223                table = %name,
1224                capacity_bytes,
1225                ttl = ?reg.cache_ttl,
1226                pk = %pk_csv,
1227                "registered on-demand lookup table (partial cache)"
1228            );
1229        }
1230
1231        // Get stream source handles so results also flow to db.subscribe().
1232        let mut stream_sources: Vec<(String, streaming::Source<crate::catalog::ArrowRecord>)> =
1233            Vec::new();
1234        for reg in stream_regs.values() {
1235            if let Some(src) = self.catalog.get_stream_source(&reg.name) {
1236                stream_sources.push((reg.name.clone(), src));
1237            }
1238        }
1239
1240        // Per-source watermark state. Future-skew ceiling;
1241        // `LAMINAR_MAX_FUTURE_SKEW_MS=0` disables it (legacy unbounded).
1242        let future_skew_ms = match std::env::var("LAMINAR_MAX_FUTURE_SKEW_MS") {
1243            Ok(v) => v.parse::<i64>().unwrap_or_else(|_| {
1244                tracing::warn!(
1245                    value = %v,
1246                    "invalid LAMINAR_MAX_FUTURE_SKEW_MS (expected an integer); \
1247                     using the default"
1248                );
1249                laminar_core::time::DEFAULT_MAX_FUTURE_SKEW_MS
1250            }),
1251            Err(_) => laminar_core::time::DEFAULT_MAX_FUTURE_SKEW_MS,
1252        };
1253        let source_names = self.catalog.list_sources();
1254        let mut watermark_states: FxHashMap<String, SourceWatermarkState> =
1255            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1256        let mut source_entries_for_wm: FxHashMap<String, Arc<crate::catalog::SourceEntry>> =
1257            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1258        let mut source_ids: FxHashMap<String, usize> =
1259            FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1260        for name in source_names {
1261            if let Some(entry) = self.catalog.get_source(&name) {
1262                if let (Some(col), Some(dur)) =
1263                    (&entry.watermark_column, entry.max_out_of_orderness)
1264                {
1265                    let extractor = laminar_core::time::EventTimeExtractor::from_column(col)
1266                        .with_mode(laminar_core::time::ExtractionMode::Max);
1267                    let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1268                        .is_processing_time
1269                        .load(std::sync::atomic::Ordering::Relaxed)
1270                    {
1271                        Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1272                    } else {
1273                        Box::new(
1274                            laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(dur)
1275                                .with_max_future_skew(future_skew_ms),
1276                        )
1277                    };
1278                    let id = source_ids.len();
1279                    source_ids.insert(name.clone(), id);
1280                    watermark_states.insert(
1281                        name.clone(),
1282                        SourceWatermarkState {
1283                            extractor,
1284                            generator,
1285                            column: col.clone(),
1286                        },
1287                    );
1288                }
1289                source_entries_for_wm.insert(name, entry);
1290            }
1291        }
1292
1293        // Fallback watermark path for sources that set `event_time_column`
1294        // without an SQL `WATERMARK FOR` clause — exercised by the
1295        // programmatic API (`handle.set_event_time_column`) and by the
1296        // connector-property wiring above. Uses the bound from
1297        // `source.max_out_of_orderness()` if one was wired from connector
1298        // properties (e.g. Kafka `max.out.of.orderness.ms`), otherwise
1299        // falls back to `Duration::ZERO`.
1300        for name in self.catalog.list_sources() {
1301            if watermark_states.contains_key(&name) {
1302                continue;
1303            }
1304            if let Some(entry) = self.catalog.get_source(&name) {
1305                if let Some(col) = entry.source.event_time_column() {
1306                    let extractor = laminar_core::time::EventTimeExtractor::from_column(&col)
1307                        .with_mode(laminar_core::time::ExtractionMode::Max);
1308                    let ooo_bound = entry
1309                        .source
1310                        .max_out_of_orderness()
1311                        .unwrap_or(std::time::Duration::ZERO);
1312                    let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1313                        .is_processing_time
1314                        .load(std::sync::atomic::Ordering::Relaxed)
1315                    {
1316                        Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1317                    } else {
1318                        Box::new(
1319                            laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(
1320                                ooo_bound,
1321                            )
1322                            .with_max_future_skew(future_skew_ms),
1323                        )
1324                    };
1325                    let id = source_ids.len();
1326                    source_ids.insert(name.clone(), id);
1327                    watermark_states.insert(
1328                        name.clone(),
1329                        SourceWatermarkState {
1330                            extractor,
1331                            generator,
1332                            column: col,
1333                        },
1334                    );
1335                }
1336            }
1337        }
1338
1339        // Idle-source detection off by default (opt in via
1340        // LAMINAR_SOURCE_IDLE_TIMEOUT_MS > 0, applied to all sources;
1341        // unset/0/invalid ⇒ disabled). Tracker is per-source capable.
1342        let idle_timeout_ms: Option<u64> = match std::env::var("LAMINAR_SOURCE_IDLE_TIMEOUT_MS") {
1343            Ok(v) => match v.parse::<u64>() {
1344                Ok(0) => None,
1345                Ok(ms) => Some(ms),
1346                Err(_) => {
1347                    tracing::warn!(
1348                        value = %v,
1349                        "invalid LAMINAR_SOURCE_IDLE_TIMEOUT_MS (expected a non-negative \
1350                         integer); idle-source detection disabled"
1351                    );
1352                    None
1353                }
1354            },
1355            Err(_) => None,
1356        };
1357        let mut tracker = if source_ids.is_empty() {
1358            None
1359        } else {
1360            let mut t = laminar_core::time::WatermarkTracker::new(source_ids.len());
1361            if let Some(ms) = idle_timeout_ms {
1362                let d = std::time::Duration::from_millis(ms);
1363                for id in 0..source_ids.len() {
1364                    t.set_idle_timeout(id, Some(d));
1365                }
1366            }
1367            Some(t)
1368        };
1369
1370        // A pipeline with some watermarked and some un-watermarked sources
1371        // is a semantic hazard: an un-watermarked source's per-stream
1372        // watermark falls back to the global, so a join/window over both
1373        // closes on the *watermarked* source's clock. Surface this so it
1374        // isn't an invisible source of "where did my late rows go?"
1375        // bug reports. The real fix needs planner-level rejection.
1376        let registered = self.catalog.list_sources();
1377        let unwatermarked: Vec<&str> = registered
1378            .iter()
1379            .filter(|n| !source_ids.contains_key(*n))
1380            .map(String::as_str)
1381            .collect();
1382        if !source_ids.is_empty() && !unwatermarked.is_empty() {
1383            tracing::warn!(
1384                watermarked = source_ids.len(),
1385                unwatermarked = unwatermarked.len(),
1386                unwatermarked_names = ?unwatermarked,
1387                "Pipeline mixes watermarked and un-watermarked sources. An un-watermarked \
1388                 source in a join/window inherits the global watermark — time-based \
1389                 operators may behave unexpectedly. Add `WATERMARK FOR` to the missing \
1390                 sources or split into separate pipelines."
1391            );
1392        }
1393
1394        // Seed every restored generator and the combined tracker with the
1395        // watermark captured at checkpoint time. Anything not in the
1396        // recovered map (or `i64::MIN`) starts cold, which is correct for
1397        // first-run / fresh sources.
1398        if !recovered_source_wms.is_empty() {
1399            let mut combined = i64::MIN;
1400            for (name, wm) in &recovered_source_wms {
1401                if let Some(state) = watermark_states.get_mut(name) {
1402                    let _ = state.generator.advance_watermark(*wm);
1403                }
1404                if let (Some(t), Some(&id)) = (tracker.as_mut(), source_ids.get(name)) {
1405                    if let Some(global) = t.update_source(id, *wm) {
1406                        combined = combined.max(global.timestamp());
1407                    }
1408                }
1409            }
1410            if combined != i64::MIN {
1411                self.pipeline_watermark
1412                    .store(combined, std::sync::atomic::Ordering::SeqCst);
1413                tracing::info!(
1414                    sources = recovered_source_wms.len(),
1415                    pipeline_watermark = combined,
1416                    "Restored watermarks from checkpoint"
1417                );
1418            }
1419        }
1420
1421        let max_poll = self.config.default_buffer_size.min(1024);
1422        let checkpoint_interval = self
1423            .config
1424            .checkpoint
1425            .as_ref()
1426            .and_then(|c| c.interval_ms)
1427            .map(std::time::Duration::from_millis);
1428
1429        tracing::info!(
1430            sources = sources.len(),
1431            sinks = sinks.len(),
1432            streams = stream_regs.len(),
1433            subscriptions = stream_sources.len(),
1434            watermark_sources = source_ids.len(),
1435            "Starting event-driven connector pipeline"
1436        );
1437
1438        // Build pipeline config.
1439        // Embedded mode (no external connectors): zero batch window for
1440        // minimal latency — data is processed as soon as it arrives.
1441        // Connector mode: 5ms batch window amortizes SQL overhead across
1442        // high-throughput external sources (Kafka, CDC).
1443        let drain_budget_ns = self.config.pipeline_drain_budget_ns.unwrap_or(1_000_000);
1444        let query_budget_ns = self.config.pipeline_query_budget_ns.unwrap_or(8_000_000);
1445        let pipeline_config = PipelineConfig {
1446            max_poll_records: max_poll,
1447            channel_capacity: self.config.pipeline_channel_capacity.unwrap_or(64),
1448            fallback_poll_interval: if has_external {
1449                std::time::Duration::from_millis(10)
1450            } else {
1451                std::time::Duration::from_millis(1)
1452            },
1453            checkpoint_interval,
1454            batch_window: self
1455                .config
1456                .pipeline_batch_window
1457                .unwrap_or(if has_external {
1458                    std::time::Duration::from_millis(5)
1459                } else {
1460                    std::time::Duration::ZERO
1461                }),
1462            barrier_alignment_timeout: self
1463                .config
1464                .checkpoint
1465                .as_ref()
1466                .and_then(|c| c.alignment_timeout_ms)
1467                .map_or(
1468                    std::time::Duration::from_secs(30),
1469                    std::time::Duration::from_millis,
1470                ),
1471            delivery_guarantee: self.config.delivery_guarantee,
1472            // cycle_budget is a soft cap for logging; ensure it's at least
1473            // drain + query so sub-budgets can actually be used.
1474            cycle_budget_ns: 10_000_000_u64.max(drain_budget_ns + query_budget_ns),
1475            drain_budget_ns,
1476            query_budget_ns,
1477            background_budget_ns: 5_000_000, // 5ms
1478            max_input_buf_batches: self.config.pipeline_max_input_buf_batches.unwrap_or(256),
1479            max_input_buf_bytes: self.config.pipeline_max_input_buf_bytes,
1480            backpressure_policy: self.config.pipeline_backpressure_policy,
1481        };
1482
1483        // Validate delivery guarantee constraints.
1484        {
1485            use laminar_connectors::connector::DeliveryGuarantee;
1486
1487            if pipeline_config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1488                for src in &sources {
1489                    if !src.supports_replay {
1490                        return Err(DbError::Config(format!(
1491                            "[LDB-5030] exactly-once requires all sources to support replay, \
1492                             but source '{}' does not. Use at-least-once or remove this source.",
1493                            src.name
1494                        )));
1495                    }
1496                }
1497                for (name, handle, _, _, _) in &sinks {
1498                    if !handle.exactly_once() {
1499                        return Err(DbError::Config(format!(
1500                            "[LDB-5031] exactly-once requires all sinks to support \
1501                             exactly-once semantics, but sink '{name}' does not. \
1502                             Use at-least-once or configure a transactional sink."
1503                        )));
1504                    }
1505                }
1506                if pipeline_config.checkpoint_interval.is_none() {
1507                    return Err(DbError::Config(
1508                        "[LDB-5032] exactly-once requires checkpointing to be enabled. \
1509                         Set checkpoint.interval.ms in the pipeline configuration."
1510                            .into(),
1511                    ));
1512                }
1513            } else if pipeline_config.delivery_guarantee == DeliveryGuarantee::AtLeastOnce {
1514                let has_non_replayable = sources.iter().any(|s| !s.supports_replay);
1515                let has_eo_sink = sinks.iter().any(|(_, h, _, _, _)| h.exactly_once());
1516                if has_non_replayable && has_eo_sink {
1517                    tracing::warn!(
1518                        "[LDB-5033] pipeline has exactly-once sinks but some sources \
1519                         do not support replay — effective guarantee is at-most-once \
1520                         for events from non-replayable sources"
1521                    );
1522                }
1523            }
1524        }
1525
1526        let shutdown = self.shutdown_signal.clone();
1527
1528        // Build the PipelineCallback implementation that bridges to db.rs internals.
1529        let pipeline_watermark = Arc::clone(&self.pipeline_watermark);
1530        let coordinator = Arc::clone(&self.coordinator);
1531        let table_store_for_loop = self.table_store.clone();
1532        // Compute a pipeline hash for change detection across checkpoints.
1533        let pipeline_hash = {
1534            use std::hash::{Hash, Hasher};
1535            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1536            for reg in stream_regs.values() {
1537                reg.name.hash(&mut hasher);
1538                reg.query_sql.hash(&mut hasher);
1539            }
1540            for name in source_regs.keys() {
1541                name.hash(&mut hasher);
1542            }
1543            for name in sink_regs.keys() {
1544                name.hash(&mut hasher);
1545            }
1546            Some(hasher.finish())
1547        };
1548
1549        // Wire per-query budget and input buffer cap from pipeline config.
1550        graph.set_query_budget_ns(pipeline_config.query_budget_ns);
1551        graph.set_max_input_buf_batches(pipeline_config.max_input_buf_batches);
1552        graph.set_max_input_buf_bytes(pipeline_config.max_input_buf_bytes);
1553        graph.set_backpressure_policy(pipeline_config.backpressure_policy);
1554
1555        let sinks_pending_filter_count = sinks
1556            .iter()
1557            .filter(|(_, _, filter_sql, _, _)| filter_sql.is_some())
1558            .count();
1559
1560        let source_name_arcs: rustc_hash::FxHashMap<usize, Arc<str>> = source_ids
1561            .iter()
1562            .map(|(name, &sid)| (sid, Arc::<str>::from(name.as_str())))
1563            .collect();
1564        let source_wms_buf = rustc_hash::FxHashMap::with_capacity_and_hasher(
1565            source_name_arcs.len(),
1566            rustc_hash::FxBuildHasher,
1567        );
1568
1569        let prom = self
1570            .engine_metrics
1571            .lock()
1572            .clone()
1573            .expect("EngineMetrics must be set before start()");
1574
1575        // Force-checkpoint channel: `db.checkpoint()` on the caller side
1576        // hands a oneshot sender across to the callback, which captures
1577        // operator state and replies. Installed here so both ends exist
1578        // before the compute thread spawns. Crossfire for consistency
1579        // with the rest of the pipeline plumbing (sink_task, coordinator).
1580        let (force_ckpt_tx, force_ckpt_rx) = crossfire::mpsc::bounded_async::<
1581            crate::db::ForceCheckpointReply,
1582        >(crate::db::FORCE_CHECKPOINT_CHANNEL_CAPACITY);
1583        *self.force_ckpt_tx.lock() = Some(force_ckpt_tx);
1584
1585        let (checkpoint_complete_tx, checkpoint_complete_rx) = crossfire::mpsc::bounded_async::<(
1586            u64,
1587            rustc_hash::FxHashMap<String, laminar_connectors::checkpoint::SourceCheckpoint>,
1588        )>(16);
1589        // Shared admission state: the callback claims a slot (and staged
1590        // bytes) per in-flight epoch; the streaming coordinator gates new
1591        // barriers on the caps. A single-open-transaction sink cannot
1592        // overlap epochs, so depth is capped at 1 whenever ANY registered
1593        // sink is exactly-once — not just when the pipeline-level
1594        // guarantee says so. (DDL/server-configured sinks declare
1595        // exactly-once via connector capabilities without ever setting
1596        // the pipeline-level guarantee; keying off the pipeline config
1597        // alone would pipeline epochs over an open Kafka transaction.)
1598        let has_exactly_once_sink = sinks
1599            .iter()
1600            .any(|(_, handle, _, _, _)| handle.exactly_once());
1601        let checkpoint_in_flight = Arc::new(std::sync::atomic::AtomicU64::new(0));
1602        let staged_bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
1603        let (epoch_allocator, ckpt_quorum_timeout, max_in_flight_epochs, max_staged_bytes) = {
1604            let guard = coordinator.lock().await;
1605            match guard.as_ref() {
1606                Some(coord) => {
1607                    let cfg = coord.config();
1608                    let depth = if has_exactly_once_sink
1609                        || pipeline_config.delivery_guarantee
1610                            == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
1611                    {
1612                        1
1613                    } else {
1614                        cfg.max_in_flight_epochs.max(1)
1615                    };
1616                    (
1617                        Some(coord.epoch_allocator()),
1618                        cfg.quorum_timeout,
1619                        depth,
1620                        // 0 would pause admission permanently.
1621                        cfg.max_staged_bytes.max(1),
1622                    )
1623                }
1624                None => (None, std::time::Duration::from_secs(3), 1, u64::MAX),
1625            }
1626        };
1627        #[cfg(not(feature = "cluster"))]
1628        let _ = ckpt_quorum_timeout;
1629
1630        let static_stream_names: rustc_hash::FxHashSet<Arc<str>> = stream_sources
1631            .iter()
1632            .map(|(name, _)| Arc::from(name.as_str()))
1633            .collect();
1634
1635        let callback = crate::pipeline_callback::ConnectorPipelineCallback {
1636            graph,
1637            stream_sources,
1638            sinks,
1639            watermark_states,
1640            source_entries_for_wm,
1641            source_ids,
1642            source_name_arcs,
1643            source_wms_buf,
1644            tracker,
1645            prom,
1646            pipeline_watermark,
1647            coordinator,
1648            table_sources,
1649            table_store: table_store_for_loop,
1650            mv_store_has_any: self.mv_store.read().has_any_handle(),
1651            mv_store: self.mv_store.clone(),
1652            lookup_registry: Arc::clone(&self.lookup_registry),
1653            filter_ctx: laminar_sql::create_session_context(),
1654            compiled_sink_filters: Vec::new(),
1655            pending_sink_filter_compiles: sinks_pending_filter_count,
1656            last_checkpoint: std::time::Instant::now(),
1657            checkpoint_interval: self
1658                .config
1659                .checkpoint
1660                .as_ref()
1661                .and_then(|c| c.interval_ms)
1662                .map(std::time::Duration::from_millis),
1663            pipeline_hash,
1664            delivery_guarantee: pipeline_config.delivery_guarantee,
1665            serialization_timeout: std::time::Duration::from_secs(120),
1666            sink_event_rx,
1667            sink_timed_out: false,
1668            shutdown_signal: Arc::clone(&self.shutdown_signal),
1669            #[cfg(feature = "cluster")]
1670            cluster_controller: self.cluster_controller.lock().clone(),
1671            #[cfg(feature = "cluster")]
1672            follower_tail: Arc::default(),
1673            #[cfg(feature = "cluster")]
1674            barrier_injectors: Vec::new(),
1675            #[cfg(feature = "cluster")]
1676            pending_follower_checkpoint: None,
1677            force_ckpt_rx: Some(force_ckpt_rx),
1678            subscription_registry: Arc::clone(&self.subscription_registry),
1679            #[cfg(feature = "cluster")]
1680            active_subs: Arc::clone(&self.active_subs),
1681            #[cfg(feature = "cluster")]
1682            sub_route: std::sync::OnceLock::new(),
1683            static_stream_names,
1684            checkpoint_complete_tx,
1685            checkpoint_in_flight: Arc::clone(&checkpoint_in_flight),
1686            staged_bytes: Arc::clone(&staged_bytes),
1687            epoch_allocator,
1688            #[cfg(feature = "cluster")]
1689            quorum_timeout: ckpt_quorum_timeout,
1690            exactly_once_sinks: has_exactly_once_sink,
1691        };
1692
1693        // Start the streaming coordinator on a dedicated compute thread.
1694        // Source tasks were already spawned on the main tokio runtime in
1695        // StreamingCoordinator::new(). The coordinator communicates with
1696        // them via crossfire mpsc which works across runtimes.
1697        {
1698            // Control channel for live DDL (add/drop stream).
1699            let (control_tx, control_rx) =
1700                crossfire::mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1701            *self.control_tx.lock() = Some(control_tx);
1702
1703            let coordinator = crate::pipeline::StreamingCoordinator::new(
1704                sources,
1705                pipeline_config,
1706                Arc::clone(&shutdown),
1707                control_rx,
1708            )
1709            .await?
1710            .with_checkpoint_complete_rx(checkpoint_complete_rx)
1711            .with_checkpoint_admission(
1712                checkpoint_in_flight,
1713                max_in_flight_epochs,
1714                staged_bytes,
1715                max_staged_bytes,
1716            );
1717
1718            let (done_tx, done_rx) = crossfire::oneshot::oneshot::<()>();
1719            let (startup_tx, startup_rx) = crossfire::oneshot::oneshot::<Result<(), String>>();
1720            match std::thread::Builder::new()
1721                .name("laminar-compute".into())
1722                .spawn(move || {
1723                    let rt = match tokio::runtime::Builder::new_current_thread()
1724                        .enable_all()
1725                        .build()
1726                    {
1727                        Ok(rt) => {
1728                            startup_tx.send(Ok(()));
1729                            rt
1730                        }
1731                        Err(e) => {
1732                            startup_tx.send(Err(format!("compute runtime: {e}")));
1733                            return;
1734                        }
1735                    };
1736                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1737                        rt.block_on(async move {
1738                            coordinator.run(callback).await;
1739                        });
1740                    }));
1741                    if let Err(panic) = result {
1742                        let msg = panic
1743                            .downcast_ref::<String>()
1744                            .map(String::as_str)
1745                            .or_else(|| panic.downcast_ref::<&str>().copied())
1746                            .unwrap_or("unknown");
1747                        tracing::error!(panic = msg, "laminar-compute thread panicked");
1748                        // done_tx dropped → done_rx returns Err → logged by watcher task
1749                        return;
1750                    }
1751                    done_tx.send(());
1752                }) {
1753                Ok(_) => {}
1754                Err(e) => {
1755                    return Err(DbError::Config(format!(
1756                        "failed to spawn compute thread: {e}"
1757                    )));
1758                }
1759            }
1760
1761            // Wait for the thread to confirm the runtime started.
1762            match startup_rx.await {
1763                Ok(Ok(())) => {}
1764                Ok(Err(e)) => return Err(DbError::Config(e)),
1765                Err(_) => {
1766                    return Err(DbError::Config(
1767                        "compute thread exited before starting runtime".into(),
1768                    ));
1769                }
1770            }
1771
1772            let watcher_state = Arc::clone(&self.state);
1773            let watcher_shutdown = Arc::clone(&self.shutdown_signal);
1774            let handle = tokio::spawn(async move {
1775                if done_rx.await.is_err() {
1776                    tracing::error!("laminar-compute thread exited unexpectedly");
1777                    DbState::Stopped.store(&watcher_state);
1778                    watcher_shutdown.notify_one();
1779                }
1780            });
1781
1782            *self.runtime_handle.lock() = Some(handle);
1783        }
1784        Ok(())
1785    }
1786
1787    /// Shut down the streaming pipeline gracefully. Idempotent.
1788    ///
1789    /// # Errors
1790    ///
1791    /// Returns an error if shutdown encounters an error.
1792    pub async fn shutdown(&self) -> Result<(), DbError> {
1793        if matches!(
1794            DbState::load(&self.state),
1795            DbState::Stopped | DbState::ShuttingDown
1796        ) {
1797            return Ok(());
1798        }
1799
1800        DbState::ShuttingDown.store(&self.state);
1801
1802        // Drop the force-checkpoint sender first so any later db.checkpoint()
1803        // errors cleanly instead of waiting on a oneshot that will never reply.
1804        *self.force_ckpt_tx.lock() = None;
1805
1806        self.shutdown_signal.notify_one();
1807
1808        let handle = self.runtime_handle.lock().take();
1809        if let Some(handle) = handle {
1810            match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
1811                Ok(Ok(())) => tracing::info!("Pipeline shut down cleanly"),
1812                Ok(Err(e)) => tracing::warn!(error = %e, "Pipeline task panicked during shutdown"),
1813                Err(_) => tracing::warn!("Pipeline shutdown timed out after 10s"),
1814            }
1815        }
1816
1817        DbState::Stopped.store(&self.state);
1818        self.close();
1819        Ok(())
1820    }
1821
1822    /// Stop the streaming pipeline so it can be restarted.
1823    ///
1824    /// # Errors
1825    /// Returns [`DbError::InvalidOperation`] if the pipeline is still starting
1826    /// or the coordinator does not exit within the stop timeout.
1827    pub async fn stop_pipeline(&self) -> Result<(), DbError> {
1828        // Atomically claim the stop: only the caller that flips Running ->
1829        // ShuttingDown tears down, so a concurrent stop can't race in and mark
1830        // the pipeline restartable while the coordinator is still shutting down.
1831        match DbState::compare_exchange(DbState::Running, DbState::ShuttingDown, &self.state) {
1832            Ok(_) => {}
1833            // Already stopped / never started — idempotent no-op.
1834            Err(DbState::Created | DbState::Stopped) => return Ok(()),
1835            // Starting, or a stop is already in progress — refuse.
1836            Err(_) => {
1837                return Err(DbError::InvalidOperation(
1838                    "cannot stop pipeline: not running (starting, or a stop already in progress)"
1839                        .into(),
1840                ));
1841            }
1842        }
1843
1844        // Clear the force-checkpoint sender.
1845        *self.force_ckpt_tx.lock() = None;
1846
1847        self.shutdown_signal.notify_one();
1848
1849        // Take runtime handle before awaiting.
1850        let handle = self.runtime_handle.lock().take();
1851        if let Some(handle) = handle {
1852            match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
1853                Ok(Ok(())) => tracing::info!("Pipeline stopped cleanly"),
1854                Ok(Err(e)) => tracing::warn!(error = %e, "Pipeline task panicked during stop"),
1855                Err(_) => {
1856                    // Coordinator did not exit, keep state as ShuttingDown.
1857                    tracing::error!("Pipeline stop timed out after 10s; coordinator still running");
1858                    return Err(DbError::InvalidOperation(
1859                        "pipeline stop timed out; coordinator did not exit".into(),
1860                    ));
1861                }
1862            }
1863        }
1864
1865        // Drop control channel and reset state to allow restart.
1866        *self.control_tx.lock() = None;
1867        DbState::Created.store(&self.state);
1868        Ok(())
1869    }
1870}
1871
1872#[cfg(test)]
1873mod resolver_tests {
1874    use super::resolve_stream_output_schemas;
1875    use crate::connector_manager::StreamRegistration;
1876    use arrow_schema::{DataType, Field, Schema};
1877    use datafusion::datasource::empty::EmptyTable;
1878    use datafusion::prelude::SessionContext;
1879    use std::sync::Arc;
1880    use std::time::Duration;
1881
1882    fn ctx_with_payments() -> SessionContext {
1883        let ctx = SessionContext::new();
1884        let schema = Arc::new(Schema::new(vec![
1885            Field::new("region", DataType::Utf8, false),
1886            Field::new("method", DataType::Utf8, false),
1887            Field::new("amount_usd", DataType::Float64, false),
1888            Field::new("status", DataType::Utf8, false),
1889            Field::new(
1890                "event_time",
1891                DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1892                false,
1893            ),
1894        ]));
1895        ctx.register_table("payments", Arc::new(EmptyTable::new(schema)))
1896            .unwrap();
1897        ctx.register_udf(datafusion_expr::ScalarUDF::from(
1898            laminar_sql::datafusion::TumbleWindowStart::new(),
1899        ));
1900        ctx
1901    }
1902
1903    fn reg(name: &str, sql: &str, windowed: bool) -> StreamRegistration {
1904        StreamRegistration {
1905            name: name.to_string(),
1906            query_sql: sql.to_string(),
1907            emit_clause: None,
1908            // Resolver only checks `is_some()`; the size doesn't matter.
1909            window_config: windowed.then(|| {
1910                laminar_sql::translator::WindowOperatorConfig::tumbling(
1911                    "event_time".into(),
1912                    Duration::ZERO,
1913                )
1914            }),
1915            order_config: None,
1916            join_config: None,
1917        }
1918    }
1919
1920    #[tokio::test]
1921    async fn windowed_stream_schema_matches_user_select() {
1922        let ctx = ctx_with_payments();
1923        let mut regs = std::collections::HashMap::new();
1924        regs.insert(
1925            "agg".to_string(),
1926            reg(
1927                "agg",
1928                "SELECT region, COUNT(*) AS n FROM payments \
1929                 GROUP BY tumble(event_time, INTERVAL '1' MINUTE), region",
1930                true,
1931            ),
1932        );
1933
1934        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
1935        let names: Vec<&str> = out["agg"]
1936            .fields()
1937            .iter()
1938            .map(|f| f.name().as_str())
1939            .collect();
1940        assert_eq!(names, vec!["region", "n"]);
1941    }
1942
1943    #[tokio::test]
1944    async fn windowed_stream_with_explicit_window_columns() {
1945        let ctx = ctx_with_payments();
1946        ctx.register_udf(datafusion_expr::ScalarUDF::from(
1947            laminar_sql::datafusion::TumbleWindowEnd::new(),
1948        ));
1949        let mut regs = std::collections::HashMap::new();
1950        regs.insert(
1951            "agg".to_string(),
1952            reg(
1953                "agg",
1954                "SELECT \
1955                    tumble(event_time, INTERVAL '1' MINUTE)     AS window_start, \
1956                    tumble_end(event_time, INTERVAL '1' MINUTE) AS window_end, \
1957                    region, \
1958                    COUNT(*) AS n \
1959                 FROM payments \
1960                 GROUP BY \
1961                    tumble(event_time, INTERVAL '1' MINUTE), \
1962                    tumble_end(event_time, INTERVAL '1' MINUTE), \
1963                    region",
1964                true,
1965            ),
1966        );
1967
1968        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
1969        let names: Vec<&str> = out["agg"]
1970            .fields()
1971            .iter()
1972            .map(|f| f.name().as_str())
1973            .collect();
1974        assert_eq!(names, vec!["window_start", "window_end", "region", "n"]);
1975        assert_eq!(
1976            out["agg"].field(0).data_type(),
1977            &DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
1978        );
1979        assert_eq!(
1980            out["agg"].field(1).data_type(),
1981            &DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
1982        );
1983    }
1984
1985    #[tokio::test]
1986    async fn non_windowed_stream_has_no_prefix() {
1987        let ctx = ctx_with_payments();
1988        let mut regs = std::collections::HashMap::new();
1989        regs.insert(
1990            "passthrough".to_string(),
1991            reg(
1992                "passthrough",
1993                "SELECT region, amount_usd FROM payments",
1994                false,
1995            ),
1996        );
1997
1998        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
1999        let names: Vec<&str> = out["passthrough"]
2000            .fields()
2001            .iter()
2002            .map(|f| f.name().as_str())
2003            .collect();
2004        assert_eq!(names, vec!["region", "amount_usd"]);
2005    }
2006
2007    #[tokio::test]
2008    async fn chained_streams_resolve_via_iterative_planning() {
2009        // `b` reads from `a`; iteration order doesn't matter — the loop
2010        // re-tries `b` after `a` is registered.
2011        let ctx = ctx_with_payments();
2012        let mut regs = std::collections::HashMap::new();
2013        regs.insert(
2014            "b".to_string(),
2015            reg("b", "SELECT region, n + 1 AS n_plus_one FROM a", false),
2016        );
2017        regs.insert(
2018            "a".to_string(),
2019            reg(
2020                "a",
2021                "SELECT region, COUNT(*) AS n FROM payments GROUP BY region",
2022                false,
2023            ),
2024        );
2025
2026        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
2027        let b_names: Vec<&str> = out["b"]
2028            .fields()
2029            .iter()
2030            .map(|f| f.name().as_str())
2031            .collect();
2032        assert_eq!(b_names, vec!["region", "n_plus_one"]);
2033
2034        // Placeholders must not leak into the public ctx — `subscribe()`
2035        // is the data path for streams; `SELECT * FROM <stream>` should
2036        // not silently return zero rows from a left-over EmptyTable.
2037        assert!(!ctx.table_exist("a").unwrap_or(false));
2038        assert!(!ctx.table_exist("b").unwrap_or(false));
2039    }
2040
2041    #[tokio::test]
2042    async fn unresolvable_streams_surface_planner_error() {
2043        let ctx = ctx_with_payments();
2044        let mut regs = std::collections::HashMap::new();
2045        // Cycle: a→b, b→a. Planning stalls; we report the unresolved set.
2046        regs.insert("a".to_string(), reg("a", "SELECT * FROM b", false));
2047        regs.insert("b".to_string(), reg("b", "SELECT * FROM a", false));
2048
2049        let err = resolve_stream_output_schemas(&ctx, &regs)
2050            .await
2051            .unwrap_err()
2052            .to_string();
2053        assert!(err.contains("unresolvable stream dependency"), "got: {err}");
2054        assert!(err.contains('a') && err.contains('b'), "got: {err}");
2055    }
2056}