1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow::array::{RecordBatch, StringArray};
8use arrow::datatypes::{DataType, Field, Schema};
9use datafusion::prelude::SessionContext;
10use laminar_core::streaming;
11use laminar_sql::parser::{parse_streaming_sql, ShowCommand, StreamingStatement};
12use laminar_sql::planner::StreamingPlanner;
13use laminar_sql::register_streaming_functions;
14use laminar_sql::translator::{AsofJoinTranslatorConfig, JoinOperatorConfig};
15
16use crate::builder::LaminarDbBuilder;
17use crate::catalog::SourceCatalog;
18use crate::config::LaminarConfig;
19use crate::error::DbError;
20use crate::handle::{
21 DdlInfo, ExecuteResult, QueryHandle, QueryInfo, SinkInfo, SourceHandle, SourceInfo,
22 UntypedSourceHandle,
23};
24use crate::pipeline::ControlMsg;
25use crate::sql_utils;
26
/// Sending half of the pipeline control channel, carrying [`ControlMsg`]s.
///
/// A `crossfire` `MAsyncTx` over an `mpsc::Array` flavour — presumably a
/// bounded, multi-producer async sender (TODO confirm against crossfire docs).
/// Held in `LaminarDB::control_tx` once a channel has been installed.
pub(crate) type ControlMsgTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ControlMsg>>;
29
/// Lifecycle state of a [`LaminarDB`] instance.
///
/// The state is shared through an `AtomicU8` holding the enum discriminant,
/// so the explicit discriminants below are part of the encoding and must
/// not be renumbered.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum DbState {
    /// Initial state assigned at construction.
    Created = 0,
    /// Startup in progress.
    Starting = 1,
    /// Running.
    Running = 2,
    /// Shutdown in progress.
    ShuttingDown = 3,
    /// Stopped; also the fallback for any unrecognised raw value.
    Stopped = 4,
}

impl DbState {
    /// Decodes a raw discriminant, returning `None` for values outside `0..=4`.
    pub(crate) fn from_u8(raw: u8) -> Option<Self> {
        // Position `i` holds the variant whose discriminant is `i`.
        const BY_DISCRIMINANT: [DbState; 5] = [
            DbState::Created,
            DbState::Starting,
            DbState::Running,
            DbState::ShuttingDown,
            DbState::Stopped,
        ];
        BY_DISCRIMINANT.get(usize::from(raw)).copied()
    }

    /// Reads the current state (`Acquire`), treating unknown raw values as
    /// [`DbState::Stopped`].
    pub(crate) fn load(atomic: &std::sync::atomic::AtomicU8) -> Self {
        let raw = atomic.load(std::sync::atomic::Ordering::Acquire);
        Self::from_u8(raw).unwrap_or(Self::Stopped)
    }

    /// Publishes `self` as the current state (`Release`).
    pub(crate) fn store(self, atomic: &std::sync::atomic::AtomicU8) {
        let raw = self as u8;
        atomic.store(raw, std::sync::atomic::Ordering::Release);
    }

    /// Atomically moves from `current` to `new`.
    ///
    /// Returns `Ok(previous)` when the swap happened, or `Err(actual)` with
    /// the state that was found instead. Unknown raw values decode as
    /// [`DbState::Stopped`] on both paths.
    pub(crate) fn compare_exchange(
        current: Self,
        new: Self,
        atomic: &std::sync::atomic::AtomicU8,
    ) -> Result<Self, Self> {
        use std::sync::atomic::Ordering::{AcqRel, Acquire};

        let decode = |raw: u8| Self::from_u8(raw).unwrap_or(Self::Stopped);
        atomic
            .compare_exchange(current as u8, new as u8, AcqRel, Acquire)
            .map(decode)
            .map_err(decode)
    }
}
80
81fn cache_entries_from_memory(mem: laminar_sql::parser::lookup_table::ByteSize) -> usize {
82 (mem.as_bytes() / 256).max(1024) as usize
83}
84
/// Handle to an in-process LaminarDB instance.
///
/// Bundles the SQL front end (a DataFusion [`SessionContext`] plus the
/// [`StreamingPlanner`]), the source/connector/table catalogs, lifecycle
/// state, and — in `cluster` builds — the distributed-execution plumbing.
/// Mutable members sit behind locks or atomics so methods can take `&self`.
pub struct LaminarDB {
    /// Registered streaming sources; created from the config's default
    /// buffer size and backpressure settings.
    pub(crate) catalog: Arc<SourceCatalog>,
    /// Streaming SQL planner; also the source of registered lookup tables.
    pub(crate) planner: parking_lot::Mutex<StreamingPlanner>,
    /// DataFusion session, built with the lookup-join query planner and the
    /// streaming functions registered.
    pub(crate) ctx: SessionContext,
    /// Configuration the instance was opened with.
    pub(crate) config: LaminarConfig,
    /// Variables substituted into SQL text by `execute` before splitting.
    pub(crate) config_vars: Arc<HashMap<String, String>>,
    /// Shutdown flag; while set, `execute` fails with `DbError::Shutdown`.
    pub(crate) shutdown: std::sync::atomic::AtomicBool,
    /// Checkpoint coordinator behind an async (Tokio) mutex; `None` until
    /// populated elsewhere.
    pub(crate) coordinator:
        Arc<tokio::sync::Mutex<Option<crate::checkpoint_coordinator::CheckpointCoordinator>>>,
    /// Connector-backed sources, sinks, streams and tables, with their DDL.
    pub(crate) connector_manager: parking_lot::Mutex<crate::connector_manager::ConnectorManager>,
    /// Available connector implementations; built-ins are registered at open.
    pub(crate) connector_registry: Arc<laminar_connectors::registry::ConnectorRegistry>,
    /// Materialized-view registry.
    pub(crate) mv_registry: parking_lot::Mutex<laminar_core::mv::MvRegistry>,
    /// Rows of reference/lookup tables; shared with the DataFusion table
    /// providers created for them.
    pub(crate) table_store: Arc<parking_lot::RwLock<crate::table_store::TableStore>>,
    /// Lifecycle state stored as a `DbState` discriminant; read and written
    /// through `DbState::load` / `DbState::store`.
    pub(crate) state: Arc<std::sync::atomic::AtomicU8>,
    /// Join handle of a background task; `None` until set elsewhere.
    pub(crate) runtime_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Notifier presumably used to wake background tasks on shutdown
    /// (confirm at the use sites).
    pub(crate) shutdown_signal: Arc<tokio::sync::Notify>,
    /// Engine metrics, once installed.
    pub(crate) engine_metrics:
        parking_lot::Mutex<Option<Arc<crate::engine_metrics::EngineMetrics>>>,
    /// Prometheus registry, once installed.
    pub(crate) prometheus_registry: parking_lot::Mutex<Option<Arc<prometheus::Registry>>>,
    /// Instant at which this handle was constructed.
    pub(crate) start_time: std::time::Instant,
    /// Session-level key/value properties.
    pub(crate) session_properties: parking_lot::Mutex<HashMap<String, String>>,
    /// Pipeline-wide watermark; initialised to `i64::MIN`.
    pub(crate) pipeline_watermark: Arc<std::sync::atomic::AtomicI64>,
    /// Lookup-table snapshots consumed by the lookup-join extension planner.
    pub(crate) lookup_registry: Arc<laminar_sql::datafusion::LookupTableRegistry>,
    /// AI runtime, installed by `set_ai_runtime`.
    pub(crate) ai_runtime: Option<Arc<crate::ai::AiRuntime>>,
    /// Tokio runtime handle supplied together with `ai_runtime`.
    pub(crate) ai_handle: Option<tokio::runtime::Handle>,
    /// Sender for pipeline control messages; `None` until set elsewhere.
    pub(crate) control_tx: parking_lot::Mutex<Option<ControlMsgTx>>,
    /// Materialized-view result storage; referenced weakly by the cluster
    /// query handler.
    pub(crate) mv_store: Arc<parking_lot::RwLock<crate::mv_store::MvStore>>,
    /// Cluster controller, installed by `set_cluster_controller`.
    #[cfg(feature = "cluster")]
    pub(crate) cluster_controller:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::ClusterController>>>,
    /// State backend, installed by `set_state_backend`; used for vnode
    /// rehydration and authoritative-version updates.
    pub(crate) state_backend:
        parking_lot::Mutex<Option<Arc<dyn laminar_core::state::StateBackend>>>,
    /// Vnode ownership registry, installed by `set_vnode_registry`.
    pub(crate) vnode_registry: parking_lot::Mutex<Option<Arc<laminar_core::state::VnodeRegistry>>>,
    /// Physical optimizer rules retained after open: the caller-supplied
    /// rules, with `DistributedJoinRule` appended in `cluster` builds.
    pub(crate) physical_optimizer_rules:
        Arc<[Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>]>,
    /// Optional target-partition override supplied at open.
    pub(crate) pipeline_target_partitions: Option<usize>,
    /// Shuffle sender, installed by `set_shuffle_sender`.
    #[cfg(feature = "cluster")]
    pub(crate) shuffle_sender:
        parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleSender>>>,
    /// Shuffle receiver, installed by `set_shuffle_receiver`; the outer `Arc`
    /// lets the subscription router task hold it weakly.
    #[cfg(feature = "cluster")]
    pub(crate) shuffle_receiver:
        Arc<parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleReceiver>>>>,
    /// Checkpoint decision store, installed by `set_decision_store`.
    #[cfg(feature = "cluster")]
    pub(crate) decision_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::CheckpointDecisionStore>>>,
    /// Assignment snapshot store, installed by `set_assignment_snapshot_store`.
    #[cfg(feature = "cluster")]
    pub(crate) assignment_snapshot_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>>,
    /// Catalog (DDL) manifest store; temporarily taken out while the
    /// manifest is being replayed.
    #[cfg(feature = "cluster")]
    pub(crate) catalog_manifest_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::CatalogManifestStore>>>,
    /// State bytes restored for newly acquired vnodes, keyed by vnode id.
    #[cfg(feature = "cluster")]
    pub(crate) rehydrated_vnode_state: Arc<parking_lot::Mutex<HashMap<u32, RehydratedVnode>>>,
    /// Sender for force-checkpoint requests; `None` until set elsewhere.
    pub(crate) force_ckpt_tx: parking_lot::Mutex<Option<ForceCheckpointTx>>,
    /// Local SUBSCRIBE registry (active subscription names, batch fan-out).
    pub(crate) subscription_registry: Arc<crate::subscription::SubscriptionRegistry>,
    /// Remote subscription interest: subscription name -> ids of nodes that
    /// advertise a non-empty `sub:<name>` key. Rebuilt by the router task.
    #[cfg(feature = "cluster")]
    pub(crate) active_subs:
        Arc<parking_lot::RwLock<std::collections::HashMap<String, std::collections::HashSet<u64>>>>,
    /// Arrow schemas, presumably keyed by stream name (confirm at use sites).
    pub(crate) stream_schemas:
        parking_lot::RwLock<std::collections::HashMap<String, arrow_schema::SchemaRef>>,
}
183
/// One-shot reply slot for a forced checkpoint request; receives either the
/// checkpoint result or the error that prevented it.
pub(crate) type ForceCheckpointReply =
    crossfire::oneshot::TxOneshot<Result<crate::checkpoint_coordinator::CheckpointResult, DbError>>;

