Skip to main content

laminar_core/cluster/control/controller.rs

//! Facade over `ClusterKv` + `BarrierCoordinator` + membership watch.
//! A `CheckpointCoordinator` that holds `None` instead of a controller
//! runs in single-instance mode.
3
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::sync::watch;
9
10use super::barrier::{
11    BarrierAck, BarrierAnnouncement, BarrierCoordinator, ClusterKv, Phase, QuorumOutcome,
12};
13use super::leader::leader_of;
14use super::snapshot::AssignmentSnapshotStore;
15use crate::cluster::discovery::{NodeId, NodeInfo, NodeState};
16
/// Facade composing the cluster-control primitives: barrier
/// announce/observe/ack/quorum (delegated to [`BarrierCoordinator`]),
/// leader/liveness derived from a membership watch, an optional
/// assignment snapshot store, and the shared cluster-wide minimum
/// watermark.
pub struct ClusterController {
    /// This node's ID. Always counted as live and as a leader candidate,
    /// regardless of what the membership watch reports.
    instance_id: NodeId,
    /// Barrier protocol driver built over the `ClusterKv` passed to `new`.
    barrier: BarrierCoordinator,
    /// Optional assignment snapshot store, exposed via `snapshot_store`.
    snapshot: Option<Arc<AssignmentSnapshotStore>>,
    /// Membership view; only `Active` entries count as live peers.
    members_rx: watch::Receiver<Vec<NodeInfo>>,
    /// Latest cluster-wide minimum watermark. The leader writes it via
    /// `publish_cluster_min_watermark`; followers write it in
    /// `observe_barrier` when a `Commit` carries `min_watermark_ms`.
    /// Updates are monotonic (never lowered). `i64::MIN` means
    /// uninitialised (nothing published yet). Operators are meant to
    /// consult this instead of their local watermark so event-time
    /// decisions stay consistent across the cluster.
    cluster_min_watermark: Arc<AtomicI64>,
}
30
31impl std::fmt::Debug for ClusterController {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("ClusterController")
34            .field("instance_id", &self.instance_id)
35            .finish_non_exhaustive()
36    }
37}
38
39impl ClusterController {
40    /// Wrap the given primitives.
41    #[must_use]
42    pub fn new(
43        instance_id: NodeId,
44        kv: Arc<dyn ClusterKv>,
45        snapshot: Option<Arc<AssignmentSnapshotStore>>,
46        members_rx: watch::Receiver<Vec<NodeInfo>>,
47    ) -> Self {
48        Self {
49            instance_id,
50            barrier: BarrierCoordinator::new(kv),
51            snapshot,
52            members_rx,
53            cluster_min_watermark: Arc::new(AtomicI64::new(i64::MIN)),
54        }
55    }
56
57    /// Latest cluster-wide minimum watermark seen by this instance.
58    /// `None` until the leader has published a `Commit` with a
59    /// populated `min_watermark_ms`.
60    #[must_use]
61    pub fn cluster_min_watermark(&self) -> Option<i64> {
62        let v = self.cluster_min_watermark.load(Ordering::Acquire);
63        if v == i64::MIN {
64            None
65        } else {
66            Some(v)
67        }
68    }
69
70    /// Leader-side monotonic publish. The leader computes the
71    /// cluster-wide minimum watermark in `await_prepare_quorum`
72    /// (its own local watermark folded with every follower's ack)
73    /// and must mirror it into the controller atomic so its own
74    /// operators see the same value that followers pick up via
75    /// `observe_barrier` on the matching `Commit`. Never lowers the
76    /// published value — event-time progress is monotonic.
77    pub fn publish_cluster_min_watermark(&self, wm: i64) {
78        let mut cur = self.cluster_min_watermark.load(Ordering::Acquire);
79        while wm > cur {
80            match self.cluster_min_watermark.compare_exchange(
81                cur,
82                wm,
83                Ordering::AcqRel,
84                Ordering::Acquire,
85            ) {
86                Ok(_) => break,
87                Err(observed) => cur = observed,
88            }
89        }
90    }
91
92    /// This instance's ID.
93    #[must_use]
94    pub fn instance_id(&self) -> NodeId {
95        self.instance_id
96    }
97
98    /// Current leader (lowest id among `Active` peers plus self).
99    #[must_use]
100    pub fn current_leader(&self) -> Option<NodeId> {
101        let members = self.members_rx.borrow();
102        let mut ids: Vec<NodeId> = members
103            .iter()
104            .filter(|m| matches!(m.state, NodeState::Active))
105            .map(|m| m.id)
106            .collect();
107        // Include ourselves — we're trivially Active from our own view.
108        ids.push(self.instance_id);
109        leader_of(&ids)
110    }
111
112    /// True if this instance is currently the leader.
113    #[must_use]
114    pub fn is_leader(&self) -> bool {
115        self.current_leader() == Some(self.instance_id)
116    }
117
118    /// Live instance IDs: `Active` peers plus self.
119    #[must_use]
120    pub fn live_instances(&self) -> Vec<NodeId> {
121        let mut ids: Vec<NodeId> = self
122            .members_rx
123            .borrow()
124            .iter()
125            .filter(|m| matches!(m.state, NodeState::Active))
126            .map(|m| m.id)
127            .collect();
128        ids.push(self.instance_id);
129        ids
130    }
131
132    /// Cloneable membership watch. Background tasks subscribe to
133    /// this to react to join/leave events (`changed().await`) without
134    /// polling [`Self::live_instances`] on a timer.
135    #[must_use]
136    pub fn members_watch(&self) -> watch::Receiver<Vec<NodeInfo>> {
137        self.members_rx.clone()
138    }
139
140    /// Leader-side announce.
141    ///
142    /// # Errors
143    /// Propagates [`BarrierCoordinator::announce`] errors.
144    pub async fn announce_barrier(&self, ann: &BarrierAnnouncement) -> Result<(), String> {
145        self.barrier.announce(ann).await
146    }
147
148    /// Follower-side observe; `Ok(None)` if no leader is visible.
149    ///
150    /// As a side effect, a `Commit` announcement with a populated
151    /// `min_watermark_ms` updates the shared cluster-min-watermark
152    /// atomic so operators on this instance see the cluster-wide
153    /// minimum without a separate polling path.
154    ///
155    /// # Errors
156    /// Propagates [`BarrierCoordinator::observe`] errors.
157    pub async fn observe_barrier(&self) -> Result<Option<BarrierAnnouncement>, String> {
158        let Some(leader) = self.current_leader() else {
159            return Ok(None);
160        };
161        let observed = self.barrier.observe(leader).await?;
162        if let Some(ref ann) = observed {
163            if ann.phase == Phase::Commit {
164                if let Some(wm) = ann.min_watermark_ms {
165                    // Monotonic publish — never lower the watermark,
166                    // even if a stale announcement re-gossips.
167                    let mut cur = self.cluster_min_watermark.load(Ordering::Acquire);
168                    while wm > cur {
169                        match self.cluster_min_watermark.compare_exchange(
170                            cur,
171                            wm,
172                            Ordering::AcqRel,
173                            Ordering::Acquire,
174                        ) {
175                            Ok(_) => break,
176                            Err(observed) => cur = observed,
177                        }
178                    }
179                }
180            }
181        }
182        Ok(observed)
183    }
184
185    /// Follower-side ack.
186    ///
187    /// # Errors
188    /// Propagates [`BarrierCoordinator::ack`] errors.
189    pub async fn ack_barrier(&self, ack: &BarrierAck) -> Result<(), String> {
190        self.barrier.ack(ack).await
191    }
192
193    /// Leader-side: poll until quorum or `deadline`.
194    pub async fn wait_for_quorum(
195        &self,
196        epoch: u64,
197        expected: &[NodeId],
198        deadline: Duration,
199    ) -> QuorumOutcome {
200        self.barrier
201            .wait_for_quorum(epoch, expected, deadline)
202            .await
203    }
204
205    /// Assignment snapshot store, if configured.
206    #[must_use]
207    pub fn snapshot_store(&self) -> Option<&AssignmentSnapshotStore> {
208        self.snapshot.as_deref()
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::control::barrier::InMemoryKv;
    use crate::cluster::discovery::{NodeMetadata, NodeState};

    /// An `Active` peer named `n<id>` with empty addresses and default
    /// metadata.
    fn peer(id: u64) -> NodeInfo {
        NodeInfo {
            id: NodeId(id),
            name: format!("n{id}"),
            rpc_address: String::new(),
            raft_address: String::new(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        }
    }

    /// Controller for node `me` backed by an in-memory KV. Its membership
    /// view is whatever `peers` holds at construction time.
    fn controller(me: u64, peers: Vec<NodeInfo>) -> ClusterController {
        let id = NodeId(me);
        let (_members_tx, members_rx) = watch::channel(peers);
        let kv: Arc<dyn ClusterKv> = Arc::new(InMemoryKv::new(id));
        ClusterController::new(id, kv, None, members_rx)
    }

    #[test]
    fn is_leader_when_lowest_id() {
        assert!(controller(1, vec![peer(5), peer(7)]).is_leader());
    }

    #[test]
    fn follower_when_peer_has_lower_id() {
        let node = controller(7, vec![peer(3), peer(5)]);
        assert!(!node.is_leader());
        assert_eq!(node.current_leader(), Some(NodeId(3)));
    }

    #[test]
    fn solo_instance_is_leader() {
        assert!(controller(42, Vec::new()).is_leader());
    }

    #[tokio::test]
    async fn announce_observe_roundtrip_when_alone() {
        // A lone instance is its own leader, so it observes what it
        // announced itself.
        let node = controller(1, Vec::new());
        let prepare = BarrierAnnouncement {
            epoch: 5,
            checkpoint_id: 1,
            phase: Phase::Prepare,
            flags: 0,
            min_watermark_ms: None,
        };
        node.announce_barrier(&prepare).await.unwrap();
        let seen = node
            .observe_barrier()
            .await
            .unwrap()
            .expect("own announcement should be visible");
        assert_eq!(seen.epoch, 5);
    }

    #[test]
    fn publish_cluster_min_watermark_is_monotonic() {
        // The leader-side publish obeys the same never-regress rule that
        // the follower path enforces in `observe_barrier`.
        let node = controller(1, Vec::new());
        assert_eq!(node.cluster_min_watermark(), None);

        // (value published, expected reading afterwards)
        let steps = [
            (100, 100), // first value initialises
            (250, 250), // higher value advances
            (42, 250),  // lower value must not regress
            (250, 250), // equal value is a no-op
        ];
        for (published, expected) in steps {
            node.publish_cluster_min_watermark(published);
            assert_eq!(node.cluster_min_watermark(), Some(expected));
        }
    }

    #[tokio::test]
    async fn observe_commit_publishes_cluster_min_watermark() {
        // A Commit carrying `min_watermark_ms` flows into the shared atomic,
        // so operators read cluster-wide progress without a separate channel.
        let node = controller(1, Vec::new());
        assert_eq!(node.cluster_min_watermark(), None, "uninitialised");

        let announcement = |epoch, checkpoint_id, phase, min_watermark_ms| BarrierAnnouncement {
            epoch,
            checkpoint_id,
            phase,
            flags: 0,
            min_watermark_ms,
        };

        node.announce_barrier(&announcement(9, 1, Phase::Commit, Some(12_345)))
            .await
            .unwrap();
        node.observe_barrier().await.unwrap();
        assert_eq!(node.cluster_min_watermark(), Some(12_345));

        // A later Commit with a smaller value (stale re-gossip) must not
        // pull event time backwards.
        node.announce_barrier(&announcement(10, 2, Phase::Commit, Some(100)))
            .await
            .unwrap();
        node.observe_barrier().await.unwrap();
        assert_eq!(
            node.cluster_min_watermark(),
            Some(12_345),
            "stale Commit must not lower the published watermark",
        );

        // A Prepare that carries no watermark leaves the value untouched.
        node.announce_barrier(&announcement(11, 3, Phase::Prepare, None))
            .await
            .unwrap();
        node.observe_barrier().await.unwrap();
        assert_eq!(node.cluster_min_watermark(), Some(12_345));
    }
}