Skip to main content

laminar_core/checkpoint/
checkpoint_store.rs

1//! Checkpoint persistence via the [`CheckpointStore`] trait.
2//!
3//! Provides a filesystem-backed implementation ([`FileSystemCheckpointStore`])
4//! that writes manifests as atomic JSON files with a `latest.txt` pointer
5//! for crash-safe recovery.
6//!
7
8#![allow(clippy::disallowed_types)] // cold path: checkpoint metadata operations
9//! ## Disk Layout
10//!
11//! ```text
12//! {base_dir}/checkpoints/
13//!   checkpoint_000001/
14//!     manifest.json     # CheckpointManifest as pretty-printed JSON
15//!     state.bin         # Optional: large operator state sidecar
16//!   checkpoint_000002/
17//!     manifest.json
18//!   latest.txt          # "checkpoint_000002" — pointer to latest good checkpoint
19//! ```
20
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use object_store::{GetOptions, ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
26use sha2::{Digest, Sha256};
27use tracing::warn;
28
29use crate::checkpoint::checkpoint_manifest::CheckpointManifest;
30
31/// Fsync a file to ensure its contents are durable on disk.
32async fn sync_file(path: &Path) -> Result<(), std::io::Error> {
33    // Must open with write access — Windows requires it for FlushFileBuffers.
34    let f = tokio::fs::OpenOptions::new().write(true).open(path).await?;
35    f.sync_all().await
36}
37
38/// Fsync a directory to make rename operations durable.
39///
40/// On Unix, this flushes directory metadata (new/renamed entries).
41/// On Windows, directory sync is not supported; the OS handles durability.
42#[allow(clippy::unnecessary_wraps, clippy::unused_async)] // no-op on Windows
43async fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
44    #[cfg(unix)]
45    {
46        let f = tokio::fs::File::open(path).await?;
47        f.sync_all().await?;
48    }
49    #[cfg(not(unix))]
50    {
51        let _ = path;
52    }
53    Ok(())
54}
55
/// Errors from checkpoint store operations.
///
/// The `Io`, `Serde` and `ObjectStore` variants carry `#[from]` conversions,
/// so the corresponding source errors can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum CheckpointStoreError {
    /// I/O error during checkpoint persistence (wraps [`std::io::Error`]).
    #[error("checkpoint I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// JSON serialization/deserialization error. When returned by
    /// `load_by_id`, `validate_checkpoint` reports it as a corrupt manifest
    /// (a validation failure) instead of propagating it.
    #[error("checkpoint serialization error: {0}")]
    Serde(#[from] serde_json::Error),

    /// Checkpoint not found; the payload is rendered as the checkpoint
    /// number in the error message.
    #[error("checkpoint {0} not found")]
    NotFound(u64),

    /// Object store error (wraps [`object_store::Error`]).
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::Error),
}
75
76// ---------------------------------------------------------------------------
77// Checkpoint validation types
78// ---------------------------------------------------------------------------
79
/// Classification of a single validation finding.
///
/// `ManifestWarning` is non-fatal — the checkpoint is still usable.
/// `IntegrityFailure` is fatal — recovery must skip this checkpoint.
///
/// Both variants carry a human-readable message as their only payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationIssue {
    /// Non-fatal manifest-level warning (e.g. `vnode_count` mismatch,
    /// orphaned source offset).
    ManifestWarning(String),
    /// Fatal: manifest is missing/corrupt, or the sidecar integrity
    /// check (checksum, presence) failed.
    IntegrityFailure(String),
}
93
94impl ValidationIssue {
95    /// True if this issue renders the checkpoint unusable for recovery.
96    #[must_use]
97    pub fn is_fatal(&self) -> bool {
98        matches!(self, Self::IntegrityFailure(_))
99    }
100
101    /// Underlying human-readable message.
102    #[must_use]
103    pub fn message(&self) -> &str {
104        match self {
105            Self::ManifestWarning(s) | Self::IntegrityFailure(s) => s,
106        }
107    }
108}
109
110impl std::fmt::Display for ValidationIssue {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.write_str(self.message())
113    }
114}
115
/// Result of validating a single checkpoint.
///
/// Produced by `CheckpointStore::validate_checkpoint`, which sets `valid`
/// to `true` exactly when none of the collected `issues` is fatal.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// Checkpoint ID that was validated.
    pub checkpoint_id: u64,
    /// Whether the checkpoint is valid for recovery. A checkpoint is
    /// valid iff it has no [`ValidationIssue::IntegrityFailure`] issues.
    pub valid: bool,
    /// Issues found during validation (warnings and fatal findings alike).
    pub issues: Vec<ValidationIssue>,
}
127
/// Report from a crash-safe recovery walk.
///
/// Captures which checkpoints were tried, which were skipped (and why),
/// and which was ultimately chosen for recovery.
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// The checkpoint that was selected for recovery (`None` if fresh start).
    pub chosen_id: Option<u64>,
    /// Checkpoints that were tried and skipped (id, reason). The reason is
    /// the checkpoint's validation issues joined with `"; "`.
    pub skipped: Vec<(u64, String)>,
    /// Total number of checkpoint IDs listed for the walk. This is the size
    /// of the candidate list, so it can exceed the number actually
    /// validated when a valid checkpoint is found early.
    pub examined: usize,
    /// Elapsed time for the recovery walk.
    pub elapsed: std::time::Duration,
}
143
144/// Parse a checkpoint id out of an object-store path segment shaped like
145/// `"{prefix}NNNNNN{suffix}"` (e.g. `"manifest-000042.json"`). Scans all
146/// '/'-separated segments so the helper works on prefixed stores. A
147/// segment with the right affixes but a non-numeric middle is logged at
148/// warn — operators need to notice manually-renamed files rather than
149/// see silent gaps in `prune`/`list_ids`.
150fn parse_checkpoint_id_from_path(path: &str, prefix: &str, suffix: &str) -> Option<u64> {
151    for segment in path.split('/') {
152        let Some(rest) = segment.strip_prefix(prefix) else {
153            continue;
154        };
155        let Some(id_str) = rest.strip_suffix(suffix) else {
156            continue;
157        };
158        if let Ok(id) = id_str.parse::<u64>() {
159            return Some(id);
160        }
161        warn!(
162            path,
163            prefix, suffix, "malformed checkpoint id in object path — skipped"
164        );
165        return None;
166    }
167    None
168}
169
170/// Compute SHA-256 hex digest of data.
171fn sha256_hex(data: &[u8]) -> String {
172    let mut hasher = Sha256::new();
173    hasher.update(data);
174    format!("{:x}", hasher.finalize())
175}
176
177/// Compute SHA-256 hex digest across a chain of `Bytes` chunks.
178///
179/// Equivalent to hashing the concatenation of the chunks, but without
180/// materializing that concatenation in memory. Used by
181/// [`CheckpointStore::save_with_state`] to checksum the sidecar before
182/// the multi-chunk write.
183fn sha256_hex_chunks(chunks: &[bytes::Bytes]) -> String {
184    let mut hasher = Sha256::new();
185    for chunk in chunks {
186        hasher.update(chunk);
187    }
188    format!("{:x}", hasher.finalize())
189}
190
191/// Combined checksum for a mixed (inline + external) manifest:
192/// `sha256(inline_hash_hex || concat(sidecar_chunks))`.
193fn sha256_hex_mixed<'a, I>(
194    states: &std::collections::HashMap<
195        String,
196        crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
197    >,
198    sidecar_chunks: I,
199) -> String
200where
201    I: IntoIterator<Item = &'a [u8]>,
202{
203    let inline = sha256_hex_inline_states(states);
204    let mut hasher = Sha256::new();
205    hasher.update(inline.as_bytes());
206    for chunk in sidecar_chunks {
207        hasher.update(chunk);
208    }
209    format!("{:x}", hasher.finalize())
210}
211
212/// SHA-256 over inline operator-state entries in sorted-name order,
213/// used as the `state_checksum` when no sidecar exists.
214fn sha256_hex_inline_states(
215    states: &std::collections::HashMap<
216        String,
217        crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
218    >,
219) -> String {
220    let mut names: Vec<&String> = states.keys().collect();
221    names.sort_unstable();
222    let mut hasher = Sha256::new();
223    for n in names {
224        if let Some(op) = states.get(n) {
225            if op.external {
226                continue;
227            }
228            hasher.update(n.as_bytes());
229            hasher.update([0u8]);
230            if let Some(b64) = &op.state_b64 {
231                hasher.update(b64.as_bytes());
232            }
233            hasher.update([0u8]);
234        }
235    }
236    format!("{:x}", hasher.finalize())
237}
238
239/// Trait for checkpoint persistence backends.
240///
241/// Implementations must guarantee atomic manifest writes (readers never see
242/// a partial manifest). The `latest.txt` pointer is updated only after the
243/// manifest is fully written and synced.
244#[async_trait]
245pub trait CheckpointStore: Send + Sync {
246    /// Runtime vnode count that manifests written by this store are
247    /// expected to use. Consulted when validating loaded manifests —
248    /// a mismatch is reported as a manifest warning. Defaults to
249    /// [`crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT`] when the
250    /// implementation has no configured value.
251    fn vnode_count(&self) -> u16 {
252        crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT
253    }
254
    /// Atomically persists a checkpoint manifest. Implementations must
    /// guarantee readers never observe a partial manifest.
    ///
    /// The default implementations of [`Self::update_manifest`] and
    /// [`Self::save_with_state`] both end by calling this method.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;
261
    /// Loads the most recent checkpoint manifest, or `Ok(None)` on a
    /// fresh store.
    ///
    /// No integrity validation is implied by this call; use
    /// [`Self::recover_latest_validated`] to pick a checkpoint that passed
    /// [`Self::validate_checkpoint`].
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O or deserialization failure.
    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
268
    /// Loads a specific manifest, or `Ok(None)` if absent.
    ///
    /// [`Self::validate_checkpoint`] depends on this contract: `Ok(None)`
    /// is reported as "manifest not found" and a
    /// [`CheckpointStoreError::Serde`] error as "corrupt manifest"; any
    /// other error is propagated to the caller.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O or deserialization failure.
    async fn load_by_id(&self, id: u64)
        -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
275
    /// Lists all available checkpoints as `(id, epoch)` pairs, sorted
    /// ascending by ID. May read every manifest; callers that only
    /// need IDs should use [`Self::list_ids`].
    ///
    /// The default [`Self::list_ids`] is derived from this method and does
    /// not re-sort, so the ascending order is part of the contract.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O failure.
    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;
283
284    /// Lists all checkpoint IDs, **sorted ascending**. Unlike
285    /// [`Self::list`] this enumerates corrupt manifests too (used by
286    /// crash recovery). Callers rely on the ascending invariant.
287    ///
288    /// # Errors
289    /// Returns [`CheckpointStoreError`] on I/O failure.
290    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
291        // Default: O(N) manifest reads via list(). Production backends
292        // should override; list() already sorts ascending.
293        Ok(self.list().await?.iter().map(|(id, _)| *id).collect())
294    }
295
    /// Prunes old checkpoints, keeping at most `keep_count` recent
    /// ones. Returns the number of checkpoints removed.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O failure.
    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;
