// laminar_db/db.rs
1//! The main `LaminarDB` database facade.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow::array::{RecordBatch, StringArray};
8use arrow::datatypes::{DataType, Field, Schema};
9use datafusion::prelude::SessionContext;
10use laminar_core::streaming;
11use laminar_sql::parser::{parse_streaming_sql, ShowCommand, StreamingStatement};
12use laminar_sql::planner::StreamingPlanner;
13use laminar_sql::register_streaming_functions;
14use laminar_sql::translator::{AsofJoinTranslatorConfig, JoinOperatorConfig};
15
16use crate::builder::LaminarDbBuilder;
17use crate::catalog::SourceCatalog;
18use crate::config::LaminarConfig;
19use crate::error::DbError;
20use crate::handle::{
21    DdlInfo, ExecuteResult, QueryHandle, QueryInfo, SinkInfo, SourceHandle, SourceInfo,
22    UntypedSourceHandle,
23};
24use crate::pipeline::ControlMsg;
25use crate::pipeline_lifecycle::url_to_checkpoint_prefix;
26use crate::sql_utils;
27
/// Cloneable async sender for the live-DDL control channel; carries
/// [`ControlMsg`] values to the running coordinator (see
/// [`LaminarDB::control_tx`], populated by `start()`).
pub(crate) type ControlMsgTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ControlMsg>>;
30
/// Lifecycle state of a [`LaminarDB`] instance.
///
/// Stored as `AtomicU8` on [`LaminarDB::state`] so transitions are
/// lock-free; the repr is fixed so `as u8` / `TryFrom<u8>` give the
/// same bytes the atomic sees.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DbState {
    Created = 0,
    Starting = 1,
    Running = 2,
    ShuttingDown = 3,
    Stopped = 4,
}

impl DbState {
    /// Decode a byte previously stored into the atomic. Returns `None`
    /// if the byte isn't a known variant — always a bug, because only
    /// `Self::store_into` / `DbState::* as u8` write this atomic.
    pub(crate) fn from_u8(raw: u8) -> Option<Self> {
        Some(match raw {
            0 => Self::Created,
            1 => Self::Starting,
            2 => Self::Running,
            3 => Self::ShuttingDown,
            4 => Self::Stopped,
            _ => return None,
        })
    }
}

// The doc comment above promises `TryFrom<u8>` round-trips with
// `as u8`; provide the impl so that contract actually holds.
// The error carries the unrecognized byte for diagnostics.
impl TryFrom<u8> for DbState {
    type Error = u8;

    fn try_from(raw: u8) -> Result<Self, Self::Error> {
        Self::from_u8(raw).ok_or(raw)
    }
}

// Compat aliases — remaining call sites still read/write raw bytes via
// the atomic. The enum variants and these constants share the same
// byte values (`#[repr(u8)]` above), so both work until the mechanical
// rewrite lands.
pub(crate) const STATE_CREATED: u8 = DbState::Created as u8;
pub(crate) const STATE_STARTING: u8 = DbState::Starting as u8;
pub(crate) const STATE_RUNNING: u8 = DbState::Running as u8;
pub(crate) const STATE_SHUTTING_DOWN: u8 = DbState::ShuttingDown as u8;
pub(crate) const STATE_STOPPED: u8 = DbState::Stopped as u8;
71
72fn cache_entries_from_memory(mem: laminar_sql::parser::lookup_table::ByteSize) -> usize {
73    (mem.as_bytes() / 256).max(1024) as usize
74}
75
/// The main `LaminarDB` database handle.
///
/// Provides a unified interface for SQL execution, data ingestion,
/// and result consumption. All streaming infrastructure (sources, sinks,
/// channels, subscriptions) is managed internally.
pub struct LaminarDB {
    /// Catalog of registered sources, created with the configured
    /// default buffer size and backpressure settings.
    pub(crate) catalog: Arc<SourceCatalog>,
    /// Streaming SQL planner; also tracks registered lookup tables
    /// (see `refresh_lookup_optimizer_rule`).
    pub(crate) planner: parking_lot::Mutex<StreamingPlanner>,
    /// `DataFusion` session used to compile and execute SQL.
    pub(crate) ctx: SessionContext,
    /// Configuration captured at open time.
    pub(crate) config: LaminarConfig,
    /// Variables for SQL substitution, supplied at open time.
    pub(crate) config_vars: Arc<HashMap<String, String>>,
    /// Shutdown flag. NOTE(review): coexists with `state` and
    /// `shutdown_signal`; exact division of duties not visible in this
    /// chunk — confirm against `shutdown()`.
    pub(crate) shutdown: std::sync::atomic::AtomicBool,
    /// Unified checkpoint coordinator (populated by `start()`).
    pub(crate) coordinator:
        Arc<tokio::sync::Mutex<Option<crate::checkpoint_coordinator::CheckpointCoordinator>>>,
    /// Manages external connector registrations (e.g. lookup-table
    /// snapshot/CDC loading; see `register_lookup_connector`).
    pub(crate) connector_manager: parking_lot::Mutex<crate::connector_manager::ConnectorManager>,
    /// Connector factory registry; built-ins are registered at open
    /// based on enabled Cargo features.
    pub(crate) connector_registry: Arc<laminar_connectors::registry::ConnectorRegistry>,
    /// Registry of materialized views.
    pub(crate) mv_registry: parking_lot::Mutex<laminar_core::mv::MvRegistry>,
    /// Lookup/reference table storage, shared with `DataFusion` table
    /// providers (see `handle_register_lookup_table`).
    pub(crate) table_store: Arc<parking_lot::RwLock<crate::table_store::TableStore>>,
    /// Current [`DbState`] stored as its raw `u8`
    /// (decode via `DbState::from_u8`).
    pub(crate) state: Arc<std::sync::atomic::AtomicU8>,
    /// Handle to the background processing task (if running).
    pub(crate) runtime_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Signal to stop the processing loop.
    pub(crate) shutdown_signal: Arc<tokio::sync::Notify>,
    /// Prometheus engine metrics. `None` until `set_engine_metrics()` is called.
    pub(crate) engine_metrics:
        parking_lot::Mutex<Option<Arc<crate::engine_metrics::EngineMetrics>>>,
    /// Shared Prometheus registry. `None` until `set_prometheus_registry()` is called.
    pub(crate) prometheus_registry: parking_lot::Mutex<Option<Arc<prometheus::Registry>>>,
    /// Instant when the database was created, for uptime calculation.
    pub(crate) start_time: std::time::Instant,
    /// Session properties set via `SET key = value`.
    pub(crate) session_properties: parking_lot::Mutex<HashMap<String, String>>,
    /// Global pipeline watermark (min of all source watermarks).
    pub(crate) pipeline_watermark: Arc<std::sync::atomic::AtomicI64>,
    /// Shared lookup table registry for physical planning of lookup joins.
    pub(crate) lookup_registry: Arc<laminar_sql::datafusion::LookupTableRegistry>,
    /// Control channel sender for live DDL to the running coordinator.
    /// `None` before `start()` or after `shutdown()`.
    pub(crate) control_tx: parking_lot::Mutex<Option<ControlMsgTx>>,
    /// Materialized view result store (shared with compute thread and query threads).
    pub(crate) mv_store: Arc<parking_lot::RwLock<crate::mv_store::MvStore>>,
    /// Cluster control facade, installed by `LaminarDbBuilder::cluster_controller`.
    /// `None` in embedded / single-instance mode; `Some` activates the
    /// leader / follower checkpoint flow when the coordinator starts.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) cluster_controller:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::ClusterController>>>,
    /// State backend for durability-gate participation. Installed by
    /// `LaminarDbBuilder::state_backend`. When paired with a
    /// `vnode_registry` the coordinator writes per-vnode markers each
    /// checkpoint and the gate runs.
    pub(crate) state_backend:
        parking_lot::Mutex<Option<Arc<dyn laminar_core::state::StateBackend>>>,
    /// Vnode topology + assignment. Installed by
    /// `LaminarDbBuilder::vnode_registry`. Needed for the coordinator to
    /// know which vnodes this instance owns.
    pub(crate) vnode_registry: parking_lot::Mutex<Option<Arc<laminar_core::state::VnodeRegistry>>>,
    /// Extra physical optimizer rules from the builder, applied to both
    /// `self.ctx` and the pipeline-side `OperatorGraph` context.
    pub(crate) physical_optimizer_rules:
        Arc<[Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>]>,
    /// `target_partitions` override from the builder, mirrored into the
    /// pipeline-side `SessionContext`.
    pub(crate) pipeline_target_partitions: Option<usize>,
    /// Outbound shuffle handle. Installed via `LaminarDbBuilder::shuffle_sender`;
    /// used by `SqlQueryOperator` to row-shuffle pre-aggregate batches to vnode
    /// owners.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) shuffle_sender:
        parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleSender>>>,
    /// Inbound shuffle handle. Installed via `LaminarDbBuilder::shuffle_receiver`.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) shuffle_receiver:
        parking_lot::Mutex<Option<Arc<laminar_core::shuffle::ShuffleReceiver>>>,
    /// Shared commit-marker store. Installed via the builder.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) decision_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::CheckpointDecisionStore>>>,
    /// Shared vnode-assignment snapshot store. Installed via the
    /// builder; the rebalance tasks (spawned by the caller) watch it.
    #[cfg(feature = "cluster-unstable")]
    pub(crate) assignment_snapshot_store:
        parking_lot::Mutex<Option<Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>>,
    /// Forwards `db.checkpoint()` requests to the running pipeline so
    /// the callback captures operator state before the manifest is
    /// packed. `None` before `start()` or after `shutdown()`; when
    /// `None`, `db.checkpoint()` falls back to the direct coordinator
    /// path (only valid when the engine has no stateful operators).
    pub(crate) force_ckpt_tx: parking_lot::Mutex<Option<ForceCheckpointTx>>,
}
167
/// Oneshot reply carrying the full `CheckpointResult` back to the
/// caller of `db.checkpoint()`. Created per-request.
///
/// Uses crossfire's oneshot (matches `sink_task`) rather than
/// `tokio::sync::oneshot` so the whole checkpoint plumbing uses a
/// consistent primitive family.
pub(crate) type ForceCheckpointReply =
    crossfire::oneshot::TxOneshot<Result<crate::checkpoint_coordinator::CheckpointResult, DbError>>;

