// laminar_db/db.rs
1//! The main `LaminarDB` database facade.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use arrow::array::{RecordBatch, StringArray};
8use arrow::datatypes::{DataType, Field, Schema};
9use datafusion::prelude::SessionContext;
10use laminar_core::streaming;
11use laminar_sql::parser::{parse_streaming_sql, ShowCommand, StreamingStatement};
12use laminar_sql::planner::StreamingPlanner;
13use laminar_sql::register_streaming_functions;
14use laminar_sql::translator::{AsofJoinTranslatorConfig, JoinOperatorConfig};
15
16use crate::builder::LaminarDbBuilder;
17use crate::catalog::SourceCatalog;
18use crate::config::LaminarConfig;
19use crate::error::DbError;
20use crate::handle::{
21    DdlInfo, ExecuteResult, QueryHandle, QueryInfo, SinkInfo, SourceHandle, SourceInfo,
22    UntypedSourceHandle,
23};
24use crate::pipeline_lifecycle::url_to_checkpoint_prefix;
25use crate::sql_utils;
26
// Lifecycle states stored in `LaminarDB::state` (an `AtomicU8`).
// NOTE(review): the numbering suggests a linear lifecycle
// CREATED → STARTING → RUNNING → SHUTTING_DOWN → STOPPED — confirm
// against the start()/stop() transition logic.
pub(crate) const STATE_CREATED: u8 = 0; // freshly constructed, pipeline not started
pub(crate) const STATE_STARTING: u8 = 1; // startup in progress
pub(crate) const STATE_RUNNING: u8 = 2; // background pipeline active
pub(crate) const STATE_SHUTTING_DOWN: u8 = 3; // shutdown requested
pub(crate) const STATE_STOPPED: u8 = 4; // pipeline fully stopped
32
33/// Extract SQL text from a `StreamingStatement` for storage in the connector manager.
34pub(crate) fn streaming_statement_to_sql(stmt: &StreamingStatement) -> String {
35    match stmt {
36        StreamingStatement::Standard(sql_stmt) => sql_stmt.to_string(),
37        StreamingStatement::CreateContinuousQuery { query, .. } => {
38            streaming_statement_to_sql(query)
39        }
40        other => format!("{other:?}"),
41    }
42}
43
/// The main `LaminarDB` database handle.
///
/// Provides a unified interface for SQL execution, data ingestion,
/// and result consumption. All streaming infrastructure (sources, sinks,
/// channels, subscriptions) is managed internally.
///
/// # Example
///
/// ```rust,ignore
/// use laminar_db::LaminarDB;
///
/// let db = LaminarDB::open()?;
///
/// db.execute("CREATE SOURCE trades (
///     symbol VARCHAR, price DOUBLE, ts BIGINT,
///     WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
/// )").await?;
///
/// let query = db.execute("SELECT symbol, AVG(price) FROM trades
///     GROUP BY symbol, TUMBLE(ts, INTERVAL '1' MINUTE)
/// ").await?;
/// ```
pub struct LaminarDB {
    /// Catalog of registered sources, sinks, and stream subscriptions.
    pub(crate) catalog: Arc<SourceCatalog>,
    /// SQL → streaming-plan translator; locked per planning call.
    pub(crate) planner: parking_lot::Mutex<StreamingPlanner>,
    /// `DataFusion` session used for standard (non-streaming) SQL.
    pub(crate) ctx: SessionContext,
    /// Immutable configuration supplied at open time.
    pub(crate) config: LaminarConfig,
    /// Variables substituted into SQL text before parsing.
    pub(crate) config_vars: Arc<HashMap<String, String>>,
    /// When set, `execute` refuses new work with `DbError::Shutdown`.
    pub(crate) shutdown: std::sync::atomic::AtomicBool,
    /// Unified checkpoint coordinator (populated by `start()`).
    pub(crate) coordinator:
        Arc<tokio::sync::Mutex<Option<crate::checkpoint_coordinator::CheckpointCoordinator>>>,
    /// Tracks connector-backed objects and their originating DDL text.
    pub(crate) connector_manager: parking_lot::Mutex<crate::connector_manager::ConnectorManager>,
    /// Factory registry for source/sink connector implementations.
    pub(crate) connector_registry: Arc<laminar_connectors::registry::ConnectorRegistry>,
    /// Materialized-view registry (see `laminar_core::mv`).
    pub(crate) mv_registry: parking_lot::Mutex<laminar_core::mv::MvRegistry>,
    /// Store for primary-key reference tables with upsert semantics.
    pub(crate) table_store: Arc<parking_lot::RwLock<crate::table_store::TableStore>>,
    /// Lifecycle state; one of the `STATE_*` constants above.
    pub(crate) state: std::sync::atomic::AtomicU8,
    /// Handle to the background processing task (if running).
    pub(crate) runtime_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Signal to stop the processing loop.
    pub(crate) shutdown_signal: Arc<tokio::sync::Notify>,
    /// Shared pipeline counters for observability.
    pub(crate) counters: Arc<crate::metrics::PipelineCounters>,
    /// Instant when the database was created, for uptime calculation.
    pub(crate) start_time: std::time::Instant,
    /// Session properties set via `SET key = value`.
    pub(crate) session_properties: parking_lot::Mutex<HashMap<String, String>>,
    /// Global pipeline watermark (min of all source watermarks).
    pub(crate) pipeline_watermark: Arc<std::sync::atomic::AtomicI64>,
    /// Shared compiler cache for JIT-compiled pipelines.
    /// Protected by `Mutex` — only locked during compilation, not execution.
    #[cfg(feature = "jit")]
    pub(crate) compiler_cache: parking_lot::Mutex<laminar_core::compiler::CompilerCache>,
    /// Shared lookup table registry for physical planning of lookup joins.
    pub(crate) lookup_registry: Arc<laminar_sql::datafusion::LookupTableRegistry>,
}
100
/// Per-source watermark tracking state for the pipeline loop.
///
/// Combines an `EventTimeExtractor` (to find the max timestamp in each batch)
/// with a watermark generator (to compute the watermark with delay).
pub(crate) struct SourceWatermarkState {
    /// Extracts event timestamps from incoming batches.
    pub(crate) extractor: laminar_core::time::EventTimeExtractor,
    /// Strategy that turns observed timestamps into a watermark value.
    pub(crate) generator: Box<dyn laminar_core::time::WatermarkGenerator>,
    /// Watermark column name for late-row filtering.
    pub(crate) column: String,
    /// Timestamp format for late-row filtering.
    pub(crate) format: laminar_core::time::TimestampFormat,
}
113
114/// Infer the `TimestampFormat` from a schema column's `DataType`.
115pub(crate) fn infer_timestamp_format(
116    schema: &arrow::datatypes::SchemaRef,
117    column: &str,
118) -> laminar_core::time::TimestampFormat {
119    if let Ok(idx) = schema.index_of(column) {
120        match schema.field(idx).data_type() {
121            DataType::Timestamp(_, _) => laminar_core::time::TimestampFormat::ArrowNative,
122            _ => laminar_core::time::TimestampFormat::UnixMillis,
123        }
124    } else {
125        laminar_core::time::TimestampFormat::UnixMillis
126    }
127}
128
129/// Filters rows from a `RecordBatch` whose timestamp is behind the watermark.
130///
131/// Returns `None` if all rows are late (i.e., filtered result is empty).
132/// For sources without a watermark column, callers should skip this function
133/// and pass the batch through unfiltered.
134pub(crate) fn filter_late_rows(
135    batch: &RecordBatch,
136    column: &str,
137    watermark: i64,
138    format: laminar_core::time::TimestampFormat,
139) -> Option<RecordBatch> {
140    crate::batch_filter::filter_batch_by_timestamp(
141        batch,
142        column,
143        watermark,
144        format,
145        crate::batch_filter::ThresholdOp::GreaterEq,
146    )
147}
148
/// Parse a human-readable duration string (e.g., "5s", "1m", "500ms", "2h", "30").
///
/// Supported suffixes: `ms` (milliseconds), `s` (seconds), `m` (minutes),
/// `h` (hours). A bare integer is interpreted as seconds.
///
/// Returns `None` for empty input, an unparsable number, or arithmetic
/// overflow when converting minutes/hours to seconds (the original code
/// used an unchecked `n * 60`, which silently wraps in release builds).
pub(crate) fn parse_duration_str(s: &str) -> Option<std::time::Duration> {
    let s = s.trim();
    // Check "ms" before "s": "500ms" must not be misread as 500 seconds.
    if let Some(num) = s.strip_suffix("ms") {
        let n: u64 = num.trim().parse().ok()?;
        Some(std::time::Duration::from_millis(n))
    } else if let Some(num) = s.strip_suffix('s') {
        let n: u64 = num.trim().parse().ok()?;
        Some(std::time::Duration::from_secs(n))
    } else if let Some(num) = s.strip_suffix('m') {
        let n: u64 = num.trim().parse().ok()?;
        // checked_mul: reject absurd minute counts instead of wrapping.
        Some(std::time::Duration::from_secs(n.checked_mul(60)?))
    } else if let Some(num) = s.strip_suffix('h') {
        let n: u64 = num.trim().parse().ok()?;
        Some(std::time::Duration::from_secs(n.checked_mul(3600)?))
    } else {
        // No recognized suffix: interpret the whole string as seconds.
        let n: u64 = s.parse().ok()?;
        Some(std::time::Duration::from_secs(n))
    }
}
167
168impl LaminarDB {
169    /// Create an embedded in-memory database with default settings.
170    ///
171    /// # Errors
172    ///
173    /// Returns `DbError` if `DataFusion` context creation fails.
174    pub fn open() -> Result<Self, DbError> {
175        Self::open_with_config(LaminarConfig::default())
176    }
177
178    /// Create with custom configuration.
179    ///
180    /// # Errors
181    ///
182    /// Returns `DbError` if `DataFusion` context creation fails.
183    pub fn open_with_config(config: LaminarConfig) -> Result<Self, DbError> {
184        Self::open_with_config_and_vars(config, HashMap::new())
185    }
186
    /// Create with custom configuration and config variables for SQL substitution.
    ///
    /// Builds the `DataFusion` session (with lookup-join physical planning
    /// wired in), the source catalog, and the connector registry, then
    /// assembles the handle with all runtime state in its pre-start form.
    ///
    /// # Errors
    ///
    /// Returns `DbError` if `DataFusion` context creation fails.
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn open_with_config_and_vars(
        config: LaminarConfig,
        config_vars: HashMap<String, String>,
    ) -> Result<Self, DbError> {
        let lookup_registry = Arc::new(laminar_sql::datafusion::LookupTableRegistry::new());

        // Build a SessionContext with the LookupJoinExtensionPlanner wired
        // into the physical planner so LookupJoinNode → LookupJoinExec works.
        let ctx = {
            let session_config = laminar_sql::datafusion::base_session_config();
            let extension_planner: Arc<
                dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync,
            > = Arc::new(laminar_sql::datafusion::LookupJoinExtensionPlanner::new(
                Arc::clone(&lookup_registry),
            ));
            let query_planner: Arc<dyn datafusion::execution::context::QueryPlanner + Send + Sync> =
                Arc::new(LookupQueryPlanner { extension_planner });
            let state = datafusion::execution::SessionStateBuilder::new()
                .with_config(session_config)
                .with_default_features()
                .with_query_planner(query_planner)
                .build();
            SessionContext::new_with_state(state)
        };
        // Make the streaming-specific SQL functions available on this context.
        register_streaming_functions(&ctx);

        // The catalog inherits the configured channel-buffering defaults.
        let catalog = Arc::new(SourceCatalog::new(
            config.default_buffer_size,
            config.default_backpressure,
        ));

        let connector_registry = Arc::new(laminar_connectors::registry::ConnectorRegistry::new());
        Self::register_builtin_connectors(&connector_registry);

        // Assemble the handle; everything starts in its not-yet-started state
        // (STATE_CREATED, no coordinator, no runtime task).
        Ok(Self {
            catalog,
            planner: parking_lot::Mutex::new(StreamingPlanner::new()),
            ctx,
            config,
            config_vars: Arc::new(config_vars),
            shutdown: std::sync::atomic::AtomicBool::new(false),
            coordinator: Arc::new(tokio::sync::Mutex::new(None)),
            connector_manager: parking_lot::Mutex::new(
                crate::connector_manager::ConnectorManager::new(),
            ),
            connector_registry,
            mv_registry: parking_lot::Mutex::new(laminar_core::mv::MvRegistry::new()),
            table_store: Arc::new(parking_lot::RwLock::new(
                crate::table_store::TableStore::new(),
            )),
            state: std::sync::atomic::AtomicU8::new(STATE_CREATED),
            runtime_handle: parking_lot::Mutex::new(None),
            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
            counters: Arc::new(crate::metrics::PipelineCounters::new()),
            start_time: std::time::Instant::now(),
            session_properties: parking_lot::Mutex::new(HashMap::new()),
            // i64::MIN means "no watermark observed yet".
            pipeline_watermark: Arc::new(std::sync::atomic::AtomicI64::new(i64::MIN)),
            #[cfg(feature = "jit")]
            compiler_cache: parking_lot::Mutex::new(
                laminar_core::compiler::CompilerCache::new(64)
                    .expect("JIT compiler cache initialization"),
            ),
            lookup_registry,
        })
    }
258
    /// Get a fluent builder for constructing a `LaminarDB`.
    ///
    /// NOTE(review): see `LaminarDbBuilder` for the options it exposes
    /// (custom UDFs/UDAFs are registered through it — cf. `register_custom_udf`).
    #[must_use]
    pub fn builder() -> LaminarDbBuilder {
        LaminarDbBuilder::new()
    }
264
    /// Register built-in connectors based on enabled features.
    ///
    /// Each connector family is compiled in (and registered) only when its
    /// Cargo feature is enabled; with no features enabled this is a no-op,
    /// hence `#[allow(unused_variables)]` for `registry`.
    #[allow(unused_variables)]
    fn register_builtin_connectors(registry: &laminar_connectors::registry::ConnectorRegistry) {
        #[cfg(feature = "kafka")]
        {
            laminar_connectors::kafka::register_kafka_source(registry);
            laminar_connectors::kafka::register_kafka_sink(registry);
        }
        #[cfg(feature = "postgres-cdc")]
        {
            laminar_connectors::cdc::postgres::register_postgres_cdc(registry);
        }
        #[cfg(feature = "postgres-sink")]
        {
            laminar_connectors::postgres::register_postgres_sink(registry);
        }
        #[cfg(feature = "delta-lake")]
        {
            laminar_connectors::lakehouse::register_delta_lake_sink(registry);
            laminar_connectors::lakehouse::register_delta_lake_source(registry);
        }
        #[cfg(feature = "websocket")]
        {
            laminar_connectors::websocket::register_websocket_source(registry);
            laminar_connectors::websocket::register_websocket_sink(registry);
        }
        #[cfg(feature = "mysql-cdc")]
        {
            laminar_connectors::cdc::mysql::register_mysql_cdc_source(registry);
        }
        #[cfg(feature = "files")]
        {
            laminar_connectors::files::register_file_source(registry);
            laminar_connectors::files::register_file_sink(registry);
        }
    }
301
    /// Replaces the `LookupJoinRewriteRule` on the `DataFusion` context
    /// with one that knows the current set of registered lookup tables.
    ///
    /// Also reinstalls the predicate-splitter and column-pruning rules.
    /// When no lookup tables are registered, all three rules are removed
    /// and none are re-added.
    fn refresh_lookup_optimizer_rule(&self) {
        use laminar_sql::planner::lookup_join::{LookupColumnPruningRule, LookupJoinRewriteRule};
        use laminar_sql::planner::predicate_split::{
            PlanPushdownMode, PlanSourceCapabilities, PredicateSplitterRule,
            SourceCapabilitiesRegistry,
        };

        // Remove old rules if present
        // NOTE(review): assumed to be a no-op when the rule is absent —
        // confirm remove_optimizer_rule semantics.
        self.ctx.remove_optimizer_rule("lookup_join_rewrite");
        self.ctx.remove_optimizer_rule("predicate_splitter");
        self.ctx.remove_optimizer_rule("lookup_column_pruning");

        let tables = self.planner.lock().lookup_tables_cloned();
        if tables.is_empty() {
            return;
        }

        // Build capabilities registry from table properties
        let mut caps_registry = SourceCapabilitiesRegistry::default();
        for (name, info) in &tables {
            let mode = match info.properties.pushdown_mode {
                laminar_sql::parser::lookup_table::PushdownMode::Enabled
                | laminar_sql::parser::lookup_table::PushdownMode::Auto => PlanPushdownMode::Full,
                laminar_sql::parser::lookup_table::PushdownMode::Disabled => PlanPushdownMode::None,
            };
            // Only primary-key columns are advertised for equality pushdown;
            // range/IN pushdown and NULL checks are not supported here.
            let pk_set: std::collections::HashSet<String> =
                info.primary_key.iter().cloned().collect();
            caps_registry.register(
                name.clone(),
                PlanSourceCapabilities {
                    pushdown_mode: mode,
                    eq_columns: pk_set,
                    range_columns: std::collections::HashSet::new(),
                    in_columns: std::collections::HashSet::new(),
                    supports_null_check: false,
                },
            );
        }

        // Register rules in order: rewrite → predicate split → column pruning
        self.ctx
            .add_optimizer_rule(Arc::new(LookupJoinRewriteRule::new(tables)));
        self.ctx
            .add_optimizer_rule(Arc::new(PredicateSplitterRule::new(caps_registry)));
        self.ctx
            .add_optimizer_rule(Arc::new(LookupColumnPruningRule));
    }
351
    /// Returns the connector registry for registering custom connectors.
    ///
    /// Use this to register user-defined source/sink connectors before
    /// calling `start()`.
    ///
    /// Built-in connectors are already registered at open time based on
    /// the enabled Cargo features (see `register_builtin_connectors`).
    #[must_use]
    pub fn connector_registry(&self) -> &laminar_connectors::registry::ConnectorRegistry {
        &self.connector_registry
    }
360
    /// Register a custom scalar UDF on the `SessionContext`.
    ///
    /// Called by `LaminarDbBuilder::build()` after construction. The UDF is
    /// then callable from SQL executed through this database's context.
    pub(crate) fn register_custom_udf(&self, udf: datafusion_expr::ScalarUDF) {
        self.ctx.register_udf(udf);
    }
367
    /// Register a custom aggregate UDF (UDAF) on the `SessionContext`.
    ///
    /// Called by `LaminarDbBuilder::build()` after construction. The UDAF is
    /// then callable from SQL executed through this database's context.
    pub(crate) fn register_custom_udaf(&self, udaf: datafusion_expr::AggregateUDF) {
        self.ctx.register_udaf(udaf);
    }
374
    /// Registers a Delta Lake table as a `DataFusion` `TableProvider`.
    ///
    /// After registration, the table can be queried via SQL:
    /// ```sql
    /// SELECT * FROM my_delta_table WHERE id > 100
    /// ```
    ///
    /// # Arguments
    ///
    /// * `name` - SQL table name (e.g., `"trades"`)
    /// * `table_uri` - Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`)
    /// * `storage_options` - Storage credentials and configuration
    ///
    /// # Errors
    ///
    /// Returns `DbError` if the table cannot be opened or registered.
    #[cfg(feature = "delta-lake")]
    pub async fn register_delta_table(
        &self,
        name: &str,
        table_uri: &str,
        storage_options: HashMap<String, String>,
    ) -> Result<(), DbError> {
        // Delegate to the lakehouse connector crate; any failure surfaces
        // as a stringified DbError::Connector.
        laminar_connectors::lakehouse::delta_table_provider::register_delta_table(
            &self.ctx,
            name,
            table_uri,
            storage_options,
        )
        .await
        .map_err(|e| DbError::Connector(e.to_string()))
    }
407
408    /// Execute a SQL statement.
409    ///
410    /// Supports:
411    /// - `CREATE SOURCE` / `CREATE SINK` — registers sources and sinks
412    /// - `DROP SOURCE` / `DROP SINK` — removes sources and sinks
413    /// - `SHOW SOURCES` / `SHOW SINKS` / `SHOW QUERIES` — list registered objects
414    /// - `DESCRIBE source_name` — show source schema
415    /// - `SELECT ...` — execute a streaming query
416    /// - `INSERT INTO source_name VALUES (...)` — insert data
417    /// - `CREATE MATERIALIZED VIEW` — create a streaming materialized view
418    /// - `EXPLAIN SELECT ...` — show query plan
419    ///
420    /// # Errors
421    ///
422    /// Returns `DbError` if SQL parsing, planning, or execution fails.
423    pub async fn execute(&self, sql: &str) -> Result<ExecuteResult, DbError> {
424        if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
425            return Err(DbError::Shutdown);
426        }
427
428        // Apply config variable substitution
429        let resolved = if self.config_vars.is_empty() {
430            sql.to_string()
431        } else {
432            sql_utils::resolve_config_vars(sql, &self.config_vars, true)?
433        };
434
435        // Split into multiple statements
436        let stmts = sql_utils::split_statements(&resolved);
437        if stmts.is_empty() {
438            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
439        }
440
441        // Execute each statement, return the last result (or first error)
442        let mut last_result = None;
443        for stmt_sql in &stmts {
444            last_result = Some(self.execute_single(stmt_sql).await?);
445        }
446
447        last_result.ok_or_else(|| DbError::InvalidOperation("Empty SQL statement".into()))
448    }
449
    /// Execute a single SQL statement.
    ///
    /// Dispatches one parsed `StreamingStatement` to the matching handler.
    /// NOTE(review): only the first parsed statement is executed — callers
    /// go through `execute`, which splits multi-statement input first, so
    /// this should always receive exactly one statement; confirm.
    #[allow(clippy::too_many_lines)]
    async fn execute_single(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        let statements = parse_streaming_sql(sql)?;

        if statements.is_empty() {
            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
        }

        let statement = &statements[0];

        match statement {
            // CREATE SOURCE / CREATE SINK: on success, remember the original
            // DDL text in the connector manager (keyed by object name).
            StreamingStatement::CreateSource(create) => {
                let result = self.handle_create_source(create)?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateSink(create) => {
                let result = self.handle_create_sink(create)?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.handle_create_stream(name, query, emit_clause.as_ref()),
            // These are handled through the generic query path (raw SQL).
            StreamingStatement::CreateContinuousQuery { .. }
            | StreamingStatement::CreateLookupTable(_)
            | StreamingStatement::DropLookupTable { .. } => self.handle_query(sql).await,
            // Standard SQL: intercept CREATE TABLE / DROP TABLE / SET;
            // everything else falls through to the query path.
            StreamingStatement::Standard(stmt) => {
                if let sqlparser::ast::Statement::CreateTable(ct) = stmt.as_ref() {
                    self.handle_create_table(ct)
                } else if let sqlparser::ast::Statement::Drop {
                    object_type: sqlparser::ast::ObjectType::Table,
                    names,
                    if_exists,
                    ..
                } = stmt.as_ref()
                {
                    self.handle_drop_table(names, *if_exists)
                } else if let sqlparser::ast::Statement::Set(set_stmt) = stmt.as_ref() {
                    self.handle_set(set_stmt)
                } else {
                    self.handle_query(sql).await
                }
            }
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => self.handle_insert_into(table_name, columns, values).await,
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_source(name, *if_exists, *cascade),
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_sink(name, *if_exists, *cascade),
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_stream(name, *if_exists, *cascade),
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_materialized_view(name, *if_exists, *cascade),
            // SHOW commands produce a metadata RecordBatch, never a stream.
            StreamingStatement::Show(cmd) => {
                let batch = match cmd {
                    ShowCommand::Sources => self.build_show_sources(),
                    ShowCommand::Sinks => self.build_show_sinks(),
                    ShowCommand::Queries => self.build_show_queries(),
                    ShowCommand::MaterializedViews => self.build_show_materialized_views(),
                    ShowCommand::Streams => self.build_show_streams(),
                    ShowCommand::Tables => self.build_show_tables(),
                    ShowCommand::CheckpointStatus => self.build_show_checkpoint_status()?,
                    ShowCommand::CreateSource { name } => {
                        self.build_show_create_source(&name.to_string())?
                    }
                    ShowCommand::CreateSink { name } => {
                        self.build_show_create_sink(&name.to_string())?
                    }
                };
                Ok(ExecuteResult::Metadata(batch))
            }
            // CHECKPOINT reports the new checkpoint id via a synthetic DDL result.
            StreamingStatement::Checkpoint => {
                let result = self.checkpoint().await?;
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "CHECKPOINT".to_string(),
                    object_name: format!("checkpoint_{}", result.checkpoint_id),
                }))
            }
            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
                self.handle_restore_checkpoint(*checkpoint_id)
            }
            StreamingStatement::Describe { name, .. } => {
                let name_str = name.to_string();
                let batch = self.build_describe(&name_str)?;
                Ok(ExecuteResult::Metadata(batch))
            }
            StreamingStatement::Explain {
                statement, analyze, ..
            } => {
                if *analyze {
                    self.handle_explain_analyze(statement, sql).await
                } else {
                    self.handle_explain(statement)
                }
            }
            StreamingStatement::CreateMaterializedView {
                name,
                query,
                or_replace,
                if_not_exists,
                ..
            } => {
                self.handle_create_materialized_view(sql, name, query, *or_replace, *if_not_exists)
                    .await
            }
            StreamingStatement::AlterSource { name, operation } => {
                self.handle_alter_source(name, operation)
            }
        }
    }
