1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow::array::{RecordBatch, StringArray};
8use arrow::datatypes::{DataType, Field, Schema};
9use datafusion::prelude::SessionContext;
10use laminar_core::streaming;
11use laminar_sql::parser::{parse_streaming_sql, ShowCommand, StreamingStatement};
12use laminar_sql::planner::StreamingPlanner;
13use laminar_sql::register_streaming_functions;
14use laminar_sql::translator::{AsofJoinTranslatorConfig, JoinOperatorConfig};
15
16use crate::builder::LaminarDbBuilder;
17use crate::catalog::SourceCatalog;
18use crate::config::LaminarConfig;
19use crate::error::DbError;
20use crate::handle::{
21 DdlInfo, ExecuteResult, QueryHandle, QueryInfo, SinkInfo, SourceHandle, SourceInfo,
22 UntypedSourceHandle,
23};
24use crate::pipeline::ControlMsg;
25use crate::pipeline_lifecycle::url_to_checkpoint_prefix;
26use crate::sql_utils;
27
/// Async sender half of the bounded crossfire MPSC channel that carries
/// [`ControlMsg`]s into the running pipeline.
pub(crate) type ControlMsgTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ControlMsg>>;
30
/// Database lifecycle state, stored as a raw `u8` so it fits in the
/// `AtomicU8` on `LaminarDB::state`. Discriminants are fixed and part of
/// the contract with the `STATE_*` constants below.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DbState {
    Created = 0,
    Starting = 1,
    Running = 2,
    ShuttingDown = 3,
    Stopped = 4,
}

impl DbState {
    /// Decode a raw atomic value back into a `DbState`.
    ///
    /// Returns `None` for any byte that is not a known discriminant.
    pub(crate) fn from_u8(raw: u8) -> Option<Self> {
        match raw {
            0 => Some(Self::Created),
            1 => Some(Self::Starting),
            2 => Some(Self::Running),
            3 => Some(Self::ShuttingDown),
            4 => Some(Self::Stopped),
            _ => None,
        }
    }
}
61
// Raw `u8` images of each `DbState` variant, for storing in / comparing
// against the `AtomicU8` on `LaminarDB::state` (atomics cannot hold the
// enum directly).
pub(crate) const STATE_CREATED: u8 = DbState::Created as u8;
pub(crate) const STATE_STARTING: u8 = DbState::Starting as u8;
pub(crate) const STATE_RUNNING: u8 = DbState::Running as u8;
pub(crate) const STATE_SHUTTING_DOWN: u8 = DbState::ShuttingDown as u8;
pub(crate) const STATE_STOPPED: u8 = DbState::Stopped as u8;
71
72fn cache_entries_from_memory(mem: laminar_sql::parser::lookup_table::ByteSize) -> usize {
73 (mem.as_bytes() / 256).max(1024) as usize
74}
75
/// Embedded streaming SQL database: owns the DataFusion session together
/// with all catalog, planner, connector, and pipeline runtime state.
pub struct LaminarDB {
    /// Streaming sources registered with this instance.
    pub(crate) catalog: Arc<SourceCatalog>,
    /// Streaming SQL planner; locked because planning mutates its state.
    pub(crate) planner: parking_lot::Mutex<StreamingPlanner>,
    /// DataFusion session used for planning and ad-hoc queries.
    pub(crate) ctx: SessionContext,
    /// Configuration this instance was opened with.
    pub(crate) config: LaminarConfig,
    /// Configuration variable bindings substituted into SQL text by `execute`.
    pub(crate) config_vars: Arc<HashMap<String, String>>,
    /// Set once shutdown begins; `execute` rejects new work afterwards.
    pub(crate) shutdown: std::sync::atomic::AtomicBool,
    /// Checkpoint coordinator; `None` until one is installed.
    pub(crate) coordinator:
        Arc<tokio::sync::Mutex<Option<crate::checkpoint_coordinator::CheckpointCoordinator>>>,
    /// Per-object connector registrations and stored DDL text.
    pub(crate) connector_manager: parking_lot::Mutex<crate::connector_manager::ConnectorManager>,
    /// Registry of source/sink connector factories (built-ins + custom).
    pub(crate) connector_registry: Arc<laminar_connectors::registry::ConnectorRegistry>,
    /// Materialized-view registry.
    pub(crate) mv_registry: parking_lot::Mutex<laminar_core::mv::MvRegistry>,
    /// Backing store for lookup/reference tables.
    pub(crate) table_store: Arc<parking_lot::RwLock<crate::table_store::TableStore>>,
    /// Lifecycle state as a raw `DbState` value (see the `STATE_*` consts).
    pub(crate) state: Arc<std::sync::atomic::AtomicU8>,
    /// Join handle of the background runtime task, if one was started.
    pub(crate) runtime_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Notified to wake tasks waiting on shutdown.
    pub(crate) shutdown_signal: Arc<tokio::sync::Notify>,
    /// Engine metrics, when metrics are enabled.
    pub(crate) engine_metrics:
        parking_lot::Mutex<Option<Arc<crate::engine_metrics::EngineMetrics>>>,
    /// Prometheus registry, when metrics export is enabled.
    pub(crate) prometheus_registry: parking_lot::Mutex<Option<Arc<prometheus::Registry>>>,
    /// Creation instant (e.g. for uptime reporting).
    pub(crate) start_time: std::time::Instant,
    /// Session properties; `get_session_property` looks keys up lowercased.
    pub(crate) session_properties: parking_lot::Mutex<HashMap<String, String>>,
    /// Latest pipeline watermark; initialized to `i64::MIN`.
    pub(crate) pipeline_watermark: Arc<std::sync::atomic::AtomicI64>,
    /// Lookup-table snapshots consumed by the lookup-join planner.
    pub(crate) lookup_registry: Arc<laminar_sql::datafusion::LookupTableRegistry>,
    /// Control-message sender into the running pipeline, if any.
    pub(crate) control_tx: parking_lot::Mutex<Option<ControlMsgTx>>,
    /// Backing store for materialized views.
    pub(crate) mv_store: Arc<parking_lot::RwLock<crate::mv_store::MvStore>>,
    /// Cluster control plane, when running clustered.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) cluster_controller:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::ClusterController>>>,
    /// State backend, when one has been installed.
    pub(crate) state_backend:
        parking_lot::Mutex<Option<Arc<dyn laminar_core::state::StateBackend>>>,
    /// Vnode ownership registry, when one has been installed.
    pub(crate) vnode_registry: parking_lot::Mutex<Option<Arc<laminar_core::state::VnodeRegistry>>>,
    /// Extra physical optimizer rules supplied at open time.
    pub(crate) physical_optimizer_rules:
        Arc<[Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>]>,
    /// Target-partition override for pipeline execution, if provided.
    pub(crate) pipeline_target_partitions: Option<usize>,
    /// Cluster shuffle sender half, when installed.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) shuffle_sender:
        parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleSender>>>,
    /// Cluster shuffle receiver half, when installed.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) shuffle_receiver:
        parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleReceiver>>>,
    /// Checkpoint decision store, when installed.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) decision_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::CheckpointDecisionStore>>>,
    /// Assignment snapshot store, when installed.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) assignment_snapshot_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>>,
    /// Sender used to request a forced checkpoint, if the channel exists.
    pub(crate) force_ckpt_tx: parking_lot::Mutex<Option<ForceCheckpointTx>>,
}
167
/// One-shot reply channel on which the outcome of a forced checkpoint is
/// delivered back to the requester.
pub(crate) type ForceCheckpointReply =
    crossfire::oneshot::TxOneshot<Result<crate::checkpoint_coordinator::CheckpointResult, DbError>>;