/// Channel used by `db.checkpoint()` to hand a reply sender to the
/// running pipeline callback. Bounded at 64 — cold path (one item per
/// manual `db.checkpoint()` call); the cap is generous enough that the
/// caller is never expected to wait. The actual bound lives in
/// [`FORCE_CHECKPOINT_CHANNEL_CAPACITY`].
pub(crate) type ForceCheckpointTx =
    crossfire::MAsyncTx<crossfire::mpsc::Array<ForceCheckpointReply>>;

/// Paired receive-side of [`ForceCheckpointTx`], held by
/// `ConnectorPipelineCallback`.
pub(crate) type ForceCheckpointRx =
    crossfire::AsyncRx<crossfire::mpsc::Array<ForceCheckpointReply>>;

/// Capacity of the force-checkpoint request channel; see the sizing
/// rationale on [`ForceCheckpointTx`].
pub(crate) const FORCE_CHECKPOINT_CHANNEL_CAPACITY: usize = 64;
191
/// Per-source event-time machinery: timestamp extraction, watermark
/// generation, and the name of the event-time column.
pub(crate) struct SourceWatermarkState {
    // Extracts event-time values from incoming data.
    pub(crate) extractor: laminar_core::time::EventTimeExtractor,
    // Strategy that produces watermarks from observed event times.
    pub(crate) generator: Box<dyn laminar_core::time::WatermarkGenerator>,
    // Event-time column name. NOTE(review): assumed to match the column
    // `extractor` reads — confirm at construction sites.
    pub(crate) column: String,
}
197
198pub(crate) fn filter_late_rows(
199    batch: &RecordBatch,
200    column: &str,
201    watermark: i64,
202) -> Option<RecordBatch> {
203    match laminar_core::time::filter_batch_by_timestamp(
204        batch,
205        column,
206        watermark,
207        laminar_core::time::ThresholdOp::GreaterEq,
208    ) {
209        Ok(out) => out,
210        Err(e) => {
211            // Schema drift — drop the batch rather than silently admit late rows.
212            tracing::error!(%column, error = %e, "filter_late_rows: dropping batch");
213            None
214        }
215    }
216}
217
218pub(crate) use laminar_core::time::parse_duration_str;
219
220impl LaminarDB {
221    /// Create an embedded in-memory database with default settings.
222    ///
223    /// # Errors
224    ///
225    /// Returns `DbError` if `DataFusion` context creation fails.
226    pub fn open() -> Result<Self, DbError> {
227        Self::open_with_config(LaminarConfig::default())
228    }
229
230    /// Create with custom configuration.
231    ///
232    /// # Errors
233    ///
234    /// Returns `DbError` if `DataFusion` context creation fails.
235    pub fn open_with_config(config: LaminarConfig) -> Result<Self, DbError> {
236        Self::open_with_config_and_vars(config, HashMap::new())
237    }
238
239    /// Create with custom configuration and config variables for SQL substitution.
240    ///
241    /// # Errors
242    ///
243    /// Returns `DbError` if `DataFusion` context creation fails.
244    #[allow(clippy::unnecessary_wraps)]
245    pub(crate) fn open_with_config_and_vars(
246        config: LaminarConfig,
247        config_vars: HashMap<String, String>,
248    ) -> Result<Self, DbError> {
249        Self::open_with_config_and_vars_and_rules(config, config_vars, &[], None)
250    }
251
    /// Same as [`Self::open_with_config_and_vars`] but also installs
    /// the given physical-optimizer rules on the `DataFusion` session.
    // `Result` kept for signature parity with the sibling constructors.
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn open_with_config_and_vars_and_rules(
        config: LaminarConfig,
        config_vars: HashMap<String, String>,
        extra_optimizer_rules: &[Arc<
            dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
        >],
        target_partitions: Option<usize>,
    ) -> Result<Self, DbError> {
        // One-time crossfire backoff tuning. No-op on multi-core; on single-core
        // VMs this swaps spin-loops for yields (~2x channel throughput).
        // Idempotent via an internal atomic — safe to call on every instance.
        crossfire::detect_backoff_cfg();

        let lookup_registry = Arc::new(laminar_sql::datafusion::LookupTableRegistry::new());

        // Build a SessionContext with the LookupJoinExtensionPlanner wired
        // into the physical planner so LookupJoinNode → LookupJoinExec works.
        let ctx = {
            let mut session_config = laminar_sql::datafusion::base_session_config();
            if let Some(n) = target_partitions {
                session_config = session_config.with_target_partitions(n);
            }
            let extension_planner: Arc<
                dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync,
            > = Arc::new(laminar_sql::datafusion::LookupJoinExtensionPlanner::new(
                Arc::clone(&lookup_registry),
            ));
            let query_planner: Arc<dyn datafusion::execution::context::QueryPlanner + Send + Sync> =
                Arc::new(LookupQueryPlanner { extension_planner });
            let mut state_builder = datafusion::execution::SessionStateBuilder::new()
                .with_config(session_config)
                .with_default_features()
                .with_query_planner(query_planner);
            // Builder-supplied rules are also retained on the struct
            // (`physical_optimizer_rules`) for the pipeline-side context.
            for rule in extra_optimizer_rules {
                state_builder = state_builder.with_physical_optimizer_rule(Arc::clone(rule));
            }
            SessionContext::new_with_state(state_builder.build())
        };
        // Install Laminar's streaming SQL functions on the session.
        register_streaming_functions(&ctx);

        let catalog = Arc::new(SourceCatalog::new(
            config.default_buffer_size,
            config.default_backpressure,
        ));

        // Connector factories for enabled Cargo features.
        let connector_registry = Arc::new(laminar_connectors::registry::ConnectorRegistry::new());
        Self::register_builtin_connectors(&connector_registry);

        // All mutable/optional slots start empty; builder setters and
        // `start()` fill them in later.
        Ok(Self {
            catalog,
            planner: parking_lot::Mutex::new(StreamingPlanner::new()),
            ctx,
            config,
            config_vars: Arc::new(config_vars),
            shutdown: std::sync::atomic::AtomicBool::new(false),
            coordinator: Arc::new(tokio::sync::Mutex::new(None)),
            connector_manager: parking_lot::Mutex::new(
                crate::connector_manager::ConnectorManager::new(),
            ),
            connector_registry,
            mv_registry: parking_lot::Mutex::new(laminar_core::mv::MvRegistry::new()),
            table_store: Arc::new(parking_lot::RwLock::new(
                crate::table_store::TableStore::new(),
            )),
            state: Arc::new(std::sync::atomic::AtomicU8::new(STATE_CREATED)),
            runtime_handle: parking_lot::Mutex::new(None),
            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
            engine_metrics: parking_lot::Mutex::new(None),
            prometheus_registry: parking_lot::Mutex::new(None),
            start_time: std::time::Instant::now(),
            session_properties: parking_lot::Mutex::new(HashMap::new()),
            // i64::MIN = "no watermark observed yet".
            pipeline_watermark: Arc::new(std::sync::atomic::AtomicI64::new(i64::MIN)),
            lookup_registry,
            control_tx: parking_lot::Mutex::new(None),
            mv_store: Arc::new(parking_lot::RwLock::new(crate::mv_store::MvStore::new())),
            #[cfg(feature = "cluster-unstable")]
            cluster_controller: parking_lot::Mutex::new(None),
            state_backend: parking_lot::Mutex::new(None),
            vnode_registry: parking_lot::Mutex::new(None),
            physical_optimizer_rules: extra_optimizer_rules.to_vec().into(),
            pipeline_target_partitions: target_partitions,
            #[cfg(feature = "cluster-unstable")]
            shuffle_sender: parking_lot::Mutex::new(None),
            #[cfg(feature = "cluster-unstable")]
            shuffle_receiver: parking_lot::Mutex::new(None),
            #[cfg(feature = "cluster-unstable")]
            decision_store: parking_lot::Mutex::new(None),
            #[cfg(feature = "cluster-unstable")]
            assignment_snapshot_store: parking_lot::Mutex::new(None),
            force_ckpt_tx: parking_lot::Mutex::new(None),
        })
    }
347
348    /// Install the outbound shuffle handle used by cluster-mode streaming
349    /// aggregates to route pre-aggregate rows to vnode owners. Called by
350    /// [`LaminarDbBuilder::shuffle_sender`]. Must be set before `start()`
351    /// to take effect.
352    #[cfg(feature = "cluster-unstable")]
353    pub(crate) fn set_shuffle_sender(&self, sender: Arc<laminar_core::shuffle::ShuffleSender>) {
354        *self.shuffle_sender.lock() = Some(sender);
355    }
356
357    /// Install the inbound shuffle handle. Pair with
358    /// [`Self::set_shuffle_sender`]; neither alone is a no-op.
359    #[cfg(feature = "cluster-unstable")]
360    pub(crate) fn set_shuffle_receiver(
361        &self,
362        receiver: Arc<laminar_core::shuffle::ShuffleReceiver>,
363    ) {
364        *self.shuffle_receiver.lock() = Some(receiver);
365    }
366
367    /// Install the commit-marker store. Must be called before `start()`.
368    #[cfg(feature = "cluster-unstable")]
369    pub(crate) fn set_decision_store(
370        &self,
371        store: Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
372    ) {
373        *self.decision_store.lock() = Some(store);
374    }
375
376    /// Install the assignment snapshot store. Must be called before `start()`.
377    #[cfg(feature = "cluster-unstable")]
378    pub(crate) fn set_assignment_snapshot_store(
379        &self,
380        store: Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
381    ) {
382        *self.assignment_snapshot_store.lock() = Some(store);
383    }
384
    /// Adopt a new assignment snapshot: update the registry, backend
    /// fence, and coordinator under the coordinator mutex so the
    /// change lands strictly between checkpoints. Idempotent for
    /// versions at or below the current registry version.
    #[cfg(feature = "cluster-unstable")]
    pub async fn adopt_assignment_snapshot(
        &self,
        snapshot: laminar_core::cluster::control::AssignmentSnapshot,
    ) {
        // No registry installed (embedded mode) — nothing to adopt.
        let Some(registry) = self.vnode_registry.lock().clone() else {
            return;
        };
        // Idempotence guard: ignore stale or duplicate snapshots.
        if snapshot.version <= registry.assignment_version() {
            return;
        }
        let vnode_count = registry.vnode_count();
        let new_assignment: Arc<[laminar_core::state::NodeId]> =
            snapshot.to_vnode_vec(vnode_count).into();

        // Hold the coord mutex so the registry and fence update land
        // between epochs from the coordinator's perspective.
        let mut guard = self.coordinator.lock().await;
        registry.set_assignment_and_version(new_assignment, snapshot.version);
        // Advance the backend's authoritative version alongside the
        // registry (presumably fencing writers from the old assignment
        // — confirm `set_authoritative_version` semantics).
        if let Some(backend) = self.state_backend.lock().clone() {
            backend.set_authoritative_version(snapshot.version);
        }
        if let Some(coord) = guard.as_mut() {
            coord.set_assignment_version(snapshot.version);
            // Resolve our own node id; NodeId(0) is the fallback when
            // no cluster controller is installed.
            let self_id = self
                .cluster_controller
                .lock()
                .as_ref()
                .map_or(laminar_core::state::NodeId(0), |c| {
                    laminar_core::state::NodeId(c.instance_id().0)
                });
            coord.set_vnode_set(laminar_core::state::owned_vnodes(&registry, self_id));
            // The gate tracks ALL vnodes, not just the owned subset.
            coord.set_gate_vnode_set((0..vnode_count).collect());
        }
        drop(guard);

        tracing::info!(version = snapshot.version, "adopted assignment snapshot",);
    }
427
428    /// Install the cluster control facade. Called by
429    /// [`LaminarDbBuilder::cluster_controller`](crate::LaminarDbBuilder::cluster_controller)
430    /// before the pipeline starts; the `CheckpointCoordinator` picks
431    /// it up when constructed in `pipeline_lifecycle`.
432    #[cfg(feature = "cluster-unstable")]
433    pub(crate) fn set_cluster_controller(
434        &self,
435        controller: Arc<laminar_core::cluster::control::ClusterController>,
436    ) {
437        *self.cluster_controller.lock() = Some(controller);
438    }
439
440    /// Install a state backend so the checkpoint coordinator publishes
441    /// per-vnode durability markers and the gate runs. Pair with
442    /// [`Self::set_vnode_registry`]; either alone is a no-op.
443    pub(crate) fn set_state_backend(&self, backend: Arc<dyn laminar_core::state::StateBackend>) {
444        *self.state_backend.lock() = Some(backend);
445    }
446
447    /// Install a vnode registry so the checkpoint coordinator knows
448    /// which vnodes this instance owns. Pair with
449    /// [`Self::set_state_backend`]; either alone is a no-op.
450    pub(crate) fn set_vnode_registry(&self, registry: Arc<laminar_core::state::VnodeRegistry>) {
451        *self.vnode_registry.lock() = Some(registry);
452    }
453
454    /// Underlying `DataFusion` `SessionContext`. Primarily for tests
455    /// that need to compile SQL through the same session the engine
456    /// uses.
457    #[must_use]
458    pub fn session_context(&self) -> &SessionContext {
459        &self.ctx
460    }
461
462    /// Get a fluent builder for constructing a `LaminarDB`.
463    #[must_use]
464    pub fn builder() -> LaminarDbBuilder {
465        LaminarDbBuilder::new()
466    }
467
    /// Register built-in connectors based on enabled features.
    ///
    /// Each connector family is compiled in only when its Cargo
    /// feature is on; with no features enabled the body is empty,
    /// hence the `unused_variables` allow on `registry`.
    #[allow(unused_variables)]
    fn register_builtin_connectors(registry: &laminar_connectors::registry::ConnectorRegistry) {
        #[cfg(feature = "kafka")]
        {
            laminar_connectors::kafka::register_kafka_source(registry);
            laminar_connectors::kafka::register_kafka_sink(registry);
        }
        // CDC sources are source-only; there is no postgres-cdc sink.
        #[cfg(feature = "postgres-cdc")]
        {
            laminar_connectors::cdc::postgres::register_postgres_cdc_source(registry);
        }
        #[cfg(feature = "postgres-sink")]
        {
            laminar_connectors::postgres::register_postgres_sink(registry);
        }
        #[cfg(feature = "delta-lake")]
        {
            laminar_connectors::lakehouse::register_delta_lake_sink(registry);
            laminar_connectors::lakehouse::register_delta_lake_source(registry);
        }
        #[cfg(feature = "iceberg")]
        {
            laminar_connectors::lakehouse::register_iceberg_sink(registry);
            laminar_connectors::lakehouse::register_iceberg_source(registry);
        }
        #[cfg(feature = "websocket")]
        {
            laminar_connectors::websocket::register_websocket_source(registry);
            laminar_connectors::websocket::register_websocket_sink(registry);
        }
        #[cfg(feature = "mysql-cdc")]
        {
            laminar_connectors::cdc::mysql::register_mysql_cdc_source(registry);
        }
        #[cfg(feature = "mongodb-cdc")]
        {
            laminar_connectors::mongodb::register_mongodb_cdc_source(registry);
            laminar_connectors::mongodb::register_mongodb_sink(registry);
        }
        #[cfg(feature = "files")]
        {
            laminar_connectors::files::register_file_source(registry);
            laminar_connectors::files::register_file_sink(registry);
        }
        // OTel is ingest-only.
        #[cfg(feature = "otel")]
        {
            laminar_connectors::otel::register_otel_source(registry);
        }
        #[cfg(feature = "nats")]
        {
            laminar_connectors::nats::register_nats_source(registry);
            laminar_connectors::nats::register_nats_sink(registry);
        }
    }
523
524    /// Handle `CREATE LOOKUP TABLE` by registering the table in the
525    /// `TableStore`, `ConnectorManager`, `DataFusion` catalog, and lookup
526    /// registry.
527    fn handle_register_lookup_table(
528        &self,
529        info: laminar_sql::planner::LookupTableInfo,
530    ) -> Result<ExecuteResult, DbError> {
531        use laminar_sql::parser::lookup_table::ConnectorType;
532
533        if info.primary_key.len() != 1 {
534            return Err(DbError::InvalidOperation(
535                "Lookup table requires a single-column primary key".into(),
536            ));
537        }
538        let pk = info.primary_key[0].clone();
539
540        // Register in TableStore for PK-based upsert
541        let cache_mode = info.properties.cache_memory.map(|mem| {
542            let max_entries = cache_entries_from_memory(mem);
543            crate::table_cache_mode::TableCacheMode::Partial { max_entries }
544        });
545        if let Some(cache) = cache_mode {
546            self.table_store.write().create_table_with_cache(
547                &info.name,
548                info.arrow_schema.clone(),
549                &pk,
550                cache,
551            )?;
552        } else {
553            self.table_store
554                .write()
555                .create_table(&info.name, info.arrow_schema.clone(), &pk)?;
556        }
557
558        // For external connectors: register in ConnectorManager so
559        // start_connector_pipeline() handles snapshot + CDC loading
560        if !matches!(info.properties.connector, ConnectorType::Static) {
561            self.register_lookup_connector(&info, &pk)?;
562        }
563
564        // Register in DataFusion for SELECT/JOIN queries
565        {
566            let provider = crate::table_provider::ReferenceTableProvider::new(
567                info.name.clone(),
568                info.arrow_schema.clone(),
569                self.table_store.clone(),
570            );
571            let _ = self.ctx.deregister_table(&info.name);
572            self.ctx
573                .register_table(&info.name, Arc::new(provider))
574                .map_err(|e| {
575                    DbError::InvalidOperation(format!("Failed to register lookup table: {e}"))
576                })?;
577        }
578
579        // Register snapshot in the lookup registry so the physical
580        // planner can build LookupJoinExec nodes for JOIN queries.
581        if let Some(batch) = self.table_store.read().to_record_batch(&info.name) {
582            self.lookup_registry.register(
583                &info.name,
584                laminar_sql::datafusion::LookupSnapshot {
585                    batch,
586                    key_columns: info.primary_key.clone(),
587                },
588            );
589        }
590
591        // Register the logical optimizer rule so JOINs referencing
592        // this table are rewritten to LookupJoinNode.
593        self.refresh_lookup_optimizer_rule();
594
595        Ok(ExecuteResult::Ddl(DdlInfo {
596            statement_type: "CREATE LOOKUP TABLE".to_string(),
597            object_name: info.name,
598        }))
599    }
600
601    /// Register an external connector for a lookup table in the
602    /// `ConnectorManager` and `TableStore`.
603    #[allow(clippy::unnecessary_wraps)]
604    fn register_lookup_connector(
605        &self,
606        info: &laminar_sql::planner::LookupTableInfo,
607        pk: &str,
608    ) -> Result<(), DbError> {
609        use laminar_sql::parser::lookup_table::ConnectorType;
610
611        let connector_type_str = match &info.properties.connector {
612            ConnectorType::Postgres => "postgres",
613            ConnectorType::PostgresCdc => "postgres-cdc",
614            ConnectorType::MysqlCdc => "mysql-cdc",
615            ConnectorType::Redis => "redis",
616            ConnectorType::S3Parquet => "s3-parquet",
617            ConnectorType::DeltaLake => "delta-lake",
618            ConnectorType::Custom(s) => s.as_str(),
619            ConnectorType::Static => unreachable!(),
620        };
621
622        self.table_store
623            .write()
624            .set_connector(&info.name, connector_type_str);
625
626        let refresh = match info.properties.strategy {
627            laminar_sql::parser::lookup_table::LookupStrategy::Replicated
628            | laminar_sql::parser::lookup_table::LookupStrategy::Partitioned => {
629                // Standalone postgres uses snapshot-only (no CDC slot needed).
630                if matches!(info.properties.connector, ConnectorType::Postgres) {
631                    Some(laminar_connectors::reference::RefreshMode::SnapshotOnly)
632                } else {
633                    Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
634                }
635            }
636            laminar_sql::parser::lookup_table::LookupStrategy::OnDemand => {
637                Some(laminar_connectors::reference::RefreshMode::Manual)
638            }
639        };
640
641        // Build connector options and format options from raw WITH clause.
642        // Keys consumed by LookupTableProperties are excluded; keys starting
643        // with "format." are routed to format_options (prefix stripped).
644        let consumed = [
645            "connector",
646            "strategy",
647            "cache.memory",
648            "cache.disk",
649            "cache.ttl",
650            "pushdown",
651            "format",
652        ];
653        let mut connector_options = HashMap::with_capacity(info.raw_options.len());
654        let mut format_options = HashMap::with_capacity(4);
655        for (k, v) in &info.raw_options {
656            let lower = k.to_lowercase();
657            if consumed.contains(&lower.as_str()) {
658                continue;
659            }
660            if let Some(suffix) = lower.strip_prefix("format.") {
661                format_options.insert(suffix.to_string(), v.clone());
662            } else {
663                connector_options.insert(k.clone(), v.clone());
664            }
665        }
666
667        let cache_max = info.properties.cache_memory.map(cache_entries_from_memory);
668
669        self.connector_manager
670            .lock()
671            .register_table(crate::connector_manager::TableRegistration {
672                name: info.name.clone(),
673                primary_key: pk.to_string(),
674                connector_type: Some(connector_type_str.to_string()),
675                connector_options,
676                format: info.raw_options.get("format").cloned(),
677                format_options,
678                refresh,
679                cache_max_entries: cache_max,
680            });
681
682        Ok(())
683    }
684
685    /// Replaces the `LookupJoinRewriteRule` on the `DataFusion` context
686    /// with one that knows the current set of registered lookup tables.
687    fn refresh_lookup_optimizer_rule(&self) {
688        use laminar_sql::planner::lookup_join::{LookupColumnPruningRule, LookupJoinRewriteRule};
689        use laminar_sql::planner::predicate_split::{
690            PlanPushdownMode, PlanSourceCapabilities, PredicateSplitterRule,
691            SourceCapabilitiesRegistry,
692        };
693
694        // Remove old rules if present
695        self.ctx.remove_optimizer_rule("lookup_join_rewrite");
696        self.ctx.remove_optimizer_rule("predicate_splitter");
697        self.ctx.remove_optimizer_rule("lookup_column_pruning");
698
699        let tables = self.planner.lock().lookup_tables_cloned();
700        if tables.is_empty() {
701            return;
702        }
703
704        // Build capabilities registry from table properties
705        let mut caps_registry = SourceCapabilitiesRegistry::default();
706        for (name, info) in &tables {
707            let mode = match info.properties.pushdown_mode {
708                laminar_sql::parser::lookup_table::PushdownMode::Enabled
709                | laminar_sql::parser::lookup_table::PushdownMode::Auto => PlanPushdownMode::Full,
710                laminar_sql::parser::lookup_table::PushdownMode::Disabled => PlanPushdownMode::None,
711            };
712            let pk_set: std::collections::HashSet<String> =
713                info.primary_key.iter().cloned().collect();
714            caps_registry.register(
715                name.clone(),
716                PlanSourceCapabilities {
717                    pushdown_mode: mode,
718                    eq_columns: pk_set,
719                    range_columns: std::collections::HashSet::new(),
720                    in_columns: std::collections::HashSet::new(),
721                    supports_null_check: false,
722                },
723            );
724        }
725
726        // Register rules in order: rewrite → predicate split → column pruning
727        self.ctx
728            .add_optimizer_rule(Arc::new(LookupJoinRewriteRule::new(tables)));
729        self.ctx
730            .add_optimizer_rule(Arc::new(PredicateSplitterRule::new(caps_registry)));
731        self.ctx
732            .add_optimizer_rule(Arc::new(LookupColumnPruningRule));
733    }
734
    /// Returns the connector registry for registering custom connectors.
    ///
    /// Use this to register user-defined source/sink connectors before
    /// calling `start()`.
    ///
    /// The returned reference borrows from this database instance and is
    /// valid for as long as the instance is alive.
    #[must_use]
    pub fn connector_registry(&self) -> &laminar_connectors::registry::ConnectorRegistry {
        &self.connector_registry
    }
743
    /// Register a custom scalar UDF on the `SessionContext`.
    ///
    /// Called by `LaminarDbBuilder::build()` after construction.
    /// Delegates directly to `SessionContext::register_udf`; the UDF becomes
    /// callable from any subsequent SQL executed on this context.
    pub(crate) fn register_custom_udf(&self, udf: datafusion_expr::ScalarUDF) {
        self.ctx.register_udf(udf);
    }
750
    /// Register a custom aggregate UDF (UDAF) on the `SessionContext`.
    ///
    /// Called by `LaminarDbBuilder::build()` after construction.
    /// Delegates directly to `SessionContext::register_udaf`; the aggregate
    /// becomes callable from any subsequent SQL executed on this context.
    pub(crate) fn register_custom_udaf(&self, udaf: datafusion_expr::AggregateUDF) {
        self.ctx.register_udaf(udaf);
    }
757
758    /// Registers a Delta Lake table as a `DataFusion` `TableProvider`.
759    ///
760    /// After registration, the table can be queried via SQL:
761    /// ```sql
762    /// SELECT * FROM my_delta_table WHERE id > 100
763    /// ```
764    ///
765    /// # Arguments
766    ///
767    /// * `name` - SQL table name (e.g., `"trades"`)
768    /// * `table_uri` - Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`)
769    /// * `storage_options` - Storage credentials and configuration
770    ///
771    /// # Errors
772    ///
773    /// Returns `DbError` if the table cannot be opened or registered.
774    #[cfg(feature = "delta-lake")]
775    pub async fn register_delta_table(
776        &self,
777        name: &str,
778        table_uri: &str,
779        storage_options: HashMap<String, String>,
780    ) -> Result<(), DbError> {
781        laminar_connectors::lakehouse::delta_table_provider::register_delta_table(
782            &self.ctx,
783            name,
784            table_uri,
785            storage_options,
786        )
787        .await
788        .map_err(DbError::from)
789    }
790
791    /// Execute a SQL statement.
792    ///
793    /// Supports:
794    /// - `CREATE SOURCE` / `CREATE SINK` — registers sources and sinks
795    /// - `DROP SOURCE` / `DROP SINK` — removes sources and sinks
796    /// - `SHOW SOURCES` / `SHOW SINKS` / `SHOW QUERIES` — list registered objects
797    /// - `DESCRIBE source_name` — show source schema
798    /// - `SELECT ...` — execute a streaming query
799    /// - `INSERT INTO source_name VALUES (...)` — insert data
800    /// - `CREATE MATERIALIZED VIEW` — create a streaming materialized view
801    /// - `EXPLAIN SELECT ...` — show query plan
802    ///
803    /// # Errors
804    ///
805    /// Returns `DbError` if SQL parsing, planning, or execution fails.
806    pub async fn execute(&self, sql: &str) -> Result<ExecuteResult, DbError> {
807        if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
808            return Err(DbError::Shutdown);
809        }
810
811        // Apply config variable substitution
812        let resolved = if self.config_vars.is_empty() {
813            sql.to_string()
814        } else {
815            sql_utils::resolve_config_vars(sql, &self.config_vars, true)?
816        };
817
818        // Split into multiple statements
819        let stmts = sql_utils::split_statements(&resolved);
820        if stmts.is_empty() {
821            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
822        }
823
824        // Execute each statement, return the last result (or first error)
825        let mut last_result = None;
826        for stmt_sql in &stmts {
827            last_result = Some(self.execute_single(stmt_sql).await?);
828        }
829
830        last_result.ok_or_else(|| DbError::InvalidOperation("Empty SQL statement".into()))
831    }
832
    /// Execute a single SQL statement.
    ///
    /// Parses the statement with the streaming-SQL parser and dispatches on
    /// the resulting [`StreamingStatement`] variant. Only the first parsed
    /// statement is executed; multi-statement splitting happens in
    /// `execute()` before this is called.
    #[allow(clippy::too_many_lines)]
    async fn execute_single(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        let statements = parse_streaming_sql(sql)?;

        if statements.is_empty() {
            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
        }

        let statement = &statements[0];

        match statement {
            // CREATE SOURCE / CREATE SINK: on success, record the original
            // DDL text for the created object in the connector manager.
            StreamingStatement::CreateSource(create) => {
                let result = self.handle_create_source(create).await?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateSink(create) => {
                let result = self.handle_create_sink(create)?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                query_sql,
                ..
            } => self.handle_create_stream(name, query, emit_clause.as_ref(), query_sql),
            // These variants are planned and executed via the generic
            // streaming-query path.
            StreamingStatement::CreateContinuousQuery { .. }
            | StreamingStatement::CreateLookupTable(_)
            | StreamingStatement::DropLookupTable { .. } => self.handle_query(sql).await,
            // Standard SQL: intercept CREATE TABLE, DROP TABLE, and SET;
            // everything else falls through to the query path.
            StreamingStatement::Standard(stmt) => {
                if let sqlparser::ast::Statement::CreateTable(ct) = stmt.as_ref() {
                    self.handle_create_table(ct)
                } else if let sqlparser::ast::Statement::Drop {
                    object_type: sqlparser::ast::ObjectType::Table,
                    names,
                    if_exists,
                    ..
                } = stmt.as_ref()
                {
                    self.handle_drop_table(names, *if_exists)
                } else if let sqlparser::ast::Statement::Set(set_stmt) = stmt.as_ref() {
                    self.handle_set(set_stmt)
                } else {
                    self.handle_query(sql).await
                }
            }
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => self.handle_insert_into(table_name, columns, values).await,
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_source(name, *if_exists, *cascade),
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_sink(name, *if_exists, *cascade),
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_stream(name, *if_exists, *cascade),
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_materialized_view(name, *if_exists, *cascade),
            // SHOW commands produce a metadata RecordBatch.
            StreamingStatement::Show(cmd) => {
                let batch = match cmd {
                    ShowCommand::Sources => self.build_show_sources(),
                    ShowCommand::Sinks => self.build_show_sinks(),
                    ShowCommand::Queries => self.build_show_queries(),
                    ShowCommand::MaterializedViews => self.build_show_materialized_views(),
                    ShowCommand::Streams => self.build_show_streams(),
                    ShowCommand::Tables => self.build_show_tables(),
                    ShowCommand::CheckpointStatus => self.build_show_checkpoint_status().await?,
                    ShowCommand::CreateSource { name } => {
                        self.build_show_create_source(&name.to_string())?
                    }
                    ShowCommand::CreateSink { name } => {
                        self.build_show_create_sink(&name.to_string())?
                    }
                };
                Ok(ExecuteResult::Metadata(batch))
            }
            StreamingStatement::Checkpoint => {
                let result = self.checkpoint().await?;
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "CHECKPOINT".to_string(),
                    object_name: format!("checkpoint_{}", result.checkpoint_id),
                }))
            }
            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
                self.handle_restore_checkpoint(*checkpoint_id)
            }
            StreamingStatement::Describe { name, .. } => {
                let name_str = name.to_string();
                let batch = self.build_describe(&name_str)?;
                Ok(ExecuteResult::Metadata(batch))
            }
            // EXPLAIN ANALYZE executes the query for metrics; plain EXPLAIN
            // only plans it.
            StreamingStatement::Explain {
                statement, analyze, ..
            } => {
                if *analyze {
                    self.handle_explain_analyze(statement, sql).await
                } else {
                    self.handle_explain(statement)
                }
            }
            StreamingStatement::CreateMaterializedView {
                name,
                query,
                or_replace,
                if_not_exists,
                query_sql,
                ..
            } => {
                self.handle_create_materialized_view(
                    sql,
                    name,
                    query,
                    *or_replace,
                    *if_not_exists,
                    query_sql,
                )
                .await
            }
            StreamingStatement::AlterSource { name, operation } => {
                self.handle_alter_source(name, operation)
            }
        }
    }