588
    /// Handle INSERT INTO statement.
    ///
    /// Inserts SQL VALUES into a registered source, a `TableStore`-managed
    /// table (with PK upsert), or a plain `DataFusion` `MemTable`.
    ///
    /// Targets are tried in that order; `_columns` is currently ignored —
    /// NOTE(review): an explicit column list is not honored; values must
    /// match the full schema order. Confirm whether this is intentional.
    ///
    /// # Errors
    ///
    /// Returns `DbError::TableNotFound` if no target matches, or
    /// `DbError::InsertError` if conversion or (re)registration fails.
    async fn handle_insert_into(
        &self,
        table_name: &sqlparser::ast::ObjectName,
        _columns: &[sqlparser::ast::Ident],
        values: &[Vec<sqlparser::ast::Expr>],
    ) -> Result<ExecuteResult, DbError> {
        let name = table_name.to_string();

        // Try inserting into a registered source
        if let Some(entry) = self.catalog.get_source(&name) {
            let batch = sql_utils::sql_values_to_record_batch(&entry.schema, values)?;
            entry
                .push_and_buffer(batch)
                .map_err(|e| DbError::InsertError(format!("Failed to push to source: {e}")))?;
            return Ok(ExecuteResult::RowsAffected(values.len() as u64));
        }

        // Try inserting into a TableStore-managed table (with PK upsert).
        // Single lock scope avoids TOCTOU race between has_table/schema/upsert.
        {
            let mut ts = self.table_store.write();
            if ts.has_table(&name) {
                let schema = ts
                    .table_schema(&name)
                    .ok_or_else(|| DbError::TableNotFound(name.clone()))?;
                let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
                ts.upsert(&name, &batch)?;
                drop(ts); // release before sync (which may also lock)

                self.sync_table_to_datafusion(&name)?;
                return Ok(ExecuteResult::RowsAffected(values.len() as u64));
            }
        }

        // Otherwise, insert into a DataFusion MemTable
        // Look up the table provider
        let table = self
            .ctx
            .table_provider(&name)
            .await
            .map_err(|_| DbError::TableNotFound(name.clone()))?;

        let schema = table.schema();
        let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;

        // Deregister the old table, then re-register with the new data
        // NOTE(review): this replaces the MemTable contents with only the new
        // batch rather than appending — confirm that replace-on-insert is the
        // intended semantics for plain MemTables.
        self.ctx
            .deregister_table(&name)
            .map_err(|e| DbError::InsertError(format!("Failed to deregister table: {e}")))?;

        let mem_table =
            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![batch]])
                .map_err(|e| DbError::InsertError(format!("Failed to create table: {e}")))?;

        self.ctx
            .register_table(&name, Arc::new(mem_table))
            .map_err(|e| DbError::InsertError(format!("Failed to register table: {e}")))?;

        Ok(ExecuteResult::RowsAffected(values.len() as u64))
    }
653
654    /// Handle CREATE TABLE statement.
655    ///
656    /// Creates a static reference/dimension table backed by a `DataFusion`
657    /// `MemTable`. If a PRIMARY KEY is specified, the table is also registered
658    /// in the `TableStore` for upsert/delete/lookup semantics.
659    /// If `WITH (...)` options include a `connector` key, the table is
660    /// registered in the `ConnectorManager` for connector-backed population.
661    #[allow(clippy::unused_self)] // will use self when restore is implemented
662    fn handle_restore_checkpoint(&self, _checkpoint_id: u64) -> Result<ExecuteResult, DbError> {
663        Err(DbError::Unsupported(
664            "RESTORE FROM CHECKPOINT is not yet implemented — \
665             requires pipeline stop, state reload from manifest, \
666             source offset seek, and pipeline restart"
667                .to_string(),
668        ))
669    }
670
671    /// Get a session property value.
672    #[must_use]
673    pub fn get_session_property(&self, key: &str) -> Option<String> {
674        self.session_properties
675            .lock()
676            .get(&key.to_lowercase())
677            .cloned()
678    }
679
680    /// Get all session properties.
681    #[must_use]
682    pub fn session_properties(&self) -> HashMap<String, String> {
683        self.session_properties.lock().clone()
684    }
685
686    /// Subscribe to a named stream or materialized view.
687    ///
688    /// # Errors
689    ///
690    /// Returns `DbError::StreamNotFound` if the stream is not registered.
691    pub fn subscribe<T: crate::handle::FromBatch>(
692        &self,
693        name: &str,
694    ) -> Result<crate::handle::TypedSubscription<T>, DbError> {
695        let sub = self
696            .catalog
697            .get_stream_subscription(name)
698            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
699        Ok(crate::handle::TypedSubscription::from_raw(sub))
700    }
701
702    /// Get a raw Arrow subscription for a named stream (crate-internal).
703    ///
704    /// Used by `api::Connection::subscribe` to create an `ArrowSubscription`
705    /// without requiring the `FromBatch` trait bound.
706    #[cfg(feature = "api")]
707    pub(crate) fn subscribe_raw(
708        &self,
709        name: &str,
710    ) -> Result<laminar_core::streaming::Subscription<crate::catalog::ArrowRecord>, DbError> {
711        self.catalog
712            .get_stream_subscription(name)
713            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))
714    }
715
716    /// Handle EXPLAIN statement — show the streaming query plan.
717    fn handle_explain(&self, statement: &StreamingStatement) -> Result<ExecuteResult, DbError> {
718        let mut planner = self.planner.lock();
719
720        // Plan the inner statement to extract streaming info
721        let plan_result = planner.plan(statement);
722
723        let mut rows: Vec<(String, String)> = Vec::new();
724
725        match plan_result {
726            Ok(plan) => {
727                rows.push((
728                    "plan_type".into(),
729                    match &plan {
730                        laminar_sql::planner::StreamingPlan::Query(_) => "Query",
731                        laminar_sql::planner::StreamingPlan::RegisterSource(_) => "RegisterSource",
732                        laminar_sql::planner::StreamingPlan::RegisterSink(_) => "RegisterSink",
733                        laminar_sql::planner::StreamingPlan::Standard(_) => "Standard",
734                        laminar_sql::planner::StreamingPlan::DagExplain(_) => "DagExplain",
735                        laminar_sql::planner::StreamingPlan::RegisterLookupTable(_) => {
736                            "RegisterLookupTable"
737                        }
738                        laminar_sql::planner::StreamingPlan::DropLookupTable { .. } => {
739                            "DropLookupTable"
740                        }
741                    }
742                    .into(),
743                ));
744                match &plan {
745                    laminar_sql::planner::StreamingPlan::Query(qp) => {
746                        if let Some(name) = &qp.name {
747                            rows.push(("query_name".into(), name.clone()));
748                        }
749                        if let Some(wc) = &qp.window_config {
750                            rows.push(("window".into(), format!("{wc}")));
751                        }
752                        if let Some(jcs) = &qp.join_config {
753                            if jcs.len() == 1 {
754                                rows.push(("join".into(), format!("{}", jcs[0])));
755                            } else {
756                                for (i, jc) in jcs.iter().enumerate() {
757                                    rows.push((format!("join_step_{}", i + 1), format!("{jc}")));
758                                }
759                            }
760                        }
761                        if let Some(oc) = &qp.order_config {
762                            rows.push(("order_by".into(), format!("{oc:?}")));
763                        }
764                        if let Some(fc) = &qp.frame_config {
765                            rows.push((
766                                "frame_functions".into(),
767                                format!("{}", fc.functions.len()),
768                            ));
769                        }
770                        if let Some(ec) = &qp.emit_clause {
771                            rows.push(("emit".into(), format!("{ec}")));
772                        }
773                    }
774                    laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
775                        rows.push(("source".into(), info.name.clone()));
776                    }
777                    laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
778                        rows.push(("sink".into(), info.name.clone()));
779                    }
780                    laminar_sql::planner::StreamingPlan::Standard(_) => {
781                        rows.push(("execution".into(), "DataFusion pass-through".into()));
782                    }
783                    laminar_sql::planner::StreamingPlan::DagExplain(output) => {
784                        rows.push(("dag_topology".into(), output.topology_text.clone()));
785                    }
786                    laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
787                        rows.push(("lookup_table".into(), info.name.clone()));
788                    }
789                    laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
790                        rows.push(("drop_lookup_table".into(), name.clone()));
791                    }
792                }
793            }
794            Err(e) => {
795                // Even if planning fails, show what we know
796                rows.push(("error".into(), format!("{e}")));
797                rows.push((
798                    "statement".into(),
799                    format!("{:?}", std::mem::discriminant(statement)),
800                ));
801            }
802        }
803
804        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
805        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
806
807        let schema = Arc::new(Schema::new(vec![
808            Field::new("plan_key", DataType::Utf8, false),
809            Field::new("plan_value", DataType::Utf8, false),
810        ]));
811
812        let batch = RecordBatch::try_new(
813            schema,
814            vec![
815                Arc::new(StringArray::from(keys)),
816                Arc::new(StringArray::from(values)),
817            ],
818        )
819        .map_err(|e| DbError::InvalidOperation(format!("explain metadata: {e}")))?;
820
821        Ok(ExecuteResult::Metadata(batch))
822    }
823
824    /// Handle EXPLAIN ANALYZE: run the plan and collect execution metrics.
825    async fn handle_explain_analyze(
826        &self,
827        statement: &StreamingStatement,
828        original_sql: &str,
829    ) -> Result<ExecuteResult, DbError> {
830        // First get the normal EXPLAIN output
831        let explain_result = self.handle_explain(statement)?;
832        let mut rows: Vec<(String, String)> = Vec::new();
833
834        if let ExecuteResult::Metadata(explain_batch) = &explain_result {
835            let keys_col = explain_batch
836                .column(0)
837                .as_any()
838                .downcast_ref::<StringArray>();
839            let vals_col = explain_batch
840                .column(1)
841                .as_any()
842                .downcast_ref::<StringArray>();
843            if let (Some(keys), Some(vals)) = (keys_col, vals_col) {
844                for i in 0..explain_batch.num_rows() {
845                    rows.push((keys.value(i).to_string(), vals.value(i).to_string()));
846                }
847            }
848        }
849
850        // Extract the inner SQL from the original EXPLAIN ANALYZE statement
851        let upper = original_sql.to_uppercase();
852        let inner_start = upper.find("ANALYZE").map_or(0, |pos| pos + "ANALYZE".len());
853        let inner_sql = original_sql[inner_start..].trim();
854
855        // Try to execute the inner query via DataFusion and collect metrics
856        let start = std::time::Instant::now();
857        match self.ctx.sql(inner_sql).await {
858            Ok(df) => match df.collect().await {
859                Ok(batches) => {
860                    let elapsed = start.elapsed();
861                    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
862                    rows.push(("rows_produced".into(), total_rows.to_string()));
863                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
864                    rows.push(("batches_processed".into(), batches.len().to_string()));
865                }
866                Err(e) => {
867                    let elapsed = start.elapsed();
868                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
869                    rows.push(("analyze_error".into(), format!("{e}")));
870                }
871            },
872            Err(e) => {
873                rows.push(("analyze_error".into(), format!("{e}")));
874            }
875        }
876
877        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
878        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();
879
880        let schema = Arc::new(Schema::new(vec![
881            Field::new("plan_key", DataType::Utf8, false),
882            Field::new("plan_value", DataType::Utf8, false),
883        ]));
884
885        let batch = RecordBatch::try_new(
886            schema,
887            vec![
888                Arc::new(StringArray::from(keys)),
889                Arc::new(StringArray::from(values)),
890            ],
891        )
892        .map_err(|e| DbError::InvalidOperation(format!("explain analyze metadata: {e}")))?;
893
894        Ok(ExecuteResult::Metadata(batch))
895    }
896
    /// Handle a streaming or standard SQL query.
    ///
    /// Parses `sql`, plans the first statement under the planner lock
    /// (released before any `.await`), then dispatches on the plan variant:
    /// source/sink registrations report DDL results; query plans execute via
    /// `DataFusion` (or the JIT path when the `jit` feature is enabled) and
    /// are bridged into a `QueryHandle`; lookup-table DDL updates the table
    /// store, connector manager, `DataFusion` catalog, and lookup registry.
    ///
    /// # Errors
    ///
    /// Returns `DbError::InvalidOperation` for empty SQL and propagates
    /// parse, plan, and execution errors from the SQL layer and `DataFusion`.
    #[allow(clippy::too_many_lines)]
    pub(crate) async fn handle_query(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        // Synchronous planning under the lock — released before any await
        let plan = {
            let statements = parse_streaming_sql(sql)?;
            if statements.is_empty() {
                return Err(DbError::InvalidOperation("Empty SQL statement".into()));
            }
            // Only the first parsed statement is planned; any trailing
            // statements in `sql` are ignored by this path.
            let mut planner = self.planner.lock();
            planner
                .plan(&statements[0])
                .map_err(laminar_sql::Error::from)?
        };

        match plan {
            laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::Query(query_plan) => {
                // Check for ASOF join — DataFusion can't parse ASOF syntax
                if let Some(asof_config) = Self::extract_asof_config(&query_plan) {
                    return self.execute_asof_query(&asof_config, sql).await;
                }

                let plan_sql = query_plan.statement.to_string();
                let logical_plan = self.ctx.state().create_logical_plan(&plan_sql).await?;

                // Try JIT compilation first (when feature enabled).
                #[cfg(feature = "jit")]
                {
                    if let Some(compiled) = self.try_compile_query(sql, &logical_plan) {
                        tracing::info!(
                            sql = %sql,
                            compiled = compiled.metadata.compiled_pipeline_count,
                            fallback = compiled.metadata.fallback_pipeline_count,
                            "Query compiled via JIT"
                        );
                        return self.bridge_compiled_query(sql, compiled).await;
                    }
                }

                // Fall through to DataFusion interpreted execution.
                let df = self.ctx.execute_logical_plan(logical_plan).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::Standard(stmt) => {
                // Async execution without the lock
                let sql_str = stmt.to_string();
                let df = self.ctx.sql(&sql_str).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::DagExplain(output) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "EXPLAIN DAG".to_string(),
                    object_name: output.topology_text,
                }))
            }
            laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                use laminar_sql::parser::lookup_table::ConnectorType;

                // Only the first primary-key column is passed to the table
                // store; the full list goes to the lookup snapshot below.
                let pk = info
                    .primary_key
                    .first()
                    .ok_or_else(|| {
                        DbError::InvalidOperation("Lookup table requires a primary key".into())
                    })?
                    .clone();

                // Register in TableStore for PK-based upsert
                // NOTE(review): sizing heuristic assumes ~256 bytes per cached
                // entry with a floor of 1024 entries — confirm against the
                // cache implementation.
                let cache_mode = info.properties.cache_memory.map(|mem| {
                    let max_entries = (mem.as_bytes() / 256).max(1024) as usize;
                    crate::table_cache_mode::TableCacheMode::Partial { max_entries }
                });
                if let Some(cache) = cache_mode {
                    self.table_store.write().create_table_with_cache(
                        &info.name,
                        info.arrow_schema.clone(),
                        &pk,
                        cache,
                    )?;
                } else {
                    self.table_store.write().create_table(
                        &info.name,
                        info.arrow_schema.clone(),
                        &pk,
                    )?;
                }

                // For external connectors: register in ConnectorManager so
                // start_connector_pipeline() handles snapshot + CDC loading
                if !matches!(info.properties.connector, ConnectorType::Static) {
                    let connector_type_str = match &info.properties.connector {
                        ConnectorType::PostgresCdc => "postgres-cdc",
                        ConnectorType::MysqlCdc => "mysql-cdc",
                        ConnectorType::Redis => "redis",
                        ConnectorType::S3Parquet => "s3-parquet",
                        ConnectorType::Custom(s) => s.as_str(),
                        // Static is excluded by the surrounding `matches!` guard.
                        ConnectorType::Static => unreachable!(),
                    };

                    self.table_store
                        .write()
                        .set_connector(&info.name, connector_type_str);

                    // Replicated/partitioned tables refresh via snapshot + CDC;
                    // on-demand tables refresh only when explicitly requested.
                    let refresh = match info.properties.strategy {
                        laminar_sql::parser::lookup_table::LookupStrategy::Replicated
                        | laminar_sql::parser::lookup_table::LookupStrategy::Partitioned => {
                            Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
                        }
                        laminar_sql::parser::lookup_table::LookupStrategy::OnDemand => {
                            Some(laminar_connectors::reference::RefreshMode::Manual)
                        }
                    };

                    // Build connector options from raw WITH clause
                    // (exclude keys already consumed by LookupTableProperties)
                    let connector_options: HashMap<String, String> = info
                        .raw_options
                        .iter()
                        .filter(|(k, _)| {
                            ![
                                "connector",
                                "strategy",
                                "cache.memory",
                                "cache.disk",
                                "cache.ttl",
                                "pushdown",
                            ]
                            .contains(&k.as_str())
                        })
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect();

                    // NOTE(review): recomputes the same sizing as `cache_mode`
                    // above (which was consumed by the table-store branch).
                    let table_cache_mode = info.properties.cache_memory.map(|mem| {
                        let max_entries = (mem.as_bytes() / 256).max(1024) as usize;
                        crate::table_cache_mode::TableCacheMode::Partial { max_entries }
                    });
                    let cache_max = info
                        .properties
                        .cache_memory
                        .map(|mem| (mem.as_bytes() / 256).max(1024) as usize);

                    self.connector_manager.lock().register_table(
                        crate::connector_manager::TableRegistration {
                            name: info.name.clone(),
                            primary_key: pk,
                            connector_type: Some(connector_type_str.to_string()),
                            connector_options,
                            format: info.raw_options.get("format").cloned(),
                            format_options: HashMap::new(),
                            refresh,
                            cache_mode: table_cache_mode,
                            cache_max_entries: cache_max,
                            storage: None,
                        },
                    );
                }

                // Register in DataFusion for SELECT/JOIN queries
                {
                    let provider = crate::table_provider::ReferenceTableProvider::new(
                        info.name.clone(),
                        info.arrow_schema.clone(),
                        self.table_store.clone(),
                    );
                    // Drop any stale registration first; "not found" is fine.
                    let _ = self.ctx.deregister_table(&info.name);
                    self.ctx
                        .register_table(&info.name, Arc::new(provider))
                        .map_err(|e| {
                            DbError::InvalidOperation(format!(
                                "Failed to register lookup table: {e}"
                            ))
                        })?;
                }

                // Register snapshot in the lookup registry so the physical
                // planner can build LookupJoinExec nodes for JOIN queries.
                if let Some(batch) = self.table_store.read().to_record_batch(&info.name) {
                    self.lookup_registry.register(
                        &info.name,
                        laminar_sql::datafusion::LookupSnapshot {
                            batch,
                            key_columns: info.primary_key.clone(),
                        },
                    );
                }

                // Register the logical optimizer rule so JOINs referencing
                // this table are rewritten to LookupJoinNode.
                self.refresh_lookup_optimizer_rule();

                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "CREATE LOOKUP TABLE".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                // Tear down every registration made by CREATE LOOKUP TABLE.
                self.table_store.write().drop_table(&name);
                self.connector_manager.lock().unregister_table(&name);
                let _ = self.ctx.deregister_table(&name);
                self.lookup_registry.unregister(&name);
                self.refresh_lookup_optimizer_rule();
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DROP LOOKUP TABLE".to_string(),
                    object_name: name,
                }))
            }
        }
    }
