Skip to main content

laminar_core/delta/discovery/
mod.rs

//! # Node Discovery
//!
//! Traits and implementations for discovering peer nodes in a LaminarDB
//! delta.
//!
//! ## Implementations
//!
//! - [`StaticDiscovery`]: pre-configured seed list with TCP heartbeats
//! - [`GossipDiscovery`]: Chitchat-based gossip protocol
10#![allow(clippy::disallowed_types)] // cold path: discovery metadata (serde + rkyv)
11
12mod static_discovery;
13pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
14
15mod gossip_discovery;
16pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
17
18use std::collections::HashMap;
19use std::fmt;
20
21use serde::{Deserialize, Serialize};
22use tokio::sync::watch;
23
24/// Unique identifier for a node in the delta.
25#[derive(
26    Debug,
27    Clone,
28    Copy,
29    PartialEq,
30    Eq,
31    Hash,
32    Serialize,
33    Deserialize,
34    rkyv::Archive,
35    rkyv::Serialize,
36    rkyv::Deserialize,
37)]
38pub struct NodeId(pub u64);
39
40impl NodeId {
41    /// Sentinel value representing "unassigned" (no owner).
42    pub const UNASSIGNED: Self = Self(0);
43
44    /// Returns `true` if this is the unassigned sentinel.
45    #[must_use]
46    pub const fn is_unassigned(&self) -> bool {
47        self.0 == 0
48    }
49}
50
51impl fmt::Display for NodeId {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        write!(f, "node-{}", self.0)
54    }
55}
56
57/// Current lifecycle state of a node.
58#[derive(
59    Debug,
60    Clone,
61    Copy,
62    PartialEq,
63    Eq,
64    Serialize,
65    Deserialize,
66    rkyv::Archive,
67    rkyv::Serialize,
68    rkyv::Deserialize,
69)]
70pub enum NodeState {
71    /// Node is joining the cluster but not yet fully active.
72    Joining,
73    /// Node is active and participating in the cluster.
74    Active,
75    /// Node is suspected of failure (missed heartbeats).
76    Suspected,
77    /// Node is gracefully draining before leaving.
78    Draining,
79    /// Node has left the cluster.
80    Left,
81}
82
83impl fmt::Display for NodeState {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        match self {
86            Self::Joining => write!(f, "joining"),
87            Self::Active => write!(f, "active"),
88            Self::Suspected => write!(f, "suspected"),
89            Self::Draining => write!(f, "draining"),
90            Self::Left => write!(f, "left"),
91        }
92    }
93}
94
95/// Hardware and deployment metadata for a node.
96#[derive(
97    Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
98)]
99pub struct NodeMetadata {
100    /// Number of CPU cores available.
101    pub cores: u32,
102    /// Total memory in bytes.
103    pub memory_bytes: u64,
104    /// Failure domain (e.g., rack, zone, region).
105    pub failure_domain: Option<String>,
106    /// Arbitrary key-value tags.
107    pub tags: HashMap<String, String>,
108    /// Partitions currently owned by this node.
109    pub owned_partitions: Vec<u32>,
110    /// `LaminarDB` version string.
111    pub version: String,
112}
113
114impl Default for NodeMetadata {
115    fn default() -> Self {
116        Self {
117            cores: 1,
118            memory_bytes: 0,
119            failure_domain: None,
120            tags: HashMap::new(),
121            owned_partitions: Vec::new(),
122            version: String::new(),
123        }
124    }
125}
126
127/// Full information about a discovered node.
128#[derive(
129    Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
130)]
131pub struct NodeInfo {
132    /// The node's unique identifier.
133    pub id: NodeId,
134    /// Human-readable name.
135    pub name: String,
136    /// Address for gRPC communication.
137    pub rpc_address: String,
138    /// Address for Raft communication.
139    pub raft_address: String,
140    /// Current lifecycle state.
141    pub state: NodeState,
142    /// Hardware/deployment metadata.
143    pub metadata: NodeMetadata,
144    /// Timestamp of the last received heartbeat (millis since epoch).
145    pub last_heartbeat_ms: i64,
146}
147
148/// A membership change event.
149#[derive(Debug, Clone)]
150pub enum MembershipEvent {
151    /// A new node has been discovered.
152    NodeJoined(Box<NodeInfo>),
153    /// A node's state has changed.
154    NodeStateChanged {
155        /// The node whose state changed.
156        node_id: NodeId,
157        /// Previous state.
158        old_state: NodeState,
159        /// New state.
160        new_state: NodeState,
161    },
162    /// A node has left or been removed from the cluster.
163    NodeLeft(NodeId),
164}
165
166/// Errors that can occur during discovery operations.
167#[derive(Debug, thiserror::Error)]
168pub enum DiscoveryError {
169    /// Failed to bind to the specified address.
170    #[error("bind error: {0}")]
171    Bind(String),
172
173    /// Failed to connect to a seed/peer node.
174    #[error("connection error to {address}: {reason}")]
175    Connection {
176        /// The address that failed.
177        address: String,
178        /// Reason for failure.
179        reason: String,
180    },
181
182    /// Serialization/deserialization failure.
183    #[error("serialization error: {0}")]
184    Serialization(String),
185
186    /// The discovery service is not running.
187    #[error("discovery not started")]
188    NotStarted,
189
190    /// The discovery service has been shut down.
191    #[error("discovery shut down")]
192    ShutDown,
193
194    /// An I/O error occurred.
195    #[error("I/O error: {0}")]
196    Io(#[from] std::io::Error),
197}
198
199/// Trait for node discovery in a delta.
200///
201/// Implementations provide the mechanism by which nodes find and
202/// track each other. The trait is async and designed for long-running
203/// background tasks.
204#[allow(async_fn_in_trait)]
205pub trait Discovery: Send + Sync + 'static {
206    /// Start the discovery service.
207    ///
208    /// This spawns background tasks for heartbeating and failure detection.
209    async fn start(&mut self) -> Result<(), DiscoveryError>;
210
211    /// Get the current set of known peers (excluding self).
212    async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;
213
214    /// Announce this node's updated information to the cluster.
215    async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;
216
217    /// Subscribe to membership change events.
218    ///
219    /// Returns a watch receiver that is updated whenever the membership
220    /// changes. The value is the list of all known peers.
221    fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;
222
223    /// Gracefully stop the discovery service.
224    async fn stop(&mut self) -> Result<(), DiscoveryError>;
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fully populated [`NodeInfo`] shared by the serialization tests.
    fn sample_node_info() -> NodeInfo {
        NodeInfo {
            id: NodeId(1),
            name: "test-node".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 1000,
        }
    }

    #[test]
    fn test_node_id_display() {
        assert_eq!(NodeId(42).to_string(), "node-42");
    }

    #[test]
    fn test_node_id_unassigned() {
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(1).is_unassigned());
    }

    /// Every variant is checked so a newly added or renamed label cannot slip through.
    #[test]
    fn test_node_state_display() {
        assert_eq!(NodeState::Joining.to_string(), "joining");
        assert_eq!(NodeState::Active.to_string(), "active");
        assert_eq!(NodeState::Suspected.to_string(), "suspected");
        assert_eq!(NodeState::Draining.to_string(), "draining");
        assert_eq!(NodeState::Left.to_string(), "left");
    }

    #[test]
    fn test_node_metadata_default() {
        let meta = NodeMetadata::default();
        assert_eq!(meta.cores, 1);
        assert_eq!(meta.memory_bytes, 0);
        assert!(meta.failure_domain.is_none());
        assert!(meta.tags.is_empty());
        assert!(meta.owned_partitions.is_empty());
        assert!(meta.version.is_empty());
    }

    #[test]
    fn test_node_id_serialization() {
        let id = NodeId(123);
        let json = serde_json::to_string(&id).unwrap();
        let back: NodeId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, back);
    }

    /// Round-trips a `NodeInfo` through JSON and checks every field, not just
    /// the id and name.
    #[test]
    fn test_node_info_serialization() {
        let info = sample_node_info();
        let json = serde_json::to_string(&info).unwrap();
        let back: NodeInfo = serde_json::from_str(&json).unwrap();

        assert_eq!(back.id, info.id);
        assert_eq!(back.name, "test-node");
        assert_eq!(back.rpc_address, info.rpc_address);
        assert_eq!(back.raft_address, info.raft_address);
        assert_eq!(back.state, info.state);
        assert_eq!(back.last_heartbeat_ms, info.last_heartbeat_ms);

        // `NodeMetadata` has no `PartialEq`, so compare it field by field.
        assert_eq!(back.metadata.cores, info.metadata.cores);
        assert_eq!(back.metadata.memory_bytes, info.metadata.memory_bytes);
        assert_eq!(back.metadata.failure_domain, info.metadata.failure_domain);
        assert_eq!(back.metadata.tags, info.metadata.tags);
        assert_eq!(back.metadata.owned_partitions, info.metadata.owned_partitions);
        assert_eq!(back.metadata.version, info.metadata.version);
    }

    #[test]
    fn test_discovery_error_display() {
        assert_eq!(
            DiscoveryError::Bind("address in use".into()).to_string(),
            "bind error: address in use"
        );
        assert_eq!(
            DiscoveryError::Connection {
                address: "10.0.0.1:7000".into(),
                reason: "refused".into(),
            }
            .to_string(),
            "connection error to 10.0.0.1:7000: refused"
        );
        assert_eq!(
            DiscoveryError::Serialization("bad payload".into()).to_string(),
            "serialization error: bad payload"
        );
        assert_eq!(
            DiscoveryError::NotStarted.to_string(),
            "discovery not started"
        );
        assert_eq!(DiscoveryError::ShutDown.to_string(), "discovery shut down");
    }

    #[test]
    fn test_discovery_error_from_io() {
        let err: DiscoveryError = std::io::Error::other("boom").into();
        assert!(matches!(err, DiscoveryError::Io(_)));
        assert_eq!(err.to_string(), "I/O error: boom");
    }
}
283}