Skip to main content

laminardb/
server.rs

1//! Engine construction and lifecycle for LaminarDB server.
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use tokio::signal;
7use tracing::info;
8
9use laminar_core::streaming::checkpoint::StreamCheckpointConfig;
10use laminar_db::{DbError, EngineMetrics, LaminarDB, Profile};
11
12#[cfg(feature = "cluster-unstable")]
13use crate::cluster_config::{ClusterConfig, ClusterConfigError};
14use crate::config::{
15    ConfigError, LookupConfig, PipelineConfig, ServerConfig, SinkConfig, SourceConfig,
16};
17use crate::http;
18use crate::metrics::ServerMetrics;
19use crate::reload::ReloadGuard;
20#[cfg(all(test, any(feature = "otel", feature = "kafka")))]
21use laminar_core::state::StateBackendConfig;
22
23/// Handle to a running LaminarDB server. Call `wait_for_shutdown` to block until Ctrl-C.
24pub enum ServerHandle {
25    Embedded {
26        db: Arc<LaminarDB>,
27        api_handle: tokio::task::JoinHandle<()>,
28        pgwire_handle: Option<tokio::task::JoinHandle<()>>,
29        watcher_handle: Option<tokio::task::JoinHandle<()>>,
30    },
31    #[cfg(feature = "cluster-unstable")]
32    Cluster(Box<crate::cluster::ClusterHandle>),
33}
34
35impl ServerHandle {
36    /// Block until SIGINT/SIGTERM, then gracefully shut down.
37    pub async fn wait_for_shutdown(self) -> Result<(), ServerError> {
38        match self {
39            Self::Embedded {
40                db,
41                api_handle,
42                pgwire_handle,
43                watcher_handle,
44            } => {
45                wait_for_termination_signal().await?;
46
47                info!("Received shutdown signal, shutting down...");
48
49                if let Some(wh) = &watcher_handle {
50                    wh.abort();
51                }
52                if let Some(pg) = &pgwire_handle {
53                    pg.abort();
54                }
55                db.shutdown()
56                    .await
57                    .map_err(|e| ServerError::Shutdown(e.to_string()))?;
58                api_handle.abort();
59
60                info!("Shutdown complete");
61                Ok(())
62            }
63            #[cfg(feature = "cluster-unstable")]
64            Self::Cluster(handle) => (*handle)
65                .wait_for_shutdown()
66                .await
67                .map_err(|e| ServerError::Cluster(e.to_string())),
68        }
69    }
70}
71
72async fn wait_for_termination_signal() -> Result<(), ServerError> {
73    #[cfg(unix)]
74    {
75        use tokio::signal::unix::{signal, SignalKind};
76        let mut sigterm = signal(SignalKind::terminate())
77            .map_err(|e| ServerError::Shutdown(format!("SIGTERM handler failed: {e}")))?;
78        tokio::select! {
79            result = signal::ctrl_c() => {
80                result.map_err(|e| ServerError::Shutdown(format!("SIGINT handler failed: {e}")))?;
81            }
82            _ = sigterm.recv() => {}
83        }
84        Ok(())
85    }
86    #[cfg(not(unix))]
87    {
88        signal::ctrl_c()
89            .await
90            .map_err(|e| ServerError::Shutdown(format!("signal handler failed: {e}")))?;
91        Ok(())
92    }
93}
94
95/// Build and start a LaminarDB server from the given configuration.
96pub async fn run_server(
97    config: ServerConfig,
98    config_path: PathBuf,
99) -> Result<ServerHandle, ServerError> {
100    // Cluster mode: gated behind the `cluster-unstable` feature flag.
101    #[cfg(feature = "cluster-unstable")]
102    {
103        let cluster_cfg = ClusterConfig::from_server_config(&config)?;
104
105        if let Some(cluster_cfg) = cluster_cfg {
106            let handle = crate::cluster::start_cluster(config, cluster_cfg, config_path)
107                .await
108                .map_err(|e| ServerError::Cluster(e.to_string()))?;
109            return Ok(ServerHandle::Cluster(Box::new(handle)));
110        }
111    }
112    #[cfg(not(feature = "cluster-unstable"))]
113    if config.server.mode == "cluster" {
114        return Err(ServerError::Cluster(
115            "Cluster mode requires the 'cluster-unstable' feature flag. \
116             This mode is not yet production-ready."
117                .to_string(),
118        ));
119    }
120
121    // Build LaminarDB via builder API
122    let mut builder = LaminarDB::builder();
123
124    let storage_dir = config.state.local_storage_dir();
125    let has_storage = config.state.is_durable();
126    if let Some(path) = storage_dir {
127        builder = builder.storage_dir(path);
128    }
129
130    let profile = match config.server.mode.as_str() {
131        "embedded" if has_storage => Profile::Embedded,
132        "embedded" => Profile::BareMetal,
133        "cluster" => Profile::Cluster,
134        _ => Profile::BareMetal,
135    };
136    builder = builder.profile(profile);
137    builder = apply_checkpoint_config(builder, &config.checkpoint.url, &config.checkpoint);
138
139    // Build the state backend + single-owner vnode registry from config so
140    // the checkpoint coordinator's durability gate runs with real markers.
141    // Cluster mode overrides the owner id after its controller is built.
142    let state_backend = config
143        .state
144        .build()
145        .await
146        .map_err(|e| ServerError::Build(format!("state backend: {e}")))?;
147    let vnode_registry = Arc::new(laminar_core::state::VnodeRegistry::single_owner(
148        config.state.vnode_capacity(),
149        laminar_core::state::NodeId(0),
150    ));
151    builder = builder
152        .state_backend(state_backend)
153        .vnode_registry(vnode_registry);
154
155    // Build the AI subsystem from `[ai]`/`[models]` and install it. Without
156    // configured models this is a no-op and `ai_*` functions fail at plan time.
157    if let Some(ai_runtime) = crate::ai::build_ai_runtime(&config)? {
158        builder = builder.ai(ai_runtime);
159    }
160
161    let db = builder
162        .build()
163        .await
164        .map_err(|e| ServerError::Build(e.to_string()))?;
165    let db = Arc::new(db);
166
167    // Prometheus registry — must be set before start().
168    let hostname = gethostname::gethostname().to_string_lossy().into_owned();
169    let pipeline_name = config
170        .pipelines
171        .first()
172        .map_or("default", |p| p.name.as_str())
173        .to_string();
174    let registry = Arc::new(crate::metrics::build_registry([
175        ("instance".into(), hostname),
176        ("pipeline".into(), pipeline_name),
177    ]));
178    let engine_metrics = Arc::new(EngineMetrics::new(&registry));
179    db.set_engine_metrics(Arc::clone(&engine_metrics));
180    db.set_prometheus_registry(Arc::clone(&registry));
181
182    execute_config_ddl(&db, &config).await?;
183
184    db.start()
185        .await
186        .map_err(|e| ServerError::Start(e.to_string()))?;
187    info!("Pipeline started");
188
189    let pgwire_bind = config.server.pgwire_bind.clone();
190    let pgwire_users = config.server.pgwire_users.clone();
191    let pgwire_allow_remote = config.server.pgwire_allow_remote;
192    let pgwire_tls_cert = config.server.pgwire_tls_cert.clone();
193    let pgwire_tls_key = config.server.pgwire_tls_key.clone();
194    let pgwire_tls_client_ca = config.server.pgwire_tls_client_ca.clone();
195    let pgwire_tls_min_version =
196        crate::pgwire::TlsMinVersion::from_config_str(&config.server.pgwire_tls_min_version)
197            .expect("pgwire_tls_min_version validated at config load");
198    let pgwire_max_connections = config.server.pgwire_max_connections;
199    let pgwire_max_auth_failures = config.server.pgwire_max_auth_failures_per_min;
200    let (app_state, api_handle) =
201        start_http_api(Arc::clone(&db), registry, config_path.clone(), config).await?;
202    let watcher_handle = spawn_config_watcher(&app_state, config_path);
203
204    let pgwire_handle = if let Some(bind) = pgwire_bind {
205        let tls = match (&pgwire_tls_cert, &pgwire_tls_key) {
206            (Some(c), Some(k)) => Some(crate::pgwire::TlsPaths {
207                cert: c,
208                key: k,
209                min_version: pgwire_tls_min_version,
210                client_ca: pgwire_tls_client_ca.as_deref(),
211            }),
212            _ => None,
213        };
214        match crate::pgwire::serve(
215            Arc::clone(&db),
216            &bind,
217            pgwire_users,
218            pgwire_allow_remote,
219            tls,
220            pgwire_max_connections,
221            pgwire_max_auth_failures,
222        )
223        .await
224        {
225            Ok((_, h)) => Some(h),
226            Err(e) => {
227                // Roll back: stop the HTTP server, the file watcher, and the
228                // pipeline before propagating the bind failure.
229                if let Some(wh) = &watcher_handle {
230                    wh.abort();
231                }
232                api_handle.abort();
233                let _ = db.shutdown().await;
234                return Err(e);
235            }
236        }
237    } else {
238        None
239    };
240
241    Ok(ServerHandle::Embedded {
242        db,
243        api_handle,
244        pgwire_handle,
245        watcher_handle,
246    })
247}
248
249// ---------------------------------------------------------------------------
250// Shared helpers (used by both embedded and cluster startup)
251// ---------------------------------------------------------------------------
252
253/// Apply checkpoint settings to a `LaminarDB` builder.
254pub(crate) fn apply_checkpoint_config(
255    mut builder: laminar_db::LaminarDbBuilder,
256    checkpoint_url: &str,
257    checkpoint: &crate::config::CheckpointSection,
258) -> laminar_db::LaminarDbBuilder {
259    let cfg = StreamCheckpointConfig {
260        interval_ms: Some(checkpoint.interval.as_millis() as u64),
261        data_dir: file_url_to_path(checkpoint_url),
262        max_retained: Some(checkpoint.max_retained),
263        // Not yet exposed in the server TOML; keeps the 30s default.
264        alignment_timeout_ms: None,
265    };
266    builder = builder.checkpoint(cfg);
267
268    if !checkpoint_url.starts_with("file:///") && !checkpoint_url.is_empty() {
269        builder = builder.object_store_url(checkpoint_url.to_string());
270        if !checkpoint.storage.is_empty() {
271            builder = builder.object_store_options(checkpoint.storage.clone());
272        }
273    }
274
275    builder
276}
277
278/// Execute DDL for all config sections (sources, lookups, pipelines, sinks, raw SQL).
279pub(crate) async fn execute_config_ddl(
280    db: &LaminarDB,
281    config: &ServerConfig,
282) -> Result<(), ServerError> {
283    // Empty-schema + WATERMARK FOR is handled inside the laminar-db DDL
284    // layer: it calls `discover_schema` on the connector, populates the
285    // source columns from the result, then validates the watermark column
286    // against them. Connectors that cannot discover a schema (e.g. Kafka
287    // without a reachable Schema Registry) return a clear error from that
288    // path — we don't need to pre-empt it here.
289    for source in &config.sources {
290        let ddl = source_to_ddl(source);
291        db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
292            section: "source".to_string(),
293            name: source.name.clone(),
294            source: e,
295        })?;
296        info!("Created source: {}", source.name);
297    }
298
299    for lookup in &config.lookups {
300        let ddl = lookup_to_ddl(lookup)?;
301        db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
302            section: "lookup".to_string(),
303            name: lookup.name.clone(),
304            source: e,
305        })?;
306        info!("Created lookup table: {}", lookup.name);
307    }
308
309    for pipeline in &config.pipelines {
310        let ddl = pipeline_to_ddl(pipeline);
311        db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
312            section: "pipeline".to_string(),
313            name: pipeline.name.clone(),
314            source: e,
315        })?;
316        info!("Created pipeline: {}", pipeline.name);
317    }
318
319    for sink in &config.sinks {
320        let ddl = sink_to_ddl(sink);
321        db.execute(&ddl).await.map_err(|e| ServerError::Ddl {
322            section: "sink".to_string(),
323            name: sink.name.clone(),
324            source: e,
325        })?;
326        info!("Created sink: {}", sink.name);
327    }
328
329    if let Some(ref sql) = config.sql {
330        let trimmed = sql.trim();
331        if !trimmed.is_empty() {
332            db.execute(trimmed).await.map_err(|e| {
333                let snippet: String = trimmed.chars().take(80).collect();
334                ServerError::Ddl {
335                    section: "sql".to_string(),
336                    name: snippet,
337                    source: e,
338                }
339            })?;
340            info!("Executed SQL pipeline definition");
341        }
342    }
343
344    Ok(())
345}
346
347/// Start HTTP API server and return (shared state, join handle).
348pub(crate) async fn start_http_api(
349    db: Arc<LaminarDB>,
350    registry: Arc<prometheus::Registry>,
351    config_path: PathBuf,
352    config: ServerConfig,
353) -> Result<(Arc<http::AppState>, tokio::task::JoinHandle<()>), ServerError> {
354    let bind = config.server.bind.clone();
355
356    let server_metrics = ServerMetrics::new(&registry);
357
358    let app_state = Arc::new(http::AppState {
359        db,
360        config_path,
361        current_config: parking_lot::RwLock::new(config),
362        reload_guard: ReloadGuard::new(),
363        registry,
364        server_metrics,
365    });
366    let router = http::build_router(Arc::clone(&app_state));
367    let api_handle = http::serve(router, &bind).await?;
368    info!("HTTP API listening on {bind}");
369    Ok((app_state, api_handle))
370}
371
372/// Spawn config file watcher unless disabled via `LAMINAR_DISABLE_FILE_WATCH=1`.
373pub(crate) fn spawn_config_watcher(
374    app_state: &Arc<http::AppState>,
375    config_path: PathBuf,
376) -> Option<tokio::task::JoinHandle<()>> {
377    let disabled =
378        std::env::var("LAMINAR_DISABLE_FILE_WATCH").is_ok_and(|v| v == "1" || v == "true");
379    if disabled {
380        info!("Config file watcher disabled via LAMINAR_DISABLE_FILE_WATCH");
381        return None;
382    }
383    let watcher_state = Arc::clone(app_state);
384    let handle = tokio::spawn(async move {
385        crate::watcher::watch_config(
386            config_path,
387            watcher_state,
388            std::time::Duration::from_millis(500),
389        )
390        .await;
391    });
392    info!("Config file watcher started");
393    Some(handle)
394}
395
396// ---------------------------------------------------------------------------
397// Helpers
398// ---------------------------------------------------------------------------
399
/// Extract a local filesystem path from a `file://` URL, or `None` for cloud URLs.
///
/// Only the empty-authority form `file:///…` is accepted; the returned path
/// keeps its leading `/`. On Windows a leading slash in front of a drive
/// letter (`/C:/…`) is dropped so the result is a usable drive path.
fn file_url_to_path(url: &str) -> Option<PathBuf> {
    // Strip the scheme and require the remainder to start at the root.
    let rest = url
        .strip_prefix("file://")
        .filter(|tail| tail.starts_with('/'))?;

    #[cfg(windows)]
    let rest = match rest.as_bytes() {
        [b'/', drive, b':', ..] if drive.is_ascii_alphabetic() => &rest[1..],
        _ => rest,
    };

    Some(PathBuf::from(rest))
}
417
418// ---------------------------------------------------------------------------
419// DDL generation
420// ---------------------------------------------------------------------------
421
422pub fn source_to_ddl(source: &SourceConfig) -> String {
423    let mut parts = Vec::new();
424    parts.push(format!("CREATE SOURCE {}", source.name));
425
426    // Column definitions
427    let mut col_defs: Vec<String> = source
428        .schema
429        .iter()
430        .map(|c| {
431            let nullability = if c.nullable { "" } else { " NOT NULL" };
432            format!("{} {}{}", c.name, c.data_type, nullability)
433        })
434        .collect();
435
436    // Watermark clause
437    if let Some(wm) = &source.watermark {
438        let secs = wm.max_out_of_orderness.as_secs();
439        col_defs.push(format!(
440            "WATERMARK FOR {} AS {} - INTERVAL '{}' SECOND",
441            wm.column, wm.column, secs
442        ));
443    }
444
445    if !col_defs.is_empty() {
446        parts.push(format!("({})", col_defs.join(", ")));
447    }
448
449    // FROM CONNECTOR (...) clause
450    let connector_keyword = source.connector.replace('-', "_").to_uppercase();
451    let mut opts = Vec::new();
452    opts.push(format!("format = '{}'", source.format));
453    for (key, value) in &source.properties {
454        // Quote keys that contain dots (e.g. kafka.session.timeout.ms)
455        // to prevent SQL parser errors with dotted identifiers.
456        if key.contains('.') {
457            opts.push(format!("\"{}\" = '{}'", key, toml_value_to_sql(value)));
458        } else {
459            opts.push(format!("{} = '{}'", key, toml_value_to_sql(value)));
460        }
461    }
462    parts.push(format!("FROM {} ({})", connector_keyword, opts.join(", ")));
463
464    parts.join(" ")
465}
466
467pub fn pipeline_to_ddl(pipeline: &PipelineConfig) -> String {
468    format!("CREATE STREAM {} AS {}", pipeline.name, pipeline.sql.trim())
469}
470
471pub fn sink_to_ddl(sink: &SinkConfig) -> String {
472    let connector_keyword = sink.connector.replace('-', "_").to_uppercase();
473    let mut opts: Vec<String> = sink
474        .properties
475        .iter()
476        .map(|(key, value)| {
477            if key.contains('.') {
478                format!("\"{}\" = '{}'", key, toml_value_to_sql(value))
479            } else {
480                format!("{} = '{}'", key, toml_value_to_sql(value))
481            }
482        })
483        .collect();
484    if sink.delivery != "at_least_once" {
485        opts.push(format!("delivery = '{}'", sink.delivery));
486    }
487
488    if opts.is_empty() {
489        format!(
490            "CREATE SINK {} FROM {} INTO {}",
491            sink.name, sink.pipeline, connector_keyword
492        )
493    } else {
494        format!(
495            "CREATE SINK {} FROM {} INTO {} ({})",
496            sink.name,
497            sink.pipeline,
498            connector_keyword,
499            opts.join(", ")
500        )
501    }
502}
503
504#[allow(clippy::result_large_err)]
505pub fn lookup_to_ddl(lookup: &LookupConfig) -> Result<String, ServerError> {
506    if lookup.schema.is_empty() {
507        return Err(ServerError::Ddl {
508            section: "lookup".to_string(),
509            name: lookup.name.clone(),
510            source: DbError::Config(format!(
511                "[[lookup]] '{}' requires a [[lookup.schema]] section with at least \
512                 one column definition",
513                lookup.name,
514            )),
515        });
516    }
517
518    let mut parts = Vec::new();
519    parts.push(format!("CREATE LOOKUP TABLE {}", lookup.name));
520
521    // Column definitions + PRIMARY KEY
522    let mut col_defs: Vec<String> = lookup
523        .schema
524        .iter()
525        .map(|c| {
526            let nullability = if c.nullable { "" } else { " NOT NULL" };
527            format!("{} {}{}", c.name, c.data_type, nullability)
528        })
529        .collect();
530    if !lookup.primary_key.is_empty() {
531        col_defs.push(format!("PRIMARY KEY ({})", lookup.primary_key.join(", ")));
532    }
533    parts.push(format!("({})", col_defs.join(", ")));
534
535    // WITH clause
536    let mut opts = Vec::new();
537    opts.push(format!("'connector' = '{}'", lookup.connector));
538    opts.push(format!("'strategy' = '{}'", lookup.strategy));
539    if lookup.cache.size_bytes != 100 * 1024 * 1024 {
540        opts.push(format!("'cache_memory' = '{}'", lookup.cache.size_bytes));
541    }
542    if lookup.cache.ttl.as_secs() != 300 {
543        opts.push(format!("'cache_ttl' = '{}'", lookup.cache.ttl.as_secs()));
544    }
545    for (key, value) in &lookup.properties {
546        opts.push(format!("'{}' = '{}'", key, toml_value_to_sql(value)));
547    }
548    parts.push(format!("WITH ({})", opts.join(", ")));
549
550    Ok(parts.join(" "))
551}
552
553/// Convert a TOML value to a SQL string literal value.
554/// Escapes single quotes (SQL standard: ' → '').
555fn toml_value_to_sql(value: &toml::Value) -> String {
556    match value {
557        toml::Value::String(s) => s.replace('\'', "''"),
558        toml::Value::Integer(i) => i.to_string(),
559        toml::Value::Float(f) => f.to_string(),
560        toml::Value::Boolean(b) => b.to_string(),
561        toml::Value::Array(arr) => {
562            let items: Vec<String> = arr.iter().map(toml_value_to_sql).collect();
563            items.join(",")
564        }
565        other => format!("{other}"),
566    }
567}
568
569#[derive(Debug, thiserror::Error)]
570pub enum ServerError {
571    #[error("failed to build LaminarDB: {0}")]
572    Build(String),
573    #[error("failed to execute DDL for {section} '{name}': {source}")]
574    Ddl {
575        section: String,
576        name: String,
577        source: DbError,
578    },
579    #[error("failed to start pipeline: {0}")]
580    Start(String),
581    #[error("HTTP server error: {0}")]
582    Http(String),
583    #[error("shutdown error: {0}")]
584    Shutdown(String),
585    #[error(transparent)]
586    Config(#[from] ConfigError),
587    #[error("cluster mode error: {0}")]
588    Cluster(String),
589    #[cfg(feature = "cluster-unstable")]
590    #[error(transparent)]
591    ClusterConfig(#[from] ClusterConfigError),
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::*;

    // ---- fixtures ---------------------------------------------------------

    /// Shorthand for a TOML string value.
    fn text(value: &str) -> toml::Value {
        toml::Value::String(value.to_string())
    }

    /// Shorthand for a column definition.
    fn column(name: &str, data_type: &str, nullable: bool) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable,
        }
    }

    /// Watermark on `column` with the given out-of-orderness in seconds.
    fn watermark(column: &str, secs: u64) -> WatermarkConfig {
        WatermarkConfig {
            column: column.to_string(),
            max_out_of_orderness: std::time::Duration::from_secs(secs),
        }
    }

    /// JSON source with an `id BIGINT NOT NULL` and a nullable `name VARCHAR`.
    fn make_source(name: &str, connector: &str) -> SourceConfig {
        SourceConfig {
            name: name.to_string(),
            connector: connector.to_string(),
            format: "json".to_string(),
            properties: toml::Table::new(),
            schema: vec![
                column("id", "BIGINT", false),
                column("name", "VARCHAR", true),
            ],
            watermark: None,
        }
    }

    /// Kafka sink with the given delivery guarantee and properties.
    fn kafka_sink(name: &str, pipeline: &str, delivery: &str, properties: toml::Table) -> SinkConfig {
        SinkConfig {
            name: name.to_string(),
            pipeline: pipeline.to_string(),
            connector: "kafka".to_string(),
            delivery: delivery.to_string(),
            properties,
        }
    }

    /// Postgres poll lookup with default cache settings and no properties.
    fn postgres_lookup(name: &str, primary_key: &[&str], schema: Vec<ColumnDef>) -> LookupConfig {
        LookupConfig {
            name: name.to_string(),
            connector: "postgres".to_string(),
            strategy: "poll".to_string(),
            cache: LookupCacheConfig::default(),
            properties: toml::Table::new(),
            primary_key: primary_key.iter().map(|k| k.to_string()).collect(),
            schema,
        }
    }

    /// Otherwise-default server config carrying only the given sources.
    #[cfg(any(feature = "otel", feature = "kafka"))]
    fn config_with_sources(sources: Vec<SourceConfig>) -> ServerConfig {
        ServerConfig {
            server: ServerSection::default(),
            state: StateBackendConfig::default(),
            checkpoint: CheckpointSection::default(),
            sources,
            lookups: vec![],
            pipelines: vec![],
            sinks: vec![],
            sql: None,
            discovery: None,
            coordination: None,
            node_id: None,
            ai: Default::default(),
            models: Default::default(),
        }
    }

    // ---- source DDL -------------------------------------------------------

    #[test]
    fn test_source_to_ddl_basic() {
        let ddl = source_to_ddl(&make_source("events", "kafka"));
        assert!(ddl.starts_with("CREATE SOURCE events"));
        assert!(ddl.contains("id BIGINT NOT NULL"));
        assert!(ddl.contains("name VARCHAR"));
        assert!(ddl.contains("FROM KAFKA"));
        assert!(ddl.contains("format = 'json'"));
    }

    /// Columnless OTel source + WATERMARK FOR must compose: the OTel
    /// connector implements `discover_schema` so the DDL layer can
    /// resolve columns before validating the watermark.
    #[cfg(feature = "otel")]
    #[tokio::test]
    async fn execute_config_ddl_columnless_otel_with_watermark_succeeds() {
        let mut properties = toml::Table::new();
        // Bind to an ephemeral port so the test doesn't clash with 4317.
        properties.insert("port".to_string(), text("0"));
        properties.insert("signals".to_string(), text("logs"));

        let source = SourceConfig {
            name: "otel_events".to_string(),
            connector: "otel".to_string(),
            format: "json".to_string(),
            properties,
            schema: vec![],
            watermark: Some(watermark("_laminar_received_at", 10)),
        };

        let db = laminar_db::LaminarDB::open().unwrap();
        let config = config_with_sources(vec![source]);
        execute_config_ddl(&db, &config)
            .await
            .expect("columnless OTel + WATERMARK FOR should compose");
    }

    /// Columnless Kafka source + WATERMARK FOR: the Kafka connector can't
    /// discover a schema without `bootstrap.servers` configured, so the DDL
    /// layer surfaces a "schema auto-discovery failed: …" error (or, when
    /// the connector returns no schema, "could not auto-discover a schema").
    /// The server no longer pre-empts this — we just check the error bubbles
    /// up clearly. Requires the kafka connector to be registered.
    #[cfg(feature = "kafka")]
    #[tokio::test]
    async fn execute_config_ddl_columnless_kafka_surfaces_discovery_error() {
        let mut source = make_source("events", "kafka");
        source.schema.clear();
        source.watermark = Some(watermark("ts", 5));

        let db = laminar_db::LaminarDB::open().unwrap();
        let config = config_with_sources(vec![source]);
        let msg = execute_config_ddl(&db, &config)
            .await
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("schema auto-discovery failed")
                || msg.contains("could not auto-discover a schema")
                || msg.contains("no columns declared"),
            "expected schema-discovery error from the DDL layer, got: {msg}"
        );
    }

    #[test]
    fn test_source_to_ddl_with_watermark() {
        let mut source = make_source("events", "kafka");
        source.watermark = Some(watermark("ts", 5));
        let ddl = source_to_ddl(&source);
        assert!(ddl.contains("WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"));
    }

    #[test]
    fn test_source_to_ddl_with_properties() {
        let mut source = make_source("events", "kafka");
        source
            .properties
            .insert("brokers".to_string(), text("localhost:9092"));
        source
            .properties
            .insert("topic".to_string(), text("events"));
        let ddl = source_to_ddl(&source);
        assert!(ddl.contains("brokers = 'localhost:9092'"));
        assert!(ddl.contains("topic = 'events'"));
    }

    // ---- pipeline DDL -----------------------------------------------------

    #[test]
    fn test_pipeline_to_ddl() {
        let pipeline = PipelineConfig {
            name: "vwap".to_string(),
            sql: "SELECT symbol, SUM(price) FROM trades GROUP BY symbol".to_string(),
        };
        assert_eq!(
            pipeline_to_ddl(&pipeline),
            "CREATE STREAM vwap AS SELECT symbol, SUM(price) FROM trades GROUP BY symbol"
        );
    }

    // ---- sink DDL ---------------------------------------------------------

    #[test]
    fn test_sink_to_ddl() {
        let mut props = toml::Table::new();
        props.insert("topic".to_string(), text("output"));
        props.insert("brokers".to_string(), text("localhost:9092"));

        let ddl = sink_to_ddl(&kafka_sink("output_sink", "vwap", "at_least_once", props));
        assert!(ddl.starts_with("CREATE SINK output_sink FROM vwap INTO KAFKA"));
        assert!(ddl.contains("topic = 'output'"));
        assert!(ddl.contains("brokers = 'localhost:9092'"));
        // at_least_once is default, should not appear
        assert!(!ddl.contains("delivery"));
    }

    #[test]
    fn test_sink_to_ddl_exactly_once() {
        let ddl = sink_to_ddl(&kafka_sink("out", "p", "exactly_once", toml::Table::new()));
        assert!(ddl.contains("delivery = 'exactly_once'"));
    }

    // ---- lookup DDL -------------------------------------------------------

    #[test]
    fn test_lookup_to_ddl() {
        let mut lookup = postgres_lookup(
            "instruments",
            &["symbol"],
            vec![column("symbol", "VARCHAR", false)],
        );
        lookup
            .properties
            .insert("connection".to_string(), text("postgresql://localhost/db"));

        let ddl = lookup_to_ddl(&lookup).unwrap();
        assert!(ddl.starts_with("CREATE LOOKUP TABLE instruments"));
        assert!(ddl.contains("symbol VARCHAR NOT NULL"));
        assert!(ddl.contains("PRIMARY KEY (symbol)"));
        assert!(ddl.contains("'connector' = 'postgres'"));
        assert!(ddl.contains("'strategy' = 'poll'"));
        assert!(ddl.contains("'connection' = 'postgresql://localhost/db'"));
    }

    #[test]
    fn test_lookup_to_ddl_no_primary_key() {
        let lookup = postgres_lookup("t", &[], vec![column("id", "INT", false)]);
        let ddl = lookup_to_ddl(&lookup).unwrap();
        assert!(!ddl.contains("PRIMARY KEY"));
    }

    #[test]
    fn test_lookup_to_ddl_empty_schema_rejected() {
        let lookup = postgres_lookup("bad", &[], vec![]);
        assert!(lookup_to_ddl(&lookup).is_err());
    }

    // ---- value rendering --------------------------------------------------

    #[test]
    fn test_toml_value_to_sql() {
        assert_eq!(toml_value_to_sql(&text("hello")), "hello");
        assert_eq!(toml_value_to_sql(&toml::Value::Integer(42)), "42");
        assert_eq!(toml_value_to_sql(&toml::Value::Boolean(true)), "true");
        assert_eq!(toml_value_to_sql(&toml::Value::Float(3.25)), "3.25");
    }

    #[test]
    fn test_toml_value_to_sql_escapes_single_quotes() {
        assert_eq!(toml_value_to_sql(&text("it's a test")), "it''s a test");
        assert_eq!(toml_value_to_sql(&text("a''b")), "a''''b");
    }
}
882            toml_value_to_sql(&toml::Value::String("a''b".to_string())),
883            "a''''b"
884        );
885    }
886}