/// Sender half of the bounded force-checkpoint request channel.
pub(crate) type ForceCheckpointTx =
    crossfire::MAsyncTx<crossfire::mpsc::Array<ForceCheckpointReply>>;

/// Receiver half of the bounded force-checkpoint request channel.
pub(crate) type ForceCheckpointRx =
    crossfire::AsyncRx<crossfire::mpsc::Array<ForceCheckpointReply>>;

/// Capacity of the bounded force-checkpoint request channel.
pub(crate) const FORCE_CHECKPOINT_CHANNEL_CAPACITY: usize = 64;
191
/// Per-source event-time machinery: how to read timestamps from batches
/// and how to derive watermarks from them.
pub(crate) struct SourceWatermarkState {
    /// Extracts event-time values from incoming record batches.
    pub(crate) extractor: laminar_core::time::EventTimeExtractor,
    /// Strategy that produces watermarks from observed event times.
    pub(crate) generator: Box<dyn laminar_core::time::WatermarkGenerator>,
    /// Name of the timestamp column the extractor reads.
    pub(crate) column: String,
}
197
198pub(crate) fn filter_late_rows(
199 batch: &RecordBatch,
200 column: &str,
201 watermark: i64,
202) -> Option<RecordBatch> {
203 match laminar_core::time::filter_batch_by_timestamp(
204 batch,
205 column,
206 watermark,
207 laminar_core::time::ThresholdOp::GreaterEq,
208 ) {
209 Ok(out) => out,
210 Err(e) => {
211 tracing::error!(%column, error = %e, "filter_late_rows: dropping batch");
213 None
214 }
215 }
216}
217
218pub(crate) use laminar_core::time::parse_duration_str;
219
220impl LaminarDB {
    /// Open a database with the default [`LaminarConfig`].
    ///
    /// # Errors
    /// Propagates any failure from [`Self::open_with_config`].
    pub fn open() -> Result<Self, DbError> {
        Self::open_with_config(LaminarConfig::default())
    }
229
    /// Open a database with an explicit configuration and no config vars.
    ///
    /// # Errors
    /// Propagates any failure from construction.
    pub fn open_with_config(config: LaminarConfig) -> Result<Self, DbError> {
        Self::open_with_config_and_vars(config, HashMap::new())
    }
238
    /// Open with configuration plus SQL configuration-variable bindings;
    /// no extra optimizer rules and no target-partition override.
    ///
    /// # Errors
    /// Propagates any failure from construction.
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn open_with_config_and_vars(
        config: LaminarConfig,
        config_vars: HashMap<String, String>,
    ) -> Result<Self, DbError> {
        Self::open_with_config_and_vars_and_rules(config, config_vars, &[], None)
    }
251
    /// Full constructor: builds the DataFusion session, source catalog, and
    /// connector registry, wiring in optional extra physical optimizer rules
    /// and an explicit target-partition count.
    ///
    /// # Errors
    /// Currently infallible; returns `Result` for forward compatibility.
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn open_with_config_and_vars_and_rules(
        config: LaminarConfig,
        config_vars: HashMap<String, String>,
        extra_optimizer_rules: &[Arc<
            dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
        >],
        target_partitions: Option<usize>,
    ) -> Result<Self, DbError> {
        // Configure crossfire channel backoff behaviour once, up front.
        crossfire::detect_backoff_cfg();

        let lookup_registry = Arc::new(laminar_sql::datafusion::LookupTableRegistry::new());

        // Build the DataFusion session with a custom query planner so that
        // lookup-join extension nodes can be planned against `lookup_registry`.
        let ctx = {
            let mut session_config = laminar_sql::datafusion::base_session_config();
            if let Some(n) = target_partitions {
                session_config = session_config.with_target_partitions(n);
            }
            let extension_planner: Arc<
                dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync,
            > = Arc::new(laminar_sql::datafusion::LookupJoinExtensionPlanner::new(
                Arc::clone(&lookup_registry),
            ));
            let query_planner: Arc<dyn datafusion::execution::context::QueryPlanner + Send + Sync> =
                Arc::new(LookupQueryPlanner { extension_planner });
            let mut state_builder = datafusion::execution::SessionStateBuilder::new()
                .with_config(session_config)
                .with_default_features()
                .with_query_planner(query_planner);
            // Caller-supplied physical optimizer rules are applied in order.
            for rule in extra_optimizer_rules {
                state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
            }
            SessionContext::new_with_state(state_builder.build())
        };
        register_streaming_functions(&ctx);

        let catalog = Arc::new(SourceCatalog::new(
            config.default_buffer_size,
            config.default_backpressure,
        ));

        let connector_registry = Arc::new(laminar_connectors::registry::ConnectorRegistry::new());
        Self::register_builtin_connectors(&connector_registry);

        // Everything else starts empty/None and is installed later by the
        // builder or pipeline-lifecycle code.
        Ok(Self {
            catalog,
            planner: parking_lot::Mutex::new(StreamingPlanner::new()),
            ctx,
            config,
            config_vars: Arc::new(config_vars),
            shutdown: std::sync::atomic::AtomicBool::new(false),
            coordinator: Arc::new(tokio::sync::Mutex::new(None)),
            connector_manager: parking_lot::Mutex::new(
                crate::connector_manager::ConnectorManager::new(),
            ),
            connector_registry,
            mv_registry: parking_lot::Mutex::new(laminar_core::mv::MvRegistry::new()),
            table_store: Arc::new(parking_lot::RwLock::new(
                crate::table_store::TableStore::new(),
            )),
            state: Arc::new(std::sync::atomic::AtomicU8::new(STATE_CREATED)),
            runtime_handle: parking_lot::Mutex::new(None),
            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
            engine_metrics: parking_lot::Mutex::new(None),
            prometheus_registry: parking_lot::Mutex::new(None),
            start_time: std::time::Instant::now(),
            session_properties: parking_lot::Mutex::new(HashMap::new()),
            pipeline_watermark: Arc::new(std::sync::atomic::AtomicI64::new(i64::MIN)),
            lookup_registry,
            control_tx: parking_lot::Mutex::new(None),
            mv_store: Arc::new(parking_lot::RwLock::new(crate::mv_store::MvStore::new())),
            #[cfg(feature = "cluster-unstable")]
            cluster_controller: parking_lot::Mutex::new(None),
            state_backend: parking_lot::Mutex::new(None),
            vnode_registry: parking_lot::Mutex::new(None),
            physical_optimizer_rules: extra_optimizer_rules.to_vec().into(),
            pipeline_target_partitions: target_partitions,
            #[cfg(feature = "cluster-unstable")]
            shuffle_sender: parking_lot::Mutex::new(None),
            #[cfg(feature = "cluster-unstable")]
            shuffle_receiver: parking_lot::Mutex::new(None),
            #[cfg(feature = "cluster-unstable")]
            decision_store: parking_lot::Mutex::new(None),
            #[cfg(feature = "cluster-unstable")]
            assignment_snapshot_store: parking_lot::Mutex::new(None),
            force_ckpt_tx: parking_lot::Mutex::new(None),
        })
    }
347
348 #[cfg(feature = "cluster-unstable")]
353 pub(crate) fn set_shuffle_sender(&self, sender: Arc<laminar_core::shuffle::ShuffleSender>) {
354 *self.shuffle_sender.lock() = Some(sender);
355 }
356
357 #[cfg(feature = "cluster-unstable")]
360 pub(crate) fn set_shuffle_receiver(
361 &self,
362 receiver: Arc<laminar_core::shuffle::ShuffleReceiver>,
363 ) {
364 *self.shuffle_receiver.lock() = Some(receiver);
365 }
366
367 #[cfg(feature = "cluster-unstable")]
369 pub(crate) fn set_decision_store(
370 &self,
371 store: Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
372 ) {
373 *self.decision_store.lock() = Some(store);
374 }
375
376 #[cfg(feature = "cluster-unstable")]
378 pub(crate) fn set_assignment_snapshot_store(
379 &self,
380 store: Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
381 ) {
382 *self.assignment_snapshot_store.lock() = Some(store);
383 }
384
    /// Adopt a cluster-wide vnode assignment snapshot, but only when it is
    /// strictly newer than the locally known assignment version.
    #[cfg(feature = "cluster-unstable")]
    pub async fn adopt_assignment_snapshot(
        &self,
        snapshot: laminar_core::cluster::control::AssignmentSnapshot,
    ) {
        // No vnode registry configured: nothing to update.
        let Some(registry) = self.vnode_registry.lock().clone() else {
            return;
        };
        // Stale or duplicate snapshot: keep the current assignment.
        if snapshot.version <= registry.assignment_version() {
            return;
        }
        let vnode_count = registry.vnode_count();
        let new_assignment: Arc<[laminar_core::state::NodeId]> =
            snapshot.to_vnode_vec(vnode_count).into();

        // NOTE(review): the coordinator lock is taken *before* mutating the
        // registry/backend — presumably so an in-flight checkpoint cannot
        // observe a half-applied assignment; confirm this ordering is load-
        // bearing before reordering.
        let mut guard = self.coordinator.lock().await;
        registry.set_assignment_and_version(new_assignment, snapshot.version);
        if let Some(backend) = self.state_backend.lock().clone() {
            backend.set_authoritative_version(snapshot.version);
        }
        if let Some(coord) = guard.as_mut() {
            coord.set_assignment_version(snapshot.version);
            // Identify this node; fall back to NodeId(0) when no cluster
            // controller has been installed.
            let self_id = self
                .cluster_controller
                .lock()
                .as_ref()
                .map_or(laminar_core::state::NodeId(0), |c| {
                    laminar_core::state::NodeId(c.instance_id().0)
                });
            coord.set_vnode_set(laminar_core::state::owned_vnodes(&registry, self_id));
            coord.set_gate_vnode_set((0..vnode_count).collect());
        }
        drop(guard);

        tracing::info!(version = snapshot.version, "adopted assignment snapshot",);
    }
427
428 #[cfg(feature = "cluster-unstable")]
433 pub(crate) fn set_cluster_controller(
434 &self,
435 controller: Arc<laminar_core::cluster::control::ClusterController>,
436 ) {
437 *self.cluster_controller.lock() = Some(controller);
438 }
439
440 pub(crate) fn set_state_backend(&self, backend: Arc<dyn laminar_core::state::StateBackend>) {
444 *self.state_backend.lock() = Some(backend);
445 }
446
447 pub(crate) fn set_vnode_registry(&self, registry: Arc<laminar_core::state::VnodeRegistry>) {
451 *self.vnode_registry.lock() = Some(registry);
452 }
453
    /// Borrow the underlying DataFusion [`SessionContext`].
    #[must_use]
    pub fn session_context(&self) -> &SessionContext {
        &self.ctx
    }
461
    /// Start a builder-style construction of a [`LaminarDB`].
    #[must_use]
    pub fn builder() -> LaminarDbBuilder {
        LaminarDbBuilder::new()
    }
467
    /// Register every connector compiled into this build with `registry`.
    ///
    /// Each connector family is gated behind its cargo feature, so builds
    /// without a feature simply skip that registration. `registry` is
    /// unused when no connector features are enabled, hence the allow.
    #[allow(unused_variables)]
    fn register_builtin_connectors(registry: &laminar_connectors::registry::ConnectorRegistry) {
        #[cfg(feature = "kafka")]
        {
            laminar_connectors::kafka::register_kafka_source(registry);
            laminar_connectors::kafka::register_kafka_sink(registry);
        }
        #[cfg(feature = "postgres-cdc")]
        {
            laminar_connectors::cdc::postgres::register_postgres_cdc_source(registry);
        }
        #[cfg(feature = "postgres-sink")]
        {
            laminar_connectors::postgres::register_postgres_sink(registry);
        }
        #[cfg(feature = "delta-lake")]
        {
            laminar_connectors::lakehouse::register_delta_lake_sink(registry);
            laminar_connectors::lakehouse::register_delta_lake_source(registry);
        }
        #[cfg(feature = "iceberg")]
        {
            laminar_connectors::lakehouse::register_iceberg_sink(registry);
            laminar_connectors::lakehouse::register_iceberg_source(registry);
        }
        #[cfg(feature = "websocket")]
        {
            laminar_connectors::websocket::register_websocket_source(registry);
            laminar_connectors::websocket::register_websocket_sink(registry);
        }
        #[cfg(feature = "mysql-cdc")]
        {
            laminar_connectors::cdc::mysql::register_mysql_cdc_source(registry);
        }
        #[cfg(feature = "mongodb-cdc")]
        {
            laminar_connectors::mongodb::register_mongodb_cdc_source(registry);
            laminar_connectors::mongodb::register_mongodb_sink(registry);
        }
        #[cfg(feature = "files")]
        {
            laminar_connectors::files::register_file_source(registry);
            laminar_connectors::files::register_file_sink(registry);
        }
        #[cfg(feature = "otel")]
        {
            laminar_connectors::otel::register_otel_source(registry);
        }
        #[cfg(feature = "nats")]
        {
            laminar_connectors::nats::register_nats_source(registry);
            laminar_connectors::nats::register_nats_sink(registry);
        }
    }
523
    /// Create a lookup (reference) table from planner output: backing store,
    /// optional external connector, DataFusion registration, and a refresh
    /// of the lookup-join optimizer rules.
    ///
    /// # Errors
    /// Rejects multi-column primary keys and propagates table-store,
    /// connector, or DataFusion registration failures.
    fn handle_register_lookup_table(
        &self,
        info: laminar_sql::planner::LookupTableInfo,
    ) -> Result<ExecuteResult, DbError> {
        use laminar_sql::parser::lookup_table::ConnectorType;

        if info.primary_key.len() != 1 {
            return Err(DbError::InvalidOperation(
                "Lookup table requires a single-column primary key".into(),
            ));
        }
        let pk = info.primary_key[0].clone();

        // A cache.memory budget selects a bounded (partial) cache; without
        // one the table is created in the store's default mode.
        let cache_mode = info.properties.cache_memory.map(|mem| {
            let max_entries = cache_entries_from_memory(mem);
            crate::table_cache_mode::TableCacheMode::Partial { max_entries }
        });
        if let Some(cache) = cache_mode {
            self.table_store.write().create_table_with_cache(
                &info.name,
                info.arrow_schema.clone(),
                &pk,
                cache,
            )?;
        } else {
            self.table_store
                .write()
                .create_table(&info.name, info.arrow_schema.clone(), &pk)?;
        }

        // Non-static tables are backed by an external connector.
        if !matches!(info.properties.connector, ConnectorType::Static) {
            self.register_lookup_connector(&info, &pk)?;
        }

        {
            // Expose the table to DataFusion, dropping any stale
            // registration under the same name first.
            let provider = crate::table_provider::ReferenceTableProvider::new(
                info.name.clone(),
                info.arrow_schema.clone(),
                self.table_store.clone(),
            );
            let _ = self.ctx.deregister_table(&info.name);
            self.ctx
                .register_table(&info.name, Arc::new(provider))
                .map_err(|e| {
                    DbError::InvalidOperation(format!("Failed to register lookup table: {e}"))
                })?;
        }

        // Seed the lookup-join registry with a snapshot when rows already
        // exist in the store.
        if let Some(batch) = self.table_store.read().to_record_batch(&info.name) {
            self.lookup_registry.register(
                &info.name,
                laminar_sql::datafusion::LookupSnapshot {
                    batch,
                    key_columns: info.primary_key.clone(),
                },
            );
        }

        self.refresh_lookup_optimizer_rule();

        Ok(ExecuteResult::Ddl(DdlInfo {
            statement_type: "CREATE LOOKUP TABLE".to_string(),
            object_name: info.name,
        }))
    }
600
601 #[allow(clippy::unnecessary_wraps)]
604 fn register_lookup_connector(
605 &self,
606 info: &laminar_sql::planner::LookupTableInfo,
607 pk: &str,
608 ) -> Result<(), DbError> {
609 use laminar_sql::parser::lookup_table::ConnectorType;
610
611 let connector_type_str = match &info.properties.connector {
612 ConnectorType::Postgres => "postgres",
613 ConnectorType::PostgresCdc => "postgres-cdc",
614 ConnectorType::MysqlCdc => "mysql-cdc",
615 ConnectorType::Redis => "redis",
616 ConnectorType::S3Parquet => "s3-parquet",
617 ConnectorType::DeltaLake => "delta-lake",
618 ConnectorType::Custom(s) => s.as_str(),
619 ConnectorType::Static => unreachable!(),
620 };
621
622 self.table_store
623 .write()
624 .set_connector(&info.name, connector_type_str);
625
626 let refresh = match info.properties.strategy {
627 laminar_sql::parser::lookup_table::LookupStrategy::Replicated
628 | laminar_sql::parser::lookup_table::LookupStrategy::Partitioned => {
629 if matches!(info.properties.connector, ConnectorType::Postgres) {
631 Some(laminar_connectors::reference::RefreshMode::SnapshotOnly)
632 } else {
633 Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
634 }
635 }
636 laminar_sql::parser::lookup_table::LookupStrategy::OnDemand => {
637 Some(laminar_connectors::reference::RefreshMode::Manual)
638 }
639 };
640
641 let consumed = [
645 "connector",
646 "strategy",
647 "cache.memory",
648 "cache.disk",
649 "cache.ttl",
650 "pushdown",
651 "format",
652 ];
653 let mut connector_options = HashMap::with_capacity(info.raw_options.len());
654 let mut format_options = HashMap::with_capacity(4);
655 for (k, v) in &info.raw_options {
656 let lower = k.to_lowercase();
657 if consumed.contains(&lower.as_str()) {
658 continue;
659 }
660 if let Some(suffix) = lower.strip_prefix("format.") {
661 format_options.insert(suffix.to_string(), v.clone());
662 } else {
663 connector_options.insert(k.clone(), v.clone());
664 }
665 }
666
667 let cache_max = info.properties.cache_memory.map(cache_entries_from_memory);
668
669 self.connector_manager
670 .lock()
671 .register_table(crate::connector_manager::TableRegistration {
672 name: info.name.clone(),
673 primary_key: pk.to_string(),
674 connector_type: Some(connector_type_str.to_string()),
675 connector_options,
676 format: info.raw_options.get("format").cloned(),
677 format_options,
678 refresh,
679 cache_max_entries: cache_max,
680 });
681
682 Ok(())
683 }
684
685 fn refresh_lookup_optimizer_rule(&self) {
688 use laminar_sql::planner::lookup_join::{LookupColumnPruningRule, LookupJoinRewriteRule};
689 use laminar_sql::planner::predicate_split::{
690 PlanPushdownMode, PlanSourceCapabilities, PredicateSplitterRule,
691 SourceCapabilitiesRegistry,
692 };
693
694 self.ctx.remove_optimizer_rule("lookup_join_rewrite");
696 self.ctx.remove_optimizer_rule("predicate_splitter");
697 self.ctx.remove_optimizer_rule("lookup_column_pruning");
698
699 let tables = self.planner.lock().lookup_tables_cloned();
700 if tables.is_empty() {
701 return;
702 }
703
704 let mut caps_registry = SourceCapabilitiesRegistry::default();
706 for (name, info) in &tables {
707 let mode = match info.properties.pushdown_mode {
708 laminar_sql::parser::lookup_table::PushdownMode::Enabled
709 | laminar_sql::parser::lookup_table::PushdownMode::Auto => PlanPushdownMode::Full,
710 laminar_sql::parser::lookup_table::PushdownMode::Disabled => PlanPushdownMode::None,
711 };
712 let pk_set: std::collections::HashSet<String> =
713 info.primary_key.iter().cloned().collect();
714 caps_registry.register(
715 name.clone(),
716 PlanSourceCapabilities {
717 pushdown_mode: mode,
718 eq_columns: pk_set,
719 range_columns: std::collections::HashSet::new(),
720 in_columns: std::collections::HashSet::new(),
721 supports_null_check: false,
722 },
723 );
724 }
725
726 self.ctx
728 .add_optimizer_rule(Arc::new(LookupJoinRewriteRule::new(tables)));
729 self.ctx
730 .add_optimizer_rule(Arc::new(PredicateSplitterRule::new(caps_registry)));
731 self.ctx
732 .add_optimizer_rule(Arc::new(LookupColumnPruningRule));
733 }
734
    /// Borrow the registry of source/sink connector factories.
    #[must_use]
    pub fn connector_registry(&self) -> &laminar_connectors::registry::ConnectorRegistry {
        &self.connector_registry
    }
743
    /// Register a user-defined scalar function with the SQL session.
    pub(crate) fn register_custom_udf(&self, udf: datafusion_expr::ScalarUDF) {
        self.ctx.register_udf(udf);
    }
750
    /// Register a user-defined aggregate function with the SQL session.
    pub(crate) fn register_custom_udaf(&self, udaf: datafusion_expr::AggregateUDF) {
        self.ctx.register_udaf(udaf);
    }
757
    /// Register an existing Delta Lake table with the SQL session under
    /// `name`, so subsequent queries can read it directly.
    ///
    /// # Errors
    /// Returns an error when the Delta table cannot be opened or registered.
    #[cfg(feature = "delta-lake")]
    pub async fn register_delta_table(
        &self,
        name: &str,
        table_uri: &str,
        storage_options: HashMap<String, String>,
    ) -> Result<(), DbError> {
        laminar_connectors::lakehouse::delta_table_provider::register_delta_table(
            &self.ctx,
            name,
            table_uri,
            storage_options,
        )
        .await
        .map_err(DbError::from)
    }
790
791 pub async fn execute(&self, sql: &str) -> Result<ExecuteResult, DbError> {
807 if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
808 return Err(DbError::Shutdown);
809 }
810
811 let resolved = if self.config_vars.is_empty() {
813 sql.to_string()
814 } else {
815 sql_utils::resolve_config_vars(sql, &self.config_vars, true)?
816 };
817
818 let stmts = sql_utils::split_statements(&resolved);
820 if stmts.is_empty() {
821 return Err(DbError::InvalidOperation("Empty SQL statement".into()));
822 }
823
824 let mut last_result = None;
826 for stmt_sql in &stmts {
827 last_result = Some(self.execute_single(stmt_sql).await?);
828 }
829
830 last_result.ok_or_else(|| DbError::InvalidOperation("Empty SQL statement".into()))
831 }
832
    /// Parse `sql` and dispatch the first parsed statement to its handler.
    ///
    /// # Errors
    /// Propagates parse errors and any handler failure.
    #[allow(clippy::too_many_lines)]
    async fn execute_single(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        let statements = parse_streaming_sql(sql)?;

        if statements.is_empty() {
            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
        }

        // NOTE(review): only the first parsed statement is executed — the
        // caller (`execute`) splits multi-statement input beforehand, so
        // extras here are presumably impossible; confirm.
        let statement = &statements[0];

        match statement {
            StreamingStatement::CreateSource(create) => {
                let result = self.handle_create_source(create).await?;
                // Persist the original DDL text under the object's name.
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateSink(create) => {
                let result = self.handle_create_sink(create)?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                query_sql,
                ..
            } => self.handle_create_stream(name, query, emit_clause.as_ref(), query_sql),
            // These forms are routed through the generic query path.
            StreamingStatement::CreateContinuousQuery { .. }
            | StreamingStatement::CreateLookupTable(_)
            | StreamingStatement::DropLookupTable { .. } => self.handle_query(sql).await,
            StreamingStatement::Standard(stmt) => {
                // A few standard statements get special handling; the rest
                // are plain SQL for DataFusion.
                if let sqlparser::ast::Statement::CreateTable(ct) = stmt.as_ref() {
                    self.handle_create_table(ct)
                } else if let sqlparser::ast::Statement::Drop {
                    object_type: sqlparser::ast::ObjectType::Table,
                    names,
                    if_exists,
                    ..
                } = stmt.as_ref()
                {
                    self.handle_drop_table(names, *if_exists)
                } else if let sqlparser::ast::Statement::Set(set_stmt) = stmt.as_ref() {
                    self.handle_set(set_stmt)
                } else {
                    self.handle_query(sql).await
                }
            }
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => self.handle_insert_into(table_name, columns, values).await,
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_source(name, *if_exists, *cascade),
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_sink(name, *if_exists, *cascade),
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_stream(name, *if_exists, *cascade),
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_materialized_view(name, *if_exists, *cascade),
            StreamingStatement::Show(cmd) => {
                // SHOW commands return metadata batches built on demand.
                let batch = match cmd {
                    ShowCommand::Sources => self.build_show_sources(),
                    ShowCommand::Sinks => self.build_show_sinks(),
                    ShowCommand::Queries => self.build_show_queries(),
                    ShowCommand::MaterializedViews => self.build_show_materialized_views(),
                    ShowCommand::Streams => self.build_show_streams(),
                    ShowCommand::Tables => self.build_show_tables(),
                    ShowCommand::CheckpointStatus => self.build_show_checkpoint_status().await?,
                    ShowCommand::CreateSource { name } => {
                        self.build_show_create_source(&name.to_string())?
                    }
                    ShowCommand::CreateSink { name } => {
                        self.build_show_create_sink(&name.to_string())?
                    }
                };
                Ok(ExecuteResult::Metadata(batch))
            }
            StreamingStatement::Checkpoint => {
                let result = self.checkpoint().await?;
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "CHECKPOINT".to_string(),
                    object_name: format!("checkpoint_{}", result.checkpoint_id),
                }))
            }
            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
                self.handle_restore_checkpoint(*checkpoint_id)
            }
            StreamingStatement::Describe { name, .. } => {
                let name_str = name.to_string();
                let batch = self.build_describe(&name_str)?;
                Ok(ExecuteResult::Metadata(batch))
            }
            StreamingStatement::Explain {
                statement, analyze, ..
            } => {
                if *analyze {
                    self.handle_explain_analyze(statement, sql).await
                } else {
                    self.handle_explain(statement)
                }
            }
            StreamingStatement::CreateMaterializedView {
                name,
                query,
                or_replace,
                if_not_exists,
                query_sql,
                ..
            } => {
                self.handle_create_materialized_view(
                    sql,
                    name,
                    query,
                    *or_replace,
                    *if_not_exists,
                    query_sql,
                )
                .await
            }
            StreamingStatement::AlterSource { name, operation } => {
                self.handle_alter_source(name, operation)
            }
        }
    }
