Skip to main content

laminar_db/
db.rs

1//! The main `LaminarDB` database facade.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow::array::{RecordBatch, StringArray};
8use arrow::datatypes::{DataType, Field, Schema};
9use datafusion::prelude::SessionContext;
10use laminar_core::streaming;
11use laminar_sql::parser::{parse_streaming_sql, ShowCommand, StreamingStatement};
12use laminar_sql::planner::StreamingPlanner;
13use laminar_sql::register_streaming_functions;
14use laminar_sql::translator::{AsofJoinTranslatorConfig, JoinOperatorConfig};
15
16use crate::builder::LaminarDbBuilder;
17use crate::catalog::SourceCatalog;
18use crate::config::LaminarConfig;
19use crate::error::DbError;
20use crate::handle::{
21    DdlInfo, ExecuteResult, QueryHandle, QueryInfo, SinkInfo, SourceHandle, SourceInfo,
22    UntypedSourceHandle,
23};
24use crate::pipeline::ControlMsg;
25use crate::sql_utils;
26
27/// Cloneable async sender for the live-DDL control channel.
28pub(crate) type ControlMsgTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ControlMsg>>;
29
/// Lifecycle phase of a [`LaminarDB`] instance. Persisted in an `AtomicU8`
/// as its `u8` discriminant.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DbState {
    Created = 0,
    Starting = 1,
    Running = 2,
    ShuttingDown = 3,
    Stopped = 4,
}

impl DbState {
    /// Decode a raw discriminant, yielding `None` for bytes outside `0..=4`.
    pub(crate) fn from_u8(raw: u8) -> Option<Self> {
        match raw {
            0 => Some(Self::Created),
            1 => Some(Self::Starting),
            2 => Some(Self::Running),
            3 => Some(Self::ShuttingDown),
            4 => Some(Self::Stopped),
            _ => None,
        }
    }

    /// Decode a raw discriminant, treating any unknown byte as `Stopped`.
    fn decode_or_stopped(raw: u8) -> Self {
        Self::from_u8(raw).unwrap_or(Self::Stopped)
    }

    /// Read the current state (`Acquire`). Unknown bytes read as `Stopped`.
    pub(crate) fn load(atomic: &std::sync::atomic::AtomicU8) -> Self {
        use std::sync::atomic::Ordering;

        let raw = atomic.load(Ordering::Acquire);
        Self::decode_or_stopped(raw)
    }

    /// Unconditionally publish `self` as the current state (`Release`).
    pub(crate) fn store(self, atomic: &std::sync::atomic::AtomicU8) {
        use std::sync::atomic::Ordering;

        let raw = self as u8;
        atomic.store(raw, Ordering::Release);
    }

