Skip to main content

laminar_core/cluster/discovery/
gossip_discovery.rs

//! Gossip-based discovery using chitchat.
//!
//! Uses the chitchat protocol (from Quickwit) for decentralized node
//! discovery with phi-accrual failure detection. Each node publishes its
//! `NodeInfo` as chitchat key-value pairs named in [`keys`].
5
6#![allow(clippy::disallowed_types)] // cold path: gossip discovery coordination
7use std::collections::HashMap;
8#[cfg(feature = "cluster")]
9use std::net::ToSocketAddrs;
10use std::sync::Arc;
11use std::time::Duration;
12
13use parking_lot::RwLock;
14use tokio::sync::watch;
15use tokio_util::sync::CancellationToken;
16
17use super::{Discovery, DiscoveryError, NodeId, NodeInfo, NodeMetadata, NodeState};
18
/// Names of the chitchat key-value pairs each node publishes about itself.
///
/// Keys are grouped by prefix: `node:` for identity and addressing,
/// `load:` for capacity hints and `partitions:` for ownership.
pub mod keys {
    // ---- identity -------------------------------------------------------

    /// Human-readable node name.
    pub const NODE_NAME: &str = "node:name";
    /// Lifecycle state of the node (`joining`, `active`, ...).
    pub const NODE_STATE: &str = "node:state";
    /// Software version string of the node.
    pub const NODE_VERSION: &str = "node:version";
    /// Failure domain (e.g. availability zone) the node lives in.
    pub const FAILURE_DOMAIN: &str = "node:failure_domain";

    // ---- addressing -----------------------------------------------------

    /// Address peers use for RPC traffic.
    pub const RPC_ADDRESS: &str = "node:rpc_addr";
    /// Address peers use for Raft traffic.
    pub const RAFT_ADDRESS: &str = "node:raft_addr";

    // ---- capacity -------------------------------------------------------

    /// Number of CPU cores.
    pub const LOAD_CORES: &str = "load:cores";
    /// Memory size in bytes.
    pub const LOAD_MEMORY: &str = "load:memory_bytes";

    // ---- ownership ------------------------------------------------------