980
    /// Handle INSERT INTO statement.
    ///
    /// Inserts SQL VALUES into a registered source, a `TableStore`-managed
    /// table (with PK upsert), or a plain `DataFusion` `MemTable` — tried in
    /// that order. `_columns` is currently ignored: values are converted
    /// positionally against the target's full schema.
    async fn handle_insert_into(
        &self,
        table_name: &sqlparser::ast::ObjectName,
        _columns: &[sqlparser::ast::Ident],
        values: &[Vec<sqlparser::ast::Expr>],
    ) -> Result<ExecuteResult, DbError> {
        let name = table_name.to_string();

        // Try inserting into a registered source
        if let Some(entry) = self.catalog.get_source(&name) {
            let batch = sql_utils::sql_values_to_record_batch(&entry.schema, values)?;
            entry
                .push_and_buffer(batch)
                .map_err(|e| DbError::InsertError(format!("Failed to push to source: {e}")))?;
            return Ok(ExecuteResult::RowsAffected(values.len() as u64));
        }

        // Try inserting into a TableStore-managed table (with PK upsert).
        // Single lock scope avoids TOCTOU race between has_table/schema/upsert.
        {
            let mut ts = self.table_store.write();
            if ts.has_table(&name) {
                let schema = ts
                    .table_schema(&name)
                    .ok_or_else(|| DbError::TableNotFound(name.clone()))?;
                let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
                ts.upsert(&name, &batch)?;
                drop(ts); // release before sync (which may also lock)

                self.sync_table_to_datafusion(&name)?;
                return Ok(ExecuteResult::RowsAffected(values.len() as u64));
            }
        }

        // Otherwise, insert into a DataFusion MemTable
        // Look up the table provider
        let table = self
            .ctx
            .table_provider(&name)
            .await
            .map_err(|_| DbError::TableNotFound(name.clone()))?;

        let schema = table.schema();
        let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;

        // Deregister the old table, then re-register with the new data.
        // NOTE(review): the re-registered MemTable holds only the newly
        // inserted batch — rows from the previous provider are not carried
        // over, so this path replaces rather than appends. Confirm that is
        // the intended semantics for plain MemTable targets.
        self.ctx
            .deregister_table(&name)
            .map_err(|e| DbError::InsertError(format!("Failed to deregister table: {e}")))?;

        let mem_table =
            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![batch]])
                .map_err(|e| DbError::InsertError(format!("Failed to create table: {e}")))?;

        self.ctx
            .register_table(&name, Arc::new(mem_table))
            .map_err(|e| DbError::InsertError(format!("Failed to register table: {e}")))?;

        Ok(ExecuteResult::RowsAffected(values.len() as u64))
    }
