laminar_core/cluster/control/
chitchat_kv.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7
8use super::barrier::ClusterKv;
9use crate::cluster::discovery::NodeId;
10
11pub struct ChitchatKv {
13 chitchat: Arc<tokio::sync::Mutex<chitchat::Chitchat>>,
14}
15
16impl std::fmt::Debug for ChitchatKv {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 f.debug_struct("ChitchatKv").finish_non_exhaustive()
19 }
20}
21
22impl ChitchatKv {
23 #[must_use]
25 pub fn from_handle(handle: &chitchat::ChitchatHandle) -> Self {
26 Self {
27 chitchat: handle.chitchat(),
28 }
29 }
30
31 #[must_use]
33 pub fn from_chitchat(chitchat: Arc<tokio::sync::Mutex<chitchat::Chitchat>>) -> Self {
34 Self { chitchat }
35 }
36}
37
38fn encode_node_id(node_id: NodeId) -> String {
39 format!("node-{}", node_id.0)
40}
41
42fn decode_chitchat_id(id: &chitchat::ChitchatId) -> Option<NodeId> {
43 id.node_id
44 .strip_prefix("node-")?
45 .parse::<u64>()
46 .ok()
47 .map(NodeId)
48}
49
50#[async_trait]
51impl ClusterKv for ChitchatKv {
52 async fn write(&self, key: &str, value: String) {
53 let mut guard = self.chitchat.lock().await;
54 guard.self_node_state().set(key, value);
55 }
56
57 async fn read_from(&self, who: NodeId, key: &str) -> Option<String> {
58 let target = encode_node_id(who);
59 let guard = self.chitchat.lock().await;
60 for (cc_id, state) in guard.node_states() {
61 if cc_id.node_id == target {
62 return state.get(key).map(str::to_string);
63 }
64 }
65 None
66 }
67
68 async fn scan(&self, key: &str) -> Vec<(NodeId, String)> {
69 let guard = self.chitchat.lock().await;
70 let live: Vec<&chitchat::ChitchatId> = guard.live_nodes().collect();
71 let mut out = Vec::new();
72 for (cc_id, state) in guard.node_states() {
73 if !live.contains(&cc_id) {
74 continue;
75 }
76 let Some(node_id) = decode_chitchat_id(cc_id) else {
77 continue;
78 };
79 if let Some(value) = state.get(key) {
80 out.push((node_id, value.to_string()));
81 }
82 }
83 out
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90
91 #[test]
92 fn node_id_encoding_roundtrips() {
93 for &v in &[1u64, 42, u64::MAX] {
94 let s = encode_node_id(NodeId(v));
95 assert!(s.starts_with("node-"), "got: {s}");
96 let parsed: u64 = s.strip_prefix("node-").unwrap().parse().unwrap();
97 assert_eq!(parsed, v);
98 }
99 }
100
101 #[test]
102 fn decode_rejects_unexpected_formats() {
103 let bad = chitchat::ChitchatId::new("foo".to_string(), 0, "127.0.0.1:1".parse().unwrap());
104 assert_eq!(decode_chitchat_id(&bad), None);
105 }
106
107 #[test]
108 fn decode_accepts_valid_format() {
109 let good =
110 chitchat::ChitchatId::new("node-42".to_string(), 0, "127.0.0.1:1".parse().unwrap());
111 assert_eq!(decode_chitchat_id(&good), Some(NodeId(42)));
112 }
113}