// laminar_core/delta/discovery/gossip_discovery.rs
//! Gossip-based discovery using chitchat.
//!
//! Uses the chitchat protocol (from Quickwit) for decentralized
//! node discovery with phi-accrual failure detection.

#![allow(clippy::disallowed_types)] // cold path: gossip discovery coordination
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use parking_lot::RwLock;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

use super::{Discovery, DiscoveryError, NodeId, NodeInfo, NodeMetadata, NodeState};

/// Namespaced keys published through chitchat's key-value store.
pub mod keys {
    /// Lifecycle state of the node.
    pub const NODE_STATE: &str = "node:state";
    /// Address the node serves RPC traffic on.
    pub const RPC_ADDRESS: &str = "node:rpc_addr";
    /// Address the node uses for Raft traffic.
    pub const RAFT_ADDRESS: &str = "node:raft_addr";
    /// Human-readable node name.
    pub const NODE_NAME: &str = "node:name";
    /// Partitions owned by the node, encoded as a comma-separated list.
    pub const PARTITIONS_OWNED: &str = "partitions:owned";
    /// Number of CPU cores advertised by the node.
    pub const LOAD_CORES: &str = "load:cores";
    /// Total memory advertised by the node, in bytes.
    pub const LOAD_MEMORY: &str = "load:memory_bytes";
    /// Failure domain (rack / zone) the node lives in.
    pub const FAILURE_DOMAIN: &str = "node:failure_domain";
    /// Software version the node is running.
    pub const NODE_VERSION: &str = "node:version";
}