1120
1121    /// Bridge a `DataFusion` `SendableRecordBatchStream` into the streaming
1122    /// subscription infrastructure and return a `QueryHandle`.
1123    fn bridge_query_stream(
1124        &self,
1125        sql: &str,
1126        stream: datafusion::physical_plan::SendableRecordBatchStream,
1127    ) -> ExecuteResult {
1128        let query_id = self.catalog.register_query(sql);
1129        let schema = stream.schema();
1130
1131        let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
1132        let (source, sink) =
1133            streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
1134
1135        let subscription = sink.subscribe();
1136
1137        let source_clone = source.clone();
1138        tokio::spawn(async move {
1139            use tokio_stream::StreamExt;
1140            let mut stream = stream;
1141            while let Some(result) = stream.next().await {
1142                match result {
1143                    Ok(batch) => {
1144                        if source_clone.push_arrow(batch).is_err() {
1145                            break;
1146                        }
1147                    }
1148                    Err(_) => break,
1149                }
1150            }
1151            drop(source_clone);
1152        });
1153
1154        ExecuteResult::Query(QueryHandle {
1155            id: query_id,
1156            schema,
1157            sql: sql.to_string(),
1158            subscription: Some(subscription),
1159            active: true,
1160        })
1161    }
1162
1163    // ── JIT compilation integration ─────────────────────────────────
1164
1165    /// Attempts to compile a query via the JIT compiler stack.
1166    ///
1167    /// Returns `Some(compiled)` if compilation succeeds.
1168    /// Returns `None` if the plan has stateful operators or compilation fails.
1169    /// Compilation errors are logged at `DEBUG` level and swallowed — transparent fallback.
1170    #[cfg(feature = "jit")]
1171    fn try_compile_query(
1172        &self,
1173        sql: &str,
1174        logical_plan: &datafusion::logical_expr::LogicalPlan,
1175    ) -> Option<laminar_core::compiler::CompiledStreamingQuery> {
1176        let Some(mut cache) = self.compiler_cache.try_lock() else {
1177            tracing::debug!(sql = %sql, "Compiler cache contended, falling back to DataFusion");
1178            return None;
1179        };
1180        let config = laminar_core::compiler::QueryConfig::default();
1181        match laminar_core::compiler::compile_streaming_query(
1182            sql,
1183            logical_plan,
1184            &mut cache,
1185            &config,
1186        ) {
1187            Ok(result) => result,
1188            Err(e) => {
1189                tracing::debug!(
1190                    sql = %sql,
1191                    error = %e,
1192                    "JIT compilation failed, falling back to DataFusion"
1193                );
1194                None
1195            }
1196        }
1197    }
1198
    /// Executes a compiled query: runs the source scan via `DataFusion`, feeds
    /// rows through the compiled pipeline, and bridges output to the subscription.
    ///
    /// # Errors
    ///
    /// Returns an error if the source plan fails to execute or the compiled
    /// query fails to start.
    #[cfg(feature = "jit")]
    async fn bridge_compiled_query(
        &self,
        sql: &str,
        compiled: laminar_core::compiler::CompiledStreamingQuery,
    ) -> Result<ExecuteResult, DbError> {
        use laminar_core::compiler::batch_reader::BatchRowReader;
        use laminar_core::compiler::event_time::{EventTimeConfig, RowEventTimeExtractor};
        use laminar_core::compiler::pipeline_bridge::Ring1Action;

        // 1. Execute source scan via DataFusion to get input stream.
        let df = self.ctx.execute_logical_plan(compiled.source_plan).await?;
        let input_stream = df.execute_stream().await?;
        let output_schema = compiled.output_schema.arrow_schema().clone();

        // 2. Set up subscription infrastructure.
        let query_id = self.catalog.register_query(sql);
        let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
        let (source, sink) =
            streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
        let subscription = sink.subscribe();

        // 3. Build event time extractor (auto-detect from schema).
        //    When no extractor is derived, the row index stands in as the
        //    event time (see the `None` arm below).
        let input_schema = compiled.input_schema;
        let time_config = EventTimeConfig::default();
        let time_extractor = RowEventTimeExtractor::from_schema(&input_schema, &time_config);

        // 4. Spawn bridge task: input stream → EventRow → compiled pipeline → output batches.
        let source_clone = source.clone();
        let mut query = compiled.query;
        query
            .start()
            .map_err(|e| DbError::InvalidOperation(e.to_string()))?;

        tokio::spawn(async move {
            use tokio_stream::StreamExt;
            let mut stream = input_stream;
            let mut extractor = time_extractor;
            // Bump arena backs the per-row reads; it is reset (not dropped)
            // after each batch so the allocation is reused.
            let mut arena = bumpalo::Bump::with_capacity(1024 * 128);

            while let Some(result) = stream.next().await {
                match result {
                    Ok(batch) => {
                        let reader = BatchRowReader::new(&batch, &input_schema);
                        for row_idx in 0..reader.row_count() {
                            let row = reader.read_row(row_idx, &arena);
                            let event_time = match extractor {
                                Some(ref mut ext) => ext.extract(&row),
                                #[allow(clippy::cast_possible_wrap)]
                                None => row_idx as i64,
                            };
                            // Best-effort feed: per-row submit errors are ignored.
                            let _ = query.submit_row(&row, event_time, row_idx as u64);
                        }
                        // Advance watermark after each batch.
                        if let Some(ref ext) = extractor {
                            let _ = query.advance_watermark(ext.watermark());
                        }
                        // Drain compiled output.
                        let actions = query.poll_ring1();
                        for action in actions {
                            if let Ring1Action::ProcessBatch(out_batch) = action {
                                if source_clone.push_arrow(out_batch).is_err() {
                                    // Subscriber side is gone — stop the bridge.
                                    return;
                                }
                            }
                        }
                        // Reset arena for next batch — keeps underlying allocation.
                        arena.reset();
                    }
                    Err(_) => break,
                }
            }
            // Flush remaining output.
            let _ = query.send_eof();
            for action in query.poll_ring1() {
                if let Ring1Action::ProcessBatch(batch) = action {
                    // Best-effort push; subscription channel may be full
                    let _ = source_clone.push_arrow(batch);
                }
            }
        });

        Ok(ExecuteResult::Query(QueryHandle {
            id: query_id,
            schema: output_schema,
            sql: sql.to_string(),
            subscription: Some(subscription),
            active: true,
        }))
    }
1291
1292    /// Extract an ASOF join config from a query plan, if present.
1293    fn extract_asof_config(
1294        plan: &laminar_sql::planner::QueryPlan,
1295    ) -> Option<AsofJoinTranslatorConfig> {
1296        plan.join_config.as_ref()?.iter().find_map(|jc| {
1297            if let JoinOperatorConfig::Asof(cfg) = jc {
1298                Some(cfg.clone())
1299            } else {
1300                None
1301            }
1302        })
1303    }
1304
1305    /// Execute an ASOF join query by fetching left/right tables separately
1306    /// and performing the join in-process (bypasses `DataFusion`'s SQL parser
1307    /// which doesn't understand ASOF syntax).
1308    async fn execute_asof_query(
1309        &self,
1310        asof_config: &AsofJoinTranslatorConfig,
1311        original_sql: &str,
1312    ) -> Result<ExecuteResult, DbError> {
1313        let left_sql = format!("SELECT * FROM {}", asof_config.left_table);
1314        let right_sql = format!("SELECT * FROM {}", asof_config.right_table);
1315
1316        let left_batches = self
1317            .ctx
1318            .sql(&left_sql)
1319            .await
1320            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?
1321            .collect()
1322            .await
1323            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?;
1324
1325        let right_batches = self
1326            .ctx
1327            .sql(&right_sql)
1328            .await
1329            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?
1330            .collect()
1331            .await
1332            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?;
1333
1334        let result_batch =
1335            crate::asof_batch::execute_asof_join_batch(&left_batches, &right_batches, asof_config)?;
1336
1337        if result_batch.num_rows() == 0 {
1338            let query_id = self.catalog.register_query(original_sql);
1339            return Ok(ExecuteResult::Query(QueryHandle {
1340                id: query_id,
1341                schema: result_batch.schema(),
1342                sql: original_sql.to_string(),
1343                subscription: None,
1344                active: false,
1345            }));
1346        }
1347
1348        let schema = result_batch.schema();
1349        let mem_table =
1350            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![result_batch]])
1351                .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1352
1353        let _ = self.ctx.deregister_table("__asof_result");
1354        self.ctx
1355            .register_table("__asof_result", Arc::new(mem_table))
1356            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1357
1358        let df = self
1359            .ctx
1360            .sql("SELECT * FROM __asof_result")
1361            .await
1362            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1363        let stream = df
1364            .execute_stream()
1365            .await
1366            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
1367
1368        let _ = self.ctx.deregister_table("__asof_result");
1369
1370        Ok(self.bridge_query_stream(original_sql, stream))
1371    }
1372
1373    /// Get a typed source handle for pushing data.
1374    ///
1375    /// The source must have been created via `CREATE SOURCE`.
1376    ///
1377    /// # Errors
1378    ///
1379    /// Returns `DbError::SourceNotFound` if the source is not registered.
1380    pub fn source<T: laminar_core::streaming::Record>(
1381        &self,
1382        name: &str,
1383    ) -> Result<SourceHandle<T>, DbError> {
1384        let entry = self
1385            .catalog
1386            .get_source(name)
1387            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
1388        SourceHandle::new(entry)
1389    }
1390
1391    /// Get an untyped source handle for pushing `RecordBatch` data.
1392    ///
1393    /// # Errors
1394    ///
1395    /// Returns `DbError::SourceNotFound` if the source is not registered.
1396    pub fn source_untyped(&self, name: &str) -> Result<UntypedSourceHandle, DbError> {
1397        let entry = self
1398            .catalog
1399            .get_source(name)
1400            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
1401        Ok(UntypedSourceHandle::new(entry))
1402    }
1403
1404    /// List all registered sources.
1405    pub fn sources(&self) -> Vec<SourceInfo> {
1406        let names = self.catalog.list_sources();
1407        names
1408            .into_iter()
1409            .filter_map(|name| {
1410                self.catalog.get_source(&name).map(|e| SourceInfo {
1411                    name: e.name.clone(),
1412                    schema: e.schema.clone(),
1413                    watermark_column: e.watermark_column.clone(),
1414                })
1415            })
1416            .collect()
1417    }
1418
1419    /// List all registered sinks.
1420    pub fn sinks(&self) -> Vec<SinkInfo> {
1421        self.catalog
1422            .list_sinks()
1423            .into_iter()
1424            .map(|name| SinkInfo { name })
1425            .collect()
1426    }
1427
1428    /// List all registered streams with their SQL definitions.
1429    pub fn streams(&self) -> Vec<crate::handle::StreamInfo> {
1430        let mgr = self.connector_manager.lock();
1431        mgr.streams()
1432            .iter()
1433            .map(|(name, reg)| crate::handle::StreamInfo {
1434                name: name.clone(),
1435                sql: Some(reg.query_sql.clone()),
1436            })
1437            .collect()
1438    }
1439
1440    /// Build the pipeline topology graph from registered sources, streams,
1441    /// and sinks.
1442    ///
1443    /// Returns a `PipelineTopology` with nodes for every source, stream,
1444    /// and sink, plus edges derived from stream SQL `FROM` references and
1445    /// sink `input` fields.
1446    pub fn pipeline_topology(&self) -> crate::handle::PipelineTopology {
1447        use crate::handle::{PipelineEdge, PipelineNode, PipelineNodeType};
1448
1449        let mut nodes = Vec::new();
1450        let mut edges = Vec::new();
1451
1452        // Collect source names for FROM matching
1453        let source_names = self.catalog.list_sources();
1454
1455        // Source nodes
1456        for name in &source_names {
1457            let schema = self.catalog.get_source(name).map(|e| e.schema.clone());
1458            nodes.push(PipelineNode {
1459                name: name.clone(),
1460                node_type: PipelineNodeType::Source,
1461                schema,
1462                sql: None,
1463            });
1464        }
1465
1466        // Stream nodes + edges from SQL FROM references
1467        let mgr = self.connector_manager.lock();
1468        let stream_names: Vec<String> = mgr.streams().keys().cloned().collect();
1469        for (name, reg) in mgr.streams() {
1470            nodes.push(PipelineNode {
1471                name: name.clone(),
1472                node_type: PipelineNodeType::Stream,
1473                schema: None,
1474                sql: Some(reg.query_sql.clone()),
1475            });
1476
1477            // Extract FROM references by checking which known sources/streams
1478            // appear in the query SQL. This is a lightweight heuristic that
1479            // avoids a full SQL parse.
1480            let sql_upper = reg.query_sql.to_uppercase();
1481            for src in &source_names {
1482                if sql_upper.contains(&src.to_uppercase()) {
1483                    edges.push(PipelineEdge {
1484                        from: src.clone(),
1485                        to: name.clone(),
1486                    });
1487                }
1488            }
1489            // Also check for stream-to-stream references (cascading)
1490            for other in &stream_names {
1491                if other != name && sql_upper.contains(&other.to_uppercase()) {
1492                    edges.push(PipelineEdge {
1493                        from: other.clone(),
1494                        to: name.clone(),
1495                    });
1496                }
1497            }
1498        }
1499
1500        // Sink nodes + edges from input field
1501        for (name, reg) in mgr.sinks() {
1502            nodes.push(PipelineNode {
1503                name: name.clone(),
1504                node_type: PipelineNodeType::Sink,
1505                schema: None,
1506                sql: None,
1507            });
1508
1509            // Sinks read from their `input` field
1510            if !reg.input.is_empty() {
1511                edges.push(PipelineEdge {
1512                    from: reg.input.clone(),
1513                    to: name.clone(),
1514                });
1515            }
1516        }
1517
1518        // Also add catalog-only sinks (no connector type) that aren't
1519        // already in the connector manager
1520        let cm_sink_names: std::collections::HashSet<&String> = mgr.sinks().keys().collect();
1521        for name in self.catalog.list_sinks() {
1522            if !cm_sink_names.contains(&name) {
1523                // Check if there's a sink entry in the catalog with input info
1524                if let Some(input) = self.catalog.get_sink_input(&name) {
1525                    nodes.push(PipelineNode {
1526                        name: name.clone(),
1527                        node_type: PipelineNodeType::Sink,
1528                        schema: None,
1529                        sql: None,
1530                    });
1531                    if !input.is_empty() {
1532                        edges.push(PipelineEdge {
1533                            from: input,
1534                            to: name,
1535                        });
1536                    }
1537                }
1538            }
1539        }
1540
1541        drop(mgr);
1542
1543        crate::handle::PipelineTopology { nodes, edges }
1544    }
1545
1546    /// List all active queries.
1547    pub fn queries(&self) -> Vec<QueryInfo> {
1548        self.catalog
1549            .list_queries()
1550            .into_iter()
1551            .map(|(id, sql, active)| QueryInfo { id, sql, active })
1552            .collect()
1553    }
1554
    /// Returns whether streaming checkpointing is enabled.
    #[must_use]
    pub fn is_checkpoint_enabled(&self) -> bool {
        // Checkpointing is considered enabled iff a checkpoint config
        // section was supplied; `checkpoint()` and `checkpoint_store()`
        // gate on the same condition.
        self.config.checkpoint.is_some()
    }
1560
1561    /// Returns a checkpoint store instance, if checkpointing is configured.
1562    ///
1563    /// Returns an [`ObjectStoreCheckpointStore`](laminar_storage::ObjectStoreCheckpointStore)
1564    /// when `object_store_url` is set, otherwise a
1565    /// [`FileSystemCheckpointStore`](laminar_storage::FileSystemCheckpointStore).
1566    pub fn checkpoint_store(&self) -> Option<Box<dyn laminar_storage::CheckpointStore>> {
1567        let cp_config = self.config.checkpoint.as_ref()?;
1568        let max_retained = cp_config.max_retained.unwrap_or(3);
1569
1570        if let Some(ref url) = self.config.object_store_url {
1571            let obj_store = laminar_storage::object_store_factory::build_object_store(
1572                url,
1573                &self.config.object_store_options,
1574            )
1575            .ok()?;
1576            let prefix = url_to_checkpoint_prefix(url);
1577            Some(Box::new(
1578                laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
1579                    obj_store,
1580                    prefix,
1581                    max_retained,
1582                )
1583                .ok()?,
1584            ))
1585        } else {
1586            let data_dir = cp_config
1587                .data_dir
1588                .clone()
1589                .or_else(|| self.config.storage_dir.clone())
1590                .unwrap_or_else(|| std::path::PathBuf::from("./data"));
1591            Some(Box::new(
1592                laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
1593                    &data_dir,
1594                    max_retained,
1595                ),
1596            ))
1597        }
1598    }
1599
1600    /// Triggers a streaming checkpoint that persists source offsets, sink
1601    /// positions, and operator state to disk via the
1602    /// [`CheckpointCoordinator`](crate::checkpoint_coordinator::CheckpointCoordinator).
1603    ///
1604    /// Returns the checkpoint result on success, including the checkpoint ID,
1605    /// epoch, and duration.
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns `DbError::Checkpoint` if checkpointing is not enabled, the
1610    /// coordinator has not been initialized (call `start()` first), or the
1611    /// checkpoint operation fails.
1612    pub async fn checkpoint(
1613        &self,
1614    ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
1615        if self.config.checkpoint.is_none() {
1616            return Err(DbError::Checkpoint(
1617                "checkpointing is not enabled".to_string(),
1618            ));
1619        }
1620        let mut guard = self.coordinator.lock().await;
1621        let coord = guard.as_mut().ok_or_else(|| {
1622            DbError::Checkpoint("coordinator not initialized — call start() first".to_string())
1623        })?;
1624        coord
1625            .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
1626            .await
1627    }
1628
1629    /// Returns checkpoint performance statistics.
1630    ///
1631    /// Returns `None` if the checkpoint coordinator has not been initialized.
1632    pub async fn checkpoint_stats(&self) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
1633        let guard = self.coordinator.lock().await;
1634        guard
1635            .as_ref()
1636            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
1637    }
1638}
1639
1640impl std::fmt::Debug for LaminarDB {
1641    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1642        f.debug_struct("LaminarDB")
1643            .field("sources", &self.catalog.list_sources().len())
1644            .field("sinks", &self.catalog.list_sinks().len())
1645            .field("materialized_views", &self.mv_registry.lock().len())
1646            .field("checkpoint_enabled", &self.is_checkpoint_enabled())
1647            .field("shutdown", &self.is_closed())
1648            .finish_non_exhaustive()
1649    }
1650}
1651
/// Wraps `DefaultPhysicalPlanner` with lookup join extension support.
struct LookupQueryPlanner {
    // Extension planner handed to the default physical planner so custom
    // logical nodes (e.g. lookup joins) can be lowered to physical plans.
    extension_planner: Arc<dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync>,
}
1656
1657impl std::fmt::Debug for LookupQueryPlanner {
1658    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1659        f.debug_struct("LookupQueryPlanner").finish_non_exhaustive()
1660    }
1661}
1662
1663#[async_trait::async_trait]
1664impl datafusion::execution::context::QueryPlanner for LookupQueryPlanner {
1665    async fn create_physical_plan(
1666        &self,
1667        logical_plan: &datafusion::logical_expr::LogicalPlan,
1668        session_state: &datafusion::execution::SessionState,
1669    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
1670        use datafusion::physical_planner::PhysicalPlanner;
1671        let planner =
1672            datafusion::physical_planner::DefaultPhysicalPlanner::with_extension_planners(vec![
1673                Arc::clone(&self.extension_planner),
1674            ]);
1675        planner
1676            .create_physical_plan(logical_plan, session_state)
1677            .await
1678    }
1679}
1680
1681#[cfg(test)]
1682mod tests {
1683    use super::*;
1684    use crate::ddl::extract_connector_from_with_options;
1685
    // A freshly opened database is live and has no sources or sinks.
    #[tokio::test]
    async fn test_open_default() {
        let db = LaminarDB::open().unwrap();
        assert!(!db.is_closed());
        assert!(db.sources().is_empty());
        assert!(db.sinks().is_empty());
    }

    // CREATE SOURCE reports DDL info and registers the source in the catalog.
    #[tokio::test]
    async fn test_create_source() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, ts BIGINT)")
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE SOURCE");
                assert_eq!(info.object_name, "trades");
            }
            _ => panic!("Expected DDL result"),
        }

        assert_eq!(db.sources().len(), 1);
        assert_eq!(db.sources()[0].name, "trades");
    }
