laminardb/
cluster_config.rs1use std::fmt;
4use std::time::Duration;
5
6use crate::config::{CoordinationSection, DiscoverySection, ServerConfig};
7
/// Identifier of a single node in a cluster.
///
/// A thin newtype over `String`. Values are built through
/// `ClusterNodeId::from_config` (explicitly configured id) or
/// `ClusterNodeId::auto_generate` (derived from the local host).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ClusterNodeId(String);
11
12impl ClusterNodeId {
13 const MAX_LEN: usize = 64;
14
15 pub fn from_config(id: String) -> Result<Self, ClusterConfigError> {
16 if id.is_empty() {
17 return Err(ClusterConfigError::InvalidNodeId(
18 "node_id must not be empty".to_string(),
19 ));
20 }
21 let truncated = if id.len() > Self::MAX_LEN {
22 id[..Self::MAX_LEN].to_string()
23 } else {
24 id
25 };
26 Ok(Self(truncated))
27 }
28
29 pub fn auto_generate(bind_addr: &str) -> Self {
31 let port = bind_addr.rsplit(':').next().unwrap_or("8080");
32
33 let hostname = gethostname::gethostname();
34 let hostname_str = hostname.to_string_lossy();
35
36 let candidate = if hostname_str.is_empty() {
37 format!("{}", uuid::Uuid::new_v4())
38 } else {
39 format!("{hostname_str}-{port}")
40 };
41
42 let truncated = if candidate.len() > Self::MAX_LEN {
43 candidate[..Self::MAX_LEN].to_string()
44 } else {
45 candidate
46 };
47
48 Self(truncated)
49 }
50
51 pub fn as_str(&self) -> &str {
52 &self.0
53 }
54}
55
56impl fmt::Display for ClusterNodeId {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 f.write_str(&self.0)
59 }
60}
61
62#[derive(Debug, Clone)]
64pub struct ClusterConfig {
65 pub node_id: ClusterNodeId,
66 pub discovery: DiscoverySection,
67 pub coordination: CoordinationSection,
68 pub formation_timeout: Duration,
69}
70
71impl ClusterConfig {
72 const DEFAULT_FORMATION_TIMEOUT: Duration = Duration::from_secs(60);
73
74 pub fn from_server_config(config: &ServerConfig) -> Result<Option<Self>, ClusterConfigError> {
75 if config.server.mode != "cluster" {
76 return Ok(None);
77 }
78
79 let discovery = config
80 .discovery
81 .clone()
82 .ok_or_else(|| ClusterConfigError::MissingSection("[discovery]".to_string()))?;
83
84 let coordination = config
85 .coordination
86 .clone()
87 .ok_or_else(|| ClusterConfigError::MissingSection("[coordination]".to_string()))?;
88
89 if discovery.seeds.is_empty() && discovery.strategy == "static" {
90 return Err(ClusterConfigError::EmptySeeds);
91 }
92
93 let node_id = match &config.node_id {
94 Some(id) => ClusterNodeId::from_config(id.clone())?,
95 None => ClusterNodeId::auto_generate(&config.server.bind),
96 };
97
98 Ok(Some(Self {
99 node_id,
100 discovery,
101 coordination,
102 formation_timeout: Self::DEFAULT_FORMATION_TIMEOUT,
103 }))
104 }
105}
106
107#[derive(Debug, thiserror::Error)]
108pub enum ClusterConfigError {
109 #[error("cluster mode requires {0} section in config")]
110 MissingSection(String),
111 #[error("invalid node_id: {0}")]
112 InvalidNodeId(String),
113 #[error("static discovery requires at least one seed address")]
114 EmptySeeds,
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::*;

    /// Server config with default sections and no cluster-related settings.
    fn base_config() -> ServerConfig {
        ServerConfig {
            server: ServerSection::default(),
            state: laminar_core::state::StateBackendConfig::default(),
            checkpoint: CheckpointSection::default(),
            sources: vec![],
            lookups: vec![],
            pipelines: vec![],
            sinks: vec![],
            discovery: None,
            coordination: None,
            node_id: None,
            sql: None,
            ai: Default::default(),
            models: Default::default(),
        }
    }

    /// Fully populated cluster-mode config: explicit node id, static
    /// discovery with two seeds, and a coordination section.
    fn cluster_config() -> ServerConfig {
        let mut config = base_config();
        config.server.mode = "cluster".to_string();
        config.node_id = Some("test-node-1".to_string());
        config.discovery = Some(DiscoverySection {
            strategy: "static".to_string(),
            seeds: vec!["node-1:7946".to_string(), "node-2:7946".to_string()],
            gossip_port: 7946,
        });
        config.coordination = Some(CoordinationSection {
            strategy: "raft".to_string(),
            raft_port: 7947,
            election_timeout: Duration::from_millis(1500),
            heartbeat_interval: Duration::from_millis(300),
        });
        config
    }

    #[test]
    fn test_cluster_config_from_server_config_valid() {
        let config = cluster_config();
        let result = ClusterConfig::from_server_config(&config).unwrap();
        let cluster_cfg = result.expect("should return Some for cluster mode");
        assert_eq!(cluster_cfg.node_id.as_str(), "test-node-1");
        assert_eq!(cluster_cfg.discovery.strategy, "static");
        assert_eq!(cluster_cfg.coordination.raft_port, 7947);
        assert_eq!(cluster_cfg.formation_timeout, Duration::from_secs(60));
    }

    #[test]
    fn test_cluster_config_embedded_mode_returns_none() {
        let config = base_config();
        let result = ClusterConfig::from_server_config(&config).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_cluster_config_missing_discovery() {
        let mut config = cluster_config();
        config.discovery = None;
        let err = ClusterConfig::from_server_config(&config).unwrap_err();
        assert!(err.to_string().contains("[discovery]"));
    }

    #[test]
    fn test_cluster_config_missing_coordination() {
        let mut config = cluster_config();
        config.coordination = None;
        let err = ClusterConfig::from_server_config(&config).unwrap_err();
        assert!(err.to_string().contains("[coordination]"));
    }

    #[test]
    fn test_node_id_from_config() {
        let node_id = ClusterNodeId::from_config("star-1".to_string()).unwrap();
        assert_eq!(node_id.as_str(), "star-1");
    }

    #[test]
    fn test_node_id_from_config_rejects_empty() {
        let err = ClusterNodeId::from_config(String::new()).unwrap_err();
        assert!(err.to_string().contains("node_id must not be empty"));
        assert!(matches!(err, ClusterConfigError::InvalidNodeId(_)));
    }

    #[test]
    fn test_node_id_auto_generate() {
        // Relies on the test host reporting a non-empty hostname short enough
        // that the port suffix survives truncation.
        let node_id = ClusterNodeId::auto_generate("0.0.0.0:8080");
        let s = node_id.as_str();
        assert!(!s.is_empty());
        assert!(s.len() <= 64, "id exceeds 64 bytes: {s}");
        assert!(s.ends_with("-8080"), "expected suffix -8080, got: {s}");
    }

    // Previously named `test_node_id_auto_generate_truncation`, although it
    // exercises `from_config`.
    #[test]
    fn test_node_id_from_config_truncation() {
        let node_id = ClusterNodeId::from_config("a".repeat(100)).unwrap();
        assert_eq!(node_id.as_str().len(), 64);
        assert_eq!(node_id.as_str(), "a".repeat(64));
    }

    #[test]
    fn test_cluster_config_error_display() {
        assert!(ClusterConfigError::MissingSection("[discovery]".into())
            .to_string()
            .contains("[discovery]"));
        assert!(ClusterConfigError::EmptySeeds
            .to_string()
            .contains("at least one seed"));
    }

    #[test]
    fn test_empty_seeds_with_static_strategy() {
        let mut config = cluster_config();
        config.discovery.as_mut().unwrap().seeds.clear();
        let err = ClusterConfig::from_server_config(&config).unwrap_err();
        assert!(
            matches!(err, ClusterConfigError::EmptySeeds),
            "expected EmptySeeds, got: {err}"
        );
    }

    #[test]
    fn test_empty_seeds_with_non_static_strategy() {
        // Only the "static" strategy requires seeds.
        let mut config = cluster_config();
        let discovery = config.discovery.as_mut().unwrap();
        discovery.strategy = "gossip".to_string();
        discovery.seeds.clear();
        let result = ClusterConfig::from_server_config(&config).unwrap();
        assert!(result.is_some());
    }
}