Skip to main content

laminar_core/cluster/control/
chitchat_kv.rs

//! Chitchat-backed [`ClusterKv`]. `scan` only returns entries from nodes in
//! `live_nodes()`, so suspected peers don't count toward quorum; `read_from`
//! looks a node up directly and does not apply that filter.
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7
8use super::barrier::ClusterKv;
9use crate::cluster::discovery::NodeId;
10
11/// Chitchat-backed cluster KV.
12pub struct ChitchatKv {
13    chitchat: Arc<tokio::sync::Mutex<chitchat::Chitchat>>,
14}
15
16impl std::fmt::Debug for ChitchatKv {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        f.debug_struct("ChitchatKv").finish_non_exhaustive()
19    }
20}
21
22impl ChitchatKv {
23    /// Construct from a chitchat handle.
24    #[must_use]
25    pub fn from_handle(handle: &chitchat::ChitchatHandle) -> Self {
26        Self {
27            chitchat: handle.chitchat(),
28        }
29    }
30
31    /// Construct from a shared chitchat state.
32    #[must_use]
33    pub fn from_chitchat(chitchat: Arc<tokio::sync::Mutex<chitchat::Chitchat>>) -> Self {
34        Self { chitchat }
35    }
36}
37
38fn encode_node_id(node_id: NodeId) -> String {
39    format!("node-{}", node_id.0)
40}
41
42fn decode_chitchat_id(id: &chitchat::ChitchatId) -> Option<NodeId> {
43    id.node_id
44        .strip_prefix("node-")?
45        .parse::<u64>()
46        .ok()
47        .map(NodeId)
48}
49
50#[async_trait]
51impl ClusterKv for ChitchatKv {
52    async fn write(&self, key: &str, value: String) {
53        let mut guard = self.chitchat.lock().await;
54        guard.self_node_state().set(key, value);
55    }
56
57    async fn read_from(&self, who: NodeId, key: &str) -> Option<String> {
58        let target = encode_node_id(who);
59        let guard = self.chitchat.lock().await;
60        for (cc_id, state) in guard.node_states() {
61            if cc_id.node_id == target {
62                return state.get(key).map(str::to_string);
63            }
64        }
65        None
66    }
67
68    async fn scan(&self, key: &str) -> Vec<(NodeId, String)> {
69        let guard = self.chitchat.lock().await;
70        let live: Vec<&chitchat::ChitchatId> = guard.live_nodes().collect();
71        let mut out = Vec::new();
72        for (cc_id, state) in guard.node_states() {
73            if !live.contains(&cc_id) {
74                continue;
75            }
76            let Some(node_id) = decode_chitchat_id(cc_id) else {
77                continue;
78            };
79            if let Some(value) = state.get(key) {
80                out.push((node_id, value.to_string()));
81            }
82        }
83        out
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ChitchatId` with the given node-id string; the generation
    /// and gossip address are fixed placeholders.
    fn chitchat_id(node_id: &str) -> chitchat::ChitchatId {
        chitchat::ChitchatId::new(node_id.to_string(), 0, "127.0.0.1:1".parse().unwrap())
    }

    #[test]
    fn node_id_encoding_roundtrips() {
        for &v in &[1u64, 42, u64::MAX] {
            let s = encode_node_id(NodeId(v));
            assert!(s.starts_with("node-"), "got: {s}");
            // Go through the real decoder so encode and decode are checked
            // against each other rather than against a hand-rolled parse.
            assert_eq!(decode_chitchat_id(&chitchat_id(&s)), Some(NodeId(v)));
        }
    }

    #[test]
    fn decode_rejects_unexpected_formats() {
        // Missing prefix, wrong-case prefix, empty or non-numeric suffix,
        // and a value one past `u64::MAX` must all be rejected.
        for bad in [
            "foo",
            "42",
            "NODE-42",
            "node-",
            "node-abc",
            "node-18446744073709551616",
        ] {
            assert_eq!(decode_chitchat_id(&chitchat_id(bad)), None, "input: {bad}");
        }
    }

    #[test]
    fn decode_accepts_valid_format() {
        assert_eq!(decode_chitchat_id(&chitchat_id("node-42")), Some(NodeId(42)));
    }
}