1use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use serde::Serialize;
7use tracing::{info, warn};
8
9use laminar_db::LaminarDB;
10
11use crate::config::{LookupConfig, PipelineConfig, ServerConfig, SinkConfig, SourceConfig};
12use crate::server;
13
14#[derive(Debug, Default)]
15pub struct ConfigDiff {
16 pub sources_added: Vec<SourceConfig>,
17 pub sources_removed: Vec<SourceConfig>,
18 pub sources_changed: Vec<SourceConfig>,
19
20 pub lookups_added: Vec<LookupConfig>,
21 pub lookups_removed: Vec<LookupConfig>,
22 pub lookups_changed: Vec<LookupConfig>,
23
24 pub pipelines_added: Vec<PipelineConfig>,
25 pub pipelines_removed: Vec<PipelineConfig>,
26 pub pipelines_changed: Vec<PipelineConfig>,
27
28 pub sinks_added: Vec<SinkConfig>,
29 pub sinks_removed: Vec<SinkConfig>,
30 pub sinks_changed: Vec<SinkConfig>,
31
32 pub warnings: Vec<String>,
33}
34
35impl ConfigDiff {
36 pub fn is_empty(&self) -> bool {
37 self.sources_added.is_empty()
38 && self.sources_removed.is_empty()
39 && self.sources_changed.is_empty()
40 && self.lookups_added.is_empty()
41 && self.lookups_removed.is_empty()
42 && self.lookups_changed.is_empty()
43 && self.pipelines_added.is_empty()
44 && self.pipelines_removed.is_empty()
45 && self.pipelines_changed.is_empty()
46 && self.sinks_added.is_empty()
47 && self.sinks_removed.is_empty()
48 && self.sinks_changed.is_empty()
49 }
50}
51
52#[derive(Debug, Serialize)]
53pub struct ReloadResult {
54 pub success: bool,
55 pub applied: Vec<ReloadOp>,
56 pub failed: Vec<ReloadFailure>,
57 pub warnings: Vec<String>,
58}
59
60#[derive(Debug, Serialize)]
61pub struct ReloadOp {
62 pub action: String,
63 pub object_type: String,
64 pub name: String,
65}
66
67#[derive(Debug, Serialize)]
68pub struct ReloadFailure {
69 pub action: String,
70 pub object_type: String,
71 pub name: String,
72 pub error: String,
73}
74
75pub fn diff_configs(old: &ServerConfig, new: &ServerConfig) -> ConfigDiff {
77 let mut diff = ConfigDiff::default();
78
79 diff_named_section(
81 &old.sources,
82 &new.sources,
83 |s| &s.name,
84 &mut diff.sources_added,
85 &mut diff.sources_removed,
86 &mut diff.sources_changed,
87 );
88
89 diff_named_section(
90 &old.lookups,
91 &new.lookups,
92 |l| &l.name,
93 &mut diff.lookups_added,
94 &mut diff.lookups_removed,
95 &mut diff.lookups_changed,
96 );
97
98 diff_named_section(
99 &old.pipelines,
100 &new.pipelines,
101 |p| &p.name,
102 &mut diff.pipelines_added,
103 &mut diff.pipelines_removed,
104 &mut diff.pipelines_changed,
105 );
106
107 diff_named_section(
108 &old.sinks,
109 &new.sinks,
110 |s| &s.name,
111 &mut diff.sinks_added,
112 &mut diff.sinks_removed,
113 &mut diff.sinks_changed,
114 );
115
116 if old.server != new.server {
118 diff.warnings
119 .push("[server] section changed — requires restart".to_string());
120 }
121 if old.state != new.state {
122 diff.warnings
123 .push("[state] section changed — requires restart".to_string());
124 }
125 if old.checkpoint != new.checkpoint {
126 diff.warnings
127 .push("[checkpoint] section changed — requires restart".to_string());
128 }
129 if old.discovery != new.discovery {
130 diff.warnings
131 .push("[discovery] section changed — requires restart".to_string());
132 }
133 if old.sql != new.sql {
134 diff.warnings
135 .push("sql field changed — requires restart".to_string());
136 }
137 if old.coordination != new.coordination {
138 diff.warnings
139 .push("[coordination] section changed — requires restart".to_string());
140 }
141 if old.node_id != new.node_id {
142 diff.warnings
143 .push("node_id changed — requires restart".to_string());
144 }
145 if old.ai != new.ai {
148 diff.warnings
149 .push("[ai] section changed — requires restart".to_string());
150 }
151 if old.models != new.models {
152 diff.warnings
153 .push("[models] section changed — requires restart".to_string());
154 }
155
156 diff
157}
158
/// Diffs two lists of named configuration objects into added / removed /
/// changed buckets.
///
/// Objects are matched by the string returned from `name_fn`:
/// * a name found only in `old` pushes the old object onto `removed`,
/// * a name found only in `new` pushes the new object onto `added`,
/// * a name found in both whose objects compare unequal pushes the new
///   object onto `changed`.
///
/// Results are emitted in declaration order (the order of `old` for
/// `removed`, the order of `new` for `added` and `changed`), so the outcome
/// is deterministic and does not depend on hash-map iteration order. When a
/// list contains the same name more than once, only its last occurrence
/// takes part in the comparison.
fn diff_named_section<T: Clone + PartialEq>(
    old: &[T],
    new: &[T],
    name_fn: fn(&T) -> &str,
    added: &mut Vec<T>,
    removed: &mut Vec<T>,
    changed: &mut Vec<T>,
) {
    /// Maps every name to the index of its last occurrence in `items`.
    fn last_index_by_name<'a, U>(
        items: &'a [U],
        name_fn: fn(&U) -> &str,
    ) -> std::collections::HashMap<&'a str, usize> {
        items
            .iter()
            .enumerate()
            .map(|(idx, item)| (name_fn(item), idx))
            .collect()
    }

    let old_index = last_index_by_name(old, name_fn);
    let new_index = last_index_by_name(new, name_fn);

    for (idx, item) in old.iter().enumerate() {
        let name = name_fn(item);
        // Skip shadowed duplicates: only the last entry for a name counts.
        let is_effective = old_index.get(name) == Some(&idx);
        if is_effective && !new_index.contains_key(name) {
            removed.push(item.clone());
        }
    }

    for (idx, item) in new.iter().enumerate() {
        let name = name_fn(item);
        if new_index.get(name) != Some(&idx) {
            // Shadowed by a later entry with the same name.
            continue;
        }
        match old_index.get(name) {
            None => added.push(item.clone()),
            Some(&old_idx) if old[old_idx] != *item => changed.push(item.clone()),
            Some(_) => {}
        }
    }
}
192
193pub async fn apply_reload(db: &LaminarDB, diff: &ConfigDiff) -> ReloadResult {
198 let mut applied = Vec::new();
199 let mut failed = Vec::new();
200
201 for sink in diff.sinks_removed.iter().chain(diff.sinks_changed.iter()) {
203 let ddl = format!("DROP SINK IF EXISTS {} CASCADE", sink.name);
204 exec_ddl(
205 db,
206 &ddl,
207 "drop",
208 "sink",
209 &sink.name,
210 &mut applied,
211 &mut failed,
212 )
213 .await;
214 }
215 for p in diff
216 .pipelines_removed
217 .iter()
218 .chain(diff.pipelines_changed.iter())
219 {
220 let ddl = format!("DROP STREAM IF EXISTS {} CASCADE", p.name);
221 exec_ddl(
222 db,
223 &ddl,
224 "drop",
225 "stream",
226 &p.name,
227 &mut applied,
228 &mut failed,
229 )
230 .await;
231 }
232 for l in diff
233 .lookups_removed
234 .iter()
235 .chain(diff.lookups_changed.iter())
236 {
237 let ddl = format!("DROP LOOKUP TABLE IF EXISTS {} CASCADE", l.name);
238 exec_ddl(
239 db,
240 &ddl,
241 "drop",
242 "lookup",
243 &l.name,
244 &mut applied,
245 &mut failed,
246 )
247 .await;
248 }
249 for s in diff
250 .sources_removed
251 .iter()
252 .chain(diff.sources_changed.iter())
253 {
254 let ddl = format!("DROP SOURCE IF EXISTS {} CASCADE", s.name);
255 exec_ddl(
256 db,
257 &ddl,
258 "drop",
259 "source",
260 &s.name,
261 &mut applied,
262 &mut failed,
263 )
264 .await;
265 }
266
267 for s in diff.sources_added.iter().chain(diff.sources_changed.iter()) {
269 let ddl = server::source_to_ddl(s);
270 exec_ddl(
271 db,
272 &ddl,
273 "create",
274 "source",
275 &s.name,
276 &mut applied,
277 &mut failed,
278 )
279 .await;
280 }
281 for l in diff.lookups_added.iter().chain(diff.lookups_changed.iter()) {
282 match server::lookup_to_ddl(l) {
283 Ok(ddl) => {
284 exec_ddl(
285 db,
286 &ddl,
287 "create",
288 "lookup",
289 &l.name,
290 &mut applied,
291 &mut failed,
292 )
293 .await;
294 }
295 Err(e) => {
296 warn!("Invalid lookup config '{}': {e}", l.name);
297 failed.push(ReloadFailure {
298 action: "create".to_string(),
299 object_type: "lookup".to_string(),
300 name: l.name.clone(),
301 error: e.to_string(),
302 });
303 }
304 }
305 }
306 for p in diff
307 .pipelines_added
308 .iter()
309 .chain(diff.pipelines_changed.iter())
310 {
311 let ddl = server::pipeline_to_ddl(p);
312 exec_ddl(
313 db,
314 &ddl,
315 "create",
316 "stream",
317 &p.name,
318 &mut applied,
319 &mut failed,
320 )
321 .await;
322 }
323 for sink in diff.sinks_added.iter().chain(diff.sinks_changed.iter()) {
324 let ddl = server::sink_to_ddl(sink);
325 exec_ddl(
326 db,
327 &ddl,
328 "create",
329 "sink",
330 &sink.name,
331 &mut applied,
332 &mut failed,
333 )
334 .await;
335 }
336
337 ReloadResult {
338 success: failed.is_empty(),
339 applied,
340 failed,
341 warnings: diff.warnings.clone(),
342 }
343}
344
345async fn exec_ddl(
346 db: &LaminarDB,
347 ddl: &str,
348 action: &str,
349 object_type: &str,
350 name: &str,
351 applied: &mut Vec<ReloadOp>,
352 failed: &mut Vec<ReloadFailure>,
353) {
354 match db.execute(ddl).await {
355 Ok(_) => {
356 info!("{action} {object_type}: {name}");
357 applied.push(ReloadOp {
358 action: action.to_string(),
359 object_type: object_type.to_string(),
360 name: name.to_string(),
361 });
362 }
363 Err(e) => {
364 warn!("Failed to {action} {object_type} '{name}': {e}");
365 failed.push(ReloadFailure {
366 action: action.to_string(),
367 object_type: object_type.to_string(),
368 name: name.to_string(),
369 error: e.to_string(),
370 });
371 }
372 }
373}
374
/// Cloneable gate that admits at most one holder at a time.
///
/// All clones share the same flag, so a handle acquired through one clone
/// blocks acquisition through every other clone until that handle is
/// dropped.
#[derive(Clone, Debug, Default)]
pub struct ReloadGuard {
    /// `true` while a [`ReloadGuardHandle`] is alive.
    in_progress: Arc<AtomicBool>,
}