    /// Partitions owned by the node, as a comma-separated list of ids.
    pub const PARTITIONS_OWNED: &str = "partitions:owned";
}
40
/// Configuration for gossip-based discovery.
///
/// Consumed by [`GossipDiscovery::new`]; the values are read when discovery
/// is started.
#[derive(Debug, Clone)]
pub struct GossipDiscoveryConfig {
    /// Address to bind the gossip listener.
    ///
    /// Must be a literal `ip:port` socket address: it is parsed, not
    /// resolved, so a hostname here makes start fail with
    /// `DiscoveryError::Bind`. When the IP is unspecified (e.g. `0.0.0.0`)
    /// and no `advertise_host` is set, the advertised address is derived
    /// from the machine hostname (with the `cluster` feature), falling back
    /// to `127.0.0.1` on the same port.
    pub gossip_address: String,
    /// Seed node addresses for initial cluster bootstrap.
    ///
    /// Handed to chitchat unchanged.
    pub seed_nodes: Vec<String>,
    /// Interval between gossip rounds.
    ///
    /// Also used as the failure detector's initial interval.
    pub gossip_interval: Duration,
    /// Phi-accrual failure detector threshold.
    pub phi_threshold: f64,
    /// Grace period before removing dead nodes.
    ///
    /// Applied both as the failure detector's `dead_node_grace_period` and
    /// as chitchat's `marked_for_deletion_grace_period`.
    pub dead_node_grace_period: Duration,
    /// Cluster identifier (must match across all nodes).
    pub cluster_id: String,
    /// This node's ID.
    ///
    /// Gossiped as the chitchat node id `node-{id}`. Peer entries carrying
    /// this id are excluded from the peer list.
    pub node_id: NodeId,
    /// This node's info (published via chitchat keys).
    ///
    /// Sent as the initial key-value set when discovery starts; later
    /// updates go through `Discovery::announce`.
    /// NOTE(review): presumably `local_node.id` should equal `node_id`;
    /// nothing here enforces it.
    pub local_node: NodeInfo,
    /// Optional hostname or IP to advertise.
    ///
    /// Resolved (with the `cluster` feature) to its first IPv4 address and
    /// combined with the gossip port. Failing to resolve it, or building
    /// without the `cluster` feature, makes start fail.
    pub advertise_host: Option<String>,
}
63
64impl Default for GossipDiscoveryConfig {
65    fn default() -> Self {
66        Self {
67            gossip_address: "127.0.0.1:9003".into(),
68            seed_nodes: Vec::new(),
69            gossip_interval: Duration::from_millis(500),
70            phi_threshold: 8.0,
71            dead_node_grace_period: Duration::from_secs(300),
72            cluster_id: "laminardb-default".into(),
73            node_id: NodeId(1),
74            local_node: NodeInfo {
75                id: NodeId(1),
76                name: "node-1".into(),
77                rpc_address: "127.0.0.1:9000".into(),
78                raft_address: "127.0.0.1:9001".into(),
79                state: NodeState::Active,
80                metadata: NodeMetadata::default(),
81                last_heartbeat_ms: 0,
82            },
83            advertise_host: None,
84        }
85    }
86}
87
/// Gossip-based discovery using the chitchat protocol.
///
/// Created idle by [`GossipDiscovery::new`]. Starting it spawns chitchat
/// plus a background task that periodically refreshes the peer snapshot.
pub struct GossipDiscovery {
    /// Configuration supplied at construction.
    config: GossipDiscoveryConfig,
    /// Latest peer snapshot keyed by `NodeId.0`; replaced wholesale by the
    /// membership task on every poll.
    peers: Arc<RwLock<HashMap<u64, NodeInfo>>>,
    /// Publishes each peer snapshot to membership watchers.
    membership_tx: watch::Sender<Vec<NodeInfo>>,
    /// Receiver handed out (cloned) by `membership_watch`.
    membership_rx: watch::Receiver<Vec<NodeInfo>>,
    /// Cancels the membership task; `stop` swaps in a fresh token so the
    /// instance can be restarted.
    cancel: CancellationToken,
    /// Set once start succeeds; cleared by `stop`.
    started: bool,
    /// Chitchat instance: `Some` after a successful start, taken and shut
    /// down by `stop`.
    chitchat_handle: Option<chitchat::ChitchatHandle>,
}
98
99impl GossipDiscovery {
100    /// Create a new gossip discovery instance.
101    #[must_use]
102    pub fn new(config: GossipDiscoveryConfig) -> Self {
103        let (tx, rx) = watch::channel(Vec::new());
104        Self {
105            config,
106            peers: Arc::new(RwLock::new(HashMap::new())),
107            membership_tx: tx,
108            membership_rx: rx,
109            cancel: CancellationToken::new(),
110            started: false,
111            chitchat_handle: None,
112        }
113    }
114
115    /// Borrow the underlying chitchat handle, if the discovery has
116    /// been started. Enables other cluster components (barrier
117    /// coordinator, shuffle peer registry) to share the same chitchat
118    /// instance rather than spawning their own.
119    #[must_use]
120    pub fn chitchat_handle(&self) -> Option<&chitchat::ChitchatHandle> {
121        self.chitchat_handle.as_ref()
122    }
123
124    /// Parse a `NodeInfo` from chitchat key-value pairs.
125    fn parse_node_info(node_id_str: &str, kvs: &HashMap<String, String>) -> Option<NodeInfo> {
126        let id: u64 = node_id_str.strip_prefix("node-")?.parse().ok()?;
127        let rpc_address = kvs.get(keys::RPC_ADDRESS)?.clone();
128        let raft_address = kvs.get(keys::RAFT_ADDRESS).cloned().unwrap_or_default();
129        let name = kvs
130            .get(keys::NODE_NAME)
131            .cloned()
132            .unwrap_or_else(|| format!("node-{id}"));
133        let state = kvs
134            .get(keys::NODE_STATE)
135            .and_then(|s| match s.as_str() {
136                "joining" => Some(NodeState::Joining),
137                "active" => Some(NodeState::Active),
138                "suspected" => Some(NodeState::Suspected),
139                "draining" => Some(NodeState::Draining),
140                "left" => Some(NodeState::Left),
141                _ => None,
142            })
143            .unwrap_or(NodeState::Active);
144
145        let cores: u32 = kvs
146            .get(keys::LOAD_CORES)
147            .and_then(|s| s.parse().ok())
148            .unwrap_or(1);
149        let memory_bytes: u64 = kvs
150            .get(keys::LOAD_MEMORY)
151            .and_then(|s| s.parse().ok())
152            .unwrap_or(0);
153        let failure_domain = kvs.get(keys::FAILURE_DOMAIN).cloned();
154        let version = kvs.get(keys::NODE_VERSION).cloned().unwrap_or_default();
155        let owned_partitions: Vec<u32> = kvs
156            .get(keys::PARTITIONS_OWNED)
157            .map(|s| s.split(',').filter_map(|p| p.trim().parse().ok()).collect())
158            .unwrap_or_default();
159
160        Some(NodeInfo {
161            id: NodeId(id),
162            name,
163            rpc_address,
164            raft_address,
165            state,
166            metadata: NodeMetadata {
167                cores,
168                memory_bytes,
169                failure_domain,
170                tags: HashMap::new(),
171                owned_partitions,
172                version,
173            },
174            last_heartbeat_ms: chrono::Utc::now().timestamp_millis(),
175        })
176    }
177
178    /// Build the chitchat key-value set for the local node.
179    fn local_kvs(info: &NodeInfo) -> Vec<(String, String)> {
180        let mut kvs = vec![
181            (keys::NODE_STATE.into(), info.state.to_string()),
182            (keys::RPC_ADDRESS.into(), info.rpc_address.clone()),
183            (keys::RAFT_ADDRESS.into(), info.raft_address.clone()),
184            (keys::NODE_NAME.into(), info.name.clone()),
185            (keys::LOAD_CORES.into(), info.metadata.cores.to_string()),
186            (
187                keys::LOAD_MEMORY.into(),
188                info.metadata.memory_bytes.to_string(),
189            ),
190            (keys::NODE_VERSION.into(), info.metadata.version.clone()),
191        ];
192        if let Some(ref fd) = info.metadata.failure_domain {
193            kvs.push((keys::FAILURE_DOMAIN.into(), fd.clone()));
194        }
195        if !info.metadata.owned_partitions.is_empty() {
196            let parts: Vec<String> = info
197                .metadata
198                .owned_partitions
199                .iter()
200                .map(ToString::to_string)
201                .collect();
202            kvs.push((keys::PARTITIONS_OWNED.into(), parts.join(",")));
203        }
204        kvs
205    }
206}
207
208impl std::fmt::Debug for GossipDiscovery {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("GossipDiscovery")
211            .field("config", &self.config)
212            .field("started", &self.started)
213            .finish_non_exhaustive()
214    }
215}
216
217impl GossipDiscovery {
218    /// Start with a caller-provided chitchat transport. Test harnesses
219    /// use this to inject a filtering / fault-injecting transport
220    /// wrapper (see
221    /// [`cluster::testing::PartitionableTransport`](crate::cluster::testing::PartitionableTransport)).
222    /// The regular [`Discovery::start`] just delegates here with a
223    /// default [`UdpTransport`](chitchat::transport::UdpTransport).
224    ///
225    /// # Errors
226    /// Same as [`Discovery::start`].
227    ///
228    /// # Panics
229    /// Panics via `unwrap` on an internal assertion if called twice
230    /// concurrently from the same `GossipDiscovery` — the `started`
231    /// flag check makes the second call a no-op.
232    #[allow(clippy::too_many_lines)]
233    pub async fn start_with_transport<T>(&mut self, transport: &T) -> Result<(), DiscoveryError>
234    where
235        T: chitchat::transport::Transport,
236    {
237        if self.started {
238            return Ok(());
239        }
240
241        let node_id = format!("node-{}", self.config.node_id.0);
242        let gossip_addr: std::net::SocketAddr = self
243            .config
244            .gossip_address
245            .parse()
246            .map_err(|e: std::net::AddrParseError| DiscoveryError::Bind(e.to_string()))?;
247
248        let advertise_addr = if let Some(ref host) = self.config.advertise_host {
249            let mut resolved = None;
250            #[cfg(feature = "cluster")]
251            {
252                if let Ok(addrs) = (host.as_str(), gossip_addr.port()).to_socket_addrs() {
253                    for addr in addrs {
254                        if addr.ip().is_ipv4() {
255                            resolved = Some(addr);
256                            break;
257                        }
258                    }
259                }
260            }
261            if let Some(addr) = resolved {
262                addr
263            } else {
264                return Err(DiscoveryError::Bind(format!(
265                    "failed to resolve configured advertise_host '{host}' (or cluster feature is disabled)"
266                )));
267            }
268        } else if gossip_addr.ip().is_unspecified() {
269            let resolved = {
270                let mut res = None;
271                #[cfg(feature = "cluster")]
272                {
273                    let hostname = gethostname::gethostname();
274                    let hostname_str = hostname.to_string_lossy();
275                    if !hostname_str.is_empty() {
276                        if let Ok(addrs) =
277                            (hostname_str.as_ref(), gossip_addr.port()).to_socket_addrs()
278                        {
279                            for addr in addrs {
280                                if addr.ip().is_ipv4() && !addr.ip().is_loopback() {
281                                    res = Some(addr);
282                                    break;
283                                }
284                            }
285                        }
286                    }
287                }
288                res
289            };
290            resolved.unwrap_or_else(|| {
291                std::net::SocketAddr::new(
292                    std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
293                    gossip_addr.port(),
294                )
295            })
296        } else {
297            gossip_addr
298        };
299
300        let seed_addrs: Vec<String> = self.config.seed_nodes.clone();
301
302        tracing::info!(
303            "Starting gossip discovery: gossip_addr = {}, advertise_addr = {}, seeds = {:?}",
304            gossip_addr,
305            advertise_addr,
306            seed_addrs
307        );
308
309        // Generation: wall-clock millis. A node that was previously
310        // known (same `node_id`) rejoining under the same string
311        // needs a strictly-greater generation so chitchat supersedes
312        // the stale entry rather than treating it as the same
313        // instance.
314        let generation = std::time::SystemTime::now()
315            .duration_since(std::time::UNIX_EPOCH)
316            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
317
318        let config = chitchat::ChitchatConfig {
319            chitchat_id: chitchat::ChitchatId::new(node_id, generation, advertise_addr),
320            cluster_id: self.config.cluster_id.clone(),
321            gossip_interval: self.config.gossip_interval,
322            listen_addr: gossip_addr,
323            seed_nodes: seed_addrs,
324            failure_detector_config: chitchat::FailureDetectorConfig {
325                phi_threshold: self.config.phi_threshold,
326                initial_interval: self.config.gossip_interval,
327                // Map dead_node_grace_period to the failure detector's GC
328                // timer (W6 fix). Default is 24h which is far too long.
329                dead_node_grace_period: self.config.dead_node_grace_period,
330                ..Default::default()
331            },
332            marked_for_deletion_grace_period: self.config.dead_node_grace_period,
333            extra_liveness_predicate: None,
334            catchup_callback: None,
335        };
336
337        let initial_kvs = Self::local_kvs(&self.config.local_node);
338        let chitchat_handle = chitchat::spawn_chitchat(config, initial_kvs, transport)
339            .await
340            .map_err(|e| DiscoveryError::Bind(e.to_string()))?;
341
342        self.chitchat_handle = Some(chitchat_handle);
343
344        // Spawn membership watcher
345        let peers = Arc::clone(&self.peers);
346        let membership_tx = self.membership_tx.clone();
347        let cancel = self.cancel.clone();
348        let chitchat = self.chitchat_handle.as_ref().unwrap().chitchat().clone();
349        let local_node_id = self.config.node_id;
350
351        tokio::spawn(async move {
352            let mut interval = tokio::time::interval(Duration::from_millis(500));
353            loop {
354                tokio::select! {
355                    () = cancel.cancelled() => break,
356                    _ = interval.tick() => {
357                        let chitchat_guard = chitchat.lock().await;
358                        let mut new_peers: HashMap<u64, NodeInfo> = HashMap::new();
359
360                        // Collect the set of live node IDs from the failure
361                        // detector so we only include reachable peers (C3 fix).
362                        let live_ids: std::collections::HashSet<&chitchat::ChitchatId> =
363                            chitchat_guard.live_nodes().collect();
364
365                        let nodes: Vec<_> = chitchat_guard.node_states().keys().map(|id| format!("{}(live={})", id.node_id, live_ids.contains(id))).collect();
366                        tracing::debug!("Chitchat state nodes: {:?}", nodes);
367
368                        // Iterate all known nodes, tagging dead ones
369                        for (cc_id, state) in chitchat_guard.node_states() {
370                            let kvs: HashMap<String, String> = state
371                                .key_values()
372                                .map(|(k, v)| (k.to_string(), v.to_string()))
373                                .collect();
374
375                            if let Some(mut info) = Self::parse_node_info(
376                                &cc_id.node_id,
377                                &kvs,
378                            ) {
379                                if info.id == local_node_id {
380                                    continue;
381                                }
382
383                                // Override self-reported state with failure
384                                // detector opinion: if chitchat considers this
385                                // node dead, mark it as Suspected/Left rather
386                                // than trusting the gossip KV (C3 fix).
387                                if !live_ids.contains(cc_id) {
388                                    info.state = NodeState::Suspected;
389                                }
390
391                                // A node id can appear twice across a
392                                // rejoin — once as the old entry
393                                // (Suspected or Left) and once as the
394                                // freshly-started entry (Active). Prefer
395                                // the Active record so `current_leader()`
396                                // sees the rejoined instance.
397                                match new_peers.get(&info.id.0) {
398                                    Some(existing)
399                                        if !matches!(info.state, NodeState::Active)
400                                            && matches!(existing.state, NodeState::Active) =>
401                                    {
402                                        // Keep the already-recorded Active entry.
403                                    }
404                                    _ => {
405                                        new_peers.insert(info.id.0, info);
406                                    }
407                                }
408                            }
409                        }
410
411                        let peer_list: Vec<NodeInfo> =
412                            new_peers.values().cloned().collect();
413                        *peers.write() = new_peers;
414                        let _ = membership_tx.send(peer_list);
415                    }
416                }
417            }
418        });
419
420        self.started = true;
421        Ok(())
422    }
423}
424
425impl Discovery for GossipDiscovery {
426    async fn start(&mut self) -> Result<(), DiscoveryError> {
427        self.start_with_transport(&chitchat::transport::UdpTransport)
428            .await
429    }
430
431    async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError> {
432        if !self.started {
433            return Err(DiscoveryError::NotStarted);
434        }
435        let peers = self.peers.read();
436        Ok(peers.values().cloned().collect())
437    }
438
439    async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError> {
440        if !self.started {
441            return Err(DiscoveryError::NotStarted);
442        }
443        if let Some(ref handle) = self.chitchat_handle {
444            let kvs = Self::local_kvs(&info);
445            handle
446                .with_chitchat(|chitchat| {
447                    for (key, value) in &kvs {
448                        chitchat.self_node_state().set(key.clone(), value.clone());
449                    }
450                })
451                .await;
452        }
453        Ok(())
454    }
455
456    fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
457        self.membership_rx.clone()
458    }
459
460    async fn stop(&mut self) -> Result<(), DiscoveryError> {
461        self.cancel.cancel();
462        self.started = false;
463        // Properly shut down chitchat (W8 fix): send shutdown command
464        // and wait for the background task to exit, releasing the UDP socket.
465        if let Some(handle) = self.chitchat_handle.take() {
466            if let Err(e) = handle.shutdown().await {
467                tracing::warn!("Chitchat shutdown error: {e}");
468            }
469        }
470        // Fresh token so restart after stop works
471        self.cancel = CancellationToken::new();
472        Ok(())
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_namespace() {
        assert_eq!(keys::NODE_STATE, "node:state");
        assert_eq!(keys::RPC_ADDRESS, "node:rpc_addr");
    }

    #[test]
    fn test_gossip_config_default() {
        let config = GossipDiscoveryConfig::default();
        assert_eq!(config.gossip_interval, Duration::from_millis(500));
        assert!((config.phi_threshold - 8.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_node_info() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::RAFT_ADDRESS.into(), "127.0.0.1:9001".into());
        kvs.insert(keys::NODE_NAME.into(), "test-node".into());
        kvs.insert(keys::NODE_STATE.into(), "active".into());
        kvs.insert(keys::LOAD_CORES.into(), "4".into());
        kvs.insert(keys::LOAD_MEMORY.into(), "8589934592".into());

        let info = GossipDiscovery::parse_node_info("node-42", &kvs).unwrap();
        assert_eq!(info.id, NodeId(42));
        assert_eq!(info.name, "test-node");
        assert_eq!(info.metadata.cores, 4);
        assert_eq!(info.state, NodeState::Active);
    }

    #[test]
    fn test_parse_node_info_invalid_id() {
        let kvs = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("invalid", &kvs).is_none());
    }

    #[test]
    fn test_parse_node_info_missing_rpc() {
        let kvs = HashMap::new();
        assert!(GossipDiscovery::parse_node_info("node-1", &kvs).is_none());
    }

    /// Only the RPC address is mandatory; every other field has a default.
    #[test]
    fn test_parse_node_info_defaults() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::NODE_STATE.into(), "bogus".into());

        let info = GossipDiscovery::parse_node_info("node-7", &kvs).unwrap();
        assert_eq!(info.state, NodeState::Active);
        assert_eq!(info.name, "node-7");
        assert_eq!(info.raft_address, "");
        assert_eq!(info.metadata.cores, 1);
        assert_eq!(info.metadata.memory_bytes, 0);
        assert!(info.metadata.failure_domain.is_none());
        assert!(info.metadata.owned_partitions.is_empty());
        assert_eq!(info.metadata.version, "");
    }

    #[test]
    fn test_local_kvs() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "n1".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 4,
                memory_bytes: 1024,
                failure_domain: Some("us-east-1a".into()),
                owned_partitions: vec![0, 1, 2],
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };
        let kvs = GossipDiscovery::local_kvs(&info);
        assert!(kvs.iter().any(|(k, _)| k == keys::RPC_ADDRESS));
        assert!(kvs.iter().any(|(k, _)| k == keys::FAILURE_DOMAIN));
        assert!(kvs
            .iter()
            .any(|(k, v)| k == keys::PARTITIONS_OWNED && v == "0,1,2"));
    }

    /// Optional keys are left out entirely rather than published empty.
    #[test]
    fn test_local_kvs_omits_optional_keys() {
        let info = NodeInfo {
            id: NodeId(2),
            name: "n2".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                failure_domain: None,
                owned_partitions: Vec::new(),
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };
        let kvs = GossipDiscovery::local_kvs(&info);
        assert!(!kvs.iter().any(|(k, _)| k == keys::FAILURE_DOMAIN));
        assert!(!kvs.iter().any(|(k, _)| k == keys::PARTITIONS_OWNED));
    }

    /// What a node publishes must decode back to the same fields on peers.
    #[test]
    fn test_local_kvs_round_trip() {
        let info = NodeInfo {
            id: NodeId(9),
            name: "n9".into(),
            rpc_address: "127.0.0.1:9010".into(),
            raft_address: "127.0.0.1:9011".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 8,
                memory_bytes: 2048,
                failure_domain: Some("zone-b".into()),
                owned_partitions: vec![3, 7],
                version: "0.1.0".into(),
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        };
        let kvs: HashMap<String, String> =
            GossipDiscovery::local_kvs(&info).into_iter().collect();

        let parsed = GossipDiscovery::parse_node_info("node-9", &kvs).unwrap();
        assert_eq!(parsed.id, NodeId(9));
        assert_eq!(parsed.name, "n9");
        assert_eq!(parsed.rpc_address, "127.0.0.1:9010");
        assert_eq!(parsed.raft_address, "127.0.0.1:9011");
        assert_eq!(parsed.state, NodeState::Active);
        assert_eq!(parsed.metadata.cores, 8);
        assert_eq!(parsed.metadata.memory_bytes, 2048);
        assert_eq!(parsed.metadata.failure_domain.as_deref(), Some("zone-b"));
        assert_eq!(parsed.metadata.owned_partitions, vec![3, 7]);
        assert_eq!(parsed.metadata.version, "0.1.0");
    }

    #[test]
    fn test_parse_owned_partitions() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::PARTITIONS_OWNED.into(), "0,1,5,10".into());

        let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(info.metadata.owned_partitions, vec![0, 1, 5, 10]);
    }

    /// Whitespace is trimmed and unparsable or empty entries are dropped.
    #[test]
    fn test_parse_owned_partitions_skips_malformed_entries() {
        let mut kvs = HashMap::new();
        kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
        kvs.insert(keys::PARTITIONS_OWNED.into(), " 3, x ,4,,".into());
        let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert_eq!(info.metadata.owned_partitions, vec![3, 4]);

        kvs.insert(keys::PARTITIONS_OWNED.into(), String::new());
        let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
        assert!(info.metadata.owned_partitions.is_empty());
    }

    #[test]
    fn test_parse_all_node_states() {
        for (state_str, expected) in [
            ("joining", NodeState::Joining),
            ("active", NodeState::Active),
            ("suspected", NodeState::Suspected),
            ("draining", NodeState::Draining),
            ("left", NodeState::Left),
        ] {
            let mut kvs = HashMap::new();
            kvs.insert(keys::RPC_ADDRESS.into(), "127.0.0.1:9000".into());
            kvs.insert(keys::NODE_STATE.into(), state_str.into());

            let info = GossipDiscovery::parse_node_info("node-1", &kvs).unwrap();
            assert_eq!(info.state, expected);
        }
    }

    #[tokio::test]
    async fn test_not_started_errors() {
        let config = GossipDiscoveryConfig::default();
        let local = config.local_node.clone();
        let disc = GossipDiscovery::new(config);
        assert!(disc.peers().await.is_err());
        assert!(disc.announce(local).await.is_err());
        assert!(disc.membership_watch().borrow().is_empty());
        assert!(disc.chitchat_handle().is_none());
    }
}