1#![allow(clippy::disallowed_types)] use std::path::{Path, PathBuf};
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use object_store::{GetOptions, ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
26use sha2::{Digest, Sha256};
27use tracing::warn;
28
29use crate::checkpoint::checkpoint_manifest::CheckpointManifest;
30
31async fn sync_file(path: &Path) -> Result<(), std::io::Error> {
33 let f = tokio::fs::OpenOptions::new().write(true).open(path).await?;
35 f.sync_all().await
36}
37
38#[allow(clippy::unnecessary_wraps, clippy::unused_async)] async fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
44 #[cfg(unix)]
45 {
46 let f = tokio::fs::File::open(path).await?;
47 f.sync_all().await?;
48 }
49 #[cfg(not(unix))]
50 {
51 let _ = path;
52 }
53 Ok(())
54}
55
56#[derive(Debug, thiserror::Error)]
58pub enum CheckpointStoreError {
59 #[error("checkpoint I/O error: {0}")]
61 Io(#[from] std::io::Error),
62
63 #[error("checkpoint serialization error: {0}")]
65 Serde(#[from] serde_json::Error),
66
67 #[error("checkpoint {0} not found")]
69 NotFound(u64),
70
71 #[error("object store error: {0}")]
73 ObjectStore(#[from] object_store::Error),
74}
75
/// One problem found while validating a checkpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationIssue {
    /// Non-fatal: the checkpoint is still usable for recovery.
    ManifestWarning(String),
    /// Fatal: the checkpoint is rejected for recovery.
    IntegrityFailure(String),
}
93
94impl ValidationIssue {
95 #[must_use]
97 pub fn is_fatal(&self) -> bool {
98 matches!(self, Self::IntegrityFailure(_))
99 }
100
101 #[must_use]
103 pub fn message(&self) -> &str {
104 match self {
105 Self::ManifestWarning(s) | Self::IntegrityFailure(s) => s,
106 }
107 }
108}
109
110impl std::fmt::Display for ValidationIssue {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.write_str(self.message())
113 }
114}
115
116#[derive(Debug, Clone)]
118pub struct ValidationResult {
119 pub checkpoint_id: u64,
121 pub valid: bool,
124 pub issues: Vec<ValidationIssue>,
126}
127
/// Summary of a newest-first scan for a recoverable checkpoint.
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// Newest checkpoint that passed validation; `None` if none did.
    pub chosen_id: Option<u64>,
    /// Checkpoints rejected before one was chosen, newest first, with reasons.
    pub skipped: Vec<(u64, String)>,
    /// Number of candidate checkpoint ids that were listed for the scan.
    pub examined: usize,
    /// Wall-clock time the scan took.
    pub elapsed: std::time::Duration,
}
143
144fn parse_checkpoint_id_from_path(path: &str, prefix: &str, suffix: &str) -> Option<u64> {
151 for segment in path.split('/') {
152 let Some(rest) = segment.strip_prefix(prefix) else {
153 continue;
154 };
155 let Some(id_str) = rest.strip_suffix(suffix) else {
156 continue;
157 };
158 if let Ok(id) = id_str.parse::<u64>() {
159 return Some(id);
160 }
161 warn!(
162 path,
163 prefix, suffix, "malformed checkpoint id in object path — skipped"
164 );
165 return None;
166 }
167 None
168}
169
170fn sha256_hex(data: &[u8]) -> String {
172 let mut hasher = Sha256::new();
173 hasher.update(data);
174 format!("{:x}", hasher.finalize())
175}
176
177fn sha256_hex_chunks(chunks: &[bytes::Bytes]) -> String {
184 let mut hasher = Sha256::new();
185 for chunk in chunks {
186 hasher.update(chunk);
187 }
188 format!("{:x}", hasher.finalize())
189}
190
191fn sha256_hex_mixed<'a, I>(
194 states: &std::collections::HashMap<
195 String,
196 crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
197 >,
198 sidecar_chunks: I,
199) -> String
200where
201 I: IntoIterator<Item = &'a [u8]>,
202{
203 let inline = sha256_hex_inline_states(states);
204 let mut hasher = Sha256::new();
205 hasher.update(inline.as_bytes());
206 for chunk in sidecar_chunks {
207 hasher.update(chunk);
208 }
209 format!("{:x}", hasher.finalize())
210}
211
212fn sha256_hex_inline_states(
215 states: &std::collections::HashMap<
216 String,
217 crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
218 >,
219) -> String {
220 let mut names: Vec<&String> = states.keys().collect();
221 names.sort_unstable();
222 let mut hasher = Sha256::new();
223 for n in names {
224 if let Some(op) = states.get(n) {
225 if op.external {
226 continue;
227 }
228 hasher.update(n.as_bytes());
229 hasher.update([0u8]);
230 if let Some(b64) = &op.state_b64 {
231 hasher.update(b64.as_bytes());
232 }
233 hasher.update([0u8]);
234 }
235 }
236 format!("{:x}", hasher.finalize())
237}
238
239#[async_trait]
245pub trait CheckpointStore: Send + Sync {
246 fn vnode_count(&self) -> u16 {
252 crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT
253 }
254
255 async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;
261
262 async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
268
269 async fn load_by_id(&self, id: u64)
274 -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
275
276 async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;
283
284 async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
291 Ok(self.list().await?.iter().map(|(id, _)| *id).collect())
294 }
295
296 async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;
302
303 async fn update_manifest(
310 &self,
311 manifest: &CheckpointManifest,
312 ) -> Result<(), CheckpointStoreError> {
313 self.save(manifest).await
314 }
315
316 async fn save_state_data(
327 &self,
328 id: u64,
329 chunks: &[bytes::Bytes],
330 ) -> Result<(), CheckpointStoreError>;
331
332 async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;
338
339 async fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
348 let mut issues = Vec::new();
349
350 let manifest = match self.load_by_id(id).await {
352 Ok(Some(m)) => m,
353 Ok(None) => {
354 return Ok(ValidationResult {
355 checkpoint_id: id,
356 valid: false,
357 issues: vec![ValidationIssue::IntegrityFailure(format!(
358 "manifest not found for checkpoint {id}"
359 ))],
360 });
361 }
362 Err(CheckpointStoreError::Serde(e)) => {
363 return Ok(ValidationResult {
364 checkpoint_id: id,
365 valid: false,
366 issues: vec![ValidationIssue::IntegrityFailure(format!(
367 "corrupt manifest: {e}"
368 ))],
369 });
370 }
371 Err(e) => return Err(e),
372 };
373
374 for err in manifest.validate(self.vnode_count()) {
375 issues.push(ValidationIssue::ManifestWarning(format!(
376 "manifest validation: {err}"
377 )));
378 }
379
380 if let Some(expected) = &manifest.state_checksum {
384 let any_inline = manifest.operator_states.values().any(|o| !o.external);
385 let any_external = manifest.operator_states.values().any(|o| o.external);
386 let needs_sidecar = any_external || !any_inline;
387 let sidecar = if needs_sidecar {
388 self.load_state_data(id).await?
389 } else {
390 None
391 };
392 let actual = match (any_inline, &sidecar) {
393 (true, Some(data)) => {
394 sha256_hex_mixed(&manifest.operator_states, std::iter::once(data.as_slice()))
395 }
396 (true, None) if !any_external => {
397 sha256_hex_inline_states(&manifest.operator_states)
398 }
399 (_, Some(data)) => sha256_hex(data),
400 (_, None) => {
401 issues.push(ValidationIssue::IntegrityFailure(
402 "state.bin referenced by checksum but not found".into(),
403 ));
404 String::new()
405 }
406 };
407 if !actual.is_empty() && actual != *expected {
408 let label = if any_inline && any_external {
409 "mixed state checksum mismatch"
410 } else if any_inline {
411 "inline state checksum mismatch"
412 } else {
413 "state.bin checksum mismatch"
414 };
415 issues.push(ValidationIssue::IntegrityFailure(format!(
416 "{label}: expected {expected}, got {actual}"
417 )));
418 }
419 }
420
421 if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
424 issues.push(ValidationIssue::IntegrityFailure(
425 "epoch or checkpoint_id is 0 — likely corrupted".into(),
426 ));
427 }
428
429 let valid = issues.iter().all(|i| !i.is_fatal());
430 Ok(ValidationResult {
431 checkpoint_id: id,
432 valid,
433 issues,
434 })
435 }
436
437 async fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
446 let start = std::time::Instant::now();
447 let mut skipped = Vec::new();
448
449 let mut ids = self.list_ids().await?;
452 ids.reverse();
453
454 let examined = ids.len();
455
456 for id in &ids {
457 let result = self.validate_checkpoint(*id).await?;
458 if result.valid {
459 return Ok(RecoveryReport {
460 chosen_id: Some(*id),
461 skipped,
462 examined,
463 elapsed: start.elapsed(),
464 });
465 }
466 let reason = result
467 .issues
468 .iter()
469 .map(ToString::to_string)
470 .collect::<Vec<_>>()
471 .join("; ");
472 warn!(
473 checkpoint_id = id,
474 reason = %reason,
475 "skipping invalid checkpoint"
476 );
477 skipped.push((*id, reason));
478 }
479
480 Ok(RecoveryReport {
481 chosen_id: None,
482 skipped,
483 examined,
484 elapsed: start.elapsed(),
485 })
486 }
487
488 async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
496 Ok(0)
498 }
499
500 async fn save_with_state(
514 &self,
515 manifest: &CheckpointManifest,
516 state_data: Option<&[bytes::Bytes]>,
517 ) -> Result<(), CheckpointStoreError> {
518 let mut manifest = manifest.clone();
519 if let Some(chunks) = state_data {
520 manifest.state_checksum = Some(stamp_checksum(&manifest.operator_states, Some(chunks)));
527 self.save_state_data(manifest.checkpoint_id, chunks).await?;
528 } else if !manifest.operator_states.is_empty()
529 && manifest.operator_states.values().all(|o| !o.external)
530 && manifest.state_checksum.is_none()
531 {
532 manifest.state_checksum = Some(sha256_hex_inline_states(&manifest.operator_states));
534 }
535 self.save(&manifest).await
536 }
537}
538
539fn stamp_checksum(
542 states: &std::collections::HashMap<
543 String,
544 crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
545 >,
546 chunks: Option<&[bytes::Bytes]>,
547) -> String {
548 let chunks = chunks.unwrap_or_default();
549 let any_inline = states.values().any(|o| !o.external);
550 if any_inline {
551 sha256_hex_mixed(states, chunks.iter().map(AsRef::as_ref))
552 } else {
553 sha256_hex_chunks(chunks)
554 }
555}
556
/// [`CheckpointStore`] backed by a local directory tree.
///
/// Layout under `base_dir`:
///
/// ```text
/// checkpoints/
///   latest.txt              names the current checkpoint directory
///   checkpoint_NNNNNN/
///     manifest.json
///     state.bin             optional sidecar state
/// ```
pub struct FileSystemCheckpointStore {
    /// Root directory; all data lives under `base_dir/checkpoints`.
    base_dir: PathBuf,
    /// Checkpoints kept after each save (`0` disables auto-pruning).
    max_retained: usize,
    /// Vnode count used when validating manifests.
    vnode_count: u16,
}
567
568impl FileSystemCheckpointStore {
569 #[must_use]
579 pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
580 Self {
581 base_dir: base_dir.into(),
582 max_retained,
583 vnode_count: crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT,
584 }
585 }
586
587 #[must_use]
589 pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
590 self.vnode_count = vnode_count;
591 self
592 }
593
594 fn checkpoints_dir(&self) -> PathBuf {
596 self.base_dir.join("checkpoints")
597 }
598
599 fn checkpoint_dir(&self, id: u64) -> PathBuf {
601 self.checkpoints_dir().join(format!("checkpoint_{id:06}"))
602 }
603
604 fn manifest_path(&self, id: u64) -> PathBuf {
606 self.checkpoint_dir(id).join("manifest.json")
607 }
608
609 fn state_path(&self, id: u64) -> PathBuf {
611 self.checkpoint_dir(id).join("state.bin")
612 }
613
614 fn latest_path(&self) -> PathBuf {
616 self.checkpoints_dir().join("latest.txt")
617 }
618
619 fn parse_checkpoint_id(name: &str) -> Option<u64> {
621 name.strip_prefix("checkpoint_")
622 .and_then(|s| s.parse().ok())
623 }
624
625 async fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
627 let dir = self.checkpoints_dir();
628 let mut reader = match tokio::fs::read_dir(&dir).await {
629 Ok(r) => r,
630 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
631 Err(e) => return Err(e.into()),
632 };
633
634 let mut ids: Vec<u64> = Vec::new();
635 while let Some(entry) = reader.next_entry().await? {
636 let ft = entry.file_type().await?;
637 if !ft.is_dir() {
638 continue;
639 }
640 if let Some(id) = entry
641 .file_name()
642 .to_str()
643 .and_then(Self::parse_checkpoint_id)
644 {
645 ids.push(id);
646 }
647 }
648
649 ids.sort_unstable();
650 Ok(ids)
651 }
652}
653
654impl FileSystemCheckpointStore {
655 async fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
658 let dir = self.checkpoints_dir();
659 let mut reader = match tokio::fs::read_dir(&dir).await {
660 Ok(r) => r,
661 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
662 Err(e) => return Err(e.into()),
663 };
664
665 let mut orphans = Vec::new();
666 while let Some(entry) = reader.next_entry().await? {
667 let ft = entry.file_type().await?;
668 if !ft.is_dir() {
669 continue;
670 }
671 let path = entry.path();
672 let has_state = tokio::fs::metadata(path.join("state.bin")).await.is_ok();
673 let has_manifest = tokio::fs::metadata(path.join("manifest.json"))
674 .await
675 .is_ok();
676 if has_state && !has_manifest {
677 orphans.push(path);
678 }
679 }
680 Ok(orphans)
681 }
682}
683
684#[async_trait]
685impl CheckpointStore for FileSystemCheckpointStore {
686 fn vnode_count(&self) -> u16 {
687 self.vnode_count
688 }
689
690 async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
691 let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
692 tokio::fs::create_dir_all(&cp_dir).await?;
693
694 let manifest_path = self.manifest_path(manifest.checkpoint_id);
695 let json = serde_json::to_string_pretty(manifest)?;
696
697 let tmp_path = manifest_path.with_extension("json.tmp");
699 let write_res = async {
700 tokio::fs::write(&tmp_path, &json).await?;
701 sync_file(&tmp_path).await?;
702 tokio::fs::rename(&tmp_path, &manifest_path).await?;
703 sync_dir(&cp_dir).await
704 }
705 .await;
706 if let Err(e) = write_res {
707 let _ = tokio::fs::remove_file(&tmp_path).await;
709 return Err(e.into());
710 }
711
712 let latest = self.latest_path();
714 let latest_dir = latest.parent().unwrap_or(Path::new(".")).to_path_buf();
715 tokio::fs::create_dir_all(&latest_dir).await?;
716 let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
717 let tmp_latest = latest.with_extension("txt.tmp");
718 tokio::fs::write(&tmp_latest, &latest_content).await?;
719 sync_file(&tmp_latest).await?;
720 tokio::fs::rename(&tmp_latest, &latest).await?;
721 sync_dir(&latest_dir).await?;
722
723 if self.max_retained > 0 {
725 if let Err(e) = self.prune(self.max_retained).await {
726 tracing::warn!(
727 max_retained = self.max_retained,
728 error = %e,
729 "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
730 );
731 }
732 }
733
734 Ok(())
735 }
736
737 async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
738 let latest = self.latest_path();
739 let content = match tokio::fs::read_to_string(&latest).await {
740 Ok(c) => c,
741 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
742 Err(e) => return Err(e.into()),
743 };
744 let dir_name = content.trim();
745 if dir_name.is_empty() {
746 return Ok(None);
747 }
748
749 match Self::parse_checkpoint_id(dir_name) {
750 Some(id) => self.load_by_id(id).await,
751 None => Ok(None),
752 }
753 }
754
755 async fn load_by_id(
756 &self,
757 id: u64,
758 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
759 let path = self.manifest_path(id);
760 let json = match tokio::fs::read_to_string(&path).await {
761 Ok(s) => s,
762 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
763 Err(e) => return Err(e.into()),
764 };
765 let manifest: CheckpointManifest = serde_json::from_str(&json)?;
766
767 let errors = manifest.validate(self.vnode_count());
768 if !errors.is_empty() {
769 tracing::warn!(
770 checkpoint_id = id,
771 error_count = errors.len(),
772 first_error = %errors[0],
773 "loaded checkpoint manifest has validation warnings"
774 );
775 }
776
777 Ok(Some(manifest))
778 }
779
780 async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
781 self.sorted_checkpoint_ids().await
782 }
783
784 async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
785 let ids = self.sorted_checkpoint_ids().await?;
786 let mut result = Vec::with_capacity(ids.len());
787
788 for id in ids {
789 if let Ok(Some(manifest)) = self.load_by_id(id).await {
791 result.push((manifest.checkpoint_id, manifest.epoch));
792 }
793 }
794
795 Ok(result)
796 }
797
798 async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
799 let ids = self.sorted_checkpoint_ids().await?;
800 if ids.len() <= keep_count {
801 return Ok(0);
802 }
803
804 let to_remove = ids.len() - keep_count;
805 let mut removed = 0;
806
807 for &id in &ids[..to_remove] {
808 let dir = self.checkpoint_dir(id);
809 if tokio::fs::remove_dir_all(&dir).await.is_ok() {
810 removed += 1;
811 }
812 }
813
814 Ok(removed)
815 }
816
817 async fn save_state_data(
818 &self,
819 id: u64,
820 chunks: &[bytes::Bytes],
821 ) -> Result<(), CheckpointStoreError> {
822 use tokio::io::AsyncWriteExt;
823
824 let cp_dir = self.checkpoint_dir(id);
825 tokio::fs::create_dir_all(&cp_dir).await?;
826
827 let path = self.state_path(id);
828 let tmp = path.with_extension("bin.tmp");
829
830 let mut file = tokio::fs::File::create(&tmp).await?;
834 for chunk in chunks {
835 file.write_all(chunk).await?;
836 }
837 file.sync_all().await?;
838 drop(file);
839
840 tokio::fs::rename(&tmp, &path).await?;
841 sync_dir(&cp_dir).await?;
842
843 Ok(())
844 }
845
846 async fn save_with_state(
847 &self,
848 manifest: &CheckpointManifest,
849 state_data: Option<&[bytes::Bytes]>,
850 ) -> Result<(), CheckpointStoreError> {
851 let mut manifest = manifest.clone();
852 if let Some(chunks) = state_data {
855 manifest.state_checksum = Some(stamp_checksum(&manifest.operator_states, Some(chunks)));
856 self.save_state_data(manifest.checkpoint_id, chunks).await?;
857 } else if !manifest.operator_states.is_empty()
858 && manifest.operator_states.values().all(|o| !o.external)
859 && manifest.state_checksum.is_none()
860 {
861 manifest.state_checksum = Some(sha256_hex_inline_states(&manifest.operator_states));
863 }
864 self.save(&manifest).await
865 }
866
867 async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
868 let orphans = self.find_orphan_dirs().await?;
869 let mut cleaned = 0;
870 for dir in &orphans {
871 if tokio::fs::remove_dir_all(dir).await.is_ok() {
872 tracing::info!(
873 path = %dir.display(),
874 "cleaned up orphaned checkpoint directory"
875 );
876 cleaned += 1;
877 }
878 }
879 Ok(cleaned)
880 }
881
882 async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
883 let path = self.state_path(id);
884 match tokio::fs::read(&path).await {
885 Ok(data) => Ok(Some(data)),
886 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
887 Err(e) => Err(e.into()),
888 }
889 }
890}
891
892#[derive(serde::Serialize, serde::Deserialize)]
898struct LatestPointer {
899 checkpoint_id: u64,
900}
901
902pub struct ObjectStoreCheckpointStore {
924 store: Arc<dyn ObjectStore>,
925 prefix: String,
926 max_retained: usize,
927 vnode_count: u16,
928}
929
930impl ObjectStoreCheckpointStore {
931 #[must_use]
940 pub fn new(store: Arc<dyn ObjectStore>, prefix: String, max_retained: usize) -> Self {
941 Self {
942 store,
943 prefix,
944 max_retained,
945 vnode_count: crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT,
946 }
947 }
948
949 #[must_use]
951 pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
952 self.vnode_count = vnode_count;
953 self
954 }
955
956 fn manifest_path(&self, id: u64) -> object_store::path::Path {
957 object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
958 }
959
960 fn latest_pointer_path(&self) -> object_store::path::Path {
961 object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
962 }
963
964 fn state_path(&self, id: u64) -> object_store::path::Path {
965 object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
966 }
967
968 async fn put_with_retry(
982 &self,
983 path: &object_store::path::Path,
984 payload: PutPayload,
985 opts: &PutOptions,
986 ) -> Result<(), CheckpointStoreError> {
987 const BACKOFFS_MS: &[u64] = &[100, 500, 2000];
988 let mut attempt = 0usize;
989 loop {
990 let result = self
991 .store
992 .put_opts(path, payload.clone(), opts.clone())
993 .await;
994 match result {
995 Ok(_) => return Ok(()),
996 Err(object_store::Error::Generic { .. }) if attempt < BACKOFFS_MS.len() => {
997 let delay = std::time::Duration::from_millis(BACKOFFS_MS[attempt]);
998 tracing::warn!(
999 path = %path,
1000 attempt = attempt + 1,
1001 delay_ms = delay.as_millis(),
1002 "transient put error, retrying"
1003 );
1004 tokio::time::sleep(delay).await;
1005 attempt += 1;
1006 }
1007 Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
1008 }
1009 }
1010 }
1011
1012 async fn get_bytes(
1014 &self,
1015 path: &object_store::path::Path,
1016 ) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
1017 match self.store.get_opts(path, GetOptions::default()).await {
1018 Ok(get_result) => {
1019 let data = get_result.bytes().await?;
1020 Ok(Some(data))
1021 }
1022 Err(object_store::Error::NotFound { .. }) => Ok(None),
1023 Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
1024 }
1025 }
1026
1027 async fn load_manifest_at(
1029 &self,
1030 path: &object_store::path::Path,
1031 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1032 match self.get_bytes(path).await? {
1033 Some(data) => {
1034 let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
1035 Ok(Some(manifest))
1036 }
1037 None => Ok(None),
1038 }
1039 }
1040
1041 async fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
1043 use futures::TryStreamExt;
1044
1045 let mut ids = std::collections::BTreeSet::new();
1046
1047 let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
1048 let entries: Vec<_> = self
1049 .store
1050 .list(Some(&manifests_prefix))
1051 .try_collect()
1052 .await?;
1053 for entry in &entries {
1054 if let Some(id) =
1055 parse_checkpoint_id_from_path(entry.location.as_ref(), "manifest-", ".json")
1056 {
1057 ids.insert(id);
1058 }
1059 }
1060
1061 Ok(ids.into_iter().collect())
1062 }
1063}
1064
1065#[async_trait]
1066impl CheckpointStore for ObjectStoreCheckpointStore {
1067 fn vnode_count(&self) -> u16 {
1068 self.vnode_count
1069 }
1070
1071 async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
1072 let json = serde_json::to_string_pretty(manifest)?;
1073 let path = self.manifest_path(manifest.checkpoint_id);
1074 let json_bytes = bytes::Bytes::from(json);
1075
1076 let create_opts = PutOptions {
1078 mode: PutMode::Create,
1079 ..PutOptions::default()
1080 };
1081 let result = self
1082 .store
1083 .put_opts(
1084 &path,
1085 PutPayload::from_bytes(json_bytes.clone()),
1086 create_opts,
1087 )
1088 .await;
1089
1090 match result {
1091 Ok(_) => {}
1092 Err(object_store::Error::AlreadyExists { .. }) => {
1093 tracing::warn!(
1094 checkpoint_id = manifest.checkpoint_id,
1095 "[LDB-6010] Manifest already exists — skipping write"
1096 );
1097 }
1098 Err(object_store::Error::NotImplemented { .. }) => {
1099 self.store
1101 .put_opts(
1102 &path,
1103 PutPayload::from_bytes(json_bytes),
1104 PutOptions::default(),
1105 )
1106 .await?;
1107 }
1108 Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
1109 }
1110
1111 let latest = self.latest_pointer_path();
1118 if let Some(current) = self.get_bytes(&latest).await? {
1119 if let Ok(existing) = serde_json::from_slice::<LatestPointer>(¤t) {
1120 if existing.checkpoint_id > manifest.checkpoint_id {
1121 tracing::warn!(
1122 current = existing.checkpoint_id,
1123 ours = manifest.checkpoint_id,
1124 "[LDB-6010] latest.json already points at a newer checkpoint — \
1125 skipping pointer update (possible split-brain or delayed writer)"
1126 );
1127 return Ok(());
1128 }
1129 }
1130 }
1131 let pointer = serde_json::to_string(&LatestPointer {
1132 checkpoint_id: manifest.checkpoint_id,
1133 })?;
1134 self.put_with_retry(
1135 &latest,
1136 PutPayload::from_bytes(bytes::Bytes::from(pointer)),
1137 &PutOptions::default(),
1138 )
1139 .await?;
1140
1141 if self.max_retained > 0 {
1143 if let Err(e) = self.prune(self.max_retained).await {
1144 tracing::warn!(
1145 max_retained = self.max_retained,
1146 error = %e,
1147 "[LDB-6009] Object store checkpoint prune failed"
1148 );
1149 }
1150 }
1151
1152 Ok(())
1153 }
1154
1155 async fn update_manifest(
1156 &self,
1157 manifest: &CheckpointManifest,
1158 ) -> Result<(), CheckpointStoreError> {
1159 let json = serde_json::to_string_pretty(manifest)?;
1160 let path = self.manifest_path(manifest.checkpoint_id);
1161 let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
1162
1163 self.store
1165 .put_opts(&path, payload, PutOptions::default())
1166 .await?;
1167
1168 Ok(())
1169 }
1170
1171 async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1172 if let Some(data) = self.get_bytes(&self.latest_pointer_path()).await? {
1173 let pointer: LatestPointer = serde_json::from_slice(&data)?;
1174 return self.load_by_id(pointer.checkpoint_id).await;
1175 }
1176
1177 Ok(None)
1178 }
1179
1180 async fn load_by_id(
1181 &self,
1182 id: u64,
1183 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1184 self.load_manifest_at(&self.manifest_path(id)).await
1185 }
1186
1187 async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
1188 self.list_checkpoint_ids().await
1189 }
1190
1191 async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
1192 let ids = self.list_checkpoint_ids().await?;
1193 let mut result = Vec::with_capacity(ids.len());
1194
1195 for id in ids {
1196 if let Ok(Some(manifest)) = self.load_by_id(id).await {
1197 result.push((manifest.checkpoint_id, manifest.epoch));
1198 }
1199 }
1200
1201 Ok(result)
1202 }
1203
1204 async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
1205 let ids = self.list_checkpoint_ids().await?;
1206 if ids.len() <= keep_count {
1207 return Ok(0);
1208 }
1209
1210 let to_remove = ids.len() - keep_count;
1211 let mut removed = 0;
1212 let mut logged_error = false;
1213
1214 for &id in &ids[..to_remove] {
1215 let manifest = self.manifest_path(id);
1216 let state = self.state_path(id);
1217 let manifest_res = self.store.delete(&manifest).await;
1218 let state_res = self.store.delete(&state).await;
1219
1220 let manifest_ok = matches!(
1223 manifest_res,
1224 Ok(()) | Err(object_store::Error::NotFound { .. })
1225 );
1226 if manifest_ok {
1227 removed += 1;
1228 }
1229
1230 for err in [manifest_res, state_res]
1234 .into_iter()
1235 .filter_map(Result::err)
1236 {
1237 if matches!(err, object_store::Error::NotFound { .. }) {
1238 continue;
1239 }
1240 if !logged_error {
1241 tracing::warn!(
1242 checkpoint_id = id,
1243 error = %err,
1244 "[LDB-6027] checkpoint prune: delete failed — \
1245 retained objects may accumulate"
1246 );
1247 logged_error = true;
1248 }
1249 }
1250 }
1251
1252 Ok(removed)
1253 }
1254
1255 async fn save_state_data(
1256 &self,
1257 id: u64,
1258 chunks: &[bytes::Bytes],
1259 ) -> Result<(), CheckpointStoreError> {
1260 let path = self.state_path(id);
1261 let payload: PutPayload = chunks.iter().cloned().collect();
1265 self.put_with_retry(&path, payload, &PutOptions::default())
1267 .await
1268 }
1269
1270 async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
1271 Ok(self
1272 .get_bytes(&self.state_path(id))
1273 .await?
1274 .map(|d| d.to_vec()))
1275 }
1276
1277 async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
1278 use futures::{StreamExt, TryStreamExt};
1279
1280 let manifest_ids: std::collections::BTreeSet<u64> =
1282 self.list_checkpoint_ids().await?.into_iter().collect();
1283
1284 let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
1286 let entries: Vec<_> = self.store.list(Some(&state_prefix)).try_collect().await?;
1287
1288 let mut orphan_paths = Vec::new();
1289 for entry in &entries {
1290 if let Some(id) =
1291 parse_checkpoint_id_from_path(entry.location.as_ref(), "state-", ".bin")
1292 {
1293 if !manifest_ids.contains(&id) {
1294 orphan_paths.push(entry.location.clone());
1295 }
1296 }
1297 }
1298
1299 let count = orphan_paths.len();
1300 if !orphan_paths.is_empty() {
1301 let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
1302 let mut results = self.store.delete_stream(stream);
1303 while let Some(result) = results.next().await {
1304 if let Err(e) = result {
1305 tracing::warn!(error = %e, "failed to delete orphan state file");
1306 }
1307 }
1308 }
1309
1310 Ok(count)
1311 }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316 use super::*;
1317 use crate::checkpoint::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
1318 #[allow(clippy::disallowed_types)] use std::collections::HashMap;
1320
1321 fn make_store(dir: &Path) -> FileSystemCheckpointStore {
1322 FileSystemCheckpointStore::new(dir, 3)
1323 }
1324
1325 fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
1326 CheckpointManifest::new(id, epoch)
1327 }
1328
1329 #[tokio::test]
1330 async fn test_save_and_load_latest() {
1331 let dir = tempfile::tempdir().unwrap();
1332 let store = make_store(dir.path());
1333
1334 let m = make_manifest(1, 1);
1335 store.save(&m).await.unwrap();
1336
1337 let loaded = store.load_latest().await.unwrap().unwrap();
1338 assert_eq!(loaded.checkpoint_id, 1);
1339 assert_eq!(loaded.epoch, 1);
1340 }
1341
1342 #[tokio::test]
1343 async fn test_load_latest_returns_none_when_empty() {
1344 let dir = tempfile::tempdir().unwrap();
1345 let store = make_store(dir.path());
1346 assert!(store.load_latest().await.unwrap().is_none());
1347 }
1348
1349 #[tokio::test]
1350 async fn test_load_latest_returns_most_recent() {
1351 let dir = tempfile::tempdir().unwrap();
1352 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1353
1354 for i in 1..=5 {
1355 store.save(&make_manifest(i, i)).await.unwrap();
1356 }
1357
1358 let latest = store.load_latest().await.unwrap().unwrap();
1359 assert_eq!(latest.checkpoint_id, 5);
1360 assert_eq!(latest.epoch, 5);
1361 }
1362
1363 #[tokio::test]
1364 async fn test_load_by_id() {
1365 let dir = tempfile::tempdir().unwrap();
1366 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1367
1368 store.save(&make_manifest(1, 10)).await.unwrap();
1369 store.save(&make_manifest(2, 20)).await.unwrap();
1370
1371 let m = store.load_by_id(1).await.unwrap().unwrap();
1372 assert_eq!(m.epoch, 10);
1373
1374 let m = store.load_by_id(2).await.unwrap().unwrap();
1375 assert_eq!(m.epoch, 20);
1376
1377 assert!(store.load_by_id(99).await.unwrap().is_none());
1378 }
1379
1380 #[tokio::test]
1381 async fn test_list() {
1382 let dir = tempfile::tempdir().unwrap();
1383 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1384
1385 store.save(&make_manifest(1, 10)).await.unwrap();
1386 store.save(&make_manifest(3, 30)).await.unwrap();
1387 store.save(&make_manifest(2, 20)).await.unwrap();
1388
1389 let list = store.list().await.unwrap();
1390 assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1391 }
1392
1393 #[tokio::test]
1394 async fn test_prune_keeps_max() {
1395 let dir = tempfile::tempdir().unwrap();
1396 let store = FileSystemCheckpointStore::new(dir.path(), 10); for i in 1..=5 {
1399 store.save(&make_manifest(i, i)).await.unwrap();
1400 }
1401
1402 let removed = store.prune(2).await.unwrap();
1403 assert_eq!(removed, 3);
1404
1405 let list = store.list().await.unwrap();
1406 assert_eq!(list.len(), 2);
1407 assert_eq!(list[0].0, 4);
1408 assert_eq!(list[1].0, 5);
1409 }
1410
1411 #[tokio::test]
1412 async fn test_auto_prune_on_save() {
1413 let dir = tempfile::tempdir().unwrap();
1414 let store = FileSystemCheckpointStore::new(dir.path(), 2);
1415
1416 for i in 1..=5 {
1417 store.save(&make_manifest(i, i)).await.unwrap();
1418 }
1419
1420 let list = store.list().await.unwrap();
1421 assert_eq!(list.len(), 2);
1422 assert_eq!(list[0].0, 4);
1424 assert_eq!(list[1].0, 5);
1425 }
1426
1427 #[tokio::test]
1428 async fn test_save_and_load_state_data() {
1429 let dir = tempfile::tempdir().unwrap();
1430 let store = make_store(dir.path());
1431
1432 store.save(&make_manifest(1, 1)).await.unwrap();
1433
1434 let data = b"large operator state binary blob";
1435 store
1436 .save_state_data(1, &[bytes::Bytes::from_static(data)])
1437 .await
1438 .unwrap();
1439
1440 let loaded = store.load_state_data(1).await.unwrap().unwrap();
1441 assert_eq!(loaded, data);
1442 }
1443
1444 #[tokio::test]
1445 async fn test_load_state_data_returns_none() {
1446 let dir = tempfile::tempdir().unwrap();
1447 let store = make_store(dir.path());
1448 assert!(store.load_state_data(99).await.unwrap().is_none());
1449 }
1450
1451 #[tokio::test]
1452 async fn test_full_manifest_round_trip() {
1453 let dir = tempfile::tempdir().unwrap();
1454 let store = make_store(dir.path());
1455
1456 let mut m = make_manifest(1, 5);
1457 m.source_offsets.insert(
1458 "kafka-src".into(),
1459 ConnectorCheckpoint::with_offsets(
1460 5,
1461 HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
1462 ),
1463 );
1464 m.sink_epochs.insert("pg-sink".into(), 4);
1465 m.table_offsets.insert(
1466 "instruments".into(),
1467 ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
1468 );
1469 m.operator_states
1470 .insert("window".into(), OperatorCheckpoint::inline(b"data"));
1471 m.watermark = Some(999_000);
1472
1473 store.save(&m).await.unwrap();
1474
1475 let loaded = store.load_latest().await.unwrap().unwrap();
1476 assert_eq!(loaded.checkpoint_id, 1);
1477 assert_eq!(loaded.epoch, 5);
1478 assert_eq!(loaded.watermark, Some(999_000));
1479
1480 let src = loaded.source_offsets.get("kafka-src").unwrap();
1481 assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
1482
1483 assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
1484
1485 let tbl = loaded.table_offsets.get("instruments").unwrap();
1486 assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
1487
1488 let op = loaded.operator_states.get("window").unwrap();
1489 assert_eq!(op.decode_inline().unwrap(), b"data");
1490 }
1491
1492 #[tokio::test]
1493 async fn test_empty_latest_txt() {
1494 let dir = tempfile::tempdir().unwrap();
1495 let store = make_store(dir.path());
1496
1497 let cp_dir = dir.path().join("checkpoints");
1498 std::fs::create_dir_all(&cp_dir).unwrap();
1499 std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
1500
1501 assert!(store.load_latest().await.unwrap().is_none());
1502 }
1503
1504 #[tokio::test]
1505 async fn test_latest_points_to_missing_checkpoint() {
1506 let dir = tempfile::tempdir().unwrap();
1507 let store = make_store(dir.path());
1508
1509 let cp_dir = dir.path().join("checkpoints");
1510 std::fs::create_dir_all(&cp_dir).unwrap();
1511 std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
1512
1513 assert!(store.load_latest().await.unwrap().is_none());
1514 }
1515
1516 #[tokio::test]
1517 async fn test_prune_no_op_when_under_limit() {
1518 let dir = tempfile::tempdir().unwrap();
1519 let store = make_store(dir.path());
1520
1521 store.save(&make_manifest(1, 1)).await.unwrap();
1522 let removed = store.prune(5).await.unwrap();
1523 assert_eq!(removed, 0);
1524 }
1525
1526 #[tokio::test]
1527 async fn test_save_with_state_writes_sidecar_before_manifest() {
1528 let dir = tempfile::tempdir().unwrap();
1529 let store = make_store(dir.path());
1530
1531 let m = make_manifest(1, 1);
1532 let state = b"large-operator-state-blob";
1533 store
1534 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1535 .await
1536 .unwrap();
1537
1538 let loaded = store.load_latest().await.unwrap().unwrap();
1540 assert_eq!(loaded.checkpoint_id, 1);
1541
1542 let loaded_state = store.load_state_data(1).await.unwrap().unwrap();
1543 assert_eq!(loaded_state, state);
1544 }
1545
1546 #[tokio::test]
1547 async fn test_save_with_state_none_is_same_as_save() {
1548 let dir = tempfile::tempdir().unwrap();
1549 let store = make_store(dir.path());
1550
1551 let m = make_manifest(1, 1);
1552 store.save_with_state(&m, None).await.unwrap();
1553
1554 let loaded = store.load_latest().await.unwrap().unwrap();
1555 assert_eq!(loaded.checkpoint_id, 1);
1556 assert!(store.load_state_data(1).await.unwrap().is_none());
1557 }
1558
1559 #[tokio::test]
1560 async fn test_orphaned_state_without_manifest_is_ignored() {
1561 let dir = tempfile::tempdir().unwrap();
1562 let store = make_store(dir.path());
1563
1564 store
1567 .save_state_data(1, &[bytes::Bytes::from_static(b"orphaned")])
1568 .await
1569 .unwrap();
1570
1571 assert!(store.load_latest().await.unwrap().is_none());
1573
1574 assert!(store.list().await.unwrap().is_empty());
1576 }
1577
1578 fn make_obj_store() -> ObjectStoreCheckpointStore {
1583 let store = Arc::new(object_store::memory::InMemory::new());
1584 ObjectStoreCheckpointStore::new(store, String::new(), 3)
1585 }
1586
1587 #[tokio::test]
1588 async fn test_obj_save_and_load_latest() {
1589 let store = make_obj_store();
1590 let m = make_manifest(1, 1);
1591 store.save(&m).await.unwrap();
1592
1593 let loaded = store.load_latest().await.unwrap().unwrap();
1594 assert_eq!(loaded.checkpoint_id, 1);
1595 assert_eq!(loaded.epoch, 1);
1596 }
1597
1598 #[tokio::test]
1599 async fn test_obj_load_latest_returns_none_when_empty() {
1600 let store = make_obj_store();
1601 assert!(store.load_latest().await.unwrap().is_none());
1602 }
1603
1604 #[tokio::test]
1605 async fn test_obj_load_by_id() {
1606 let store = ObjectStoreCheckpointStore::new(
1607 Arc::new(object_store::memory::InMemory::new()),
1608 String::new(),
1609 10,
1610 );
1611
1612 store.save(&make_manifest(1, 10)).await.unwrap();
1613 store.save(&make_manifest(2, 20)).await.unwrap();
1614
1615 let m = store.load_by_id(1).await.unwrap().unwrap();
1616 assert_eq!(m.epoch, 10);
1617 let m = store.load_by_id(2).await.unwrap().unwrap();
1618 assert_eq!(m.epoch, 20);
1619 assert!(store.load_by_id(99).await.unwrap().is_none());
1620 }
1621
1622 #[tokio::test]
1623 async fn test_obj_list() {
1624 let store = ObjectStoreCheckpointStore::new(
1625 Arc::new(object_store::memory::InMemory::new()),
1626 String::new(),
1627 10,
1628 );
1629
1630 store.save(&make_manifest(1, 10)).await.unwrap();
1631 store.save(&make_manifest(3, 30)).await.unwrap();
1632 store.save(&make_manifest(2, 20)).await.unwrap();
1633
1634 let list = store.list().await.unwrap();
1635 assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1636 }
1637
1638 #[tokio::test]
1639 async fn test_obj_prune() {
1640 let store = ObjectStoreCheckpointStore::new(
1641 Arc::new(object_store::memory::InMemory::new()),
1642 String::new(),
1643 10,
1644 );
1645
1646 for i in 1..=5 {
1647 store.save(&make_manifest(i, i)).await.unwrap();
1648 }
1649
1650 let removed = store.prune(2).await.unwrap();
1651 assert_eq!(removed, 3);
1652
1653 let list = store.list().await.unwrap();
1654 assert_eq!(list.len(), 2);
1655 assert_eq!(list[0].0, 4);
1656 assert_eq!(list[1].0, 5);
1657 }
1658
1659 #[tokio::test]
1660 async fn test_obj_auto_prune_on_save() {
1661 let store = ObjectStoreCheckpointStore::new(
1662 Arc::new(object_store::memory::InMemory::new()),
1663 String::new(),
1664 2,
1665 );
1666
1667 for i in 1..=5 {
1668 store.save(&make_manifest(i, i)).await.unwrap();
1669 }
1670
1671 let list = store.list().await.unwrap();
1672 assert_eq!(list.len(), 2);
1673 assert_eq!(list[0].0, 4);
1674 assert_eq!(list[1].0, 5);
1675 }
1676
1677 #[tokio::test]
1678 async fn test_obj_save_and_load_state_data() {
1679 let store = make_obj_store();
1680 store.save(&make_manifest(1, 1)).await.unwrap();
1681
1682 let data = b"large operator state binary blob";
1683 store
1684 .save_state_data(1, &[bytes::Bytes::from_static(data)])
1685 .await
1686 .unwrap();
1687
1688 let loaded = store.load_state_data(1).await.unwrap().unwrap();
1689 assert_eq!(loaded, data);
1690 }
1691
1692 #[tokio::test]
1693 async fn test_obj_load_state_data_returns_none() {
1694 let store = make_obj_store();
1695 assert!(store.load_state_data(99).await.unwrap().is_none());
1696 }
1697
1698 #[tokio::test]
1699 async fn test_obj_with_prefix() {
1700 let inner = Arc::new(object_store::memory::InMemory::new());
1701 let store = ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10);
1702
1703 store.save(&make_manifest(1, 42)).await.unwrap();
1704 let loaded = store.load_latest().await.unwrap().unwrap();
1705 assert_eq!(loaded.checkpoint_id, 1);
1706 assert_eq!(loaded.epoch, 42);
1707 }
1708
1709 #[tokio::test]
1714 async fn test_obj_layout_paths() {
1715 let inner = Arc::new(object_store::memory::InMemory::new());
1716 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1717
1718 store.save(&make_manifest(1, 10)).await.unwrap();
1719
1720 let result = inner
1721 .get_opts(
1722 &object_store::path::Path::from("manifests/manifest-000001.json"),
1723 GetOptions::default(),
1724 )
1725 .await;
1726 assert!(result.is_ok(), "manifest path should exist");
1727
1728 let result = inner
1729 .get_opts(
1730 &object_store::path::Path::from("manifests/latest.json"),
1731 GetOptions::default(),
1732 )
1733 .await;
1734 assert!(result.is_ok(), "latest.json should exist");
1735 }
1736
1737 #[tokio::test]
1738 async fn test_obj_conditional_put_idempotent() {
1739 let store = ObjectStoreCheckpointStore::new(
1740 Arc::new(object_store::memory::InMemory::new()),
1741 String::new(),
1742 10,
1743 );
1744
1745 let m = make_manifest(1, 10);
1746 store.save(&m).await.unwrap();
1747
1748 store.save(&m).await.unwrap();
1750
1751 let loaded = store.load_latest().await.unwrap().unwrap();
1752 assert_eq!(loaded.checkpoint_id, 1);
1753 assert_eq!(loaded.epoch, 10);
1754 }
1755
1756 #[tokio::test]
1757 async fn test_obj_update_manifest_overwrites() {
1758 use crate::checkpoint::checkpoint_manifest::SinkCommitStatus;
1759
1760 let store = make_obj_store();
1761
1762 let mut m = make_manifest(1, 10);
1764 m.sink_commit_statuses
1765 .insert("pg-sink".into(), SinkCommitStatus::Pending);
1766 store.save(&m).await.unwrap();
1767
1768 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1770 assert_eq!(
1771 loaded.sink_commit_statuses.get("pg-sink"),
1772 Some(&SinkCommitStatus::Pending)
1773 );
1774
1775 m.sink_commit_statuses
1777 .insert("pg-sink".into(), SinkCommitStatus::Committed);
1778 store.update_manifest(&m).await.unwrap();
1779
1780 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1782 assert_eq!(
1783 loaded.sink_commit_statuses.get("pg-sink"),
1784 Some(&SinkCommitStatus::Committed)
1785 );
1786 }
1787
1788 #[tokio::test]
1789 async fn test_obj_save_still_uses_conditional_put() {
1790 let store = make_obj_store();
1791
1792 let m = make_manifest(1, 10);
1793 store.save(&m).await.unwrap();
1794
1795 store.save(&m).await.unwrap();
1798
1799 let mut m2 = make_manifest(1, 10);
1801 m2.watermark = Some(42);
1802 store.update_manifest(&m2).await.unwrap();
1803
1804 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1805 assert_eq!(loaded.watermark, Some(42));
1806 }
1807
1808 #[tokio::test]
1809 async fn test_fs_update_manifest_overwrites() {
1810 use crate::checkpoint::checkpoint_manifest::SinkCommitStatus;
1811
1812 let dir = tempfile::tempdir().unwrap();
1813 let store = make_store(dir.path());
1814
1815 let mut m = make_manifest(1, 10);
1816 m.sink_commit_statuses
1817 .insert("sink-a".into(), SinkCommitStatus::Pending);
1818 store.save(&m).await.unwrap();
1819
1820 m.sink_commit_statuses
1821 .insert("sink-a".into(), SinkCommitStatus::Committed);
1822 store.update_manifest(&m).await.unwrap();
1823
1824 let loaded = store.load_by_id(1).await.unwrap().unwrap();
1825 assert_eq!(
1826 loaded.sink_commit_statuses.get("sink-a"),
1827 Some(&SinkCommitStatus::Committed)
1828 );
1829 }
1830
1831 #[tokio::test]
1832 async fn test_obj_state_paths() {
1833 let inner = Arc::new(object_store::memory::InMemory::new());
1834 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1835
1836 store.save(&make_manifest(1, 1)).await.unwrap();
1837 store
1838 .save_state_data(1, &[bytes::Bytes::from_static(b"state-blob")])
1839 .await
1840 .unwrap();
1841
1842 let result = inner
1843 .get_opts(
1844 &object_store::path::Path::from("checkpoints/state-000001.bin"),
1845 GetOptions::default(),
1846 )
1847 .await;
1848 assert!(result.is_ok(), "state path should exist");
1849 }
1850
1851 #[tokio::test]
1852 async fn test_obj_latest_json_format() {
1853 let inner = Arc::new(object_store::memory::InMemory::new());
1854 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1855
1856 store.save(&make_manifest(5, 50)).await.unwrap();
1857
1858 let data = inner
1859 .get_opts(
1860 &object_store::path::Path::from("manifests/latest.json"),
1861 GetOptions::default(),
1862 )
1863 .await
1864 .unwrap()
1865 .bytes()
1866 .await
1867 .unwrap();
1868
1869 let pointer: super::LatestPointer = serde_json::from_slice(&data).unwrap();
1870 assert_eq!(pointer.checkpoint_id, 5);
1871 }
1872
1873 #[tokio::test]
1874 async fn test_obj_latest_monotonic_guard_skips_regression() {
1875 let inner = Arc::new(object_store::memory::InMemory::new());
1876 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1877
1878 store.save(&make_manifest(10, 10)).await.unwrap();
1879 store.save(&make_manifest(5, 5)).await.unwrap();
1883
1884 let loaded = store.load_latest().await.unwrap().unwrap();
1885 assert_eq!(
1886 loaded.checkpoint_id, 10,
1887 "latest pointer should not regress to an older id"
1888 );
1889 }
1890
1891 #[tokio::test]
1892 async fn test_validate_checkpoint_valid() {
1893 let dir = tempfile::tempdir().unwrap();
1894 let store = make_store(dir.path());
1895
1896 let m = make_manifest(1, 1);
1897 store.save(&m).await.unwrap();
1898
1899 let result = store.validate_checkpoint(1).await.unwrap();
1900 assert!(result.valid, "valid checkpoint: {:?}", result.issues);
1901 assert!(result.issues.is_empty());
1902 }
1903
1904 #[tokio::test]
1905 async fn test_validate_checkpoint_epoch_zero_invalid() {
1906 let dir = tempfile::tempdir().unwrap();
1907 let store = make_store(dir.path());
1908
1909 let m = make_manifest(1, 0);
1911 store.save(&m).await.unwrap();
1912
1913 let result = store.validate_checkpoint(1).await.unwrap();
1914 assert!(!result.valid, "epoch=0 should be invalid");
1915 assert!(
1916 result.issues.iter().any(|i| i.message().contains("epoch")),
1917 "should mention epoch: {:?}",
1918 result.issues
1919 );
1920 }
1921
1922 #[tokio::test]
1923 async fn test_validate_checkpoint_missing_manifest() {
1924 let dir = tempfile::tempdir().unwrap();
1925 let store = make_store(dir.path());
1926
1927 let result = store.validate_checkpoint(99).await.unwrap();
1928 assert!(!result.valid);
1929 assert!(result.issues[0].message().contains("not found"));
1930 }
1931
1932 #[tokio::test]
1933 async fn test_validate_checkpoint_corrupt_manifest() {
1934 let dir = tempfile::tempdir().unwrap();
1935 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1936
1937 let cp_dir = dir.path().join("checkpoints/checkpoint_000001");
1939 std::fs::create_dir_all(&cp_dir).unwrap();
1940 std::fs::write(cp_dir.join("manifest.json"), "not valid json").unwrap();
1941
1942 let result = store.validate_checkpoint(1).await.unwrap();
1944 assert!(!result.valid);
1945 assert!(
1946 result.issues[0].message().contains("corrupt manifest"),
1947 "expected corrupt manifest issue: {:?}",
1948 result.issues
1949 );
1950 }
1951
1952 #[tokio::test]
1953 async fn test_validate_checkpoint_state_checksum_ok() {
1954 let dir = tempfile::tempdir().unwrap();
1955 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1956
1957 let state = b"important operator state";
1958 let m = make_manifest(1, 1);
1959 store
1960 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1961 .await
1962 .unwrap();
1963
1964 let result = store.validate_checkpoint(1).await.unwrap();
1965 assert!(result.valid, "checksum should match: {:?}", result.issues);
1966 }
1967
1968 #[tokio::test]
1969 async fn test_validate_checkpoint_state_checksum_mismatch() {
1970 let dir = tempfile::tempdir().unwrap();
1971 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1972
1973 let state = b"original state";
1975 let m = make_manifest(1, 1);
1976 store
1977 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1978 .await
1979 .unwrap();
1980
1981 let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1983 std::fs::write(&state_path, b"corrupted data!!").unwrap();
1984
1985 let result = store.validate_checkpoint(1).await.unwrap();
1986 assert!(!result.valid, "corrupted state should be invalid");
1987 assert!(
1988 result
1989 .issues
1990 .iter()
1991 .any(|i| i.message().contains("checksum mismatch")),
1992 "should report checksum mismatch: {:?}",
1993 result.issues
1994 );
1995 }
1996
1997 #[tokio::test]
1998 async fn test_validate_checkpoint_state_missing_when_expected() {
1999 let dir = tempfile::tempdir().unwrap();
2000 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2001
2002 let m = make_manifest(1, 1);
2004 store
2005 .save_with_state(&m, Some(&[bytes::Bytes::from_static(b"state")]))
2006 .await
2007 .unwrap();
2008
2009 let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
2011 std::fs::remove_file(&state_path).unwrap();
2012
2013 let result = store.validate_checkpoint(1).await.unwrap();
2014 assert!(!result.valid);
2015 assert!(
2016 result
2017 .issues
2018 .iter()
2019 .any(|i| i.message().contains("not found")),
2020 "should report missing state: {:?}",
2021 result.issues
2022 );
2023 }
2024
2025 #[tokio::test]
2026 async fn test_recover_latest_validated_skips_corrupt() {
2027 let dir = tempfile::tempdir().unwrap();
2028 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2029
2030 store.save(&make_manifest(1, 10)).await.unwrap();
2032 store.save(&make_manifest(2, 20)).await.unwrap();
2033
2034 let cp2_manifest = dir
2036 .path()
2037 .join("checkpoints/checkpoint_000002/manifest.json");
2038 std::fs::write(cp2_manifest, "<<<corrupt>>>").unwrap();
2039
2040 let report = store.recover_latest_validated().await.unwrap();
2042 assert_eq!(report.chosen_id, Some(1));
2043 assert_eq!(report.skipped.len(), 1);
2044 assert_eq!(report.skipped[0].0, 2);
2045 assert_eq!(report.examined, 2);
2046 }
2047
2048 #[tokio::test]
2049 async fn test_recover_latest_validated_fresh_start() {
2050 let dir = tempfile::tempdir().unwrap();
2051 let store = make_store(dir.path());
2052
2053 let report = store.recover_latest_validated().await.unwrap();
2054 assert!(report.chosen_id.is_none());
2055 assert_eq!(report.examined, 0);
2056 }
2057
2058 #[tokio::test]
2059 async fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
2060 let dir = tempfile::tempdir().unwrap();
2061 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2062
2063 store.save(&make_manifest(1, 1)).await.unwrap();
2065 let cp_manifest = dir
2066 .path()
2067 .join("checkpoints/checkpoint_000001/manifest.json");
2068 std::fs::write(cp_manifest, "corrupt").unwrap();
2069
2070 let report = store.recover_latest_validated().await.unwrap();
2074 assert!(report.chosen_id.is_none());
2075 }
2076
2077 #[tokio::test]
2078 async fn test_cleanup_orphans_removes_stateless_dirs() {
2079 let dir = tempfile::tempdir().unwrap();
2080 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2081
2082 let orphan_dir = dir.path().join("checkpoints/checkpoint_000099");
2084 std::fs::create_dir_all(&orphan_dir).unwrap();
2085 std::fs::write(orphan_dir.join("state.bin"), b"orphaned").unwrap();
2086
2087 store.save(&make_manifest(1, 1)).await.unwrap();
2089
2090 let cleaned = store.cleanup_orphans().await.unwrap();
2091 assert_eq!(cleaned, 1);
2092
2093 assert!(!orphan_dir.exists());
2095 assert!(store.load_by_id(1).await.unwrap().is_some());
2097 }
2098
2099 #[tokio::test]
2100 async fn test_cleanup_orphans_noop_when_clean() {
2101 let dir = tempfile::tempdir().unwrap();
2102 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2103
2104 store.save(&make_manifest(1, 1)).await.unwrap();
2105 let cleaned = store.cleanup_orphans().await.unwrap();
2106 assert_eq!(cleaned, 0);
2107 }
2108
2109 #[tokio::test]
2110 async fn test_save_with_state_writes_checksum() {
2111 let dir = tempfile::tempdir().unwrap();
2112 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2113
2114 let state = b"state-data-for-checksum";
2115 let m = make_manifest(1, 1);
2116 store
2117 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2118 .await
2119 .unwrap();
2120
2121 let loaded = store.load_latest().await.unwrap().unwrap();
2122 assert!(
2123 loaded.state_checksum.is_some(),
2124 "state_checksum should be set"
2125 );
2126 let expected = sha256_hex(state);
2127 assert_eq!(loaded.state_checksum.unwrap(), expected);
2128 }
2129
2130 #[tokio::test]
2131 async fn test_state_checksum_backward_compat() {
2132 let json = r#"{
2134 "version": 1,
2135 "checkpoint_id": 1,
2136 "epoch": 1,
2137 "timestamp_ms": 1000
2138 }"#;
2139 let m: CheckpointManifest = serde_json::from_str(json).unwrap();
2140 assert!(m.state_checksum.is_none());
2141 }
2142
2143 #[tokio::test]
2146 async fn test_obj_validate_checkpoint_valid() {
2147 let store = make_obj_store();
2148 store.save(&make_manifest(1, 1)).await.unwrap();
2149
2150 let result = store.validate_checkpoint(1).await.unwrap();
2151 assert!(result.valid, "valid checkpoint: {:?}", result.issues);
2152 }
2153
2154 #[tokio::test]
2155 async fn test_obj_validate_checkpoint_missing() {
2156 let store = make_obj_store();
2157 let result = store.validate_checkpoint(99).await.unwrap();
2158 assert!(!result.valid);
2159 }
2160
2161 #[tokio::test]
2162 async fn test_obj_validate_state_checksum() {
2163 let store = ObjectStoreCheckpointStore::new(
2164 Arc::new(object_store::memory::InMemory::new()),
2165 String::new(),
2166 10,
2167 );
2168
2169 let state = b"obj-store-state-data";
2170 let m = make_manifest(1, 1);
2171 store
2172 .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2173 .await
2174 .unwrap();
2175
2176 let result = store.validate_checkpoint(1).await.unwrap();
2177 assert!(result.valid, "checksum should match: {:?}", result.issues);
2178 }
2179
2180 #[tokio::test]
2181 async fn test_obj_recover_latest_validated() {
2182 let store = ObjectStoreCheckpointStore::new(
2183 Arc::new(object_store::memory::InMemory::new()),
2184 String::new(),
2185 10,
2186 );
2187
2188 store.save(&make_manifest(1, 10)).await.unwrap();
2189 store.save(&make_manifest(2, 20)).await.unwrap();
2190
2191 let report = store.recover_latest_validated().await.unwrap();
2192 assert_eq!(report.chosen_id, Some(2));
2193 assert!(report.skipped.is_empty());
2194 }
2195
2196 #[tokio::test]
2197 async fn test_obj_cleanup_orphans() {
2198 let inner = Arc::new(object_store::memory::InMemory::new());
2199 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
2200
2201 let state = b"state-with-manifest";
2203 store
2204 .save_with_state(
2205 &make_manifest(1, 1),
2206 Some(&[bytes::Bytes::from_static(state)]),
2207 )
2208 .await
2209 .unwrap();
2210
2211 let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
2213 inner
2214 .put_opts(
2215 &orphan_path,
2216 PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
2217 PutOptions::default(),
2218 )
2219 .await
2220 .unwrap();
2221
2222 let cleaned = store.cleanup_orphans().await.unwrap();
2223 assert_eq!(cleaned, 1);
2224
2225 let real_state = store.load_state_data(1).await.unwrap();
2227 assert!(real_state.is_some());
2228 }
2229}