Skip to main content

laminar_core/cluster/discovery/
mod.rs

1//! Peer discovery: `StaticDiscovery` (seed list) and `GossipDiscovery`
2//! (chitchat).
3#![allow(clippy::disallowed_types)] // cold path: discovery metadata (serde + rkyv)
4
5mod static_discovery;
6pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
7
8mod gossip_discovery;
9pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
10
11use std::collections::HashMap;
12use std::fmt;
13
14use serde::{Deserialize, Serialize};
15use tokio::sync::watch;
16
17pub use crate::state::NodeId;
18
/// Current lifecycle state of a node.
///
/// `Display` renders the lowercase variant name (`joining`, `active`,
/// `suspected`, `draining`, `left`). Only `Active` nodes are returned by
/// [`assignable_node_ids`]; every other state is excluded from vnode
/// ownership.
///
/// NOTE(review): declaration order is part of the encoded form. Derived rkyv
/// archives and index-based serde formats identify unit variants by their
/// position, so append new variants at the end rather than reordering or
/// inserting in the middle.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    rkyv::Archive,
    rkyv::Serialize,
    rkyv::Deserialize,
)]
pub enum NodeState {
    /// Node is joining the cluster but not yet fully active.
    Joining,
    /// Node is active and participating in the cluster. This is the only
    /// state eligible to own vnodes.
    Active,
    /// Node is suspected of failure (missed heartbeats).
    Suspected,
    /// Node is gracefully draining before leaving.
    Draining,
    /// Node has left the cluster.
    Left,
}
44
45impl fmt::Display for NodeState {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Self::Joining => write!(f, "joining"),
49            Self::Active => write!(f, "active"),
50            Self::Suspected => write!(f, "suspected"),
51            Self::Draining => write!(f, "draining"),
52            Self::Left => write!(f, "left"),
53        }
54    }
55}
56
/// Hardware and deployment metadata for a node.
///
/// `NodeMetadata::default()` describes a single-core node with zero memory,
/// no failure domain, no tags, no owned partitions and an empty version
/// string.
///
/// NOTE(review): field order is part of the derived rkyv archive layout and
/// of positional serde encodings. Treat it as wire format and append new
/// fields rather than reordering.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    rkyv::Archive,
    rkyv::Serialize,
    rkyv::Deserialize,
)]
pub struct NodeMetadata {
    /// Number of CPU cores available.
    pub cores: u32,
    /// Total memory in bytes.
    pub memory_bytes: u64,
    /// Failure domain (e.g., rack, zone, region); `None` when not configured.
    pub failure_domain: Option<String>,
    /// Arbitrary key-value tags.
    pub tags: HashMap<String, String>,
    /// Partitions currently owned by this node.
    pub owned_partitions: Vec<u32>,
    /// `LaminarDB` version string.
    pub version: String,
}
82
83impl Default for NodeMetadata {
84    fn default() -> Self {
85        Self {
86            cores: 1,
87            memory_bytes: 0,
88            failure_domain: None,
89            tags: HashMap::new(),
90            owned_partitions: Vec::new(),
91            version: String::new(),
92        }
93    }
94}
95
/// Full information about a discovered node.
///
/// NOTE(review): field order is part of the derived rkyv archive layout and
/// of positional serde encodings. Treat it as wire format.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    rkyv::Archive,
    rkyv::Serialize,
    rkyv::Deserialize,
)]
pub struct NodeInfo {
    /// The node's unique identifier. Entries whose id reports
    /// `is_unassigned()` are skipped by [`assignable_node_ids`].
    pub id: NodeId,
    /// Human-readable name.
    pub name: String,
    /// Address for gRPC communication. Presumably `host:port` (the tests use
    /// `127.0.0.1:9000`); confirm against the RPC layer.
    pub rpc_address: String,
    /// Address for Raft communication. Presumably `host:port` as well.
    pub raft_address: String,
    /// Current lifecycle state.
    pub state: NodeState,
    /// Hardware/deployment metadata.
    pub metadata: NodeMetadata,
    /// Timestamp of the last received heartbeat (millis since epoch).
    pub last_heartbeat_ms: i64,
}
123
124/// Node ids eligible to own vnodes: only `Active` nodes. Joining,
125/// Suspected, Draining, and Left are excluded so a draining or failing
126/// node sheds its vnodes on the next rotation.
127#[must_use]
128pub fn assignable_node_ids(members: &[NodeInfo]) -> Vec<NodeId> {
129    let mut ids: Vec<NodeId> = members
130        .iter()
131        .filter(|m| matches!(m.state, NodeState::Active))
132        .map(|m| m.id)
133        .filter(|id| !id.is_unassigned())
134        .collect();
135    ids.sort_unstable();
136    ids.dedup();
137    ids
138}
139
/// A membership change event.
///
/// Note that [`Discovery::membership_watch`] publishes full peer-list
/// snapshots rather than values of this type.
#[derive(Debug, Clone)]
pub enum MembershipEvent {
    /// A new node has been discovered.
    // NOTE(review): `NodeInfo` is boxed, presumably to keep this enum small
    // relative to its other variants (clippy `large_enum_variant`).
    NodeJoined(Box<NodeInfo>),
    /// A node's state has changed.
    NodeStateChanged {
        /// The node whose state changed.
        node_id: NodeId,
        /// Previous state.
        old_state: NodeState,
        /// New state.
        new_state: NodeState,
    },
    /// A node has left or been removed from the cluster.
    NodeLeft(NodeId),
}
157
/// Errors that can occur during discovery operations.
///
/// `Display` output comes from the `#[error(...)]` attributes. Details are
/// carried as plain `String`s, so the underlying error values are not
/// preserved, except for `Io`.
#[derive(Debug, thiserror::Error)]
pub enum DiscoveryError {
    /// Failed to bind to the specified address.
    #[error("bind error: {0}")]
    Bind(String),