impl ReloadGuard {
    /// Creates a guard that is initially free.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Tries to claim the guard without blocking.
    ///
    /// Returns `Some(handle)` when the guard was free; the guard stays
    /// claimed until the handle is dropped. Returns `None` when another
    /// handle is currently alive.
    #[must_use = "dropping the returned handle releases the guard immediately"]
    pub fn try_acquire(&self) -> Option<ReloadGuardHandle> {
        // Atomically flip false -> true; only the caller that wins the
        // exchange gets a handle.
        self.in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .ok()
            .map(|_| ReloadGuardHandle {
                flag: Arc::clone(&self.in_progress),
            })
    }
}

/// Proof that the holder currently owns a [`ReloadGuard`].
///
/// Dropping the handle frees the guard again.
#[derive(Debug)]
#[must_use = "the guard is released as soon as the handle is dropped"]
pub struct ReloadGuardHandle {
    /// Flag shared with the originating guard and all of its clones.
    flag: Arc<AtomicBool>,
}

impl Drop for ReloadGuardHandle {
    /// Marks the guard as free so the next `try_acquire` can succeed.
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Release);
    }
}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::*;

    // ---- fixtures -------------------------------------------------------

    /// A configuration with no objects and default settings everywhere.
    fn empty_config() -> ServerConfig {
        ServerConfig {
            server: ServerSection::default(),
            state: laminar_core::state::StateBackendConfig::default(),
            checkpoint: CheckpointSection::default(),
            sources: vec![],
            lookups: vec![],
            pipelines: vec![],
            sinks: vec![],
            discovery: None,
            coordination: None,
            node_id: None,
            sql: None,
            ai: Default::default(),
            models: Default::default(),
        }
    }

    /// A schema-less Kafka/JSON source with the given name.
    fn make_source(name: &str) -> SourceConfig {
        SourceConfig {
            name: name.to_string(),
            connector: "kafka".to_string(),
            format: "json".to_string(),
            properties: toml::Table::new(),
            schema: vec![],
            watermark: None,
        }
    }

    /// A pipeline with the given name and SQL text.
    fn make_pipeline(name: &str, sql: &str) -> PipelineConfig {
        PipelineConfig {
            name: name.to_string(),
            sql: sql.to_string(),
        }
    }

    /// An at-least-once Kafka sink reading from `pipeline`.
    fn make_sink(name: &str, pipeline: &str) -> SinkConfig {
        SinkConfig {
            name: name.to_string(),
            pipeline: pipeline.to_string(),
            connector: "kafka".to_string(),
            delivery: "at_least_once".to_string(),
            properties: toml::Table::new(),
        }
    }

    /// A polling Postgres lookup table with the given name.
    fn make_lookup(name: &str) -> LookupConfig {
        LookupConfig {
            name: name.to_string(),
            connector: "postgres".to_string(),
            strategy: "poll".to_string(),
            cache: LookupCacheConfig::default(),
            properties: toml::Table::new(),
            primary_key: vec![],
            schema: vec![],
        }
    }

    // ---- diff_configs ---------------------------------------------------

    #[test]
    fn test_diff_empty_configs() {
        let old = empty_config();
        let new = empty_config();
        let diff = diff_configs(&old, &new);
        assert!(diff.is_empty());
        assert!(diff.warnings.is_empty());
    }

    #[test]
    fn test_diff_identical_configs() {
        let mut old = empty_config();
        old.sources.push(make_source("s1"));
        old.pipelines.push(make_pipeline("p1", "SELECT 1"));
        let new = old.clone();
        let diff = diff_configs(&old, &new);
        assert!(diff.is_empty());
    }

    #[test]
    fn test_diff_source_added() {
        let old = empty_config();
        let mut new = empty_config();
        new.sources.push(make_source("new_src"));
        let diff = diff_configs(&old, &new);
        assert_eq!(diff.sources_added.len(), 1);
        assert_eq!(diff.sources_added[0].name, "new_src");
        assert!(diff.sources_removed.is_empty());
        assert!(diff.sources_changed.is_empty());
    }

    #[test]
    fn test_diff_source_removed() {
        let mut old = empty_config();
        old.sources.push(make_source("old_src"));
        let new = empty_config();
        let diff = diff_configs(&old, &new);
        assert!(diff.sources_added.is_empty());
        assert_eq!(diff.sources_removed.len(), 1);
        assert_eq!(diff.sources_removed[0].name, "old_src");
    }

    #[test]
    fn test_diff_source_changed() {
        let mut old = empty_config();
        old.sources.push(make_source("s1"));
        let mut new = empty_config();
        let mut changed = make_source("s1");
        changed.format = "avro".to_string();
        new.sources.push(changed);
        let diff = diff_configs(&old, &new);
        assert!(diff.sources_added.is_empty());
        assert!(diff.sources_removed.is_empty());
        assert_eq!(diff.sources_changed.len(), 1);
        assert_eq!(diff.sources_changed[0].format, "avro");
    }

    #[test]
    fn test_diff_pipeline_sql_changed() {
        let mut old = empty_config();
        old.pipelines.push(make_pipeline("p1", "SELECT 1"));
        let mut new = empty_config();
        new.pipelines.push(make_pipeline("p1", "SELECT 2"));
        let diff = diff_configs(&old, &new);
        assert_eq!(diff.pipelines_changed.len(), 1);
        assert_eq!(diff.pipelines_changed[0].sql, "SELECT 2");
    }

    #[test]
    fn test_diff_sink_changed() {
        let mut old = empty_config();
        old.sinks.push(make_sink("out", "p1"));
        let mut new = empty_config();
        let mut changed = make_sink("out", "p1");
        changed.delivery = "exactly_once".to_string();
        new.sinks.push(changed);
        let diff = diff_configs(&old, &new);
        assert_eq!(diff.sinks_changed.len(), 1);
    }

    #[test]
    fn test_diff_lookup_changed() {
        let mut old = empty_config();
        old.lookups.push(make_lookup("lk1"));
        let mut new = empty_config();
        let mut changed = make_lookup("lk1");
        changed.strategy = "cdc".to_string();
        new.lookups.push(changed);
        let diff = diff_configs(&old, &new);
        assert_eq!(diff.lookups_changed.len(), 1);
    }

    #[test]
    fn test_diff_non_reloadable_warnings() {
        let old = empty_config();
        let mut new = empty_config();
        new.server.bind = "0.0.0.0:9999".to_string();
        new.state = laminar_core::state::StateBackendConfig::local("./data/state");
        new.ai
            .defaults
            .insert("classify".to_string(), "m".to_string());
        let diff = diff_configs(&old, &new);
        // Restart-only changes produce warnings but no reloadable work.
        assert!(diff.is_empty());
        assert!(diff.warnings.iter().any(|w| w.contains("[server]")));
        assert!(diff.warnings.iter().any(|w| w.contains("[state]")));
        assert!(diff.warnings.iter().any(|w| w.contains("[ai]")));
    }

    #[test]
    fn test_diff_multiple_sections_changed() {
        let old = empty_config();
        let mut new = empty_config();
        new.sources.push(make_source("s1"));
        new.pipelines.push(make_pipeline("p1", "SELECT 1"));
        new.sinks.push(make_sink("out", "p1"));
        let diff = diff_configs(&old, &new);
        assert_eq!(diff.sources_added.len(), 1);
        assert_eq!(diff.pipelines_added.len(), 1);
        assert_eq!(diff.sinks_added.len(), 1);
        assert!(!diff.is_empty());
    }

    #[test]
    fn test_is_empty_on_default() {
        let diff = ConfigDiff::default();
        assert!(diff.is_empty());
    }

    // ---- ReloadGuard ----------------------------------------------------

    #[test]
    fn test_guard_acquire_release() {
        let guard = ReloadGuard::new();
        {
            let handle = guard.try_acquire();
            assert!(handle.is_some());
            // A second acquisition is refused while the handle is alive.
            assert!(guard.try_acquire().is_none());
        }
        // The handle went out of scope, so the guard is free again.
        assert!(guard.try_acquire().is_some());
    }

    #[test]
    fn test_guard_concurrent_reject() {
        let guard = ReloadGuard::new();
        let _handle = guard.try_acquire().unwrap();
        assert!(guard.try_acquire().is_none());
        assert!(guard.try_acquire().is_none());
    }

    #[test]
    fn test_guard_raii_drop() {
        let guard = ReloadGuard::new();
        let handle = guard.try_acquire().unwrap();
        drop(handle);
        let handle2 = guard.try_acquire();
        assert!(handle2.is_some());
    }

    // ---- apply_reload ---------------------------------------------------

    #[tokio::test]
    async fn test_apply_add_source() {
        let db = LaminarDB::open().unwrap();
        let mut diff = ConfigDiff::default();
        diff.sources_added.push(SourceConfig {
            name: "test_src".to_string(),
            connector: "kafka".to_string(),
            format: "json".to_string(),
            properties: toml::Table::new(),
            schema: vec![ColumnDef {
                name: "id".to_string(),
                data_type: "BIGINT".to_string(),
                nullable: false,
            }],
            watermark: None,
        });
        let result = apply_reload(&db, &diff).await;
        // The create may succeed or fail in this environment; either way
        // exactly one create for `test_src` must be reported.
        let total = result.applied.len() + result.failed.len();
        assert_eq!(total, 1, "expected exactly one create operation");
        if result.success {
            assert_eq!(result.applied[0].action, "create");
            assert_eq!(result.applied[0].name, "test_src");
        } else {
            assert_eq!(result.failed[0].action, "create");
            assert_eq!(result.failed[0].name, "test_src");
        }
    }

    #[tokio::test]
    async fn test_apply_remove_source() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE rm_src (id BIGINT)")
            .await
            .unwrap();

        let mut diff = ConfigDiff::default();
        diff.sources_removed.push(make_source("rm_src"));
        let result = apply_reload(&db, &diff).await;
        assert!(result.success);
        assert_eq!(result.applied.len(), 1);
        assert_eq!(result.applied[0].action, "drop");
    }

    #[tokio::test]
    async fn test_apply_change_pipeline() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE cp_src (id BIGINT, val DOUBLE)")
            .await
            .unwrap();
        db.execute("CREATE STREAM cp_pipe AS SELECT id, val FROM cp_src")
            .await
            .unwrap();

        let mut diff = ConfigDiff::default();
        diff.pipelines_changed
            .push(make_pipeline("cp_pipe", "SELECT id FROM cp_src"));
        let result = apply_reload(&db, &diff).await;
        assert!(result.success);
        // A changed pipeline is dropped first and then re-created.
        assert_eq!(result.applied.len(), 2);
        assert_eq!(result.applied[0].action, "drop");
        assert_eq!(result.applied[1].action, "create");
    }

    #[tokio::test]
    async fn test_apply_ordered_removal() {
        let db = LaminarDB::open().unwrap();
        db.execute("CREATE SOURCE ord_src (id BIGINT)")
            .await
            .unwrap();
        db.execute("CREATE STREAM ord_pipe AS SELECT id FROM ord_src")
            .await
            .unwrap();

        let mut diff = ConfigDiff::default();
        diff.sources_removed.push(make_source("ord_src"));
        diff.pipelines_removed
            .push(make_pipeline("ord_pipe", "SELECT id FROM ord_src"));

        let result = apply_reload(&db, &diff).await;
        assert!(result.success);
        let drop_names: Vec<&str> = result
            .applied
            .iter()
            .filter(|op| op.action == "drop")
            .map(|op| op.name.as_str())
            .collect();
        // Unwrap both positions before comparing: `None < Some(_)` is true,
        // so comparing the raw `Option`s would pass even if the pipeline
        // drop had never been recorded.
        let pipe_idx = drop_names
            .iter()
            .position(|n| *n == "ord_pipe")
            .expect("pipeline drop must be recorded");
        let src_idx = drop_names
            .iter()
            .position(|n| *n == "ord_src")
            .expect("source drop must be recorded");
        assert!(pipe_idx < src_idx, "pipeline must be dropped before source");
    }

    #[tokio::test]
    async fn test_apply_empty_diff() {
        let db = LaminarDB::open().unwrap();
        let diff = ConfigDiff::default();
        let result = apply_reload(&db, &diff).await;
        assert!(result.success);
        assert!(result.applied.is_empty());
        assert!(result.failed.is_empty());
    }

    #[tokio::test]
    async fn test_apply_warnings_passed_through() {
        let db = LaminarDB::open().unwrap();
        let mut diff = ConfigDiff::default();
        diff.warnings
            .push("[server] section changed — requires restart".to_string());
        let result = apply_reload(&db, &diff).await;
        assert!(result.success);
        assert_eq!(result.warnings.len(), 1);
        assert!(result.warnings[0].contains("[server]"));
    }
}