Skip to main content

laminar_storage/
checkpoint_manifest.rs

1//! Checkpoint manifest types.
2//!
3//! Manifests are JSON for debuggability. Large operator state goes into a
4//! separate `state.bin` sidecar referenced by offset/length in the manifest.
5
6#[allow(clippy::disallowed_types)] // cold path: manifest serialization
7use std::collections::HashMap;
8
9/// Per-sink commit status tracked during the checkpoint commit phase.
10///
11/// After the manifest is persisted (Step 5), sinks start as [`Pending`](Self::Pending).
12/// The coordinator updates each sink's status during commit (Step 6) and
13/// saves the manifest again. Recovery uses these statuses to determine
14/// which sinks need rollback.
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
16pub enum SinkCommitStatus {
17    /// Sink has been pre-committed but not yet committed.
18    Pending,
19    /// Sink commit succeeded.
20    Committed,
21    /// Sink commit failed.
22    Failed(String),
23}
24
/// Number of virtual partitions (vnodes) that state keys are spread across
/// when nothing else is configured.
///
/// [`CheckpointManifest::new`] stamps this value into every manifest it
/// builds; use [`CheckpointManifest::new_with_vnode_count`] to record a
/// different one. `CheckpointStore` implementations hand the runtime's
/// configured count to [`CheckpointManifest::validate`], which reports a
/// manifest written with another count when it is restored.
pub const DEFAULT_VNODE_COUNT: u16 = 256;
32
33/// A point-in-time snapshot of all pipeline state.
34#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
35pub struct CheckpointManifest {
36    /// Manifest format version (for future evolution).
37    pub version: u32,
38    /// Unique, monotonically increasing checkpoint ID.
39    pub checkpoint_id: u64,
40    /// Epoch number for exactly-once coordination.
41    pub epoch: u64,
42    /// Timestamp when checkpoint was created (millis since Unix epoch).
43    pub timestamp_ms: u64,
44
45    // ── Connector State ──
46    /// Per-source connector offsets (key: source name).
47    #[serde(default)]
48    pub source_offsets: HashMap<String, ConnectorCheckpoint>,
49    /// Per-sink last committed epoch (key: sink name).
50    #[serde(default)]
51    pub sink_epochs: HashMap<String, u64>,
52    /// Per-sink commit status (key: sink name).
53    ///
54    /// Populated during the commit phase (Step 6) and saved to the manifest
55    /// afterward. Recovery uses this to decide which sinks need rollback
56    /// (those with [`SinkCommitStatus::Pending`] or [`SinkCommitStatus::Failed`]).
57    #[serde(default)]
58    pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
59    /// Per-table source offsets for reference tables (key: table name).
60    #[serde(default)]
61    pub table_offsets: HashMap<String, ConnectorCheckpoint>,
62
63    // ── Operator State ──
64    /// Per-operator checkpoint data (key: operator/node name).
65    ///
66    /// Small state is inlined as base64. Large state is stored in a separate
67    /// `state.bin` file and this map holds only a reference marker.
68    #[serde(default)]
69    pub operator_states: HashMap<String, OperatorCheckpoint>,
70
71    // ── Storage State ──
72    /// Path to the table store checkpoint, if any.
73    #[serde(default)]
74    pub table_store_checkpoint_path: Option<String>,
75    // ── Time State ──
76    /// Global watermark at checkpoint time.
77    #[serde(default)]
78    pub watermark: Option<i64>,
79    /// Per-source watermarks (key: source name).
80    #[serde(default)]
81    pub source_watermarks: HashMap<String, i64>,
82
83    // ── Topology ──
84    /// Sorted names of all registered sources at checkpoint time.
85    ///
86    /// Used during recovery to detect topology changes (added/removed sources)
87    /// and warn the operator.
88    #[serde(default)]
89    pub source_names: Vec<String>,
90    /// Sorted names of all registered sinks at checkpoint time.
91    #[serde(default)]
92    pub sink_names: Vec<String>,
93
94    // ── Pipeline Identity ──
95    /// Hash of the pipeline configuration at checkpoint time.
96    ///
97    /// Computed from SQL queries, source/sink configuration, and connector
98    /// options. Recovery logs a warning when this changes, indicating
99    /// operator state may be incompatible with the new configuration.
100    #[serde(default)]
101    pub pipeline_hash: Option<u64>,
102
103    // ── Metadata ──
104    /// Virtual partition count for state key distribution.
105    #[serde(default)]
106    pub vnode_count: u16,
107
108    // ── Integrity ──
109    /// SHA-256 hex digest of the sidecar `state.bin` file (if any).
110    ///
111    /// Written during checkpoint commit so that recovery can verify the
112    /// sidecar hasn't been corrupted or truncated on disk/S3.
113    #[serde(default)]
114    pub state_checksum: Option<String>,
115}
116
/// A single consistency problem found by [`CheckpointManifest::validate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the issue.
    pub message: String,
}