    /// Atomically move `current -> new`.
    ///
    /// `Ok` carries the previous state (equal to `current`). `Err` carries
    /// the state actually observed, so a caller can tell it lost the race
    /// for an exclusive transition.
    pub(crate) fn compare_exchange(
        current: Self,
        new: Self,
        atomic: &std::sync::atomic::AtomicU8,
    ) -> Result<Self, Self> {
        use std::sync::atomic::Ordering;

        let (expected, desired) = (current as u8, new as u8);
        atomic
            .compare_exchange(expected, desired, Ordering::AcqRel, Ordering::Acquire)
            .map(Self::decode_or_stopped)
            .map_err(Self::decode_or_stopped)
    }
}
80
81fn cache_entries_from_memory(mem: laminar_sql::parser::lookup_table::ByteSize) -> usize {
82    (mem.as_bytes() / 256).max(1024) as usize
83}
84
85/// The main `LaminarDB` database handle.
86///
87/// Provides a unified interface for SQL execution, data ingestion,
88/// and result consumption. All streaming infrastructure (sources, sinks,
89/// channels, subscriptions) is managed internally.
90pub struct LaminarDB {
91    pub(crate) catalog: Arc<SourceCatalog>,
92    pub(crate) planner: parking_lot::Mutex<StreamingPlanner>,
93    pub(crate) ctx: SessionContext,
94    pub(crate) config: LaminarConfig,
95    pub(crate) config_vars: Arc<HashMap<String, String>>,
96    pub(crate) shutdown: std::sync::atomic::AtomicBool,
97    /// Populated by `start()`.
98    pub(crate) coordinator:
99        Arc<tokio::sync::Mutex<Option<crate::checkpoint_coordinator::CheckpointCoordinator>>>,
100    pub(crate) connector_manager: parking_lot::Mutex<crate::connector_manager::ConnectorManager>,
101    pub(crate) connector_registry: Arc<laminar_connectors::registry::ConnectorRegistry>,
102    pub(crate) mv_registry: parking_lot::Mutex<laminar_core::mv::MvRegistry>,
103    pub(crate) table_store: Arc<parking_lot::RwLock<crate::table_store::TableStore>>,
104    pub(crate) state: Arc<std::sync::atomic::AtomicU8>,
105    pub(crate) runtime_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
106    pub(crate) shutdown_signal: Arc<tokio::sync::Notify>,
107    pub(crate) engine_metrics:
108        parking_lot::Mutex<Option<Arc<crate::engine_metrics::EngineMetrics>>>,
109    pub(crate) prometheus_registry: parking_lot::Mutex<Option<Arc<prometheus::Registry>>>,
110    pub(crate) start_time: std::time::Instant,
111    pub(crate) session_properties: parking_lot::Mutex<HashMap<String, String>>,
112    /// Min of all source watermarks.
113    pub(crate) pipeline_watermark: Arc<std::sync::atomic::AtomicI64>,
114    /// The registry of lookup table snapshots.
115    pub(crate) lookup_registry: Arc<laminar_sql::datafusion::LookupTableRegistry>,
116    /// Assembled AI subsystem (registry + providers + cache + call log).
117    /// `None` unless `[ai]`/`[models]` are configured. Set once in the builder.
118    pub(crate) ai_runtime: Option<Arc<crate::ai::AiRuntime>>,
119    /// Main runtime handle the AI inference workers spawn on. Set with `ai_runtime`.
120    pub(crate) ai_handle: Option<tokio::runtime::Handle>,
121    /// Live-DDL channel to the running coordinator. `None` outside `start..shutdown`.
122    pub(crate) control_tx: parking_lot::Mutex<Option<ControlMsgTx>>,
123    pub(crate) mv_store: Arc<parking_lot::RwLock<crate::mv_store::MvStore>>,
124    /// Activates leader/follower checkpoint flow when set. `None` in embedded mode.
125    #[cfg(feature = "cluster")]
126    pub(crate) cluster_controller:
127        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::ClusterController>>>,
128    /// Pair with `vnode_registry`; the coordinator writes per-vnode durability
129    /// markers each checkpoint and runs the gate when both are installed.
130    pub(crate) state_backend:
131        parking_lot::Mutex<Option<Arc<dyn laminar_core::state::StateBackend>>>,
132    pub(crate) vnode_registry: parking_lot::Mutex<Option<Arc<laminar_core::state::VnodeRegistry>>>,
133    /// Applied to both `self.ctx` and the pipeline-side `OperatorGraph` context.
134    pub(crate) physical_optimizer_rules:
135        Arc<[Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>]>,
136    /// `target_partitions` override; cluster mode sets this to `vnode_count`.
137    pub(crate) pipeline_target_partitions: Option<usize>,
138    /// Used by `SqlQueryOperator` to row-shuffle pre-aggregate batches to owners.
139    #[cfg(feature = "cluster")]
140    pub(crate) shuffle_sender:
141        parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleSender>>>,
142    /// `Arc`-wrapped so the subscription-router task can hold a weak handle and
143    /// read the receiver lazily (installed after the controller).
144    #[cfg(feature = "cluster")]
145    pub(crate) shuffle_receiver:
146        Arc<parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleReceiver>>>>,
147    #[cfg(feature = "cluster")]
148    pub(crate) decision_store:
149        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::CheckpointDecisionStore>>>,
150    #[cfg(feature = "cluster")]
151    pub(crate) assignment_snapshot_store:
152        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>>,
153    /// Cluster-wide catalog manifest store (`catalog/manifest.json` on the
154    /// shared object store). Persisted on each successful checkpoint and
155    /// replayed at boot so a node rebuilds MVs/sources it lacks locally.
156    #[cfg(feature = "cluster")]
157    pub(crate) catalog_manifest_store:
158        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::CatalogManifestStore>>>,
159    /// Committed per-vnode state staged by [`Self::adopt_assignment_snapshot`]
160    /// for vnodes this node newly acquired in a rebalance. Operators that
161    /// adopt the per-vnode state backend drain this on their next cycle to
162    /// resume from the last committed epoch instead of empty state.
163    ///
164    /// `Arc`-wrapped so the same handle is shared with the `OperatorGraph`
165    /// (via [`ClusterShuffleConfig`](crate::operator::sql_query::ClusterShuffleConfig)),
166    /// which drains and applies the staged slices into live operators each cycle.
167    #[cfg(feature = "cluster")]
168    pub(crate) rehydrated_vnode_state: Arc<parking_lot::Mutex<HashMap<u32, RehydratedVnode>>>,
169    /// Hands `db.checkpoint()` requests to the pipeline callback so it can capture
170    /// operator state before the manifest is packed. When `None`, falls back to
171    /// the direct coordinator path — only valid for stateless engines.
172    pub(crate) force_ckpt_tx: parking_lot::Mutex<Option<ForceCheckpointTx>>,
173    pub(crate) subscription_registry: Arc<crate::subscription::SubscriptionRegistry>,
174    /// Cluster mode: stream/MV name → subscribing node ids, refreshed from
175    /// gossip by the router task and read each cycle by producers.
176    #[cfg(feature = "cluster")]
177    pub(crate) active_subs:
178        Arc<parking_lot::RwLock<std::collections::HashMap<String, std::collections::HashSet<u64>>>>,
179    /// Stream output schemas resolved at `start()`; consulted by SUBSCRIBE WHERE.
180    pub(crate) stream_schemas:
181        parking_lot::RwLock<std::collections::HashMap<String, arrow_schema::SchemaRef>>,
182}
183
184/// Reply channel for a single `db.checkpoint()` request.
185pub(crate) type ForceCheckpointReply =
186    crossfire::oneshot::TxOneshot<Result<crate::checkpoint_coordinator::CheckpointResult, DbError>>;
187
188/// `db.checkpoint()` → pipeline callback request channel. Cold path; the cap
189/// is generous enough that callers never wait.
190pub(crate) type ForceCheckpointTx =
191    crossfire::MAsyncTx<crossfire::mpsc::Array<ForceCheckpointReply>>;
192
193pub(crate) type ForceCheckpointRx =
194    crossfire::AsyncRx<crossfire::mpsc::Array<ForceCheckpointReply>>;
195
/// Maximum number of queued `db.checkpoint()` requests awaiting the
/// pipeline callback.
pub(crate) const FORCE_CHECKPOINT_CHANNEL_CAPACITY: usize = 64;
197
/// Period of the subscription-router timer. A drain happens on every tick,
/// so this caps cross-node SUBSCRIBE delivery latency.
#[cfg(feature = "cluster")]
const SUB_ROUTER_TICK: std::time::Duration = std::time::Duration::from_millis(10);
/// Ticks between gossip interest-cache refreshes while subscriptions exist
/// (50 ticks of 10 ms is roughly 500 ms).
#[cfg(feature = "cluster")]
const SUB_REFRESH_ACTIVE_TICKS: u64 = 50;
/// Ticks between refreshes when there is no subscription activity anywhere
/// (500 ticks of 10 ms is roughly 5 s).
#[cfg(feature = "cluster")]
const SUB_REFRESH_IDLE_TICKS: u64 = 500;
208
209pub(crate) struct SourceWatermarkState {
210    pub(crate) extractor: laminar_core::time::EventTimeExtractor,
211    pub(crate) generator: Box<dyn laminar_core::time::WatermarkGenerator>,
212    pub(crate) column: String,
213}
214
215/// Keep rows at/after the watermark. `Ok(None)` = all rows late;
216/// `Err` = schema drift (missing/non-timestamp column).
217pub(crate) fn filter_late_rows(
218    batch: &RecordBatch,
219    column: &str,
220    watermark: i64,
221) -> Result<Option<RecordBatch>, laminar_core::time::FilterError> {
222    laminar_core::time::filter_batch_by_timestamp(
223        batch,
224        column,
225        watermark,
226        laminar_core::time::ThresholdOp::GreaterEq,
227    )
228}
229
230pub(crate) use laminar_core::time::parse_duration_str;
231
/// Last committed state of a single vnode. It is staged while adopting a
/// rebalance so that a newly-acquired vnode resumes from its committed
/// epoch.
#[cfg(feature = "cluster")]
#[derive(Clone, Debug)]
pub struct RehydratedVnode {
    /// Epoch whose committed partial supplied `bytes`.
    pub epoch: u64,
    /// Raw `partial.bin` contents for this vnode as of `epoch`.
    pub bytes: bytes::Bytes,
}
242
/// Outcome of one [`LaminarDB::adopt_assignment_snapshot`] call.
///
/// Lets the rebalance control plane log and meter what an adoption moved,
/// chiefly which vnodes this node gained and how much of their committed
/// state was rehydrated from durable storage.
#[cfg(feature = "cluster")]
#[derive(Debug, Default)]
pub struct SnapshotAdoption {
    /// Whether the snapshot was installed. `false` means nothing changed:
    /// either the snapshot was stale (version ≤ the registry's current
    /// version) or no vnode registry was installed.
    pub adopted: bool,
    /// Version of the snapshot this call looked at.
    pub version: u64,
    /// Vnodes this node owns after the rotation but did not own before.
    pub newly_acquired: Vec<u32>,
    /// Number of `newly_acquired` vnodes whose committed state was read back.
    pub rehydrated: usize,
    /// Committed epoch the rehydration read from, when there was one.
    pub rehydration_epoch: Option<u64>,
}
263
264impl LaminarDB {
265    /// Create an embedded in-memory database with default settings.
266    ///
267    /// # Errors
268    ///
269    /// Returns `DbError` if `DataFusion` context creation fails.
270    pub fn open() -> Result<Self, DbError> {
271        Self::open_with_config(LaminarConfig::default())
272    }
273
274    /// Create with custom configuration.
275    ///
276    /// # Errors
277    ///
278    /// Returns `DbError` if `DataFusion` context creation fails.
279    pub fn open_with_config(config: LaminarConfig) -> Result<Self, DbError> {
280        Self::open_with_config_and_vars(config, HashMap::new())
281    }
282
283    /// Create with custom configuration and config variables for SQL substitution.
284    ///
285    /// # Errors
286    ///
287    /// Returns `DbError` if `DataFusion` context creation fails.
288    #[allow(clippy::unnecessary_wraps)]
289    pub(crate) fn open_with_config_and_vars(
290        config: LaminarConfig,
291        config_vars: HashMap<String, String>,
292    ) -> Result<Self, DbError> {
293        Self::open_with_config_and_vars_and_rules(config, config_vars, &[], None)
294    }
295
296    /// Same as [`Self::open_with_config_and_vars`] but also installs
297    /// the given physical-optimizer rules on the `DataFusion` session.
298    #[allow(clippy::unnecessary_wraps)]
299    pub(crate) fn open_with_config_and_vars_and_rules(
300        config: LaminarConfig,
301        config_vars: HashMap<String, String>,
302        extra_optimizer_rules: &[Arc<
303            dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
304        >],
305        target_partitions: Option<usize>,
306    ) -> Result<Self, DbError> {
307        // One-time crossfire backoff tuning. No-op on multi-core; on single-core
308        // VMs this swaps spin-loops for yields (~2x channel throughput).
309        // Idempotent via an internal atomic — safe to call on every instance.
310        crossfire::detect_backoff_cfg();
311
312        let lookup_registry = Arc::new(laminar_sql::datafusion::LookupTableRegistry::new());
313
314        // Build a SessionContext with the LookupJoinExtensionPlanner wired
315        // into the physical planner so LookupJoinNode → LookupJoinExec works.
316        let ctx = {
317            let mut session_config = laminar_sql::datafusion::base_session_config();
318            if let Some(n) = target_partitions {
319                session_config = session_config.with_target_partitions(n);
320            }
321            let extension_planner: Arc<
322                dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync,
323            > = Arc::new(laminar_sql::datafusion::LookupJoinExtensionPlanner::new(
324                Arc::clone(&lookup_registry),
325            ));
326            let query_planner: Arc<dyn datafusion::execution::context::QueryPlanner + Send + Sync> =
327                Arc::new(LookupQueryPlanner { extension_planner });
328            let mut state_builder = datafusion::execution::SessionStateBuilder::new()
329                .with_config(session_config)
330                .with_default_features()
331                .with_query_planner(query_planner);
332            #[cfg(feature = "cluster")]
333            {
334                state_builder = state_builder.with_physical_optimizer_rule(Arc::new(
335                    laminar_sql::datafusion::cluster_repartition::DistributedJoinRule,
336                ));
337            }
338            for rule in extra_optimizer_rules {
339                state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
340            }
341            SessionContext::new_with_state(state_builder.build())
342        };
343        register_streaming_functions(&ctx);
344
345        let catalog = Arc::new(SourceCatalog::new(
346            config.default_buffer_size,
347            config.default_backpressure,
348        ));
349
350        let connector_registry = Arc::new(laminar_connectors::registry::ConnectorRegistry::new());
351        Self::register_builtin_connectors(&connector_registry);
352        #[cfg(feature = "cluster")]
353        let mut physical_rules = extra_optimizer_rules.to_vec();
354        #[cfg(feature = "cluster")]
355        {
356            physical_rules.push(Arc::new(
357                laminar_sql::datafusion::cluster_repartition::DistributedJoinRule,
358            ));
359        }
360        #[cfg(not(feature = "cluster"))]
361        let physical_rules = extra_optimizer_rules.to_vec();
362
363        Ok(Self {
364            catalog,
365            planner: parking_lot::Mutex::new(StreamingPlanner::new()),
366            ctx,
367            config,
368            config_vars: Arc::new(config_vars),
369            shutdown: std::sync::atomic::AtomicBool::new(false),
370            coordinator: Arc::new(tokio::sync::Mutex::new(None)),
371            connector_manager: parking_lot::Mutex::new(
372                crate::connector_manager::ConnectorManager::new(),
373            ),
374            connector_registry,
375            mv_registry: parking_lot::Mutex::new(laminar_core::mv::MvRegistry::new()),
376            table_store: Arc::new(parking_lot::RwLock::new(
377                crate::table_store::TableStore::new(),
378            )),
379            state: Arc::new(std::sync::atomic::AtomicU8::new(DbState::Created as u8)),
380            runtime_handle: parking_lot::Mutex::new(None),
381            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
382            engine_metrics: parking_lot::Mutex::new(None),
383            prometheus_registry: parking_lot::Mutex::new(None),
384            start_time: std::time::Instant::now(),
385            session_properties: parking_lot::Mutex::new(HashMap::new()),
386            pipeline_watermark: Arc::new(std::sync::atomic::AtomicI64::new(i64::MIN)),
387            lookup_registry,
388            ai_runtime: None,
389            ai_handle: None,
390            control_tx: parking_lot::Mutex::new(None),
391            mv_store: Arc::new(parking_lot::RwLock::new(crate::mv_store::MvStore::new())),
392            #[cfg(feature = "cluster")]
393            cluster_controller: parking_lot::Mutex::new(None),
394            state_backend: parking_lot::Mutex::new(None),
395            vnode_registry: parking_lot::Mutex::new(None),
396            physical_optimizer_rules: physical_rules.into(),
397            pipeline_target_partitions: target_partitions,
398            #[cfg(feature = "cluster")]
399            shuffle_sender: parking_lot::Mutex::new(None),
400            #[cfg(feature = "cluster")]
401            shuffle_receiver: Arc::new(parking_lot::Mutex::new(None)),
402            #[cfg(feature = "cluster")]
403            decision_store: parking_lot::Mutex::new(None),
404            #[cfg(feature = "cluster")]
405            assignment_snapshot_store: parking_lot::Mutex::new(None),
406            #[cfg(feature = "cluster")]
407            catalog_manifest_store: parking_lot::Mutex::new(None),
408            #[cfg(feature = "cluster")]
409            rehydrated_vnode_state: Arc::new(parking_lot::Mutex::new(HashMap::new())),
410            force_ckpt_tx: parking_lot::Mutex::new(None),
411            subscription_registry: Arc::new(crate::subscription::SubscriptionRegistry::new()),
412            #[cfg(feature = "cluster")]
413            active_subs: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
414            stream_schemas: parking_lot::RwLock::new(std::collections::HashMap::new()),
415        })
416    }
417
418    /// Install the AI subsystem and the runtime handle its inference workers
419    /// spawn on. Called by the builder before the engine is shared; the handle
420    /// must be the main multi-threaded runtime.
421    pub(crate) fn set_ai_runtime(
422        &mut self,
423        runtime: Arc<crate::ai::AiRuntime>,
424        handle: tokio::runtime::Handle,
425    ) {
426        // Register the laminar.models / laminar.ai_calls catalog views. A
427        // failure here is non-fatal — inference still works, the views just
428        // aren't queryable.
429        if let Err(e) = crate::ai_catalog::register_ai_catalog(&self.ctx, &runtime) {
430            tracing::warn!(error = %e, "failed to register laminar.* AI catalog views");
431        }
432        self.ai_runtime = Some(runtime);
433        self.ai_handle = Some(handle);
434    }
435
/// Install the shuffle sender, then try to publish the SQL cluster context
/// (which succeeds once the registry and receiver are also present).
#[cfg(feature = "cluster")]
pub(crate) fn set_shuffle_sender(&self, sender: Arc<laminar_core::shuffle::ShuffleSender>) {
    {
        let mut slot = self.shuffle_sender.lock();
        *slot = Some(sender);
    }
    self.update_sql_cluster_context();
}
441
/// Install the shuffle receiver, then try to publish the SQL cluster context
/// (which succeeds once the registry and sender are also present).
#[cfg(feature = "cluster")]
pub(crate) fn set_shuffle_receiver(
    &self,
    receiver: Arc<laminar_core::shuffle::ShuffleReceiver>,
) {
    {
        let mut slot = self.shuffle_receiver.lock();
        *slot = Some(receiver);
    }
    self.update_sql_cluster_context();
}
450
/// Publish the vnode/shuffle routing context used by the SQL layer's cluster
/// repartitioning.
///
/// Does nothing until the vnode registry, shuffle sender and shuffle receiver
/// are all installed. `set_shuffle_sender` and `set_shuffle_receiver` call
/// this after storing their handle.
///
/// Each handle is cloned out under its own short-lived lock. No guard is held
/// while another mutex is taken or while calling into `set_cluster_context`.
/// Holding several of these locks at once, as an `if let` over the guards
/// would, invites lock-order inversions with other code paths.
#[cfg(feature = "cluster")]
fn update_sql_cluster_context(&self) {
    let registry = self.vnode_registry.lock().clone();
    let sender = self.shuffle_sender.lock().clone();
    let receiver = self.shuffle_receiver.lock().clone();
    let (Some(registry), Some(sender), Some(receiver)) = (registry, sender, receiver) else {
        return;
    };
    // Fall back to `NodeId(0)` while no cluster controller is installed.
    let self_id = self
        .cluster_controller
        .lock()
        .as_ref()
        .map_or(laminar_core::state::NodeId(0), |c| {
            laminar_core::state::NodeId(c.instance_id().0)
        });
    laminar_sql::datafusion::cluster_repartition::set_cluster_context(
        registry, sender, receiver, self_id,
    );
}
473
/// Install the checkpoint decision store, replacing any previous one.
#[cfg(feature = "cluster")]
pub(crate) fn set_decision_store(
    &self,
    store: Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
) {
    let mut slot = self.decision_store.lock();
    *slot = Some(store);
}
481
/// Install the assignment snapshot store, replacing any previous one.
#[cfg(feature = "cluster")]
pub(crate) fn set_assignment_snapshot_store(
    &self,
    store: Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
) {
    let mut slot = self.assignment_snapshot_store.lock();
    *slot = Some(store);
}
489
/// Install the shared catalog manifest store. With it present, the catalog
/// manifest is persisted on checkpoint and replayed at boot.
#[cfg(feature = "cluster")]
pub(crate) fn set_catalog_manifest_store(
    &self,
    store: Arc<laminar_core::cluster::control::CatalogManifestStore>,
) {
    let mut slot = self.catalog_manifest_store.lock();
    *slot = Some(store);
}
499
/// Write this node's current catalog DDL to the shared
/// `catalog/manifest.json`.
///
/// Cluster-only and best-effort. Without an installed manifest store this is
/// a no-op, and a save failure is only logged, because the manifest is an
/// availability aid rather than a correctness gate. Runs after every
/// successful checkpoint.
#[cfg(feature = "cluster")]
pub(crate) async fn persist_catalog_manifest(&self) {
    use laminar_core::cluster::control::{CatalogManifest, CatalogManifestEntry};

    let store = match self.catalog_manifest_store.lock().clone() {
        Some(store) => store,
        None => return,
    };

    // Take the DDL snapshot in a scope so the manager lock is gone before
    // the save is awaited.
    let entries: Vec<CatalogManifestEntry> = {
        let manager = self.connector_manager.lock();
        manager
            .ordered_ddl()
            .into_iter()
            .map(|(name, ddl)| CatalogManifestEntry { name, ddl })
            .collect()
    };

    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
    let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    };

    // The wall clock doubles as the version. The object is overwritten in
    // place, so the number is diagnostic only and not a strict counter.
    let manifest = CatalogManifest {
        version: now_ms.unsigned_abs(),
        updated_at_ms: now_ms,
        entries,
    };
    if let Err(e) = store.save(&manifest).await {
        tracing::warn!(error = %e, "catalog manifest persist failed");
    }
}
531
/// Replay catalog DDL from the shared `catalog/manifest.json`, recreating
/// every object this node does not already have locally.
///
/// Cluster-only and best-effort. It is meant to run at boot, before the
/// pipeline starts, so the operator graph for rebalanced-in MVs exists.
/// A manifest load failure skips the replay. A per-entry failure is logged
/// and skipped, so the node still serves every object that did rebuild.
///
/// The manifest store is detached for the whole replay. Otherwise `execute`
/// would re-persist this node's still-partial catalog after each DDL, and a
/// mid-replay failure would truncate the shared manifest. A drop guard
/// reinstalls the store, so it comes back even if `execute` panics or this
/// future is cancelled partway through the loop. Previously either case left
/// the store detached for good and silently disabled manifest persistence.
#[cfg(feature = "cluster")]
pub(crate) async fn restore_catalog_from_manifest(&self) {
    /// Puts the detached manifest store back into its slot when dropped.
    struct ReinstallOnDrop<'a> {
        slot: &'a parking_lot::Mutex<
            Option<Arc<laminar_core::cluster::control::CatalogManifestStore>>,
        >,
        store: Option<Arc<laminar_core::cluster::control::CatalogManifestStore>>,
    }

    impl Drop for ReinstallOnDrop<'_> {
        fn drop(&mut self) {
            *self.slot.lock() = self.store.take();
        }
    }

    let Some(store) = self.catalog_manifest_store.lock().clone() else {
        return;
    };
    let manifest = match store.load().await {
        Ok(Some(m)) => m,
        Ok(None) => return,
        Err(e) => {
            tracing::warn!(error = %e, "catalog manifest load failed — skipping replay");
            return;
        }
    };

    let detached = self.catalog_manifest_store.lock().take();
    let _reinstall = ReinstallOnDrop {
        slot: &self.catalog_manifest_store,
        store: detached,
    };
    for entry in &manifest.entries {
        // Idempotent replay: skip anything already registered locally.
        if self.catalog_object_exists(&entry.name) {
            continue;
        }
        match self.execute(&entry.ddl).await {
            Ok(_) => tracing::info!(name = %entry.name, "replayed catalog DDL from manifest"),
            Err(e) => tracing::warn!(
                name = %entry.name, error = %e,
                "catalog manifest replay failed for object"
            ),
        }
    }
    // `_reinstall` drops here (or on unwind/cancellation) and restores the store.
}
563                ),
564            }
565        }
566        *self.catalog_manifest_store.lock() = detached;
567    }
568
569    /// Whether a catalog object with `name` is already registered locally
570    /// (any of source/sink/stream/table). Drives idempotent manifest replay.
571    #[cfg(feature = "cluster")]
572    fn catalog_object_exists(&self, name: &str) -> bool {
573        let mgr = self.connector_manager.lock();
574        mgr.sources().contains_key(name)
575            || mgr.sinks().contains_key(name)
576            || mgr.streams().contains_key(name)
577            || mgr.tables().contains_key(name)
578    }
579
580    /// Adopt a new vnode assignment snapshot atomically across the registry,
581    /// state-backend fence, and coordinator, then rehydrate committed state
582    /// for any vnodes this node newly acquired. Idempotent for versions ≤ the
583    /// current registry version.
584    ///
585    /// Rehydration (the durable read of newly-acquired vnodes' partials) runs
586    /// *after* the coordinator lock is released so a slow object store can't
587    /// stall the checkpoint cadence; the recovered bytes are staged in
588    /// [`Self::rehydrated_vnode_state`] for operators to drain.
589    #[cfg(feature = "cluster")]
590    pub async fn adopt_assignment_snapshot(
591        &self,
592        snapshot: laminar_core::cluster::control::AssignmentSnapshot,
593    ) -> SnapshotAdoption {
594        let Some(registry) = self.vnode_registry.lock().clone() else {
595            return SnapshotAdoption::default();
596        };
597        if snapshot.version <= registry.assignment_version() {
598            return SnapshotAdoption {
599                adopted: false,
600                version: snapshot.version,
601                ..SnapshotAdoption::default()
602            };
603        }
604        let vnode_count = registry.vnode_count();
605        let new_assignment: Arc<[laminar_core::state::NodeId]> =
606            snapshot.to_vnode_vec(vnode_count).into();
607
608        let self_id = self
609            .cluster_controller
610            .lock()
611            .as_ref()
612            .map_or(laminar_core::state::NodeId(0), |c| {
613                laminar_core::state::NodeId(c.instance_id().0)
614            });
615
616        // Hold the coord mutex so registry + fence updates land between
617        // epochs. Snapshot the owned set on both sides of the swap to
618        // derive exactly which vnodes this node gained.
619        let mut guard = self.coordinator.lock().await;
620        let old_owned = laminar_core::state::owned_vnodes(&registry, self_id);
621        registry.set_assignment_and_version(new_assignment, snapshot.version);
622        let new_owned = laminar_core::state::owned_vnodes(&registry, self_id);
623        if let Some(backend) = self.state_backend.lock().clone() {
624            backend.set_authoritative_version(snapshot.version);
625        }
626        if let Some(coord) = guard.as_mut() {
627            coord.set_assignment_version(snapshot.version);
628            coord.set_vnode_set(new_owned.clone());
629            coord.set_gate_vnode_set((0..vnode_count).collect());
630        }
631        drop(guard);
632
633        // Newly-acquired vnodes = new \ old.
634        let old_set: std::collections::HashSet<u32> = old_owned.into_iter().collect();
635        let newly_acquired: Vec<u32> = new_owned
636            .into_iter()
637            .filter(|v| !old_set.contains(v))
638            .collect();
639
640        // Mark every newly-acquired vnode `Restoring` up front — before the
641        // (possibly slow) durable read below — so the operator suppresses
642        // emission for their keys from the moment the shuffle starts routing
643        // their rows here. Vnodes with no durable state are flipped back to
644        // `Active` immediately after the read; the ones we stage stay
645        // `Restoring` until the graph merges their state in.
646        if !newly_acquired.is_empty() {
647            registry.mark_restoring(&newly_acquired);
648        }
649
650        let mut adoption = SnapshotAdoption {
651            adopted: true,
652            version: snapshot.version,
653            newly_acquired: newly_acquired.clone(),
654            rehydrated: 0,
655            rehydration_epoch: None,
656        };
657
658        // Pull the last committed state for the gained vnodes off the
659        // shared durable backend so they don't resume from empty state.
660        // Clone the Arc out first so the (non-Send) lock guard is dropped
661        // before the rehydration await.
662        let backend = self.state_backend.lock().clone();
663        if let (false, Some(backend)) = (newly_acquired.is_empty(), backend) {
664            let report = crate::recovery_manager::VnodeRehydrator::new(backend.as_ref())
665                .rehydrate(&newly_acquired)
666                .await;
667            adoption.rehydrated = report.restored.len();
668            adoption.rehydration_epoch = report.epoch;
669            // Vnodes with no durable state to apply serve immediately — flip
670            // them back to `Active` so their emission isn't gated forever. The
671            // graph flips the staged ones once it merges their state in.
672            let no_state: Vec<u32> = newly_acquired
673                .iter()
674                .copied()
675                .filter(|v| !report.restored.contains_key(v))
676                .collect();
677            if !no_state.is_empty() {
678                registry.mark_active(&no_state);
679            }
680            if let Some(epoch) = report.epoch {
681                let mut staged = self.rehydrated_vnode_state.lock();
682                for (vnode, bytes) in report.restored {
683                    staged.insert(vnode, RehydratedVnode { epoch, bytes });
684                }
685            }
686        } else if !newly_acquired.is_empty() {
687            // No backend / nothing to rehydrate — clear the Restoring marks we
688            // optimistically set so emission isn't gated with no state coming.
689            registry.mark_active(&newly_acquired);
690        }
691
692        tracing::info!(
693            version = snapshot.version,
694            newly_acquired = adoption.newly_acquired.len(),
695            rehydrated = adoption.rehydrated,
696            rehydration_epoch = ?adoption.rehydration_epoch,
697            "adopted assignment snapshot",
698        );
699        adoption
700    }
701
/// Returns a copy of the committed per-vnode state that recent rebalance
/// adoptions staged for newly-acquired vnodes, keyed by vnode id.
///
/// The staged map itself is left untouched; operators that adopt the
/// per-vnode state backend drain it. This copy exists for inspection and
/// tests.
#[cfg(feature = "cluster")]
#[must_use]
pub fn rehydrated_vnode_state(&self) -> HashMap<u32, RehydratedVnode> {
    let staged = self.rehydrated_vnode_state.lock();
    staged.clone()
}
711
/// Attaches the cluster controller to this database.
///
/// Steps, in order:
/// 1. Register a query handler so peers can read this node's MV/table rows.
/// 2. Start the subscription router.
/// 3. Store the controller.
/// 4. Re-derive the SQL cluster context.
#[cfg(feature = "cluster")]
pub(crate) fn set_cluster_controller(
    &self,
    controller: Arc<laminar_core::cluster::control::ClusterController>,
) {
    // The handler holds weak references to the stores rather than `self`,
    // so it cannot form a reference cycle with the database.
    let query_handler = DbQueryHandler {
        mv_store: Arc::downgrade(&self.mv_store),
        table_store: Arc::downgrade(&self.table_store),
        // Dedicated context: a pushed-down `filter_sql` compiles with only
        // its temp table visible and cannot reach other registered tables.
        filter_ctx: SessionContext::new(),
    };
    controller.register_query_handler(Arc::new(query_handler));
    self.spawn_subscription_router(&controller);
    *self.cluster_controller.lock() = Some(controller);
    self.update_sql_cluster_context();
}
730
/// Spawns the distributed-SUBSCRIBE router task.
///
/// On every `SUB_ROUTER_TICK` the task:
/// 1. Refreshes the producer-side interest cache (`active_subs`) from the
///    `sub:` keys in the cluster KV when due. The refresh runs every
///    `SUB_REFRESH_ACTIVE_TICKS` ticks while any interest exists, and every
///    `SUB_REFRESH_IDLE_TICKS` ticks otherwise.
/// 2. Advertises newly-added local subscriptions (`sub:<name>` = `"active"`)
///    and clears removed ones (empty value).
/// 3. Drains remote `__sub::` batches staged on the shuffle receiver and
///    forwards them to local subscribers.
///
/// Only weak handles to the database's state are captured, so the task
/// exits on the first tick after the `LaminarDB` is dropped. Nothing is
/// spawned when there is no Tokio runtime, or when the coordination backend
/// cannot discover subscription interest.
#[cfg(feature = "cluster")]
fn spawn_subscription_router(
    &self,
    controller: &Arc<laminar_core::cluster::control::ClusterController>,
) {
    use std::collections::{HashMap, HashSet};

    // Needs a running runtime; some unit tests install a controller without
    // one. Skip — distributed routing is server-mode only.
    if tokio::runtime::Handle::try_current().is_err() {
        return;
    }

    let kv = Arc::clone(controller.kv());

    // Skip entirely on backends that can't discover subscription interest
    // (e.g. object store), rather than advertising interest nothing reads.
    if !kv.supports_subscription_routing() {
        tracing::info!(
            "distributed SUBSCRIBE routing disabled: coordination backend has \
             no subscription-interest discovery"
        );
        return;
    }

    let active_subs = Arc::downgrade(&self.active_subs);
    let subscription_registry = Arc::downgrade(&self.subscription_registry);
    let shuffle_receiver = Arc::downgrade(&self.shuffle_receiver);

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(SUB_ROUTER_TICK);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Local interests this task has currently advertised in the KV.
        let mut advertised: HashSet<String> = HashSet::new();
        // Ticks remaining until the next interest-cache refresh (0 = now).
        let mut until_refresh: u64 = 0;
        loop {
            tick.tick().await;

            // Upgrade weak handles; bail once the DB has been dropped.
            let (Some(active_subs), Some(registry), Some(receiver_slot)) = (
                active_subs.upgrade(),
                subscription_registry.upgrade(),
                shuffle_receiver.upgrade(),
            ) else {
                break;
            };

            let local_names = registry.active_subscription_names();

            // Refresh the producer-side interest cache when due.
            if until_refresh == 0 {
                let mut map: HashMap<String, HashSet<u64>> = HashMap::new();
                for (node_id, key, value) in kv.scan_prefix("sub:").await {
                    if !value.is_empty() {
                        if let Some(name) = key.strip_prefix("sub:") {
                            map.entry(name.to_string()).or_default().insert(node_id.0);
                        }
                    }
                }
                let idle = map.is_empty() && local_names.is_empty();
                *active_subs.write() = map;
                until_refresh = if idle {
                    SUB_REFRESH_IDLE_TICKS
                } else {
                    SUB_REFRESH_ACTIVE_TICKS
                };
            }
            until_refresh = until_refresh.saturating_sub(1);

            // Advertise newly-added / clear newly-removed local interests
            // (rare; only on subscription lifecycle changes).
            for name in &local_names {
                if advertised.insert(name.clone()) {
                    kv.write(&format!("sub:{name}"), "active".to_string()).await;
                }
            }
            let removed: Vec<String> = advertised
                .iter()
                .filter(|n| !local_names.contains(*n))
                .cloned()
                .collect();
            for name in removed {
                kv.write(&format!("sub:{name}"), String::new()).await;
                advertised.remove(&name);
            }

            // Publish remote batches to local subscribers. Drain on every tick,
            // even when there are no local subscriptions. Remote producers can
            // keep routing a stream here until their interest cache refreshes
            // (possibly at the idle cadence). Anything left in `staged` would
            // otherwise accumulate and be replayed to whichever subscription
            // next appears. One lock cycle drains every `__sub::` stage, and
            // batches for streams with no local subscriber fall through
            // `send_batch` as a no-op.
            let receiver = receiver_slot.lock().clone();
            if let Some(receiver) = receiver {
                for (stage, batches) in
                    receiver.drain_staged_with_prefix(crate::subscription::REMOTE_STAGE_PREFIX)
                {
                    if let Some(name) = crate::subscription::stream_from_remote_stage(&stage) {
                        for batch in batches {
                            registry.send_batch(name, batch);
                        }
                    }
                }
            }
        }
    });
}
841
842    pub(crate) fn set_state_backend(&self, backend: Arc<dyn laminar_core::state::StateBackend>) {
843        *self.state_backend.lock() = Some(backend);
844    }
845
846    pub(crate) fn set_vnode_registry(&self, registry: Arc<laminar_core::state::VnodeRegistry>) {
847        *self.vnode_registry.lock() = Some(registry);
848        #[cfg(feature = "cluster")]
849        self.update_sql_cluster_context();
850    }
851
852    /// The underlying `DataFusion` `SessionContext`.
853    #[must_use]
854    pub fn session_context(&self) -> &SessionContext {
855        &self.ctx
856    }
857
858    /// Returns a fluent builder for constructing a [`LaminarDB`].
859    #[must_use]
860    pub fn builder() -> LaminarDbBuilder {
861        LaminarDbBuilder::new()
862    }
863
864    /// Register built-in connectors based on enabled features.
865    #[allow(unused_variables)]
866    fn register_builtin_connectors(registry: &laminar_connectors::registry::ConnectorRegistry) {
867        // Infra-free synthetic source; always available (used by demos
868        // and the cluster soak harness).
869        laminar_connectors::generator::register_generator_source(registry);
870        #[cfg(feature = "kafka")]
871        {
872            laminar_connectors::kafka::register_kafka_source(registry);
873            laminar_connectors::kafka::register_kafka_sink(registry);
874        }
875        #[cfg(feature = "postgres-cdc")]
876        {
877            laminar_connectors::cdc::postgres::register_postgres_cdc_source(registry);
878        }
879        #[cfg(feature = "postgres-sink")]
880        {
881            laminar_connectors::postgres::register_postgres_sink(registry);
882        }
883        #[cfg(feature = "delta-lake")]
884        {
885            laminar_connectors::lakehouse::register_delta_lake_sink(registry);
886            laminar_connectors::lakehouse::register_delta_lake_source(registry);
887        }
888        #[cfg(feature = "iceberg")]
889        {
890            laminar_connectors::lakehouse::register_iceberg_sink(registry);
891            laminar_connectors::lakehouse::register_iceberg_source(registry);
892        }
893        #[cfg(feature = "websocket")]
894        {
895            laminar_connectors::websocket::register_websocket_source(registry);
896            laminar_connectors::websocket::register_websocket_sink(registry);
897        }
898        #[cfg(feature = "mysql-cdc")]
899        {
900            laminar_connectors::cdc::mysql::register_mysql_cdc_source(registry);
901        }
902        #[cfg(feature = "mongodb-cdc")]
903        {
904            laminar_connectors::mongodb::register_mongodb_cdc_source(registry);
905            laminar_connectors::mongodb::register_mongodb_sink(registry);
906        }
907        #[cfg(feature = "files")]
908        {
909            laminar_connectors::files::register_file_source(registry);
910            laminar_connectors::files::register_file_sink(registry);
911        }
912        #[cfg(feature = "otel")]
913        {
914            laminar_connectors::otel::register_otel_source(registry);
915        }
916        #[cfg(feature = "nats")]
917        {
918            laminar_connectors::nats::register_nats_source(registry);
919            laminar_connectors::nats::register_nats_sink(registry);
920        }
921    }
922
923    /// Handle `CREATE LOOKUP TABLE` by registering the table in the
924    /// `TableStore`, `ConnectorManager`, `DataFusion` catalog, and lookup
925    /// registry.
926    fn handle_register_lookup_table(
927        &self,
928        info: laminar_sql::planner::LookupTableInfo,
929    ) -> Result<ExecuteResult, DbError> {
930        use laminar_sql::parser::lookup_table::ConnectorType;
931
932        if info.primary_key.len() != 1 {
933            return Err(DbError::InvalidOperation(
934                "Lookup table requires a single-column primary key".into(),
935            ));
936        }
937        let pk = info.primary_key[0].clone();
938
939        // Register in TableStore for PK-based upsert
940        let cache_mode = info.properties.cache_memory.map(|mem| {
941            let max_entries = cache_entries_from_memory(mem);
942            crate::table_cache_mode::TableCacheMode::Partial { max_entries }
943        });
944        if let Some(cache) = cache_mode {
945            self.table_store.write().create_table_with_cache(
946                &info.name,
947                info.arrow_schema.clone(),
948                &pk,
949                cache,
950            )?;
951        } else {
952            self.table_store
953                .write()
954                .create_table(&info.name, info.arrow_schema.clone(), &pk)?;
955        }
956
957        // For external connectors: register in ConnectorManager so
958        // start_connector_pipeline() handles snapshot + CDC loading
959        if !matches!(info.properties.connector, ConnectorType::Static) {
960            self.register_lookup_connector(&info, &pk)?;
961        }
962
963        // Register in DataFusion for SELECT/JOIN queries
964        {
965            let provider = crate::table_provider::ReferenceTableProvider::new(
966                info.name.clone(),
967                info.arrow_schema.clone(),
968                self.table_store.clone(),
969            );
970            let _ = self.ctx.deregister_table(&info.name);
971            self.ctx
972                .register_table(&info.name, Arc::new(provider))
973                .map_err(|e| {
974                    DbError::InvalidOperation(format!("Failed to register lookup table: {e}"))
975                })?;
976        }
977
978        // Register snapshot in the lookup registry so the physical
979        // planner can build LookupJoinExec nodes for JOIN queries.
980        if let Some(batch) = self.table_store.read().to_record_batch(&info.name) {
981            self.lookup_registry.register(
982                &info.name,
983                laminar_sql::datafusion::LookupSnapshot { batch },
984            );
985        }
986
987        // Register the logical optimizer rule so JOINs referencing
988        // this table are rewritten to LookupJoinNode.
989        self.refresh_lookup_optimizer_rule();
990
991        Ok(ExecuteResult::Ddl(DdlInfo {
992            statement_type: "CREATE LOOKUP TABLE".to_string(),
993            object_name: info.name,
994        }))
995    }
996
997    /// Register an external connector for a lookup table in the
998    /// `ConnectorManager` and `TableStore`.
999    #[allow(clippy::unnecessary_wraps)]
1000    fn register_lookup_connector(
1001        &self,
1002        info: &laminar_sql::planner::LookupTableInfo,
1003        pk: &str,
1004    ) -> Result<(), DbError> {
1005        use laminar_sql::parser::lookup_table::ConnectorType;
1006
1007        let connector_type_str = match &info.properties.connector {
1008            ConnectorType::Postgres => "postgres",
1009            ConnectorType::PostgresCdc => "postgres-cdc",
1010            ConnectorType::MysqlCdc => "mysql-cdc",
1011            ConnectorType::Redis => "redis",
1012            ConnectorType::S3Parquet => "s3-parquet",
1013            ConnectorType::DeltaLake => "delta-lake",
1014            ConnectorType::Custom(s) => s.as_str(),
1015            ConnectorType::Static => unreachable!(),
1016        };
1017
1018        self.table_store
1019            .write()
1020            .set_connector(&info.name, connector_type_str);
1021
1022        let refresh = match info.properties.strategy {
1023            laminar_sql::parser::lookup_table::LookupStrategy::Replicated
1024            | laminar_sql::parser::lookup_table::LookupStrategy::Partitioned => {
1025                // Standalone postgres uses snapshot-only (no CDC slot needed).
1026                if matches!(info.properties.connector, ConnectorType::Postgres) {
1027                    Some(laminar_connectors::reference::RefreshMode::SnapshotOnly)
1028                } else {
1029                    Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
1030                }
1031            }
1032            laminar_sql::parser::lookup_table::LookupStrategy::OnDemand => {
1033                Some(laminar_connectors::reference::RefreshMode::Manual)
1034            }
1035        };
1036
1037        // Build connector options and format options from raw WITH clause.
1038        // Keys consumed by LookupTableProperties are excluded; keys starting
1039        // with "format." are routed to format_options (prefix stripped).
1040        let consumed = [
1041            "connector",
1042            "strategy",
1043            "cache.memory",
1044            "cache.disk",
1045            "cache.ttl",
1046            "pushdown",
1047            "format",
1048        ];
1049        let mut connector_options = HashMap::with_capacity(info.raw_options.len());
1050        let mut format_options = HashMap::with_capacity(4);
1051        for (k, v) in &info.raw_options {
1052            let lower = k.to_lowercase();
1053            if consumed.contains(&lower.as_str()) {
1054                continue;
1055            }
1056            if let Some(suffix) = lower.strip_prefix("format.") {
1057                format_options.insert(suffix.to_string(), v.clone());
1058            } else {
1059                connector_options.insert(k.clone(), v.clone());
1060            }
1061        }
1062
1063        // The on-demand lookup cache is byte-weighted; carry the user's
1064        // cache_memory budget through as bytes (no lossy entry-count conversion).
1065        let cache_max_bytes = info
1066            .properties
1067            .cache_memory
1068            .map(|m| usize::try_from(m.as_bytes()).unwrap_or(usize::MAX));
1069
1070        // cache_ttl is specified in seconds; carry it as a Duration so the
1071        // partial lookup cache expires stale entries lazily on read.
1072        let cache_ttl = info
1073            .properties
1074            .cache_ttl
1075            .map(std::time::Duration::from_secs);
1076
1077        self.connector_manager
1078            .lock()
1079            .register_table(crate::connector_manager::TableRegistration {
1080                name: info.name.clone(),
1081                primary_key: pk.to_string(),
1082                connector_type: Some(connector_type_str.to_string()),
1083                connector_options,
1084                format: info.raw_options.get("format").cloned(),
1085                format_options,
1086                refresh,
1087                cache_max_bytes,
1088                cache_ttl,
1089            });
1090
1091        Ok(())
1092    }
1093
1094    /// Replaces the `LookupJoinRewriteRule` on the `DataFusion` context
1095    /// with one that knows the current set of registered lookup tables.
1096    fn refresh_lookup_optimizer_rule(&self) {
1097        use laminar_sql::planner::lookup_join::{LookupColumnPruningRule, LookupJoinRewriteRule};
1098        use laminar_sql::planner::predicate_split::{
1099            PlanPushdownMode, PlanSourceCapabilities, PredicateSplitterRule,
1100            SourceCapabilitiesRegistry,
1101        };
1102
1103        // Remove old rules if present
1104        self.ctx.remove_optimizer_rule("lookup_join_rewrite");
1105        self.ctx.remove_optimizer_rule("predicate_splitter");
1106        self.ctx.remove_optimizer_rule("lookup_column_pruning");
1107
1108        let tables = self.planner.lock().lookup_tables_cloned();
1109        if tables.is_empty() {
1110            return;
1111        }
1112
1113        // Build capabilities registry from table properties
1114        let mut caps_registry = SourceCapabilitiesRegistry::default();
1115        for (name, info) in &tables {
1116            let mode = match info.properties.pushdown_mode {
1117                laminar_sql::parser::lookup_table::PushdownMode::Enabled
1118                | laminar_sql::parser::lookup_table::PushdownMode::Auto => PlanPushdownMode::Full,
1119                laminar_sql::parser::lookup_table::PushdownMode::Disabled => PlanPushdownMode::None,
1120            };
1121            let pk_set: std::collections::HashSet<String> =
1122                info.primary_key.iter().cloned().collect();
1123            caps_registry.register(
1124                name.clone(),
1125                PlanSourceCapabilities {
1126                    pushdown_mode: mode,
1127                    eq_columns: pk_set,
1128                    range_columns: std::collections::HashSet::new(),
1129                    in_columns: std::collections::HashSet::new(),
1130                    supports_null_check: false,
1131                },
1132            );
1133        }
1134
1135        // Register rules in order: rewrite → predicate split → column pruning
1136        self.ctx
1137            .add_optimizer_rule(Arc::new(LookupJoinRewriteRule::new(tables)));
1138        self.ctx
1139            .add_optimizer_rule(Arc::new(PredicateSplitterRule::new(caps_registry)));
1140        self.ctx
1141            .add_optimizer_rule(Arc::new(LookupColumnPruningRule));
1142    }
1143
1144    /// Returns the connector registry for registering custom connectors.
1145    ///
1146    /// Use this to register user-defined source/sink connectors before
1147    /// calling `start()`.
1148    #[must_use]
1149    pub fn connector_registry(&self) -> &laminar_connectors::registry::ConnectorRegistry {
1150        &self.connector_registry
1151    }
1152
1153    /// Register a custom scalar UDF on the `SessionContext`.
1154    ///
1155    /// Called by `LaminarDbBuilder::build()` after construction.
1156    pub(crate) fn register_custom_udf(&self, udf: datafusion_expr::ScalarUDF) {
1157        self.ctx.register_udf(udf);
1158    }
1159
1160    /// Register a custom aggregate UDF (UDAF) on the `SessionContext`.
1161    ///
1162    /// Called by `LaminarDbBuilder::build()` after construction.
1163    pub(crate) fn register_custom_udaf(&self, udaf: datafusion_expr::AggregateUDF) {
1164        self.ctx.register_udaf(udaf);
1165    }
1166
/// Exposes a Delta Lake table to SQL as a `DataFusion` `TableProvider`.
///
/// Once registered, the table can be queried by `name`:
/// ```sql
/// SELECT * FROM my_delta_table WHERE id > 100
/// ```
///
/// # Arguments
///
/// * `name` - table name used in SQL (e.g., `"trades"`)
/// * `table_uri` - location of the Delta table (local path, `s3://`, `az://`, `gs://`)
/// * `storage_options` - storage credentials and other configuration
///
/// # Errors
///
/// Returns `DbError` when the table cannot be opened or registered.
#[cfg(feature = "delta-lake")]
pub async fn register_delta_table(
    &self,
    name: &str,
    table_uri: &str,
    storage_options: HashMap<String, String>,
) -> Result<(), DbError> {
    use laminar_connectors::lakehouse::delta_table_provider as delta;

    delta::register_delta_table(&self.ctx, name, table_uri, storage_options).await?;
    Ok(())
}
1199
1200    /// Execute a SQL statement.
1201    ///
1202    /// Supports:
1203    /// - `CREATE SOURCE` / `CREATE SINK` — registers sources and sinks
1204    /// - `DROP SOURCE` / `DROP SINK` — removes sources and sinks
1205    /// - `SHOW SOURCES` / `SHOW SINKS` / `SHOW QUERIES` — list registered objects
1206    /// - `DESCRIBE source_name` — show source schema
1207    /// - `SELECT ...` — execute a streaming query
1208    /// - `INSERT INTO source_name VALUES (...)` — insert data
1209    /// - `CREATE MATERIALIZED VIEW` — create a streaming materialized view
1210    /// - `EXPLAIN SELECT ...` — show query plan
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns `DbError` if SQL parsing, planning, or execution fails.
1215    pub async fn execute(&self, sql: &str) -> Result<ExecuteResult, DbError> {
1216        if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
1217            return Err(DbError::Shutdown);
1218        }
1219
1220        // Apply config variable substitution
1221        let resolved = if self.config_vars.is_empty() {
1222            sql.to_string()
1223        } else {
1224            sql_utils::resolve_config_vars(sql, &self.config_vars, true)?
1225        };
1226
1227        // Split into multiple statements
1228        let stmts = sql_utils::split_statements(&resolved);
1229        if stmts.is_empty() {
1230            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
1231        }
1232
1233        // Execute each statement, return the last result (or first error)
1234        let mut last_result = None;
1235        for stmt_sql in &stmts {
1236            last_result = Some(self.execute_single(stmt_sql).await?);
1237        }
1238
1239        last_result.ok_or_else(|| DbError::InvalidOperation("Empty SQL statement".into()))
1240    }
1241
1242    /// Execute a single SQL statement.
1243    #[allow(clippy::too_many_lines)]
1244    async fn execute_single(&self, sql: &str) -> Result<ExecuteResult, DbError> {
1245        let statements = parse_streaming_sql(sql)?;
1246
1247        if statements.is_empty() {
1248            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
1249        }
1250
1251        let statement = &statements[0];
1252
1253        let result = match statement {
1254            StreamingStatement::CreateSource(create) => {
1255                let result = self.handle_create_source(create).await?;
1256                if let ExecuteResult::Ddl(ref info) = result {
1257                    self.connector_manager
1258                        .lock()
1259                        .store_ddl(&info.object_name, sql);
1260                }
1261                Ok(result)
1262            }
1263            StreamingStatement::CreateSink(create) => {
1264                let result = self.handle_create_sink(create)?;
1265                if let ExecuteResult::Ddl(ref info) = result {
1266                    self.connector_manager
1267                        .lock()
1268                        .store_ddl(&info.object_name, sql);
1269                }
1270                Ok(result)
1271            }
1272            StreamingStatement::CreateStream {
1273                name,
1274                query,
1275                emit_clause,
1276                query_sql,
1277                retention_bytes,
1278                ..
1279            } => {
1280                let result = self
1281                    .handle_create_stream(
1282                        name,
1283                        query,
1284                        emit_clause.as_ref(),
1285                        query_sql,
1286                        *retention_bytes,
1287                    )
1288                    .await?;
1289                if let ExecuteResult::Ddl(ref info) = result {
1290                    self.connector_manager
1291                        .lock()
1292                        .store_ddl(&info.object_name, sql);
1293                }
1294                Ok(result)
1295            }
1296            StreamingStatement::CreateContinuousQuery { .. }
1297            | StreamingStatement::CreateLookupTable(_)
1298            | StreamingStatement::DropLookupTable { .. } => self.handle_query(sql).await,
1299            StreamingStatement::Standard(stmt) => {
1300                if let sqlparser::ast::Statement::CreateTable(ct) = stmt.as_ref() {
1301                    let result = self.handle_create_table(ct)?;
1302                    if let ExecuteResult::Ddl(ref info) = result {
1303                        self.connector_manager
1304                            .lock()
1305                            .store_ddl(&info.object_name, sql);
1306                    }
1307                    Ok(result)
1308                } else if let sqlparser::ast::Statement::Drop {
1309                    object_type: sqlparser::ast::ObjectType::Table,
1310                    names,
1311                    if_exists,
1312                    ..
1313                } = stmt.as_ref()
1314                {
1315                    self.handle_drop_table(names, *if_exists)
1316                } else if let sqlparser::ast::Statement::Set(set_stmt) = stmt.as_ref() {
1317                    self.handle_set(set_stmt)
1318                } else {
1319                    self.handle_query(sql).await
1320                }
1321            }
1322            StreamingStatement::InsertInto {
1323                table_name,
1324                columns,
1325                values,
1326            } => self.handle_insert_into(table_name, columns, values).await,
1327            StreamingStatement::DropSource {
1328                name,
1329                if_exists,
1330                cascade,
1331            } => self.handle_drop_source(name, *if_exists, *cascade),
1332            StreamingStatement::DropSink {
1333                name,
1334                if_exists,
1335                cascade,
1336            } => self.handle_drop_sink(name, *if_exists, *cascade),
1337            StreamingStatement::DropStream {
1338                name,
1339                if_exists,
1340                cascade,
1341            } => self.handle_drop_stream(name, *if_exists, *cascade),
1342            StreamingStatement::DropMaterializedView {
1343                name,
1344                if_exists,
1345                cascade,
1346            } => self.handle_drop_materialized_view(name, *if_exists, *cascade),
1347            StreamingStatement::Show(cmd) => {
1348                let batch = match cmd {
1349                    ShowCommand::Sources => self.build_show_sources(),
1350                    ShowCommand::Sinks => self.build_show_sinks(),
1351                    ShowCommand::Queries => self.build_show_queries(),
1352                    ShowCommand::MaterializedViews => self.build_show_materialized_views(),
1353                    ShowCommand::Streams => self.build_show_streams(),
1354                    ShowCommand::Tables => self.build_show_tables(),
1355                    ShowCommand::CheckpointStatus => self.build_show_checkpoint_status().await?,
1356                    ShowCommand::CreateSource { name } => {
1357                        self.build_show_create_source(&name.to_string())?
1358                    }
1359                    ShowCommand::CreateSink { name } => {
1360                        self.build_show_create_sink(&name.to_string())?
1361                    }
1362                };
1363                Ok(ExecuteResult::Metadata(batch))
1364            }
1365            StreamingStatement::Checkpoint => {
1366                let result = self.checkpoint().await?;
1367                Ok(ExecuteResult::Ddl(DdlInfo {
1368                    statement_type: "CHECKPOINT".to_string(),
1369                    object_name: format!("checkpoint_{}", result.checkpoint_id),
1370                }))
1371            }
1372            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1373                self.handle_restore_checkpoint(*checkpoint_id)
1374            }
1375            StreamingStatement::Describe { name, .. } => {
1376                let name_str = name.to_string();
1377                let batch = self.build_describe(&name_str)?;
1378                Ok(ExecuteResult::Metadata(batch))
1379            }
1380            StreamingStatement::Explain {
1381                statement, analyze, ..
1382            } => {
1383                if *analyze {
1384                    self.handle_explain_analyze(statement, sql).await
1385                } else {
1386                    self.handle_explain(statement)
1387                }
1388            }
1389            StreamingStatement::CreateMaterializedView {
1390                name,
1391                query,
1392                emit_clause,
1393                or_replace,
1394                if_not_exists,
1395                query_sql,
1396                ..
1397            } => {
1398                let result = self
1399                    .handle_create_materialized_view(
1400                        sql,
1401                        name,
1402                        query,
1403                        emit_clause.clone(),
1404                        *or_replace,
1405                        *if_not_exists,
1406                        query_sql,
1407                    )
1408                    .await?;
1409                if let ExecuteResult::Ddl(ref info) = result {
1410                    self.connector_manager
1411                        .lock()
1412                        .store_ddl(&info.object_name, sql);
1413                }
1414                Ok(result)
1415            }
1416            StreamingStatement::AlterSource { name, operation } => {
1417                self.handle_alter_source(name, operation)
1418            }
1419            StreamingStatement::Subscribe(_) => Err(DbError::InvalidOperation(
1420                "SUBSCRIBE requires the pgwire endpoint, not HTTP /api/v1/sql".into(),
1421            )),
1422            StreamingStatement::DeclareCursorForSubscribe { .. } => Err(DbError::InvalidOperation(
1423                "DECLARE CURSOR FOR SUBSCRIBE requires the pgwire endpoint, not HTTP /api/v1/sql"
1424                    .into(),
1425            )),
1426        };
1427
1428        #[cfg(feature = "cluster")]
1429        if let Ok(ExecuteResult::Ddl(ref info)) = &result {
1430            if info.statement_type != "CHECKPOINT" {
1431                self.persist_catalog_manifest().await;
1432            }
1433        }
1434
1435        result
1436    }
1437
1438    /// Handle INSERT INTO statement.
1439    ///
1440    /// Inserts SQL VALUES into a registered source, a `TableStore`-managed
1441    /// table (with PK upsert), or a plain `DataFusion` `MemTable`.
1442    async fn handle_insert_into(
1443        &self,
1444        table_name: &sqlparser::ast::ObjectName,
1445        _columns: &[sqlparser::ast::Ident],
1446        values: &[Vec<sqlparser::ast::Expr>],
1447    ) -> Result<ExecuteResult, DbError> {
1448        let name = table_name.to_string();
1449
1450        // Try inserting into a registered source
1451        if let Some(entry) = self.catalog.get_source(&name) {
1452            let batch = sql_utils::sql_values_to_record_batch(&entry.schema, values)?;
1453            entry
1454                .push_and_buffer(batch)
1455                .map_err(|e| DbError::InsertError(format!("Failed to push to source: {e}")))?;
1456            return Ok(ExecuteResult::RowsAffected(values.len() as u64));
1457        }
1458
1459        // Try inserting into a TableStore-managed table (with PK upsert).
1460        // Single lock scope avoids TOCTOU race between has_table/schema/upsert.
1461        {
1462            let mut ts = self.table_store.write();
1463            if ts.has_table(&name) {
1464                let schema = ts
1465                    .table_schema(&name)
1466                    .ok_or_else(|| DbError::TableNotFound(name.clone()))?;
1467                let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
1468                ts.upsert(&name, &batch)?;
1469                drop(ts); // release before sync (which may also lock)
1470
1471                self.sync_table_to_datafusion(&name)?;
1472                return Ok(ExecuteResult::RowsAffected(values.len() as u64));
1473            }
1474        }
1475
1476        // Otherwise, insert into a DataFusion MemTable
1477        // Look up the table provider
1478        let table = self
1479            .ctx
1480            .table_provider(&name)
1481            .await
1482            .map_err(|_| DbError::TableNotFound(name.clone()))?;
1483
1484        let schema = table.schema();
1485        let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
1486
1487        // Deregister the old table, then re-register with the new data
1488        self.ctx
1489            .deregister_table(&name)
1490            .map_err(|e| DbError::InsertError(format!("Failed to deregister table: {e}")))?;
1491
1492        let mem_table =
1493            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![batch]])
1494                .map_err(|e| DbError::InsertError(format!("Failed to create table: {e}")))?;
1495
1496        self.ctx
1497            .register_table(&name, Arc::new(mem_table))
1498            .map_err(|e| DbError::InsertError(format!("Failed to register table: {e}")))?;
1499
1500        Ok(ExecuteResult::RowsAffected(values.len() as u64))
1501    }
1502
1503    /// Handle RESTORE FROM CHECKPOINT statement (not yet implemented).
1504    ///
1505    /// Will eventually stop the pipeline, reload state from the checkpoint
1506    /// manifest, seek source offsets, and restart the pipeline.
1507    #[allow(clippy::unused_self)] // will use self when restore is implemented
1508    fn handle_restore_checkpoint(&self, _checkpoint_id: u64) -> Result<ExecuteResult, DbError> {
1509        Err(DbError::Unsupported(
1510            "RESTORE FROM CHECKPOINT is not yet implemented — \
1511             requires pipeline stop, state reload from manifest, \
1512             source offset seek, and pipeline restart"
1513                .to_string(),
1514        ))
1515    }
1516
1517    /// Get a session property value.
1518    #[must_use]
1519    pub fn get_session_property(&self, key: &str) -> Option<String> {
1520        self.session_properties
1521            .lock()
1522            .get(&key.to_lowercase())
1523            .cloned()
1524    }
1525
1526    /// Get all session properties.
1527    #[must_use]
1528    pub fn session_properties(&self) -> HashMap<String, String> {
1529        self.session_properties.lock().clone()
1530    }
1531
1532    /// Set a session property. Keys are lowercased.
1533    pub fn set_session_property(&self, key: &str, value: &str) {
1534        self.session_properties
1535            .lock()
1536            .insert(key.to_lowercase(), value.to_string());
1537    }
1538
1539    /// Subscribe to a named stream or materialized view.
1540    ///
1541    /// # Errors
1542    ///
1543    /// Returns `DbError::StreamNotFound` if the stream is not registered.
1544    pub fn subscribe<T: crate::handle::FromBatch>(
1545        &self,
1546        name: &str,
1547    ) -> Result<crate::handle::TypedSubscription<T>, DbError> {
1548        let sub = self
1549            .catalog
1550            .get_stream_subscription(name)
1551            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
1552        Ok(crate::handle::TypedSubscription::from_raw(sub))
1553    }
1554
1555    /// Subscribe to a named stream's output.
1556    ///
1557    /// # Errors
1558    ///
1559    /// Returns `DbError::StreamNotFound` if the stream doesn't exist.
1560    #[cfg(feature = "api")]
1561    pub fn subscribe_raw(
1562        &self,
1563        name: &str,
1564    ) -> Result<laminar_core::streaming::Subscription<crate::catalog::ArrowRecord>, DbError> {
1565        self.catalog
1566            .get_stream_subscription(name)
1567            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))
1568    }
1569
1570    /// Schema a `SUBSCRIBE` against `name` would emit, plus a `filterable`
1571    /// flag that's `false` only when the schema came from a `StreamEntry`
1572    /// sink placeholder (`Schema::empty`) — a `WHERE` clause can't compile
1573    /// against that and must be rejected.
1574    ///
1575    /// Lookup order: MV registry → `start()`-resolved stream output →
1576    /// `StreamEntry` sink (placeholder). A bare SOURCE is intentionally not
1577    /// resolved: only streams/MVs defined over it publish to the registry, so
1578    /// subscribing to the source directly would block forever — it falls
1579    /// through to `None`, which the caller surfaces as `StreamNotFound`.
1580    #[must_use]
1581    pub fn lookup_subscription_schema(
1582        &self,
1583        name: &str,
1584    ) -> Option<(arrow_schema::SchemaRef, bool)> {
1585        if let Some(mv) = self.mv_registry.lock().get(name).cloned() {
1586            return Some((mv.schema, true));
1587        }
1588        if let Some(schema) = self.stream_schemas.read().get(name).cloned() {
1589            return Some((schema, true));
1590        }
1591        if let Some(entry) = self.catalog.get_stream_entry(name) {
1592            return Some((entry.sink.schema(), false));
1593        }
1594        None
1595    }
1596
1597    /// Open a SUBSCRIBE portal against a named MV or resolved stream. A bare
1598    /// SOURCE is not subscribable (surfaced as `StreamNotFound`).
1599    /// `filter_sql` is rejected on streams (their schema is opaque).
1600    ///
1601    /// # Errors
1602    /// `StreamNotFound` for unknown `name`; `Pipeline` for subscriber-cap
1603    /// or filter-compile failures; `InvalidOperation` when `AsOfEpoch(n)`
1604    /// is requested but `n` is no longer retained.
1605    pub async fn open_subscription(
1606        &self,
1607        name: &str,
1608        filter_sql: Option<&str>,
1609        start: crate::subscription::SubscribeStart,
1610    ) -> Result<crate::subscription::SubscriptionPortal, DbError> {
1611        let attached = self.subscription_registry.subscriber_count(name);
1612        if attached >= crate::subscription::MAX_SUBSCRIBERS_PER_MV {
1613            return Err(DbError::Pipeline(format!(
1614                "subscriber cap reached for '{name}' ({attached}/{})",
1615                crate::subscription::MAX_SUBSCRIBERS_PER_MV
1616            )));
1617        }
1618
1619        let (schema, filterable) = self
1620            .lookup_subscription_schema(name)
1621            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
1622
1623        let filter = match filter_sql {
1624            None => None,
1625            Some(_) if !filterable => {
1626                return Err(DbError::Pipeline(format!(
1627                    "WHERE on '{name}' is not supported: stream output schema \
1628                     was not resolved (likely a planner failure at start())"
1629                )));
1630            }
1631            Some(sql) => Some(crate::filter_compile::compile(&self.ctx, sql, &schema).await?),
1632        };
1633
1634        let (replay, rx) = self
1635            .subscription_registry
1636            .subscribe(name, start)
1637            .map_err(|e| {
1638                let requested = match start {
1639                    crate::subscription::SubscribeStart::AsOfEpoch(n) => n,
1640                    crate::subscription::SubscribeStart::Tail => 0,
1641                };
1642                DbError::InvalidOperation(format!(
1643                    "epoch {requested} for stream '{name}' is no longer retained \
1644                     (earliest retained is {})",
1645                    e.earliest_retained
1646                ))
1647            })?;
1648        Ok(match filter {
1649            Some(phys) => crate::subscription::SubscriptionPortal::open_with_filter(
1650                name, schema, replay, rx, phys,
1651            ),
1652            None => crate::subscription::SubscriptionPortal::open(name, schema, replay, rx),
1653        })
1654    }
1655
1656    /// Handle EXPLAIN statement — show the streaming query plan.
1657    fn handle_explain(&self, statement: &StreamingStatement) -> Result<ExecuteResult, DbError> {
1658        let mut planner = self.planner.lock();
1659
1660        // Plan the inner statement to extract streaming info
1661        let plan_result = planner.plan(statement);
1662
1663        let mut rows: Vec<(String, String)> = Vec::new();
1664
1665        match plan_result {
1666            Ok(plan) => {
1667                rows.push((
1668                    "plan_type".into(),
1669                    match &plan {
1670                        laminar_sql::planner::StreamingPlan::Query(_) => "Query",
1671                        laminar_sql::planner::StreamingPlan::RegisterSource(_) => "RegisterSource",
1672                        laminar_sql::planner::StreamingPlan::RegisterSink(_) => "RegisterSink",
1673                        laminar_sql::planner::StreamingPlan::Standard(_) => "Standard",
1674                        laminar_sql::planner::StreamingPlan::RegisterLookupTable(_) => {
1675                            "RegisterLookupTable"
1676                        }
1677                        laminar_sql::planner::StreamingPlan::DropLookupTable { .. } => {
1678                            "DropLookupTable"
1679                        }
1680                    }
1681                    .into(),
1682                ));
1683                match &plan {
1684                    laminar_sql::planner::StreamingPlan::Query(qp) => {
1685                        if let Some(name) = &qp.name {
1686                            rows.push(("query_name".into(), name.clone()));
1687                        }
1688                        if let Some(wc) = &qp.window_config {
1689                            rows.push(("window".into(), format!("{wc}")));
1690                        }
1691                        if let Some(jcs) = &qp.join_config {
1692                            if jcs.len() == 1 {
1693                                rows.push(("join".into(), format!("{}", jcs[0])));
1694                            } else {
1695                                for (i, jc) in jcs.iter().enumerate() {
1696                                    rows.push((format!("join_step_{}", i + 1), format!("{jc}")));
1697                                }
1698                            }
1699                        }
1700                        if let Some(oc) = &qp.order_config {
1701                            rows.push(("order_by".into(), format!("{oc:?}")));
1702                        }
1703                        if let Some(fc) = &qp.frame_config {
1704                            rows.push((
1705                                "frame_functions".into(),
1706                                format!("{}", fc.functions.len()),
1707                            ));
1708                        }
1709                        if let Some(ec) = &qp.emit_clause {
1710                            rows.push(("emit".into(), format!("{ec}")));
1711                        }
1712                    }
1713                    laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
1714                        rows.push(("source".into(), info.name.clone()));
1715                    }
1716                    laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
1717                        rows.push(("sink".into(), info.name.clone()));
1718                    }
1719                    laminar_sql::planner::StreamingPlan::Standard(_) => {
1720                        rows.push(("execution".into(), "DataFusion pass-through".into()));
1721                    }
1722                    laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
1723                        rows.push(("lookup_table".into(), info.name.clone()));
1724                    }
1725                    laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
1726                        rows.push(("drop_lookup_table".into(), name.clone()));
1727                    }
1728                }
1729            }
1730            Err(e) => {
1731                // Even if planning fails, show what we know
1732                rows.push(("error".into(), format!("{e}")));
1733                rows.push((
1734                    "statement".into(),
1735                    format!("{:?}", std::mem::discriminant(statement)),
1736                ));
1737            }
1738        }
1739
1740        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
1741        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
1742
1743        let schema = Arc::new(Schema::new(vec![
1744            Field::new("plan_key", DataType::Utf8, false),
1745            Field::new("plan_value", DataType::Utf8, false),
1746        ]));
1747
1748        let batch = RecordBatch::try_new(
1749            schema,
1750            vec![
1751                Arc::new(StringArray::from(keys)),
1752                Arc::new(StringArray::from(values)),
1753            ],
1754        )
1755        .map_err(|e| DbError::InvalidOperation(format!("explain metadata: {e}")))?;
1756
1757        Ok(ExecuteResult::Metadata(batch))
1758    }
1759
1760    /// Handle EXPLAIN ANALYZE: run the plan and collect execution metrics.
1761    async fn handle_explain_analyze(
1762        &self,
1763        statement: &StreamingStatement,
1764        original_sql: &str,
1765    ) -> Result<ExecuteResult, DbError> {
1766        // First get the normal EXPLAIN output
1767        let explain_result = self.handle_explain(statement)?;
1768        let mut rows: Vec<(String, String)> = Vec::new();
1769
1770        if let ExecuteResult::Metadata(explain_batch) = &explain_result {
1771            let keys_col = explain_batch
1772                .column(0)
1773                .as_any()
1774                .downcast_ref::<StringArray>();
1775            let vals_col = explain_batch
1776                .column(1)
1777                .as_any()
1778                .downcast_ref::<StringArray>();
1779            if let (Some(keys), Some(vals)) = (keys_col, vals_col) {
1780                for i in 0..explain_batch.num_rows() {
1781                    rows.push((keys.value(i).to_string(), vals.value(i).to_string()));
1782                }
1783            }
1784        }
1785
1786        // Extract the inner SQL from the original EXPLAIN ANALYZE statement
1787        let upper = original_sql.to_uppercase();
1788        let inner_start = upper.find("ANALYZE").map_or(0, |pos| pos + "ANALYZE".len());
1789        let inner_sql = original_sql[inner_start..].trim();
1790
1791        // Try to execute the inner query via DataFusion and collect metrics
1792        let start = std::time::Instant::now();
1793        match self.ctx.sql(inner_sql).await {
1794            Ok(df) => match df.collect().await {
1795                Ok(batches) => {
1796                    let elapsed = start.elapsed();
1797                    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1798                    rows.push(("rows_produced".into(), total_rows.to_string()));
1799                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1800                    rows.push(("batches_processed".into(), batches.len().to_string()));
1801                }
1802                Err(e) => {
1803                    let elapsed = start.elapsed();
1804                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1805                    rows.push(("analyze_error".into(), format!("{e}")));
1806                }
1807            },
1808            Err(e) => {
1809                rows.push(("analyze_error".into(), format!("{e}")));
1810            }
1811        }
1812
1813        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
1814        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
1815
1816        let schema = Arc::new(Schema::new(vec![
1817            Field::new("plan_key", DataType::Utf8, false),
1818            Field::new("plan_value", DataType::Utf8, false),
1819        ]));
1820
1821        let batch = RecordBatch::try_new(
1822            schema,
1823            vec![
1824                Arc::new(StringArray::from(keys)),
1825                Arc::new(StringArray::from(values)),
1826            ],
1827        )
1828        .map_err(|e| DbError::InvalidOperation(format!("explain analyze metadata: {e}")))?;
1829
1830        Ok(ExecuteResult::Metadata(batch))
1831    }
1832
1833    /// Handle a streaming or standard SQL query.
1834    #[allow(clippy::too_many_lines)]
1835    pub(crate) async fn handle_query(&self, sql: &str) -> Result<ExecuteResult, DbError> {
1836        // Synchronous planning under the lock — released before any await
1837        let plan = {
1838            let statements = parse_streaming_sql(sql)?;
1839            if statements.is_empty() {
1840                return Err(DbError::InvalidOperation("Empty SQL statement".into()));
1841            }
1842            let mut planner = self.planner.lock();
1843            planner
1844                .plan(&statements[0])
1845                .map_err(laminar_sql::Error::from)?
1846        };
1847
1848        match plan {
1849            laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
1850                Ok(ExecuteResult::Ddl(DdlInfo {
1851                    statement_type: "DDL".to_string(),
1852                    object_name: info.name,
1853                }))
1854            }
1855            laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
1856                Ok(ExecuteResult::Ddl(DdlInfo {
1857                    statement_type: "DDL".to_string(),
1858                    object_name: info.name,
1859                }))
1860            }
1861            laminar_sql::planner::StreamingPlan::Query(query_plan) => {
1862                // Check for ASOF join — DataFusion can't parse ASOF syntax
1863                if let Some(asof_config) = Self::extract_asof_config(&query_plan) {
1864                    return self.execute_asof_query(&asof_config, sql).await;
1865                }
1866
1867                let plan_sql = query_plan.statement.to_string();
1868                let logical_plan = self.ctx.state().create_logical_plan(&plan_sql).await?;
1869
1870                // DataFusion interpreted execution.
1871                let df = self.ctx.execute_logical_plan(logical_plan).await?;
1872                let stream = df.execute_stream().await?;
1873
1874                Ok(self.bridge_query_stream(sql, stream))
1875            }
1876            laminar_sql::planner::StreamingPlan::Standard(stmt) => {
1877                // Async execution without the lock
1878                let sql_str = stmt.to_string();
1879                let df = self.ctx.sql(&sql_str).await?;
1880                let stream = df.execute_stream().await?;
1881
1882                Ok(self.bridge_query_stream(sql, stream))
1883            }
1884            laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
1885                self.handle_register_lookup_table(info)
1886            }
1887            laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
1888                self.table_store.write().drop_table(&name);
1889                self.connector_manager.lock().unregister_table(&name);
1890                let _ = self.ctx.deregister_table(&name);
1891                self.lookup_registry.unregister(&name);
1892                self.refresh_lookup_optimizer_rule();
1893                Ok(ExecuteResult::Ddl(DdlInfo {
1894                    statement_type: "DROP LOOKUP TABLE".to_string(),
1895                    object_name: name,
1896                }))
1897            }
1898        }
1899    }
1900
1901    /// Bridge a `DataFusion` `SendableRecordBatchStream` into the streaming
1902    /// subscription infrastructure and return a `QueryHandle`.
1903    fn bridge_query_stream(
1904        &self,
1905        sql: &str,
1906        stream: datafusion::physical_plan::SendableRecordBatchStream,
1907    ) -> ExecuteResult {
1908        let query_id = self.catalog.register_query(sql);
1909        let schema = stream.schema();
1910
1911        let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
1912        let (source, sink) =
1913            streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
1914
1915        let subscription = sink.subscribe();
1916
1917        let cancel_token = tokio_util::sync::CancellationToken::new();
1918        let cancel_token_clone = cancel_token.clone();
1919
1920        let source_clone = source.clone();
1921        let catalog = Arc::clone(&self.catalog);
1922        let query_id_clone = query_id;
1923        tokio::spawn(async move {
1924            use tokio_stream::StreamExt;
1925            let mut stream = stream;
1926            loop {
1927                tokio::select! {
1928                    () = cancel_token_clone.cancelled() => {
1929                        break;
1930                    }
1931                    result = stream.next() => {
1932                        match result {
1933                            Some(Ok(batch)) => {
1934                                if source_clone.push_arrow(batch).is_err() {
1935                                    break;
1936                                }
1937                            }
1938                            _ => break,
1939                        }
1940                    }
1941                }
1942            }
1943            drop(source_clone);
1944            catalog.deactivate_query(query_id_clone);
1945        });
1946
1947        ExecuteResult::Query(QueryHandle {
1948            id: query_id,
1949            schema,
1950            sql: sql.to_string(),
1951            subscription: Some(subscription),
1952            active: true,
1953            cancel_token,
1954        })
1955    }
1956
1957    /// Extract an ASOF join config from a query plan, if present.
1958    fn extract_asof_config(
1959        plan: &laminar_sql::planner::QueryPlan,
1960    ) -> Option<AsofJoinTranslatorConfig> {
1961        plan.join_config.as_ref()?.iter().find_map(|jc| {
1962            if let JoinOperatorConfig::Asof(cfg) = jc {
1963                Some(cfg.clone())
1964            } else {
1965                None
1966            }
1967        })
1968    }
1969
1970    /// Execute an ASOF join query by fetching left/right tables separately
1971    /// and performing the join in-process (bypasses `DataFusion`'s SQL parser
1972    /// which doesn't understand ASOF syntax).
1973    async fn execute_asof_query(
1974        &self,
1975        asof_config: &AsofJoinTranslatorConfig,
1976        original_sql: &str,
1977    ) -> Result<ExecuteResult, DbError> {
1978        let left_sql = format!("SELECT * FROM {}", asof_config.left_table);
1979        let right_sql = format!("SELECT * FROM {}", asof_config.right_table);
1980
1981        let left_batches = self
1982            .ctx
1983            .sql(&left_sql)
1984            .await
1985            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?
1986            .collect()
1987            .await
1988            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?;
1989
1990        let right_batches = self
1991            .ctx
1992            .sql(&right_sql)
1993            .await
1994            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?
1995            .collect()
1996            .await
1997            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?;
1998
1999        let result_batch =
2000            crate::asof_batch::execute_asof_join_batch(&left_batches, &right_batches, asof_config)?;
2001
2002        if result_batch.num_rows() == 0 {
2003            let query_id = self.catalog.register_query(original_sql);
2004            self.catalog.deactivate_query(query_id);
2005            return Ok(ExecuteResult::Query(QueryHandle {
2006                id: query_id,
2007                schema: result_batch.schema(),
2008                sql: original_sql.to_string(),
2009                subscription: None,
2010                active: false,
2011                cancel_token: tokio_util::sync::CancellationToken::new(),
2012            }));
2013        }
2014
2015        let schema = result_batch.schema();
2016        let mem_table =
2017            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![result_batch]])
2018                .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2019
2020        let _ = self.ctx.deregister_table("__asof_result");
2021        self.ctx
2022            .register_table("__asof_result", Arc::new(mem_table))
2023            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2024
2025        let df = self
2026            .ctx
2027            .sql("SELECT * FROM __asof_result")
2028            .await
2029            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2030        let stream = df
2031            .execute_stream()
2032            .await
2033            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
2034
2035        let _ = self.ctx.deregister_table("__asof_result");
2036
2037        Ok(self.bridge_query_stream(original_sql, stream))
2038    }
2039
2040    /// Get a typed source handle for pushing data.
2041    ///
2042    /// The source must have been created via `CREATE SOURCE`.
2043    ///
2044    /// # Errors
2045    ///
2046    /// Returns `DbError::SourceNotFound` if the source is not registered.
2047    /// Returns `DbError::SchemaMismatch` if the Rust type's schema does not
2048    /// match the source's SQL schema.
2049    pub fn source<T: laminar_core::streaming::Record>(
2050        &self,
2051        name: &str,
2052    ) -> Result<SourceHandle<T>, DbError> {
2053        let entry = self
2054            .catalog
2055            .get_source(name)
2056            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
2057        SourceHandle::new(entry)
2058    }
2059
2060    /// Get an untyped source handle for pushing `RecordBatch` data.
2061    ///
2062    /// # Errors
2063    ///
2064    /// Returns `DbError::SourceNotFound` if the source is not registered.
2065    pub fn source_untyped(&self, name: &str) -> Result<UntypedSourceHandle, DbError> {
2066        let entry = self
2067            .catalog
2068            .get_source(name)
2069            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
2070        Ok(UntypedSourceHandle::new(entry))
2071    }
2072
2073    /// List all registered sources.
2074    pub fn sources(&self) -> Vec<SourceInfo> {
2075        let names = self.catalog.list_sources();
2076        names
2077            .into_iter()
2078            .filter_map(|name| {
2079                self.catalog.get_source(&name).map(|e| SourceInfo {
2080                    name: e.name.clone(),
2081                    schema: e.schema.clone(),
2082                    watermark_column: e.watermark_column.clone(),
2083                })
2084            })
2085            .collect()
2086    }
2087
2088    /// List all registered sinks.
2089    pub fn sinks(&self) -> Vec<SinkInfo> {
2090        self.catalog
2091            .list_sinks()
2092            .into_iter()
2093            .map(|name| SinkInfo { name })
2094            .collect()
2095    }
2096
2097    /// List all registered materialized views with their SQL and state.
2098    pub fn materialized_views(&self) -> Vec<crate::handle::MaterializedViewInfo> {
2099        let registry = self.mv_registry.lock();
2100        registry
2101            .views()
2102            .map(crate::handle::MaterializedViewInfo::from)
2103            .collect()
2104    }
2105
2106    /// List all registered streams with their SQL definitions.
2107    pub fn streams(&self) -> Vec<crate::handle::StreamInfo> {
2108        let mgr = self.connector_manager.lock();
2109        mgr.streams()
2110            .iter()
2111            .map(|(name, reg)| crate::handle::StreamInfo {
2112                name: name.clone(),
2113                sql: Some(reg.query_sql.clone()),
2114            })
2115            .collect()
2116    }
2117
2118    /// Build the pipeline topology graph from registered sources, streams,
2119    /// and sinks.
2120    ///
2121    /// Returns a `PipelineTopology` with nodes for every source, stream,
2122    /// and sink, plus edges derived from stream SQL `FROM` references and
2123    /// sink `input` fields.
2124    pub fn pipeline_topology(&self) -> crate::handle::PipelineTopology {
2125        use crate::handle::{PipelineEdge, PipelineNode, PipelineNodeType};
2126
2127        let mut nodes = Vec::new();
2128        let mut edges = Vec::new();
2129
2130        // Collect source names for FROM matching
2131        let source_names = self.catalog.list_sources();
2132
2133        // Source nodes
2134        for name in &source_names {
2135            let schema = self.catalog.get_source(name).map(|e| e.schema.clone());
2136            nodes.push(PipelineNode {
2137                name: name.clone(),
2138                node_type: PipelineNodeType::Source,
2139                schema,
2140                sql: None,
2141            });
2142        }
2143
2144        // Stream nodes + edges from SQL FROM references
2145        let mgr = self.connector_manager.lock();
2146        let stream_names: Vec<String> = mgr.streams().keys().cloned().collect();
2147        for (name, reg) in mgr.streams() {
2148            nodes.push(PipelineNode {
2149                name: name.clone(),
2150                node_type: PipelineNodeType::Stream,
2151                schema: None,
2152                sql: Some(reg.query_sql.clone()),
2153            });
2154
2155            // Extract FROM references by checking which known sources/streams
2156            // appear in the query SQL. This is a lightweight heuristic that
2157            // avoids a full SQL parse.
2158            let sql_upper = reg.query_sql.to_uppercase();
2159            for src in &source_names {
2160                if sql_upper.contains(&src.to_uppercase()) {
2161                    edges.push(PipelineEdge {
2162                        from: src.clone(),
2163                        to: name.clone(),
2164                    });
2165                }
2166            }
2167            // Also check for stream-to-stream references (cascading)
2168            for other in &stream_names {
2169                if other != name && sql_upper.contains(&other.to_uppercase()) {
2170                    edges.push(PipelineEdge {
2171                        from: other.clone(),
2172                        to: name.clone(),
2173                    });
2174                }
2175            }
2176        }
2177
2178        // Sink nodes + edges from input field
2179        for (name, reg) in mgr.sinks() {
2180            nodes.push(PipelineNode {
2181                name: name.clone(),
2182                node_type: PipelineNodeType::Sink,
2183                schema: None,
2184                sql: None,
2185            });
2186
2187            // Sinks read from their `input` field
2188            if !reg.input.is_empty() {
2189                edges.push(PipelineEdge {
2190                    from: reg.input.clone(),
2191                    to: name.clone(),
2192                });
2193            }
2194        }
2195
2196        // Also add catalog-only sinks (no connector type) that aren't
2197        // already in the connector manager
2198        let cm_sink_names: std::collections::HashSet<&String> = mgr.sinks().keys().collect();
2199        for name in self.catalog.list_sinks() {
2200            if !cm_sink_names.contains(&name) {
2201                // Check if there's a sink entry in the catalog with input info
2202                if let Some(input) = self.catalog.get_sink_input(&name) {
2203                    nodes.push(PipelineNode {
2204                        name: name.clone(),
2205                        node_type: PipelineNodeType::Sink,
2206                        schema: None,
2207                        sql: None,
2208                    });
2209                    if !input.is_empty() {
2210                        edges.push(PipelineEdge {
2211                            from: input,
2212                            to: name,
2213                        });
2214                    }
2215                }
2216            }
2217        }
2218
2219        drop(mgr);
2220
2221        crate::handle::PipelineTopology { nodes, edges }
2222    }
2223
2224    /// List all active queries.
2225    pub fn queries(&self) -> Vec<QueryInfo> {
2226        self.catalog
2227            .list_queries()
2228            .into_iter()
2229            .map(|(id, sql, active)| QueryInfo { id, sql, active })
2230            .collect()
2231    }
2232
2233    /// Returns whether streaming checkpointing is enabled.
2234    #[must_use]
2235    pub fn is_checkpoint_enabled(&self) -> bool {
2236        self.config.checkpoint.is_some()
2237    }
2238
2239    /// Returns a checkpoint store instance, if checkpointing is configured.
2240    ///
2241    /// Returns an [`ObjectStoreCheckpointStore`](laminar_core::storage::ObjectStoreCheckpointStore)
2242    /// when `object_store_url` is set, otherwise a
2243    /// [`FileSystemCheckpointStore`](laminar_core::storage::FileSystemCheckpointStore).
2244    pub fn checkpoint_store(&self) -> Option<Box<dyn laminar_core::storage::CheckpointStore>> {
2245        let cp_config = self.config.checkpoint.as_ref()?;
2246        let max_retained = cp_config.max_retained.unwrap_or(3);
2247        // Pass the runtime vnode count through so manifest validation
2248        // checks against the real invariant, not a hardcoded default.
2249        let vnode_count = self.vnode_registry.lock().as_ref().map_or(
2250            laminar_core::storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
2251            |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
2252        );
2253
2254        if let Some(ref url) = self.config.object_store_url {
2255            // The builder roots the store at the URL's path prefix.
2256            let obj_store = laminar_core::storage::object_store_builder::build_object_store(
2257                url,
2258                &self.config.object_store_options,
2259            )
2260            .ok()?;
2261            Some(Box::new(
2262                laminar_core::storage::checkpoint_store::ObjectStoreCheckpointStore::new(
2263                    obj_store,
2264                    String::new(),
2265                    max_retained,
2266                )
2267                .with_vnode_count(vnode_count),
2268            ))
2269        } else {
2270            let data_dir = cp_config
2271                .data_dir
2272                .clone()
2273                .or_else(|| self.config.storage_dir.clone())
2274                .unwrap_or_else(|| std::path::PathBuf::from("./data"));
2275            Some(Box::new(
2276                laminar_core::storage::checkpoint_store::FileSystemCheckpointStore::new(
2277                    &data_dir,
2278                    max_retained,
2279                )
2280                .with_vnode_count(vnode_count),
2281            ))
2282        }
2283    }
2284
    /// Triggers a streaming checkpoint that persists source offsets, sink
    /// positions, and operator state to the configured checkpoint storage via the
    /// [`CheckpointCoordinator`](crate::checkpoint_coordinator::CheckpointCoordinator).
    ///
    /// Returns the checkpoint result, including the checkpoint ID, epoch, and
    /// duration. An `Ok` value does not by itself mean the checkpoint
    /// succeeded: check `CheckpointResult::success`. For example, a request
    /// forwarded to a cluster leader can come back as `Ok` with
    /// `success: false`.
    ///
    /// # Cluster behavior
    ///
    /// With the `cluster` feature, a node that is not the leader forwards the
    /// request to the leader, provided the leader's member entry can be
    /// found. The request goes over HTTP to the leader's `rpc_address`.
    /// Otherwise the checkpoint runs locally. After a local checkpoint that
    /// reports success, the shared catalog manifest is refreshed on a
    /// best-effort basis.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` in any of these cases:
    ///
    /// - Checkpointing is not enabled.
    /// - The coordinator has not been initialized (call `start()` first).
    /// - The pipeline's forced-checkpoint channel is closed, or its reply is
    ///   dropped.
    /// - Forwarding to the leader fails.
    /// - The checkpoint operation fails.
    pub async fn checkpoint(
        &self,
    ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
        if self.config.checkpoint.is_none() {
            return Err(DbError::Checkpoint(
                "checkpointing is not enabled".to_string(),
            ));
        }

        // Followers delegate to the leader. `leader_opt` is `None` when there
        // is no cluster controller, this node is the leader, no leader is
        // known, or the leader is missing from the membership list; in all of
        // those cases the checkpoint runs locally below.
        #[cfg(feature = "cluster")]
        {
            let leader_opt = {
                let cc_guard = self.cluster_controller.lock();
                cc_guard.as_ref().and_then(|cc| {
                    if cc.is_leader() {
                        None
                    } else {
                        cc.current_leader().and_then(|leader_id| {
                            let watch = cc.members_watch();
                            let members = watch.borrow();
                            members
                                .iter()
                                .find(|m| m.id == leader_id)
                                .map(|m| m.rpc_address.clone())
                        })
                    }
                })
            };
            // NOTE(review): the log below and `forward_checkpoint_to_leader`
            // treat `rpc_address` as the leader's HTTP API address
            // (`http://{addr}/api/v1/checkpoint`) — confirm the two coincide.
            if let Some(leader_rpc) = leader_opt {
                tracing::info!(
                    "Forwarding checkpoint request to leader node at HTTP address {}",
                    leader_rpc
                );
                return self.forward_checkpoint_to_leader(&leader_rpc).await;
            }
        }

        // When the streaming pipeline is live, route through the
        // pipeline callback so it captures operator state (via the same
        // path that the periodic checkpoint timer uses). Without this,
        // the manifest has an empty `operator_states` map and restart
        // loses everything the `IncrementalAggState` accumulators held.
        // The `force_ckpt_tx` lock guard is a temporary of this `let`
        // statement, so it is released before any `.await` below.
        let tx = self.force_ckpt_tx.lock().clone();
        let result = if let Some(tx) = tx {
            let (reply_tx, reply_rx) = crossfire::oneshot::oneshot();
            tx.send(reply_tx).await.map_err(|_| {
                DbError::Checkpoint(
                    "pipeline callback receiver closed — engine may be shutting down".into(),
                )
            })?;
            reply_rx.await.map_err(|_| {
                DbError::Checkpoint("pipeline callback dropped oneshot before replying".into())
            })?
        } else {
            // Fallback: no running pipeline (e.g., engine built but not yet
            // started). Drive the coordinator directly. Operator state will
            // be empty, but restart from this manifest is still well-defined
            // because there's nothing to restore anyway.
            // The coordinator lock is held across the checkpoint `.await`.
            let mut guard = self.coordinator.lock().await;
            let coord = guard.as_mut().ok_or_else(|| {
                DbError::Checkpoint("coordinator not initialized — call start() first".to_string())
            })?;
            coord
                .checkpoint(crate::checkpoint_coordinator::CheckpointRequest::default())
                .await
        };

        // Refresh the shared catalog manifest on every successful checkpoint so
        // a node joining later can rebuild this node's MVs/sources. Best-effort.
        #[cfg(feature = "cluster")]
        if matches!(&result, Ok(r) if r.success) {
            self.persist_catalog_manifest().await;
        }

        result
    }