1045
1046    /// Handle RESTORE FROM CHECKPOINT statement (not yet implemented).
1047    ///
1048    /// Will eventually stop the pipeline, reload state from the checkpoint
1049    /// manifest, seek source offsets, and restart the pipeline.
1050    #[allow(clippy::unused_self)] // will use self when restore is implemented
1051    fn handle_restore_checkpoint(&self, _checkpoint_id: u64) -> Result<ExecuteResult, DbError> {
1052        Err(DbError::Unsupported(
1053            "RESTORE FROM CHECKPOINT is not yet implemented — \
1054             requires pipeline stop, state reload from manifest, \
1055             source offset seek, and pipeline restart"
1056                .to_string(),
1057        ))
1058    }
1059
1060    /// Get a session property value.
1061    #[must_use]
1062    pub fn get_session_property(&self, key: &str) -> Option<String> {
1063        self.session_properties
1064            .lock()
1065            .get(&key.to_lowercase())
1066            .cloned()
1067    }
1068
    /// Get all session properties.
    ///
    /// Returns a snapshot clone of the current map; later property changes
    /// are not reflected in the returned value.
    #[must_use]
    pub fn session_properties(&self) -> HashMap<String, String> {
        self.session_properties.lock().clone()
    }
1074
1075    /// Subscribe to a named stream or materialized view.
1076    ///
1077    /// # Errors
1078    ///
1079    /// Returns `DbError::StreamNotFound` if the stream is not registered.
1080    pub fn subscribe<T: crate::handle::FromBatch>(
1081        &self,
1082        name: &str,
1083    ) -> Result<crate::handle::TypedSubscription<T>, DbError> {
1084        let sub = self
1085            .catalog
1086            .get_stream_subscription(name)
1087            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
1088        Ok(crate::handle::TypedSubscription::from_raw(sub))
1089    }
1090
1091    /// Subscribe to a named stream's output.
1092    ///
1093    /// # Errors
1094    ///
1095    /// Returns `DbError::StreamNotFound` if the stream doesn't exist.
1096    #[cfg(feature = "api")]
1097    pub fn subscribe_raw(
1098        &self,
1099        name: &str,
1100    ) -> Result<laminar_core::streaming::Subscription<crate::catalog::ArrowRecord>, DbError> {
1101        self.catalog
1102            .get_stream_subscription(name)
1103            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))
1104    }
1105
    /// Handle EXPLAIN statement — show the streaming query plan.
    ///
    /// Produces a two-column (`plan_key`, `plan_value`) metadata batch
    /// describing the streaming plan. A planning failure is reported as
    /// `error`/`statement` rows rather than propagated, so EXPLAIN itself
    /// only fails if the metadata batch cannot be built.
    fn handle_explain(&self, statement: &StreamingStatement) -> Result<ExecuteResult, DbError> {
        let mut planner = self.planner.lock();

        // Plan the inner statement to extract streaming info
        let plan_result = planner.plan(statement);

        let mut rows: Vec<(String, String)> = Vec::new();

        match plan_result {
            Ok(plan) => {
                // First row always names the plan variant.
                rows.push((
                    "plan_type".into(),
                    match &plan {
                        laminar_sql::planner::StreamingPlan::Query(_) => "Query",
                        laminar_sql::planner::StreamingPlan::RegisterSource(_) => "RegisterSource",
                        laminar_sql::planner::StreamingPlan::RegisterSink(_) => "RegisterSink",
                        laminar_sql::planner::StreamingPlan::Standard(_) => "Standard",
                        laminar_sql::planner::StreamingPlan::RegisterLookupTable(_) => {
                            "RegisterLookupTable"
                        }
                        laminar_sql::planner::StreamingPlan::DropLookupTable { .. } => {
                            "DropLookupTable"
                        }
                    }
                    .into(),
                ));
                // Then variant-specific detail rows.
                match &plan {
                    laminar_sql::planner::StreamingPlan::Query(qp) => {
                        // Each optional plan component contributes a row
                        // only when present.
                        if let Some(name) = &qp.name {
                            rows.push(("query_name".into(), name.clone()));
                        }
                        if let Some(wc) = &qp.window_config {
                            rows.push(("window".into(), format!("{wc}")));
                        }
                        if let Some(jcs) = &qp.join_config {
                            if jcs.len() == 1 {
                                rows.push(("join".into(), format!("{}", jcs[0])));
                            } else {
                                // Multi-join plans get one row per step,
                                // numbered from 1.
                                for (i, jc) in jcs.iter().enumerate() {
                                    rows.push((format!("join_step_{}", i + 1), format!("{jc}")));
                                }
                            }
                        }
                        if let Some(oc) = &qp.order_config {
                            rows.push(("order_by".into(), format!("{oc:?}")));
                        }
                        if let Some(fc) = &qp.frame_config {
                            rows.push((
                                "frame_functions".into(),
                                format!("{}", fc.functions.len()),
                            ));
                        }
                        if let Some(ec) = &qp.emit_clause {
                            rows.push(("emit".into(), format!("{ec}")));
                        }
                    }
                    laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                        rows.push(("source".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                        rows.push(("sink".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::Standard(_) => {
                        rows.push(("execution".into(), "DataFusion pass-through".into()));
                    }
                    laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                        rows.push(("lookup_table".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                        rows.push(("drop_lookup_table".into(), name.clone()));
                    }
                }
            }
            Err(e) => {
                // Even if planning fails, show what we know
                rows.push(("error".into(), format!("{e}")));
                rows.push((
                    "statement".into(),
                    format!("{:?}", std::mem::discriminant(statement)),
                ));
            }
        }

        // Materialize the (key, value) pairs as a two-column string batch.
        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();

        let schema = Arc::new(Schema::new(vec![
            Field::new("plan_key", DataType::Utf8, false),
            Field::new("plan_value", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(keys)),
                Arc::new(StringArray::from(values)),
            ],
        )
        .map_err(|e| DbError::InvalidOperation(format!("explain metadata: {e}")))?;

        Ok(ExecuteResult::Metadata(batch))
    }
1209
1210    /// Handle EXPLAIN ANALYZE: run the plan and collect execution metrics.
1211    async fn handle_explain_analyze(
1212        &self,
1213        statement: &StreamingStatement,
1214        original_sql: &str,
1215    ) -> Result<ExecuteResult, DbError> {
1216        // First get the normal EXPLAIN output
1217        let explain_result = self.handle_explain(statement)?;
1218        let mut rows: Vec<(String, String)> = Vec::new();
1219
1220        if let ExecuteResult::Metadata(explain_batch) = &explain_result {
1221            let keys_col = explain_batch
1222                .column(0)
1223                .as_any()
1224                .downcast_ref::<StringArray>();
1225            let vals_col = explain_batch
1226                .column(1)
1227                .as_any()
1228                .downcast_ref::<StringArray>();
1229            if let (Some(keys), Some(vals)) = (keys_col, vals_col) {
1230                for i in 0..explain_batch.num_rows() {
1231                    rows.push((keys.value(i).to_string(), vals.value(i).to_string()));
1232                }
1233            }
1234        }
1235
1236        // Extract the inner SQL from the original EXPLAIN ANALYZE statement
1237        let upper = original_sql.to_uppercase();
1238        let inner_start = upper.find("ANALYZE").map_or(0, |pos| pos + "ANALYZE".len());
1239        let inner_sql = original_sql[inner_start..].trim();
1240
1241        // Try to execute the inner query via DataFusion and collect metrics
1242        let start = std::time::Instant::now();
1243        match self.ctx.sql(inner_sql).await {
1244            Ok(df) => match df.collect().await {
1245                Ok(batches) => {
1246                    let elapsed = start.elapsed();
1247                    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
1248                    rows.push(("rows_produced".into(), total_rows.to_string()));
1249                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1250                    rows.push(("batches_processed".into(), batches.len().to_string()));
1251                }
1252                Err(e) => {
1253                    let elapsed = start.elapsed();
1254                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
1255                    rows.push(("analyze_error".into(), format!("{e}")));
1256                }
1257            },
1258            Err(e) => {
1259                rows.push(("analyze_error".into(), format!("{e}")));
1260            }
1261        }
1262
1263        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
1264        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
1265
1266        let schema = Arc::new(Schema::new(vec![
1267            Field::new("plan_key", DataType::Utf8, false),
1268            Field::new("plan_value", DataType::Utf8, false),
1269        ]));
1270
1271        let batch = RecordBatch::try_new(
1272            schema,
1273            vec![
1274                Arc::new(StringArray::from(keys)),
1275                Arc::new(StringArray::from(values)),
1276            ],
1277        )
1278        .map_err(|e| DbError::InvalidOperation(format!("explain analyze metadata: {e}")))?;
1279
1280        Ok(ExecuteResult::Metadata(batch))
1281    }
1282
    /// Handle a streaming or standard SQL query.
    ///
    /// Planning happens synchronously under the planner lock; the lock is
    /// released before any `.await`, so async execution never holds it.
    /// DDL-style plans return `ExecuteResult::Ddl`; query plans are bridged
    /// into a streaming `QueryHandle`.
    #[allow(clippy::too_many_lines)]
    pub(crate) async fn handle_query(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        // Synchronous planning under the lock — released before any await
        let plan = {
            let statements = parse_streaming_sql(sql)?;
            if statements.is_empty() {
                return Err(DbError::InvalidOperation("Empty SQL statement".into()));
            }
            let mut planner = self.planner.lock();
            planner
                .plan(&statements[0])
                .map_err(laminar_sql::Error::from)?
        };

        match plan {
            laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::Query(query_plan) => {
                // Check for ASOF join — DataFusion can't parse ASOF syntax
                if let Some(asof_config) = Self::extract_asof_config(&query_plan) {
                    return self.execute_asof_query(&asof_config, sql).await;
                }

                // Re-render the planner's statement so DataFusion sees the
                // (possibly rewritten) SQL, not the raw input.
                let plan_sql = query_plan.statement.to_string();
                let logical_plan = self.ctx.state().create_logical_plan(&plan_sql).await?;

                // DataFusion interpreted execution.
                let df = self.ctx.execute_logical_plan(logical_plan).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::Standard(stmt) => {
                // Async execution without the lock
                let sql_str = stmt.to_string();
                let df = self.ctx.sql(&sql_str).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                self.handle_register_lookup_table(info)
            }
            // Tear the lookup table out of every registry it touches, then
            // rebuild the optimizer rules without it.
            laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                self.table_store.write().drop_table(&name);
                self.connector_manager.lock().unregister_table(&name);
                let _ = self.ctx.deregister_table(&name);
                self.lookup_registry.unregister(&name);
                self.refresh_lookup_optimizer_rule();
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DROP LOOKUP TABLE".to_string(),
                    object_name: name,
                }))
            }
        }
    }
1350
1351    /// Bridge a `DataFusion` `SendableRecordBatchStream` into the streaming
1352    /// subscription infrastructure and return a `QueryHandle`.
1353    fn bridge_query_stream(
1354        &self,
1355        sql: &str,
1356        stream: datafusion::physical_plan::SendableRecordBatchStream,
1357    ) -> ExecuteResult {
1358        let query_id = self.catalog.register_query(sql);
1359        let schema = stream.schema();
1360
1361        let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
1362        let (source, sink) =
1363            streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
1364
1365        let subscription = sink.subscribe();
1366
1367        let source_clone = source.clone();
1368        tokio::spawn(async move {
1369            use tokio_stream::StreamExt;
1370            let mut stream = stream;
1371            while let Some(result) = stream.next().await {
1372                match result {
1373                    Ok(batch) => {
1374                        if source_clone.push_arrow(batch).is_err() {
1375                            break;
1376                        }
1377                    }
1378                    Err(_) => break,
1379                }
1380            }
1381            drop(source_clone);
1382        });
1383
1384        ExecuteResult::Query(QueryHandle {
1385            id: query_id,
1386            schema,
1387            sql: sql.to_string(),
1388            subscription: Some(subscription),
1389            active: true,
1390        })
1391    }
1392
1393    /// Extract an ASOF join config from a query plan, if present.
1394    fn extract_asof_config(
1395        plan: &laminar_sql::planner::QueryPlan,
1396    ) -> Option<AsofJoinTranslatorConfig> {
1397        plan.join_config.as_ref()?.iter().find_map(|jc| {
1398            if let JoinOperatorConfig::Asof(cfg) = jc {
1399                Some(cfg.clone())
1400            } else {
1401                None
1402            }
1403        })
1404    }
1405
1406    /// Execute an ASOF join query by fetching left/right tables separately
1407    /// and performing the join in-process (bypasses `DataFusion`'s SQL parser
1408    /// which doesn't understand ASOF syntax).
1409    async fn execute_asof_query(
1410        &self,
1411        asof_config: &AsofJoinTranslatorConfig,
1412        original_sql: &str,
1413    ) -> Result<ExecuteResult, DbError> {
1414        let left_sql = format!("SELECT * FROM {}", asof_config.left_table);
1415        let right_sql = format!("SELECT * FROM {}", asof_config.right_table);
1416
1417        let left_batches = self
1418            .ctx
1419            .sql(&left_sql)
1420            .await
1421            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?
1422            .collect()
1423            .await
1424            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?;
1425
1426        let right_batches = self
1427            .ctx
1428            .sql(&right_sql)
1429            .await
1430            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?
1431            .collect()
1432            .await
1433            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?;
1434
1435        let result_batch =
1436            crate::asof_batch::execute_asof_join_batch(&left_batches, &right_batches, asof_config)?;
1437
1438        if result_batch.num_rows() == 0 {
1439            let query_id = self.catalog.register_query(original_sql);
1440            return Ok(ExecuteResult::Query(QueryHandle {
1441                id: query_id,
1442                schema: result_batch.schema(),
1443                sql: original_sql.to_string(),
1444                subscription: None,
1445                active: false,
1446            }));
1447        }
1448
1449        let schema = result_batch.schema();
1450        let mem_table =
1451            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![result_batch]])
1452                .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1453
1454        let _ = self.ctx.deregister_table("__asof_result");
1455        self.ctx
1456            .register_table("__asof_result", Arc::new(mem_table))
1457            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1458
1459        let df = self
1460            .ctx
1461            .sql("SELECT * FROM __asof_result")
1462            .await
1463            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1464        let stream = df
1465            .execute_stream()
1466            .await
1467            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1468
1469        let _ = self.ctx.deregister_table("__asof_result");
1470
1471        Ok(self.bridge_query_stream(original_sql, stream))
1472    }
1473
1474    /// Get a typed source handle for pushing data.
1475    ///
1476    /// The source must have been created via `CREATE SOURCE`.
1477    ///
1478    /// # Errors
1479    ///
1480    /// Returns `DbError::SourceNotFound` if the source is not registered.
1481    /// Returns `DbError::SchemaMismatch` if the Rust type's schema does not
1482    /// match the source's SQL schema.
1483    pub fn source<T: laminar_core::streaming::Record>(
1484        &self,
1485        name: &str,
1486    ) -> Result<SourceHandle<T>, DbError> {
1487        let entry = self
1488            .catalog
1489            .get_source(name)
1490            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
1491        SourceHandle::new(entry)
1492    }
1493
1494    /// Get an untyped source handle for pushing `RecordBatch` data.
1495    ///
1496    /// # Errors
1497    ///
1498    /// Returns `DbError::SourceNotFound` if the source is not registered.
1499    pub fn source_untyped(&self, name: &str) -> Result<UntypedSourceHandle, DbError> {
1500        let entry = self
1501            .catalog
1502            .get_source(name)
1503            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
1504        Ok(UntypedSourceHandle::new(entry))
1505    }
1506
1507    /// List all registered sources.
1508    pub fn sources(&self) -> Vec<SourceInfo> {
1509        let names = self.catalog.list_sources();
1510        names
1511            .into_iter()
1512            .filter_map(|name| {
1513                self.catalog.get_source(&name).map(|e| SourceInfo {
1514                    name: e.name.clone(),
1515                    schema: e.schema.clone(),
1516                    watermark_column: e.watermark_column.clone(),
1517                })
1518            })
1519            .collect()
1520    }
1521
1522    /// List all registered sinks.
1523    pub fn sinks(&self) -> Vec<SinkInfo> {
1524        self.catalog
1525            .list_sinks()
1526            .into_iter()
1527            .map(|name| SinkInfo { name })
1528            .collect()
1529    }
1530
1531    /// List all registered streams with their SQL definitions.
1532    pub fn streams(&self) -> Vec<crate::handle::StreamInfo> {
1533        let mgr = self.connector_manager.lock();
1534        mgr.streams()
1535            .iter()
1536            .map(|(name, reg)| crate::handle::StreamInfo {
1537                name: name.clone(),
1538                sql: Some(reg.query_sql.clone()),
1539            })
1540            .collect()
1541    }
1542
1543    /// Build the pipeline topology graph from registered sources, streams,
1544    /// and sinks.
1545    ///
1546    /// Returns a `PipelineTopology` with nodes for every source, stream,
1547    /// and sink, plus edges derived from stream SQL `FROM` references and
1548    /// sink `input` fields.
1549    pub fn pipeline_topology(&self) -> crate::handle::PipelineTopology {
1550        use crate::handle::{PipelineEdge, PipelineNode, PipelineNodeType};
1551
1552        let mut nodes = Vec::new();
1553        let mut edges = Vec::new();
1554
1555        // Collect source names for FROM matching
1556        let source_names = self.catalog.list_sources();
1557
1558        // Source nodes
1559        for name in &source_names {
1560            let schema = self.catalog.get_source(name).map(|e| e.schema.clone());
1561            nodes.push(PipelineNode {
1562                name: name.clone(),
1563                node_type: PipelineNodeType::Source,
1564                schema,
1565                sql: None,
1566            });
1567        }
1568
1569        // Stream nodes + edges from SQL FROM references
1570        let mgr = self.connector_manager.lock();
1571        let stream_names: Vec<String> = mgr.streams().keys().cloned().collect();
1572        for (name, reg) in mgr.streams() {
1573            nodes.push(PipelineNode {
1574                name: name.clone(),
1575                node_type: PipelineNodeType::Stream,
1576                schema: None,
1577                sql: Some(reg.query_sql.clone()),
1578            });
1579
1580            // Extract FROM references by checking which known sources/streams
1581            // appear in the query SQL. This is a lightweight heuristic that
1582            // avoids a full SQL parse.
1583            let sql_upper = reg.query_sql.to_uppercase();
1584            for src in &source_names {
1585                if sql_upper.contains(&src.to_uppercase()) {
1586                    edges.push(PipelineEdge {
1587                        from: src.clone(),
1588                        to: name.clone(),
1589                    });
1590                }
1591            }
1592            // Also check for stream-to-stream references (cascading)
1593            for other in &stream_names {
1594                if other != name && sql_upper.contains(&other.to_uppercase()) {
1595                    edges.push(PipelineEdge {
1596                        from: other.clone(),
1597                        to: name.clone(),
1598                    });
1599                }
1600            }
1601        }
1602
1603        // Sink nodes + edges from input field
1604        for (name, reg) in mgr.sinks() {
1605            nodes.push(PipelineNode {
1606                name: name.clone(),
1607                node_type: PipelineNodeType::Sink,
1608                schema: None,
1609                sql: None,
1610            });
1611
1612            // Sinks read from their `input` field
1613            if !reg.input.is_empty() {
1614                edges.push(PipelineEdge {
1615                    from: reg.input.clone(),
1616                    to: name.clone(),
1617                });
1618            }
1619        }
1620
1621        // Also add catalog-only sinks (no connector type) that aren't
1622        // already in the connector manager
1623        let cm_sink_names: std::collections::HashSet<&String> = mgr.sinks().keys().collect();
1624        for name in self.catalog.list_sinks() {
1625            if !cm_sink_names.contains(&name) {
1626                // Check if there's a sink entry in the catalog with input info
1627                if let Some(input) = self.catalog.get_sink_input(&name) {
1628                    nodes.push(PipelineNode {
1629                        name: name.clone(),
1630                        node_type: PipelineNodeType::Sink,
1631                        schema: None,
1632                        sql: None,
1633                    });
1634                    if !input.is_empty() {
1635                        edges.push(PipelineEdge {
1636                            from: input,
1637                            to: name,
1638                        });
1639                    }
1640                }
1641            }
1642        }
1643
1644        drop(mgr);
1645
1646        crate::handle::PipelineTopology { nodes, edges }
1647    }
1648
1649    /// List all active queries.
1650    pub fn queries(&self) -> Vec<QueryInfo> {
1651        self.catalog
1652            .list_queries()
1653            .into_iter()
1654            .map(|(id, sql, active)| QueryInfo { id, sql, active })
1655            .collect()
1656    }
1657
1658    /// Returns whether streaming checkpointing is enabled.
1659    #[must_use]
1660    pub fn is_checkpoint_enabled(&self) -> bool {
1661        self.config.checkpoint.is_some()
1662    }
1663
1664    /// Returns a checkpoint store instance, if checkpointing is configured.
1665    ///
1666    /// Returns an [`ObjectStoreCheckpointStore`](laminar_storage::ObjectStoreCheckpointStore)
1667    /// when `object_store_url` is set, otherwise a
1668    /// [`FileSystemCheckpointStore`](laminar_storage::FileSystemCheckpointStore).
1669    pub fn checkpoint_store(&self) -> Option<Box<dyn laminar_storage::CheckpointStore>> {
1670        let cp_config = self.config.checkpoint.as_ref()?;
1671        let max_retained = cp_config.max_retained.unwrap_or(3);
1672        // Pass the runtime vnode count through so manifest validation
1673        // checks against the real invariant, not a hardcoded default.
1674        let vnode_count = self.vnode_registry.lock().as_ref().map_or(
1675            laminar_storage::checkpoint_manifest::DEFAULT_VNODE_COUNT,
1676            |r| u16::try_from(r.vnode_count()).unwrap_or(u16::MAX),
1677        );
1678
1679        if let Some(ref url) = self.config.object_store_url {
1680            let obj_store = laminar_storage::object_store_builder::build_object_store(
1681                url,
1682                &self.config.object_store_options,
1683            )
1684            .ok()?;
1685            let prefix = url_to_checkpoint_prefix(url);
1686            Some(Box::new(
1687                laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
1688                    obj_store,
1689                    prefix,
1690                    max_retained,
1691                )
1692                .with_vnode_count(vnode_count),
1693            ))
1694        } else {
1695            let data_dir = cp_config
1696                .data_dir
1697                .clone()
1698                .or_else(|| self.config.storage_dir.clone())
1699                .unwrap_or_else(|| std::path::PathBuf::from("./data"));
1700            Some(Box::new(
1701                laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
1702                    &data_dir,
1703                    max_retained,
1704                )
1705                .with_vnode_count(vnode_count),
1706            ))
1707        }
1708    }
1709
    /// Triggers a streaming checkpoint that persists source offsets, sink
    /// positions, and operator state to disk via the
    /// [`CheckpointCoordinator`](crate::checkpoint_coordinator::CheckpointCoordinator).
    ///
    /// Returns the checkpoint result on success, including the checkpoint ID,
    /// epoch, and duration.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if checkpointing is not enabled, the
    /// coordinator has not been initialized (call `start()` first), or the
    /// checkpoint operation fails.
    pub async fn checkpoint(
        &self,
    ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
        // Fail fast if checkpointing was never configured.
        if self.config.checkpoint.is_none() {
            return Err(DbError::Checkpoint(
                "checkpointing is not enabled".to_string(),
            ));
        }

        // When the streaming pipeline is live, route through the
        // pipeline callback so it captures operator state (via the same
        // path that the periodic checkpoint timer uses). Without this,
        // the manifest has an empty `operator_states` map and restart
        // loses everything the `IncrementalAggState` accumulators held.
        //
        // The one-line `.lock().clone()` is deliberate: the sync mutex
        // guard is a temporary dropped at the end of this statement, so
        // it is never held across the `await`s below.
        let tx = self.force_ckpt_tx.lock().clone();
        if let Some(tx) = tx {
            // Hand the pipeline a oneshot to reply on, then block on it.
            let (reply_tx, reply_rx) = crossfire::oneshot::oneshot();
            tx.send(reply_tx).await.map_err(|_| {
                DbError::Checkpoint(
                    "pipeline callback receiver closed — engine may be shutting down".into(),
                )
            })?;
            // Two failure layers: `map_err` covers the oneshot being
            // dropped; the trailing `?` propagates the checkpoint's own
            // Result carried inside the reply.
            return reply_rx.await.map_err(|_| {
                DbError::Checkpoint("pipeline callback dropped oneshot before replying".into())
            })?;
        }

        // Fallback: no running pipeline (e.g., engine built but not yet
        // started). Drive the coordinator directly. Operator state will
        // be empty, but restart from this manifest is still well-defined
        // because there's nothing to restore anyway.
        let mut guard = self.coordinator.lock().await;
        let coord = guard.as_mut().ok_or_else(|| {
            DbError::Checkpoint("coordinator not initialized — call start() first".to_string())
        })?;
        coord
            .checkpoint(crate::checkpoint_coordinator::CheckpointRequest::default())
            .await
    }
