Skip to main content

laminar_core/checkpoint/
checkpoint_manifest.rs

//! Checkpoint manifest types.
//!
//! Manifests are JSON for debuggability. Large operator state goes into a
//! separate `state.bin` sidecar referenced by offset/length in the manifest.
5
6#![allow(clippy::disallowed_types)] // cold path: manifest serialization
7
8use std::collections::HashMap;
9
10/// Per-sink commit status tracked during the checkpoint commit phase.
11///
12/// After the manifest is persisted (Step 5), sinks start as [`Pending`](Self::Pending).
13/// The coordinator updates each sink's status during commit (Step 6) and
14/// saves the manifest again. Recovery uses these statuses to determine
15/// which sinks need rollback.
16#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
17pub enum SinkCommitStatus {
18    /// Sink has been pre-committed but not yet committed.
19    Pending,
20    /// Sink commit succeeded.
21    Committed,
22    /// Sink commit failed.
23    Failed(String),
24}
25
/// Number of virtual partitions used for state key distribution by default.
///
/// [`CheckpointManifest::new`] stamps manifests with this value; callers
/// running with a different count go through
/// [`CheckpointManifest::new_with_vnode_count`] instead. On restore,
/// `CheckpointStore` implementations hand the runtime's value to
/// [`CheckpointManifest::validate`], which flags a manifest that was
/// written with a different count.
pub const DEFAULT_VNODE_COUNT: u16 = 256;
33
34/// A point-in-time snapshot of all pipeline state.
35#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
36pub struct CheckpointManifest {
37    /// Manifest format version (for future evolution).
38    pub version: u32,
39    /// Unique, monotonically increasing checkpoint ID.
40    pub checkpoint_id: u64,
41    /// Epoch number for exactly-once coordination.
42    pub epoch: u64,
43    /// Timestamp when checkpoint was created (millis since Unix epoch).
44    pub timestamp_ms: u64,
45
46    // ── Connector State ──
47    /// Per-source connector offsets (key: source name).
48    #[serde(default)]
49    pub source_offsets: HashMap<String, ConnectorCheckpoint>,
50    /// Per-sink last committed epoch (key: sink name).
51    #[serde(default)]
52    pub sink_epochs: HashMap<String, u64>,
53    /// Per-sink commit status (key: sink name).
54    ///
55    /// Populated during the commit phase (Step 6) and saved to the manifest
56    /// afterward. Recovery uses this to decide which sinks need rollback
57    /// (those with [`SinkCommitStatus::Pending`] or [`SinkCommitStatus::Failed`]).
58    #[serde(default)]
59    pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
60    /// Per-table source offsets for reference tables (key: table name).
61    #[serde(default)]
62    pub table_offsets: HashMap<String, ConnectorCheckpoint>,
63
64    // ── Operator State ──
65    /// Per-operator checkpoint data (key: operator/node name).
66    ///
67    /// Small state is inlined as base64. Large state is stored in a separate
68    /// `state.bin` file and this map holds only a reference marker.
69    #[serde(default)]
70    pub operator_states: HashMap<String, OperatorCheckpoint>,
71
72    // ── Storage State ──
73    /// Path to the table store checkpoint, if any.
74    #[serde(default)]
75    pub table_store_checkpoint_path: Option<String>,
76    // ── Time State ──
77    /// Global watermark at checkpoint time.
78    #[serde(default)]
79    pub watermark: Option<i64>,
80    /// Per-source watermarks (key: source name).
81    #[serde(default)]
82    pub source_watermarks: HashMap<String, i64>,
83
84    // ── Topology ──
85    /// Sorted names of all registered sources at checkpoint time.
86    ///
87    /// Used during recovery to detect topology changes (added/removed sources)
88    /// and warn the operator.
89    #[serde(default)]
90    pub source_names: Vec<String>,
91    /// Sorted names of all registered sinks at checkpoint time.
92    #[serde(default)]
93    pub sink_names: Vec<String>,
94
95    // ── Pipeline Identity ──
96    /// Hash of the pipeline configuration at checkpoint time.
97    ///
98    /// Computed from SQL queries, source/sink configuration, and connector
99    /// options. Recovery logs a warning when this changes, indicating
100    /// operator state may be incompatible with the new configuration.
101    #[serde(default)]
102    pub pipeline_hash: Option<u64>,
103
104    // ── Metadata ──
105    /// Virtual partition count for state key distribution.
106    #[serde(default)]
107    pub vnode_count: u16,
108
109    // ── Integrity ──
110    /// SHA-256 hex digest of the sidecar `state.bin` file (if any).
111    ///
112    /// Written during checkpoint commit so that recovery can verify the
113    /// sidecar hasn't been corrupted or truncated on disk/S3.
114    #[serde(default)]
115    pub state_checksum: Option<String>,
116}
117
/// A single issue found while validating a manifest.
///
/// Implements [`std::error::Error`], so it can be boxed as `dyn Error` or
/// propagated with `?` into error types that accept any standard error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the issue.
    pub message: String,
}