impl std::fmt::Display for ManifestValidationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The message is already human-readable; emit it verbatim.
        f.write_str(&self.message)
    }
}

/// Lets validation issues be boxed as `dyn Error` and propagated with `?`
/// alongside other errors. There is no underlying cause, so `source()` is
/// the default `None`.
impl std::error::Error for ManifestValidationError {}
129
130impl CheckpointManifest {
131    /// Validates manifest consistency before recovery.
132    ///
133    /// `expected_vnode_count` is the runtime's configured vnode count;
134    /// a manifest written with a different count can't be safely restored
135    /// because state keys won't map to the same shards. Pass
136    /// [`DEFAULT_VNODE_COUNT`] if the runtime hasn't overridden it.
137    ///
138    /// Returns a list of issues found. An empty list means the manifest is valid.
139    /// Callers should treat non-empty results as warnings (recovery may still
140    /// proceed) or errors depending on severity.
141    #[must_use]
142    pub fn validate(&self, expected_vnode_count: u16) -> Vec<ManifestValidationError> {
143        let mut errors = Vec::new();
144
145        if self.version == 0 {
146            errors.push(ManifestValidationError {
147                message: "manifest version is 0".into(),
148            });
149        }
150
151        if self.checkpoint_id == 0 {
152            errors.push(ManifestValidationError {
153                message: "checkpoint_id is 0".into(),
154            });
155        }
156
157        if self.epoch == 0 {
158            errors.push(ManifestValidationError {
159                message: "epoch is 0".into(),
160            });
161        }
162
163        if self.timestamp_ms == 0 {
164            errors.push(ManifestValidationError {
165                message: "timestamp_ms is 0 (missing creation time)".into(),
166            });
167        }
168
169        // Sink epochs should match sink commit statuses
170        for sink_name in self.sink_epochs.keys() {
171            if !self.sink_commit_statuses.is_empty()
172                && !self.sink_commit_statuses.contains_key(sink_name)
173            {
174                errors.push(ManifestValidationError {
175                    message: format!("sink '{sink_name}' has epoch but no commit status"),
176                });
177            }
178        }
179
180        // Source offsets should reference known sources (if topology is recorded)
181        if !self.source_names.is_empty() {
182            for name in self.source_offsets.keys() {
183                if !self.source_names.contains(name) {
184                    errors.push(ManifestValidationError {
185                        message: format!("source_offsets contains '{name}' not in source_names"),
186                    });
187                }
188            }
189        }
190
191        if self.vnode_count == 0 {
192            errors.push(ManifestValidationError {
193                message: "vnode_count is 0 (missing or legacy checkpoint)".into(),
194            });
195        } else if self.vnode_count != expected_vnode_count {
196            errors.push(ManifestValidationError {
197                message: format!(
198                    "vnode_count mismatch: checkpoint has {}, runtime expects {expected_vnode_count}",
199                    self.vnode_count,
200                ),
201            });
202        }
203
204        errors
205    }
206
207    /// Creates a new manifest with the given ID and epoch, using the
208    /// default vnode count. Use [`Self::new_with_vnode_count`] when a
209    /// pipeline runs with a non-default vnode count.
210    #[must_use]
211    pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
212        Self::new_with_vnode_count(checkpoint_id, epoch, DEFAULT_VNODE_COUNT)
213    }
214
215    /// Creates a new manifest with an explicit vnode count.
216    #[must_use]
217    pub fn new_with_vnode_count(checkpoint_id: u64, epoch: u64, vnode_count: u16) -> Self {
218        #[allow(clippy::cast_possible_truncation)] // u64 millis won't overflow until year 584M
219        let timestamp_ms = std::time::SystemTime::now()
220            .duration_since(std::time::UNIX_EPOCH)
221            .unwrap_or_default()
222            .as_millis() as u64;
223
224        Self {
225            version: 1,
226            checkpoint_id,
227            epoch,
228            timestamp_ms,
229            source_offsets: HashMap::new(),
230            sink_epochs: HashMap::new(),
231            sink_commit_statuses: HashMap::new(),
232            table_offsets: HashMap::new(),
233            operator_states: HashMap::new(),
234            table_store_checkpoint_path: None,
235            watermark: None,
236            source_watermarks: HashMap::new(),
237            source_names: Vec::new(),
238            sink_names: Vec::new(),
239            pipeline_hash: None,
240            vnode_count,
241            state_checksum: None,
242        }
243    }
244}
245
246/// Connector-agnostic offset container.
247///
248/// Uses string key-value pairs to support all connector types:
249/// - **Kafka**: `{"partition-0": "1234", "partition-1": "5678"}`
250/// - **`PostgreSQL` CDC**: `{"lsn": "0/1234ABCD"}`
251/// - **`MySQL` CDC**: `{"gtid_set": "uuid:1-5", "binlog_file": "mysql-bin.000003"}`
252/// - **Delta Lake**: `{"version": "42"}`
253#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
254pub struct ConnectorCheckpoint {
255    /// Connector-specific offset data.
256    pub offsets: HashMap<String, String>,
257    /// Epoch this checkpoint belongs to.
258    pub epoch: u64,
259    /// Optional metadata (connector type, topic name, etc.).
260    #[serde(default)]
261    pub metadata: HashMap<String, String>,
262}
263
264impl ConnectorCheckpoint {
265    /// Creates a new connector checkpoint with the given epoch.
266    #[must_use]
267    pub fn new(epoch: u64) -> Self {
268        Self {
269            offsets: HashMap::new(),
270            epoch,
271            metadata: HashMap::new(),
272        }
273    }
274
275    /// Creates a connector checkpoint with pre-populated offsets.
276    #[must_use]
277    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
278        Self {
279            offsets,
280            epoch,
281            metadata: HashMap::new(),
282        }
283    }
284}
285
286/// Serialized operator state stored in the manifest.
287#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
288pub struct OperatorCheckpoint {
289    /// Base64-encoded binary state (for small payloads inlined in JSON).
290    #[serde(default)]
291    pub state_b64: Option<String>,
292    /// If true, state is stored externally in the state.bin sidecar file.
293    #[serde(default)]
294    pub external: bool,
295    /// Byte offset into the state.bin file (if external).
296    #[serde(default)]
297    pub external_offset: u64,
298    /// Byte length of the state in the state.bin file (if external).
299    #[serde(default)]
300    pub external_length: u64,
301}
302
303impl OperatorCheckpoint {
304    /// Creates an inline operator checkpoint from raw bytes.
305    ///
306    /// The bytes are base64-encoded for JSON storage.
307    #[must_use]
308    pub fn inline(data: &[u8]) -> Self {
309        use base64::Engine;
310        Self {
311            state_b64: Some(base64::engine::general_purpose::STANDARD.encode(data)),
312            external: false,
313            external_offset: 0,
314            external_length: 0,
315        }
316    }
317
318    /// Creates an external reference to state in the sidecar file.
319    #[must_use]
320    pub fn external(offset: u64, length: u64) -> Self {
321        Self {
322            state_b64: None,
323            external: true,
324            external_offset: offset,
325            external_length: length,
326        }
327    }
328
329    /// Decodes the inline state, returning the raw bytes.
330    ///
331    /// Returns `None` if the state is external, no inline data is present,
332    /// or if the base64 data is corrupted (logs a warning in that case).
333    #[must_use]
334    pub fn decode_inline(&self) -> Option<Vec<u8>> {
335        use base64::Engine;
336        self.state_b64.as_ref().and_then(|b64| {
337            match base64::engine::general_purpose::STANDARD.decode(b64) {
338                Ok(data) => Some(data),
339                Err(e) => {
340                    tracing::warn!(
341                        error = %e,
342                        b64_len = b64.len(),
343                        "[LDB-4004] Failed to decode inline operator state from base64 — \
344                         operator will start from scratch"
345                    );
346                    None
347                }
348            }
349        })
350    }
351
352    /// Decodes the inline state, returning a `Result` for callers that need
353    /// to distinguish between "no inline state" and "corrupted state".
354    ///
355    /// Returns `Ok(None)` if no inline data is present (external or absent).
356    /// Returns `Ok(Some(bytes))` on successful decode.
357    ///
358    /// # Errors
359    ///
360    /// Returns `Err` if base64 data is present but corrupted.
361    pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
362        use base64::Engine;
363        match &self.state_b64 {
364            None => Ok(None),
365            Some(b64) => base64::engine::general_purpose::STANDARD
366                .decode(b64)
367                .map(Some)
368                .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}")),
369        }
370    }
371
372    /// Creates an `OperatorCheckpoint` from raw bytes using a size threshold.
373    ///
374    /// If `data.len() <= threshold`, the state is inlined as base64.
375    /// If `data.len() > threshold`, the state is marked as external with the
376    /// given offset and length, and the raw data is returned for sidecar storage.
377    ///
378    /// # Arguments
379    ///
380    /// * `data` — Raw operator state bytes
381    /// * `threshold` — Maximum size in bytes for inline storage
382    /// * `current_offset` — Byte offset into the sidecar file for this blob
383    ///
384    /// # Returns
385    ///
386    /// A tuple of the checkpoint entry and optional raw data for the sidecar.
387    #[must_use]
388    #[allow(clippy::cast_possible_truncation)]
389    pub fn from_bytes(
390        data: &[u8],
391        threshold: usize,
392        current_offset: u64,
393    ) -> (Self, Option<Vec<u8>>) {
394        if data.len() <= threshold {
395            (Self::inline(data), None)
396        } else {
397            let length = data.len() as u64;
398            (Self::external(current_offset, length), Some(data.to_vec()))
399        }
400    }
401
402    /// Shared-buffer variant of [`Self::from_bytes`].
403    ///
404    /// Takes an owned [`bytes::Bytes`] and returns the same type on the
405    /// external path, avoiding the `data.to_vec()` copy the `&[u8]`
406    /// version has to make. The checkpoint pipeline passes rkyv output
407    /// through as `Bytes`, so per-operator state no longer doubles in
408    /// memory when crossing this boundary.
409    #[must_use]
410    #[allow(clippy::cast_possible_truncation)]
411    pub fn from_bytes_shared(
412        data: bytes::Bytes,
413        threshold: usize,
414        current_offset: u64,
415    ) -> (Self, Option<bytes::Bytes>) {
416        if data.len() <= threshold {
417            (Self::inline(&data), None)
418        } else {
419            let length = data.len() as u64;
420            (Self::external(current_offset, length), Some(data))
421        }
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_manifest_new() {
        let m = CheckpointManifest::new(1, 5);
        assert_eq!(m.version, 1);
        assert_eq!(m.checkpoint_id, 1);
        assert_eq!(m.epoch, 5);
        assert!(m.timestamp_ms > 0);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
    }

    #[test]
    fn test_manifest_json_round_trip() {
        let mut m = CheckpointManifest::new(42, 10);
        m.source_offsets.insert(
            "kafka-src".into(),
            ConnectorCheckpoint::with_offsets(
                10,
                HashMap::from([
                    ("partition-0".into(), "1234".into()),
                    ("partition-1".into(), "5678".into()),
                ]),
            ),
        );
        m.sink_epochs.insert("pg-sink".into(), 9);
        m.watermark = Some(999_000);
        m.operator_states
            .insert("window-agg".into(), OperatorCheckpoint::inline(b"hello"));

        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.checkpoint_id, 42);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(999_000));
        let src = restored.source_offsets.get("kafka-src").unwrap();
        assert_eq!(src.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(restored.sink_epochs.get("pg-sink"), Some(&9));

        let op = restored.operator_states.get("window-agg").unwrap();
        assert_eq!(op.decode_inline().unwrap(), b"hello");
    }

    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        // Simulate an older manifest with only mandatory fields
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;

        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(m.version, 1);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(m.watermark.is_none());
    }

    #[test]
    fn test_connector_checkpoint_new() {
        let cp = ConnectorCheckpoint::new(5);
        assert_eq!(cp.epoch, 5);
        assert!(cp.offsets.is_empty());
        assert!(cp.metadata.is_empty());
    }

    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        let cp = ConnectorCheckpoint::with_offsets(3, offsets);
        assert_eq!(cp.epoch, 3);
        assert_eq!(cp.offsets.get("lsn"), Some(&"0/ABCD".into()));
    }

    #[test]
    fn test_operator_checkpoint_inline() {
        let op = OperatorCheckpoint::inline(b"state-data");
        assert!(!op.external);
        assert!(op.state_b64.is_some());
        assert_eq!(op.decode_inline().unwrap(), b"state-data");
    }

    #[test]
    fn test_operator_checkpoint_external() {
        let op = OperatorCheckpoint::external(1024, 256);
        assert!(op.external);
        assert_eq!(op.external_offset, 1024);
        assert_eq!(op.external_length, 256);
        assert!(op.decode_inline().is_none());
    }

    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let op = OperatorCheckpoint::inline(b"");
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_try_decode_inline_distinguishes_absent_and_corrupt() {
        let mut op = OperatorCheckpoint::inline(b"ok");
        assert_eq!(op.try_decode_inline().unwrap(), Some(b"ok".to_vec()));

        // Corrupted base64: `try_decode_inline` errors, `decode_inline` yields None.
        op.state_b64 = Some("!!! not base64 !!!".into());
        let err = op.try_decode_inline().unwrap_err();
        assert!(err.contains("[LDB-4004]"), "unexpected error: {err}");
        assert!(op.decode_inline().is_none());

        // No inline data at all is not an error.
        let ext = OperatorCheckpoint::external(0, 10);
        assert_eq!(ext.try_decode_inline().unwrap(), None);
    }

    #[test]
    fn test_manifest_table_offsets() {
        let mut m = CheckpointManifest::new(1, 1);
        m.table_offsets.insert(
            "instruments".into(),
            ConnectorCheckpoint::with_offsets(1, HashMap::from([("lsn".into(), "0/ABCD".into())])),
        );
        m.table_store_checkpoint_path = Some("/tmp/rocksdb_cp".into());

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["kafka-clicks".into(), "ws-prices".into()];
        m.sink_names = vec!["pg-sink".into()];

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.source_names, vec!["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, vec!["pg-sink"]);
    }

    #[test]
    fn test_manifest_topology_backward_compat() {
        // Older manifests without topology fields should deserialize fine.
        let json = r#"{
            "version": 1,
            "checkpoint_id": 5,
            "epoch": 3,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.source_names.is_empty());
        assert!(m.sink_names.is_empty());
    }

    #[test]
    fn test_validate_fresh_manifest_is_clean() {
        let m = CheckpointManifest::new(1, 1);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(errors.is_empty(), "unexpected errors: {errors:?}");
    }

    #[test]
    fn test_validate_zero_header_fields() {
        let mut m = CheckpointManifest::new(0, 0);
        m.version = 0;
        m.timestamp_ms = 0;

        let messages: Vec<String> = m
            .validate(DEFAULT_VNODE_COUNT)
            .into_iter()
            .map(|e| e.message)
            .collect();
        for expected in [
            "manifest version is 0",
            "checkpoint_id is 0",
            "epoch is 0",
            "timestamp_ms is 0 (missing creation time)",
        ] {
            assert!(
                messages.iter().any(|msg| msg == expected),
                "missing '{expected}' in {messages:?}"
            );
        }
    }

    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["a".into(), "b".into()];
        m.source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));

        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(
            errors
                .iter()
                .any(|e| e.message.contains("'c' not in source_names")),
            "expected orphaned source offset error: {errors:?}"
        );
    }

    #[test]
    fn test_validate_sink_missing_commit_status() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_epochs.insert("a".into(), 1);
        m.sink_epochs.insert("b".into(), 1);

        // No statuses recorded at all → the check is skipped.
        assert!(m.validate(DEFAULT_VNODE_COUNT).is_empty());

        m.sink_commit_statuses
            .insert("a".into(), SinkCommitStatus::Committed);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert_eq!(errors.len(), 1, "unexpected errors: {errors:?}");
        assert_eq!(errors[0].message, "sink 'b' has epoch but no commit status");
    }

    #[test]
    fn test_validate_vnode_count_mismatch() {
        let m = CheckpointManifest::new_with_vnode_count(1, 1, 128);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(
            errors.iter().any(|e| e.message
                == "vnode_count mismatch: checkpoint has 128, runtime expects 256"),
            "expected vnode mismatch error: {errors:?}"
        );
        assert!(m.validate(128).is_empty());
    }

    #[test]
    fn test_validate_legacy_zero_vnode_count() {
        let json = r#"{"version":1,"checkpoint_id":1,"epoch":1,"timestamp_ms":1000}"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(m.vnode_count, 0);

        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(errors.iter().any(|e| e.message.contains("vnode_count is 0")));
        assert!(!errors.iter().any(|e| e.message.contains("mismatch")));
    }

    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.pipeline_hash = Some(0xDEAD_BEEF_CAFE_1234);

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.pipeline_hash, Some(0xDEAD_BEEF_CAFE_1234));
    }

    #[test]
    fn test_from_bytes_inline() {
        let data = b"small-state";
        let (op, sidecar) = OperatorCheckpoint::from_bytes(data, 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);
    }

    #[test]
    fn test_from_bytes_external() {
        let data = vec![0xAB; 2048];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 1024, 512);
        assert!(op.external);
        assert_eq!(op.external_offset, 512);
        assert_eq!(op.external_length, 2048);
        assert!(op.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), data);
    }

    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        // Exactly at threshold → inline
        let data = vec![0xFF; 100];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 100, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);

        // One byte over threshold → external
        let data_over = vec![0xFF; 101];
        let (op2, sidecar2) = OperatorCheckpoint::from_bytes(&data_over, 100, 0);
        assert!(op2.external);
        assert!(sidecar2.is_some());
    }

    #[test]
    fn test_from_bytes_empty_data() {
        let (op, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_from_bytes_shared_inline_and_external() {
        let small = bytes::Bytes::from_static(b"tiny");
        let (op, sidecar) = OperatorCheckpoint::from_bytes_shared(small, 16, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"tiny");

        let big = bytes::Bytes::from(vec![7u8; 64]);
        let (op, sidecar) = OperatorCheckpoint::from_bytes_shared(big.clone(), 16, 100);
        assert!(op.external);
        assert_eq!(op.external_offset, 100);
        assert_eq!(op.external_length, 64);
        assert!(op.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), big);
    }

    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.pipeline_hash.is_none());
    }
}