1713
    // WATERMARK FOR is captured as the source's watermark column.
    #[tokio::test]
    async fn test_create_source_with_watermark() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)",
        )
        .await
        .unwrap();

        let sources = db.sources();
        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].watermark_column, Some("ts".to_string()));
    }

    // Re-creating an existing source without IF NOT EXISTS is an error.
    #[tokio::test]
    async fn test_create_source_duplicate_error() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE test (id INT)").await.unwrap();
        let result = db.execute("CREATE SOURCE test (id INT)").await;
        assert!(result.is_err());
    }

    // IF NOT EXISTS turns a duplicate creation into a no-op success.
    #[tokio::test]
    async fn test_create_source_if_not_exists() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE test (id INT)").await.unwrap();
        let result = db
            .execute("CREATE SOURCE IF NOT EXISTS test (id INT)")
            .await;
        assert!(result.is_ok());
    }

    // OR REPLACE swaps the existing definition without error.
    #[tokio::test]
    async fn test_create_or_replace_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE test (id INT)").await.unwrap();
        let result = db
            .execute("CREATE OR REPLACE SOURCE test (id INT, name VARCHAR)")
            .await;
        assert!(result.is_ok());
    }
1755
    // CREATE SINK registers a sink fed by an existing source.
    #[tokio::test]
    async fn test_create_sink() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        db.execute("CREATE SINK output FROM events").await.unwrap();

        assert_eq!(db.sinks().len(), 1);
    }

    // An untyped handle exposes the source's name and schema.
    #[tokio::test]
    async fn test_source_handle_untyped() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
            .await
            .unwrap();

        let handle = db.source_untyped("events").unwrap();
        assert_eq!(handle.name(), "events");
        assert_eq!(handle.schema().fields().len(), 2);
    }

    // Looking up an unregistered source yields SourceNotFound.
    #[tokio::test]
    async fn test_source_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.source_untyped("nonexistent");
        assert!(matches!(result, Err(DbError::SourceNotFound(_))));
    }

    // SHOW SOURCES returns one metadata row per registered source.
    #[tokio::test]
    async fn test_show_sources() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE a (id INT)").await.unwrap();
        db.execute("CREATE SOURCE b (id INT)").await.unwrap();

        let result = db.execute("SHOW SOURCES").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 2);
            }
            _ => panic!("Expected Metadata result"),
        }
    }
1798
    // DESCRIBE on a source returns one metadata row per column.
    #[tokio::test]
    async fn test_describe_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, name VARCHAR, active BOOLEAN)")
            .await
            .unwrap();

        let result = db.execute("DESCRIBE events").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 3);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    // DESCRIBE also works on tables, after data has been inserted.
    #[tokio::test]
    async fn test_describe_table() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE TABLE products (id BIGINT PRIMARY KEY, name VARCHAR, price DOUBLE)")
            .await
            .unwrap();
        db.execute("INSERT INTO products VALUES (1, 'Widget', 9.99)")
            .await
            .unwrap();

        let result = db.execute("DESCRIBE products").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 3);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    // DESCRIBE on a materialized view reflects the projected columns.
    #[tokio::test]
    async fn test_describe_materialized_view() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, name VARCHAR, value DOUBLE)")
            .await
            .unwrap();
        db.execute(
            "CREATE MATERIALIZED VIEW event_counts AS \
             SELECT name, COUNT(*) as cnt FROM events GROUP BY name",
        )
        .await
        .unwrap();

        let result = db.execute("DESCRIBE event_counts").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert!(batch.num_rows() >= 2, "Should have at least name and cnt");
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    // DESCRIBE on an unknown object is an error.
    #[tokio::test]
    async fn test_describe_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DESCRIBE nonexistent").await;
        assert!(result.is_err());
    }
1862
    // DROP SOURCE removes the source from the catalog.
    #[tokio::test]
    async fn test_drop_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE test (id INT)").await.unwrap();
        assert_eq!(db.sources().len(), 1);

        db.execute("DROP SOURCE test").await.unwrap();
        assert_eq!(db.sources().len(), 0);
    }

    // DROP SOURCE IF EXISTS tolerates a missing source.
    #[tokio::test]
    async fn test_drop_source_if_exists() {
        let db = LaminarDB::open().unwrap();
        // Should not error when source doesn't exist
        let result = db.execute("DROP SOURCE IF EXISTS nonexistent").await;
        assert!(result.is_ok());
    }

    // Plain DROP SOURCE on a missing source yields SourceNotFound.
    #[tokio::test]
    async fn test_drop_source_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP SOURCE nonexistent").await;
        assert!(matches!(result, Err(DbError::SourceNotFound(_))));
    }

    // After close(), further statements fail with DbError::Shutdown.
    #[tokio::test]
    async fn test_shutdown() {
        let db = LaminarDB::open().unwrap();
        assert!(!db.is_closed());
        db.close();
        assert!(db.is_closed());

        let result = db.execute("CREATE SOURCE test (id INT)").await;
        assert!(matches!(result, Err(DbError::Shutdown)));
    }

    // Debug formatting includes the type name.
    #[tokio::test]
    async fn test_debug_format() {
        let db = LaminarDB::open().unwrap();
        let debug = format!("{db:?}");
        assert!(debug.contains("LaminarDB"));
    }
1905
    // EXPLAIN on DDL returns key/value metadata including plan_type.
    #[tokio::test]
    async fn test_explain_create_source() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("EXPLAIN CREATE SOURCE trades (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert!(batch.num_rows() > 0);
                let keys = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                // Should contain plan_type and source info
                let key_values: Vec<&str> = (0..batch.num_rows()).map(|i| keys.value(i)).collect();
                assert!(key_values.contains(&"plan_type"));
            }
            _ => panic!("Expected Metadata result for EXPLAIN"),
        }
    }

    // cancel_query removes a registered query from the active set.
    #[tokio::test]
    async fn test_cancel_query() {
        let db = LaminarDB::open().unwrap();
        // Register a query via catalog directly for testing
        assert_eq!(db.active_query_count(), 0);

        // Simulate a query registration
        let query_id = db.catalog.register_query("SELECT * FROM test");
        assert_eq!(db.active_query_count(), 1);

        // Cancel it
        db.cancel_query(query_id).unwrap();
        assert_eq!(db.active_query_count(), 0);
    }

    // source_count/sink_count track creations and drops.
    #[tokio::test]
    async fn test_source_and_sink_counts() {
        let db = LaminarDB::open().unwrap();
        assert_eq!(db.source_count(), 0);
        assert_eq!(db.sink_count(), 0);

        db.execute("CREATE SOURCE a (id INT)").await.unwrap();
        db.execute("CREATE SOURCE b (id INT)").await.unwrap();
        assert_eq!(db.source_count(), 2);

        db.execute("CREATE SINK output FROM a").await.unwrap();
        assert_eq!(db.sink_count(), 1);

        db.execute("DROP SOURCE a").await.unwrap();
        assert_eq!(db.source_count(), 1);
    }
1960
    // ── Multi-statement execution tests ─────────────────

    // Semicolon-separated statements execute in order.
    #[tokio::test]
    async fn test_multi_statement_execution() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE a (id INT); CREATE SOURCE b (id INT); CREATE SINK output FROM a")
            .await
            .unwrap();
        assert_eq!(db.source_count(), 2);
        assert_eq!(db.sink_count(), 1);
    }

    // A trailing semicolon does not produce an empty statement error.
    #[tokio::test]
    async fn test_multi_statement_trailing_semicolon() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE a (id INT);").await.unwrap();
        assert_eq!(db.source_count(), 1);
    }

    // A failing statement aborts the batch; earlier effects persist.
    #[tokio::test]
    async fn test_multi_statement_error_stops() {
        let db = LaminarDB::open().unwrap();
        // Second statement should fail (duplicate)
        let result = db
            .execute("CREATE SOURCE a (id INT); CREATE SOURCE a (id INT)")
            .await;
        assert!(result.is_err());
        // First statement should have succeeded
        assert_eq!(db.source_count(), 1);
    }

    // ── Config variable substitution tests ──────────────

    // Builder-provided config vars do not break normal DDL execution.
    #[tokio::test]
    async fn test_config_var_substitution() {
        let db = LaminarDB::builder()
            .config_var("TABLE_NAME", "events")
            .build()
            .await
            .unwrap();
        // Config var in source name won't work (parsed as identifier),
        // but it works in WITH option values
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        assert_eq!(db.source_count(), 1);
    }
2006
    // ── CREATE STREAM tests ─────────────────────────────

    // CREATE STREAM reports DDL info with the stream name.
    #[tokio::test]
    async fn test_create_stream() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE STREAM counts AS SELECT COUNT(*) as cnt FROM events")
            .await
            .unwrap();
        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE STREAM");
                assert_eq!(info.object_name, "counts");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    // DROP STREAM succeeds for an existing stream.
    #[tokio::test]
    async fn test_drop_stream() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM counts AS SELECT COUNT(*) as cnt FROM events")
            .await
            .unwrap();
        let result = db.execute("DROP STREAM counts").await.unwrap();
        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "DROP STREAM");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    // DROP STREAM on a missing stream yields StreamNotFound.
    #[tokio::test]
    async fn test_drop_stream_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP STREAM nonexistent").await;
        assert!(matches!(result, Err(DbError::StreamNotFound(_))));
    }

    // DROP STREAM IF EXISTS tolerates a missing stream.
    #[tokio::test]
    async fn test_drop_stream_if_exists() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP STREAM IF EXISTS nonexistent").await;
        assert!(result.is_ok());
    }

    // SHOW STREAMS returns one metadata row per stream.
    #[tokio::test]
    async fn test_show_streams() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM a AS SELECT 1 FROM events")
            .await
            .unwrap();
        let result = db.execute("SHOW STREAMS").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    // Re-creating an existing stream yields StreamAlreadyExists.
    #[tokio::test]
    async fn test_stream_duplicate_error() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM counts AS SELECT COUNT(*) FROM events")
            .await
            .unwrap();
        let result = db
            .execute("CREATE STREAM counts AS SELECT COUNT(*) FROM events")
            .await;
        assert!(matches!(result, Err(DbError::StreamAlreadyExists(_))));
    }
2080
    // CREATE TABLE reports DDL info with the table name.
    #[tokio::test]
    async fn test_create_table() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE TABLE products (id INT, name VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE TABLE");
                assert_eq!(info.object_name, "products");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    // SELECT against a fresh (empty) table still exposes the schema.
    #[tokio::test]
    async fn test_create_table_and_query_empty() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE TABLE dim (id INT, label VARCHAR)")
            .await
            .unwrap();

        let result = db.execute("SELECT * FROM dim").await.unwrap();
        match result {
            ExecuteResult::Query(q) => {
                assert_eq!(q.schema().fields().len(), 2);
            }
            _ => panic!("Expected Query result"),
        }
    }

    // INSERT into a source reports the number of rows pushed.
    #[tokio::test]
    async fn test_insert_into_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO events VALUES (1, 3.14), (2, 2.72)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 2),
            _ => panic!("Expected RowsAffected"),
        }
    }

    // INSERT into a table reports the number of rows written.
    #[tokio::test]
    async fn test_insert_into_table() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE TABLE products (id INT, name VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO products VALUES (1, 'Widget', 9.99)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 1),
            _ => panic!("Expected RowsAffected"),
        }
    }

    // INSERT into an unknown target is an error.
    #[tokio::test]
    async fn test_insert_into_nonexistent_table() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("INSERT INTO nosuch VALUES (1, 2)").await;
        assert!(result.is_err());
    }

    // Column constraints and parameterized types parse in CREATE TABLE.
    #[tokio::test]
    async fn test_create_table_with_types() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE TABLE orders (id BIGINT NOT NULL, qty SMALLINT, total DECIMAL(10,2))")
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE TABLE");
                assert_eq!(info.object_name, "orders");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    // NULL literals are accepted in INSERT values.
    #[tokio::test]
    async fn test_insert_null_values() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE data (id BIGINT, label VARCHAR)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO data VALUES (1, NULL)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 1),
            _ => panic!("Expected RowsAffected"),
        }
    }

    // Negative numeric literals are accepted in INSERT values.
    #[tokio::test]
    async fn test_insert_negative_values() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE temps (id BIGINT, celsius DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO temps VALUES (1, -40.0)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 1),
            _ => panic!("Expected RowsAffected"),
        }
    }
2205
    // ── Connector registry / DDL validation tests ──

    // An unregistered source connector type is rejected with a clear message.
    #[tokio::test]
    async fn test_create_source_unknown_connector() {
        let db = LaminarDB::open().unwrap();
        // Use correct SQL syntax: FROM <type> (...) SCHEMA (...)
        let result = db
            .execute(
                "CREATE SOURCE events FROM NONEXISTENT \
                 ('topic' = 'test') SCHEMA (id INT)",
            )
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Unknown source connector type"), "got: {err}");
    }

    // An unregistered sink connector type is rejected with a clear message.
    #[tokio::test]
    async fn test_create_sink_unknown_connector() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        // Use correct SQL syntax: INTO <type> (...)
        let result = db
            .execute(
                "CREATE SINK output FROM events \
                 INTO NONEXISTENT ('topic' = 'out')",
            )
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Unknown sink connector type"), "got: {err}");
    }

    #[tokio::test]
    async fn test_create_source_invalid_format() {
        // We test format validation via build_source_config in
        // connector_manager::tests (since the SQL parser may reject
        // unknown formats at parse time rather than DDL validation).
        // Here we verify that an error is returned either way.
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute(
                "CREATE SOURCE events FROM NONEXISTENT \
                 FORMAT BADFORMAT SCHEMA (id INT)",
            )
            .await;
        assert!(result.is_err());
    }

    // Registry contents depend on which connector features were compiled in.
    #[tokio::test]
    async fn test_connector_registry_accessor() {
        let db = LaminarDB::open().unwrap();
        let registry = db.connector_registry();

        // With feature flags enabled, built-in connectors are auto-registered.
        // Without any features, registry should be empty.
        #[allow(unused_mut)]
        let mut expected_sources = 0;
        #[allow(unused_mut)]
        let mut expected_sinks = 0;

        #[cfg(feature = "kafka")]
        {
            expected_sources += 1; // kafka source
            expected_sinks += 1; // kafka sink
        }
        #[cfg(feature = "postgres-cdc")]
        {
            expected_sources += 1; // postgres CDC source
        }
        #[cfg(feature = "postgres-sink")]
        {
            expected_sinks += 1; // postgres sink
        }
        #[cfg(feature = "delta-lake")]
        {
            expected_sources += 1; // delta-lake source
            expected_sinks += 1; // delta-lake sink
        }
        #[cfg(feature = "websocket")]
        {
            expected_sources += 1; // websocket source
            expected_sinks += 1; // websocket sink
        }
        #[cfg(feature = "mysql-cdc")]
        {
            expected_sources += 1; // mysql CDC source
        }
        #[cfg(feature = "files")]
        {
            expected_sources += 1; // file source
            expected_sinks += 1; // file sink
        }

        assert_eq!(registry.list_sources().len(), expected_sources);
        assert_eq!(registry.list_sinks().len(), expected_sinks);
    }
