1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use parking_lot::RwLock;
12use tokio::sync::watch;
13use tokio_util::sync::CancellationToken;
14
15use super::{Discovery, DiscoveryError, NodeId, NodeInfo, NodeMetadata, NodeState};
16
/// Well-known key names published into each node's gossiped key/value
/// state. Peers read these keys back (see `GossipDiscovery::parse_node_info`)
/// to reconstruct a `NodeInfo` for the remote node.
pub mod keys {
    /// Serialized `NodeState` ("joining", "active", "suspected", "draining", "left").
    pub const NODE_STATE: &str = "node:state";
    /// Address peers should use for RPC traffic to this node.
    pub const RPC_ADDRESS: &str = "node:rpc_addr";
    /// Address peers should use for Raft traffic to this node.
    pub const RAFT_ADDRESS: &str = "node:raft_addr";
    /// Human-readable node name; falls back to `node-<id>` when absent.
    pub const NODE_NAME: &str = "node:name";
    /// Comma-separated list of partition ids owned by this node.
    pub const PARTITIONS_OWNED: &str = "partitions:owned";
    /// CPU core count advertised for load/placement decisions.
    pub const LOAD_CORES: &str = "load:cores";
    /// Memory capacity in bytes advertised for load/placement decisions.
    pub const LOAD_MEMORY: &str = "load:memory_bytes";
    /// Failure domain (e.g. availability zone) of the node, if configured.
    pub const FAILURE_DOMAIN: &str = "node:failure_domain";
    /// Software version string of the node.
    pub const NODE_VERSION: &str = "node:version";
}
38
/// Configuration for [`GossipDiscovery`].
#[derive(Debug, Clone)]
pub struct GossipDiscoveryConfig {
    /// UDP address the gossip transport binds to and listens on.
    pub gossip_address: String,
    /// Gossip addresses of seed nodes used to bootstrap membership.
    pub seed_nodes: Vec<String>,
    /// Interval between gossip rounds; also used as the failure
    /// detector's initial heartbeat interval.
    pub gossip_interval: Duration,
    /// Phi-accrual failure-detector threshold; nodes whose phi exceeds
    /// this are no longer reported live.
    pub phi_threshold: f64,
    /// Grace period before a dead node's state is dropped (also reused
    /// as the key marked-for-deletion grace period).
    pub dead_node_grace_period: Duration,
    /// Identifier of the cluster this node belongs to.
    pub cluster_id: String,
    /// Numeric id of the local node; gossiped as `node-<id>`.
    pub node_id: NodeId,
    /// Full description of the local node, announced to peers at startup.
    pub local_node: NodeInfo,
}
59
60impl Default for GossipDiscoveryConfig {
61 fn default() -> Self {
62 Self {
63 gossip_address: "127.0.0.1:9003".into(),
64 seed_nodes: Vec::new(),
65 gossip_interval: Duration::from_millis(500),
66 phi_threshold: 8.0,
67 dead_node_grace_period: Duration::from_secs(300),
68 cluster_id: "laminardb-default".into(),
69 node_id: NodeId(1),
70 local_node: NodeInfo {
71 id: NodeId(1),
72 name: "node-1".into(),
73 rpc_address: "127.0.0.1:9000".into(),
74 raft_address: "127.0.0.1:9001".into(),
75 state: NodeState::Active,
76 metadata: NodeMetadata::default(),
77 last_heartbeat_ms: 0,
78 },
79 }
80 }
81}
82
/// Gossip-based [`Discovery`] implementation backed by the chitchat crate.
pub struct GossipDiscovery {
    /// Static configuration supplied at construction time.
    config: GossipDiscoveryConfig,
    /// Latest snapshot of known peers keyed by numeric node id;
    /// refreshed by the background task spawned in `start` (local node excluded).
    peers: Arc<RwLock<HashMap<u64, NodeInfo>>>,
    /// Sender half used by the background task to publish membership updates.
    membership_tx: watch::Sender<Vec<NodeInfo>>,
    /// Receiver template cloned out by `membership_watch`.
    membership_rx: watch::Receiver<Vec<NodeInfo>>,
    /// Cancels the background refresh task when `stop` is called.
    cancel: CancellationToken,
    /// True once `start` has completed; guards double-start and
    /// produces `NotStarted` errors from `peers`/`announce`.
    started: bool,
    /// Handle to the running chitchat instance; `None` until `start`,
    /// taken back out by `stop`.
    chitchat_handle: Option<chitchat::ChitchatHandle>,
}
93
94impl GossipDiscovery {
95 #[must_use]
97 pub fn new(config: GossipDiscoveryConfig) -> Self {
98 let (tx, rx) = watch::channel(Vec::new());
99 Self {
100 config,
101 peers: Arc::new(RwLock::new(HashMap::new())),
102 membership_tx: tx,
103 membership_rx: rx,
104 cancel: CancellationToken::new(),
105 started: false,
106 chitchat_handle: None,
107 }
108 }
109
110 fn parse_node_info(node_id_str: &str, kvs: &HashMap<String, String>) -> Option<NodeInfo> {
112 let id: u64 = node_id_str.strip_prefix("node-")?.parse().ok()?;
113 let rpc_address = kvs.get(keys::RPC_ADDRESS)?.clone();
114 let raft_address = kvs.get(keys::RAFT_ADDRESS).cloned().unwrap_or_default();
115 let name = kvs
116 .get(keys::NODE_NAME)
117 .cloned()
118 .unwrap_or_else(|| format!("node-{id}"));
119 let state = kvs
120 .get(keys::NODE_STATE)
121 .and_then(|s| match s.as_str() {
122 "joining" => Some(NodeState::Joining),
123 "active" => Some(NodeState::Active),
124 "suspected" => Some(NodeState::Suspected),
125 "draining" => Some(NodeState::Draining),
126 "left" => Some(NodeState::Left),
127 _ => None,
128 })
129 .unwrap_or(NodeState::Active);
130
131 let cores: u32 = kvs
132 .get(keys::LOAD_CORES)
133 .and_then(|s| s.parse().ok())
134 .unwrap_or(1);
135 let memory_bytes: u64 = kvs
136 .get(keys::LOAD_MEMORY)
137 .and_then(|s| s.parse().ok())
138 .unwrap_or(0);
139 let failure_domain = kvs.get(keys::FAILURE_DOMAIN).cloned();
140 let version = kvs.get(keys::NODE_VERSION).cloned().unwrap_or_default();
141 let owned_partitions: Vec<u32> = kvs
142 .get(keys::PARTITIONS_OWNED)
143 .map(|s| s.split(',').filter_map(|p| p.trim().parse().ok()).collect())
144 .unwrap_or_default();
145
146 Some(NodeInfo {
147 id: NodeId(id),
148 name,
149 rpc_address,
150 raft_address,
151 state,
152 metadata: NodeMetadata {
153 cores,
154 memory_bytes,
155 failure_domain,
156 tags: HashMap::new(),
157 owned_partitions,
158 version,
159 },
160 last_heartbeat_ms: chrono::Utc::now().timestamp_millis(),
161 })
162 }
163
164 fn local_kvs(info: &NodeInfo) -> Vec<(String, String)> {
166 let mut kvs = vec![
167 (keys::NODE_STATE.into(), info.state.to_string()),
168 (keys::RPC_ADDRESS.into(), info.rpc_address.clone()),
169 (keys::RAFT_ADDRESS.into(), info.raft_address.clone()),
170 (keys::NODE_NAME.into(), info.name.clone()),
171 (keys::LOAD_CORES.into(), info.metadata.cores.to_string()),
172 (
173 keys::LOAD_MEMORY.into(),
174 info.metadata.memory_bytes.to_string(),
175 ),
176 (keys::NODE_VERSION.into(), info.metadata.version.clone()),
177 ];
178 if let Some(ref fd) = info.metadata.failure_domain {
179 kvs.push((keys::FAILURE_DOMAIN.into(), fd.clone()));
180 }
181 if !info.metadata.owned_partitions.is_empty() {
182 let parts: Vec<String> = info
183 .metadata
184 .owned_partitions
185 .iter()
186 .map(ToString::to_string)
187 .collect();
188 kvs.push((keys::PARTITIONS_OWNED.into(), parts.join(",")));
189 }
190 kvs
191 }
192}
193
194impl std::fmt::Debug for GossipDiscovery {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 f.debug_struct("GossipDiscovery")
197 .field("config", &self.config)
198 .field("started", &self.started)
199 .finish_non_exhaustive()
200 }
201}
202
203impl Discovery for GossipDiscovery {
204 async fn start(&mut self) -> Result<(), DiscoveryError> {
205 if self.started {
206 return Ok(());
207 }
208
209 let node_id = format!("node-{}", self.config.node_id.0);
210 let gossip_addr = self
211 .config
212 .gossip_address
213 .parse()
214 .map_err(|e: std::net::AddrParseError| DiscoveryError::Bind(e.to_string()))?;
215
216 let seed_addrs: Vec<String> = self.config.seed_nodes.clone();
217
218 let config = chitchat::ChitchatConfig {
219 chitchat_id: chitchat::ChitchatId::new(
220 node_id,
221 0, gossip_addr,
223 ),
224 cluster_id: self.config.cluster_id.clone(),
225 gossip_interval: self.config.gossip_interval,
226 listen_addr: gossip_addr,
227 seed_nodes: seed_addrs,
228 failure_detector_config: chitchat::FailureDetectorConfig {
229 phi_threshold: self.config.phi_threshold,
230 initial_interval: self.config.gossip_interval,
231 dead_node_grace_period: self.config.dead_node_grace_period,
234 ..Default::default()
235 },
236 marked_for_deletion_grace_period: self.config.dead_node_grace_period,
237 extra_liveness_predicate: None,
238 catchup_callback: None,
239 };
240
241 let initial_kvs = Self::local_kvs(&self.config.local_node);
242 let transport = chitchat::transport::UdpTransport;
243 let chitchat_handle = chitchat::spawn_chitchat(config, initial_kvs, &transport)
244 .await
245 .map_err(|e| DiscoveryError::Bind(e.to_string()))?;
246
247 self.chitchat_handle = Some(chitchat_handle);
248
249 let peers = Arc::clone(&self.peers);
251 let membership_tx = self.membership_tx.clone();
252 let cancel = self.cancel.clone();
253 let chitchat = self.chitchat_handle.as_ref().unwrap().chitchat().clone();
254 let local_node_id = self.config.node_id;
255
256 tokio::spawn(async move {
257 let mut interval = tokio::time::interval(Duration::from_millis(500));
258 loop {
259 tokio::select! {
260 () = cancel.cancelled() => break,
261 _ = interval.tick() => {
262 let chitchat_guard = chitchat.lock().await;
263 let mut new_peers = HashMap::new();
264
265 let live_ids: std::collections::HashSet<&chitchat::ChitchatId> =
268 chitchat_guard.live_nodes().collect();
269
270 for (cc_id, state) in chitchat_guard.node_states() {
272 let kvs: HashMap<String, String> = state
273 .key_values()
274 .map(|(k, v)| (k.to_string(), v.to_string()))
275 .collect();
276
277 if let Some(mut info) = Self::parse_node_info(
278 &cc_id.node_id,
279 &kvs,
280 ) {
281 if info.id == local_node_id {
282 continue;
283 }
284
285 if !live_ids.contains(cc_id) {
290 info.state = NodeState::Suspected;
291 }
292
293 new_peers.insert(info.id.0, info);
294 }
295 }
296
297 let peer_list: Vec<NodeInfo> =
298 new_peers.values().cloned().collect();
299 *peers.write() = new_peers;
300 let _ = membership_tx.send(peer_list);
301 }
302 }
303 }
304 });
305
306 self.started = true;
307 Ok(())
308 }
309
310 async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError> {
311 if !self.started {
312 return Err(DiscoveryError::NotStarted);
313 }
314 let peers = self.peers.read();
315 Ok(peers.values().cloned().collect())
316 }
317
318 async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError> {
319 if !self.started {
320 return Err(DiscoveryError::NotStarted);
321 }
322 if let Some(ref handle) = self.chitchat_handle {
323 let kvs = Self::local_kvs(&info);
324 handle
325 .with_chitchat(|chitchat| {
326 for (key, value) in &kvs {
327 chitchat.self_node_state().set(key.clone(), value.clone());
328 }
329 })
330 .await;
331 }
332 Ok(())
333 }
334
335 fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
336 self.membership_rx.clone()
337 }
338
339 async fn stop(&mut self) -> Result<(), DiscoveryError> {
340 self.cancel.cancel();
341 self.started = false;
342 if let Some(handle) = self.chitchat_handle.take() {
345 if let Err(e) = handle.shutdown().await {
346 tracing::warn!("Chitchat shutdown error: {e}");
347 }
348 }
349 self.cancel = CancellationToken::new();
351 Ok(())
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned key/value map from borrowed string pairs.
    fn kv_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect()
    }

    #[test]
    fn test_key_namespace() {
        assert_eq!(keys::NODE_STATE, "node:state");
        assert_eq!(keys::RPC_ADDRESS, "node:rpc_addr");
    }

    #[test]
    fn test_gossip_config_default() {
        let defaults = GossipDiscoveryConfig::default();
        assert_eq!(defaults.gossip_interval, Duration::from_millis(500));
        assert!((defaults.phi_threshold - 8.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_node_info() {
        let kvs = kv_map(&[
            (keys::RPC_ADDRESS, "127.0.0.1:9000"),
            (keys::RAFT_ADDRESS, "127.0.0.1:9001"),
            (keys::NODE_NAME, "test-node"),
            (keys::NODE_STATE, "active"),
            (keys::LOAD_CORES, "4"),
            (keys::LOAD_MEMORY, "8589934592"),
        ]);

        let info = GossipDiscovery::parse_node_info("node-42", &kvs).expect("valid node");
        assert_eq!(info.id, NodeId(42));
        assert_eq!(info.name, "test-node");
        assert_eq!(info.metadata.cores, 4);
        assert_eq!(info.state, NodeState::Active);
    }

    #[test]
    fn test_parse_node_info_invalid_id() {
        // Id must have the `node-<u64>` shape.
        let empty = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("invalid", &empty).is_none());
    }

    #[test]
    fn test_parse_node_info_missing_rpc() {
        // The RPC address key is mandatory.
        let empty = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("node-1", &empty).is_none());
    }

    #[test]
    fn test_local_kvs() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "n1".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 4,
                memory_bytes: 1024,
                failure_domain: Some("us-east-1a".into()),
                owned_partitions: vec![0, 1, 2],
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };

        let kvs = GossipDiscovery::local_kvs(&info);
        let has_key = |key: &str| kvs.iter().any(|(k, _)| k == key);
        assert!(has_key(keys::RPC_ADDRESS));
        assert!(has_key(keys::FAILURE_DOMAIN));
        assert!(kvs
            .iter()
            .any(|(k, v)| k == keys::PARTITIONS_OWNED && v == "0,1,2"));
    }

    #[test]
    fn test_parse_owned_partitions() {
        let kvs = kv_map(&[
            (keys::RPC_ADDRESS, "127.0.0.1:9000"),
            (keys::PARTITIONS_OWNED, "0,1,5,10"),
        ]);

        let info = GossipDiscovery::parse_node_info("node-1", &kvs).expect("valid node");
        assert_eq!(info.metadata.owned_partitions, vec![0, 1, 5, 10]);
    }

    #[test]
    fn test_parse_all_node_states() {
        let cases = [
            ("joining", NodeState::Joining),
            ("active", NodeState::Active),
            ("suspected", NodeState::Suspected),
            ("draining", NodeState::Draining),
            ("left", NodeState::Left),
        ];
        for (state_str, expected) in cases {
            let kvs = kv_map(&[
                (keys::RPC_ADDRESS, "127.0.0.1:9000"),
                (keys::NODE_STATE, state_str),
            ]);
            let info = GossipDiscovery::parse_node_info("node-1", &kvs).expect("valid node");
            assert_eq!(info.state, expected);
        }
    }

    #[tokio::test]
    async fn test_not_started_errors() {
        let disc = GossipDiscovery::new(GossipDiscoveryConfig::default());
        assert!(disc.peers().await.is_err());
    }
}