302
    /// Overwrites an existing manifest, bypassing the conditional-PUT
    /// fence used by [`Self::save`]. Used after a successful sink
    /// commit to record per-sink status transitions.
    ///
    /// The default implementation simply forwards to [`Self::save`], so it
    /// inherits whatever `save` does. A backend whose `save` refuses to
    /// overwrite an existing manifest must override this method to get the
    /// overwrite behavior described above.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
    async fn update_manifest(
        &self,
        manifest: &CheckpointManifest,
    ) -> Result<(), CheckpointStoreError> {
        // Default: plain save; adequate only when save() may overwrite.
        self.save(manifest).await
    }
315
    /// Writes operator state sidecar bytes for a checkpoint.
    ///
    /// Accepts a chain of `Bytes` chunks (one per operator) rather than
    /// a single concatenated slice. Backends that support native
    /// multi-chunk writes (object-store `PutPayload`) avoid copying the
    /// chunks into a contiguous buffer; backends without such support
    /// write sequentially.
    ///
    /// The default [`Self::save_with_state`] calls this before
    /// [`Self::save`], using a checksum computed over the same chunks, so
    /// the stored bytes must equal the concatenation of `chunks` in order.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O failure.
    async fn save_state_data(
        &self,
        id: u64,
        chunks: &[bytes::Bytes],
    ) -> Result<(), CheckpointStoreError>;
331
    /// Loads operator state sidecar bytes for a checkpoint, or `Ok(None)`
    /// if no sidecar was written.
    ///
    /// The whole sidecar is returned as one buffer.
    /// [`Self::validate_checkpoint`] hashes that buffer to verify the
    /// manifest's `state_checksum`, and treats `Ok(None)` as a missing
    /// `state.bin` when the manifest requires one.
    ///
    /// # Errors
    /// Returns [`CheckpointStoreError`] on I/O failure.
    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;
338
339    /// Validate a specific checkpoint's integrity.
340    ///
341    /// Checks that the manifest is parseable and, if a `state_checksum` is
342    /// present, verifies the sidecar data matches.
343    ///
344    /// # Errors
345    ///
346    /// Returns [`CheckpointStoreError`] on I/O failure.
347    async fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
348        let mut issues = Vec::new();
349
350        // Load manifest — corrupt JSON is a validation failure, not an I/O error.
351        let manifest = match self.load_by_id(id).await {
352            Ok(Some(m)) => m,
353            Ok(None) => {
354                return Ok(ValidationResult {
355                    checkpoint_id: id,
356                    valid: false,
357                    issues: vec![ValidationIssue::IntegrityFailure(format!(
358                        "manifest not found for checkpoint {id}"
359                    ))],
360                });
361            }
362            Err(CheckpointStoreError::Serde(e)) => {
363                return Ok(ValidationResult {
364                    checkpoint_id: id,
365                    valid: false,
366                    issues: vec![ValidationIssue::IntegrityFailure(format!(
367                        "corrupt manifest: {e}"
368                    ))],
369                });
370            }
371            Err(e) => return Err(e),
372        };
373
374        for err in manifest.validate(self.vnode_count()) {
375            issues.push(ValidationIssue::ManifestWarning(format!(
376                "manifest validation: {err}"
377            )));
378        }
379
380        // `state_checksum` covers, depending on shape: the sidecar bytes
381        // (purely-external), the inline operator_states (purely-inline),
382        // or both (mixed) — see `sha256_hex_mixed`.
383        if let Some(expected) = &manifest.state_checksum {
384            let any_inline = manifest.operator_states.values().any(|o| !o.external);
385            let any_external = manifest.operator_states.values().any(|o| o.external);
386            let needs_sidecar = any_external || !any_inline;
387            let sidecar = if needs_sidecar {
388                self.load_state_data(id).await?
389            } else {
390                None
391            };
392            let actual = match (any_inline, &sidecar) {
393                (true, Some(data)) => {
394                    sha256_hex_mixed(&manifest.operator_states, std::iter::once(data.as_slice()))
395                }
396                (true, None) if !any_external => {
397                    sha256_hex_inline_states(&manifest.operator_states)
398                }
399                (_, Some(data)) => sha256_hex(data),
400                (_, None) => {
401                    issues.push(ValidationIssue::IntegrityFailure(
402                        "state.bin referenced by checksum but not found".into(),
403                    ));
404                    String::new()
405                }
406            };
407            if !actual.is_empty() && actual != *expected {
408                let label = if any_inline && any_external {
409                    "mixed state checksum mismatch"
410                } else if any_inline {
411                    "inline state checksum mismatch"
412                } else {
413                    "state.bin checksum mismatch"
414                };
415                issues.push(ValidationIssue::IntegrityFailure(format!(
416                    "{label}: expected {expected}, got {actual}"
417                )));
418            }
419        }
420
421        // epoch=0 or checkpoint_id=0 indicates a corrupted or nonsensical
422        // manifest — reject as invalid regardless of other issues.
423        if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
424            issues.push(ValidationIssue::IntegrityFailure(
425                "epoch or checkpoint_id is 0 — likely corrupted".into(),
426            ));
427        }
428
429        let valid = issues.iter().all(|i| !i.is_fatal());
430        Ok(ValidationResult {
431            checkpoint_id: id,
432            valid,
433            issues,
434        })
435    }
436
437    /// Walk backward from latest to find the first valid checkpoint.
438    ///
439    /// Returns a [`RecoveryReport`] describing the walk. If no valid
440    /// checkpoint is found, `chosen_id` is `None` (fresh start).
441    ///
442    /// # Errors
443    ///
444    /// Returns [`CheckpointStoreError`] on I/O failure.
445    async fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
446        let start = std::time::Instant::now();
447        let mut skipped = Vec::new();
448
449        // list_ids returns ascending per the trait contract; we iterate
450        // newest-first so the first valid checkpoint wins.
451        let mut ids = self.list_ids().await?;
452        ids.reverse();
453
454        let examined = ids.len();
455
456        for id in &ids {
457            let result = self.validate_checkpoint(*id).await?;
458            if result.valid {
459                return Ok(RecoveryReport {
460                    chosen_id: Some(*id),
461                    skipped,
462                    examined,
463                    elapsed: start.elapsed(),
464                });
465            }
466            let reason = result
467                .issues
468                .iter()
469                .map(ToString::to_string)
470                .collect::<Vec<_>>()
471                .join("; ");
472            warn!(
473                checkpoint_id = id,
474                reason = %reason,
475                "skipping invalid checkpoint"
476            );
477            skipped.push((*id, reason));
478        }
479
480        Ok(RecoveryReport {
481            chosen_id: None,
482            skipped,
483            examined,
484            elapsed: start.elapsed(),
485        })
486    }
487
    /// Delete orphaned state files that have no matching manifest.
    ///
    /// Returns the number of orphans cleaned up. The default implementation
    /// deletes nothing and always returns `Ok(0)`; backends that can detect
    /// orphans override it.
    ///
    /// # Errors
    ///
    /// Returns [`CheckpointStoreError`] on I/O failure.
    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
        // Default: no-op. Overridden by implementations that can detect orphans.
        Ok(0)
    }
