
laminar_core/cluster/discovery/gossip_discovery.rs

//! Gossip-based discovery using chitchat.
//!
//! Uses the chitchat protocol (from Quickwit) for decentralized
//! node discovery with phi-accrual failure detection.

#![allow(clippy::disallowed_types)] // cold path: gossip discovery coordination
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use parking_lot::RwLock;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

use super::{Discovery, DiscoveryError, NodeId, NodeInfo, NodeMetadata, NodeState};

/// Key namespace for chitchat key-value pairs.
pub mod keys {
    /// Node state key.
    pub const NODE_STATE: &str = "node:state";
    /// RPC address key.
    pub const RPC_ADDRESS: &str = "node:rpc_addr";
    /// Raft address key.
    pub const RAFT_ADDRESS: &str = "node:raft_addr";
    /// Node name key.
    pub const NODE_NAME: &str = "node:name";
    /// Owned partitions key (comma-separated list).
    pub const PARTITIONS_OWNED: &str = "partitions:owned";
    /// CPU core count key.
    pub const LOAD_CORES: &str = "load:cores";
    /// Memory bytes key.
    pub const LOAD_MEMORY: &str = "load:memory_bytes";
    /// Failure domain key.
    pub const FAILURE_DOMAIN: &str = "node:failure_domain";
    /// Version key.
    pub const NODE_VERSION: &str = "node:version";
}

/// Configuration for gossip-based discovery.
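///
/// # Example
/// A minimal sketch of joining an existing cluster through a seed node
/// (addresses are placeholders; `local_node` should also be set to match
/// `node_id`):
/// ```rust,ignore
/// let config = GossipDiscoveryConfig {
///     gossip_address: "0.0.0.0:9003".into(),
///     seed_nodes: vec!["10.0.0.1:9003".into()],
///     cluster_id: "my-cluster".into(),
///     node_id: NodeId(2),
///     ..GossipDiscoveryConfig::default()
/// };
/// ```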
#[derive(Debug, Clone)]
pub struct GossipDiscoveryConfig {
    /// Address to bind the gossip listener.
    pub gossip_address: String,
    /// Seed node addresses for initial cluster bootstrap.
    pub seed_nodes: Vec<String>,
    /// Interval between gossip rounds.
    pub gossip_interval: Duration,
    /// Phi-accrual failure detector threshold.
    pub phi_threshold: f64,
    /// Grace period before removing dead nodes.
    pub dead_node_grace_period: Duration,
    /// Cluster identifier (must match across all nodes).
    pub cluster_id: String,
    /// This node's ID.
    pub node_id: NodeId,
    /// This node's info (published via chitchat keys).
    pub local_node: NodeInfo,
}

impl Default for GossipDiscoveryConfig {
    fn default() -> Self {
        Self {
            gossip_address: "127.0.0.1:9003".into(),
            seed_nodes: Vec::new(),
            gossip_interval: Duration::from_millis(500),
            phi_threshold: 8.0,
            dead_node_grace_period: Duration::from_secs(300),
            cluster_id: "laminardb-default".into(),
            node_id: NodeId(1),
            local_node: NodeInfo {
                id: NodeId(1),
                name: "node-1".into(),
                rpc_address: "127.0.0.1:9000".into(),
                raft_address: "127.0.0.1:9001".into(),
                state: NodeState::Active,
                metadata: NodeMetadata::default(),
                last_heartbeat_ms: 0,
            },
        }
    }
}

/// Gossip-based discovery using the chitchat protocol.
pub struct GossipDiscovery {
    config: GossipDiscoveryConfig,
    peers: Arc<RwLock<HashMap<u64, NodeInfo>>>,
    membership_tx: watch::Sender<Vec<NodeInfo>>,
    membership_rx: watch::Receiver<Vec<NodeInfo>>,
    cancel: CancellationToken,
    started: bool,
    chitchat_handle: Option<chitchat::ChitchatHandle>,
}

impl GossipDiscovery {
    /// Create a new gossip discovery instance.
    #[must_use]
    pub fn new(config: GossipDiscoveryConfig) -> Self {
        let (tx, rx) = watch::channel(Vec::new());
        Self {
            config,
            peers: Arc::new(RwLock::new(HashMap::new())),
            membership_tx: tx,
            membership_rx: rx,
            cancel: CancellationToken::new(),
            started: false,
            chitchat_handle: None,
        }
    }

    /// Borrow the underlying chitchat handle, if the discovery has
    /// been started. Enables other cluster components (barrier
    /// coordinator, shuffle peer registry) to share the same chitchat
    /// instance rather than spawning their own.
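    ///
    /// A minimal sharing sketch (illustrative; the consuming component is
    /// left abstract):
    /// ```rust,ignore
    /// if let Some(handle) = discovery.chitchat_handle() {
    ///     // `chitchat()` yields the shared instance; clone it and hand it
    ///     // to the barrier coordinator or shuffle peer registry.
    ///     let shared = handle.chitchat().clone();
    /// }
    /// ```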
    #[must_use]
    pub fn chitchat_handle(&self) -> Option<&chitchat::ChitchatHandle> {
        self.chitchat_handle.as_ref()
    }

    /// Parse a `NodeInfo` from chitchat key-value pairs.
    fn parse_node_info(node_id_str: &str, kvs: &HashMap<String, String>) -> Option<NodeInfo> {
        let id: u64 = node_id_str.strip_prefix("node-")?.parse().ok()?;
        let rpc_address = kvs.get(keys::RPC_ADDRESS)?.clone();
        let raft_address = kvs.get(keys::RAFT_ADDRESS).cloned().unwrap_or_default();
        let name = kvs
            .get(keys::NODE_NAME)
            .cloned()
            .unwrap_or_else(|| format!("node-{id}"));
        let state = kvs
            .get(keys::NODE_STATE)
            .and_then(|s| match s.as_str() {
                "joining" => Some(NodeState::Joining),
                "active" => Some(NodeState::Active),
                "suspected" => Some(NodeState::Suspected),
                "draining" => Some(NodeState::Draining),
                "left" => Some(NodeState::Left),
                _ => None,
            })
            .unwrap_or(NodeState::Active);

        let cores: u32 = kvs
            .get(keys::LOAD_CORES)
            .and_then(|s| s.parse().ok())
            .unwrap_or(1);
        let memory_bytes: u64 = kvs
            .get(keys::LOAD_MEMORY)
            .and_then(|s| s.parse().ok())
            .unwrap_or(0);
        let failure_domain = kvs.get(keys::FAILURE_DOMAIN).cloned();
        let version = kvs.get(keys::NODE_VERSION).cloned().unwrap_or_default();
        let owned_partitions: Vec<u32> = kvs
            .get(keys::PARTITIONS_OWNED)
            .map(|s| s.split(',').filter_map(|p| p.trim().parse().ok()).collect())
            .unwrap_or_default();

        Some(NodeInfo {
            id: NodeId(id),
            name,
            rpc_address,
            raft_address,
            state,
            metadata: NodeMetadata {
                cores,
                memory_bytes,
                failure_domain,
                tags: HashMap::new(),
                owned_partitions,
                version,
            },
            last_heartbeat_ms: chrono::Utc::now().timestamp_millis(),
        })
    }

    /// Build the chitchat key-value set for the local node.
    fn local_kvs(info: &NodeInfo) -> Vec<(String, String)> {
        let mut kvs = vec![
            (keys::NODE_STATE.into(), info.state.to_string()),
            (keys::RPC_ADDRESS.into(), info.rpc_address.clone()),
            (keys::RAFT_ADDRESS.into(), info.raft_address.clone()),
            (keys::NODE_NAME.into(), info.name.clone()),
            (keys::LOAD_CORES.into(), info.metadata.cores.to_string()),
            (
                keys::LOAD_MEMORY.into(),
                info.metadata.memory_bytes.to_string(),
            ),
            (keys::NODE_VERSION.into(), info.metadata.version.clone()),
        ];
        if let Some(ref fd) = info.metadata.failure_domain {
            kvs.push((keys::FAILURE_DOMAIN.into(), fd.clone()));
        }
        if !info.metadata.owned_partitions.is_empty() {
            let parts: Vec<String> = info
                .metadata
                .owned_partitions
                .iter()
                .map(ToString::to_string)
                .collect();
            kvs.push((keys::PARTITIONS_OWNED.into(), parts.join(",")));
        }
        kvs
    }
}

impl std::fmt::Debug for GossipDiscovery {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GossipDiscovery")
            .field("config", &self.config)
            .field("started", &self.started)
            .finish_non_exhaustive()
    }
}

impl GossipDiscovery {
    /// Start with a caller-provided chitchat transport. Test harnesses
    /// use this to inject a filtering / fault-injecting transport
    /// wrapper (see
    /// [`cluster::testing::PartitionableTransport`](crate::cluster::testing::PartitionableTransport)).
    /// The regular [`Discovery::start`] just delegates here with a
    /// default [`UdpTransport`](chitchat::transport::UdpTransport).
    ///
    /// # Errors
    /// Same as [`Discovery::start`].
    ///
    /// # Panics
    /// Panics via an internal `unwrap` if the chitchat handle is missing
    /// immediately after a successful spawn; this invariant always holds
    /// in practice. Calling this method again after a successful start is
    /// a no-op, guarded by the `started` flag.
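    ///
    /// # Example
    /// A minimal sketch (assumes a Tokio runtime and a free UDP port at
    /// `gossip_address`):
    /// ```rust,ignore
    /// let mut disc = GossipDiscovery::new(GossipDiscoveryConfig::default());
    /// disc.start_with_transport(&chitchat::transport::UdpTransport).await?;
    /// let handle = disc.chitchat_handle().expect("started");
    /// ```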
    pub async fn start_with_transport<T>(&mut self, transport: &T) -> Result<(), DiscoveryError>
    where
        T: chitchat::transport::Transport,
    {
        if self.started {
            return Ok(());
        }

        let node_id = format!("node-{}", self.config.node_id.0);
        let gossip_addr = self
            .config
            .gossip_address
            .parse()
            .map_err(|e: std::net::AddrParseError| DiscoveryError::Bind(e.to_string()))?;

        let seed_addrs: Vec<String> = self.config.seed_nodes.clone();

        // Generation: wall-clock millis. A node rejoining under the same
        // `node_id` string needs a strictly greater generation so that
        // chitchat supersedes the stale entry instead of treating it as
        // the same instance; wall-clock millis increase across restarts.
        let generation = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));

        let config = chitchat::ChitchatConfig {
            chitchat_id: chitchat::ChitchatId::new(node_id, generation, gossip_addr),
            cluster_id: self.config.cluster_id.clone(),
            gossip_interval: self.config.gossip_interval,
            listen_addr: gossip_addr,
            seed_nodes: seed_addrs,
            failure_detector_config: chitchat::FailureDetectorConfig {
                phi_threshold: self.config.phi_threshold,
                initial_interval: self.config.gossip_interval,
                // Map dead_node_grace_period to the failure detector's GC
                // timer (W6 fix); the default of 24h is far too long.
                dead_node_grace_period: self.config.dead_node_grace_period,
                ..Default::default()
            },
            marked_for_deletion_grace_period: self.config.dead_node_grace_period,
            extra_liveness_predicate: None,
            catchup_callback: None,
        };

        let initial_kvs = Self::local_kvs(&self.config.local_node);
        let chitchat_handle = chitchat::spawn_chitchat(config, initial_kvs, transport)
            .await
            .map_err(|e| DiscoveryError::Bind(e.to_string()))?;

        self.chitchat_handle = Some(chitchat_handle);

        // Spawn membership watcher
        let peers = Arc::clone(&self.peers);
        let membership_tx = self.membership_tx.clone();
        let cancel = self.cancel.clone();
        let chitchat = self.chitchat_handle.as_ref().unwrap().chitchat().clone();
        let local_node_id = self.config.node_id;

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));
            loop {
                tokio::select! {
                    () = cancel.cancelled() => break,
                    _ = interval.tick() => {
                        let chitchat_guard = chitchat.lock().await;
                        let mut new_peers: HashMap<u64, NodeInfo> = HashMap::new();

                        // Collect the set of live node IDs from the failure
                        // detector so we only include reachable peers (C3 fix).
                        let live_ids: std::collections::HashSet<&chitchat::ChitchatId> =
                            chitchat_guard.live_nodes().collect();

                        // Iterate all known nodes, tagging dead ones
                        for (cc_id, state) in chitchat_guard.node_states() {
                            let kvs: HashMap<String, String> = state
                                .key_values()
                                .map(|(k, v)| (k.to_string(), v.to_string()))
                                .collect();

                            if let Some(mut info) = Self::parse_node_info(
                                &cc_id.node_id,
                                &kvs,
                            ) {
                                if info.id == local_node_id {
                                    continue;
                                }

                                // Override self-reported state with the failure
                                // detector's opinion: if chitchat considers this
                                // node dead, mark it Suspected rather than
                                // trusting the gossip KV (C3 fix).
                                if !live_ids.contains(cc_id) {
                                    info.state = NodeState::Suspected;
                                }

                                // A node id can appear twice across a
                                // rejoin: once as the old entry
                                // (Suspected or Left) and once as the
                                // freshly-started entry (Active). Prefer
                                // the Active record so `current_leader()`
                                // sees the rejoined instance.
                                match new_peers.get(&info.id.0) {
                                    Some(existing)
                                        if !matches!(info.state, NodeState::Active)
                                            && matches!(existing.state, NodeState::Active) =>
                                    {
                                        // Keep the already-recorded Active entry.
                                    }
                                    _ => {
                                        new_peers.insert(info.id.0, info);
                                    }
                                }
                            }
                        }

                        let peer_list: Vec<NodeInfo> =
                            new_peers.values().cloned().collect();
                        *peers.write() = new_peers;
                        let _ = membership_tx.send(peer_list);
                    }
                }
            }
        });

        self.started = true;
        Ok(())
    }
}

impl Discovery for GossipDiscovery {
    async fn start(&mut self) -> Result<(), DiscoveryError> {
        self.start_with_transport(&chitchat::transport::UdpTransport)
            .await
    }

    async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError> {
        if !self.started {
            return Err(DiscoveryError::NotStarted);
        }
        let peers = self.peers.read();
        Ok(peers.values().cloned().collect())
    }

    async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError> {
        if !self.started {
            return Err(DiscoveryError::NotStarted);
        }
        if let Some(ref handle) = self.chitchat_handle {
            let kvs = Self::local_kvs(&info);
            handle
                .with_chitchat(|chitchat| {
                    for (key, value) in &kvs {
                        chitchat.self_node_state().set(key.clone(), value.clone());
                    }
                })
                .await;
        }
        Ok(())
    }

    fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
        self.membership_rx.clone()
    }

    async fn stop(&mut self) -> Result<(), DiscoveryError> {
        self.cancel.cancel();
        self.started = false;
        // Properly shut down chitchat (W8 fix): send shutdown command
        // and wait for the background task to exit, releasing the UDP socket.
        if let Some(handle) = self.chitchat_handle.take() {
            if let Err(e) = handle.shutdown().await {
                tracing::warn!("Chitchat shutdown error: {e}");
            }
        }
        // Fresh token so restart after stop works
        self.cancel = CancellationToken::new();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_namespace() {
        assert_eq!(keys::NODE_STATE, "node:state");
        assert_eq!(keys::RPC_ADDRESS, "node:rpc_addr");
    }

    #[test]
    fn test_gossip_config_default() {
        let config = GossipDiscoveryConfig::default();
        assert_eq!(config.gossip_interval, Duration::from_millis(500));
        assert!((config.phi_threshold - 8.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_node_info() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::RAFT_ADDRESS.into(), "127.0.0.1:9001".into());
        kvs.insert(keys::NODE_NAME.into(), "test-node".into());
        kvs.insert(keys::NODE_STATE.into(), "active".into());
        kvs.insert(keys::LOAD_CORES.into(), "4".into());
        kvs.insert(keys::LOAD_MEMORY.into(), "8589934592".into());

        let info = GossipDiscovery::parse_node_info("node-42", &kvs).unwrap();
        assert_eq!(info.id, NodeId(42));
        assert_eq!(info.name, "test-node");
        assert_eq!(info.metadata.cores, 4);
        assert_eq!(info.state, NodeState::Active);
    }

    #[test]
    fn test_parse_node_info_invalid_id() {
        let kvs = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("invalid", &kvs).is_none());
    }

    #[test]
    fn test_parse_node_info_missing_rpc() {
        let kvs = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("node-1", &kvs).is_none());
    }

    #[test]
    fn test_local_kvs() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "n1".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 4,
                memory_bytes: 1024,
                failure_domain: Some("us-east-1a".into()),
                owned_partitions: vec![0, 1, 2],
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };
        let kvs = GossipDiscovery::local_kvs(&info);
        assert!(kvs.iter().any(|(k, _)| k == keys::RPC_ADDRESS));
        assert!(kvs.iter().any(|(k, _)| k == keys::FAILURE_DOMAIN));
        assert!(kvs
            .iter()
            .any(|(k, v)| k == keys::PARTITIONS_OWNED && v == "0,1,2"));
    }

    #[test]
    fn test_parse_owned_partitions() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::PARTITIONS_OWNED.into(), "0,1,5,10".into());

        let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(info.metadata.owned_partitions, vec![0, 1, 5, 10]);
    }

    #[test]
    fn test_parse_all_node_states() {
        for (state_str, expected) in [
            ("joining", NodeState::Joining),
            ("active", NodeState::Active),
            ("suspected", NodeState::Suspected),
            ("draining", NodeState::Draining),
            ("left", NodeState::Left),
        ] {
            let mut kvs = HashMap::new();
            kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
            kvs.insert(keys::NODE_STATE.into(), state_str.into());

            let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
            assert_eq!(info.state, expected);
        }
    }

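    // Round-trip sketch: feed `local_kvs` output back through
    // `parse_node_info`. Uses `NodeState::Active` so the state assertion
    // holds regardless of how `NodeState` formats itself via `Display`.
    #[test]
    fn test_local_kvs_roundtrip() {
        let original = NodeInfo {
            id: NodeId(7),
            name: "n7".into(),
            rpc_address: "127.0.0.1:9100".into(),
            raft_address: "127.0.0.1:9101".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 8,
                memory_bytes: 4096,
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };
        let kvs: HashMap<String, String> =
            GossipDiscovery::local_kvs(&original).into_iter().collect();

        let parsed = GossipDiscovery::parse_node_info("node-7", &kvs).unwrap();
        assert_eq!(parsed.id, NodeId(7));
        assert_eq!(parsed.name, "n7");
        assert_eq!(parsed.rpc_address, "127.0.0.1:9100");
        assert_eq!(parsed.raft_address, "127.0.0.1:9101");
        assert_eq!(parsed.metadata.cores, 8);
        assert_eq!(parsed.metadata.memory_bytes, 4096);
        assert_eq!(parsed.state, NodeState::Active);
    }
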
    #[tokio::test]
    async fn test_not_started_errors() {
        let config = GossipDiscoveryConfig::default();
        let disc = GossipDiscovery::new(config);
        assert!(disc.peers().await.is_err());
    }
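
    // Sketch of `parse_node_info`'s lenient partition parsing: malformed
    // entries in the comma-separated list are skipped by `filter_map`
    // rather than failing the whole parse.
    #[test]
    fn test_parse_owned_partitions_skips_malformed() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::PARTITIONS_OWNED.into(), "0, oops ,2,".into());

        let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(info.metadata.owned_partitions, vec![0, 2]);
    }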
}