1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use laminar_core::streaming;
9use rustc_hash::FxHashMap;
10
11use crate::db::{
12 LaminarDB, SourceWatermarkState, STATE_CREATED, STATE_RUNNING, STATE_SHUTTING_DOWN,
13 STATE_STARTING, STATE_STOPPED,
14};
15use crate::error::DbError;
16
17async fn resolve_stream_output_schemas(
22 ctx: &datafusion::prelude::SessionContext,
23 stream_regs: &HashMap<String, crate::connector_manager::StreamRegistration>,
24) -> Result<HashMap<String, arrow_schema::SchemaRef>, DbError> {
25 use arrow_schema::Schema;
26 use datafusion::datasource::empty::EmptyTable;
27
28 let mut out: HashMap<String, arrow_schema::SchemaRef> =
29 HashMap::with_capacity(stream_regs.len());
30 let mut pending: Vec<&crate::connector_manager::StreamRegistration> =
31 stream_regs.values().collect();
32 let mut placeholders: Vec<String> = Vec::new();
34
35 let result: Result<(), DbError> = async {
36 while !pending.is_empty() {
37 let mut next: Vec<&crate::connector_manager::StreamRegistration> = Vec::new();
38 let mut progressed = false;
39 for reg in pending {
40 let Ok(plan) = ctx.state().create_logical_plan(®.query_sql).await else {
41 next.push(reg);
42 continue;
43 };
44
45 let mut fields = if reg.window_config.is_some() {
46 laminar_sql::translator::WindowOperatorConfig::output_prefix_fields()
47 } else {
48 Vec::new()
49 };
50 for f in plan.schema().fields() {
51 fields.push((**f).clone());
52 }
53 let schema = Arc::new(Schema::new(fields));
54
55 if !ctx.table_exist(®.name).unwrap_or(false) {
56 ctx.register_table(®.name, Arc::new(EmptyTable::new(schema.clone())))
57 .map_err(|e| {
58 DbError::Pipeline(format!(
59 "could not register placeholder for stream '{}': {e}",
60 reg.name
61 ))
62 })?;
63 placeholders.push(reg.name.clone());
64 }
65 out.insert(reg.name.clone(), schema);
66 progressed = true;
67 }
68
69 if !progressed {
70 let mut unresolved: Vec<&str> = next.iter().map(|r| r.name.as_str()).collect();
71 unresolved.sort_unstable();
72 let err = ctx
73 .state()
74 .create_logical_plan(&next[0].query_sql)
75 .await
76 .err()
77 .map_or_else(|| "unknown error".to_string(), |e| e.to_string());
78 return Err(DbError::Pipeline(format!(
79 "unresolvable stream dependency among [{}]: {err}",
80 unresolved.join(", ")
81 )));
82 }
83 pending = next;
84 }
85 Ok(())
86 }
87 .await;
88
89 for name in &placeholders {
90 let _ = ctx.deregister_table(name);
91 }
92
93 result.map(|()| out)
94}
95
/// Derives the checkpoint key prefix from an object-store URL.
///
/// `file://` URLs get no prefix. For remote URLs such as
/// `s3://bucket/path/to/ckpts`, the portion after the bucket/authority is
/// returned with a guaranteed trailing slash (`"path/to/ckpts/"`). URLs with
/// no path component yield an empty prefix.
pub(crate) fn url_to_checkpoint_prefix(url: &str) -> String {
    // Local filesystem checkpoints carry no remote key prefix.
    if url.starts_with("file://") {
        return String::new();
    }

    // Strip a "scheme://" prefix when present; otherwise use the URL as-is.
    let after_scheme = match url.find("://") {
        Some(i) => &url[i + 3..],
        None => url,
    };

    // Everything after the first '/' (past the bucket/authority) is the prefix.
    let Some(slash_pos) = after_scheme.find('/') else {
        return String::new();
    };
    let path = &after_scheme[slash_pos + 1..];
    match path {
        "" => String::new(),
        p if p.ends_with('/') => p.to_string(),
        p => format!("{p}/"),
    }
}
119
120impl LaminarDB {
121 pub fn close(&self) {
123 self.shutdown
124 .store(true, std::sync::atomic::Ordering::Relaxed);
125 }
126
127 pub fn is_closed(&self) -> bool {
129 self.shutdown.load(std::sync::atomic::Ordering::Relaxed)
130 }
131
132 pub(crate) fn is_pipeline_running(&self) -> bool {
133 let s = self.state.load(std::sync::atomic::Ordering::Acquire);
134 s == STATE_RUNNING || s == STATE_STARTING || s == STATE_SHUTTING_DOWN
135 }
136
137 pub async fn start(&self) -> Result<(), DbError> {
155 let current = self.state.load(std::sync::atomic::Ordering::Acquire);
156 if current == STATE_RUNNING || current == STATE_STARTING {
157 return Ok(());
158 }
159 if current == STATE_STOPPED {
160 return Err(DbError::InvalidOperation(
161 "Cannot start a stopped pipeline. Create a new LaminarDB instance.".into(),
162 ));
163 }
164
165 self.state
166 .store(STATE_STARTING, std::sync::atomic::Ordering::Release);
167
168 {
170 let mut guard = self.engine_metrics.lock();
171 if guard.is_none() {
172 *guard = Some(Arc::new(crate::engine_metrics::EngineMetrics::new(
173 &prometheus::Registry::new(),
174 )));
175 }
176 }
177
178 match self.start_inner().await {
179 Ok(()) => {
180 self.state
181 .store(STATE_RUNNING, std::sync::atomic::Ordering::Release);
182 Ok(())
183 }
184 Err(e) => {
185 self.state
189 .store(STATE_CREATED, std::sync::atomic::Ordering::Release);
190 Err(e)
191 }
192 }
193 }
194
195 #[allow(clippy::too_many_lines)]
199 async fn start_inner(&self) -> Result<(), DbError> {
200 let (source_regs, sink_regs, stream_regs, table_regs, has_external) = {
202 let mgr = self.connector_manager.lock();
203 (
204 mgr.sources().clone(),
205 mgr.sinks().clone(),
206 mgr.streams().clone(),
207 mgr.tables().clone(),
208 mgr.has_external_connectors(),
209 )
210 };
211
212 for (name, reg) in &source_regs {
214 tracing::debug!(source = %name, connector_type = ?reg.connector_type, "Registered source");
215 }
216 for (name, reg) in &sink_regs {
217 tracing::debug!(sink = %name, connector_type = ?reg.connector_type, "Registered sink");
218 }
219
220 if let Some(ref cp_config) = self.config.checkpoint {
222 use crate::checkpoint_coordinator::{
223 CheckpointConfig as CkpConfig, CheckpointCoordinator,
224 };
225
226 let max_retained = cp_config.max_retained.unwrap_or(3);
227 let vnode_count = self.vnode_registry.lock().as_ref().map_or(
228 laminar_storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
229 |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
230 );
231
232 let store: Box<dyn laminar_storage::CheckpointStore> =
233 if let Some(ref url) = self.config.object_store_url {
234 let obj_store = laminar_storage::object_store_builder::build_object_store(
235 url,
236 &self.config.object_store_options,
237 )
238 .map_err(|e| DbError::Config(format!("object store: {e}")))?;
239 let prefix = url_to_checkpoint_prefix(url);
240 Box::new(
241 laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
242 obj_store,
243 prefix,
244 max_retained,
245 )
246 .with_vnode_count(vnode_count),
247 )
248 } else {
249 let data_dir = cp_config
250 .data_dir
251 .clone()
252 .or_else(|| self.config.storage_dir.clone())
253 .unwrap_or_else(|| std::path::PathBuf::from("./data"));
254 Box::new(
255 laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
256 &data_dir,
257 max_retained,
258 )
259 .with_vnode_count(vnode_count),
260 )
261 };
262
263 let config = CkpConfig {
264 interval: cp_config.interval_ms.map(std::time::Duration::from_millis),
265 max_retained,
266 ..CkpConfig::default()
267 };
268 let mut coord = CheckpointCoordinator::new(config, store).await?;
269 if let Some(ref prom) = *self.engine_metrics.lock() {
270 coord.set_metrics(Arc::clone(prom));
271 }
272
273 #[cfg(feature = "cluster-unstable")]
277 if let Some(controller) = self.cluster_controller.lock().clone() {
278 coord.set_cluster_controller(controller);
279 }
280
281 if let (Some(backend), Some(registry)) = (
287 self.state_backend.lock().clone(),
288 self.vnode_registry.lock().clone(),
289 ) {
290 let owner = {
291 #[cfg(feature = "cluster-unstable")]
292 {
293 self.cluster_controller
294 .lock()
295 .as_ref()
296 .map_or(laminar_core::state::NodeId(0), |c| {
297 laminar_core::state::NodeId(c.instance_id().0)
298 })
299 }
300 #[cfg(not(feature = "cluster-unstable"))]
301 {
302 laminar_core::state::NodeId(0)
303 }
304 };
305 let version = registry.assignment_version();
314 backend.set_authoritative_version(version);
315 coord.set_state_backend(backend);
316 coord.set_assignment_version(version);
317 coord.set_vnode_set(laminar_core::state::owned_vnodes(®istry, owner));
318 coord.set_gate_vnode_set((0..registry.vnode_count()).collect());
322 }
323
324 #[cfg(feature = "cluster-unstable")]
330 if let Some(ds) = self.decision_store.lock().clone() {
331 coord.set_decision_store(ds);
332 }
333
334 #[cfg(feature = "cluster-unstable")]
341 coord.reconcile_prepared_on_init().await;
342
343 *self.coordinator.lock().await = Some(coord);
344 }
345
346 if has_external || !stream_regs.is_empty() {
347 tracing::info!(
348 sources = source_regs.len(),
349 sinks = sink_regs.len(),
350 streams = stream_regs.len(),
351 tables = table_regs.len(),
352 has_external,
353 "Starting pipeline"
354 );
355 self.start_connector_pipeline(
356 source_regs,
357 sink_regs,
358 stream_regs,
359 table_regs,
360 has_external,
361 )
362 .await?;
363 } else {
364 tracing::info!(
365 sources = source_regs.len(),
366 sinks = sink_regs.len(),
367 "Starting in embedded (in-memory) mode — no streams"
368 );
369 }
370
371 Ok(())
372 }
373
374 #[allow(clippy::too_many_lines)]
385 async fn start_connector_pipeline(
386 &self,
387 source_regs: HashMap<String, crate::connector_manager::SourceRegistration>,
388 sink_regs: HashMap<String, crate::connector_manager::SinkRegistration>,
389 stream_regs: HashMap<String, crate::connector_manager::StreamRegistration>,
390 table_regs: HashMap<String, crate::connector_manager::TableRegistration>,
391 has_external: bool,
392 ) -> Result<(), DbError> {
393 use crate::connector_manager::{
394 build_sink_config, build_source_config, build_table_config,
395 };
396 use crate::operator_graph::OperatorGraph;
397 use crate::pipeline::{PipelineConfig, SourceRegistration};
398 use laminar_connectors::reference::{ReferenceTableSource, RefreshMode};
399
400 let ctx = {
403 use datafusion::execution::SessionStateBuilder;
404 let mut session_config = laminar_sql::datafusion::base_session_config();
405 if let Some(n) = self.pipeline_target_partitions {
406 session_config = session_config.with_target_partitions(n);
407 }
408 let mut state_builder = SessionStateBuilder::new()
409 .with_config(session_config)
410 .with_default_features();
411 for rule in self.physical_optimizer_rules.iter() {
412 state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
413 }
414 datafusion::prelude::SessionContext::new_with_state(state_builder.build())
415 };
416 laminar_sql::register_streaming_functions(&ctx);
417
418 let lookup_tables: Vec<(String, arrow::datatypes::SchemaRef)> = {
421 let ts = self.table_store.read();
422 ts.table_names()
423 .into_iter()
424 .filter_map(|name| {
425 let schema = ts.table_schema(&name)?;
426 Some((name, schema))
427 })
428 .collect()
429 };
430 for (name, schema) in lookup_tables {
431 let provider = crate::table_provider::ReferenceTableProvider::new(
432 name.clone(),
433 schema,
434 self.table_store.clone(),
435 );
436 if let Err(e) = ctx.register_table(&name, Arc::new(provider)) {
437 tracing::warn!(
438 table = %name,
439 error = %e,
440 "failed to register lookup table in operator graph context"
441 );
442 }
443 }
444
445 let mut graph = OperatorGraph::new(ctx);
446 graph.set_max_state_bytes(self.config.max_state_bytes_per_operator);
447 graph.set_lookup_registry(Arc::clone(&self.lookup_registry));
448 if let Some(ref prom) = *self.engine_metrics.lock() {
449 graph.set_metrics(Arc::clone(prom));
450 }
451
452 #[cfg(feature = "cluster-unstable")]
456 {
457 let sender = self.shuffle_sender.lock().clone();
458 let receiver = self.shuffle_receiver.lock().clone();
459 let registry = self.vnode_registry.lock().clone();
460 let controller = self.cluster_controller.lock().clone();
461 if let (Some(sender), Some(receiver), Some(registry), Some(controller)) =
462 (sender, receiver, registry, controller)
463 {
464 let self_id = laminar_core::state::NodeId(controller.instance_id().0);
465 graph.set_cluster_shuffle(crate::operator::sql_query::ClusterShuffleConfig {
466 registry,
467 sender,
468 receiver,
469 self_id,
470 });
471 }
472 }
473
474 for name in source_regs.keys() {
478 if let Some(entry) = self.catalog.get_source(name) {
479 graph.register_source_schema(name.clone(), entry.schema.clone());
480 }
481 }
482
483 for reg in stream_regs.values() {
484 graph.add_query(
485 reg.name.clone(),
486 reg.query_sql.clone(),
487 reg.emit_clause.clone(),
488 reg.window_config.clone(),
489 reg.order_config.clone(),
490 None,
491 );
492 }
493
494 for tcfg in graph.temporal_join_configs() {
497 if self.lookup_registry.get_entry(&tcfg.table_name).is_none() {
498 let initial_batch = self
501 .table_store
502 .read()
503 .to_record_batch(&tcfg.table_name)
504 .or_else(|| {
505 self.catalog
506 .get_source(&tcfg.table_name)
507 .map(|e| RecordBatch::new_empty(e.schema.clone()))
508 })
509 .unwrap_or_else(|| {
510 RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()))
511 });
512 let key_columns = vec![tcfg.table_key_column.clone()];
513 let key_indices: Vec<usize> = key_columns
514 .iter()
515 .filter_map(|k| initial_batch.schema().index_of(k).ok())
516 .collect();
517
518 let resolved_version_col = if tcfg.table_version_column.is_empty() {
522 let schema = initial_batch.schema();
523 schema
524 .fields()
525 .iter()
526 .find(|f| {
527 f.name() != &tcfg.table_key_column
528 && matches!(
529 f.data_type(),
530 arrow::datatypes::DataType::Int64
531 | arrow::datatypes::DataType::Timestamp(_, _)
532 )
533 })
534 .map(|f| f.name().clone())
535 .unwrap_or_default()
536 } else {
537 tcfg.table_version_column.clone()
538 };
539
540 let Ok(version_col_idx) = initial_batch.schema().index_of(&resolved_version_col)
541 else {
542 if !initial_batch.schema().fields().is_empty() {
543 tracing::warn!(
544 table=%tcfg.table_name,
545 version_col=%resolved_version_col,
546 "Version column not found in temporal table schema; \
547 will resolve on first CDC batch"
548 );
549 }
550 self.lookup_registry.register_versioned(
552 &tcfg.table_name,
553 laminar_sql::datafusion::VersionedLookupState {
554 batch: initial_batch,
555 index: Arc::new(
556 laminar_sql::datafusion::lookup_join_exec::VersionedIndex::default(
557 ),
558 ),
559 key_columns,
560 version_column: resolved_version_col,
561 stream_time_column: tcfg.stream_time_column.clone(),
562 max_versions_per_key: usize::MAX,
563 },
564 );
565 continue;
566 };
567 let index = Arc::new(
568 laminar_sql::datafusion::lookup_join_exec::VersionedIndex::build(
569 &initial_batch,
570 &key_indices,
571 version_col_idx,
572 usize::MAX,
573 )
574 .unwrap_or_default(),
575 );
576 self.lookup_registry.register_versioned(
577 &tcfg.table_name,
578 laminar_sql::datafusion::VersionedLookupState {
579 batch: initial_batch,
580 index,
581 key_columns,
582 version_column: resolved_version_col,
583 stream_time_column: tcfg.stream_time_column.clone(),
584 max_versions_per_key: usize::MAX,
585 },
586 );
587 }
588 }
589
590 let prom_registry = self.prometheus_registry.lock().clone();
593
594 let mut sources: Vec<SourceRegistration> = Vec::new();
596 for (name, reg) in &source_regs {
597 if reg.connector_type.is_none() {
598 continue;
599 }
600 let mut config = build_source_config(reg)?;
601
602 if let Some(entry) = self.catalog.get_source(name) {
605 let schema_str = crate::pipeline_callback::encode_arrow_schema(&entry.schema);
606 config.set("_arrow_schema".to_string(), schema_str);
607 }
608
609 let source = self
610 .connector_registry
611 .create_source(&config, prom_registry.as_deref())
612 .map_err(|e| {
613 DbError::Connector(format!(
614 "Cannot create source '{}' (type '{}'): {e}",
615 name,
616 config.connector_type()
617 ))
618 })?;
619 let supports_replay = source.supports_replay();
620 if !supports_replay {
621 tracing::warn!(
622 source = %name,
623 "source does not support replay — exactly-once semantics \
624 are degraded to at-most-once for this source"
625 );
626 }
627 if let Some(entry) = self.catalog.get_source(name) {
635 if entry.source.event_time_column().is_none() {
636 if let Some(col) = config.get("event.time.column") {
637 entry.source.set_event_time_column(col);
638 } else if let Some(col) = config.get("event.time.field") {
639 entry.source.set_event_time_column(col);
640 }
641 }
642 if let Some(ms_str) = config.get("max.out.of.orderness.ms") {
646 match ms_str.parse::<u64>() {
647 Ok(ms) => {
648 entry
649 .source
650 .set_max_out_of_orderness(std::time::Duration::from_millis(ms));
651 }
652 Err(e) => {
653 tracing::warn!(
654 source = %name,
655 value = %ms_str,
656 error = %e,
657 "ignoring unparseable max.out.of.orderness.ms — \
658 watermark will use Duration::ZERO"
659 );
660 }
661 }
662 }
663 }
664
665 sources.push(SourceRegistration {
666 name: name.clone(),
667 connector: source,
668 config,
669 supports_replay,
670 restore_checkpoint: None, });
672 }
673
674 let bridged_names: rustc_hash::FxHashSet<String> =
682 sources.iter().map(|s| s.name.clone()).collect();
683 for (name, reg) in &source_regs {
685 if reg.connector_type.is_some() {
686 continue; }
688 if let Some(entry) = self.catalog.get_source(name) {
689 let subscription = entry.sink.subscribe();
690 let connector = crate::catalog_connector::CatalogSourceConnector::new(
691 subscription,
692 entry.schema.clone(),
693 entry.data_notify(),
694 );
695 sources.push(SourceRegistration {
696 name: name.clone(),
697 connector: Box::new(connector),
698 config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
699 supports_replay: false,
700 restore_checkpoint: None,
701 });
702 }
703 }
704 for name in self.catalog.list_sources() {
707 if bridged_names.contains(&name) || source_regs.contains_key(&name) {
708 continue;
709 }
710 if let Some(entry) = self.catalog.get_source(&name) {
711 graph.register_source_schema(name.clone(), entry.schema.clone());
712 let subscription = entry.sink.subscribe();
713 let connector = crate::catalog_connector::CatalogSourceConnector::new(
714 subscription,
715 entry.schema.clone(),
716 entry.data_notify(),
717 );
718 sources.push(SourceRegistration {
719 name: name.clone(),
720 connector: Box::new(connector),
721 config: laminar_connectors::config::ConnectorConfig::new("catalog-bridge"),
722 supports_replay: false,
723 restore_checkpoint: None,
724 });
725 }
726 }
727
728 let stream_output_schemas = resolve_stream_output_schemas(&self.ctx, &stream_regs).await?;
732
733 let (sink_event_tx, sink_event_rx) =
737 laminar_core::streaming::channel::channel::<crate::sink_task::SinkEvent>(
738 crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY,
739 );
740 #[allow(clippy::type_complexity)]
741 let mut sinks: Vec<(
742 String,
743 crate::sink_task::SinkTaskHandle,
744 Option<String>,
745 String, bool, )> = Vec::new();
748 for (name, reg) in &sink_regs {
749 if reg.connector_type.is_none() {
750 continue;
751 }
752 let mut config = build_sink_config(reg)?;
753 let upstream_schema = stream_output_schemas.get(®.input).cloned().or_else(|| {
756 self.catalog
757 .get_source(®.input)
758 .map(|e| e.schema.clone())
759 });
760 if let Some(schema) = upstream_schema {
761 let schema_str = crate::pipeline_callback::encode_arrow_schema(&schema);
762 config.set("_arrow_schema".to_string(), schema_str);
763 }
764 let mut sink = self
765 .connector_registry
766 .create_sink(&config, prom_registry.as_deref())
767 .map_err(|e| {
768 DbError::Connector(format!(
769 "Cannot create sink '{}' (type '{}'): {e}",
770 name,
771 config.connector_type()
772 ))
773 })?;
774 sink.open(&config)
776 .await
777 .map_err(|e| DbError::Connector(format!("Failed to open sink '{name}': {e}")))?;
778 let caps = sink.capabilities();
779 let write_timeout =
782 match config
783 .get_parsed::<u64>("sink.write.timeout.ms")
784 .map_err(|e| {
785 DbError::Connector(format!(
786 "Invalid 'sink.write.timeout.ms' for sink '{name}': {e}"
787 ))
788 })? {
789 Some(ms) => std::time::Duration::from_millis(ms),
790 None => caps.suggested_write_timeout,
791 };
792 if write_timeout.is_zero() {
793 return Err(DbError::Connector(format!(
794 "sink '{name}': write_timeout must be > 0 \
795 (check 'sink.write.timeout.ms' or the sink's \
796 suggested_write_timeout)"
797 )));
798 }
799 let sink_id: std::sync::Arc<str> = std::sync::Arc::from(name.as_str());
800 let handle =
801 crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
802 name: name.clone(),
803 sink_id,
804 connector: sink,
805 exactly_once: caps.exactly_once,
806 channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
807 flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
808 write_timeout,
809 event_tx: sink_event_tx.clone(),
810 });
811 sinks.push((
812 name.clone(),
813 handle,
814 reg.filter_expr.clone(),
815 reg.input.clone(),
816 caps.changelog,
817 ));
818 }
819 drop(sink_event_tx);
822
823 let mut table_sources: Vec<(String, Box<dyn ReferenceTableSource>, RefreshMode)> =
825 Vec::new();
826 for (name, reg) in &table_regs {
827 if reg.connector_type.is_none() {
828 continue;
829 }
830 let config = build_table_config(reg)?;
831 let source = self
832 .connector_registry
833 .create_table_source(&config)
834 .map_err(|e| {
835 DbError::Connector(format!("Cannot create table source '{name}': {e}"))
836 })?;
837 let mode = reg.refresh.clone().unwrap_or(RefreshMode::SnapshotPlusCdc);
838 table_sources.push((name.clone(), source, mode));
839 }
840
841 {
845 let mut guard = self.coordinator.lock().await;
846 if let Some(ref mut coord) = *guard {
847 for (name, handle, _, _, _) in &sinks {
848 let exactly_once = handle.exactly_once();
849 coord.register_sink(name.clone(), handle.clone(), exactly_once);
850 }
851 }
852 }
853
854 {
858 let mut guard = self.coordinator.lock().await;
859 if let Some(ref mut coord) = *guard {
860 match coord.recover().await {
861 Ok(Some(recovered)) => {
862 for (name, source, _) in &mut table_sources {
863 if let Some(cp) = recovered.manifest.table_offsets.get(name) {
864 let restored =
865 crate::checkpoint_coordinator::connector_to_source_checkpoint(
866 cp,
867 );
868 if let Err(e) = source.restore(&restored).await {
869 tracing::warn!(
870 table=%name, error=%e,
871 "Table source restore failed"
872 );
873 }
874 }
875 }
876 for src in &mut sources {
881 if !src.supports_replay {
882 continue;
883 }
884 if let Some(cp) = recovered.manifest.source_offsets.get(&src.name) {
885 let restored =
886 crate::checkpoint_coordinator::connector_to_source_checkpoint(
887 cp,
888 );
889 tracing::info!(
890 source = %src.name,
891 offsets = cp.offsets.len(),
892 "attaching checkpoint offsets for source recovery"
893 );
894 src.restore_checkpoint = Some(restored);
895 }
896 }
897 let mut graph_restore_failed = false;
898 let op_keys: Vec<&String> =
899 recovered.manifest.operator_states.keys().collect();
900 let instance_hint = {
901 #[cfg(feature = "cluster-unstable")]
902 {
903 self.cluster_controller
904 .lock()
905 .as_ref()
906 .map_or(0, |c| c.instance_id().0)
907 }
908 #[cfg(not(feature = "cluster-unstable"))]
909 {
910 0u64
911 }
912 };
913 tracing::info!(
914 instance = instance_hint,
915 count = op_keys.len(),
916 keys = ?op_keys,
917 "manifest operator_states summary"
918 );
919 if let Some(op) = recovered.manifest.operator_states.get("operator_graph") {
920 if let Some(bytes) = op.decode_inline() {
921 match graph.restore_from_bytes(&bytes) {
922 Ok(n) => {
923 tracing::info!(
924 queries = n,
925 "Restored operator graph state from checkpoint"
926 );
927 }
928 Err(e) => {
929 graph_restore_failed = true;
930 tracing::warn!(
931 error = %e,
932 "Operator graph state restore failed, starting fresh"
933 );
934 }
935 }
936 } else {
937 tracing::warn!(
938 "manifest has 'operator_graph' but decode_inline returned None"
939 );
940 }
941 } else if recovered
942 .manifest
943 .operator_states
944 .contains_key("stream_executor")
945 {
946 graph_restore_failed = true;
947 tracing::warn!(
948 "Found old stream_executor checkpoint format; \
949 skipping restore (clean break). Starting fresh."
950 );
951 }
952
953 if !graph_restore_failed {
957 let prefix = crate::mv_store::CHECKPOINT_KEY_PREFIX;
958 let mut store = self.mv_store.write();
959 let mut restored = 0usize;
960 for (key, op) in &recovered.manifest.operator_states {
961 if let Some(name) = key.strip_prefix(prefix) {
962 if let Some(bytes) = op.decode_inline() {
963 match store.restore_from_ipc(name, &bytes) {
964 Ok(true) => restored += 1,
965 Ok(false) => {} Err(e) => {
967 tracing::warn!(mv = name, error = %e, "MV restore failed");
968 }
969 }
970 }
971 }
972 }
973 if restored > 0 {
974 tracing::info!(mvs = restored, "Restored MV state from checkpoint");
975 }
976 }
977 tracing::info!(
978 epoch = recovered.epoch(),
979 sources_restored = recovered.sources_restored,
980 sinks_rolled_back = recovered.sinks_rolled_back,
981 "Recovered from unified checkpoint"
982 );
983 }
984 Ok(None) => {
985 tracing::info!("No checkpoint found, starting fresh");
986 }
987 Err(e) => {
988 tracing::warn!(error = %e, "Checkpoint recovery failed, starting fresh");
989 }
990 }
991 }
992 }
993
994 {
997 let guard = self.coordinator.lock().await;
998 if let Some(ref coord) = *guard {
999 coord.begin_initial_epoch().await?;
1000 }
1001 }
1002
1003 for (name, source, mode) in &mut table_sources {
1005 if matches!(mode, RefreshMode::Manual) {
1006 continue;
1007 }
1008 while let Some(batch) = source
1009 .poll_snapshot()
1010 .await
1011 .map_err(|e| DbError::Connector(format!("Table '{name}' snapshot error: {e}")))?
1012 {
1013 self.table_store
1014 .write()
1015 .upsert(name, &batch)
1016 .map_err(|e| DbError::Connector(format!("Table '{name}' upsert error: {e}")))?;
1017 }
1018 self.sync_table_to_datafusion(name)?;
1019 {
1020 let mut ts = self.table_store.write();
1021 ts.rebuild_xor_filter(name);
1022 ts.set_ready(name, true);
1023 }
1024 if matches!(
1028 self.lookup_registry.get_entry(name),
1029 Some(laminar_sql::datafusion::RegisteredLookup::Versioned(_))
1030 ) {
1031 } else if let Some(batch) = self.table_store.read().to_record_batch(name) {
1033 self.lookup_registry.register(
1034 name,
1035 laminar_sql::datafusion::LookupSnapshot {
1036 batch,
1037 key_columns: vec![], },
1039 );
1040 }
1041 }
1042
1043 for (name, _source, mode) in &mut table_sources {
1047 if !matches!(mode, RefreshMode::Manual) {
1048 continue;
1049 }
1050 let Some(reg) = table_regs.get(name.as_str()) else {
1051 continue;
1052 };
1053 let max_entries = reg.cache_max_entries.unwrap_or(65_536);
1054 let Some(schema) = self.table_store.read().table_schema(name) else {
1055 continue;
1056 };
1057 let pk_csv = ®.primary_key;
1058 let pk_cols: Vec<String> = pk_csv
1059 .split(',')
1060 .map(|s| s.trim().to_string())
1061 .filter(|s| !s.is_empty())
1062 .collect();
1063 let key_sort_fields: Vec<arrow::row::SortField> = pk_cols
1064 .iter()
1065 .filter_map(|col| {
1066 schema
1067 .field_with_name(col)
1068 .ok()
1069 .map(|f| arrow::row::SortField::new(f.data_type().clone()))
1070 })
1071 .collect();
1072
1073 let cache = Arc::new(laminar_core::lookup::foyer_cache::FoyerMemoryCache::new(
1074 0,
1075 laminar_core::lookup::foyer_cache::FoyerMemoryCacheConfig {
1076 capacity: max_entries,
1077 shards: 16,
1078 },
1079 ));
1080 let lookup_source = if let Ok(mut config) = build_table_config(reg) {
1083 config.set("_primary_key_columns", pk_csv.as_str());
1084 match self.connector_registry.create_lookup_source(config).await {
1085 Some(Ok(src)) => Some(src),
1086 Some(Err(e)) => {
1087 tracing::warn!(
1088 table = %name, error = %e,
1089 "lookup source creation failed; cache-only mode"
1090 );
1091 None
1092 }
1093 None => None,
1094 }
1095 } else {
1096 None
1097 };
1098
1099 self.lookup_registry.register_partial(
1100 name,
1101 laminar_sql::datafusion::PartialLookupState {
1102 foyer_cache: cache,
1103 schema,
1104 key_columns: pk_cols,
1105 key_sort_fields,
1106 source: lookup_source,
1107 fetch_semaphore: Arc::new(tokio::sync::Semaphore::new(16)),
1108 },
1109 );
1110 *mode = RefreshMode::SnapshotPlusCdc;
1111 tracing::info!(
1112 table = %name,
1113 max_entries,
1114 pk = %pk_csv,
1115 "registered on-demand lookup table (partial cache)"
1116 );
1117 }
1118
1119 let mut stream_sources: Vec<(String, streaming::Source<crate::catalog::ArrowRecord>)> =
1121 Vec::new();
1122 for reg in stream_regs.values() {
1123 if let Some(src) = self.catalog.get_stream_source(®.name) {
1124 stream_sources.push((reg.name.clone(), src));
1125 }
1126 }
1127
1128 let source_names = self.catalog.list_sources();
1130 let mut watermark_states: FxHashMap<String, SourceWatermarkState> =
1131 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1132 let mut source_entries_for_wm: FxHashMap<String, Arc<crate::catalog::SourceEntry>> =
1133 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1134 let mut source_ids: FxHashMap<String, usize> =
1135 FxHashMap::with_capacity_and_hasher(source_names.len(), rustc_hash::FxBuildHasher);
1136 for name in source_names {
1137 if let Some(entry) = self.catalog.get_source(&name) {
1138 if let (Some(col), Some(dur)) =
1139 (&entry.watermark_column, entry.max_out_of_orderness)
1140 {
1141 let extractor = laminar_core::time::EventTimeExtractor::from_column(col)
1142 .with_mode(laminar_core::time::ExtractionMode::Max);
1143 let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1144 .is_processing_time
1145 .load(std::sync::atomic::Ordering::Relaxed)
1146 {
1147 Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1148 } else {
1149 Box::new(
1150 laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(dur),
1151 )
1152 };
1153 let id = source_ids.len();
1154 source_ids.insert(name.clone(), id);
1155 watermark_states.insert(
1156 name.clone(),
1157 SourceWatermarkState {
1158 extractor,
1159 generator,
1160 column: col.clone(),
1161 },
1162 );
1163 }
1164 source_entries_for_wm.insert(name, entry);
1165 }
1166 }
1167
1168 for name in self.catalog.list_sources() {
1176 if watermark_states.contains_key(&name) {
1177 continue;
1178 }
1179 if let Some(entry) = self.catalog.get_source(&name) {
1180 if let Some(col) = entry.source.event_time_column() {
1181 let extractor = laminar_core::time::EventTimeExtractor::from_column(&col)
1182 .with_mode(laminar_core::time::ExtractionMode::Max);
1183 let ooo_bound = entry
1184 .source
1185 .max_out_of_orderness()
1186 .unwrap_or(std::time::Duration::ZERO);
1187 let generator: Box<dyn laminar_core::time::WatermarkGenerator> = if entry
1188 .is_processing_time
1189 .load(std::sync::atomic::Ordering::Relaxed)
1190 {
1191 Box::new(laminar_core::time::ProcessingTimeGenerator::new())
1192 } else {
1193 Box::new(
1194 laminar_core::time::BoundedOutOfOrdernessGenerator::from_duration(
1195 ooo_bound,
1196 ),
1197 )
1198 };
1199 let id = source_ids.len();
1200 source_ids.insert(name.clone(), id);
1201 watermark_states.insert(
1202 name.clone(),
1203 SourceWatermarkState {
1204 extractor,
1205 generator,
1206 column: col,
1207 },
1208 );
1209 }
1210 }
1211 }
1212
1213 let tracker = if source_ids.is_empty() {
1214 None
1215 } else {
1216 Some(laminar_core::time::WatermarkTracker::new(source_ids.len()))
1217 };
1218
1219 let max_poll = self.config.default_buffer_size.min(1024);
1220 let checkpoint_interval = self
1221 .config
1222 .checkpoint
1223 .as_ref()
1224 .and_then(|c| c.interval_ms)
1225 .map(std::time::Duration::from_millis);
1226
1227 tracing::info!(
1228 sources = sources.len(),
1229 sinks = sinks.len(),
1230 streams = stream_regs.len(),
1231 subscriptions = stream_sources.len(),
1232 watermark_sources = source_ids.len(),
1233 "Starting event-driven connector pipeline"
1234 );
1235
1236 let drain_budget_ns = self.config.pipeline_drain_budget_ns.unwrap_or(1_000_000);
1242 let query_budget_ns = self.config.pipeline_query_budget_ns.unwrap_or(8_000_000);
1243 let pipeline_config = PipelineConfig {
1244 max_poll_records: max_poll,
1245 channel_capacity: self.config.pipeline_channel_capacity.unwrap_or(64),
1246 fallback_poll_interval: if has_external {
1247 std::time::Duration::from_millis(10)
1248 } else {
1249 std::time::Duration::from_millis(1)
1250 },
1251 checkpoint_interval,
1252 batch_window: self
1253 .config
1254 .pipeline_batch_window
1255 .unwrap_or(if has_external {
1256 std::time::Duration::from_millis(5)
1257 } else {
1258 std::time::Duration::ZERO
1259 }),
1260 barrier_alignment_timeout: std::time::Duration::from_secs(30),
1264 delivery_guarantee: self.config.delivery_guarantee,
1265 cycle_budget_ns: 10_000_000_u64.max(drain_budget_ns + query_budget_ns),
1268 drain_budget_ns,
1269 query_budget_ns,
1270 background_budget_ns: 5_000_000, max_input_buf_batches: self.config.pipeline_max_input_buf_batches.unwrap_or(256),
1272 max_input_buf_bytes: self.config.pipeline_max_input_buf_bytes,
1273 backpressure_policy: self.config.pipeline_backpressure_policy,
1274 };
1275
1276 {
1278 use laminar_connectors::connector::DeliveryGuarantee;
1279
1280 if pipeline_config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
1281 for src in &sources {
1282 if !src.supports_replay {
1283 return Err(DbError::Config(format!(
1284 "[LDB-5030] exactly-once requires all sources to support replay, \
1285 but source '{}' does not. Use at-least-once or remove this source.",
1286 src.name
1287 )));
1288 }
1289 }
1290 for (name, handle, _, _, _) in &sinks {
1291 if !handle.exactly_once() {
1292 return Err(DbError::Config(format!(
1293 "[LDB-5031] exactly-once requires all sinks to support \
1294 exactly-once semantics, but sink '{name}' does not. \
1295 Use at-least-once or configure a transactional sink."
1296 )));
1297 }
1298 }
1299 if pipeline_config.checkpoint_interval.is_none() {
1300 return Err(DbError::Config(
1301 "[LDB-5032] exactly-once requires checkpointing to be enabled. \
1302 Set checkpoint.interval.ms in the pipeline configuration."
1303 .into(),
1304 ));
1305 }
1306 } else if pipeline_config.delivery_guarantee == DeliveryGuarantee::AtLeastOnce {
1307 let has_non_replayable = sources.iter().any(|s| !s.supports_replay);
1308 let has_eo_sink = sinks.iter().any(|(_, h, _, _, _)| h.exactly_once());
1309 if has_non_replayable && has_eo_sink {
1310 tracing::warn!(
1311 "[LDB-5033] pipeline has exactly-once sinks but some sources \
1312 do not support replay — effective guarantee is at-most-once \
1313 for events from non-replayable sources"
1314 );
1315 }
1316 }
1317 }
1318
1319 let shutdown = self.shutdown_signal.clone();
1320
1321 let pipeline_watermark = Arc::clone(&self.pipeline_watermark);
1323 let coordinator = Arc::clone(&self.coordinator);
1324 let table_store_for_loop = self.table_store.clone();
1325 let pipeline_hash = {
1327 use std::hash::{Hash, Hasher};
1328 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1329 for reg in stream_regs.values() {
1330 reg.name.hash(&mut hasher);
1331 reg.query_sql.hash(&mut hasher);
1332 }
1333 for name in source_regs.keys() {
1334 name.hash(&mut hasher);
1335 }
1336 for name in sink_regs.keys() {
1337 name.hash(&mut hasher);
1338 }
1339 Some(hasher.finish())
1340 };
1341
1342 graph.set_query_budget_ns(pipeline_config.query_budget_ns);
1344 graph.set_max_input_buf_batches(pipeline_config.max_input_buf_batches);
1345 graph.set_max_input_buf_bytes(pipeline_config.max_input_buf_bytes);
1346 graph.set_backpressure_policy(pipeline_config.backpressure_policy);
1347
1348 let sinks_pending_filter_count = sinks
1349 .iter()
1350 .filter(|(_, _, filter_sql, _, _)| filter_sql.is_some())
1351 .count();
1352
1353 let source_name_arcs: rustc_hash::FxHashMap<usize, Arc<str>> = source_ids
1354 .iter()
1355 .map(|(name, &sid)| (sid, Arc::<str>::from(name.as_str())))
1356 .collect();
1357 let source_wms_buf = rustc_hash::FxHashMap::with_capacity_and_hasher(
1358 source_name_arcs.len(),
1359 rustc_hash::FxBuildHasher,
1360 );
1361
1362 let prom = self
1363 .engine_metrics
1364 .lock()
1365 .clone()
1366 .expect("EngineMetrics must be set before start()");
1367
1368 let (force_ckpt_tx, force_ckpt_rx) = crossfire::mpsc::bounded_async::<
1374 crate::db::ForceCheckpointReply,
1375 >(crate::db::FORCE_CHECKPOINT_CHANNEL_CAPACITY);
1376 *self.force_ckpt_tx.lock() = Some(force_ckpt_tx);
1377
1378 let callback = crate::pipeline_callback::ConnectorPipelineCallback {
1379 graph,
1380 stream_sources,
1381 sinks,
1382 watermark_states,
1383 source_entries_for_wm,
1384 source_ids,
1385 source_name_arcs,
1386 source_wms_buf,
1387 tracker,
1388 prom,
1389 pipeline_watermark,
1390 coordinator,
1391 table_sources,
1392 table_store: table_store_for_loop,
1393 mv_store_has_any: self.mv_store.read().has_any_handle(),
1394 mv_store: self.mv_store.clone(),
1395 lookup_registry: Arc::clone(&self.lookup_registry),
1396 filter_ctx: laminar_sql::create_session_context(),
1397 compiled_sink_filters: Vec::new(),
1398 pending_sink_filter_compiles: sinks_pending_filter_count,
1399 last_checkpoint: std::time::Instant::now(),
1400 checkpoint_interval: self
1401 .config
1402 .checkpoint
1403 .as_ref()
1404 .and_then(|c| c.interval_ms)
1405 .map(std::time::Duration::from_millis),
1406 pipeline_hash,
1407 delivery_guarantee: pipeline_config.delivery_guarantee,
1408 serialization_timeout: std::time::Duration::from_secs(120),
1409 sink_event_rx,
1410 sink_timed_out: false,
1411 shutdown_signal: Arc::clone(&self.shutdown_signal),
1412 #[cfg(feature = "cluster-unstable")]
1413 cluster_controller: self.cluster_controller.lock().clone(),
1414 #[cfg(feature = "cluster-unstable")]
1415 last_follower_epoch: None,
1416 force_ckpt_rx: Some(force_ckpt_rx),
1417 };
1418
1419 {
1424 let (control_tx, control_rx) =
1426 crossfire::mpsc::bounded_async::<crate::pipeline::ControlMsg>(64);
1427 *self.control_tx.lock() = Some(control_tx);
1428
1429 let coordinator = crate::pipeline::StreamingCoordinator::new(
1430 sources,
1431 pipeline_config,
1432 Arc::clone(&shutdown),
1433 control_rx,
1434 )
1435 .await?;
1436
1437 let (done_tx, done_rx) = crossfire::oneshot::oneshot::<()>();
1438 let (startup_tx, startup_rx) = crossfire::oneshot::oneshot::<Result<(), String>>();
1439 match std::thread::Builder::new()
1440 .name("laminar-compute".into())
1441 .spawn(move || {
1442 let rt = match tokio::runtime::Builder::new_current_thread()
1443 .enable_all()
1444 .build()
1445 {
1446 Ok(rt) => {
1447 startup_tx.send(Ok(()));
1448 rt
1449 }
1450 Err(e) => {
1451 startup_tx.send(Err(format!("compute runtime: {e}")));
1452 return;
1453 }
1454 };
1455 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1456 rt.block_on(async move {
1457 coordinator.run(callback).await;
1458 });
1459 }));
1460 if let Err(panic) = result {
1461 let msg = panic
1462 .downcast_ref::<String>()
1463 .map(String::as_str)
1464 .or_else(|| panic.downcast_ref::<&str>().copied())
1465 .unwrap_or("unknown");
1466 tracing::error!(panic = msg, "laminar-compute thread panicked");
1467 return;
1469 }
1470 done_tx.send(());
1471 }) {
1472 Ok(_) => {}
1473 Err(e) => {
1474 return Err(DbError::Config(format!(
1475 "failed to spawn compute thread: {e}"
1476 )));
1477 }
1478 }
1479
1480 match startup_rx.await {
1482 Ok(Ok(())) => {}
1483 Ok(Err(e)) => return Err(DbError::Config(e)),
1484 Err(_) => {
1485 return Err(DbError::Config(
1486 "compute thread exited before starting runtime".into(),
1487 ));
1488 }
1489 }
1490
1491 let watcher_state = Arc::clone(&self.state);
1492 let watcher_shutdown = Arc::clone(&self.shutdown_signal);
1493 let handle = tokio::spawn(async move {
1494 if done_rx.await.is_err() {
1495 tracing::error!("laminar-compute thread exited unexpectedly");
1496 watcher_state.store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
1497 watcher_shutdown.notify_one();
1498 }
1499 });
1500
1501 *self.runtime_handle.lock() = Some(handle);
1502 }
1503 Ok(())
1504 }
1505
1506 pub async fn shutdown(&self) -> Result<(), DbError> {
1516 let current = self.state.load(std::sync::atomic::Ordering::Acquire);
1517 if current == STATE_STOPPED || current == STATE_SHUTTING_DOWN {
1518 return Ok(());
1519 }
1520
1521 self.state
1522 .store(STATE_SHUTTING_DOWN, std::sync::atomic::Ordering::Release);
1523
1524 *self.force_ckpt_tx.lock() = None;
1529
1530 self.shutdown_signal.notify_one();
1532
1533 let handle = self.runtime_handle.lock().take();
1535 if let Some(handle) = handle {
1536 match tokio::time::timeout(std::time::Duration::from_secs(10), handle).await {
1537 Ok(Ok(())) => {
1538 tracing::info!("Pipeline shut down cleanly");
1539 }
1540 Ok(Err(e)) => {
1541 tracing::warn!(error = %e, "Pipeline task panicked during shutdown");
1542 }
1543 Err(_) => {
1544 tracing::warn!("Pipeline shutdown timed out after 10s");
1545 }
1546 }
1547 }
1548
1549 self.state
1550 .store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
1551 self.close();
1552 Ok(())
1553 }
1554}
1555
#[cfg(test)]
mod resolver_tests {
    use super::resolve_stream_output_schemas;
    use crate::connector_manager::StreamRegistration;
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::empty::EmptyTable;
    use datafusion::prelude::SessionContext;
    use std::sync::Arc;
    use std::time::Duration;

