Skip to main content

laminar_storage/
checkpoint_store.rs

1//! Checkpoint persistence via the [`CheckpointStore`] trait.
2//!
3//! Provides a filesystem-backed implementation ([`FileSystemCheckpointStore`])
4//! that writes manifests as atomic JSON files with a `latest.txt` pointer
5//! for crash-safe recovery.
6//!
7//! ## Disk Layout
8//!
9//! ```text
10//! {base_dir}/checkpoints/
11//!   checkpoint_000001/
12//!     manifest.json     # CheckpointManifest as pretty-printed JSON
13//!     state.bin         # Optional: large operator state sidecar
14//!   checkpoint_000002/
15//!     manifest.json
16//!   latest.txt          # "checkpoint_000002" — pointer to latest good checkpoint
17//! ```
18
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use object_store::{GetOptions, ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
24use sha2::{Digest, Sha256};
25use tracing::warn;
26
27use crate::checkpoint_manifest::CheckpointManifest;
28
29/// Fsync a file to ensure its contents are durable on disk.
30async fn sync_file(path: &Path) -> Result<(), std::io::Error> {
31    // Must open with write access — Windows requires it for FlushFileBuffers.
32    let f = tokio::fs::OpenOptions::new().write(true).open(path).await?;
33    f.sync_all().await
34}
35
36/// Fsync a directory to make rename operations durable.
37///
38/// On Unix, this flushes directory metadata (new/renamed entries).
39/// On Windows, directory sync is not supported; the OS handles durability.
40#[allow(clippy::unnecessary_wraps, clippy::unused_async)] // no-op on Windows
41async fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
42    #[cfg(unix)]
43    {
44        let f = tokio::fs::File::open(path).await?;
45        f.sync_all().await?;
46    }
47    #[cfg(not(unix))]
48    {
49        let _ = path;
50    }
51    Ok(())
52}
53
/// Errors from checkpoint store operations.
///
/// The `#[from]` variants let `?` convert the underlying error types
/// directly into this enum.
#[derive(Debug, thiserror::Error)]
pub enum CheckpointStoreError {
    /// I/O error during checkpoint persistence (converted from
    /// [`std::io::Error`]).
    #[error("checkpoint I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// JSON serialization/deserialization error (converted from
    /// [`serde_json::Error`]).
    #[error("checkpoint serialization error: {0}")]
    Serde(#[from] serde_json::Error),

    /// The checkpoint with the contained ID was not found.
    #[error("checkpoint {0} not found")]
    NotFound(u64),

    /// Object store error (converted from [`object_store::Error`]).
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::Error),
}
73
74// ---------------------------------------------------------------------------
75// Checkpoint validation types
76// ---------------------------------------------------------------------------
77
/// Classification of a single validation finding.
///
/// `ManifestWarning` is non-fatal: the checkpoint can still be used.
/// `IntegrityFailure` is fatal: recovery must skip this checkpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationIssue {
    /// Non-fatal manifest-level warning (e.g. `vnode_count` mismatch,
    /// orphaned source offset).
    ManifestWarning(String),
    /// Fatal: the manifest is missing/corrupt, or the sidecar integrity
    /// check (checksum, presence) failed.
    IntegrityFailure(String),
}

impl ValidationIssue {
    /// Returns `true` when this finding makes the checkpoint unusable for
    /// recovery, i.e. it is an [`ValidationIssue::IntegrityFailure`].
    #[must_use]
    pub fn is_fatal(&self) -> bool {
        match self {
            Self::IntegrityFailure(_) => true,
            Self::ManifestWarning(_) => false,
        }
    }

    /// Returns the human-readable text carried by either variant.
    #[must_use]
    pub fn message(&self) -> &str {
        let (Self::ManifestWarning(text) | Self::IntegrityFailure(text)) = self;
        text
    }
}