499
500    /// Atomically saves a checkpoint manifest with optional sidecar state data.
501    ///
502    /// When `state_data` is provided, the sidecar (`state.bin`) is written and
503    /// fsynced **before** the manifest. This ensures that if the sidecar write
504    /// fails, the manifest is never persisted and `latest.txt` still points to
505    /// the previous valid checkpoint.
506    ///
507    /// Orphaned `state.bin` files (written but no manifest) are harmless and
508    /// cleaned up by [`prune()`](Self::prune).
509    ///
510    /// # Errors
511    ///
512    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
513    async fn save_with_state(
514        &self,
515        manifest: &CheckpointManifest,
516        state_data: Option<&[bytes::Bytes]>,
517    ) -> Result<(), CheckpointStoreError> {
518        let mut manifest = manifest.clone();
519        if let Some(chunks) = state_data {
520            // Compute checksum across the chunks before writing. This is
521            // safe because: (1) save_state_data writes to a temp then
522            // renames atomically, so the on-disk bytes match the
523            // in-memory chain exactly; (2) if the sidecar write fails,
524            // save() is never called, so the manifest with the checksum
525            // is never persisted.
526            manifest.state_checksum = Some(stamp_checksum(&manifest.operator_states, Some(chunks)));
527            self.save_state_data(manifest.checkpoint_id, chunks).await?;
528        } else if !manifest.operator_states.is_empty()
529            && manifest.operator_states.values().all(|o| !o.external)
530            && manifest.state_checksum.is_none()
531        {
532            // Inline-only: checksum guards against a torn manifest.json write.
533            manifest.state_checksum = Some(sha256_hex_inline_states(&manifest.operator_states));
534        }
535        self.save(&manifest).await
536    }
537}
538
539/// Stamp `state_checksum` for a save: mixed manifests hash inline+sidecar
540/// together, purely-external hash the sidecar alone.
541fn stamp_checksum(
542    states: &std::collections::HashMap<
543        String,
544        crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
545    >,
546    chunks: Option<&[bytes::Bytes]>,
547) -> String {
548    let chunks = chunks.unwrap_or_default();
549    let any_inline = states.values().any(|o| !o.external);
550    if any_inline {
551        sha256_hex_mixed(states, chunks.iter().map(AsRef::as_ref))
552    } else {
553        sha256_hex_chunks(chunks)
554    }
555}
556
/// Filesystem-backed checkpoint store.
///
/// Writes checkpoint manifests as JSON files with atomic rename semantics.
/// A `latest.txt` pointer (not a symlink) tracks the most recent checkpoint
/// for Windows compatibility.
pub struct FileSystemCheckpointStore {
    // Parent directory; everything is stored under `{base_dir}/checkpoints/`.
    base_dir: PathBuf,
    // Retention passed to `prune` after each `save`; `0` disables the
    // automatic prune.
    max_retained: usize,
    // Value reported by `CheckpointStore::vnode_count` and therefore used
    // when manifests are validated.
    vnode_count: u16,
}
567
568impl FileSystemCheckpointStore {
569    /// Creates a new filesystem checkpoint store.
570    ///
571    /// The `base_dir` is the parent directory; checkpoints are stored under
572    /// `{base_dir}/checkpoints/`. The directory is created lazily on first save.
573    ///
574    /// The store's `vnode_count` defaults to
575    /// [`crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT`]. Hosts that run
576    /// with a non-default value should chain [`Self::with_vnode_count`] so
577    /// manifest validation checks the right invariant.
578    #[must_use]
579    pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
580        Self {
581            base_dir: base_dir.into(),
582            max_retained,
583            vnode_count: crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT,
584        }
585    }
586
587    /// Override the `vnode_count` used during manifest validation.
588    #[must_use]
589    pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
590        self.vnode_count = vnode_count;
591        self
592    }
593
594    /// Returns the checkpoints directory path.
595    fn checkpoints_dir(&self) -> PathBuf {
596        self.base_dir.join("checkpoints")
597    }
598
599    /// Returns the directory path for a specific checkpoint.
600    fn checkpoint_dir(&self, id: u64) -> PathBuf {
601        self.checkpoints_dir().join(format!("checkpoint_{id:06}"))
602    }
603
604    /// Returns the manifest file path for a specific checkpoint.
605    fn manifest_path(&self, id: u64) -> PathBuf {
606        self.checkpoint_dir(id).join("manifest.json")
607    }
608
609    /// Returns the state sidecar file path for a specific checkpoint.
610    fn state_path(&self, id: u64) -> PathBuf {
611        self.checkpoint_dir(id).join("state.bin")
612    }
613
614    /// Returns the latest.txt pointer path.
615    fn latest_path(&self) -> PathBuf {
616        self.checkpoints_dir().join("latest.txt")
617    }
618
619    /// Parses a checkpoint ID from a directory name like `checkpoint_000042`.
620    fn parse_checkpoint_id(name: &str) -> Option<u64> {
621        name.strip_prefix("checkpoint_")
622            .and_then(|s| s.parse().ok())
623    }
624
625    /// Collects and sorts all checkpoint directory entries.
626    async fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
627        let dir = self.checkpoints_dir();
628        let mut reader = match tokio::fs::read_dir(&dir).await {
629            Ok(r) => r,
630            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
631            Err(e) => return Err(e.into()),
632        };
633
634        let mut ids: Vec<u64> = Vec::new();
635        while let Some(entry) = reader.next_entry().await? {
636            let ft = entry.file_type().await?;
637            if !ft.is_dir() {
638                continue;
639            }
640            if let Some(id) = entry
641                .file_name()
642                .to_str()
643                .and_then(Self::parse_checkpoint_id)
644            {
645                ids.push(id);
646            }
647        }
648
649        ids.sort_unstable();
650        Ok(ids)
651    }
652}
653
654impl FileSystemCheckpointStore {
655    /// Find checkpoint directories that have state.bin but no manifest.json
656    /// (orphaned from a crash after sidecar write but before manifest commit).
657    async fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
658        let dir = self.checkpoints_dir();
659        let mut reader = match tokio::fs::read_dir(&dir).await {
660            Ok(r) => r,
661            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
662            Err(e) => return Err(e.into()),
663        };
664
665        let mut orphans = Vec::new();
666        while let Some(entry) = reader.next_entry().await? {
667            let ft = entry.file_type().await?;
668            if !ft.is_dir() {
669                continue;
670            }
671            let path = entry.path();
672            let has_state = tokio::fs::metadata(path.join("state.bin")).await.is_ok();
673            let has_manifest = tokio::fs::metadata(path.join("manifest.json"))
674                .await
675                .is_ok();
676            if has_state && !has_manifest {
677                orphans.push(path);
678            }
679        }
680        Ok(orphans)
681    }
682}
683
684#[async_trait]
685impl CheckpointStore for FileSystemCheckpointStore {
    /// Returns the vnode count configured on this store (the default set by
    /// `new`, or the value given to `with_vnode_count`).
    fn vnode_count(&self) -> u16 {
        self.vnode_count
    }
689
690    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
691        let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
692        tokio::fs::create_dir_all(&cp_dir).await?;
693
694        let manifest_path = self.manifest_path(manifest.checkpoint_id);
695        let json = serde_json::to_string_pretty(manifest)?;
696
697        // Write to a temp file, fsync, then rename for atomic durability.
698        let tmp_path = manifest_path.with_extension("json.tmp");
699        let write_res = async {
700            tokio::fs::write(&tmp_path, &json).await?;
701            sync_file(&tmp_path).await?;
702            tokio::fs::rename(&tmp_path, &manifest_path).await?;
703            sync_dir(&cp_dir).await
704        }
705        .await;
706        if let Err(e) = write_res {
707            // Clean up temp file to avoid orphans on disk-full.
708            let _ = tokio::fs::remove_file(&tmp_path).await;
709            return Err(e.into());
710        }
711
712        // Update latest.txt pointer — only after manifest is durable.
713        let latest = self.latest_path();
714        let latest_dir = latest.parent().unwrap_or(Path::new(".")).to_path_buf();
715        tokio::fs::create_dir_all(&latest_dir).await?;
716        let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
717        let tmp_latest = latest.with_extension("txt.tmp");
718        tokio::fs::write(&tmp_latest, &latest_content).await?;
719        sync_file(&tmp_latest).await?;
720        tokio::fs::rename(&tmp_latest, &latest).await?;
721        sync_dir(&latest_dir).await?;
722
723        // Auto-prune if configured.
724        if self.max_retained > 0 {
725            if let Err(e) = self.prune(self.max_retained).await {
726                tracing::warn!(
727                    max_retained = self.max_retained,
728                    error = %e,
729                    "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
730                );
731            }
732        }
733
734        Ok(())
735    }
736
737    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
738        let latest = self.latest_path();
739        let content = match tokio::fs::read_to_string(&latest).await {
740            Ok(c) => c,
741            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
742            Err(e) => return Err(e.into()),
743        };
744        let dir_name = content.trim();
745        if dir_name.is_empty() {
746            return Ok(None);
747        }
748
749        match Self::parse_checkpoint_id(dir_name) {
750            Some(id) => self.load_by_id(id).await,
751            None => Ok(None),
752        }
753    }
754
755    async fn load_by_id(
756        &self,
757        id: u64,
758    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
759        let path = self.manifest_path(id);
760        let json = match tokio::fs::read_to_string(&path).await {
761            Ok(s) => s,
762            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
763            Err(e) => return Err(e.into()),
764        };
765        let manifest: CheckpointManifest = serde_json::from_str(&json)?;
766
767        let errors = manifest.validate(self.vnode_count());
768        if !errors.is_empty() {
769            tracing::warn!(
770                checkpoint_id = id,
771                error_count = errors.len(),
772                first_error = %errors[0],
773                "loaded checkpoint manifest has validation warnings"
774            );
775        }
776
777        Ok(Some(manifest))
778    }
779
    /// Lists checkpoint IDs from the directory names alone, sorted
    /// ascending. No manifest is read, so directories whose manifest is
    /// missing or corrupt are included as well.
    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
        self.sorted_checkpoint_ids().await
    }
