// laminar_core/cluster/control/controller.rs
use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::sync::watch;
9
10use super::barrier::{
11 BarrierAck, BarrierAnnouncement, BarrierCoordinator, ClusterKv, Phase, QuorumOutcome,
12};
13use super::leader::leader_of;
14use super::snapshot::AssignmentSnapshotStore;
15use crate::cluster::discovery::{NodeId, NodeInfo, NodeState};
16
pub struct ClusterController {
    /// Identity of this node within the cluster.
    instance_id: NodeId,
    /// Barrier announce/observe/ack coordinator, backed by the cluster KV.
    barrier: BarrierCoordinator,
    /// Optional store for assignment snapshots; absent when snapshotting is disabled.
    snapshot: Option<Arc<AssignmentSnapshotStore>>,
    /// Live view of cluster membership.
    /// NOTE(review): this node itself may or may not appear in the view —
    /// the leader/liveness methods always add `instance_id` explicitly.
    members_rx: watch::Receiver<Vec<NodeInfo>>,
    /// Highest cluster-wide min watermark seen so far;
    /// `i64::MIN` is the "never published" sentinel.
    cluster_min_watermark: Arc<AtomicI64>,
}
30
31impl std::fmt::Debug for ClusterController {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 f.debug_struct("ClusterController")
34 .field("instance_id", &self.instance_id)
35 .finish_non_exhaustive()
36 }
37}
38
39impl ClusterController {
40 #[must_use]
42 pub fn new(
43 instance_id: NodeId,
44 kv: Arc<dyn ClusterKv>,
45 snapshot: Option<Arc<AssignmentSnapshotStore>>,
46 members_rx: watch::Receiver<Vec<NodeInfo>>,
47 ) -> Self {
48 Self {
49 instance_id,
50 barrier: BarrierCoordinator::new(kv),
51 snapshot,
52 members_rx,
53 cluster_min_watermark: Arc::new(AtomicI64::new(i64::MIN)),
54 }
55 }
56
57 #[must_use]
61 pub fn cluster_min_watermark(&self) -> Option<i64> {
62 let v = self.cluster_min_watermark.load(Ordering::Acquire);
63 if v == i64::MIN {
64 None
65 } else {
66 Some(v)
67 }
68 }
69
70 pub fn publish_cluster_min_watermark(&self, wm: i64) {
78 let mut cur = self.cluster_min_watermark.load(Ordering::Acquire);
79 while wm > cur {
80 match self.cluster_min_watermark.compare_exchange(
81 cur,
82 wm,
83 Ordering::AcqRel,
84 Ordering::Acquire,
85 ) {
86 Ok(_) => break,
87 Err(observed) => cur = observed,
88 }
89 }
90 }
91
92 #[must_use]
94 pub fn instance_id(&self) -> NodeId {
95 self.instance_id
96 }
97
98 #[must_use]
100 pub fn current_leader(&self) -> Option<NodeId> {
101 let members = self.members_rx.borrow();
102 let mut ids: Vec<NodeId> = members
103 .iter()
104 .filter(|m| matches!(m.state, NodeState::Active))
105 .map(|m| m.id)
106 .collect();
107 ids.push(self.instance_id);
109 leader_of(&ids)
110 }
111
112 #[must_use]
114 pub fn is_leader(&self) -> bool {
115 self.current_leader() == Some(self.instance_id)
116 }
117
118 #[must_use]
120 pub fn live_instances(&self) -> Vec<NodeId> {
121 let mut ids: Vec<NodeId> = self
122 .members_rx
123 .borrow()
124 .iter()
125 .filter(|m| matches!(m.state, NodeState::Active))
126 .map(|m| m.id)
127 .collect();
128 ids.push(self.instance_id);
129 ids
130 }
131
132 #[must_use]
136 pub fn members_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
137 self.members_rx.clone()
138 }
139
140 pub async fn announce_barrier(&self, ann: &BarrierAnnouncement) -> Result<(), String> {
145 self.barrier.announce(ann).await
146 }
147
148 pub async fn observe_barrier(&self) -> Result<Option<BarrierAnnouncement>, String> {
158 let Some(leader) = self.current_leader() else {
159 return Ok(None);
160 };
161 let observed = self.barrier.observe(leader).await?;
162 if let Some(ref ann) = observed {
163 if ann.phase == Phase::Commit {
164 if let Some(wm) = ann.min_watermark_ms {
165 let mut cur = self.cluster_min_watermark.load(Ordering::Acquire);
168 while wm > cur {
169 match self.cluster_min_watermark.compare_exchange(
170 cur,
171 wm,
172 Ordering::AcqRel,
173 Ordering::Acquire,
174 ) {
175 Ok(_) => break,
176 Err(observed) => cur = observed,
177 }
178 }
179 }
180 }
181 }
182 Ok(observed)
183 }
184
185 pub async fn ack_barrier(&self, ack: &BarrierAck) -> Result<(), String> {
190 self.barrier.ack(ack).await
191 }
192
193 pub async fn wait_for_quorum(
195 &self,
196 epoch: u64,
197 expected: &[NodeId],
198 deadline: Duration,
199 ) -> QuorumOutcome {
200 self.barrier
201 .wait_for_quorum(epoch, expected, deadline)
202 .await
203 }
204
205 #[must_use]
207 pub fn snapshot_store(&self) -> Option<&AssignmentSnapshotStore> {
208 self.snapshot.as_deref()
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::control::barrier::InMemoryKv;
    use crate::cluster::discovery::{NodeMetadata, NodeState};

    /// An Active member record with the given numeric id.
    fn member(id: u64) -> NodeInfo {
        NodeInfo {
            id: NodeId(id),
            name: format!("n{id}"),
            rpc_address: String::new(),
            raft_address: String::new(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        }
    }

    /// A controller backed by an in-memory KV and a fixed membership view.
    fn make_controller(self_id: u64, peers: Vec<NodeInfo>) -> ClusterController {
        let (_tx, rx) = watch::channel(peers);
        let kv: Arc<dyn ClusterKv> = Arc::new(InMemoryKv::new(NodeId(self_id)));
        ClusterController::new(NodeId(self_id), kv, None, rx)
    }

    #[test]
    fn is_leader_when_lowest_id() {
        // Self id 1 beats Active peers 5 and 7.
        assert!(make_controller(1, vec![member(5), member(7)]).is_leader());
    }

    #[test]
    fn follower_when_peer_has_lower_id() {
        let c = make_controller(7, vec![member(3), member(5)]);
        assert_eq!(c.current_leader(), Some(NodeId(3)));
        assert!(!c.is_leader());
    }

    #[test]
    fn solo_instance_is_leader() {
        assert!(make_controller(42, Vec::new()).is_leader());
    }

    #[tokio::test]
    async fn announce_observe_roundtrip_when_alone() {
        let c = make_controller(1, vec![]);
        let ann = BarrierAnnouncement {
            epoch: 5,
            checkpoint_id: 1,
            phase: crate::cluster::control::Phase::Prepare,
            flags: 0,
            min_watermark_ms: None,
        };
        c.announce_barrier(&ann).await.unwrap();
        let seen = c.observe_barrier().await.unwrap().unwrap();
        assert_eq!(seen.epoch, 5);
    }

    #[test]
    fn publish_cluster_min_watermark_is_monotonic() {
        let c = make_controller(1, vec![]);
        assert_eq!(c.cluster_min_watermark(), None);

        // Strictly increasing publishes take effect...
        c.publish_cluster_min_watermark(100);
        assert_eq!(c.cluster_min_watermark(), Some(100));
        c.publish_cluster_min_watermark(250);
        assert_eq!(c.cluster_min_watermark(), Some(250));

        // ...while lower or equal values leave the watermark untouched.
        c.publish_cluster_min_watermark(42);
        assert_eq!(c.cluster_min_watermark(), Some(250));
        c.publish_cluster_min_watermark(250);
        assert_eq!(c.cluster_min_watermark(), Some(250));
    }

    #[tokio::test]
    async fn observe_commit_publishes_cluster_min_watermark() {
        let c = make_controller(1, vec![]);
        assert_eq!(c.cluster_min_watermark(), None, "uninitialised");

        // A Commit carrying a watermark publishes it on observe.
        c.announce_barrier(&BarrierAnnouncement {
            epoch: 9,
            checkpoint_id: 1,
            phase: crate::cluster::control::Phase::Commit,
            flags: 0,
            min_watermark_ms: Some(12_345),
        })
        .await
        .unwrap();
        c.observe_barrier().await.unwrap();
        assert_eq!(c.cluster_min_watermark(), Some(12_345));

        // A later Commit with a lower watermark must not regress it.
        c.announce_barrier(&BarrierAnnouncement {
            epoch: 10,
            checkpoint_id: 2,
            phase: crate::cluster::control::Phase::Commit,
            flags: 0,
            min_watermark_ms: Some(100),
        })
        .await
        .unwrap();
        c.observe_barrier().await.unwrap();
        assert_eq!(
            c.cluster_min_watermark(),
            Some(12_345),
            "stale Commit must not lower the published watermark",
        );

        // Non-Commit phases never touch the watermark.
        c.announce_barrier(&BarrierAnnouncement {
            epoch: 11,
            checkpoint_id: 3,
            phase: crate::cluster::control::Phase::Prepare,
            flags: 0,
            min_watermark_ms: None,
        })
        .await
        .unwrap();
        c.observe_barrier().await.unwrap();
        assert_eq!(c.cluster_min_watermark(), Some(12_345));
    }
}