impl std::fmt::Display for ValidationIssue {
    /// Writes the bare message; the variant name and any width/alignment
    /// flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message())
    }
}
113
/// Result of validating a single checkpoint.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// Checkpoint ID that was validated.
    pub checkpoint_id: u64,
    /// Whether the checkpoint is valid for recovery. A checkpoint is
    /// valid iff it has no [`ValidationIssue::IntegrityFailure`] issues.
    pub valid: bool,
    /// Issues found during validation. This may contain non-fatal
    /// [`ValidationIssue::ManifestWarning`]s even when `valid` is `true`.
    pub issues: Vec<ValidationIssue>,
}
125
/// Report from a crash-safe recovery walk.
///
/// Captures which checkpoints were tried, which were skipped (and why),
/// and which was ultimately chosen for recovery.
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// The checkpoint that was selected for recovery (`None` if fresh start).
    pub chosen_id: Option<u64>,
    /// Checkpoints that were tried and rejected, as `(id, reason)` pairs.
    /// The default [`CheckpointStore::recover_latest_validated`] records
    /// them newest-first, joining all issue messages with `"; "`.
    pub skipped: Vec<(u64, String)>,
    /// Number of candidate checkpoint IDs listed for the walk. The default
    /// walk stops at the first valid checkpoint, so this counts every
    /// listed candidate, not only the ones that were actually validated.
    pub examined: usize,
    /// Elapsed time for the recovery walk.
    pub elapsed: std::time::Duration,
}
141
142/// Parse a checkpoint id out of an object-store path segment shaped like
143/// `"{prefix}NNNNNN{suffix}"` (e.g. `"manifest-000042.json"`). Scans all
144/// '/'-separated segments so the helper works on prefixed stores. A
145/// segment with the right affixes but a non-numeric middle is logged at
146/// warn — operators need to notice manually-renamed files rather than
147/// see silent gaps in `prune`/`list_ids`.
148fn parse_checkpoint_id_from_path(path: &str, prefix: &str, suffix: &str) -> Option<u64> {
149    for segment in path.split('/') {
150        let Some(rest) = segment.strip_prefix(prefix) else {
151            continue;
152        };
153        let Some(id_str) = rest.strip_suffix(suffix) else {
154            continue;
155        };
156        if let Ok(id) = id_str.parse::<u64>() {
157            return Some(id);
158        }
159        warn!(
160            path,
161            prefix, suffix, "malformed checkpoint id in object path — skipped"
162        );
163        return None;
164    }
165    None
166}
167
168/// Compute SHA-256 hex digest of data.
169fn sha256_hex(data: &[u8]) -> String {
170    let mut hasher = Sha256::new();
171    hasher.update(data);
172    format!("{:x}", hasher.finalize())
173}
174
175/// Compute SHA-256 hex digest across a chain of `Bytes` chunks.
176///
177/// Equivalent to hashing the concatenation of the chunks, but without
178/// materializing that concatenation in memory. Used by
179/// [`CheckpointStore::save_with_state`] to checksum the sidecar before
180/// the multi-chunk write.
181fn sha256_hex_chunks(chunks: &[bytes::Bytes]) -> String {
182    let mut hasher = Sha256::new();
183    for chunk in chunks {
184        hasher.update(chunk);
185    }
186    format!("{:x}", hasher.finalize())
187}
188
189/// Trait for checkpoint persistence backends.
190///
191/// Implementations must guarantee atomic manifest writes (readers never see
192/// a partial manifest). The `latest.txt` pointer is updated only after the
193/// manifest is fully written and synced.
194#[async_trait]
195pub trait CheckpointStore: Send + Sync {
196    /// Runtime vnode count that manifests written by this store are
197    /// expected to use. Consulted when validating loaded manifests —
198    /// a mismatch is reported as a manifest warning. Defaults to
199    /// [`crate::checkpoint_manifest::DEFAULT_VNODE_COUNT`] when the
200    /// implementation has no configured value.
201    fn vnode_count(&self) -> u16 {
202        crate::checkpoint_manifest::DEFAULT_VNODE_COUNT
203    }
204
205    /// Atomically persists a checkpoint manifest. Implementations must
206    /// guarantee readers never observe a partial manifest.
207    ///
208    /// # Errors
209    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
210    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;
211
212    /// Loads the most recent checkpoint manifest, or `Ok(None)` on a
213    /// fresh store.
214    ///
215    /// # Errors
216    /// Returns [`CheckpointStoreError`] on I/O or deserialization failure.
217    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
218
219    /// Loads a specific manifest, or `Ok(None)` if absent.
220    ///
221    /// # Errors
222    /// Returns [`CheckpointStoreError`] on I/O or deserialization failure.
223    async fn load_by_id(&self, id: u64)
224        -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
225
226    /// Lists all available checkpoints as `(id, epoch)` pairs, sorted
227    /// ascending by ID. May read every manifest; callers that only
228    /// need IDs should use [`Self::list_ids`].
229    ///
230    /// # Errors
231    /// Returns [`CheckpointStoreError`] on I/O failure.
232    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;
233
234    /// Lists all checkpoint IDs, **sorted ascending**. Unlike
235    /// [`Self::list`] this enumerates corrupt manifests too (used by
236    /// crash recovery). Callers rely on the ascending invariant.
237    ///
238    /// # Errors
239    /// Returns [`CheckpointStoreError`] on I/O failure.
240    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
241        // Default: O(N) manifest reads via list(). Production backends
242        // should override; list() already sorts ascending.
243        Ok(self.list().await?.iter().map(|(id, _)| *id).collect())
244    }
245
246    /// Prunes old checkpoints, keeping at most `keep_count` recent
247    /// ones. Returns the number of checkpoints removed.
248    ///
249    /// # Errors
250    /// Returns [`CheckpointStoreError`] on I/O failure.
251    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;
252
253    /// Overwrites an existing manifest, bypassing the conditional-PUT
254    /// fence used by [`Self::save`]. Used after a successful sink
255    /// commit to record per-sink status transitions.
256    ///
257    /// # Errors
258    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
259    async fn update_manifest(
260        &self,
261        manifest: &CheckpointManifest,
262    ) -> Result<(), CheckpointStoreError> {
263        self.save(manifest).await
264    }
265
266    /// Writes operator state sidecar bytes for a checkpoint.
267    ///
268    /// Accepts a chain of `Bytes` chunks (one per operator) rather than
269    /// a single concatenated slice. Backends that support native
270    /// multi-chunk writes (object-store `PutPayload`) avoid copying the
271    /// chunks into a contiguous buffer; backends without such support
272    /// write sequentially.
273    ///
274    /// # Errors
275    /// Returns [`CheckpointStoreError`] on I/O failure.
276    async fn save_state_data(
277        &self,
278        id: u64,
279        chunks: &[bytes::Bytes],
280    ) -> Result<(), CheckpointStoreError>;
281
282    /// Loads operator state sidecar bytes for a checkpoint, or `Ok(None)`
283    /// if no sidecar was written.
284    ///
285    /// # Errors
286    /// Returns [`CheckpointStoreError`] on I/O failure.
287    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;
288
289    /// Validate a specific checkpoint's integrity.
290    ///
291    /// Checks that the manifest is parseable and, if a `state_checksum` is
292    /// present, verifies the sidecar data matches.
293    ///
294    /// # Errors
295    ///
296    /// Returns [`CheckpointStoreError`] on I/O failure.
297    async fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
298        let mut issues = Vec::new();
299
300        // Load manifest — corrupt JSON is a validation failure, not an I/O error.
301        let manifest = match self.load_by_id(id).await {
302            Ok(Some(m)) => m,
303            Ok(None) => {
304                return Ok(ValidationResult {
305                    checkpoint_id: id,
306                    valid: false,
307                    issues: vec![ValidationIssue::IntegrityFailure(format!(
308                        "manifest not found for checkpoint {id}"
309                    ))],
310                });
311            }
312            Err(CheckpointStoreError::Serde(e)) => {
313                return Ok(ValidationResult {
314                    checkpoint_id: id,
315                    valid: false,
316                    issues: vec![ValidationIssue::IntegrityFailure(format!(
317                        "corrupt manifest: {e}"
318                    ))],
319                });
320            }
321            Err(e) => return Err(e),
322        };
323
324        for err in manifest.validate(self.vnode_count()) {
325            issues.push(ValidationIssue::ManifestWarning(format!(
326                "manifest validation: {err}"
327            )));
328        }
329
330        // Verify state sidecar checksum.
331        if let Some(expected) = &manifest.state_checksum {
332            match self.load_state_data(id).await? {
333                Some(data) => {
334                    let actual = sha256_hex(&data);
335                    if actual != *expected {
336                        issues.push(ValidationIssue::IntegrityFailure(format!(
337                            "state.bin checksum mismatch: expected {expected}, got {actual}"
338                        )));
339                    }
340                }
341                None => {
342                    issues.push(ValidationIssue::IntegrityFailure(
343                        "state.bin referenced by checksum but not found".into(),
344                    ));
345                }
346            }
347        }
348
349        // epoch=0 or checkpoint_id=0 indicates a corrupted or nonsensical
350        // manifest — reject as invalid regardless of other issues.
351        if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
352            issues.push(ValidationIssue::IntegrityFailure(
353                "epoch or checkpoint_id is 0 — likely corrupted".into(),
354            ));
355        }
356
357        let valid = issues.iter().all(|i| !i.is_fatal());
358        Ok(ValidationResult {
359            checkpoint_id: id,
360            valid,
361            issues,
362        })
363    }
364
365    /// Walk backward from latest to find the first valid checkpoint.
366    ///
367    /// Returns a [`RecoveryReport`] describing the walk. If no valid
368    /// checkpoint is found, `chosen_id` is `None` (fresh start).
369    ///
370    /// # Errors
371    ///
372    /// Returns [`CheckpointStoreError`] on I/O failure.
373    async fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
374        let start = std::time::Instant::now();
375        let mut skipped = Vec::new();
376
377        // list_ids returns ascending per the trait contract; we iterate
378        // newest-first so the first valid checkpoint wins.
379        let mut ids = self.list_ids().await?;
380        ids.reverse();
381
382        let examined = ids.len();
383
384        for id in &ids {
385            let result = self.validate_checkpoint(*id).await?;
386            if result.valid {
387                return Ok(RecoveryReport {
388                    chosen_id: Some(*id),
389                    skipped,
390                    examined,
391                    elapsed: start.elapsed(),
392                });
393            }
394            let reason = result
395                .issues
396                .iter()
397                .map(ToString::to_string)
398                .collect::<Vec<_>>()
399                .join("; ");
400            warn!(
401                checkpoint_id = id,
402                reason = %reason,
403                "skipping invalid checkpoint"
404            );
405            skipped.push((*id, reason));
406        }
407
408        Ok(RecoveryReport {
409            chosen_id: None,
410            skipped,
411            examined,
412            elapsed: start.elapsed(),
413        })
414    }
415
416    /// Delete orphaned state files that have no matching manifest.
417    ///
418    /// Returns the number of orphans cleaned up.
419    ///
420    /// # Errors
421    ///
422    /// Returns [`CheckpointStoreError`] on I/O failure.
423    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
424        // Default: no-op. Overridden by implementations that can detect orphans.
425        Ok(0)
426    }
427
428    /// Atomically saves a checkpoint manifest with optional sidecar state data.
429    ///
430    /// When `state_data` is provided, the sidecar (`state.bin`) is written and
431    /// fsynced **before** the manifest. This ensures that if the sidecar write
432    /// fails, the manifest is never persisted and `latest.txt` still points to
433    /// the previous valid checkpoint.
434    ///
435    /// Orphaned `state.bin` files (written but no manifest) are harmless and
436    /// cleaned up by [`prune()`](Self::prune).
437    ///
438    /// # Errors
439    ///
440    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
441    async fn save_with_state(
442        &self,
443        manifest: &CheckpointManifest,
444        state_data: Option<&[bytes::Bytes]>,
445    ) -> Result<(), CheckpointStoreError> {
446        let mut manifest = manifest.clone();
447        if let Some(chunks) = state_data {
448            // Compute checksum across the chunks before writing. This is
449            // safe because: (1) save_state_data writes to a temp then
450            // renames atomically, so the on-disk bytes match the
451            // in-memory chain exactly; (2) if the sidecar write fails,
452            // save() is never called, so the manifest with the checksum
453            // is never persisted.
454            manifest.state_checksum = Some(sha256_hex_chunks(chunks));
455            self.save_state_data(manifest.checkpoint_id, chunks).await?;
456        }
457        self.save(&manifest).await
458    }
459}
460
/// Filesystem-backed checkpoint store.
///
/// Writes checkpoint manifests as JSON files with atomic rename semantics.
/// A `latest.txt` pointer (not a symlink) tracks the most recent checkpoint
/// for Windows compatibility.
pub struct FileSystemCheckpointStore {
    /// Parent directory; checkpoints live under `{base_dir}/checkpoints/`.
    base_dir: PathBuf,
    /// Number of checkpoints `save` retains via auto-prune; `0` disables
    /// auto-pruning.
    max_retained: usize,
    /// Expected vnode count, consulted when validating loaded manifests.
    vnode_count: u16,
}
471
472impl FileSystemCheckpointStore {
473    /// Creates a new filesystem checkpoint store.
474    ///
475    /// The `base_dir` is the parent directory; checkpoints are stored under
476    /// `{base_dir}/checkpoints/`. The directory is created lazily on first save.
477    ///
478    /// The store's `vnode_count` defaults to
479    /// [`crate::checkpoint_manifest::DEFAULT_VNODE_COUNT`]. Hosts that run
480    /// with a non-default value should chain [`Self::with_vnode_count`] so
481    /// manifest validation checks the right invariant.
482    #[must_use]
483    pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
484        Self {
485            base_dir: base_dir.into(),
486            max_retained,
487            vnode_count: crate::checkpoint_manifest::DEFAULT_VNODE_COUNT,
488        }
489    }
490
491    /// Override the `vnode_count` used during manifest validation.
492    #[must_use]
493    pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
494        self.vnode_count = vnode_count;
495        self
496    }
497
498    /// Returns the checkpoints directory path.
499    fn checkpoints_dir(&self) -> PathBuf {
500        self.base_dir.join("checkpoints")
501    }
502
503    /// Returns the directory path for a specific checkpoint.
504    fn checkpoint_dir(&self, id: u64) -> PathBuf {
505        self.checkpoints_dir().join(format!("checkpoint_{id:06}"))
506    }
507
508    /// Returns the manifest file path for a specific checkpoint.
509    fn manifest_path(&self, id: u64) -> PathBuf {
510        self.checkpoint_dir(id).join("manifest.json")
511    }
512
513    /// Returns the state sidecar file path for a specific checkpoint.
514    fn state_path(&self, id: u64) -> PathBuf {
515        self.checkpoint_dir(id).join("state.bin")
516    }
517
518    /// Returns the latest.txt pointer path.
519    fn latest_path(&self) -> PathBuf {
520        self.checkpoints_dir().join("latest.txt")
521    }
522
523    /// Parses a checkpoint ID from a directory name like `checkpoint_000042`.
524    fn parse_checkpoint_id(name: &str) -> Option<u64> {
525        name.strip_prefix("checkpoint_")
526            .and_then(|s| s.parse().ok())
527    }
528
529    /// Collects and sorts all checkpoint directory entries.
530    async fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
531        let dir = self.checkpoints_dir();
532        let mut reader = match tokio::fs::read_dir(&dir).await {
533            Ok(r) => r,
534            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
535            Err(e) => return Err(e.into()),
536        };
537
538        let mut ids: Vec<u64> = Vec::new();
539        while let Some(entry) = reader.next_entry().await? {
540            let ft = entry.file_type().await?;
541            if !ft.is_dir() {
542                continue;
543            }
544            if let Some(id) = entry
545                .file_name()
546                .to_str()
547                .and_then(Self::parse_checkpoint_id)
548            {
549                ids.push(id);
550            }
551        }
552
553        ids.sort_unstable();
554        Ok(ids)
555    }
556}
557
558impl FileSystemCheckpointStore {
559    /// Find checkpoint directories that have state.bin but no manifest.json
560    /// (orphaned from a crash after sidecar write but before manifest commit).
561    async fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
562        let dir = self.checkpoints_dir();
563        let mut reader = match tokio::fs::read_dir(&dir).await {
564            Ok(r) => r,
565            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
566            Err(e) => return Err(e.into()),
567        };
568
569        let mut orphans = Vec::new();
570        while let Some(entry) = reader.next_entry().await? {
571            let ft = entry.file_type().await?;
572            if !ft.is_dir() {
573                continue;
574            }
575            let path = entry.path();
576            let has_state = tokio::fs::metadata(path.join("state.bin")).await.is_ok();
577            let has_manifest = tokio::fs::metadata(path.join("manifest.json"))
578                .await
579                .is_ok();
580            if has_state && !has_manifest {
581                orphans.push(path);
582            }
583        }
584        Ok(orphans)
585    }
586}
587
588#[async_trait]
589impl CheckpointStore for FileSystemCheckpointStore {
590    fn vnode_count(&self) -> u16 {
591        self.vnode_count
592    }
593
594    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
595        let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
596        tokio::fs::create_dir_all(&cp_dir).await?;
597
598        let manifest_path = self.manifest_path(manifest.checkpoint_id);
599        let json = serde_json::to_string_pretty(manifest)?;
600
601        // Write to a temp file, fsync, then rename for atomic durability.
602        let tmp_path = manifest_path.with_extension("json.tmp");
603        let write_res = async {
604            tokio::fs::write(&tmp_path, &json).await?;
605            sync_file(&tmp_path).await?;
606            tokio::fs::rename(&tmp_path, &manifest_path).await?;
607            sync_dir(&cp_dir).await
608        }
609        .await;
610        if let Err(e) = write_res {
611            // Clean up temp file to avoid orphans on disk-full.
612            let _ = tokio::fs::remove_file(&tmp_path).await;
613            return Err(e.into());
614        }
615
616        // Update latest.txt pointer — only after manifest is durable.
617        let latest = self.latest_path();
618        let latest_dir = latest.parent().unwrap_or(Path::new(".")).to_path_buf();
619        tokio::fs::create_dir_all(&latest_dir).await?;
620        let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
621        let tmp_latest = latest.with_extension("txt.tmp");
622        tokio::fs::write(&tmp_latest, &latest_content).await?;
623        sync_file(&tmp_latest).await?;
624        tokio::fs::rename(&tmp_latest, &latest).await?;
625        sync_dir(&latest_dir).await?;
626
627        // Auto-prune if configured.
628        if self.max_retained > 0 {
629            if let Err(e) = self.prune(self.max_retained).await {
630                tracing::warn!(
631                    max_retained = self.max_retained,
632                    error = %e,
633                    "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
634                );
635            }
636        }
637
638        Ok(())
639    }
640
641    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
642        let latest = self.latest_path();
643        let content = match tokio::fs::read_to_string(&latest).await {
644            Ok(c) => c,
645            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
646            Err(e) => return Err(e.into()),
647        };
648        let dir_name = content.trim();
649        if dir_name.is_empty() {
650            return Ok(None);
651        }
652
653        match Self::parse_checkpoint_id(dir_name) {
654            Some(id) => self.load_by_id(id).await,
655            None => Ok(None),
656        }
657    }
658
659    async fn load_by_id(
660        &self,
661        id: u64,
662    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
663        let path = self.manifest_path(id);
664        let json = match tokio::fs::read_to_string(&path).await {
665            Ok(s) => s,
666            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
667            Err(e) => return Err(e.into()),
668        };
669        let manifest: CheckpointManifest = serde_json::from_str(&json)?;
670
671        let errors = manifest.validate(self.vnode_count());
672        if !errors.is_empty() {
673            tracing::warn!(
674                checkpoint_id = id,
675                error_count = errors.len(),
676                first_error = %errors[0],
677                "loaded checkpoint manifest has validation warnings"
678            );
679        }
680
681        Ok(Some(manifest))
682    }
683
684    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
685        self.sorted_checkpoint_ids().await
686    }
687
688    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
689        let ids = self.sorted_checkpoint_ids().await?;
690        let mut result = Vec::with_capacity(ids.len());
691
692        for id in ids {
693            // Skip missing/corrupt manifests — list() is best-effort.
694            if let Ok(Some(manifest)) = self.load_by_id(id).await {
695                result.push((manifest.checkpoint_id, manifest.epoch));
696            }
697        }
698
699        Ok(result)
700    }
701
702    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
703        let ids = self.sorted_checkpoint_ids().await?;
704        if ids.len() <= keep_count {
705            return Ok(0);
706        }
707
708        let to_remove = ids.len() - keep_count;
709        let mut removed = 0;
710
711        for &id in &ids[..to_remove] {
712            let dir = self.checkpoint_dir(id);
713            if tokio::fs::remove_dir_all(&dir).await.is_ok() {
714                removed += 1;
715            }
716        }
717
718        Ok(removed)
719    }
720
721    async fn save_state_data(
722        &self,
723        id: u64,
724        chunks: &[bytes::Bytes],
725    ) -> Result<(), CheckpointStoreError> {
726        use tokio::io::AsyncWriteExt;
727
728        let cp_dir = self.checkpoint_dir(id);
729        tokio::fs::create_dir_all(&cp_dir).await?;
730
731        let path = self.state_path(id);
732        let tmp = path.with_extension("bin.tmp");
733
734        // Write chunks sequentially to the temp file — no concatenation
735        // into a contiguous buffer. Each chunk is already an owned Bytes;
736        // write_all borrows it.
737        let mut file = tokio::fs::File::create(&tmp).await?;
738        for chunk in chunks {
739            file.write_all(chunk).await?;
740        }
741        file.sync_all().await?;
742        drop(file);
743
744        tokio::fs::rename(&tmp, &path).await?;
745        sync_dir(&cp_dir).await?;
746
747        Ok(())
748    }
749
750    async fn save_with_state(
751        &self,
752        manifest: &CheckpointManifest,
753        state_data: Option<&[bytes::Bytes]>,
754    ) -> Result<(), CheckpointStoreError> {
755        let mut manifest = manifest.clone();
756        // Write sidecar FIRST — if this fails, manifest is never written
757        // and latest.txt still points to the previous valid checkpoint.
758        if let Some(chunks) = state_data {
759            manifest.state_checksum = Some(sha256_hex_chunks(chunks));
760            self.save_state_data(manifest.checkpoint_id, chunks).await?;
761        }
762        self.save(&manifest).await
763    }
764
765    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
766        let orphans = self.find_orphan_dirs().await?;
767        let mut cleaned = 0;
768        for dir in &orphans {
769            if tokio::fs::remove_dir_all(dir).await.is_ok() {
770                tracing::info!(
771                    path = %dir.display(),
772                    "cleaned up orphaned checkpoint directory"
773                );
774                cleaned += 1;
775            }
776        }
777        Ok(cleaned)
778    }
779
780    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
781        let path = self.state_path(id);
782        match tokio::fs::read(&path).await {
783            Ok(data) => Ok(Some(data)),
784            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
785            Err(e) => Err(e.into()),
786        }
787    }
788}
789
790// ---------------------------------------------------------------------------
// ObjectStoreCheckpointStore — async adapter over any ObjectStore backend
792// ---------------------------------------------------------------------------
793
/// JSON pointer stored in `manifests/latest.json`.
///
/// Serialized as `{"checkpoint_id": N}`; see the object layout documented on
/// [`ObjectStoreCheckpointStore`].
#[derive(serde::Serialize, serde::Deserialize)]
struct LatestPointer {
    /// ID of the checkpoint the pointer designates as latest.
    checkpoint_id: u64,
}
799
/// Object-store-backed checkpoint store.
///
/// Drives any `object_store::ObjectStore` backend (S3, GCS, Azure,
/// local FS) directly over `.await`; no dedicated runtime. The app
/// runtime's HTTP connection pool is reused.
///
/// ## Object Layout
///
/// ```text
/// {prefix}/
///   manifests/
///     manifest-000001.json    # Checkpoint manifest (JSON)
///     manifest-000002.json
///     latest.json             # {"checkpoint_id": 2}
///   checkpoints/
///     state-000001.bin        # Optional sidecar state
///     state-000002.bin
/// ```
///
/// Manifest writes use [`PutMode::Create`] for split-brain prevention
/// (conditional PUT).
pub struct ObjectStoreCheckpointStore {
    /// Backend holding manifests, the latest pointer, and sidecar state.
    store: Arc<dyn ObjectStore>,
    /// Prepended verbatim to every object path (e.g. `"nodes/abc123/"`);
    /// it should end with `/` or be empty.
    prefix: String,
    /// Retention limit supplied to [`Self::new`].
    // NOTE(review): presumably mirrors FileSystemCheckpointStore's
    // auto-prune semantics (0 = disabled) — confirm against the impl.
    max_retained: usize,
    /// Expected vnode count used when validating loaded manifests.
    vnode_count: u16,
}
822    store: Arc<dyn ObjectStore>,
823    prefix: String,
824    max_retained: usize,
825    vnode_count: u16,
826}
827
828impl ObjectStoreCheckpointStore {
829    /// Create a new object-store-backed checkpoint store.
830    ///
831    /// `prefix` is prepended to all object paths (e.g., `"nodes/abc123/"`).
832    /// It should end with `/` or be empty.
833    ///
834    /// The store's `vnode_count` defaults to
835    /// [`crate::checkpoint_manifest::DEFAULT_VNODE_COUNT`]. Hosts that run
836    /// with a non-default value should chain [`Self::with_vnode_count`].
837    #[must_use]
838    pub fn new(store: Arc<dyn ObjectStore>, prefix: String, max_retained: usize) -> Self {
839        Self {
840            store,
841            prefix,
842            max_retained,
843            vnode_count: crate::checkpoint_manifest::DEFAULT_VNODE_COUNT,
844        }
845    }
846
847    /// Override the `vnode_count` used during manifest validation.
848    #[must_use]
849    pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
850        self.vnode_count = vnode_count;
851        self
852    }
853
854    fn manifest_path(&self, id: u64) -> object_store::path::Path {
855        object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
856    }
857
858    fn latest_pointer_path(&self) -> object_store::path::Path {
859        object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
860    }
861
862    fn state_path(&self, id: u64) -> object_store::path::Path {
863        object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
864    }
865
866    // ── Helpers ──
867
868    /// Put a payload with bounded retry + jittered backoff for idempotent
869    /// writes (sidecar state, pointer update). Retries on
870    /// `object_store::Error::Generic` — which covers most transient
871    /// 5xx / connection failures across backends — and bubbles every
872    /// other error immediately. Non-idempotent writes (conditional
873    /// creates on the manifest path) MUST NOT use this helper.
874    ///
875    /// `payload` is consumed on the happy path and cloned on retry.
876    /// `PutPayload::clone` is cheap (Arc-bump on each underlying
877    /// `Bytes` chunk), so multi-chunk payloads cost nothing extra to
878    /// retry.
879    async fn put_with_retry(
880        &self,
881        path: &object_store::path::Path,
882        payload: PutPayload,
883        opts: &PutOptions,
884    ) -> Result<(), CheckpointStoreError> {
885        const BACKOFFS_MS: &[u64] = &[100, 500, 2000];
886        let mut attempt = 0usize;
887        loop {
888            let result = self
889                .store
890                .put_opts(path, payload.clone(), opts.clone())
891                .await;
892            match result {
893                Ok(_) => return Ok(()),
894                Err(object_store::Error::Generic { .. }) if attempt < BACKOFFS_MS.len() => {
895                    let delay = std::time::Duration::from_millis(BACKOFFS_MS[attempt]);
896                    tracing::warn!(
897                        path = %path,
898                        attempt = attempt + 1,
899                        delay_ms = delay.as_millis(),
900                        "transient put error, retrying"
901                    );
902                    tokio::time::sleep(delay).await;
903                    attempt += 1;
904                }
905                Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
906            }
907        }
908    }
909
910    /// GET an object, returning `Ok(None)` for `NotFound`.
911    async fn get_bytes(
912        &self,
913        path: &object_store::path::Path,
914    ) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
915        match self.store.get_opts(path, GetOptions::default()).await {
916            Ok(get_result) => {
917                let data = get_result.bytes().await?;
918                Ok(Some(data))
919            }
920            Err(object_store::Error::NotFound { .. }) => Ok(None),
921            Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
922        }
923    }
924
925    /// Load a manifest from a specific path, returning `Ok(None)` for `NotFound`.
926    async fn load_manifest_at(
927        &self,
928        path: &object_store::path::Path,
929    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
930        match self.get_bytes(path).await? {
931            Some(data) => {
932                let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
933                Ok(Some(manifest))
934            }
935            None => Ok(None),
936        }
937    }
938
939    /// List checkpoint IDs by scanning `manifests/manifest-NNNNNN.json`.
940    async fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
941        use futures::TryStreamExt;
942
943        let mut ids = std::collections::BTreeSet::new();
944
945        let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
946        let entries: Vec<_> = self
947            .store
948            .list(Some(&manifests_prefix))
949            .try_collect()
950            .await?;
951        for entry in &entries {
952            if let Some(id) =
953                parse_checkpoint_id_from_path(entry.location.as_ref(), "manifest-", ".json")
954            {
955                ids.insert(id);
956            }
957        }
958
959        Ok(ids.into_iter().collect())
960    }
961}
962
963#[async_trait]
964impl CheckpointStore for ObjectStoreCheckpointStore {
965    fn vnode_count(&self) -> u16 {
966        self.vnode_count
967    }
968
969    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
970        let json = serde_json::to_string_pretty(manifest)?;
971        let path = self.manifest_path(manifest.checkpoint_id);
972        let json_bytes = bytes::Bytes::from(json);
973
974        // Conditional PUT — prevents duplicate manifest writes (split-brain safety).
975        let create_opts = PutOptions {
976            mode: PutMode::Create,
977            ..PutOptions::default()
978        };
979        let result = self
980            .store
981            .put_opts(
982                &path,
983                PutPayload::from_bytes(json_bytes.clone()),
984                create_opts,
985            )
986            .await;
987
988        match result {
989            Ok(_) => {}
990            Err(object_store::Error::AlreadyExists { .. }) => {
991                tracing::warn!(
992                    checkpoint_id = manifest.checkpoint_id,
993                    "[LDB-6010] Manifest already exists — skipping write"
994                );
995            }
996            Err(object_store::Error::NotImplemented { .. }) => {
997                // Backend doesn't support conditional PUT — fall back to overwrite.
998                self.store
999                    .put_opts(
1000                        &path,
1001                        PutPayload::from_bytes(json_bytes),
1002                        PutOptions::default(),
1003                    )
1004                    .await?;
1005            }
1006            Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
1007        }
1008
1009        // Monotonic pointer update. A stale writer must not regress the
1010        // pointer from id N+1 back to id N. Read the current pointer; if
1011        // it already references a newer id, skip the write. Same-writer
1012        // races (not expected — one coordinator per store instance) can
1013        // still race past this check, but cross-leader stomps from a
1014        // delayed ex-leader are caught.
1015        let latest = self.latest_pointer_path();
1016        if let Some(current) = self.get_bytes(&latest).await? {
1017            if let Ok(existing) = serde_json::from_slice::<LatestPointer>(&current) {
1018                if existing.checkpoint_id > manifest.checkpoint_id {
1019                    tracing::warn!(
1020                        current = existing.checkpoint_id,
1021                        ours = manifest.checkpoint_id,
1022                        "[LDB-6010] latest.json already points at a newer checkpoint — \
1023                         skipping pointer update (possible split-brain or delayed writer)"
1024                    );
1025                    return Ok(());
1026                }
1027            }
1028        }
1029        let pointer = serde_json::to_string(&LatestPointer {
1030            checkpoint_id: manifest.checkpoint_id,
1031        })?;
1032        self.put_with_retry(
1033            &latest,
1034            PutPayload::from_bytes(bytes::Bytes::from(pointer)),
1035            &PutOptions::default(),
1036        )
1037        .await?;
1038
1039        // Auto-prune
1040        if self.max_retained > 0 {
1041            if let Err(e) = self.prune(self.max_retained).await {
1042                tracing::warn!(
1043                    max_retained = self.max_retained,
1044                    error = %e,
1045                    "[LDB-6009] Object store checkpoint prune failed"
1046                );
1047            }
1048        }
1049
1050        Ok(())
1051    }
1052
1053    async fn update_manifest(
1054        &self,
1055        manifest: &CheckpointManifest,
1056    ) -> Result<(), CheckpointStoreError> {
1057        let json = serde_json::to_string_pretty(manifest)?;
1058        let path = self.manifest_path(manifest.checkpoint_id);
1059        let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
1060
1061        // Unconditional PUT — overwrites the existing manifest.
1062        self.store
1063            .put_opts(&path, payload, PutOptions::default())
1064            .await?;
1065
1066        Ok(())
1067    }
1068
1069    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1070        if let Some(data) = self.get_bytes(&self.latest_pointer_path()).await? {
1071            let pointer: LatestPointer = serde_json::from_slice(&data)?;
1072            return self.load_by_id(pointer.checkpoint_id).await;
1073        }
1074
1075        Ok(None)
1076    }
1077
1078    async fn load_by_id(
1079        &self,
1080        id: u64,
1081    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1082        self.load_manifest_at(&self.manifest_path(id)).await
1083    }
1084
1085    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
1086        self.list_checkpoint_ids().await
1087    }
1088
1089    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
1090        let ids = self.list_checkpoint_ids().await?;
1091        let mut result = Vec::with_capacity(ids.len());
1092
1093        for id in ids {
1094            if let Ok(Some(manifest)) = self.load_by_id(id).await {
1095                result.push((manifest.checkpoint_id, manifest.epoch));
1096            }
1097        }
1098
1099        Ok(result)
1100    }
1101
1102    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
1103        let ids = self.list_checkpoint_ids().await?;
1104        if ids.len() <= keep_count {
1105            return Ok(0);
1106        }
1107
1108        let to_remove = ids.len() - keep_count;
1109        let mut removed = 0;
1110        let mut logged_error = false;
1111
1112        for &id in &ids[..to_remove] {
1113            let manifest = self.manifest_path(id);
1114            let state = self.state_path(id);
1115            let manifest_res = self.store.delete(&manifest).await;
1116            let state_res = self.store.delete(&state).await;
1117
1118            // Count the id as removed only if the manifest is gone
1119            // (state.bin is optional — its absence is fine).
1120            let manifest_ok = matches!(
1121                manifest_res,
1122                Ok(()) | Err(object_store::Error::NotFound { .. })
1123            );
1124            if manifest_ok {
1125                removed += 1;
1126            }
1127
1128            // Surface the first real error so operators can notice.
1129            // Permission errors silently leak old checkpoints forever
1130            // otherwise.
1131            for err in [manifest_res, state_res]
1132                .into_iter()
1133                .filter_map(Result::err)
1134            {
1135                if matches!(err, object_store::Error::NotFound { .. }) {
1136                    continue;
1137                }
1138                if !logged_error {
1139                    tracing::warn!(
1140                        checkpoint_id = id,
1141                        error = %err,
1142                        "[LDB-6027] checkpoint prune: delete failed — \
1143                         retained objects may accumulate"
1144                    );
1145                    logged_error = true;
1146                }
1147            }
1148        }
1149
1150        Ok(removed)
1151    }
1152
1153    async fn save_state_data(
1154        &self,
1155        id: u64,
1156        chunks: &[bytes::Bytes],
1157    ) -> Result<(), CheckpointStoreError> {
1158        let path = self.state_path(id);
1159        // PutPayload is a chain of Bytes — no concatenation into a
1160        // contiguous buffer. Each Arc bump is ~nothing; the underlying
1161        // bytes reach the object-store client untouched.
1162        let payload: PutPayload = chunks.iter().cloned().collect();
1163        // Sidecar writes are idempotent — retry transients.
1164        self.put_with_retry(&path, payload, &PutOptions::default())
1165            .await
1166    }
1167
1168    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
1169        Ok(self
1170            .get_bytes(&self.state_path(id))
1171            .await?
1172            .map(|d| d.to_vec()))
1173    }
1174
1175    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
1176        use futures::{StreamExt, TryStreamExt};
1177
1178        // Collect state IDs that have a state.bin but no matching manifest.
1179        let manifest_ids: std::collections::BTreeSet<u64> =
1180            self.list_checkpoint_ids().await?.into_iter().collect();
1181
1182        // List state files: checkpoints/state-NNNNNN.bin
1183        let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
1184        let entries: Vec<_> = self.store.list(Some(&state_prefix)).try_collect().await?;
1185
1186        let mut orphan_paths = Vec::new();
1187        for entry in &entries {
1188            if let Some(id) =
1189                parse_checkpoint_id_from_path(entry.location.as_ref(), "state-", ".bin")
1190            {
1191                if !manifest_ids.contains(&id) {
1192                    orphan_paths.push(entry.location.clone());
1193                }
1194            }
1195        }
1196
1197        let count = orphan_paths.len();
1198        if !orphan_paths.is_empty() {
1199            let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
1200            let mut results = self.store.delete_stream(stream);
1201            while let Some(result) = results.next().await {
1202                if let Err(e) = result {
1203                    tracing::warn!(error = %e, "failed to delete orphan state file");
1204                }
1205            }
1206        }
1207
1208        Ok(count)
1209    }
1210}
1211
1212#[cfg(test)]
1213mod tests {
1214    use super::*;
1215    use crate::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
1216    #[allow(clippy::disallowed_types)] // cold path: checkpoint store
1217    use std::collections::HashMap;
1218
1219    fn make_store(dir: &Path) -> FileSystemCheckpointStore {
1220        FileSystemCheckpointStore::new(dir, 3)
1221    }
1222
1223    fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
1224        CheckpointManifest::new(id, epoch)
1225    }
1226
1227    #[tokio::test]
1228    async fn test_save_and_load_latest() {
1229        let dir = tempfile::tempdir().unwrap();
1230        let store = make_store(dir.path());
1231
1232        let m = make_manifest(1, 1);
1233        store.save(&m).await.unwrap();
1234
1235        let loaded = store.load_latest().await.unwrap().unwrap();
1236        assert_eq!(loaded.checkpoint_id, 1);
1237        assert_eq!(loaded.epoch, 1);
1238    }
1239
1240    #[tokio::test]
1241    async fn test_load_latest_returns_none_when_empty() {
1242        let dir = tempfile::tempdir().unwrap();
1243        let store = make_store(dir.path());
1244        assert!(store.load_latest().await.unwrap().is_none());
1245    }
1246
1247    #[tokio::test]
1248    async fn test_load_latest_returns_most_recent() {
1249        let dir = tempfile::tempdir().unwrap();
1250        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1251
1252        for i in 1..=5 {
1253            store.save(&make_manifest(i, i)).await.unwrap();
1254        }
1255
1256        let latest = store.load_latest().await.unwrap().unwrap();
1257        assert_eq!(latest.checkpoint_id, 5);
1258        assert_eq!(latest.epoch, 5);
1259    }
1260
1261    #[tokio::test]
1262    async fn test_load_by_id() {
1263        let dir = tempfile::tempdir().unwrap();
1264        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1265
1266        store.save(&make_manifest(1, 10)).await.unwrap();
1267        store.save(&make_manifest(2, 20)).await.unwrap();
1268
1269        let m = store.load_by_id(1).await.unwrap().unwrap();
1270        assert_eq!(m.epoch, 10);
1271
1272        let m = store.load_by_id(2).await.unwrap().unwrap();
1273        assert_eq!(m.epoch, 20);
1274
1275        assert!(store.load_by_id(99).await.unwrap().is_none());
1276    }
1277
1278    #[tokio::test]
1279    async fn test_list() {
1280        let dir = tempfile::tempdir().unwrap();
1281        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1282
1283        store.save(&make_manifest(1, 10)).await.unwrap();
1284        store.save(&make_manifest(3, 30)).await.unwrap();
1285        store.save(&make_manifest(2, 20)).await.unwrap();
1286
1287        let list = store.list().await.unwrap();
1288        assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1289    }
1290
1291    #[tokio::test]
1292    async fn test_prune_keeps_max() {
1293        let dir = tempfile::tempdir().unwrap();
1294        let store = FileSystemCheckpointStore::new(dir.path(), 10); // no auto-prune
1295
1296        for i in 1..=5 {
1297            store.save(&make_manifest(i, i)).await.unwrap();
1298        }
1299
1300        let removed = store.prune(2).await.unwrap();
1301        assert_eq!(removed, 3);
1302
1303        let list = store.list().await.unwrap();
1304        assert_eq!(list.len(), 2);
1305        assert_eq!(list[0].0, 4);
1306        assert_eq!(list[1].0, 5);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_auto_prune_on_save() {
1311        let dir = tempfile::tempdir().unwrap();
1312        let store = FileSystemCheckpointStore::new(dir.path(), 2);
1313
1314        for i in 1..=5 {
1315            store.save(&make_manifest(i, i)).await.unwrap();
1316        }
1317
1318        let list = store.list().await.unwrap();
1319        assert_eq!(list.len(), 2);
1320        // Should keep the two most recent
1321        assert_eq!(list[0].0, 4);
1322        assert_eq!(list[1].0, 5);
1323    }
1324
1325    #[tokio::test]
1326    async fn test_save_and_load_state_data() {
1327        let dir = tempfile::tempdir().unwrap();
1328        let store = make_store(dir.path());
1329
1330        store.save(&make_manifest(1, 1)).await.unwrap();
1331
1332        let data = b"large operator state binary blob";
1333        store
1334            .save_state_data(1, &[bytes::Bytes::from_static(data)])
1335            .await
1336            .unwrap();
1337
1338        let loaded = store.load_state_data(1).await.unwrap().unwrap();
1339        assert_eq!(loaded, data);
1340    }
1341
1342    #[tokio::test]
1343    async fn test_load_state_data_returns_none() {
1344        let dir = tempfile::tempdir().unwrap();
1345        let store = make_store(dir.path());
1346        assert!(store.load_state_data(99).await.unwrap().is_none());
1347    }
1348
1349    #[tokio::test]
1350    async fn test_full_manifest_round_trip() {
1351        let dir = tempfile::tempdir().unwrap();
1352        let store = make_store(dir.path());
1353
1354        let mut m = make_manifest(1, 5);
1355        m.source_offsets.insert(
1356            "kafka-src".into(),
1357            ConnectorCheckpoint::with_offsets(
1358                5,
1359                HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
1360            ),
1361        );
1362        m.sink_epochs.insert("pg-sink".into(), 4);
1363        m.table_offsets.insert(
1364            "instruments".into(),
1365            ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
1366        );
1367        m.operator_states
1368            .insert("window".into(), OperatorCheckpoint::inline(b"data"));
1369        m.watermark = Some(999_000);
1370
1371        store.save(&m).await.unwrap();
1372
1373        let loaded = store.load_latest().await.unwrap().unwrap();
1374        assert_eq!(loaded.checkpoint_id, 1);
1375        assert_eq!(loaded.epoch, 5);
1376        assert_eq!(loaded.watermark, Some(999_000));
1377
1378        let src = loaded.source_offsets.get("kafka-src").unwrap();
1379        assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
1380
1381        assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
1382
1383        let tbl = loaded.table_offsets.get("instruments").unwrap();
1384        assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
1385
1386        let op = loaded.operator_states.get("window").unwrap();
1387        assert_eq!(op.decode_inline().unwrap(), b"data");
1388    }
1389
1390    #[tokio::test]
1391    async fn test_empty_latest_txt() {
1392        let dir = tempfile::tempdir().unwrap();
1393        let store = make_store(dir.path());
1394
1395        let cp_dir = dir.path().join("checkpoints");
1396        std::fs::create_dir_all(&cp_dir).unwrap();
1397        std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
1398
1399        assert!(store.load_latest().await.unwrap().is_none());
1400    }
1401
1402    #[tokio::test]
1403    async fn test_latest_points_to_missing_checkpoint() {
1404        let dir = tempfile::tempdir().unwrap();
1405        let store = make_store(dir.path());
1406
1407        let cp_dir = dir.path().join("checkpoints");
1408        std::fs::create_dir_all(&cp_dir).unwrap();
1409        std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
1410
1411        assert!(store.load_latest().await.unwrap().is_none());
1412    }
1413
1414    #[tokio::test]
1415    async fn test_prune_no_op_when_under_limit() {
1416        let dir = tempfile::tempdir().unwrap();
1417        let store = make_store(dir.path());
1418
1419        store.save(&make_manifest(1, 1)).await.unwrap();
1420        let removed = store.prune(5).await.unwrap();
1421        assert_eq!(removed, 0);
1422    }
1423
1424    #[tokio::test]
1425    async fn test_save_with_state_writes_sidecar_before_manifest() {
1426        let dir = tempfile::tempdir().unwrap();
1427        let store = make_store(dir.path());
1428
1429        let m = make_manifest(1, 1);
1430        let state = b"large-operator-state-blob";
1431        store
1432            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1433            .await
1434            .unwrap();
1435
1436        // Both manifest and state should be present.
1437        let loaded = store.load_latest().await.unwrap().unwrap();
1438        assert_eq!(loaded.checkpoint_id, 1);
1439
1440        let loaded_state = store.load_state_data(1).await.unwrap().unwrap();
1441        assert_eq!(loaded_state, state);
1442    }
1443
1444    #[tokio::test]
1445    async fn test_save_with_state_none_is_same_as_save() {
1446        let dir = tempfile::tempdir().unwrap();
1447        let store = make_store(dir.path());
1448
1449        let m = make_manifest(1, 1);
1450        store.save_with_state(&m, None).await.unwrap();
1451
1452        let loaded = store.load_latest().await.unwrap().unwrap();
1453        assert_eq!(loaded.checkpoint_id, 1);
1454        assert!(store.load_state_data(1).await.unwrap().is_none());
1455    }
1456
1457    #[tokio::test]
1458    async fn test_orphaned_state_without_manifest_is_ignored() {
1459        let dir = tempfile::tempdir().unwrap();
1460        let store = make_store(dir.path());
1461
1462        // Write only sidecar state, no manifest (simulates crash after
1463        // state write but before manifest write).
1464        store
1465            .save_state_data(1, &[bytes::Bytes::from_static(b"orphaned")])
1466            .await
1467            .unwrap();
1468
1469        // load_latest should return None — the orphan is not visible.
1470        assert!(store.load_latest().await.unwrap().is_none());
1471
1472        // list should not include the orphan (no manifest.json).
1473        assert!(store.list().await.unwrap().is_empty());
1474    }
1475
1476    // -----------------------------------------------------------------------
1477    // ObjectStoreCheckpointStore tests (using InMemory backend)
1478    // -----------------------------------------------------------------------
1479
1480    fn make_obj_store() -> ObjectStoreCheckpointStore {
1481        let store = Arc::new(object_store::memory::InMemory::new());
1482        ObjectStoreCheckpointStore::new(store, String::new(), 3)
1483    }
1484
1485    #[tokio::test]
1486    async fn test_obj_save_and_load_latest() {
1487        let store = make_obj_store();
1488        let m = make_manifest(1, 1);
1489        store.save(&m).await.unwrap();
1490
1491        let loaded = store.load_latest().await.unwrap().unwrap();
1492        assert_eq!(loaded.checkpoint_id, 1);
1493        assert_eq!(loaded.epoch, 1);
1494    }
1495
1496    #[tokio::test]
1497    async fn test_obj_load_latest_returns_none_when_empty() {
1498        let store = make_obj_store();
1499        assert!(store.load_latest().await.unwrap().is_none());
1500    }
1501
1502    #[tokio::test]
1503    async fn test_obj_load_by_id() {
1504        let store = ObjectStoreCheckpointStore::new(
1505            Arc::new(object_store::memory::InMemory::new()),
1506            String::new(),
1507            10,
1508        );
1509
1510        store.save(&make_manifest(1, 10)).await.unwrap();
1511        store.save(&make_manifest(2, 20)).await.unwrap();
1512
1513        let m = store.load_by_id(1).await.unwrap().unwrap();
1514        assert_eq!(m.epoch, 10);
1515        let m = store.load_by_id(2).await.unwrap().unwrap();
1516        assert_eq!(m.epoch, 20);
1517        assert!(store.load_by_id(99).await.unwrap().is_none());
1518    }
1519
1520    #[tokio::test]
1521    async fn test_obj_list() {
1522        let store = ObjectStoreCheckpointStore::new(
1523            Arc::new(object_store::memory::InMemory::new()),
1524            String::new(),
1525            10,
1526        );
1527
1528        store.save(&make_manifest(1, 10)).await.unwrap();
1529        store.save(&make_manifest(3, 30)).await.unwrap();
1530        store.save(&make_manifest(2, 20)).await.unwrap();
1531
1532        let list = store.list().await.unwrap();
1533        assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1534    }
1535
1536    #[tokio::test]
1537    async fn test_obj_prune() {
1538        let store = ObjectStoreCheckpointStore::new(
1539            Arc::new(object_store::memory::InMemory::new()),
1540            String::new(),
1541            10,
1542        );
1543
1544        for i in 1..=5 {
1545            store.save(&make_manifest(i, i)).await.unwrap();
1546        }
1547
1548        let removed = store.prune(2).await.unwrap();
1549        assert_eq!(removed, 3);
1550
1551        let list = store.list().await.unwrap();
1552        assert_eq!(list.len(), 2);
1553        assert_eq!(list[0].0, 4);
1554        assert_eq!(list[1].0, 5);
1555    }
1556
1557    #[tokio::test]
1558    async fn test_obj_auto_prune_on_save() {
1559        let store = ObjectStoreCheckpointStore::new(
1560            Arc::new(object_store::memory::InMemory::new()),
1561            String::new(),
1562            2,
1563        );
1564
1565        for i in 1..=5 {
1566            store.save(&make_manifest(i, i)).await.unwrap();
1567        }
1568
1569        let list = store.list().await.unwrap();
1570        assert_eq!(list.len(), 2);
1571        assert_eq!(list[0].0, 4);
1572        assert_eq!(list[1].0, 5);
1573    }
1574
1575    #[tokio::test]
1576    async fn test_obj_save_and_load_state_data() {
1577        let store = make_obj_store();
1578        store.save(&make_manifest(1, 1)).await.unwrap();
1579
1580        let data = b"large operator state binary blob";
1581        store
1582            .save_state_data(1, &[bytes::Bytes::from_static(data)])
1583            .await
1584            .unwrap();
1585
1586        let loaded = store.load_state_data(1).await.unwrap().unwrap();
1587        assert_eq!(loaded, data);
1588    }
1589
1590    #[tokio::test]
1591    async fn test_obj_load_state_data_returns_none() {
1592        let store = make_obj_store();
1593        assert!(store.load_state_data(99).await.unwrap().is_none());
1594    }
1595
1596    #[tokio::test]
1597    async fn test_obj_with_prefix() {
1598        let inner = Arc::new(object_store::memory::InMemory::new());
1599        let store = ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10);
1600
1601        store.save(&make_manifest(1, 42)).await.unwrap();
1602        let loaded = store.load_latest().await.unwrap().unwrap();
1603        assert_eq!(loaded.checkpoint_id, 1);
1604        assert_eq!(loaded.epoch, 42);
1605    }
1606
1607    // -----------------------------------------------------------------------
1608    // v2 layout verification + backward compatibility tests
1609    // -----------------------------------------------------------------------
1610
1611    #[tokio::test]
1612    async fn test_obj_layout_paths() {
1613        let inner = Arc::new(object_store::memory::InMemory::new());
1614        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1615
1616        store.save(&make_manifest(1, 10)).await.unwrap();
1617
1618        let result = inner
1619            .get_opts(
1620                &object_store::path::Path::from("manifests/manifest-000001.json"),
1621                GetOptions::default(),
1622            )
1623            .await;
1624        assert!(result.is_ok(), "manifest path should exist");
1625
1626        let result = inner
1627            .get_opts(
1628                &object_store::path::Path::from("manifests/latest.json"),
1629                GetOptions::default(),
1630            )
1631            .await;
1632        assert!(result.is_ok(), "latest.json should exist");
1633    }
1634
1635    #[tokio::test]
1636    async fn test_obj_conditional_put_idempotent() {
1637        let store = ObjectStoreCheckpointStore::new(
1638            Arc::new(object_store::memory::InMemory::new()),
1639            String::new(),
1640            10,
1641        );
1642
1643        let m = make_manifest(1, 10);
1644        store.save(&m).await.unwrap();
1645
1646        // Second save with same ID should succeed (logs warning, skips write)
1647        store.save(&m).await.unwrap();
1648
1649        let loaded = store.load_latest().await.unwrap().unwrap();
1650        assert_eq!(loaded.checkpoint_id, 1);
1651        assert_eq!(loaded.epoch, 10);
1652    }
1653
1654    #[tokio::test]
1655    async fn test_obj_update_manifest_overwrites() {
1656        use crate::checkpoint_manifest::SinkCommitStatus;
1657
1658        let store = make_obj_store();
1659
1660        // Step 5: initial save with Pending sink status
1661        let mut m = make_manifest(1, 10);
1662        m.sink_commit_statuses
1663            .insert("pg-sink".into(), SinkCommitStatus::Pending);
1664        store.save(&m).await.unwrap();
1665
1666        // Verify Pending persisted
1667        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1668        assert_eq!(
1669            loaded.sink_commit_statuses.get("pg-sink"),
1670            Some(&SinkCommitStatus::Pending)
1671        );
1672
1673        // Step 6b: update manifest with Committed status
1674        m.sink_commit_statuses
1675            .insert("pg-sink".into(), SinkCommitStatus::Committed);
1676        store.update_manifest(&m).await.unwrap();
1677
1678        // Verify Committed persisted (the bug: save() would skip this)
1679        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1680        assert_eq!(
1681            loaded.sink_commit_statuses.get("pg-sink"),
1682            Some(&SinkCommitStatus::Committed)
1683        );
1684    }
1685
1686    #[tokio::test]
1687    async fn test_obj_save_still_uses_conditional_put() {
1688        let store = make_obj_store();
1689
1690        let m = make_manifest(1, 10);
1691        store.save(&m).await.unwrap();
1692
1693        // Second save() with same ID skips (conditional PUT)
1694        // but does not error
1695        store.save(&m).await.unwrap();
1696
1697        // update_manifest() with same ID overwrites
1698        let mut m2 = make_manifest(1, 10);
1699        m2.watermark = Some(42);
1700        store.update_manifest(&m2).await.unwrap();
1701
1702        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1703        assert_eq!(loaded.watermark, Some(42));
1704    }
1705
1706    #[tokio::test]
1707    async fn test_fs_update_manifest_overwrites() {
1708        use crate::checkpoint_manifest::SinkCommitStatus;
1709
1710        let dir = tempfile::tempdir().unwrap();
1711        let store = make_store(dir.path());
1712
1713        let mut m = make_manifest(1, 10);
1714        m.sink_commit_statuses
1715            .insert("sink-a".into(), SinkCommitStatus::Pending);
1716        store.save(&m).await.unwrap();
1717
1718        m.sink_commit_statuses
1719            .insert("sink-a".into(), SinkCommitStatus::Committed);
1720        store.update_manifest(&m).await.unwrap();
1721
1722        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1723        assert_eq!(
1724            loaded.sink_commit_statuses.get("sink-a"),
1725            Some(&SinkCommitStatus::Committed)
1726        );
1727    }
1728
1729    #[tokio::test]
1730    async fn test_obj_state_paths() {
1731        let inner = Arc::new(object_store::memory::InMemory::new());
1732        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1733
1734        store.save(&make_manifest(1, 1)).await.unwrap();
1735        store
1736            .save_state_data(1, &[bytes::Bytes::from_static(b"state-blob")])
1737            .await
1738            .unwrap();
1739
1740        let result = inner
1741            .get_opts(
1742                &object_store::path::Path::from("checkpoints/state-000001.bin"),
1743                GetOptions::default(),
1744            )
1745            .await;
1746        assert!(result.is_ok(), "state path should exist");
1747    }
1748
1749    #[tokio::test]
1750    async fn test_obj_latest_json_format() {
1751        let inner = Arc::new(object_store::memory::InMemory::new());
1752        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1753
1754        store.save(&make_manifest(5, 50)).await.unwrap();
1755
1756        let data = inner
1757            .get_opts(
1758                &object_store::path::Path::from("manifests/latest.json"),
1759                GetOptions::default(),
1760            )
1761            .await
1762            .unwrap()
1763            .bytes()
1764            .await
1765            .unwrap();
1766
1767        let pointer: super::LatestPointer = serde_json::from_slice(&data).unwrap();
1768        assert_eq!(pointer.checkpoint_id, 5);
1769    }
1770
1771    #[tokio::test]
1772    async fn test_obj_latest_monotonic_guard_skips_regression() {
1773        let inner = Arc::new(object_store::memory::InMemory::new());
1774        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1775
1776        store.save(&make_manifest(10, 10)).await.unwrap();
1777        // A delayed writer (e.g., paused ex-leader) tries to write id=5
1778        // after the current leader already advanced to id=10. The pointer
1779        // must not regress.
1780        store.save(&make_manifest(5, 5)).await.unwrap();
1781
1782        let loaded = store.load_latest().await.unwrap().unwrap();
1783        assert_eq!(
1784            loaded.checkpoint_id, 10,
1785            "latest pointer should not regress to an older id"
1786        );
1787    }
1788
1789    #[tokio::test]
1790    async fn test_validate_checkpoint_valid() {
1791        let dir = tempfile::tempdir().unwrap();
1792        let store = make_store(dir.path());
1793
1794        let m = make_manifest(1, 1);
1795        store.save(&m).await.unwrap();
1796
1797        let result = store.validate_checkpoint(1).await.unwrap();
1798        assert!(result.valid, "valid checkpoint: {:?}", result.issues);
1799        assert!(result.issues.is_empty());
1800    }
1801
1802    #[tokio::test]
1803    async fn test_validate_checkpoint_epoch_zero_invalid() {
1804        let dir = tempfile::tempdir().unwrap();
1805        let store = make_store(dir.path());
1806
1807        // Manually save a manifest with epoch=0 (bypassing normal creation)
1808        let m = make_manifest(1, 0);
1809        store.save(&m).await.unwrap();
1810
1811        let result = store.validate_checkpoint(1).await.unwrap();
1812        assert!(!result.valid, "epoch=0 should be invalid");
1813        assert!(
1814            result.issues.iter().any(|i| i.message().contains("epoch")),
1815            "should mention epoch: {:?}",
1816            result.issues
1817        );
1818    }
1819
1820    #[tokio::test]
1821    async fn test_validate_checkpoint_missing_manifest() {
1822        let dir = tempfile::tempdir().unwrap();
1823        let store = make_store(dir.path());
1824
1825        let result = store.validate_checkpoint(99).await.unwrap();
1826        assert!(!result.valid);
1827        assert!(result.issues[0].message().contains("not found"));
1828    }
1829
1830    #[tokio::test]
1831    async fn test_validate_checkpoint_corrupt_manifest() {
1832        let dir = tempfile::tempdir().unwrap();
1833        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1834
1835        // Create a checkpoint dir with corrupt manifest JSON.
1836        let cp_dir = dir.path().join("checkpoints/checkpoint_000001");
1837        std::fs::create_dir_all(&cp_dir).unwrap();
1838        std::fs::write(cp_dir.join("manifest.json"), "not valid json").unwrap();
1839
1840        // Corrupt manifest is a validation failure, not an I/O error.
1841        let result = store.validate_checkpoint(1).await.unwrap();
1842        assert!(!result.valid);
1843        assert!(
1844            result.issues[0].message().contains("corrupt manifest"),
1845            "expected corrupt manifest issue: {:?}",
1846            result.issues
1847        );
1848    }
1849
1850    #[tokio::test]
1851    async fn test_validate_checkpoint_state_checksum_ok() {
1852        let dir = tempfile::tempdir().unwrap();
1853        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1854
1855        let state = b"important operator state";
1856        let m = make_manifest(1, 1);
1857        store
1858            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1859            .await
1860            .unwrap();
1861
1862        let result = store.validate_checkpoint(1).await.unwrap();
1863        assert!(result.valid, "checksum should match: {:?}", result.issues);
1864    }
1865
1866    #[tokio::test]
1867    async fn test_validate_checkpoint_state_checksum_mismatch() {
1868        let dir = tempfile::tempdir().unwrap();
1869        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1870
1871        // Save with state to get a checksum.
1872        let state = b"original state";
1873        let m = make_manifest(1, 1);
1874        store
1875            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1876            .await
1877            .unwrap();
1878
1879        // Now corrupt the state.bin on disk.
1880        let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1881        std::fs::write(&state_path, b"corrupted data!!").unwrap();
1882
1883        let result = store.validate_checkpoint(1).await.unwrap();
1884        assert!(!result.valid, "corrupted state should be invalid");
1885        assert!(
1886            result
1887                .issues
1888                .iter()
1889                .any(|i| i.message().contains("checksum mismatch")),
1890            "should report checksum mismatch: {:?}",
1891            result.issues
1892        );
1893    }
1894
1895    #[tokio::test]
1896    async fn test_validate_checkpoint_state_missing_when_expected() {
1897        let dir = tempfile::tempdir().unwrap();
1898        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1899
1900        // Save with state.
1901        let m = make_manifest(1, 1);
1902        store
1903            .save_with_state(&m, Some(&[bytes::Bytes::from_static(b"state")]))
1904            .await
1905            .unwrap();
1906
1907        // Delete the state.bin file to simulate partial crash.
1908        let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1909        std::fs::remove_file(&state_path).unwrap();
1910
1911        let result = store.validate_checkpoint(1).await.unwrap();
1912        assert!(!result.valid);
1913        assert!(
1914            result
1915                .issues
1916                .iter()
1917                .any(|i| i.message().contains("not found")),
1918            "should report missing state: {:?}",
1919            result.issues
1920        );
1921    }
1922
1923    #[tokio::test]
1924    async fn test_recover_latest_validated_skips_corrupt() {
1925        let dir = tempfile::tempdir().unwrap();
1926        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1927
1928        // Save two checkpoints.
1929        store.save(&make_manifest(1, 10)).await.unwrap();
1930        store.save(&make_manifest(2, 20)).await.unwrap();
1931
1932        // Corrupt the latest checkpoint's manifest.
1933        let cp2_manifest = dir
1934            .path()
1935            .join("checkpoints/checkpoint_000002/manifest.json");
1936        std::fs::write(cp2_manifest, "<<<corrupt>>>").unwrap();
1937
1938        // Recovery should skip checkpoint 2 and pick checkpoint 1.
1939        let report = store.recover_latest_validated().await.unwrap();
1940        assert_eq!(report.chosen_id, Some(1));
1941        assert_eq!(report.skipped.len(), 1);
1942        assert_eq!(report.skipped[0].0, 2);
1943        assert_eq!(report.examined, 2);
1944    }
1945
1946    #[tokio::test]
1947    async fn test_recover_latest_validated_fresh_start() {
1948        let dir = tempfile::tempdir().unwrap();
1949        let store = make_store(dir.path());
1950
1951        let report = store.recover_latest_validated().await.unwrap();
1952        assert!(report.chosen_id.is_none());
1953        assert_eq!(report.examined, 0);
1954    }
1955
1956    #[tokio::test]
1957    async fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
1958        let dir = tempfile::tempdir().unwrap();
1959        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1960
1961        // Save a checkpoint, then corrupt it.
1962        store.save(&make_manifest(1, 1)).await.unwrap();
1963        let cp_manifest = dir
1964            .path()
1965            .join("checkpoints/checkpoint_000001/manifest.json");
1966        std::fs::write(cp_manifest, "corrupt").unwrap();
1967
1968        // The corrupt manifest will cause load_by_id (via list()) to fail,
1969        // so it may not appear in the list at all. Either way, recovery
1970        // should not select it.
1971        let report = store.recover_latest_validated().await.unwrap();
1972        assert!(report.chosen_id.is_none());
1973    }
1974
1975    #[tokio::test]
1976    async fn test_cleanup_orphans_removes_stateless_dirs() {
1977        let dir = tempfile::tempdir().unwrap();
1978        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1979
1980        // Create an orphan: state.bin exists but no manifest.json.
1981        let orphan_dir = dir.path().join("checkpoints/checkpoint_000099");
1982        std::fs::create_dir_all(&orphan_dir).unwrap();
1983        std::fs::write(orphan_dir.join("state.bin"), b"orphaned").unwrap();
1984
1985        // Normal checkpoint (has manifest).
1986        store.save(&make_manifest(1, 1)).await.unwrap();
1987
1988        let cleaned = store.cleanup_orphans().await.unwrap();
1989        assert_eq!(cleaned, 1);
1990
1991        // Orphan dir should be gone.
1992        assert!(!orphan_dir.exists());
1993        // Normal checkpoint should still be there.
1994        assert!(store.load_by_id(1).await.unwrap().is_some());
1995    }
1996
1997    #[tokio::test]
1998    async fn test_cleanup_orphans_noop_when_clean() {
1999        let dir = tempfile::tempdir().unwrap();
2000        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2001
2002        store.save(&make_manifest(1, 1)).await.unwrap();
2003        let cleaned = store.cleanup_orphans().await.unwrap();
2004        assert_eq!(cleaned, 0);
2005    }
2006
2007    #[tokio::test]
2008    async fn test_save_with_state_writes_checksum() {
2009        let dir = tempfile::tempdir().unwrap();
2010        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2011
2012        let state = b"state-data-for-checksum";
2013        let m = make_manifest(1, 1);
2014        store
2015            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2016            .await
2017            .unwrap();
2018
2019        let loaded = store.load_latest().await.unwrap().unwrap();
2020        assert!(
2021            loaded.state_checksum.is_some(),
2022            "state_checksum should be set"
2023        );
2024        let expected = sha256_hex(state);
2025        assert_eq!(loaded.state_checksum.unwrap(), expected);
2026    }
2027
2028    #[tokio::test]
2029    async fn test_state_checksum_backward_compat() {
2030        // Older manifests without state_checksum should deserialize fine.
2031        let json = r#"{
2032            "version": 1,
2033            "checkpoint_id": 1,
2034            "epoch": 1,
2035            "timestamp_ms": 1000
2036        }"#;
2037        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
2038        assert!(m.state_checksum.is_none());
2039    }
2040
2041    // ObjectStore variants
2042
2043    #[tokio::test]
2044    async fn test_obj_validate_checkpoint_valid() {
2045        let store = make_obj_store();
2046        store.save(&make_manifest(1, 1)).await.unwrap();
2047
2048        let result = store.validate_checkpoint(1).await.unwrap();
2049        assert!(result.valid, "valid checkpoint: {:?}", result.issues);
2050    }
2051
2052    #[tokio::test]
2053    async fn test_obj_validate_checkpoint_missing() {
2054        let store = make_obj_store();
2055        let result = store.validate_checkpoint(99).await.unwrap();
2056        assert!(!result.valid);
2057    }
2058
2059    #[tokio::test]
2060    async fn test_obj_validate_state_checksum() {
2061        let store = ObjectStoreCheckpointStore::new(
2062            Arc::new(object_store::memory::InMemory::new()),
2063            String::new(),
2064            10,
2065        );
2066
2067        let state = b"obj-store-state-data";
2068        let m = make_manifest(1, 1);
2069        store
2070            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2071            .await
2072            .unwrap();
2073
2074        let result = store.validate_checkpoint(1).await.unwrap();
2075        assert!(result.valid, "checksum should match: {:?}", result.issues);
2076    }
2077
2078    #[tokio::test]
2079    async fn test_obj_recover_latest_validated() {
2080        let store = ObjectStoreCheckpointStore::new(
2081            Arc::new(object_store::memory::InMemory::new()),
2082            String::new(),
2083            10,
2084        );
2085
2086        store.save(&make_manifest(1, 10)).await.unwrap();
2087        store.save(&make_manifest(2, 20)).await.unwrap();
2088
2089        let report = store.recover_latest_validated().await.unwrap();
2090        assert_eq!(report.chosen_id, Some(2));
2091        assert!(report.skipped.is_empty());
2092    }
2093
2094    #[tokio::test]
2095    async fn test_obj_cleanup_orphans() {
2096        let inner = Arc::new(object_store::memory::InMemory::new());
2097        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
2098
2099        // Save a checkpoint (creates manifest + state).
2100        let state = b"state-with-manifest";
2101        store
2102            .save_with_state(
2103                &make_manifest(1, 1),
2104                Some(&[bytes::Bytes::from_static(state)]),
2105            )
2106            .await
2107            .unwrap();
2108
2109        // Write an orphan state file (no manifest).
2110        let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
2111        inner
2112            .put_opts(
2113                &orphan_path,
2114                PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
2115                PutOptions::default(),
2116            )
2117            .await
2118            .unwrap();
2119
2120        let cleaned = store.cleanup_orphans().await.unwrap();
2121        assert_eq!(cleaned, 1);
2122
2123        // Verify orphan is gone but real state is intact.
2124        let real_state = store.load_state_data(1).await.unwrap();
2125        assert!(real_state.is_some());
2126    }
2127}