impl std::fmt::Display for ManifestValidationError {
    /// Writes [`message`](Self::message) verbatim, with no prefix or suffix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

// No underlying cause to expose, so the trait's default methods suffice.
impl std::error::Error for ManifestValidationError {}
130
131impl CheckpointManifest {
132    /// Validates manifest consistency before recovery.
133    ///
134    /// `expected_vnode_count` is the runtime's configured vnode count;
135    /// a manifest written with a different count can't be safely restored
136    /// because state keys won't map to the same shards. Pass
137    /// [`DEFAULT_VNODE_COUNT`] if the runtime hasn't overridden it.
138    ///
139    /// Returns a list of issues found. An empty list means the manifest is valid.
140    /// Callers should treat non-empty results as warnings (recovery may still
141    /// proceed) or errors depending on severity.
142    #[must_use]
143    pub fn validate(&self, expected_vnode_count: u16) -> Vec<ManifestValidationError> {
144        let mut errors = Vec::new();
145
146        if self.version == 0 {
147            errors.push(ManifestValidationError {
148                message: "manifest version is 0".into(),
149            });
150        }
151
152        if self.checkpoint_id == 0 {
153            errors.push(ManifestValidationError {
154                message: "checkpoint_id is 0".into(),
155            });
156        }
157
158        if self.epoch == 0 {
159            errors.push(ManifestValidationError {
160                message: "epoch is 0".into(),
161            });
162        }
163
164        if self.timestamp_ms == 0 {
165            errors.push(ManifestValidationError {
166                message: "timestamp_ms is 0 (missing creation time)".into(),
167            });
168        }
169
170        // Sink epochs should match sink commit statuses
171        for sink_name in self.sink_epochs.keys() {
172            if !self.sink_commit_statuses.is_empty()
173                && !self.sink_commit_statuses.contains_key(sink_name)
174            {
175                errors.push(ManifestValidationError {
176                    message: format!("sink '{sink_name}' has epoch but no commit status"),
177                });
178            }
179        }
180
181        // Source offsets should reference known sources (if topology is recorded)
182        if !self.source_names.is_empty() {
183            for name in self.source_offsets.keys() {
184                if !self.source_names.contains(name) {
185                    errors.push(ManifestValidationError {
186                        message: format!("source_offsets contains '{name}' not in source_names"),
187                    });
188                }
189            }
190        }
191
192        if self.vnode_count == 0 {
193            errors.push(ManifestValidationError {
194                message: "vnode_count is 0 (missing or legacy checkpoint)".into(),
195            });
196        } else if self.vnode_count != expected_vnode_count {
197            errors.push(ManifestValidationError {
198                message: format!(
199                    "vnode_count mismatch: checkpoint has {}, runtime expects {expected_vnode_count}",
200                    self.vnode_count,
201                ),
202            });
203        }
204
205        errors
206    }
207
208    /// Creates a new manifest with the given ID and epoch, using the
209    /// default vnode count. Use [`Self::new_with_vnode_count`] when a
210    /// pipeline runs with a non-default vnode count.
211    #[must_use]
212    pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
213        Self::new_with_vnode_count(checkpoint_id, epoch, DEFAULT_VNODE_COUNT)
214    }
215
216    /// Creates a new manifest with an explicit vnode count.
217    #[must_use]
218    pub fn new_with_vnode_count(checkpoint_id: u64, epoch: u64, vnode_count: u16) -> Self {
219        #[allow(clippy::cast_possible_truncation)] // u64 millis won't overflow until year 584M
220        let timestamp_ms = std::time::SystemTime::now()
221            .duration_since(std::time::UNIX_EPOCH)
222            .unwrap_or_default()
223            .as_millis() as u64;
224
225        Self {
226            version: 1,
227            checkpoint_id,
228            epoch,
229            timestamp_ms,
230            source_offsets: HashMap::new(),
231            sink_epochs: HashMap::new(),
232            sink_commit_statuses: HashMap::new(),
233            table_offsets: HashMap::new(),
234            operator_states: HashMap::new(),
235            table_store_checkpoint_path: None,
236            watermark: None,
237            source_watermarks: HashMap::new(),
238            source_names: Vec::new(),
239            sink_names: Vec::new(),
240            pipeline_hash: None,
241            vnode_count,
242            state_checksum: None,
243        }
244    }
245}
246
247/// Connector-agnostic offset container.
248///
249/// Uses string key-value pairs to support all connector types:
250/// - **Kafka**: `{"partition-0": "1234", "partition-1": "5678"}`
251/// - **`PostgreSQL` CDC**: `{"lsn": "0/1234ABCD"}`
252/// - **`MySQL` CDC**: `{"gtid_set": "uuid:1-5", "binlog_file": "mysql-bin.000003"}`
253/// - **Delta Lake**: `{"version": "42"}`
254#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
255pub struct ConnectorCheckpoint {
256    /// Connector-specific offset data.
257    pub offsets: HashMap<String, String>,
258    /// Epoch this checkpoint belongs to.
259    pub epoch: u64,
260    /// Optional metadata (connector type, topic name, etc.).
261    #[serde(default)]
262    pub metadata: HashMap<String, String>,
263}
264
265impl ConnectorCheckpoint {
266    /// Creates a new connector checkpoint with the given epoch.
267    #[must_use]
268    pub fn new(epoch: u64) -> Self {
269        Self {
270            offsets: HashMap::new(),
271            epoch,
272            metadata: HashMap::new(),
273        }
274    }
275
276    /// Creates a connector checkpoint with pre-populated offsets.
277    #[must_use]
278    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
279        Self {
280            offsets,
281            epoch,
282            metadata: HashMap::new(),
283        }
284    }
285}
286
287/// Serialized operator state stored in the manifest.
288#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
289pub struct OperatorCheckpoint {
290    /// Base64-encoded binary state (for small payloads inlined in JSON).
291    #[serde(default)]
292    pub state_b64: Option<String>,
293    /// If true, state is stored externally in the state.bin sidecar file.
294    #[serde(default)]
295    pub external: bool,
296    /// Byte offset into the state.bin file (if external).
297    #[serde(default)]
298    pub external_offset: u64,
299    /// Byte length of the state in the state.bin file (if external).
300    #[serde(default)]
301    pub external_length: u64,
302}
303
304impl OperatorCheckpoint {
305    /// Creates an inline operator checkpoint from raw bytes.
306    ///
307    /// The bytes are base64-encoded for JSON storage.
308    #[must_use]
309    pub fn inline(data: &[u8]) -> Self {
310        use base64::Engine;
311        Self {
312            state_b64: Some(base64::engine::general_purpose::STANDARD.encode(data)),
313            external: false,
314            external_offset: 0,
315            external_length: 0,
316        }
317    }
318
319    /// Creates an external reference to state in the sidecar file.
320    #[must_use]
321    pub fn external(offset: u64, length: u64) -> Self {
322        Self {
323            state_b64: None,
324            external: true,
325            external_offset: offset,
326            external_length: length,
327        }
328    }
329
330    /// Decodes the inline state, returning the raw bytes.
331    ///
332    /// Returns `None` if the state is external, no inline data is present,
333    /// or if the base64 data is corrupted (logs a warning in that case).
334    #[must_use]
335    pub fn decode_inline(&self) -> Option<Vec<u8>> {
336        use base64::Engine;
337        self.state_b64.as_ref().and_then(|b64| {
338            match base64::engine::general_purpose::STANDARD.decode(b64) {
339                Ok(data) => Some(data),
340                Err(e) => {
341                    tracing::warn!(
342                        error = %e,
343                        b64_len = b64.len(),
344                        "[LDB-4004] Failed to decode inline operator state from base64 — \
345                         operator will start from scratch"
346                    );
347                    None
348                }
349            }
350        })
351    }
352
353    /// Decodes the inline state, returning a `Result` for callers that need
354    /// to distinguish between "no inline state" and "corrupted state".
355    ///
356    /// Returns `Ok(None)` if no inline data is present (external or absent).
357    /// Returns `Ok(Some(bytes))` on successful decode.
358    ///
359    /// # Errors
360    ///
361    /// Returns `Err` if base64 data is present but corrupted.
362    pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
363        use base64::Engine;
364        match &self.state_b64 {
365            None => Ok(None),
366            Some(b64) => base64::engine::general_purpose::STANDARD
367                .decode(b64)
368                .map(Some)
369                .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}")),
370        }
371    }
372
373    /// Creates an `OperatorCheckpoint` from raw bytes using a size threshold.
374    ///
375    /// If `data.len() <= threshold`, the state is inlined as base64.
376    /// If `data.len() > threshold`, the state is marked as external with the
377    /// given offset and length, and the raw data is returned for sidecar storage.
378    ///
379    /// # Arguments
380    ///
381    /// * `data` — Raw operator state bytes
382    /// * `threshold` — Maximum size in bytes for inline storage
383    /// * `current_offset` — Byte offset into the sidecar file for this blob
384    ///
385    /// # Returns
386    ///
387    /// A tuple of the checkpoint entry and optional raw data for the sidecar.
388    #[must_use]
389    #[allow(clippy::cast_possible_truncation)]
390    pub fn from_bytes(
391        data: &[u8],
392        threshold: usize,
393        current_offset: u64,
394    ) -> (Self, Option<Vec<u8>>) {
395        if data.len() <= threshold {
396            (Self::inline(data), None)
397        } else {
398            let length = data.len() as u64;
399            (Self::external(current_offset, length), Some(data.to_vec()))
400        }
401    }
402
403    /// Shared-buffer variant of [`Self::from_bytes`].
404    ///
405    /// Takes an owned [`bytes::Bytes`] and returns the same type on the
406    /// external path, avoiding the `data.to_vec()` copy the `&[u8]`
407    /// version has to make. The checkpoint pipeline passes rkyv output
408    /// through as `Bytes`, so per-operator state no longer doubles in
409    /// memory when crossing this boundary.
410    #[must_use]
411    #[allow(clippy::cast_possible_truncation)]
412    pub fn from_bytes_shared(
413        data: bytes::Bytes,
414        threshold: usize,
415        current_offset: u64,
416    ) -> (Self, Option<bytes::Bytes>) {
417        if data.len() <= threshold {
418            (Self::inline(&data), None)
419        } else {
420            let length = data.len() as u64;
421            (Self::external(current_offset, length), Some(data))
422        }
423    }
424}
425
/// Unit tests for manifest construction, JSON (de)serialization, validation
/// and operator-state encoding.
#[cfg(test)]
mod tests {
    use super::*;