783
784    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
785        let ids = self.sorted_checkpoint_ids().await?;
786        let mut result = Vec::with_capacity(ids.len());
787
788        for id in ids {
789            // Skip missing/corrupt manifests — list() is best-effort.
790            if let Ok(Some(manifest)) = self.load_by_id(id).await {
791                result.push((manifest.checkpoint_id, manifest.epoch));
792            }
793        }
794
795        Ok(result)
796    }
797
798    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
799        let ids = self.sorted_checkpoint_ids().await?;
800        if ids.len() <= keep_count {
801            return Ok(0);
802        }
803
804        let to_remove = ids.len() - keep_count;
805        let mut removed = 0;
806
807        for &id in &ids[..to_remove] {
808            let dir = self.checkpoint_dir(id);
809            if tokio::fs::remove_dir_all(&dir).await.is_ok() {
810                removed += 1;
811            }
812        }
813
814        Ok(removed)
815    }
816
817    async fn save_state_data(
818        &self,
819        id: u64,
820        chunks: &[bytes::Bytes],
821    ) -> Result<(), CheckpointStoreError> {
822        use tokio::io::AsyncWriteExt;
823
824        let cp_dir = self.checkpoint_dir(id);
825        tokio::fs::create_dir_all(&cp_dir).await?;
826
827        let path = self.state_path(id);
828        let tmp = path.with_extension("bin.tmp");
829
830        // Write chunks sequentially to the temp file — no concatenation
831        // into a contiguous buffer. Each chunk is already an owned Bytes;
832        // write_all borrows it.
833        let mut file = tokio::fs::File::create(&tmp).await?;
834        for chunk in chunks {
835            file.write_all(chunk).await?;
836        }
837        file.sync_all().await?;
838        drop(file);
839
840        tokio::fs::rename(&tmp, &path).await?;
841        sync_dir(&cp_dir).await?;
842
843        Ok(())
844    }
845
846    async fn save_with_state(
847        &self,
848        manifest: &CheckpointManifest,
849        state_data: Option<&[bytes::Bytes]>,
850    ) -> Result<(), CheckpointStoreError> {
851        let mut manifest = manifest.clone();
852        // Write sidecar FIRST — if this fails, manifest is never written
853        // and latest.txt still points to the previous valid checkpoint.
854        if let Some(chunks) = state_data {
855            manifest.state_checksum = Some(stamp_checksum(&manifest.operator_states, Some(chunks)));
856            self.save_state_data(manifest.checkpoint_id, chunks).await?;
857        } else if !manifest.operator_states.is_empty()
858            && manifest.operator_states.values().all(|o| !o.external)
859            && manifest.state_checksum.is_none()
860        {
861            // Inline-only: checksum guards against a torn manifest.json write.
862            manifest.state_checksum = Some(sha256_hex_inline_states(&manifest.operator_states));
863        }
864        self.save(&manifest).await
865    }
866
867    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
868        let orphans = self.find_orphan_dirs().await?;
869        let mut cleaned = 0;
870        for dir in &orphans {
871            if tokio::fs::remove_dir_all(dir).await.is_ok() {
872                tracing::info!(
873                    path = %dir.display(),
874                    "cleaned up orphaned checkpoint directory"
875                );
876                cleaned += 1;
877            }
878        }
879        Ok(cleaned)
880    }
881
882    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
883        let path = self.state_path(id);
884        match tokio::fs::read(&path).await {
885            Ok(data) => Ok(Some(data)),
886            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
887            Err(e) => Err(e.into()),
888        }
889    }
890}
891
892// ---------------------------------------------------------------------------
// ObjectStoreCheckpointStore — async adapter over any ObjectStore backend
894// ---------------------------------------------------------------------------
895
/// JSON pointer stored in `manifests/latest.json`.
///
/// Serialized as `{"checkpoint_id": N}`. `save` writes it after the
/// manifest, and `load_latest` reads it to decide which manifest to load.
#[derive(serde::Serialize, serde::Deserialize)]
struct LatestPointer {
    /// Id of the checkpoint the pointer currently refers to.
    checkpoint_id: u64,
}
901
/// Object-store-backed checkpoint store.
///
/// Drives any `object_store::ObjectStore` backend (S3, GCS, Azure,
/// local FS) directly over `.await`; no dedicated runtime. The app
/// runtime's HTTP connection pool is reused.
///
/// ## Object Layout
///
/// ```text
/// {prefix}/
///   manifests/
///     manifest-000001.json    # Checkpoint manifest (JSON)
///     manifest-000002.json
///     latest.json             # {"checkpoint_id": 2}
///   checkpoints/
///     state-000001.bin        # Optional sidecar state
///     state-000002.bin
/// ```
///
/// Manifest writes use [`PutMode::Create`] for split-brain prevention
/// (conditional PUT).
pub struct ObjectStoreCheckpointStore {
    /// Backend that all reads, writes, listings and deletes go through.
    store: Arc<dyn ObjectStore>,
    /// String prepended verbatim to every object path; expected to be
    /// empty or to end with `/` (it is not normalized here).
    prefix: String,
    /// Retention limit applied by `save`'s auto-prune; `0` disables
    /// auto-pruning.
    max_retained: usize,
    /// Value reported by `CheckpointStore::vnode_count` for this store.
    vnode_count: u16,
}
929
930impl ObjectStoreCheckpointStore {
931    /// Create a new object-store-backed checkpoint store.
932    ///
933    /// `prefix` is prepended to all object paths (e.g., `"nodes/abc123/"`).
934    /// It should end with `/` or be empty.
935    ///
936    /// The store's `vnode_count` defaults to
937    /// [`crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT`]. Hosts that run
938    /// with a non-default value should chain [`Self::with_vnode_count`].
939    #[must_use]
940    pub fn new(store: Arc<dyn ObjectStore>, prefix: String, max_retained: usize) -> Self {
941        Self {
942            store,
943            prefix,
944            max_retained,
945            vnode_count: crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT,
946        }
947    }
948
949    /// Override the `vnode_count` used during manifest validation.
950    #[must_use]
951    pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
952        self.vnode_count = vnode_count;
953        self
954    }
955
956    fn manifest_path(&self, id: u64) -> object_store::path::Path {
957        object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
958    }
959
960    fn latest_pointer_path(&self) -> object_store::path::Path {
961        object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
962    }
963
964    fn state_path(&self, id: u64) -> object_store::path::Path {
965        object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
966    }
967
968    // ── Helpers ──
969
970    /// Put a payload with bounded retry + jittered backoff for idempotent
971    /// writes (sidecar state, pointer update). Retries on
972    /// `object_store::Error::Generic` — which covers most transient
973    /// 5xx / connection failures across backends — and bubbles every
974    /// other error immediately. Non-idempotent writes (conditional
975    /// creates on the manifest path) MUST NOT use this helper.
976    ///
977    /// `payload` is consumed on the happy path and cloned on retry.
978    /// `PutPayload::clone` is cheap (Arc-bump on each underlying
979    /// `Bytes` chunk), so multi-chunk payloads cost nothing extra to
980    /// retry.
981    async fn put_with_retry(
982        &self,
983        path: &object_store::path::Path,
984        payload: PutPayload,
985        opts: &PutOptions,
986    ) -> Result<(), CheckpointStoreError> {
987        const BACKOFFS_MS: &[u64] = &[100, 500, 2000];
988        let mut attempt = 0usize;
989        loop {
990            let result = self
991                .store
992                .put_opts(path, payload.clone(), opts.clone())
993                .await;
994            match result {
995                Ok(_) => return Ok(()),
996                Err(object_store::Error::Generic { .. }) if attempt < BACKOFFS_MS.len() => {
997                    let delay = std::time::Duration::from_millis(BACKOFFS_MS[attempt]);
998                    tracing::warn!(
999                        path = %path,
1000                        attempt = attempt + 1,
1001                        delay_ms = delay.as_millis(),
1002                        "transient put error, retrying"
1003                    );
1004                    tokio::time::sleep(delay).await;
1005                    attempt += 1;
1006                }
1007                Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
1008            }
1009        }
1010    }
1011
1012    /// GET an object, returning `Ok(None)` for `NotFound`.
1013    async fn get_bytes(
1014        &self,
1015        path: &object_store::path::Path,
1016    ) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
1017        match self.store.get_opts(path, GetOptions::default()).await {
1018            Ok(get_result) => {
1019                let data = get_result.bytes().await?;
1020                Ok(Some(data))
1021            }
1022            Err(object_store::Error::NotFound { .. }) => Ok(None),
1023            Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
1024        }
1025    }
1026
1027    /// Load a manifest from a specific path, returning `Ok(None)` for `NotFound`.
1028    async fn load_manifest_at(
1029        &self,
1030        path: &object_store::path::Path,
1031    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1032        match self.get_bytes(path).await? {
1033            Some(data) => {
1034                let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
1035                Ok(Some(manifest))
1036            }
1037            None => Ok(None),
1038        }
1039    }
1040
1041    /// List checkpoint IDs by scanning `manifests/manifest-NNNNNN.json`.
1042    async fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
1043        use futures::TryStreamExt;
1044
1045        let mut ids = std::collections::BTreeSet::new();
1046
1047        let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
1048        let entries: Vec<_> = self
1049            .store
1050            .list(Some(&manifests_prefix))
1051            .try_collect()
1052            .await?;
1053        for entry in &entries {
1054            if let Some(id) =
1055                parse_checkpoint_id_from_path(entry.location.as_ref(), "manifest-", ".json")
1056            {
1057                ids.insert(id);
1058            }
1059        }
1060
1061        Ok(ids.into_iter().collect())
1062    }
1063}
1064
1065#[async_trait]
1066impl CheckpointStore for ObjectStoreCheckpointStore {
    /// The `vnode_count` this store was configured with (the default, or
    /// the value supplied through `with_vnode_count`).
    fn vnode_count(&self) -> u16 {
        self.vnode_count
    }
1070
    /// Persist `manifest` and advance `manifests/latest.json` to it.
    ///
    /// Steps, in order:
    /// 1. Conditional create (`PutMode::Create`) of the manifest object.
    ///    `AlreadyExists` is logged and tolerated; `NotImplemented` falls
    ///    back to an unconditional overwrite.
    /// 2. Pointer update, skipped (returning `Ok(())` early) when the
    ///    stored pointer already names a newer checkpoint id.
    /// 3. Auto-prune down to `max_retained` when that limit is non-zero;
    ///    a prune failure is logged, not returned.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails, if the manifest PUT fails
    /// for any reason other than the two cases above, or if reading or
    /// writing the pointer fails.
    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
        let json = serde_json::to_string_pretty(manifest)?;
        let path = self.manifest_path(manifest.checkpoint_id);
        let json_bytes = bytes::Bytes::from(json);

        // Conditional PUT — prevents duplicate manifest writes (split-brain safety).
        let create_opts = PutOptions {
            mode: PutMode::Create,
            ..PutOptions::default()
        };
        let result = self
            .store
            .put_opts(
                &path,
                PutPayload::from_bytes(json_bytes.clone()),
                create_opts,
            )
            .await;

        match result {
            Ok(_) => {}
            Err(object_store::Error::AlreadyExists { .. }) => {
                // The existing manifest is left untouched; execution still
                // continues to the pointer update below.
                tracing::warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    "[LDB-6010] Manifest already exists — skipping write"
                );
            }
            Err(object_store::Error::NotImplemented { .. }) => {
                // Backend doesn't support conditional PUT — fall back to overwrite.
                self.store
                    .put_opts(
                        &path,
                        PutPayload::from_bytes(json_bytes),
                        PutOptions::default(),
                    )
                    .await?;
            }
            Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
        }

        // Monotonic pointer update. A stale writer must not regress the
        // pointer from id N+1 back to id N. Read the current pointer; if
        // it already references a newer id, skip the write. Same-writer
        // races (not expected — one coordinator per store instance) can
        // still race past this check, but cross-leader stomps from a
        // delayed ex-leader are caught.
        let latest = self.latest_pointer_path();
        if let Some(current) = self.get_bytes(&latest).await? {
            // An unparseable pointer is ignored here and simply overwritten.
            if let Ok(existing) = serde_json::from_slice::<LatestPointer>(&current) {
                if existing.checkpoint_id > manifest.checkpoint_id {
                    tracing::warn!(
                        current = existing.checkpoint_id,
                        ours = manifest.checkpoint_id,
                        "[LDB-6010] latest.json already points at a newer checkpoint — \
                         skipping pointer update (possible split-brain or delayed writer)"
                    );
                    // Early return: auto-prune is also skipped on this path.
                    return Ok(());
                }
            }
        }
        let pointer = serde_json::to_string(&LatestPointer {
            checkpoint_id: manifest.checkpoint_id,
        })?;
        // The pointer write is an unconditional overwrite, so it is safe to retry.
        self.put_with_retry(
            &latest,
            PutPayload::from_bytes(bytes::Bytes::from(pointer)),
            &PutOptions::default(),
        )
        .await?;

        // Auto-prune
        if self.max_retained > 0 {
            // Pruning is housekeeping: a failure must not fail the save.
            if let Err(e) = self.prune(self.max_retained).await {
                tracing::warn!(
                    max_retained = self.max_retained,
                    error = %e,
                    "[LDB-6009] Object store checkpoint prune failed"
                );
            }
        }

        Ok(())
    }
