1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use laminar_core::streaming;
9use rustc_hash::FxHashMap;
10
11use crate::db::{DbState, LaminarDB, SourceWatermarkState};
12use crate::error::DbError;
13
14pub(crate) async fn plan_output_schema(
21 ctx: &datafusion::prelude::SessionContext,
22 sql: &str,
23) -> Option<arrow_schema::SchemaRef> {
24 let plan = if let Ok(plan) = ctx.state().create_logical_plan(sql).await {
25 plan
26 } else {
27 let rewritten = crate::sql_analysis::rewrite_asof_joins_for_planning(sql)?;
28 ctx.state().create_logical_plan(&rewritten).await.ok()?
29 };
30 let fields: Vec<_> = plan
31 .schema()
32 .fields()
33 .iter()
34 .map(|f| (**f).clone())
35 .collect();
36 Some(Arc::new(arrow_schema::Schema::new(fields)))
37}
38
39async fn resolve_stream_output_schemas(
40 ctx: &datafusion::prelude::SessionContext,
41 stream_regs: &HashMap<String, crate::connector_manager::StreamRegistration>,
42) -> Result<HashMap<String, arrow_schema::SchemaRef>, DbError> {
43 use datafusion::datasource::empty::EmptyTable;
44
45 let mut out: HashMap<String, arrow_schema::SchemaRef> =
46 HashMap::with_capacity(stream_regs.len());
47 let mut pending: Vec<&crate::connector_manager::StreamRegistration> =
48 stream_regs.values().collect();
49 let mut placeholders: Vec<String> = Vec::new();
51
52 let result: Result<(), DbError> = async {
53 while !pending.is_empty() {
54 let mut next: Vec<&crate::connector_manager::StreamRegistration> = Vec::new();
55 let mut progressed = false;
56 for reg in pending {
57 let Some(schema) = plan_output_schema(ctx, ®.query_sql).await else {
58 next.push(reg);
59 continue;
60 };
61
62 if !ctx.table_exist(®.name).unwrap_or(false) {
63 ctx.register_table(®.name, Arc::new(EmptyTable::new(schema.clone())))
64 .map_err(|e| {
65 DbError::Pipeline(format!(
66 "could not register placeholder for stream '{}': {e}",
67 reg.name
68 ))
69 })?;
70 placeholders.push(reg.name.clone());
71 }
72 out.insert(reg.name.clone(), schema);
73 progressed = true;
74 }
75
76 if !progressed {
77 let mut unresolved: Vec<&str> = next.iter().map(|r| r.name.as_str()).collect();
78 unresolved.sort_unstable();
79 let sql = &next[0].query_sql;
83 let err = match crate::sql_analysis::rewrite_asof_joins_for_planning(sql) {
84 Some(rewritten) => ctx.state().create_logical_plan(&rewritten).await.err(),
85 None => ctx.state().create_logical_plan(sql).await.err(),
86 }
87 .map_or_else(|| "unknown error".to_string(), |e| e.to_string());
88 return Err(DbError::Pipeline(format!(
89 "unresolvable stream dependency among [{}]: {err}",
90 unresolved.join(", ")
91 )));
92 }
93 pending = next;
94 }
95 Ok(())
96 }
97 .await;
98
99 for name in &placeholders {
100 let _ = ctx.deregister_table(name);
101 }
102
103 result.map(|()| out)
104}
105
106impl LaminarDB {
107 pub fn close(&self) {
109 self.shutdown
110 .store(true, std::sync::atomic::Ordering::Relaxed);
111 }
112
113 pub fn is_closed(&self) -> bool {
115 self.shutdown.load(std::sync::atomic::Ordering::Relaxed)
116 }
117
118 pub(crate) fn is_pipeline_running(&self) -> bool {
119 matches!(
120 DbState::load(&self.state),
121 DbState::Running | DbState::Starting | DbState::ShuttingDown
122 )
123 }
124
125 pub async fn start(&self) -> Result<(), DbError> {
135 match DbState::load(&self.state) {
136 DbState::Running | DbState::Starting => return Ok(()),
137 DbState::Stopped => {
138 return Err(DbError::InvalidOperation(
139 "Cannot start a stopped pipeline. Create a new LaminarDB instance.".into(),
140 ));
141 }
142 DbState::ShuttingDown => {
143 return Err(DbError::InvalidOperation(
144 "cannot start pipeline: shutdown/stop in progress".into(),
145 ));
146 }
147 DbState::Created => {}
148 }
149
150 DbState::Starting.store(&self.state);
151
152 {
154 let mut guard = self.engine_metrics.lock();
155 if guard.is_none() {
156 *guard = Some(Arc::new(crate::engine_metrics::EngineMetrics::new(
157 &prometheus::Registry::new(),
158 )));
159 }
160 }
161
162 #[cfg(feature = "cluster")]
167 self.restore_catalog_from_manifest().await;
168
169 match self.start_inner().await {
170 Ok(()) => {
171 DbState::Running.store(&self.state);
172 Ok(())
173 }
174 Err(e) => {
175 DbState::Created.store(&self.state);
178 Err(e)
179 }
180 }
181 }
182
    /// Performs the actual pipeline startup on behalf of `start`.
    ///
    /// Steps, in order:
    /// 1. Snapshot the source, sink, stream and table registrations, plus the
    ///    `has_external_connectors` flag, from the connector manager. The
    ///    registrations are cloned inside a block, so the manager's lock is
    ///    released before any `.await`.
    /// 2. If `self.config.checkpoint` is set, build a `CheckpointCoordinator`
    ///    and install it in `self.coordinator`. The store is backed by the
    ///    configured object store (`object_store_url`) or by a local
    ///    filesystem directory. Before installation the coordinator receives,
    ///    where available:
    ///    - engine metrics;
    ///    - the cluster controller (`cluster` feature);
    ///    - the state backend and vnode assignment (only when both a state
    ///      backend and a vnode registry are installed);
    ///    - a checkpoint decision store.
    ///
    ///    `reconcile_prepared_on_init` is then awaited.
    /// 3. If there are external connectors or at least one stream, start the
    ///    connector pipeline. Otherwise only log that the instance runs in
    ///    embedded mode.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Config` if the object store, the data directory or
    /// the local filesystem store cannot be set up. Also propagates errors
    /// from `CheckpointCoordinator::new` and `start_connector_pipeline`.
    #[allow(clippy::too_many_lines)]
    async fn start_inner(&self) -> Result<(), DbError> {
        // Snapshot all registrations under a short-lived connector-manager lock.
        let (source_regs, sink_regs, stream_regs, table_regs, has_external) = {
            let mgr = self.connector_manager.lock();
            (
                mgr.sources().clone(),
                mgr.sinks().clone(),
                mgr.streams().clone(),
                mgr.tables().clone(),
                mgr.has_external_connectors(),
            )
        };

        for (name, reg) in &source_regs {
            tracing::debug!(source = %name, connector_type = ?reg.connector_type, "Registered source");
        }
        for (name, reg) in &sink_regs {
            tracing::debug!(sink = %name, connector_type = ?reg.connector_type, "Registered sink");
        }

        // The checkpoint coordinator is only built when checkpointing is configured.
        if let Some(ref cp_config) = self.config.checkpoint {
            use crate::checkpoint_coordinator::{
                CheckpointConfig as CkpConfig, CheckpointCoordinator,
            };

            // Defaults to 3 when `max_retained` is not configured.
            let max_retained = cp_config.max_retained.unwrap_or(3);
            // Vnode count handed to the checkpoint store. It comes from the
            // vnode registry when one is installed (saturating at u16::MAX);
            // otherwise the manifest default is used.
            let vnode_count = self.vnode_registry.lock().as_ref().map_or(
                laminar_core::storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
                |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
            );

            // Local checkpoint directory, in priority order: the checkpoint
            // `data_dir`, then the general `storage_dir`, then "./data". Only
            // used on the local-filesystem branch below.
            let data_dir = cp_config
                .data_dir
                .clone()
                .or_else(|| self.config.storage_dir.clone())
                .unwrap_or_else(|| std::path::PathBuf::from("./data"));

            // `store` persists checkpoints. `decision_backing` is the same
            // underlying object store; it is used to build a decision store
            // below when none is already installed.
            let (store, decision_backing): (
                Box<dyn laminar_core::storage::CheckpointStore>,
                Arc<dyn object_store::ObjectStore>,
            ) = if let Some(ref url) = self.config.object_store_url {
                let obj = laminar_core::storage::object_store_builder::build_object_store(
                    url,
                    &self.config.object_store_options,
                )
                .map_err(|e| DbError::Config(format!("object store: {e}")))?;
                let cs = laminar_core::storage::checkpoint_store::ObjectStoreCheckpointStore::new(
                    Arc::clone(&obj),
                    // Presumably the key prefix (store root) — TODO confirm.
                    String::new(),
                    max_retained,
                )
                .with_vnode_count(vnode_count);
                (Box::new(cs), obj)
            } else {
                // NOTE(review): blocking filesystem call inside an async fn.
                // Tolerable at startup, but it does block the executor thread.
                std::fs::create_dir_all(&data_dir).map_err(|e| {
                    DbError::Config(format!("data dir {}: {e}", data_dir.display()))
                })?;
                let obj: Arc<dyn object_store::ObjectStore> = Arc::new(
                    object_store::local::LocalFileSystem::new_with_prefix(&data_dir)
                        .map_err(|e| DbError::Config(format!("local fs: {e}")))?,
                );
                let cs = laminar_core::storage::checkpoint_store::FileSystemCheckpointStore::new(
                    &data_dir,
                    max_retained,
                )
                .with_vnode_count(vnode_count);
                (Box::new(cs), obj)
            };

            // Unset `max_in_flight_epochs` / `max_staged_bytes` fall back to
            // `CkpConfig::default()`. `interval` is taken as-is (unset stays
            // `None`). All remaining fields come from the defaults.
            let defaults = CkpConfig::default();
            let config = CkpConfig {
                interval: cp_config.interval_ms.map(std::time::Duration::from_millis),
                max_retained,
                max_in_flight_epochs: cp_config
                    .max_in_flight_epochs
                    .unwrap_or(defaults.max_in_flight_epochs),
                max_staged_bytes: cp_config
                    .max_staged_bytes
                    .unwrap_or(defaults.max_staged_bytes),
                ..defaults
            };
            let mut coord = CheckpointCoordinator::new(config, store).await?;
            if let Some(ref prom) = *self.engine_metrics.lock() {
                coord.set_metrics(Arc::clone(prom));
            }

            #[cfg(feature = "cluster")]
            if let Some(controller) = self.cluster_controller.lock().clone() {
                coord.set_cluster_controller(controller);
            }

            // State-backend / vnode wiring happens only when both a state
            // backend and a vnode registry are installed.
            if let (Some(backend), Some(registry)) = (
                self.state_backend.lock().clone(),
                self.vnode_registry.lock().clone(),
            ) {
                // This node's id: the cluster instance id when a cluster
                // controller is installed, otherwise NodeId(0).
                let owner = {
                    #[cfg(feature = "cluster")]
                    {
                        self.cluster_controller
                            .lock()
                            .as_ref()
                            .map_or(laminar_core::state::NodeId(0), |c| {
                                laminar_core::state::NodeId(c.instance_id().0)
                            })
                    }
                    #[cfg(not(feature = "cluster"))]
                    {
                        laminar_core::state::NodeId(0)
                    }
                };
                // The same assignment version goes to the backend and the coordinator.
                let version = registry.assignment_version();
                backend.set_authoritative_version(version);
                coord.set_state_backend(backend);
                coord.set_assignment_version(version);
                // Presumably the vnodes assigned to `owner` — TODO confirm.
                coord.set_vnode_set(laminar_core::state::owned_vnodes(&registry, owner));
                // The gate set always spans every vnode id 0..vnode_count.
                coord.set_gate_vnode_set((0..registry.vnode_count()).collect());
            }

            // Decision store. With the `cluster` feature an already-installed
            // store is reused when present; otherwise a new one is built over
            // `decision_backing`.
            let ds = {
                #[cfg(feature = "cluster")]
                {
                    self.decision_store.lock().clone().unwrap_or_else(|| {
                        Arc::new(
                            laminar_core::checkpoint_decision::CheckpointDecisionStore::new(
                                Arc::clone(&decision_backing),
                            ),
                        )
                    })
                }
                #[cfg(not(feature = "cluster"))]
                {
                    Arc::new(
                        laminar_core::checkpoint_decision::CheckpointDecisionStore::new(
                            Arc::clone(&decision_backing),
                        ),
                    )
                }
            };
            coord.set_decision_store(ds);

            // Presumably resolves checkpoints left in a prepared state by an
            // earlier run — TODO confirm against the coordinator.
            coord.reconcile_prepared_on_init().await;

            *self.coordinator.lock().await = Some(coord);
        }

        // A connector pipeline is only started when there is something for it to run.
        if has_external || !stream_regs.is_empty() {
            tracing::info!(
                sources = source_regs.len(),
                sinks = sink_regs.len(),
                streams = stream_regs.len(),
                tables = table_regs.len(),
                has_external,
                "Starting pipeline"
            );
            self.start_connector_pipeline(
                source_regs,
                sink_regs,
                stream_regs,
                table_regs,
                has_external,
            )
            .await?;
        } else {
            tracing::info!(
                sources = source_regs.len(),
                sinks = sink_regs.len(),
                "Starting in embedded (in-memory) mode — no streams"
            );
        }

        Ok(())
    }
387
388 #[allow(clippy::too_many_lines)]
399 async fn start_connector_pipeline(
400 &self,
401 source_regs: HashMap<String, crate::connector_manager::SourceRegistration>,
402 sink_regs: HashMap<String, crate::connector_manager::SinkRegistration>,
403 stream_regs: HashMap<String, crate::connector_manager::StreamRegistration>,
404 table_regs: HashMap<String, crate::connector_manager::TableRegistration>,
405 has_external: bool,
406 ) -> Result<(), DbError> {
407 use crate::connector_manager::{
408 build_sink_config, build_source_config, build_table_config,
409 };
410 use crate::operator_graph::OperatorGraph;
411 use crate::pipeline::{PipelineConfig, SourceRegistration};
412 use laminar_connectors::reference::{ReferenceTableSource, RefreshMode};
413
414 let ctx = {
417 use datafusion::execution::SessionStateBuilder;
418 let mut session_config = laminar_sql::datafusion::base_session_config();
419 if let Some(n) = self.pipeline_target_partitions {
420 session_config = session_config.with_target_partitions(n);
421 }
422 let query_planner = Arc::clone(self.ctx.state().query_planner());
423 let mut state_builder = SessionStateBuilder::new()
424 .with_config(session_config)
425 .with_default_features()
426 .with_query_planner(query_planner);
427 for rule in self.physical_optimizer_rules.iter() {
428 state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
429 }
430 let context =
431 datafusion::prelude::SessionContext::new_with_state(state_builder.build());
432 for rule in self.ctx.state().optimizers() {
433 context.add_optimizer_rule(Arc::clone(rule));
434 }
435 context
436 };
437 laminar_sql::register_streaming_functions(&ctx);
438
439 let lookup_tables: Vec<(String, arrow::datatypes::SchemaRef)> = {
442 let ts = self.table_store.read();
443 ts.table_names()
444 .into_iter()
445 .filter_map(|name| {
446 let schema = ts.table_schema(&name)?;
447 Some((name, schema))
448 })
449 .collect()
450 };
451 for (name, schema) in lookup_tables {
452 let provider = crate::table_provider::ReferenceTableProvider::new(
453 name.clone(),
454 schema,
455 self.table_store.clone(),
456 );
457 if let Err(e) = ctx.register_table(&name, Arc::new(provider)) {
458 tracing::warn!(
459 table = %name,
460 error = %e,
461 "failed to register lookup table in operator graph context"
462 );
463 }
464 }
465
466 let mut graph = OperatorGraph::new(ctx);
467 graph.set_max_state_bytes(self.config.max_state_bytes_per_operator);
468 graph.set_lookup_registry(Arc::clone(&self.lookup_registry));
469 if let Some(ref prom) = *self.engine_metrics.lock() {
470 graph.set_metrics(Arc::clone(prom));
471 }
472 if let (Some(runtime), Some(handle)) = (&self.ai_runtime, &self.ai_handle) {
473 graph.set_ai_runtime(Arc::clone(runtime), handle.clone());
474 }
475
476 #[cfg(feature = "cluster")]
480 {
481 let sender = self.shuffle_sender.lock().clone();
482 let receiver = self.shuffle_receiver.lock().clone();
483 let registry = self.vnode_registry.lock().clone();
484 let controller = self.cluster_controller.lock().clone();
485 if let (Some(sender), Some(receiver), Some(registry), Some(controller)) =
486 (sender, receiver, registry, controller)
487 {
488 let self_id = laminar_core::state::NodeId(controller.instance_id().0);
489 graph.set_cluster_shuffle(crate::operator::sql_query::ClusterShuffleConfig {
490 registry,
491 sender,
492 receiver,
493 self_id,
494 });
495 graph.set_rehydration_handle(Arc::clone(&self.rehydrated_vnode_state));
498 }
499 }
500
501 for name in source_regs.keys() {
505 if let Some(entry) = self.catalog.get_source(name) {
506 graph.register_source_schema(name.clone(), entry.schema.clone());
507 }
508 }
509
510 let partial_lookup_tables: rustc_hash::FxHashMap<String, Vec<String>> = table_regs
516 .values()
517 .filter(|r| matches!(r.refresh, Some(RefreshMode::Manual)))
518 .filter_map(|r| {
519 let schema = self.table_store.read().table_schema(&r.name)?;
520 let cols = schema.fields().iter().map(|f| f.name().clone()).collect();
521 Some((r.name.clone(), cols))
522 })
523 .collect();
524 graph.set_partial_lookup_tables(partial_lookup_tables);
525 graph.set_runtime_handle(
527 self.ai_handle
528 .clone()
529 .unwrap_or_else(tokio::runtime::Handle::current),
530 );
531
532 for reg in stream_regs.values() {
533 graph.add_query(
534 reg.name.clone(),
535 reg.query_sql.clone(),
536 reg.emit_clause.clone(),
537 reg.window_config.clone(),
538 reg.order_config.clone(),
539 None,
540 reg.join_config.clone(),
541 );
542 }
543 graph.take_build_errors()?;
546
547 for tcfg in graph.temporal_join_configs() {
550 if self.lookup_registry.get_entry(&tcfg.table_name).is_none() {
551 let initial_batch = self
554 .table_store
555 .read()
556 .to_record_batch(&tcfg.table_name)
557 .or_else(|| {
558 self.catalog
559 .get_source(&tcfg.table_name)
560 .map(|e| RecordBatch::new_empty(e.schema.clone()))
561 })
562 .unwrap_or_else(|| {
563 RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()))
564 });
565 let key_columns = vec![tcfg.table_key_column.clone()];
566 let key_indices: Vec<usize> = key_columns
567 .iter()
568 .filter_map(|k| initial_batch.schema().index_of(k).ok())
569 .collect();
570
571 let resolved_version_col = if tcfg.table_version_column.is_empty() {
575 let schema = initial_batch.schema();
576 schema
577 .fields()
578 .iter()
579 .find(|f| {
580 f.name() != &tcfg.table_key_column
581 && matches!(
582 f.data_type(),
583 arrow::datatypes::DataType::Int64
584 | arrow::datatypes::DataType::Timestamp(_, _)
585 )
586 })
587 .map(|f| f.name().clone())
588 .unwrap_or_default()
589 } else {
590 tcfg.table_version_column.clone()
591 };
592
593 let Ok(version_col_idx) = initial_batch.schema().index_of(&resolved_version_col)
594 else {
595 if !initial_batch.schema().fields().is_empty() {
596 tracing::warn!(
597 table=%tcfg.table_name,
598 version_col=%resolved_version_col,
599 "Version column not found in temporal table schema; \
600 will resolve on first CDC batch"
601 );
602 }
603 self.lookup_registry.register_versioned(
605 &tcfg.table_name,
606 laminar_sql::datafusion::VersionedLookupState {
607 batch: initial_batch,
608 index: Arc::new(
609 laminar_sql::datafusion::lookup_join_exec::VersionedIndex::default(
610 ),
611 ),
612 key_columns,
613 version_column: resolved_version_col,
614 stream_time_column: tcfg.stream_time_column.clone(),
615 max_versions_per_key: usize::MAX,
616 },
617 );
618 continue;
619 };
620 let index = Arc::new(
621 laminar_sql::datafusion::lookup_join_exec::VersionedIndex::build(
622 &initial_batch,
623 &key_indices,
624 version_col_idx,
625 usize::MAX,
626 )
627 .unwrap_or_default(),
628 );
629 self.lookup_registry.register_versioned(
630 &tcfg.table_name,
631 laminar_sql::datafusion::VersionedLookupState {
632 batch: initial_batch,
633 index,
634 key_columns,
635 version_column: resolved_version_col,
636 stream_time_column: tcfg.stream_time_column.clone(),
637 max_versions_per_key: usize::MAX,
638 },
639 );
640 }
641 }
642
643 let prom_registry = self.prometheus_registry.lock().clone();
646
647 let mut sources: Vec<SourceRegistration> = Vec::new();
649 for (name, reg) in &source_regs {
650 if reg.connector_type.is_none() {
651 continue;
652 }
653 let mut config = build_source_config(reg)?;
654
655 if let Some(entry) = self.catalog.get_source(name) {
658 let schema_str = crate::pipeline_callback::encode_arrow_schema(&entry.schema);
659 config.set("_arrow_schema".to_string(), schema_str);
660 }
661
662 #[cfg_attr(not(feature = "cluster"), allow(unused_mut))]
663 let mut source = self
664 .connector_registry
665 .create_source(&config, prom_registry.as_deref())
666 .map_err(|e| {
667 DbError::Connector(format!(
668 "Cannot create source '{}' (type '{}'): {e}",
669 name,
670 config.connector_type()
671 ))
672 })?;
673 #[cfg(feature = "cluster")]
677 if let (Some(registry), Some(self_id)) = (
678 self.vnode_registry.lock().clone(),
679 self.cluster_controller
680 .lock()
681 .as_ref()
682 .map(|c| laminar_core::state::NodeId(c.instance_id().0)),
683 ) {
684 source.set_vnode_assignment(registry, self_id);
685 }
686 let supports_replay = source.supports_replay();
687 if !supports_replay {
688 tracing::warn!(
689 source = %name,
690 "source does not support replay — exactly-once semantics \
691 are degraded to at-most-once for this source"
692 );
693 }
694 if let Some(entry) = self.catalog.get_source(name) {
702 if entry.source.event_time_column().is_none() {
703 if let Some(col) = config.get("event.time.column") {
704 entry.source.set_event_time_column(col);
705 } else if let Some(col) = config.get("event.time.field") {
706 entry.source.set_event_time_column(col);
707 }
708 }
709 if let Some(ms_str) = config.get("max.out.of.orderness.ms") {
713 match ms_str.parse::<u64>() {
714 Ok(ms) => {
715 entry
716 .source
717 .set_max_out_of_orderness(std::time::Duration::from_millis(ms));
718 }
719 Err(e) => {
720 tracing::warn!(
721 source = %name,
722 value = %ms_str,
723 error = %e,
724 "ignoring unparseable max.out.of.orderness.ms — \
725 watermark will use Duration::ZERO"
726 );
727 }
728 }
729 }
730 }
731
732 sources.push(SourceRegistration {
733 name: name.clone(),
734 connector: source,
735 config,
736 supports_replay,
737 restore_checkpoint: None, });
739 }
740
741 let bridged_names: rustc_hash::FxHashSet<String> =
749 sources.iter().map(|s| s.name.clone()).collect();
750 for (name, reg) in &source_regs {
752 if reg.connector_type.is_some() {
753 continue; }
755 if let Some(entry) = self.catalog.get_source(name) {
756 let subscription = entry.sink.subscribe();
757 let connector = crate::catalog_connector::CatalogSourceConnector::new(
758 subscription,
759 entry.schema.clone(),
760 entry.data_notify(),
761 );
762 sources.push(SourceRegistration {
763 name: name.clone(),
764 connector: Box::new(connector),
765 config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
766 supports_replay: false,
767 restore_checkpoint: None,
768 });
769 }
770 }
771 for name in self.catalog.list_sources() {
774 if bridged_names.contains(&name) || source_regs.contains_key(&name) {
775 continue;
776 }
777 if let Some(entry) = self.catalog.get_source(&name) {
778 graph.register_source_schema(name.clone(), entry.schema.clone());
779 let subscription = entry.sink.subscribe();
780 let connector = crate::catalog_connector::CatalogSourceConnector::new(
781 subscription,
782 entry.schema.clone(),
783 entry.data_notify(),
784 );
785 sources.push(SourceRegistration {
786 name: name.clone(),
787 connector: Box::new(connector),
788 config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
789 supports_replay: false,
790 restore_checkpoint: None,
791 });
792 }
793 }
794
795 let stream_output_schemas = resolve_stream_output_schemas(&self.ctx, &stream_regs).await?;
800 {
801 let mut schemas = self.stream_schemas.write();
802 schemas.clear();
803 schemas.extend(
804 stream_output_schemas
805 .iter()
806 .map(|(k, v)| (k.clone(), Arc::clone(v))),
807 );
808 }
809
810 let (sink_event_tx, sink_event_rx) =
814 laminar_core::streaming::channel::channel::<crate::sink_task::SinkEvent>(
815 crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY,
816 );
817 #[allow(clippy::type_complexity)]
818 let mut sinks: Vec<(
819 String,
820 crate::sink_task::SinkTaskHandle,
821 Option<String>,
822 String, bool, )> = Vec::new();
825 for (name, reg) in &sink_regs {
826 if reg.connector_type.is_none() {
827 continue;
828 }
829 let mut config = build_sink_config(reg)?;
830 let upstream_schema = stream_output_schemas.get(®.input).cloned().or_else(|| {
833 self.catalog
834 .get_source(®.input)
835 .map(|e| e.schema.clone())
836 });
837 if let Some(schema) = upstream_schema {
838 let schema_str = crate::pipeline_callback::encode_arrow_schema(&schema);
839 config.set("_arrow_schema".to_string(), schema_str);
840 }
841 let mut sink = self
842 .connector_registry
843 .create_sink(&config, prom_registry.as_deref())
844 .map_err(|e| {
845 DbError::Connector(format!(
846 "Cannot create sink '{}' (type '{}'): {e}",
847 name,
848 config.connector_type()
849 ))
850 })?;
851 sink.open(&config)
853 .await
854 .map_err(|e| DbError::Connector(format!("Failed to open sink '{name}': {e}")))?;
855 let caps = sink.capabilities();
856 let write_timeout =
859 match config
860 .get_parsed::<u64>("sink.write.timeout.ms")
861 .map_err(|e| {
862 DbError::Connector(format!(
863 "Invalid 'sink.write.timeout.ms' for sink '{name}': {e}"
864 ))
865 })? {
866 Some(ms) => std::time::Duration::from_millis(ms),
867 None => caps.suggested_write_timeout,
868 };
869 if write_timeout.is_zero() {
870 return Err(DbError::Connector(format!(
871 "sink '{name}': write_timeout must be > 0 \
872 (check 'sink.write.timeout.ms' or the sink's \
873 suggested_write_timeout)"
874 )));
875 }
876 let sink_id: std::sync::Arc<str> = std::sync::Arc::from(name.as_str());
877 let handle =
878 crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
879 name: name.clone(),
880 sink_id,
881 connector: sink,
882 exactly_once: caps.exactly_once,
883 channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
884 flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
885 write_timeout,
886 event_tx: sink_event_tx.clone(),
887 });
888 sinks.push((
889 name.clone(),
890 handle,
891 reg.filter_expr.clone(),
892 reg.input.clone(),
893 caps.changelog,
894 ));
895 }
896 drop(sink_event_tx);
899
900 let mut table_sources: Vec<(String, Box<dyn ReferenceTableSource>, RefreshMode)> =
902 Vec::new();
903 for (name, reg) in &table_regs {
904 if reg.connector_type.is_none() {
905 continue;
906 }
907 let config = build_table_config(reg)?;
908 let source = self
909 .connector_registry
910 .create_table_source(&config)
911 .map_err(|e| {
912 DbError::Connector(format!("Cannot create table source '{name}': {e}"))
913 })?;
914 let mode = reg.refresh.clone().unwrap_or(RefreshMode::SnapshotPlusCdc);
915 table_sources.push((name.clone(), source, mode));
916 }
917
918 {
922 let mut guard = self.coordinator.lock().await;
923 if let Some(ref mut coord) = *guard {
924 for (name, handle, _, _, _) in &sinks {
925 let exactly_once = handle.exactly_once();
926 coord.register_sink(name.clone(), handle.clone(), exactly_once);
927 }
928 }
929 }
930
931 let mut recovered_source_wms: rustc_hash::FxHashMap<String, i64> =
942 rustc_hash::FxHashMap::default();
943 {
944 let mut guard = self.coordinator.lock().await;
945 if let Some(ref mut coord) = *guard {
946 match coord.recover().await {
947 Ok(Some(recovered)) => {
948 recovered_source_wms = recovered
949 .manifest
950 .source_watermarks
951 .iter()
952 .filter(|(_, &wm)| wm != i64::MIN)
953 .map(|(name, &wm)| (name.clone(), wm))
954 .collect();
955 for (name, source, _) in &mut table_sources {
956 if let Some(cp) = recovered.manifest.table_offsets.get(name) {
957 let restored =
958 crate::checkpoint_coordinator::connector_to_source_checkpoint(
959 cp,
960 );
961 if let Err(e) = source.restore(&restored).await {
962 tracing::warn!(
963 table=%name, error=%e,
964 "Table source restore failed"
965 );
966 }
967 }
968 }
969 for src in &mut sources {
974 if !src.supports_replay {
975 continue;
976 }
977 if let Some(cp) = recovered.manifest.source_offsets.get(&src.name) {
978 let restored =
979 crate::checkpoint_coordinator::connector_to_source_checkpoint(
980 cp,
981 );
982 tracing::info!(
983 source = %src.name,
984 offsets = cp.offsets.len(),
985 "attaching checkpoint offsets for source recovery"
986 );
987 src.restore_checkpoint = Some(restored);
988 }
989 }
990 let mut graph_restore_failed = false;
991 let op_keys: Vec<&String> =
992 recovered.manifest.operator_states.keys().collect();
993 let instance_hint = {
994 #[cfg(feature = "cluster")]
995 {
996 self.cluster_controller
997 .lock()
998 .as_ref()
999 .map_or(0, |c| c.instance_id().0)
1000 }
1001 #[cfg(not(feature = "cluster"))]
1002 {
1003 0u64
1004 }
1005 };
1006 tracing::info!(
1007 instance = instance_hint,
1008 count = op_keys.len(),
1009 keys = ?op_keys,
1010 "manifest operator_states summary"
1011 );
1012 if let Some(op) = recovered.manifest.operator_states.get("operator_graph") {
1013 if let Some(bytes) = op.decode_inline() {
1014 match graph.restore_from_bytes(&bytes) {
1015 Ok(n) => {
1016 tracing::info!(
1017 queries = n,
1018 "Restored operator graph state from checkpoint"
1019 );
1020 }
1021 Err(e) => {
1022 return Err(DbError::Checkpoint(format!(
1031 "[LDB-6029] operator graph restore failed: \
1032 {e} — refusing to start with checkpointed \
1033 source offsets and empty operator state"
1034 )));
1035 }
1036 }
1037 } else {
1038 tracing::warn!(
1039 "manifest has 'operator_graph' but decode_inline returned None"
1040 );
1041 }
1042 } else if recovered
1043 .manifest
1044 .operator_states
1045 .contains_key("stream_executor")
1046 {
1047 graph_restore_failed = true;
1048 tracing::warn!(
1049 "Found old stream_executor checkpoint format; \
1050 skipping restore (clean break). Starting fresh."
1051 );
1052 }
1053
1054 if !graph_restore_failed {
1058 let prefix = crate::mv_store::CHECKPOINT_KEY_PREFIX;
1059 let mut store = self.mv_store.write();
1060 let mut restored = 0usize;
1061 for (key, op) in &recovered.manifest.operator_states {
1062 if let Some(name) = key.strip_prefix(prefix) {
1063 if let Some(bytes) = op.decode_inline() {
1064 match store.restore_from_ipc(name, &bytes) {
1065 Ok(true) => restored += 1,
1066 Ok(false) => {} Err(e) => {
1068 tracing::warn!(mv = name, error = %e, "MV restore failed");
1069 }
1070 }
1071 }
1072 }
1073 }
1074 if restored > 0 {
1075 tracing::info!(mvs = restored, "Restored MV state from checkpoint");
1076 }
1077 }
1078 tracing::info!(
1079 epoch = recovered.epoch(),
1080 sources_restored = recovered.sources_restored,
1081 sinks_rolled_back = recovered.sinks_rolled_back,
1082 "Recovered from unified checkpoint"
1083 );
1084 }
1085 Ok(None) => {
1086 tracing::info!("No checkpoint found, starting fresh");
1087 }
1088 Err(e) => {
1089 tracing::warn!(error = %e, "Checkpoint recovery failed, starting fresh");
1090 }
1091 }
1092 }
1093 }
1094
1095 {
1098 let guard = self.coordinator.lock().await;
1099 if let Some(ref coord) = *guard {
1100 coord.begin_initial_epoch().await?;
1101 }
1102 }
1103
1104 for (name, source, mode) in &mut table_sources {
1106 if matches!(mode, RefreshMode::Manual) {
1107 continue;
1108 }
1109 while let Some(batch) = source
1110 .poll_snapshot()
1111 .await
1112 .map_err(|e| DbError::Connector(format!("Table '{name}' snapshot error: {e}")))?
1113 {
1114 self.table_store
1115 .write()
1116 .upsert(name, &batch)
1117 .map_err(|e| DbError::Connector(format!("Table '{name}' upsert error: {e}")))?;
1118 }
1119 self.sync_table_to_datafusion(name)?;
1120 {
1121 let mut ts = self.table_store.write();
1122 ts.rebuild_xor_filter(name);
1123 ts.set_ready(name, true);
1124 }
1125 if matches!(
1129 self.lookup_registry.get_entry(name),
1130 Some(laminar_sql::datafusion::RegisteredLookup::Versioned(_))
1131 ) {
1132 } else if let Some(batch) = self.table_store.read().to_record_batch(name) {
1134 self.lookup_registry
1135 .register(name, laminar_sql::datafusion::LookupSnapshot { batch });
1136 }
1137 }
1138
1139 for (name, _source, mode) in &mut table_sources {
1143 if !matches!(mode, RefreshMode::Manual) {
1144 continue;
1145 }
1146 let Some(reg) = table_regs.get(name.as_str()) else {
1147 continue;
1148 };
1149 let capacity_bytes = reg.cache_max_bytes.unwrap_or(64 * 1024 * 1024);
1151 let Some(schema) = self.table_store.read().table_schema(name) else {
1152 continue;
1153 };
1154 let pk_csv = ®.primary_key;
1155 let pk_cols: Vec<String> = pk_csv
1156 .split(',')
1157 .map(|s| s.trim().to_string())
1158 .filter(|s| !s.is_empty())
1159 .collect();
1160 let key_sort_fields: Vec<arrow::row::SortField> = pk_cols
1161 .iter()
1162 .filter_map(|col| {
1163 schema
1164 .field_with_name(col)
1165 .ok()
1166 .map(|f| arrow::row::SortField::new(f.data_type().clone()))
1167 })
1168 .collect();
1169
1170 let cache = Arc::new(laminar_core::lookup::lookup_cache::LookupMemoryCache::new(
1171 0,
1172 laminar_core::lookup::lookup_cache::LookupMemoryCacheConfig {
1173 capacity_bytes,
1174 ttl: reg.cache_ttl,
1175 },
1176 ));
1177 let lookup_source = if let Ok(mut config) = build_table_config(reg) {
1180 config.set("_primary_key_columns", pk_csv.as_str());
1181 match self
1182 .connector_registry
1183 .create_lookup_source(config, Some(Arc::clone(&schema)))
1184 .await
1185 {
1186 Some(Ok(src)) => Some(src),
1187 Some(Err(e)) => {
1188 tracing::warn!(
1189 table = %name, error = %e,
1190 "lookup source creation failed; cache-only mode"
1191 );
1192 None
1193 }
1194 None => None,
1195 }
1196 } else {
1197 None
1198 };
1199
1200 let projection = crate::sql_analysis::compute_lookup_projection(
1203 &schema,
1204 &pk_cols,
1205 name.as_str(),
1206 stream_regs.values().map(|r| r.query_sql.as_str()),
1207 );
1208
1209 self.lookup_registry.register_partial(
1210 name,
1211 laminar_sql::datafusion::PartialLookupState {
1212 lookup_cache: cache,
1213 schema,
1214 key_columns: pk_cols,
1215 key_sort_fields,
1216 source: lookup_source,
1217 fetch_semaphore: Arc::new(tokio::sync::Semaphore::new(16)),
1218 projection,
1219 },
1220 );
1221 *mode = RefreshMode::SnapshotPlusCdc;
1222 tracing::info!(
1223 table = %name,
1224 capacity_bytes,
1225 ttl = ?reg.cache_ttl,
1226 pk = %pk_csv,
1227 "registered on-demand lookup table (partial cache)"
1228 );
1229 }
1230
1231 let mut stream_sources: Vec<(String, streaming::Source<crate::catalog::ArrowRecord>)> =
1233 Vec::new();
1234 for reg in stream_regs.values() {
1235 if let Some(src) = self.catalog.get_stream_source(®.name) {
1236 stream_sources.push((reg.name.clone(), src));
1237 }
1238 }
1239
1240 let future_skew_ms = match std::env::var("LAMINAR_MAX_FUTURE_SKEW_MS") {
1243 Ok(v) => v.parse::<i64>().unwrap_or_else(|_| {
1244 tracing::warn!(
1245 value = %v,
1246 "invalid LAMINAR_MAX_FUTURE_SKEW_MS (expected an integer); \
1247 using the default"
1248 );
1249 laminar_core::time::DEFAULT_MAX_FUTURE_SKEW_MS
1250 }),
1251 Err(_) => laminar_core::time::DEFAULT_MAX_FUTURE_SKEW_MS,
1252 };
1253 let source_names = self.catalog.list_sources();
1254 let mut watermark_states: FxHashMap<String, SourceWatermarkState> =
1255 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1256 let mut source_entries_for_wm: FxHashMap<String, Arc<crate::catalog::SourceEntry>> =
1257 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1258 let mut source_ids: FxHashMap<String, usize> =
1259 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1260 for name in source_names {
1261 if let Some(entry) = self.catalog.get_source(&name) {
1262 if let (Some(col), Some(dur)) =
1263 (&entry.watermark_column, entry.max_out_of_orderness)
1264 {
1265 let extractor = laminar_core::time::EventTimeExtractor::from_column(col)
1266 .with_mode(laminar_core::time::ExtractionMode::Max);
1267 let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1268 .is_processing_time
1269 .load(std::sync::atomic::Ordering::Relaxed)
1270 {
1271 Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1272 } else {
1273 Box::new(
1274 laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(dur)
1275 .with_max_future_skew(future_skew_ms),
1276 )
1277 };
1278 let id = source_ids.len();
1279 source_ids.insert(name.clone(), id);
1280 watermark_states.insert(
1281 name.clone(),
1282 SourceWatermarkState {
1283 extractor,
1284 generator,
1285 column: col.clone(),
1286 },
1287 );
1288 }
1289 source_entries_for_wm.insert(name, entry);
1290 }
1291 }
1292
1293 for name in self.catalog.list_sources() {
1301 if watermark_states.contains_key(&name) {
1302 continue;
1303 }
1304 if let Some(entry) = self.catalog.get_source(&name) {
1305 if let Some(col) = entry.source.event_time_column() {
1306 let extractor = laminar_core::time::EventTimeExtractor::from_column(&col)
1307 .with_mode(laminar_core::time::ExtractionMode::Max);
1308 let ooo_bound = entry
1309 .source
1310 .max_out_of_orderness()
1311 .unwrap_or(std::time::Duration::ZERO);
1312 let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1313 .is_processing_time
1314 .load(std::sync::atomic::Ordering::Relaxed)
1315 {
1316 Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1317 } else {
1318 Box::new(
1319 laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(
1320 ooo_bound,
1321 )
1322 .with_max_future_skew(future_skew_ms),
1323 )
1324 };
1325 let id = source_ids.len();
1326 source_ids.insert(name.clone(), id);
1327 watermark_states.insert(
1328 name.clone(),
1329 SourceWatermarkState {
1330 extractor,
1331 generator,
1332 column: col,
1333 },
1334 );
1335 }
1336 }
1337 }
1338
1339 let idle_timeout_ms: Option<u64> = match std::env::var("LAMINAR_SOURCE_IDLE_TIMEOUT_MS") {
1343 Ok(v) => match v.parse::<u64>() {
1344 Ok(0) => None,
1345 Ok(ms) => Some(ms),
1346 Err(_) => {
1347 tracing::warn!(
1348 value = %v,
1349 "invalid LAMINAR_SOURCE_IDLE_TIMEOUT_MS (expected a non-negative \
1350 integer); idle-source detection disabled"
1351 );
1352 None
1353 }
1354 },
1355 Err(_) => None,
1356 };
1357 let mut tracker = if source_ids.is_empty() {
1358 None
1359 } else {
1360 let mut t = laminar_core::time::WatermarkTracker::new(source_ids.len());
1361 if let Some(ms) = idle_timeout_ms {
1362 let d = std::time::Duration::from_millis(ms);
1363 for id in 0..source_ids.len() {
1364 t.set_idle_timeout(id, Some(d));
1365 }
1366 }
1367 Some(t)
1368 };
1369
1370 let registered = self.catalog.list_sources();
1377 let unwatermarked: Vec<&str> = registered
1378 .iter()
1379 .filter(|n| !source_ids.contains_key(*n))
1380 .map(String::as_str)
1381 .collect();
1382 if !source_ids.is_empty() && !unwatermarked.is_empty() {
1383 tracing::warn!(
1384 watermarked = source_ids.len(),
1385 unwatermarked = unwatermarked.len(),
1386 unwatermarked_names = ?unwatermarked,
1387 "Pipeline mixes watermarked and un-watermarked sources. An un-watermarked \
1388 source in a join/window inherits the global watermark — time-based \
1389 operators may behave unexpectedly. Add `WATERMARK FOR` to the missing \
1390 sources or split into separate pipelines."
1391 );
1392 }
1393
1394 if !recovered_source_wms.is_empty() {
1399 let mut combined = i64::MIN;
1400 for (name, wm) in &recovered_source_wms {
1401 if let Some(state) = watermark_states.get_mut(name) {
1402 let _ = state.generator.advance_watermark(*wm);
1403 }
1404 if let (Some(t), Some(&id)) = (tracker.as_mut(), source_ids.get(name)) {
1405 if let Some(global) = t.update_source(id, *wm) {
1406 combined = combined.max(global.timestamp());
1407 }
1408 }
1409 }
1410 if combined != i64::MIN {
1411 self.pipeline_watermark
1412 .store(combined, std::sync::atomic::Ordering::SeqCst);
1413 tracing::info!(
1414 sources = recovered_source_wms.len(),
1415 pipeline_watermark = combined,
1416 "Restored watermarks from checkpoint"
1417 );
1418 }
1419 }
1420
1421 let max_poll = self.config.default_buffer_size.min(1024);
1422 let checkpoint_interval = self
1423 .config
1424 .checkpoint
1425 .as_ref()
1426 .and_then(|c| c.interval_ms)
1427 .map(std::time::Duration::from_millis);
1428
1429 tracing::info!(
1430 sources = sources.len(),
1431 sinks = sinks.len(),
1432 streams = stream_regs.len(),
1433 subscriptions = stream_sources.len(),
1434 watermark_sources = source_ids.len(),
1435 "Starting event-driven connector pipeline"
1436 );
1437
1438 let drain_budget_ns = self.config.pipeline_drain_budget_ns.unwrap_or(1_000_000);
1444 let query_budget_ns = self.config.pipeline_query_budget_ns.unwrap_or(8_000_000);
1445 let pipeline_config = PipelineConfig {
1446 max_poll_records: max_poll,
1447 channel_capacity: self.config.pipeline_channel_capacity.unwrap_or(64),
1448 fallback_poll_interval: if has_external {
1449 std::time::Duration::from_millis(10)
1450 } else {
1451 std::time::Duration::from_millis(1)
1452 },
1453 checkpoint_interval,
1454 batch_window: self
1455 .config
1456 .pipeline_batch_window
1457 .unwrap_or(if has_external {
1458 std::time::Duration::from_millis(5)
1459 } else {
1460 std::time::Duration::ZERO
1461 }),
1462 barrier_alignment_timeout: self
1463 .config
1464 .checkpoint
1465 .as_ref()
1466 .and_then(|c| c.alignment_timeout_ms)
1467 .map_or(
1468 std::time::Duration::from_secs(30),
1469 std::time::Duration::from_millis,
1470 ),
1471 delivery_guarantee: self.config.delivery_guarantee,
1472 cycle_budget_ns: 10_000_000_u64.max(drain_budget_ns + query_budget_ns),
1475 drain_budget_ns,
1476 query_budget_ns,
1477 background_budget_ns: 5_000_000, max_input_buf_batches: self.config.pipeline_max_input_buf_batches.unwrap_or(256),
1479 max_input_buf_bytes: self.config.pipeline_max_input_buf_bytes,
1480 backpressure_policy: self.config.pipeline_backpressure_policy,
1481 };
1482
1483 {
1485 use laminar_connectors::connector::DeliveryGuarantee;
1486
1487 if pipeline_config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1488 for src in &sources {
1489 if !src.supports_replay {
1490 return Err(DbError::Config(format!(
1491 "[LDB-5030] exactly-once requires all sources to support replay, \
1492 but source '{}' does not. Use at-least-once or remove this source.",
1493 src.name
1494 )));
1495 }
1496 }
1497 for (name, handle, _, _, _) in &sinks {
1498 if !handle.exactly_once() {
1499 return Err(DbError::Config(format!(
1500 "[LDB-5031] exactly-once requires all sinks to support \
1501 exactly-once semantics, but sink '{name}' does not. \
1502 Use at-least-once or configure a transactional sink."
1503 )));
1504 }
1505 }
1506 if pipeline_config.checkpoint_interval.is_none() {
1507 return Err(DbError::Config(
1508 "[LDB-5032] exactly-once requires checkpointing to be enabled. \
1509 Set checkpoint.interval.ms in the pipeline configuration."
1510 .into(),
1511 ));
1512 }
1513 } else if pipeline_config.delivery_guarantee == DeliveryGuarantee::AtLeastOnce {
1514 let has_non_replayable = sources.iter().any(|s| !s.supports_replay);
1515 let has_eo_sink = sinks.iter().any(|(_, h, _, _, _)| h.exactly_once());
1516 if has_non_replayable && has_eo_sink {
1517 tracing::warn!(
1518 "[LDB-5033] pipeline has exactly-once sinks but some sources \
1519 do not support replay — effective guarantee is at-most-once \
1520 for events from non-replayable sources"
1521 );
1522 }
1523 }
1524 }
1525
1526 let shutdown = self.shutdown_signal.clone();
1527
1528 let pipeline_watermark = Arc::clone(&self.pipeline_watermark);
1530 let coordinator = Arc::clone(&self.coordinator);
1531 let table_store_for_loop = self.table_store.clone();
1532 let pipeline_hash = {
1534 use std::hash::{Hash, Hasher};
1535 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1536 for reg in stream_regs.values() {
1537 reg.name.hash(&mut hasher);
1538 reg.query_sql.hash(&mut hasher);
1539 }
1540 for name in source_regs.keys() {
1541 name.hash(&mut hasher);
1542 }
1543 for name in sink_regs.keys() {
1544 name.hash(&mut hasher);
1545 }
1546 Some(hasher.finish())
1547 };
1548
1549 graph.set_query_budget_ns(pipeline_config.query_budget_ns);
1551 graph.set_max_input_buf_batches(pipeline_config.max_input_buf_batches);
1552 graph.set_max_input_buf_bytes(pipeline_config.max_input_buf_bytes);
1553 graph.set_backpressure_policy(pipeline_config.backpressure_policy);
1554
1555 let sinks_pending_filter_count = sinks
1556 .iter()
1557 .filter(|(_, _, filter_sql, _, _)| filter_sql.is_some())
1558 .count();
1559
1560 let source_name_arcs: rustc_hash::FxHashMap<usize, Arc<str>> = source_ids
1561 .iter()
1562 .map(|(name, &sid)| (sid, Arc::<str>::from(name.as_str())))
1563 .collect();
1564 let source_wms_buf = rustc_hash::FxHashMap::with_capacity_and_hasher(
1565 source_name_arcs.len(),
1566 rustc_hash::FxBuildHasher,
1567 );
1568
1569 let prom = self
1570 .engine_metrics
1571 .lock()
1572 .clone()
1573 .expect("EngineMetrics must be set before start()");
1574
1575 let (force_ckpt_tx, force_ckpt_rx) = crossfire::mpsc::bounded_async::<
1581 crate::db::ForceCheckpointReply,
1582 >(crate::db::FORCE_CHECKPOINT_CHANNEL_CAPACITY);
1583 *self.force_ckpt_tx.lock() = Some(force_ckpt_tx);
1584
1585 let (checkpoint_complete_tx, checkpoint_complete_rx) = crossfire::mpsc::bounded_async::<(
1586 u64,
1587 rustc_hash::FxHashMap<String, laminar_connectors::checkpoint::SourceCheckpoint>,
1588 )>(16);
1589 let has_exactly_once_sink = sinks
1599 .iter()
1600 .any(|(_, handle, _, _, _)| handle.exactly_once());
1601 let checkpoint_in_flight = Arc::new(std::sync::atomic::AtomicU64::new(0));
1602 let staged_bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
1603 let (epoch_allocator, ckpt_quorum_timeout, max_in_flight_epochs, max_staged_bytes) = {
1604 let guard = coordinator.lock().await;
1605 match guard.as_ref() {
1606 Some(coord) => {
1607 let cfg = coord.config();
1608 let depth = if has_exactly_once_sink
1609 || pipeline_config.delivery_guarantee
1610 == laminar_connectors::connector::DeliveryGuarantee::ExactlyOnce
1611 {
1612 1
1613 } else {
1614 cfg.max_in_flight_epochs.max(1)
1615 };
1616 (
1617 Some(coord.epoch_allocator()),
1618 cfg.quorum_timeout,
1619 depth,
1620 cfg.max_staged_bytes.max(1),
1622 )
1623 }
1624 None => (None, std::time::Duration::from_secs(3), 1, u64::MAX),
1625 }
1626 };
1627 #[cfg(not(feature = "cluster"))]
1628 let _ = ckpt_quorum_timeout;
1629
1630 let static_stream_names: rustc_hash::FxHashSet<Arc<str>> = stream_sources
1631 .iter()
1632 .map(|(name, _)| Arc::from(name.as_str()))
1633 .collect();
1634
1635 let callback = crate::pipeline_callback::ConnectorPipelineCallback {
1636 graph,
1637 stream_sources,
1638 sinks,
1639 watermark_states,
1640 source_entries_for_wm,
1641 source_ids,
1642 source_name_arcs,
1643 source_wms_buf,
1644 tracker,
1645 prom,
1646 pipeline_watermark,
1647 coordinator,
1648 table_sources,
1649 table_store: table_store_for_loop,
1650 mv_store_has_any: self.mv_store.read().has_any_handle(),
1651 mv_store: self.mv_store.clone(),
1652 lookup_registry: Arc::clone(&self.lookup_registry),
1653 filter_ctx: laminar_sql::create_session_context(),
1654 compiled_sink_filters: Vec::new(),
1655 pending_sink_filter_compiles: sinks_pending_filter_count,
1656 last_checkpoint: std::time::Instant::now(),
1657 checkpoint_interval: self
1658 .config
1659 .checkpoint
1660 .as_ref()
1661 .and_then(|c| c.interval_ms)
1662 .map(std::time::Duration::from_millis),
1663 pipeline_hash,
1664 delivery_guarantee: pipeline_config.delivery_guarantee,
1665 serialization_timeout: std::time::Duration::from_secs(120),
1666 sink_event_rx,
1667 sink_timed_out: false,
1668 shutdown_signal: Arc::clone(&self.shutdown_signal),
1669 #[cfg(feature = "cluster")]
1670 cluster_controller: self.cluster_controller.lock().clone(),
1671 #[cfg(feature = "cluster")]
1672 follower_tail: Arc::default(),
1673 #[cfg(feature = "cluster")]
1674 barrier_injectors: Vec::new(),
1675 #[cfg(feature = "cluster")]
1676 pending_follower_checkpoint: None,
1677 force_ckpt_rx: Some(force_ckpt_rx),
1678 subscription_registry: Arc::clone(&self.subscription_registry),
1679 #[cfg(feature = "cluster")]
1680 active_subs: Arc::clone(&self.active_subs),
1681 #[cfg(feature = "cluster")]
1682 sub_route: std::sync::OnceLock::new(),
1683 static_stream_names,
1684 checkpoint_complete_tx,
1685 checkpoint_in_flight: Arc::clone(&checkpoint_in_flight),
1686 staged_bytes: Arc::clone(&staged_bytes),
1687 epoch_allocator,
1688 #[cfg(feature = "cluster")]
1689 quorum_timeout: ckpt_quorum_timeout,
1690 exactly_once_sinks: has_exactly_once_sink,
1691 };
1692
1693 {
1698 let (control_tx, control_rx) =
1700 crossfire::mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1701 *self.control_tx.lock() = Some(control_tx);
1702
1703 let coordinator = crate::pipeline::StreamingCoordinator::new(
1704 sources,
1705 pipeline_config,
1706 Arc::clone(&shutdown),
1707 control_rx,
1708 )
1709 .await?
1710 .with_checkpoint_complete_rx(checkpoint_complete_rx)
1711 .with_checkpoint_admission(
1712 checkpoint_in_flight,
1713 max_in_flight_epochs,
1714 staged_bytes,
1715 max_staged_bytes,
1716 );
1717
1718 let (done_tx, done_rx) = crossfire::oneshot::oneshot::<()>();
1719 let (startup_tx, startup_rx) = crossfire::oneshot::oneshot::<Result<(), String>>();
1720 match std::thread::Builder::new()
1721 .name("laminar-compute".into())
1722 .spawn(move || {
1723 let rt = match tokio::runtime::Builder::new_current_thread()
1724 .enable_all()
1725 .build()
1726 {
1727 Ok(rt) => {
1728 startup_tx.send(Ok(()));
1729 rt
1730 }
1731 Err(e) => {
1732 startup_tx.send(Err(format!("compute runtime: {e}")));
1733 return;
1734 }
1735 };
1736 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1737 rt.block_on(async move {
1738 coordinator.run(callback).await;
1739 });
1740 }));
1741 if let Err(panic) = result {
1742 let msg = panic
1743 .downcast_ref::<String>()
1744 .map(String::as_str)
1745 .or_else(|| panic.downcast_ref::<&str>().copied())
1746 .unwrap_or("unknown");
1747 tracing::error!(panic = msg, "laminar-compute thread panicked");
1748 return;
1750 }
1751 done_tx.send(());
1752 }) {
1753 Ok(_) => {}
1754 Err(e) => {
1755 return Err(DbError::Config(format!(
1756 "failed to spawn compute thread: {e}"
1757 )));
1758 }
1759 }
1760
1761 match startup_rx.await {
1763 Ok(Ok(())) => {}
1764 Ok(Err(e)) => return Err(DbError::Config(e)),
1765 Err(_) => {
1766 return Err(DbError::Config(
1767 "compute thread exited before starting runtime".into(),
1768 ));
1769 }
1770 }
1771
1772 let watcher_state = Arc::clone(&self.state);
1773 let watcher_shutdown = Arc::clone(&self.shutdown_signal);
1774 let handle = tokio::spawn(async move {
1775 if done_rx.await.is_err() {
1776 tracing::error!("laminar-compute thread exited unexpectedly");
1777 DbState::Stopped.store(&watcher_state);
1778 watcher_shutdown.notify_one();
1779 }
1780 });
1781
1782 *self.runtime_handle.lock() = Some(handle);
1783 }
1784 Ok(())
1785 }
1786
1787 pub async fn shutdown(&self) -> Result<(), DbError> {
1793 if matches!(
1794 DbState::load(&self.state),
1795 DbState::Stopped | DbState::ShuttingDown
1796 ) {
1797 return Ok(());
1798 }
1799
1800 DbState::ShuttingDown.store(&self.state);
1801
1802 *self.force_ckpt_tx.lock() = None;
1805
1806 self.shutdown_signal.notify_one();
1807
1808 let handle = self.runtime_handle.lock().take();
1809 if let Some(handle) = handle {
1810 match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
1811 Ok(Ok(())) => tracing::info!("Pipeline shut down cleanly"),
1812 Ok(Err(e)) => tracing::warn!(error = %e, "Pipeline task panicked during shutdown"),
1813 Err(_) => tracing::warn!("Pipeline shutdown timed out after 10s"),
1814 }
1815 }
1816
1817 DbState::Stopped.store(&self.state);
1818 self.close();
1819 Ok(())
1820 }
1821
1822 pub async fn stop_pipeline(&self) -> Result<(), DbError> {
1828 match DbState::compare_exchange(DbState::Running, DbState::ShuttingDown, &self.state) {
1832 Ok(_) => {}
1833 Err(DbState::Created | DbState::Stopped) => return Ok(()),
1835 Err(_) => {
1837 return Err(DbError::InvalidOperation(
1838 "cannot stop pipeline: not running (starting, or a stop already in progress)"
1839 .into(),
1840 ));
1841 }
1842 }
1843
1844 *self.force_ckpt_tx.lock() = None;
1846
1847 self.shutdown_signal.notify_one();
1848
1849 let handle = self.runtime_handle.lock().take();
1851 if let Some(handle) = handle {
1852 match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
1853 Ok(Ok(())) => tracing::info!("Pipeline stopped cleanly"),
1854 Ok(Err(e)) => tracing::warn!(error = %e, "Pipeline task panicked during stop"),
1855 Err(_) => {
1856 tracing::error!("Pipeline stop timed out after 10s; coordinator still running");
1858 return Err(DbError::InvalidOperation(
1859 "pipeline stop timed out; coordinator did not exit".into(),
1860 ));
1861 }
1862 }
1863 }
1864
1865 *self.control_tx.lock() = None;
1867 DbState::Created.store(&self.state);
1868 Ok(())
1869 }
1870}
1871
#[cfg(test)]
mod resolver_tests {
    //! Tests for `resolve_stream_output_schemas`: output-schema inference for
    //! windowed, plain, chained and mutually-dependent stream registrations.

