1use std::path::{Path, PathBuf};
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use object_store::{GetOptions, ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
24use sha2::{Digest, Sha256};
25use tracing::warn;
26
27use crate::checkpoint_manifest::CheckpointManifest;
28
29async fn sync_file(path: &Path) -> Result<(), std::io::Error> {
31 let f = tokio::fs::OpenOptions::new().write(true).open(path).await?;
33 f.sync_all().await
34}
35
36#[allow(clippy::unnecessary_wraps, clippy::unused_async)] async fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
42 #[cfg(unix)]
43 {
44 let f = tokio::fs::File::open(path).await?;
45 f.sync_all().await?;
46 }
47 #[cfg(not(unix))]
48 {
49 let _ = path;
50 }
51 Ok(())
52}
53
/// Errors returned by [`CheckpointStore`] implementations in this module.
#[derive(Debug, thiserror::Error)]
pub enum CheckpointStoreError {
    /// Local filesystem I/O failed (reading/writing manifests, state files,
    /// or the `latest.txt` pointer).
    #[error("checkpoint I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// A manifest or pointer could not be (de)serialized as JSON.
    /// `validate_checkpoint` treats this variant as a corrupt manifest rather
    /// than propagating it.
    #[error("checkpoint serialization error: {0}")]
    Serde(#[from] serde_json::Error),

    /// The checkpoint with the given id does not exist.
    // NOTE(review): not constructed by the code in this module — lookups here
    // report absence as `Ok(None)` instead. Confirm whether callers use it.
    #[error("checkpoint {0} not found")]
    NotFound(u64),

    /// An `object_store` operation failed (used by the object-store backend).
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::Error),
}
73
/// A single problem found while validating a checkpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationIssue {
    /// Non-fatal: reported from the manifest's own `validate` checks. The
    /// checkpoint is still usable for recovery.
    ManifestWarning(String),
    /// Fatal: the checkpoint must not be used, e.g. missing/corrupt manifest,
    /// state checksum mismatch, or zero epoch/id.
    IntegrityFailure(String),
}
91
92impl ValidationIssue {
93 #[must_use]
95 pub fn is_fatal(&self) -> bool {
96 matches!(self, Self::IntegrityFailure(_))
97 }
98
99 #[must_use]
101 pub fn message(&self) -> &str {
102 match self {
103 Self::ManifestWarning(s) | Self::IntegrityFailure(s) => s,
104 }
105 }
106}
107
108impl std::fmt::Display for ValidationIssue {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 f.write_str(self.message())
111 }
112}
113
/// Outcome of [`CheckpointStore::validate_checkpoint`] for one checkpoint.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// Id of the checkpoint that was validated.
    pub checkpoint_id: u64,
    /// `true` when none of `issues` is fatal (see [`ValidationIssue::is_fatal`]).
    pub valid: bool,
    /// Every issue found, fatal and non-fatal, in detection order.
    pub issues: Vec<ValidationIssue>,
}
125
/// Summary produced by [`CheckpointStore::recover_latest_validated`].
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// Newest checkpoint that passed validation, or `None` if none did.
    pub chosen_id: Option<u64>,
    /// `(checkpoint_id, reason)` for each newer checkpoint rejected before
    /// `chosen_id` was found; `reason` is the issues joined with `"; "`.
    pub skipped: Vec<(u64, String)>,
    /// Number of checkpoint ids listed as recovery candidates. Note this is
    /// the total listed, including older ids never validated because a newer
    /// one was chosen first.
    pub examined: usize,
    /// Wall-clock time spent listing and validating.
    pub elapsed: std::time::Duration,
}
141
142fn parse_checkpoint_id_from_path(path: &str, prefix: &str, suffix: &str) -> Option<u64> {
149 for segment in path.split('/') {
150 let Some(rest) = segment.strip_prefix(prefix) else {
151 continue;
152 };
153 let Some(id_str) = rest.strip_suffix(suffix) else {
154 continue;
155 };
156 if let Ok(id) = id_str.parse::<u64>() {
157 return Some(id);
158 }
159 warn!(
160 path,
161 prefix, suffix, "malformed checkpoint id in object path — skipped"
162 );
163 return None;
164 }
165 None
166}
167
168fn sha256_hex(data: &[u8]) -> String {
170 let mut hasher = Sha256::new();
171 hasher.update(data);
172 format!("{:x}", hasher.finalize())
173}
174
175fn sha256_hex_chunks(chunks: &[bytes::Bytes]) -> String {
182 let mut hasher = Sha256::new();
183 for chunk in chunks {
184 hasher.update(chunk);
185 }
186 format!("{:x}", hasher.finalize())
187}
188
189#[async_trait]
195pub trait CheckpointStore: Send + Sync {
196 fn vnode_count(&self) -> u16 {
202 crate::checkpoint_manifest::DEFAULT_VNODE_COUNT
203 }
204
205 async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;
211
212 async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
218
219 async fn load_by_id(&self, id: u64)
224 -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
225
226 async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;
233
234 async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
241 Ok(self.list().await?.iter().map(|(id, _)| *id).collect())
244 }
245
246 async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;
252
253 async fn update_manifest(
260 &self,
261 manifest: &CheckpointManifest,
262 ) -> Result<(), CheckpointStoreError> {
263 self.save(manifest).await
264 }
265
266 async fn save_state_data(
277 &self,
278 id: u64,
279 chunks: &[bytes::Bytes],
280 ) -> Result<(), CheckpointStoreError>;
281
282 async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;
288
289 async fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
298 let mut issues = Vec::new();
299
300 let manifest = match self.load_by_id(id).await {
302 Ok(Some(m)) => m,
303 Ok(None) => {
304 return Ok(ValidationResult {
305 checkpoint_id: id,
306 valid: false,
307 issues: vec![ValidationIssue::IntegrityFailure(format!(
308 "manifest not found for checkpoint {id}"
309 ))],
310 });
311 }
312 Err(CheckpointStoreError::Serde(e)) => {
313 return Ok(ValidationResult {
314 checkpoint_id: id,
315 valid: false,
316 issues: vec![ValidationIssue::IntegrityFailure(format!(
317 "corrupt manifest: {e}"
318 ))],
319 });
320 }
321 Err(e) => return Err(e),
322 };
323
324 for err in manifest.validate(self.vnode_count()) {
325 issues.push(ValidationIssue::ManifestWarning(format!(
326 "manifest validation: {err}"
327 )));
328 }
329
330 if let Some(expected) = &manifest.state_checksum {
332 match self.load_state_data(id).await? {
333 Some(data) => {
334 let actual = sha256_hex(&data);
335 if actual != *expected {
336 issues.push(ValidationIssue::IntegrityFailure(format!(
337 "state.bin checksum mismatch: expected {expected}, got {actual}"
338 )));
339 }
340 }
341 None => {
342 issues.push(ValidationIssue::IntegrityFailure(
343 "state.bin referenced by checksum but not found".into(),
344 ));
345 }
346 }
347 }
348
349 if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
352 issues.push(ValidationIssue::IntegrityFailure(
353 "epoch or checkpoint_id is 0 — likely corrupted".into(),
354 ));
355 }
356
357 let valid = issues.iter().all(|i| !i.is_fatal());
358 Ok(ValidationResult {
359 checkpoint_id: id,
360 valid,
361 issues,
362 })
363 }
364
365 async fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
374 let start = std::time::Instant::now();
375 let mut skipped = Vec::new();
376
377 let mut ids = self.list_ids().await?;
380 ids.reverse();
381
382 let examined = ids.len();
383
384 for id in &ids {
385 let result = self.validate_checkpoint(*id).await?;
386 if result.valid {
387 return Ok(RecoveryReport {
388 chosen_id: Some(*id),
389 skipped,
390 examined,
391 elapsed: start.elapsed(),
392 });
393 }
394 let reason = result
395 .issues
396 .iter()
397 .map(ToString::to_string)
398 .collect::<Vec<_>>()
399 .join("; ");
400 warn!(
401 checkpoint_id = id,
402 reason = %reason,
403 "skipping invalid checkpoint"
404 );
405 skipped.push((*id, reason));
406 }
407
408 Ok(RecoveryReport {
409 chosen_id: None,
410 skipped,
411 examined,
412 elapsed: start.elapsed(),
413 })
414 }
415
416 async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
424 Ok(0)
426 }
427
428 async fn save_with_state(
442 &self,
443 manifest: &CheckpointManifest,
444 state_data: Option<&[bytes::Bytes]>,
445 ) -> Result<(), CheckpointStoreError> {
446 let mut manifest = manifest.clone();
447 if let Some(chunks) = state_data {
448 manifest.state_checksum = Some(sha256_hex_chunks(chunks));
455 self.save_state_data(manifest.checkpoint_id, chunks).await?;
456 }
457 self.save(&manifest).await
458 }
459}
460
/// [`CheckpointStore`] backed by a local directory.
///
/// Layout under `base_dir`:
/// `checkpoints/checkpoint_NNNNNN/manifest.json`,
/// `checkpoints/checkpoint_NNNNNN/state.bin`, and the pointer file
/// `checkpoints/latest.txt`, which holds the latest directory name.
pub struct FileSystemCheckpointStore {
    /// Root directory; checkpoints live in its `checkpoints/` subdirectory.
    base_dir: PathBuf,
    /// When greater than zero, `save` prunes down to this many checkpoints.
    max_retained: usize,
    /// Vnode count passed to manifest validation.
    vnode_count: u16,
}
471
472impl FileSystemCheckpointStore {
473 #[must_use]
483 pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
484 Self {
485 base_dir: base_dir.into(),
486 max_retained,
487 vnode_count: crate::checkpoint_manifest::DEFAULT_VNODE_COUNT,
488 }
489 }
490
491 #[must_use]
493 pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
494 self.vnode_count = vnode_count;
495 self
496 }
497
498 fn checkpoints_dir(&self) -> PathBuf {
500 self.base_dir.join("checkpoints")
501 }
502
503 fn checkpoint_dir(&self, id: u64) -> PathBuf {
505 self.checkpoints_dir().join(format!("checkpoint_{id:06}"))
506 }
507
508 fn manifest_path(&self, id: u64) -> PathBuf {
510 self.checkpoint_dir(id).join("manifest.json")
511 }
512
513 fn state_path(&self, id: u64) -> PathBuf {
515 self.checkpoint_dir(id).join("state.bin")
516 }
517
518 fn latest_path(&self) -> PathBuf {
520 self.checkpoints_dir().join("latest.txt")
521 }
522
523 fn parse_checkpoint_id(name: &str) -> Option<u64> {
525 name.strip_prefix("checkpoint_")
526 .and_then(|s| s.parse().ok())
527 }
528
529 async fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
531 let dir = self.checkpoints_dir();
532 let mut reader = match tokio::fs::read_dir(&dir).await {
533 Ok(r) => r,
534 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
535 Err(e) => return Err(e.into()),
536 };
537
538 let mut ids: Vec<u64> = Vec::new();
539 while let Some(entry) = reader.next_entry().await? {
540 let ft = entry.file_type().await?;
541 if !ft.is_dir() {
542 continue;
543 }
544 if let Some(id) = entry
545 .file_name()
546 .to_str()
547 .and_then(Self::parse_checkpoint_id)
548 {
549 ids.push(id);
550 }
551 }
552
553 ids.sort_unstable();
554 Ok(ids)
555 }
556}
557
558impl FileSystemCheckpointStore {
559 async fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
562 let dir = self.checkpoints_dir();
563 let mut reader = match tokio::fs::read_dir(&dir).await {
564 Ok(r) => r,
565 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
566 Err(e) => return Err(e.into()),
567 };
568
569 let mut orphans = Vec::new();
570 while let Some(entry) = reader.next_entry().await? {
571 let ft = entry.file_type().await?;
572 if !ft.is_dir() {
573 continue;
574 }
575 let path = entry.path();
576 let has_state = tokio::fs::metadata(path.join("state.bin")).await.is_ok();
577 let has_manifest = tokio::fs::metadata(path.join("manifest.json"))
578 .await
579 .is_ok();
580 if has_state && !has_manifest {
581 orphans.push(path);
582 }
583 }
584 Ok(orphans)
585 }
586}
587
588#[async_trait]
589impl CheckpointStore for FileSystemCheckpointStore {
590 fn vnode_count(&self) -> u16 {
591 self.vnode_count
592 }
593
594 async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
595 let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
596 tokio::fs::create_dir_all(&cp_dir).await?;
597
598 let manifest_path = self.manifest_path(manifest.checkpoint_id);
599 let json = serde_json::to_string_pretty(manifest)?;
600
601 let tmp_path = manifest_path.with_extension("json.tmp");
603 let write_res = async {
604 tokio::fs::write(&tmp_path, &json).await?;
605 sync_file(&tmp_path).await?;
606 tokio::fs::rename(&tmp_path, &manifest_path).await?;
607 sync_dir(&cp_dir).await
608 }
609 .await;
610 if let Err(e) = write_res {
611 let _ = tokio::fs::remove_file(&tmp_path).await;
613 return Err(e.into());
614 }
615
616 let latest = self.latest_path();
618 let latest_dir = latest.parent().unwrap_or(Path::new(".")).to_path_buf();
619 tokio::fs::create_dir_all(&latest_dir).await?;
620 let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
621 let tmp_latest = latest.with_extension("txt.tmp");
622 tokio::fs::write(&tmp_latest, &latest_content).await?;
623 sync_file(&tmp_latest).await?;
624 tokio::fs::rename(&tmp_latest, &latest).await?;
625 sync_dir(&latest_dir).await?;
626
627 if self.max_retained > 0 {
629 if let Err(e) = self.prune(self.max_retained).await {
630 tracing::warn!(
631 max_retained = self.max_retained,
632 error = %e,
633 "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
634 );
635 }
636 }
637
638 Ok(())
639 }
640
641 async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
642 let latest = self.latest_path();
643 let content = match tokio::fs::read_to_string(&latest).await {
644 Ok(c) => c,
645 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
646 Err(e) => return Err(e.into()),
647 };
648 let dir_name = content.trim();
649 if dir_name.is_empty() {
650 return Ok(None);
651 }
652
653 match Self::parse_checkpoint_id(dir_name) {
654 Some(id) => self.load_by_id(id).await,
655 None => Ok(None),
656 }
657 }
658
659 async fn load_by_id(
660 &self,
661 id: u64,
662 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
663 let path = self.manifest_path(id);
664 let json = match tokio::fs::read_to_string(&path).await {
665 Ok(s) => s,
666 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
667 Err(e) => return Err(e.into()),
668 };
669 let manifest: CheckpointManifest = serde_json::from_str(&json)?;
670
671 let errors = manifest.validate(self.vnode_count());
672 if !errors.is_empty() {
673 tracing::warn!(
674 checkpoint_id = id,
675 error_count = errors.len(),
676 first_error = %errors[0],
677 "loaded checkpoint manifest has validation warnings"
678 );
679 }
680
681 Ok(Some(manifest))
682 }
683
684 async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
685 self.sorted_checkpoint_ids().await
686 }
687
688 async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
689 let ids = self.sorted_checkpoint_ids().await?;
690 let mut result = Vec::with_capacity(ids.len());
691
692 for id in ids {
693 if let Ok(Some(manifest)) = self.load_by_id(id).await {
695 result.push((manifest.checkpoint_id, manifest.epoch));
696 }
697 }
698
699 Ok(result)
700 }
701
702 async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
703 let ids = self.sorted_checkpoint_ids().await?;
704 if ids.len() <= keep_count {
705 return Ok(0);
706 }
707
708 let to_remove = ids.len() - keep_count;
709 let mut removed = 0;
710
711 for &id in &ids[..to_remove] {
712 let dir = self.checkpoint_dir(id);
713 if tokio::fs::remove_dir_all(&dir).await.is_ok() {
714 removed += 1;
715 }
716 }
717
718 Ok(removed)
719 }
720
721 async fn save_state_data(
722 &self,
723 id: u64,
724 chunks: &[bytes::Bytes],
725 ) -> Result<(), CheckpointStoreError> {
726 use tokio::io::AsyncWriteExt;
727
728 let cp_dir = self.checkpoint_dir(id);
729 tokio::fs::create_dir_all(&cp_dir).await?;
730
731 let path = self.state_path(id);
732 let tmp = path.with_extension("bin.tmp");
733
734 let mut file = tokio::fs::File::create(&tmp).await?;
738 for chunk in chunks {
739 file.write_all(chunk).await?;
740 }
741 file.sync_all().await?;
742 drop(file);
743
744 tokio::fs::rename(&tmp, &path).await?;
745 sync_dir(&cp_dir).await?;
746
747 Ok(())
748 }
749
750 async fn save_with_state(
751 &self,
752 manifest: &CheckpointManifest,
753 state_data: Option<&[bytes::Bytes]>,
754 ) -> Result<(), CheckpointStoreError> {
755 let mut manifest = manifest.clone();
756 if let Some(chunks) = state_data {
759 manifest.state_checksum = Some(sha256_hex_chunks(chunks));
760 self.save_state_data(manifest.checkpoint_id, chunks).await?;
761 }
762 self.save(&manifest).await
763 }
764
765 async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
766 let orphans = self.find_orphan_dirs().await?;
767 let mut cleaned = 0;
768 for dir in &orphans {
769 if tokio::fs::remove_dir_all(dir).await.is_ok() {
770 tracing::info!(
771 path = %dir.display(),
772 "cleaned up orphaned checkpoint directory"
773 );
774 cleaned += 1;
775 }
776 }
777 Ok(cleaned)
778 }
779
780 async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
781 let path = self.state_path(id);
782 match tokio::fs::read(&path).await {
783 Ok(data) => Ok(Some(data)),
784 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
785 Err(e) => Err(e.into()),
786 }
787 }
788}
789
/// JSON body of the object-store `manifests/latest.json` pointer, e.g.
/// `{"checkpoint_id":42}`.
#[derive(serde::Serialize, serde::Deserialize)]
struct LatestPointer {
    /// Id of the checkpoint the pointer designates as latest.
    checkpoint_id: u64,
}
799
/// [`CheckpointStore`] backed by an [`ObjectStore`] (e.g. S3 or in-memory).
///
/// Object keys (with `prefix` prepended verbatim):
/// `manifests/manifest-NNNNNN.json`, `manifests/latest.json`, and
/// `checkpoints/state-NNNNNN.bin`.
pub struct ObjectStoreCheckpointStore {
    /// Underlying object store.
    store: Arc<dyn ObjectStore>,
    /// Key prefix concatenated directly in front of every key. No separator
    /// is inserted, so it presumably should be empty or end in `/` — confirm
    /// with callers.
    prefix: String,
    /// When greater than zero, `save` prunes down to this many checkpoints.
    max_retained: usize,
    /// Vnode count passed to manifest validation.
    vnode_count: u16,
}
827
828impl ObjectStoreCheckpointStore {
829 #[must_use]
838 pub fn new(store: Arc<dyn ObjectStore>, prefix: String, max_retained: usize) -> Self {
839 Self {
840 store,
841 prefix,
842 max_retained,
843 vnode_count: crate::checkpoint_manifest::DEFAULT_VNODE_COUNT,
844 }
845 }
846
847 #[must_use]
849 pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
850 self.vnode_count = vnode_count;
851 self
852 }
853
854 fn manifest_path(&self, id: u64) -> object_store::path::Path {
855 object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
856 }
857
858 fn latest_pointer_path(&self) -> object_store::path::Path {
859 object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
860 }
861
862 fn state_path(&self, id: u64) -> object_store::path::Path {
863 object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
864 }
865
866 async fn put_with_retry(
880 &self,
881 path: &object_store::path::Path,
882 payload: PutPayload,
883 opts: &PutOptions,
884 ) -> Result<(), CheckpointStoreError> {
885 const BACKOFFS_MS: &[u64] = &[100, 500, 2000];
886 let mut attempt = 0usize;
887 loop {
888 let result = self
889 .store
890 .put_opts(path, payload.clone(), opts.clone())
891 .await;
892 match result {
893 Ok(_) => return Ok(()),
894 Err(object_store::Error::Generic { .. }) if attempt < BACKOFFS_MS.len() => {
895 let delay = std::time::Duration::from_millis(BACKOFFS_MS[attempt]);
896 tracing::warn!(
897 path = %path,
898 attempt = attempt + 1,
899 delay_ms = delay.as_millis(),
900 "transient put error, retrying"
901 );
902 tokio::time::sleep(delay).await;
903 attempt += 1;
904 }
905 Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
906 }
907 }
908 }
909
910 async fn get_bytes(
912 &self,
913 path: &object_store::path::Path,
914 ) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
915 match self.store.get_opts(path, GetOptions::default()).await {
916 Ok(get_result) => {
917 let data = get_result.bytes().await?;
918 Ok(Some(data))
919 }
920 Err(object_store::Error::NotFound { .. }) => Ok(None),
921 Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
922 }
923 }
924
925 async fn load_manifest_at(
927 &self,
928 path: &object_store::path::Path,
929 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
930 match self.get_bytes(path).await? {
931 Some(data) => {
932 let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
933 Ok(Some(manifest))
934 }
935 None => Ok(None),
936 }
937 }
938
939 async fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
941 use futures::TryStreamExt;
942
943 let mut ids = std::collections::BTreeSet::new();
944
945 let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
946 let entries: Vec<_> = self
947 .store
948 .list(Some(&manifests_prefix))
949 .try_collect()
950 .await?;
951 for entry in &entries {
952 if let Some(id) =
953 parse_checkpoint_id_from_path(entry.location.as_ref(), "manifest-", ".json")
954 {
955 ids.insert(id);
956 }
957 }
958
959 Ok(ids.into_iter().collect())
960 }
961}
962
963#[async_trait]
964impl CheckpointStore for ObjectStoreCheckpointStore {
965 fn vnode_count(&self) -> u16 {
966 self.vnode_count
967 }
968
969 async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
970 let json = serde_json::to_string_pretty(manifest)?;
971 let path = self.manifest_path(manifest.checkpoint_id);
972 let json_bytes = bytes::Bytes::from(json);
973
974 let create_opts = PutOptions {
976 mode: PutMode::Create,
977 ..PutOptions::default()
978 };
979 let result = self
980 .store
981 .put_opts(
982 &path,
983 PutPayload::from_bytes(json_bytes.clone()),
984 create_opts,
985 )
986 .await;
987
988 match result {
989 Ok(_) => {}
990 Err(object_store::Error::AlreadyExists { .. }) => {
991 tracing::warn!(
992 checkpoint_id = manifest.checkpoint_id,
993 "[LDB-6010] Manifest already exists — skipping write"
994 );
995 }
996 Err(object_store::Error::NotImplemented { .. }) => {
997 self.store
999 .put_opts(
1000 &path,
1001 PutPayload::from_bytes(json_bytes),
1002 PutOptions::default(),
1003 )
1004 .await?;
1005 }
1006 Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
1007 }
1008
1009 let latest = self.latest_pointer_path();
1016 if let Some(current) = self.get_bytes(&latest).await? {
1017 if let Ok(existing) = serde_json::from_slice::<LatestPointer>(¤t) {
1018 if existing.checkpoint_id > manifest.checkpoint_id {
1019 tracing::warn!(
1020 current = existing.checkpoint_id,
1021 ours = manifest.checkpoint_id,
1022 "[LDB-6010] latest.json already points at a newer checkpoint — \
1023 skipping pointer update (possible split-brain or delayed writer)"
1024 );
1025 return Ok(());
1026 }
1027 }
1028 }
1029 let pointer = serde_json::to_string(&LatestPointer {
1030 checkpoint_id: manifest.checkpoint_id,
1031 })?;
1032 self.put_with_retry(
1033 &latest,
1034 PutPayload::from_bytes(bytes::Bytes::from(pointer)),
1035 &PutOptions::default(),
1036 )
1037 .await?;
1038
1039 if self.max_retained > 0 {
1041 if let Err(e) = self.prune(self.max_retained).await {
1042 tracing::warn!(
1043 max_retained = self.max_retained,
1044 error = %e,
1045 "[LDB-6009] Object store checkpoint prune failed"
1046 );
1047 }
1048 }
1049
1050 Ok(())
1051 }
1052
1053 async fn update_manifest(
1054 &self,
1055 manifest: &CheckpointManifest,
1056 ) -> Result<(), CheckpointStoreError> {
1057 let json = serde_json::to_string_pretty(manifest)?;
1058 let path = self.manifest_path(manifest.checkpoint_id);
1059 let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
1060
1061 self.store
1063 .put_opts(&path, payload, PutOptions::default())
1064 .await?;
1065
1066 Ok(())
1067 }
1068
1069 async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1070 if let Some(data) = self.get_bytes(&self.latest_pointer_path()).await? {
1071 let pointer: LatestPointer = serde_json::from_slice(&data)?;
1072 return self.load_by_id(pointer.checkpoint_id).await;
1073 }
1074
1075 Ok(None)
1076 }
1077
1078 async fn load_by_id(
1079 &self,
1080 id: u64,
1081 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1082 self.load_manifest_at(&self.manifest_path(id)).await
1083 }
1084
1085 async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
1086 self.list_checkpoint_ids().await
1087 }
1088
1089 async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
1090 let ids = self.list_checkpoint_ids().await?;
1091 let mut result = Vec::with_capacity(ids.len());
1092
1093 for id in ids {
1094 if let Ok(Some(manifest)) = self.load_by_id(id).await {
1095 result.push((manifest.checkpoint_id, manifest.epoch));
1096 }
1097 }
1098
1099 Ok(result)
1100 }
1101
1102 async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
1103 let ids = self.list_checkpoint_ids().await?;
1104 if ids.len() <= keep_count {
1105 return Ok(0);
1106 }
1107
1108 let to_remove = ids.len() - keep_count;
1109 let mut removed = 0;
1110 let mut logged_error = false;
1111
1112 for &id in &ids[..to_remove] {
1113 let manifest = self.manifest_path(id);
1114 let state = self.state_path(id);
1115 let manifest_res = self.store.delete(&manifest).await;
1116 let state_res = self.store.delete(&state).await;
1117
1118 let manifest_ok = matches!(
1121 manifest_res,
1122 Ok(()) | Err(object_store::Error::NotFound { .. })
1123 );
1124 if manifest_ok {
1125 removed += 1;
1126 }
1127
1128 for err in [manifest_res, state_res]
1132 .into_iter()
1133 .filter_map(Result::err)
1134 {
1135 if matches!(err, object_store::Error::NotFound { .. }) {
1136 continue;
1137 }
1138 if !logged_error {
1139 tracing::warn!(
1140 checkpoint_id = id,
1141 error = %err,
1142 "[LDB-6027] checkpoint prune: delete failed — \
1143 retained objects may accumulate"
1144 );
1145 logged_error = true;
1146 }
1147 }
1148 }
1149
1150 Ok(removed)
1151 }
1152
1153 async fn save_state_data(
1154 &self,
1155 id: u64,
1156 chunks: &[bytes::Bytes],
1157 ) -> Result<(), CheckpointStoreError> {
1158 let path = self.state_path(id);
1159 let payload: PutPayload = chunks.iter().cloned().collect();
1163 self.put_with_retry(&path, payload, &PutOptions::default())
1165 .await
1166 }
1167
1168 async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
1169 Ok(self
1170 .get_bytes(&self.state_path(id))
1171 .await?
1172 .map(|d| d.to_vec()))
1173 }
1174
1175 async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
1176 use futures::{StreamExt, TryStreamExt};
1177
1178 let manifest_ids: std::collections::BTreeSet<u64> =
1180 self.list_checkpoint_ids().await?.into_iter().collect();
1181
1182 let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
1184 let entries: Vec<_> = self.store.list(Some(&state_prefix)).try_collect().await?;
1185
1186 let mut orphan_paths = Vec::new();
1187 for entry in &entries {
1188 if let Some(id) =
1189 parse_checkpoint_id_from_path(entry.location.as_ref(), "state-", ".bin")
1190 {
1191 if !manifest_ids.contains(&id) {
1192 orphan_paths.push(entry.location.clone());
1193 }
1194 }
1195 }
1196
1197 let count = orphan_paths.len();
1198 if !orphan_paths.is_empty() {
1199 let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
1200 let mut results = self.store.delete_stream(stream);
1201 while let Some(result) = results.next().await {
1202 if let Err(e) = result {
1203 tracing::warn!(error = %e, "failed to delete orphan state file");
1204 }
1205 }
1206 }
1207
1208 Ok(count)
1209 }
1210}
1211
1212#[cfg(test)]
1213mod tests {
1214 use super::*;
1215 use crate::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
1216 #[allow(clippy::disallowed_types)] use std::collections::HashMap;
1218
1219 fn make_store(dir: &Path) -> FileSystemCheckpointStore {
1220 FileSystemCheckpointStore::new(dir, 3)
1221 }
1222
1223 fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
1224 CheckpointManifest::new(id, epoch)
1225 }
1226
1227 #[tokio::test]
1228 async fn test_save_and_load_latest() {
1229 let dir = tempfile::tempdir().unwrap();
1230 let store = make_store(dir.path());
1231
1232 let m = make_manifest(1, 1);
1233 store.save(&m).await.unwrap();
1234
1235 let loaded = store.load_latest().await.unwrap().unwrap();
1236 assert_eq!(loaded.checkpoint_id, 1);
1237 assert_eq!(loaded.epoch, 1);
1238 }
1239
1240 #[tokio::test]
1241 async fn test_load_latest_returns_none_when_empty() {
1242 let dir = tempfile::tempdir().unwrap();
1243 let store = make_store(dir.path());
1244 assert!(store.load_latest().await.unwrap().is_none());
1245 }
1246
1247 #[tokio::test]
1248 async fn test_load_latest_returns_most_recent() {
1249 let dir = tempfile::tempdir().unwrap();
1250 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1251
1252 for i in 1..=5 {
1253 store.save(&make_manifest(i, i)).await.unwrap();
1254 }
1255
1256 let latest = store.load_latest().await.unwrap().unwrap();
1257 assert_eq!(latest.checkpoint_id, 5);
1258 assert_eq!(latest.epoch, 5);
1259 }
1260
1261 #[tokio::test]
1262 async fn test_load_by_id() {
1263 let dir = tempfile::tempdir().unwrap();
1264 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1265
1266 store.save(&make_manifest(1, 10)).await.unwrap();
1267 store.save(&make_manifest(2, 20)).await.unwrap();
1268
1269 let m = store.load_by_id(1).await.unwrap().unwrap();
1270 assert_eq!(m.epoch, 10);
1271
1272 let m = store.load_by_id(2).await.unwrap().unwrap();
1273 assert_eq!(m.epoch, 20);
1274
1275 assert!(store.load_by_id(99).await.unwrap().is_none());
1276 }
1277
1278 #[tokio::test]
1279 async fn test_list() {
1280 let dir = tempfile::tempdir().unwrap();
1281 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1282
1283 store.save(&make_manifest(1, 10)).await.unwrap();
1284 store.save(&make_manifest(3, 30)).await.unwrap();
1285 store.save(&make_manifest(2, 20)).await.unwrap();
1286
1287 let list = store.list().await.unwrap();
1288 assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1289 }
1290
1291 #[tokio::test]
1292 async fn test_prune_keeps_max() {
1293 let dir = tempfile::tempdir().unwrap();
1294 let store = FileSystemCheckpointStore::new(dir.path(), 10); for i in 1..=5 {
1297 store.save(&make_manifest(i, i)).await.unwrap();
1298 }
1299
1300 let removed = store.prune(2).await.unwrap();
1301 assert_eq!(removed, 3);
1302
1303 let list = store.list().await.unwrap();
1304 assert_eq!(list.len(), 2);
1305 assert_eq!(list[0].0, 4);
1306 assert_eq!(list[1].0, 5);
1307 }
1308
1309 #[tokio::test]
1310 async fn test_auto_prune_on_save() {
1311 let dir = tempfile::tempdir().unwrap();
1312 let store = FileSystemCheckpointStore::new(dir.path(), 2);
1313
1314 for i in 1..=5 {
1315 store.save(&make_manifest(i, i)).await.unwrap();
1316 }
1317
1318 let list = store.list().await.unwrap();
1319 assert_eq!(list.len(), 2);
1320 assert_eq!(list[0].0, 4);
1322 assert_eq!(list[1].0, 5);
1323 }
1324
1325 #[tokio::test]
1326 async fn test_save_and_load_state_data() {
1327 let dir = tempfile::tempdir().unwrap();
1328 let store = make_store(dir.path());
1329
1330 store.save(&make_manifest(1, 1)).await.unwrap();
1331
1332 let data = b"large operator state binary blob";
1333 store
1334 .save_state_data(1, &[bytes::Bytes::from_static(data)])
1335 .await
1336 .unwrap();
1337
1338 let loaded = store.load_state_data(1).await.unwrap().unwrap();
1339 assert_eq!(loaded, data);
1340 }
1341
1342 #[tokio::test]
1343 async fn test_load_state_data_returns_none() {
1344 let dir = tempfile::tempdir().unwrap();
1345 let store = make_store(dir.path());
1346 assert!(store.load_state_data(99).await.unwrap().is_none());
1347 }
1348
1349 #[tokio::test]
1350 async fn test_full_manifest_round_trip() {
1351 let dir = tempfile::tempdir().unwrap();
1352 let store = make_store(dir.path());
1353
1354 let mut m = make_manifest(1, 5);
1355 m.source_offsets.insert(
1356 "kafka-src".into(),
1357 ConnectorCheckpoint::with_offsets(
1358 5,
1359 HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
1360 ),
1361 );
1362 m.sink_epochs.insert("pg-sink".into(), 4);
1363 m.table_offsets.insert(
1364 "instruments".into(),
1365 ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
1366 );
1367 m.operator_states
1368 .insert("window".into(), OperatorCheckpoint::inline(b"data"));
1369 m.watermark = Some(999_000);
1370
1371 store.save(&m).await.unwrap();
1372
1373 let loaded = store.load_latest().await.unwrap().unwrap();
1374 assert_eq!(loaded.checkpoint_id, 1);
1375 assert_eq!(loaded.epoch, 5);
1376 assert_eq!(loaded.watermark, Some(999_000));
1377
1378 let src = loaded.source_offsets.get("kafka-src").unwrap();
1379 assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
1380
1381 assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
1382
1383 let tbl = loaded.table_offsets.get("instruments").unwrap();
1384 assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
1385
1386 let op = loaded.operator_states.get("window").unwrap();
1387 assert_eq!(op.decode_inline().unwrap(), b"data");
1388 }
1389
1390 #[tokio::test]
1391 async fn test_empty_latest_txt() {
1392 let dir = tempfile::tempdir().unwrap();
1393 let store = make_store(dir.path());
1394
1395 let cp_dir = dir.path().join("checkpoints");
1396 std::fs::create_dir_all(&cp_dir).unwrap();
1397 std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
1398
1399 assert!(store.load_latest().await.unwrap().is_none());
1400 }
1401
1402 #[tokio::test]
1403 async fn test_latest_points_to_missing_checkpoint() {
1404 let dir = tempfile::tempdir().unwrap();
1405 let store = make_store(dir.path());
1406
1407 let cp_dir = dir.path().join("checkpoints");
1408 std::fs::create_dir_all(&cp_dir).unwrap();
1409 std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
1410
1411 assert!(store.load_latest().await.unwrap().is_none());
1412 }
1413
1414 #[tokio::test]
1415 async fn test_prune_no_op_when_under_limit() {
1416 let dir = tempfile::tempdir().unwrap();
1417 let store = make_store(dir.path());
1418
1419 store.save(&make_manifest(1, 1)).await.unwrap();
1420 let removed = store.prune(5).await.unwrap();
1421 assert_eq!(removed, 0);
1422 }
1423
1424 #[tokio::test]
1425 async fn test_save_with_state_writes_sidecar_before_manifest() {
1426 let dir = tempfile::tempdir().unwrap();
1427 let store = make_store(dir.path());
1428
1429 let m = make_manifest(1, 1);
1430 let state = b"large-operator-state-blob";
1431 store
1432 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1433 .await
1434 .unwrap();
1435
1436 let loaded = store.load_latest().await.unwrap().unwrap();
1438 assert_eq!(loaded.checkpoint_id, 1);
1439
1440 let loaded_state = store.load_state_data(1).await.unwrap().unwrap();
1441 assert_eq!(loaded_state, state);
1442 }
1443
1444 #[tokio::test]
1445 async fn test_save_with_state_none_is_same_as_save() {
1446 let dir = tempfile::tempdir().unwrap();
1447 let store = make_store(dir.path());
1448
1449 let m = make_manifest(1, 1);
1450 store.save_with_state(&m, None).await.unwrap();
1451
1452 let loaded = store.load_latest().await.unwrap().unwrap();
1453 assert_eq!(loaded.checkpoint_id, 1);
1454 assert!(store.load_state_data(1).await.unwrap().is_none());
1455 }
1456
1457 #[tokio::test]
1458 async fn test_orphaned_state_without_manifest_is_ignored() {
1459 let dir = tempfile::tempdir().unwrap();
1460 let store = make_store(dir.path());
1461
1462 store
1465 .save_state_data(1, &[bytes::Bytes::from_static(b"orphaned")])
1466 .await
1467 .unwrap();
1468
1469 assert!(store.load_latest().await.unwrap().is_none());
1471
1472 assert!(store.list().await.unwrap().is_empty());
1474 }
1475
1476 fn make_obj_store() -> ObjectStoreCheckpointStore {
1481 let store = Arc::new(object_store::memory::InMemory::new());
1482 ObjectStoreCheckpointStore::new(store, String::new(), 3)
1483 }
1484
1485 #[tokio::test]
1486 async fn test_obj_save_and_load_latest() {
1487 let store = make_obj_store();
1488 let m = make_manifest(1, 1);
1489 store.save(&m).await.unwrap();
1490
1491 let loaded = store.load_latest().await.unwrap().unwrap();
1492 assert_eq!(loaded.checkpoint_id, 1);
1493 assert_eq!(loaded.epoch, 1);
1494 }
1495
1496 #[tokio::test]
1497 async fn test_obj_load_latest_returns_none_when_empty() {
1498 let store = make_obj_store();
1499 assert!(store.load_latest().await.unwrap().is_none());
1500 }
1501
1502 #[tokio::test]
1503 async fn test_obj_load_by_id() {
1504 let store = ObjectStoreCheckpointStore::new(
1505 Arc::new(object_store::memory::InMemory::new()),
1506 String::new(),
1507 10,
1508 );
1509
1510 store.save(&make_manifest(1, 10)).await.unwrap();
1511 store.save(&make_manifest(2, 20)).await.unwrap();
1512
1513 let m = store.load_by_id(1).await.unwrap().unwrap();
1514 assert_eq!(m.epoch, 10);
1515 let m = store.load_by_id(2).await.unwrap().unwrap();
1516 assert_eq!(m.epoch, 20);
1517 assert!(store.load_by_id(99).await.unwrap().is_none());
1518 }
1519
1520 #[tokio::test]
1521 async fn test_obj_list() {
1522 let store = ObjectStoreCheckpointStore::new(
1523 Arc::new(object_store::memory::InMemory::new()),
1524 String::new(),
1525 10,
1526 );
1527
1528 store.save(&make_manifest(1, 10)).await.unwrap();
1529 store.save(&make_manifest(3, 30)).await.unwrap();
1530 store.save(&make_manifest(2, 20)).await.unwrap();
1531
1532 let list = store.list().await.unwrap();
1533 assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1534 }
1535
1536 #[tokio::test]
1537 async fn test_obj_prune() {
1538 let store = ObjectStoreCheckpointStore::new(
1539 Arc::new(object_store::memory::InMemory::new()),
1540 String::new(),
1541 10,
1542 );
1543
1544 for i in 1..=5 {
1545 store.save(&make_manifest(i, i)).await.unwrap();
1546 }
1547
1548 let removed = store.prune(2).await.unwrap();
1549 assert_eq!(removed, 3);
1550
1551 let list = store.list().await.unwrap();
1552 assert_eq!(list.len(), 2);
1553 assert_eq!(list[0].0, 4);
1554 assert_eq!(list[1].0, 5);
1555 }
1556
1557 #[tokio::test]
1558 async fn test_obj_auto_prune_on_save() {
1559 let store = ObjectStoreCheckpointStore::new(
1560 Arc::new(object_store::memory::InMemory::new()),
1561 String::new(),
1562 2,
1563 );
1564
1565 for i in 1..=5 {
1566 store.save(&make_manifest(i, i)).await.unwrap();
1567 }
1568
1569 let list = store.list().await.unwrap();
1570 assert_eq!(list.len(), 2);
1571 assert_eq!(list[0].0, 4);
1572 assert_eq!(list[1].0, 5);
1573 }
1574
1575 #[tokio::test]
1576 async fn test_obj_save_and_load_state_data() {
1577 let store = make_obj_store();
1578 store.save(&make_manifest(1, 1)).await.unwrap();
1579
1580 let data = b"large operator state binary blob";
1581 store
1582 .save_state_data(1, &[bytes::Bytes::from_static(data)])
1583 .await
1584 .unwrap();
1585
1586 let loaded = store.load_state_data(1).await.unwrap().unwrap();
1587 assert_eq!(loaded, data);
1588 }
1589
1590 #[tokio::test]
1591 async fn test_obj_load_state_data_returns_none() {
1592 let store = make_obj_store();
1593 assert!(store.load_state_data(99).await.unwrap().is_none());
1594 }
1595
1596 #[tokio::test]
1597 async fn test_obj_with_prefix() {
1598 let inner = Arc::new(object_store::memory::InMemory::new());
1599 let store = ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10);
1600
1601 store.save(&make_manifest(1, 42)).await.unwrap();
1602 let loaded = store.load_latest().await.unwrap().unwrap();
1603 assert_eq!(loaded.checkpoint_id, 1);
1604 assert_eq!(loaded.epoch, 42);
1605 }
1606
1607 #[tokio::test]
1612 async fn test_obj_layout_paths() {
1613 let inner = Arc::new(object_store::memory::InMemory::new());
1614 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1615
1616 store.save(&make_manifest(1, 10)).await.unwrap();
1617
1618 let result = inner
1619 .get_opts(
1620 &object_store::path::Path::from("manifests/manifest-000001.json"),
1621 GetOptions::default(),
1622 )
1623 .await;
1624 assert!(result.is_ok(), "manifest path should exist");
1625
1626 let result = inner
1627 .get_opts(
1628 &object_store::path::Path::from("manifests/latest.json"),
1629 GetOptions::default(),
1630 )
1631 .await;
1632 assert!(result.is_ok(), "latest.json should exist");
1633 }
1634
1635 #[tokio::test]
1636 async fn test_obj_conditional_put_idempotent() {
1637 let store = ObjectStoreCheckpointStore::new(
1638 Arc::new(object_store::memory::InMemory::new()),
1639 String::new(),
1640 10,
1641 );
1642
1643 let m = make_manifest(1, 10);
1644 store.save(&m).await.unwrap();
1645
1646 store.save(&m).await.unwrap();
1648
1649 let loaded = store.load_latest().await.unwrap().unwrap();
1650 assert_eq!(loaded.checkpoint_id, 1);
1651 assert_eq!(loaded.epoch, 10);
1652 }
1653
1654 #[tokio::test]
1655 async fn test_obj_update_manifest_overwrites() {
1656 use crate::checkpoint_manifest::SinkCommitStatus;
1657
1658 let store = make_obj_store();
1659
1660 let mut m = make_manifest(1, 10);
1662 m.sink_commit_statuses
1663 .insert("pg-sink".into(), SinkCommitStatus::Pending);
1664 store.save(&m).await.unwrap();
1665
1666 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1668 assert_eq!(
1669 loaded.sink_commit_statuses.get("pg-sink"),
1670 Some(&SinkCommitStatus::Pending)
1671 );
1672
1673 m.sink_commit_statuses
1675 .insert("pg-sink".into(), SinkCommitStatus::Committed);
1676 store.update_manifest(&m).await.unwrap();
1677
1678 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1680 assert_eq!(
1681 loaded.sink_commit_statuses.get("pg-sink"),
1682 Some(&SinkCommitStatus::Committed)
1683 );
1684 }
1685
1686 #[tokio::test]
1687 async fn test_obj_save_still_uses_conditional_put() {
1688 let store = make_obj_store();
1689
1690 let m = make_manifest(1, 10);
1691 store.save(&m).await.unwrap();
1692
1693 store.save(&m).await.unwrap();
1696
1697 let mut m2 = make_manifest(1, 10);
1699 m2.watermark = Some(42);
1700 store.update_manifest(&m2).await.unwrap();
1701
1702 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1703 assert_eq!(loaded.watermark, Some(42));
1704 }
1705
1706 #[tokio::test]
1707 async fn test_fs_update_manifest_overwrites() {
1708 use crate::checkpoint_manifest::SinkCommitStatus;
1709
1710 let dir = tempfile::tempdir().unwrap();
1711 let store = make_store(dir.path());
1712
1713 let mut m = make_manifest(1, 10);
1714 m.sink_commit_statuses
1715 .insert("sink-a".into(), SinkCommitStatus::Pending);
1716 store.save(&m).await.unwrap();
1717
1718 m.sink_commit_statuses
1719 .insert("sink-a".into(), SinkCommitStatus::Committed);
1720 store.update_manifest(&m).await.unwrap();
1721
1722 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1723 assert_eq!(
1724 loaded.sink_commit_statuses.get("sink-a"),
1725 Some(&SinkCommitStatus::Committed)
1726 );
1727 }
1728
1729 #[tokio::test]
1730 async fn test_obj_state_paths() {
1731 let inner = Arc::new(object_store::memory::InMemory::new());
1732 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1733
1734 store.save(&make_manifest(1, 1)).await.unwrap();
1735 store
1736 .save_state_data(1, &[bytes::Bytes::from_static(b"state-blob")])
1737 .await
1738 .unwrap();
1739
1740 let result = inner
1741 .get_opts(
1742 &object_store::path::Path::from("checkpoints/state-000001.bin"),
1743 GetOptions::default(),
1744 )
1745 .await;
1746 assert!(result.is_ok(), "state path should exist");
1747 }
1748
1749 #[tokio::test]
1750 async fn test_obj_latest_json_format() {
1751 let inner = Arc::new(object_store::memory::InMemory::new());
1752 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1753
1754 store.save(&make_manifest(5, 50)).await.unwrap();
1755
1756 let data = inner
1757 .get_opts(
1758 &object_store::path::Path::from("manifests/latest.json"),
1759 GetOptions::default(),
1760 )
1761 .await
1762 .unwrap()
1763 .bytes()
1764 .await
1765 .unwrap();
1766
1767 let pointer: super::LatestPointer = serde_json::from_slice(&data).unwrap();
1768 assert_eq!(pointer.checkpoint_id, 5);
1769 }
1770
1771 #[tokio::test]
1772 async fn test_obj_latest_monotonic_guard_skips_regression() {
1773 let inner = Arc::new(object_store::memory::InMemory::new());
1774 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1775
1776 store.save(&make_manifest(10, 10)).await.unwrap();
1777 store.save(&make_manifest(5, 5)).await.unwrap();
1781
1782 let loaded = store.load_latest().await.unwrap().unwrap();
1783 assert_eq!(
1784 loaded.checkpoint_id, 10,
1785 "latest pointer should not regress to an older id"
1786 );
1787 }
1788
1789 #[tokio::test]
1790 async fn test_validate_checkpoint_valid() {
1791 let dir = tempfile::tempdir().unwrap();
1792 let store = make_store(dir.path());
1793
1794 let m = make_manifest(1, 1);
1795 store.save(&m).await.unwrap();
1796
1797 let result = store.validate_checkpoint(1).await.unwrap();
1798 assert!(result.valid, "valid checkpoint: {:?}", result.issues);
1799 assert!(result.issues.is_empty());
1800 }
1801
1802 #[tokio::test]
1803 async fn test_validate_checkpoint_epoch_zero_invalid() {
1804 let dir = tempfile::tempdir().unwrap();
1805 let store = make_store(dir.path());
1806
1807 let m = make_manifest(1, 0);
1809 store.save(&m).await.unwrap();
1810
1811 let result = store.validate_checkpoint(1).await.unwrap();
1812 assert!(!result.valid, "epoch=0 should be invalid");
1813 assert!(
1814 result.issues.iter().any(|i| i.message().contains("epoch")),
1815 "should mention epoch: {:?}",
1816 result.issues
1817 );
1818 }
1819
1820 #[tokio::test]
1821 async fn test_validate_checkpoint_missing_manifest() {
1822 let dir = tempfile::tempdir().unwrap();
1823 let store = make_store(dir.path());
1824
1825 let result = store.validate_checkpoint(99).await.unwrap();
1826 assert!(!result.valid);
1827 assert!(result.issues[0].message().contains("not found"));
1828 }
1829
1830 #[tokio::test]
1831 async fn test_validate_checkpoint_corrupt_manifest() {
1832 let dir = tempfile::tempdir().unwrap();
1833 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1834
1835 let cp_dir = dir.path().join("checkpoints/checkpoint_000001");
1837 std::fs::create_dir_all(&cp_dir).unwrap();
1838 std::fs::write(cp_dir.join("manifest.json"), "not valid json").unwrap();
1839
1840 let result = store.validate_checkpoint(1).await.unwrap();
1842 assert!(!result.valid);
1843 assert!(
1844 result.issues[0].message().contains("corrupt manifest"),
1845 "expected corrupt manifest issue: {:?}",
1846 result.issues
1847 );
1848 }
1849
1850 #[tokio::test]
1851 async fn test_validate_checkpoint_state_checksum_ok() {
1852 let dir = tempfile::tempdir().unwrap();
1853 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1854
1855 let state = b"important operator state";
1856 let m = make_manifest(1, 1);
1857 store
1858 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1859 .await
1860 .unwrap();
1861
1862 let result = store.validate_checkpoint(1).await.unwrap();
1863 assert!(result.valid, "checksum should match: {:?}", result.issues);
1864 }
1865
1866 #[tokio::test]
1867 async fn test_validate_checkpoint_state_checksum_mismatch() {
1868 let dir = tempfile::tempdir().unwrap();
1869 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1870
1871 let state = b"original state";
1873 let m = make_manifest(1, 1);
1874 store
1875 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1876 .await
1877 .unwrap();
1878
1879 let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1881 std::fs::write(&state_path, b"corrupted data!!").unwrap();
1882
1883 let result = store.validate_checkpoint(1).await.unwrap();
1884 assert!(!result.valid, "corrupted state should be invalid");
1885 assert!(
1886 result
1887 .issues
1888 .iter()
1889 .any(|i| i.message().contains("checksum mismatch")),
1890 "should report checksum mismatch: {:?}",
1891 result.issues
1892 );
1893 }
1894
1895 #[tokio::test]
1896 async fn test_validate_checkpoint_state_missing_when_expected() {
1897 let dir = tempfile::tempdir().unwrap();
1898 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1899
1900 let m = make_manifest(1, 1);
1902 store
1903 .save_with_state(&m, Some(&[bytes::Bytes::from_static(b"state")]))
1904 .await
1905 .unwrap();
1906
1907 let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1909 std::fs::remove_file(&state_path).unwrap();
1910
1911 let result = store.validate_checkpoint(1).await.unwrap();
1912 assert!(!result.valid);
1913 assert!(
1914 result
1915 .issues
1916 .iter()
1917 .any(|i| i.message().contains("not found")),
1918 "should report missing state: {:?}",
1919 result.issues
1920 );
1921 }
1922
1923 #[tokio::test]
1924 async fn test_recover_latest_validated_skips_corrupt() {
1925 let dir = tempfile::tempdir().unwrap();
1926 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1927
1928 store.save(&make_manifest(1, 10)).await.unwrap();
1930 store.save(&make_manifest(2, 20)).await.unwrap();
1931
1932 let cp2_manifest = dir
1934 .path()
1935 .join("checkpoints/checkpoint_000002/manifest.json");
1936 std::fs::write(cp2_manifest, "<<<corrupt>>>").unwrap();
1937
1938 let report = store.recover_latest_validated().await.unwrap();
1940 assert_eq!(report.chosen_id, Some(1));
1941 assert_eq!(report.skipped.len(), 1);
1942 assert_eq!(report.skipped[0].0, 2);
1943 assert_eq!(report.examined, 2);
1944 }
1945
1946 #[tokio::test]
1947 async fn test_recover_latest_validated_fresh_start() {
1948 let dir = tempfile::tempdir().unwrap();
1949 let store = make_store(dir.path());
1950
1951 let report = store.recover_latest_validated().await.unwrap();
1952 assert!(report.chosen_id.is_none());
1953 assert_eq!(report.examined, 0);
1954 }
1955
1956 #[tokio::test]
1957 async fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
1958 let dir = tempfile::tempdir().unwrap();
1959 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1960
1961 store.save(&make_manifest(1, 1)).await.unwrap();
1963 let cp_manifest = dir
1964 .path()
1965 .join("checkpoints/checkpoint_000001/manifest.json");
1966 std::fs::write(cp_manifest, "corrupt").unwrap();
1967
1968 let report = store.recover_latest_validated().await.unwrap();
1972 assert!(report.chosen_id.is_none());
1973 }
1974
1975 #[tokio::test]
1976 async fn test_cleanup_orphans_removes_stateless_dirs() {
1977 let dir = tempfile::tempdir().unwrap();
1978 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1979
1980 let orphan_dir = dir.path().join("checkpoints/checkpoint_000099");
1982 std::fs::create_dir_all(&orphan_dir).unwrap();
1983 std::fs::write(orphan_dir.join("state.bin"), b"orphaned").unwrap();
1984
1985 store.save(&make_manifest(1, 1)).await.unwrap();
1987
1988 let cleaned = store.cleanup_orphans().await.unwrap();
1989 assert_eq!(cleaned, 1);
1990
1991 assert!(!orphan_dir.exists());
1993 assert!(store.load_by_id(1).await.unwrap().is_some());
1995 }
1996
1997 #[tokio::test]
1998 async fn test_cleanup_orphans_noop_when_clean() {
1999 let dir = tempfile::tempdir().unwrap();
2000 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2001
2002 store.save(&make_manifest(1, 1)).await.unwrap();
2003 let cleaned = store.cleanup_orphans().await.unwrap();
2004 assert_eq!(cleaned, 0);
2005 }
2006
2007 #[tokio::test]
2008 async fn test_save_with_state_writes_checksum() {
2009 let dir = tempfile::tempdir().unwrap();
2010 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2011
2012 let state = b"state-data-for-checksum";
2013 let m = make_manifest(1, 1);
2014 store
2015 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2016 .await
2017 .unwrap();
2018
2019 let loaded = store.load_latest().await.unwrap().unwrap();
2020 assert!(
2021 loaded.state_checksum.is_some(),
2022 "state_checksum should be set"
2023 );
2024 let expected = sha256_hex(state);
2025 assert_eq!(loaded.state_checksum.unwrap(), expected);
2026 }
2027
#[test]
fn test_state_checksum_backward_compat() {
    // A manifest JSON without a `state_checksum` key (presumably one written
    // before the field existed) must still deserialize, with the checksum
    // left as `None`.
    // This is pure synchronous deserialization, so a plain `#[test]` is used
    // rather than spinning up a Tokio runtime via `#[tokio::test]`.
    let json = r#"{
        "version": 1,
        "checkpoint_id": 1,
        "epoch": 1,
        "timestamp_ms": 1000
    }"#;
    let m: CheckpointManifest = serde_json::from_str(json).unwrap();
    assert!(m.state_checksum.is_none());
}
2040
2041 #[tokio::test]
2044 async fn test_obj_validate_checkpoint_valid() {
2045 let store = make_obj_store();
2046 store.save(&make_manifest(1, 1)).await.unwrap();
2047
2048 let result = store.validate_checkpoint(1).await.unwrap();
2049 assert!(result.valid, "valid checkpoint: {:?}", result.issues);
2050 }
2051
2052 #[tokio::test]
2053 async fn test_obj_validate_checkpoint_missing() {
2054 let store = make_obj_store();
2055 let result = store.validate_checkpoint(99).await.unwrap();
2056 assert!(!result.valid);
2057 }
2058
2059 #[tokio::test]
2060 async fn test_obj_validate_state_checksum() {
2061 let store = ObjectStoreCheckpointStore::new(
2062 Arc::new(object_store::memory::InMemory::new()),
2063 String::new(),
2064 10,
2065 );
2066
2067 let state = b"obj-store-state-data";
2068 let m = make_manifest(1, 1);
2069 store
2070 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2071 .await
2072 .unwrap();
2073
2074 let result = store.validate_checkpoint(1).await.unwrap();
2075 assert!(result.valid, "checksum should match: {:?}", result.issues);
2076 }
2077
2078 #[tokio::test]
2079 async fn test_obj_recover_latest_validated() {
2080 let store = ObjectStoreCheckpointStore::new(
2081 Arc::new(object_store::memory::InMemory::new()),
2082 String::new(),
2083 10,
2084 );
2085
2086 store.save(&make_manifest(1, 10)).await.unwrap();
2087 store.save(&make_manifest(2, 20)).await.unwrap();
2088
2089 let report = store.recover_latest_validated().await.unwrap();
2090 assert_eq!(report.chosen_id, Some(2));
2091 assert!(report.skipped.is_empty());
2092 }
2093
2094 #[tokio::test]
2095 async fn test_obj_cleanup_orphans() {
2096 let inner = Arc::new(object_store::memory::InMemory::new());
2097 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
2098
2099 let state = b"state-with-manifest";
2101 store
2102 .save_with_state(
2103 &make_manifest(1, 1),
2104 Some(&[bytes::Bytes::from_static(state)]),
2105 )
2106 .await
2107 .unwrap();
2108
2109 let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
2111 inner
2112 .put_opts(
2113 &orphan_path,
2114 PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
2115 PutOptions::default(),
2116 )
2117 .await
2118 .unwrap();
2119
2120 let cleaned = store.cleanup_orphans().await.unwrap();
2121 assert_eq!(cleaned, 1);
2122
2123 let real_state = store.load_state_data(1).await.unwrap();
2125 assert!(real_state.is_some());
2126 }
2127}