Skip to main content

laminar_core/cluster/discovery/
mod.rs

1//! Peer discovery: `StaticDiscovery` (seed list) and `GossipDiscovery`
2//! (chitchat).
3#![allow(clippy::disallowed_types)] // cold path: discovery metadata (serde + rkyv)
4
5mod static_discovery;
6pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
7
8mod gossip_discovery;
9pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
10
11use std::collections::HashMap;
12use std::fmt;
13
14use serde::{Deserialize, Serialize};
15use tokio::sync::watch;
16
17pub use crate::state::NodeId;
18
/// Current lifecycle state of a node.
///
/// The `Display` implementation in this module renders each variant as its
/// lowercase name (for example `Active` is written as `active`).
// NOTE(review): the serde and rkyv derives encode variants by name/position,
// so reordering or renaming variants presumably changes the wire/archive
// format — confirm before touching the variant list.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    rkyv::Archive,
    rkyv::Serialize,
    rkyv::Deserialize,
)]
pub enum NodeState {
    /// Node is joining the cluster but not yet fully active.
    Joining,
    /// Node is active and participating in the cluster.
    Active,
    /// Node is suspected of failure (missed heartbeats).
    Suspected,
    /// Node is gracefully draining before leaving.
    Draining,
    /// Node has left the cluster.
    Left,
}
44
45impl fmt::Display for NodeState {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Self::Joining => write!(f, "joining"),
49            Self::Active => write!(f, "active"),
50            Self::Suspected => write!(f, "suspected"),
51            Self::Draining => write!(f, "draining"),
52            Self::Left => write!(f, "left"),
53        }
54    }
55}
56
/// Hardware and deployment metadata for a node.
///
/// The `Default` implementation in this module yields one core, zero bytes
/// of memory, no failure domain, no tags, no owned partitions and an empty
/// version string.
// NOTE(review): field order and types feed the serde/rkyv derives; changing
// them presumably alters the serialized layout — confirm before editing.
#[derive(
    Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
)]
pub struct NodeMetadata {
    /// Number of CPU cores available.
    pub cores: u32,
    /// Total memory in bytes.
    pub memory_bytes: u64,
    /// Failure domain (e.g., rack, zone, region).
    ///
    /// `None` when no failure domain has been configured.
    pub failure_domain: Option<String>,
    /// Arbitrary key-value tags.
    pub tags: HashMap<String, String>,
    /// Partitions currently owned by this node.
    pub owned_partitions: Vec<u32>,
    /// `LaminarDB` version string.
    pub version: String,
}
75
76impl Default for NodeMetadata {
77    fn default() -> Self {
78        Self {
79            cores: 1,
80            memory_bytes: 0,
81            failure_domain: None,
82            tags: HashMap::new(),
83            owned_partitions: Vec::new(),
84            version: String::new(),
85        }
86    }
87}
88
/// Full information about a discovered node.
///
/// This is the value carried by the [`Discovery`] API: it is returned from
/// `peers`, passed to `announce`, and published through `membership_watch`.
#[derive(
    Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
)]
pub struct NodeInfo {
    /// The node's unique identifier.
    pub id: NodeId,
    /// Human-readable name.
    pub name: String,
    /// Address for gRPC communication.
    // NOTE(review): stored as a plain string; the tests use `host:port`
    // values, but nothing in this type validates the format — confirm
    // where parsing happens.
    pub rpc_address: String,
    /// Address for Raft communication.
    pub raft_address: String,
    /// Current lifecycle state.
    pub state: NodeState,
    /// Hardware/deployment metadata.
    pub metadata: NodeMetadata,
    /// Timestamp of the last received heartbeat (millis since epoch).
    pub last_heartbeat_ms: i64,
}
109
/// A membership change event.
///
/// Describes a single change to the set of known nodes: a node appearing,
/// changing lifecycle state, or disappearing.
#[derive(Debug, Clone)]
pub enum MembershipEvent {
    /// A new node has been discovered.
    ///
    /// Carries the full [`NodeInfo`] of the new node.
    // NOTE(review): the payload is boxed, presumably to keep this enum small
    // relative to the other variants — confirm intent.
    NodeJoined(Box<NodeInfo>),
    /// A node's state has changed.
    NodeStateChanged {
        /// The node whose state changed.
        node_id: NodeId,
        /// Previous state.
        old_state: NodeState,
        /// New state.
        new_state: NodeState,
    },
    /// A node has left or been removed from the cluster.
    ///
    /// Carries only the identifier of the departed node.
    NodeLeft(NodeId),
}
127
/// Errors that can occur during discovery operations.
///
/// This is the error type returned by every fallible method of the
/// [`Discovery`] trait. The `Display` text of each variant comes from its
/// `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum DiscoveryError {
    /// Failed to bind to the specified address.
    ///
    /// The payload is a free-form description of the failure.
    #[error("bind error: {0}")]
    Bind(String),

    /// Failed to connect to a seed/peer node.
    #[error("connection error to {address}: {reason}")]
    Connection {
        /// The address that failed.
        address: String,
        /// Reason for failure.
        reason: String,
    },

    /// Serialization/deserialization failure.
    ///
    /// The payload is a free-form description of the failure.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// The discovery service is not running.
    #[error("discovery not started")]
    NotStarted,

    /// The discovery service has been shut down.
    #[error("discovery shut down")]
    ShutDown,

    /// An I/O error occurred.
    ///
    /// `#[from]` generates `From<std::io::Error>`, so `?` converts I/O
    /// errors into this variant automatically.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
}
160
/// Trait for node discovery in a cluster.
///
/// Implementations provide the mechanism by which nodes find and
/// track each other. The trait is async and designed for long-running
/// background tasks.
///
/// Every fallible method reports failure as a [`DiscoveryError`]; which
/// variants can occur depends on the implementation.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because
// callers cannot add auto-trait bounds (such as `Send`) to the returned
// futures. The lint is silenced here.
// NOTE(review): confirm that no caller needs `Send` futures from this trait.
#[allow(async_fn_in_trait)]
pub trait Discovery: Send + Sync + 'static {
    /// Start the discovery service.
    ///
    /// This spawns background tasks for heartbeating and failure detection.
    async fn start(&mut self) -> Result<(), DiscoveryError>;

    /// Get the current set of known peers (excluding self).
    async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;

    /// Announce this node's updated information to the cluster.
    async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;

    /// Subscribe to membership changes.
    ///
    /// Returns a watch receiver that is updated whenever the membership
    /// changes. The value is the list of all known peers, i.e. a full
    /// snapshot rather than individual [`MembershipEvent`] values.
    fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;

    /// Gracefully stop the discovery service.
    async fn stop(&mut self) -> Result<(), DiscoveryError>;
}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// `NodeId` renders as `node-<n>`.
    #[test]
    fn test_node_id_display() {
        assert_eq!(NodeId(42).to_string(), "node-42");
    }

    /// Only the `UNASSIGNED` sentinel reports itself as unassigned.
    #[test]
    fn test_node_id_unassigned() {
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(1).is_unassigned());
    }

    /// Every `NodeState` variant renders as its lowercase name.
    #[test]
    fn test_node_state_display() {
        // Cover all five variants so a new or renamed label cannot slip by.
        let expected = [
            (NodeState::Joining, "joining"),
            (NodeState::Active, "active"),
            (NodeState::Suspected, "suspected"),
            (NodeState::Draining, "draining"),
            (NodeState::Left, "left"),
        ];
        for (state, label) in expected {
            assert_eq!(state.to_string(), label);
        }
    }

    /// `NodeMetadata::default()` is a minimal placeholder in every field.
    #[test]
    fn test_node_metadata_default() {
        let meta = NodeMetadata::default();
        assert_eq!(meta.cores, 1);
        assert_eq!(meta.memory_bytes, 0);
        assert!(meta.failure_domain.is_none());
        assert!(meta.tags.is_empty());
        assert!(meta.owned_partitions.is_empty());
        assert!(meta.version.is_empty());
    }

    /// `NodeId` survives a JSON round trip.
    #[test]
    fn test_node_id_serialization() {
        let id = NodeId(123);
        let json = serde_json::to_string(&id).unwrap();
        let back: NodeId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, back);
    }

    /// Every field of `NodeInfo` survives a JSON round trip.
    #[test]
    fn test_node_info_serialization() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "test-node".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 1000,
        };
        let json = serde_json::to_string(&info).unwrap();
        let back: NodeInfo = serde_json::from_str(&json).unwrap();

        assert_eq!(back.id, info.id);
        assert_eq!(back.name, "test-node");
        assert_eq!(back.rpc_address, info.rpc_address);
        assert_eq!(back.raft_address, info.raft_address);
        assert_eq!(back.state, info.state);
        assert_eq!(back.last_heartbeat_ms, info.last_heartbeat_ms);

        // `NodeMetadata` does not derive `PartialEq`, so compare field by field.
        assert_eq!(back.metadata.cores, info.metadata.cores);
        assert_eq!(back.metadata.memory_bytes, info.metadata.memory_bytes);
        assert_eq!(back.metadata.failure_domain, info.metadata.failure_domain);
        assert_eq!(back.metadata.tags, info.metadata.tags);
        assert_eq!(back.metadata.owned_partitions, info.metadata.owned_partitions);
        assert_eq!(back.metadata.version, info.metadata.version);
    }
}