Skip to main content

laminar_core/cluster/control/
decision.rs

//! Durable cluster-wide commit marker for checkpoint 2PC.
//!
//! The leader's `Commit` barrier announcement on gossip KV is
//! ephemeral. If the leader crashes after announcing `Commit` but
//! before committing its own sinks, surviving followers may already
//! have acted on the announcement, while a newly elected leader, having
//! no record of the vote, would otherwise pick `Abort` as the safe
//! default. To prevent that divergence we write a durable marker to
//! shared storage _before_ the announcement, so recovery can read back
//! the cluster's decision.
9
10use std::sync::Arc;
11
12use bytes::Bytes;
13use object_store::path::Path as OsPath;
14use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
15use tokio_stream::StreamExt;
16
17/// Per-epoch commit marker store. Presence means committed; absence
18/// means the leader never reached the commit point (safe to abort).
19pub struct CheckpointDecisionStore {
20    store: Arc<dyn ObjectStore>,
21}
22
23impl std::fmt::Debug for CheckpointDecisionStore {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("CheckpointDecisionStore")
26            .finish_non_exhaustive()
27    }
28}
29
30/// Errors raised by [`CheckpointDecisionStore`] operations.
31#[derive(Debug, thiserror::Error)]
32pub enum DecisionError {
33    /// Underlying object-store I/O failure.
34    #[error("object store I/O: {0}")]
35    Io(String),
36}
37
38impl CheckpointDecisionStore {
39    /// Wrap an existing object store.
40    #[must_use]
41    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
42        Self { store }
43    }
44
45    fn path(epoch: u64) -> OsPath {
46        OsPath::from(format!("checkpoint-decisions/epoch={epoch}/commit"))
47    }
48
49    /// CAS-create the commit marker for `epoch`. `Ok(true)` means our
50    /// write landed; `Ok(false)` means someone else recorded first
51    /// (idempotent — retries after commit are cheap no-ops).
52    ///
53    /// # Errors
54    /// Object-store I/O.
55    pub async fn record_committed(&self, epoch: u64) -> Result<bool, DecisionError> {
56        let opts = PutOptions {
57            mode: PutMode::Create,
58            ..PutOptions::default()
59        };
60        match self
61            .store
62            .put_opts(
63                &Self::path(epoch),
64                PutPayload::from(Bytes::from_static(b"")),
65                opts,
66            )
67            .await
68        {
69            Ok(_) => Ok(true),
70            Err(object_store::Error::AlreadyExists { .. }) => Ok(false),
71            Err(e) => Err(DecisionError::Io(e.to_string())),
72        }
73    }
74
75    /// True iff a commit marker exists for `epoch`.
76    ///
77    /// # Errors
78    /// Object-store I/O.
79    pub async fn is_committed(&self, epoch: u64) -> Result<bool, DecisionError> {
80        match self.store.head(&Self::path(epoch)).await {
81            Ok(_) => Ok(true),
82            Err(object_store::Error::NotFound { .. }) => Ok(false),
83            Err(e) => Err(DecisionError::Io(e.to_string())),
84        }
85    }
86
87    /// Delete commit markers for `epoch < before`. Called by the
88    /// checkpoint coordinator after its state-backend prune so
89    /// markers don't accumulate one-per-checkpoint forever.
90    ///
91    /// # Errors
92    /// Object-store I/O.
93    pub async fn prune_before(&self, before: u64) -> Result<(), DecisionError> {
94        if before == 0 {
95            return Ok(());
96        }
97        let root = OsPath::from("checkpoint-decisions/");
98        let mut entries = self.store.list(Some(&root));
99        let mut victims: Vec<OsPath> = Vec::new();
100        while let Some(entry) = entries.next().await {
101            let entry = entry.map_err(|e| DecisionError::Io(e.to_string()))?;
102            let loc = entry.location.as_ref();
103            let rest = loc.strip_prefix("checkpoint-decisions/").unwrap_or("");
104            let Some(seg) = rest.split('/').next() else {
105                continue;
106            };
107            let Some(n) = seg.strip_prefix("epoch=") else {
108                continue;
109            };
110            let Ok(epoch) = n.parse::<u64>() else {
111                continue;
112            };
113            if epoch < before {
114                victims.push(entry.location);
115            }
116        }
117        for victim in victims {
118            match self.store.delete(&victim).await {
119                Ok(()) | Err(object_store::Error::NotFound { .. }) => {}
120                Err(e) => tracing::warn!(error = %e, "decision prune: delete failed"),
121            }
122        }
123        Ok(())
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Build a decision store backed by a local filesystem rooted at `dir`.
    fn store_in(dir: &std::path::Path) -> CheckpointDecisionStore {
        let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(dir).unwrap());
        CheckpointDecisionStore::new(fs)
    }

    #[tokio::test]
    async fn absent_before_recorded() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        assert!(!s.is_committed(1).await.unwrap());
    }

    #[tokio::test]
    async fn record_then_read() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        assert!(s.record_committed(5).await.unwrap());
        assert!(s.is_committed(5).await.unwrap());
    }

    #[tokio::test]
    async fn second_record_is_noop() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        assert!(s.record_committed(7).await.unwrap());
        assert!(!s.record_committed(7).await.unwrap());
        assert!(s.is_committed(7).await.unwrap());
    }

    #[tokio::test]
    async fn epochs_are_independent() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        s.record_committed(1).await.unwrap();
        assert!(s.is_committed(1).await.unwrap());
        assert!(!s.is_committed(2).await.unwrap());
    }

    #[tokio::test]
    async fn prune_drops_older() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        for e in 1..=5 {
            s.record_committed(e).await.unwrap();
        }
        s.prune_before(4).await.unwrap();
        for e in 1..=3 {
            assert!(
                !s.is_committed(e).await.unwrap(),
                "epoch {e} should be pruned"
            );
        }
        for e in 4..=5 {
            assert!(s.is_committed(e).await.unwrap(), "epoch {e} should remain");
        }
    }

    #[tokio::test]
    async fn prune_before_zero_is_noop() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        s.record_committed(0).await.unwrap();
        s.record_committed(1).await.unwrap();
        s.prune_before(0).await.unwrap();
        assert!(s.is_committed(0).await.unwrap());
        assert!(s.is_committed(1).await.unwrap());
    }

    #[tokio::test]
    async fn prune_compares_epochs_numerically() {
        // Lexicographically "10" < "9"; prune must parse, not string-compare.
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        s.record_committed(9).await.unwrap();
        s.record_committed(10).await.unwrap();
        s.prune_before(10).await.unwrap();
        assert!(!s.is_committed(9).await.unwrap());
        assert!(s.is_committed(10).await.unwrap());
    }

    #[tokio::test]
    async fn prune_ignores_unrecognised_keys() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        let stray = OsPath::from("checkpoint-decisions/not-an-epoch");
        s.store
            .put(&stray, PutPayload::from(Bytes::from_static(b"x")))
            .await
            .unwrap();
        s.record_committed(1).await.unwrap();
        s.prune_before(u64::MAX).await.unwrap();
        assert!(!s.is_committed(1).await.unwrap());
        assert!(s.store.head(&stray).await.is_ok(), "stray key must survive prune");
    }
}