laminar_core/cluster/control/
decision.rs1use std::sync::Arc;
11
12use bytes::Bytes;
13use object_store::path::Path as OsPath;
14use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
15use tokio_stream::StreamExt;
16
17pub struct CheckpointDecisionStore {
20 store: Arc<dyn ObjectStore>,
21}
22
23impl std::fmt::Debug for CheckpointDecisionStore {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("CheckpointDecisionStore")
26 .finish_non_exhaustive()
27 }
28}
29
30#[derive(Debug, thiserror::Error)]
32pub enum DecisionError {
33 #[error("object store I/O: {0}")]
35 Io(String),
36}
37
38impl CheckpointDecisionStore {
39 #[must_use]
41 pub fn new(store: Arc<dyn ObjectStore>) -> Self {
42 Self { store }
43 }
44
45 fn path(epoch: u64) -> OsPath {
46 OsPath::from(format!("checkpoint-decisions/epoch={epoch}/commit"))
47 }
48
49 pub async fn record_committed(&self, epoch: u64) -> Result<bool, DecisionError> {
56 let opts = PutOptions {
57 mode: PutMode::Create,
58 ..PutOptions::default()
59 };
60 match self
61 .store
62 .put_opts(
63 &Self::path(epoch),
64 PutPayload::from(Bytes::from_static(b"")),
65 opts,
66 )
67 .await
68 {
69 Ok(_) => Ok(true),
70 Err(object_store::Error::AlreadyExists { .. }) => Ok(false),
71 Err(e) => Err(DecisionError::Io(e.to_string())),
72 }
73 }
74
75 pub async fn is_committed(&self, epoch: u64) -> Result<bool, DecisionError> {
80 match self.store.head(&Self::path(epoch)).await {
81 Ok(_) => Ok(true),
82 Err(object_store::Error::NotFound { .. }) => Ok(false),
83 Err(e) => Err(DecisionError::Io(e.to_string())),
84 }
85 }
86
87 pub async fn prune_before(&self, before: u64) -> Result<(), DecisionError> {
94 if before == 0 {
95 return Ok(());
96 }
97 let root = OsPath::from("checkpoint-decisions/");
98 let mut entries = self.store.list(Some(&root));
99 let mut victims: Vec<OsPath> = Vec::new();
100 while let Some(entry) = entries.next().await {
101 let entry = entry.map_err(|e| DecisionError::Io(e.to_string()))?;
102 let loc = entry.location.as_ref();
103 let rest = loc.strip_prefix("checkpoint-decisions/").unwrap_or("");
104 let Some(seg) = rest.split('/').next() else {
105 continue;
106 };
107 let Some(n) = seg.strip_prefix("epoch=") else {
108 continue;
109 };
110 let Ok(epoch) = n.parse::<u64>() else {
111 continue;
112 };
113 if epoch < before {
114 victims.push(entry.location);
115 }
116 }
117 for victim in victims {
118 match self.store.delete(&victim).await {
119 Ok(()) | Err(object_store::Error::NotFound { .. }) => {}
120 Err(e) => tracing::warn!(error = %e, "decision prune: delete failed"),
121 }
122 }
123 Ok(())
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Builds a decision store over a local-filesystem object store rooted
    /// at `dir`.
    fn store_in(dir: &std::path::Path) -> CheckpointDecisionStore {
        let local = LocalFileSystem::new_with_prefix(dir).unwrap();
        CheckpointDecisionStore::new(Arc::new(local))
    }

    /// An epoch that was never recorded reads back as not committed.
    #[tokio::test]
    async fn absent_before_recorded() {
        let tmp = tempdir().unwrap();
        let decisions = store_in(tmp.path());

        let committed = decisions.is_committed(1).await.unwrap();
        assert!(!committed);
    }

    /// Recording an epoch reports a fresh write and makes it visible.
    #[tokio::test]
    async fn record_then_read() {
        let tmp = tempdir().unwrap();
        let decisions = store_in(tmp.path());

        let created = decisions.record_committed(5).await.unwrap();
        assert!(created);
        let committed = decisions.is_committed(5).await.unwrap();
        assert!(committed);
    }

    /// Recording the same epoch twice reports `false` the second time and
    /// leaves the epoch committed.
    #[tokio::test]
    async fn second_record_is_noop() {
        let tmp = tempdir().unwrap();
        let decisions = store_in(tmp.path());

        let first = decisions.record_committed(7).await.unwrap();
        assert!(first);
        let second = decisions.record_committed(7).await.unwrap();
        assert!(!second);
        let committed = decisions.is_committed(7).await.unwrap();
        assert!(committed);
    }

    /// Committing one epoch does not make another epoch appear committed.
    #[tokio::test]
    async fn epochs_are_independent() {
        let tmp = tempdir().unwrap();
        let decisions = store_in(tmp.path());

        decisions.record_committed(1).await.unwrap();

        let one = decisions.is_committed(1).await.unwrap();
        assert!(one);
        let two = decisions.is_committed(2).await.unwrap();
        assert!(!two);
    }

    /// Pruning before epoch 4 removes epochs 1-3 and keeps epochs 4-5.
    #[tokio::test]
    async fn prune_drops_older() {
        let tmp = tempdir().unwrap();
        let decisions = store_in(tmp.path());
        for epoch in 1..=5 {
            decisions.record_committed(epoch).await.unwrap();
        }

        decisions.prune_before(4).await.unwrap();

        for epoch in 1..=5 {
            let present = decisions.is_committed(epoch).await.unwrap();
            if epoch < 4 {
                assert!(!present, "epoch {epoch} should be pruned");
            } else {
                assert!(present, "epoch {epoch} should remain");
            }
        }
    }
}