    /// Smallest JSON document that deserializes: only the four fields
    /// without `#[serde(default)]`. Stands in for a manifest written by an
    /// older version.
    const MINIMAL_MANIFEST_JSON: &str = r#"{
        "version": 1,
        "checkpoint_id": 1,
        "epoch": 1,
        "timestamp_ms": 1000
    }"#;

    // ── Construction ──

    #[test]
    fn test_manifest_new() {
        let m = CheckpointManifest::new(1, 5);
        assert_eq!(m.version, 1);
        assert_eq!(m.checkpoint_id, 1);
        assert_eq!(m.epoch, 5);
        assert!(m.timestamp_ms > 0);
        assert_eq!(m.vnode_count, DEFAULT_VNODE_COUNT);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
    }

    #[test]
    fn test_manifest_new_with_vnode_count() {
        let m = CheckpointManifest::new_with_vnode_count(7, 3, 64);
        assert_eq!(m.checkpoint_id, 7);
        assert_eq!(m.epoch, 3);
        assert_eq!(m.vnode_count, 64);
    }

    #[test]
    fn test_connector_checkpoint_new() {
        let cp = ConnectorCheckpoint::new(5);
        assert_eq!(cp.epoch, 5);
        assert!(cp.offsets.is_empty());
        assert!(cp.metadata.is_empty());
    }

    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        let cp = ConnectorCheckpoint::with_offsets(3, offsets);
        assert_eq!(cp.epoch, 3);
        assert_eq!(cp.offsets.get("lsn"), Some(&"0/ABCD".into()));
        assert!(cp.metadata.is_empty());
    }

    // ── JSON round trips ──

    #[test]
    fn test_manifest_json_round_trip() {
        let mut m = CheckpointManifest::new(42, 10);
        m.source_offsets.insert(
            "kafka-src".into(),
            ConnectorCheckpoint::with_offsets(
                10,
                HashMap::from([
                    ("partition-0".into(), "1234".into()),
                    ("partition-1".into(), "5678".into()),
                ]),
            ),
        );
        m.sink_epochs.insert("pg-sink".into(), 9);
        m.watermark = Some(999_000);
        m.operator_states
            .insert("window-agg".into(), OperatorCheckpoint::inline(b"hello"));

        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.checkpoint_id, 42);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(999_000));
        let src = restored.source_offsets.get("kafka-src").unwrap();
        assert_eq!(src.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(restored.sink_epochs.get("pg-sink"), Some(&9));

        let op = restored.operator_states.get("window-agg").unwrap();
        assert_eq!(op.decode_inline().unwrap(), b"hello");
    }

    #[test]
    fn test_manifest_table_offsets() {
        let mut m = CheckpointManifest::new(1, 1);
        m.table_offsets.insert(
            "instruments".into(),
            ConnectorCheckpoint::with_offsets(1, HashMap::from([("lsn".into(), "0/ABCD".into())])),
        );
        m.table_store_checkpoint_path = Some("/tmp/table_store_cp".into());

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/table_store_cp")
        );
    }

    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["kafka-clicks".into(), "ws-prices".into()];
        m.sink_names = vec!["pg-sink".into()];

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.source_names, vec!["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, vec!["pg-sink"]);
    }

    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.pipeline_hash = Some(0xDEAD_BEEF_CAFE_1234);

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.pipeline_hash, Some(0xDEAD_BEEF_CAFE_1234));
    }

    #[test]
    fn test_manifest_sink_commit_statuses_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_epochs.insert("pg-sink".into(), 1);
        m.sink_epochs.insert("s3-sink".into(), 1);
        m.sink_commit_statuses
            .insert("pg-sink".into(), SinkCommitStatus::Committed);
        m.sink_commit_statuses
            .insert("s3-sink".into(), SinkCommitStatus::Failed("boom".into()));

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored, m);
    }

    // ── Backward compatibility ──

    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        // Simulate an older manifest with only mandatory fields
        let m: CheckpointManifest = serde_json::from_str(MINIMAL_MANIFEST_JSON).unwrap();
        assert_eq!(m.version, 1);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(m.watermark.is_none());
    }

    #[test]
    fn test_manifest_topology_backward_compat() {
        // Older manifests without topology fields should deserialize fine.
        let m: CheckpointManifest = serde_json::from_str(MINIMAL_MANIFEST_JSON).unwrap();
        assert!(m.source_names.is_empty());
        assert!(m.sink_names.is_empty());
    }

    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let m: CheckpointManifest = serde_json::from_str(MINIMAL_MANIFEST_JSON).unwrap();
        assert!(m.pipeline_hash.is_none());
    }

    // ── Validation ──

    #[test]
    fn test_validate_fresh_manifest_is_clean() {
        let m = CheckpointManifest::new(1, 1);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(errors.is_empty(), "unexpected issues: {errors:?}");
    }

    #[test]
    fn test_validate_zero_identity_fields() {
        let m = CheckpointManifest::new(0, 0);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(errors.iter().any(|e| e.message == "checkpoint_id is 0"));
        assert!(errors.iter().any(|e| e.message == "epoch is 0"));
    }

    #[test]
    fn test_validate_vnode_count_mismatch() {
        let m = CheckpointManifest::new_with_vnode_count(1, 1, 128);
        let errors = m.validate(256);
        assert_eq!(errors.len(), 1, "{errors:?}");
        assert_eq!(
            errors[0].message,
            "vnode_count mismatch: checkpoint has 128, runtime expects 256"
        );
    }

    #[test]
    fn test_validate_legacy_manifest_without_vnode_count() {
        let m: CheckpointManifest = serde_json::from_str(MINIMAL_MANIFEST_JSON).unwrap();
        assert_eq!(m.vnode_count, 0);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert_eq!(errors.len(), 1, "{errors:?}");
        assert_eq!(
            errors[0].to_string(),
            "vnode_count is 0 (missing or legacy checkpoint)"
        );
    }

    #[test]
    fn test_validate_sink_without_commit_status() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_epochs.insert("a".into(), 1);
        m.sink_epochs.insert("b".into(), 1);

        // No statuses recorded at all → the cross-check is skipped.
        assert!(m.validate(DEFAULT_VNODE_COUNT).is_empty());

        m.sink_commit_statuses
            .insert("a".into(), SinkCommitStatus::Committed);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert_eq!(errors.len(), 1, "{errors:?}");
        assert_eq!(
            errors[0].message,
            "sink 'b' has epoch but no commit status"
        );
    }

    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["a".into(), "b".into()];
        m.source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));

        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(
            errors
                .iter()
                .any(|e| e.message.contains("'c' not in source_names")),
            "expected orphaned source offset error: {errors:?}"
        );
    }

    #[test]
    fn test_validate_source_offsets_without_recorded_topology() {
        // With no `source_names` recorded, offsets cannot be cross-checked.
        let mut m = CheckpointManifest::new(1, 1);
        m.source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));
        assert!(m.validate(DEFAULT_VNODE_COUNT).is_empty());
    }

    // ── Operator state: inline / external ──

    #[test]
    fn test_operator_checkpoint_inline() {
        let op = OperatorCheckpoint::inline(b"state-data");
        assert!(!op.external);
        assert!(op.state_b64.is_some());
        assert_eq!(op.decode_inline().unwrap(), b"state-data");
    }

    #[test]
    fn test_operator_checkpoint_external() {
        let op = OperatorCheckpoint::external(1024, 256);
        assert!(op.external);
        assert_eq!(op.external_offset, 1024);
        assert_eq!(op.external_length, 256);
        assert!(op.decode_inline().is_none());
    }

    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let op = OperatorCheckpoint::inline(b"");
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_try_decode_inline_ok_paths() {
        let inline = OperatorCheckpoint::inline(b"state-data");
        assert_eq!(
            inline.try_decode_inline(),
            Ok(Some(b"state-data".to_vec()))
        );

        let external = OperatorCheckpoint::external(0, 16);
        assert_eq!(external.try_decode_inline(), Ok(None));
    }

    #[test]
    fn test_decode_inline_corrupted_base64() {
        let corrupted = OperatorCheckpoint {
            state_b64: Some("!!!not-base64!!!".into()),
            external: false,
            external_offset: 0,
            external_length: 0,
        };

        let err = corrupted.try_decode_inline().unwrap_err();
        assert!(
            err.starts_with("[LDB-4004] base64 decode failed:"),
            "unexpected error text: {err}"
        );
        // The lenient variant swallows the corruption and reports "no state".
        assert!(corrupted.decode_inline().is_none());
    }

    // ── Operator state: threshold selection ──

    #[test]
    fn test_from_bytes_inline() {
        let data = b"small-state";
        let (op, sidecar) = OperatorCheckpoint::from_bytes(data, 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);
    }

    #[test]
    fn test_from_bytes_external() {
        let data = vec![0xAB; 2048];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 1024, 512);
        assert!(op.external);
        assert_eq!(op.external_offset, 512);
        assert_eq!(op.external_length, 2048);
        assert!(op.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), data);
    }

    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        // Exactly at threshold → inline
        let data = vec![0xFF; 100];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 100, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);

        // One byte over threshold → external
        let data_over = vec![0xFF; 101];
        let (op2, sidecar2) = OperatorCheckpoint::from_bytes(&data_over, 100, 0);
        assert!(op2.external);
        assert!(sidecar2.is_some());
    }

    #[test]
    fn test_from_bytes_empty_data() {
        let (op, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_from_bytes_shared_inline_and_external() {
        let small = bytes::Bytes::from_static(b"small-state");
        let (op, sidecar) = OperatorCheckpoint::from_bytes_shared(small, 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"small-state");

        let large = bytes::Bytes::from(vec![0xAB; 2048]);
        let (op, sidecar) = OperatorCheckpoint::from_bytes_shared(large.clone(), 1024, 512);
        assert_eq!(op, OperatorCheckpoint::external(512, 2048));
        assert_eq!(sidecar, Some(large));
    }
}