980
    /// Handle `INSERT INTO ... VALUES ...` against, in priority order: a
    /// streaming source, a lookup/reference table, or a DataFusion table.
    ///
    /// # Errors
    /// Fails when the target is unknown, the values do not match the target
    /// schema, or the underlying push/upsert/registration fails.
    async fn handle_insert_into(
        &self,
        table_name: &sqlparser::ast::ObjectName,
        _columns: &[sqlparser::ast::Ident],
        values: &[Vec<sqlparser::ast::Expr>],
    ) -> Result<ExecuteResult, DbError> {
        let name = table_name.to_string();

        // 1) Streaming source: convert the VALUES rows to a batch and push.
        if let Some(entry) = self.catalog.get_source(&name) {
            let batch = sql_utils::sql_values_to_record_batch(&entry.schema, values)?;
            entry
                .push_and_buffer(batch)
                .map_err(|e| DbError::InsertError(format!("Failed to push to source: {e}")))?;
            return Ok(ExecuteResult::RowsAffected(values.len() as u64));
        }

        // 2) Lookup/reference table: upsert rows, then refresh the copy
        //    DataFusion sees.
        {
            let mut ts = self.table_store.write();
            if ts.has_table(&name) {
                let schema = ts
                    .table_schema(&name)
                    .ok_or_else(|| DbError::TableNotFound(name.clone()))?;
                let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
                ts.upsert(&name, &batch)?;
                // Release the write lock before re-registering the table.
                drop(ts);
                self.sync_table_to_datafusion(&name)?;
                return Ok(ExecuteResult::RowsAffected(values.len() as u64));
            }
        }

        // 3) Plain DataFusion table: swap in a fresh MemTable.
        let table = self
            .ctx
            .table_provider(&name)
            .await
            .map_err(|_| DbError::TableNotFound(name.clone()))?;

        let schema = table.schema();
        let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;

        self.ctx
            .deregister_table(&name)
            .map_err(|e| DbError::InsertError(format!("Failed to deregister table: {e}")))?;

        // NOTE(review): the new MemTable contains only the freshly inserted
        // batch, so rows previously registered under `name` are replaced
        // rather than appended to — confirm this is the intended INSERT
        // semantics for ad-hoc DataFusion tables.
        let mem_table =
            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![batch]])
                .map_err(|e| DbError::InsertError(format!("Failed to create table: {e}")))?;

        self.ctx
            .register_table(&name, Arc::new(mem_table))
            .map_err(|e| DbError::InsertError(format!("Failed to register table: {e}")))?;

        Ok(ExecuteResult::RowsAffected(values.len() as u64))
    }
