Skip to main content

laminar_core/state/
object_store.rs

1//! [`ObjectStoreBackend`] — durable partial-state storage backed by any
2//! `object_store` implementation (S3, GCS, Azure, `LocalFileSystem`).
3//!
4//! `epoch_complete(epoch, vnodes)` performs a CAS-commit: if every
5//! vnode's `partial.bin` is present, `put(_COMMIT, Create)` seals the
6//! epoch. The `_COMMIT` marker is the durability boundary the
7//! checkpoint coordinator consults before releasing sinks.
8
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use object_store::path::Path as OsPath;
15use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
16
17use super::backend::{StateBackend, StateBackendError};
18
/// Cadence of the full-store re-baseline: a prune pass whose zero-based
/// index is a multiple of this value lists the entire store instead of only
/// the incremental epoch window.
const PRUNE_FULL_SCAN_EVERY: u64 = 32;
21
22/// Object-store-backed [`StateBackend`].
23pub struct ObjectStoreBackend {
24    store: Arc<dyn ObjectStore>,
25    instance_id: String,
26    /// Pre-encoded audit body for the `_COMMIT` CAS — derived once
27    /// from `instance_id` to avoid cloning a String into `Bytes` on
28    /// every commit attempt.
29    committer_bytes: Bytes,
30    vnode_capacity: u32,
31    /// Highest prune horizon already covered cleanly: later prunes list only
32    /// `epoch={N}/` prefixes in `[latest_pruned_epoch, before)` instead of the
33    /// whole store. `0` = no baseline yet; the first prune does one full
34    /// listing, then bounds every subsequent one.
35    latest_pruned_epoch: AtomicU64,
36    /// Prune-call counter driving the periodic full-scan re-baseline, which
37    /// bounds how long a straggler write below the cursor can leak.
38    prune_passes: AtomicU64,
39    /// Authoritative vnode-assignment version known to this backend.
40    /// Split-brain fence: [`write_partial`](Self::write_partial) rejects
41    /// any caller whose `assignment_version` is strictly less than this
42    /// value. Updated via [`set_authoritative_version`](Self::set_authoritative_version)
43    /// whenever the host sees a newer `AssignmentSnapshot` rotate in.
44    ///
45    /// Default is `0`, which disables the fence — unconfigured
46    /// callers (most single-instance paths) are accepted unchanged.
47    authoritative_version: Arc<AtomicU64>,
48}
49
50impl std::fmt::Debug for ObjectStoreBackend {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("ObjectStoreBackend")
53            .field("instance_id", &self.instance_id)
54            .field("vnode_capacity", &self.vnode_capacity)
55            .finish_non_exhaustive()
56    }
57}
58
59impl ObjectStoreBackend {
60    /// Wrap an existing [`ObjectStore`].
61    #[must_use]
62    pub fn new(
63        store: Arc<dyn ObjectStore>,
64        instance_id: impl Into<String>,
65        vnode_capacity: u32,
66    ) -> Self {
67        let instance_id = instance_id.into();
68        let committer_bytes = Bytes::from(instance_id.clone().into_bytes());
69        Self {
70            store,
71            instance_id,
72            committer_bytes,
73            vnode_capacity,
74            latest_pruned_epoch: AtomicU64::new(0),
75            prune_passes: AtomicU64::new(0),
76            authoritative_version: Arc::new(AtomicU64::new(0)),
77        }
78    }
79
80    /// Vnode range this backend is configured for.
81    #[must_use]
82    pub fn vnode_capacity(&self) -> u32 {
83        self.vnode_capacity
84    }
85
86    /// Shared handle to the authoritative version counter. Callers that
87    /// want to bump several objects (e.g. backend plus a future metric)
88    /// from a single owner can clone this handle instead of relaying
89    /// through [`StateBackend::set_authoritative_version`].
90    #[must_use]
91    pub fn authoritative_version_handle(&self) -> Arc<AtomicU64> {
92        Arc::clone(&self.authoritative_version)
93    }
94
95    fn check_vnode(&self, v: u32) -> Result<(), StateBackendError> {
96        if v >= self.vnode_capacity {
97            Err(StateBackendError::Io(format!(
98                "vnode {v} out of range (capacity {})",
99                self.vnode_capacity
100            )))
101        } else {
102            Ok(())
103        }
104    }
105
106    fn partial_path(epoch: u64, vnode: u32) -> OsPath {
107        OsPath::from(format!("epoch={epoch}/vnode={vnode}/partial.bin"))
108    }
109
110    fn commit_path(epoch: u64) -> OsPath {
111        OsPath::from(format!("epoch={epoch}/_COMMIT"))
112    }
113
114    /// Parse `N` from a location whose first path segment is `epoch=N`.
115    /// `None` for any sibling object that doesn't follow the layout.
116    /// `str::split` always yields at least one segment.
117    fn epoch_of_first_segment(loc: &str) -> Option<u64> {
118        let first = loc.split('/').next().unwrap_or("");
119        first.strip_prefix("epoch=")?.parse::<u64>().ok()
120    }
121}
122
123#[async_trait]
124impl StateBackend for ObjectStoreBackend {
125    async fn write_partial(
126        &self,
127        vnode: u32,
128        epoch: u64,
129        assignment_version: u64,
130        bytes: Bytes,
131    ) -> Result<(), StateBackendError> {
132        self.check_vnode(vnode)?;
133        // Split-brain fence. `authoritative_version == 0` means
134        // "unconfigured" — accept every write (matches the legacy
135        // single-instance behavior). Non-zero authoritative means we
136        // know of a specific assignment generation; writes stamped with
137        // an older generation are rejected.
138        let authoritative = self.authoritative_version.load(Ordering::Acquire);
139        if authoritative > 0 && assignment_version < authoritative {
140            return Err(StateBackendError::StaleVersion {
141                caller: assignment_version,
142                authoritative,
143            });
144        }
145        let path = Self::partial_path(epoch, vnode);
146        self.store
147            .put(&path, PutPayload::from(bytes))
148            .await
149            .map_err(|e| StateBackendError::Io(e.to_string()))?;
150        Ok(())
151    }
152
153    async fn read_partial(
154        &self,
155        vnode: u32,
156        epoch: u64,
157    ) -> Result<Option<Bytes>, StateBackendError> {
158        self.check_vnode(vnode)?;
159        let path = Self::partial_path(epoch, vnode);
160        match self.store.get(&path).await {
161            Ok(res) => {
162                let b = res
163                    .bytes()
164                    .await
165                    .map_err(|e| StateBackendError::Io(e.to_string()))?;
166                Ok(Some(b))
167            }
168            Err(object_store::Error::NotFound { .. }) => Ok(None),
169            Err(e) => Err(StateBackendError::Io(e.to_string())),
170        }
171    }
172
173    async fn epoch_complete(&self, epoch: u64, vnodes: &[u32]) -> Result<bool, StateBackendError> {
174        use rustc_hash::FxHashSet;
175        use tokio_stream::StreamExt;
176
177        let commit = Self::commit_path(epoch);
178        // Fast path: a marker already exists. Previously we returned
179        // `Ok(true)` blindly — that swallowed split-brain (two leaders
180        // racing, the loser silently agreed it had committed). Now we
181        // read the audit body and reject if the committer isn't us.
182        match self.store.head(&commit).await {
183            Ok(_) => return self.verify_commit_marker(&commit).await,
184            Err(object_store::Error::NotFound { .. }) => {}
185            Err(e) => return Err(StateBackendError::Io(e.to_string())),
186        }
187
188        for &v in vnodes {
189            self.check_vnode(v)?;
190        }
191
192        // List the epoch prefix once, then check every required vnode's
193        // partial is present — one round trip instead of O(vnodes) HEADs.
194        let prefix = OsPath::from(format!("epoch={epoch}/"));
195        let mut entries = self.store.list(Some(&prefix));
196        let mut found_paths: FxHashSet<OsPath> = FxHashSet::default();
197        while let Some(entry) = entries.next().await {
198            let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
199            found_paths.insert(entry.location);
200        }
201
202        for &v in vnodes {
203            let path = Self::partial_path(epoch, v);
204            if !found_paths.contains(&path) {
205                return Ok(false);
206            }
207        }
208
209        // CAS the commit marker; payload is the committer's id for audit.
210        let payload = PutPayload::from(self.committer_bytes.clone());
211        let opts = PutOptions {
212            mode: PutMode::Create,
213            ..Default::default()
214        };
215        match self.store.put_opts(&commit, payload, opts).await {
216            Ok(_) => Ok(true),
217            // AlreadyExists means a peer raced us to the CAS. Don't
218            // silently agree — verify who actually wrote the marker
219            // so a stale leader doesn't keep driving the commit phase.
220            Err(object_store::Error::AlreadyExists { .. }) => {
221                self.verify_commit_marker(&commit).await
222            }
223            Err(e) => Err(StateBackendError::Io(e.to_string())),
224        }
225    }
226
227    async fn prune_before(&self, before: u64) -> Result<(), StateBackendError> {
228        use futures::stream::{self, StreamExt};
229
230        let pass = self.prune_passes.fetch_add(1, Ordering::AcqRel);
231        let start = if pass.is_multiple_of(PRUNE_FULL_SCAN_EVERY) {
232            0
233        } else {
234            self.latest_pruned_epoch.load(Ordering::Acquire)
235        };
236
237        let mut victims: Vec<OsPath> = Vec::new();
238        if start == 0 {
239            // No baseline yet: one full listing. `epoch={epoch}/...` has a
240            // dynamic first segment and the `object_store` API matches whole
241            // segments, so a bare `epoch=` prefix would match nothing.
242            let mut entries = self.store.list(None);
243            while let Some(entry) = entries.next().await {
244                let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
245                let Some(epoch) = Self::epoch_of_first_segment(entry.location.as_ref()) else {
246                    continue;
247                };
248                if epoch < before {
249                    victims.push(entry.location);
250                }
251            }
252        } else {
253            // Only epochs in `[start, before)` can still hold objects, and
254            // `epoch={N}/` is an exact segment, so per-epoch listings cost
255            // O(epochs-since-last-prune × vnodes) instead of O(store).
256            for epoch in start..before {
257                let prefix = OsPath::from(format!("epoch={epoch}/"));
258                let mut entries = self.store.list(Some(&prefix));
259                while let Some(entry) = entries.next().await {
260                    let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
261                    victims.push(entry.location);
262                }
263            }
264        }
265
266        // `delete_stream` coalesces into bulk-delete API calls where the store
267        // supports them (S3 `DeleteObjects`); a missing object is a no-op.
268        let mut delete_failed = false;
269        if !victims.is_empty() {
270            let locations =
271                stream::iter(victims.into_iter().map(Ok::<OsPath, object_store::Error>)).boxed();
272            let mut deletes = self.store.delete_stream(locations);
273            while let Some(res) = deletes.next().await {
274                match res {
275                    Ok(_) | Err(object_store::Error::NotFound { .. }) => {}
276                    Err(e) => {
277                        delete_failed = true;
278                        tracing::warn!(error = %e, "state backend prune: delete failed");
279                    }
280                }
281            }
282        }
283
284        // Advance the cursor only on a clean pass: a failed delete must stay
285        // above it so the next prune re-lists that epoch and retries, instead
286        // of orphaning the object until a process restart. `fetch_max` keeps
287        // the cursor monotonic under concurrent prunes.
288        if !delete_failed {
289            self.latest_pruned_epoch.fetch_max(before, Ordering::AcqRel);
290        }
291        Ok(())
292    }
293
294    async fn latest_committed_epoch(&self) -> Result<Option<u64>, StateBackendError> {
295        use tokio_stream::StreamExt;
296
297        // Same listing constraint as `prune_before`: the first path
298        // segment (`epoch=N`) is dynamic, so we scan the whole store and
299        // filter for the `_COMMIT` markers a sealed epoch leaves behind.
300        let mut entries = self.store.list(None);
301        let mut highest: Option<u64> = None;
302        while let Some(entry) = entries.next().await {
303            let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
304            let loc = entry.location.as_ref();
305            if !loc.ends_with("/_COMMIT") {
306                continue;
307            }
308            if let Some(epoch) = Self::epoch_of_first_segment(loc) {
309                highest = Some(highest.map_or(epoch, |h| h.max(epoch)));
310            }
311        }
312        Ok(highest)
313    }
314
315    fn set_authoritative_version(&self, version: u64) {
316        // CAS loop avoids lowering the version on a late call.
317        let mut cur = self.authoritative_version.load(Ordering::Acquire);
318        while version > cur {
319            match self.authoritative_version.compare_exchange(
320                cur,
321                version,
322                Ordering::AcqRel,
323                Ordering::Acquire,
324            ) {
325                Ok(_) => return,
326                Err(observed) => cur = observed,
327            }
328        }
329    }
330
331    fn authoritative_version(&self) -> u64 {
332        self.authoritative_version.load(Ordering::Acquire)
333    }
334}
335
336impl ObjectStoreBackend {
337    /// Read the epoch's `_COMMIT` marker and compare its audit body
338    /// against this backend's `instance_id`. Match → `Ok(true)` (we
339    /// committed, a retry or observation is fine). Mismatch →
340    /// [`StateBackendError::SplitBrainCommit`] so the caller aborts
341    /// rather than double-committing downstream.
342    async fn verify_commit_marker(&self, commit: &OsPath) -> Result<bool, StateBackendError> {
343        let res = self
344            .store
345            .get(commit)
346            .await
347            .map_err(|e| StateBackendError::Io(e.to_string()))?;
348        let bytes = res
349            .bytes()
350            .await
351            .map_err(|e| StateBackendError::Io(e.to_string()))?;
352        let committer = std::str::from_utf8(&bytes).map_err(|e| {
353            StateBackendError::Serialization(format!("commit marker not utf8: {e}"))
354        })?;
355        if committer == self.instance_id.as_str() {
356            Ok(true)
357        } else {
358            Err(StateBackendError::SplitBrainCommit {
359                committer: committer.to_string(),
360                self_id: self.instance_id.clone(),
361            })
362        }
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369    use object_store::local::LocalFileSystem;
370    use tempfile::tempdir;
371
372    fn make_store(dir: &std::path::Path) -> Arc<dyn ObjectStore> {
373        Arc::new(LocalFileSystem::new_with_prefix(dir).unwrap())
374    }
375
376    #[tokio::test]
377    async fn write_read_roundtrip() {
378        let dir = tempdir().unwrap();
379        let backend = ObjectStoreBackend::new(make_store(dir.path()), "node-0", 4);
380        backend
381            .write_partial(0, 1, 0, Bytes::from_static(b"hello"))
382            .await
383            .unwrap();
384        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
385        assert_eq!(&got[..], b"hello");
386    }
387
388    #[tokio::test]
389    async fn epoch_complete_cas_commit() {
390        let dir = tempdir().unwrap();
391        let backend = ObjectStoreBackend::new(make_store(dir.path()), "node-0", 4);
392        let vnodes = [0u32, 1, 2];
393
394        assert!(!backend.epoch_complete(1, &vnodes).await.unwrap());
395        for v in &vnodes {
396            backend
397                .write_partial(*v, 1, 0, Bytes::from_static(b"y"))
398                .await
399                .unwrap();
400        }
401        assert!(backend.epoch_complete(1, &vnodes).await.unwrap());
402        // Idempotent — same committer id in the audit body.
403        assert!(backend.epoch_complete(1, &vnodes).await.unwrap());
404    }
405
406    /// Split-brain commit protection. Previously the CAS-create's
407    /// `AlreadyExists` branch was folded into the success branch, so a
408    /// stale leader racing a fresh one would happily agree it had also
409    /// committed the epoch. Now the loser reads the marker, sees a
410    /// mismatched audit body, and fails loud.
411    #[tokio::test]
412    async fn epoch_complete_detects_split_brain_committer() {
413        let dir = tempdir().unwrap();
414        let store = make_store(dir.path());
415        let winner = ObjectStoreBackend::new(Arc::clone(&store), "winner", 4);
416        let loser = ObjectStoreBackend::new(Arc::clone(&store), "loser", 4);
417
418        let vnodes = [0u32, 1];
419        // Both "nodes" wrote partials for the epoch.
420        for v in &vnodes {
421            winner
422                .write_partial(*v, 7, 0, Bytes::from_static(b"w"))
423                .await
424                .unwrap();
425        }
426
427        // Winner CAS-creates the commit marker first.
428        assert!(winner.epoch_complete(7, &vnodes).await.unwrap());
429
430        // Loser finds the marker already there (HEAD fast-path) and
431        // must NOT agree it committed — that's the split-brain case.
432        let err = loser.epoch_complete(7, &vnodes).await.unwrap_err();
433        match err {
434            StateBackendError::SplitBrainCommit { committer, self_id } => {
435                assert_eq!(committer, "winner");
436                assert_eq!(self_id, "loser");
437            }
438            other => panic!("expected SplitBrainCommit, got {other:?}"),
439        }
440
441        // And the winner's repeated call is still idempotent Ok(true).
442        assert!(winner.epoch_complete(7, &vnodes).await.unwrap());
443    }
444
445    /// Same contract on the CAS-loser path: if the marker doesn't exist
446    /// at HEAD time but a peer sneaks in between our vnode-presence
447    /// check and our own PUT, our `put_opts` fails with `AlreadyExists`.
448    /// That branch must also compare committers, not silently succeed.
449    #[tokio::test]
450    async fn epoch_complete_detects_split_brain_on_cas_loser_path() {
451        let dir = tempdir().unwrap();
452        let store = make_store(dir.path());
453        let winner = ObjectStoreBackend::new(Arc::clone(&store), "winner", 4);
454        let loser = ObjectStoreBackend::new(Arc::clone(&store), "loser", 4);
455
456        let vnodes = [0u32, 1];
457        for v in &vnodes {
458            winner
459                .write_partial(*v, 3, 0, Bytes::from_static(b"w"))
460                .await
461                .unwrap();
462        }
463        // Manually pre-seed the commit marker under "winner" to
464        // simulate the TOCTOU race deterministically — the loser's
465        // put_opts will hit AlreadyExists on its own PUT attempt.
466        let commit = ObjectStoreBackend::commit_path(3);
467        store
468            .put(&commit, PutPayload::from(Bytes::from_static(b"winner")))
469            .await
470            .unwrap();
471
472        let err = loser.epoch_complete(3, &vnodes).await.unwrap_err();
473        assert!(matches!(
474            err,
475            StateBackendError::SplitBrainCommit { ref committer, .. }
476                if committer == "winner"
477        ));
478    }
479
480    #[tokio::test]
481    async fn stale_version_rejected() {
482        // Force two "nodes" (backend instances wrapping the same store)
483        // to claim the same vnode at different generations. The stale
484        // writer must be rejected.
485        let dir = tempdir().unwrap();
486        let store = make_store(dir.path());
487        let stale = ObjectStoreBackend::new(Arc::clone(&store), "node-stale", 4);
488        let fresh = ObjectStoreBackend::new(Arc::clone(&store), "node-fresh", 4);
489
490        // Fresh learns about a new assignment generation — e.g. a new
491        // snapshot rotated in after a leader election.
492        fresh.set_authoritative_version(2);
493
494        // Fresh writes at the current version: accepted.
495        fresh
496            .write_partial(0, 1, 2, Bytes::from_static(b"fresh"))
497            .await
498            .unwrap();
499
500        // Stale tries to write at version 1 — but only IF it's also
501        // learned of the rotation. Model that by promoting stale's
502        // view too; the check is intra-backend here because the
503        // durable version-broadcast channel is out of scope for this test.
504        stale.set_authoritative_version(2);
505        let err = stale
506            .write_partial(0, 1, 1, Bytes::from_static(b"stale"))
507            .await
508            .unwrap_err();
509        match err {
510            StateBackendError::StaleVersion {
511                caller,
512                authoritative,
513            } => {
514                assert_eq!(caller, 1);
515                assert_eq!(authoritative, 2);
516            }
517            other => panic!("expected StaleVersion, got {other:?}"),
518        }
519
520        // Fence-disabled backend (authoritative stays at 0) accepts
521        // any version — preserves legacy single-instance behavior.
522        let unfenced = ObjectStoreBackend::new(Arc::clone(&store), "node-unfenced", 4);
523        unfenced
524            .write_partial(1, 1, 0, Bytes::from_static(b"ok"))
525            .await
526            .unwrap();
527    }
528
529    #[test]
530    fn authoritative_version_is_monotonic() {
531        let dir = tempdir().unwrap();
532        let b = ObjectStoreBackend::new(make_store(dir.path()), "node", 2);
533        assert_eq!(b.authoritative_version(), 0);
534        b.set_authoritative_version(3);
535        assert_eq!(b.authoritative_version(), 3);
536        // Attempts to lower the version are no-ops.
537        b.set_authoritative_version(1);
538        assert_eq!(b.authoritative_version(), 3);
539        b.set_authoritative_version(4);
540        assert_eq!(b.authoritative_version(), 4);
541    }
542
543    #[tokio::test]
544    async fn object_safe_behind_arc() {
545        let dir = tempdir().unwrap();
546        let _: Arc<dyn StateBackend> =
547            Arc::new(ObjectStoreBackend::new(make_store(dir.path()), "node-0", 2));
548    }
549
550    #[tokio::test]
551    async fn latest_committed_epoch_tracks_highest_sealed() {
552        let dir = tempdir().unwrap();
553        let backend = ObjectStoreBackend::new(make_store(dir.path()), "node-0", 4);
554
555        // Fresh store: nothing committed.
556        assert_eq!(backend.latest_committed_epoch().await.unwrap(), None);
557
558        // Seal epochs 3 and 7 (out of order) by writing every vnode's
559        // partial and running the CAS commit gate.
560        let vnodes = [0u32, 1];
561        for &epoch in &[3u64, 7] {
562            for v in &vnodes {
563                backend
564                    .write_partial(*v, epoch, 0, Bytes::from_static(b"s"))
565                    .await
566                    .unwrap();
567            }
568            assert!(backend.epoch_complete(epoch, &vnodes).await.unwrap());
569        }
570
571        // Epoch 5 has partials but no commit marker — must be ignored.
572        backend
573            .write_partial(0, 5, 0, Bytes::from_static(b"uncommitted"))
574            .await
575            .unwrap();
576
577        assert_eq!(backend.latest_committed_epoch().await.unwrap(), Some(7));
578    }
579
580    #[tokio::test]
581    async fn prune_before_deletes_old_epochs() {
582        let dir = tempdir().unwrap();
583        let backend = ObjectStoreBackend::new(make_store(dir.path()), "node-0", 4);
584
585        // Seed epochs 1..=5 with one vnode each.
586        for epoch in 1..=5u64 {
587            backend
588                .write_partial(0, epoch, 0, Bytes::from_static(b"x"))
589                .await
590                .unwrap();
591        }
592
593        backend.prune_before(4).await.unwrap();
594
595        for epoch in 1..=3 {
596            assert!(
597                backend.read_partial(0, epoch).await.unwrap().is_none(),
598                "epoch {epoch} should be pruned",
599            );
600        }
601        for epoch in 4..=5 {
602            assert!(
603                backend.read_partial(0, epoch).await.unwrap().is_some(),
604                "epoch {epoch} should be retained",
605            );
606        }
607    }
608
609    /// The horizon cursor must advance so the second prune takes the
610    /// bounded `[latest_pruned_epoch, before)` window (hot path) rather
611    /// than re-listing the whole store, while still deleting exactly the
612    /// epochs below the new horizon.
613    #[tokio::test]
614    async fn prune_before_is_incremental_and_advances_horizon() {
615        let dir = tempdir().unwrap();
616        let backend = ObjectStoreBackend::new(make_store(dir.path()), "node-0", 4);
617
618        // Seed epochs 1..=6, two vnodes each so deletes touch >1 object.
619        for epoch in 1..=6u64 {
620            for v in 0..2u32 {
621                backend
622                    .write_partial(v, epoch, 0, Bytes::from_static(b"x"))
623                    .await
624                    .unwrap();
625            }
626        }
627
628        // First prune: cold path (cursor still at the `0` sentinel) — one
629        // full scan, drops epochs 1..=2, then advances the cursor to 3.
630        backend.prune_before(3).await.unwrap();
631        assert_eq!(backend.latest_pruned_epoch.load(Ordering::Relaxed), 3);
632
633        // Second prune: hot path now (cursor == 3), only walks epochs
634        // [3, 5) yet must still leave the store as if a full scan ran.
635        backend.prune_before(5).await.unwrap();
636        assert_eq!(backend.latest_pruned_epoch.load(Ordering::Relaxed), 5);
637
638        for epoch in 1..=4u64 {
639            for v in 0..2u32 {
640                assert!(
641                    backend.read_partial(v, epoch).await.unwrap().is_none(),
642                    "epoch {epoch} vnode {v} should be pruned",
643                );
644            }
645        }
646        for epoch in 5..=6u64 {
647            for v in 0..2u32 {
648                assert!(
649                    backend.read_partial(v, epoch).await.unwrap().is_some(),
650                    "epoch {epoch} vnode {v} should be retained",
651                );
652            }
653        }
654
655        // Idempotent re-prune at the same horizon is a no-op.
656        backend.prune_before(5).await.unwrap();
657        assert_eq!(backend.latest_pruned_epoch.load(Ordering::Relaxed), 5);
658        assert!(backend.read_partial(0, 5).await.unwrap().is_some());
659    }
660
661    /// A straggler write below the cursor is invisible to incremental prunes;
662    /// the periodic full-scan re-baseline must reclaim it.
663    #[tokio::test]
664    async fn periodic_full_scan_reclaims_late_write_below_cursor() {
665        let dir = tempdir().unwrap();
666        let backend = ObjectStoreBackend::new(make_store(dir.path()), "node-0", 4);
667
668        backend
669            .write_partial(0, 1, 0, Bytes::from_static(b"x"))
670            .await
671            .unwrap();
672        backend.prune_before(3).await.unwrap();
673        assert!(backend.read_partial(0, 1).await.unwrap().is_none());
674
675        // Straggler lands in an epoch the cursor has already passed.
676        backend
677            .write_partial(0, 1, 0, Bytes::from_static(b"late"))
678            .await
679            .unwrap();
680        backend.prune_before(3).await.unwrap();
681        assert!(
682            backend.read_partial(0, 1).await.unwrap().is_some(),
683            "incremental prune cannot see below the cursor",
684        );
685
686        // Drive past the re-baseline pass; the full scan reclaims it.
687        for _ in 0..PRUNE_FULL_SCAN_EVERY {
688            backend.prune_before(3).await.unwrap();
689        }
690        assert!(backend.read_partial(0, 1).await.unwrap().is_none());
691    }
692}