2303
2304    #[tokio::test]
2305    async fn test_builder_register_connector() {
2306        use std::sync::Arc;
2307
2308        let db = LaminarDB::builder()
2309            .register_connector(|registry| {
2310                registry.register_source(
2311                    "test-source",
2312                    laminar_connectors::config::ConnectorInfo {
2313                        name: "test-source".to_string(),
2314                        display_name: "Test Source".to_string(),
2315                        version: "0.1.0".to_string(),
2316                        is_source: true,
2317                        is_sink: false,
2318                        config_keys: vec![],
2319                    },
2320                    Arc::new(|| Box::new(laminar_connectors::testing::MockSourceConnector::new())),
2321                );
2322            })
2323            .build()
2324            .await
2325            .unwrap();
2326        let registry = db.connector_registry();
2327        assert!(registry.list_sources().contains(&"test-source".to_string()));
2328    }
2329
2330    // ── Materialized View Catalog tests ──
2331
2332    #[tokio::test]
2333    async fn test_create_materialized_view() {
2334        let db = LaminarDB::open().unwrap();
2335        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
2336            .await
2337            .unwrap();
2338
2339        let result = db
2340            .execute("CREATE MATERIALIZED VIEW event_stats AS SELECT * FROM events")
2341            .await;
2342
2343        // The MV may fail at query execution (no data in DataFusion) but the
2344        // important thing is the MV path is invoked and the registry is wired up.
2345        // If it succeeds, verify the DDL result.
2346        if let Ok(ExecuteResult::Ddl(info)) = &result {
2347            assert_eq!(info.statement_type, "CREATE MATERIALIZED VIEW");
2348            assert_eq!(info.object_name, "event_stats");
2349        }
2350    }
2351
2352    #[tokio::test]
2353    async fn test_mv_registry_base_tables() {
2354        let db = LaminarDB::open().unwrap();
2355        db.execute("CREATE SOURCE trades (sym VARCHAR, price DOUBLE)")
2356            .await
2357            .unwrap();
2358
2359        let registry = db.mv_registry.lock();
2360        assert!(registry.is_base_table("trades"));
2361    }
2362
2363    #[tokio::test]
2364    async fn test_show_materialized_views_empty() {
2365        let db = LaminarDB::open().unwrap();
2366        let result = db.execute("SHOW MATERIALIZED VIEWS").await.unwrap();
2367        match result {
2368            ExecuteResult::Metadata(batch) => {
2369                assert_eq!(batch.num_rows(), 0);
2370                assert_eq!(batch.num_columns(), 3);
2371                assert_eq!(batch.schema().field(0).name(), "view_name");
2372                assert_eq!(batch.schema().field(1).name(), "sql");
2373                assert_eq!(batch.schema().field(2).name(), "state");
2374            }
2375            _ => panic!("Expected Metadata result"),
2376        }
2377    }
2378
2379    #[tokio::test]
2380    async fn test_drop_materialized_view_if_exists() {
2381        let db = LaminarDB::open().unwrap();
2382        // Should not error with IF EXISTS on non-existent view
2383        let result = db
2384            .execute("DROP MATERIALIZED VIEW IF EXISTS nonexistent")
2385            .await
2386            .unwrap();
2387        match result {
2388            ExecuteResult::Ddl(info) => {
2389                assert_eq!(info.statement_type, "DROP MATERIALIZED VIEW");
2390            }
2391            _ => panic!("Expected Ddl result"),
2392        }
2393    }
2394
2395    #[tokio::test]
2396    async fn test_drop_materialized_view_not_found() {
2397        let db = LaminarDB::open().unwrap();
2398        let result = db.execute("DROP MATERIALIZED VIEW nonexistent").await;
2399        assert!(result.is_err());
2400        let err = result.unwrap_err().to_string();
2401        assert!(
2402            err.contains("not found"),
2403            "Expected 'not found' error, got: {err}"
2404        );
2405    }
2406
2407    #[tokio::test]
2408    async fn test_create_mv_if_not_exists() {
2409        let db = LaminarDB::open().unwrap();
2410        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
2411
2412        // Register a view directly in the registry for this test
2413        {
2414            let mut registry = db.mv_registry.lock();
2415            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
2416            let mv = laminar_core::mv::MaterializedView::new(
2417                "my_view",
2418                "SELECT * FROM events",
2419                vec!["events".to_string()],
2420                schema,
2421            );
2422            registry.register(mv).unwrap();
2423        }
2424
2425        // IF NOT EXISTS should succeed without error
2426        let result = db
2427            .execute("CREATE MATERIALIZED VIEW IF NOT EXISTS my_view AS SELECT * FROM events")
2428            .await
2429            .unwrap();
2430        match result {
2431            ExecuteResult::Ddl(info) => {
2432                assert_eq!(info.object_name, "my_view");
2433            }
2434            _ => panic!("Expected Ddl result"),
2435        }
2436    }
2437
2438    #[tokio::test]
2439    async fn test_create_mv_duplicate_error() {
2440        let db = LaminarDB::open().unwrap();
2441        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
2442
2443        // Register a view directly
2444        {
2445            let mut registry = db.mv_registry.lock();
2446            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
2447            let mv = laminar_core::mv::MaterializedView::new(
2448                "my_view",
2449                "SELECT * FROM events",
2450                vec!["events".to_string()],
2451                schema,
2452            );
2453            registry.register(mv).unwrap();
2454        }
2455
2456        // Without IF NOT EXISTS, should error
2457        let result = db
2458            .execute("CREATE MATERIALIZED VIEW my_view AS SELECT * FROM events")
2459            .await;
2460        assert!(result.is_err());
2461        let err = result.unwrap_err().to_string();
2462        assert!(
2463            err.contains("already exists"),
2464            "Expected 'already exists' error, got: {err}"
2465        );
2466    }
2467
2468    #[tokio::test]
2469    async fn test_show_materialized_views_with_entries() {
2470        let db = LaminarDB::open().unwrap();
2471        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
2472
2473        // Register views directly for metadata testing
2474        {
2475            let mut registry = db.mv_registry.lock();
2476            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
2477            let mv = laminar_core::mv::MaterializedView::new(
2478                "view_a",
2479                "SELECT * FROM events",
2480                vec!["events".to_string()],
2481                schema,
2482            );
2483            registry.register(mv).unwrap();
2484        }
2485
2486        let result = db.execute("SHOW MATERIALIZED VIEWS").await.unwrap();
2487        match result {
2488            ExecuteResult::Metadata(batch) => {
2489                assert_eq!(batch.num_rows(), 1);
2490                let names = batch
2491                    .column(0)
2492                    .as_any()
2493                    .downcast_ref::<StringArray>()
2494                    .unwrap();
2495                assert_eq!(names.value(0), "view_a");
2496            }
2497            _ => panic!("Expected Metadata result"),
2498        }
2499    }
2500
2501    #[tokio::test]
2502    async fn test_drop_mv_and_show() {
2503        let db = LaminarDB::open().unwrap();
2504        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
2505
2506        // Register a view
2507        {
2508            let mut registry = db.mv_registry.lock();
2509            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
2510            let mv = laminar_core::mv::MaterializedView::new(
2511                "temp_view",
2512                "SELECT * FROM events",
2513                vec!["events".to_string()],
2514                schema,
2515            );
2516            registry.register(mv).unwrap();
2517        }
2518
2519        // Verify it's there
2520        assert_eq!(db.mv_registry.lock().len(), 1);
2521
2522        // Drop it
2523        db.execute("DROP MATERIALIZED VIEW temp_view")
2524            .await
2525            .unwrap();
2526
2527        // Verify it's gone
2528        assert_eq!(db.mv_registry.lock().len(), 0);
2529    }
2530
2531    #[tokio::test]
2532    async fn test_debug_includes_mv_count() {
2533        let db = LaminarDB::open().unwrap();
2534        let debug = format!("{db:?}");
2535        assert!(
2536            debug.contains("materialized_views: 0"),
2537            "Debug should include MV count, got: {debug}"
2538        );
2539    }
2540
2541    // ── Pipeline Topology Introspection tests ──
2542
2543    #[tokio::test]
2544    async fn test_pipeline_topology_empty() {
2545        let db = LaminarDB::open().unwrap();
2546        let topo = db.pipeline_topology();
2547        assert!(topo.nodes.is_empty());
2548        assert!(topo.edges.is_empty());
2549    }
2550
2551    #[tokio::test]
2552    async fn test_pipeline_topology_sources_only() {
2553        use crate::handle::PipelineNodeType;
2554
2555        let db = LaminarDB::open().unwrap();
2556        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
2557            .await
2558            .unwrap();
2559        db.execute("CREATE SOURCE clicks (url VARCHAR, ts BIGINT)")
2560            .await
2561            .unwrap();
2562
2563        let topo = db.pipeline_topology();
2564        assert_eq!(topo.nodes.len(), 2);
2565        assert!(topo.edges.is_empty());
2566
2567        for node in &topo.nodes {
2568            assert_eq!(node.node_type, PipelineNodeType::Source);
2569            assert!(node.schema.is_some());
2570            assert!(node.sql.is_none());
2571        }
2572    }
2573
2574    #[tokio::test]
2575    async fn test_pipeline_topology_full_pipeline() {
2576        use crate::handle::PipelineNodeType;
2577
2578        let db = LaminarDB::open().unwrap();
2579        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
2580            .await
2581            .unwrap();
2582        db.execute("CREATE STREAM agg AS SELECT COUNT(*) as cnt FROM events GROUP BY id")
2583            .await
2584            .unwrap();
2585        db.execute("CREATE SINK output FROM agg").await.unwrap();
2586
2587        let topo = db.pipeline_topology();
2588
2589        // Nodes: 1 source + 1 stream + 1 sink = 3
2590        assert_eq!(topo.nodes.len(), 3);
2591
2592        let sources: Vec<_> = topo
2593            .nodes
2594            .iter()
2595            .filter(|n| n.node_type == PipelineNodeType::Source)
2596            .collect();
2597        let streams: Vec<_> = topo
2598            .nodes
2599            .iter()
2600            .filter(|n| n.node_type == PipelineNodeType::Stream)
2601            .collect();
2602        let sinks: Vec<_> = topo
2603            .nodes
2604            .iter()
2605            .filter(|n| n.node_type == PipelineNodeType::Sink)
2606            .collect();
2607
2608        assert_eq!(sources.len(), 1);
2609        assert_eq!(streams.len(), 1);
2610        assert_eq!(sinks.len(), 1);
2611
2612        assert_eq!(sources[0].name, "events");
2613        assert_eq!(streams[0].name, "agg");
2614        assert!(streams[0].sql.is_some());
2615        assert_eq!(sinks[0].name, "output");
2616
2617        // Edges: events->agg, agg->output
2618        assert_eq!(topo.edges.len(), 2);
2619        assert!(topo
2620            .edges
2621            .iter()
2622            .any(|e| e.from == "events" && e.to == "agg"));
2623        assert!(topo
2624            .edges
2625            .iter()
2626            .any(|e| e.from == "agg" && e.to == "output"));
2627    }
2628
2629    #[tokio::test]
2630    async fn test_pipeline_topology_fan_out() {
2631        let db = LaminarDB::open().unwrap();
2632        db.execute("CREATE SOURCE ticks (symbol VARCHAR, price DOUBLE)")
2633            .await
2634            .unwrap();
2635        db.execute("CREATE STREAM ohlc AS SELECT symbol, MIN(price) FROM ticks GROUP BY symbol")
2636            .await
2637            .unwrap();
2638        db.execute("CREATE STREAM vol AS SELECT symbol, COUNT(*) FROM ticks GROUP BY symbol")
2639            .await
2640            .unwrap();
2641
2642        let topo = db.pipeline_topology();
2643
2644        // 1 source + 2 streams = 3 nodes
2645        assert_eq!(topo.nodes.len(), 3);
2646
2647        // Both streams should have an edge from ticks
2648        let ticks_edges: Vec<_> = topo.edges.iter().filter(|e| e.from == "ticks").collect();
2649        assert_eq!(ticks_edges.len(), 2);
2650
2651        let targets: Vec<&str> = ticks_edges.iter().map(|e| e.to.as_str()).collect();
2652        assert!(targets.contains(&"ohlc"));
2653        assert!(targets.contains(&"vol"));
2654    }
2655
2656    #[tokio::test]
2657    async fn test_streams_method() {
2658        let db = LaminarDB::open().unwrap();
2659        assert!(db.streams().is_empty());
2660
2661        db.execute("CREATE STREAM counts AS SELECT COUNT(*) FROM events")
2662            .await
2663            .unwrap();
2664
2665        let streams = db.streams();
2666        assert_eq!(streams.len(), 1);
2667        assert_eq!(streams[0].name, "counts");
2668        assert!(streams[0].sql.is_some());
2669        assert!(
2670            streams[0].sql.as_ref().unwrap().contains("COUNT"),
2671            "SQL should contain the query: {:?}",
2672            streams[0].sql,
2673        );
2674    }
2675
2676    #[tokio::test]
2677    async fn test_pipeline_node_types() {
2678        use crate::handle::PipelineNodeType;
2679
2680        let db = LaminarDB::open().unwrap();
2681        db.execute("CREATE SOURCE src (id INT)").await.unwrap();
2682        db.execute("CREATE STREAM st AS SELECT * FROM src")
2683            .await
2684            .unwrap();
2685        db.execute("CREATE SINK sk FROM st").await.unwrap();
2686
2687        let topo = db.pipeline_topology();
2688
2689        let find = |name: &str| topo.nodes.iter().find(|n| n.name == name).unwrap();
2690
2691        assert_eq!(find("src").node_type, PipelineNodeType::Source);
2692        assert_eq!(find("st").node_type, PipelineNodeType::Stream);
2693        assert_eq!(find("sk").node_type, PipelineNodeType::Sink);
2694    }
2695
2696    // ── Reference Table tests ──
2697
2698    #[tokio::test]
2699    async fn test_create_table_with_primary_key() {
2700        let db = LaminarDB::open().unwrap();
2701        let result = db
2702            .execute(
2703                "CREATE TABLE instruments (\
2704                 symbol VARCHAR PRIMARY KEY, \
2705                 company_name VARCHAR, \
2706                 sector VARCHAR\
2707                 )",
2708            )
2709            .await
2710            .unwrap();
2711
2712        match result {
2713            ExecuteResult::Ddl(info) => {
2714                assert_eq!(info.statement_type, "CREATE TABLE");
2715                assert_eq!(info.object_name, "instruments");
2716            }
2717            _ => panic!("Expected DDL result"),
2718        }
2719
2720        // Verify TableStore registration
2721        let ts = db.table_store.read();
2722        assert!(ts.has_table("instruments"));
2723        assert_eq!(ts.primary_key("instruments"), Some("symbol"));
2724        assert_eq!(ts.table_row_count("instruments"), 0);
2725    }
2726
2727    #[tokio::test]
2728    async fn test_create_table_with_connector_options() {
2729        let db = LaminarDB::open().unwrap();
2730        let result = db
2731            .execute(
2732                "CREATE TABLE instruments (\
2733                 symbol VARCHAR PRIMARY KEY, \
2734                 company_name VARCHAR\
2735                 ) WITH (connector = 'kafka', topic = 'instruments')",
2736            )
2737            .await
2738            .unwrap();
2739
2740        match result {
2741            ExecuteResult::Ddl(info) => {
2742                assert_eq!(info.object_name, "instruments");
2743            }
2744            _ => panic!("Expected DDL result"),
2745        }
2746
2747        // Verify ConnectorManager registration
2748        let mgr = db.connector_manager.lock();
2749        let tables = mgr.tables();
2750        assert!(tables.contains_key("instruments"));
2751        let reg = &tables["instruments"];
2752        assert_eq!(reg.connector_type.as_deref(), Some("kafka"));
2753        assert_eq!(reg.primary_key, "symbol");
2754
2755        // Verify TableStore connector
2756        let ts = db.table_store.read();
2757        assert_eq!(ts.connector("instruments"), Some("kafka"));
2758    }
2759
2760    #[tokio::test]
2761    async fn test_insert_into_table_with_pk_upserts() {
2762        let db = LaminarDB::open().unwrap();
2763        db.execute(
2764            "CREATE TABLE products (\
2765             id INT PRIMARY KEY, \
2766             name VARCHAR, \
2767             price DOUBLE\
2768             )",
2769        )
2770        .await
2771        .unwrap();
2772
2773        // Insert a row
2774        db.execute("INSERT INTO products VALUES (1, 'Widget', 9.99)")
2775            .await
2776            .unwrap();
2777        assert_eq!(db.table_store.read().table_row_count("products"), 1);
2778
2779        // Upsert (same PK = overwrite)
2780        db.execute("INSERT INTO products VALUES (1, 'Super Widget', 19.99)")
2781            .await
2782            .unwrap();
2783        assert_eq!(db.table_store.read().table_row_count("products"), 1);
2784
2785        // Insert another row (different PK)
2786        db.execute("INSERT INTO products VALUES (2, 'Gadget', 14.99)")
2787            .await
2788            .unwrap();
2789        assert_eq!(db.table_store.read().table_row_count("products"), 2);
2790
2791        // Verify via SELECT
2792        let result = db.execute("SELECT * FROM products").await.unwrap();
2793        match result {
2794            ExecuteResult::Query(q) => {
2795                assert_eq!(q.schema().fields().len(), 3);
2796            }
2797            _ => panic!("Expected Query result"),
2798        }
2799    }
2800
2801    #[tokio::test]
2802    async fn test_show_tables() {
2803        let db = LaminarDB::open().unwrap();
2804
2805        // Empty
2806        let result = db.execute("SHOW TABLES").await.unwrap();
2807        match result {
2808            ExecuteResult::Metadata(batch) => {
2809                assert_eq!(batch.num_rows(), 0);
2810                assert_eq!(batch.num_columns(), 4);
2811                assert_eq!(batch.schema().field(0).name(), "name");
2812                assert_eq!(batch.schema().field(1).name(), "primary_key");
2813                assert_eq!(batch.schema().field(2).name(), "row_count");
2814                assert_eq!(batch.schema().field(3).name(), "connector");
2815            }
2816            _ => panic!("Expected Metadata result"),
2817        }
2818
2819        // With a table
2820        db.execute("CREATE TABLE t (id INT PRIMARY KEY, val VARCHAR)")
2821            .await
2822            .unwrap();
2823        let result = db.execute("SHOW TABLES").await.unwrap();
2824        match result {
2825            ExecuteResult::Metadata(batch) => {
2826                assert_eq!(batch.num_rows(), 1);
2827            }
2828            _ => panic!("Expected Metadata result"),
2829        }
2830    }
2831
2832    #[tokio::test]
2833    async fn test_drop_table() {
2834        let db = LaminarDB::open().unwrap();
2835        db.execute("CREATE TABLE t (id INT PRIMARY KEY, val VARCHAR)")
2836            .await
2837            .unwrap();
2838        assert!(db.table_store.read().has_table("t"));
2839
2840        db.execute("DROP TABLE t").await.unwrap();
2841        assert!(!db.table_store.read().has_table("t"));
2842    }
2843
2844    #[tokio::test]
2845    async fn test_drop_table_if_exists() {
2846        let db = LaminarDB::open().unwrap();
2847        let result = db.execute("DROP TABLE IF EXISTS nonexistent").await;
2848        assert!(result.is_ok());
2849    }
2850
2851    // ── HAVING clause tests ─────────────────────────────
2852
2853    #[tokio::test]
2854    async fn test_having_filters_grouped_results() {
2855        let db = LaminarDB::open().unwrap();
2856
2857        // Create table and query via DataFusion directly
2858        db.ctx
2859            .sql(
2860                "CREATE TABLE hv_trades AS SELECT * FROM (VALUES \
2861                 ('AAPL', 100), ('GOOG', 5), ('MSFT', 50)) \
2862                 AS t(symbol, volume)",
2863            )
2864            .await
2865            .unwrap();
2866
2867        let df = db
2868            .ctx
2869            .sql("SELECT symbol, volume FROM hv_trades WHERE volume > 10 ORDER BY symbol")
2870            .await
2871            .unwrap();
2872
2873        let batches = df.collect().await.unwrap();
2874        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
2875        // AAPL(100), MSFT(50) pass; GOOG(5) filtered
2876        assert_eq!(total_rows, 2);
2877    }
2878
2879    #[tokio::test]
2880    async fn test_having_with_aggregate() {
2881        let db = LaminarDB::open().unwrap();
2882
2883        db.ctx
2884            .sql(
2885                "CREATE TABLE hv_orders AS SELECT * FROM (VALUES \
2886                 ('A', 100), ('A', 200), ('B', 50), ('B', 30), ('C', 500)) \
2887                 AS t(category, amount)",
2888            )
2889            .await
2890            .unwrap();
2891
2892        // Query with GROUP BY + HAVING through DataFusion
2893        let df = db
2894            .ctx
2895            .sql(
2896                "SELECT category, SUM(amount) as total \
2897                 FROM hv_orders GROUP BY category \
2898                 HAVING SUM(amount) > 100 ORDER BY category",
2899            )
2900            .await
2901            .unwrap();
2902
2903        let batches = df.collect().await.unwrap();
2904        assert!(!batches.is_empty());
2905
2906        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
2907        // A: 300 > 100 ✓, B: 80 ✗, C: 500 > 100 ✓
2908        assert_eq!(total_rows, 2);
2909    }
2910
2911    #[tokio::test]
2912    async fn test_having_all_filtered_out() {
2913        let db = LaminarDB::open().unwrap();
2914
2915        db.ctx
2916            .sql(
2917                "CREATE TABLE items AS SELECT * FROM (VALUES \
2918                 ('x', 1), ('y', 2)) AS t(name, qty)",
2919            )
2920            .await
2921            .unwrap();
2922
2923        let df = db
2924            .ctx
2925            .sql("SELECT name, SUM(qty) as total FROM items GROUP BY name HAVING SUM(qty) > 1000")
2926            .await
2927            .unwrap();
2928
2929        let batches = df.collect().await.unwrap();
2930        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
2931        assert_eq!(total_rows, 0);
2932    }
2933
2934    #[tokio::test]
2935    async fn test_having_compound_predicate() {
2936        let db = LaminarDB::open().unwrap();
2937
2938        db.ctx
2939            .sql(
2940                "CREATE TABLE sales AS SELECT * FROM (VALUES \
2941                 ('A', 100), ('A', 200), ('B', 50), ('C', 10), ('C', 20)) \
2942                 AS t(region, amount)",
2943            )
2944            .await
2945            .unwrap();
2946
2947        let df = db
2948            .ctx
2949            .sql(
2950                "SELECT region, COUNT(*) as cnt, SUM(amount) as total \
2951                 FROM sales GROUP BY region \
2952                 HAVING COUNT(*) >= 2 AND SUM(amount) > 25 \
2953                 ORDER BY region",
2954            )
2955            .await
2956            .unwrap();
2957
2958        let batches = df.collect().await.unwrap();
2959        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
2960        // A: cnt=2>=2 AND total=300>25 ✓
2961        // B: cnt=1<2 ✗
2962        // C: cnt=2>=2 AND total=30>25 ✓
2963        assert_eq!(total_rows, 2);
2964    }
2965
2966    // ── Multi-way JOIN tests ─────────────────────────────
2967
2968    #[tokio::test]
2969    async fn test_multi_join_two_way_lookup() {
2970        let db = LaminarDB::open().unwrap();
2971
2972        // Create tables via DataFusion
2973        db.ctx
2974            .sql(
2975                "CREATE TABLE orders AS SELECT * FROM (VALUES \
2976                 (1, 100, 'A'), (2, 200, 'B')) AS t(id, customer_id, product_code)",
2977            )
2978            .await
2979            .unwrap();
2980        db.ctx
2981            .sql(
2982                "CREATE TABLE customers AS SELECT * FROM (VALUES \
2983                 (100, 'Alice'), (200, 'Bob')) AS t(id, name)",
2984            )
2985            .await
2986            .unwrap();
2987        db.ctx
2988            .sql(
2989                "CREATE TABLE products AS SELECT * FROM (VALUES \
2990                 ('A', 'Widget'), ('B', 'Gadget')) AS t(code, label)",
2991            )
2992            .await
2993            .unwrap();
2994
2995        // Two-way join through DataFusion
2996        let df = db
2997            .ctx
2998            .sql(
2999                "SELECT o.id, c.name, p.label \
3000                 FROM orders o \
3001                 JOIN customers c ON o.customer_id = c.id \
3002                 JOIN products p ON o.product_code = p.code \
3003                 ORDER BY o.id",
3004            )
3005            .await
3006            .unwrap();
3007
3008        let batches = df.collect().await.unwrap();
3009        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3010        assert_eq!(total_rows, 2);
3011    }
3012
3013    #[tokio::test]
3014    async fn test_multi_join_three_way() {
3015        let db = LaminarDB::open().unwrap();
3016
3017        db.ctx
3018            .sql("CREATE TABLE t1 AS SELECT * FROM (VALUES (1, 10), (2, 20)) AS t(id, fk1)")
3019            .await
3020            .unwrap();
3021        db.ctx
3022            .sql("CREATE TABLE t2 AS SELECT * FROM (VALUES (10, 100), (20, 200)) AS t(id, fk2)")
3023            .await
3024            .unwrap();
3025        db.ctx
3026            .sql("CREATE TABLE t3 AS SELECT * FROM (VALUES (100, 'x'), (200, 'y')) AS t(id, fk3)")
3027            .await
3028            .unwrap();
3029        db.ctx
3030            .sql("CREATE TABLE t4 AS SELECT * FROM (VALUES ('x', 'final_x'), ('y', 'final_y')) AS t(id, val)")
3031            .await
3032            .unwrap();
3033
3034        let df = db
3035            .ctx
3036            .sql(
3037                "SELECT t1.id, t4.val \
3038                 FROM t1 \
3039                 JOIN t2 ON t1.fk1 = t2.id \
3040                 JOIN t3 ON t2.fk2 = t3.id \
3041                 JOIN t4 ON t3.fk3 = t4.id \
3042                 ORDER BY t1.id",
3043            )
3044            .await
3045            .unwrap();
3046
3047        let batches = df.collect().await.unwrap();
3048        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3049        assert_eq!(total_rows, 2);
3050    }
3051
3052    #[tokio::test]
3053    async fn test_multi_join_mixed_types() {
3054        let db = LaminarDB::open().unwrap();
3055
3056        db.ctx
3057            .sql(
3058                "CREATE TABLE stream_a AS SELECT * FROM (VALUES \
3059                 (1, 'k1'), (2, 'k2')) AS t(id, key)",
3060            )
3061            .await
3062            .unwrap();
3063        db.ctx
3064            .sql(
3065                "CREATE TABLE stream_b AS SELECT * FROM (VALUES \
3066                 ('k1', 10), ('k2', 20)) AS t(key, value)",
3067            )
3068            .await
3069            .unwrap();
3070        db.ctx
3071            .sql(
3072                "CREATE TABLE dim_c AS SELECT * FROM (VALUES \
3073                 ('k1', 'label1'), ('k2', 'label2')) AS t(key, label)",
3074            )
3075            .await
3076            .unwrap();
3077
3078        // Inner join + left join
3079        let df = db
3080            .ctx
3081            .sql(
3082                "SELECT a.id, b.value, c.label \
3083                 FROM stream_a a \
3084                 JOIN stream_b b ON a.key = b.key \
3085                 LEFT JOIN dim_c c ON a.key = c.key \
3086                 ORDER BY a.id",
3087            )
3088            .await
3089            .unwrap();
3090
3091        let batches = df.collect().await.unwrap();
3092        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3093        assert_eq!(total_rows, 2);
3094    }
3095
3096    #[tokio::test]
3097    async fn test_multi_join_single_backward_compat() {
3098        let db = LaminarDB::open().unwrap();
3099
3100        db.ctx
3101            .sql(
3102                "CREATE TABLE left_t AS SELECT * FROM (VALUES \
3103                 (1, 'a'), (2, 'b')) AS t(id, val)",
3104            )
3105            .await
3106            .unwrap();
3107        db.ctx
3108            .sql(
3109                "CREATE TABLE right_t AS SELECT * FROM (VALUES \
3110                 (1, 'x'), (2, 'y')) AS t(id, data)",
3111            )
3112            .await
3113            .unwrap();
3114
3115        // Single join still works
3116        let df = db
3117            .ctx
3118            .sql(
3119                "SELECT l.id, l.val, r.data \
3120                 FROM left_t l JOIN right_t r ON l.id = r.id \
3121                 ORDER BY l.id",
3122            )
3123            .await
3124            .unwrap();
3125
3126        let batches = df.collect().await.unwrap();
3127        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3128        assert_eq!(total_rows, 2);
3129    }
3130
3131    // ── Window Frame tests ────────────────────────────────
3132
3133    #[tokio::test]
3134    async fn test_frame_moving_average() {
3135        let db = LaminarDB::open().unwrap();
3136
3137        db.ctx
3138            .sql(
3139                "CREATE TABLE frame_prices AS SELECT * FROM (VALUES \
3140                 (1, 10.0), (2, 20.0), (3, 30.0), (4, 40.0), (5, 50.0)) \
3141                 AS t(id, price)",
3142            )
3143            .await
3144            .unwrap();
3145
3146        let df = db
3147            .ctx
3148            .sql(
3149                "SELECT id, AVG(price) OVER (ORDER BY id \
3150                 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS ma \
3151                 FROM frame_prices ORDER BY id",
3152            )
3153            .await
3154            .unwrap();
3155
3156        let batches = df.collect().await.unwrap();
3157        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3158        assert_eq!(total_rows, 5);
3159
3160        // Verify moving average values: row 3 → avg(10,20,30) = 20
3161        let ma_col = batches[0]
3162            .column(1)
3163            .as_any()
3164            .downcast_ref::<arrow::array::Float64Array>()
3165            .unwrap();
3166        assert!((ma_col.value(2) - 20.0).abs() < 0.01);
3167    }
3168
3169    #[tokio::test]
3170    async fn test_frame_running_sum() {
3171        let db = LaminarDB::open().unwrap();
3172
3173        db.ctx
3174            .sql(
3175                "CREATE TABLE frame_amounts AS SELECT * FROM (VALUES \
3176                 (1, 100.0), (2, 200.0), (3, 300.0)) AS t(id, amount)",
3177            )
3178            .await
3179            .unwrap();
3180
3181        let df = db
3182            .ctx
3183            .sql(
3184                "SELECT id, SUM(amount) OVER (ORDER BY id \
3185                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running \
3186                 FROM frame_amounts ORDER BY id",
3187            )
3188            .await
3189            .unwrap();
3190
3191        let batches = df.collect().await.unwrap();
3192        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3193        assert_eq!(total_rows, 3);
3194
3195        let sum_col = batches[0]
3196            .column(1)
3197            .as_any()
3198            .downcast_ref::<arrow::array::Float64Array>()
3199            .unwrap();
3200        // Row 3: cumulative sum = 100 + 200 + 300 = 600
3201        assert!((sum_col.value(2) - 600.0).abs() < 0.01);
3202    }
3203
3204    #[tokio::test]
3205    async fn test_frame_rolling_max() {
3206        let db = LaminarDB::open().unwrap();
3207
3208        db.ctx
3209            .sql(
3210                "CREATE TABLE frame_vals AS SELECT * FROM (VALUES \
3211                 (1, 5.0), (2, 15.0), (3, 10.0), (4, 20.0)) AS t(id, price)",
3212            )
3213            .await
3214            .unwrap();
3215
3216        let df = db
3217            .ctx
3218            .sql(
3219                "SELECT id, MAX(price) OVER (ORDER BY id \
3220                 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS rmax \
3221                 FROM frame_vals ORDER BY id",
3222            )
3223            .await
3224            .unwrap();
3225
3226        let batches = df.collect().await.unwrap();
3227        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3228        assert_eq!(total_rows, 4);
3229
3230        let max_col = batches[0]
3231            .column(1)
3232            .as_any()
3233            .downcast_ref::<arrow::array::Float64Array>()
3234            .unwrap();
3235        // Row 3: max(5, 15, 10) = 15
3236        assert!((max_col.value(2) - 15.0).abs() < 0.01);
3237    }
3238
3239    #[tokio::test]
3240    async fn test_frame_rolling_count() {
3241        let db = LaminarDB::open().unwrap();
3242
3243        db.ctx
3244            .sql(
3245                "CREATE TABLE frame_events AS SELECT * FROM (VALUES \
3246                 (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')) AS t(id, code)",
3247            )
3248            .await
3249            .unwrap();
3250
3251        let df = db
3252            .ctx
3253            .sql(
3254                "SELECT id, COUNT(*) OVER (ORDER BY id \
3255                 ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS cnt \
3256                 FROM frame_events ORDER BY id",
3257            )
3258            .await
3259            .unwrap();
3260
3261        let batches = df.collect().await.unwrap();
3262        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
3263        assert_eq!(total_rows, 4);
3264
3265        let cnt_col = batches[0]
3266            .column(1)
3267            .as_any()
3268            .downcast_ref::<arrow::array::Int64Array>()
3269            .unwrap();
3270        // Row 1: count of just row 1 = 1
3271        assert_eq!(cnt_col.value(0), 1);
3272        // Row 2+: count of current + 1 preceding = 2
3273        assert_eq!(cnt_col.value(1), 2);
3274        assert_eq!(cnt_col.value(2), 2);
3275    }
3276
3277    // ── Connector-Backed Table Population ──
3278
3279    /// Helper: create a test `RecordBatch` for table population tests.
3280    fn table_test_batch(ids: &[i32], symbols: &[&str]) -> RecordBatch {
3281        use arrow::array::Int32Array;
3282        let schema = Arc::new(Schema::new(vec![
3283            Field::new("id", DataType::Int32, false),
3284            Field::new("symbol", DataType::Utf8, false),
3285        ]));
3286        RecordBatch::try_new(
3287            schema,
3288            vec![
3289                Arc::new(Int32Array::from(ids.to_vec())),
3290                Arc::new(StringArray::from(symbols.to_vec())),
3291            ],
3292        )
3293        .unwrap()
3294    }
3295
3296    /// Register a mock table source factory that returns a `MockReferenceTableSource`
3297    /// pre-loaded with the given snapshot and change batches.
3298    fn register_mock_table_source(
3299        db: &LaminarDB,
3300        snapshot_batches: Vec<RecordBatch>,
3301        change_batches: Vec<RecordBatch>,
3302    ) {
3303        use laminar_connectors::config::ConnectorInfo;
3304        use laminar_connectors::reference::MockReferenceTableSource;
3305
3306        let snap = std::sync::Arc::new(parking_lot::Mutex::new(Some(snapshot_batches)));
3307        let chg = std::sync::Arc::new(parking_lot::Mutex::new(Some(change_batches)));
3308        db.connector_registry().register_table_source(
3309            "mock",
3310            ConnectorInfo {
3311                name: "mock".to_string(),
3312                display_name: "Mock Table Source".to_string(),
3313                version: "0.1.0".to_string(),
3314                is_source: true,
3315                is_sink: false,
3316                config_keys: vec![],
3317            },
3318            std::sync::Arc::new(move |_config| {
3319                let s = snap.lock().take().unwrap_or_default();
3320                let c = chg.lock().take().unwrap_or_default();
3321                Ok(Box::new(MockReferenceTableSource::new(s, c)))
3322            }),
3323        );
3324    }
3325
3326    #[tokio::test]
3327    async fn test_table_source_snapshot_populates_table() {
3328        let db = LaminarDB::open().unwrap();
3329        let batch = table_test_batch(&[1, 2], &["AAPL", "GOOG"]);
3330        register_mock_table_source(&db, vec![batch], vec![]);
3331
3332        db.execute("CREATE SOURCE events (symbol VARCHAR, price DOUBLE)")
3333            .await
3334            .unwrap();
3335
3336        db.execute(
3337            "CREATE TABLE instruments (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
3338             WITH (connector = 'mock', format = 'json')",
3339        )
3340        .await
3341        .unwrap();
3342
3343        db.start().await.unwrap();
3344
3345        // Table should be populated by snapshot
3346        let ts = db.table_store.read();
3347        assert!(ts.is_ready("instruments"));
3348        assert_eq!(ts.table_row_count("instruments"), 2);
3349    }
3350
3351    #[tokio::test]
3352    async fn test_table_source_manual_no_snapshot() {
3353        let db = LaminarDB::open().unwrap();
3354        let batch = table_test_batch(&[1], &["AAPL"]);
3355        register_mock_table_source(&db, vec![batch], vec![]);
3356
3357        db.execute("CREATE SOURCE events (symbol VARCHAR, price DOUBLE)")
3358            .await
3359            .unwrap();
3360
3361        db.execute(
3362            "CREATE TABLE instruments (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
3363             WITH (connector = 'mock', format = 'json', refresh = 'manual')",
3364        )
3365        .await
3366        .unwrap();
3367
3368        db.start().await.unwrap();
3369
3370        // Manual mode: table stays empty
3371        let ts = db.table_store.read();
3372        assert!(!ts.is_ready("instruments"));
3373        assert_eq!(ts.table_row_count("instruments"), 0);
3374    }
3375
    #[tokio::test]
    async fn test_table_source_multiple_tables() {
        use laminar_connectors::config::ConnectorInfo;
        use laminar_connectors::reference::MockReferenceTableSource;

        let db = LaminarDB::open().unwrap();

        // Register two separate mock factories

        // One "mock" factory serves both tables: an atomic call counter selects
        // which snapshot set each invocation hands out (call 0 → batch1,
        // call 1 → batch2, any later call → empty).
        let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let cc = call_count.clone();
        let batch1 = table_test_batch(&[1], &["AAPL"]);
        let batch2 = table_test_batch(&[2, 3], &["GOOG", "MSFT"]);
        let batches =
            std::sync::Arc::new(parking_lot::Mutex::new(vec![vec![batch1], vec![batch2]]));

        db.connector_registry().register_table_source(
            "mock",
            ConnectorInfo {
                name: "mock".to_string(),
                display_name: "Mock".to_string(),
                version: "0.1.0".to_string(),
                is_source: true,
                is_sink: false,
                config_keys: vec![],
            },
            std::sync::Arc::new(move |_config| {
                let idx = cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst) as usize;
                let mut all = batches.lock();
                // mem::take leaves an empty Vec behind, so each snapshot set
                // is consumed at most once even if the factory is re-invoked.
                let snap = if idx < all.len() {
                    std::mem::take(&mut all[idx])
                } else {
                    vec![]
                };
                Ok(Box::new(MockReferenceTableSource::new(snap, vec![])))
            }),
        );

        db.execute("CREATE SOURCE events (x INT)").await.unwrap();

        db.execute(
            "CREATE TABLE t1 (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json')",
        )
        .await
        .unwrap();

        db.execute(
            "CREATE TABLE t2 (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json')",
        )
        .await
        .unwrap();

        db.start().await.unwrap();

        let ts = db.table_store.read();
        // Both tables should be snapshot-populated (order may vary)
        // Assert on the combined row count because which table received which
        // snapshot depends on factory invocation order, which is unspecified.
        let total = ts.table_row_count("t1") + ts.table_row_count("t2");
        assert_eq!(total, 3); // 1 + 2
        assert!(ts.is_ready("t1"));
        assert!(ts.is_ready("t2"));
    }
