1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10use laminar_core::streaming;
11use rustc_hash::FxHashMap;
12
13use crate::db::{
14 infer_timestamp_format, LaminarDB, SourceWatermarkState, STATE_RUNNING, STATE_SHUTTING_DOWN,
15 STATE_STARTING, STATE_STOPPED,
16};
17use crate::error::DbError;
18
/// Derives the object-store key prefix for checkpoints from a store URL.
///
/// For `file://` URLs the prefix is always empty (checkpoints live at the
/// store root). For other schemes (`s3://bucket/path`, …) everything after
/// the first `/` past the bucket/host becomes the prefix, normalized to end
/// with exactly one trailing `/`. A URL with no path component yields an
/// empty prefix.
pub(crate) fn url_to_checkpoint_prefix(url: &str) -> String {
    // Local filesystem stores take no key prefix at all.
    if url.starts_with("file://") {
        return String::new();
    }

    // Strip the scheme ("s3://", "gs://", …) when present; a scheme-less
    // string is treated as already being "host/path".
    let rest = match url.find("://") {
        Some(idx) => &url[idx + 3..],
        None => url,
    };

    // No '/' after the bucket/host means no prefix.
    let Some(slash) = rest.find('/') else {
        return String::new();
    };

    let prefix = &rest[slash + 1..];
    if prefix.is_empty() {
        String::new()
    } else if prefix.ends_with('/') {
        prefix.to_string()
    } else {
        format!("{prefix}/")
    }
}
46
47impl LaminarDB {
48 pub fn close(&self) {
50 self.shutdown
51 .store(true, std::sync::atomic::Ordering::Relaxed);
52 }
53
54 pub fn is_closed(&self) -> bool {
56 self.shutdown.load(std::sync::atomic::Ordering::Relaxed)
57 }
58
    /// Starts the pipeline: snapshots connector registrations, optionally
    /// builds a checkpoint coordinator (with object-store or filesystem
    /// backing and a per-core WAL), then launches the connector pipeline if
    /// there is anything to run.
    ///
    /// Idempotent while running/starting; returns
    /// [`DbError::InvalidOperation`] once stopped (a stopped instance cannot
    /// be restarted).
    ///
    /// # Errors
    /// Returns `DbError::Config` for checkpoint-store construction failures
    /// and propagates any error from `start_connector_pipeline`.
    // Long but linear setup; kept in one function deliberately.
    #[allow(clippy::too_many_lines)]
    pub async fn start(&self) -> Result<(), DbError> {
        // NOTE(review): load-then-store is not a compare-and-swap; two
        // concurrent start() calls could both pass this guard — confirm
        // callers serialize start().
        let current = self.state.load(std::sync::atomic::Ordering::Acquire);
        if current == STATE_RUNNING || current == STATE_STARTING {
            return Ok(());
        }
        if current == STATE_STOPPED {
            return Err(DbError::InvalidOperation(
                "Cannot start a stopped pipeline. Create a new LaminarDB instance.".into(),
            ));
        }

        self.state
            .store(STATE_STARTING, std::sync::atomic::Ordering::Release);

        // Snapshot all registrations under the lock, then release it before
        // doing any async work.
        let (source_regs, sink_regs, stream_regs, table_regs, has_external) = {
            let mgr = self.connector_manager.lock();
            (
                mgr.sources().clone(),
                mgr.sinks().clone(),
                mgr.streams().clone(),
                mgr.tables().clone(),
                mgr.has_external_connectors(),
            )
        };

        for (name, reg) in &source_regs {
            tracing::debug!(source = %name, connector_type = ?reg.connector_type, "Registered source");
        }
        for (name, reg) in &sink_regs {
            tracing::debug!(sink = %name, connector_type = ?reg.connector_type, "Registered sink");
        }

        // Checkpointing is opt-in: only set up a coordinator when configured.
        if let Some(ref cp_config) = self.config.checkpoint {
            use crate::checkpoint_coordinator::{
                CheckpointConfig as CkpConfig, CheckpointCoordinator,
            };

            let max_retained = cp_config.max_retained.unwrap_or(3);

            // Prefer a remote object store when a URL is configured;
            // otherwise fall back to a local filesystem store.
            let store: Box<dyn laminar_storage::CheckpointStore> =
                if let Some(ref url) = self.config.object_store_url {
                    let obj_store = laminar_storage::object_store_factory::build_object_store(
                        url,
                        &self.config.object_store_options,
                    )
                    .map_err(|e| DbError::Config(format!("object store: {e}")))?;
                    let prefix = url_to_checkpoint_prefix(url);
                    Box::new(
                        laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
                            obj_store,
                            prefix,
                            max_retained,
                        )
                        .map_err(|e| DbError::Config(format!("checkpoint store runtime: {e}")))?,
                    )
                } else {
                    // Resolution order: checkpoint data_dir > storage_dir > ./data
                    let data_dir = cp_config
                        .data_dir
                        .clone()
                        .or_else(|| self.config.storage_dir.clone())
                        .unwrap_or_else(|| std::path::PathBuf::from("./data"));
                    Box::new(
                        laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
                            &data_dir,
                            max_retained,
                        ),
                    )
                };

            let config = CkpConfig {
                interval: cp_config.interval_ms.map(std::time::Duration::from_millis),
                max_retained,
                ..CkpConfig::default()
            };
            let mut coord = CheckpointCoordinator::new(config, store);
            coord.set_counters(Arc::clone(&self.counters));

            // WAL lives alongside checkpoint data (same dir resolution as above).
            let wal_dir = cp_config
                .data_dir
                .clone()
                .or_else(|| self.config.storage_dir.clone())
                .unwrap_or_else(|| std::path::PathBuf::from("./data"))
                .join("wal");
            // One WAL shard per core: explicit tpc.num_cores wins; otherwise
            // all available cores for external connectors, or one per source.
            let num_cores = self
                .config
                .tpc
                .as_ref()
                .and_then(|t| t.num_cores)
                .unwrap_or_else(|| {
                    if has_external {
                        std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
                    } else {
                        source_regs.len().max(1)
                    }
                });
            match laminar_storage::per_core_wal::PerCoreWalManager::new(
                laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, num_cores),
            ) {
                Ok(wal) => {
                    tracing::info!(
                        wal_dir = %wal_dir.display(),
                        num_cores,
                        "WAL manager initialized"
                    );
                    coord.register_wal_manager(wal);
                }
                // WAL failure is non-fatal by design: degrade rather than
                // refuse to start.
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "WAL initialization failed — running without WAL \
                         (data between checkpoints may be lost on crash)"
                    );
                }
            }

            *self.coordinator.lock().await = Some(coord);
        }

        // Only spin up the full pipeline when there is work to drive.
        if has_external || !stream_regs.is_empty() {
            tracing::info!(
                sources = source_regs.len(),
                sinks = sink_regs.len(),
                streams = stream_regs.len(),
                tables = table_regs.len(),
                has_external,
                "Starting pipeline"
            );
            self.start_connector_pipeline(
                source_regs,
                sink_regs,
                stream_regs,
                table_regs,
                has_external,
            )
            .await?;
        } else {
            tracing::info!(
                sources = source_regs.len(),
                sinks = sink_regs.len(),
                "Starting in embedded (in-memory) mode — no streams"
            );
        }

        self.state
            .store(STATE_RUNNING, std::sync::atomic::Ordering::Release);
        Ok(())
    }
