// laminar_storage/checkpoint_manifest.rs
//! Unified checkpoint manifest types.
//!
//! The [`CheckpointManifest`] is the single source of truth for checkpoint state,
//! replacing the previously separate `PipelineCheckpoint`, `DagCheckpointSnapshot`,
//! and `CheckpointMetadata` types. One manifest captures ALL state at a point in
//! time: source offsets, sink epochs, operator state, WAL positions, and watermarks.
//!
//! ## Manifest Format
//!
//! Manifests are serialized as JSON for human readability and debuggability.
//! Large operator state (binary blobs) is stored in a separate `state.bin` file
//! referenced by the manifest.
//!
//! ## Connector Checkpoint
//!
//! The [`ConnectorCheckpoint`] type provides a connector-agnostic offset container
//! that supports all connector types (Kafka partitions, PostgreSQL LSNs, MySQL GTIDs)
//! through string key-value pairs.

#[allow(clippy::disallowed_types)] // cold path: manifest serialization
use std::collections::HashMap;

23/// Per-sink commit status tracked during the checkpoint commit phase.
24///
25/// After the manifest is persisted (Step 5), sinks start as [`Pending`](Self::Pending).
26/// The coordinator updates each sink's status during commit (Step 6) and
27/// saves the manifest again. Recovery uses these statuses to determine
28/// which sinks need rollback.
29#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
30pub enum SinkCommitStatus {
31    /// Sink has been pre-committed but not yet committed.
32    Pending,
33    /// Sink commit succeeded.
34    Committed,
35    /// Sink commit failed.
36    Failed(String),
37}
38
39/// A point-in-time snapshot of all pipeline state.
40///
41/// This is the single source of truth for checkpoint persistence, replacing
42/// the three previously disconnected checkpoint systems:
43/// - `PipelineCheckpoint` (source offsets + sink epochs)
44/// - `DagCheckpointSnapshot` (operator state — in-memory only)
45/// - `CheckpointMetadata` (WAL position + watermark)
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
47pub struct CheckpointManifest {
48    /// Manifest format version (for future evolution).
49    pub version: u32,
50    /// Unique, monotonically increasing checkpoint ID.
51    pub checkpoint_id: u64,
52    /// Epoch number for exactly-once coordination.
53    pub epoch: u64,
54    /// Timestamp when checkpoint was created (millis since Unix epoch).
55    pub timestamp_ms: u64,
56
57    // ── Connector State ──
58    /// Per-source connector offsets (key: source name).
59    #[serde(default)]
60    pub source_offsets: HashMap<String, ConnectorCheckpoint>,
61    /// Per-sink last committed epoch (key: sink name).
62    #[serde(default)]
63    pub sink_epochs: HashMap<String, u64>,
64    /// Per-sink commit status (key: sink name).
65    ///
66    /// Populated during the commit phase (Step 6) and saved to the manifest
67    /// afterward. Recovery uses this to decide which sinks need rollback
68    /// (those with [`SinkCommitStatus::Pending`] or [`SinkCommitStatus::Failed`]).
69    #[serde(default)]
70    pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
71    /// Per-table source offsets for reference tables (key: table name).
72    #[serde(default)]
73    pub table_offsets: HashMap<String, ConnectorCheckpoint>,
74
75    // ── Operator State ──
76    /// Per-operator checkpoint data (key: operator/node name).
77    ///
78    /// Small state is inlined as base64. Large state is stored in a separate
79    /// `state.bin` file and this map holds only a reference marker.
80    #[serde(default)]
81    pub operator_states: HashMap<String, OperatorCheckpoint>,
82
83    // ── Storage State ──
84    /// Path to the table store checkpoint, if any.
85    #[serde(default)]
86    pub table_store_checkpoint_path: Option<String>,
87    /// WAL position for single-writer mode.
88    #[serde(default)]
89    pub wal_position: u64,
90    /// Per-core WAL positions at the time of operator snapshot.
91    ///
92    /// During recovery, WAL replay must start **after** these positions to
93    /// avoid replaying entries already reflected in the operator state.
94    /// Index `i` corresponds to the WAL segment for core `i`.
95    #[serde(default)]
96    pub per_core_wal_positions: Vec<u64>,
97
98    // ── Time State ──
99    /// Global watermark at checkpoint time.
100    #[serde(default)]
101    pub watermark: Option<i64>,
102    /// Per-source watermarks (key: source name).
103    #[serde(default)]
104    pub source_watermarks: HashMap<String, i64>,
105
106    // ── Topology ──
107    /// Sorted names of all registered sources at checkpoint time.
108    ///
109    /// Used during recovery to detect topology changes (added/removed sources)
110    /// and warn the operator.
111    #[serde(default)]
112    pub source_names: Vec<String>,
113    /// Sorted names of all registered sinks at checkpoint time.
114    #[serde(default)]
115    pub sink_names: Vec<String>,
116
117    // ── Pipeline Identity ──
118    /// Hash of the pipeline configuration at checkpoint time.
119    ///
120    /// Computed from SQL queries, source/sink configuration, and connector
121    /// options. Recovery logs a warning when this changes, indicating
122    /// operator state may be incompatible with the new configuration.
123    #[serde(default)]
124    pub pipeline_hash: Option<u64>,
125
126    // ── In-flight Data (unaligned checkpoints) ──
127    /// In-flight channel data captured during unaligned checkpoints.
128    ///
129    /// Key: operator name. Value: list of in-flight records from input channels.
130    /// Empty for aligned checkpoints. During recovery, these events are replayed
131    /// before resuming normal processing.
132    #[serde(default)]
133    pub inflight_data: HashMap<String, Vec<InFlightRecord>>,
134
135    // ── Metadata ──
136    /// Total size of all checkpoint data in bytes (manifest + state.bin).
137    #[serde(default)]
138    pub size_bytes: u64,
139    /// Whether this is an incremental checkpoint (only deltas since parent).
140    #[serde(default)]
141    pub is_incremental: bool,
142    /// Parent checkpoint ID for incremental checkpoints.
143    #[serde(default)]
144    pub parent_id: Option<u64>,
145
146    // ── Integrity ──
147    /// SHA-256 hex digest of the sidecar `state.bin` file (if any).
148    ///
149    /// Written during checkpoint commit so that recovery can verify the
150    /// sidecar hasn't been corrupted or truncated on disk/S3.
151    #[serde(default)]
152    pub state_checksum: Option<String>,
153}
154
/// A single consistency problem detected while validating a manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the issue.
    pub message: String,
}