3439
3440    #[tokio::test]
3441    async fn test_table_create_with_refresh_mode() {
3442        let db = LaminarDB::open().unwrap();
3443
3444        // Just test DDL parsing — no need to register a mock factory
3445        db.execute(
3446            "CREATE TABLE t (id INT PRIMARY KEY, name VARCHAR NOT NULL) \
3447             WITH (connector = 'mock', format = 'json', refresh = 'cdc')",
3448        )
3449        .await
3450        .unwrap();
3451
3452        let mgr = db.connector_manager.lock();
3453        let reg = mgr.tables().get("t").unwrap();
3454        assert_eq!(
3455            reg.refresh,
3456            Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
3457        );
3458    }
3459
    #[tokio::test]
    async fn test_table_source_snapshot_only_no_changes() {
        // refresh = 'snapshot_only' must apply the initial snapshot while
        // ignoring the change stream the mock source also provides.
        let db = LaminarDB::open().unwrap();
        let snap = table_test_batch(&[1], &["AAPL"]);
        let change = table_test_batch(&[2], &["GOOG"]);
        register_mock_table_source(&db, vec![snap], vec![change]);

        db.execute("CREATE SOURCE events (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        db.execute(
            "CREATE TABLE instruments (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json', refresh = 'snapshot_only')",
        )
        .await
        .unwrap();

        db.start().await.unwrap();

        // Should have snapshot data but not the change batch (it's snapshot_only)
        // NOTE(review): a write lock is taken here, presumably because
        // `lookup` needs mutable access (cache bookkeeping?) — confirm.
        let mut ts = db.table_store.write();
        assert!(ts.is_ready("instruments"));
        assert_eq!(ts.table_row_count("instruments"), 1);
        // The change batch id=2/GOOG should NOT be present
        assert!(ts.lookup("instruments", "2").is_none());
    }
