laminar_storage/checkpoint/mod.rs
//! Checkpoint infrastructure for state persistence and recovery.
//!
//! - `layout`: Object-store checkpoint layout
//! - `checkpointer`: Async checkpoint persistence via object stores
//! - `recovery`: Distributed recovery manager
//! - `source_offsets`: Typed source position tracking
6
7/// Async checkpoint persistence via object stores.
8pub mod checkpointer;
9/// Object-store checkpoint layout with UUID v7 identifiers.
10pub mod layout;
11/// Distributed recovery manager.
12pub mod recovery;
13/// Typed source position tracking for checkpoint recovery.
14pub mod source_offsets;
15
16#[allow(clippy::disallowed_types)] // cold path: checkpoint recovery
17use std::collections::HashMap;
18use std::path::PathBuf;
19
20use anyhow::{Context, Result};
21
22use crate::wal::WalPosition;
23
24/// Checkpoint metadata stored alongside checkpoint data.
25#[derive(Debug)]
26pub struct CheckpointMetadata {
27 /// Unique checkpoint ID (monotonically increasing).
28 pub id: u64,
29
30 /// Unix timestamp when checkpoint was created.
31 pub timestamp: u64,
32
33 /// WAL position at time of checkpoint.
34 pub wal_position: WalPosition,
35
36 /// Source offsets for exactly-once semantics.
37 pub source_offsets: HashMap<String, u64>,
38
39 /// Size of the state snapshot in bytes.
40 pub state_size: u64,
41
42 /// Current watermark at checkpoint time (for recovery).
43 pub watermark: Option<i64>,
44}
45
46/// A completed checkpoint on disk.
47#[derive(Debug)]
48pub struct Checkpoint {
49 /// Checkpoint metadata.
50 pub metadata: CheckpointMetadata,
51
52 /// Path to checkpoint directory.
53 pub path: PathBuf,
54}
55
56impl Checkpoint {
57 /// Path to the metadata file.
58 #[must_use]
59 pub fn metadata_path(&self) -> PathBuf {
60 self.path.join("metadata.rkyv")
61 }
62
63 /// Path to the state snapshot file.
64 #[must_use]
65 pub fn state_path(&self) -> PathBuf {
66 self.path.join("state.rkyv")
67 }
68
69 /// Path to the source offsets file.
70 #[must_use]
71 pub fn offsets_path(&self) -> PathBuf {
72 self.path.join("offsets.json")
73 }
74
75 /// Load the state snapshot from disk.
76 ///
77 /// # Errors
78 ///
79 /// Returns an error if the state file cannot be read.
80 pub fn load_state(&self) -> Result<Vec<u8>> {
81 std::fs::read(self.state_path()).context("Failed to read state snapshot")
82 }
83
84 /// Load source offsets from disk.
85 ///
86 /// # Errors
87 ///
88 /// Returns an error if the offsets file cannot be read or parsed.
89 pub fn load_offsets(&self) -> Result<HashMap<String, u64>> {
90 let path = self.offsets_path();
91 if path.exists() {
92 let data = std::fs::read_to_string(&path).context("Failed to read source offsets")?;
93 serde_json::from_str(&data).context("Failed to parse source offsets")
94 } else {
95 Ok(HashMap::new())
96 }
97 }
98}