1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use parking_lot::RwLock;
12use tokio::sync::watch;
13use tokio_util::sync::CancellationToken;
14
15use super::{Discovery, DiscoveryError, NodeId, NodeInfo, NodeMetadata, NodeState};
16
/// Well-known key names published into the chitchat key-value store.
///
/// Keys are namespaced (`node:`, `partitions:`, `load:`) so unrelated
/// subsystems sharing the gossip state cannot collide.
pub mod keys {
    /// Lifecycle state of the node ("joining", "active", "suspected", "draining", "left").
    pub const NODE_STATE: &str = "node:state";
    /// Address peers use to reach this node's RPC endpoint.
    pub const RPC_ADDRESS: &str = "node:rpc_addr";
    /// Address peers use to reach this node's Raft endpoint.
    pub const RAFT_ADDRESS: &str = "node:raft_addr";
    /// Human-readable node name.
    pub const NODE_NAME: &str = "node:name";
    /// Comma-separated list of partition ids owned by the node.
    pub const PARTITIONS_OWNED: &str = "partitions:owned";
    /// Number of CPU cores advertised by the node.
    pub const LOAD_CORES: &str = "load:cores";
    /// Total memory in bytes advertised by the node.
    pub const LOAD_MEMORY: &str = "load:memory_bytes";
    /// Failure domain (e.g. rack / availability zone) the node runs in.
    pub const FAILURE_DOMAIN: &str = "node:failure_domain";
    /// Software version string reported by the node.
    pub const NODE_VERSION: &str = "node:version";
}
38
/// Configuration for [`GossipDiscovery`].
#[derive(Debug, Clone)]
pub struct GossipDiscoveryConfig {
    /// Socket address the gossip transport binds to / listens on.
    pub gossip_address: String,
    /// Addresses of seed nodes used to bootstrap into the cluster.
    pub seed_nodes: Vec<String>,
    /// Interval between gossip rounds.
    pub gossip_interval: Duration,
    /// Phi accrual failure-detector threshold; higher means slower suspicion.
    pub phi_threshold: f64,
    /// How long a dead node's state is retained before garbage collection.
    pub dead_node_grace_period: Duration,
    /// Cluster identifier; nodes with different ids refuse to gossip together.
    pub cluster_id: String,
    /// Numeric id of the local node.
    pub node_id: NodeId,
    /// Full descriptor of the local node, published via gossip at startup.
    pub local_node: NodeInfo,
}
59
60impl Default for GossipDiscoveryConfig {
61 fn default() -> Self {
62 Self {
63 gossip_address: "127.0.0.1:9003".into(),
64 seed_nodes: Vec::new(),
65 gossip_interval: Duration::from_millis(500),
66 phi_threshold: 8.0,
67 dead_node_grace_period: Duration::from_secs(300),
68 cluster_id: "laminardb-default".into(),
69 node_id: NodeId(1),
70 local_node: NodeInfo {
71 id: NodeId(1),
72 name: "node-1".into(),
73 rpc_address: "127.0.0.1:9000".into(),
74 raft_address: "127.0.0.1:9001".into(),
75 state: NodeState::Active,
76 metadata: NodeMetadata::default(),
77 last_heartbeat_ms: 0,
78 },
79 }
80 }
81}
82
/// Gossip-based cluster membership discovery backed by chitchat.
pub struct GossipDiscovery {
    /// Static configuration captured at construction time.
    config: GossipDiscoveryConfig,
    /// Latest view of remote peers, keyed by numeric node id.
    /// Written by the background refresh task, read by `peers()`.
    peers: Arc<RwLock<HashMap<u64, NodeInfo>>>,
    /// Sender half of the membership watch channel (refresh task publishes here).
    membership_tx: watch::Sender<Vec<NodeInfo>>,
    /// Prototype receiver cloned out by `membership_watch()`.
    membership_rx: watch::Receiver<Vec<NodeInfo>>,
    /// Cancels the background refresh task on `stop()`.
    cancel: CancellationToken,
    /// True between a successful start and the next `stop()`.
    started: bool,
    /// Handle to the spawned chitchat actor; `None` until started.
    chitchat_handle: Option<chitchat::ChitchatHandle>,
}
93
94impl GossipDiscovery {
95 #[must_use]
97 pub fn new(config: GossipDiscoveryConfig) -> Self {
98 let (tx, rx) = watch::channel(Vec::new());
99 Self {
100 config,
101 peers: Arc::new(RwLock::new(HashMap::new())),
102 membership_tx: tx,
103 membership_rx: rx,
104 cancel: CancellationToken::new(),
105 started: false,
106 chitchat_handle: None,
107 }
108 }
109
110 #[must_use]
115 pub fn chitchat_handle(&self) -> Option<&chitchat::ChitchatHandle> {
116 self.chitchat_handle.as_ref()
117 }
118
119 fn parse_node_info(node_id_str: &str, kvs: &HashMap<String, String>) -> Option<NodeInfo> {
121 let id: u64 = node_id_str.strip_prefix("node-")?.parse().ok()?;
122 let rpc_address = kvs.get(keys::RPC_ADDRESS)?.clone();
123 let raft_address = kvs.get(keys::RAFT_ADDRESS).cloned().unwrap_or_default();
124 let name = kvs
125 .get(keys::NODE_NAME)
126 .cloned()
127 .unwrap_or_else(|| format!("node-{id}"));
128 let state = kvs
129 .get(keys::NODE_STATE)
130 .and_then(|s| match s.as_str() {
131 "joining" => Some(NodeState::Joining),
132 "active" => Some(NodeState::Active),
133 "suspected" => Some(NodeState::Suspected),
134 "draining" => Some(NodeState::Draining),
135 "left" => Some(NodeState::Left),
136 _ => None,
137 })
138 .unwrap_or(NodeState::Active);
139
140 let cores: u32 = kvs
141 .get(keys::LOAD_CORES)
142 .and_then(|s| s.parse().ok())
143 .unwrap_or(1);
144 let memory_bytes: u64 = kvs
145 .get(keys::LOAD_MEMORY)
146 .and_then(|s| s.parse().ok())
147 .unwrap_or(0);
148 let failure_domain = kvs.get(keys::FAILURE_DOMAIN).cloned();
149 let version = kvs.get(keys::NODE_VERSION).cloned().unwrap_or_default();
150 let owned_partitions: Vec<u32> = kvs
151 .get(keys::PARTITIONS_OWNED)
152 .map(|s| s.split(',').filter_map(|p| p.trim().parse().ok()).collect())
153 .unwrap_or_default();
154
155 Some(NodeInfo {
156 id: NodeId(id),
157 name,
158 rpc_address,
159 raft_address,
160 state,
161 metadata: NodeMetadata {
162 cores,
163 memory_bytes,
164 failure_domain,
165 tags: HashMap::new(),
166 owned_partitions,
167 version,
168 },
169 last_heartbeat_ms: chrono::Utc::now().timestamp_millis(),
170 })
171 }
172
173 fn local_kvs(info: &NodeInfo) -> Vec<(String, String)> {
175 let mut kvs = vec![
176 (keys::NODE_STATE.into(), info.state.to_string()),
177 (keys::RPC_ADDRESS.into(), info.rpc_address.clone()),
178 (keys::RAFT_ADDRESS.into(), info.raft_address.clone()),
179 (keys::NODE_NAME.into(), info.name.clone()),
180 (keys::LOAD_CORES.into(), info.metadata.cores.to_string()),
181 (
182 keys::LOAD_MEMORY.into(),
183 info.metadata.memory_bytes.to_string(),
184 ),
185 (keys::NODE_VERSION.into(), info.metadata.version.clone()),
186 ];
187 if let Some(ref fd) = info.metadata.failure_domain {
188 kvs.push((keys::FAILURE_DOMAIN.into(), fd.clone()));
189 }
190 if !info.metadata.owned_partitions.is_empty() {
191 let parts: Vec<String> = info
192 .metadata
193 .owned_partitions
194 .iter()
195 .map(ToString::to_string)
196 .collect();
197 kvs.push((keys::PARTITIONS_OWNED.into(), parts.join(",")));
198 }
199 kvs
200 }
201}
202
203impl std::fmt::Debug for GossipDiscovery {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_struct("GossipDiscovery")
206 .field("config", &self.config)
207 .field("started", &self.started)
208 .finish_non_exhaustive()
209 }
210}
211
impl GossipDiscovery {
    /// Starts gossip using a caller-supplied chitchat transport.
    ///
    /// Idempotent: returns `Ok(())` immediately when already started. On first
    /// start it spawns the chitchat actor seeded with the local node's
    /// key-value state, then spawns a background task that periodically turns
    /// chitchat node state into `NodeInfo` peers, stores them in `self.peers`,
    /// and publishes the list on the membership watch channel.
    ///
    /// # Errors
    ///
    /// Returns `DiscoveryError::Bind` when the configured gossip address does
    /// not parse or the chitchat actor fails to spawn.
    pub async fn start_with_transport<T>(&mut self, transport: &T) -> Result<(), DiscoveryError>
    where
        T: chitchat::transport::Transport,
    {
        if self.started {
            return Ok(());
        }

        // Chitchat identifies nodes by string id; the numeric id is encoded as "node-<n>".
        let node_id = format!("node-{}", self.config.node_id.0);
        let gossip_addr = self
            .config
            .gossip_address
            .parse()
            .map_err(|e: std::net::AddrParseError| DiscoveryError::Bind(e.to_string()))?;

        let seed_addrs: Vec<String> = self.config.seed_nodes.clone();

        // Generation distinguishes restarts of the same node id; wall-clock
        // millis increases across restarts, so newer incarnations win.
        let generation = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));

        let config = chitchat::ChitchatConfig {
            chitchat_id: chitchat::ChitchatId::new(node_id, generation, gossip_addr),
            cluster_id: self.config.cluster_id.clone(),
            gossip_interval: self.config.gossip_interval,
            listen_addr: gossip_addr,
            seed_nodes: seed_addrs,
            failure_detector_config: chitchat::FailureDetectorConfig {
                phi_threshold: self.config.phi_threshold,
                initial_interval: self.config.gossip_interval,
                dead_node_grace_period: self.config.dead_node_grace_period,
                ..Default::default()
            },
            marked_for_deletion_grace_period: self.config.dead_node_grace_period,
            extra_liveness_predicate: None,
            catchup_callback: None,
        };

        // Seed the local key-value state so peers can reconstruct our NodeInfo.
        let initial_kvs = Self::local_kvs(&self.config.local_node);
        let chitchat_handle = chitchat::spawn_chitchat(config, initial_kvs, transport)
            .await
            .map_err(|e| DiscoveryError::Bind(e.to_string()))?;

        self.chitchat_handle = Some(chitchat_handle);

        // Clones captured by the background refresh task below.
        let peers = Arc::clone(&self.peers);
        let membership_tx = self.membership_tx.clone();
        let cancel = self.cancel.clone();
        let chitchat = self.chitchat_handle.as_ref().unwrap().chitchat().clone();
        let local_node_id = self.config.node_id;

        tokio::spawn(async move {
            // Poll chitchat state on a fixed cadence until cancelled via `stop()`.
            let mut interval = tokio::time::interval(Duration::from_millis(500));
            loop {
                tokio::select! {
                    () = cancel.cancelled() => break,
                    _ = interval.tick() => {
                        let chitchat_guard = chitchat.lock().await;
                        let mut new_peers: HashMap<u64, NodeInfo> = HashMap::new();

                        // Ids the phi failure detector currently considers alive.
                        let live_ids: std::collections::HashSet<&chitchat::ChitchatId> =
                            chitchat_guard.live_nodes().collect();

                        for (cc_id, state) in chitchat_guard.node_states() {
                            let kvs: HashMap<String, String> = state
                                .key_values()
                                .map(|(k, v)| (k.to_string(), v.to_string()))
                                .collect();

                            if let Some(mut info) = Self::parse_node_info(
                                &cc_id.node_id,
                                &kvs,
                            ) {
                                // The local node is not a peer of itself.
                                if info.id == local_node_id {
                                    continue;
                                }

                                // Absent from the live set => mark as suspected.
                                if !live_ids.contains(cc_id) {
                                    info.state = NodeState::Suspected;
                                }

                                // Multiple chitchat generations can map to the same
                                // node id; prefer an Active entry over a non-Active one.
                                match new_peers.get(&info.id.0) {
                                    Some(existing)
                                        if !matches!(info.state, NodeState::Active)
                                            && matches!(existing.state, NodeState::Active) =>
                                    {
                                        // Keep the existing Active entry.
                                    }
                                    _ => {
                                        new_peers.insert(info.id.0, info);
                                    }
                                }
                            }
                        }

                        let peer_list: Vec<NodeInfo> =
                            new_peers.values().cloned().collect();
                        *peers.write() = new_peers;
                        // All receivers may have dropped; a send error is benign.
                        let _ = membership_tx.send(peer_list);
                    }
                }
            }
        });

        self.started = true;
        Ok(())
    }
}
356
357impl Discovery for GossipDiscovery {
358 async fn start(&mut self) -> Result<(), DiscoveryError> {
359 self.start_with_transport(&chitchat::transport::UdpTransport)
360 .await
361 }
362
363 async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError> {
364 if !self.started {
365 return Err(DiscoveryError::NotStarted);
366 }
367 let peers = self.peers.read();
368 Ok(peers.values().cloned().collect())
369 }
370
371 async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError> {
372 if !self.started {
373 return Err(DiscoveryError::NotStarted);
374 }
375 if let Some(ref handle) = self.chitchat_handle {
376 let kvs = Self::local_kvs(&info);
377 handle
378 .with_chitchat(|chitchat| {
379 for (key, value) in &kvs {
380 chitchat.self_node_state().set(key.clone(), value.clone());
381 }
382 })
383 .await;
384 }
385 Ok(())
386 }
387
388 fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
389 self.membership_rx.clone()
390 }
391
392 async fn stop(&mut self) -> Result<(), DiscoveryError> {
393 self.cancel.cancel();
394 self.started = false;
395 if let Some(handle) = self.chitchat_handle.take() {
398 if let Err(e) = handle.shutdown().await {
399 tracing::warn!("Chitchat shutdown error: {e}");
400 }
401 }
402 self.cancel = CancellationToken::new();
404 Ok(())
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// Helper: builds an owned string map from borrowed key/value pairs.
    fn kv_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_key_namespace() {
        // Wire-format key names must stay stable across releases.
        assert_eq!(keys::NODE_STATE, "node:state");
        assert_eq!(keys::RPC_ADDRESS, "node:rpc_addr");
    }

    #[test]
    fn test_gossip_config_default() {
        let defaults = GossipDiscoveryConfig::default();
        assert_eq!(defaults.gossip_interval, Duration::from_millis(500));
        assert!((defaults.phi_threshold - 8.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_node_info() {
        let kvs = kv_map(&[
            (keys::RPC_ADDRESS, "127.0.0.1:9000"),
            (keys::RAFT_ADDRESS, "127.0.0.1:9001"),
            (keys::NODE_NAME, "test-node"),
            (keys::NODE_STATE, "active"),
            (keys::LOAD_CORES, "4"),
            (keys::LOAD_MEMORY, "8589934592"),
        ]);

        let parsed = GossipDiscovery::parse_node_info("node-42", &kvs).unwrap();
        assert_eq!(parsed.id, NodeId(42));
        assert_eq!(parsed.name, "test-node");
        assert_eq!(parsed.metadata.cores, 4);
        assert_eq!(parsed.state, NodeState::Active);
    }

    #[test]
    fn test_parse_node_info_invalid_id() {
        // An id that does not match "node-<n>" must be rejected.
        let empty = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("invalid", &empty).is_none());
    }

    #[test]
    fn test_parse_node_info_missing_rpc() {
        // The RPC address key is mandatory.
        let empty = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("node-1", &empty).is_none());
    }

    #[test]
    fn test_local_kvs() {
        let node = NodeInfo {
            id: NodeId(1),
            name: "n1".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 4,
                memory_bytes: 1024,
                failure_domain: Some("us-east-1a".into()),
                owned_partitions: vec![0, 1, 2],
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };
        let kvs = GossipDiscovery::local_kvs(&node);
        let value_of = |key: &str| {
            kvs.iter()
                .find(|(k, _)| k == key)
                .map(|(_, v)| v.clone())
        };
        assert!(value_of(keys::RPC_ADDRESS).is_some());
        assert!(value_of(keys::FAILURE_DOMAIN).is_some());
        assert_eq!(value_of(keys::PARTITIONS_OWNED).as_deref(), Some("0,1,2"));
    }

    #[test]
    fn test_parse_owned_partitions() {
        let kvs = kv_map(&[
            (keys::RPC_ADDRESS, "127.0.0.1:9000"),
            (keys::PARTITIONS_OWNED, "0,1,5,10"),
        ]);

        let parsed = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(parsed.metadata.owned_partitions, vec![0, 1, 5, 10]);
    }

    #[test]
    fn test_parse_all_node_states() {
        let cases = [
            ("joining", NodeState::Joining),
            ("active", NodeState::Active),
            ("suspected", NodeState::Suspected),
            ("draining", NodeState::Draining),
            ("left", NodeState::Left),
        ];
        for (state_str, expected) in cases {
            let kvs = kv_map(&[
                (keys::RPC_ADDRESS, "127.0.0.1:9000"),
                (keys::NODE_STATE, state_str),
            ]);

            let parsed = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
            assert_eq!(parsed.state, expected);
        }
    }

    #[tokio::test]
    async fn test_not_started_errors() {
        // Querying peers before start() must fail with NotStarted.
        let disc = GossipDiscovery::new(GossipDiscoveryConfig::default());
        assert!(disc.peers().await.is_err());
    }
}