1045
    /// `RESTORE FROM CHECKPOINT` is parsed but not yet executable.
    ///
    /// # Errors
    /// Always returns [`DbError::Unsupported`].
    #[allow(clippy::unused_self)]
    fn handle_restore_checkpoint(&self, _checkpoint_id: u64) -> Result<ExecuteResult, DbError> {
        Err(DbError::Unsupported(
            "RESTORE FROM CHECKPOINT is not yet implemented — \
             requires pipeline stop, state reload from manifest, \
             source offset seek, and pipeline restart"
                .to_string(),
        ))
    }
1059
1060 #[must_use]
1062 pub fn get_session_property(&self, key: &str) -> Option<String> {
1063 self.session_properties
1064 .lock()
1065 .get(&key.to_lowercase())
1066 .cloned()
1067 }
1068
1069 #[must_use]
1071 pub fn session_properties(&self) -> HashMap<String, String> {
1072 self.session_properties.lock().clone()
1073 }
1074
1075 pub fn subscribe<T: crate::handle::FromBatch>(
1081 &self,
1082 name: &str,
1083 ) -> Result<crate::handle::TypedSubscription<T>, DbError> {
1084 let sub = self
1085 .catalog
1086 .get_stream_subscription(name)
1087 .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
1088 Ok(crate::handle::TypedSubscription::from_raw(sub))
1089 }
1090
1091 #[cfg(feature = "api")]
1097 pub fn subscribe_raw(
1098 &self,
1099 name: &str,
1100 ) -> Result<laminar_core::streaming::Subscription<crate::catalog::ArrowRecord>, DbError> {
1101 self.catalog
1102 .get_stream_subscription(name)
1103 .ok_or_else(|| DbError::StreamNotFound(name.to_string()))
1104 }
1105
    /// Build an `EXPLAIN` result: a two-column (`plan_key`, `plan_value`)
    /// metadata batch describing how the statement would be planned.
    ///
    /// # Errors
    /// Only fails when the metadata batch itself cannot be constructed;
    /// planner failures are reported as `error` rows instead.
    fn handle_explain(&self, statement: &StreamingStatement) -> Result<ExecuteResult, DbError> {
        let mut planner = self.planner.lock();

        let plan_result = planner.plan(statement);

        let mut rows: Vec<(String, String)> = Vec::new();

        match plan_result {
            Ok(plan) => {
                // First row: the coarse plan category.
                rows.push((
                    "plan_type".into(),
                    match &plan {
                        laminar_sql::planner::StreamingPlan::Query(_) => "Query",
                        laminar_sql::planner::StreamingPlan::RegisterSource(_) => "RegisterSource",
                        laminar_sql::planner::StreamingPlan::RegisterSink(_) => "RegisterSink",
                        laminar_sql::planner::StreamingPlan::Standard(_) => "Standard",
                        laminar_sql::planner::StreamingPlan::RegisterLookupTable(_) => {
                            "RegisterLookupTable"
                        }
                        laminar_sql::planner::StreamingPlan::DropLookupTable { .. } => {
                            "DropLookupTable"
                        }
                    }
                    .into(),
                ));
                // Then plan-specific detail rows.
                match &plan {
                    laminar_sql::planner::StreamingPlan::Query(qp) => {
                        if let Some(name) = &qp.name {
                            rows.push(("query_name".into(), name.clone()));
                        }
                        if let Some(wc) = &qp.window_config {
                            rows.push(("window".into(), format!("{wc}")));
                        }
                        if let Some(jcs) = &qp.join_config {
                            if jcs.len() == 1 {
                                rows.push(("join".into(), format!("{}", jcs[0])));
                            } else {
                                // Multi-join plans get one row per step.
                                for (i, jc) in jcs.iter().enumerate() {
                                    rows.push((format!("join_step_{}", i + 1), format!("{jc}")));
                                }
                            }
                        }
                        if let Some(oc) = &qp.order_config {
                            rows.push(("order_by".into(), format!("{oc:?}")));
                        }
                        if let Some(fc) = &qp.frame_config {
                            rows.push((
                                "frame_functions".into(),
                                format!("{}", fc.functions.len()),
                            ));
                        }
                        if let Some(ec) = &qp.emit_clause {
                            rows.push(("emit".into(), format!("{ec}")));
                        }
                    }
                    laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                        rows.push(("source".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                        rows.push(("sink".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::Standard(_) => {
                        rows.push(("execution".into(), "DataFusion pass-through".into()));
                    }
                    laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                        rows.push(("lookup_table".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                        rows.push(("drop_lookup_table".into(), name.clone()));
                    }
                }
            }
            Err(e) => {
                // Planning failed: report the error plus which statement
                // variant was being planned.
                rows.push(("error".into(), format!("{e}")));
                rows.push((
                    "statement".into(),
                    format!("{:?}", std::mem::discriminant(statement)),
                ));
            }
        }

        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();

        let schema = Arc::new(Schema::new(vec![
            Field::new("plan_key", DataType::Utf8, false),
            Field::new("plan_value", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(keys)),
                Arc::new(StringArray::from(values)),
            ],
        )
        .map_err(|e| DbError::InvalidOperation(format!("explain metadata: {e}")))?;

        Ok(ExecuteResult::Metadata(batch))
    }
1209
1210 async fn handle_explain_analyze(
1212 &self,
1213 statement: &StreamingStatement,
1214 original_sql: &str,
1215 ) -> Result<ExecuteResult, DbError> {
1216 let explain_result = self.handle_explain(statement)?;
1218 let mut rows: Vec<(String, String)> = Vec::new();
1219
1220 if let ExecuteResult::Metadata(explain_batch) = &explain_result {
1221 let keys_col = explain_batch
1222 .column(0)
1223 .as_any()
1224 .downcast_ref::<StringArray>();
1225 let vals_col = explain_batch
1226 .column(1)
1227 .as_any()
1228 .downcast_ref::<StringArray>();
1229 if let (Some(keys), Some(vals)) = (keys_col, vals_col) {
1230 for i in 0..explain_batch.num_rows() {
1231 rows.push((keys.value(i).to_string(), vals.value(i).to_string()));
1232 }
1233 }
1234 }
1235
1236 let upper = original_sql.to_uppercase();
1238 let inner_start = upper.find("ANALYZE").map_or(0, |pos| pos + "ANALYZE".len());
1239 let inner_sql = original_sql[inner_start..].trim();
1240
1241 let start = std::time::Instant::now();
1243 match self.ctx.sql(inner_sql).await {
1244 Ok(df) => match df.collect().await {
1245 Ok(batches) => {
1246 let elapsed = start.elapsed();
1247 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1248 rows.push(("rows_produced".into(), total_rows.to_string()));
1249 rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1250 rows.push(("batches_processed".into(), batches.len().to_string()));
1251 }
1252 Err(e) => {
1253 let elapsed = start.elapsed();
1254 rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1255 rows.push(("analyze_error".into(), format!("{e}")));
1256 }
1257 },
1258 Err(e) => {
1259 rows.push(("analyze_error".into(), format!("{e}")));
1260 }
1261 }
1262
1263 let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
1264 let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
1265
1266 let schema = Arc::new(Schema::new(vec![
1267 Field::new("plan_key", DataType::Utf8, false),
1268 Field::new("plan_value", DataType::Utf8, false),
1269 ]));
1270
1271 let batch = RecordBatch::try_new(
1272 schema,
1273 vec![
1274 Arc::new(StringArray::from(keys)),
1275 Arc::new(StringArray::from(values)),
1276 ],
1277 )
1278 .map_err(|e| DbError::InvalidOperation(format!("explain analyze metadata: {e}")))?;
1279
1280 Ok(ExecuteResult::Metadata(batch))
1281 }
1282
    /// Parses and plans `sql`, then dispatches on the resulting plan kind.
    ///
    /// Only the first parsed statement is planned; any trailing statements in
    /// `sql` are ignored.
    ///
    /// - `RegisterSource` / `RegisterSink`: acknowledged as DDL — no
    ///   registration is performed in this method.
    /// - `Query`: ASOF joins take a dedicated batch path; otherwise the query
    ///   is executed via DataFusion and bridged to a streaming handle.
    /// - `Standard`: passed through to DataFusion unchanged.
    /// - Lookup-table DDL: registered/dropped against the local stores.
    ///
    /// # Errors
    /// Propagates parse, planning, and DataFusion execution errors.
    #[allow(clippy::too_many_lines)]
    pub(crate) async fn handle_query(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        // Scope the planner lock to the planning step only; the lock must not
        // be held across the awaits below.
        let plan = {
            let statements = parse_streaming_sql(sql)?;
            if statements.is_empty() {
                return Err(DbError::InvalidOperation("Empty SQL statement".into()));
            }
            let mut planner = self.planner.lock();
            planner
                .plan(&statements[0])
                .map_err(laminar_sql::Error::from)?
        };

        match plan {
            laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::Query(query_plan) => {
                // ASOF joins are not executed by DataFusion directly; they go
                // through the dedicated batch-join path.
                if let Some(asof_config) = Self::extract_asof_config(&query_plan) {
                    return self.execute_asof_query(&asof_config, sql).await;
                }

                // Re-render the planned statement so DataFusion sees the
                // planner's (possibly rewritten) SQL, not the raw input.
                let plan_sql = query_plan.statement.to_string();
                let logical_plan = self.ctx.state().create_logical_plan(&plan_sql).await?;

                let df = self.ctx.execute_logical_plan(logical_plan).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::Standard(stmt) => {
                let sql_str = stmt.to_string();
                let df = self.ctx.sql(&sql_str).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                self.handle_register_lookup_table(info)
            }
            laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                // Tear the table out of every registry it may live in; the
                // DataFusion deregistration is best-effort (table may not be
                // registered there).
                self.table_store.write().drop_table(&name);
                self.connector_manager.lock().unregister_table(&name);
                let _ = self.ctx.deregister_table(&name);
                self.lookup_registry.unregister(&name);
                self.refresh_lookup_optimizer_rule();
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DROP LOOKUP TABLE".to_string(),
                    object_name: name,
                }))
            }
        }
    }
1350
1351 fn bridge_query_stream(
1354 &self,
1355 sql: &str,
1356 stream: datafusion::physical_plan::SendableRecordBatchStream,
1357 ) -> ExecuteResult {
1358 let query_id = self.catalog.register_query(sql);
1359 let schema = stream.schema();
1360
1361 let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
1362 let (source, sink) =
1363 streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
1364
1365 let subscription = sink.subscribe();
1366
1367 let source_clone = source.clone();
1368 tokio::spawn(async move {
1369 use tokio_stream::StreamExt;
1370 let mut stream = stream;
1371 while let Some(result) = stream.next().await {
1372 match result {
1373 Ok(batch) => {
1374 if source_clone.push_arrow(batch).is_err() {
1375 break;
1376 }
1377 }
1378 Err(_) => break,
1379 }
1380 }
1381 drop(source_clone);
1382 });
1383
1384 ExecuteResult::Query(QueryHandle {
1385 id: query_id,
1386 schema,
1387 sql: sql.to_string(),
1388 subscription: Some(subscription),
1389 active: true,
1390 })
1391 }
1392
1393 fn extract_asof_config(
1395 plan: &laminar_sql::planner::QueryPlan,
1396 ) -> Option<AsofJoinTranslatorConfig> {
1397 plan.join_config.as_ref()?.iter().find_map(|jc| {
1398 if let JoinOperatorConfig::Asof(cfg) = jc {
1399 Some(cfg.clone())
1400 } else {
1401 None
1402 }
1403 })
1404 }
1405
    /// Executes an ASOF join by materializing both input tables, joining them
    /// in-process, and streaming the joined batch back through DataFusion.
    ///
    /// Table names come from the planner-produced config (parsed SQL
    /// identifiers), not raw user strings.
    ///
    /// # Errors
    /// Propagates scan, join, and registration failures as
    /// [`DbError::query_pipeline`] errors.
    async fn execute_asof_query(
        &self,
        asof_config: &AsofJoinTranslatorConfig,
        original_sql: &str,
    ) -> Result<ExecuteResult, DbError> {
        // Materialize both sides fully; ASOF join here is batch, not streaming.
        let left_sql = format!("SELECT * FROM {}", asof_config.left_table);
        let right_sql = format!("SELECT * FROM {}", asof_config.right_table);

        let left_batches = self
            .ctx
            .sql(&left_sql)
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?
            .collect()
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?;

        let right_batches = self
            .ctx
            .sql(&right_sql)
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?
            .collect()
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?;

        let result_batch =
            crate::asof_batch::execute_asof_join_batch(&left_batches, &right_batches, asof_config)?;

        // Empty result: return an inert handle (no subscription) instead of
        // registering a temp table for nothing.
        if result_batch.num_rows() == 0 {
            let query_id = self.catalog.register_query(original_sql);
            return Ok(ExecuteResult::Query(QueryHandle {
                id: query_id,
                schema: result_batch.schema(),
                sql: original_sql.to_string(),
                subscription: None,
                active: false,
            }));
        }

        // Round-trip the joined batch through a temporary MemTable so the
        // result is delivered as a regular DataFusion stream.
        // NOTE(review): `__asof_result` is a single fixed name on the shared
        // context — two concurrent ASOF queries would race on it; confirm
        // callers serialize this path or consider a per-query name.
        let schema = result_batch.schema();
        let mem_table =
            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![result_batch]])
                .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;

        let _ = self.ctx.deregister_table("__asof_result");
        self.ctx
            .register_table("__asof_result", Arc::new(mem_table))
            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;

        let df = self
            .ctx
            .sql("SELECT * FROM __asof_result")
            .await
            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
        let stream = df
            .execute_stream()
            .await
            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;

        // Safe to drop the registration now: the stream already holds the plan.
        let _ = self.ctx.deregister_table("__asof_result");

        Ok(self.bridge_query_stream(original_sql, stream))
    }
1473
1474 pub fn source<T: laminar_core::streaming::Record>(
1484 &self,
1485 name: &str,
1486 ) -> Result<SourceHandle<T>, DbError> {
1487 let entry = self
1488 .catalog
1489 .get_source(name)
1490 .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
1491 SourceHandle::new(entry)
1492 }
1493
1494 pub fn source_untyped(&self, name: &str) -> Result<UntypedSourceHandle, DbError> {
1500 let entry = self
1501 .catalog
1502 .get_source(name)
1503 .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
1504 Ok(UntypedSourceHandle::new(entry))
1505 }
1506
1507 pub fn sources(&self) -> Vec<SourceInfo> {
1509 let names = self.catalog.list_sources();
1510 names
1511 .into_iter()
1512 .filter_map(|name| {
1513 self.catalog.get_source(&name).map(|e| SourceInfo {
1514 name: e.name.clone(),
1515 schema: e.schema.clone(),
1516 watermark_column: e.watermark_column.clone(),
1517 })
1518 })
1519 .collect()
1520 }
1521
1522 pub fn sinks(&self) -> Vec<SinkInfo> {
1524 self.catalog
1525 .list_sinks()
1526 .into_iter()
1527 .map(|name| SinkInfo { name })
1528 .collect()
1529 }
1530
1531 pub fn streams(&self) -> Vec<crate::handle::StreamInfo> {
1533 let mgr = self.connector_manager.lock();
1534 mgr.streams()
1535 .iter()
1536 .map(|(name, reg)| crate::handle::StreamInfo {
1537 name: name.clone(),
1538 sql: Some(reg.query_sql.clone()),
1539 })
1540 .collect()
1541 }
1542
    /// Builds a node/edge view of the pipeline: sources, streams, and sinks,
    /// with edges inferred from stream SQL and sink input registrations.
    ///
    /// NOTE(review): stream edges are inferred by case-insensitive substring
    /// search of upstream names in the stream's SQL text — a source whose name
    /// happens to appear inside another identifier would produce a
    /// false-positive edge. Confirm name uniqueness assumptions upstream.
    pub fn pipeline_topology(&self) -> crate::handle::PipelineTopology {
        use crate::handle::{PipelineEdge, PipelineNode, PipelineNodeType};

        let mut nodes = Vec::new();
        let mut edges = Vec::new();

        let source_names = self.catalog.list_sources();

        // One node per catalog source.
        for name in &source_names {
            let schema = self.catalog.get_source(name).map(|e| e.schema.clone());
            nodes.push(PipelineNode {
                name: name.clone(),
                node_type: PipelineNodeType::Source,
                schema,
                sql: None,
            });
        }

        let mgr = self.connector_manager.lock();
        let stream_names: Vec<String> = mgr.streams().keys().cloned().collect();
        for (name, reg) in mgr.streams() {
            nodes.push(PipelineNode {
                name: name.clone(),
                node_type: PipelineNodeType::Stream,
                schema: None,
                sql: Some(reg.query_sql.clone()),
            });

            // Infer incoming edges by scanning the stream's SQL for the
            // (uppercased) names of sources and other streams.
            let sql_upper = reg.query_sql.to_uppercase();
            for src in &source_names {
                if sql_upper.contains(&src.to_uppercase()) {
                    edges.push(PipelineEdge {
                        from: src.clone(),
                        to: name.clone(),
                    });
                }
            }
            // Stream-to-stream edges, excluding self-references.
            for other in &stream_names {
                if other != name && sql_upper.contains(&other.to_uppercase()) {
                    edges.push(PipelineEdge {
                        from: other.clone(),
                        to: name.clone(),
                    });
                }
            }
        }

        // Sinks known to the connector manager carry an explicit input name.
        for (name, reg) in mgr.sinks() {
            nodes.push(PipelineNode {
                name: name.clone(),
                node_type: PipelineNodeType::Sink,
                schema: None,
                sql: None,
            });

            if !reg.input.is_empty() {
                edges.push(PipelineEdge {
                    from: reg.input.clone(),
                    to: name.clone(),
                });
            }
        }

        // Catalog sinks not covered above: add them too, with edges from
        // their recorded input when one exists.
        let cm_sink_names: std::collections::HashSet<&String> = mgr.sinks().keys().collect();
        for name in self.catalog.list_sinks() {
            if !cm_sink_names.contains(&name) {
                if let Some(input) = self.catalog.get_sink_input(&name) {
                    nodes.push(PipelineNode {
                        name: name.clone(),
                        node_type: PipelineNodeType::Sink,
                        schema: None,
                        sql: None,
                    });
                    if !input.is_empty() {
                        edges.push(PipelineEdge {
                            from: input,
                            to: name,
                        });
                    }
                }
            }
        }

        // Release the connector-manager lock before constructing the result.
        drop(mgr);

        crate::handle::PipelineTopology { nodes, edges }
    }
1648
1649 pub fn queries(&self) -> Vec<QueryInfo> {
1651 self.catalog
1652 .list_queries()
1653 .into_iter()
1654 .map(|(id, sql, active)| QueryInfo { id, sql, active })
1655 .collect()
1656 }
1657
    /// Returns `true` when a checkpoint configuration is present.
    #[must_use]
    pub fn is_checkpoint_enabled(&self) -> bool {
        self.config.checkpoint.is_some()
    }
1663
1664 pub fn checkpoint_store(&self) -> Option<Box<dyn laminar_storage::CheckpointStore>> {
1670 let cp_config = self.config.checkpoint.as_ref()?;
1671 let max_retained = cp_config.max_retained.unwrap_or(3);
1672 let vnode_count = self.vnode_registry.lock().as_ref().map_or(
1675 laminar_storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
1676 |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
1677 );
1678
1679 if let Some(ref url) = self.config.object_store_url {
1680 let obj_store = laminar_storage::object_store_builder::build_object_store(
1681 url,
1682 &self.config.object_store_options,
1683 )
1684 .ok()?;
1685 let prefix = url_to_checkpoint_prefix(url);
1686 Some(Box::new(
1687 laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
1688 obj_store,
1689 prefix,
1690 max_retained,
1691 )
1692 .with_vnode_count(vnode_count),
1693 ))
1694 } else {
1695 let data_dir = cp_config
1696 .data_dir
1697 .clone()
1698 .or_else(|| self.config.storage_dir.clone())
1699 .unwrap_or_else(|| std::path::PathBuf::from("./data"));
1700 Some(Box::new(
1701 laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
1702 &data_dir,
1703 max_retained,
1704 )
1705 .with_vnode_count(vnode_count),
1706 ))
1707 }
1708 }
1709
    /// Triggers a checkpoint and waits for its result.
    ///
    /// When the pipeline's force-checkpoint channel is available, the request
    /// is forwarded so the pipeline performs the checkpoint and replies over a
    /// oneshot; otherwise the coordinator is driven directly.
    ///
    /// # Errors
    /// Returns [`DbError::Checkpoint`] if checkpointing is disabled, the
    /// pipeline channel or reply oneshot is closed, or the coordinator has not
    /// been initialized yet.
    pub async fn checkpoint(
        &self,
    ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
        if self.config.checkpoint.is_none() {
            return Err(DbError::Checkpoint(
                "checkpointing is not enabled".to_string(),
            ));
        }

        // Clone the sender out of the lock so no lock is held across awaits.
        let tx = self.force_ckpt_tx.lock().clone();
        if let Some(tx) = tx {
            let (reply_tx, reply_rx) = crossfire::oneshot::oneshot();
            tx.send(reply_tx).await.map_err(|_| {
                DbError::Checkpoint(
                    "pipeline callback receiver closed — engine may be shutting down".into(),
                )
            })?;
            // The pipeline replies with its own Result; a dropped oneshot
            // means it never answered.
            return reply_rx.await.map_err(|_| {
                DbError::Checkpoint("pipeline callback dropped oneshot before replying".into())
            })?;
        }

        // No running pipeline channel: drive the coordinator directly.
        let mut guard = self.coordinator.lock().await;
        let coord = guard.as_mut().ok_or_else(|| {
            DbError::Checkpoint("coordinator not initialized — call start() first".to_string())
        })?;
        coord
            .checkpoint(crate::checkpoint_coordinator::CheckpointRequest::default())
            .await
    }