    /// Failed to connect to a seed/peer node.
    #[error("connection error to {address}: {reason}")]
    Connection {
        /// The address that failed.
        address: String,
        /// Reason for failure.
        reason: String,
    },

    /// Serialization/deserialization failure.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// The discovery service is not running.
    #[error("discovery not started")]
    NotStarted,

    /// The discovery service has been shut down.
    #[error("discovery shut down")]
    ShutDown,

    /// An I/O error occurred. `#[from]` lets `?` convert a `std::io::Error`
    /// into this variant automatically.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
}
190
/// Trait for node discovery in a cluster.
///
/// Implementations provide the mechanism by which nodes find and
/// track each other (this module provides `StaticDiscovery` and
/// `GossipDiscovery`). The trait is async and designed for long-running
/// background tasks.
///
/// NOTE(review): the `NotStarted` and `ShutDown` variants of
/// [`DiscoveryError`] suggest that calls made before `start` or after `stop`
/// may fail. Confirm this per implementation.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because
// callers cannot name or bound the returned futures (e.g. require them to be
// `Send`). The `Send + Sync + 'static` supertraits constrain the implementor,
// not those futures. If generic callers ever need `Send` futures, desugar
// the methods to `fn ..(..) -> impl Future<Output = ..> + Send`.
#[allow(async_fn_in_trait)]
pub trait Discovery: Send + Sync + 'static {
    /// Start the discovery service.
    ///
    /// This spawns background tasks for heartbeating and failure detection.
    async fn start(&mut self) -> Result<(), DiscoveryError>;

    /// Get the current set of known peers (excluding self).
    async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;

    /// Announce this node's updated information to the cluster.
    async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;

    /// Subscribe to membership changes.
    ///
    /// Returns a watch receiver that is updated whenever the membership
    /// changes. The value is the full list of all known peers, a snapshot
    /// rather than a [`MembershipEvent`] delta. Intermediate snapshots may
    /// be skipped because a `watch` channel retains only the latest value.
    fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;

    /// Gracefully stop the discovery service.
    async fn stop(&mut self) -> Result<(), DiscoveryError>;
}
218
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Every `NodeState` variant paired with its expected `Display` text.
    const ALL_STATES: [(NodeState, &str); 5] = [
        (NodeState::Joining, "joining"),
        (NodeState::Active, "active"),
        (NodeState::Suspected, "suspected"),
        (NodeState::Draining, "draining"),
        (NodeState::Left, "left"),
    ];

    /// Builds a minimal `NodeInfo` with the given id and state.
    fn info_with(id: u64, state: NodeState) -> NodeInfo {
        NodeInfo {
            id: NodeId(id),
            name: format!("n{id}"),
            rpc_address: String::new(),
            raft_address: String::new(),
            state,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        }
    }

    #[test]
    fn test_node_id_display() {
        assert_eq!(NodeId(42).to_string(), "node-42");
    }

    #[test]
    fn test_node_id_unassigned() {
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(1).is_unassigned());
    }

    #[test]
    fn test_node_state_display() {
        // Cover every variant so a renamed or new variant cannot silently
        // change its rendering.
        for (state, expected) in ALL_STATES {
            assert_eq!(state.to_string(), expected);
        }
    }

    #[test]
    fn test_node_state_serialization_round_trip() {
        for (state, _) in ALL_STATES {
            let json = serde_json::to_string(&state).unwrap();
            let back: NodeState = serde_json::from_str(&json).unwrap();
            assert_eq!(back, state);
        }
    }

    #[test]
    fn assignable_includes_only_active_sorted_deduped() {
        let members = vec![
            info_with(5, NodeState::Active),
            info_with(2, NodeState::Joining),
            info_with(3, NodeState::Suspected),
            info_with(4, NodeState::Draining),
            info_with(6, NodeState::Left),
            info_with(1, NodeState::Active),
            info_with(1, NodeState::Active), // dup
        ];
        assert_eq!(assignable_node_ids(&members), vec![NodeId(1), NodeId(5)]);
    }

    #[test]
    fn assignable_drops_unassigned() {
        let mut unassigned = info_with(7, NodeState::Active);
        unassigned.id = NodeId::UNASSIGNED;
        let members = vec![unassigned, info_with(7, NodeState::Active)];
        assert_eq!(assignable_node_ids(&members), vec![NodeId(7)]);
    }

    #[test]
    fn assignable_empty_input_is_empty() {
        assert!(assignable_node_ids(&[]).is_empty());
    }

    #[test]
    fn test_node_metadata_default() {
        let meta = NodeMetadata::default();
        assert_eq!(meta.cores, 1);
        assert_eq!(meta.memory_bytes, 0);
        assert!(meta.failure_domain.is_none());
        assert!(meta.tags.is_empty());
        assert!(meta.owned_partitions.is_empty());
        assert!(meta.version.is_empty());
    }

    #[test]
    fn test_node_id_serialization() {
        let id = NodeId(123);
        let json = serde_json::to_string(&id).unwrap();
        let back: NodeId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, back);
    }

    #[test]
    fn test_node_info_serialization() {
        // Populate every metadata field so the round-trip exercises them
        // all, not just the defaults.
        let info = NodeInfo {
            id: NodeId(1),
            name: "test-node".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores: 8,
                memory_bytes: 16 << 30,
                failure_domain: Some("zone-a".into()),
                tags: HashMap::from([("role".to_string(), "worker".to_string())]),
                owned_partitions: vec![0, 3, 7],
                version: "0.1.0".into(),
            },
            last_heartbeat_ms: 1000,
        };
        let json = serde_json::to_string(&info).unwrap();
        let back: NodeInfo = serde_json::from_str(&json).unwrap();
        // `NodeInfo` derives `PartialEq`, so compare the whole value rather
        // than a couple of fields.
        assert_eq!(back, info);
    }
}