Skip to main content

laminardb/
cluster_config.rs

1//! Cluster mode configuration extraction and validation.
2
3use std::fmt;
4use std::time::Duration;
5
6use crate::config::{CoordinationSection, DiscoverySection, ServerConfig};
7
/// Node identity for cluster mode (non-empty, at most 64 bytes).
///
/// Derives `Hash` alongside `Eq` so a node ID can be used directly as a
/// `HashMap` / `HashSet` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterNodeId(String);
11
12impl ClusterNodeId {
13    const MAX_LEN: usize = 64;
14
15    pub fn from_config(id: String) -> Result<Self, ClusterConfigError> {
16        if id.is_empty() {
17            return Err(ClusterConfigError::InvalidNodeId(
18                "node_id must not be empty".to_string(),
19            ));
20        }
21        let truncated = if id.len() > Self::MAX_LEN {
22            id[..Self::MAX_LEN].to_string()
23        } else {
24            id
25        };
26        Ok(Self(truncated))
27    }
28
29    /// Auto-generate from bind address: `{hostname}-{port}`, or UUID v4 fallback.
30    pub fn auto_generate(bind_addr: &str) -> Self {
31        let port = bind_addr.rsplit(':').next().unwrap_or("8080");
32
33        let hostname = gethostname::gethostname();
34        let hostname_str = hostname.to_string_lossy();
35
36        let candidate = if hostname_str.is_empty() {
37            format!("{}", uuid::Uuid::new_v4())
38        } else {
39            format!("{hostname_str}-{port}")
40        };
41
42        let truncated = if candidate.len() > Self::MAX_LEN {
43            candidate[..Self::MAX_LEN].to_string()
44        } else {
45            candidate
46        };
47
48        Self(truncated)
49    }
50
51    pub fn as_str(&self) -> &str {
52        &self.0
53    }
54}
55
56impl fmt::Display for ClusterNodeId {
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        f.write_str(&self.0)
59    }
60}
61
62/// Extracted and validated cluster configuration.
63#[derive(Debug, Clone)]
64pub struct ClusterConfig {
65    pub node_id: ClusterNodeId,
66    pub discovery: DiscoverySection,
67    pub coordination: CoordinationSection,
68    pub formation_timeout: Duration,
69}
70
71impl ClusterConfig {
72    const DEFAULT_FORMATION_TIMEOUT: Duration = Duration::from_secs(60);
73
74    pub fn from_server_config(config: &ServerConfig) -> Result<Option<Self>, ClusterConfigError> {
75        if config.server.mode != "cluster" {
76            return Ok(None);
77        }
78
79        let discovery = config
80            .discovery
81            .clone()
82            .ok_or_else(|| ClusterConfigError::MissingSection("[discovery]".to_string()))?;
83
84        let coordination = config
85            .coordination
86            .clone()
87            .ok_or_else(|| ClusterConfigError::MissingSection("[coordination]".to_string()))?;
88
89        if discovery.seeds.is_empty() && discovery.strategy == "static" {
90            return Err(ClusterConfigError::EmptySeeds);
91        }
92
93        let node_id = match &config.node_id {
94            Some(id) => ClusterNodeId::from_config(id.clone())?,
95            None => ClusterNodeId::auto_generate(&config.server.bind),
96        };
97
98        Ok(Some(Self {
99            node_id,
100            discovery,
101            coordination,
102            formation_timeout: Self::DEFAULT_FORMATION_TIMEOUT,
103        }))
104    }
105}
106
107#[derive(Debug, thiserror::Error)]
108pub enum ClusterConfigError {
109    #[error("cluster mode requires {0} section in config")]
110    MissingSection(String),
111    #[error("invalid node_id: {0}")]
112    InvalidNodeId(String),
113    #[error("static discovery requires at least one seed address")]
114    EmptySeeds,
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::*;

    /// Server config with every optional section unset.
    fn base_config() -> ServerConfig {
        ServerConfig {
            server: ServerSection::default(),
            state: laminar_core::state::StateBackendConfig::default(),
            checkpoint: CheckpointSection::default(),
            sources: vec![],
            lookups: vec![],
            pipelines: vec![],
            sinks: vec![],
            discovery: None,
            coordination: None,
            node_id: None,
            sql: None,
            ai: Default::default(),
            models: Default::default(),
        }
    }

    /// Fully populated cluster-mode config with static discovery and two seeds.
    fn cluster_config() -> ServerConfig {
        let mut config = base_config();
        config.server.mode = "cluster".to_string();
        config.node_id = Some("test-node-1".to_string());
        config.discovery = Some(DiscoverySection {
            strategy: "static".to_string(),
            seeds: vec!["node-1:7946".to_string(), "node-2:7946".to_string()],
            gossip_port: 7946,
        });
        config.coordination = Some(CoordinationSection {
            strategy: "raft".to_string(),
            raft_port: 7947,
            election_timeout: Duration::from_millis(1500),
            heartbeat_interval: Duration::from_millis(300),
        });
        config
    }

    #[test]
    fn test_cluster_config_from_server_config_valid() {
        let config = cluster_config();
        let result = ClusterConfig::from_server_config(&config).unwrap();
        let cluster_cfg = result.expect("should return Some for cluster mode");
        assert_eq!(cluster_cfg.node_id.as_str(), "test-node-1");
        assert_eq!(cluster_cfg.discovery.strategy, "static");
        assert_eq!(cluster_cfg.coordination.raft_port, 7947);
        assert_eq!(cluster_cfg.formation_timeout, Duration::from_secs(60));
    }

    #[test]
    fn test_cluster_config_embedded_mode_returns_none() {
        let config = base_config();
        let result = ClusterConfig::from_server_config(&config).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_cluster_config_missing_discovery() {
        let mut config = cluster_config();
        config.discovery = None;
        let err = ClusterConfig::from_server_config(&config).unwrap_err();
        assert!(err.to_string().contains("[discovery]"));
    }

    #[test]
    fn test_cluster_config_missing_coordination() {
        let mut config = cluster_config();
        config.coordination = None;
        let err = ClusterConfig::from_server_config(&config).unwrap_err();
        assert!(err.to_string().contains("[coordination]"));
    }

    #[test]
    fn test_node_id_from_config() {
        let node_id = ClusterNodeId::from_config("star-1".to_string()).unwrap();
        assert_eq!(node_id.as_str(), "star-1");
    }

    #[test]
    fn test_node_id_auto_generate() {
        let node_id = ClusterNodeId::auto_generate("0.0.0.0:8080");
        let s = node_id.as_str();
        assert!(!s.is_empty());
        assert!(s.ends_with("-8080"), "expected suffix -8080, got: {s}");
    }

    #[test]
    fn test_node_id_from_config_truncation() {
        // A configured ID longer than the 64-byte limit is cut down to 64 bytes.
        let node_id = ClusterNodeId::from_config("a".repeat(100)).unwrap();
        assert_eq!(node_id.as_str().len(), 64);
    }

    #[test]
    fn test_cluster_config_error_display() {
        assert!(ClusterConfigError::MissingSection("[discovery]".into())
            .to_string()
            .contains("[discovery]"));
        assert!(ClusterConfigError::EmptySeeds
            .to_string()
            .contains("at least one seed"));
    }

    #[test]
    fn test_empty_seeds_with_static_strategy() {
        let mut config = cluster_config();
        config.discovery.as_mut().unwrap().seeds.clear();
        let err = ClusterConfig::from_server_config(&config).unwrap_err();
        assert!(
            matches!(err, ClusterConfigError::EmptySeeds),
            "expected EmptySeeds, got: {err}"
        );
    }
}