1154
1155    async fn update_manifest(
1156        &self,
1157        manifest: &CheckpointManifest,
1158    ) -> Result<(), CheckpointStoreError> {
1159        let json = serde_json::to_string_pretty(manifest)?;
1160        let path = self.manifest_path(manifest.checkpoint_id);
1161        let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
1162
1163        // Unconditional PUT — overwrites the existing manifest.
1164        self.store
1165            .put_opts(&path, payload, PutOptions::default())
1166            .await?;
1167
1168        Ok(())
1169    }
1170
1171    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1172        if let Some(data) = self.get_bytes(&self.latest_pointer_path()).await? {
1173            let pointer: LatestPointer = serde_json::from_slice(&data)?;
1174            return self.load_by_id(pointer.checkpoint_id).await;
1175        }
1176
1177        Ok(None)
1178    }
1179
1180    async fn load_by_id(
1181        &self,
1182        id: u64,
1183    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
1184        self.load_manifest_at(&self.manifest_path(id)).await
1185    }
1186
    /// Return the ids of all checkpoints that have a manifest object,
    /// as produced by `list_checkpoint_ids` (ascending, de-duplicated).
    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
        self.list_checkpoint_ids().await
    }
1190
1191    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
1192        let ids = self.list_checkpoint_ids().await?;
1193        let mut result = Vec::with_capacity(ids.len());
1194
1195        for id in ids {
1196            if let Ok(Some(manifest)) = self.load_by_id(id).await {
1197                result.push((manifest.checkpoint_id, manifest.epoch));
1198            }
1199        }
1200
1201        Ok(result)
1202    }
1203
1204    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
1205        let ids = self.list_checkpoint_ids().await?;
1206        if ids.len() <= keep_count {
1207            return Ok(0);
1208        }
1209
1210        let to_remove = ids.len() - keep_count;
1211        let mut removed = 0;
1212        let mut logged_error = false;
1213
1214        for &id in &ids[..to_remove] {
1215            let manifest = self.manifest_path(id);
1216            let state = self.state_path(id);
1217            let manifest_res = self.store.delete(&manifest).await;
1218            let state_res = self.store.delete(&state).await;
1219
1220            // Count the id as removed only if the manifest is gone
1221            // (state.bin is optional — its absence is fine).
1222            let manifest_ok = matches!(
1223                manifest_res,
1224                Ok(()) | Err(object_store::Error::NotFound { .. })
1225            );
1226            if manifest_ok {
1227                removed += 1;
1228            }
1229
1230            // Surface the first real error so operators can notice.
1231            // Permission errors silently leak old checkpoints forever
1232            // otherwise.
1233            for err in [manifest_res, state_res]
1234                .into_iter()
1235                .filter_map(Result::err)
1236            {
1237                if matches!(err, object_store::Error::NotFound { .. }) {
1238                    continue;
1239                }
1240                if !logged_error {
1241                    tracing::warn!(
1242                        checkpoint_id = id,
1243                        error = %err,
1244                        "[LDB-6027] checkpoint prune: delete failed — \
1245                         retained objects may accumulate"
1246                    );
1247                    logged_error = true;
1248                }
1249            }
1250        }
1251
1252        Ok(removed)
1253    }
1254
1255    async fn save_state_data(
1256        &self,
1257        id: u64,
1258        chunks: &[bytes::Bytes],
1259    ) -> Result<(), CheckpointStoreError> {
1260        let path = self.state_path(id);
1261        // PutPayload is a chain of Bytes — no concatenation into a
1262        // contiguous buffer. Each Arc bump is ~nothing; the underlying
1263        // bytes reach the object-store client untouched.
1264        let payload: PutPayload = chunks.iter().cloned().collect();
1265        // Sidecar writes are idempotent — retry transients.
1266        self.put_with_retry(&path, payload, &PutOptions::default())
1267            .await
1268    }
1269
1270    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
1271        Ok(self
1272            .get_bytes(&self.state_path(id))
1273            .await?
1274            .map(|d| d.to_vec()))
1275    }
1276
1277    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
1278        use futures::{StreamExt, TryStreamExt};
1279
1280        // Collect state IDs that have a state.bin but no matching manifest.
1281        let manifest_ids: std::collections::BTreeSet<u64> =
1282            self.list_checkpoint_ids().await?.into_iter().collect();
1283
1284        // List state files: checkpoints/state-NNNNNN.bin
1285        let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
1286        let entries: Vec<_> = self.store.list(Some(&state_prefix)).try_collect().await?;
1287
1288        let mut orphan_paths = Vec::new();
1289        for entry in &entries {
1290            if let Some(id) =
1291                parse_checkpoint_id_from_path(entry.location.as_ref(), "state-", ".bin")
1292            {
1293                if !manifest_ids.contains(&id) {
1294                    orphan_paths.push(entry.location.clone());
1295                }
1296            }
1297        }
1298
1299        let count = orphan_paths.len();
1300        if !orphan_paths.is_empty() {
1301            let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
1302            let mut results = self.store.delete_stream(stream);
1303            while let Some(result) = results.next().await {
1304                if let Err(e) = result {
1305                    tracing::warn!(error = %e, "failed to delete orphan state file");
1306                }
1307            }
1308        }
1309
1310        Ok(count)
1311    }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316    use super::*;
1317    use crate::checkpoint::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
1318    #[allow(clippy::disallowed_types)] // cold path: checkpoint store
1319    use std::collections::HashMap;
1320
    /// Test helper: filesystem store rooted at `dir`, constructed with `3`
    /// as the second argument (presumably the retention limit — see the
    /// auto-prune tests).
    fn make_store(dir: &Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }
1324
    /// Test helper: manifest built via `CheckpointManifest::new(id, epoch)`
    /// with no further fields set by this helper.
    fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
        CheckpointManifest::new(id, epoch)
    }