3487
3488    // ── PARTIAL Cache Mode DDL tests ──
3489
3490    #[tokio::test]
3491    async fn test_create_table_partial_cache_mode() {
3492        let db = LaminarDB::open().unwrap();
3493        db.execute(
3494            "CREATE TABLE large_dim (\
3495             id INT PRIMARY KEY, \
3496             name VARCHAR\
3497             ) WITH (cache_mode = 'partial')",
3498        )
3499        .await
3500        .unwrap();
3501
3502        // Verify table exists
3503        {
3504            let ts = db.table_store.read();
3505            assert!(ts.has_table("large_dim"));
3506            // Cache metrics should exist for partial-mode tables
3507        }
3508
3509        // Insert some data
3510        db.execute("INSERT INTO large_dim VALUES (1, 'Alice')")
3511            .await
3512            .unwrap();
3513        db.execute("INSERT INTO large_dim VALUES (2, 'Bob')")
3514            .await
3515            .unwrap();
3516
3517        let ts = db.table_store.read();
3518        assert_eq!(ts.table_row_count("large_dim"), 2);
3519    }
3520
3521    #[tokio::test]
3522    async fn test_create_table_partial_with_max_entries() {
3523        let db = LaminarDB::open().unwrap();
3524        db.execute(
3525            "CREATE TABLE customers (\
3526             id INT PRIMARY KEY, \
3527             name VARCHAR\
3528             ) WITH (cache_mode = 'partial', cache_max_entries = '10000')",
3529        )
3530        .await
3531        .unwrap();
3532
3533        let ts = db.table_store.read();
3534        assert!(ts.has_table("customers"));
3535        // Verify the cache metrics report the correct max_entries
3536        let metrics = ts.cache_metrics("customers").unwrap();
3537        assert_eq!(metrics.cache_max_entries, 10000);
3538    }
3539
3540    #[tokio::test]
3541    async fn test_create_table_invalid_cache_max_entries() {
3542        let db = LaminarDB::open().unwrap();
3543        let result = db
3544            .execute(
3545                "CREATE TABLE bad (\
3546                 id INT PRIMARY KEY, \
3547                 name VARCHAR\
3548                 ) WITH (cache_mode = 'partial', cache_max_entries = 'not_a_number')",
3549            )
3550            .await;
3551        assert!(result.is_err());
3552        assert!(result
3553            .unwrap_err()
3554            .to_string()
3555            .contains("cache_max_entries"));
3556    }
3557
3558    // --- Pipeline Observability API tests ---
3559
3560    #[tokio::test]
3561    async fn test_metrics_initial_state() {
3562        let db = LaminarDB::open().unwrap();
3563        let m = db.metrics();
3564        assert_eq!(m.total_events_ingested, 0);
3565        assert_eq!(m.total_events_emitted, 0);
3566        assert_eq!(m.total_events_dropped, 0);
3567        assert_eq!(m.total_cycles, 0);
3568        assert_eq!(m.total_batches, 0);
3569        assert_eq!(m.state, crate::metrics::PipelineState::Created);
3570        assert_eq!(m.source_count, 0);
3571        assert_eq!(m.stream_count, 0);
3572        assert_eq!(m.sink_count, 0);
3573    }
3574
3575    #[tokio::test]
3576    async fn test_source_metrics_after_push() {
3577        let db = LaminarDB::open().unwrap();
3578        db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE)")
3579            .await
3580            .unwrap();
3581
3582        // Push some data
3583        let handle = db.source_untyped("trades").unwrap();
3584        let batch = RecordBatch::try_new(
3585            handle.schema().clone(),
3586            vec![
3587                Arc::new(arrow::array::StringArray::from(vec!["AAPL", "GOOG"])),
3588                Arc::new(arrow::array::Float64Array::from(vec![150.0, 2800.0])),
3589            ],
3590        )
3591        .unwrap();
3592        handle.push_arrow(batch).unwrap();
3593
3594        let sm = db.source_metrics("trades").unwrap();
3595        assert_eq!(sm.name, "trades");
3596        assert_eq!(sm.total_events, 1); // 1 push = sequence 1
3597        assert!(sm.pending > 0);
3598        assert!(sm.capacity > 0);
3599        assert!(sm.utilization > 0.0);
3600    }
3601
3602    #[tokio::test]
3603    async fn test_source_metrics_not_found() {
3604        let db = LaminarDB::open().unwrap();
3605        assert!(db.source_metrics("nonexistent").is_none());
3606    }
3607
3608    #[tokio::test]
3609    async fn test_all_source_metrics() {
3610        let db = LaminarDB::open().unwrap();
3611        db.execute("CREATE SOURCE a (id INT)").await.unwrap();
3612        db.execute("CREATE SOURCE b (id INT)").await.unwrap();
3613
3614        let all = db.all_source_metrics();
3615        assert_eq!(all.len(), 2);
3616        #[allow(clippy::disallowed_types)] // test code
3617        let names: std::collections::HashSet<_> = all.iter().map(|m| m.name.clone()).collect();
3618        assert!(names.contains("a"));
3619        assert!(names.contains("b"));
3620    }
3621
3622    #[tokio::test]
3623    async fn test_total_events_processed_zero() {
3624        let db = LaminarDB::open().unwrap();
3625        assert_eq!(db.total_events_processed(), 0);
3626    }
3627
3628    #[tokio::test]
3629    async fn test_pipeline_state_enum_created() {
3630        let db = LaminarDB::open().unwrap();
3631        assert_eq!(
3632            db.pipeline_state_enum(),
3633            crate::metrics::PipelineState::Created
3634        );
3635    }
3636
3637    #[tokio::test]
3638    async fn test_counters_accessible() {
3639        let db = LaminarDB::open().unwrap();
3640        let c = db.counters();
3641        c.events_ingested
3642            .fetch_add(42, std::sync::atomic::Ordering::Relaxed);
3643        let m = db.metrics();
3644        assert_eq!(m.total_events_ingested, 42);
3645    }
3646
3647    #[tokio::test]
3648    async fn test_metrics_counts_after_create() {
3649        let db = LaminarDB::open().unwrap();
3650        db.execute("CREATE SOURCE s1 (id INT)").await.unwrap();
3651        db.execute("CREATE SINK out1 FROM s1").await.unwrap();
3652
3653        let m = db.metrics();
3654        assert_eq!(m.source_count, 1);
3655        assert_eq!(m.sink_count, 1);
3656    }
3657
3658    #[tokio::test]
3659    async fn test_source_handle_capacity() {
3660        let db = LaminarDB::open().unwrap();
3661        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
3662
3663        let handle = db.source_untyped("events").unwrap();
3664        // Default buffer size is 1024
3665        assert!(handle.capacity() >= 1024);
3666        assert!(!handle.is_backpressured());
3667    }
3668
3669    #[tokio::test]
3670    async fn test_stream_metrics_with_sql() {
3671        let db = LaminarDB::open().unwrap();
3672        db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, ts BIGINT)")
3673            .await
3674            .unwrap();
3675        db.execute(
3676            "CREATE STREAM avg_price AS \
3677             SELECT symbol, AVG(price) as avg_price \
3678             FROM trades GROUP BY symbol, TUMBLE(ts, INTERVAL '1' MINUTE)",
3679        )
3680        .await
3681        .unwrap();
3682
3683        let sm = db.stream_metrics("avg_price");
3684        assert!(sm.is_some());
3685        let sm = sm.unwrap();
3686        assert_eq!(sm.name, "avg_price");
3687        assert!(sm.sql.is_some());
3688        assert!(sm.sql.as_deref().unwrap().contains("AVG"));
3689    }
3690
3691    #[tokio::test]
3692    async fn test_all_stream_metrics() {
3693        let db = LaminarDB::open().unwrap();
3694        db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, ts BIGINT)")
3695            .await
3696            .unwrap();
3697        db.execute(
3698            "CREATE STREAM s1 AS SELECT symbol, AVG(price) as avg_price \
3699             FROM trades GROUP BY symbol, TUMBLE(ts, INTERVAL '1' MINUTE)",
3700        )
3701        .await
3702        .unwrap();
3703
3704        let all = db.all_stream_metrics();
3705        assert_eq!(all.len(), 1);
3706        assert_eq!(all[0].name, "s1");
3707    }
3708
3709    #[tokio::test]
3710    async fn test_stream_metrics_not_found() {
3711        let db = LaminarDB::open().unwrap();
3712        assert!(db.stream_metrics("nonexistent").is_none());
3713    }
3714
3715    // ── Watermark Source Tracker tests ──────────────────────────────────
3716
3717    /// Helper: push a batch with `Timestamp(µs)` column to a source.
3718    ///
3719    /// `timestamps_ms` are in **milliseconds**; the helper converts to microseconds
3720    /// internally to match the `TIMESTAMP` SQL type (`Timestamp(Microsecond, None)`).
3721    fn make_ts_batch(schema: &arrow::datatypes::SchemaRef, timestamps_ms: &[i64]) -> RecordBatch {
3722        let us_values: Vec<i64> = timestamps_ms.iter().map(|ms| ms * 1000).collect();
3723        RecordBatch::try_new(
3724            schema.clone(),
3725            vec![
3726                Arc::new(arrow::array::Int64Array::from(
3727                    (1..=i64::try_from(timestamps_ms.len()).expect("len fits i64"))
3728                        .collect::<Vec<_>>(),
3729                )),
3730                Arc::new(arrow::array::TimestampMicrosecondArray::from(us_values)),
3731            ],
3732        )
3733        .unwrap()
3734    }
3735
3736    #[tokio::test]
3737    async fn test_watermark_advances_on_push() {
3738        let db = LaminarDB::open().unwrap();
3739        db.execute(
3740            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
3741             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
3742        )
3743        .await
3744        .unwrap();
3745        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
3746            .await
3747            .unwrap();
3748        db.start().await.unwrap();
3749
3750        let handle = db.source_untyped("events").unwrap();
3751        let schema = handle.schema().clone();
3752        let batch = make_ts_batch(&schema, &[1000, 2000, 3000]);
3753        handle.push_arrow(batch).unwrap();
3754
3755        // Wait for pipeline loop to process
3756        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3757
3758        // With 0s delay, watermark should be max timestamp = 3000
3759        let wm = handle.current_watermark();
3760        assert_eq!(
3761            wm, 3000,
3762            "watermark should equal max timestamp with 0s delay"
3763        );
3764    }
3765
3766    #[tokio::test]
3767    async fn test_watermark_bounded_delay() {
3768        let db = LaminarDB::open().unwrap();
3769        db.execute(
3770            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
3771             WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND)",
3772        )
3773        .await
3774        .unwrap();
3775        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
3776            .await
3777            .unwrap();
3778        db.start().await.unwrap();
3779
3780        let handle = db.source_untyped("events").unwrap();
3781        let schema = handle.schema().clone();
3782
3783        // Push timestamps [1000, 800, 1200] — max = 1200
3784        let batch = make_ts_batch(&schema, &[1000, 800, 1200]);
3785        handle.push_arrow(batch).unwrap();
3786
3787        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3788
3789        // Watermark = max(1200) - 100ms delay = 1100
3790        let wm = handle.current_watermark();
3791        assert_eq!(wm, 1100, "watermark should be max_ts - delay");
3792    }
3793
3794    #[tokio::test]
3795    async fn test_watermark_no_regression() {
3796        let db = LaminarDB::open().unwrap();
3797        db.execute(
3798            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
3799             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
3800        )
3801        .await
3802        .unwrap();
3803        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
3804            .await
3805            .unwrap();
3806        db.start().await.unwrap();
3807
3808        let handle = db.source_untyped("events").unwrap();
3809        let schema = handle.schema().clone();
3810
3811        // Push high timestamps first
3812        let batch1 = make_ts_batch(&schema, &[5000]);
3813        handle.push_arrow(batch1).unwrap();
3814        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3815        let wm1 = handle.current_watermark();
3816
3817        // Push lower timestamps
3818        let batch2 = make_ts_batch(&schema, &[1000]);
3819        handle.push_arrow(batch2).unwrap();
3820        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
3821        let wm2 = handle.current_watermark();
3822
3823        // Watermark should never decrease
3824        assert!(wm2 >= wm1, "watermark must not regress: {wm2} < {wm1}");
3825        assert_eq!(wm1, 5000);
3826        assert_eq!(wm2, 5000);
3827    }
3828
3829    #[tokio::test]
3830    async fn test_source_without_watermark() {
3831        let db = LaminarDB::open().unwrap();
3832        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
3833            .await
3834            .unwrap();
3835
3836        // Source without WATERMARK clause should have default watermark
3837        let handle = db.source_untyped("events").unwrap();
3838        assert_eq!(handle.current_watermark(), i64::MIN);
3839        assert!(handle.max_out_of_orderness().is_none());
3840    }
3841
3842    #[tokio::test]
3843    async fn test_watermark_with_arrow_timestamp_column() {
3844        let db = LaminarDB::open().unwrap();
3845        db.execute(
3846            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
3847             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
3848        )
3849        .await
3850        .unwrap();
3851        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
3852            .await
3853            .unwrap();
3854        db.start().await.unwrap();
3855
3856        let handle = db.source_untyped("events").unwrap();
3857        let schema = handle.schema().clone();
3858
3859        // Build a batch with Arrow Timestamp(us) column matching the schema
3860        let batch = RecordBatch::try_new(
3861            schema,
3862            vec![
3863                Arc::new(arrow::array::Int64Array::from(vec![1])),
3864                Arc::new(arrow::array::TimestampMicrosecondArray::from(vec![
3865                    5_000_000i64,
3866                ])),
3867            ],
3868        )
3869        .unwrap();
3870        handle.push_arrow(batch).unwrap();
3871
3872        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3873        let wm = handle.current_watermark();
3874        // ArrowNative format: timestamp is in microseconds, extractor converts to millis
3875        assert_eq!(wm, 5000, "watermark should work with Arrow Timestamp type");
3876    }
3877
    #[tokio::test]
    async fn test_pipeline_watermark_global_min() {
        // The pipeline-level watermark is the minimum across all sources, so
        // one lagging source holds the global watermark back.
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE trades (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute(
            "CREATE SOURCE orders (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM trades")
            .await
            .unwrap();
        db.start().await.unwrap();

        let trades = db.source_untyped("trades").unwrap();
        let orders = db.source_untyped("orders").unwrap();

        // Push high watermark to trades
        let batch1 = make_ts_batch(trades.schema(), &[5000]);
        trades.push_arrow(batch1).unwrap();

        // Push lower watermark to orders
        let batch2 = make_ts_batch(orders.schema(), &[2000]);
        orders.push_arrow(batch2).unwrap();

        // Let the pipeline loop process both batches and fold the per-source
        // watermarks into the global one.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // Global watermark should be min(5000, 2000) = 2000
        let global = db.pipeline_watermark();
        assert_eq!(
            global, 2000,
            "global watermark should be min of all sources"
        );
    }
3918
3919    #[tokio::test]
3920    async fn test_pipeline_watermark_in_metrics() {
3921        let db = LaminarDB::open().unwrap();
3922        db.execute(
3923            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
3924             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
3925        )
3926        .await
3927        .unwrap();
3928        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
3929            .await
3930            .unwrap();
3931        db.start().await.unwrap();
3932
3933        let handle = db.source_untyped("events").unwrap();
3934        let batch = make_ts_batch(handle.schema(), &[4000]);
3935        handle.push_arrow(batch).unwrap();
3936
3937        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3938
3939        let m = db.metrics();
3940        assert_eq!(
3941            m.pipeline_watermark,
3942            db.pipeline_watermark(),
3943            "metrics().pipeline_watermark should match pipeline_watermark()"
3944        );
3945        assert_eq!(m.pipeline_watermark, 4000);
3946    }
3947
3948    #[tokio::test]
3949    async fn test_source_handle_max_out_of_orderness() {
3950        let db = LaminarDB::open().unwrap();
3951        db.execute(
3952            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
3953             WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)",
3954        )
3955        .await
3956        .unwrap();
3957
3958        let handle = db.source_untyped("events").unwrap();
3959        let dur = handle.max_out_of_orderness();
3960        assert_eq!(dur, Some(std::time::Duration::from_secs(5)));
3961    }
3962
3963    #[tokio::test]
3964    async fn test_source_handle_no_watermark() {
3965        let db = LaminarDB::open().unwrap();
3966        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
3967            .await
3968            .unwrap();
3969
3970        let handle = db.source_untyped("events").unwrap();
3971        assert!(handle.max_out_of_orderness().is_none());
3972    }
3973
    #[tokio::test]
    async fn test_late_data_dropped_after_external_watermark() {
        // Scenario:
        //  1. Push on-time batch (ts = [1000, 2000, 3000])
        //  2. Advance watermark to 200_000 externally via source.watermark()
        //  3. Push late batch (ts = [100, 200, 300]) — all timestamps < watermark
        //  4. Verify late batch does NOT appear in stream output
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();

        // Subscribe before start() so no output batches are missed.
        let sub = db.catalog.get_stream_subscription("out").unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        // Step 1: Push on-time data
        let batch1 = make_ts_batch(&schema, &[1000, 2000, 3000]);
        handle.push_arrow(batch1).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // Drain on-time results
        // (256 is an arbitrary cap that guarantees the drain loop terminates.)
        let mut on_time_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => on_time_rows += b.num_rows(),
                None => break,
            }
        }
        assert!(on_time_rows > 0, "should have on-time rows");

        // Step 2: Advance watermark to 200_000 (external signal)
        handle.watermark(200_000);
        // Give the pipeline loop a cycle to pick up the external watermark
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        // Step 3: Push late data (all timestamps < 200_000)
        let late_batch = make_ts_batch(&schema, &[100, 200, 300]);
        handle.push_arrow(late_batch).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // Step 4: Check that late data was filtered out
        let mut late_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => late_rows += b.num_rows(),
                None => break,
            }
        }
        assert_eq!(late_rows, 0, "late data behind watermark should be dropped");
    }
