laminar_core/delta/discovery/
mod.rs1#![allow(clippy::disallowed_types)] mod static_discovery;
13pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
14
15mod gossip_discovery;
16pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
17
18use std::collections::HashMap;
19use std::fmt;
20
21use serde::{Deserialize, Serialize};
22use tokio::sync::watch;
23
24#[derive(
26 Debug,
27 Clone,
28 Copy,
29 PartialEq,
30 Eq,
31 Hash,
32 Serialize,
33 Deserialize,
34 rkyv::Archive,
35 rkyv::Serialize,
36 rkyv::Deserialize,
37)]
38pub struct NodeId(pub u64);
39
40impl NodeId {
41 pub const UNASSIGNED: Self = Self(0);
43
44 #[must_use]
46 pub const fn is_unassigned(&self) -> bool {
47 self.0 == 0
48 }
49}
50
51impl fmt::Display for NodeId {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 write!(f, "node-{}", self.0)
54 }
55}
56
57#[derive(
59 Debug,
60 Clone,
61 Copy,
62 PartialEq,
63 Eq,
64 Serialize,
65 Deserialize,
66 rkyv::Archive,
67 rkyv::Serialize,
68 rkyv::Deserialize,
69)]
70pub enum NodeState {
71 Joining,
73 Active,
75 Suspected,
77 Draining,
79 Left,
81}
82
83impl fmt::Display for NodeState {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 match self {
86 Self::Joining => write!(f, "joining"),
87 Self::Active => write!(f, "active"),
88 Self::Suspected => write!(f, "suspected"),
89 Self::Draining => write!(f, "draining"),
90 Self::Left => write!(f, "left"),
91 }
92 }
93}
94
95#[derive(
97 Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
98)]
99pub struct NodeMetadata {
100 pub cores: u32,
102 pub memory_bytes: u64,
104 pub failure_domain: Option<String>,
106 pub tags: HashMap<String, String>,
108 pub owned_partitions: Vec<u32>,
110 pub version: String,
112}
113
114impl Default for NodeMetadata {
115 fn default() -> Self {
116 Self {
117 cores: 1,
118 memory_bytes: 0,
119 failure_domain: None,
120 tags: HashMap::new(),
121 owned_partitions: Vec::new(),
122 version: String::new(),
123 }
124 }
125}
126
127#[derive(
129 Debug, Clone, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize,
130)]
131pub struct NodeInfo {
132 pub id: NodeId,
134 pub name: String,
136 pub rpc_address: String,
138 pub raft_address: String,
140 pub state: NodeState,
142 pub metadata: NodeMetadata,
144 pub last_heartbeat_ms: i64,
146}
147
148#[derive(Debug, Clone)]
150pub enum MembershipEvent {
151 NodeJoined(Box<NodeInfo>),
153 NodeStateChanged {
155 node_id: NodeId,
157 old_state: NodeState,
159 new_state: NodeState,
161 },
162 NodeLeft(NodeId),
164}
165
166#[derive(Debug, thiserror::Error)]
168pub enum DiscoveryError {
169 #[error("bind error: {0}")]
171 Bind(String),
172
173 #[error("connection error to {address}: {reason}")]
175 Connection {
176 address: String,
178 reason: String,
180 },
181
182 #[error("serialization error: {0}")]
184 Serialization(String),
185
186 #[error("discovery not started")]
188 NotStarted,
189
190 #[error("discovery shut down")]
192 ShutDown,
193
194 #[error("I/O error: {0}")]
196 Io(#[from] std::io::Error),
197}
198
199#[allow(async_fn_in_trait)]
205pub trait Discovery: Send + Sync + 'static {
206 async fn start(&mut self) -> Result<(), DiscoveryError>;
210
211 async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;
213
214 async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;
216
217 fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;
222
223 async fn stop(&mut self) -> Result<(), DiscoveryError>;
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// `NodeId` renders as `node-<n>`.
    #[test]
    fn test_node_id_display() {
        assert_eq!(NodeId(42).to_string(), "node-42");
    }

    /// Only the zero id counts as unassigned.
    #[test]
    fn test_node_id_unassigned() {
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(1).is_unassigned());
    }

    /// Every `NodeState` variant renders as its lowercase name.
    #[test]
    fn test_node_state_display() {
        assert_eq!(NodeState::Joining.to_string(), "joining");
        assert_eq!(NodeState::Active.to_string(), "active");
        assert_eq!(NodeState::Suspected.to_string(), "suspected");
        assert_eq!(NodeState::Draining.to_string(), "draining");
        assert_eq!(NodeState::Left.to_string(), "left");
    }

    /// The hand-written `Default` yields one core and empty values elsewhere.
    #[test]
    fn test_node_metadata_default() {
        let meta = NodeMetadata::default();
        assert_eq!(meta.cores, 1);
        assert_eq!(meta.memory_bytes, 0);
        assert!(meta.failure_domain.is_none());
        assert!(meta.tags.is_empty());
        assert!(meta.owned_partitions.is_empty());
        assert!(meta.version.is_empty());
    }

    /// `NodeId` survives a JSON round trip.
    #[test]
    fn test_node_id_serialization() {
        let id = NodeId(123);
        let json = serde_json::to_string(&id).unwrap();
        let back: NodeId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, back);
    }

    /// `NodeInfo` survives a JSON round trip with its scalar fields intact.
    #[test]
    fn test_node_info_serialization() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "test-node".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 1000,
        };
        let json = serde_json::to_string(&info).unwrap();
        let back: NodeInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(back.id, info.id);
        assert_eq!(back.name, "test-node");
        assert_eq!(back.rpc_address, info.rpc_address);
        assert_eq!(back.raft_address, info.raft_address);
        assert_eq!(back.state, info.state);
        assert_eq!(back.last_heartbeat_ms, info.last_heartbeat_ms);
    }
}