1328
1329    #[tokio::test]
1330    async fn test_save_and_load_latest() {
1331        let dir = tempfile::tempdir().unwrap();
1332        let store = make_store(dir.path());
1333
1334        let m = make_manifest(1, 1);
1335        store.save(&m).await.unwrap();
1336
1337        let loaded = store.load_latest().await.unwrap().unwrap();
1338        assert_eq!(loaded.checkpoint_id, 1);
1339        assert_eq!(loaded.epoch, 1);
1340    }
1341
1342    #[tokio::test]
1343    async fn test_load_latest_returns_none_when_empty() {
1344        let dir = tempfile::tempdir().unwrap();
1345        let store = make_store(dir.path());
1346        assert!(store.load_latest().await.unwrap().is_none());
1347    }
1348
1349    #[tokio::test]
1350    async fn test_load_latest_returns_most_recent() {
1351        let dir = tempfile::tempdir().unwrap();
1352        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1353
1354        for i in 1..=5 {
1355            store.save(&make_manifest(i, i)).await.unwrap();
1356        }
1357
1358        let latest = store.load_latest().await.unwrap().unwrap();
1359        assert_eq!(latest.checkpoint_id, 5);
1360        assert_eq!(latest.epoch, 5);
1361    }
1362
1363    #[tokio::test]
1364    async fn test_load_by_id() {
1365        let dir = tempfile::tempdir().unwrap();
1366        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1367
1368        store.save(&make_manifest(1, 10)).await.unwrap();
1369        store.save(&make_manifest(2, 20)).await.unwrap();
1370
1371        let m = store.load_by_id(1).await.unwrap().unwrap();
1372        assert_eq!(m.epoch, 10);
1373
1374        let m = store.load_by_id(2).await.unwrap().unwrap();
1375        assert_eq!(m.epoch, 20);
1376
1377        assert!(store.load_by_id(99).await.unwrap().is_none());
1378    }
1379
1380    #[tokio::test]
1381    async fn test_list() {
1382        let dir = tempfile::tempdir().unwrap();
1383        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1384
1385        store.save(&make_manifest(1, 10)).await.unwrap();
1386        store.save(&make_manifest(3, 30)).await.unwrap();
1387        store.save(&make_manifest(2, 20)).await.unwrap();
1388
1389        let list = store.list().await.unwrap();
1390        assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1391    }
1392
1393    #[tokio::test]
1394    async fn test_prune_keeps_max() {
1395        let dir = tempfile::tempdir().unwrap();
1396        let store = FileSystemCheckpointStore::new(dir.path(), 10); // no auto-prune
1397
1398        for i in 1..=5 {
1399            store.save(&make_manifest(i, i)).await.unwrap();
1400        }
1401
1402        let removed = store.prune(2).await.unwrap();
1403        assert_eq!(removed, 3);
1404
1405        let list = store.list().await.unwrap();
1406        assert_eq!(list.len(), 2);
1407        assert_eq!(list[0].0, 4);
1408        assert_eq!(list[1].0, 5);
1409    }
1410
1411    #[tokio::test]
1412    async fn test_auto_prune_on_save() {
1413        let dir = tempfile::tempdir().unwrap();
1414        let store = FileSystemCheckpointStore::new(dir.path(), 2);
1415
1416        for i in 1..=5 {
1417            store.save(&make_manifest(i, i)).await.unwrap();
1418        }
1419
1420        let list = store.list().await.unwrap();
1421        assert_eq!(list.len(), 2);
1422        // Should keep the two most recent
1423        assert_eq!(list[0].0, 4);
1424        assert_eq!(list[1].0, 5);
1425    }
1426
1427    #[tokio::test]
1428    async fn test_save_and_load_state_data() {
1429        let dir = tempfile::tempdir().unwrap();
1430        let store = make_store(dir.path());
1431
1432        store.save(&make_manifest(1, 1)).await.unwrap();
1433
1434        let data = b"large operator state binary blob";
1435        store
1436            .save_state_data(1, &[bytes::Bytes::from_static(data)])
1437            .await
1438            .unwrap();
1439
1440        let loaded = store.load_state_data(1).await.unwrap().unwrap();
1441        assert_eq!(loaded, data);
1442    }
1443
1444    #[tokio::test]
1445    async fn test_load_state_data_returns_none() {
1446        let dir = tempfile::tempdir().unwrap();
1447        let store = make_store(dir.path());
1448        assert!(store.load_state_data(99).await.unwrap().is_none());
1449    }
1450
1451    #[tokio::test]
1452    async fn test_full_manifest_round_trip() {
1453        let dir = tempfile::tempdir().unwrap();
1454        let store = make_store(dir.path());
1455
1456        let mut m = make_manifest(1, 5);
1457        m.source_offsets.insert(
1458            "kafka-src".into(),
1459            ConnectorCheckpoint::with_offsets(
1460                5,
1461                HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
1462            ),
1463        );
1464        m.sink_epochs.insert("pg-sink".into(), 4);
1465        m.table_offsets.insert(
1466            "instruments".into(),
1467            ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
1468        );
1469        m.operator_states
1470            .insert("window".into(), OperatorCheckpoint::inline(b"data"));
1471        m.watermark = Some(999_000);
1472
1473        store.save(&m).await.unwrap();
1474
1475        let loaded = store.load_latest().await.unwrap().unwrap();
1476        assert_eq!(loaded.checkpoint_id, 1);
1477        assert_eq!(loaded.epoch, 5);
1478        assert_eq!(loaded.watermark, Some(999_000));
1479
1480        let src = loaded.source_offsets.get("kafka-src").unwrap();
1481        assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
1482
1483        assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
1484
1485        let tbl = loaded.table_offsets.get("instruments").unwrap();
1486        assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
1487
1488        let op = loaded.operator_states.get("window").unwrap();
1489        assert_eq!(op.decode_inline().unwrap(), b"data");
1490    }
1491
1492    #[tokio::test]
1493    async fn test_empty_latest_txt() {
1494        let dir = tempfile::tempdir().unwrap();
1495        let store = make_store(dir.path());
1496
1497        let cp_dir = dir.path().join("checkpoints");
1498        std::fs::create_dir_all(&cp_dir).unwrap();
1499        std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
1500
1501        assert!(store.load_latest().await.unwrap().is_none());
1502    }
1503
1504    #[tokio::test]
1505    async fn test_latest_points_to_missing_checkpoint() {
1506        let dir = tempfile::tempdir().unwrap();
1507        let store = make_store(dir.path());
1508
1509        let cp_dir = dir.path().join("checkpoints");
1510        std::fs::create_dir_all(&cp_dir).unwrap();
1511        std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
1512
1513        assert!(store.load_latest().await.unwrap().is_none());
1514    }
1515
1516    #[tokio::test]
1517    async fn test_prune_no_op_when_under_limit() {
1518        let dir = tempfile::tempdir().unwrap();
1519        let store = make_store(dir.path());
1520
1521        store.save(&make_manifest(1, 1)).await.unwrap();
1522        let removed = store.prune(5).await.unwrap();
1523        assert_eq!(removed, 0);
1524    }
1525
1526    #[tokio::test]
1527    async fn test_save_with_state_writes_sidecar_before_manifest() {
1528        let dir = tempfile::tempdir().unwrap();
1529        let store = make_store(dir.path());
1530
1531        let m = make_manifest(1, 1);
1532        let state = b"large-operator-state-blob";
1533        store
1534            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1535            .await
1536            .unwrap();
1537
1538        // Both manifest and state should be present.
1539        let loaded = store.load_latest().await.unwrap().unwrap();
1540        assert_eq!(loaded.checkpoint_id, 1);
1541
1542        let loaded_state = store.load_state_data(1).await.unwrap().unwrap();
1543        assert_eq!(loaded_state, state);
1544    }
1545
1546    #[tokio::test]
1547    async fn test_save_with_state_none_is_same_as_save() {
1548        let dir = tempfile::tempdir().unwrap();
1549        let store = make_store(dir.path());
1550
1551        let m = make_manifest(1, 1);
1552        store.save_with_state(&m, None).await.unwrap();
1553
1554        let loaded = store.load_latest().await.unwrap().unwrap();
1555        assert_eq!(loaded.checkpoint_id, 1);
1556        assert!(store.load_state_data(1).await.unwrap().is_none());
1557    }
1558
1559    #[tokio::test]
1560    async fn test_orphaned_state_without_manifest_is_ignored() {
1561        let dir = tempfile::tempdir().unwrap();
1562        let store = make_store(dir.path());
1563
1564        // Write only sidecar state, no manifest (simulates crash after
1565        // state write but before manifest write).
1566        store
1567            .save_state_data(1, &[bytes::Bytes::from_static(b"orphaned")])
1568            .await
1569            .unwrap();
1570
1571        // load_latest should return None — the orphan is not visible.
1572        assert!(store.load_latest().await.unwrap().is_none());
1573
1574        // list should not include the orphan (no manifest.json).
1575        assert!(store.list().await.unwrap().is_empty());
1576    }
1577
1578    // -----------------------------------------------------------------------
1579    // ObjectStoreCheckpointStore tests (using InMemory backend)
1580    // -----------------------------------------------------------------------
1581
1582    fn make_obj_store() -> ObjectStoreCheckpointStore {
1583        let store = Arc::new(object_store::memory::InMemory::new());
1584        ObjectStoreCheckpointStore::new(store, String::new(), 3)
1585    }
1586
1587    #[tokio::test]
1588    async fn test_obj_save_and_load_latest() {
1589        let store = make_obj_store();
1590        let m = make_manifest(1, 1);
1591        store.save(&m).await.unwrap();
1592
1593        let loaded = store.load_latest().await.unwrap().unwrap();
1594        assert_eq!(loaded.checkpoint_id, 1);
1595        assert_eq!(loaded.epoch, 1);
1596    }
1597
1598    #[tokio::test]
1599    async fn test_obj_load_latest_returns_none_when_empty() {
1600        let store = make_obj_store();
1601        assert!(store.load_latest().await.unwrap().is_none());
1602    }
1603
1604    #[tokio::test]
1605    async fn test_obj_load_by_id() {
1606        let store = ObjectStoreCheckpointStore::new(
1607            Arc::new(object_store::memory::InMemory::new()),
1608            String::new(),
1609            10,
1610        );
1611
1612        store.save(&make_manifest(1, 10)).await.unwrap();
1613        store.save(&make_manifest(2, 20)).await.unwrap();
1614
1615        let m = store.load_by_id(1).await.unwrap().unwrap();
1616        assert_eq!(m.epoch, 10);
1617        let m = store.load_by_id(2).await.unwrap().unwrap();
1618        assert_eq!(m.epoch, 20);
1619        assert!(store.load_by_id(99).await.unwrap().is_none());
1620    }
1621
1622    #[tokio::test]
1623    async fn test_obj_list() {
1624        let store = ObjectStoreCheckpointStore::new(
1625            Arc::new(object_store::memory::InMemory::new()),
1626            String::new(),
1627            10,
1628        );
1629
1630        store.save(&make_manifest(1, 10)).await.unwrap();
1631        store.save(&make_manifest(3, 30)).await.unwrap();
1632        store.save(&make_manifest(2, 20)).await.unwrap();
1633
1634        let list = store.list().await.unwrap();
1635        assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1636    }
1637
1638    #[tokio::test]
1639    async fn test_obj_prune() {
1640        let store = ObjectStoreCheckpointStore::new(
1641            Arc::new(object_store::memory::InMemory::new()),
1642            String::new(),
1643            10,
1644        );
1645
1646        for i in 1..=5 {
1647            store.save(&make_manifest(i, i)).await.unwrap();
1648        }
1649
1650        let removed = store.prune(2).await.unwrap();
1651        assert_eq!(removed, 3);
1652
1653        let list = store.list().await.unwrap();
1654        assert_eq!(list.len(), 2);
1655        assert_eq!(list[0].0, 4);
1656        assert_eq!(list[1].0, 5);
1657    }
1658
1659    #[tokio::test]
1660    async fn test_obj_auto_prune_on_save() {
1661        let store = ObjectStoreCheckpointStore::new(
1662            Arc::new(object_store::memory::InMemory::new()),
1663            String::new(),
1664            2,
1665        );
1666
1667        for i in 1..=5 {
1668            store.save(&make_manifest(i, i)).await.unwrap();
1669        }
1670
1671        let list = store.list().await.unwrap();
1672        assert_eq!(list.len(), 2);
1673        assert_eq!(list[0].0, 4);
1674        assert_eq!(list[1].0, 5);
1675    }
1676
1677    #[tokio::test]
1678    async fn test_obj_save_and_load_state_data() {
1679        let store = make_obj_store();
1680        store.save(&make_manifest(1, 1)).await.unwrap();
1681
1682        let data = b"large operator state binary blob";
1683        store
1684            .save_state_data(1, &[bytes::Bytes::from_static(data)])
1685            .await
1686            .unwrap();
1687
1688        let loaded = store.load_state_data(1).await.unwrap().unwrap();
1689        assert_eq!(loaded, data);
1690    }
1691
1692    #[tokio::test]
1693    async fn test_obj_load_state_data_returns_none() {
1694        let store = make_obj_store();
1695        assert!(store.load_state_data(99).await.unwrap().is_none());
1696    }
1697
1698    #[tokio::test]
1699    async fn test_obj_with_prefix() {
1700        let inner = Arc::new(object_store::memory::InMemory::new());
1701        let store = ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10);
1702
1703        store.save(&make_manifest(1, 42)).await.unwrap();
1704        let loaded = store.load_latest().await.unwrap().unwrap();
1705        assert_eq!(loaded.checkpoint_id, 1);
1706        assert_eq!(loaded.epoch, 42);
1707    }
1708
1709    // -----------------------------------------------------------------------
1710    // v2 layout verification + backward compatibility tests
1711    // -----------------------------------------------------------------------
1712
1713    #[tokio::test]
1714    async fn test_obj_layout_paths() {
1715        let inner = Arc::new(object_store::memory::InMemory::new());
1716        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1717
1718        store.save(&make_manifest(1, 10)).await.unwrap();
1719
1720        let result = inner
1721            .get_opts(
1722                &object_store::path::Path::from("manifests/manifest-000001.json"),
1723                GetOptions::default(),
1724            )
1725            .await;
1726        assert!(result.is_ok(), "manifest path should exist");
1727
1728        let result = inner
1729            .get_opts(
1730                &object_store::path::Path::from("manifests/latest.json"),
1731                GetOptions::default(),
1732            )
1733            .await;
1734        assert!(result.is_ok(), "latest.json should exist");
1735    }
1736
1737    #[tokio::test]
1738    async fn test_obj_conditional_put_idempotent() {
1739        let store = ObjectStoreCheckpointStore::new(
1740            Arc::new(object_store::memory::InMemory::new()),
1741            String::new(),
1742            10,
1743        );
1744
1745        let m = make_manifest(1, 10);
1746        store.save(&m).await.unwrap();
1747
1748        // Second save with same ID should succeed (logs warning, skips write)
1749        store.save(&m).await.unwrap();
1750
1751        let loaded = store.load_latest().await.unwrap().unwrap();
1752        assert_eq!(loaded.checkpoint_id, 1);
1753        assert_eq!(loaded.epoch, 10);
1754    }
1755
1756    #[tokio::test]
1757    async fn test_obj_update_manifest_overwrites() {
1758        use crate::checkpoint::checkpoint_manifest::SinkCommitStatus;
1759
1760        let store = make_obj_store();
1761
1762        // Step 5: initial save with Pending sink status
1763        let mut m = make_manifest(1, 10);
1764        m.sink_commit_statuses
1765            .insert("pg-sink".into(), SinkCommitStatus::Pending);
1766        store.save(&m).await.unwrap();
1767
1768        // Verify Pending persisted
1769        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1770        assert_eq!(
1771            loaded.sink_commit_statuses.get("pg-sink"),
1772            Some(&SinkCommitStatus::Pending)
1773        );
1774
1775        // Step 6b: update manifest with Committed status
1776        m.sink_commit_statuses
1777            .insert("pg-sink".into(), SinkCommitStatus::Committed);
1778        store.update_manifest(&m).await.unwrap();
1779
1780        // Verify Committed persisted (the bug: save() would skip this)
1781        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1782        assert_eq!(
1783            loaded.sink_commit_statuses.get("pg-sink"),
1784            Some(&SinkCommitStatus::Committed)
1785        );
1786    }
1787
1788    #[tokio::test]
1789    async fn test_obj_save_still_uses_conditional_put() {
1790        let store = make_obj_store();
1791
1792        let m = make_manifest(1, 10);
1793        store.save(&m).await.unwrap();
1794
1795        // Second save() with same ID skips (conditional PUT)
1796        // but does not error
1797        store.save(&m).await.unwrap();
1798
1799        // update_manifest() with same ID overwrites
1800        let mut m2 = make_manifest(1, 10);
1801        m2.watermark = Some(42);
1802        store.update_manifest(&m2).await.unwrap();
1803
1804        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1805        assert_eq!(loaded.watermark, Some(42));
1806    }
1807
1808    #[tokio::test]
1809    async fn test_fs_update_manifest_overwrites() {
1810        use crate::checkpoint::checkpoint_manifest::SinkCommitStatus;
1811
1812        let dir = tempfile::tempdir().unwrap();
1813        let store = make_store(dir.path());
1814
1815        let mut m = make_manifest(1, 10);
1816        m.sink_commit_statuses
1817            .insert("sink-a".into(), SinkCommitStatus::Pending);
1818        store.save(&m).await.unwrap();
1819
1820        m.sink_commit_statuses
1821            .insert("sink-a".into(), SinkCommitStatus::Committed);
1822        store.update_manifest(&m).await.unwrap();
1823
1824        let loaded = store.load_by_id(1).await.unwrap().unwrap();
1825        assert_eq!(
1826            loaded.sink_commit_statuses.get("sink-a"),
1827            Some(&SinkCommitStatus::Committed)
1828        );
1829    }
1830
1831    #[tokio::test]
1832    async fn test_obj_state_paths() {
1833        let inner = Arc::new(object_store::memory::InMemory::new());
1834        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1835
1836        store.save(&make_manifest(1, 1)).await.unwrap();
1837        store
1838            .save_state_data(1, &[bytes::Bytes::from_static(b"state-blob")])
1839            .await
1840            .unwrap();
1841
1842        let result = inner
1843            .get_opts(
1844                &object_store::path::Path::from("checkpoints/state-000001.bin"),
1845                GetOptions::default(),
1846            )
1847            .await;
1848        assert!(result.is_ok(), "state path should exist");
1849    }
1850
1851    #[tokio::test]
1852    async fn test_obj_latest_json_format() {
1853        let inner = Arc::new(object_store::memory::InMemory::new());
1854        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1855
1856        store.save(&make_manifest(5, 50)).await.unwrap();
1857
1858        let data = inner
1859            .get_opts(
1860                &object_store::path::Path::from("manifests/latest.json"),
1861                GetOptions::default(),
1862            )
1863            .await
1864            .unwrap()
1865            .bytes()
1866            .await
1867            .unwrap();
1868
1869        let pointer: super::LatestPointer = serde_json::from_slice(&data).unwrap();
1870        assert_eq!(pointer.checkpoint_id, 5);
1871    }
1872
1873    #[tokio::test]
1874    async fn test_obj_latest_monotonic_guard_skips_regression() {
1875        let inner = Arc::new(object_store::memory::InMemory::new());
1876        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
1877
1878        store.save(&make_manifest(10, 10)).await.unwrap();
1879        // A delayed writer (e.g., paused ex-leader) tries to write id=5
1880        // after the current leader already advanced to id=10. The pointer
1881        // must not regress.
1882        store.save(&make_manifest(5, 5)).await.unwrap();
1883
1884        let loaded = store.load_latest().await.unwrap().unwrap();
1885        assert_eq!(
1886            loaded.checkpoint_id, 10,
1887            "latest pointer should not regress to an older id"
1888        );
1889    }
1890
1891    #[tokio::test]
1892    async fn test_validate_checkpoint_valid() {
1893        let dir = tempfile::tempdir().unwrap();
1894        let store = make_store(dir.path());
1895
1896        let m = make_manifest(1, 1);
1897        store.save(&m).await.unwrap();
1898
1899        let result = store.validate_checkpoint(1).await.unwrap();
1900        assert!(result.valid, "valid checkpoint: {:?}", result.issues);
1901        assert!(result.issues.is_empty());
1902    }
1903
1904    #[tokio::test]
1905    async fn test_validate_checkpoint_epoch_zero_invalid() {
1906        let dir = tempfile::tempdir().unwrap();
1907        let store = make_store(dir.path());
1908
1909        // Manually save a manifest with epoch=0 (bypassing normal creation)
1910        let m = make_manifest(1, 0);
1911        store.save(&m).await.unwrap();
1912
1913        let result = store.validate_checkpoint(1).await.unwrap();
1914        assert!(!result.valid, "epoch=0 should be invalid");
1915        assert!(
1916            result.issues.iter().any(|i| i.message().contains("epoch")),
1917            "should mention epoch: {:?}",
1918            result.issues
1919        );
1920    }
1921
1922    #[tokio::test]
1923    async fn test_validate_checkpoint_missing_manifest() {
1924        let dir = tempfile::tempdir().unwrap();
1925        let store = make_store(dir.path());
1926
1927        let result = store.validate_checkpoint(99).await.unwrap();
1928        assert!(!result.valid);
1929        assert!(result.issues[0].message().contains("not found"));
1930    }
1931
1932    #[tokio::test]
1933    async fn test_validate_checkpoint_corrupt_manifest() {
1934        let dir = tempfile::tempdir().unwrap();
1935        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1936
1937        // Create a checkpoint dir with corrupt manifest JSON.
1938        let cp_dir = dir.path().join("checkpoints/checkpoint_000001");
1939        std::fs::create_dir_all(&cp_dir).unwrap();
1940        std::fs::write(cp_dir.join("manifest.json"), "not valid json").unwrap();
1941
1942        // Corrupt manifest is a validation failure, not an I/O error.
1943        let result = store.validate_checkpoint(1).await.unwrap();
1944        assert!(!result.valid);
1945        assert!(
1946            result.issues[0].message().contains("corrupt manifest"),
1947            "expected corrupt manifest issue: {:?}",
1948            result.issues
1949        );
1950    }
1951
1952    #[tokio::test]
1953    async fn test_validate_checkpoint_state_checksum_ok() {
1954        let dir = tempfile::tempdir().unwrap();
1955        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1956
1957        let state = b"important operator state";
1958        let m = make_manifest(1, 1);
1959        store
1960            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1961            .await
1962            .unwrap();
1963
1964        let result = store.validate_checkpoint(1).await.unwrap();
1965        assert!(result.valid, "checksum should match: {:?}", result.issues);
1966    }
1967
1968    #[tokio::test]
1969    async fn test_validate_checkpoint_state_checksum_mismatch() {
1970        let dir = tempfile::tempdir().unwrap();
1971        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1972
1973        // Save with state to get a checksum.
1974        let state = b"original state";
1975        let m = make_manifest(1, 1);
1976        store
1977            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
1978            .await
1979            .unwrap();
1980
1981        // Now corrupt the state.bin on disk.
1982        let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1983        std::fs::write(&state_path, b"corrupted data!!").unwrap();
1984
1985        let result = store.validate_checkpoint(1).await.unwrap();
1986        assert!(!result.valid, "corrupted state should be invalid");
1987        assert!(
1988            result
1989                .issues
1990                .iter()
1991                .any(|i| i.message().contains("checksum mismatch")),
1992            "should report checksum mismatch: {:?}",
1993            result.issues
1994        );
1995    }
1996
1997    #[tokio::test]
1998    async fn test_validate_checkpoint_state_missing_when_expected() {
1999        let dir = tempfile::tempdir().unwrap();
2000        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2001
2002        // Save with state.
2003        let m = make_manifest(1, 1);
2004        store
2005            .save_with_state(&m, Some(&[bytes::Bytes::from_static(b"state")]))
2006            .await
2007            .unwrap();
2008
2009        // Delete the state.bin file to simulate partial crash.
2010        let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
2011        std::fs::remove_file(&state_path).unwrap();
2012
2013        let result = store.validate_checkpoint(1).await.unwrap();
2014        assert!(!result.valid);
2015        assert!(
2016            result
2017                .issues
2018                .iter()
2019                .any(|i| i.message().contains("not found")),
2020            "should report missing state: {:?}",
2021            result.issues
2022        );
2023    }
2024
2025    #[tokio::test]
2026    async fn test_recover_latest_validated_skips_corrupt() {
2027        let dir = tempfile::tempdir().unwrap();
2028        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2029
2030        // Save two checkpoints.
2031        store.save(&make_manifest(1, 10)).await.unwrap();
2032        store.save(&make_manifest(2, 20)).await.unwrap();
2033
2034        // Corrupt the latest checkpoint's manifest.
2035        let cp2_manifest = dir
2036            .path()
2037            .join("checkpoints/checkpoint_000002/manifest.json");
2038        std::fs::write(cp2_manifest, "<<<corrupt>>>").unwrap();
2039
2040        // Recovery should skip checkpoint 2 and pick checkpoint 1.
2041        let report = store.recover_latest_validated().await.unwrap();
2042        assert_eq!(report.chosen_id, Some(1));
2043        assert_eq!(report.skipped.len(), 1);
2044        assert_eq!(report.skipped[0].0, 2);
2045        assert_eq!(report.examined, 2);
2046    }
2047
2048    #[tokio::test]
2049    async fn test_recover_latest_validated_fresh_start() {
2050        let dir = tempfile::tempdir().unwrap();
2051        let store = make_store(dir.path());
2052
2053        let report = store.recover_latest_validated().await.unwrap();
2054        assert!(report.chosen_id.is_none());
2055        assert_eq!(report.examined, 0);
2056    }
2057
2058    #[tokio::test]
2059    async fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
2060        let dir = tempfile::tempdir().unwrap();
2061        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2062
2063        // Save a checkpoint, then corrupt it.
2064        store.save(&make_manifest(1, 1)).await.unwrap();
2065        let cp_manifest = dir
2066            .path()
2067            .join("checkpoints/checkpoint_000001/manifest.json");
2068        std::fs::write(cp_manifest, "corrupt").unwrap();
2069
2070        // The corrupt manifest will cause load_by_id (via list()) to fail,
2071        // so it may not appear in the list at all. Either way, recovery
2072        // should not select it.
2073        let report = store.recover_latest_validated().await.unwrap();
2074        assert!(report.chosen_id.is_none());
2075    }
2076
2077    #[tokio::test]
2078    async fn test_cleanup_orphans_removes_stateless_dirs() {
2079        let dir = tempfile::tempdir().unwrap();
2080        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2081
2082        // Create an orphan: state.bin exists but no manifest.json.
2083        let orphan_dir = dir.path().join("checkpoints/checkpoint_000099");
2084        std::fs::create_dir_all(&orphan_dir).unwrap();
2085        std::fs::write(orphan_dir.join("state.bin"), b"orphaned").unwrap();
2086
2087        // Normal checkpoint (has manifest).
2088        store.save(&make_manifest(1, 1)).await.unwrap();
2089
2090        let cleaned = store.cleanup_orphans().await.unwrap();
2091        assert_eq!(cleaned, 1);
2092
2093        // Orphan dir should be gone.
2094        assert!(!orphan_dir.exists());
2095        // Normal checkpoint should still be there.
2096        assert!(store.load_by_id(1).await.unwrap().is_some());
2097    }
2098
2099    #[tokio::test]
2100    async fn test_cleanup_orphans_noop_when_clean() {
2101        let dir = tempfile::tempdir().unwrap();
2102        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2103
2104        store.save(&make_manifest(1, 1)).await.unwrap();
2105        let cleaned = store.cleanup_orphans().await.unwrap();
2106        assert_eq!(cleaned, 0);
2107    }
2108
2109    #[tokio::test]
2110    async fn test_save_with_state_writes_checksum() {
2111        let dir = tempfile::tempdir().unwrap();
2112        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2113
2114        let state = b"state-data-for-checksum";
2115        let m = make_manifest(1, 1);
2116        store
2117            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2118            .await
2119            .unwrap();
2120
2121        let loaded = store.load_latest().await.unwrap().unwrap();
2122        assert!(
2123            loaded.state_checksum.is_some(),
2124            "state_checksum should be set"
2125        );
2126        let expected = sha256_hex(state);
2127        assert_eq!(loaded.state_checksum.unwrap(), expected);
2128    }
2129
2130    #[tokio::test]
2131    async fn test_state_checksum_backward_compat() {
2132        // Older manifests without state_checksum should deserialize fine.
2133        let json = r#"{
2134            "version": 1,
2135            "checkpoint_id": 1,
2136            "epoch": 1,
2137            "timestamp_ms": 1000
2138        }"#;
2139        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
2140        assert!(m.state_checksum.is_none());
2141    }
2142
2143    // ObjectStore variants
2144
2145    #[tokio::test]
2146    async fn test_obj_validate_checkpoint_valid() {
2147        let store = make_obj_store();
2148        store.save(&make_manifest(1, 1)).await.unwrap();
2149
2150        let result = store.validate_checkpoint(1).await.unwrap();
2151        assert!(result.valid, "valid checkpoint: {:?}", result.issues);
2152    }
2153
2154    #[tokio::test]
2155    async fn test_obj_validate_checkpoint_missing() {
2156        let store = make_obj_store();
2157        let result = store.validate_checkpoint(99).await.unwrap();
2158        assert!(!result.valid);
2159    }
2160
2161    #[tokio::test]
2162    async fn test_obj_validate_state_checksum() {
2163        let store = ObjectStoreCheckpointStore::new(
2164            Arc::new(object_store::memory::InMemory::new()),
2165            String::new(),
2166            10,
2167        );
2168
2169        let state = b"obj-store-state-data";
2170        let m = make_manifest(1, 1);
2171        store
2172            .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
2173            .await
2174            .unwrap();
2175
2176        let result = store.validate_checkpoint(1).await.unwrap();
2177        assert!(result.valid, "checksum should match: {:?}", result.issues);
2178    }
2179
2180    #[tokio::test]
2181    async fn test_obj_recover_latest_validated() {
2182        let store = ObjectStoreCheckpointStore::new(
2183            Arc::new(object_store::memory::InMemory::new()),
2184            String::new(),
2185            10,
2186        );
2187
2188        store.save(&make_manifest(1, 10)).await.unwrap();
2189        store.save(&make_manifest(2, 20)).await.unwrap();
2190
2191        let report = store.recover_latest_validated().await.unwrap();
2192        assert_eq!(report.chosen_id, Some(2));
2193        assert!(report.skipped.is_empty());
2194    }
2195
2196    #[tokio::test]
2197    async fn test_obj_cleanup_orphans() {
2198        let inner = Arc::new(object_store::memory::InMemory::new());
2199        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
2200
2201        // Save a checkpoint (creates manifest + state).
2202        let state = b"state-with-manifest";
2203        store
2204            .save_with_state(
2205                &make_manifest(1, 1),
2206                Some(&[bytes::Bytes::from_static(state)]),
2207            )
2208            .await
2209            .unwrap();
2210
2211        // Write an orphan state file (no manifest).
2212        let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
2213        inner
2214            .put_opts(
2215                &orphan_path,
2216                PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
2217                PutOptions::default(),
2218            )
2219            .await
2220            .unwrap();
2221
2222        let cleaned = store.cleanup_orphans().await.unwrap();
2223        assert_eq!(cleaned, 1);
2224
2225        // Verify orphan is gone but real state is intact.
2226        let real_state = store.load_state_data(1).await.unwrap();
2227        assert!(real_state.is_some());
2228    }
2229}