    /// Session pre-loaded with a `payments` base table and the `tumble`
    /// UDF — the minimal catalog a stream query needs to plan against.
    fn ctx_with_payments() -> SessionContext {
        let session = SessionContext::new();
        let fields = vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("method", DataType::Utf8, false),
            Field::new("amount_usd", DataType::Float64, false),
            Field::new("status", DataType::Utf8, false),
            Field::new(
                "event_time",
                DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
                false,
            ),
        ];
        session
            .register_table(
                "payments",
                Arc::new(EmptyTable::new(Arc::new(Schema::new(fields)))),
            )
            .unwrap();
        session.register_udf(datafusion_expr::ScalarUDF::from(
            laminar_sql::datafusion::TumbleWindowStart::new(),
        ));
        session
    }

    /// Minimal registration for a stream named `name` running `sql`,
    /// optionally carrying a zero-width tumbling window config.
    fn reg(name: &str, sql: &str, windowed: bool) -> StreamRegistration {
        let window_config = if windowed {
            Some(laminar_sql::translator::WindowOperatorConfig::tumbling(
                "event_time".into(),
                Duration::ZERO,
            ))
        } else {
            None
        };
        StreamRegistration {
            name: name.to_owned(),
            query_sql: sql.to_owned(),
            emit_clause: None,
            window_config,
            order_config: None,
        }
    }

    #[tokio::test]
    async fn windowed_stream_gets_window_start_end_prefix() {
        let ctx = ctx_with_payments();
        let regs = std::collections::HashMap::from([(
            "agg".to_string(),
            reg(
                "agg",
                "SELECT region, COUNT(*) AS n FROM payments GROUP BY tumble(event_time, INTERVAL '1' MINUTE), region",
                true,
            ),
        )]);

        let schemas = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        let agg = &schemas["agg"];
        let names: Vec<&str> = agg.fields().iter().map(|f| f.name().as_str()).collect();
        // Windowed streams are prefixed with the window bound columns.
        assert_eq!(&names[..2], &["window_start", "window_end"]);
        assert_eq!(agg.field(0).data_type(), &DataType::Int64);
        // The query's own projection follows the prefix.
        assert!(names.contains(&"region") && names.contains(&"n"));
    }

    #[tokio::test]
    async fn non_windowed_stream_has_no_prefix() {
        let ctx = ctx_with_payments();
        let regs = std::collections::HashMap::from([(
            "passthrough".to_string(),
            reg(
                "passthrough",
                "SELECT region, amount_usd FROM payments",
                false,
            ),
        )]);

        let schemas = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        let names: Vec<&str> = schemas["passthrough"]
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(names, vec!["region", "amount_usd"]);
    }

    #[tokio::test]
    async fn chained_streams_resolve_via_iterative_planning() {
        let ctx = ctx_with_payments();
        // "b" reads from "a", so it can only plan after "a" has been
        // resolved and registered as a placeholder table.
        let regs = std::collections::HashMap::from([
            (
                "b".to_string(),
                reg("b", "SELECT region, n + 1 AS n_plus_one FROM a", false),
            ),
            (
                "a".to_string(),
                reg(
                    "a",
                    "SELECT region, COUNT(*) AS n FROM payments GROUP BY region",
                    false,
                ),
            ),
        ]);

        let schemas = resolve_stream_output_schemas(&ctx, &regs).await.unwrap();
        let b_fields: Vec<&str> = schemas["b"]
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(b_fields, vec!["region", "n_plus_one"]);

        // Placeholder registrations must be cleaned up after resolution.
        assert!(!ctx.table_exist("a").unwrap_or(false));
        assert!(!ctx.table_exist("b").unwrap_or(false));
    }

    #[tokio::test]
    async fn unresolvable_streams_surface_planner_error() {
        let ctx = ctx_with_payments();
        // Mutually recursive streams can never make planning progress.
        let regs = std::collections::HashMap::from([
            ("a".to_string(), reg("a", "SELECT * FROM b", false)),
            ("b".to_string(), reg("b", "SELECT * FROM a", false)),
        ]);

        let err = resolve_stream_output_schemas(&ctx, &regs)
            .await
            .unwrap_err()
            .to_string();
        assert!(err.contains("unresolvable stream dependency"), "got: {err}");
        // NOTE(review): char-level `contains` is vacuous here — the phrase
        // asserted above already contains both letters 'a' and 'b'. Consider
        // matching the quoted stream names once the error format is pinned.
        assert!(err.contains('a') && err.contains('b'), "got: {err}");
    }
}