1761
1762    /// Returns checkpoint performance statistics.
1763    ///
1764    /// Returns `None` if the checkpoint coordinator has not been initialized.
1765    pub async fn checkpoint_stats(&self) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
1766        let guard = self.coordinator.lock().await;
1767        guard
1768            .as_ref()
1769            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
1770    }
1771}
1772
1773impl std::fmt::Debug for LaminarDB {
1774    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1775        f.debug_struct("LaminarDB")
1776            .field("sources", &self.catalog.list_sources().len())
1777            .field("sinks", &self.catalog.list_sinks().len())
1778            .field("materialized_views", &self.mv_registry.lock().len())
1779            .field("checkpoint_enabled", &self.is_checkpoint_enabled())
1780            .field("shutdown", &self.is_closed())
1781            .finish_non_exhaustive()
1782    }
1783}
1784
/// Wraps `DefaultPhysicalPlanner` with lookup join extension support.
struct LookupQueryPlanner {
    /// Extension planner handed to `DefaultPhysicalPlanner` on every
    /// physical-plan build (see `create_physical_plan` below).
    extension_planner: Arc<dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync>,
}
1789
/// Manual `Debug`: the trait object field carries no `Debug` bound, so
/// only the struct name is printed, with `..` marking the hidden field.
impl std::fmt::Debug for LookupQueryPlanner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LookupQueryPlanner").finish_non_exhaustive()
    }
}
1795
1796#[async_trait::async_trait]
1797impl datafusion::execution::context::QueryPlanner for LookupQueryPlanner {
1798    async fn create_physical_plan(
1799        &self,
1800        logical_plan: &datafusion::logical_expr::LogicalPlan,
1801        session_state: &datafusion::execution::SessionState,
1802    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
1803        use datafusion::physical_planner::PhysicalPlanner;
1804        let planner =
1805            datafusion::physical_planner::DefaultPhysicalPlanner::with_extension_planners(vec![
1806                Arc::clone(&self.extension_planner),
1807            ]);
1808        planner
1809            .create_physical_plan(logical_plan, session_state)
1810            .await
1811    }
1812}
1813
// Unit tests live in the sibling `tests.rs` file; compiled only for
// test builds.
#[cfg(test)]
mod tests;