laminar_core/cluster/discovery/
mod.rs1#![allow(clippy::disallowed_types)] mod static_discovery;
6pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
7
8mod gossip_discovery;
9pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
10
11use std::collections::HashMap;
12use std::fmt;
13
14use serde::{Deserialize, Serialize};
15use tokio::sync::watch;
16
17pub use crate::state::NodeId;
18
19#[derive(
21 Debug,
22 Clone,
23 Copy,
24 PartialEq,
25 Eq,
26 Serialize,
27 Deserialize,
28 rkyv::Archive,
29 rkyv::Serialize,
30 rkyv::Deserialize,
31)]
32pub enum NodeState {
33 Joining,
35 Active,
37 Suspected,
39 Draining,
41 Left,
43}
44
45impl fmt::Display for NodeState {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Self::Joining => write!(f, "joining"),
49 Self::Active => write!(f, "active"),
50 Self::Suspected => write!(f, "suspected"),
51 Self::Draining => write!(f, "draining"),
52 Self::Left => write!(f, "left"),
53 }
54 }
55}
56
57#[derive(
59 Debug,
60 Clone,
61 PartialEq,
62 Serialize,
63 Deserialize,
64 rkyv::Archive,
65 rkyv::Serialize,
66 rkyv::Deserialize,
67)]
68pub struct NodeMetadata {
69 pub cores: u32,
71 pub memory_bytes: u64,
73 pub failure_domain: Option<String>,
75 pub tags: HashMap<String, String>,
77 pub owned_partitions: Vec<u32>,
79 pub version: String,
81}
82
83impl Default for NodeMetadata {
84 fn default() -> Self {
85 Self {
86 cores: 1,
87 memory_bytes: 0,
88 failure_domain: None,
89 tags: HashMap::new(),
90 owned_partitions: Vec::new(),
91 version: String::new(),
92 }
93 }
94}
95
96#[derive(
98 Debug,
99 Clone,
100 PartialEq,
101 Serialize,
102 Deserialize,
103 rkyv::Archive,
104 rkyv::Serialize,
105 rkyv::Deserialize,
106)]
107pub struct NodeInfo {
108 pub id: NodeId,
110 pub name: String,
112 pub rpc_address: String,
114 pub raft_address: String,
116 pub state: NodeState,
118 pub metadata: NodeMetadata,
120 pub last_heartbeat_ms: i64,
122}
123
124#[must_use]
128pub fn assignable_node_ids(members: &[NodeInfo]) -> Vec<NodeId> {
129 let mut ids: Vec<NodeId> = members
130 .iter()
131 .filter(|m| matches!(m.state, NodeState::Active))
132 .map(|m| m.id)
133 .filter(|id| !id.is_unassigned())
134 .collect();
135 ids.sort_unstable();
136 ids.dedup();
137 ids
138}
139
140#[derive(Debug, Clone)]
142pub enum MembershipEvent {
143 NodeJoined(Box<NodeInfo>),
145 NodeStateChanged {
147 node_id: NodeId,
149 old_state: NodeState,
151 new_state: NodeState,
153 },
154 NodeLeft(NodeId),
156}
157
158#[derive(Debug, thiserror::Error)]
160pub enum DiscoveryError {
161 #[error("bind error: {0}")]
163 Bind(String),
164
165 #[error("connection error to {address}: {reason}")]
167 Connection {
168 address: String,
170 reason: String,
172 },
173
174 #[error("serialization error: {0}")]
176 Serialization(String),
177
178 #[error("discovery not started")]
180 NotStarted,
181
182 #[error("discovery shut down")]
184 ShutDown,
185
186 #[error("I/O error: {0}")]
188 Io(#[from] std::io::Error),
189}
190
191#[allow(async_fn_in_trait)]
197pub trait Discovery: Send + Sync + 'static {
198 async fn start(&mut self) -> Result<(), DiscoveryError>;
202
203 async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;
205
206 async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;
208
209 fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;
214
215 async fn stop(&mut self) -> Result<(), DiscoveryError>;
217}
218
#[cfg(test)]
mod tests {
    //! Unit tests for the discovery data types and `assignable_node_ids`.

    use super::*;

    #[test]
    fn test_node_id_display() {
        assert_eq!(NodeId(42).to_string(), "node-42");
    }

    #[test]
    fn test_node_id_unassigned() {
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(1).is_unassigned());
    }

    #[test]
    fn test_node_state_display() {
        assert_eq!(NodeState::Joining.to_string(), "joining");
        assert_eq!(NodeState::Active.to_string(), "active");
        assert_eq!(NodeState::Suspected.to_string(), "suspected");
        assert_eq!(NodeState::Draining.to_string(), "draining");
        assert_eq!(NodeState::Left.to_string(), "left");
    }

    /// Builds a `NodeInfo` with the given id and state and otherwise empty
    /// or default fields.
    fn info_with(id: u64, state: NodeState) -> NodeInfo {
        NodeInfo {
            id: NodeId(id),
            name: format!("n{id}"),
            rpc_address: String::new(),
            raft_address: String::new(),
            state,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        }
    }

    #[test]
    fn assignable_includes_only_active_sorted_deduped() {
        let members = vec![
            info_with(5, NodeState::Active),
            info_with(2, NodeState::Joining),
            info_with(3, NodeState::Suspected),
            info_with(4, NodeState::Draining),
            info_with(6, NodeState::Left),
            info_with(1, NodeState::Active),
            // Same id twice: must appear once in the result.
            info_with(1, NodeState::Active),
        ];
        assert_eq!(assignable_node_ids(&members), vec![NodeId(1), NodeId(5)]);
    }

    #[test]
    fn assignable_drops_unassigned() {
        let mut unassigned = info_with(7, NodeState::Active);
        unassigned.id = NodeId::UNASSIGNED;
        let members = vec![unassigned, info_with(7, NodeState::Active)];
        assert_eq!(assignable_node_ids(&members), vec![NodeId(7)]);
    }

    #[test]
    fn test_node_metadata_default() {
        let meta = NodeMetadata::default();
        assert_eq!(meta.cores, 1);
        assert_eq!(meta.memory_bytes, 0);
        assert!(meta.failure_domain.is_none());
        assert!(meta.tags.is_empty());
        assert!(meta.owned_partitions.is_empty());
        assert!(meta.version.is_empty());
    }

    #[test]
    fn test_node_id_serialization() {
        let id = NodeId(123);
        let json = serde_json::to_string(&id).unwrap();
        let back: NodeId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, back);
    }

    #[test]
    fn test_node_info_serialization() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "test-node".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 1000,
        };
        let json = serde_json::to_string(&info).unwrap();
        let back: NodeInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(back.id, info.id);
        assert_eq!(back.name, "test-node");
    }
}