229
230 #[allow(clippy::too_many_lines)]
246 async fn start_connector_pipeline(
247 &self,
248 source_regs: HashMap<String, crate::connector_manager::SourceRegistration>,
249 sink_regs: HashMap<String, crate::connector_manager::SinkRegistration>,
250 stream_regs: HashMap<String, crate::connector_manager::StreamRegistration>,
251 table_regs: HashMap<String, crate::connector_manager::TableRegistration>,
252 has_external: bool,
253 ) -> Result<(), DbError> {
254 use crate::connector_manager::{
255 build_sink_config, build_source_config, build_table_config,
256 };
257 use crate::pipeline::{PipelineConfig, SourceRegistration, TpcPipelineCoordinator};
258 use crate::stream_executor::StreamExecutor;
259 use laminar_connectors::reference::{ReferenceTableSource, RefreshMode};
260
261 let ctx = laminar_sql::create_session_context();
263 laminar_sql::register_streaming_functions(&ctx);
264 let mut executor = StreamExecutor::new(ctx);
265 executor.set_lookup_registry(Arc::clone(&self.lookup_registry));
266
267 for name in source_regs.keys() {
271 if let Some(entry) = self.catalog.get_source(name) {
272 executor.register_source_schema(name.clone(), entry.schema.clone());
273 }
274 }
275
276 for reg in stream_regs.values() {
277 executor.add_query(
278 reg.name.clone(),
279 reg.query_sql.clone(),
280 reg.emit_clause.clone(),
281 reg.window_config.clone(),
282 reg.order_config.clone(),
283 );
284 }
285
286 for tcfg in executor.temporal_join_configs() {
289 if self.lookup_registry.get_entry(&tcfg.table_name).is_none() {
290 let initial_batch = self
293 .table_store
294 .read()
295 .to_record_batch(&tcfg.table_name)
296 .or_else(|| {
297 self.catalog
298 .get_source(&tcfg.table_name)
299 .map(|e| RecordBatch::new_empty(e.schema.clone()))
300 })
301 .unwrap_or_else(|| {
302 RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()))
303 });
304 let key_columns = vec![tcfg.table_key_column.clone()];
305 let key_indices: Vec<usize> = key_columns
306 .iter()
307 .filter_map(|k| initial_batch.schema().index_of(k).ok())
308 .collect();
309 let Ok(version_col_idx) =
310 initial_batch.schema().index_of(&tcfg.table_version_column)
311 else {
312 if !initial_batch.schema().fields().is_empty() {
313 tracing::warn!(
314 table=%tcfg.table_name,
315 version_col=%tcfg.table_version_column,
316 "Version column not found in temporal table schema; \
317 will resolve on first CDC batch"
318 );
319 }
320 self.lookup_registry.register_versioned(
322 &tcfg.table_name,
323 laminar_sql::datafusion::VersionedLookupState {
324 batch: initial_batch,
325 index: Arc::new(
326 laminar_sql::datafusion::lookup_join_exec::VersionedIndex::default(
327 ),
328 ),
329 key_columns,
330 version_column: tcfg.table_version_column.clone(),
331 stream_time_column: tcfg.stream_time_column.clone(),
332 },
333 );
334 continue;
335 };
336 let index = Arc::new(
337 laminar_sql::datafusion::lookup_join_exec::VersionedIndex::build(
338 &initial_batch,
339 &key_indices,
340 version_col_idx,
341 )
342 .unwrap_or_default(),
343 );
344 self.lookup_registry.register_versioned(
345 &tcfg.table_name,
346 laminar_sql::datafusion::VersionedLookupState {
347 batch: initial_batch,
348 index,
349 key_columns,
350 version_column: tcfg.table_version_column.clone(),
351 stream_time_column: tcfg.stream_time_column.clone(),
352 },
353 );
354 }
355 }
356
357 let mut sources: Vec<SourceRegistration> = Vec::new();
359 for (name, reg) in &source_regs {
360 if reg.connector_type.is_none() {
361 continue;
362 }
363 let mut config = build_source_config(reg)?;
364
365 if let Some(entry) = self.catalog.get_source(name) {
368 let schema_str = crate::pipeline_callback::encode_arrow_schema(&entry.schema);
369 config.set("_arrow_schema".to_string(), schema_str);
370 }
371
372 let source = self
373 .connector_registry
374 .create_source(&config)
375 .map_err(|e| {
376 DbError::Connector(format!(
377 "Cannot create source '{}' (type '{}'): {e}",
378 name,
379 config.connector_type()
380 ))
381 })?;
382 let supports_replay = source.supports_replay();
383 if !supports_replay {
384 tracing::warn!(
385 source = %name,
386 "source does not support replay — exactly-once semantics \
387 are degraded to at-most-once for this source"
388 );
389 }
390 if let Some(entry) = self.catalog.get_source(name) {
393 if entry.source.event_time_column().is_none() {
394 if let Some(col) = config.get("event.time.column") {
395 entry.source.set_event_time_column(col);
396 } else if let Some(col) = config.get("event.time.field") {
397 entry.source.set_event_time_column(col);
398 }
399 }
400 }
401
402 sources.push(SourceRegistration {
403 name: name.clone(),
404 connector: source,
405 config,
406 supports_replay,
407 restore_checkpoint: None, });
409 }
410
411 let bridged_names: rustc_hash::FxHashSet<String> =
419 sources.iter().map(|s| s.name.clone()).collect();
420 for (name, reg) in &source_regs {
422 if reg.connector_type.is_some() {
423 continue; }
425 if let Some(entry) = self.catalog.get_source(name) {
426 let subscription = entry.sink.subscribe();
427 let connector = crate::catalog_connector::CatalogSourceConnector::new(
428 subscription,
429 entry.schema.clone(),
430 entry.data_notify(),
431 );
432 sources.push(SourceRegistration {
433 name: name.clone(),
434 connector: Box::new(connector),
435 config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
436 supports_replay: false,
437 restore_checkpoint: None,
438 });
439 }
440 }
441 for name in self.catalog.list_sources() {
444 if bridged_names.contains(&name) || source_regs.contains_key(&name) {
445 continue;
446 }
447 if let Some(entry) = self.catalog.get_source(&name) {
448 executor.register_source_schema(name.clone(), entry.schema.clone());
449 let subscription = entry.sink.subscribe();
450 let connector = crate::catalog_connector::CatalogSourceConnector::new(
451 subscription,
452 entry.schema.clone(),
453 entry.data_notify(),
454 );
455 sources.push(SourceRegistration {
456 name: name.clone(),
457 connector: Box::new(connector),
458 config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
459 supports_replay: false,
460 restore_checkpoint: None,
461 });
462 }
463 }
464
465 #[allow(clippy::type_complexity)]
470 let mut sinks: Vec<(
471 String,
472 crate::sink_task::SinkTaskHandle,
473 Option<String>,
474 String, )> = Vec::new();
476 for (name, reg) in &sink_regs {
477 if reg.connector_type.is_none() {
478 continue;
479 }
480 let config = build_sink_config(reg)?;
481 let mut sink = self.connector_registry.create_sink(&config).map_err(|e| {
482 DbError::Connector(format!(
483 "Cannot create sink '{}' (type '{}'): {e}",
484 name,
485 config.connector_type()
486 ))
487 })?;
488 sink.open(&config)
490 .await
491 .map_err(|e| DbError::Connector(format!("Failed to open sink '{name}': {e}")))?;
492 let exactly_once = sink.capabilities().exactly_once;
493 let handle = crate::sink_task::SinkTaskHandle::spawn(name.clone(), sink, exactly_once);
494 sinks.push((
495 name.clone(),
496 handle,
497 reg.filter_expr.clone(),
498 reg.input.clone(),
499 ));
500 }
501
502 let mut table_sources: Vec<(String, Box<dyn ReferenceTableSource>, RefreshMode)> =
504 Vec::new();
505 for (name, reg) in &table_regs {
506 if reg.connector_type.is_none() {
507 continue;
508 }
509 let config = build_table_config(reg)?;
510 let source = self
511 .connector_registry
512 .create_table_source(&config)
513 .map_err(|e| {
514 DbError::Connector(format!("Cannot create table source '{name}': {e}"))
515 })?;
516 let mode = reg.refresh.clone().unwrap_or(RefreshMode::SnapshotPlusCdc);
517 table_sources.push((name.clone(), source, mode));
518 }
519
520 {
524 let mut guard = self.coordinator.lock().await;
525 if let Some(ref mut coord) = *guard {
526 for (name, handle, _, _) in &sinks {
527 let exactly_once = handle.exactly_once();
528 coord.register_sink(name.clone(), handle.clone(), exactly_once);
529 }
530 }
531 }
532
533 {
537 let mut guard = self.coordinator.lock().await;
538 if let Some(ref mut coord) = *guard {
539 match coord.recover().await {
540 Ok(Some(recovered)) => {
541 for (name, source, _) in &mut table_sources {
542 if let Some(cp) = recovered.manifest.table_offsets.get(name) {
543 let restored =
544 crate::checkpoint_coordinator::connector_to_source_checkpoint(
545 cp,
546 );
547 if let Err(e) = source.restore(&restored).await {
548 tracing::warn!(
549 table=%name, error=%e,
550 "Table source restore failed"
551 );
552 }
553 }
554 }
555 for src in &mut sources {
560 if !src.supports_replay {
561 continue;
562 }
563 if let Some(cp) = recovered.manifest.source_offsets.get(&src.name) {
564 let restored =
565 crate::checkpoint_coordinator::connector_to_source_checkpoint(
566 cp,
567 );
568 tracing::info!(
569 source = %src.name,
570 offsets = cp.offsets.len(),
571 "attaching checkpoint offsets for source recovery"
572 );
573 src.restore_checkpoint = Some(restored);
574 }
575 }
576 if let Some(op) = recovered.manifest.operator_states.get("stream_executor")
578 {
579 if let Some(bytes) = op.decode_inline() {
580 match executor.restore_state(&bytes) {
581 Ok(n) => {
582 tracing::info!(
583 queries = n,
584 "Restored stream executor state from checkpoint"
585 );
586 }
587 Err(e) => {
588 tracing::warn!(
589 error = %e,
590 "Stream executor state restore failed, starting fresh"
591 );
592 }
593 }
594 }
595 }
596 tracing::info!(
597 epoch = recovered.epoch(),
598 sources_restored = recovered.sources_restored,
599 sinks_rolled_back = recovered.sinks_rolled_back,
600 "Recovered from unified checkpoint"
601 );
602 }
603 Ok(None) => {
604 tracing::info!("No checkpoint found, starting fresh");
605 }
606 Err(e) => {
607 tracing::warn!(error = %e, "Checkpoint recovery failed, starting fresh");
608 }
609 }
610 }
611 }
612
613 {
616 let guard = self.coordinator.lock().await;
617 if let Some(ref coord) = *guard {
618 coord.begin_initial_epoch().await?;
619 }
620 }
621
622 for (name, source, mode) in &mut table_sources {
624 if matches!(mode, RefreshMode::Manual) {
625 continue;
626 }
627 while let Some(batch) = source
628 .poll_snapshot()
629 .await
630 .map_err(|e| DbError::Connector(format!("Table '{name}' snapshot error: {e}")))?
631 {
632 self.table_store
633 .write()
634 .upsert(name, &batch)
635 .map_err(|e| DbError::Connector(format!("Table '{name}' upsert error: {e}")))?;
636 }
637 self.sync_table_to_datafusion(name)?;
638 {
639 let mut ts = self.table_store.write();
640 ts.rebuild_xor_filter(name);
641 ts.set_ready(name, true);
642 }
643 if matches!(
647 self.lookup_registry.get_entry(name),
648 Some(laminar_sql::datafusion::RegisteredLookup::Versioned(_))
649 ) {
650 } else if let Some(batch) = self.table_store.read().to_record_batch(name) {
652 self.lookup_registry.register(
653 name,
654 laminar_sql::datafusion::LookupSnapshot {
655 batch,
656 key_columns: vec![], },
658 );
659 }
660 }
661
662 let mut stream_sources: Vec<(String, streaming::Source<crate::catalog::ArrowRecord>)> =
664 Vec::new();
665 for reg in stream_regs.values() {
666 if let Some(src) = self.catalog.get_stream_source(®.name) {
667 stream_sources.push((reg.name.clone(), src));
668 }
669 }
670
671 let source_names = self.catalog.list_sources();
673 let mut watermark_states: FxHashMap<String, SourceWatermarkState> =
674 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
675 let mut source_entries_for_wm: FxHashMap<String, Arc<crate::catalog::SourceEntry>> =
676 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
677 let mut source_ids: FxHashMap<String, usize> =
678 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
679 for name in source_names {
680 if let Some(entry) = self.catalog.get_source(&name) {
681 if let (Some(col), Some(dur)) =
682 (&entry.watermark_column, entry.max_out_of_orderness)
683 {
684 let format = infer_timestamp_format(&entry.schema, col);
685 let extractor =
686 laminar_core::time::EventTimeExtractor::from_column(col, format)
687 .with_mode(laminar_core::time::ExtractionMode::Max);
688 let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
689 .is_processing_time
690 .load(std::sync::atomic::Ordering::Relaxed)
691 {
692 Box::new(laminar_core::time::ProcessingTimeGenerator::new())
693 } else {
694 Box::new(
695 laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(dur),
696 )
697 };
698 let id = source_ids.len();
699 source_ids.insert(name.clone(), id);
700 watermark_states.insert(
701 name.clone(),
702 SourceWatermarkState {
703 extractor,
704 generator,
705 column: col.clone(),
706 format,
707 },
708 );
709 }
710 source_entries_for_wm.insert(name, entry);
711 }
712 }
713
714 for name in self.catalog.list_sources() {
717 if watermark_states.contains_key(&name) {
718 continue;
719 }
720 if let Some(entry) = self.catalog.get_source(&name) {
721 if let Some(col) = entry.source.event_time_column() {
722 let format = infer_timestamp_format(&entry.schema, &col);
723 let extractor =
724 laminar_core::time::EventTimeExtractor::from_column(&col, format)
725 .with_mode(laminar_core::time::ExtractionMode::Max);
726 let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
727 .is_processing_time
728 .load(std::sync::atomic::Ordering::Relaxed)
729 {
730 Box::new(laminar_core::time::ProcessingTimeGenerator::new())
731 } else {
732 Box::new(
733 laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(
734 std::time::Duration::ZERO,
735 ),
736 )
737 };
738 let id = source_ids.len();
739 source_ids.insert(name.clone(), id);
740 watermark_states.insert(
741 name.clone(),
742 SourceWatermarkState {
743 extractor,
744 generator,
745 column: col,
746 format,
747 },
748 );
749 }
750 }
751 }
752
753 let tracker = if source_ids.is_empty() {
754 None
755 } else {
756 Some(laminar_core::time::WatermarkTracker::new(source_ids.len()))
757 };
758
759 let max_poll = self.config.default_buffer_size.min(1024);
760 let checkpoint_interval = self
761 .config
762 .checkpoint
763 .as_ref()
764 .and_then(|c| c.interval_ms)
765 .map(std::time::Duration::from_millis);
766
767 tracing::info!(
768 sources = sources.len(),
769 sinks = sinks.len(),
770 streams = stream_regs.len(),
771 subscriptions = stream_sources.len(),
772 watermark_sources = source_ids.len(),
773 "Starting event-driven connector pipeline"
774 );
775
776 let pipeline_config = PipelineConfig {
782 max_poll_records: max_poll,
783 channel_capacity: 64,
784 fallback_poll_interval: if has_external {
785 std::time::Duration::from_millis(10)
786 } else {
787 std::time::Duration::from_millis(1)
788 },
789 checkpoint_interval,
790 batch_window: if has_external {
791 std::time::Duration::from_millis(5)
792 } else {
793 std::time::Duration::ZERO
794 },
795 barrier_alignment_timeout: std::time::Duration::from_secs(30),
796 delivery_guarantee: laminar_connectors::connector::DeliveryGuarantee::default(),
797 };
798
799 {
801 use laminar_connectors::connector::DeliveryGuarantee;
802
803 if pipeline_config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
804 for src in &sources {
805 if !src.supports_replay {
806 return Err(DbError::Config(format!(
807 "[LDB-5030] exactly-once requires all sources to support replay, \
808 but source '{}' does not. Use at-least-once or remove this source.",
809 src.name
810 )));
811 }
812 }
813 for (name, handle, _, _) in &sinks {
814 if !handle.exactly_once() {
815 return Err(DbError::Config(format!(
816 "[LDB-5031] exactly-once requires all sinks to support \
817 exactly-once semantics, but sink '{name}' does not. \
818 Use at-least-once or configure a transactional sink."
819 )));
820 }
821 }
822 if pipeline_config.checkpoint_interval.is_none() {
823 return Err(DbError::Config(
824 "[LDB-5032] exactly-once requires checkpointing to be enabled. \
825 Set checkpoint.interval.ms in the pipeline configuration."
826 .into(),
827 ));
828 }
829 } else if pipeline_config.delivery_guarantee == DeliveryGuarantee::AtLeastOnce {
830 let has_non_replayable = sources.iter().any(|s| !s.supports_replay);
831 let has_eo_sink = sinks.iter().any(|(_, h, _, _)| h.exactly_once());
832 if has_non_replayable && has_eo_sink {
833 tracing::warn!(
834 "[LDB-5033] pipeline has exactly-once sinks but some sources \
835 do not support replay — effective guarantee is at-most-once \
836 for events from non-replayable sources"
837 );
838 }
839 }
840 }
841
842 let shutdown = self.shutdown_signal.clone();
843
844 let counters = Arc::clone(&self.counters);
846 let pipeline_watermark = Arc::clone(&self.pipeline_watermark);
847 let checkpoint_in_progress = Arc::new(std::sync::atomic::AtomicBool::new(false));
848 let coordinator = Arc::clone(&self.coordinator);
849 let table_store_for_loop = self.table_store.clone();
850 let pipeline_hash = {
852 use std::hash::{Hash, Hasher};
853 let mut hasher = std::collections::hash_map::DefaultHasher::new();
854 for reg in stream_regs.values() {
855 reg.name.hash(&mut hasher);
856 reg.query_sql.hash(&mut hasher);
857 }
858 for name in source_regs.keys() {
859 name.hash(&mut hasher);
860 }
861 for name in sink_regs.keys() {
862 name.hash(&mut hasher);
863 }
864 Some(hasher.finish())
865 };
866
867 let callback = crate::pipeline_callback::ConnectorPipelineCallback {
868 executor,
869 stream_sources,
870 sinks,
871 watermark_states,
872 source_entries_for_wm,
873 source_ids,
874 tracker,
875 counters,
876 pipeline_watermark,
877 checkpoint_in_progress,
878 coordinator,
879 table_sources,
880 table_store: table_store_for_loop,
881 lookup_registry: Arc::clone(&self.lookup_registry),
882 filter_ctx: laminar_sql::create_session_context(),
883 last_checkpoint: std::time::Instant::now(),
884 checkpoint_interval: self
885 .config
886 .checkpoint
887 .as_ref()
888 .and_then(|c| c.interval_ms)
889 .map(std::time::Duration::from_millis),
890 pipeline_hash,
891 delivery_guarantee: pipeline_config.delivery_guarantee,
892 };
893
894 {
896 use laminar_core::tpc::TpcConfig;
897
898 let tpc_cfg = self.config.tpc.clone().unwrap_or_default();
899 let num_sources = sources.len().max(1);
900 let num_cores = tpc_cfg.num_cores.unwrap_or_else(|| {
901 if has_external {
902 std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
903 } else {
904 num_sources
906 }
907 });
908 let num_cores = num_cores.max(num_sources);
912 if let Some(configured) = tpc_cfg.num_cores {
913 if configured < num_sources {
914 tracing::warn!(
915 configured_cores = configured,
916 required_cores = num_sources,
917 "Overriding num_cores to match source count \
918 (SPSC single-producer invariant)"
919 );
920 }
921 }
922 let tpc_config = TpcConfig {
923 num_cores,
924 cpu_pinning: tpc_cfg.cpu_pinning,
925 cpu_start: tpc_cfg.cpu_start,
926 numa_aware: tpc_cfg.numa_aware,
927 ..Default::default()
928 };
929
930 let tpc_coordinator = TpcPipelineCoordinator::new(
931 sources,
932 pipeline_config,
933 &tpc_config,
934 Arc::clone(&shutdown),
935 )?;
936
937 let handle = tokio::spawn(async move {
938 tpc_coordinator.run(Box::new(callback)).await;
939 });
940
941 *self.runtime_handle.lock() = Some(handle);
942 }
943 Ok(())
944 }
945
946 pub async fn shutdown(&self) -> Result<(), DbError> {
956 let current = self.state.load(std::sync::atomic::Ordering::Acquire);
957 if current == STATE_STOPPED || current == STATE_SHUTTING_DOWN {
958 return Ok(());
959 }
960
961 self.state
962 .store(STATE_SHUTTING_DOWN, std::sync::atomic::Ordering::Release);
963
964 self.shutdown_signal.notify_one();
966
967 let handle = self.runtime_handle.lock().take();
969 if let Some(handle) = handle {
970 match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
971 Ok(Ok(())) => {
972 tracing::info!("Pipeline shut down cleanly");
973 }
974 Ok(Err(e)) => {
975 tracing::warn!(error = %e, "Pipeline task panicked during shutdown");
976 }
977 Err(_) => {
978 tracing::warn!("Pipeline shutdown timed out after 10s");
979 }
980 }
981 }
982
983 self.state
984 .store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
985 self.close();
986 Ok(())
987 }
988}