39/// Configuration for gossip-based discovery.
40#[derive(Debug, Clone)]
41pub struct GossipDiscoveryConfig {
42    /// Address to bind the gossip listener.
43    pub gossip_address: String,
44    /// Seed node addresses for initial cluster bootstrap.
45    pub seed_nodes: Vec<String>,
46    /// Interval between gossip rounds.
47    pub gossip_interval: Duration,
48    /// Phi-accrual failure detector threshold.
49    pub phi_threshold: f64,
50    /// Grace period before removing dead nodes.
51    pub dead_node_grace_period: Duration,
52    /// Cluster identifier (must match across all nodes).
53    pub cluster_id: String,
54    /// This node's ID.
55    pub node_id: NodeId,
56    /// This node's info (published via chitchat keys).
57    pub local_node: NodeInfo,
58}
59
60impl Default for GossipDiscoveryConfig {
61    fn default() -> Self {
62        Self {
63            gossip_address: "127.0.0.1:9003".into(),
64            seed_nodes: Vec::new(),
65            gossip_interval: Duration::from_millis(500),
66            phi_threshold: 8.0,
67            dead_node_grace_period: Duration::from_secs(300),
68            cluster_id: "laminardb-default".into(),
69            node_id: NodeId(1),
70            local_node: NodeInfo {
71                id: NodeId(1),
72                name: "node-1".into(),
73                rpc_address: "127.0.0.1:9000".into(),
74                raft_address: "127.0.0.1:9001".into(),
75                state: NodeState::Active,
76                metadata: NodeMetadata::default(),
77                last_heartbeat_ms: 0,
78            },
79        }
80    }
81}
82
83/// Gossip-based discovery using the chitchat protocol.
84pub struct GossipDiscovery {
85    config: GossipDiscoveryConfig,
86    peers: Arc<RwLock<HashMap<u64, NodeInfo>>>,
87    membership_tx: watch::Sender<Vec<NodeInfo>>,
88    membership_rx: watch::Receiver<Vec<NodeInfo>>,
89    cancel: CancellationToken,
90    started: bool,
91    chitchat_handle: Option<chitchat::ChitchatHandle>,
92}
93
94impl GossipDiscovery {
95    /// Create a new gossip discovery instance.
96    #[must_use]
97    pub fn new(config: GossipDiscoveryConfig) -> Self {
98        let (tx, rx) = watch::channel(Vec::new());
99        Self {
100            config,
101            peers: Arc::new(RwLock::new(HashMap::new())),
102            membership_tx: tx,
103            membership_rx: rx,
104            cancel: CancellationToken::new(),
105            started: false,
106            chitchat_handle: None,
107        }
108    }
109
110    /// Parse a `NodeInfo` from chitchat key-value pairs.
111    fn parse_node_info(node_id_str: &str, kvs: &HashMap<String, String>) -> Option<NodeInfo> {
112        let id: u64 = node_id_str.strip_prefix("node-")?.parse().ok()?;
113        let rpc_address = kvs.get(keys::RPC_ADDRESS)?.clone();
114        let raft_address = kvs.get(keys::RAFT_ADDRESS).cloned().unwrap_or_default();
115        let name = kvs
116            .get(keys::NODE_NAME)
117            .cloned()
118            .unwrap_or_else(|| format!("node-{id}"));
119        let state = kvs
120            .get(keys::NODE_STATE)
121            .and_then(|s| match s.as_str() {
122                "joining" => Some(NodeState::Joining),
123                "active" => Some(NodeState::Active),
124                "suspected" => Some(NodeState::Suspected),
125                "draining" => Some(NodeState::Draining),
126                "left" => Some(NodeState::Left),
127                _ => None,
128            })
129            .unwrap_or(NodeState::Active);
130
131        let cores: u32 = kvs
132            .get(keys::LOAD_CORES)
133            .and_then(|s| s.parse().ok())
134            .unwrap_or(1);
135        let memory_bytes: u64 = kvs
136            .get(keys::LOAD_MEMORY)
137            .and_then(|s| s.parse().ok())
138            .unwrap_or(0);
139        let failure_domain = kvs.get(keys::FAILURE_DOMAIN).cloned();
140        let version = kvs.get(keys::NODE_VERSION).cloned().unwrap_or_default();
141        let owned_partitions: Vec<u32> = kvs
142            .get(keys::PARTITIONS_OWNED)
143            .map(|s| s.split(',').filter_map(|p| p.trim().parse().ok()).collect())
144            .unwrap_or_default();
145
146        Some(NodeInfo {
147            id: NodeId(id),
148            name,
149            rpc_address,
150            raft_address,
151            state,
152            metadata: NodeMetadata {
153                cores,
154                memory_bytes,
155                failure_domain,
156                tags: HashMap::new(),
157                owned_partitions,
158                version,
159            },
160            last_heartbeat_ms: chrono::Utc::now().timestamp_millis(),
161        })
162    }
163
164    /// Build the chitchat key-value set for the local node.
165    fn local_kvs(info: &NodeInfo) -> Vec<(String, String)> {
166        let mut kvs = vec![
167            (keys::NODE_STATE.into(), info.state.to_string()),
168            (keys::RPC_ADDRESS.into(), info.rpc_address.clone()),
169            (keys::RAFT_ADDRESS.into(), info.raft_address.clone()),
170            (keys::NODE_NAME.into(), info.name.clone()),
171            (keys::LOAD_CORES.into(), info.metadata.cores.to_string()),
172            (
173                keys::LOAD_MEMORY.into(),
174                info.metadata.memory_bytes.to_string(),
175            ),
176            (keys::NODE_VERSION.into(), info.metadata.version.clone()),
177        ];
178        if let Some(ref fd) = info.metadata.failure_domain {
179            kvs.push((keys::FAILURE_DOMAIN.into(), fd.clone()));
180        }
181        if !info.metadata.owned_partitions.is_empty() {
182            let parts: Vec<String> = info
183                .metadata
184                .owned_partitions
185                .iter()
186                .map(ToString::to_string)
187                .collect();
188            kvs.push((keys::PARTITIONS_OWNED.into(), parts.join(",")));
189        }
190        kvs
191    }
192}
193
194impl std::fmt::Debug for GossipDiscovery {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        f.debug_struct("GossipDiscovery")
197            .field("config", &self.config)
198            .field("started", &self.started)
199            .finish_non_exhaustive()
200    }
201}
202
impl Discovery for GossipDiscovery {
    /// Start gossiping: bind the UDP listener, join the cluster via the
    /// configured seeds, and spawn a background task that mirrors chitchat
    /// membership into `peers` and the membership watch channel.
    ///
    /// Idempotent: returns `Ok(())` immediately if already started.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoveryError::Bind`] when the configured gossip address
    /// does not parse or the chitchat listener fails to spawn.
    async fn start(&mut self) -> Result<(), DiscoveryError> {
        if self.started {
            return Ok(());
        }

        // Chitchat identifies nodes by string; encode the numeric id as
        // "node-<u64>" so `parse_node_info` can recover it on receipt.
        let node_id = format!("node-{}", self.config.node_id.0);
        let gossip_addr = self
            .config
            .gossip_address
            .parse()
            .map_err(|e: std::net::AddrParseError| DiscoveryError::Bind(e.to_string()))?;

        let seed_addrs: Vec<String> = self.config.seed_nodes.clone();

        let config = chitchat::ChitchatConfig {
            chitchat_id: chitchat::ChitchatId::new(
                node_id,
                // NOTE(review): generation is fixed at 0, so a node restarted
                // after `stop` re-announces with the same (id, generation)
                // pair — confirm chitchat tolerates this, or derive the
                // generation from the wall clock.
                0, // generation
                gossip_addr,
            ),
            cluster_id: self.config.cluster_id.clone(),
            gossip_interval: self.config.gossip_interval,
            listen_addr: gossip_addr,
            seed_nodes: seed_addrs,
            failure_detector_config: chitchat::FailureDetectorConfig {
                phi_threshold: self.config.phi_threshold,
                initial_interval: self.config.gossip_interval,
                // Map dead_node_grace_period to the failure detector's GC
                // timer (W6 fix). Default is 24h which is far too long.
                dead_node_grace_period: self.config.dead_node_grace_period,
                ..Default::default()
            },
            marked_for_deletion_grace_period: self.config.dead_node_grace_period,
            extra_liveness_predicate: None,
            catchup_callback: None,
        };

        // Publish the local node's KV set so peers can parse our NodeInfo.
        let initial_kvs = Self::local_kvs(&self.config.local_node);
        let transport = chitchat::transport::UdpTransport;
        let chitchat_handle = chitchat::spawn_chitchat(config, initial_kvs, &transport)
            .await
            .map_err(|e| DiscoveryError::Bind(e.to_string()))?;

        self.chitchat_handle = Some(chitchat_handle);

        // Spawn membership watcher: periodically snapshots chitchat state
        // into `peers` and broadcasts it over the watch channel.
        let peers = Arc::clone(&self.peers);
        let membership_tx = self.membership_tx.clone();
        let cancel = self.cancel.clone();
        let chitchat = self.chitchat_handle.as_ref().unwrap().chitchat().clone();
        let local_node_id = self.config.node_id;

        tokio::spawn(async move {
            // NOTE(review): refresh cadence is hard-coded to 500 ms rather
            // than `config.gossip_interval` — confirm this is intentional.
            let mut interval = tokio::time::interval(Duration::from_millis(500));
            loop {
                tokio::select! {
                    () = cancel.cancelled() => break,
                    _ = interval.tick() => {
                        let chitchat_guard = chitchat.lock().await;
                        let mut new_peers = HashMap::new();

                        // Collect the set of live node IDs from the failure
                        // detector so we only include reachable peers (C3 fix).
                        let live_ids: std::collections::HashSet<&chitchat::ChitchatId> =
                            chitchat_guard.live_nodes().collect();

                        // Iterate all known nodes, tagging dead ones
                        for (cc_id, state) in chitchat_guard.node_states() {
                            let kvs: HashMap<String, String> = state
                                .key_values()
                                .map(|(k, v)| (k.to_string(), v.to_string()))
                                .collect();

                            if let Some(mut info) = Self::parse_node_info(
                                &cc_id.node_id,
                                &kvs,
                            ) {
                                // The local node is not its own peer.
                                if info.id == local_node_id {
                                    continue;
                                }

                                // Override self-reported state with failure
                                // detector opinion: if chitchat considers this
                                // node dead, mark it as Suspected/Left rather
                                // than trusting the gossip KV (C3 fix).
                                if !live_ids.contains(cc_id) {
                                    info.state = NodeState::Suspected;
                                }

                                new_peers.insert(info.id.0, info);
                            }
                        }

                        let peer_list: Vec<NodeInfo> =
                            new_peers.values().cloned().collect();
                        *peers.write() = new_peers;
                        // All receivers may have been dropped; that is fine.
                        let _ = membership_tx.send(peer_list);
                    }
                }
            }
        });

        self.started = true;
        Ok(())
    }

