Skip to main content

laminar_core/cluster/control/
snapshot.rs

1//! Durable vnode→instance assignment snapshots. One object per
2//! version at `control/assignment-snapshots/v{N:016}.json`. Chitchat
3//! carries the ephemeral copy; these files survive full-cluster
4//! restart.
5//!
6//! Rotation uses `PutMode::Create` on a per-version path so the CAS
7//! works on every backend (`LocalFileSystem` included, which doesn't
8//! yet implement `PutMode::Update`).
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use bytes::Bytes;
14use object_store::path::Path as OsPath;
15use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
16use serde::{Deserialize, Serialize};
17use tokio_stream::StreamExt;
18
19use crate::cluster::discovery::NodeId;
20
/// Object-store key prefix under which every snapshot object lives.
/// Used to build per-version paths and to scope the listing scan.
const SNAPSHOT_PREFIX: &str = "control/assignment-snapshots/";
22
23fn snapshot_path(version: u64) -> OsPath {
24    // Fixed-width so lexicographic list order matches numeric order.
25    OsPath::from(format!("{SNAPSHOT_PREFIX}v{version:016}.json"))
26}
27
/// Durable vnode-to-instance assignment snapshot.
///
/// Derives `Serialize`/`Deserialize`, so the field names below are part
/// of the stored representation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AssignmentSnapshot {
    /// Monotonic version. Writers bump on each update.
    pub version: u64,
    /// Vnode id → owning instance. `BTreeMap` (not `Vec`) so snapshots
    /// with different `vnode_count` are still deserializable — sparse
    /// indices surface as missing keys the caller can diagnose.
    pub vnodes: BTreeMap<u32, NodeId>,
    /// Wall-clock timestamp of the last update, millis since epoch.
    pub updated_at_ms: i64,
}
40
41impl AssignmentSnapshot {
42    /// Empty snapshot at version 0.
43    #[must_use]
44    pub fn empty() -> Self {
45        Self {
46            version: 0,
47            vnodes: BTreeMap::new(),
48            updated_at_ms: 0,
49        }
50    }
51
52    /// Next snapshot with bumped version and current wall-clock time.
53    #[must_use]
54    pub fn next(&self, vnodes: BTreeMap<u32, NodeId>) -> Self {
55        #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
56        let now_ms = std::time::SystemTime::now()
57            .duration_since(std::time::UNIX_EPOCH)
58            .map_or(0, |d| d.as_millis() as i64);
59        Self {
60            version: self.version + 1,
61            vnodes,
62            updated_at_ms: now_ms,
63        }
64    }
65
66    /// Convert a `Vec<NodeId>` (one entry per vnode id, dense) into the
67    /// `BTreeMap` shape this snapshot uses. Mirrors the layout returned
68    /// by `round_robin_assignment`.
69    #[must_use]
70    pub fn vnodes_from_vec(assignment: &[NodeId]) -> BTreeMap<u32, NodeId> {
71        #[allow(clippy::cast_possible_truncation)]
72        assignment
73            .iter()
74            .enumerate()
75            .map(|(i, n)| (i as u32, *n))
76            .collect()
77    }
78
79    /// Dense `Vec<NodeId>` of length `vnode_count`. Missing entries (a
80    /// stale snapshot from a smaller vnode topology) are filled with
81    /// `NodeId::UNASSIGNED` so callers can detect the mismatch.
82    #[must_use]
83    pub fn to_vnode_vec(&self, vnode_count: u32) -> Vec<NodeId> {
84        (0..vnode_count)
85            .map(|v| self.vnodes.get(&v).copied().unwrap_or(NodeId::UNASSIGNED))
86            .collect()
87    }
88}
89
/// I/O wrapper for [`AssignmentSnapshot`] on an object store.
pub struct AssignmentSnapshotStore {
    /// Shared handle to the backing object store all reads and writes go through.
    store: Arc<dyn ObjectStore>,
}
94
95impl std::fmt::Debug for AssignmentSnapshotStore {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("AssignmentSnapshotStore")
98            .finish_non_exhaustive()
99    }
100}
101
/// Errors loading or saving an [`AssignmentSnapshot`].
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// Underlying object store I/O failure, carried as the rendered
    /// error message. `save_if_version` also uses this variant to
    /// report a non-monotonic version bump.
    #[error("object store I/O: {0}")]
    Io(String),
    /// JSON de/serialization failure.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
}
112
113impl AssignmentSnapshotStore {
114    /// Wrap a pre-constructed object store.
115    #[must_use]
116    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
117        Self { store }
118    }
119
120    /// Scan the snapshot prefix and return every stored version in
121    /// ascending order. Cheap on small clusters (rotations are rare);
122    /// the list operation is one round trip on every backend.
123    async fn list_versions(&self) -> Result<Vec<u64>, SnapshotError> {
124        let prefix = OsPath::from(SNAPSHOT_PREFIX);
125        let mut entries = self.store.list(Some(&prefix));
126        let mut versions: Vec<u64> = Vec::new();
127        while let Some(entry) = entries.next().await {
128            let entry = entry.map_err(|e| SnapshotError::Io(e.to_string()))?;
129            let loc = entry.location.as_ref();
130            // Accept only `v{N:016}.json` entries so unrelated
131            // siblings in the bucket can't shift the CAS token.
132            let Some(rest) = loc.strip_prefix(SNAPSHOT_PREFIX) else {
133                continue;
134            };
135            let Some(num) = rest.strip_prefix('v').and_then(|s| s.strip_suffix(".json")) else {
136                continue;
137            };
138            if let Ok(v) = num.parse::<u64>() {
139                versions.push(v);
140            }
141        }
142        versions.sort_unstable();
143        Ok(versions)
144    }
145
146    /// Load the current (highest-versioned) snapshot; `Ok(None)` on
147    /// fresh cluster.
148    ///
149    /// # Errors
150    /// Object-store I/O or JSON decode failure.
151    pub async fn load(&self) -> Result<Option<AssignmentSnapshot>, SnapshotError> {
152        let versions = self.list_versions().await?;
153        let Some(&latest) = versions.last() else {
154            return Ok(None);
155        };
156        self.load_version(latest).await
157    }
158
159    /// Load a specific version's snapshot. `Ok(None)` if that version
160    /// was never written or has been pruned.
161    ///
162    /// # Errors
163    /// Object-store I/O or JSON decode failure.
164    pub async fn load_version(
165        &self,
166        version: u64,
167    ) -> Result<Option<AssignmentSnapshot>, SnapshotError> {
168        let path = snapshot_path(version);
169        match self.store.get(&path).await {
170            Ok(res) => {
171                let bytes = res
172                    .bytes()
173                    .await
174                    .map_err(|e| SnapshotError::Io(e.to_string()))?;
175                let snap = serde_json::from_slice(&bytes)?;
176                Ok(Some(snap))
177            }
178            Err(object_store::Error::NotFound { .. }) => Ok(None),
179            Err(e) => Err(SnapshotError::Io(e.to_string())),
180        }
181    }
182
183    /// CAS-create the object for `snapshot.version`. `Ok(None)` means
184    /// another writer landed there first. Used for both the initial
185    /// seed and for rotations (via [`Self::save_if_version`]).
186    ///
187    /// # Errors
188    /// Object-store I/O or JSON encode failure.
189    pub async fn save_if_absent(
190        &self,
191        snapshot: &AssignmentSnapshot,
192    ) -> Result<Option<AssignmentSnapshot>, SnapshotError> {
193        let path = snapshot_path(snapshot.version);
194        let bytes = serde_json::to_vec_pretty(snapshot)?;
195        let opts = PutOptions {
196            mode: PutMode::Create,
197            ..PutOptions::default()
198        };
199        match self
200            .store
201            .put_opts(&path, PutPayload::from(Bytes::from(bytes)), opts)
202            .await
203        {
204            Ok(_) => Ok(Some(snapshot.clone())),
205            Err(object_store::Error::AlreadyExists { .. }) => Ok(None),
206            Err(e) => Err(SnapshotError::Io(e.to_string())),
207        }
208    }
209
210    /// Rotate to `snapshot` assuming the current durable version is
211    /// `prior_version`. Returns [`RotateOutcome::Conflict`] carrying
212    /// the winner's snapshot if a racer produced `prior_version + 1`
213    /// first.
214    ///
215    /// # Errors
216    /// Object-store I/O, JSON encode, or a non-monotonic version bump
217    /// (caller bug).
218    pub async fn save_if_version(
219        &self,
220        snapshot: &AssignmentSnapshot,
221        prior_version: u64,
222    ) -> Result<RotateOutcome, SnapshotError> {
223        if snapshot.version != prior_version + 1 {
224            return Err(SnapshotError::Io(format!(
225                "save_if_version requires monotonic +1 bump: prior={prior_version}, \
226                 proposed={}",
227                snapshot.version,
228            )));
229        }
230        if self.save_if_absent(snapshot).await?.is_some() {
231            return Ok(RotateOutcome::Rotated);
232        }
233        let winner = self.load_version(snapshot.version).await?.ok_or_else(|| {
234            SnapshotError::Io("CAS conflict but load of winner returned None".into())
235        })?;
236        Ok(RotateOutcome::Conflict(winner))
237    }
238
239    /// Delete every snapshot object with `version < before`.
240    /// Idempotent — missing objects are tolerated.
241    ///
242    /// # Errors
243    /// Object-store I/O.
244    pub async fn prune_before(&self, before: u64) -> Result<(), SnapshotError> {
245        if before == 0 {
246            return Ok(());
247        }
248        let versions = self.list_versions().await?;
249        for v in versions {
250            if v >= before {
251                break;
252            }
253            let path = snapshot_path(v);
254            match self.store.delete(&path).await {
255                Ok(()) | Err(object_store::Error::NotFound { .. }) => {}
256                Err(e) => {
257                    tracing::warn!(version = v, error = %e, "snapshot prune: delete failed");
258                }
259            }
260        }
261        Ok(())
262    }
263}
264
/// Outcome of [`AssignmentSnapshotStore::save_if_version`].
#[derive(Debug, Clone)]
pub enum RotateOutcome {
    /// Our write landed. The snapshot we passed in is now canonical.
    Rotated,
    /// Another writer (a racing leader) won the CAS. The attached
    /// snapshot is the one read back from the contested version; the
    /// caller must adopt it rather than retry with a stale view.
    Conflict(AssignmentSnapshot),
}
275
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Snapshot store backed by the local filesystem rooted at `dir`.
    fn store_in(dir: &std::path::Path) -> AssignmentSnapshotStore {
        let local = LocalFileSystem::new_with_prefix(dir).unwrap();
        let shared: Arc<dyn ObjectStore> = Arc::new(local);
        AssignmentSnapshotStore::new(shared)
    }

    #[tokio::test]
    async fn load_missing_returns_none() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());
        let loaded = store.load().await.unwrap();
        assert!(loaded.is_none());
    }

    #[tokio::test]
    async fn save_if_absent_then_load_roundtrip() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let owners: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(1)), (1, NodeId(2))]);
        let snap = AssignmentSnapshot::empty().next(owners);

        let written = store.save_if_absent(&snap).await.unwrap();
        assert_eq!(written.as_ref(), Some(&snap),);
        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded, snap);
    }

    #[tokio::test]
    async fn load_returns_highest_version() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let v1_owners: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(1))]);
        let v1 = AssignmentSnapshot::empty().next(v1_owners);
        store.save_if_absent(&v1).await.unwrap();

        let v2_owners: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(2))]);
        let v2 = v1.next(v2_owners);
        // Rotate via save_if_version — the canonical post-boot path.
        let outcome = store.save_if_version(&v2, v1.version).await.unwrap();
        assert!(matches!(outcome, RotateOutcome::Rotated,));

        let latest = store.load().await.unwrap().unwrap();
        assert_eq!(latest.version, 2);
        assert_eq!(latest.vnodes.get(&0), Some(&NodeId(2)));

        // Older version is still readable directly until pruned.
        let v1_loaded = store.load_version(1).await.unwrap().unwrap();
        assert_eq!(v1_loaded, v1);
    }

    #[tokio::test]
    async fn save_if_absent_first_writer_wins() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let first_owners: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(1)), (1, NodeId(2))]);
        let first = AssignmentSnapshot::empty().next(first_owners);

        let winner = store.save_if_absent(&first).await.unwrap();
        assert_eq!(winner.as_ref(), Some(&first), "first writer must win");

        // Second writer attempts a different assignment at the same
        // version; it should be rejected without mutating the store.
        let second_owners: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(99))]);
        let second = AssignmentSnapshot::empty().next(second_owners);
        let rejected = store.save_if_absent(&second).await.unwrap();
        assert!(rejected.is_none(), "second writer must lose the CAS");

        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded, first, "stored snapshot is the first writer's");
    }

    #[tokio::test]
    async fn save_if_version_rejects_non_monotonic_bump() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let seed: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(1))]);
        let v1 = AssignmentSnapshot::empty().next(seed);
        store.save_if_absent(&v1).await.unwrap();

        // Caller builds v3 but claims prior=1 — enforcing monotonic +1
        // catches accidental gap-skipping bugs before they land on
        // durable storage.
        let v2 = v1.next(BTreeMap::from([(0, NodeId(2))]));
        let v3 = v2.next(BTreeMap::from([(0, NodeId(3))]));
        let err = store.save_if_version(&v3, 1).await.unwrap_err();
        assert!(
            matches!(err, SnapshotError::Io(msg) if msg.contains("monotonic")),
            "non-monotonic bump must surface a clear error",
        );
    }

    #[tokio::test]
    async fn save_if_version_succeeds_on_match() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let first = AssignmentSnapshot::empty().next(BTreeMap::from([(0, NodeId(1))]));
        store.save_if_absent(&first).await.unwrap();

        let second = first.next(BTreeMap::from([(0, NodeId(2))]));
        let outcome = store.save_if_version(&second, first.version).await.unwrap();
        assert!(matches!(outcome, RotateOutcome::Rotated));

        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded, second);
    }

    #[tokio::test]
    async fn save_if_version_conflict_surfaces_winner() {
        // Two racing rotations both propose v2 from v1. CAS at
        // `v{2}.json` picks one; the loser reloads and finds the
        // winner's canonical snapshot.
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        let v1 = AssignmentSnapshot::empty().next(BTreeMap::from([(0, NodeId(1))]));
        store.save_if_absent(&v1).await.unwrap();

        let winner = v1.next(BTreeMap::from([(0, NodeId(10))]));
        let first_outcome = store.save_if_version(&winner, v1.version).await.unwrap();
        assert!(matches!(first_outcome, RotateOutcome::Rotated,));

        let loser = v1.next(BTreeMap::from([(0, NodeId(20))]));
        let second_outcome = store.save_if_version(&loser, v1.version).await.unwrap();
        match second_outcome {
            RotateOutcome::Rotated => {
                panic!("stale-token update must not win the CAS");
            }
            RotateOutcome::Conflict(current) => {
                assert_eq!(
                    current, winner,
                    "conflict must surface the winner's snapshot",
                );
            }
        }

        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded, winner, "stored snapshot is the CAS winner's");
    }

    #[tokio::test]
    async fn prune_before_drops_old_versions() {
        let dir = tempdir().unwrap();
        let store = store_in(dir.path());

        // Seed v1..=v4 by repeatedly rotating.
        let mut current = AssignmentSnapshot::empty().next(BTreeMap::from([(0, NodeId(1))]));
        store.save_if_absent(&current).await.unwrap();
        for _ in 0..3 {
            let successor = current.next(current.vnodes.clone());
            store
                .save_if_version(&successor, current.version)
                .await
                .unwrap();
            current = successor;
        }

        store.prune_before(3).await.unwrap();

        // Versions below the cut-off are gone; the rest survive.
        assert!(store.load_version(1).await.unwrap().is_none());
        assert!(store.load_version(2).await.unwrap().is_none());
        assert!(store.load_version(3).await.unwrap().is_some());
        assert!(store.load_version(4).await.unwrap().is_some());
        // `load()` still returns the most recent surviving snapshot.
        assert_eq!(store.load().await.unwrap().unwrap().version, 4);
    }

    #[test]
    fn empty_starts_at_version_zero() {
        let blank = AssignmentSnapshot::empty();
        assert_eq!(blank.version, 0);
        assert!(blank.vnodes.is_empty());
    }

    #[test]
    fn next_bumps_version() {
        let owners: BTreeMap<u32, NodeId> = BTreeMap::from([(0, NodeId(1))]);
        let bumped = AssignmentSnapshot::empty().next(owners);
        assert_eq!(bumped.version, 1);
    }

    #[test]
    fn roundtrip_vec_conversions() {
        let assignment = vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)];
        let vnode_count = u32::try_from(assignment.len()).expect("test len fits u32");
        let snap =
            AssignmentSnapshot::empty().next(AssignmentSnapshot::vnodes_from_vec(&assignment));
        let back = snap.to_vnode_vec(vnode_count);
        assert_eq!(back, assignment);
    }
}