Skip to main content

laminardb/
reload.rs

1//! Hot-reload: config diff engine and incremental DDL application.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use serde::Serialize;
7use tracing::{info, warn};
8
9use laminar_db::LaminarDB;
10
11use crate::config::{LookupConfig, PipelineConfig, ServerConfig, SinkConfig, SourceConfig};
12use crate::server;
13
14#[derive(Debug, Default)]
15pub struct ConfigDiff {
16    pub sources_added: Vec<SourceConfig>,
17    pub sources_removed: Vec<SourceConfig>,
18    pub sources_changed: Vec<SourceConfig>,
19
20    pub lookups_added: Vec<LookupConfig>,
21    pub lookups_removed: Vec<LookupConfig>,
22    pub lookups_changed: Vec<LookupConfig>,
23
24    pub pipelines_added: Vec<PipelineConfig>,
25    pub pipelines_removed: Vec<PipelineConfig>,
26    pub pipelines_changed: Vec<PipelineConfig>,
27
28    pub sinks_added: Vec<SinkConfig>,
29    pub sinks_removed: Vec<SinkConfig>,
30    pub sinks_changed: Vec<SinkConfig>,
31
32    pub warnings: Vec<String>,
33}
34
35impl ConfigDiff {
36    pub fn is_empty(&self) -> bool {
37        self.sources_added.is_empty()
38            && self.sources_removed.is_empty()
39            && self.sources_changed.is_empty()
40            && self.lookups_added.is_empty()
41            && self.lookups_removed.is_empty()
42            && self.lookups_changed.is_empty()
43            && self.pipelines_added.is_empty()
44            && self.pipelines_removed.is_empty()
45            && self.pipelines_changed.is_empty()
46            && self.sinks_added.is_empty()
47            && self.sinks_removed.is_empty()
48            && self.sinks_changed.is_empty()
49    }
50}
51
52#[derive(Debug, Serialize)]
53pub struct ReloadResult {
54    pub success: bool,
55    pub applied: Vec<ReloadOp>,
56    pub failed: Vec<ReloadFailure>,
57    pub warnings: Vec<String>,
58}
59
60#[derive(Debug, Serialize)]
61pub struct ReloadOp {
62    pub action: String,
63    pub object_type: String,
64    pub name: String,
65}
66
67#[derive(Debug, Serialize)]
68pub struct ReloadFailure {
69    pub action: String,
70    pub object_type: String,
71    pub name: String,
72    pub error: String,
73}
74
75/// Compute the diff between an old and new configuration.
76pub fn diff_configs(old: &ServerConfig, new: &ServerConfig) -> ConfigDiff {
77    let mut diff = ConfigDiff::default();
78
79    // Diff named sections
80    diff_named_section(
81        &old.sources,
82        &new.sources,
83        |s| &s.name,
84        &mut diff.sources_added,
85        &mut diff.sources_removed,
86        &mut diff.sources_changed,
87    );
88
89    diff_named_section(
90        &old.lookups,
91        &new.lookups,
92        |l| &l.name,
93        &mut diff.lookups_added,
94        &mut diff.lookups_removed,
95        &mut diff.lookups_changed,
96    );
97
98    diff_named_section(
99        &old.pipelines,
100        &new.pipelines,
101        |p| &p.name,
102        &mut diff.pipelines_added,
103        &mut diff.pipelines_removed,
104        &mut diff.pipelines_changed,
105    );
106
107    diff_named_section(
108        &old.sinks,
109        &new.sinks,
110        |s| &s.name,
111        &mut diff.sinks_added,
112        &mut diff.sinks_removed,
113        &mut diff.sinks_changed,
114    );
115
116    // Non-reloadable section warnings
117    if old.server != new.server {
118        diff.warnings
119            .push("[server] section changed — requires restart".to_string());
120    }
121    if old.state != new.state {
122        diff.warnings
123            .push("[state] section changed — requires restart".to_string());
124    }
125    if old.checkpoint != new.checkpoint {
126        diff.warnings
127            .push("[checkpoint] section changed — requires restart".to_string());
128    }
129    if old.discovery != new.discovery {
130        diff.warnings
131            .push("[discovery] section changed — requires restart".to_string());
132    }
133    if old.sql != new.sql {
134        diff.warnings
135            .push("sql field changed — requires restart".to_string());
136    }
137    if old.coordination != new.coordination {
138        diff.warnings
139            .push("[coordination] section changed — requires restart".to_string());
140    }
141    if old.node_id != new.node_id {
142        diff.warnings
143            .push("node_id changed — requires restart".to_string());
144    }
145    // The AI runtime (providers, models, defaults) is built once at startup and
146    // isn't hot-swappable, so changes need a restart rather than being ignored.
147    if old.ai != new.ai {
148        diff.warnings
149            .push("[ai] section changed — requires restart".to_string());
150    }
151    if old.models != new.models {
152        diff.warnings
153            .push("[models] section changed — requires restart".to_string());
154    }
155
156    diff
157}
158
159fn diff_named_section<T: Clone + PartialEq>(
160    old: &[T],
161    new: &[T],
162    name_fn: fn(&T) -> &str,
163    added: &mut Vec<T>,
164    removed: &mut Vec<T>,
165    changed: &mut Vec<T>,
166) {
167    // Build name → item maps
168    let old_map: std::collections::HashMap<&str, &T> =
169        old.iter().map(|item| (name_fn(item), item)).collect();
170    let new_map: std::collections::HashMap<&str, &T> =
171        new.iter().map(|item| (name_fn(item), item)).collect();
172
173    // Removed: in old but not in new
174    for (name, item) in &old_map {
175        if !new_map.contains_key(name) {
176            removed.push((*item).clone());
177        }
178    }
179
180    // Added or changed: in new
181    for (name, new_item) in &new_map {
182        match old_map.get(name) {
183            None => added.push((*new_item).clone()),
184            Some(old_item) => {
185                if *old_item != *new_item {
186                    changed.push((*new_item).clone());
187                }
188            }
189        }
190    }
191}
192
193/// Apply a config diff to a live `LaminarDB` instance via incremental DDL.
194///
195/// Remove phase (reverse dependency order): sinks → streams → lookups → sources.
196/// Create phase (dependency order): sources → lookups → pipelines → sinks.
197pub async fn apply_reload(db: &LaminarDB, diff: &ConfigDiff) -> ReloadResult {
198    let mut applied = Vec::new();
199    let mut failed = Vec::new();
200
201    // Remove phase (reverse dependency order)
202    for sink in diff.sinks_removed.iter().chain(diff.sinks_changed.iter()) {
203        let ddl = format!("DROP SINK IF EXISTS {} CASCADE", sink.name);
204        exec_ddl(
205            db,
206            &ddl,
207            "drop",
208            "sink",
209            &sink.name,
210            &mut applied,
211            &mut failed,
212        )
213        .await;
214    }
215    for p in diff
216        .pipelines_removed
217        .iter()
218        .chain(diff.pipelines_changed.iter())
219    {
220        let ddl = format!("DROP STREAM IF EXISTS {} CASCADE", p.name);
221        exec_ddl(
222            db,
223            &ddl,
224            "drop",
225            "stream",
226            &p.name,
227            &mut applied,
228            &mut failed,
229        )
230        .await;
231    }
232    for l in diff
233        .lookups_removed
234        .iter()
235        .chain(diff.lookups_changed.iter())
236    {
237        let ddl = format!("DROP LOOKUP TABLE IF EXISTS {} CASCADE", l.name);
238        exec_ddl(
239            db,
240            &ddl,
241            "drop",
242            "lookup",
243            &l.name,
244            &mut applied,
245            &mut failed,
246        )
247        .await;
248    }
249    for s in diff
250        .sources_removed
251        .iter()
252        .chain(diff.sources_changed.iter())
253    {
254        let ddl = format!("DROP SOURCE IF EXISTS {} CASCADE", s.name);
255        exec_ddl(
256            db,
257            &ddl,
258            "drop",
259            "source",
260            &s.name,
261            &mut applied,
262            &mut failed,
263        )
264        .await;
265    }
266
267    // Create phase (dependency order)
268    for s in diff.sources_added.iter().chain(diff.sources_changed.iter()) {
269        let ddl = server::source_to_ddl(s);
270        exec_ddl(
271            db,
272            &ddl,
273            "create",
274            "source",
275            &s.name,
276            &mut applied,
277            &mut failed,
278        )
279        .await;
280    }
281    for l in diff.lookups_added.iter().chain(diff.lookups_changed.iter()) {
282        match server::lookup_to_ddl(l) {
283            Ok(ddl) => {
284                exec_ddl(
285                    db,
286                    &ddl,
287                    "create",
288                    "lookup",
289                    &l.name,
290                    &mut applied,
291                    &mut failed,
292                )
293                .await;
294            }
295            Err(e) => {
296                warn!("Invalid lookup config '{}': {e}", l.name);
297                failed.push(ReloadFailure {
298                    action: "create".to_string(),
299                    object_type: "lookup".to_string(),
300                    name: l.name.clone(),
301                    error: e.to_string(),
302                });
303            }
304        }
305    }
306    for p in diff
307        .pipelines_added
308        .iter()
309        .chain(diff.pipelines_changed.iter())
310    {
311        let ddl = server::pipeline_to_ddl(p);
312        exec_ddl(
313            db,
314            &ddl,
315            "create",
316            "stream",
317            &p.name,
318            &mut applied,
319            &mut failed,
320        )
321        .await;
322    }
323    for sink in diff.sinks_added.iter().chain(diff.sinks_changed.iter()) {
324        let ddl = server::sink_to_ddl(sink);
325        exec_ddl(
326            db,
327            &ddl,
328            "create",
329            "sink",
330            &sink.name,
331            &mut applied,
332            &mut failed,
333        )
334        .await;
335    }
336
337    ReloadResult {
338        success: failed.is_empty(),
339        applied,
340        failed,
341        warnings: diff.warnings.clone(),
342    }
343}
344
345async fn exec_ddl(
346    db: &LaminarDB,
347    ddl: &str,
348    action: &str,
349    object_type: &str,
350    name: &str,
351    applied: &mut Vec<ReloadOp>,
352    failed: &mut Vec<ReloadFailure>,
353) {
354    match db.execute(ddl).await {
355        Ok(_) => {
356            info!("{action} {object_type}: {name}");
357            applied.push(ReloadOp {
358                action: action.to_string(),
359                object_type: object_type.to_string(),
360                name: name.to_string(),
361            });
362        }
363        Err(e) => {
364            warn!("Failed to {action} {object_type} '{name}': {e}");
365            failed.push(ReloadFailure {
366                action: action.to_string(),
367                object_type: object_type.to_string(),
368                name: name.to_string(),
369                error: e.to_string(),
370            });
371        }
372    }
373}
374
375/// Prevents concurrent reloads via CAS on an `AtomicBool`.
376#[derive(Clone)]
377pub struct ReloadGuard {
378    in_progress: Arc<AtomicBool>,
379}
380
381impl ReloadGuard {
382    pub fn new() -> Self {
383        Self {
384            in_progress: Arc::new(AtomicBool::new(false)),
385        }
386    }
387
388    pub fn try_acquire(&self) -> Option<ReloadGuardHandle> {
389        let was_free =
390            self.in_progress
391                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
392        if was_free.is_ok() {
393            Some(ReloadGuardHandle {
394                flag: Arc::clone(&self.in_progress),
395            })
396        } else {
397            None
398        }
399    }
400}
401
402pub struct ReloadGuardHandle {
403    flag: Arc<AtomicBool>,
404}
405
406impl Drop for ReloadGuardHandle {
407    fn drop(&mut self) {
408        self.flag.store(false, Ordering::Release);
409    }
410}
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415    use crate::config::*;
416
417    fn empty_config() -> ServerConfig {
418        ServerConfig {
419            server: ServerSection::default(),
420            state: laminar_core::state::StateBackendConfig::default(),
421            checkpoint: CheckpointSection::default(),
422            sources: vec![],
423            lookups: vec![],
424            pipelines: vec![],
425            sinks: vec![],
426            discovery: None,
427            coordination: None,
428            node_id: None,
429            sql: None,
430            ai: Default::default(),
431            models: Default::default(),
432        }
433    }
434
435    fn make_source(name: &str) -> SourceConfig {
436        SourceConfig {
437            name: name.to_string(),
438            connector: "kafka".to_string(),
439            format: "json".to_string(),
440            properties: toml::Table::new(),
441            schema: vec![],
442            watermark: None,
443        }
444    }
445
446    fn make_pipeline(name: &str, sql: &str) -> PipelineConfig {
447        PipelineConfig {
448            name: name.to_string(),
449            sql: sql.to_string(),
450        }
451    }
452
453    fn make_sink(name: &str, pipeline: &str) -> SinkConfig {
454        SinkConfig {
455            name: name.to_string(),
456            pipeline: pipeline.to_string(),
457            connector: "kafka".to_string(),
458            delivery: "at_least_once".to_string(),
459            properties: toml::Table::new(),
460        }
461    }
462
463    fn make_lookup(name: &str) -> LookupConfig {
464        LookupConfig {
465            name: name.to_string(),
466            connector: "postgres".to_string(),
467            strategy: "poll".to_string(),
468            cache: LookupCacheConfig::default(),
469            properties: toml::Table::new(),
470            primary_key: vec![],
471            schema: vec![],
472        }
473    }
474
475    // -- diff_configs tests --
476
477    #[test]
478    fn test_diff_empty_configs() {
479        let old = empty_config();
480        let new = empty_config();
481        let diff = diff_configs(&old, &new);
482        assert!(diff.is_empty());
483        assert!(diff.warnings.is_empty());
484    }
485
486    #[test]
487    fn test_diff_identical_configs() {
488        let mut old = empty_config();
489        old.sources.push(make_source("s1"));
490        old.pipelines.push(make_pipeline("p1", "SELECT 1"));
491        let new = old.clone();
492        let diff = diff_configs(&old, &new);
493        assert!(diff.is_empty());
494    }
495
496    #[test]
497    fn test_diff_source_added() {
498        let old = empty_config();
499        let mut new = empty_config();
500        new.sources.push(make_source("new_src"));
501        let diff = diff_configs(&old, &new);
502        assert_eq!(diff.sources_added.len(), 1);
503        assert_eq!(diff.sources_added[0].name, "new_src");
504        assert!(diff.sources_removed.is_empty());
505        assert!(diff.sources_changed.is_empty());
506    }
507
508    #[test]
509    fn test_diff_source_removed() {
510        let mut old = empty_config();
511        old.sources.push(make_source("old_src"));
512        let new = empty_config();
513        let diff = diff_configs(&old, &new);
514        assert!(diff.sources_added.is_empty());
515        assert_eq!(diff.sources_removed.len(), 1);
516        assert_eq!(diff.sources_removed[0].name, "old_src");
517    }
518
519    #[test]
520    fn test_diff_source_changed() {
521        let mut old = empty_config();
522        old.sources.push(make_source("s1"));
523        let mut new = empty_config();
524        let mut changed = make_source("s1");
525        changed.format = "avro".to_string();
526        new.sources.push(changed);
527        let diff = diff_configs(&old, &new);
528        assert!(diff.sources_added.is_empty());
529        assert!(diff.sources_removed.is_empty());
530        assert_eq!(diff.sources_changed.len(), 1);
531        assert_eq!(diff.sources_changed[0].format, "avro");
532    }
533
534    #[test]
535    fn test_diff_pipeline_sql_changed() {
536        let mut old = empty_config();
537        old.pipelines.push(make_pipeline("p1", "SELECT 1"));
538        let mut new = empty_config();
539        new.pipelines.push(make_pipeline("p1", "SELECT 2"));
540        let diff = diff_configs(&old, &new);
541        assert_eq!(diff.pipelines_changed.len(), 1);
542        assert_eq!(diff.pipelines_changed[0].sql, "SELECT 2");
543    }
544
545    #[test]
546    fn test_diff_sink_changed() {
547        let mut old = empty_config();
548        old.sinks.push(make_sink("out", "p1"));
549        let mut new = empty_config();
550        let mut changed = make_sink("out", "p1");
551        changed.delivery = "exactly_once".to_string();
552        new.sinks.push(changed);
553        let diff = diff_configs(&old, &new);
554        assert_eq!(diff.sinks_changed.len(), 1);
555    }
556
557    #[test]
558    fn test_diff_lookup_changed() {
559        let mut old = empty_config();
560        old.lookups.push(make_lookup("lk1"));
561        let mut new = empty_config();
562        let mut changed = make_lookup("lk1");
563        changed.strategy = "cdc".to_string();
564        new.lookups.push(changed);
565        let diff = diff_configs(&old, &new);
566        assert_eq!(diff.lookups_changed.len(), 1);
567    }
568
569    #[test]
570    fn test_diff_non_reloadable_warnings() {
571        let old = empty_config();
572        let mut new = empty_config();
573        new.server.bind = "0.0.0.0:9999".to_string();
574        new.state = laminar_core::state::StateBackendConfig::local("./data/state");
575        new.ai
576            .defaults
577            .insert("classify".to_string(), "m".to_string());
578        let diff = diff_configs(&old, &new);
579        assert!(diff.is_empty()); // no reloadable changes
580        assert!(diff.warnings.iter().any(|w| w.contains("[server]")));
581        assert!(diff.warnings.iter().any(|w| w.contains("[state]")));
582        assert!(diff.warnings.iter().any(|w| w.contains("[ai]")));
583    }
584
585    #[test]
586    fn test_diff_multiple_sections_changed() {
587        let old = empty_config();
588        let mut new = empty_config();
589        new.sources.push(make_source("s1"));
590        new.pipelines.push(make_pipeline("p1", "SELECT 1"));
591        new.sinks.push(make_sink("out", "p1"));
592        let diff = diff_configs(&old, &new);
593        assert_eq!(diff.sources_added.len(), 1);
594        assert_eq!(diff.pipelines_added.len(), 1);
595        assert_eq!(diff.sinks_added.len(), 1);
596        assert!(!diff.is_empty());
597    }
598
599    #[test]
600    fn test_is_empty_on_default() {
601        let diff = ConfigDiff::default();
602        assert!(diff.is_empty());
603    }
604
605    // -- ReloadGuard tests --
606
607    #[test]
608    fn test_guard_acquire_release() {
609        let guard = ReloadGuard::new();
610        {
611            let handle = guard.try_acquire();
612            assert!(handle.is_some());
613            // While held, second acquire fails
614            assert!(guard.try_acquire().is_none());
615        }
616        // After drop, can acquire again
617        assert!(guard.try_acquire().is_some());
618    }
619
620    #[test]
621    fn test_guard_concurrent_reject() {
622        let guard = ReloadGuard::new();
623        let _handle = guard.try_acquire().unwrap();
624        assert!(guard.try_acquire().is_none());
625        assert!(guard.try_acquire().is_none());
626    }
627
628    #[test]
629    fn test_guard_raii_drop() {
630        let guard = ReloadGuard::new();
631        let handle = guard.try_acquire().unwrap();
632        drop(handle);
633        let handle2 = guard.try_acquire();
634        assert!(handle2.is_some());
635    }
636
637    // -- apply_reload tests (using real LaminarDB) --
638
639    #[tokio::test]
640    async fn test_apply_add_source() {
641        let db = LaminarDB::open().unwrap();
642        let mut diff = ConfigDiff::default();
643        diff.sources_added.push(SourceConfig {
644            name: "test_src".to_string(),
645            connector: "kafka".to_string(),
646            format: "json".to_string(),
647            properties: toml::Table::new(),
648            schema: vec![ColumnDef {
649                name: "id".to_string(),
650                data_type: "BIGINT".to_string(),
651                nullable: false,
652            }],
653            watermark: None,
654        });
655        let result = apply_reload(&db, &diff).await;
656        // Connector may not be available in test builds; verify the op was attempted
657        let total = result.applied.len() + result.failed.len();
658        assert_eq!(total, 1, "expected exactly one create operation");
659        if result.success {
660            assert_eq!(result.applied[0].action, "create");
661            assert_eq!(result.applied[0].name, "test_src");
662        } else {
663            assert_eq!(result.failed[0].action, "create");
664            assert_eq!(result.failed[0].name, "test_src");
665        }
666    }
667
668    #[tokio::test]
669    async fn test_apply_remove_source() {
670        let db = LaminarDB::open().unwrap();
671        // First create the source
672        db.execute("CREATE SOURCE rm_src (id BIGINT)")
673            .await
674            .unwrap();
675
676        let mut diff = ConfigDiff::default();
677        diff.sources_removed.push(make_source("rm_src"));
678        let result = apply_reload(&db, &diff).await;
679        assert!(result.success);
680        assert_eq!(result.applied.len(), 1);
681        assert_eq!(result.applied[0].action, "drop");
682    }
683
684    #[tokio::test]
685    async fn test_apply_change_pipeline() {
686        let db = LaminarDB::open().unwrap();
687        // Create source and initial pipeline
688        db.execute("CREATE SOURCE cp_src (id BIGINT, val DOUBLE)")
689            .await
690            .unwrap();
691        db.execute("CREATE STREAM cp_pipe AS SELECT id, val FROM cp_src")
692            .await
693            .unwrap();
694
695        // Change pipeline SQL
696        let mut diff = ConfigDiff::default();
697        diff.pipelines_changed
698            .push(make_pipeline("cp_pipe", "SELECT id FROM cp_src"));
699        let result = apply_reload(&db, &diff).await;
700        assert!(result.success);
701        // Should have 2 ops: drop + create
702        assert_eq!(result.applied.len(), 2);
703    }
704
705    #[tokio::test]
706    async fn test_apply_ordered_removal() {
707        let db = LaminarDB::open().unwrap();
708        // Create a full pipeline chain: source → pipeline → sink
709        db.execute("CREATE SOURCE ord_src (id BIGINT)")
710            .await
711            .unwrap();
712        db.execute("CREATE STREAM ord_pipe AS SELECT id FROM ord_src")
713            .await
714            .unwrap();
715
716        // Remove all in one diff — should respect dependency order
717        let mut diff = ConfigDiff::default();
718        diff.sources_removed.push(make_source("ord_src"));
719        diff.pipelines_removed
720            .push(make_pipeline("ord_pipe", "SELECT id FROM ord_src"));
721
722        let result = apply_reload(&db, &diff).await;
723        assert!(result.success);
724        // Pipeline should be dropped before source
725        let drop_names: Vec<&str> = result
726            .applied
727            .iter()
728            .filter(|op| op.action == "drop")
729            .map(|op| op.name.as_str())
730            .collect();
731        let pipe_idx = drop_names.iter().position(|n| *n == "ord_pipe");
732        let src_idx = drop_names.iter().position(|n| *n == "ord_src");
733        assert!(pipe_idx < src_idx, "pipeline must be dropped before source");
734    }
735
736    #[tokio::test]
737    async fn test_apply_empty_diff() {
738        let db = LaminarDB::open().unwrap();
739        let diff = ConfigDiff::default();
740        let result = apply_reload(&db, &diff).await;
741        assert!(result.success);
742        assert!(result.applied.is_empty());
743        assert!(result.failed.is_empty());
744    }
745
746    #[tokio::test]
747    async fn test_apply_warnings_passed_through() {
748        let db = LaminarDB::open().unwrap();
749        let mut diff = ConfigDiff::default();
750        diff.warnings
751            .push("[server] section changed — requires restart".to_string());
752        let result = apply_reload(&db, &diff).await;
753        assert!(result.success);
754        assert_eq!(result.warnings.len(), 1);
755        assert!(result.warnings[0].contains("[server]"));
756    }
757}