1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
8#[cfg(feature = "cluster")]
9use std::net::ToSocketAddrs;
10use std::sync::Arc;
11use std::time::Duration;
12
13use parking_lot::RwLock;
14use tokio::sync::watch;
15use tokio_util::sync::CancellationToken;
16
17use super::{Discovery, DiscoveryError, NodeId, NodeInfo, NodeMetadata, NodeState};
18
/// Key names under which a node publishes its attributes in the gossip
/// key-value state.
///
/// Keys are namespaced by prefix: `node:` for identity, addressing and
/// lifecycle, `partitions:` for ownership, and `load:` for resource figures.
pub mod keys {
    // --- `node:` namespace ---

    /// Display name of the node.
    pub const NODE_NAME: &str = "node:name";
    /// Lifecycle state, stored as the textual form of `NodeState`.
    pub const NODE_STATE: &str = "node:state";
    /// Version string reported by the node.
    pub const NODE_VERSION: &str = "node:version";
    /// Address of the node's RPC endpoint.
    pub const RPC_ADDRESS: &str = "node:rpc_addr";
    /// Address of the node's Raft endpoint.
    pub const RAFT_ADDRESS: &str = "node:raft_addr";
    /// Failure domain label; only published when the node has one.
    pub const FAILURE_DOMAIN: &str = "node:failure_domain";

    // --- `partitions:` namespace ---

    /// Comma-separated list of the partition ids owned by the node.
    pub const PARTITIONS_OWNED: &str = "partitions:owned";

    // --- `load:` namespace ---