    use super::resolve_stream_output_schemas;
    use crate::connector_manager::StreamRegistration;
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::empty::EmptyTable;
    use datafusion::prelude::SessionContext;
    use std::sync::Arc;
    use std::time::Duration;

    /// Session with an empty `payments` table and the `TumbleWindowStart` UDF
    /// registered, which is what the test queries plan against.
    fn ctx_with_payments() -> SessionContext {
        let ctx = SessionContext::new();
        let schema = Arc::new(Schema::new(vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("method", DataType::Utf8, false),
            Field::new("amount_usd", DataType::Float64, false),
            Field::new("status", DataType::Utf8, false),
            Field::new(
                "event_time",
                DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
                false,
            ),
        ]));
        ctx.register_table("payments", Arc::new(EmptyTable::new(schema)))
            .unwrap();
        ctx.register_udf(datafusion_expr::ScalarUDF::from(
            laminar_sql::datafusion::TumbleWindowStart::new(),
        ));
        ctx
    }

    /// Field names of `schema`, in declaration order.
    fn field_names(schema: &Schema) -> Vec<&str> {
        schema.fields().iter().map(|f| f.name().as_str()).collect()
    }

    /// Minimal `StreamRegistration`. When `windowed` is true, a tumbling
    /// window config on `event_time` (built with `Duration::ZERO`) is attached.
    fn reg(name: &str, sql: &str, windowed: bool) -> StreamRegistration {
        StreamRegistration {
            name: name.to_string(),
            query_sql: sql.to_string(),
            emit_clause: None,
            window_config: windowed.then(|| {
                laminar_sql::translator::WindowOperatorConfig::tumbling(
                    "event_time".into(),
                    Duration::ZERO,
                )
            }),
            order_config: None,
            join_config: None,
        }
    }

    #[tokio::test]
    async fn windowed_stream_schema_matches_user_select() {
        let ctx = ctx_with_payments();
        let mut regs = std::collections::HashMap::new();
        regs.insert(
            "agg".to_string(),
            reg(
                "agg",
                "SELECT region, COUNT(*) AS n FROM payments \
                 GROUP BY tumble(event_time, INTERVAL '1' MINUTE), region",
                true,
            ),
        );

        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        assert_eq!(field_names(&out["agg"]), vec!["region", "n"]);
    }

    #[tokio::test]
    async fn windowed_stream_with_explicit_window_columns() {
        let ctx = ctx_with_payments();
        ctx.register_udf(datafusion_expr::ScalarUDF::from(
            laminar_sql::datafusion::TumbleWindowEnd::new(),
        ));
        let mut regs = std::collections::HashMap::new();
        regs.insert(
            "agg".to_string(),
            reg(
                "agg",
                "SELECT \
                 tumble(event_time, INTERVAL '1' MINUTE) AS window_start, \
                 tumble_end(event_time, INTERVAL '1' MINUTE) AS window_end, \
                 region, \
                 COUNT(*) AS n \
                 FROM payments \
                 GROUP BY \
                 tumble(event_time, INTERVAL '1' MINUTE), \
                 tumble_end(event_time, INTERVAL '1' MINUTE), \
                 region",
                true,
            ),
        );

        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        assert_eq!(
            field_names(&out["agg"]),
            vec!["window_start", "window_end", "region", "n"]
        );
        assert_eq!(
            out["agg"].field(0).data_type(),
            &DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
        );
        assert_eq!(
            out["agg"].field(1).data_type(),
            &DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
        );
    }

    #[tokio::test]
    async fn non_windowed_stream_has_no_prefix() {
        let ctx = ctx_with_payments();
        let mut regs = std::collections::HashMap::new();
        regs.insert(
            "passthrough".to_string(),
            reg(
                "passthrough",
                "SELECT region, amount_usd FROM payments",
                false,
            ),
        );

        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        assert_eq!(field_names(&out["passthrough"]), vec!["region", "amount_usd"]);
    }

    #[tokio::test]
    async fn chained_streams_resolve_via_iterative_planning() {
        let ctx = ctx_with_payments();
        // `b` reads from `a`, so `b` only plans after `a`'s placeholder exists.
        let mut regs = std::collections::HashMap::new();
        regs.insert(
            "b".to_string(),
            reg("b", "SELECT region, n + 1 AS n_plus_one FROM a", false),
        );
        regs.insert(
            "a".to_string(),
            reg(
                "a",
                "SELECT region, COUNT(*) AS n FROM payments GROUP BY region",
                false,
            ),
        );

        let out = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        assert_eq!(field_names(&out["a"]), vec!["region", "n"]);
        assert_eq!(field_names(&out["b"]), vec!["region", "n_plus_one"]);

        // Placeholder tables registered during resolution must be cleaned up.
        assert!(!ctx.table_exist("a").unwrap_or(false));
        assert!(!ctx.table_exist("b").unwrap_or(false));
    }

    #[tokio::test]
    async fn unresolvable_streams_surface_planner_error() {
        let ctx = ctx_with_payments();
        // Use distinctive names. Single letters such as "a"/"b" already appear
        // in the fixed message text ("unresolvable" contains both), which would
        // make the name assertion below pass vacuously.
        let mut regs = std::collections::HashMap::new();
        regs.insert(
            "cycle_left".to_string(),
            reg("cycle_left", "SELECT * FROM cycle_right", false),
        );
        regs.insert(
            "cycle_right".to_string(),
            reg("cycle_right", "SELECT * FROM cycle_left", false),
        );

        let err = resolve_stream_output_schemas(&ctx, &regs)
            .await
            .unwrap_err()
            .to_string();
        assert!(err.contains("unresolvable stream dependency"), "got: {err}");
        assert!(
            err.contains("cycle_left") && err.contains("cycle_right"),
            "got: {err}"
        );
    }
}