/// Sending half of the force-checkpoint request channel. Each request is
/// just the reply sender that the handler should answer on.
pub(crate) type ForceCheckpointTx =
    crossfire::MAsyncTx<crossfire::mpsc::Array<ForceCheckpointReply>>;

/// Receiving half of the force-checkpoint request channel.
pub(crate) type ForceCheckpointRx =
    crossfire::AsyncRx<crossfire::mpsc::Array<ForceCheckpointReply>>;

/// Capacity for the force-checkpoint request channel (presumably passed to
/// the channel constructor where the channel is created — not visible here).
pub(crate) const FORCE_CHECKPOINT_CHANNEL_CAPACITY: usize = 64;
197
/// Polling interval of the distributed SUBSCRIBE router task.
#[cfg(feature = "cluster")]
const SUB_ROUTER_TICK: std::time::Duration = std::time::Duration::from_millis(10);
/// Router ticks between refreshes of remote subscription interest while any
/// subscription (local or remote) is known: 50 x 10 ms = ~500 ms.
#[cfg(feature = "cluster")]
const SUB_REFRESH_ACTIVE_TICKS: u64 = 50;
/// Router ticks between refreshes while no subscription is known anywhere:
/// 500 x 10 ms = ~5 s.
#[cfg(feature = "cluster")]
const SUB_REFRESH_IDLE_TICKS: u64 = 500;
208
/// Event-time / watermark tracking state for a single source.
///
/// NOTE(review): not used in the visible code; presumably `extractor` reads
/// event times from `column` and `generator` derives watermarks from them.
pub(crate) struct SourceWatermarkState {
    /// Event-time extractor for this source.
    pub(crate) extractor: laminar_core::time::EventTimeExtractor,
    /// Watermark generator for this source.
    pub(crate) generator: Box<dyn laminar_core::time::WatermarkGenerator>,
    /// Name of the event-time column.
    pub(crate) column: String,
}
214
215pub(crate) fn filter_late_rows(
218 batch: &RecordBatch,
219 column: &str,
220 watermark: i64,
221) -> Result<Option<RecordBatch>, laminar_core::time::FilterError> {
222 laminar_core::time::filter_batch_by_timestamp(
223 batch,
224 column,
225 watermark,
226 laminar_core::time::ThresholdOp::GreaterEq,
227 )
228}
229
230pub(crate) use laminar_core::time::parse_duration_str;
231
/// State restored for a vnode that this node acquired through an
/// assignment snapshot; staged in `LaminarDB` (see `rehydrated_vnode_state`).
#[cfg(feature = "cluster")]
#[derive(Debug, Clone)]
pub struct RehydratedVnode {
    /// Epoch reported by the rehydration run that produced `bytes`.
    pub epoch: u64,
    /// Restored vnode state, as returned by the rehydrator.
    pub bytes: bytes::Bytes,
}
242
/// Outcome of `LaminarDB::adopt_assignment_snapshot`.
///
/// The `Default` value (not adopted, version 0) is returned when no vnode
/// registry is installed.
#[cfg(feature = "cluster")]
#[derive(Debug, Default)]
pub struct SnapshotAdoption {
    /// Whether the snapshot was installed; `false` when it was not newer
    /// than the current assignment.
    pub adopted: bool,
    /// Version of the snapshot that was considered (0 when no registry).
    pub version: u64,
    /// Vnodes owned under the new assignment that were not owned before.
    pub newly_acquired: Vec<u32>,
    /// Number of newly acquired vnodes whose state was restored.
    pub rehydrated: usize,
    /// Epoch reported by the rehydration run, if any.
    pub rehydration_epoch: Option<u64>,
}
263
264impl LaminarDB {
265 pub fn open() -> Result<Self, DbError> {
271 Self::open_with_config(LaminarConfig::default())
272 }
273
274 pub fn open_with_config(config: LaminarConfig) -> Result<Self, DbError> {
280 Self::open_with_config_and_vars(config, HashMap::new())
281 }
282
283 #[allow(clippy::unnecessary_wraps)]
289 pub(crate) fn open_with_config_and_vars(
290 config: LaminarConfig,
291 config_vars: HashMap<String, String>,
292 ) -> Result<Self, DbError> {
293 Self::open_with_config_and_vars_and_rules(config, config_vars, &[], None)
294 }
295
296 #[allow(clippy::unnecessary_wraps)]
299 pub(crate) fn open_with_config_and_vars_and_rules(
300 config: LaminarConfig,
301 config_vars: HashMap<String, String>,
302 extra_optimizer_rules: &[Arc<
303 dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
304 >],
305 target_partitions: Option<usize>,
306 ) -> Result<Self, DbError> {
307 crossfire::detect_backoff_cfg();
311
312 let lookup_registry = Arc::new(laminar_sql::datafusion::LookupTableRegistry::new());
313
314 let ctx = {
317 let mut session_config = laminar_sql::datafusion::base_session_config();
318 if let Some(n) = target_partitions {
319 session_config = session_config.with_target_partitions(n);
320 }
321 let extension_planner: Arc<
322 dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync,
323 > = Arc::new(laminar_sql::datafusion::LookupJoinExtensionPlanner::new(
324 Arc::clone(&lookup_registry),
325 ));
326 let query_planner: Arc<dyn datafusion::execution::context::QueryPlanner + Send + Sync> =
327 Arc::new(LookupQueryPlanner { extension_planner });
328 let mut state_builder = datafusion::execution::SessionStateBuilder::new()
329 .with_config(session_config)
330 .with_default_features()
331 .with_query_planner(query_planner);
332 #[cfg(feature = "cluster")]
333 {
334 state_builder = state_builder.with_physical_optimizer_rule(Arc::new(
335 laminar_sql::datafusion::cluster_repartition::DistributedJoinRule,
336 ));
337 }
338 for rule in extra_optimizer_rules {
339 state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
340 }
341 SessionContext::new_with_state(state_builder.build())
342 };
343 register_streaming_functions(&ctx);
344
345 let catalog = Arc::new(SourceCatalog::new(
346 config.default_buffer_size,
347 config.default_backpressure,
348 ));
349
350 let connector_registry = Arc::new(laminar_connectors::registry::ConnectorRegistry::new());
351 Self::register_builtin_connectors(&connector_registry);
352 #[cfg(feature = "cluster")]
353 let mut physical_rules = extra_optimizer_rules.to_vec();
354 #[cfg(feature = "cluster")]
355 {
356 physical_rules.push(Arc::new(
357 laminar_sql::datafusion::cluster_repartition::DistributedJoinRule,
358 ));
359 }
360 #[cfg(not(feature = "cluster"))]
361 let physical_rules = extra_optimizer_rules.to_vec();
362
363 Ok(Self {
364 catalog,
365 planner: parking_lot::Mutex::new(StreamingPlanner::new()),
366 ctx,
367 config,
368 config_vars: Arc::new(config_vars),
369 shutdown: std::sync::atomic::AtomicBool::new(false),
370 coordinator: Arc::new(tokio::sync::Mutex::new(None)),
371 connector_manager: parking_lot::Mutex::new(
372 crate::connector_manager::ConnectorManager::new(),
373 ),
374 connector_registry,
375 mv_registry: parking_lot::Mutex::new(laminar_core::mv::MvRegistry::new()),
376 table_store: Arc::new(parking_lot::RwLock::new(
377 crate::table_store::TableStore::new(),
378 )),
379 state: Arc::new(std::sync::atomic::AtomicU8::new(DbState::Created as u8)),
380 runtime_handle: parking_lot::Mutex::new(None),
381 shutdown_signal: Arc::new(tokio::sync::Notify::new()),
382 engine_metrics: parking_lot::Mutex::new(None),
383 prometheus_registry: parking_lot::Mutex::new(None),
384 start_time: std::time::Instant::now(),
385 session_properties: parking_lot::Mutex::new(HashMap::new()),
386 pipeline_watermark: Arc::new(std::sync::atomic::AtomicI64::new(i64::MIN)),
387 lookup_registry,
388 ai_runtime: None,
389 ai_handle: None,
390 control_tx: parking_lot::Mutex::new(None),
391 mv_store: Arc::new(parking_lot::RwLock::new(crate::mv_store::MvStore::new())),
392 #[cfg(feature = "cluster")]
393 cluster_controller: parking_lot::Mutex::new(None),
394 state_backend: parking_lot::Mutex::new(None),
395 vnode_registry: parking_lot::Mutex::new(None),
396 physical_optimizer_rules: physical_rules.into(),
397 pipeline_target_partitions: target_partitions,
398 #[cfg(feature = "cluster")]
399 shuffle_sender: parking_lot::Mutex::new(None),
400 #[cfg(feature = "cluster")]
401 shuffle_receiver: Arc::new(parking_lot::Mutex::new(None)),
402 #[cfg(feature = "cluster")]
403 decision_store: parking_lot::Mutex::new(None),
404 #[cfg(feature = "cluster")]
405 assignment_snapshot_store: parking_lot::Mutex::new(None),
406 #[cfg(feature = "cluster")]
407 catalog_manifest_store: parking_lot::Mutex::new(None),
408 #[cfg(feature = "cluster")]
409 rehydrated_vnode_state: Arc::new(parking_lot::Mutex::new(HashMap::new())),
410 force_ckpt_tx: parking_lot::Mutex::new(None),
411 subscription_registry: Arc::new(crate::subscription::SubscriptionRegistry::new()),
412 #[cfg(feature = "cluster")]
413 active_subs: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
414 stream_schemas: parking_lot::RwLock::new(std::collections::HashMap::new()),
415 })
416 }
417
418 pub(crate) fn set_ai_runtime(
422 &mut self,
423 runtime: Arc<crate::ai::AiRuntime>,
424 handle: tokio::runtime::Handle,
425 ) {
426 if let Err(e) = crate::ai_catalog::register_ai_catalog(&self.ctx, &runtime) {
430 tracing::warn!(error = %e, "failed to register laminar.* AI catalog views");
431 }
432 self.ai_runtime = Some(runtime);
433 self.ai_handle = Some(handle);
434 }
435
436 #[cfg(feature = "cluster")]
437 pub(crate) fn set_shuffle_sender(&self, sender: Arc<laminar_core::shuffle::ShuffleSender>) {
438 *self.shuffle_sender.lock() = Some(sender);
439 self.update_sql_cluster_context();
440 }
441
442 #[cfg(feature = "cluster")]
443 pub(crate) fn set_shuffle_receiver(
444 &self,
445 receiver: Arc<laminar_core::shuffle::ShuffleReceiver>,
446 ) {
447 *self.shuffle_receiver.lock() = Some(receiver);
448 self.update_sql_cluster_context();
449 }
450
451 #[cfg(feature = "cluster")]
452 fn update_sql_cluster_context(&self) {
453 if let (Some(registry), Some(sender), Some(receiver)) = (
454 self.vnode_registry.lock().as_ref(),
455 self.shuffle_sender.lock().as_ref(),
456 self.shuffle_receiver.lock().as_ref(),
457 ) {
458 let self_id = self
459 .cluster_controller
460 .lock()
461 .as_ref()
462 .map_or(laminar_core::state::NodeId(0), |c| {
463 laminar_core::state::NodeId(c.instance_id().0)
464 });
465 laminar_sql::datafusion::cluster_repartition::set_cluster_context(
466 Arc::clone(registry),
467 Arc::clone(sender),
468 Arc::clone(receiver),
469 self_id,
470 );
471 }
472 }
473
474 #[cfg(feature = "cluster")]
475 pub(crate) fn set_decision_store(
476 &self,
477 store: Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
478 ) {
479 *self.decision_store.lock() = Some(store);
480 }
481
482 #[cfg(feature = "cluster")]
483 pub(crate) fn set_assignment_snapshot_store(
484 &self,
485 store: Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
486 ) {
487 *self.assignment_snapshot_store.lock() = Some(store);
488 }
489
490 #[cfg(feature = "cluster")]
493 pub(crate) fn set_catalog_manifest_store(
494 &self,
495 store: Arc<laminar_core::cluster::control::CatalogManifestStore>,
496 ) {
497 *self.catalog_manifest_store.lock() = Some(store);
498 }
499
500 #[cfg(feature = "cluster")]
505 pub(crate) async fn persist_catalog_manifest(&self) {
506 let Some(store) = self.catalog_manifest_store.lock().clone() else {
507 return;
508 };
509 let entries: Vec<laminar_core::cluster::control::CatalogManifestEntry> = self
510 .connector_manager
511 .lock()
512 .ordered_ddl()
513 .into_iter()
514 .map(|(name, ddl)| laminar_core::cluster::control::CatalogManifestEntry { name, ddl })
515 .collect();
516 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
517 let now_ms = std::time::SystemTime::now()
518 .duration_since(std::time::UNIX_EPOCH)
519 .map_or(0, |d| d.as_millis() as i64);
520 let manifest = laminar_core::cluster::control::CatalogManifest {
521 version: now_ms.unsigned_abs(),
524 updated_at_ms: now_ms,
525 entries,
526 };
527 if let Err(e) = store.save(&manifest).await {
528 tracing::warn!(error = %e, "catalog manifest persist failed");
529 }
530 }
531
532 #[cfg(feature = "cluster")]
538 pub(crate) async fn restore_catalog_from_manifest(&self) {
539 let Some(store) = self.catalog_manifest_store.lock().clone() else {
540 return;
541 };
542 let manifest = match store.load().await {
543 Ok(Some(m)) => m,
544 Ok(None) => return,
545 Err(e) => {
546 tracing::warn!(error = %e, "catalog manifest load failed — skipping replay");
547 return;
548 }
549 };
550 let detached = self.catalog_manifest_store.lock().take();
554 for entry in &manifest.entries {
555 if self.catalog_object_exists(&entry.name) {
556 continue;
557 }
558 match self.execute(&entry.ddl).await {
559 Ok(_) => tracing::info!(name = %entry.name, "replayed catalog DDL from manifest"),
560 Err(e) => tracing::warn!(
561 name = %entry.name, error = %e,
562 "catalog manifest replay failed for object"
563 ),
564 }
565 }
566 *self.catalog_manifest_store.lock() = detached;
567 }
568
569 #[cfg(feature = "cluster")]
572 fn catalog_object_exists(&self, name: &str) -> bool {
573 let mgr = self.connector_manager.lock();
574 mgr.sources().contains_key(name)
575 || mgr.sinks().contains_key(name)
576 || mgr.streams().contains_key(name)
577 || mgr.tables().contains_key(name)
578 }
579
580 #[cfg(feature = "cluster")]
590 pub async fn adopt_assignment_snapshot(
591 &self,
592 snapshot: laminar_core::cluster::control::AssignmentSnapshot,
593 ) -> SnapshotAdoption {
594 let Some(registry) = self.vnode_registry.lock().clone() else {
595 return SnapshotAdoption::default();
596 };
597 if snapshot.version <= registry.assignment_version() {
598 return SnapshotAdoption {
599 adopted: false,
600 version: snapshot.version,
601 ..SnapshotAdoption::default()
602 };
603 }
604 let vnode_count = registry.vnode_count();
605 let new_assignment: Arc<[laminar_core::state::NodeId]> =
606 snapshot.to_vnode_vec(vnode_count).into();
607
608 let self_id = self
609 .cluster_controller
610 .lock()
611 .as_ref()
612 .map_or(laminar_core::state::NodeId(0), |c| {
613 laminar_core::state::NodeId(c.instance_id().0)
614 });
615
616 let mut guard = self.coordinator.lock().await;
620 let old_owned = laminar_core::state::owned_vnodes(®istry, self_id);
621 registry.set_assignment_and_version(new_assignment, snapshot.version);
622 let new_owned = laminar_core::state::owned_vnodes(®istry, self_id);
623 if let Some(backend) = self.state_backend.lock().clone() {
624 backend.set_authoritative_version(snapshot.version);
625 }
626 if let Some(coord) = guard.as_mut() {
627 coord.set_assignment_version(snapshot.version);
628 coord.set_vnode_set(new_owned.clone());
629 coord.set_gate_vnode_set((0..vnode_count).collect());
630 }
631 drop(guard);
632
633 let old_set: std::collections::HashSet<u32> = old_owned.into_iter().collect();
635 let newly_acquired: Vec<u32> = new_owned
636 .into_iter()
637 .filter(|v| !old_set.contains(v))
638 .collect();
639
640 if !newly_acquired.is_empty() {
647 registry.mark_restoring(&newly_acquired);
648 }
649
650 let mut adoption = SnapshotAdoption {
651 adopted: true,
652 version: snapshot.version,
653 newly_acquired: newly_acquired.clone(),
654 rehydrated: 0,
655 rehydration_epoch: None,
656 };
657
658 let backend = self.state_backend.lock().clone();
663 if let (false, Some(backend)) = (newly_acquired.is_empty(), backend) {
664 let report = crate::recovery_manager::VnodeRehydrator::new(backend.as_ref())
665 .rehydrate(&newly_acquired)
666 .await;
667 adoption.rehydrated = report.restored.len();
668 adoption.rehydration_epoch = report.epoch;
669 let no_state: Vec<u32> = newly_acquired
673 .iter()
674 .copied()
675 .filter(|v| !report.restored.contains_key(v))
676 .collect();
677 if !no_state.is_empty() {
678 registry.mark_active(&no_state);
679 }
680 if let Some(epoch) = report.epoch {
681 let mut staged = self.rehydrated_vnode_state.lock();
682 for (vnode, bytes) in report.restored {
683 staged.insert(vnode, RehydratedVnode { epoch, bytes });
684 }
685 }
686 } else if !newly_acquired.is_empty() {
687 registry.mark_active(&newly_acquired);
690 }
691
692 tracing::info!(
693 version = snapshot.version,
694 newly_acquired = adoption.newly_acquired.len(),
695 rehydrated = adoption.rehydrated,
696 rehydration_epoch = ?adoption.rehydration_epoch,
697 "adopted assignment snapshot",
698 );
699 adoption
700 }
701
702 #[cfg(feature = "cluster")]
707 #[must_use]
708 pub fn rehydrated_vnode_state(&self) -> HashMap<u32, RehydratedVnode> {
709 self.rehydrated_vnode_state.lock().clone()
710 }
711
712 #[cfg(feature = "cluster")]
713 pub(crate) fn set_cluster_controller(
714 &self,
715 controller: Arc<laminar_core::cluster::control::ClusterController>,
716 ) {
717 controller.register_query_handler(Arc::new(DbQueryHandler {
720 mv_store: Arc::downgrade(&self.mv_store),
721 table_store: Arc::downgrade(&self.table_store),
722 filter_ctx: SessionContext::new(),
725 }));
726 self.spawn_subscription_router(&controller);
727 *self.cluster_controller.lock() = Some(controller);
728 self.update_sql_cluster_context();
729 }
730
 /// Spawns the background task that routes distributed SUBSCRIBE traffic.
 ///
 /// Does nothing when called outside a Tokio runtime, or when the
 /// coordination KV backend reports no subscription-routing support.
 /// Otherwise the spawned task does the following on every `SUB_ROUTER_TICK`:
 /// * Rebuilds `active_subs` from the `sub:<name>` KV keys with a non-empty
 ///   value. This happens every `SUB_REFRESH_ACTIVE_TICKS` ticks, or every
 ///   `SUB_REFRESH_IDLE_TICKS` ticks while no subscription is known anywhere.
 /// * Advertises new local subscriptions by writing `"active"` to
 ///   `sub:<name>`, and withdraws ended ones by writing an empty value.
 /// * While any local subscription exists, drains the batches staged under
 ///   `REMOTE_STAGE_PREFIX` in the shuffle receiver and forwards them to the
 ///   local subscription registry.
 ///
 /// The task holds only weak references to the database's shared state and
 /// exits as soon as any of them can no longer be upgraded.
 ///
 /// NOTE(review): staged remote batches are only drained while a local
 /// subscription is active, and advertised `sub:` keys are not blanked when
 /// the task exits — confirm both are bounded / cleaned up elsewhere.
 #[cfg(feature = "cluster")]
 fn spawn_subscription_router(
     &self,
     controller: &Arc<laminar_core::cluster::control::ClusterController>,
 ) {
     use std::collections::{HashMap, HashSet};

     // `tokio::spawn` below panics outside a runtime, so bail out early.
     if tokio::runtime::Handle::try_current().is_err() {
         return;
     }

     let kv = Arc::clone(controller.kv());

     if !kv.supports_subscription_routing() {
         tracing::info!(
             "distributed SUBSCRIBE routing disabled: coordination backend has \
              no subscription-interest discovery"
         );
         return;
     }

     // Weak handles: the task must not keep the database's state alive.
     let active_subs = Arc::downgrade(&self.active_subs);
     let subscription_registry = Arc::downgrade(&self.subscription_registry);
     let shuffle_receiver = Arc::downgrade(&self.shuffle_receiver);

     tokio::spawn(async move {
         let mut tick = tokio::time::interval(SUB_ROUTER_TICK);
         tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
         // Names this node currently advertises as "active" in the KV store.
         let mut advertised: HashSet<String> = HashSet::new();
         // Ticks until the next remote-interest refresh; 0 forces one on the first tick.
         let mut until_refresh: u64 = 0;
         loop {
             tick.tick().await;

             // Stop once the owning database has been dropped.
             let (Some(active_subs), Some(registry), Some(receiver_slot)) = (
                 active_subs.upgrade(),
                 subscription_registry.upgrade(),
                 shuffle_receiver.upgrade(),
             ) else {
                 break;
             };

             let local_names = registry.active_subscription_names();

             if until_refresh == 0 {
                 // Rebuild name -> interested node ids from non-empty `sub:` keys.
                 let mut map: HashMap<String, HashSet<u64>> = HashMap::new();
                 for (node_id, key, value) in kv.scan_prefix("sub:").await {
                     if !value.is_empty() {
                         if let Some(name) = key.strip_prefix("sub:") {
                             map.entry(name.to_string()).or_default().insert(node_id.0);
                         }
                     }
                 }
                 // Refresh less often when nobody, local or remote, is subscribed.
                 let idle = map.is_empty() && local_names.is_empty();
                 *active_subs.write() = map;
                 until_refresh = if idle {
                     SUB_REFRESH_IDLE_TICKS
                 } else {
                     SUB_REFRESH_ACTIVE_TICKS
                 };
             }
             until_refresh = until_refresh.saturating_sub(1);

             // Advertise local subscriptions that appeared since the last tick.
             for name in &local_names {
                 if advertised.insert(name.clone()) {
                     kv.write(&format!("sub:{name}"), "active".to_string()).await;
                 }
             }
             // Withdraw subscriptions that ended by blanking their key.
             let removed: Vec<String> = advertised
                 .iter()
                 .filter(|n| !local_names.contains(*n))
                 .cloned()
                 .collect();
             for name in removed {
                 kv.write(&format!("sub:{name}"), String::new()).await;
                 advertised.remove(&name);
             }

             if !local_names.is_empty() {
                 // Clone the receiver out so the slot lock is released before draining.
                 let receiver = receiver_slot.lock().clone();
                 if let Some(receiver) = receiver {
                     for (stage, batches) in receiver
                         .drain_staged_with_prefix(crate::subscription::REMOTE_STAGE_PREFIX)
                     {
                         if let Some(name) =
                             crate::subscription::stream_from_remote_stage(&stage)
                         {
                             for batch in batches {
                                 registry.send_batch(name, batch);
                             }
                         }
                     }
                 }
             }
         }
     });
 }
841
842 pub(crate) fn set_state_backend(&self, backend: Arc<dyn laminar_core::state::StateBackend>) {
843 *self.state_backend.lock() = Some(backend);
844 }
845
846 pub(crate) fn set_vnode_registry(&self, registry: Arc<laminar_core::state::VnodeRegistry>) {
847 *self.vnode_registry.lock() = Some(registry);
848 #[cfg(feature = "cluster")]
849 self.update_sql_cluster_context();
850 }
851
852 #[must_use]
854 pub fn session_context(&self) -> &SessionContext {
855 &self.ctx
856 }
857
858 #[must_use]
860 pub fn builder() -> LaminarDbBuilder {
861 LaminarDbBuilder::new()
862 }
863
864 #[allow(unused_variables)]
866 fn register_builtin_connectors(registry: &laminar_connectors::registry::ConnectorRegistry) {
867 laminar_connectors::generator::register_generator_source(registry);
870 #[cfg(feature = "kafka")]
871 {
872 laminar_connectors::kafka::register_kafka_source(registry);
873 laminar_connectors::kafka::register_kafka_sink(registry);
874 }
875 #[cfg(feature = "postgres-cdc")]
876 {
877 laminar_connectors::cdc::postgres::register_postgres_cdc_source(registry);
878 }
879 #[cfg(feature = "postgres-sink")]
880 {
881 laminar_connectors::postgres::register_postgres_sink(registry);
882 }
883 #[cfg(feature = "delta-lake")]
884 {
885 laminar_connectors::lakehouse::register_delta_lake_sink(registry);
886 laminar_connectors::lakehouse::register_delta_lake_source(registry);
887 }
888 #[cfg(feature = "iceberg")]
889 {
890 laminar_connectors::lakehouse::register_iceberg_sink(registry);
891 laminar_connectors::lakehouse::register_iceberg_source(registry);
892 }
893 #[cfg(feature = "websocket")]
894 {
895 laminar_connectors::websocket::register_websocket_source(registry);
896 laminar_connectors::websocket::register_websocket_sink(registry);
897 }
898 #[cfg(feature = "mysql-cdc")]
899 {
900 laminar_connectors::cdc::mysql::register_mysql_cdc_source(registry);
901 }
902 #[cfg(feature = "mongodb-cdc")]
903 {
904 laminar_connectors::mongodb::register_mongodb_cdc_source(registry);
905 laminar_connectors::mongodb::register_mongodb_sink(registry);
906 }
907 #[cfg(feature = "files")]
908 {
909 laminar_connectors::files::register_file_source(registry);
910 laminar_connectors::files::register_file_sink(registry);
911 }
912 #[cfg(feature = "otel")]
913 {
914 laminar_connectors::otel::register_otel_source(registry);
915 }
916 #[cfg(feature = "nats")]
917 {
918 laminar_connectors::nats::register_nats_source(registry);
919 laminar_connectors::nats::register_nats_sink(registry);
920 }
921 }
922
923 fn handle_register_lookup_table(
927 &self,
928 info: laminar_sql::planner::LookupTableInfo,
929 ) -> Result<ExecuteResult, DbError> {
930 use laminar_sql::parser::lookup_table::ConnectorType;
931
932 if info.primary_key.len() != 1 {
933 return Err(DbError::InvalidOperation(
934 "Lookup table requires a single-column primary key".into(),
935 ));
936 }
937 let pk = info.primary_key[0].clone();
938
939 let cache_mode = info.properties.cache_memory.map(|mem| {
941 let max_entries = cache_entries_from_memory(mem);
942 crate::table_cache_mode::TableCacheMode::Partial { max_entries }
943 });
944 if let Some(cache) = cache_mode {
945 self.table_store.write().create_table_with_cache(
946 &info.name,
947 info.arrow_schema.clone(),
948 &pk,
949 cache,
950 )?;
951 } else {
952 self.table_store
953 .write()
954 .create_table(&info.name, info.arrow_schema.clone(), &pk)?;
955 }
956
957 if !matches!(info.properties.connector, ConnectorType::Static) {
960 self.register_lookup_connector(&info, &pk)?;
961 }
962
963 {
965 let provider = crate::table_provider::ReferenceTableProvider::new(
966 info.name.clone(),
967 info.arrow_schema.clone(),
968 self.table_store.clone(),
969 );
970 let _ = self.ctx.deregister_table(&info.name);
971 self.ctx
972 .register_table(&info.name, Arc::new(provider))
973 .map_err(|e| {
974 DbError::InvalidOperation(format!("Failed to register lookup table: {e}"))
975 })?;
976 }
977
978 if let Some(batch) = self.table_store.read().to_record_batch(&info.name) {
981 self.lookup_registry.register(
982 &info.name,
983 laminar_sql::datafusion::LookupSnapshot { batch },
984 );
985 }
986
987 self.refresh_lookup_optimizer_rule();
990
991 Ok(ExecuteResult::Ddl(DdlInfo {
992 statement_type: "CREATE LOOKUP TABLE".to_string(),
993 object_name: info.name,
994 }))
995 }
996
997 #[allow(clippy::unnecessary_wraps)]
1000 fn register_lookup_connector(
1001 &self,
1002 info: &laminar_sql::planner::LookupTableInfo,
1003 pk: &str,
1004 ) -> Result<(), DbError> {
1005 use laminar_sql::parser::lookup_table::ConnectorType;
1006
1007 let connector_type_str = match &info.properties.connector {
1008 ConnectorType::Postgres => "postgres",
1009 ConnectorType::PostgresCdc => "postgres-cdc",
1010 ConnectorType::MysqlCdc => "mysql-cdc",
1011 ConnectorType::Redis => "redis",
1012 ConnectorType::S3Parquet => "s3-parquet",
1013 ConnectorType::DeltaLake => "delta-lake",
1014 ConnectorType::Custom(s) => s.as_str(),
1015 ConnectorType::Static => unreachable!(),
1016 };
1017
1018 self.table_store
1019 .write()
1020 .set_connector(&info.name, connector_type_str);
1021
1022 let refresh = match info.properties.strategy {
1023 laminar_sql::parser::lookup_table::LookupStrategy::Replicated
1024 | laminar_sql::parser::lookup_table::LookupStrategy::Partitioned => {
1025 if matches!(info.properties.connector, ConnectorType::Postgres) {
1027 Some(laminar_connectors::reference::RefreshMode::SnapshotOnly)
1028 } else {
1029 Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
1030 }
1031 }
1032 laminar_sql::parser::lookup_table::LookupStrategy::OnDemand => {
1033 Some(laminar_connectors::reference::RefreshMode::Manual)
1034 }
1035 };
1036
1037 let consumed = [
1041 "connector",
1042 "strategy",
1043 "cache.memory",
1044 "cache.disk",
1045 "cache.ttl",
1046 "pushdown",
1047 "format",
1048 ];
1049 let mut connector_options = HashMap::with_capacity(info.raw_options.len());
1050 let mut format_options = HashMap::with_capacity(4);
1051 for (k, v) in &info.raw_options {
1052 let lower = k.to_lowercase();
1053 if consumed.contains(&lower.as_str()) {
1054 continue;
1055 }
1056 if let Some(suffix) = lower.strip_prefix("format.") {
1057 format_options.insert(suffix.to_string(), v.clone());
1058 } else {
1059 connector_options.insert(k.clone(), v.clone());
1060 }
1061 }
1062
1063 let cache_max_bytes = info
1066 .properties
1067 .cache_memory
1068 .map(|m| usize::try_from(m.as_bytes()).unwrap_or(usize::MAX));
1069
1070 let cache_ttl = info
1073 .properties
1074 .cache_ttl
1075 .map(std::time::Duration::from_secs);
1076
1077 self.connector_manager
1078 .lock()
1079 .register_table(crate::connector_manager::TableRegistration {
1080 name: info.name.clone(),
1081 primary_key: pk.to_string(),
1082 connector_type: Some(connector_type_str.to_string()),
1083 connector_options,
1084 format: info.raw_options.get("format").cloned(),
1085 format_options,
1086 refresh,
1087 cache_max_bytes,
1088 cache_ttl,
1089 });
1090
1091 Ok(())
1092 }
1093
1094 fn refresh_lookup_optimizer_rule(&self) {
1097 use laminar_sql::planner::lookup_join::{LookupColumnPruningRule, LookupJoinRewriteRule};
1098 use laminar_sql::planner::predicate_split::{
1099 PlanPushdownMode, PlanSourceCapabilities, PredicateSplitterRule,
1100 SourceCapabilitiesRegistry,
1101 };
1102
1103 self.ctx.remove_optimizer_rule("lookup_join_rewrite");
1105 self.ctx.remove_optimizer_rule("predicate_splitter");
1106 self.ctx.remove_optimizer_rule("lookup_column_pruning");
1107
1108 let tables = self.planner.lock().lookup_tables_cloned();
1109 if tables.is_empty() {
1110 return;
1111 }
1112
1113 let mut caps_registry = SourceCapabilitiesRegistry::default();
1115 for (name, info) in &tables {
1116 let mode = match info.properties.pushdown_mode {
1117 laminar_sql::parser::lookup_table::PushdownMode::Enabled
1118 | laminar_sql::parser::lookup_table::PushdownMode::Auto => PlanPushdownMode::Full,
1119 laminar_sql::parser::lookup_table::PushdownMode::Disabled => PlanPushdownMode::None,
1120 };
1121 let pk_set: std::collections::HashSet<String> =
1122 info.primary_key.iter().cloned().collect();
1123 caps_registry.register(
1124 name.clone(),
1125 PlanSourceCapabilities {
1126 pushdown_mode: mode,
1127 eq_columns: pk_set,
1128 range_columns: std::collections::HashSet::new(),
1129 in_columns: std::collections::HashSet::new(),
1130 supports_null_check: false,
1131 },
1132 );
1133 }
1134
1135 self.ctx
1137 .add_optimizer_rule(Arc::new(LookupJoinRewriteRule::new(tables)));
1138 self.ctx
1139 .add_optimizer_rule(Arc::new(PredicateSplitterRule::new(caps_registry)));
1140 self.ctx
1141 .add_optimizer_rule(Arc::new(LookupColumnPruningRule));
1142 }
1143
1144 #[must_use]
1149 pub fn connector_registry(&self) -> &laminar_connectors::registry::ConnectorRegistry {
1150 &self.connector_registry
1151 }
1152
1153 pub(crate) fn register_custom_udf(&self, udf: datafusion_expr::ScalarUDF) {
1157 self.ctx.register_udf(udf);
1158 }
1159
1160 pub(crate) fn register_custom_udaf(&self, udaf: datafusion_expr::AggregateUDF) {
1164 self.ctx.register_udaf(udaf);
1165 }
1166
1167 #[cfg(feature = "delta-lake")]
1184 pub async fn register_delta_table(
1185 &self,
1186 name: &str,
1187 table_uri: &str,
1188 storage_options: HashMap<String, String>,
1189 ) -> Result<(), DbError> {
1190 laminar_connectors::lakehouse::delta_table_provider::register_delta_table(
1191 &self.ctx,
1192 name,
1193 table_uri,
1194 storage_options,
1195 )
1196 .await
1197 .map_err(DbError::from)
1198 }
1199
1200 pub async fn execute(&self, sql: &str) -> Result<ExecuteResult, DbError> {
1216 if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
1217 return Err(DbError::Shutdown);
1218 }
1219
1220 let resolved = if self.config_vars.is_empty() {
1222 sql.to_string()
1223 } else {
1224 sql_utils::resolve_config_vars(sql, &self.config_vars, true)?
1225 };
1226
1227 let stmts = sql_utils::split_statements(&resolved);
1229 if stmts.is_empty() {
1230 return Err(DbError::InvalidOperation("Empty SQL statement".into()));
1231 }
1232
1233 let mut last_result = None;
1235 for stmt_sql in &stmts {
1236 last_result = Some(self.execute_single(stmt_sql).await?);
1237 }
1238
1239 last_result.ok_or_else(|| DbError::InvalidOperation("Empty SQL statement".into()))
1240 }
1241
1242 #[allow(clippy::too_many_lines)]
1244 async fn execute_single(&self, sql: &str) -> Result<ExecuteResult, DbError> {
1245 let statements = parse_streaming_sql(sql)?;
1246
1247 if statements.is_empty() {
1248 return Err(DbError::InvalidOperation("Empty SQL statement".into()));
1249 }
1250
1251 let statement = &statements[0];
1252
1253 let result = match statement {
1254 StreamingStatement::CreateSource(create) => {
1255 let result = self.handle_create_source(create).await?;
1256 if let ExecuteResult::Ddl(ref info) = result {
1257 self.connector_manager
1258 .lock()
1259 .store_ddl(&info.object_name, sql);
1260 }
1261 Ok(result)
1262 }
1263 StreamingStatement::CreateSink(create) => {
1264 let result = self.handle_create_sink(create)?;
1265 if let ExecuteResult::Ddl(ref info) = result {
1266 self.connector_manager
1267 .lock()
1268 .store_ddl(&info.object_name, sql);
1269 }
1270 Ok(result)
1271 }
1272 StreamingStatement::CreateStream {
1273 name,
1274 query,
1275 emit_clause,
1276 query_sql,
1277 retention_bytes,
1278 ..
1279 } => {
1280 let result = self
1281 .handle_create_stream(
1282 name,
1283 query,
1284 emit_clause.as_ref(),
1285 query_sql,
1286 *retention_bytes,
1287 )
1288 .await?;
1289 if let ExecuteResult::Ddl(ref info) = result {
1290 self.connector_manager
1291 .lock()
1292 .store_ddl(&info.object_name, sql);
1293 }
1294 Ok(result)
1295 }
1296 StreamingStatement::CreateContinuousQuery { .. }
1297 | StreamingStatement::CreateLookupTable(_)
1298 | StreamingStatement::DropLookupTable { .. } => self.handle_query(sql).await,
1299 StreamingStatement::Standard(stmt) => {
1300 if let sqlparser::ast::Statement::CreateTable(ct) = stmt.as_ref() {
1301 let result = self.handle_create_table(ct)?;
1302 if let ExecuteResult::Ddl(ref info) = result {
1303 self.connector_manager
1304 .lock()
1305 .store_ddl(&info.object_name, sql);
1306 }
1307 Ok(result)
1308 } else if let sqlparser::ast::Statement::Drop {
1309 object_type: sqlparser::ast::ObjectType::Table,
1310 names,
1311 if_exists,
1312 ..
1313 } = stmt.as_ref()
1314 {
1315 self.handle_drop_table(names, *if_exists)
1316 } else if let sqlparser::ast::Statement::Set(set_stmt) = stmt.as_ref() {
1317 self.handle_set(set_stmt)
1318 } else {
1319 self.handle_query(sql).await
1320 }
1321 }
1322 StreamingStatement::InsertInto {
1323 table_name,
1324 columns,
1325 values,
1326 } => self.handle_insert_into(table_name, columns, values).await,
1327 StreamingStatement::DropSource {
1328 name,
1329 if_exists,
1330 cascade,
1331 } => self.handle_drop_source(name, *if_exists, *cascade),
1332 StreamingStatement::DropSink {
1333 name,
1334 if_exists,
1335 cascade,
1336 } => self.handle_drop_sink(name, *if_exists, *cascade),
1337 StreamingStatement::DropStream {
1338 name,
1339 if_exists,
1340 cascade,
1341 } => self.handle_drop_stream(name, *if_exists, *cascade),
1342 StreamingStatement::DropMaterializedView {
1343 name,
1344 if_exists,
1345 cascade,
1346 } => self.handle_drop_materialized_view(name, *if_exists, *cascade),
1347 StreamingStatement::Show(cmd) => {
1348 let batch = match cmd {
1349 ShowCommand::Sources => self.build_show_sources(),
1350 ShowCommand::Sinks => self.build_show_sinks(),
1351 ShowCommand::Queries => self.build_show_queries(),
1352 ShowCommand::MaterializedViews => self.build_show_materialized_views(),
1353 ShowCommand::Streams => self.build_show_streams(),
1354 ShowCommand::Tables => self.build_show_tables(),
1355 ShowCommand::CheckpointStatus => self.build_show_checkpoint_status().await?,
1356 ShowCommand::CreateSource { name } => {
1357 self.build_show_create_source(&name.to_string())?
1358 }
1359 ShowCommand::CreateSink { name } => {
1360 self.build_show_create_sink(&name.to_string())?
1361 }
1362 };
1363 Ok(ExecuteResult::Metadata(batch))
1364 }
1365 StreamingStatement::Checkpoint => {
1366 let result = self.checkpoint().await?;
1367 Ok(ExecuteResult::Ddl(DdlInfo {
1368 statement_type: "CHECKPOINT".to_string(),
1369 object_name: format!("checkpoint_{}", result.checkpoint_id),
1370 }))
1371 }
1372 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1373 self.handle_restore_checkpoint(*checkpoint_id)
1374 }
1375 StreamingStatement::Describe { name, .. } => {
1376 let name_str = name.to_string();
1377 let batch = self.build_describe(&name_str)?;
1378 Ok(ExecuteResult::Metadata(batch))
1379 }
1380 StreamingStatement::Explain {
1381 statement, analyze, ..
1382 } => {
1383 if *analyze {
1384 self.handle_explain_analyze(statement, sql).await
1385 } else {
1386 self.handle_explain(statement)
1387 }
1388 }
1389 StreamingStatement::CreateMaterializedView {
1390 name,
1391 query,
1392 emit_clause,
1393 or_replace,
1394 if_not_exists,
1395 query_sql,
1396 ..
1397 } => {
1398 let result = self
1399 .handle_create_materialized_view(
1400 sql,
1401 name,
1402 query,
1403 emit_clause.clone(),
1404 *or_replace,
1405 *if_not_exists,
1406 query_sql,
1407 )
1408 .await?;
1409 if let ExecuteResult::Ddl(ref info) = result {
1410 self.connector_manager
1411 .lock()
1412 .store_ddl(&info.object_name, sql);
1413 }
1414 Ok(result)
1415 }
1416 StreamingStatement::AlterSource { name, operation } => {
1417 self.handle_alter_source(name, operation)
1418 }
1419 StreamingStatement::Subscribe(_) => Err(DbError::InvalidOperation(
1420 "SUBSCRIBE requires the pgwire endpoint, not HTTP /api/v1/sql".into(),
1421 )),
1422 StreamingStatement::DeclareCursorForSubscribe { .. } => Err(DbError::InvalidOperation(
1423 "DECLARE CURSOR FOR SUBSCRIBE requires the pgwire endpoint, not HTTP /api/v1/sql"
1424 .into(),
1425 )),
1426 };
1427
1428 #[cfg(feature = "cluster")]
1429 if let Ok(ExecuteResult::Ddl(ref info)) = &result {
1430 if info.statement_type != "CHECKPOINT" {
1431 self.persist_catalog_manifest().await;
1432 }
1433 }
1434
1435 result
1436 }
1437
1438 async fn handle_insert_into(
1443 &self,
1444 table_name: &sqlparser::ast::ObjectName,
1445 _columns: &[sqlparser::ast::Ident],
1446 values: &[Vec<sqlparser::ast::Expr>],
1447 ) -> Result<ExecuteResult, DbError> {
1448 let name = table_name.to_string();
1449
1450 if let Some(entry) = self.catalog.get_source(&name) {
1452 let batch = sql_utils::sql_values_to_record_batch(&entry.schema, values)?;
1453 entry
1454 .push_and_buffer(batch)
1455 .map_err(|e| DbError::InsertError(format!("Failed to push to source: {e}")))?;
1456 return Ok(ExecuteResult::RowsAffected(values.len() as u64));
1457 }
1458
1459 {
1462 let mut ts = self.table_store.write();
1463 if ts.has_table(&name) {
1464 let schema = ts
1465 .table_schema(&name)
1466 .ok_or_else(|| DbError::TableNotFound(name.clone()))?;
1467 let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
1468 ts.upsert(&name, &batch)?;
1469 drop(ts); self.sync_table_to_datafusion(&name)?;
1472 return Ok(ExecuteResult::RowsAffected(values.len() as u64));
1473 }
1474 }
1475
1476 let table = self
1479 .ctx
1480 .table_provider(&name)
1481 .await
1482 .map_err(|_| DbError::TableNotFound(name.clone()))?;
1483
1484 let schema = table.schema();
1485 let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
1486
1487 self.ctx
1489 .deregister_table(&name)
1490 .map_err(|e| DbError::InsertError(format!("Failed to deregister table: {e}")))?;
1491
1492 let mem_table =
1493 datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![batch]])
1494 .map_err(|e| DbError::InsertError(format!("Failed to create table: {e}")))?;
1495
1496 self.ctx
1497 .register_table(&name, Arc::new(mem_table))
1498 .map_err(|e| DbError::InsertError(format!("Failed to register table: {e}")))?;
1499
1500 Ok(ExecuteResult::RowsAffected(values.len() as u64))
1501 }
1502
1503 #[allow(clippy::unused_self)] fn handle_restore_checkpoint(&self, _checkpoint_id: u64) -> Result<ExecuteResult, DbError> {
1509 Err(DbError::Unsupported(
1510 "RESTORE FROM CHECKPOINT is not yet implemented — \
1511 requires pipeline stop, state reload from manifest, \
1512 source offset seek, and pipeline restart"
1513 .to_string(),
1514 ))
1515 }
1516
1517 #[must_use]
1519 pub fn get_session_property(&self, key: &str) -> Option<String> {
1520 self.session_properties
1521 .lock()
1522 .get(&key.to_lowercase())
1523 .cloned()
1524 }
1525
1526 #[must_use]
1528 pub fn session_properties(&self) -> HashMap<String, String> {
1529 self.session_properties.lock().clone()
1530 }
1531
1532 pub fn set_session_property(&self, key: &str, value: &str) {
1534 self.session_properties
1535 .lock()
1536 .insert(key.to_lowercase(), value.to_string());
1537 }
1538
1539 pub fn subscribe<T: crate::handle::FromBatch>(
1545 &self,
1546 name: &str,
1547 ) -> Result<crate::handle::TypedSubscription<T>, DbError> {
1548 let sub = self
1549 .catalog
1550 .get_stream_subscription(name)
1551 .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
1552 Ok(crate::handle::TypedSubscription::from_raw(sub))
1553 }
1554
1555 #[cfg(feature = "api")]
1561 pub fn subscribe_raw(
1562 &self,
1563 name: &str,
1564 ) -> Result<laminar_core::streaming::Subscription<crate::catalog::ArrowRecord>, DbError> {
1565 self.catalog
1566 .get_stream_subscription(name)
1567 .ok_or_else(|| DbError::StreamNotFound(name.to_string()))
1568 }
1569
1570 #[must_use]
1581 pub fn lookup_subscription_schema(
1582 &self,
1583 name: &str,
1584 ) -> Option<(arrow_schema::SchemaRef, bool)> {
1585 if let Some(mv) = self.mv_registry.lock().get(name).cloned() {
1586 return Some((mv.schema, true));
1587 }
1588 if let Some(schema) = self.stream_schemas.read().get(name).cloned() {
1589 return Some((schema, true));
1590 }
1591 if let Some(entry) = self.catalog.get_stream_entry(name) {
1592 return Some((entry.sink.schema(), false));
1593 }
1594 None
1595 }
1596
1597 pub async fn open_subscription(
1606 &self,
1607 name: &str,
1608 filter_sql: Option<&str>,
1609 start: crate::subscription::SubscribeStart,
1610 ) -> Result<crate::subscription::SubscriptionPortal, DbError> {
1611 let attached = self.subscription_registry.subscriber_count(name);
1612 if attached >= crate::subscription::MAX_SUBSCRIBERS_PER_MV {
1613 return Err(DbError::Pipeline(format!(
1614 "subscriber cap reached for '{name}' ({attached}/{})",
1615 crate::subscription::MAX_SUBSCRIBERS_PER_MV
1616 )));
1617 }
1618
1619 let (schema, filterable) = self
1620 .lookup_subscription_schema(name)
1621 .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
1622
1623 let filter = match filter_sql {
1624 None => None,
1625 Some(_) if !filterable => {
1626 return Err(DbError::Pipeline(format!(
1627 "WHERE on '{name}' is not supported: stream output schema \
1628 was not resolved (likely a planner failure at start())"
1629 )));
1630 }
1631 Some(sql) => Some(crate::filter_compile::compile(&self.ctx, sql, &schema).await?),
1632 };
1633
1634 let (replay, rx) = self
1635 .subscription_registry
1636 .subscribe(name, start)
1637 .map_err(|e| {
1638 let requested = match start {
1639 crate::subscription::SubscribeStart::AsOfEpoch(n) => n,
1640 crate::subscription::SubscribeStart::Tail => 0,
1641 };
1642 DbError::InvalidOperation(format!(
1643 "epoch {requested} for stream '{name}' is no longer retained \
1644 (earliest retained is {})",
1645 e.earliest_retained
1646 ))
1647 })?;
1648 Ok(match filter {
1649 Some(phys) => crate::subscription::SubscriptionPortal::open_with_filter(
1650 name, schema, replay, rx, phys,
1651 ),
1652 None => crate::subscription::SubscriptionPortal::open(name, schema, replay, rx),
1653 })
1654 }
1655
1656 fn handle_explain(&self, statement: &StreamingStatement) -> Result<ExecuteResult, DbError> {
1658 let mut planner = self.planner.lock();
1659
1660 let plan_result = planner.plan(statement);
1662
1663 let mut rows: Vec<(String, String)> = Vec::new();
1664
1665 match plan_result {
1666 Ok(plan) => {
1667 rows.push((
1668 "plan_type".into(),
1669 match &plan {
1670 laminar_sql::planner::StreamingPlan::Query(_) => "Query",
1671 laminar_sql::planner::StreamingPlan::RegisterSource(_) => "RegisterSource",
1672 laminar_sql::planner::StreamingPlan::RegisterSink(_) => "RegisterSink",
1673 laminar_sql::planner::StreamingPlan::Standard(_) => "Standard",
1674 laminar_sql::planner::StreamingPlan::RegisterLookupTable(_) => {
1675 "RegisterLookupTable"
1676 }
1677 laminar_sql::planner::StreamingPlan::DropLookupTable { .. } => {
1678 "DropLookupTable"
1679 }
1680 }
1681 .into(),
1682 ));
1683 match &plan {
1684 laminar_sql::planner::StreamingPlan::Query(qp) => {
1685 if let Some(name) = &qp.name {
1686 rows.push(("query_name".into(), name.clone()));
1687 }
1688 if let Some(wc) = &qp.window_config {
1689 rows.push(("window".into(), format!("{wc}")));
1690 }
1691 if let Some(jcs) = &qp.join_config {
1692 if jcs.len() == 1 {
1693 rows.push(("join".into(), format!("{}", jcs[0])));
1694 } else {
1695 for (i, jc) in jcs.iter().enumerate() {
1696 rows.push((format!("join_step_{}", i + 1), format!("{jc}")));
1697 }
1698 }
1699 }
1700 if let Some(oc) = &qp.order_config {
1701 rows.push(("order_by".into(), format!("{oc:?}")));
1702 }
1703 if let Some(fc) = &qp.frame_config {
1704 rows.push((
1705 "frame_functions".into(),
1706 format!("{}", fc.functions.len()),
1707 ));
1708 }
1709 if let Some(ec) = &qp.emit_clause {
1710 rows.push(("emit".into(), format!("{ec}")));
1711 }
1712 }
1713 laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
1714 rows.push(("source".into(), info.name.clone()));
1715 }
1716 laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
1717 rows.push(("sink".into(), info.name.clone()));
1718 }
1719 laminar_sql::planner::StreamingPlan::Standard(_) => {
1720 rows.push(("execution".into(), "DataFusion pass-through".into()));
1721 }
1722 laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
1723 rows.push(("lookup_table".into(), info.name.clone()));
1724 }
1725 laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
1726 rows.push(("drop_lookup_table".into(), name.clone()));
1727 }
1728 }
1729 }
1730 Err(e) => {
1731 rows.push(("error".into(), format!("{e}")));
1733 rows.push((
1734 "statement".into(),
1735 format!("{:?}", std::mem::discriminant(statement)),
1736 ));
1737 }
1738 }
1739
1740 let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
1741 let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
1742
1743 let schema = Arc::new(Schema::new(vec![
1744 Field::new("plan_key", DataType::Utf8, false),
1745 Field::new("plan_value", DataType::Utf8, false),
1746 ]));
1747
1748 let batch = RecordBatch::try_new(
1749 schema,
1750 vec![
1751 Arc::new(StringArray::from(keys)),
1752 Arc::new(StringArray::from(values)),
1753 ],
1754 )
1755 .map_err(|e| DbError::InvalidOperation(format!("explain metadata: {e}")))?;
1756
1757 Ok(ExecuteResult::Metadata(batch))
1758 }
1759
1760 async fn handle_explain_analyze(
1762 &self,
1763 statement: &StreamingStatement,
1764 original_sql: &str,
1765 ) -> Result<ExecuteResult, DbError> {
1766 let explain_result = self.handle_explain(statement)?;
1768 let mut rows: Vec<(String, String)> = Vec::new();
1769
1770 if let ExecuteResult::Metadata(explain_batch) = &explain_result {
1771 let keys_col = explain_batch
1772 .column(0)
1773 .as_any()
1774 .downcast_ref::<StringArray>();
1775 let vals_col = explain_batch
1776 .column(1)
1777 .as_any()
1778 .downcast_ref::<StringArray>();
1779 if let (Some(keys), Some(vals)) = (keys_col, vals_col) {
1780 for i in 0..explain_batch.num_rows() {
1781 rows.push((keys.value(i).to_string(), vals.value(i).to_string()));
1782 }
1783 }
1784 }
1785
1786 let upper = original_sql.to_uppercase();
1788 let inner_start = upper.find("ANALYZE").map_or(0, |pos| pos + "ANALYZE".len());
1789 let inner_sql = original_sql[inner_start..].trim();
1790
1791 let start = std::time::Instant::now();
1793 match self.ctx.sql(inner_sql).await {
1794 Ok(df) => match df.collect().await {
1795 Ok(batches) => {
1796 let elapsed = start.elapsed();
1797 let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1798 rows.push(("rows_produced".into(), total_rows.to_string()));
1799 rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1800 rows.push(("batches_processed".into(), batches.len().to_string()));
1801 }
1802 Err(e) => {
1803 let elapsed = start.elapsed();
1804 rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1805 rows.push(("analyze_error".into(), format!("{e}")));
1806 }
1807 },
1808 Err(e) => {
1809 rows.push(("analyze_error".into(), format!("{e}")));
1810 }
1811 }
1812
1813 let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
1814 let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
1815
1816 let schema = Arc::new(Schema::new(vec![
1817 Field::new("plan_key", DataType::Utf8, false),
1818 Field::new("plan_value", DataType::Utf8, false),
1819 ]));
1820
1821 let batch = RecordBatch::try_new(
1822 schema,
1823 vec![
1824 Arc::new(StringArray::from(keys)),
1825 Arc::new(StringArray::from(values)),
1826 ],
1827 )
1828 .map_err(|e| DbError::InvalidOperation(format!("explain analyze metadata: {e}")))?;
1829
1830 Ok(ExecuteResult::Metadata(batch))
1831 }
1832
1833 #[allow(clippy::too_many_lines)]
1835 pub(crate) async fn handle_query(&self, sql: &str) -> Result<ExecuteResult, DbError> {
1836 let plan = {
1838 let statements = parse_streaming_sql(sql)?;
1839 if statements.is_empty() {
1840 return Err(DbError::InvalidOperation("Empty SQL statement".into()));
1841 }
1842 let mut planner = self.planner.lock();
1843 planner
1844 .plan(&statements[0])
1845 .map_err(laminar_sql::Error::from)?
1846 };
1847
1848 match plan {
1849 laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
1850 Ok(ExecuteResult::Ddl(DdlInfo {
1851 statement_type: "DDL".to_string(),
1852 object_name: info.name,
1853 }))
1854 }
1855 laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
1856 Ok(ExecuteResult::Ddl(DdlInfo {
1857 statement_type: "DDL".to_string(),
1858 object_name: info.name,
1859 }))
1860 }
1861 laminar_sql::planner::StreamingPlan::Query(query_plan) => {
1862 if let Some(asof_config) = Self::extract_asof_config(&query_plan) {
1864 return self.execute_asof_query(&asof_config, sql).await;
1865 }
1866
1867 let plan_sql = query_plan.statement.to_string();
1868 let logical_plan = self.ctx.state().create_logical_plan(&plan_sql).await?;
1869
1870 let df = self.ctx.execute_logical_plan(logical_plan).await?;
1872 let stream = df.execute_stream().await?;
1873
1874 Ok(self.bridge_query_stream(sql, stream))
1875 }
1876 laminar_sql::planner::StreamingPlan::Standard(stmt) => {
1877 let sql_str = stmt.to_string();
1879 let df = self.ctx.sql(&sql_str).await?;
1880 let stream = df.execute_stream().await?;
1881
1882 Ok(self.bridge_query_stream(sql, stream))
1883 }
1884 laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
1885 self.handle_register_lookup_table(info)
1886 }
1887 laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
1888 self.table_store.write().drop_table(&name);
1889 self.connector_manager.lock().unregister_table(&name);
1890 let _ = self.ctx.deregister_table(&name);
1891 self.lookup_registry.unregister(&name);
1892 self.refresh_lookup_optimizer_rule();
1893 Ok(ExecuteResult::Ddl(DdlInfo {
1894 statement_type: "DROP LOOKUP TABLE".to_string(),
1895 object_name: name,
1896 }))
1897 }
1898 }
1899 }
1900
1901 fn bridge_query_stream(
1904 &self,
1905 sql: &str,
1906 stream: datafusion::physical_plan::SendableRecordBatchStream,
1907 ) -> ExecuteResult {
1908 let query_id = self.catalog.register_query(sql);
1909 let schema = stream.schema();
1910
1911 let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
1912 let (source, sink) =
1913 streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
1914
1915 let subscription = sink.subscribe();
1916
1917 let cancel_token = tokio_util::sync::CancellationToken::new();
1918 let cancel_token_clone = cancel_token.clone();
1919
1920 let source_clone = source.clone();
1921 let catalog = Arc::clone(&self.catalog);
1922 let query_id_clone = query_id;
1923 tokio::spawn(async move {
1924 use tokio_stream::StreamExt;
1925 let mut stream = stream;
1926 loop {
1927 tokio::select! {
1928 () = cancel_token_clone.cancelled() => {
1929 break;
1930 }
1931 result = stream.next() => {
1932 match result {
1933 Some(Ok(batch)) => {
1934 if source_clone.push_arrow(batch).is_err() {
1935 break;
1936 }
1937 }
1938 _ => break,
1939 }
1940 }
1941 }
1942 }
1943 drop(source_clone);
1944 catalog.deactivate_query(query_id_clone);
1945 });
1946
1947 ExecuteResult::Query(QueryHandle {
1948 id: query_id,
1949 schema,
1950 sql: sql.to_string(),
1951 subscription: Some(subscription),
1952 active: true,
1953 cancel_token,
1954 })
1955 }
1956
1957 fn extract_asof_config(
1959 plan: &laminar_sql::planner::QueryPlan,
1960 ) -> Option<AsofJoinTranslatorConfig> {
1961 plan.join_config.as_ref()?.iter().find_map(|jc| {
1962 if let JoinOperatorConfig::Asof(cfg) = jc {
1963 Some(cfg.clone())
1964 } else {
1965 None
1966 }
1967 })
1968 }
1969
1970 async fn execute_asof_query(
1974 &self,
1975 asof_config: &AsofJoinTranslatorConfig,
1976 original_sql: &str,
1977 ) -> Result<ExecuteResult, DbError> {
1978 let left_sql = format!("SELECT * FROM {}", asof_config.left_table);
1979 let right_sql = format!("SELECT * FROM {}", asof_config.right_table);
1980
1981 let left_batches = self
1982 .ctx
1983 .sql(&left_sql)
1984 .await
1985 .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?
1986 .collect()
1987 .await
1988 .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?;
1989
1990 let right_batches = self
1991 .ctx
1992 .sql(&right_sql)
1993 .await
1994 .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?
1995 .collect()
1996 .await
1997 .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?;
1998
1999 let result_batch =
2000 crate::asof_batch::execute_asof_join_batch(&left_batches, &right_batches, asof_config)?;
2001
2002 if result_batch.num_rows() == 0 {
2003 let query_id = self.catalog.register_query(original_sql);
2004 self.catalog.deactivate_query(query_id);
2005 return Ok(ExecuteResult::Query(QueryHandle {
2006 id: query_id,
2007 schema: result_batch.schema(),
2008 sql: original_sql.to_string(),
2009 subscription: None,
2010 active: false,
2011 cancel_token: tokio_util::sync::CancellationToken::new(),
2012 }));
2013 }
2014
2015 let schema = result_batch.schema();
2016 let mem_table =
2017 datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![result_batch]])
2018 .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2019
2020 let _ = self.ctx.deregister_table("__asof_result");
2021 self.ctx
2022 .register_table("__asof_result", Arc::new(mem_table))
2023 .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2024
2025 let df = self
2026 .ctx
2027 .sql("SELECT * FROM __asof_result")
2028 .await
2029 .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2030 let stream = df
2031 .execute_stream()
2032 .await
2033 .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2034
2035 let _ = self.ctx.deregister_table("__asof_result");
2036
2037 Ok(self.bridge_query_stream(original_sql, stream))
2038 }
2039
2040 pub fn source<T: laminar_core::streaming::Record>(
2050 &self,
2051 name: &str,
2052 ) -> Result<SourceHandle<T>, DbError> {
2053 let entry = self
2054 .catalog
2055 .get_source(name)
2056 .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
2057 SourceHandle::new(entry)
2058 }
2059
2060 pub fn source_untyped(&self, name: &str) -> Result<UntypedSourceHandle, DbError> {
2066 let entry = self
2067 .catalog
2068 .get_source(name)
2069 .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
2070 Ok(UntypedSourceHandle::new(entry))
2071 }
2072
2073 pub fn sources(&self) -> Vec<SourceInfo> {
2075 let names = self.catalog.list_sources();
2076 names
2077 .into_iter()
2078 .filter_map(|name| {
2079 self.catalog.get_source(&name).map(|e| SourceInfo {
2080 name: e.name.clone(),
2081 schema: e.schema.clone(),
2082 watermark_column: e.watermark_column.clone(),
2083 })
2084 })
2085 .collect()
2086 }
2087
2088 pub fn sinks(&self) -> Vec<SinkInfo> {
2090 self.catalog
2091 .list_sinks()
2092 .into_iter()
2093 .map(|name| SinkInfo { name })
2094 .collect()
2095 }
2096
2097 pub fn materialized_views(&self) -> Vec<crate::handle::MaterializedViewInfo> {
2099 let registry = self.mv_registry.lock();
2100 registry
2101 .views()
2102 .map(crate::handle::MaterializedViewInfo::from)
2103 .collect()
2104 }
2105
2106 pub fn streams(&self) -> Vec<crate::handle::StreamInfo> {
2108 let mgr = self.connector_manager.lock();
2109 mgr.streams()
2110 .iter()
2111 .map(|(name, reg)| crate::handle::StreamInfo {
2112 name: name.clone(),
2113 sql: Some(reg.query_sql.clone()),
2114 })
2115 .collect()
2116 }
2117
2118 pub fn pipeline_topology(&self) -> crate::handle::PipelineTopology {
2125 use crate::handle::{PipelineEdge, PipelineNode, PipelineNodeType};
2126
2127 let mut nodes = Vec::new();
2128 let mut edges = Vec::new();
2129
2130 let source_names = self.catalog.list_sources();
2132
2133 for name in &source_names {
2135 let schema = self.catalog.get_source(name).map(|e| e.schema.clone());
2136 nodes.push(PipelineNode {
2137 name: name.clone(),
2138 node_type: PipelineNodeType::Source,
2139 schema,
2140 sql: None,
2141 });
2142 }
2143
2144 let mgr = self.connector_manager.lock();
2146 let stream_names: Vec<String> = mgr.streams().keys().cloned().collect();
2147 for (name, reg) in mgr.streams() {
2148 nodes.push(PipelineNode {
2149 name: name.clone(),
2150 node_type: PipelineNodeType::Stream,
2151 schema: None,
2152 sql: Some(reg.query_sql.clone()),
2153 });
2154
2155 let sql_upper = reg.query_sql.to_uppercase();
2159 for src in &source_names {
2160 if sql_upper.contains(&src.to_uppercase()) {
2161 edges.push(PipelineEdge {
2162 from: src.clone(),
2163 to: name.clone(),
2164 });
2165 }
2166 }
2167 for other in &stream_names {
2169 if other != name && sql_upper.contains(&other.to_uppercase()) {
2170 edges.push(PipelineEdge {
2171 from: other.clone(),
2172 to: name.clone(),
2173 });
2174 }
2175 }
2176 }
2177
2178 for (name, reg) in mgr.sinks() {
2180 nodes.push(PipelineNode {
2181 name: name.clone(),
2182 node_type: PipelineNodeType::Sink,
2183 schema: None,
2184 sql: None,
2185 });
2186
2187 if !reg.input.is_empty() {
2189 edges.push(PipelineEdge {
2190 from: reg.input.clone(),
2191 to: name.clone(),
2192 });
2193 }
2194 }
2195
2196 let cm_sink_names: std::collections::HashSet<&String> = mgr.sinks().keys().collect();
2199 for name in self.catalog.list_sinks() {
2200 if !cm_sink_names.contains(&name) {
2201 if let Some(input) = self.catalog.get_sink_input(&name) {
2203 nodes.push(PipelineNode {
2204 name: name.clone(),
2205 node_type: PipelineNodeType::Sink,
2206 schema: None,
2207 sql: None,
2208 });
2209 if !input.is_empty() {
2210 edges.push(PipelineEdge {
2211 from: input,
2212 to: name,
2213 });
2214 }
2215 }
2216 }
2217 }
2218
2219 drop(mgr);
2220
2221 crate::handle::PipelineTopology { nodes, edges }
2222 }
2223
2224 pub fn queries(&self) -> Vec<QueryInfo> {
2226 self.catalog
2227 .list_queries()
2228 .into_iter()
2229 .map(|(id, sql, active)| QueryInfo { id, sql, active })
2230 .collect()
2231 }
2232
2233 #[must_use]
2235 pub fn is_checkpoint_enabled(&self) -> bool {
2236 self.config.checkpoint.is_some()
2237 }
2238
2239 pub fn checkpoint_store(&self) -> Option<Box<dyn laminar_core::storage::CheckpointStore>> {
2245 let cp_config = self.config.checkpoint.as_ref()?;
2246 let max_retained = cp_config.max_retained.unwrap_or(3);
2247 let vnode_count = self.vnode_registry.lock().as_ref().map_or(
2250 laminar_core::storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
2251 |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
2252 );
2253
2254 if let Some(ref url) = self.config.object_store_url {
2255 let obj_store = laminar_core::storage::object_store_builder::build_object_store(
2257 url,
2258 &self.config.object_store_options,
2259 )
2260 .ok()?;
2261 Some(Box::new(
2262 laminar_core::storage::checkpoint_store::ObjectStoreCheckpointStore::new(
2263 obj_store,
2264 String::new(),
2265 max_retained,
2266 )
2267 .with_vnode_count(vnode_count),
2268 ))
2269 } else {
2270 let data_dir = cp_config
2271 .data_dir
2272 .clone()
2273 .or_else(|| self.config.storage_dir.clone())
2274 .unwrap_or_else(|| std::path::PathBuf::from("./data"));
2275 Some(Box::new(
2276 laminar_core::storage::checkpoint_store::FileSystemCheckpointStore::new(
2277 &data_dir,
2278 max_retained,
2279 )
2280 .with_vnode_count(vnode_count),
2281 ))
2282 }
2283 }
2284
2285 pub async fn checkpoint(
2298 &self,
2299 ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
2300 if self.config.checkpoint.is_none() {
2301 return Err(DbError::Checkpoint(
2302 "checkpointing is not enabled".to_string(),
2303 ));
2304 }
2305
2306 #[cfg(feature = "cluster")]
2307 {
2308 let leader_opt = {
2309 let cc_guard = self.cluster_controller.lock();
2310 cc_guard.as_ref().and_then(|cc| {
2311 if cc.is_leader() {
2312 None
2313 } else {
2314 cc.current_leader().and_then(|leader_id| {
2315 let watch = cc.members_watch();
2316 let members = watch.borrow();
2317 members
2318 .iter()
2319 .find(|m| m.id == leader_id)
2320 .map(|m| m.rpc_address.clone())
2321 })
2322 }
2323 })
2324 };
2325 if let Some(leader_rpc) = leader_opt {
2326 tracing::info!(
2327 "Forwarding checkpoint request to leader node at HTTP address {}",
2328 leader_rpc
2329 );
2330 return self.forward_checkpoint_to_leader(&leader_rpc).await;
2331 }
2332 }
2333
2334 let tx = self.force_ckpt_tx.lock().clone();
2340 let result = if let Some(tx) = tx {
2341 let (reply_tx, reply_rx) = crossfire::oneshot::oneshot();
2342 tx.send(reply_tx).await.map_err(|_| {
2343 DbError::Checkpoint(
2344 "pipeline callback receiver closed — engine may be shutting down".into(),
2345 )
2346 })?;
2347 reply_rx.await.map_err(|_| {
2348 DbError::Checkpoint("pipeline callback dropped oneshot before replying".into())
2349 })?
2350 } else {
2351 let mut guard = self.coordinator.lock().await;
2356 let coord = guard.as_mut().ok_or_else(|| {
2357 DbError::Checkpoint("coordinator not initialized — call start() first".to_string())
2358 })?;
2359 coord
2360 .checkpoint(crate::checkpoint_coordinator::CheckpointRequest::default())
2361 .await
2362 };
2363
2364 #[cfg(feature = "cluster")]
2367 if matches!(&result, Ok(r) if r.success) {
2368 self.persist_catalog_manifest().await;
2369 }
2370
2371 result
2372 }
2373
2374 #[cfg(feature = "cluster")]
2375 async fn forward_checkpoint_to_leader(
2376 &self,
2377 addr: &str,
2378 ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
2379 #[derive(serde::Deserialize)]
2380 struct ForwardedCheckpointResponse {
2381 success: bool,
2382 checkpoint_id: u64,
2383 epoch: u64,
2384 duration_ms: u64,
2385 error: Option<String>,
2386 }
2387
2388 let mut req = reqwest::Client::new()
2389 .post(format!("http://{addr}/api/v1/checkpoint"))
2390 .timeout(std::time::Duration::from_secs(10));
2391 if let Some(token) = &self.config.http_auth_token {
2392 req = req.bearer_auth(token.expose());
2393 }
2394 let resp = req.send().await.map_err(|e| {
2395 DbError::Checkpoint(format!(
2396 "failed to forward checkpoint to leader at {addr}: {e}"
2397 ))
2398 })?;
2399
2400 let status = resp.status();
2401 let body = resp.text().await.map_err(|e| {
2402 DbError::Checkpoint(format!("failed to read leader checkpoint response: {e}"))
2403 })?;
2404
2405 match serde_json::from_str::<ForwardedCheckpointResponse>(&body) {
2411 Ok(response) => Ok(crate::checkpoint_coordinator::CheckpointResult {
2412 success: response.success,
2413 checkpoint_id: response.checkpoint_id,
2414 epoch: response.epoch,
2415 duration: std::time::Duration::from_millis(response.duration_ms),
2416 error: response.error,
2417 }),
2418 Err(_) => Err(DbError::Checkpoint(format!(
2419 "leader rejected checkpoint ({status}): {body}"
2420 ))),
2421 }
2422 }
2423
2424 pub async fn checkpoint_stats(&self) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
2428 let guard = self.coordinator.lock().await;
2429 guard
2430 .as_ref()
2431 .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
2432 }
2433}
2434
2435impl std::fmt::Debug for LaminarDB {
2436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2437 f.debug_struct("LaminarDB")
2438 .field("sources", &self.catalog.list_sources().len())
2439 .field("sinks", &self.catalog.list_sinks().len())
2440 .field("materialized_views", &self.mv_registry.lock().len())
2441 .field("checkpoint_enabled", &self.is_checkpoint_enabled())
2442 .field("shutdown", &self.is_closed())
2443 .finish_non_exhaustive()
2444 }
2445}
2446
2447struct LookupQueryPlanner {
2449 extension_planner: Arc<dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync>,
2450}
2451
2452impl std::fmt::Debug for LookupQueryPlanner {
2453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2454 f.debug_struct("LookupQueryPlanner").finish_non_exhaustive()
2455 }
2456}
2457
2458#[async_trait::async_trait]
2459impl datafusion::execution::context::QueryPlanner for LookupQueryPlanner {
2460 async fn create_physical_plan(
2461 &self,
2462 logical_plan: &datafusion::logical_expr::LogicalPlan,
2463 session_state: &datafusion::execution::SessionState,
2464 ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
2465 use datafusion::physical_planner::PhysicalPlanner;
2466 let planner =
2467 datafusion::physical_planner::DefaultPhysicalPlanner::with_extension_planners(vec![
2468 Arc::clone(&self.extension_planner),
2469 ]);
2470 planner
2471 .create_physical_plan(logical_plan, session_state)
2472 .await
2473 }
2474}
2475
/// `RemoteQueryHandler` implementation (cluster builds only).
///
/// Answers scan requests by reading this node's materialized-view store and
/// table store.
#[cfg(feature = "cluster")]
struct DbQueryHandler {
    /// Weak handle to the materialized-view store. It is consulted first on a
    /// scan, and it does not keep the store alive. If the upgrade fails, the
    /// store is treated as having no match.
    mv_store: std::sync::Weak<parking_lot::RwLock<crate::mv_store::MvStore>>,
    /// Weak handle to the table store. It is consulted when the view store has
    /// no batch for the requested name.
    table_store: std::sync::Weak<parking_lot::RwLock<crate::table_store::TableStore>>,
    /// Session context used to compile pushed-down filter SQL in `remote_scan`.
    filter_ctx: SessionContext,
}
2486
#[cfg(feature = "cluster")]
#[async_trait::async_trait]
impl laminar_core::cluster::control::RemoteQueryHandler for DbQueryHandler {
    /// Returns the current contents of `table_name` as one `RecordBatch`,
    /// optionally filtered and projected.
    ///
    /// Lookup and filtering rules:
    /// - The materialized-view store is checked before the table store.
    /// - `filter_sql` is best-effort. If it fails to compile or apply, the
    ///   unfiltered batch is returned and the failure is logged at debug level.
    /// - Filtering runs before projection, so the filter is compiled against
    ///   the full schema.
    ///
    /// # Errors
    ///
    /// Returns an error string in two cases:
    /// - neither store yields a batch for `table_name` (this includes a store
    ///   that has already been dropped);
    /// - the projection fails.
    async fn remote_scan(
        &self,
        table_name: &str,
        projection: Option<Vec<usize>>,
        filter_sql: Option<String>,
    ) -> Result<arrow::array::RecordBatch, String> {
        let from_views = self
            .mv_store
            .upgrade()
            .and_then(|store| store.read().to_record_batch(table_name));
        // The table store is only consulted when the view store had no match.
        let batch = match from_views {
            Some(found) => found,
            None => self
                .table_store
                .upgrade()
                .and_then(|store| store.read().to_record_batch(table_name))
                .ok_or_else(|| format!("table '{table_name}' not found"))?,
        };

        let batch = if let Some(sql) = filter_sql {
            let schema = batch.schema();
            match crate::filter_compile::compile(&self.filter_ctx, &sql, &schema).await {
                Err(e) => {
                    tracing::debug!(table = table_name, error = %e,
                        "remote_scan: skipping pushed filter (compile failed)");
                    batch
                }
                Ok(expr) => match crate::filter_compile::apply(&batch, expr.as_ref()) {
                    Ok(Some(kept)) => kept,
                    // `apply` produced no batch: return zero rows with the same schema.
                    Ok(None) => arrow::array::RecordBatch::new_empty(schema),
                    Err(e) => {
                        tracing::debug!(table = table_name, error = %e,
                            "remote_scan: skipping pushed filter (apply failed)");
                        batch
                    }
                },
            }
        } else {
            batch
        };

        if let Some(columns) = projection {
            batch.project(&columns).map_err(|e| e.to_string())
        } else {
            Ok(batch)
        }
    }
}
2538
2539#[cfg(test)]
2540mod tests;