    /// Core count from the node metadata, as a decimal string.
    pub const LOAD_CORES: &str = "load:cores";
    /// Memory size in bytes from the node metadata, as a decimal string.
    pub const LOAD_MEMORY: &str = "load:memory_bytes";
}
40
/// Settings for [`GossipDiscovery`].
#[derive(Debug, Clone)]
pub struct GossipDiscoveryConfig {
    /// Socket address (`ip:port`) the gossip layer listens on. It must parse
    /// as a `std::net::SocketAddr`; hostnames are not accepted here.
    pub gossip_address: String,
    /// Seed node addresses handed to chitchat unchanged.
    pub seed_nodes: Vec<String>,
    /// Gossip interval; also used as the failure detector's initial interval.
    pub gossip_interval: Duration,
    /// Phi threshold passed to chitchat's failure detector.
    pub phi_threshold: f64,
    /// Grace period applied both to dead nodes and to keys marked for deletion.
    pub dead_node_grace_period: Duration,
    /// Cluster identifier passed to chitchat.
    pub cluster_id: String,
    /// Id of the local node. It is gossiped as `node-<id>` and used to filter
    /// the local node out of the peer list.
    pub node_id: NodeId,
    /// Description of the local node; its fields are the initial gossip
    /// key-values.
    pub local_node: NodeInfo,
    /// Optional host to advertise to peers. When set it is resolved together
    /// with the gossip port, and start-up fails if no IPv4 address is found.
    pub advertise_host: Option<String>,
}
63
64impl Default for GossipDiscoveryConfig {
65 fn default() -> Self {
66 Self {
67 gossip_address: "127.0.0.1:9003".into(),
68 seed_nodes: Vec::new(),
69 gossip_interval: Duration::from_millis(500),
70 phi_threshold: 8.0,
71 dead_node_grace_period: Duration::from_secs(300),
72 cluster_id: "laminardb-default".into(),
73 node_id: NodeId(1),
74 local_node: NodeInfo {
75 id: NodeId(1),
76 name: "node-1".into(),
77 rpc_address: "127.0.0.1:9000".into(),
78 raft_address: "127.0.0.1:9001".into(),
79 state: NodeState::Active,
80 metadata: NodeMetadata::default(),
81 last_heartbeat_ms: 0,
82 },
83 advertise_host: None,
84 }
85 }
86}
87
/// Cluster discovery backed by the chitchat gossip protocol.
pub struct GossipDiscovery {
    /// Settings supplied at construction time.
    config: GossipDiscoveryConfig,
    /// Latest peer snapshot, keyed by the numeric node id. It is replaced
    /// wholesale by the background task spawned on start.
    peers: Arc<RwLock<HashMap<u64, NodeInfo>>>,
    /// Sender side of the membership watch channel.
    membership_tx: watch::Sender<Vec<NodeInfo>>,
    /// Receiver kept so that `membership_watch` can hand out clones.
    membership_rx: watch::Receiver<Vec<NodeInfo>>,
    /// Token that tells the background task to exit.
    cancel: CancellationToken,
    /// `true` between a successful start and the next stop.
    started: bool,
    /// Handle of the running chitchat instance; `None` while stopped.
    chitchat_handle: Option<chitchat::ChitchatHandle>,
}
98
99impl GossipDiscovery {
100 #[must_use]
102 pub fn new(config: GossipDiscoveryConfig) -> Self {
103 let (tx, rx) = watch::channel(Vec::new());
104 Self {
105 config,
106 peers: Arc::new(RwLock::new(HashMap::new())),
107 membership_tx: tx,
108 membership_rx: rx,
109 cancel: CancellationToken::new(),
110 started: false,
111 chitchat_handle: None,
112 }
113 }
114
115 #[must_use]
120 pub fn chitchat_handle(&self) -> Option<&chitchat::ChitchatHandle> {
121 self.chitchat_handle.as_ref()
122 }
123
124 fn parse_node_info(node_id_str: &str, kvs: &HashMap<String, String>) -> Option<NodeInfo> {
126 let id: u64 = node_id_str.strip_prefix("node-")?.parse().ok()?;
127 let rpc_address = kvs.get(keys::RPC_ADDRESS)?.clone();
128 let raft_address = kvs.get(keys::RAFT_ADDRESS).cloned().unwrap_or_default();
129 let name = kvs
130 .get(keys::NODE_NAME)
131 .cloned()
132 .unwrap_or_else(|| format!("node-{id}"));
133 let state = kvs
134 .get(keys::NODE_STATE)
135 .and_then(|s| match s.as_str() {
136 "joining" => Some(NodeState::Joining),
137 "active" => Some(NodeState::Active),
138 "suspected" => Some(NodeState::Suspected),
139 "draining" => Some(NodeState::Draining),
140 "left" => Some(NodeState::Left),
141 _ => None,
142 })
143 .unwrap_or(NodeState::Active);
144
145 let cores: u32 = kvs
146 .get(keys::LOAD_CORES)
147 .and_then(|s| s.parse().ok())
148 .unwrap_or(1);
149 let memory_bytes: u64 = kvs
150 .get(keys::LOAD_MEMORY)
151 .and_then(|s| s.parse().ok())
152 .unwrap_or(0);
153 let failure_domain = kvs.get(keys::FAILURE_DOMAIN).cloned();
154 let version = kvs.get(keys::NODE_VERSION).cloned().unwrap_or_default();
155 let owned_partitions: Vec<u32> = kvs
156 .get(keys::PARTITIONS_OWNED)
157 .map(|s| s.split(',').filter_map(|p| p.trim().parse().ok()).collect())
158 .unwrap_or_default();
159
160 Some(NodeInfo {
161 id: NodeId(id),
162 name,
163 rpc_address,
164 raft_address,
165 state,
166 metadata: NodeMetadata {
167 cores,
168 memory_bytes,
169 failure_domain,
170 tags: HashMap::new(),
171 owned_partitions,
172 version,
173 },
174 last_heartbeat_ms: chrono::Utc::now().timestamp_millis(),
175 })
176 }
177
178 fn local_kvs(info: &NodeInfo) -> Vec<(String, String)> {
180 let mut kvs = vec![
181 (keys::NODE_STATE.into(), info.state.to_string()),
182 (keys::RPC_ADDRESS.into(), info.rpc_address.clone()),
183 (keys::RAFT_ADDRESS.into(), info.raft_address.clone()),
184 (keys::NODE_NAME.into(), info.name.clone()),
185 (keys::LOAD_CORES.into(), info.metadata.cores.to_string()),
186 (
187 keys::LOAD_MEMORY.into(),
188 info.metadata.memory_bytes.to_string(),
189 ),
190 (keys::NODE_VERSION.into(), info.metadata.version.clone()),
191 ];
192 if let Some(ref fd) = info.metadata.failure_domain {
193 kvs.push((keys::FAILURE_DOMAIN.into(), fd.clone()));
194 }
195 if !info.metadata.owned_partitions.is_empty() {
196 let parts: Vec<String> = info
197 .metadata
198 .owned_partitions
199 .iter()
200 .map(ToString::to_string)
201 .collect();
202 kvs.push((keys::PARTITIONS_OWNED.into(), parts.join(",")));
203 }
204 kvs
205 }
206}
207
208impl std::fmt::Debug for GossipDiscovery {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("GossipDiscovery")
211 .field("config", &self.config)
212 .field("started", &self.started)
213 .finish_non_exhaustive()
214 }
215}
216
217impl GossipDiscovery {
218 #[allow(clippy::too_many_lines)]
233 pub async fn start_with_transport<T>(&mut self, transport: &T) -> Result<(), DiscoveryError>
234 where
235 T: chitchat::transport::Transport,
236 {
237 if self.started {
238 return Ok(());
239 }
240
241 let node_id = format!("node-{}", self.config.node_id.0);
242 let gossip_addr: std::net::SocketAddr = self
243 .config
244 .gossip_address
245 .parse()
246 .map_err(|e: std::net::AddrParseError| DiscoveryError::Bind(e.to_string()))?;
247
248 let advertise_addr = if let Some(ref host) = self.config.advertise_host {
249 let mut resolved = None;
250 #[cfg(feature = "cluster")]
251 {
252 if let Ok(addrs) = (host.as_str(), gossip_addr.port()).to_socket_addrs() {
253 for addr in addrs {
254 if addr.ip().is_ipv4() {
255 resolved = Some(addr);
256 break;
257 }
258 }
259 }
260 }
261 if let Some(addr) = resolved {
262 addr
263 } else {
264 return Err(DiscoveryError::Bind(format!(
265 "failed to resolve configured advertise_host '{host}' (or cluster feature is disabled)"
266 )));
267 }
268 } else if gossip_addr.ip().is_unspecified() {
269 let resolved = {
270 let mut res = None;
271 #[cfg(feature = "cluster")]
272 {
273 let hostname = gethostname::gethostname();
274 let hostname_str = hostname.to_string_lossy();
275 if !hostname_str.is_empty() {
276 if let Ok(addrs) =
277 (hostname_str.as_ref(), gossip_addr.port()).to_socket_addrs()
278 {
279 for addr in addrs {
280 if addr.ip().is_ipv4() && !addr.ip().is_loopback() {
281 res = Some(addr);
282 break;
283 }
284 }
285 }
286 }
287 }
288 res
289 };
290 resolved.unwrap_or_else(|| {
291 std::net::SocketAddr::new(
292 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
293 gossip_addr.port(),
294 )
295 })
296 } else {
297 gossip_addr
298 };
299
300 let seed_addrs: Vec<String> = self.config.seed_nodes.clone();
301
302 tracing::info!(
303 "Starting gossip discovery: gossip_addr = {}, advertise_addr = {}, seeds = {:?}",
304 gossip_addr,
305 advertise_addr,
306 seed_addrs
307 );
308
309 let generation = std::time::SystemTime::now()
315 .duration_since(std::time::UNIX_EPOCH)
316 .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
317
318 let config = chitchat::ChitchatConfig {
319 chitchat_id: chitchat::ChitchatId::new(node_id, generation, advertise_addr),
320 cluster_id: self.config.cluster_id.clone(),
321 gossip_interval: self.config.gossip_interval,
322 listen_addr: gossip_addr,
323 seed_nodes: seed_addrs,
324 failure_detector_config: chitchat::FailureDetectorConfig {
325 phi_threshold: self.config.phi_threshold,
326 initial_interval: self.config.gossip_interval,
327 dead_node_grace_period: self.config.dead_node_grace_period,
330 ..Default::default()
331 },
332 marked_for_deletion_grace_period: self.config.dead_node_grace_period,
333 extra_liveness_predicate: None,
334 catchup_callback: None,
335 };
336
337 let initial_kvs = Self::local_kvs(&self.config.local_node);
338 let chitchat_handle = chitchat::spawn_chitchat(config, initial_kvs, transport)
339 .await
340 .map_err(|e| DiscoveryError::Bind(e.to_string()))?;
341
342 self.chitchat_handle = Some(chitchat_handle);
343
344 let peers = Arc::clone(&self.peers);
346 let membership_tx = self.membership_tx.clone();
347 let cancel = self.cancel.clone();
348 let chitchat = self.chitchat_handle.as_ref().unwrap().chitchat().clone();
349 let local_node_id = self.config.node_id;
350
351 tokio::spawn(async move {
352 let mut interval = tokio::time::interval(Duration::from_millis(500));
353 loop {
354 tokio::select! {
355 () = cancel.cancelled() => break,
356 _ = interval.tick() => {
357 let chitchat_guard = chitchat.lock().await;
358 let mut new_peers: HashMap<u64, NodeInfo> = HashMap::new();
359
360 let live_ids: std::collections::HashSet<&chitchat::ChitchatId> =
363 chitchat_guard.live_nodes().collect();
364
365 let nodes: Vec<_> = chitchat_guard.node_states().keys().map(|id| format!("{}(live={})", id.node_id, live_ids.contains(id))).collect();
366 tracing::debug!("Chitchat state nodes: {:?}", nodes);
367
368 for (cc_id, state) in chitchat_guard.node_states() {
370 let kvs: HashMap<String, String> = state
371 .key_values()
372 .map(|(k, v)| (k.to_string(), v.to_string()))
373 .collect();
374
375 if let Some(mut info) = Self::parse_node_info(
376 &cc_id.node_id,
377 &kvs,
378 ) {
379 if info.id == local_node_id {
380 continue;
381 }
382
383 if !live_ids.contains(cc_id) {
388 info.state = NodeState::Suspected;
389 }
390
391 match new_peers.get(&info.id.0) {
398 Some(existing)
399 if !matches!(info.state, NodeState::Active)
400 && matches!(existing.state, NodeState::Active) =>
401 {
402 }
404 _ => {
405 new_peers.insert(info.id.0, info);
406 }
407 }
408 }
409 }
410
411 let peer_list: Vec<NodeInfo> =
412 new_peers.values().cloned().collect();
413 *peers.write() = new_peers;
414 let _ = membership_tx.send(peer_list);
415 }
416 }
417 }
418 });
419
420 self.started = true;
421 Ok(())
422 }
423}
424
425impl Discovery for GossipDiscovery {
426 async fn start(&mut self) -> Result<(), DiscoveryError> {
427 self.start_with_transport(&chitchat::transport::UdpTransport)
428 .await
429 }
430
431 async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError> {
432 if !self.started {
433 return Err(DiscoveryError::NotStarted);
434 }
435 let peers = self.peers.read();
436 Ok(peers.values().cloned().collect())
437 }
438
439 async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError> {
440 if !self.started {
441 return Err(DiscoveryError::NotStarted);
442 }
443 if let Some(ref handle) = self.chitchat_handle {
444 let kvs = Self::local_kvs(&info);
445 handle
446 .with_chitchat(|chitchat| {
447 for (key, value) in &kvs {
448 chitchat.self_node_state().set(key.clone(), value.clone());
449 }
450 })
451 .await;
452 }
453 Ok(())
454 }
455
456 fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
457 self.membership_rx.clone()
458 }
459
460 async fn stop(&mut self) -> Result<(), DiscoveryError> {
461 self.cancel.cancel();
462 self.started = false;
463 if let Some(handle) = self.chitchat_handle.take() {
466 if let Err(e) = handle.shutdown().await {
467 tracing::warn!("Chitchat shutdown error: {e}");
468 }
469 }
470 self.cancel = CancellationToken::new();
472 Ok(())
473 }
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// RPC address used by the parsing tests.
    const RPC: &str = "127.0.0.1:9000";

    /// Builds an owned key-value map from borrowed pairs.
    fn kv_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect()
    }

    #[test]
    fn test_key_namespace() {
        assert_eq!(keys::NODE_STATE, "node:state");
        assert_eq!(keys::RPC_ADDRESS, "node:rpc_addr");
    }

    #[test]
    fn test_gossip_config_default() {
        let defaults = GossipDiscoveryConfig::default();
        assert_eq!(defaults.gossip_interval, Duration::from_millis(500));
        assert!((defaults.phi_threshold - 8.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_node_info() {
        let kvs = kv_map(&[
            (keys::RPC_ADDRESS, RPC),
            (keys::RAFT_ADDRESS, "127.0.0.1:9001"),
            (keys::NODE_NAME, "test-node"),
            (keys::NODE_STATE, "active"),
            (keys::LOAD_CORES, "4"),
            (keys::LOAD_MEMORY, "8589934592"),
        ]);

        let parsed = GossipDiscovery::parse_node_info("node-42", &kvs).unwrap();
        assert_eq!(parsed.id, NodeId(42));
        assert_eq!(parsed.name, "test-node");
        assert_eq!(parsed.metadata.cores, 4);
        assert_eq!(parsed.state, NodeState::Active);
    }

    #[test]
    fn test_parse_node_info_invalid_id() {
        let empty = kv_map(&[]);
        assert!(GossipDiscovery::parse_node_info("invalid", &empty).is_none());
    }

    #[test]
    fn test_parse_node_info_missing_rpc() {
        let empty = kv_map(&[]);
        assert!(GossipDiscovery::parse_node_info("node-1", &empty).is_none());
    }

    #[test]
    fn test_local_kvs() {
        let metadata = NodeMetadata {
            cores: 4,
            memory_bytes: 1024,
            failure_domain: Some("us-east-1a".into()),
            owned_partitions: vec![0, 1, 2],
            ..NodeMetadata::default()
        };
        let info = NodeInfo {
            id: NodeId(1),
            name: "n1".into(),
            rpc_address: RPC.into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata,
            last_heartbeat_ms: 0,
        };

        let kvs = GossipDiscovery::local_kvs(&info);
        let has_key = |wanted: &str| kvs.iter().any(|(k, _)| k == wanted);
        assert!(has_key(keys::RPC_ADDRESS));
        assert!(has_key(keys::FAILURE_DOMAIN));
        assert!(kvs
            .iter()
            .any(|(k, v)| k == keys::PARTITIONS_OWNED && v == "0,1,2"));
    }

    #[test]
    fn test_parse_owned_partitions() {
        let kvs = kv_map(&[
            (keys::RPC_ADDRESS, RPC),
            (keys::PARTITIONS_OWNED, "0,1,5,10"),
        ]);

        let parsed = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(parsed.metadata.owned_partitions, vec![0, 1, 5, 10]);
    }

    #[test]
    fn test_parse_all_node_states() {
        let cases = [
            ("joining", NodeState::Joining),
            ("active", NodeState::Active),
            ("suspected", NodeState::Suspected),
            ("draining", NodeState::Draining),
            ("left", NodeState::Left),
        ];
        for (text, expected) in cases {
            let kvs = kv_map(&[(keys::RPC_ADDRESS, RPC), (keys::NODE_STATE, text)]);
            let parsed = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
            assert_eq!(parsed.state, expected);
        }
    }

    #[tokio::test]
    async fn test_not_started_errors() {
        let disc = GossipDiscovery::new(GossipDiscoveryConfig::default());
        let outcome = disc.peers().await;
        assert!(outcome.is_err());
    }
}