2373
2374    #[cfg(feature = "cluster")]
2375    async fn forward_checkpoint_to_leader(
2376        &self,
2377        addr: &str,
2378    ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
2379        #[derive(serde::Deserialize)]
2380        struct ForwardedCheckpointResponse {
2381            success: bool,
2382            checkpoint_id: u64,
2383            epoch: u64,
2384            duration_ms: u64,
2385            error: Option<String>,
2386        }
2387
2388        let mut req = reqwest::Client::new()
2389            .post(format!("http://{addr}/api/v1/checkpoint"))
2390            .timeout(std::time::Duration::from_secs(10));
2391        if let Some(token) = &self.config.http_auth_token {
2392            req = req.bearer_auth(token.expose());
2393        }
2394        let resp = req.send().await.map_err(|e| {
2395            DbError::Checkpoint(format!(
2396                "failed to forward checkpoint to leader at {addr}: {e}"
2397            ))
2398        })?;
2399
2400        let status = resp.status();
2401        let body = resp.text().await.map_err(|e| {
2402            DbError::Checkpoint(format!("failed to read leader checkpoint response: {e}"))
2403        })?;
2404
2405        // The leader returns a `CheckpointResponse` body even when the
2406        // checkpoint itself failed (HTTP 500 + `success: false`), so parse it so
2407        // that structured failure reaches the follower. A body that isn't a
2408        // `CheckpointResponse` (e.g. a 401 error payload) is an auth/transport
2409        // failure — surface the status and body instead.
2410        match serde_json::from_str::<ForwardedCheckpointResponse>(&body) {
2411            Ok(response) => Ok(crate::checkpoint_coordinator::CheckpointResult {
2412                success: response.success,
2413                checkpoint_id: response.checkpoint_id,
2414                epoch: response.epoch,
2415                duration: std::time::Duration::from_millis(response.duration_ms),
2416                error: response.error,
2417            }),
2418            Err(_) => Err(DbError::Checkpoint(format!(
2419                "leader rejected checkpoint ({status}): {body}"
2420            ))),
2421        }
2422    }
2423
2424    /// Returns checkpoint performance statistics.
2425    ///
2426    /// Returns `None` if the checkpoint coordinator has not been initialized.
2427    pub async fn checkpoint_stats(&self) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
2428        let guard = self.coordinator.lock().await;
2429        guard
2430            .as_ref()
2431            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
2432    }
2433}
2434
2435impl std::fmt::Debug for LaminarDB {
2436    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2437        f.debug_struct("LaminarDB")
2438            .field("sources", &self.catalog.list_sources().len())
2439            .field("sinks", &self.catalog.list_sinks().len())
2440            .field("materialized_views", &self.mv_registry.lock().len())
2441            .field("checkpoint_enabled", &self.is_checkpoint_enabled())
2442            .field("shutdown", &self.is_closed())
2443            .finish_non_exhaustive()
2444    }
2445}
2446
/// Wraps `DefaultPhysicalPlanner` with lookup join extension support.
///
/// Used as a DataFusion `QueryPlanner`: each physical-planning request builds
/// a `DefaultPhysicalPlanner` that carries `extension_planner`.
struct LookupQueryPlanner {
    /// Extension planner registered on every `DefaultPhysicalPlanner` this
    /// type creates. Presumably it lowers the lookup-join logical extension
    /// nodes — confirm at the construction site.
    extension_planner: Arc<dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync>,
}
2451
2452impl std::fmt::Debug for LookupQueryPlanner {
2453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2454        f.debug_struct("LookupQueryPlanner").finish_non_exhaustive()
2455    }
2456}
2457
2458#[async_trait::async_trait]
2459impl datafusion::execution::context::QueryPlanner for LookupQueryPlanner {
2460    async fn create_physical_plan(
2461        &self,
2462        logical_plan: &datafusion::logical_expr::LogicalPlan,
2463        session_state: &datafusion::execution::SessionState,
2464    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
2465        use datafusion::physical_planner::PhysicalPlanner;
2466        let planner =
2467            datafusion::physical_planner::DefaultPhysicalPlanner::with_extension_planners(vec![
2468                Arc::clone(&self.extension_planner),
2469            ]);
2470        planner
2471            .create_physical_plan(logical_plan, session_state)
2472            .await
2473    }
2474}
2475
/// Serves a node's local MV / reference-table rows to peers (pull path). Weak
/// store handles, so it never keeps the database alive.
#[cfg(feature = "cluster")]
struct DbQueryHandler {
    /// Materialized-view store; consulted first by `remote_scan`.
    mv_store: std::sync::Weak<parking_lot::RwLock<crate::mv_store::MvStore>>,
    /// Reference-table store; consulted only when the MV store has no batch
    /// for the requested table (or has been dropped).
    table_store: std::sync::Weak<parking_lot::RwLock<crate::table_store::TableStore>>,
    /// Isolated context for compiling a pushed `filter_sql` (only the temp table
    /// is visible), so a crafted predicate can't reference other tables.
    filter_ctx: SessionContext,
}
2486
#[cfg(feature = "cluster")]
#[async_trait::async_trait]
impl laminar_core::cluster::control::RemoteQueryHandler for DbQueryHandler {
    /// Returns the rows of `table_name` for a peer's pull request.
    ///
    /// Lookup order is the materialized-view store first, then the
    /// reference-table store. If neither live store has the table, the call
    /// fails with a "not found" message.
    ///
    /// A pushed `filter_sql` predicate is applied before `projection`. If the
    /// filter fails to compile or apply, it is skipped and the failure is
    /// logged at debug level.
    async fn remote_scan(
        &self,
        table_name: &str,
        projection: Option<Vec<usize>>,
        filter_sql: Option<String>,
    ) -> Result<arrow::array::RecordBatch, String> {
        let mv_hit = self
            .mv_store
            .upgrade()
            .and_then(|s| s.read().to_record_batch(table_name));
        let rows = match mv_hit {
            Some(found) => found,
            None => self
                .table_store
                .upgrade()
                .and_then(|s| s.read().to_record_batch(table_name))
                .ok_or_else(|| format!("table '{table_name}' not found"))?,
        };

        // The predicate may reference columns the projection drops, so it runs
        // first. A filter that fails to compile or apply is skipped; the
        // coordinator re-applies it to whatever comes back.
        let rows = if let Some(sql) = filter_sql {
            let schema = rows.schema();
            match crate::filter_compile::compile(&self.filter_ctx, &sql, &schema).await {
                Err(e) => {
                    tracing::debug!(table = table_name, error = %e,
                        "remote_scan: skipping pushed filter (compile failed)");
                    rows
                }
                Ok(expr) => match crate::filter_compile::apply(&rows, expr.as_ref()) {
                    Ok(Some(kept)) => kept,
                    Ok(None) => arrow::array::RecordBatch::new_empty(schema),
                    Err(e) => {
                        tracing::debug!(table = table_name, error = %e,
                            "remote_scan: skipping pushed filter (apply failed)");
                        rows
                    }
                },
            }
        } else {
            rows
        };

        if let Some(columns) = projection {
            rows.project(&columns).map_err(|e| e.to_string())
        } else {
            Ok(rows)
        }
    }
}
2538
// Unit tests for this module live in a separate `tests` module file.
#[cfg(test)]
mod tests;