    /// Return the most recent peer snapshot (the watcher task excludes the
    /// local node).
    ///
    /// # Errors
    ///
    /// Returns [`DiscoveryError::NotStarted`] when called before `start`.
    async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError> {
        if !self.started {
            return Err(DiscoveryError::NotStarted);
        }
        let peers = self.peers.read();
        Ok(peers.values().cloned().collect())
    }

    /// Publish updated local-node info into chitchat's self key-value
    /// state so it gossips out to the cluster.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoveryError::NotStarted`] when called before `start`.
    async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError> {
        if !self.started {
            return Err(DiscoveryError::NotStarted);
        }
        if let Some(ref handle) = self.chitchat_handle {
            let kvs = Self::local_kvs(&info);
            handle
                .with_chitchat(|chitchat| {
                    for (key, value) in &kvs {
                        chitchat.self_node_state().set(key.clone(), value.clone());
                    }
                })
                .await;
        }
        Ok(())
    }

    /// Subscribe to membership snapshots produced by the watcher task.
    fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
        self.membership_rx.clone()
    }

    /// Stop gossiping: cancel the watcher, shut chitchat down, and reset
    /// state so a later `start` works again.
    async fn stop(&mut self) -> Result<(), DiscoveryError> {
        self.cancel.cancel();
        self.started = false;
        // Properly shut down chitchat (W8 fix): send shutdown command
        // and wait for the background task to exit, releasing the UDP socket.
        if let Some(handle) = self.chitchat_handle.take() {
            if let Err(e) = handle.shutdown().await {
                tracing::warn!("Chitchat shutdown error: {e}");
            }
        }
        // Fresh token so restart after stop works
        self.cancel = CancellationToken::new();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned KV map from borrowed pairs, mirroring what chitchat
    /// would hand back for a remote node's state.
    fn kvs_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_key_namespace() {
        assert_eq!(keys::NODE_STATE, "node:state");
        assert_eq!(keys::RPC_ADDRESS, "node:rpc_addr");
    }

    #[test]
    fn test_gossip_config_default() {
        let config = GossipDiscoveryConfig::default();
        assert_eq!(config.gossip_interval, Duration::from_millis(500));
        assert!((config.phi_threshold - 8.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_node_info() {
        let kvs = kvs_of(&[
            (keys::RPC_ADDRESS, "127.0.0.1:9000"),
            (keys::RAFT_ADDRESS, "127.0.0.1:9001"),
            (keys::NODE_NAME, "test-node"),
            (keys::NODE_STATE, "active"),
            (keys::LOAD_CORES, "4"),
            (keys::LOAD_MEMORY, "8589934592"),
        ]);

        let info = GossipDiscovery::parse_node_info("node-42", &kvs).unwrap();
        assert_eq!(info.id, NodeId(42));
        assert_eq!(info.name, "test-node");
        assert_eq!(info.metadata.cores, 4);
        assert_eq!(info.state, NodeState::Active);
    }

    #[test]
    fn test_parse_node_info_invalid_id() {
        // Ids must match the "node-<u64>" shape.
        assert!(GossipDiscovery::parse_node_info("invalid", &HashMap::new()).is_none());
    }

    #[test]
    fn test_parse_node_info_missing_rpc() {
        // The RPC address key is mandatory.
        assert!(GossipDiscovery::parse_node_info("node-1", &HashMap::new()).is_none());
    }

    #[test]
    fn test_local_kvs() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "n1".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 4,
                memory_bytes: 1024,
                failure_domain: Some("us-east-1a".into()),
                owned_partitions: vec![0, 1, 2],
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };

        let kvs = GossipDiscovery::local_kvs(&info);
        let has_key = |key: &str| kvs.iter().any(|(k, _)| k == key);
        assert!(has_key(keys::RPC_ADDRESS));
        assert!(has_key(keys::FAILURE_DOMAIN));
        assert!(kvs
            .iter()
            .any(|(k, v)| k == keys::PARTITIONS_OWNED && v == "0,1,2"));
    }

    #[test]
    fn test_parse_owned_partitions() {
        let kvs = kvs_of(&[
            (keys::RPC_ADDRESS, "127.0.0.1:9000"),
            (keys::PARTITIONS_OWNED, "0,1,5,10"),
        ]);

        let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(info.metadata.owned_partitions, vec![0, 1, 5, 10]);
    }

    #[test]
    fn test_parse_all_node_states() {
        let cases = [
            ("joining", NodeState::Joining),
            ("active", NodeState::Active),
            ("suspected", NodeState::Suspected),
            ("draining", NodeState::Draining),
            ("left", NodeState::Left),
        ];
        for (raw, expected) in cases {
            let kvs = kvs_of(&[
                (keys::RPC_ADDRESS, "127.0.0.1:9000"),
                (keys::NODE_STATE, raw),
            ]);
            let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
            assert_eq!(info.state, expected);
        }
    }

    #[tokio::test]
    async fn test_not_started_errors() {
        let disc = GossipDiscovery::new(GossipDiscoveryConfig::default());
        assert!(disc.peers().await.is_err());
    }
}