#![allow(clippy::disallowed_types)]

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use laminar_core::streaming;
use laminar_sql::parser::{parse_streaming_sql, ShowCommand, StreamingStatement};
use laminar_sql::planner::StreamingPlanner;
use laminar_sql::register_streaming_functions;
use laminar_sql::translator::{AsofJoinTranslatorConfig, JoinOperatorConfig};

use crate::builder::LaminarDbBuilder;
use crate::catalog::SourceCatalog;
use crate::config::LaminarConfig;
use crate::error::DbError;
use crate::handle::{
    DdlInfo, ExecuteResult, QueryHandle, QueryInfo, SinkInfo, SourceHandle, SourceInfo,
    UntypedSourceHandle,
};
use crate::pipeline_lifecycle::url_to_checkpoint_prefix;
use crate::sql_utils;

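// Pipeline lifecycle states stored in `LaminarDB::state` (an `AtomicU8`).
// Values are ordered along the expected progression:
// CREATED -> STARTING -> RUNNING -> SHUTTING_DOWN -> STOPPED.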
pub(crate) const STATE_CREATED: u8 = 0;
pub(crate) const STATE_STARTING: u8 = 1;
pub(crate) const STATE_RUNNING: u8 = 2;
pub(crate) const STATE_SHUTTING_DOWN: u8 = 3;
pub(crate) const STATE_STOPPED: u8 = 4;

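/// Renders a parsed [`StreamingStatement`] back to SQL text. Standard
/// statements round-trip through `Display`, continuous queries recurse into
/// their inner query, and anything else falls back to its `Debug` form.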
pub(crate) fn streaming_statement_to_sql(stmt: &StreamingStatement) -> String {
    match stmt {
        StreamingStatement::Standard(sql_stmt) => sql_stmt.to_string(),
        StreamingStatement::CreateContinuousQuery { query, .. } => {
            streaming_statement_to_sql(query)
        }
        other => format!("{other:?}"),
    }
}

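/// The embedded streaming database handle: owns the source/sink catalog, the
/// streaming planner, the DataFusion `SessionContext`, and runtime machinery
/// such as the checkpoint coordinator, connector manager, and metrics counters.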
pub struct LaminarDB {
    pub(crate) catalog: Arc<SourceCatalog>,
    pub(crate) planner: parking_lot::Mutex<StreamingPlanner>,
    pub(crate) ctx: SessionContext,
    pub(crate) config: LaminarConfig,
    pub(crate) config_vars: Arc<HashMap<String, String>>,
    pub(crate) shutdown: std::sync::atomic::AtomicBool,
    pub(crate) coordinator:
        Arc<tokio::sync::Mutex<Option<crate::checkpoint_coordinator::CheckpointCoordinator>>>,
    pub(crate) connector_manager: parking_lot::Mutex<crate::connector_manager::ConnectorManager>,
    pub(crate) connector_registry: Arc<laminar_connectors::registry::ConnectorRegistry>,
    pub(crate) mv_registry: parking_lot::Mutex<laminar_core::mv::MvRegistry>,
    pub(crate) table_store: Arc<parking_lot::RwLock<crate::table_store::TableStore>>,
    pub(crate) state: std::sync::atomic::AtomicU8,
    pub(crate) runtime_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    pub(crate) shutdown_signal: Arc<tokio::sync::Notify>,
    pub(crate) counters: Arc<crate::metrics::PipelineCounters>,
    pub(crate) start_time: std::time::Instant,
    pub(crate) session_properties: parking_lot::Mutex<HashMap<String, String>>,
    pub(crate) pipeline_watermark: Arc<std::sync::atomic::AtomicI64>,
    #[cfg(feature = "jit")]
    pub(crate) compiler_cache: parking_lot::Mutex<laminar_core::compiler::CompilerCache>,
    pub(crate) lookup_registry: Arc<laminar_sql::datafusion::LookupTableRegistry>,
}

pub(crate) struct SourceWatermarkState {
    pub(crate) extractor: laminar_core::time::EventTimeExtractor,
    pub(crate) generator: Box<dyn laminar_core::time::WatermarkGenerator>,
    pub(crate) column: String,
    pub(crate) format: laminar_core::time::TimestampFormat,
}

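/// Infers how to decode a watermark column's timestamps: native Arrow
/// `Timestamp` columns are read directly; any other type (e.g. a `BIGINT`
/// epoch column), or an unknown column name, is treated as Unix milliseconds.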
pub(crate) fn infer_timestamp_format(
    schema: &arrow::datatypes::SchemaRef,
    column: &str,
) -> laminar_core::time::TimestampFormat {
    if let Ok(idx) = schema.index_of(column) {
        match schema.field(idx).data_type() {
            DataType::Timestamp(_, _) => laminar_core::time::TimestampFormat::ArrowNative,
            _ => laminar_core::time::TimestampFormat::UnixMillis,
        }
    } else {
        laminar_core::time::TimestampFormat::UnixMillis
    }
}

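/// Keeps only rows whose timestamp is at or past the watermark, delegating to
/// `crate::batch_filter::filter_batch_by_timestamp` with a `GreaterEq`
/// threshold.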
pub(crate) fn filter_late_rows(
    batch: &RecordBatch,
    column: &str,
    watermark: i64,
    format: laminar_core::time::TimestampFormat,
) -> Option<RecordBatch> {
    crate::batch_filter::filter_batch_by_timestamp(
        batch,
        column,
        watermark,
        format,
        crate::batch_filter::ThresholdOp::GreaterEq,
    )
}

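/// Parses a human-friendly duration suffix: `ms` for milliseconds, `s` for
/// seconds, `m` for minutes, or a bare number of seconds. Returns `None` for
/// anything else. Expected mappings (a sketch, shown as plain text rather
/// than a doctest):
///
/// ```text
/// "250ms" -> Duration::from_millis(250)
/// "10s"   -> Duration::from_secs(10)
/// "2m"    -> Duration::from_secs(120)
/// "45"    -> Duration::from_secs(45)
/// ```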
pub(crate) fn parse_duration_str(s: &str) -> Option<std::time::Duration> {
    let s = s.trim();
    if s.ends_with("ms") {
        let n: u64 = s.strip_suffix("ms")?.trim().parse().ok()?;
        Some(std::time::Duration::from_millis(n))
    } else if s.ends_with('s') {
        let n: u64 = s.strip_suffix('s')?.trim().parse().ok()?;
        Some(std::time::Duration::from_secs(n))
    } else if s.ends_with('m') {
        let n: u64 = s.strip_suffix('m')?.trim().parse().ok()?;
        Some(std::time::Duration::from_secs(n * 60))
    } else {
        let n: u64 = s.parse().ok()?;
        Some(std::time::Duration::from_secs(n))
    }
}

impl LaminarDB {
    pub fn open() -> Result<Self, DbError> {
        Self::open_with_config(LaminarConfig::default())
    }

    pub fn open_with_config(config: LaminarConfig) -> Result<Self, DbError> {
        Self::open_with_config_and_vars(config, HashMap::new())
    }

    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn open_with_config_and_vars(
        config: LaminarConfig,
        config_vars: HashMap<String, String>,
    ) -> Result<Self, DbError> {
        let lookup_registry = Arc::new(laminar_sql::datafusion::LookupTableRegistry::new());

        let ctx = {
            let session_config = laminar_sql::datafusion::base_session_config();
            let extension_planner: Arc<
                dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync,
            > = Arc::new(laminar_sql::datafusion::LookupJoinExtensionPlanner::new(
                Arc::clone(&lookup_registry),
            ));
            let query_planner: Arc<dyn datafusion::execution::context::QueryPlanner + Send + Sync> =
                Arc::new(LookupQueryPlanner { extension_planner });
            let state = datafusion::execution::SessionStateBuilder::new()
                .with_config(session_config)
                .with_default_features()
                .with_query_planner(query_planner)
                .build();
            SessionContext::new_with_state(state)
        };
        register_streaming_functions(&ctx);

        let catalog = Arc::new(SourceCatalog::new(
            config.default_buffer_size,
            config.default_backpressure,
        ));

        let connector_registry = Arc::new(laminar_connectors::registry::ConnectorRegistry::new());
        Self::register_builtin_connectors(&connector_registry);

        Ok(Self {
            catalog,
            planner: parking_lot::Mutex::new(StreamingPlanner::new()),
            ctx,
            config,
            config_vars: Arc::new(config_vars),
            shutdown: std::sync::atomic::AtomicBool::new(false),
            coordinator: Arc::new(tokio::sync::Mutex::new(None)),
            connector_manager: parking_lot::Mutex::new(
                crate::connector_manager::ConnectorManager::new(),
            ),
            connector_registry,
            mv_registry: parking_lot::Mutex::new(laminar_core::mv::MvRegistry::new()),
            table_store: Arc::new(parking_lot::RwLock::new(
                crate::table_store::TableStore::new(),
            )),
            state: std::sync::atomic::AtomicU8::new(STATE_CREATED),
            runtime_handle: parking_lot::Mutex::new(None),
            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
            counters: Arc::new(crate::metrics::PipelineCounters::new()),
            start_time: std::time::Instant::now(),
            session_properties: parking_lot::Mutex::new(HashMap::new()),
            pipeline_watermark: Arc::new(std::sync::atomic::AtomicI64::new(i64::MIN)),
            #[cfg(feature = "jit")]
            compiler_cache: parking_lot::Mutex::new(
                laminar_core::compiler::CompilerCache::new(64)
                    .expect("JIT compiler cache initialization"),
            ),
            lookup_registry,
        })
    }

    #[must_use]
    pub fn builder() -> LaminarDbBuilder {
        LaminarDbBuilder::new()
    }

    #[allow(unused_variables)]
    fn register_builtin_connectors(registry: &laminar_connectors::registry::ConnectorRegistry) {
        #[cfg(feature = "kafka")]
        {
            laminar_connectors::kafka::register_kafka_source(registry);
            laminar_connectors::kafka::register_kafka_sink(registry);
        }
        #[cfg(feature = "postgres-cdc")]
        {
            laminar_connectors::cdc::postgres::register_postgres_cdc(registry);
        }
        #[cfg(feature = "postgres-sink")]
        {
            laminar_connectors::postgres::register_postgres_sink(registry);
        }
        #[cfg(feature = "delta-lake")]
        {
            laminar_connectors::lakehouse::register_delta_lake_sink(registry);
            laminar_connectors::lakehouse::register_delta_lake_source(registry);
        }
        #[cfg(feature = "websocket")]
        {
            laminar_connectors::websocket::register_websocket_source(registry);
            laminar_connectors::websocket::register_websocket_sink(registry);
        }
        #[cfg(feature = "mysql-cdc")]
        {
            laminar_connectors::cdc::mysql::register_mysql_cdc_source(registry);
        }
        #[cfg(feature = "files")]
        {
            laminar_connectors::files::register_file_source(registry);
            laminar_connectors::files::register_file_sink(registry);
        }
    }

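    /// Rebuilds the lookup-join optimizer rules from the planner's current
    /// lookup tables: stale rules are removed first, and nothing is re-added
    /// when no lookup tables remain.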
    fn refresh_lookup_optimizer_rule(&self) {
        use laminar_sql::planner::lookup_join::{LookupColumnPruningRule, LookupJoinRewriteRule};
        use laminar_sql::planner::predicate_split::{
            PlanPushdownMode, PlanSourceCapabilities, PredicateSplitterRule,
            SourceCapabilitiesRegistry,
        };

        self.ctx.remove_optimizer_rule("lookup_join_rewrite");
        self.ctx.remove_optimizer_rule("predicate_splitter");
        self.ctx.remove_optimizer_rule("lookup_column_pruning");

        let tables = self.planner.lock().lookup_tables_cloned();
        if tables.is_empty() {
            return;
        }

        let mut caps_registry = SourceCapabilitiesRegistry::default();
        for (name, info) in &tables {
            let mode = match info.properties.pushdown_mode {
                laminar_sql::parser::lookup_table::PushdownMode::Enabled
                | laminar_sql::parser::lookup_table::PushdownMode::Auto => PlanPushdownMode::Full,
                laminar_sql::parser::lookup_table::PushdownMode::Disabled => PlanPushdownMode::None,
            };
            let pk_set: std::collections::HashSet<String> =
                info.primary_key.iter().cloned().collect();
            caps_registry.register(
                name.clone(),
                PlanSourceCapabilities {
                    pushdown_mode: mode,
                    eq_columns: pk_set,
                    range_columns: std::collections::HashSet::new(),
                    in_columns: std::collections::HashSet::new(),
                    supports_null_check: false,
                },
            );
        }

        self.ctx
            .add_optimizer_rule(Arc::new(LookupJoinRewriteRule::new(tables)));
        self.ctx
            .add_optimizer_rule(Arc::new(PredicateSplitterRule::new(caps_registry)));
        self.ctx
            .add_optimizer_rule(Arc::new(LookupColumnPruningRule));
    }

    #[must_use]
    pub fn connector_registry(&self) -> &laminar_connectors::registry::ConnectorRegistry {
        &self.connector_registry
    }

    pub(crate) fn register_custom_udf(&self, udf: datafusion_expr::ScalarUDF) {
        self.ctx.register_udf(udf);
    }

    pub(crate) fn register_custom_udaf(&self, udaf: datafusion_expr::AggregateUDF) {
        self.ctx.register_udaf(udaf);
    }

    #[cfg(feature = "delta-lake")]
    pub async fn register_delta_table(
        &self,
        name: &str,
        table_uri: &str,
        storage_options: HashMap<String, String>,
    ) -> Result<(), DbError> {
        laminar_connectors::lakehouse::delta_table_provider::register_delta_table(
            &self.ctx,
            name,
            table_uri,
            storage_options,
        )
        .await
        .map_err(|e| DbError::Connector(e.to_string()))
    }

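    /// Executes one or more semicolon-separated SQL statements and returns the
    /// result of the last one. Statements run in order, and execution stops at
    /// the first error. A minimal usage sketch with a hypothetical schema
    /// (marked `ignore` so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// let db = LaminarDB::open()?;
    /// db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE)").await?;
    /// db.execute("INSERT INTO trades VALUES ('ACME', 10.5)").await?;
    /// ```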
    pub async fn execute(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
            return Err(DbError::Shutdown);
        }

        let resolved = if self.config_vars.is_empty() {
            sql.to_string()
        } else {
            sql_utils::resolve_config_vars(sql, &self.config_vars, true)?
        };

        let stmts = sql_utils::split_statements(&resolved);
        if stmts.is_empty() {
            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
        }

        let mut last_result = None;
        for stmt_sql in &stmts {
            last_result = Some(self.execute_single(stmt_sql).await?);
        }

        last_result.ok_or_else(|| DbError::InvalidOperation("Empty SQL statement".into()))
    }

    #[allow(clippy::too_many_lines)]
    async fn execute_single(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        let statements = parse_streaming_sql(sql)?;

        if statements.is_empty() {
            return Err(DbError::InvalidOperation("Empty SQL statement".into()));
        }

        let statement = &statements[0];

        match statement {
            StreamingStatement::CreateSource(create) => {
                let result = self.handle_create_source(create)?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateSink(create) => {
                let result = self.handle_create_sink(create)?;
                if let ExecuteResult::Ddl(ref info) = result {
                    self.connector_manager
                        .lock()
                        .store_ddl(&info.object_name, sql);
                }
                Ok(result)
            }
            StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.handle_create_stream(name, query, emit_clause.as_ref()),
            StreamingStatement::CreateContinuousQuery { .. }
            | StreamingStatement::CreateLookupTable(_)
            | StreamingStatement::DropLookupTable { .. } => self.handle_query(sql).await,
            StreamingStatement::Standard(stmt) => {
                if let sqlparser::ast::Statement::CreateTable(ct) = stmt.as_ref() {
                    self.handle_create_table(ct)
                } else if let sqlparser::ast::Statement::Drop {
                    object_type: sqlparser::ast::ObjectType::Table,
                    names,
                    if_exists,
                    ..
                } = stmt.as_ref()
                {
                    self.handle_drop_table(names, *if_exists)
                } else if let sqlparser::ast::Statement::Set(set_stmt) = stmt.as_ref() {
                    self.handle_set(set_stmt)
                } else {
                    self.handle_query(sql).await
                }
            }
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => self.handle_insert_into(table_name, columns, values).await,
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_source(name, *if_exists, *cascade),
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_sink(name, *if_exists, *cascade),
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_stream(name, *if_exists, *cascade),
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => self.handle_drop_materialized_view(name, *if_exists, *cascade),
            StreamingStatement::Show(cmd) => {
                let batch = match cmd {
                    ShowCommand::Sources => self.build_show_sources(),
                    ShowCommand::Sinks => self.build_show_sinks(),
                    ShowCommand::Queries => self.build_show_queries(),
                    ShowCommand::MaterializedViews => self.build_show_materialized_views(),
                    ShowCommand::Streams => self.build_show_streams(),
                    ShowCommand::Tables => self.build_show_tables(),
                    ShowCommand::CheckpointStatus => self.build_show_checkpoint_status()?,
                    ShowCommand::CreateSource { name } => {
                        self.build_show_create_source(&name.to_string())?
                    }
                    ShowCommand::CreateSink { name } => {
                        self.build_show_create_sink(&name.to_string())?
                    }
                };
                Ok(ExecuteResult::Metadata(batch))
            }
            StreamingStatement::Checkpoint => {
                let result = self.checkpoint().await?;
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "CHECKPOINT".to_string(),
                    object_name: format!("checkpoint_{}", result.checkpoint_id),
                }))
            }
            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
                self.handle_restore_checkpoint(*checkpoint_id)
            }
            StreamingStatement::Describe { name, .. } => {
                let name_str = name.to_string();
                let batch = self.build_describe(&name_str)?;
                Ok(ExecuteResult::Metadata(batch))
            }
            StreamingStatement::Explain {
                statement, analyze, ..
            } => {
                if *analyze {
                    self.handle_explain_analyze(statement, sql).await
                } else {
                    self.handle_explain(statement)
                }
            }
            StreamingStatement::CreateMaterializedView {
                name,
                query,
                or_replace,
                if_not_exists,
                ..
            } => {
                self.handle_create_materialized_view(sql, name, query, *or_replace, *if_not_exists)
                    .await
            }
            StreamingStatement::AlterSource { name, operation } => {
                self.handle_alter_source(name, operation)
            }
        }
    }

    async fn handle_insert_into(
        &self,
        table_name: &sqlparser::ast::ObjectName,
        _columns: &[sqlparser::ast::Ident],
        values: &[Vec<sqlparser::ast::Expr>],
    ) -> Result<ExecuteResult, DbError> {
        let name = table_name.to_string();

        if let Some(entry) = self.catalog.get_source(&name) {
            let batch = sql_utils::sql_values_to_record_batch(&entry.schema, values)?;
            entry
                .push_and_buffer(batch)
                .map_err(|e| DbError::InsertError(format!("Failed to push to source: {e}")))?;
            return Ok(ExecuteResult::RowsAffected(values.len() as u64));
        }

        {
            let mut ts = self.table_store.write();
            if ts.has_table(&name) {
                let schema = ts
                    .table_schema(&name)
                    .ok_or_else(|| DbError::TableNotFound(name.clone()))?;
                let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;
                ts.upsert(&name, &batch)?;
                drop(ts);
                self.sync_table_to_datafusion(&name)?;
                return Ok(ExecuteResult::RowsAffected(values.len() as u64));
            }
        }

        let table = self
            .ctx
            .table_provider(&name)
            .await
            .map_err(|_| DbError::TableNotFound(name.clone()))?;

        let schema = table.schema();
        let batch = sql_utils::sql_values_to_record_batch(&schema, values)?;

        self.ctx
            .deregister_table(&name)
            .map_err(|e| DbError::InsertError(format!("Failed to deregister table: {e}")))?;

        let mem_table =
            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![batch]])
                .map_err(|e| DbError::InsertError(format!("Failed to create table: {e}")))?;

        self.ctx
            .register_table(&name, Arc::new(mem_table))
            .map_err(|e| DbError::InsertError(format!("Failed to register table: {e}")))?;

        Ok(ExecuteResult::RowsAffected(values.len() as u64))
    }

    #[allow(clippy::unused_self)]
    fn handle_restore_checkpoint(&self, _checkpoint_id: u64) -> Result<ExecuteResult, DbError> {
        Err(DbError::Unsupported(
            "RESTORE FROM CHECKPOINT is not yet implemented — \
             requires pipeline stop, state reload from manifest, \
             source offset seek, and pipeline restart"
                .to_string(),
        ))
    }

    #[must_use]
    pub fn get_session_property(&self, key: &str) -> Option<String> {
        self.session_properties
            .lock()
            .get(&key.to_lowercase())
            .cloned()
    }

    #[must_use]
    pub fn session_properties(&self) -> HashMap<String, String> {
        self.session_properties.lock().clone()
    }

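    /// Subscribes to a named stream with typed deserialization via the
    /// [`crate::handle::FromBatch`] impl of `T`. A sketch, assuming a
    /// hypothetical `Event` type that implements `FromBatch`:
    ///
    /// ```ignore
    /// let sub = db.subscribe::<Event>("events")?;
    /// ```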
    pub fn subscribe<T: crate::handle::FromBatch>(
        &self,
        name: &str,
    ) -> Result<crate::handle::TypedSubscription<T>, DbError> {
        let sub = self
            .catalog
            .get_stream_subscription(name)
            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))?;
        Ok(crate::handle::TypedSubscription::from_raw(sub))
    }

    #[cfg(feature = "api")]
    pub(crate) fn subscribe_raw(
        &self,
        name: &str,
    ) -> Result<laminar_core::streaming::Subscription<crate::catalog::ArrowRecord>, DbError> {
        self.catalog
            .get_stream_subscription(name)
            .ok_or_else(|| DbError::StreamNotFound(name.to_string()))
    }

    fn handle_explain(&self, statement: &StreamingStatement) -> Result<ExecuteResult, DbError> {
        let mut planner = self.planner.lock();

        let plan_result = planner.plan(statement);

        let mut rows: Vec<(String, String)> = Vec::new();

        match plan_result {
            Ok(plan) => {
                rows.push((
                    "plan_type".into(),
                    match &plan {
                        laminar_sql::planner::StreamingPlan::Query(_) => "Query",
                        laminar_sql::planner::StreamingPlan::RegisterSource(_) => "RegisterSource",
                        laminar_sql::planner::StreamingPlan::RegisterSink(_) => "RegisterSink",
                        laminar_sql::planner::StreamingPlan::Standard(_) => "Standard",
                        laminar_sql::planner::StreamingPlan::DagExplain(_) => "DagExplain",
                        laminar_sql::planner::StreamingPlan::RegisterLookupTable(_) => {
                            "RegisterLookupTable"
                        }
                        laminar_sql::planner::StreamingPlan::DropLookupTable { .. } => {
                            "DropLookupTable"
                        }
                    }
                    .into(),
                ));
                match &plan {
                    laminar_sql::planner::StreamingPlan::Query(qp) => {
                        if let Some(name) = &qp.name {
                            rows.push(("query_name".into(), name.clone()));
                        }
                        if let Some(wc) = &qp.window_config {
                            rows.push(("window".into(), format!("{wc}")));
                        }
                        if let Some(jcs) = &qp.join_config {
                            if jcs.len() == 1 {
                                rows.push(("join".into(), format!("{}", jcs[0])));
                            } else {
                                for (i, jc) in jcs.iter().enumerate() {
                                    rows.push((format!("join_step_{}", i + 1), format!("{jc}")));
                                }
                            }
                        }
                        if let Some(oc) = &qp.order_config {
                            rows.push(("order_by".into(), format!("{oc:?}")));
                        }
                        if let Some(fc) = &qp.frame_config {
                            rows.push((
                                "frame_functions".into(),
                                format!("{}", fc.functions.len()),
                            ));
                        }
                        if let Some(ec) = &qp.emit_clause {
                            rows.push(("emit".into(), format!("{ec}")));
                        }
                    }
                    laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                        rows.push(("source".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                        rows.push(("sink".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::Standard(_) => {
                        rows.push(("execution".into(), "DataFusion pass-through".into()));
                    }
                    laminar_sql::planner::StreamingPlan::DagExplain(output) => {
                        rows.push(("dag_topology".into(), output.topology_text.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                        rows.push(("lookup_table".into(), info.name.clone()));
                    }
                    laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                        rows.push(("drop_lookup_table".into(), name.clone()));
                    }
                }
            }
            Err(e) => {
                rows.push(("error".into(), format!("{e}")));
                rows.push((
                    "statement".into(),
                    format!("{:?}", std::mem::discriminant(statement)),
                ));
            }
        }

        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();

        let schema = Arc::new(Schema::new(vec![
            Field::new("plan_key", DataType::Utf8, false),
            Field::new("plan_value", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(keys)),
                Arc::new(StringArray::from(values)),
            ],
        )
        .map_err(|e| DbError::InvalidOperation(format!("explain metadata: {e}")))?;

        Ok(ExecuteResult::Metadata(batch))
    }

    async fn handle_explain_analyze(
        &self,
        statement: &StreamingStatement,
        original_sql: &str,
    ) -> Result<ExecuteResult, DbError> {
        let explain_result = self.handle_explain(statement)?;
        let mut rows: Vec<(String, String)> = Vec::new();

        if let ExecuteResult::Metadata(explain_batch) = &explain_result {
            let keys_col = explain_batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>();
            let vals_col = explain_batch
                .column(1)
                .as_any()
                .downcast_ref::<StringArray>();
            if let (Some(keys), Some(vals)) = (keys_col, vals_col) {
                for i in 0..explain_batch.num_rows() {
                    rows.push((keys.value(i).to_string(), vals.value(i).to_string()));
                }
            }
        }

        let upper = original_sql.to_uppercase();
        let inner_start = upper.find("ANALYZE").map_or(0, |pos| pos + "ANALYZE".len());
        let inner_sql = original_sql[inner_start..].trim();

        let start = std::time::Instant::now();
        match self.ctx.sql(inner_sql).await {
            Ok(df) => match df.collect().await {
                Ok(batches) => {
                    let elapsed = start.elapsed();
                    let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
                    rows.push(("rows_produced".into(), total_rows.to_string()));
                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
                    rows.push(("batches_processed".into(), batches.len().to_string()));
                }
                Err(e) => {
                    let elapsed = start.elapsed();
                    rows.push(("execution_time_ms".into(), elapsed.as_millis().to_string()));
                    rows.push(("analyze_error".into(), format!("{e}")));
                }
            },
            Err(e) => {
                rows.push(("analyze_error".into(), format!("{e}")));
            }
        }

        let keys: Vec<&str> = rows.iter().map(|(k, _)| k.as_str()).collect();
        let values: Vec<&str> = rows.iter().map(|(_, v)| v.as_str()).collect();

        let schema = Arc::new(Schema::new(vec![
            Field::new("plan_key", DataType::Utf8, false),
            Field::new("plan_value", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(keys)),
                Arc::new(StringArray::from(values)),
            ],
        )
        .map_err(|e| DbError::InvalidOperation(format!("explain analyze metadata: {e}")))?;

        Ok(ExecuteResult::Metadata(batch))
    }

    #[allow(clippy::too_many_lines)]
    pub(crate) async fn handle_query(&self, sql: &str) -> Result<ExecuteResult, DbError> {
        let plan = {
            let statements = parse_streaming_sql(sql)?;
            if statements.is_empty() {
                return Err(DbError::InvalidOperation("Empty SQL statement".into()));
            }
            let mut planner = self.planner.lock();
            planner
                .plan(&statements[0])
                .map_err(laminar_sql::Error::from)?
        };

        match plan {
            laminar_sql::planner::StreamingPlan::RegisterSource(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::RegisterSink(info) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DDL".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::Query(query_plan) => {
                if let Some(asof_config) = Self::extract_asof_config(&query_plan) {
                    return self.execute_asof_query(&asof_config, sql).await;
                }

                let plan_sql = query_plan.statement.to_string();
                let logical_plan = self.ctx.state().create_logical_plan(&plan_sql).await?;

                #[cfg(feature = "jit")]
                {
                    if let Some(compiled) = self.try_compile_query(sql, &logical_plan) {
                        tracing::info!(
                            sql = %sql,
                            compiled = compiled.metadata.compiled_pipeline_count,
                            fallback = compiled.metadata.fallback_pipeline_count,
                            "Query compiled via JIT"
                        );
                        return self.bridge_compiled_query(sql, compiled).await;
                    }
                }

                let df = self.ctx.execute_logical_plan(logical_plan).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::Standard(stmt) => {
                let sql_str = stmt.to_string();
                let df = self.ctx.sql(&sql_str).await?;
                let stream = df.execute_stream().await?;

                Ok(self.bridge_query_stream(sql, stream))
            }
            laminar_sql::planner::StreamingPlan::DagExplain(output) => {
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "EXPLAIN DAG".to_string(),
                    object_name: output.topology_text,
                }))
            }
            laminar_sql::planner::StreamingPlan::RegisterLookupTable(info) => {
                use laminar_sql::parser::lookup_table::ConnectorType;

                let pk = info
                    .primary_key
                    .first()
                    .ok_or_else(|| {
                        DbError::InvalidOperation("Lookup table requires a primary key".into())
                    })?
                    .clone();

                let cache_mode = info.properties.cache_memory.map(|mem| {
                    let max_entries = (mem.as_bytes() / 256).max(1024) as usize;
                    crate::table_cache_mode::TableCacheMode::Partial { max_entries }
                });
                if let Some(cache) = cache_mode {
                    self.table_store.write().create_table_with_cache(
                        &info.name,
                        info.arrow_schema.clone(),
                        &pk,
                        cache,
                    )?;
                } else {
                    self.table_store.write().create_table(
                        &info.name,
                        info.arrow_schema.clone(),
                        &pk,
                    )?;
                }

                if !matches!(info.properties.connector, ConnectorType::Static) {
                    let connector_type_str = match &info.properties.connector {
                        ConnectorType::PostgresCdc => "postgres-cdc",
                        ConnectorType::MysqlCdc => "mysql-cdc",
                        ConnectorType::Redis => "redis",
                        ConnectorType::S3Parquet => "s3-parquet",
                        ConnectorType::Custom(s) => s.as_str(),
                        ConnectorType::Static => unreachable!(),
                    };

                    self.table_store
                        .write()
                        .set_connector(&info.name, connector_type_str);

                    let refresh = match info.properties.strategy {
                        laminar_sql::parser::lookup_table::LookupStrategy::Replicated
                        | laminar_sql::parser::lookup_table::LookupStrategy::Partitioned => {
                            Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
                        }
                        laminar_sql::parser::lookup_table::LookupStrategy::OnDemand => {
                            Some(laminar_connectors::reference::RefreshMode::Manual)
                        }
                    };

                    let connector_options: HashMap<String, String> = info
                        .raw_options
                        .iter()
                        .filter(|(k, _)| {
                            ![
                                "connector",
                                "strategy",
                                "cache.memory",
                                "cache.disk",
                                "cache.ttl",
                                "pushdown",
                            ]
                            .contains(&k.as_str())
                        })
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect();

                    let table_cache_mode = info.properties.cache_memory.map(|mem| {
                        let max_entries = (mem.as_bytes() / 256).max(1024) as usize;
                        crate::table_cache_mode::TableCacheMode::Partial { max_entries }
                    });
                    let cache_max = info
                        .properties
                        .cache_memory
                        .map(|mem| (mem.as_bytes() / 256).max(1024) as usize);

                    self.connector_manager.lock().register_table(
                        crate::connector_manager::TableRegistration {
                            name: info.name.clone(),
                            primary_key: pk,
                            connector_type: Some(connector_type_str.to_string()),
                            connector_options,
                            format: info.raw_options.get("format").cloned(),
                            format_options: HashMap::new(),
                            refresh,
                            cache_mode: table_cache_mode,
                            cache_max_entries: cache_max,
                            storage: None,
                        },
                    );
                }

                {
                    let provider = crate::table_provider::ReferenceTableProvider::new(
                        info.name.clone(),
                        info.arrow_schema.clone(),
                        self.table_store.clone(),
                    );
                    let _ = self.ctx.deregister_table(&info.name);
                    self.ctx
                        .register_table(&info.name, Arc::new(provider))
                        .map_err(|e| {
                            DbError::InvalidOperation(format!(
                                "Failed to register lookup table: {e}"
                            ))
                        })?;
                }

                if let Some(batch) = self.table_store.read().to_record_batch(&info.name) {
                    self.lookup_registry.register(
                        &info.name,
                        laminar_sql::datafusion::LookupSnapshot {
                            batch,
                            key_columns: info.primary_key.clone(),
                        },
                    );
                }

                self.refresh_lookup_optimizer_rule();

                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "CREATE LOOKUP TABLE".to_string(),
                    object_name: info.name,
                }))
            }
            laminar_sql::planner::StreamingPlan::DropLookupTable { name } => {
                self.table_store.write().drop_table(&name);
                self.connector_manager.lock().unregister_table(&name);
                let _ = self.ctx.deregister_table(&name);
                self.lookup_registry.unregister(&name);
                self.refresh_lookup_optimizer_rule();
                Ok(ExecuteResult::Ddl(DdlInfo {
                    statement_type: "DROP LOOKUP TABLE".to_string(),
                    object_name: name,
                }))
            }
        }
    }

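    /// Bridges a DataFusion record-batch stream into a streaming source/sink
    /// pair so the caller receives a [`QueryHandle`] with a live subscription.
    /// The forwarding task exits as soon as a push fails or the input stream
    /// yields an error.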
    fn bridge_query_stream(
        &self,
        sql: &str,
        stream: datafusion::physical_plan::SendableRecordBatchStream,
    ) -> ExecuteResult {
        let query_id = self.catalog.register_query(sql);
        let schema = stream.schema();

        let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
        let (source, sink) =
            streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);

        let subscription = sink.subscribe();

        let source_clone = source.clone();
        tokio::spawn(async move {
            use tokio_stream::StreamExt;
            let mut stream = stream;
            while let Some(result) = stream.next().await {
                match result {
                    Ok(batch) => {
                        if source_clone.push_arrow(batch).is_err() {
                            break;
                        }
                    }
                    Err(_) => break,
                }
            }
            drop(source_clone);
        });

        ExecuteResult::Query(QueryHandle {
            id: query_id,
            schema,
            sql: sql.to_string(),
            subscription: Some(subscription),
            active: true,
        })
    }

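    /// Attempts JIT compilation of the logical plan. Uses `try_lock` on the
    /// compiler cache so that a contended cache degrades to the DataFusion
    /// interpreted path instead of blocking the query.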
    #[cfg(feature = "jit")]
    fn try_compile_query(
        &self,
        sql: &str,
        logical_plan: &datafusion::logical_expr::LogicalPlan,
    ) -> Option<laminar_core::compiler::CompiledStreamingQuery> {
        let Some(mut cache) = self.compiler_cache.try_lock() else {
            tracing::debug!(sql = %sql, "Compiler cache contended, falling back to DataFusion");
            return None;
        };
        let config = laminar_core::compiler::QueryConfig::default();
        match laminar_core::compiler::compile_streaming_query(
            sql,
            logical_plan,
            &mut cache,
            &config,
        ) {
            Ok(result) => result,
            Err(e) => {
                tracing::debug!(
                    sql = %sql,
                    error = %e,
                    "JIT compilation failed, falling back to DataFusion"
                );
                None
            }
        }
    }

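    /// Drives a JIT-compiled query: each input batch is decoded row by row via
    /// `BatchRowReader`, rows are submitted with their event time, and the
    /// compiled pipeline's ring-1 output batches are forwarded to subscribers.
    /// The bump arena is reset after every input batch so row decoding does
    /// not accumulate allocations.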
    #[cfg(feature = "jit")]
    async fn bridge_compiled_query(
        &self,
        sql: &str,
        compiled: laminar_core::compiler::CompiledStreamingQuery,
    ) -> Result<ExecuteResult, DbError> {
        use laminar_core::compiler::batch_reader::BatchRowReader;
        use laminar_core::compiler::event_time::{EventTimeConfig, RowEventTimeExtractor};
        use laminar_core::compiler::pipeline_bridge::Ring1Action;

        let df = self.ctx.execute_logical_plan(compiled.source_plan).await?;
        let input_stream = df.execute_stream().await?;
        let output_schema = compiled.output_schema.arrow_schema().clone();

        let query_id = self.catalog.register_query(sql);
        let source_cfg = streaming::SourceConfig::with_buffer_size(self.config.default_buffer_size);
        let (source, sink) =
            streaming::create_with_config::<crate::catalog::ArrowRecord>(source_cfg);
        let subscription = sink.subscribe();

        let input_schema = compiled.input_schema;
        let time_config = EventTimeConfig::default();
        let time_extractor = RowEventTimeExtractor::from_schema(&input_schema, &time_config);

        let source_clone = source.clone();
        let mut query = compiled.query;
        query
            .start()
            .map_err(|e| DbError::InvalidOperation(e.to_string()))?;

        tokio::spawn(async move {
            use tokio_stream::StreamExt;
            let mut stream = input_stream;
            let mut extractor = time_extractor;
            let mut arena = bumpalo::Bump::with_capacity(1024 * 128);

            while let Some(result) = stream.next().await {
                match result {
                    Ok(batch) => {
                        let reader = BatchRowReader::new(&batch, &input_schema);
                        for row_idx in 0..reader.row_count() {
                            let row = reader.read_row(row_idx, &arena);
                            let event_time = match extractor {
                                Some(ref mut ext) => ext.extract(&row),
                                #[allow(clippy::cast_possible_wrap)]
                                None => row_idx as i64,
                            };
                            let _ = query.submit_row(&row, event_time, row_idx as u64);
                        }
                        if let Some(ref ext) = extractor {
                            let _ = query.advance_watermark(ext.watermark());
                        }
                        let actions = query.poll_ring1();
                        for action in actions {
                            if let Ring1Action::ProcessBatch(out_batch) = action {
                                if source_clone.push_arrow(out_batch).is_err() {
                                    return;
                                }
                            }
                        }
                        arena.reset();
                    }
                    Err(_) => break,
                }
            }
            let _ = query.send_eof();
            for action in query.poll_ring1() {
                if let Ring1Action::ProcessBatch(batch) = action {
                    let _ = source_clone.push_arrow(batch);
                }
            }
        });

        Ok(ExecuteResult::Query(QueryHandle {
            id: query_id,
            schema: output_schema,
            sql: sql.to_string(),
            subscription: Some(subscription),
            active: true,
        }))
    }

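    /// Finds the first ASOF join step in the plan, if any; ASOF joins bypass
    /// DataFusion and run on the dedicated batch path below.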
    fn extract_asof_config(
        plan: &laminar_sql::planner::QueryPlan,
    ) -> Option<AsofJoinTranslatorConfig> {
        plan.join_config.as_ref()?.iter().find_map(|jc| {
            if let JoinOperatorConfig::Asof(cfg) = jc {
                Some(cfg.clone())
            } else {
                None
            }
        })
    }

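    /// Executes an ASOF join by collecting both inputs, joining them with the
    /// batch ASOF implementation, and streaming the result back through a
    /// transient `__asof_result` table. Both sides are materialized in memory,
    /// so this path suits bounded snapshots rather than unbounded streams.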
    async fn execute_asof_query(
        &self,
        asof_config: &AsofJoinTranslatorConfig,
        original_sql: &str,
    ) -> Result<ExecuteResult, DbError> {
        let left_sql = format!("SELECT * FROM {}", asof_config.left_table);
        let right_sql = format!("SELECT * FROM {}", asof_config.right_table);

        let left_batches = self
            .ctx
            .sql(&left_sql)
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?
            .collect()
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.left_table, &e))?;

        let right_batches = self
            .ctx
            .sql(&right_sql)
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?
            .collect()
            .await
            .map_err(|e| DbError::query_pipeline(&asof_config.right_table, &e))?;

        let result_batch =
            crate::asof_batch::execute_asof_join_batch(&left_batches, &right_batches, asof_config)?;

        if result_batch.num_rows() == 0 {
            let query_id = self.catalog.register_query(original_sql);
            return Ok(ExecuteResult::Query(QueryHandle {
                id: query_id,
                schema: result_batch.schema(),
                sql: original_sql.to_string(),
                subscription: None,
                active: false,
            }));
        }

        let schema = result_batch.schema();
        let mem_table =
            datafusion::datasource::MemTable::try_new(schema.clone(), vec![vec![result_batch]])
                .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;

        let _ = self.ctx.deregister_table("__asof_result");
        self.ctx
            .register_table("__asof_result", Arc::new(mem_table))
            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;

        let df = self
            .ctx
            .sql("SELECT * FROM __asof_result")
            .await
            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;
        let stream = df
            .execute_stream()
            .await
            .map_err(|e| DbError::query_pipeline("ASOF join", &e))?;

        let _ = self.ctx.deregister_table("__asof_result");

        Ok(self.bridge_query_stream(original_sql, stream))
    }

    pub fn source<T: laminar_core::streaming::Record>(
        &self,
        name: &str,
    ) -> Result<SourceHandle<T>, DbError> {
        let entry = self
            .catalog
            .get_source(name)
            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
        SourceHandle::new(entry)
    }

    pub fn source_untyped(&self, name: &str) -> Result<UntypedSourceHandle, DbError> {
        let entry = self
            .catalog
            .get_source(name)
            .ok_or_else(|| DbError::SourceNotFound(name.to_string()))?;
        Ok(UntypedSourceHandle::new(entry))
    }

    pub fn sources(&self) -> Vec<SourceInfo> {
        let names = self.catalog.list_sources();
        names
            .into_iter()
            .filter_map(|name| {
                self.catalog.get_source(&name).map(|e| SourceInfo {
                    name: e.name.clone(),
                    schema: e.schema.clone(),
                    watermark_column: e.watermark_column.clone(),
                })
            })
            .collect()
    }

    pub fn sinks(&self) -> Vec<SinkInfo> {
        self.catalog
            .list_sinks()
            .into_iter()
            .map(|name| SinkInfo { name })
            .collect()
    }

    pub fn streams(&self) -> Vec<crate::handle::StreamInfo> {
        let mgr = self.connector_manager.lock();
        mgr.streams()
            .iter()
            .map(|(name, reg)| crate::handle::StreamInfo {
                name: name.clone(),
                sql: Some(reg.query_sql.clone()),
            })
            .collect()
    }

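    /// Builds a best-effort DAG of sources, streams, and sinks. Stream inputs
    /// are inferred by substring-matching object names against the stream's
    /// uppercased SQL, so a name embedded in an unrelated identifier can yield
    /// a spurious edge; sink edges come from explicit input registrations.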
    pub fn pipeline_topology(&self) -> crate::handle::PipelineTopology {
        use crate::handle::{PipelineEdge, PipelineNode, PipelineNodeType};

        let mut nodes = Vec::new();
        let mut edges = Vec::new();

        let source_names = self.catalog.list_sources();

        for name in &source_names {
            let schema = self.catalog.get_source(name).map(|e| e.schema.clone());
            nodes.push(PipelineNode {
                name: name.clone(),
                node_type: PipelineNodeType::Source,
                schema,
                sql: None,
            });
        }

        let mgr = self.connector_manager.lock();
        let stream_names: Vec<String> = mgr.streams().keys().cloned().collect();
        for (name, reg) in mgr.streams() {
            nodes.push(PipelineNode {
                name: name.clone(),
                node_type: PipelineNodeType::Stream,
                schema: None,
                sql: Some(reg.query_sql.clone()),
            });

            let sql_upper = reg.query_sql.to_uppercase();
            for src in &source_names {
                if sql_upper.contains(&src.to_uppercase()) {
                    edges.push(PipelineEdge {
                        from: src.clone(),
                        to: name.clone(),
                    });
                }
            }
            for other in &stream_names {
                if other != name && sql_upper.contains(&other.to_uppercase()) {
                    edges.push(PipelineEdge {
                        from: other.clone(),
                        to: name.clone(),
                    });
                }
            }
        }

        for (name, reg) in mgr.sinks() {
            nodes.push(PipelineNode {
                name: name.clone(),
                node_type: PipelineNodeType::Sink,
                schema: None,
                sql: None,
            });

            if !reg.input.is_empty() {
                edges.push(PipelineEdge {
                    from: reg.input.clone(),
                    to: name.clone(),
                });
            }
        }

        let cm_sink_names: std::collections::HashSet<&String> = mgr.sinks().keys().collect();
        for name in self.catalog.list_sinks() {
            if !cm_sink_names.contains(&name) {
                if let Some(input) = self.catalog.get_sink_input(&name) {
                    nodes.push(PipelineNode {
                        name: name.clone(),
                        node_type: PipelineNodeType::Sink,
                        schema: None,
                        sql: None,
                    });
                    if !input.is_empty() {
                        edges.push(PipelineEdge {
                            from: input,
                            to: name,
                        });
                    }
                }
            }
        }

        drop(mgr);

        crate::handle::PipelineTopology { nodes, edges }
    }

    pub fn queries(&self) -> Vec<QueryInfo> {
        self.catalog
            .list_queries()
            .into_iter()
            .map(|(id, sql, active)| QueryInfo { id, sql, active })
            .collect()
    }

    #[must_use]
    pub fn is_checkpoint_enabled(&self) -> bool {
        self.config.checkpoint.is_some()
    }

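    /// Builds the checkpoint store from config: an object-store backend when
    /// `object_store_url` is set, otherwise a filesystem store rooted at the
    /// checkpoint `data_dir`, falling back to `storage_dir` and then `./data`.
    /// Returns `None` if checkpointing is disabled or the backend cannot be
    /// constructed.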
    pub fn checkpoint_store(&self) -> Option<Box<dyn laminar_storage::CheckpointStore>> {
        let cp_config = self.config.checkpoint.as_ref()?;
        let max_retained = cp_config.max_retained.unwrap_or(3);

        if let Some(ref url) = self.config.object_store_url {
            let obj_store = laminar_storage::object_store_factory::build_object_store(
                url,
                &self.config.object_store_options,
            )
            .ok()?;
            let prefix = url_to_checkpoint_prefix(url);
            Some(Box::new(
                laminar_storage::checkpoint_store::ObjectStoreCheckpointStore::new(
                    obj_store,
                    prefix,
                    max_retained,
                )
                .ok()?,
            ))
        } else {
            let data_dir = cp_config
                .data_dir
                .clone()
                .or_else(|| self.config.storage_dir.clone())
                .unwrap_or_else(|| std::path::PathBuf::from("./data"));
            Some(Box::new(
                laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(
                    &data_dir,
                    max_retained,
                ),
            ))
        }
    }

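    /// Triggers a manual checkpoint through the coordinator. Fails if
    /// checkpointing is disabled in the config or the pipeline has not been
    /// started yet. The SQL `CHECKPOINT` statement routes here as well.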
    pub async fn checkpoint(
        &self,
    ) -> Result<crate::checkpoint_coordinator::CheckpointResult, DbError> {
        if self.config.checkpoint.is_none() {
            return Err(DbError::Checkpoint(
                "checkpointing is not enabled".to_string(),
            ));
        }
        let mut guard = self.coordinator.lock().await;
        let coord = guard.as_mut().ok_or_else(|| {
            DbError::Checkpoint("coordinator not initialized — call start() first".to_string())
        })?;
        coord
            .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
            .await
    }

    pub async fn checkpoint_stats(&self) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
        let guard = self.coordinator.lock().await;
        guard
            .as_ref()
            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
    }
}

impl std::fmt::Debug for LaminarDB {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LaminarDB")
            .field("sources", &self.catalog.list_sources().len())
            .field("sinks", &self.catalog.list_sinks().len())
            .field("materialized_views", &self.mv_registry.lock().len())
            .field("checkpoint_enabled", &self.is_checkpoint_enabled())
            .field("shutdown", &self.is_closed())
            .finish_non_exhaustive()
    }
}

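/// A `QueryPlanner` that threads the lookup-join extension planner into
/// DataFusion's default physical planner, giving logical lookup-join nodes a
/// physical implementation.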
struct LookupQueryPlanner {
    extension_planner: Arc<dyn datafusion::physical_planner::ExtensionPlanner + Send + Sync>,
}

impl std::fmt::Debug for LookupQueryPlanner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LookupQueryPlanner").finish_non_exhaustive()
    }
}

#[async_trait::async_trait]
impl datafusion::execution::context::QueryPlanner for LookupQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &datafusion::logical_expr::LogicalPlan,
        session_state: &datafusion::execution::SessionState,
    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
        use datafusion::physical_planner::PhysicalPlanner;
        let planner =
            datafusion::physical_planner::DefaultPhysicalPlanner::with_extension_planners(vec![
                Arc::clone(&self.extension_planner),
            ]);
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

1681#[cfg(test)]
1682mod tests {
1683 use super::*;
1684 use crate::ddl::extract_connector_from_with_options;
1685
1686 #[tokio::test]
1687 async fn test_open_default() {
1688 let db = LaminarDB::open().unwrap();
1689 assert!(!db.is_closed());
1690 assert!(db.sources().is_empty());
1691 assert!(db.sinks().is_empty());
1692 }
1693
1694 #[tokio::test]
1695 async fn test_create_source() {
1696 let db = LaminarDB::open().unwrap();
1697 let result = db
1698 .execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, ts BIGINT)")
1699 .await
1700 .unwrap();
1701
1702 match result {
1703 ExecuteResult::Ddl(info) => {
1704 assert_eq!(info.statement_type, "CREATE SOURCE");
1705 assert_eq!(info.object_name, "trades");
1706 }
1707 _ => panic!("Expected DDL result"),
1708 }
1709
1710 assert_eq!(db.sources().len(), 1);
1711 assert_eq!(db.sources()[0].name, "trades");
1712 }
1713
1714 #[tokio::test]
1715 async fn test_create_source_with_watermark() {
1716 let db = LaminarDB::open().unwrap();
1717 db.execute(
1718 "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)",
1719 )
1720 .await
1721 .unwrap();
1722
1723 let sources = db.sources();
1724 assert_eq!(sources.len(), 1);
1725 assert_eq!(sources[0].watermark_column, Some("ts".to_string()));
1726 }
1727
1728 #[tokio::test]
1729 async fn test_create_source_duplicate_error() {
1730 let db = LaminarDB::open().unwrap();
1731 db.execute("CREATE SOURCE test (id INT)").await.unwrap();
1732 let result = db.execute("CREATE SOURCE test (id INT)").await;
1733 assert!(result.is_err());
1734 }
1735
1736 #[tokio::test]
1737 async fn test_create_source_if_not_exists() {
1738 let db = LaminarDB::open().unwrap();
1739 db.execute("CREATE SOURCE test (id INT)").await.unwrap();
1740 let result = db
1741 .execute("CREATE SOURCE IF NOT EXISTS test (id INT)")
1742 .await;
1743 assert!(result.is_ok());
1744 }
1745
1746 #[tokio::test]
1747 async fn test_create_or_replace_source() {
1748 let db = LaminarDB::open().unwrap();
1749 db.execute("CREATE SOURCE test (id INT)").await.unwrap();
1750 let result = db
1751 .execute("CREATE OR REPLACE SOURCE test (id INT, name VARCHAR)")
1752 .await;
1753 assert!(result.is_ok());
1754 }
1755
1756 #[tokio::test]
1757 async fn test_create_sink() {
1758 let db = LaminarDB::open().unwrap();
1759 db.execute("CREATE SOURCE events (id INT)").await.unwrap();
1760 db.execute("CREATE SINK output FROM events").await.unwrap();
1761
1762 assert_eq!(db.sinks().len(), 1);
1763 }
1764
1765 #[tokio::test]
1766 async fn test_source_handle_untyped() {
1767 let db = LaminarDB::open().unwrap();
1768 db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
1769 .await
1770 .unwrap();
1771
1772 let handle = db.source_untyped("events").unwrap();
1773 assert_eq!(handle.name(), "events");
1774 assert_eq!(handle.schema().fields().len(), 2);
1775 }
1776
1777 #[tokio::test]
1778 async fn test_source_not_found() {
1779 let db = LaminarDB::open().unwrap();
1780 let result = db.source_untyped("nonexistent");
1781 assert!(matches!(result, Err(DbError::SourceNotFound(_))));
1782 }
1783
1784 #[tokio::test]
1785 async fn test_show_sources() {
1786 let db = LaminarDB::open().unwrap();
1787 db.execute("CREATE SOURCE a (id INT)").await.unwrap();
1788 db.execute("CREATE SOURCE b (id INT)").await.unwrap();
1789
1790 let result = db.execute("SHOW SOURCES").await.unwrap();
1791 match result {
1792 ExecuteResult::Metadata(batch) => {
1793 assert_eq!(batch.num_rows(), 2);
1794 }
1795 _ => panic!("Expected Metadata result"),
1796 }
1797 }
1798
1799 #[tokio::test]
1800 async fn test_describe_source() {
1801 let db = LaminarDB::open().unwrap();
1802 db.execute("CREATE SOURCE events (id BIGINT, name VARCHAR, active BOOLEAN)")
1803 .await
1804 .unwrap();
1805
1806 let result = db.execute("DESCRIBE events").await.unwrap();
1807 match result {
1808 ExecuteResult::Metadata(batch) => {
1809 assert_eq!(batch.num_rows(), 3);
1810 }
1811 _ => panic!("Expected Metadata result"),
1812 }
1813 }
1814
1815 #[tokio::test]
1816 async fn test_describe_table() {
1817 let db = LaminarDB::open().unwrap();
1818 db.execute("CREATE TABLE products (id BIGINT PRIMARY KEY, name VARCHAR, price DOUBLE)")
1819 .await
1820 .unwrap();
1821 db.execute("INSERT INTO products VALUES (1, 'Widget', 9.99)")
1822 .await
1823 .unwrap();
1824
1825 let result = db.execute("DESCRIBE products").await.unwrap();
1826 match result {
1827 ExecuteResult::Metadata(batch) => {
1828 assert_eq!(batch.num_rows(), 3);
1829 }
1830 _ => panic!("Expected Metadata result"),
1831 }
1832 }
1833
1834 #[tokio::test]
1835 async fn test_describe_materialized_view() {
1836 let db = LaminarDB::open().unwrap();
1837 db.execute("CREATE SOURCE events (id BIGINT, name VARCHAR, value DOUBLE)")
1838 .await
1839 .unwrap();
1840 db.execute(
1841 "CREATE MATERIALIZED VIEW event_counts AS \
1842 SELECT name, COUNT(*) as cnt FROM events GROUP BY name",
1843 )
1844 .await
1845 .unwrap();
1846
1847 let result = db.execute("DESCRIBE event_counts").await.unwrap();
1848 match result {
1849 ExecuteResult::Metadata(batch) => {
1850 assert!(batch.num_rows() >= 2, "Should have at least name and cnt");
1851 }
1852 _ => panic!("Expected Metadata result"),
1853 }
1854 }
1855
1856 #[tokio::test]
1857 async fn test_describe_not_found() {
1858 let db = LaminarDB::open().unwrap();
1859 let result = db.execute("DESCRIBE nonexistent").await;
1860 assert!(result.is_err());
1861 }
1862
1863 #[tokio::test]
1864 async fn test_drop_source() {
1865 let db = LaminarDB::open().unwrap();
1866 db.execute("CREATE SOURCE test (id INT)").await.unwrap();
1867 assert_eq!(db.sources().len(), 1);
1868
1869 db.execute("DROP SOURCE test").await.unwrap();
1870 assert_eq!(db.sources().len(), 0);
1871 }
1872
1873 #[tokio::test]
1874 async fn test_drop_source_if_exists() {
1875 let db = LaminarDB::open().unwrap();
1876 let result = db.execute("DROP SOURCE IF EXISTS nonexistent").await;
1878 assert!(result.is_ok());
1879 }
1880
1881 #[tokio::test]
1882 async fn test_drop_source_not_found() {
1883 let db = LaminarDB::open().unwrap();
1884 let result = db.execute("DROP SOURCE nonexistent").await;
1885 assert!(matches!(result, Err(DbError::SourceNotFound(_))));
1886 }
1887
1888 #[tokio::test]
1889 async fn test_shutdown() {
1890 let db = LaminarDB::open().unwrap();
1891 assert!(!db.is_closed());
1892 db.close();
1893 assert!(db.is_closed());
1894
1895 let result = db.execute("CREATE SOURCE test (id INT)").await;
1896 assert!(matches!(result, Err(DbError::Shutdown)));
1897 }
1898
1899 #[tokio::test]
1900 async fn test_debug_format() {
1901 let db = LaminarDB::open().unwrap();
1902 let debug = format!("{db:?}");
1903 assert!(debug.contains("LaminarDB"));
1904 }
1905
1906 #[tokio::test]
1907 async fn test_explain_create_source() {
1908 let db = LaminarDB::open().unwrap();
1909 let result = db
1910 .execute("EXPLAIN CREATE SOURCE trades (symbol VARCHAR, price DOUBLE)")
1911 .await
1912 .unwrap();
1913 match result {
1914 ExecuteResult::Metadata(batch) => {
1915 assert!(batch.num_rows() > 0);
1916 let keys = batch
1917 .column(0)
1918 .as_any()
1919 .downcast_ref::<StringArray>()
1920 .unwrap();
1921 let key_values: Vec<&str> = (0..batch.num_rows()).map(|i| keys.value(i)).collect();
1923 assert!(key_values.contains(&"plan_type"));
1924 }
1925 _ => panic!("Expected Metadata result for EXPLAIN"),
1926 }
1927 }
1928
1929 #[tokio::test]
1930 async fn test_cancel_query() {
1931 let db = LaminarDB::open().unwrap();
1932 assert_eq!(db.active_query_count(), 0);
1934
1935 let query_id = db.catalog.register_query("SELECT * FROM test");
1937 assert_eq!(db.active_query_count(), 1);
1938
1939 db.cancel_query(query_id).unwrap();
1941 assert_eq!(db.active_query_count(), 0);
1942 }
1943
1944 #[tokio::test]
1945 async fn test_source_and_sink_counts() {
1946 let db = LaminarDB::open().unwrap();
1947 assert_eq!(db.source_count(), 0);
1948 assert_eq!(db.sink_count(), 0);
1949
1950 db.execute("CREATE SOURCE a (id INT)").await.unwrap();
1951 db.execute("CREATE SOURCE b (id INT)").await.unwrap();
1952 assert_eq!(db.source_count(), 2);
1953
1954 db.execute("CREATE SINK output FROM a").await.unwrap();
1955 assert_eq!(db.sink_count(), 1);
1956
1957 db.execute("DROP SOURCE a").await.unwrap();
1958 assert_eq!(db.source_count(), 1);
1959 }
1960
1961 #[tokio::test]
1964 async fn test_multi_statement_execution() {
1965 let db = LaminarDB::open().unwrap();
1966 db.execute("CREATE SOURCE a (id INT); CREATE SOURCE b (id INT); CREATE SINK output FROM a")
1967 .await
1968 .unwrap();
1969 assert_eq!(db.source_count(), 2);
1970 assert_eq!(db.sink_count(), 1);
1971 }
1972
1973 #[tokio::test]
1974 async fn test_multi_statement_trailing_semicolon() {
1975 let db = LaminarDB::open().unwrap();
1976 db.execute("CREATE SOURCE a (id INT);").await.unwrap();
1977 assert_eq!(db.source_count(), 1);
1978 }
1979
1980 #[tokio::test]
1981 async fn test_multi_statement_error_stops() {
1982 let db = LaminarDB::open().unwrap();
1983 let result = db
1985 .execute("CREATE SOURCE a (id INT); CREATE SOURCE a (id INT)")
1986 .await;
1987 assert!(result.is_err());
1988 assert_eq!(db.source_count(), 1);
1990 }
1991
1992 #[tokio::test]
1995 async fn test_config_var_substitution() {
1996 let db = LaminarDB::builder()
1997 .config_var("TABLE_NAME", "events")
1998 .build()
1999 .await
2000 .unwrap();
2001 db.execute("CREATE SOURCE events (id INT)").await.unwrap();
2004 assert_eq!(db.source_count(), 1);
2005 }
2006
    #[tokio::test]
    async fn test_create_stream() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE STREAM counts AS SELECT COUNT(*) as cnt FROM events")
            .await
            .unwrap();
        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE STREAM");
                assert_eq!(info.object_name, "counts");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    #[tokio::test]
    async fn test_drop_stream() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM counts AS SELECT COUNT(*) as cnt FROM events")
            .await
            .unwrap();
        let result = db.execute("DROP STREAM counts").await.unwrap();
        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "DROP STREAM");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    #[tokio::test]
    async fn test_drop_stream_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP STREAM nonexistent").await;
        assert!(matches!(result, Err(DbError::StreamNotFound(_))));
    }

    #[tokio::test]
    async fn test_drop_stream_if_exists() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP STREAM IF EXISTS nonexistent").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_show_streams() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM a AS SELECT 1 FROM events")
            .await
            .unwrap();
        let result = db.execute("SHOW STREAMS").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_stream_duplicate_error() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM counts AS SELECT COUNT(*) FROM events")
            .await
            .unwrap();
        let result = db
            .execute("CREATE STREAM counts AS SELECT COUNT(*) FROM events")
            .await;
        assert!(matches!(result, Err(DbError::StreamAlreadyExists(_))));
    }

    #[tokio::test]
    async fn test_create_table() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE TABLE products (id INT, name VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE TABLE");
                assert_eq!(info.object_name, "products");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    #[tokio::test]
    async fn test_create_table_and_query_empty() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE TABLE dim (id INT, label VARCHAR)")
            .await
            .unwrap();

        let result = db.execute("SELECT * FROM dim").await.unwrap();
        match result {
            ExecuteResult::Query(q) => {
                assert_eq!(q.schema().fields().len(), 2);
            }
            _ => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_insert_into_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO events VALUES (1, 3.14), (2, 2.72)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 2),
            _ => panic!("Expected RowsAffected"),
        }
    }

    #[tokio::test]
    async fn test_insert_into_table() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE TABLE products (id INT, name VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO products VALUES (1, 'Widget', 9.99)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 1),
            _ => panic!("Expected RowsAffected"),
        }
    }

    #[tokio::test]
    async fn test_insert_into_nonexistent_table() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("INSERT INTO nosuch VALUES (1, 2)").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_create_table_with_types() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("CREATE TABLE orders (id BIGINT NOT NULL, qty SMALLINT, total DECIMAL(10,2))")
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE TABLE");
                assert_eq!(info.object_name, "orders");
            }
            _ => panic!("Expected DDL result"),
        }
    }

    #[tokio::test]
    async fn test_insert_null_values() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE data (id BIGINT, label VARCHAR)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO data VALUES (1, NULL)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 1),
            _ => panic!("Expected RowsAffected"),
        }
    }

    #[tokio::test]
    async fn test_insert_negative_values() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE temps (id BIGINT, celsius DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("INSERT INTO temps VALUES (1, -40.0)")
            .await
            .unwrap();
        match result {
            ExecuteResult::RowsAffected(n) => assert_eq!(n, 1),
            _ => panic!("Expected RowsAffected"),
        }
    }

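    // --- Connector registry and connector error paths ---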
    #[tokio::test]
    async fn test_create_source_unknown_connector() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute(
                "CREATE SOURCE events FROM NONEXISTENT \
                 ('topic' = 'test') SCHEMA (id INT)",
            )
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Unknown source connector type"), "got: {err}");
    }

    #[tokio::test]
    async fn test_create_sink_unknown_connector() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        let result = db
            .execute(
                "CREATE SINK output FROM events \
                 INTO NONEXISTENT ('topic' = 'out')",
            )
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Unknown sink connector type"), "got: {err}");
    }

    #[tokio::test]
    async fn test_create_source_invalid_format() {
        let db = LaminarDB::open().unwrap();
        // Both the connector and the format are invalid; either check may fire,
        // so only assert that the statement is rejected.
        let result = db
            .execute(
                "CREATE SOURCE events FROM NONEXISTENT \
                 FORMAT BADFORMAT SCHEMA (id INT)",
            )
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_connector_registry_accessor() {
        let db = LaminarDB::open().unwrap();
        let registry = db.connector_registry();

        #[allow(unused_mut)]
        let mut expected_sources = 0;
        #[allow(unused_mut)]
        let mut expected_sinks = 0;

        #[cfg(feature = "kafka")]
        {
            expected_sources += 1;
            expected_sinks += 1;
        }
        #[cfg(feature = "postgres-cdc")]
        {
            expected_sources += 1;
        }
        #[cfg(feature = "postgres-sink")]
        {
            expected_sinks += 1;
        }
        #[cfg(feature = "delta-lake")]
        {
            expected_sources += 1;
            expected_sinks += 1;
        }
        #[cfg(feature = "websocket")]
        {
            expected_sources += 1;
            expected_sinks += 1;
        }
        #[cfg(feature = "mysql-cdc")]
        {
            expected_sources += 1;
        }
        #[cfg(feature = "files")]
        {
            expected_sources += 1;
            expected_sinks += 1;
        }

        assert_eq!(registry.list_sources().len(), expected_sources);
        assert_eq!(registry.list_sinks().len(), expected_sinks);
    }

    #[tokio::test]
    async fn test_builder_register_connector() {
        use std::sync::Arc;

        let db = LaminarDB::builder()
            .register_connector(|registry| {
                registry.register_source(
                    "test-source",
                    laminar_connectors::config::ConnectorInfo {
                        name: "test-source".to_string(),
                        display_name: "Test Source".to_string(),
                        version: "0.1.0".to_string(),
                        is_source: true,
                        is_sink: false,
                        config_keys: vec![],
                    },
                    Arc::new(|| Box::new(laminar_connectors::testing::MockSourceConnector::new())),
                );
            })
            .build()
            .await
            .unwrap();
        let registry = db.connector_registry();
        assert!(registry.list_sources().contains(&"test-source".to_string()));
    }

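    // --- Materialized views ---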
    #[tokio::test]
    async fn test_create_materialized_view() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("CREATE MATERIALIZED VIEW event_stats AS SELECT * FROM events")
            .await;

        // Only validate the DDL info when creation succeeds.
        if let Ok(ExecuteResult::Ddl(info)) = &result {
            assert_eq!(info.statement_type, "CREATE MATERIALIZED VIEW");
            assert_eq!(info.object_name, "event_stats");
        }
    }

    #[tokio::test]
    async fn test_mv_registry_base_tables() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE trades (sym VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        let registry = db.mv_registry.lock();
        assert!(registry.is_base_table("trades"));
    }

    #[tokio::test]
    async fn test_show_materialized_views_empty() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("SHOW MATERIALIZED VIEWS").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 0);
                assert_eq!(batch.num_columns(), 3);
                assert_eq!(batch.schema().field(0).name(), "view_name");
                assert_eq!(batch.schema().field(1).name(), "sql");
                assert_eq!(batch.schema().field(2).name(), "state");
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_drop_materialized_view_if_exists() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("DROP MATERIALIZED VIEW IF EXISTS nonexistent")
            .await
            .unwrap();
        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "DROP MATERIALIZED VIEW");
            }
            _ => panic!("Expected Ddl result"),
        }
    }

    #[tokio::test]
    async fn test_drop_materialized_view_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP MATERIALIZED VIEW nonexistent").await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("not found"),
            "Expected 'not found' error, got: {err}"
        );
    }

    #[tokio::test]
    async fn test_create_mv_if_not_exists() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();

        // Pre-register a view directly in the registry.
        {
            let mut registry = db.mv_registry.lock();
            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
            let mv = laminar_core::mv::MaterializedView::new(
                "my_view",
                "SELECT * FROM events",
                vec!["events".to_string()],
                schema,
            );
            registry.register(mv).unwrap();
        }

        let result = db
            .execute("CREATE MATERIALIZED VIEW IF NOT EXISTS my_view AS SELECT * FROM events")
            .await
            .unwrap();
        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.object_name, "my_view");
            }
            _ => panic!("Expected Ddl result"),
        }
    }

    #[tokio::test]
    async fn test_create_mv_duplicate_error() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();

        // Pre-register a view directly in the registry.
        {
            let mut registry = db.mv_registry.lock();
            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
            let mv = laminar_core::mv::MaterializedView::new(
                "my_view",
                "SELECT * FROM events",
                vec!["events".to_string()],
                schema,
            );
            registry.register(mv).unwrap();
        }

        let result = db
            .execute("CREATE MATERIALIZED VIEW my_view AS SELECT * FROM events")
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("already exists"),
            "Expected 'already exists' error, got: {err}"
        );
    }

    #[tokio::test]
    async fn test_show_materialized_views_with_entries() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();

        // Pre-register a view directly in the registry.
        {
            let mut registry = db.mv_registry.lock();
            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
            let mv = laminar_core::mv::MaterializedView::new(
                "view_a",
                "SELECT * FROM events",
                vec!["events".to_string()],
                schema,
            );
            registry.register(mv).unwrap();
        }

        let result = db.execute("SHOW MATERIALIZED VIEWS").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
                let names = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(names.value(0), "view_a");
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_drop_mv_and_show() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();

        // Pre-register a view directly in the registry.
        {
            let mut registry = db.mv_registry.lock();
            let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
            let mv = laminar_core::mv::MaterializedView::new(
                "temp_view",
                "SELECT * FROM events",
                vec!["events".to_string()],
                schema,
            );
            registry.register(mv).unwrap();
        }

        assert_eq!(db.mv_registry.lock().len(), 1);

        db.execute("DROP MATERIALIZED VIEW temp_view")
            .await
            .unwrap();

        assert_eq!(db.mv_registry.lock().len(), 0);
    }

    #[tokio::test]
    async fn test_debug_includes_mv_count() {
        let db = LaminarDB::open().unwrap();
        let debug = format!("{db:?}");
        assert!(
            debug.contains("materialized_views: 0"),
            "Debug should include MV count, got: {debug}"
        );
    }

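    // --- Pipeline topology ---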
    #[tokio::test]
    async fn test_pipeline_topology_empty() {
        let db = LaminarDB::open().unwrap();
        let topo = db.pipeline_topology();
        assert!(topo.nodes.is_empty());
        assert!(topo.edges.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_topology_sources_only() {
        use crate::handle::PipelineNodeType;

        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
            .await
            .unwrap();
        db.execute("CREATE SOURCE clicks (url VARCHAR, ts BIGINT)")
            .await
            .unwrap();

        let topo = db.pipeline_topology();
        assert_eq!(topo.nodes.len(), 2);
        assert!(topo.edges.is_empty());

        for node in &topo.nodes {
            assert_eq!(node.node_type, PipelineNodeType::Source);
            assert!(node.schema.is_some());
            assert!(node.sql.is_none());
        }
    }

    #[tokio::test]
    async fn test_pipeline_topology_full_pipeline() {
        use crate::handle::PipelineNodeType;

        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
            .await
            .unwrap();
        db.execute("CREATE STREAM agg AS SELECT COUNT(*) as cnt FROM events GROUP BY id")
            .await
            .unwrap();
        db.execute("CREATE SINK output FROM agg").await.unwrap();

        let topo = db.pipeline_topology();

        // One node each for the source, stream, and sink.
        assert_eq!(topo.nodes.len(), 3);

        let sources: Vec<_> = topo
            .nodes
            .iter()
            .filter(|n| n.node_type == PipelineNodeType::Source)
            .collect();
        let streams: Vec<_> = topo
            .nodes
            .iter()
            .filter(|n| n.node_type == PipelineNodeType::Stream)
            .collect();
        let sinks: Vec<_> = topo
            .nodes
            .iter()
            .filter(|n| n.node_type == PipelineNodeType::Sink)
            .collect();

        assert_eq!(sources.len(), 1);
        assert_eq!(streams.len(), 1);
        assert_eq!(sinks.len(), 1);

        assert_eq!(sources[0].name, "events");
        assert_eq!(streams[0].name, "agg");
        assert!(streams[0].sql.is_some());
        assert_eq!(sinks[0].name, "output");

        // events -> agg -> output
        assert_eq!(topo.edges.len(), 2);
        assert!(topo
            .edges
            .iter()
            .any(|e| e.from == "events" && e.to == "agg"));
        assert!(topo
            .edges
            .iter()
            .any(|e| e.from == "agg" && e.to == "output"));
    }

    #[tokio::test]
    async fn test_pipeline_topology_fan_out() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE ticks (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();
        db.execute("CREATE STREAM ohlc AS SELECT symbol, MIN(price) FROM ticks GROUP BY symbol")
            .await
            .unwrap();
        db.execute("CREATE STREAM vol AS SELECT symbol, COUNT(*) FROM ticks GROUP BY symbol")
            .await
            .unwrap();

        let topo = db.pipeline_topology();

        // One source plus two streams.
        assert_eq!(topo.nodes.len(), 3);

        // The source fans out to both streams.
        let ticks_edges: Vec<_> = topo.edges.iter().filter(|e| e.from == "ticks").collect();
        assert_eq!(ticks_edges.len(), 2);

        let targets: Vec<&str> = ticks_edges.iter().map(|e| e.to.as_str()).collect();
        assert!(targets.contains(&"ohlc"));
        assert!(targets.contains(&"vol"));
    }

    #[tokio::test]
    async fn test_streams_method() {
        let db = LaminarDB::open().unwrap();
        assert!(db.streams().is_empty());

        db.execute("CREATE STREAM counts AS SELECT COUNT(*) FROM events")
            .await
            .unwrap();

        let streams = db.streams();
        assert_eq!(streams.len(), 1);
        assert_eq!(streams[0].name, "counts");
        assert!(streams[0].sql.is_some());
        assert!(
            streams[0].sql.as_ref().unwrap().contains("COUNT"),
            "SQL should contain the query: {:?}",
            streams[0].sql,
        );
    }

    #[tokio::test]
    async fn test_pipeline_node_types() {
        use crate::handle::PipelineNodeType;

        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE src (id INT)").await.unwrap();
        db.execute("CREATE STREAM st AS SELECT * FROM src")
            .await
            .unwrap();
        db.execute("CREATE SINK sk FROM st").await.unwrap();

        let topo = db.pipeline_topology();

        let find = |name: &str| topo.nodes.iter().find(|n| n.name == name).unwrap();

        assert_eq!(find("src").node_type, PipelineNodeType::Source);
        assert_eq!(find("st").node_type, PipelineNodeType::Stream);
        assert_eq!(find("sk").node_type, PipelineNodeType::Sink);
    }

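    // --- Tables: primary keys, connector options, upserts ---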
    #[tokio::test]
    async fn test_create_table_with_primary_key() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute(
                "CREATE TABLE instruments (\
                 symbol VARCHAR PRIMARY KEY, \
                 company_name VARCHAR, \
                 sector VARCHAR\
                 )",
            )
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.statement_type, "CREATE TABLE");
                assert_eq!(info.object_name, "instruments");
            }
            _ => panic!("Expected DDL result"),
        }

        // The table store tracks the table and its primary key.
        let ts = db.table_store.read();
        assert!(ts.has_table("instruments"));
        assert_eq!(ts.primary_key("instruments"), Some("symbol"));
        assert_eq!(ts.table_row_count("instruments"), 0);
    }

    #[tokio::test]
    async fn test_create_table_with_connector_options() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute(
                "CREATE TABLE instruments (\
                 symbol VARCHAR PRIMARY KEY, \
                 company_name VARCHAR\
                 ) WITH (connector = 'kafka', topic = 'instruments')",
            )
            .await
            .unwrap();

        match result {
            ExecuteResult::Ddl(info) => {
                assert_eq!(info.object_name, "instruments");
            }
            _ => panic!("Expected DDL result"),
        }

        // The connector manager records the table registration.
        let mgr = db.connector_manager.lock();
        let tables = mgr.tables();
        assert!(tables.contains_key("instruments"));
        let reg = &tables["instruments"];
        assert_eq!(reg.connector_type.as_deref(), Some("kafka"));
        assert_eq!(reg.primary_key, "symbol");

        // The table store records the connector as well.
        let ts = db.table_store.read();
        assert_eq!(ts.connector("instruments"), Some("kafka"));
    }

    #[tokio::test]
    async fn test_insert_into_table_with_pk_upserts() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE TABLE products (\
             id INT PRIMARY KEY, \
             name VARCHAR, \
             price DOUBLE\
             )",
        )
        .await
        .unwrap();

        db.execute("INSERT INTO products VALUES (1, 'Widget', 9.99)")
            .await
            .unwrap();
        assert_eq!(db.table_store.read().table_row_count("products"), 1);

        // Same primary key: the row is upserted, not appended.
        db.execute("INSERT INTO products VALUES (1, 'Super Widget', 19.99)")
            .await
            .unwrap();
        assert_eq!(db.table_store.read().table_row_count("products"), 1);

        // New primary key: the row is appended.
        db.execute("INSERT INTO products VALUES (2, 'Gadget', 14.99)")
            .await
            .unwrap();
        assert_eq!(db.table_store.read().table_row_count("products"), 2);

        let result = db.execute("SELECT * FROM products").await.unwrap();
        match result {
            ExecuteResult::Query(q) => {
                assert_eq!(q.schema().fields().len(), 3);
            }
            _ => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_show_tables() {
        let db = LaminarDB::open().unwrap();

        // No tables yet: an empty result with the expected schema.
        let result = db.execute("SHOW TABLES").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 0);
                assert_eq!(batch.num_columns(), 4);
                assert_eq!(batch.schema().field(0).name(), "name");
                assert_eq!(batch.schema().field(1).name(), "primary_key");
                assert_eq!(batch.schema().field(2).name(), "row_count");
                assert_eq!(batch.schema().field(3).name(), "connector");
            }
            _ => panic!("Expected Metadata result"),
        }

        db.execute("CREATE TABLE t (id INT PRIMARY KEY, val VARCHAR)")
            .await
            .unwrap();
        let result = db.execute("SHOW TABLES").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_drop_table() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE TABLE t (id INT PRIMARY KEY, val VARCHAR)")
            .await
            .unwrap();
        assert!(db.table_store.read().has_table("t"));

        db.execute("DROP TABLE t").await.unwrap();
        assert!(!db.table_store.read().has_table("t"));
    }

    #[tokio::test]
    async fn test_drop_table_if_exists() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("DROP TABLE IF EXISTS nonexistent").await;
        assert!(result.is_ok());
    }

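    // --- HAVING over grouped results ---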
    #[tokio::test]
    async fn test_having_filters_grouped_results() {
        let db = LaminarDB::open().unwrap();

        // Seed a small table through the embedded DataFusion context.
        db.ctx
            .sql(
                "CREATE TABLE hv_trades AS SELECT * FROM (VALUES \
                 ('AAPL', 100), ('GOOG', 5), ('MSFT', 50)) \
                 AS t(symbol, volume)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql("SELECT symbol, volume FROM hv_trades WHERE volume > 10 ORDER BY symbol")
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // AAPL (100) and MSFT (50) pass; GOOG (5) is filtered out.
        assert_eq!(total_rows, 2);
    }

    #[tokio::test]
    async fn test_having_with_aggregate() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE hv_orders AS SELECT * FROM (VALUES \
                 ('A', 100), ('A', 200), ('B', 50), ('B', 30), ('C', 500)) \
                 AS t(category, amount)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT category, SUM(amount) as total \
                 FROM hv_orders GROUP BY category \
                 HAVING SUM(amount) > 100 ORDER BY category",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        assert!(!batches.is_empty());

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // A (300) and C (500) exceed 100; B (80) does not.
        assert_eq!(total_rows, 2);
    }

    #[tokio::test]
    async fn test_having_all_filtered_out() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE items AS SELECT * FROM (VALUES \
                 ('x', 1), ('y', 2)) AS t(name, qty)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql("SELECT name, SUM(qty) as total FROM items GROUP BY name HAVING SUM(qty) > 1000")
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 0);
    }

    #[tokio::test]
    async fn test_having_compound_predicate() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE sales AS SELECT * FROM (VALUES \
                 ('A', 100), ('A', 200), ('B', 50), ('C', 10), ('C', 20)) \
                 AS t(region, amount)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT region, COUNT(*) as cnt, SUM(amount) as total \
                 FROM sales GROUP BY region \
                 HAVING COUNT(*) >= 2 AND SUM(amount) > 25 \
                 ORDER BY region",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        // A (cnt 2, sum 300) and C (cnt 2, sum 30) satisfy both predicates;
        // B (cnt 1) fails the count check.
        assert_eq!(total_rows, 2);
    }

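    // --- Multi-way joins ---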
    #[tokio::test]
    async fn test_multi_join_two_way_lookup() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE orders AS SELECT * FROM (VALUES \
                 (1, 100, 'A'), (2, 200, 'B')) AS t(id, customer_id, product_code)",
            )
            .await
            .unwrap();
        db.ctx
            .sql(
                "CREATE TABLE customers AS SELECT * FROM (VALUES \
                 (100, 'Alice'), (200, 'Bob')) AS t(id, name)",
            )
            .await
            .unwrap();
        db.ctx
            .sql(
                "CREATE TABLE products AS SELECT * FROM (VALUES \
                 ('A', 'Widget'), ('B', 'Gadget')) AS t(code, label)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT o.id, c.name, p.label \
                 FROM orders o \
                 JOIN customers c ON o.customer_id = c.id \
                 JOIN products p ON o.product_code = p.code \
                 ORDER BY o.id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 2);
    }

    #[tokio::test]
    async fn test_multi_join_three_way() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql("CREATE TABLE t1 AS SELECT * FROM (VALUES (1, 10), (2, 20)) AS t(id, fk1)")
            .await
            .unwrap();
        db.ctx
            .sql("CREATE TABLE t2 AS SELECT * FROM (VALUES (10, 100), (20, 200)) AS t(id, fk2)")
            .await
            .unwrap();
        db.ctx
            .sql("CREATE TABLE t3 AS SELECT * FROM (VALUES (100, 'x'), (200, 'y')) AS t(id, fk3)")
            .await
            .unwrap();
        db.ctx
            .sql("CREATE TABLE t4 AS SELECT * FROM (VALUES ('x', 'final_x'), ('y', 'final_y')) AS t(id, val)")
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT t1.id, t4.val \
                 FROM t1 \
                 JOIN t2 ON t1.fk1 = t2.id \
                 JOIN t3 ON t2.fk2 = t3.id \
                 JOIN t4 ON t3.fk3 = t4.id \
                 ORDER BY t1.id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 2);
    }

    #[tokio::test]
    async fn test_multi_join_mixed_types() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE stream_a AS SELECT * FROM (VALUES \
                 (1, 'k1'), (2, 'k2')) AS t(id, key)",
            )
            .await
            .unwrap();
        db.ctx
            .sql(
                "CREATE TABLE stream_b AS SELECT * FROM (VALUES \
                 ('k1', 10), ('k2', 20)) AS t(key, value)",
            )
            .await
            .unwrap();
        db.ctx
            .sql(
                "CREATE TABLE dim_c AS SELECT * FROM (VALUES \
                 ('k1', 'label1'), ('k2', 'label2')) AS t(key, label)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT a.id, b.value, c.label \
                 FROM stream_a a \
                 JOIN stream_b b ON a.key = b.key \
                 LEFT JOIN dim_c c ON a.key = c.key \
                 ORDER BY a.id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 2);
    }

    #[tokio::test]
    async fn test_multi_join_single_backward_compat() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE left_t AS SELECT * FROM (VALUES \
                 (1, 'a'), (2, 'b')) AS t(id, val)",
            )
            .await
            .unwrap();
        db.ctx
            .sql(
                "CREATE TABLE right_t AS SELECT * FROM (VALUES \
                 (1, 'x'), (2, 'y')) AS t(id, data)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT l.id, l.val, r.data \
                 FROM left_t l JOIN right_t r ON l.id = r.id \
                 ORDER BY l.id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 2);
    }

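    // --- Window frames (ROWS BETWEEN) ---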
    #[tokio::test]
    async fn test_frame_moving_average() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE frame_prices AS SELECT * FROM (VALUES \
                 (1, 10.0), (2, 20.0), (3, 30.0), (4, 40.0), (5, 50.0)) \
                 AS t(id, price)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT id, AVG(price) OVER (ORDER BY id \
                 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS ma \
                 FROM frame_prices ORDER BY id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 5);

        // Row 3: avg(10, 20, 30) = 20.
        let ma_col = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .unwrap();
        assert!((ma_col.value(2) - 20.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_frame_running_sum() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE frame_amounts AS SELECT * FROM (VALUES \
                 (1, 100.0), (2, 200.0), (3, 300.0)) AS t(id, amount)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT id, SUM(amount) OVER (ORDER BY id \
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running \
                 FROM frame_amounts ORDER BY id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 3);

        let sum_col = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .unwrap();
        // Row 3: 100 + 200 + 300 = 600.
        assert!((sum_col.value(2) - 600.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_frame_rolling_max() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE frame_vals AS SELECT * FROM (VALUES \
                 (1, 5.0), (2, 15.0), (3, 10.0), (4, 20.0)) AS t(id, price)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT id, MAX(price) OVER (ORDER BY id \
                 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS rmax \
                 FROM frame_vals ORDER BY id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 4);

        let max_col = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .unwrap();
        // Row 3: max(5, 15, 10) = 15.
        assert!((max_col.value(2) - 15.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_frame_rolling_count() {
        let db = LaminarDB::open().unwrap();

        db.ctx
            .sql(
                "CREATE TABLE frame_events AS SELECT * FROM (VALUES \
                 (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')) AS t(id, code)",
            )
            .await
            .unwrap();

        let df = db
            .ctx
            .sql(
                "SELECT id, COUNT(*) OVER (ORDER BY id \
                 ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS cnt \
                 FROM frame_events ORDER BY id",
            )
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 4);

        let cnt_col = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        assert_eq!(cnt_col.value(0), 1); // first row: only itself
        assert_eq!(cnt_col.value(1), 2); // rows 1-2
        assert_eq!(cnt_col.value(2), 2); // rows 2-3
    }

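    /// Builds a two-column `(id, symbol)` batch for the reference-table tests.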
    fn table_test_batch(ids: &[i32], symbols: &[&str]) -> RecordBatch {
        use arrow::array::Int32Array;
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("symbol", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(ids.to_vec())),
                Arc::new(StringArray::from(symbols.to_vec())),
            ],
        )
        .unwrap()
    }

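    /// Registers a `mock` reference-table source whose factory hands out the
    /// given snapshot and change batches exactly once.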
    fn register_mock_table_source(
        db: &LaminarDB,
        snapshot_batches: Vec<RecordBatch>,
        change_batches: Vec<RecordBatch>,
    ) {
        use laminar_connectors::config::ConnectorInfo;
        use laminar_connectors::reference::MockReferenceTableSource;

        let snap = std::sync::Arc::new(parking_lot::Mutex::new(Some(snapshot_batches)));
        let chg = std::sync::Arc::new(parking_lot::Mutex::new(Some(change_batches)));
        db.connector_registry().register_table_source(
            "mock",
            ConnectorInfo {
                name: "mock".to_string(),
                display_name: "Mock Table Source".to_string(),
                version: "0.1.0".to_string(),
                is_source: true,
                is_sink: false,
                config_keys: vec![],
            },
            std::sync::Arc::new(move |_config| {
                let s = snap.lock().take().unwrap_or_default();
                let c = chg.lock().take().unwrap_or_default();
                Ok(Box::new(MockReferenceTableSource::new(s, c)))
            }),
        );
    }

    #[tokio::test]
    async fn test_table_source_snapshot_populates_table() {
        let db = LaminarDB::open().unwrap();
        let batch = table_test_batch(&[1, 2], &["AAPL", "GOOG"]);
        register_mock_table_source(&db, vec![batch], vec![]);

        db.execute("CREATE SOURCE events (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        db.execute(
            "CREATE TABLE instruments (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json')",
        )
        .await
        .unwrap();

        db.start().await.unwrap();

        // The snapshot is loaded during startup.
        let ts = db.table_store.read();
        assert!(ts.is_ready("instruments"));
        assert_eq!(ts.table_row_count("instruments"), 2);
    }

    #[tokio::test]
    async fn test_table_source_manual_no_snapshot() {
        let db = LaminarDB::open().unwrap();
        let batch = table_test_batch(&[1], &["AAPL"]);
        register_mock_table_source(&db, vec![batch], vec![]);

        db.execute("CREATE SOURCE events (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        db.execute(
            "CREATE TABLE instruments (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json', refresh = 'manual')",
        )
        .await
        .unwrap();

        db.start().await.unwrap();

        // Manual refresh: no snapshot is loaded at startup.
        let ts = db.table_store.read();
        assert!(!ts.is_ready("instruments"));
        assert_eq!(ts.table_row_count("instruments"), 0);
    }

    #[tokio::test]
    async fn test_table_source_multiple_tables() {
        use laminar_connectors::config::ConnectorInfo;
        use laminar_connectors::reference::MockReferenceTableSource;

        let db = LaminarDB::open().unwrap();

        // Each factory invocation serves the next snapshot in the list.
        let call_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let cc = call_count.clone();
        let batch1 = table_test_batch(&[1], &["AAPL"]);
        let batch2 = table_test_batch(&[2, 3], &["GOOG", "MSFT"]);
        let batches =
            std::sync::Arc::new(parking_lot::Mutex::new(vec![vec![batch1], vec![batch2]]));

        db.connector_registry().register_table_source(
            "mock",
            ConnectorInfo {
                name: "mock".to_string(),
                display_name: "Mock".to_string(),
                version: "0.1.0".to_string(),
                is_source: true,
                is_sink: false,
                config_keys: vec![],
            },
            std::sync::Arc::new(move |_config| {
                let idx = cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst) as usize;
                let mut all = batches.lock();
                let snap = if idx < all.len() {
                    std::mem::take(&mut all[idx])
                } else {
                    vec![]
                };
                Ok(Box::new(MockReferenceTableSource::new(snap, vec![])))
            }),
        );

        db.execute("CREATE SOURCE events (x INT)").await.unwrap();

        db.execute(
            "CREATE TABLE t1 (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json')",
        )
        .await
        .unwrap();

        db.execute(
            "CREATE TABLE t2 (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json')",
        )
        .await
        .unwrap();

        db.start().await.unwrap();

        let ts = db.table_store.read();
        // One row went to one table and two rows to the other.
        let total = ts.table_row_count("t1") + ts.table_row_count("t2");
        assert_eq!(total, 3);
        assert!(ts.is_ready("t1"));
        assert!(ts.is_ready("t2"));
    }

    #[tokio::test]
    async fn test_table_create_with_refresh_mode() {
        let db = LaminarDB::open().unwrap();

        db.execute(
            "CREATE TABLE t (id INT PRIMARY KEY, name VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json', refresh = 'cdc')",
        )
        .await
        .unwrap();

        let mgr = db.connector_manager.lock();
        let reg = mgr.tables().get("t").unwrap();
        assert_eq!(
            reg.refresh,
            Some(laminar_connectors::reference::RefreshMode::SnapshotPlusCdc)
        );
    }

    #[tokio::test]
    async fn test_table_source_snapshot_only_no_changes() {
        let db = LaminarDB::open().unwrap();
        let snap = table_test_batch(&[1], &["AAPL"]);
        let change = table_test_batch(&[2], &["GOOG"]);
        register_mock_table_source(&db, vec![snap], vec![change]);

        db.execute("CREATE SOURCE events (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        db.execute(
            "CREATE TABLE instruments (id INT PRIMARY KEY, symbol VARCHAR NOT NULL) \
             WITH (connector = 'mock', format = 'json', refresh = 'snapshot_only')",
        )
        .await
        .unwrap();

        db.start().await.unwrap();

        // Only the snapshot row is present; the change batch must not be applied.
        let mut ts = db.table_store.write();
        assert!(ts.is_ready("instruments"));
        assert_eq!(ts.table_row_count("instruments"), 1);
        assert!(ts.lookup("instruments", "2").is_none());
    }

    #[tokio::test]
    async fn test_create_table_partial_cache_mode() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE TABLE large_dim (\
             id INT PRIMARY KEY, \
             name VARCHAR\
             ) WITH (cache_mode = 'partial')",
        )
        .await
        .unwrap();

        // Scope the read guard so it is released before the inserts below.
        {
            let ts = db.table_store.read();
            assert!(ts.has_table("large_dim"));
        }

        db.execute("INSERT INTO large_dim VALUES (1, 'Alice')")
            .await
            .unwrap();
        db.execute("INSERT INTO large_dim VALUES (2, 'Bob')")
            .await
            .unwrap();

        let ts = db.table_store.read();
        assert_eq!(ts.table_row_count("large_dim"), 2);
    }

    #[tokio::test]
    async fn test_create_table_partial_with_max_entries() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE TABLE customers (\
             id INT PRIMARY KEY, \
             name VARCHAR\
             ) WITH (cache_mode = 'partial', cache_max_entries = '10000')",
        )
        .await
        .unwrap();

        let ts = db.table_store.read();
        assert!(ts.has_table("customers"));
        let metrics = ts.cache_metrics("customers").unwrap();
        assert_eq!(metrics.cache_max_entries, 10000);
    }

    #[tokio::test]
    async fn test_create_table_invalid_cache_max_entries() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute(
                "CREATE TABLE bad (\
                 id INT PRIMARY KEY, \
                 name VARCHAR\
                 ) WITH (cache_mode = 'partial', cache_max_entries = 'not_a_number')",
            )
            .await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("cache_max_entries"));
    }

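    // --- Metrics ---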
    #[tokio::test]
    async fn test_metrics_initial_state() {
        let db = LaminarDB::open().unwrap();
        let m = db.metrics();
        assert_eq!(m.total_events_ingested, 0);
        assert_eq!(m.total_events_emitted, 0);
        assert_eq!(m.total_events_dropped, 0);
        assert_eq!(m.total_cycles, 0);
        assert_eq!(m.total_batches, 0);
        assert_eq!(m.state, crate::metrics::PipelineState::Created);
        assert_eq!(m.source_count, 0);
        assert_eq!(m.stream_count, 0);
        assert_eq!(m.sink_count, 0);
    }

    #[tokio::test]
    async fn test_source_metrics_after_push() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE)")
            .await
            .unwrap();

        let handle = db.source_untyped("trades").unwrap();
        let batch = RecordBatch::try_new(
            handle.schema().clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["AAPL", "GOOG"])),
                Arc::new(arrow::array::Float64Array::from(vec![150.0, 2800.0])),
            ],
        )
        .unwrap();
        handle.push_arrow(batch).unwrap();

        let sm = db.source_metrics("trades").unwrap();
        assert_eq!(sm.name, "trades");
        assert_eq!(sm.total_events, 1); // one batch pushed
        assert!(sm.pending > 0);
        assert!(sm.capacity > 0);
        assert!(sm.utilization > 0.0);
    }

    #[tokio::test]
    async fn test_source_metrics_not_found() {
        let db = LaminarDB::open().unwrap();
        assert!(db.source_metrics("nonexistent").is_none());
    }

    #[tokio::test]
    async fn test_all_source_metrics() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE a (id INT)").await.unwrap();
        db.execute("CREATE SOURCE b (id INT)").await.unwrap();

        let all = db.all_source_metrics();
        assert_eq!(all.len(), 2);
        #[allow(clippy::disallowed_types)]
        let names: std::collections::HashSet<_> =
            all.iter().map(|m| m.name.clone()).collect();
        assert!(names.contains("a"));
        assert!(names.contains("b"));
    }

    #[tokio::test]
    async fn test_total_events_processed_zero() {
        let db = LaminarDB::open().unwrap();
        assert_eq!(db.total_events_processed(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_state_enum_created() {
        let db = LaminarDB::open().unwrap();
        assert_eq!(
            db.pipeline_state_enum(),
            crate::metrics::PipelineState::Created
        );
    }

    #[tokio::test]
    async fn test_counters_accessible() {
        let db = LaminarDB::open().unwrap();
        let c = db.counters();
        c.events_ingested
            .fetch_add(42, std::sync::atomic::Ordering::Relaxed);
        let m = db.metrics();
        assert_eq!(m.total_events_ingested, 42);
    }

    #[tokio::test]
    async fn test_metrics_counts_after_create() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE s1 (id INT)").await.unwrap();
        db.execute("CREATE SINK out1 FROM s1").await.unwrap();

        let m = db.metrics();
        assert_eq!(m.source_count, 1);
        assert_eq!(m.sink_count, 1);
    }

    #[tokio::test]
    async fn test_source_handle_capacity() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        assert!(handle.capacity() >= 1024);
        assert!(!handle.is_backpressured());
    }

    #[tokio::test]
    async fn test_stream_metrics_with_sql() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, ts BIGINT)")
            .await
            .unwrap();
        db.execute(
            "CREATE STREAM avg_price AS \
             SELECT symbol, AVG(price) as avg_price \
             FROM trades GROUP BY symbol, TUMBLE(ts, INTERVAL '1' MINUTE)",
        )
        .await
        .unwrap();

        let sm = db.stream_metrics("avg_price");
        assert!(sm.is_some());
        let sm = sm.unwrap();
        assert_eq!(sm.name, "avg_price");
        assert!(sm.sql.is_some());
        assert!(sm.sql.as_deref().unwrap().contains("AVG"));
    }

    #[tokio::test]
    async fn test_all_stream_metrics() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, ts BIGINT)")
            .await
            .unwrap();
        db.execute(
            "CREATE STREAM s1 AS SELECT symbol, AVG(price) as avg_price \
             FROM trades GROUP BY symbol, TUMBLE(ts, INTERVAL '1' MINUTE)",
        )
        .await
        .unwrap();

        let all = db.all_stream_metrics();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].name, "s1");
    }

    #[tokio::test]
    async fn test_stream_metrics_not_found() {
        let db = LaminarDB::open().unwrap();
        assert!(db.stream_metrics("nonexistent").is_none());
    }

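    /// Builds an `(id, ts)` batch from millisecond timestamps, converting them
    /// to microseconds for the `Timestamp(Microsecond)` column.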
    fn make_ts_batch(schema: &arrow::datatypes::SchemaRef, timestamps_ms: &[i64]) -> RecordBatch {
        let us_values: Vec<i64> = timestamps_ms.iter().map(|ms| ms * 1000).collect();
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::Int64Array::from(
                    (1..=i64::try_from(timestamps_ms.len()).expect("len fits i64"))
                        .collect::<Vec<_>>(),
                )),
                Arc::new(arrow::array::TimestampMicrosecondArray::from(us_values)),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_watermark_advances_on_push() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();
        let batch = make_ts_batch(&schema, &[1000, 2000, 3000]);
        handle.push_arrow(batch).unwrap();

        // Give the pipeline time to process the batch.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let wm = handle.current_watermark();
        assert_eq!(
            wm, 3000,
            "watermark should equal max timestamp with 0s delay"
        );
    }

    #[tokio::test]
    async fn test_watermark_bounded_delay() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        // Out-of-order batch: the max timestamp is 1200.
        let batch = make_ts_batch(&schema, &[1000, 800, 1200]);
        handle.push_arrow(batch).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // 1200 - 100ms delay = 1100.
        let wm = handle.current_watermark();
        assert_eq!(wm, 1100, "watermark should be max_ts - delay");
    }

    #[tokio::test]
    async fn test_watermark_no_regression() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        let batch1 = make_ts_batch(&schema, &[5000]);
        handle.push_arrow(batch1).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
        let wm1 = handle.current_watermark();

        // An older batch must not move the watermark backwards.
        let batch2 = make_ts_batch(&schema, &[1000]);
        handle.push_arrow(batch2).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
        let wm2 = handle.current_watermark();

        assert!(wm2 >= wm1, "watermark must not regress: {wm2} < {wm1}");
        assert_eq!(wm1, 5000);
        assert_eq!(wm2, 5000);
    }

    #[tokio::test]
    async fn test_source_without_watermark() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
            .await
            .unwrap();

        let handle = db.source_untyped("events").unwrap();
        assert_eq!(handle.current_watermark(), i64::MIN);
        assert!(handle.max_out_of_orderness().is_none());
    }

    #[tokio::test]
    async fn test_watermark_with_arrow_timestamp_column() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        // 5_000_000 microseconds = 5000 ms.
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![1])),
                Arc::new(arrow::array::TimestampMicrosecondArray::from(vec![
                    5_000_000i64,
                ])),
            ],
        )
        .unwrap();
        handle.push_arrow(batch).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let wm = handle.current_watermark();
        assert_eq!(wm, 5000, "watermark should work with Arrow Timestamp type");
    }

    #[tokio::test]
    async fn test_pipeline_watermark_global_min() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE trades (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute(
            "CREATE SOURCE orders (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM trades")
            .await
            .unwrap();
        db.start().await.unwrap();

        let trades = db.source_untyped("trades").unwrap();
        let orders = db.source_untyped("orders").unwrap();

        // trades advances to 5000; orders only to 2000.
        let batch1 = make_ts_batch(trades.schema(), &[5000]);
        trades.push_arrow(batch1).unwrap();

        let batch2 = make_ts_batch(orders.schema(), &[2000]);
        orders.push_arrow(batch2).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let global = db.pipeline_watermark();
        assert_eq!(
            global, 2000,
            "global watermark should be min of all sources"
        );
    }

    #[tokio::test]
    async fn test_pipeline_watermark_in_metrics() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let batch = make_ts_batch(handle.schema(), &[4000]);
        handle.push_arrow(batch).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let m = db.metrics();
        assert_eq!(
            m.pipeline_watermark,
            db.pipeline_watermark(),
            "metrics().pipeline_watermark should match pipeline_watermark()"
        );
        assert_eq!(m.pipeline_watermark, 4000);
    }

    #[tokio::test]
    async fn test_source_handle_max_out_of_orderness() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)",
        )
        .await
        .unwrap();

        let handle = db.source_untyped("events").unwrap();
        let dur = handle.max_out_of_orderness();
        assert_eq!(dur, Some(std::time::Duration::from_secs(5)));
    }

    #[tokio::test]
    async fn test_source_handle_no_watermark() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
            .await
            .unwrap();

        let handle = db.source_untyped("events").unwrap();
        assert!(handle.max_out_of_orderness().is_none());
    }

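    // --- Late-data filtering ---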
    #[tokio::test]
    async fn test_late_data_dropped_after_external_watermark() {
        let db = LaminarDB::open().unwrap();
        // Scenario: on-time rows flow through; the external watermark is then
        // advanced far ahead, and a batch entirely behind it must be dropped.
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();

        let sub = db.catalog.get_stream_subscription("out").unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        let batch1 = make_ts_batch(&schema, &[1000, 2000, 3000]);
        handle.push_arrow(batch1).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let mut on_time_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => on_time_rows += b.num_rows(),
                None => break,
            }
        }
        assert!(on_time_rows > 0, "should have on-time rows");

        // Advance the external watermark far past the next batch.
        handle.watermark(200_000);
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        let late_batch = make_ts_batch(&schema, &[100, 200, 300]);
        handle.push_arrow(late_batch).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // Drain the subscription: nothing should arrive.
        let mut late_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => late_rows += b.num_rows(),
                None => break,
            }
        }
        assert_eq!(late_rows, 0, "late data behind watermark should be dropped");
    }

    #[test]
    fn test_filter_late_rows_filters_correctly() {
        use arrow::array::Int64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("ts", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
                Arc::new(Int64Array::from(vec![100, 500, 200, 800])),
            ],
        )
        .unwrap();

        // Watermark 300: only the rows with ts 500 and 800 survive.
        let filtered = filter_late_rows(
            &batch,
            "ts",
            300,
            laminar_core::time::TimestampFormat::UnixMillis,
        );
        let filtered = filtered.expect("should have some on-time rows");
        assert_eq!(filtered.num_rows(), 2);

        let ids = filtered
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 2); // ts = 500
        assert_eq!(ids.value(1), 4); // ts = 800
    }

    #[test]
    fn test_filter_late_rows_all_late() {
        use arrow::array::Int64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("ts", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(Int64Array::from(vec![100, 200])),
            ],
        )
        .unwrap();

        let result = filter_late_rows(
            &batch,
            "ts",
            1000,
            laminar_core::time::TimestampFormat::UnixMillis,
        );
        assert!(result.is_none(), "all-late batch should return None");
    }

    #[test]
    fn test_filter_late_rows_no_column() {
        use arrow::array::Int64Array;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2]))]).unwrap();

        let result = filter_late_rows(
            &batch,
            "ts",
            1000,
            laminar_core::time::TimestampFormat::UnixMillis,
        );
        let result = result.expect("should pass through when column not found");
        assert_eq!(result.num_rows(), 2);
    }

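    /// Builds an `(id, ts)` batch where `ts` is a plain BIGINT in milliseconds.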
    fn make_bigint_ts_batch(
        schema: &arrow::datatypes::SchemaRef,
        timestamps: &[i64],
    ) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::Int64Array::from(
                    (1..=i64::try_from(timestamps.len()).expect("len fits i64"))
                        .collect::<Vec<_>>(),
                )),
                Arc::new(arrow::array::Int64Array::from(timestamps.to_vec())),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_programmatic_watermark_filters_late_rows() {
        let db = LaminarDB::open().unwrap();
        // No WATERMARK clause: the event-time column is set programmatically.
        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
            .await
            .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();

        let sub = db.catalog.get_stream_subscription("out").unwrap();

        let handle = db.source_untyped("events").unwrap();
        handle.set_event_time_column("ts");

        db.start().await.unwrap();

        let schema = handle.schema().clone();

        let batch1 = make_bigint_ts_batch(&schema, &[1000, 2000, 3000]);
        handle.push_arrow(batch1).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let mut on_time_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => on_time_rows += b.num_rows(),
                None => break,
            }
        }
        assert!(on_time_rows > 0, "should have on-time rows");

        // Advance the external watermark far past the next batch.
        handle.watermark(200_000);
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        let late_batch = make_bigint_ts_batch(&schema, &[100, 200, 300]);
        handle.push_arrow(late_batch).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let mut late_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => late_rows += b.num_rows(),
                None => break,
            }
        }
        assert_eq!(late_rows, 0, "late data behind watermark should be dropped");
    }

    #[tokio::test]
    async fn test_sql_watermark_for_col_filters_late_rows() {
        let db = LaminarDB::open().unwrap();
        // Bare WATERMARK FOR ts, with no delay expression.
        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT, WATERMARK FOR ts)")
            .await
            .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();

        let sub = db.catalog.get_stream_subscription("out").unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        let batch1 = make_bigint_ts_batch(&schema, &[1000, 2000, 3000]);
        handle.push_arrow(batch1).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let mut on_time_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => on_time_rows += b.num_rows(),
                None => break,
            }
        }
        assert!(on_time_rows > 0, "should have on-time rows");

        // Advance the external watermark far past the next batch.
        handle.watermark(200_000);
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        let late_batch = make_bigint_ts_batch(&schema, &[100, 200, 300]);
        handle.push_arrow(late_batch).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let mut late_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => late_rows += b.num_rows(),
                None => break,
            }
        }
        assert_eq!(late_rows, 0, "late data behind watermark should be dropped");
    }

    #[tokio::test]
    async fn test_no_watermark_passes_all_data() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, ts BIGINT)")
            .await
            .unwrap();
        db.execute("CREATE STREAM out AS SELECT id, ts FROM events")
            .await
            .unwrap();

        let sub = db.catalog.get_stream_subscription("out").unwrap();
        db.start().await.unwrap();

        let handle = db.source_untyped("events").unwrap();
        let schema = handle.schema().clone();

        let batch1 = make_bigint_ts_batch(&schema, &[1000, 2000, 3000]);
        handle.push_arrow(batch1).unwrap();
        // No event-time column is configured, so this watermark signal must not trigger filtering.
        handle.watermark(200_000);
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        let batch2 = make_bigint_ts_batch(&schema, &[100, 200, 300]);
        handle.push_arrow(batch2).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let mut total_rows = 0;
        for _ in 0..256 {
            match sub.poll() {
                Some(b) => total_rows += b.num_rows(),
                None => break,
            }
        }
        assert_eq!(
            total_rows, 6,
            "all data should pass through without watermark config"
        );
    }

    #[tokio::test]
    async fn test_select_from_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE sensors (id BIGINT, temp DOUBLE)")
            .await
            .unwrap();
        db.execute("INSERT INTO sensors VALUES (1, 22.5), (2, 23.1)")
            .await
            .unwrap();

        let result = db.execute("SELECT * FROM sensors").await.unwrap();
        match result {
            ExecuteResult::Query(mut q) => {
                // Yield so the spawned query task gets a chance to run before polling.
                tokio::task::yield_now().await;
                let sub = q.subscribe_raw().unwrap();
                let mut total_rows = 0;
                for _ in 0..256 {
                    match sub.poll() {
                        Some(b) => total_rows += b.num_rows(),
                        None => break,
                    }
                }
                assert_eq!(total_rows, 2);
            }
            _ => panic!("Expected Query result from SELECT on source"),
        }
    }

    #[tokio::test]
    async fn test_select_from_dropped_source_fails() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE sensors (id BIGINT, temp DOUBLE)")
            .await
            .unwrap();
        db.execute("DROP SOURCE sensors").await.unwrap();

        let result = db.execute("SELECT * FROM sensors").await;
        assert!(result.is_err(), "SELECT after DROP SOURCE should fail");
    }

    #[tokio::test]
    async fn test_select_from_replaced_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE sensors (id BIGINT, temp DOUBLE)")
            .await
            .unwrap();
        db.execute("INSERT INTO sensors VALUES (1, 20.0)")
            .await
            .unwrap();

        // Replacing the source discards the data inserted before the replace.
        db.execute("CREATE OR REPLACE SOURCE sensors (id BIGINT, temp DOUBLE)")
            .await
            .unwrap();
        db.execute("INSERT INTO sensors VALUES (2, 30.0)")
            .await
            .unwrap();

        let result = db.execute("SELECT * FROM sensors").await.unwrap();
        match result {
            ExecuteResult::Query(mut q) => {
                tokio::task::yield_now().await;
                let sub = q.subscribe_raw().unwrap();
                let mut total_rows = 0;
                for _ in 0..256 {
                    match sub.poll() {
                        Some(b) => total_rows += b.num_rows(),
                        None => break,
                    }
                }
                assert_eq!(
                    total_rows, 1,
                    "only the post-replace insert should be visible"
                );
            }
            _ => panic!("Expected Query result"),
        }
    }

    #[tokio::test]
    async fn test_mv_registers_stream_in_connector_manager() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
            .await
            .unwrap();

        {
            let mgr = db.connector_manager.lock();
            assert!(
                !mgr.streams().contains_key("event_totals"),
                "stream should not exist before MV creation"
            );
        }

        let result = db
            .execute("CREATE MATERIALIZED VIEW event_totals AS SELECT * FROM events")
            .await;

        // Only assert registration when MV creation actually succeeded.
        if result.is_ok() {
            let mgr = db.connector_manager.lock();
            assert!(
                mgr.streams().contains_key("event_totals"),
                "MV should be registered as a stream in connector manager"
            );
            let reg = &mgr.streams()["event_totals"];
            assert!(
                reg.query_sql.contains("events"),
                "stream query should reference the source"
            );
        }
    }

    #[tokio::test]
    async fn test_drop_mv_unregisters_stream() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM events")
            .await;

        if result.is_ok() {
            {
                let mgr = db.connector_manager.lock();
                assert!(mgr.streams().contains_key("mv1"));
            }

            db.execute("DROP MATERIALIZED VIEW mv1").await.unwrap();

            {
                let mgr = db.connector_manager.lock();
                assert!(
                    !mgr.streams().contains_key("mv1"),
                    "stream should be unregistered after DROP MV"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_set_session_property() {
        let db = LaminarDB::open().unwrap();
        db.execute("SET parallelism = 4").await.unwrap();
        assert_eq!(
            db.get_session_property("parallelism"),
            Some("4".to_string())
        );
    }

    #[tokio::test]
    async fn test_set_session_property_string_value() {
        let db = LaminarDB::open().unwrap();
        db.execute("SET state_ttl = '1 hour'").await.unwrap();
        assert_eq!(
            db.get_session_property("state_ttl"),
            Some("1 hour".to_string())
        );
    }

    #[tokio::test]
    async fn test_set_session_property_overwrite() {
        let db = LaminarDB::open().unwrap();
        db.execute("SET batch_size = 100").await.unwrap();
        db.execute("SET batch_size = 200").await.unwrap();
        assert_eq!(
            db.get_session_property("batch_size"),
            Some("200".to_string())
        );
    }

    #[tokio::test]
    async fn test_get_session_property_not_set() {
        let db = LaminarDB::open().unwrap();
        assert_eq!(db.get_session_property("nonexistent"), None);
    }

    #[tokio::test]
    async fn test_session_properties_all() {
        let db = LaminarDB::open().unwrap();
        db.execute("SET parallelism = 4").await.unwrap();
        db.execute("SET state_ttl = '1 hour'").await.unwrap();
        let props = db.session_properties();
        assert_eq!(props.len(), 2);
        assert_eq!(props.get("parallelism"), Some(&"4".to_string()));
        assert_eq!(props.get("state_ttl"), Some(&"1 hour".to_string()));
    }

    #[tokio::test]
    async fn test_alter_source_add_column() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT, value DOUBLE)")
            .await
            .unwrap();

        let schema = db.catalog.describe_source("events").unwrap();
        assert_eq!(schema.fields().len(), 2);

        db.execute("ALTER SOURCE events ADD COLUMN new_col VARCHAR")
            .await
            .unwrap();

        let schema = db.catalog.describe_source("events").unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(2).name(), "new_col");
    }

    #[tokio::test]
    async fn test_alter_source_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute("ALTER SOURCE nonexistent ADD COLUMN col INT")
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_alter_source_set_properties() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        db.execute("ALTER SOURCE events SET ('batch.size' = '1000')")
            .await
            .unwrap();
        assert_eq!(
            db.get_session_property("events.batch.size"),
            Some("1000".to_string())
        );
    }

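    // The routing keys `connector` and `format` are consumed during extraction and
    // must not leak into the options forwarded to the connector itself.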
    #[test]
    fn test_extract_connector_from_with_options_basic() {
        let mut opts = HashMap::new();
        opts.insert("connector".to_string(), "kafka".to_string());
        opts.insert("topic".to_string(), "events".to_string());
        opts.insert(
            "bootstrap.servers".to_string(),
            "localhost:9092".to_string(),
        );
        opts.insert("format".to_string(), "json".to_string());

        let (conn_opts, format, fmt_opts) = extract_connector_from_with_options(&opts);

        assert!(!conn_opts.contains_key("connector"));
        assert!(!conn_opts.contains_key("format"));
        assert_eq!(conn_opts.get("topic"), Some(&"events".to_string()));
        assert_eq!(
            conn_opts.get("bootstrap.servers"),
            Some(&"localhost:9092".to_string())
        );
        assert_eq!(format, Some("json".to_string()));
        assert!(fmt_opts.is_empty());
    }

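    // Streaming-runtime tuning keys (buffer_size, backpressure, watermark_delay) are
    // handled by the pipeline, so they are filtered out of the connector options as well.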
    #[test]
    fn test_extract_connector_filters_streaming_keys() {
        let mut opts = HashMap::new();
        opts.insert("connector".to_string(), "websocket".to_string());
        opts.insert("url".to_string(), "wss://feed.example.com".to_string());
        opts.insert("buffer_size".to_string(), "4096".to_string());
        opts.insert("backpressure".to_string(), "block".to_string());
        opts.insert("watermark_delay".to_string(), "5s".to_string());

        let (conn_opts, _, _) = extract_connector_from_with_options(&opts);

        assert!(!conn_opts.contains_key("buffer_size"));
        assert!(!conn_opts.contains_key("backpressure"));
        assert!(!conn_opts.contains_key("watermark_delay"));
        assert_eq!(
            conn_opts.get("url"),
            Some(&"wss://feed.example.com".to_string())
        );
    }

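    // Keys prefixed with `format.` are routed into the format options with the prefix stripped.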
    #[test]
    fn test_extract_connector_format_options() {
        let mut opts = HashMap::new();
        opts.insert("connector".to_string(), "kafka".to_string());
        opts.insert("format".to_string(), "avro".to_string());
        opts.insert(
            "format.schema.registry.url".to_string(),
            "http://localhost:8081".to_string(),
        );
        opts.insert("topic".to_string(), "events".to_string());

        let (conn_opts, format, fmt_opts) = extract_connector_from_with_options(&opts);

        assert_eq!(format, Some("avro".to_string()));
        assert_eq!(
            fmt_opts.get("schema.registry.url"),
            Some(&"http://localhost:8081".to_string())
        );
        assert_eq!(conn_opts.get("topic"), Some(&"events".to_string()));
        assert!(!conn_opts.contains_key("format.schema.registry.url"));
    }

    #[tokio::test]
    async fn test_create_source_with_connector_option() {
        let db = LaminarDB::open().unwrap();
        let result = db
            .execute(
                "CREATE SOURCE ws_feed (id BIGINT, data TEXT) WITH (
                    'connector' = 'websocket',
                    'url' = 'wss://feed.example.com',
                    'format' = 'json'
                )",
            )
            .await;

        // Depending on which connectors are compiled in, creation may succeed or
        // fail during connector routing; both outcomes are acceptable here, but
        // any error must be the connector-routing one.
        if let Err(e) = result {
            let msg = e.to_string();
            assert!(
                msg.contains("Unknown source connector type"),
                "Expected connector routing error, got: {msg}"
            );
        }
    }

    #[tokio::test]
    async fn test_show_sources_enriched() {
        let db = LaminarDB::open().unwrap();
        db.execute(
            "CREATE SOURCE events (id BIGINT, ts TIMESTAMP, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)",
        )
        .await
        .unwrap();

        let result = db.execute("SHOW SOURCES").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
                assert_eq!(batch.num_columns(), 4);
                assert_eq!(batch.schema().field(0).name(), "source_name");
                assert_eq!(batch.schema().field(1).name(), "connector");
                assert_eq!(batch.schema().field(2).name(), "format");
                assert_eq!(batch.schema().field(3).name(), "watermark_column");

                let names = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(names.value(0), "events");

                let wm = batch
                    .column(3)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(wm.value(0), "ts");
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_show_sinks_enriched() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        db.execute("CREATE SINK output FROM events").await.unwrap();

        let result = db.execute("SHOW SINKS").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
                assert_eq!(batch.num_columns(), 4);
                assert_eq!(batch.schema().field(0).name(), "sink_name");
                assert_eq!(batch.schema().field(1).name(), "input");
                assert_eq!(batch.schema().field(2).name(), "connector");
                assert_eq!(batch.schema().field(3).name(), "format");

                let names = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(names.value(0), "output");

                let inputs = batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(inputs.value(0), "events");
            }
            _ => panic!("Expected Metadata result"),
        }
    }

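    // Note: CREATE STREAM registers the definition without requiring the referenced source to exist yet.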
    #[tokio::test]
    async fn test_show_streams_enriched() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE STREAM my_stream AS SELECT 1 FROM events")
            .await
            .unwrap();

        let result = db.execute("SHOW STREAMS").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
                assert_eq!(batch.num_columns(), 2);
                assert_eq!(batch.schema().field(0).name(), "stream_name");
                assert_eq!(batch.schema().field(1).name(), "sql");

                let sqls = batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert!(
                    sqls.value(0).contains("SELECT"),
                    "SQL column should contain query"
                );
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_show_create_source() {
        let db = LaminarDB::open().unwrap();
        let ddl = "CREATE SOURCE events (id BIGINT, name VARCHAR)";
        db.execute(ddl).await.unwrap();

        let result = db.execute("SHOW CREATE SOURCE events").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
                assert_eq!(batch.schema().field(0).name(), "create_statement");
                let stmts = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(stmts.value(0), ddl);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_show_create_sink() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id INT)").await.unwrap();
        let ddl = "CREATE SINK output FROM events";
        db.execute(ddl).await.unwrap();

        let result = db.execute("SHOW CREATE SINK output").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                assert_eq!(batch.num_rows(), 1);
                assert_eq!(batch.schema().field(0).name(), "create_statement");
                let stmts = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(stmts.value(0), ddl);
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_show_create_source_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("SHOW CREATE SOURCE nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_show_create_sink_not_found() {
        let db = LaminarDB::open().unwrap();
        let result = db.execute("SHOW CREATE SINK nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_explain_analyze_returns_metrics() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
            .await
            .unwrap();

        let result = db
            .execute("EXPLAIN ANALYZE SELECT * FROM events")
            .await
            .unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                let keys = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                let key_vals: Vec<&str> = (0..batch.num_rows()).map(|i| keys.value(i)).collect();
                assert!(
                    key_vals.contains(&"rows_produced"),
                    "Expected rows_produced metric, got: {key_vals:?}"
                );
                assert!(
                    key_vals.contains(&"execution_time_ms"),
                    "Expected execution_time_ms metric, got: {key_vals:?}"
                );
            }
            _ => panic!("Expected Metadata result"),
        }
    }

    #[tokio::test]
    async fn test_explain_without_analyze_has_no_metrics() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE events (id BIGINT, value DOUBLE)")
            .await
            .unwrap();

        let result = db.execute("EXPLAIN SELECT * FROM events").await.unwrap();
        match result {
            ExecuteResult::Metadata(batch) => {
                let keys = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                let key_vals: Vec<&str> = (0..batch.num_rows()).map(|i| keys.value(i)).collect();
                assert!(
                    !key_vals.contains(&"rows_produced"),
                    "EXPLAIN without ANALYZE should not have rows_produced"
                );
            }
            _ => panic!("Expected Metadata result"),
        }
    }

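    // A push-only source with no connector attached must not stall the rest of the pipeline.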
    #[tokio::test]
    async fn test_connectorless_source_does_not_break_pipeline() {
        let db = LaminarDB::open().unwrap();

        // A plain source with no connector and no watermark column.
        db.execute("CREATE SOURCE metadata (symbol VARCHAR, category VARCHAR)")
            .await
            .unwrap();

        db.execute(
            "CREATE SOURCE trades (id BIGINT, price DOUBLE, ts BIGINT, \
             WATERMARK FOR ts AS ts - INTERVAL '0' SECOND)",
        )
        .await
        .unwrap();

        db.execute("CREATE STREAM out AS SELECT id, price FROM trades")
            .await
            .unwrap();

        db.start().await.unwrap();

        let handle = db.source_untyped("trades").unwrap();
        let schema = handle.schema().clone();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![1, 2])),
                Arc::new(arrow::array::Float64Array::from(vec![100.0, 200.0])),
                Arc::new(arrow::array::Int64Array::from(vec![1000, 2000])),
            ],
        )
        .unwrap();
        handle.push_arrow(batch).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let m = db.metrics();
        assert!(m.total_events_ingested > 0, "pipeline should ingest events");

        // Pushing into the connector-less source after startup should also work.
        let meta_handle = db.source_untyped("metadata").unwrap();
        let meta_schema = meta_handle.schema().clone();
        let meta_batch = RecordBatch::try_new(
            meta_schema,
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["BTC", "ETH"])),
                Arc::new(arrow::array::StringArray::from(vec!["L1", "L1"])),
            ],
        )
        .unwrap();
        meta_handle.push_arrow(meta_batch).unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        let m2 = db.metrics();
        assert!(
            m2.total_events_ingested >= m.total_events_ingested,
            "pipeline should continue after connector-less source push"
        );
    }
}