laminar_core/cluster/discovery/
mod.rs1#![allow(clippy::disallowed_types)] mod static_discovery;
6pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
7
8mod gossip_discovery;
9pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
10
11use std::collections::HashMap;
12use std::fmt;
13
14use serde::{Deserialize, Serialize};
15use tokio::sync::watch;
16
17pub use crate::state::NodeId;
18
19#[derive(
21 Debug,
22 Clone,
23 Copy,
24 PartialEq,
25 Eq,
26 Serialize,
27 Deserialize,
28 rkyv::Archive,
29 rkyv::Serialize,
30 rkyv::Deserialize,
31)]
32pub enum NodeState {
33 Joining,
35 Active,
37 Suspected,
39 Draining,
41 Left,
43}
44
45impl fmt::Display for NodeState {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Self::Joining => write!(f, "joining"),
49 Self::Active => write!(f, "active"),
50 Self::Suspected => write!(f, "suspected"),
51 Self::Draining => write!(f, "draining"),
52 Self::Left => write!(f, "left"),
53 }
54 }
55}
56
57#[derive(
59 Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
60)]
61pub struct NodeMetadata {
62 pub cores: u32,
64 pub memory_bytes: u64,
66 pub failure_domain: Option<String>,
68 pub tags: HashMap<String, String>,
70 pub owned_partitions: Vec<u32>,
72 pub version: String,
74}
75
76impl Default for NodeMetadata {
77 fn default() -> Self {
78 Self {
79 cores: 1,
80 memory_bytes: 0,
81 failure_domain: None,
82 tags: HashMap::new(),
83 owned_partitions: Vec::new(),
84 version: String::new(),
85 }
86 }
87}
88
89#[derive(
91 Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
92)]
93pub struct NodeInfo {
94 pub id: NodeId,
96 pub name: String,
98 pub rpc_address: String,
100 pub raft_address: String,
102 pub state: NodeState,
104 pub metadata: NodeMetadata,
106 pub last_heartbeat_ms: i64,
108}
109
110#[derive(Debug, Clone)]
112pub enum MembershipEvent {
113 NodeJoined(Box<NodeInfo>),
115 NodeStateChanged {
117 node_id: NodeId,
119 old_state: NodeState,
121 new_state: NodeState,
123 },
124 NodeLeft(NodeId),
126}
127
128#[derive(Debug, thiserror::Error)]
130pub enum DiscoveryError {
131 #[error("bind error: {0}")]
133 Bind(String),
134
135 #[error("connection error to {address}: {reason}")]
137 Connection {
138 address: String,
140 reason: String,
142 },
143
144 #[error("serialization error: {0}")]
146 Serialization(String),
147
148 #[error("discovery not started")]
150 NotStarted,
151
152 #[error("discovery shut down")]
154 ShutDown,
155
156 #[error("I/O error: {0}")]
158 Io(#[from] std::io::Error),
159}
160
161#[allow(async_fn_in_trait)]
167pub trait Discovery: Send + Sync + 'static {
168 async fn start(&mut self) -> Result<(), DiscoveryError>;
172
173 async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;
175
176 async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;
178
179 fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;
184
185 async fn stop(&mut self) -> Result<(), DiscoveryError>;
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 #[test]
194 fn test_node_id_display() {
195 assert_eq!(NodeId(42).to_string(), "node-42");
196 }
197
198 #[test]
199 fn test_node_id_unassigned() {
200 assert!(NodeId::UNASSIGNED.is_unassigned());
201 assert!(!NodeId(1).is_unassigned());
202 }
203
204 #[test]
205 fn test_node_state_display() {
206 assert_eq!(NodeState::Active.to_string(), "active");
207 assert_eq!(NodeState::Suspected.to_string(), "suspected");
208 assert_eq!(NodeState::Draining.to_string(), "draining");
209 }
210
211 #[test]
212 fn test_node_metadata_default() {
213 let meta = NodeMetadata::default();
214 assert_eq!(meta.cores, 1);
215 assert_eq!(meta.memory_bytes, 0);
216 assert!(meta.failure_domain.is_none());
217 assert!(meta.tags.is_empty());
218 assert!(meta.owned_partitions.is_empty());
219 }
220
221 #[test]
222 fn test_node_id_serialization() {
223 let id = NodeId(123);
224 let json = serde_json::to_string(&id).unwrap();
225 let back: NodeId = serde_json::from_str(&json).unwrap();
226 assert_eq!(id, back);
227 }
228
229 #[test]
230 fn test_node_info_serialization() {
231 let info = NodeInfo {
232 id: NodeId(1),
233 name: "test-node".into(),
234 rpc_address: "127.0.0.1:9000".into(),
235 raft_address: "127.0.0.1:9001".into(),
236 state: NodeState::Active,
237 metadata: NodeMetadata::default(),
238 last_heartbeat_ms: 1000,
239 };
240 let json = serde_json::to_string(&info).unwrap();
241 let back: NodeInfo = serde_json::from_str(&json).unwrap();
242 assert_eq!(back.id, info.id);
243 assert_eq!(back.name, "test-node");
244 }
245}