1use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use object_store::path::Path as OsPath;
15use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
16
17use super::backend::{StateBackend, StateBackendError};
18
/// Cadence, counted in `prune_before` calls, of the exhaustive prune: every
/// pass whose zero-based index is a multiple of this value lists the whole
/// store instead of only the epochs at or above the pruning cursor.
const PRUNE_FULL_SCAN_EVERY: u64 = 32;
21
22pub struct ObjectStoreBackend {
24 store: Arc<dyn ObjectStore>,
25 instance_id: String,
26 committer_bytes: Bytes,
30 vnode_capacity: u32,
31 latest_pruned_epoch: AtomicU64,
36 prune_passes: AtomicU64,
39 authoritative_version: Arc<AtomicU64>,
48}
49
50impl std::fmt::Debug for ObjectStoreBackend {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("ObjectStoreBackend")
53 .field("instance_id", &self.instance_id)
54 .field("vnode_capacity", &self.vnode_capacity)
55 .finish_non_exhaustive()
56 }
57}
58
59impl ObjectStoreBackend {
60 #[must_use]
62 pub fn new(
63 store: Arc<dyn ObjectStore>,
64 instance_id: impl Into<String>,
65 vnode_capacity: u32,
66 ) -> Self {
67 let instance_id = instance_id.into();
68 let committer_bytes = Bytes::from(instance_id.clone().into_bytes());
69 Self {
70 store,
71 instance_id,
72 committer_bytes,
73 vnode_capacity,
74 latest_pruned_epoch: AtomicU64::new(0),
75 prune_passes: AtomicU64::new(0),
76 authoritative_version: Arc::new(AtomicU64::new(0)),
77 }
78 }
79
80 #[must_use]
82 pub fn vnode_capacity(&self) -> u32 {
83 self.vnode_capacity
84 }
85
86 #[must_use]
91 pub fn authoritative_version_handle(&self) -> Arc<AtomicU64> {
92 Arc::clone(&self.authoritative_version)
93 }
94
95 fn check_vnode(&self, v: u32) -> Result<(), StateBackendError> {
96 if v >= self.vnode_capacity {
97 Err(StateBackendError::Io(format!(
98 "vnode {v} out of range (capacity {})",
99 self.vnode_capacity
100 )))
101 } else {
102 Ok(())
103 }
104 }
105
106 fn partial_path(epoch: u64, vnode: u32) -> OsPath {
107 OsPath::from(format!("epoch={epoch}/vnode={vnode}/partial.bin"))
108 }
109
110 fn commit_path(epoch: u64) -> OsPath {
111 OsPath::from(format!("epoch={epoch}/_COMMIT"))
112 }
113
114 fn epoch_of_first_segment(loc: &str) -> Option<u64> {
118 let first = loc.split('/').next().unwrap_or("");
119 first.strip_prefix("epoch=")?.parse::<u64>().ok()
120 }
121}
122
123#[async_trait]
124impl StateBackend for ObjectStoreBackend {
125 async fn write_partial(
126 &self,
127 vnode: u32,
128 epoch: u64,
129 assignment_version: u64,
130 bytes: Bytes,
131 ) -> Result<(), StateBackendError> {
132 self.check_vnode(vnode)?;
133 let authoritative = self.authoritative_version.load(Ordering::Acquire);
139 if authoritative > 0 && assignment_version < authoritative {
140 return Err(StateBackendError::StaleVersion {
141 caller: assignment_version,
142 authoritative,
143 });
144 }
145 let path = Self::partial_path(epoch, vnode);
146 self.store
147 .put(&path, PutPayload::from(bytes))
148 .await
149 .map_err(|e| StateBackendError::Io(e.to_string()))?;
150 Ok(())
151 }
152
153 async fn read_partial(
154 &self,
155 vnode: u32,
156 epoch: u64,
157 ) -> Result<Option<Bytes>, StateBackendError> {
158 self.check_vnode(vnode)?;
159 let path = Self::partial_path(epoch, vnode);
160 match self.store.get(&path).await {
161 Ok(res) => {
162 let b = res
163 .bytes()
164 .await
165 .map_err(|e| StateBackendError::Io(e.to_string()))?;
166 Ok(Some(b))
167 }
168 Err(object_store::Error::NotFound { .. }) => Ok(None),
169 Err(e) => Err(StateBackendError::Io(e.to_string())),
170 }
171 }
172
173 async fn epoch_complete(&self, epoch: u64, vnodes: &[u32]) -> Result<bool, StateBackendError> {
174 use rustc_hash::FxHashSet;
175 use tokio_stream::StreamExt;
176
177 let commit = Self::commit_path(epoch);
178 match self.store.head(&commit).await {
183 Ok(_) => return self.verify_commit_marker(&commit).await,
184 Err(object_store::Error::NotFound { .. }) => {}
185 Err(e) => return Err(StateBackendError::Io(e.to_string())),
186 }
187
188 for &v in vnodes {
189 self.check_vnode(v)?;
190 }
191
192 let prefix = OsPath::from(format!("epoch={epoch}/"));
195 let mut entries = self.store.list(Some(&prefix));
196 let mut found_paths: FxHashSet<OsPath> = FxHashSet::default();
197 while let Some(entry) = entries.next().await {
198 let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
199 found_paths.insert(entry.location);
200 }
201
202 for &v in vnodes {
203 let path = Self::partial_path(epoch, v);
204 if !found_paths.contains(&path) {
205 return Ok(false);
206 }
207 }
208
209 let payload = PutPayload::from(self.committer_bytes.clone());
211 let opts = PutOptions {
212 mode: PutMode::Create,
213 ..Default::default()
214 };
215 match self.store.put_opts(&commit, payload, opts).await {
216 Ok(_) => Ok(true),
217 Err(object_store::Error::AlreadyExists { .. }) => {
221 self.verify_commit_marker(&commit).await
222 }
223 Err(e) => Err(StateBackendError::Io(e.to_string())),
224 }
225 }
226
227 async fn prune_before(&self, before: u64) -> Result<(), StateBackendError> {
228 use futures::stream::{self, StreamExt};
229
230 let pass = self.prune_passes.fetch_add(1, Ordering::AcqRel);
231 let start = if pass.is_multiple_of(PRUNE_FULL_SCAN_EVERY) {
232 0
233 } else {
234 self.latest_pruned_epoch.load(Ordering::Acquire)
235 };
236
237 let mut victims: Vec<OsPath> = Vec::new();
238 if start == 0 {
239 let mut entries = self.store.list(None);
243 while let Some(entry) = entries.next().await {
244 let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
245 let Some(epoch) = Self::epoch_of_first_segment(entry.location.as_ref()) else {
246 continue;
247 };
248 if epoch < before {
249 victims.push(entry.location);
250 }
251 }
252 } else {
253 for epoch in start..before {
257 let prefix = OsPath::from(format!("epoch={epoch}/"));
258 let mut entries = self.store.list(Some(&prefix));
259 while let Some(entry) = entries.next().await {
260 let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
261 victims.push(entry.location);
262 }
263 }
264 }
265
266 let mut delete_failed = false;
269 if !victims.is_empty() {
270 let locations =
271 stream::iter(victims.into_iter().map(Ok::<OsPath, object_store::Error>)).boxed();
272 let mut deletes = self.store.delete_stream(locations);
273 while let Some(res) = deletes.next().await {
274 match res {
275 Ok(_) | Err(object_store::Error::NotFound { .. }) => {}
276 Err(e) => {
277 delete_failed = true;
278 tracing::warn!(error = %e, "state backend prune: delete failed");
279 }
280 }
281 }
282 }
283
284 if !delete_failed {
289 self.latest_pruned_epoch.fetch_max(before, Ordering::AcqRel);
290 }
291 Ok(())
292 }
293
294 async fn latest_committed_epoch(&self) -> Result<Option<u64>, StateBackendError> {
295 use tokio_stream::StreamExt;
296
297 let mut entries = self.store.list(None);
301 let mut highest: Option<u64> = None;
302 while let Some(entry) = entries.next().await {
303 let entry = entry.map_err(|e| StateBackendError::Io(e.to_string()))?;
304 let loc = entry.location.as_ref();
305 if !loc.ends_with("/_COMMIT") {
306 continue;
307 }
308 if let Some(epoch) = Self::epoch_of_first_segment(loc) {
309 highest = Some(highest.map_or(epoch, |h| h.max(epoch)));
310 }
311 }
312 Ok(highest)
313 }
314
315 fn set_authoritative_version(&self, version: u64) {
316 let mut cur = self.authoritative_version.load(Ordering::Acquire);
318 while version > cur {
319 match self.authoritative_version.compare_exchange(
320 cur,
321 version,
322 Ordering::AcqRel,
323 Ordering::Acquire,
324 ) {
325 Ok(_) => return,
326 Err(observed) => cur = observed,
327 }
328 }
329 }
330
331 fn authoritative_version(&self) -> u64 {
332 self.authoritative_version.load(Ordering::Acquire)
333 }
334}
335
336impl ObjectStoreBackend {
337 async fn verify_commit_marker(&self, commit: &OsPath) -> Result<bool, StateBackendError> {
343 let res = self
344 .store
345 .get(commit)
346 .await
347 .map_err(|e| StateBackendError::Io(e.to_string()))?;
348 let bytes = res
349 .bytes()
350 .await
351 .map_err(|e| StateBackendError::Io(e.to_string()))?;
352 let committer = std::str::from_utf8(&bytes).map_err(|e| {
353 StateBackendError::Serialization(format!("commit marker not utf8: {e}"))
354 })?;
355 if committer == self.instance_id.as_str() {
356 Ok(true)
357 } else {
358 Err(StateBackendError::SplitBrainCommit {
359 committer: committer.to_string(),
360 self_id: self.instance_id.clone(),
361 })
362 }
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Opens a filesystem-backed object store rooted at `dir`.
    fn make_store(dir: &std::path::Path) -> Arc<dyn ObjectStore> {
        let fs = LocalFileSystem::new_with_prefix(dir).unwrap();
        Arc::new(fs)
    }

    /// A written partial can be read back unchanged.
    #[tokio::test]
    async fn write_read_roundtrip() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 4);

        let payload = Bytes::from_static(b"hello");
        backend.write_partial(0, 1, 0, payload).await.unwrap();

        let read_back = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&read_back[..], b"hello");
    }

    /// An epoch is incomplete until every vnode has a partial, and stays
    /// complete on repeated calls by the committing instance.
    #[tokio::test]
    async fn epoch_complete_cas_commit() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 4);
        let vnodes = [0u32, 1, 2];

        let before_writes = backend.epoch_complete(1, &vnodes).await.unwrap();
        assert!(!before_writes);

        for &v in &vnodes {
            backend
                .write_partial(v, 1, 0, Bytes::from_static(b"y"))
                .await
                .unwrap();
        }

        // First call writes the marker, second call finds and verifies it.
        assert!(backend.epoch_complete(1, &vnodes).await.unwrap());
        assert!(backend.epoch_complete(1, &vnodes).await.unwrap());
    }

    /// A second instance that finds another instance's marker gets
    /// `SplitBrainCommit`, while the original committer keeps seeing `true`.
    #[tokio::test]
    async fn epoch_complete_detects_split_brain_committer() {
        let tmp = tempdir().unwrap();
        let shared = make_store(tmp.path());
        let winner = ObjectStoreBackend::new(Arc::clone(&shared), "winner", 4);
        let loser = ObjectStoreBackend::new(Arc::clone(&shared), "loser", 4);

        let vnodes = [0u32, 1];
        for &v in &vnodes {
            winner
                .write_partial(v, 7, 0, Bytes::from_static(b"w"))
                .await
                .unwrap();
        }

        assert!(winner.epoch_complete(7, &vnodes).await.unwrap());

        let err = loser.epoch_complete(7, &vnodes).await.unwrap_err();
        match err {
            StateBackendError::SplitBrainCommit { committer, self_id } => {
                assert_eq!(committer, "winner");
                assert_eq!(self_id, "loser");
            }
            other => panic!("expected SplitBrainCommit, got {other:?}"),
        }

        assert!(winner.epoch_complete(7, &vnodes).await.unwrap());
    }

    /// Same detection when the marker was put into the store directly rather
    /// than through `epoch_complete`.
    #[tokio::test]
    async fn epoch_complete_detects_split_brain_on_cas_loser_path() {
        let tmp = tempdir().unwrap();
        let shared = make_store(tmp.path());
        let winner = ObjectStoreBackend::new(Arc::clone(&shared), "winner", 4);
        let loser = ObjectStoreBackend::new(Arc::clone(&shared), "loser", 4);

        let vnodes = [0u32, 1];
        for &v in &vnodes {
            winner
                .write_partial(v, 3, 0, Bytes::from_static(b"w"))
                .await
                .unwrap();
        }

        // Plant the winner's marker by hand.
        let marker = ObjectStoreBackend::commit_path(3);
        let marker_body = PutPayload::from(Bytes::from_static(b"winner"));
        shared.put(&marker, marker_body).await.unwrap();

        let err = loser.epoch_complete(3, &vnodes).await.unwrap_err();
        assert!(matches!(
            err,
            StateBackendError::SplitBrainCommit { ref committer, .. }
                if committer == "winner"
        ));
    }

    /// Writes below the authoritative version are rejected; writes at the
    /// version, or on a backend whose version is still 0, succeed.
    #[tokio::test]
    async fn stale_version_rejected() {
        let tmp = tempdir().unwrap();
        let shared = make_store(tmp.path());
        let stale = ObjectStoreBackend::new(Arc::clone(&shared), "node-stale", 4);
        let fresh = ObjectStoreBackend::new(Arc::clone(&shared), "node-fresh", 4);

        fresh.set_authoritative_version(2);

        // Caller version equals the authoritative one: accepted.
        fresh
            .write_partial(0, 1, 2, Bytes::from_static(b"fresh"))
            .await
            .unwrap();

        // Each backend has its own counter, so the fence must be raised on
        // `stale` as well before it starts rejecting version 1.
        stale.set_authoritative_version(2);
        let err = stale
            .write_partial(0, 1, 1, Bytes::from_static(b"stale"))
            .await
            .unwrap_err();
        match err {
            StateBackendError::StaleVersion {
                caller,
                authoritative,
            } => {
                assert_eq!(caller, 1);
                assert_eq!(authoritative, 2);
            }
            other => panic!("expected StaleVersion, got {other:?}"),
        }

        // A brand-new backend still has version 0 and accepts anything.
        let unfenced = ObjectStoreBackend::new(Arc::clone(&shared), "node-unfenced", 4);
        unfenced
            .write_partial(1, 1, 0, Bytes::from_static(b"ok"))
            .await
            .unwrap();
    }

    /// The authoritative version only ever increases.
    #[test]
    fn authoritative_version_is_monotonic() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node", 2);
        assert_eq!(backend.authoritative_version(), 0);

        backend.set_authoritative_version(3);
        assert_eq!(backend.authoritative_version(), 3);

        // Lower value is ignored.
        backend.set_authoritative_version(1);
        assert_eq!(backend.authoritative_version(), 3);

        backend.set_authoritative_version(4);
        assert_eq!(backend.authoritative_version(), 4);
    }

    /// The backend can be used as a `dyn StateBackend` trait object.
    #[tokio::test]
    async fn object_safe_behind_arc() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 2);
        let _: Arc<dyn StateBackend> = Arc::new(backend);
    }

    /// Only epochs with a commit marker count towards the latest committed
    /// epoch.
    #[tokio::test]
    async fn latest_committed_epoch_tracks_highest_sealed() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 4);

        assert_eq!(backend.latest_committed_epoch().await.unwrap(), None);

        let vnodes = [0u32, 1];
        for epoch in [3u64, 7] {
            for &v in &vnodes {
                backend
                    .write_partial(v, epoch, 0, Bytes::from_static(b"s"))
                    .await
                    .unwrap();
            }
            assert!(backend.epoch_complete(epoch, &vnodes).await.unwrap());
        }

        // Epoch 5 has a partial but no marker, so it must not be reported.
        backend
            .write_partial(0, 5, 0, Bytes::from_static(b"uncommitted"))
            .await
            .unwrap();

        assert_eq!(backend.latest_committed_epoch().await.unwrap(), Some(7));
    }

    /// Pruning removes epochs strictly below the bound and keeps the rest.
    #[tokio::test]
    async fn prune_before_deletes_old_epochs() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 4);

        for epoch in 1..=5u64 {
            backend
                .write_partial(0, epoch, 0, Bytes::from_static(b"x"))
                .await
                .unwrap();
        }

        backend.prune_before(4).await.unwrap();

        for epoch in 1..=3 {
            let gone = backend.read_partial(0, epoch).await.unwrap().is_none();
            assert!(gone, "epoch {epoch} should be pruned");
        }
        for epoch in 4..=5 {
            let kept = backend.read_partial(0, epoch).await.unwrap().is_some();
            assert!(kept, "epoch {epoch} should be retained");
        }
    }

    /// Successive prunes advance the cursor and a repeated prune is a no-op.
    #[tokio::test]
    async fn prune_before_is_incremental_and_advances_horizon() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 4);

        for epoch in 1..=6u64 {
            for v in 0..2u32 {
                backend
                    .write_partial(v, epoch, 0, Bytes::from_static(b"x"))
                    .await
                    .unwrap();
            }
        }

        // Pass 0 is a full scan.
        backend.prune_before(3).await.unwrap();
        assert_eq!(backend.latest_pruned_epoch.load(Ordering::Relaxed), 3);

        // Pass 1 is incremental and covers epochs 3 and 4.
        backend.prune_before(5).await.unwrap();
        assert_eq!(backend.latest_pruned_epoch.load(Ordering::Relaxed), 5);

        for epoch in 1..=4u64 {
            for v in 0..2u32 {
                let gone = backend.read_partial(v, epoch).await.unwrap().is_none();
                assert!(gone, "epoch {epoch} vnode {v} should be pruned");
            }
        }
        for epoch in 5..=6u64 {
            for v in 0..2u32 {
                let kept = backend.read_partial(v, epoch).await.unwrap().is_some();
                assert!(kept, "epoch {epoch} vnode {v} should be retained");
            }
        }

        // Pass 2 with the same bound has nothing to list or delete.
        backend.prune_before(5).await.unwrap();
        assert_eq!(backend.latest_pruned_epoch.load(Ordering::Relaxed), 5);
        assert!(backend.read_partial(0, 5).await.unwrap().is_some());
    }

    /// A write that lands below the cursor survives incremental passes but is
    /// removed by the next full scan.
    #[tokio::test]
    async fn periodic_full_scan_reclaims_late_write_below_cursor() {
        let tmp = tempdir().unwrap();
        let backend = ObjectStoreBackend::new(make_store(tmp.path()), "node-0", 4);

        backend
            .write_partial(0, 1, 0, Bytes::from_static(b"x"))
            .await
            .unwrap();
        backend.prune_before(3).await.unwrap();
        assert!(backend.read_partial(0, 1).await.unwrap().is_none());

        // Re-create epoch 1 after the cursor has already moved to 3.
        backend
            .write_partial(0, 1, 0, Bytes::from_static(b"late"))
            .await
            .unwrap();
        backend.prune_before(3).await.unwrap();
        let still_there = backend.read_partial(0, 1).await.unwrap().is_some();
        assert!(still_there, "incremental prune cannot see below the cursor");

        // Enough further passes to be sure one of them is a full scan.
        for _ in 0..PRUNE_FULL_SCAN_EVERY {
            backend.prune_before(3).await.unwrap();
        }
        assert!(backend.read_partial(0, 1).await.unwrap().is_none());
    }
}