1use std::path::PathBuf;
4use std::sync::Arc;
5
6use tokio::signal;
7use tracing::info;
8
9use laminar_core::streaming::checkpoint::StreamCheckpointConfig;
10use laminar_db::{DbError, EngineMetrics, LaminarDB, Profile};
11
12#[cfg(feature = "cluster-unstable")]
13use crate::cluster_config::{ClusterConfig, ClusterConfigError};
14use crate::config::{
15 ConfigError, LookupConfig, PipelineConfig, ServerConfig, SinkConfig, SourceConfig,
16};
17use crate::http;
18use crate::metrics::ServerMetrics;
19use crate::reload::ReloadGuard;
20#[cfg(all(test, any(feature = "otel", feature = "kafka")))]
21use laminar_core::state::StateBackendConfig;
22
/// Handle to a running server, as returned by [`run_server`].
///
/// Pass it to [`ServerHandle::wait_for_shutdown`] to block until a termination
/// signal arrives and the server has been torn down.
pub enum ServerHandle {
    /// Single-process server: the database plus the background tasks that
    /// `run_server` started alongside it.
    Embedded {
        /// Shared database instance; shut down by `wait_for_shutdown`.
        db: Arc<LaminarDB>,
        /// Task produced by `start_http_api` (the HTTP API server).
        api_handle: tokio::task::JoinHandle<()>,
        /// pgwire listener task; `None` when no pgwire bind address is configured.
        pgwire_handle: Option<tokio::task::JoinHandle<()>>,
        /// Config file watcher task; `None` when `spawn_config_watcher` declined
        /// to start one.
        watcher_handle: Option<tokio::task::JoinHandle<()>>,
    },
    /// Cluster-mode server. Only compiled with the `cluster-unstable` feature.
    #[cfg(feature = "cluster-unstable")]
    Cluster(Box<crate::cluster::ClusterHandle>),
}
34
35impl ServerHandle {
36 pub async fn wait_for_shutdown(self) -> Result<(), ServerError> {
38 match self {
39 Self::Embedded {
40 db,
41 api_handle,
42 pgwire_handle,
43 watcher_handle,
44 } => {
45 wait_for_termination_signal().await?;
46
47 info!("Received shutdown signal, shutting down...");
48
49 if let Some(wh) = &watcher_handle {
50 wh.abort();
51 }
52 if let Some(pg) = &pgwire_handle {
53 pg.abort();
54 }
55 db.shutdown()
56 .await
57 .map_err(|e| ServerError::Shutdown(e.to_string()))?;
58 api_handle.abort();
59
60 info!("Shutdown complete");
61 Ok(())
62 }
63 #[cfg(feature = "cluster-unstable")]
64 Self::Cluster(handle) => (*handle)
65 .wait_for_shutdown()
66 .await
67 .map_err(|e| ServerError::Cluster(e.to_string())),
68 }
69 }
70}
71
72async fn wait_for_termination_signal() -> Result<(), ServerError> {
73 #[cfg(unix)]
74 {
75 use tokio::signal::unix::{signal, SignalKind};
76 let mut sigterm = signal(SignalKind::terminate())
77 .map_err(|e| ServerError::Shutdown(format!("SIGTERM handler failed: {e}")))?;
78 tokio::select! {
79 result = signal::ctrl_c() => {
80 result.map_err(|e| ServerError::Shutdown(format!("SIGINT handler failed: {e}")))?;
81 }
82 _ = sigterm.recv() => {}
83 }
84 Ok(())
85 }
86 #[cfg(not(unix))]
87 {
88 signal::ctrl_c()
89 .await
90 .map_err(|e| ServerError::Shutdown(format!("signal handler failed: {e}")))?;
91 Ok(())
92 }
93}
94
95pub async fn run_server(
97 config: ServerConfig,
98 config_path: PathBuf,
99) -> Result<ServerHandle, ServerError> {
100 #[cfg(feature = "cluster-unstable")]
102 {
103 let cluster_cfg = ClusterConfig::from_server_config(&config)?;
104
105 if let Some(cluster_cfg) = cluster_cfg {
106 let handle = crate::cluster::start_cluster(config, cluster_cfg, config_path)
107 .await
108 .map_err(|e| ServerError::Cluster(e.to_string()))?;
109 return Ok(ServerHandle::Cluster(Box::new(handle)));
110 }
111 }
112 #[cfg(not(feature = "cluster-unstable"))]
113 if config.server.mode == "cluster" {
114 return Err(ServerError::Cluster(
115 "Cluster mode requires the 'cluster-unstable' feature flag. \
116 This mode is not yet production-ready."
117 .to_string(),
118 ));
119 }
120
121 let mut builder = LaminarDB::builder();
123
124 let storage_dir = config.state.local_storage_dir();
125 let has_storage = config.state.is_durable();
126 if let Some(path) = storage_dir {
127 builder = builder.storage_dir(path);
128 }
129
130 let profile = match config.server.mode.as_str() {
131 "embedded" if has_storage => Profile::Embedded,
132 "embedded" => Profile::BareMetal,
133 "cluster" => Profile::Cluster,
134 _ => Profile::BareMetal,
135 };
136 builder = builder.profile(profile);
137 builder = apply_checkpoint_config(builder, &config.checkpoint.url, &config.checkpoint);
138
139 let state_backend = config
143 .state
144 .build()
145 .await
146 .map_err(|e| ServerError::Build(format!("state backend: {e}")))?;
147 let vnode_registry = Arc::new(laminar_core::state::VnodeRegistry::single_owner(
148 config.state.vnode_capacity(),
149 laminar_core::state::NodeId(0),
150 ));
151 builder = builder
152 .state_backend(state_backend)
153 .vnode_registry(vnode_registry);
154
155 if let Some(ai_runtime) = crate::ai::build_ai_runtime(&config)? {
158 builder = builder.ai(ai_runtime);
159 }
160
161 let db = builder
162 .build()
163 .await
164 .map_err(|e| ServerError::Build(e.to_string()))?;
165 let db = Arc::new(db);
166
167 let hostname = gethostname::gethostname().to_string_lossy().into_owned();
169 let pipeline_name = config
170 .pipelines
171 .first()
172 .map_or("default", |p| p.name.as_str())
173 .to_string();
174 let registry = Arc::new(crate::metrics::build_registry([
175 ("instance".into(), hostname),
176 ("pipeline".into(), pipeline_name),
177 ]));
178 let engine_metrics = Arc::new(EngineMetrics::new(®istry));
179 db.set_engine_metrics(Arc::clone(&engine_metrics));
180 db.set_prometheus_registry(Arc::clone(®istry));
181
182 execute_config_ddl(&db, &config).await?;
183
184 db.start()
185 .await
186 .map_err(|e| ServerError::Start(e.to_string()))?;
187 info!("Pipeline started");
188
189 let pgwire_bind = config.server.pgwire_bind.clone();
190 let pgwire_users = config.server.pgwire_users.clone();
191 let pgwire_allow_remote = config.server.pgwire_allow_remote;
192 let pgwire_tls_cert = config.server.pgwire_tls_cert.clone();
193 let pgwire_tls_key = config.server.pgwire_tls_key.clone();
194 let pgwire_tls_client_ca = config.server.pgwire_tls_client_ca.clone();
195 let pgwire_tls_min_version =
196 crate::pgwire::TlsMinVersion::from_config_str(&config.server.pgwire_tls_min_version)
197 .expect("pgwire_tls_min_version validated at config load");
198 let pgwire_max_connections = config.server.pgwire_max_connections;
199 let pgwire_max_auth_failures = config.server.pgwire_max_auth_failures_per_min;
200 let (app_state, api_handle) =
201 start_http_api(Arc::clone(&db), registry, config_path.clone(), config).await?;
202 let watcher_handle = spawn_config_watcher(&app_state, config_path);
203
204 let pgwire_handle = if let Some(bind) = pgwire_bind {
205 let tls = match (&pgwire_tls_cert, &pgwire_tls_key) {
206 (Some(c), Some(k)) => Some(crate::pgwire::TlsPaths {
207 cert: c,
208 key: k,
209 min_version: pgwire_tls_min_version,
210 client_ca: pgwire_tls_client_ca.as_deref(),
211 }),
212 _ => None,
213 };
214 match crate::pgwire::serve(
215 Arc::clone(&db),
216 &bind,
217 pgwire_users,
218 pgwire_allow_remote,
219 tls,
220 pgwire_max_connections,
221 pgwire_max_auth_failures,
222 )
223 .await
224 {
225 Ok((_, h)) => Some(h),
226 Err(e) => {
227 if let Some(wh) = &watcher_handle {
230 wh.abort();
231 }
232 api_handle.abort();
233 let _ = db.shutdown().await;
234 return Err(e);
235 }
236 }
237 } else {
238 None
239 };
240
241 Ok(ServerHandle::Embedded {
242 db,
243 api_handle,
244 pgwire_handle,
245 watcher_handle,
246 })
247}
248
249pub(crate) fn apply_checkpoint_config(
255 mut builder: laminar_db::LaminarDbBuilder,
256 checkpoint_url: &str,
257 checkpoint: &crate::config::CheckpointSection,
258) -> laminar_db::LaminarDbBuilder {
259 let cfg = StreamCheckpointConfig {
260 interval_ms: Some(checkpoint.interval.as_millis() as u64),
261 data_dir: file_url_to_path(checkpoint_url),
262 max_retained: Some(checkpoint.max_retained),
263 alignment_timeout_ms: None,
265 };
266 builder = builder.checkpoint(cfg);
267
268 if !checkpoint_url.starts_with("file:///") && !checkpoint_url.is_empty() {
269 builder = builder.object_store_url(checkpoint_url.to_string());
270 if !checkpoint.storage.is_empty() {
271 builder = builder.object_store_options(checkpoint.storage.clone());
272 }
273 }
274
275 builder
276}
277
278pub(crate) async fn execute_config_ddl(
280 db: &LaminarDB,
281 config: &ServerConfig,
282) -> Result<(), ServerError> {
283 for source in &config.sources {
290 let ddl = source_to_ddl(source);
291 db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
292 section: "source".to_string(),
293 name: source.name.clone(),
294 source: e,
295 })?;
296 info!("Created source: {}", source.name);
297 }
298
299 for lookup in &config.lookups {
300 let ddl = lookup_to_ddl(lookup)?;
301 db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
302 section: "lookup".to_string(),
303 name: lookup.name.clone(),
304 source: e,
305 })?;
306 info!("Created lookup table: {}", lookup.name);
307 }
308
309 for pipeline in &config.pipelines {
310 let ddl = pipeline_to_ddl(pipeline);
311 db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
312 section: "pipeline".to_string(),
313 name: pipeline.name.clone(),
314 source: e,
315 })?;
316 info!("Created pipeline: {}", pipeline.name);
317 }
318
319 for sink in &config.sinks {
320 let ddl = sink_to_ddl(sink);
321 db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
322 section: "sink".to_string(),
323 name: sink.name.clone(),
324 source: e,
325 })?;
326 info!("Created sink: {}", sink.name);
327 }
328
329 if let Some(ref sql) = config.sql {
330 let trimmed = sql.trim();
331 if !trimmed.is_empty() {
332 db.execute(trimmed).await.map_err(|e| {
333 let snippet: String = trimmed.chars().take(80).collect();
334 ServerError::Ddl {
335 section: "sql".to_string(),
336 name: snippet,
337 source: e,
338 }
339 })?;
340 info!("Executed SQL pipeline definition");
341 }
342 }
343
344 Ok(())
345}
346
347pub(crate) async fn start_http_api(
349 db: Arc<LaminarDB>,
350 registry: Arc<prometheus::Registry>,
351 config_path: PathBuf,
352 config: ServerConfig,
353) -> Result<(Arc<http::AppState>, tokio::task::JoinHandle<()>), ServerError> {
354 let bind = config.server.bind.clone();
355
356 let server_metrics = ServerMetrics::new(®istry);
357
358 let app_state = Arc::new(http::AppState {
359 db,
360 config_path,
361 current_config: parking_lot::RwLock::new(config),
362 reload_guard: ReloadGuard::new(),
363 registry,
364 server_metrics,
365 });
366 let router = http::build_router(Arc::clone(&app_state));
367 let api_handle = http::serve(router, &bind).await?;
368 info!("HTTP API listening on {bind}");
369 Ok((app_state, api_handle))
370}
371
372pub(crate) fn spawn_config_watcher(
374 app_state: &Arc<http::AppState>,
375 config_path: PathBuf,
376) -> Option<tokio::task::JoinHandle<()>> {
377 let disabled =
378 std::env::var("LAMINAR_DISABLE_FILE_WATCH").is_ok_and(|v| v == "1" || v == "true");
379 if disabled {
380 info!("Config file watcher disabled via LAMINAR_DISABLE_FILE_WATCH");
381 return None;
382 }
383 let watcher_state = Arc::clone(app_state);
384 let handle = tokio::spawn(async move {
385 crate::watcher::watch_config(
386 config_path,
387 watcher_state,
388 std::time::Duration::from_millis(500),
389 )
390 .await;
391 });
392 info!("Config file watcher started");
393 Some(handle)
394}
395
/// Converts a `file:///…` URL into a filesystem path.
///
/// Returns `None` for anything that does not start with `file:///`. The
/// scheme and empty authority are removed while the leading `/` of the path
/// is kept; on Windows a leading `/` in front of a drive letter
/// (`/C:/…`) is dropped as well. No percent-decoding is performed.
fn file_url_to_path(url: &str) -> Option<PathBuf> {
    // Equivalent to requiring the `file:///` prefix: strip `file://` and insist
    // that what remains still begins with the path's leading slash.
    let path = url
        .strip_prefix("file://")
        .filter(|rest| rest.starts_with('/'))?;

    #[cfg(windows)]
    let path = match path.as_bytes() {
        [b'/', drive, b':', ..] if drive.is_ascii_alphabetic() => &path[1..],
        _ => path,
    };

    Some(PathBuf::from(path))
}
417
418pub fn source_to_ddl(source: &SourceConfig) -> String {
423 let mut parts = Vec::new();
424 parts.push(format!("CREATE SOURCE {}", source.name));
425
426 let mut col_defs: Vec<String> = source
428 .schema
429 .iter()
430 .map(|c| {
431 let nullability = if c.nullable { "" } else { " NOT NULL" };
432 format!("{} {}{}", c.name, c.data_type, nullability)
433 })
434 .collect();
435
436 if let Some(wm) = &source.watermark {
438 let secs = wm.max_out_of_orderness.as_secs();
439 col_defs.push(format!(
440 "WATERMARK FOR {} AS {} - INTERVAL '{}' SECOND",
441 wm.column, wm.column, secs
442 ));
443 }
444
445 if !col_defs.is_empty() {
446 parts.push(format!("({})", col_defs.join(", ")));
447 }
448
449 let connector_keyword = source.connector.replace('-', "_").to_uppercase();
451 let mut opts = Vec::new();
452 opts.push(format!("format = '{}'", source.format));
453 for (key, value) in &source.properties {
454 if key.contains('.') {
457 opts.push(format!("\"{}\" = '{}'", key, toml_value_to_sql(value)));
458 } else {
459 opts.push(format!("{} = '{}'", key, toml_value_to_sql(value)));
460 }
461 }
462 parts.push(format!("FROM {} ({})", connector_keyword, opts.join(", ")));
463
464 parts.join(" ")
465}
466
467pub fn pipeline_to_ddl(pipeline: &PipelineConfig) -> String {
468 format!("CREATE STREAM {} AS {}", pipeline.name, pipeline.sql.trim())
469}
470
471pub fn sink_to_ddl(sink: &SinkConfig) -> String {
472 let connector_keyword = sink.connector.replace('-', "_").to_uppercase();
473 let mut opts: Vec<String> = sink
474 .properties
475 .iter()
476 .map(|(key, value)| {
477 if key.contains('.') {
478 format!("\"{}\" = '{}'", key, toml_value_to_sql(value))
479 } else {
480 format!("{} = '{}'", key, toml_value_to_sql(value))
481 }
482 })
483 .collect();
484 if sink.delivery != "at_least_once" {
485 opts.push(format!("delivery = '{}'", sink.delivery));
486 }
487
488 if opts.is_empty() {
489 format!(
490 "CREATE SINK {} FROM {} INTO {}",
491 sink.name, sink.pipeline, connector_keyword
492 )
493 } else {
494 format!(
495 "CREATE SINK {} FROM {} INTO {} ({})",
496 sink.name,
497 sink.pipeline,
498 connector_keyword,
499 opts.join(", ")
500 )
501 }
502}
503
504#[allow(clippy::result_large_err)]
505pub fn lookup_to_ddl(lookup: &LookupConfig) -> Result<String, ServerError> {
506 if lookup.schema.is_empty() {
507 return Err(ServerError::Ddl {
508 section: "lookup".to_string(),
509 name: lookup.name.clone(),
510 source: DbError::Config(format!(
511 "[[lookup]] '{}' requires a [[lookup.schema]] section with at least \
512 one column definition",
513 lookup.name,
514 )),
515 });
516 }
517
518 let mut parts = Vec::new();
519 parts.push(format!("CREATE LOOKUP TABLE {}", lookup.name));
520
521 let mut col_defs: Vec<String> = lookup
523 .schema
524 .iter()
525 .map(|c| {
526 let nullability = if c.nullable { "" } else { " NOT NULL" };
527 format!("{} {}{}", c.name, c.data_type, nullability)
528 })
529 .collect();
530 if !lookup.primary_key.is_empty() {
531 col_defs.push(format!("PRIMARY KEY ({})", lookup.primary_key.join(", ")));
532 }
533 parts.push(format!("({})", col_defs.join(", ")));
534
535 let mut opts = Vec::new();
537 opts.push(format!("'connector' = '{}'", lookup.connector));
538 opts.push(format!("'strategy' = '{}'", lookup.strategy));
539 if lookup.cache.size_bytes != 100 * 1024 * 1024 {
540 opts.push(format!("'cache_memory' = '{}'", lookup.cache.size_bytes));
541 }
542 if lookup.cache.ttl.as_secs() != 300 {
543 opts.push(format!("'cache_ttl' = '{}'", lookup.cache.ttl.as_secs()));
544 }
545 for (key, value) in &lookup.properties {
546 opts.push(format!("'{}' = '{}'", key, toml_value_to_sql(value)));
547 }
548 parts.push(format!("WITH ({})", opts.join(", ")));
549
550 Ok(parts.join(" "))
551}
552
553fn toml_value_to_sql(value: &toml::Value) -> String {
556 match value {
557 toml::Value::String(s) => s.replace('\'', "''"),
558 toml::Value::Integer(i) => i.to_string(),
559 toml::Value::Float(f) => f.to_string(),
560 toml::Value::Boolean(b) => b.to_string(),
561 toml::Value::Array(arr) => {
562 let items: Vec<String> = arr.iter().map(toml_value_to_sql).collect();
563 items.join(",")
564 }
565 other => format!("{other}"),
566 }
567}
568
/// Errors produced while building, starting, running or stopping the server.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// The database (or one of its components, such as the state backend)
    /// could not be built.
    #[error("failed to build LaminarDB: {0}")]
    Build(String),
    /// A DDL statement derived from the config failed.
    #[error("failed to execute DDL for {section} '{name}': {source}")]
    Ddl {
        /// Config section the statement came from (e.g. `source`, `sink`, `sql`).
        section: String,
        /// Name of the config entry, or a snippet of the statement for `sql`.
        name: String,
        /// Underlying database error.
        source: DbError,
    },
    /// The pipeline failed to start.
    #[error("failed to start pipeline: {0}")]
    Start(String),
    /// The HTTP server reported an error.
    #[error("HTTP server error: {0}")]
    Http(String),
    /// Installing a signal handler or shutting the database down failed.
    #[error("shutdown error: {0}")]
    Shutdown(String),
    /// A configuration error, displayed as the wrapped error.
    #[error(transparent)]
    Config(#[from] ConfigError),
    /// Cluster mode was requested but is unavailable, or it failed.
    #[error("cluster mode error: {0}")]
    Cluster(String),
    /// A cluster configuration error (only with the `cluster-unstable` feature).
    #[cfg(feature = "cluster-unstable")]
    #[error(transparent)]
    ClusterConfig(#[from] ClusterConfigError),
}
593
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::*;

    /// Shorthand for a schema column.
    fn column(name: &str, data_type: &str, nullable: bool) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable,
        }
    }

    /// Shorthand for a TOML string value.
    fn text(value: &str) -> toml::Value {
        toml::Value::String(value.to_string())
    }

    /// Shorthand for a watermark on `column` with `secs` seconds of lateness.
    fn watermark(column: &str, secs: u64) -> WatermarkConfig {
        WatermarkConfig {
            column: column.to_string(),
            max_out_of_orderness: std::time::Duration::from_secs(secs),
        }
    }

    /// A JSON source with an `id BIGINT NOT NULL` and a nullable `name VARCHAR`
    /// column, no properties and no watermark.
    fn make_source(name: &str, connector: &str) -> SourceConfig {
        SourceConfig {
            name: name.to_string(),
            connector: connector.to_string(),
            format: "json".to_string(),
            properties: toml::Table::new(),
            schema: vec![
                column("id", "BIGINT", false),
                column("name", "VARCHAR", true),
            ],
            watermark: None,
        }
    }

    /// A postgres/poll lookup with the default cache settings.
    fn make_lookup(
        name: &str,
        properties: toml::Table,
        primary_key: &[&str],
        schema: Vec<ColumnDef>,
    ) -> LookupConfig {
        LookupConfig {
            name: name.to_string(),
            connector: "postgres".to_string(),
            strategy: "poll".to_string(),
            cache: LookupCacheConfig::default(),
            properties,
            primary_key: primary_key.iter().map(|key| key.to_string()).collect(),
            schema,
        }
    }

    /// An otherwise default server config containing only `source`.
    #[cfg(any(feature = "otel", feature = "kafka"))]
    fn config_with_source(source: SourceConfig) -> ServerConfig {
        ServerConfig {
            server: ServerSection::default(),
            state: StateBackendConfig::default(),
            checkpoint: CheckpointSection::default(),
            sources: vec![source],
            lookups: vec![],
            pipelines: vec![],
            sinks: vec![],
            sql: None,
            discovery: None,
            coordination: None,
            node_id: None,
            ai: Default::default(),
            models: Default::default(),
        }
    }

    #[test]
    fn test_source_to_ddl_basic() {
        let ddl = source_to_ddl(&make_source("events", "kafka"));
        assert!(ddl.starts_with("CREATE SOURCE events"));
        assert!(ddl.contains("id BIGINT NOT NULL"));
        assert!(ddl.contains("name VARCHAR"));
        assert!(ddl.contains("FROM KAFKA"));
        assert!(ddl.contains("format = 'json'"));
    }

    /// A source with no declared columns but a watermark must still produce
    /// DDL that the database accepts for the OTel connector.
    #[cfg(feature = "otel")]
    #[tokio::test]
    async fn execute_config_ddl_columnless_otel_with_watermark_succeeds() {
        let mut properties = toml::Table::new();
        properties.insert("port".to_string(), text("0"));
        properties.insert("signals".to_string(), text("logs"));
        let source = SourceConfig {
            name: "otel_events".to_string(),
            connector: "otel".to_string(),
            format: "json".to_string(),
            properties,
            schema: vec![],
            watermark: Some(watermark("_laminar_received_at", 10)),
        };

        let db = laminar_db::LaminarDB::open().unwrap();
        let config = config_with_source(source);
        execute_config_ddl(&db, &config)
            .await
            .expect("columnless OTel + WATERMARK FOR should compose");
    }

    /// A columnless Kafka source is expected to fail with a schema-discovery
    /// error raised by the DDL layer.
    #[cfg(feature = "kafka")]
    #[tokio::test]
    async fn execute_config_ddl_columnless_kafka_surfaces_discovery_error() {
        let mut source = make_source("events", "kafka");
        source.schema.clear();
        source.watermark = Some(watermark("ts", 5));

        let db = laminar_db::LaminarDB::open().unwrap();
        let config = config_with_source(source);
        let err = execute_config_ddl(&db, &config).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("schema auto-discovery failed")
                || msg.contains("could not auto-discover a schema")
                || msg.contains("no columns declared"),
            "expected schema-discovery error from the DDL layer, got: {msg}"
        );
    }

    #[test]
    fn test_source_to_ddl_with_watermark() {
        let mut source = make_source("events", "kafka");
        source.watermark = Some(watermark("ts", 5));
        let ddl = source_to_ddl(&source);
        assert!(ddl.contains("WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"));
    }

    #[test]
    fn test_source_to_ddl_with_properties() {
        let mut source = make_source("events", "kafka");
        source
            .properties
            .insert("brokers".to_string(), text("localhost:9092"));
        source
            .properties
            .insert("topic".to_string(), text("events"));
        let ddl = source_to_ddl(&source);
        assert!(ddl.contains("brokers = 'localhost:9092'"));
        assert!(ddl.contains("topic = 'events'"));
    }

    #[test]
    fn test_pipeline_to_ddl() {
        let pipeline = PipelineConfig {
            name: "vwap".to_string(),
            sql: "SELECT symbol, SUM(price) FROM trades GROUP BY symbol".to_string(),
        };
        let ddl = pipeline_to_ddl(&pipeline);
        assert_eq!(
            ddl,
            "CREATE STREAM vwap AS SELECT symbol, SUM(price) FROM trades GROUP BY symbol"
        );
    }

    #[test]
    fn test_sink_to_ddl() {
        let mut properties = toml::Table::new();
        properties.insert("topic".to_string(), text("output"));
        properties.insert("brokers".to_string(), text("localhost:9092"));
        let sink = SinkConfig {
            name: "output_sink".to_string(),
            pipeline: "vwap".to_string(),
            connector: "kafka".to_string(),
            delivery: "at_least_once".to_string(),
            properties,
        };
        let ddl = sink_to_ddl(&sink);
        assert!(ddl.starts_with("CREATE SINK output_sink FROM vwap INTO KAFKA"));
        assert!(ddl.contains("topic = 'output'"));
        assert!(ddl.contains("brokers = 'localhost:9092'"));
        // The default delivery mode is never spelled out.
        assert!(!ddl.contains("delivery"));
    }

    #[test]
    fn test_sink_to_ddl_exactly_once() {
        let sink = SinkConfig {
            name: "out".to_string(),
            pipeline: "p".to_string(),
            connector: "kafka".to_string(),
            delivery: "exactly_once".to_string(),
            properties: toml::Table::new(),
        };
        let ddl = sink_to_ddl(&sink);
        assert!(ddl.contains("delivery = 'exactly_once'"));
    }

    #[test]
    fn test_lookup_to_ddl() {
        let mut properties = toml::Table::new();
        properties.insert("connection".to_string(), text("postgresql://localhost/db"));
        let lookup = make_lookup(
            "instruments",
            properties,
            &["symbol"],
            vec![column("symbol", "VARCHAR", false)],
        );
        let ddl = lookup_to_ddl(&lookup).unwrap();
        assert!(ddl.starts_with("CREATE LOOKUP TABLE instruments"));
        assert!(ddl.contains("symbol VARCHAR NOT NULL"));
        assert!(ddl.contains("PRIMARY KEY (symbol)"));
        assert!(ddl.contains("'connector' = 'postgres'"));
        assert!(ddl.contains("'strategy' = 'poll'"));
        assert!(ddl.contains("'connection' = 'postgresql://localhost/db'"));
    }

    #[test]
    fn test_lookup_to_ddl_no_primary_key() {
        let lookup = make_lookup(
            "t",
            toml::Table::new(),
            &[],
            vec![column("id", "INT", false)],
        );
        let ddl = lookup_to_ddl(&lookup).unwrap();
        assert!(!ddl.contains("PRIMARY KEY"));
    }

    #[test]
    fn test_lookup_to_ddl_empty_schema_rejected() {
        let lookup = make_lookup("bad", toml::Table::new(), &[], vec![]);
        assert!(lookup_to_ddl(&lookup).is_err());
    }

    #[test]
    fn test_toml_value_to_sql() {
        assert_eq!(toml_value_to_sql(&text("hello")), "hello");
        assert_eq!(toml_value_to_sql(&toml::Value::Integer(42)), "42");
        assert_eq!(toml_value_to_sql(&toml::Value::Boolean(true)), "true");
        assert_eq!(toml_value_to_sql(&toml::Value::Float(3.25)), "3.25");
    }

    #[test]
    fn test_toml_value_to_sql_escapes_single_quotes() {
        assert_eq!(toml_value_to_sql(&text("it's a test")), "it''s a test");
        assert_eq!(toml_value_to_sql(&text("a''b")), "a''''b");
    }
}