1761
1762 pub async fn checkpoint_stats(&self) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
1766 let guard = self.coordinator.lock().await;
1767 guard
1768 .as_ref()
1769 .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
1770 }
1771}
1772
1773impl std::fmt::Debug for LaminarDB {
1774 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1775 f.debug_struct("LaminarDB")
1776 .field("sources", &self.catalog.list_sources().len())
1777 .field("sinks", &self.catalog.list_sinks().len())
1778 .field("materialized_views", &self.mv_registry.lock().len())
1779 .field("checkpoint_enabled", &self.is_checkpoint_enabled())
1780 .field("shutdown", &self.is_closed())
1781 .finish_non_exhaustive()
1782 }
1783}
1784
/// Query planner that lowers logical plans through DataFusion's default
/// physical planner augmented with one caller-supplied extension planner
/// (presumably for lookup-table plan nodes, per the name — confirm at the
/// construction site).
struct LookupQueryPlanner {
    extension_planner: Arc<dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync>,
}
1789
impl std::fmt::Debug for LookupQueryPlanner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed extension planner is not `Debug`; print the type name only.
        f.debug_struct("LookupQueryPlanner").finish_non_exhaustive()
    }
}
1795
#[async_trait::async_trait]
impl datafusion::execution::context::QueryPlanner for LookupQueryPlanner {
    /// Creates the physical plan using DataFusion's default planner extended
    /// with `self.extension_planner`, so custom logical nodes can be lowered.
    async fn create_physical_plan(
        &self,
        logical_plan: &datafusion::logical_expr::LogicalPlan,
        session_state: &datafusion::execution::SessionState,
    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
        use datafusion::physical_planner::PhysicalPlanner;
        // A fresh DefaultPhysicalPlanner is built per call, carrying the one
        // extension planner captured at construction time.
        let planner =
            datafusion::physical_planner::DefaultPhysicalPlanner::with_extension_planners(vec![
                Arc::clone(&self.extension_planner),
            ]);
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}
1813
1814#[cfg(test)]
1815mod tests;