4033
4034    #[test]
4035    fn test_filter_late_rows_filters_correctly() {
4036        use arrow::array::Int64Array;
4037
4038        // Int64 / UnixMillis format
4039        let schema = Arc::new(Schema::new(vec![
4040            Field::new("id", DataType::Int64, false),
4041            Field::new("ts", DataType::Int64, false),
4042        ]));
4043        let batch = RecordBatch::try_new(
4044            schema,
4045            vec![
4046                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
4047                Arc::new(Int64Array::from(vec![100, 500, 200, 800])),
4048            ],
4049        )
4050        .unwrap();
4051
4052        // Watermark at 300: rows with ts >= 300 survive (ts=500, ts=800)
4053        let filtered = filter_late_rows(
4054            &batch,
4055            "ts",
4056            300,
4057            laminar_core::time::TimestampFormat::UnixMillis,
4058        );
4059        let filtered = filtered.expect("should have some on-time rows");
4060        assert_eq!(filtered.num_rows(), 2);
4061
4062        // Check values
4063        let ids = filtered
4064            .column(0)
4065            .as_any()
4066            .downcast_ref::<Int64Array>()
4067            .unwrap();
4068        assert_eq!(ids.value(0), 2); // ts=500
4069        assert_eq!(ids.value(1), 4); // ts=800
4070    }
4071
4072    #[test]
4073    fn test_filter_late_rows_all_late() {
4074        use arrow::array::Int64Array;
4075
4076        let schema = Arc::new(Schema::new(vec![
4077            Field::new("id", DataType::Int64, false),
4078            Field::new("ts", DataType::Int64, false),
4079        ]));
4080        let batch = RecordBatch::try_new(
4081            schema,
4082            vec![
4083                Arc::new(Int64Array::from(vec![1, 2])),
4084                Arc::new(Int64Array::from(vec![100, 200])),
4085            ],
4086        )
4087        .unwrap();
4088
4089        // Watermark at 1000: all rows are late
4090        let result = filter_late_rows(
4091            &batch,
4092            "ts",
4093            1000,
4094            laminar_core::time::TimestampFormat::UnixMillis,
4095        );
4096        assert!(result.is_none(), "all-late batch should return None");
4097    }
4098
4099    #[test]
4100    fn test_filter_late_rows_no_column() {
4101        use arrow::array::Int64Array;
4102
4103        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
4104        let batch =
4105            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2]))]).unwrap();
4106
4107        // Column not found — batch passes through unfiltered
4108        let result = filter_late_rows(
4109            &batch,
4110            "ts",
4111            1000,
4112            laminar_core::time::TimestampFormat::UnixMillis,
4113        );
4114        let result = result.expect("should pass through when column not found");
4115        assert_eq!(result.num_rows(), 2);
4116    }
4117
4118    /// Helper: creates a `RecordBatch` with (id: BIGINT, ts: BIGINT).
4119    fn make_bigint_ts_batch(
4120        schema: &arrow::datatypes::SchemaRef,
4121        timestamps: &[i64],
4122    ) -> RecordBatch {
4123        RecordBatch::try_new(
4124            schema.clone(),
4125            vec![
4126                Arc::new(arrow::array::Int64Array::from(
4127                    (1..=i64::try_from(timestamps.len()).expect("len fits i64"))
4128                        .collect::<Vec<_>>(),
4129                )),
4130                Arc::new(arrow::array::Int64Array::from(timestamps.to_vec())),
4131            ],
4132        )
4133        .unwrap()
4134    }
4135
4136    #[tokio::test]
4137    async fn test_programmatic_watermark_filters_late_rows() {
4138        // Source with set_event_time_column("ts"), no SQL WATERMARK clause.
4139        // Push data, advance watermark, push late data, verify late data filtered.
4140        let db = LaminarDB::open().unwrap();
4141        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
4142            .await
4143            .unwrap();
4144        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
4145            .await
4146            .unwrap();
4147
4148        let sub = db.catalog.get_stream_subscription("out").unwrap();
4149
4150        let handle = db.source_untyped("events").unwrap();
4151        handle.set_event_time_column("ts");
4152
4153        db.start().await.unwrap();
4154
4155        let schema = handle.schema().clone();
4156
4157        // Step 1: Push on-time data
4158        let batch1 = make_bigint_ts_batch(&schema, &[1000, 2000, 3000]);
4159        handle.push_arrow(batch1).unwrap();
4160        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4161
4162        // Drain on-time results
4163        let mut on_time_rows = 0;
4164        for _ in 0..256 {
4165            match sub.poll() {
4166                Some(b) => on_time_rows += b.num_rows(),
4167                None => break,
4168            }
4169        }
4170        assert!(on_time_rows > 0, "should have on-time rows");
4171
4172        // Step 2: Advance watermark to 200_000 (external signal)
4173        handle.watermark(200_000);
4174        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
4175
4176        // Step 3: Push late data (all timestamps < 200_000)
4177        let late_batch = make_bigint_ts_batch(&schema, &[100, 200, 300]);
4178        handle.push_arrow(late_batch).unwrap();
4179        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4180
4181        // Step 4: Check that late data was filtered out
4182        let mut late_rows = 0;
4183        for _ in 0..256 {
4184            match sub.poll() {
4185                Some(b) => late_rows += b.num_rows(),
4186                None => break,
4187            }
4188        }
4189        assert_eq!(late_rows, 0, "late data behind watermark should be dropped");
4190    }
4191
4192    #[tokio::test]
4193    async fn test_sql_watermark_for_col_filters_late_rows() {
4194        // Source with WATERMARK FOR ts (no AS expr), should use zero delay.
4195        let db = LaminarDB::open().unwrap();
4196        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT, WATERMARK FOR ts)")
4197            .await
4198            .unwrap();
4199        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
4200            .await
4201            .unwrap();
4202
4203        let sub = db.catalog.get_stream_subscription("out").unwrap();
4204        db.start().await.unwrap();
4205
4206        let handle = db.source_untyped("events").unwrap();
4207        let schema = handle.schema().clone();
4208
4209        // Push on-time data
4210        let batch1 = make_bigint_ts_batch(&schema, &[1000, 2000, 3000]);
4211        handle.push_arrow(batch1).unwrap();
4212        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4213
4214        let mut on_time_rows = 0;
4215        for _ in 0..256 {
4216            match sub.poll() {
4217                Some(b) => on_time_rows += b.num_rows(),
4218                None => break,
4219            }
4220        }
4221        assert!(on_time_rows > 0, "should have on-time rows");
4222
4223        // Advance watermark externally
4224        handle.watermark(200_000);
4225        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
4226
4227        // Push late data
4228        let late_batch = make_bigint_ts_batch(&schema, &[100, 200, 300]);
4229        handle.push_arrow(late_batch).unwrap();
4230        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4231
4232        let mut late_rows = 0;
4233        for _ in 0..256 {
4234            match sub.poll() {
4235                Some(b) => late_rows += b.num_rows(),
4236                None => break,
4237            }
4238        }
4239        assert_eq!(late_rows, 0, "late data behind watermark should be dropped");
4240    }
4241
4242    #[tokio::test]
4243    async fn test_no_watermark_passes_all_data() {
4244        // Source without any watermark config — all data should pass through.
4245        let db = LaminarDB::open().unwrap();
4246        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
4247            .await
4248            .unwrap();
4249        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
4250            .await
4251            .unwrap();
4252
4253        let sub = db.catalog.get_stream_subscription("out").unwrap();
4254        db.start().await.unwrap();
4255
4256        let handle = db.source_untyped("events").unwrap();
4257        let schema = handle.schema().clone();
4258
4259        // Push two batches — no watermark filtering should happen
4260        let batch1 = make_bigint_ts_batch(&schema, &[1000, 2000, 3000]);
4261        handle.push_arrow(batch1).unwrap();
4262        handle.watermark(200_000); // watermark without event_time_column is a no-op for filtering
4263        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
4264
4265        let batch2 = make_bigint_ts_batch(&schema, &[100, 200, 300]);
4266        handle.push_arrow(batch2).unwrap();
4267        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4268
4269        // All rows from both batches should appear
4270        let mut total_rows = 0;
4271        for _ in 0..256 {
4272            match sub.poll() {
4273                Some(b) => total_rows += b.num_rows(),
4274                None => break,
4275            }
4276        }
4277        assert_eq!(
4278            total_rows, 6,
4279            "all data should pass through without watermark config"
4280        );
4281    }
4282
4283    #[tokio::test]
4284    async fn test_select_from_source() {
4285        let db = LaminarDB::open().unwrap();
4286        db.execute("CREATE SOURCE sensors (id BIGINT, temp DOUBLE)")
4287            .await
4288            .unwrap();
4289        db.execute("INSERT INTO sensors VALUES (1, 22.5), (2, 23.1)")
4290            .await
4291            .unwrap();
4292
4293        let result = db.execute("SELECT * FROM sensors").await.unwrap();
4294        match result {
4295            ExecuteResult::Query(mut q) => {
4296                // The bridge_query_stream spawns a tokio task; yield to let it run.
4297                tokio::task::yield_now().await;
4298                let sub = q.subscribe_raw().unwrap();
4299                let mut total_rows = 0;
4300                for _ in 0..256 {
4301                    match sub.poll() {
4302                        Some(b) => total_rows += b.num_rows(),
4303                        None => break,
4304                    }
4305                }
4306                assert_eq!(total_rows, 2);
4307            }
4308            _ => panic!("Expected Query result from SELECT on source"),
4309        }
4310    }
4311
4312    #[tokio::test]
4313    async fn test_select_from_dropped_source_fails() {
4314        let db = LaminarDB::open().unwrap();
4315        db.execute("CREATE SOURCE sensors (id BIGINT, temp DOUBLE)")
4316            .await
4317            .unwrap();
4318        db.execute("DROP SOURCE sensors").await.unwrap();
4319
4320        let result = db.execute("SELECT * FROM sensors").await;
4321        assert!(result.is_err(), "SELECT after DROP SOURCE should fail");
4322    }
4323
4324    #[tokio::test]
4325    async fn test_select_from_replaced_source() {
4326        let db = LaminarDB::open().unwrap();
4327        db.execute("CREATE SOURCE sensors (id BIGINT, temp DOUBLE)")
4328            .await
4329            .unwrap();
4330        db.execute("INSERT INTO sensors VALUES (1, 20.0)")
4331            .await
4332            .unwrap();
4333
4334        // Replace the source — old buffer is gone
4335        db.execute("CREATE OR REPLACE SOURCE sensors (id BIGINT, temp DOUBLE)")
4336            .await
4337            .unwrap();
4338        db.execute("INSERT INTO sensors VALUES (2, 30.0)")
4339            .await
4340            .unwrap();
4341
4342        let result = db.execute("SELECT * FROM sensors").await.unwrap();
4343        match result {
4344            ExecuteResult::Query(mut q) => {
4345                tokio::task::yield_now().await;
4346                let sub = q.subscribe_raw().unwrap();
4347                let mut total_rows = 0;
4348                for _ in 0..256 {
4349                    match sub.poll() {
4350                        Some(b) => total_rows += b.num_rows(),
4351                        None => break,
4352                    }
4353                }
4354                assert_eq!(
4355                    total_rows, 1,
4356                    "only the post-replace insert should be visible"
4357                );
4358            }
4359            _ => panic!("Expected Query result"),
4360        }
4361    }
4362
4363    #[tokio::test]
4364    async fn test_mv_registers_stream_in_connector_manager() {
4365        let db = LaminarDB::open().unwrap();
4366        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
4367            .await
4368            .unwrap();
4369
4370        // Before MV creation, no stream registered
4371        {
4372            let mgr = db.connector_manager.lock();
4373            assert!(
4374                !mgr.streams().contains_key("event_totals"),
4375                "stream should not exist before MV creation"
4376            );
4377        }
4378
4379        let result = db
4380            .execute("CREATE MATERIALIZED VIEW event_totals AS SELECT * FROM events")
4381            .await;
4382
4383        // The MV may fail at query execution (no data), but if DDL succeeds
4384        // the connector manager should have the stream registered
4385        if result.is_ok() {
4386            let mgr = db.connector_manager.lock();
4387            assert!(
4388                mgr.streams().contains_key("event_totals"),
4389                "MV should be registered as a stream in connector manager"
4390            );
4391            let reg = &mgr.streams()["event_totals"];
4392            assert!(
4393                reg.query_sql.contains("events"),
4394                "stream query should reference the source"
4395            );
4396        }
4397    }
4398
4399    #[tokio::test]
4400    async fn test_drop_mv_unregisters_stream() {
4401        let db = LaminarDB::open().unwrap();
4402        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
4403            .await
4404            .unwrap();
4405
4406        let result = db
4407            .execute("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM events")
4408            .await;
4409
4410        if result.is_ok() {
4411            // Verify registered
4412            {
4413                let mgr = db.connector_manager.lock();
4414                assert!(mgr.streams().contains_key("mv1"));
4415            }
4416
4417            // Drop the MV
4418            db.execute("DROP MATERIALIZED VIEW mv1").await.unwrap();
4419
4420            // Verify unregistered
4421            {
4422                let mgr = db.connector_manager.lock();
4423                assert!(
4424                    !mgr.streams().contains_key("mv1"),
4425                    "stream should be unregistered after DROP MV"
4426                );
4427            }
4428        }
4429    }
4430
4431    #[tokio::test]
4432    async fn test_set_session_property() {
4433        let db = LaminarDB::open().unwrap();
4434        db.execute("SET parallelism = 4").await.unwrap();
4435        assert_eq!(
4436            db.get_session_property("parallelism"),
4437            Some("4".to_string())
4438        );
4439    }
4440
4441    #[tokio::test]
4442    async fn test_set_session_property_string_value() {
4443        let db = LaminarDB::open().unwrap();
4444        db.execute("SET state_ttl = '1 hour'").await.unwrap();
4445        assert_eq!(
4446            db.get_session_property("state_ttl"),
4447            Some("1 hour".to_string())
4448        );
4449    }
4450
4451    #[tokio::test]
4452    async fn test_set_session_property_overwrite() {
4453        let db = LaminarDB::open().unwrap();
4454        db.execute("SET batch_size = 100").await.unwrap();
4455        db.execute("SET batch_size = 200").await.unwrap();
4456        assert_eq!(
4457            db.get_session_property("batch_size"),
4458            Some("200".to_string())
4459        );
4460    }
4461
4462    #[tokio::test]
4463    async fn test_get_session_property_not_set() {
4464        let db = LaminarDB::open().unwrap();
4465        assert_eq!(db.get_session_property("nonexistent"), None);
4466    }
4467
4468    #[tokio::test]
4469    async fn test_session_properties_all() {
4470        let db = LaminarDB::open().unwrap();
4471        db.execute("SET parallelism = 4").await.unwrap();
4472        db.execute("SET state_ttl = '1 hour'").await.unwrap();
4473        let props = db.session_properties();
4474        assert_eq!(props.len(), 2);
4475        assert_eq!(props.get("parallelism"), Some(&"4".to_string()));
4476        assert_eq!(props.get("state_ttl"), Some(&"1 hour".to_string()));
4477    }
4478
4479    #[tokio::test]
4480    async fn test_alter_source_add_column() {
4481        let db = LaminarDB::open().unwrap();
4482        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
4483            .await
4484            .unwrap();
4485
4486        // Verify initial schema has 2 columns
4487        let schema = db.catalog.describe_source("events").unwrap();
4488        assert_eq!(schema.fields().len(), 2);
4489
4490        // Add a column
4491        db.execute("ALTER SOURCE events ADD COLUMN new_col VARCHAR")
4492            .await
4493            .unwrap();
4494
4495        // Verify updated schema has 3 columns
4496        let schema = db.catalog.describe_source("events").unwrap();
4497        assert_eq!(schema.fields().len(), 3);
4498        assert_eq!(schema.field(2).name(), "new_col");
4499    }
4500
4501    #[tokio::test]
4502    async fn test_alter_source_not_found() {
4503        let db = LaminarDB::open().unwrap();
4504        let result = db
4505            .execute("ALTER SOURCE nonexistent ADD COLUMN col INT")
4506            .await;
4507        assert!(result.is_err());
4508    }
4509
4510    #[tokio::test]
4511    async fn test_alter_source_set_properties() {
4512        let db = LaminarDB::open().unwrap();
4513        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
4514        db.execute("ALTER SOURCE events SET ('batch.size' = '1000')")
4515            .await
4516            .unwrap();
4517        assert_eq!(
4518            db.get_session_property("events.batch.size"),
4519            Some("1000".to_string())
4520        );
4521    }
4522
4523    // ── extract_connector_from_with_options tests ──
4524
4525    #[test]
4526    fn test_extract_connector_from_with_options_basic() {
4527        let mut opts = HashMap::new();
4528        opts.insert("connector".to_string(), "kafka".to_string());
4529        opts.insert("topic".to_string(), "events".to_string());
4530        opts.insert(
4531            "bootstrap.servers".to_string(),
4532            "localhost:9092".to_string(),
4533        );
4534        opts.insert("format".to_string(), "json".to_string());
4535
4536        let (conn_opts, format, fmt_opts) = extract_connector_from_with_options(&opts);
4537
4538        // 'connector' and 'format' are extracted, not in connector_options
4539        assert!(!conn_opts.contains_key("connector"));
4540        assert!(!conn_opts.contains_key("format"));
4541        assert_eq!(conn_opts.get("topic"), Some(&"events".to_string()));
4542        assert_eq!(
4543            conn_opts.get("bootstrap.servers"),
4544            Some(&"localhost:9092".to_string())
4545        );
4546        assert_eq!(format, Some("json".to_string()));
4547        assert!(fmt_opts.is_empty());
4548    }
4549
4550    #[test]
4551    fn test_extract_connector_filters_streaming_keys() {
4552        let mut opts = HashMap::new();
4553        opts.insert("connector".to_string(), "websocket".to_string());
4554        opts.insert("url".to_string(), "wss://feed.example.com".to_string());
4555        opts.insert("buffer_size".to_string(), "4096".to_string());
4556        opts.insert("backpressure".to_string(), "block".to_string());
4557        opts.insert("watermark_delay".to_string(), "5s".to_string());
4558
4559        let (conn_opts, _, _) = extract_connector_from_with_options(&opts);
4560
4561        // Streaming keys should NOT be in connector_options
4562        assert!(!conn_opts.contains_key("buffer_size"));
4563        assert!(!conn_opts.contains_key("backpressure"));
4564        assert!(!conn_opts.contains_key("watermark_delay"));
4565        // Connector-specific key should be present
4566        assert_eq!(
4567            conn_opts.get("url"),
4568            Some(&"wss://feed.example.com".to_string())
4569        );
4570    }
4571
4572    #[test]
4573    fn test_extract_connector_format_options() {
4574        let mut opts = HashMap::new();
4575        opts.insert("connector".to_string(), "kafka".to_string());
4576        opts.insert("format".to_string(), "avro".to_string());
4577        opts.insert(
4578            "format.schema.registry.url".to_string(),
4579            "http://localhost:8081".to_string(),
4580        );
4581        opts.insert("topic".to_string(), "events".to_string());
4582
4583        let (conn_opts, format, fmt_opts) = extract_connector_from_with_options(&opts);
4584
4585        assert_eq!(format, Some("avro".to_string()));
4586        assert_eq!(
4587            fmt_opts.get("schema.registry.url"),
4588            Some(&"http://localhost:8081".to_string())
4589        );
4590        assert_eq!(conn_opts.get("topic"), Some(&"events".to_string()));
4591        assert!(!conn_opts.contains_key("format.schema.registry.url"));
4592    }
4593
4594    #[tokio::test]
4595    async fn test_create_source_with_connector_option() {
4596        // Verify that WITH ('connector' = '...') is accepted at the DDL level.
4597        // The actual connector won't be instantiated because the type isn't
4598        // registered in the default embedded registry, so we just check
4599        // that the error is "Unknown source connector type" (meaning the
4600        // WITH clause was correctly routed) rather than silently ignored.
4601        let db = LaminarDB::open().unwrap();
4602        let result = db
4603            .execute(
4604                "CREATE SOURCE ws_feed (id BIGINT, data TEXT) WITH (
4605                    'connector' = 'websocket',
4606                    'url' = 'wss://feed.example.com',
4607                    'format' = 'json'
4608                )",
4609            )
4610            .await;
4611
4612        // Without the websocket feature, the connector type won't be registered,
4613        // so we expect an "Unknown source connector type" error — which proves
4614        // the WITH clause WAS routed to the connector registry.
4615        if let Err(e) = result {
4616            let msg = e.to_string();
4617            assert!(
4618                msg.contains("Unknown source connector type"),
4619                "Expected connector routing error, got: {msg}"
4620            );
4621        } else {
4622            // If websocket feature IS enabled, the connector type is registered
4623            // and the DDL succeeds — also acceptable.
4624        }
4625    }
4626
4627    #[tokio::test]
4628    async fn test_show_sources_enriched() {
4629        let db = LaminarDB::open().unwrap();
4630        db.execute(
4631            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)",
4632        )
4633        .await
4634        .unwrap();
4635
4636        let result = db.execute("SHOW SOURCES").await.unwrap();
4637        match result {
4638            ExecuteResult::Metadata(batch) => {
4639                assert_eq!(batch.num_rows(), 1);
4640                assert_eq!(batch.num_columns(), 4);
4641                assert_eq!(batch.schema().field(0).name(), "source_name");
4642                assert_eq!(batch.schema().field(1).name(), "connector");
4643                assert_eq!(batch.schema().field(2).name(), "format");
4644                assert_eq!(batch.schema().field(3).name(), "watermark_column");
4645
4646                let names = batch
4647                    .column(0)
4648                    .as_any()
4649                    .downcast_ref::<StringArray>()
4650                    .unwrap();
4651                assert_eq!(names.value(0), "events");
4652
4653                let wm = batch
4654                    .column(3)
4655                    .as_any()
4656                    .downcast_ref::<StringArray>()
4657                    .unwrap();
4658                assert_eq!(wm.value(0), "ts");
4659            }
4660            _ => panic!("Expected Metadata result"),
4661        }
4662    }
4663
4664    #[tokio::test]
4665    async fn test_show_sinks_enriched() {
4666        let db = LaminarDB::open().unwrap();
4667        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
4668        db.execute("CREATE SINK output FROM events").await.unwrap();
4669
4670        let result = db.execute("SHOW SINKS").await.unwrap();
4671        match result {
4672            ExecuteResult::Metadata(batch) => {
4673                assert_eq!(batch.num_rows(), 1);
4674                assert_eq!(batch.num_columns(), 4);
4675                assert_eq!(batch.schema().field(0).name(), "sink_name");
4676                assert_eq!(batch.schema().field(1).name(), "input");
4677                assert_eq!(batch.schema().field(2).name(), "connector");
4678                assert_eq!(batch.schema().field(3).name(), "format");
4679
4680                let names = batch
4681                    .column(0)
4682                    .as_any()
4683                    .downcast_ref::<StringArray>()
4684                    .unwrap();
4685                assert_eq!(names.value(0), "output");
4686
4687                let inputs = batch
4688                    .column(1)
4689                    .as_any()
4690                    .downcast_ref::<StringArray>()
4691                    .unwrap();
4692                assert_eq!(inputs.value(0), "events");
4693            }
4694            _ => panic!("Expected Metadata result"),
4695        }
4696    }
4697
4698    #[tokio::test]
4699    async fn test_show_streams_enriched() {
4700        let db = LaminarDB::open().unwrap();
4701        db.execute("CREATE STREAM my_stream AS SELECT 1 FROM events")
4702            .await
4703            .unwrap();
4704
4705        let result = db.execute("SHOW STREAMS").await.unwrap();
4706        match result {
4707            ExecuteResult::Metadata(batch) => {
4708                assert_eq!(batch.num_rows(), 1);
4709                assert_eq!(batch.num_columns(), 2);
4710                assert_eq!(batch.schema().field(0).name(), "stream_name");
4711                assert_eq!(batch.schema().field(1).name(), "sql");
4712
4713                let sqls = batch
4714                    .column(1)
4715                    .as_any()
4716                    .downcast_ref::<StringArray>()
4717                    .unwrap();
4718                assert!(
4719                    sqls.value(0).contains("SELECT"),
4720                    "SQL column should contain query"
4721                );
4722            }
4723            _ => panic!("Expected Metadata result"),
4724        }
4725    }
4726
4727    #[tokio::test]
4728    async fn test_show_create_source() {
4729        let db = LaminarDB::open().unwrap();
4730        let ddl = "CREATE SOURCE events (id BIGINT, name VARCHAR)";
4731        db.execute(ddl).await.unwrap();
4732
4733        let result = db.execute("SHOW CREATE SOURCE events").await.unwrap();
4734        match result {
4735            ExecuteResult::Metadata(batch) => {
4736                assert_eq!(batch.num_rows(), 1);
4737                assert_eq!(batch.schema().field(0).name(), "create_statement");
4738                let stmts = batch
4739                    .column(0)
4740                    .as_any()
4741                    .downcast_ref::<StringArray>()
4742                    .unwrap();
4743                assert_eq!(stmts.value(0), ddl);
4744            }
4745            _ => panic!("Expected Metadata result"),
4746        }
4747    }
4748
4749    #[tokio::test]
4750    async fn test_show_create_sink() {
4751        let db = LaminarDB::open().unwrap();
4752        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
4753        let ddl = "CREATE SINK output FROM events";
4754        db.execute(ddl).await.unwrap();
4755
4756        let result = db.execute("SHOW CREATE SINK output").await.unwrap();
4757        match result {
4758            ExecuteResult::Metadata(batch) => {
4759                assert_eq!(batch.num_rows(), 1);
4760                assert_eq!(batch.schema().field(0).name(), "create_statement");
4761                let stmts = batch
4762                    .column(0)
4763                    .as_any()
4764                    .downcast_ref::<StringArray>()
4765                    .unwrap();
4766                assert_eq!(stmts.value(0), ddl);
4767            }
4768            _ => panic!("Expected Metadata result"),
4769        }
4770    }
4771
4772    #[tokio::test]
4773    async fn test_show_create_source_not_found() {
4774        let db = LaminarDB::open().unwrap();
4775        let result = db.execute("SHOW CREATE SOURCE nonexistent").await;
4776        assert!(result.is_err());
4777    }
4778
4779    #[tokio::test]
4780    async fn test_show_create_sink_not_found() {
4781        let db = LaminarDB::open().unwrap();
4782        let result = db.execute("SHOW CREATE SINK nonexistent").await;
4783        assert!(result.is_err());
4784    }
4785
4786    #[tokio::test]
4787    async fn test_explain_analyze_returns_metrics() {
4788        let db = LaminarDB::open().unwrap();
4789        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
4790            .await
4791            .unwrap();
4792
4793        let result = db
4794            .execute("EXPLAIN ANALYZE SELECT * FROM events")
4795            .await
4796            .unwrap();
4797        match result {
4798            ExecuteResult::Metadata(batch) => {
4799                let keys = batch
4800                    .column(0)
4801                    .as_any()
4802                    .downcast_ref::<StringArray>()
4803                    .unwrap();
4804                let key_vals: Vec<&str> = (0..batch.num_rows()).map(|i| keys.value(i)).collect();
4805                assert!(
4806                    key_vals.contains(&"rows_produced"),
4807                    "Expected rows_produced metric, got: {key_vals:?}"
4808                );
4809                assert!(
4810                    key_vals.contains(&"execution_time_ms"),
4811                    "Expected execution_time_ms metric, got: {key_vals:?}"
4812                );
4813            }
4814            _ => panic!("Expected Metadata result"),
4815        }
4816    }
4817
4818    #[tokio::test]
4819    async fn test_explain_without_analyze_has_no_metrics() {
4820        let db = LaminarDB::open().unwrap();
4821        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
4822            .await
4823            .unwrap();
4824
4825        let result = db.execute("EXPLAIN SELECT * FROM events").await.unwrap();
4826        match result {
4827            ExecuteResult::Metadata(batch) => {
4828                let keys = batch
4829                    .column(0)
4830                    .as_any()
4831                    .downcast_ref::<StringArray>()
4832                    .unwrap();
4833                let key_vals: Vec<&str> = (0..batch.num_rows()).map(|i| keys.value(i)).collect();
4834                assert!(
4835                    !key_vals.contains(&"rows_produced"),
4836                    "EXPLAIN without ANALYZE should not have rows_produced"
4837                );
4838            }
4839            _ => panic!("Expected Metadata result"),
4840        }
4841    }
4842
4843    #[tokio::test]
4844    async fn test_connectorless_source_does_not_break_pipeline() {
4845        let db = LaminarDB::open().unwrap();
4846
4847        // Connector-less source (no FROM clause) — formerly caused
4848        // LDB-1002 "No partitions provided" on every pipeline cycle.
4849        db.execute("CREATE SOURCE metadata (symbol VARCHAR, category VARCHAR)")
4850            .await
4851            .unwrap();
4852
4853        // A real source with a watermark that the pipeline will process.
4854        db.execute(
4855            "CREATE SOURCE trades (id BIGINT, price DOUBLE, ts BIGINT, \
4856             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
4857        )
4858        .await
4859        .unwrap();
4860
4861        db.execute("CREATE STREAM out AS SELECT id, price FROM trades")
4862            .await
4863            .unwrap();
4864
4865        db.start().await.unwrap();
4866
4867        // Push data into the real source.
4868        let handle = db.source_untyped("trades").unwrap();
4869        let schema = handle.schema().clone();
4870        let batch = RecordBatch::try_new(
4871            schema,
4872            vec![
4873                Arc::new(arrow::array::Int64Array::from(vec![1, 2])),
4874                Arc::new(arrow::array::Float64Array::from(vec![100.0, 200.0])),
4875                Arc::new(arrow::array::Int64Array::from(vec![1000, 2000])),
4876            ],
4877        )
4878        .unwrap();
4879        handle.push_arrow(batch).unwrap();
4880
4881        // Let the pipeline run a few cycles.
4882        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
4883
4884        // Verify the pipeline processed data without errors.
4885        let m = db.metrics();
4886        assert!(m.total_events_ingested > 0, "pipeline should ingest events");
4887
4888        // Push data into the connector-less source via push_arrow — should
4889        // work without causing pipeline errors.
4890        let meta_handle = db.source_untyped("metadata").unwrap();
4891        let meta_schema = meta_handle.schema().clone();
4892        let meta_batch = RecordBatch::try_new(
4893            meta_schema,
4894            vec![
4895                Arc::new(arrow::array::StringArray::from(vec!["BTC", "ETH"])),
4896                Arc::new(arrow::array::StringArray::from(vec!["L1", "L1"])),
4897            ],
4898        )
4899        .unwrap();
4900        meta_handle.push_arrow(meta_batch).unwrap();
4901
4902        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
4903
4904        // Pipeline should still be healthy.
4905        let m2 = db.metrics();
4906        assert!(
4907            m2.total_events_ingested >= m.total_events_ingested,
4908            "pipeline should continue after connector-less source push"
4909        );
4910    }
4911}