impl std::fmt::Display for ManifestValidationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message)
    }
}

// Implementing `Error` lets callers propagate validation failures with `?`
// and store them as `Box<dyn Error>` alongside other error types; the
// default trait methods suffice since there is no wrapped source error.
impl std::error::Error for ManifestValidationError {}
167
168impl CheckpointManifest {
169    /// Validates manifest consistency before recovery.
170    ///
171    /// Returns a list of issues found. An empty list means the manifest is valid.
172    /// Callers should treat non-empty results as warnings (recovery may still
173    /// proceed) or errors depending on severity.
174    #[must_use]
175    pub fn validate(&self) -> Vec<ManifestValidationError> {
176        let mut errors = Vec::new();
177
178        if self.version == 0 {
179            errors.push(ManifestValidationError {
180                message: "manifest version is 0".into(),
181            });
182        }
183
184        if self.checkpoint_id == 0 {
185            errors.push(ManifestValidationError {
186                message: "checkpoint_id is 0".into(),
187            });
188        }
189
190        if self.epoch == 0 {
191            errors.push(ManifestValidationError {
192                message: "epoch is 0".into(),
193            });
194        }
195
196        if self.timestamp_ms == 0 {
197            errors.push(ManifestValidationError {
198                message: "timestamp_ms is 0 (missing creation time)".into(),
199            });
200        }
201
202        // Sink epochs should match sink commit statuses
203        for sink_name in self.sink_epochs.keys() {
204            if !self.sink_commit_statuses.is_empty()
205                && !self.sink_commit_statuses.contains_key(sink_name)
206            {
207                errors.push(ManifestValidationError {
208                    message: format!("sink '{sink_name}' has epoch but no commit status"),
209                });
210            }
211        }
212
213        // Source offsets should reference known sources (if topology is recorded)
214        if !self.source_names.is_empty() {
215            for name in self.source_offsets.keys() {
216                if !self.source_names.contains(name) {
217                    errors.push(ManifestValidationError {
218                        message: format!("source_offsets contains '{name}' not in source_names"),
219                    });
220                }
221            }
222        }
223
224        // Incremental checkpoints must have a parent
225        if self.is_incremental && self.parent_id.is_none() {
226            errors.push(ManifestValidationError {
227                message: "incremental checkpoint has no parent_id".into(),
228            });
229        }
230
231        errors
232    }
233
234    /// Creates a new manifest with the given ID and epoch.
235    #[must_use]
236    pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
237        #[allow(clippy::cast_possible_truncation)] // u64 millis won't overflow until year 584M
238        let timestamp_ms = std::time::SystemTime::now()
239            .duration_since(std::time::UNIX_EPOCH)
240            .unwrap_or_default()
241            .as_millis() as u64;
242
243        Self {
244            version: 1,
245            checkpoint_id,
246            epoch,
247            timestamp_ms,
248            source_offsets: HashMap::new(),
249            sink_epochs: HashMap::new(),
250            sink_commit_statuses: HashMap::new(),
251            table_offsets: HashMap::new(),
252            operator_states: HashMap::new(),
253            table_store_checkpoint_path: None,
254            wal_position: 0,
255            per_core_wal_positions: Vec::new(),
256            watermark: None,
257            source_watermarks: HashMap::new(),
258            source_names: Vec::new(),
259            sink_names: Vec::new(),
260            pipeline_hash: None,
261            inflight_data: HashMap::new(),
262            size_bytes: 0,
263            is_incremental: false,
264            parent_id: None,
265            state_checksum: None,
266        }
267    }
268}
269
270/// Connector-agnostic offset container.
271///
272/// Uses string key-value pairs to support all connector types:
273/// - **Kafka**: `{"partition-0": "1234", "partition-1": "5678"}`
274/// - **`PostgreSQL` CDC**: `{"lsn": "0/1234ABCD"}`
275/// - **`MySQL` CDC**: `{"gtid_set": "uuid:1-5", "binlog_file": "mysql-bin.000003"}`
276/// - **Delta Lake**: `{"version": "42"}`
277#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
278pub struct ConnectorCheckpoint {
279    /// Connector-specific offset data.
280    pub offsets: HashMap<String, String>,
281    /// Epoch this checkpoint belongs to.
282    pub epoch: u64,
283    /// Optional metadata (connector type, topic name, etc.).
284    #[serde(default)]
285    pub metadata: HashMap<String, String>,
286}
287
288impl ConnectorCheckpoint {
289    /// Creates a new connector checkpoint with the given epoch.
290    #[must_use]
291    pub fn new(epoch: u64) -> Self {
292        Self {
293            offsets: HashMap::new(),
294            epoch,
295            metadata: HashMap::new(),
296        }
297    }
298
299    /// Creates a connector checkpoint with pre-populated offsets.
300    #[must_use]
301    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
302        Self {
303            offsets,
304            epoch,
305            metadata: HashMap::new(),
306        }
307    }
308}
309
310/// In-flight data record from an unaligned checkpoint.
311///
312/// Each record represents serialized events buffered in a single input
313/// channel at the time of the unaligned snapshot.
314#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
315pub struct InFlightRecord {
316    /// Input channel index.
317    pub input_id: usize,
318    /// Base64-encoded serialized event data.
319    pub data_b64: String,
320}
321
322/// Serialized operator state stored in the manifest.
323#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
324pub struct OperatorCheckpoint {
325    /// Base64-encoded binary state (for small payloads inlined in JSON).
326    #[serde(default)]
327    pub state_b64: Option<String>,
328    /// If true, state is stored externally in the state.bin sidecar file.
329    #[serde(default)]
330    pub external: bool,
331    /// Byte offset into the state.bin file (if external).
332    #[serde(default)]
333    pub external_offset: u64,
334    /// Byte length of the state in the state.bin file (if external).
335    #[serde(default)]
336    pub external_length: u64,
337}
338
339impl OperatorCheckpoint {
340    /// Creates an inline operator checkpoint from raw bytes.
341    ///
342    /// The bytes are base64-encoded for JSON storage.
343    #[must_use]
344    pub fn inline(data: &[u8]) -> Self {
345        use base64::Engine;
346        Self {
347            state_b64: Some(base64::engine::general_purpose::STANDARD.encode(data)),
348            external: false,
349            external_offset: 0,
350            external_length: 0,
351        }
352    }
353
354    /// Creates an external reference to state in the sidecar file.
355    #[must_use]
356    pub fn external(offset: u64, length: u64) -> Self {
357        Self {
358            state_b64: None,
359            external: true,
360            external_offset: offset,
361            external_length: length,
362        }
363    }
364
365    /// Decodes the inline state, returning the raw bytes.
366    ///
367    /// Returns `None` if the state is external, no inline data is present,
368    /// or if the base64 data is corrupted (logs a warning in that case).
369    #[must_use]
370    pub fn decode_inline(&self) -> Option<Vec<u8>> {
371        use base64::Engine;
372        self.state_b64.as_ref().and_then(|b64| {
373            match base64::engine::general_purpose::STANDARD.decode(b64) {
374                Ok(data) => Some(data),
375                Err(e) => {
376                    tracing::warn!(
377                        error = %e,
378                        b64_len = b64.len(),
379                        "[LDB-4004] Failed to decode inline operator state from base64 — \
380                         operator will start from scratch"
381                    );
382                    None
383                }
384            }
385        })
386    }
387
388    /// Decodes the inline state, returning a `Result` for callers that need
389    /// to distinguish between "no inline state" and "corrupted state".
390    ///
391    /// Returns `Ok(None)` if no inline data is present (external or absent).
392    /// Returns `Ok(Some(bytes))` on successful decode.
393    ///
394    /// # Errors
395    ///
396    /// Returns `Err` if base64 data is present but corrupted.
397    pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
398        use base64::Engine;
399        match &self.state_b64 {
400            None => Ok(None),
401            Some(b64) => base64::engine::general_purpose::STANDARD
402                .decode(b64)
403                .map(Some)
404                .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}")),
405        }
406    }
407
408    /// Creates an `OperatorCheckpoint` from raw bytes using a size threshold.
409    ///
410    /// If `data.len() <= threshold`, the state is inlined as base64.
411    /// If `data.len() > threshold`, the state is marked as external with the
412    /// given offset and length, and the raw data is returned for sidecar storage.
413    ///
414    /// # Arguments
415    ///
416    /// * `data` — Raw operator state bytes
417    /// * `threshold` — Maximum size in bytes for inline storage
418    /// * `current_offset` — Byte offset into the sidecar file for this blob
419    ///
420    /// # Returns
421    ///
422    /// A tuple of the checkpoint entry and optional raw data for the sidecar.
423    #[must_use]
424    #[allow(clippy::cast_possible_truncation)]
425    pub fn from_bytes(
426        data: &[u8],
427        threshold: usize,
428        current_offset: u64,
429    ) -> (Self, Option<Vec<u8>>) {
430        if data.len() <= threshold {
431            (Self::inline(data), None)
432        } else {
433            let length = data.len() as u64;
434            (Self::external(current_offset, length), Some(data.to_vec()))
435        }
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_manifest_new() {
        let manifest = CheckpointManifest::new(1, 5);
        assert_eq!(manifest.version, 1);
        assert_eq!(manifest.checkpoint_id, 1);
        assert_eq!(manifest.epoch, 5);
        assert!(manifest.timestamp_ms > 0);
        assert!(manifest.source_offsets.is_empty());
        assert!(manifest.sink_epochs.is_empty());
        assert!(manifest.operator_states.is_empty());
        assert!(!manifest.is_incremental);
        assert!(manifest.parent_id.is_none());
    }

    #[test]
    fn test_manifest_json_round_trip() {
        let mut manifest = CheckpointManifest::new(42, 10);
        let kafka_offsets = HashMap::from([
            ("partition-0".into(), "1234".into()),
            ("partition-1".into(), "5678".into()),
        ]);
        manifest.source_offsets.insert(
            "kafka-src".into(),
            ConnectorCheckpoint::with_offsets(10, kafka_offsets),
        );
        manifest.sink_epochs.insert("pg-sink".into(), 9);
        manifest.watermark = Some(999_000);
        manifest.wal_position = 4096;
        manifest
            .operator_states
            .insert("window-agg".into(), OperatorCheckpoint::inline(b"hello"));

        let json = serde_json::to_string_pretty(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.checkpoint_id, 42);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(999_000));
        assert_eq!(restored.wal_position, 4096);

        let source = restored.source_offsets.get("kafka-src").unwrap();
        assert_eq!(source.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(restored.sink_epochs.get("pg-sink"), Some(&9));

        let entry = restored.operator_states.get("window-agg").unwrap();
        assert_eq!(entry.decode_inline().unwrap(), b"hello");
    }

    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        // An older manifest carrying only the mandatory fields must still load.
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;

        let manifest: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(manifest.version, 1);
        assert!(manifest.source_offsets.is_empty());
        assert!(manifest.sink_epochs.is_empty());
        assert!(manifest.operator_states.is_empty());
        assert!(manifest.per_core_wal_positions.is_empty());
        assert!(manifest.watermark.is_none());
        assert!(!manifest.is_incremental);
    }

    #[test]
    fn test_connector_checkpoint_new() {
        let checkpoint = ConnectorCheckpoint::new(5);
        assert_eq!(checkpoint.epoch, 5);
        assert!(checkpoint.offsets.is_empty());
        assert!(checkpoint.metadata.is_empty());
    }

    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        let checkpoint = ConnectorCheckpoint::with_offsets(3, offsets);
        assert_eq!(checkpoint.epoch, 3);
        assert_eq!(checkpoint.offsets.get("lsn"), Some(&"0/ABCD".into()));
    }

    #[test]
    fn test_operator_checkpoint_inline() {
        let entry = OperatorCheckpoint::inline(b"state-data");
        assert!(!entry.external);
        assert!(entry.state_b64.is_some());
        assert_eq!(entry.decode_inline().unwrap(), b"state-data");
    }

    #[test]
    fn test_operator_checkpoint_external() {
        let entry = OperatorCheckpoint::external(1024, 256);
        assert!(entry.external);
        assert_eq!(entry.external_offset, 1024);
        assert_eq!(entry.external_length, 256);
        assert!(entry.decode_inline().is_none());
    }

    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let entry = OperatorCheckpoint::inline(b"");
        assert_eq!(entry.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_manifest_with_incremental() {
        let mut manifest = CheckpointManifest::new(5, 5);
        manifest.is_incremental = true;
        manifest.parent_id = Some(4);

        let json = serde_json::to_string(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert!(restored.is_incremental);
        assert_eq!(restored.parent_id, Some(4));
    }

    #[test]
    fn test_manifest_per_core_wal_positions() {
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.per_core_wal_positions = vec![100, 200, 300, 400];

        let json = serde_json::to_string(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.per_core_wal_positions, vec![100, 200, 300, 400]);
    }

    #[test]
    fn test_manifest_table_offsets() {
        let mut manifest = CheckpointManifest::new(1, 1);
        let lsn_offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        manifest.table_offsets.insert(
            "instruments".into(),
            ConnectorCheckpoint::with_offsets(1, lsn_offsets),
        );
        manifest.table_store_checkpoint_path = Some("/tmp/rocksdb_cp".into());

        let json = serde_json::to_string(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.source_names = vec!["kafka-clicks".into(), "ws-prices".into()];
        manifest.sink_names = vec!["pg-sink".into()];

        let json = serde_json::to_string(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.source_names, vec!["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, vec!["pg-sink"]);
    }

    #[test]
    fn test_manifest_topology_backward_compat() {
        // Manifests written before topology tracking must still deserialize.
        let json = r#"{
            "version": 1,
            "checkpoint_id": 5,
            "epoch": 3,
            "timestamp_ms": 1000
        }"#;
        let manifest: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(manifest.source_names.is_empty());
        assert!(manifest.sink_names.is_empty());
    }

    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.source_names = vec!["a".into(), "b".into()];
        manifest
            .source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));

        let errors = manifest.validate();
        let found = errors
            .iter()
            .any(|e| e.message.contains("'c' not in source_names"));
        assert!(found, "expected orphaned source offset error: {errors:?}");
    }

    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.pipeline_hash = Some(0xDEAD_BEEF_CAFE_1234);

        let json = serde_json::to_string(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.pipeline_hash, Some(0xDEAD_BEEF_CAFE_1234));
    }

    #[test]
    fn test_from_bytes_inline() {
        let payload = b"small-state";
        let (entry, sidecar) = OperatorCheckpoint::from_bytes(payload, 1024, 0);
        assert!(!entry.external);
        assert!(sidecar.is_none());
        assert_eq!(entry.decode_inline().unwrap(), payload);
    }

    #[test]
    fn test_from_bytes_external() {
        let payload = vec![0xAB; 2048];
        let (entry, sidecar) = OperatorCheckpoint::from_bytes(&payload, 1024, 512);
        assert!(entry.external);
        assert_eq!(entry.external_offset, 512);
        assert_eq!(entry.external_length, 2048);
        assert!(entry.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), payload);
    }

    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        // A payload exactly at the threshold stays inline.
        let at_limit = vec![0xFF; 100];
        let (entry, sidecar) = OperatorCheckpoint::from_bytes(&at_limit, 100, 0);
        assert!(!entry.external);
        assert!(sidecar.is_none());
        assert_eq!(entry.decode_inline().unwrap(), at_limit);

        // One byte past the threshold goes external.
        let over_limit = vec![0xFF; 101];
        let (spilled, sidecar_bytes) = OperatorCheckpoint::from_bytes(&over_limit, 100, 0);
        assert!(spilled.external);
        assert!(sidecar_bytes.is_some());
    }

    #[test]
    fn test_from_bytes_empty_data() {
        let (entry, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);
        assert!(!entry.external);
        assert!(sidecar.is_none());
        assert_eq!(entry.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_manifest_inflight_round_trip() {
        use base64::Engine;

        let mut manifest = CheckpointManifest::new(1, 1);
        let record = InFlightRecord {
            input_id: 2,
            data_b64: base64::engine::general_purpose::STANDARD.encode(b"buffered-event"),
        };
        manifest.inflight_data.insert("join-op".into(), vec![record]);

        let json = serde_json::to_string_pretty(&manifest).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.inflight_data.len(), 1);
        let records = restored.inflight_data.get("join-op").unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].input_id, 2);

        let decoded = base64::engine::general_purpose::STANDARD
            .decode(&records[0].data_b64)
            .unwrap();
        assert_eq!(decoded, b"buffered-event");
    }

    #[test]
    fn test_manifest_inflight_backward_compat() {
        // Manifests predating unaligned checkpoints must still deserialize.
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let manifest: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(manifest.inflight_data.is_empty());
    }

    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let manifest: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(manifest.pipeline_hash.is_none());
    }
}