1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
22
23#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
30pub enum SinkCommitStatus {
31 Pending,
33 Committed,
35 Failed(String),
37}
38
39#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
47pub struct CheckpointManifest {
48 pub version: u32,
50 pub checkpoint_id: u64,
52 pub epoch: u64,
54 pub timestamp_ms: u64,
56
57 #[serde(default)]
60 pub source_offsets: HashMap<String, ConnectorCheckpoint>,
61 #[serde(default)]
63 pub sink_epochs: HashMap<String, u64>,
64 #[serde(default)]
70 pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
71 #[serde(default)]
73 pub table_offsets: HashMap<String, ConnectorCheckpoint>,
74
75 #[serde(default)]
81 pub operator_states: HashMap<String, OperatorCheckpoint>,
82
83 #[serde(default)]
86 pub table_store_checkpoint_path: Option<String>,
87 #[serde(default)]
89 pub wal_position: u64,
90 #[serde(default)]
96 pub per_core_wal_positions: Vec<u64>,
97
98 #[serde(default)]
101 pub watermark: Option<i64>,
102 #[serde(default)]
104 pub source_watermarks: HashMap<String, i64>,
105
106 #[serde(default)]
112 pub source_names: Vec<String>,
113 #[serde(default)]
115 pub sink_names: Vec<String>,
116
117 #[serde(default)]
124 pub pipeline_hash: Option<u64>,
125
126 #[serde(default)]
133 pub inflight_data: HashMap<String, Vec<InFlightRecord>>,
134
135 #[serde(default)]
138 pub size_bytes: u64,
139 #[serde(default)]
141 pub is_incremental: bool,
142 #[serde(default)]
144 pub parent_id: Option<u64>,
145
146 #[serde(default)]
152 pub state_checksum: Option<String>,
153}
154
/// A single problem found by `CheckpointManifest::validate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the problem.
    pub message: String,
}

impl std::fmt::Display for ManifestValidationError {
    /// Writes the message verbatim (no padding or width handling).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

// Lets the type be boxed as `dyn Error` or converted with `?` like any other
// error. It wraps nothing, so `source()` keeps its default of `None`.
impl std::error::Error for ManifestValidationError {}
167
168impl CheckpointManifest {
169 #[must_use]
175 pub fn validate(&self) -> Vec<ManifestValidationError> {
176 let mut errors = Vec::new();
177
178 if self.version == 0 {
179 errors.push(ManifestValidationError {
180 message: "manifest version is 0".into(),
181 });
182 }
183
184 if self.checkpoint_id == 0 {
185 errors.push(ManifestValidationError {
186 message: "checkpoint_id is 0".into(),
187 });
188 }
189
190 if self.epoch == 0 {
191 errors.push(ManifestValidationError {
192 message: "epoch is 0".into(),
193 });
194 }
195
196 if self.timestamp_ms == 0 {
197 errors.push(ManifestValidationError {
198 message: "timestamp_ms is 0 (missing creation time)".into(),
199 });
200 }
201
202 for sink_name in self.sink_epochs.keys() {
204 if !self.sink_commit_statuses.is_empty()
205 && !self.sink_commit_statuses.contains_key(sink_name)
206 {
207 errors.push(ManifestValidationError {
208 message: format!("sink '{sink_name}' has epoch but no commit status"),
209 });
210 }
211 }
212
213 if !self.source_names.is_empty() {
215 for name in self.source_offsets.keys() {
216 if !self.source_names.contains(name) {
217 errors.push(ManifestValidationError {
218 message: format!("source_offsets contains '{name}' not in source_names"),
219 });
220 }
221 }
222 }
223
224 if self.is_incremental && self.parent_id.is_none() {
226 errors.push(ManifestValidationError {
227 message: "incremental checkpoint has no parent_id".into(),
228 });
229 }
230
231 errors
232 }
233
234 #[must_use]
236 pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
237 #[allow(clippy::cast_possible_truncation)] let timestamp_ms = std::time::SystemTime::now()
239 .duration_since(std::time::UNIX_EPOCH)
240 .unwrap_or_default()
241 .as_millis() as u64;
242
243 Self {
244 version: 1,
245 checkpoint_id,
246 epoch,
247 timestamp_ms,
248 source_offsets: HashMap::new(),
249 sink_epochs: HashMap::new(),
250 sink_commit_statuses: HashMap::new(),
251 table_offsets: HashMap::new(),
252 operator_states: HashMap::new(),
253 table_store_checkpoint_path: None,
254 wal_position: 0,
255 per_core_wal_positions: Vec::new(),
256 watermark: None,
257 source_watermarks: HashMap::new(),
258 source_names: Vec::new(),
259 sink_names: Vec::new(),
260 pipeline_hash: None,
261 inflight_data: HashMap::new(),
262 size_bytes: 0,
263 is_incremental: false,
264 parent_id: None,
265 state_checksum: None,
266 }
267 }
268}
269
270#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
278pub struct ConnectorCheckpoint {
279 pub offsets: HashMap<String, String>,
281 pub epoch: u64,
283 #[serde(default)]
285 pub metadata: HashMap<String, String>,
286}
287
288impl ConnectorCheckpoint {
289 #[must_use]
291 pub fn new(epoch: u64) -> Self {
292 Self {
293 offsets: HashMap::new(),
294 epoch,
295 metadata: HashMap::new(),
296 }
297 }
298
299 #[must_use]
301 pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
302 Self {
303 offsets,
304 epoch,
305 metadata: HashMap::new(),
306 }
307 }
308}
309
310#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
315pub struct InFlightRecord {
316 pub input_id: usize,
318 pub data_b64: String,
320}
321
322#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
324pub struct OperatorCheckpoint {
325 #[serde(default)]
327 pub state_b64: Option<String>,
328 #[serde(default)]
330 pub external: bool,
331 #[serde(default)]
333 pub external_offset: u64,
334 #[serde(default)]
336 pub external_length: u64,
337}
338
339impl OperatorCheckpoint {
340 #[must_use]
344 pub fn inline(data: &[u8]) -> Self {
345 use base64::Engine;
346 Self {
347 state_b64: Some(base64::engine::general_purpose::STANDARD.encode(data)),
348 external: false,
349 external_offset: 0,
350 external_length: 0,
351 }
352 }
353
354 #[must_use]
356 pub fn external(offset: u64, length: u64) -> Self {
357 Self {
358 state_b64: None,
359 external: true,
360 external_offset: offset,
361 external_length: length,
362 }
363 }
364
365 #[must_use]
370 pub fn decode_inline(&self) -> Option<Vec<u8>> {
371 use base64::Engine;
372 self.state_b64.as_ref().and_then(|b64| {
373 match base64::engine::general_purpose::STANDARD.decode(b64) {
374 Ok(data) => Some(data),
375 Err(e) => {
376 tracing::warn!(
377 error = %e,
378 b64_len = b64.len(),
379 "[LDB-4004] Failed to decode inline operator state from base64 — \
380 operator will start from scratch"
381 );
382 None
383 }
384 }
385 })
386 }
387
388 pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
398 use base64::Engine;
399 match &self.state_b64 {
400 None => Ok(None),
401 Some(b64) => base64::engine::general_purpose::STANDARD
402 .decode(b64)
403 .map(Some)
404 .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}")),
405 }
406 }
407
408 #[must_use]
424 #[allow(clippy::cast_possible_truncation)]
425 pub fn from_bytes(
426 data: &[u8],
427 threshold: usize,
428 current_offset: u64,
429 ) -> (Self, Option<Vec<u8>>) {
430 if data.len() <= threshold {
431 (Self::inline(data), None)
432 } else {
433 let length = data.len() as u64;
434 (Self::external(current_offset, length), Some(data.to_vec()))
435 }
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_manifest_new() {
        let m = CheckpointManifest::new(1, 5);
        assert_eq!(m.version, 1);
        assert_eq!(m.checkpoint_id, 1);
        assert_eq!(m.epoch, 5);
        assert!(m.timestamp_ms > 0);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(!m.is_incremental);
        assert!(m.parent_id.is_none());
    }

    #[test]
    fn test_manifest_json_round_trip() {
        let mut m = CheckpointManifest::new(42, 10);
        m.source_offsets.insert(
            "kafka-src".into(),
            ConnectorCheckpoint::with_offsets(
                10,
                HashMap::from([
                    ("partition-0".into(), "1234".into()),
                    ("partition-1".into(), "5678".into()),
                ]),
            ),
        );
        m.sink_epochs.insert("pg-sink".into(), 9);
        m.watermark = Some(999_000);
        m.wal_position = 4096;
        m.operator_states
            .insert("window-agg".into(), OperatorCheckpoint::inline(b"hello"));

        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.checkpoint_id, 42);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(999_000));
        assert_eq!(restored.wal_position, 4096);

        let src = restored.source_offsets.get("kafka-src").unwrap();
        assert_eq!(src.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(restored.sink_epochs.get("pg-sink"), Some(&9));

        let op = restored.operator_states.get("window-agg").unwrap();
        assert_eq!(op.decode_inline().unwrap(), b"hello");
    }

    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;

        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(m.version, 1);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(m.per_core_wal_positions.is_empty());
        assert!(m.watermark.is_none());
        assert!(!m.is_incremental);
    }

    #[test]
    fn test_connector_checkpoint_new() {
        let cp = ConnectorCheckpoint::new(5);
        assert_eq!(cp.epoch, 5);
        assert!(cp.offsets.is_empty());
        assert!(cp.metadata.is_empty());
    }

    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        let cp = ConnectorCheckpoint::with_offsets(3, offsets);
        assert_eq!(cp.epoch, 3);
        assert_eq!(cp.offsets.get("lsn"), Some(&"0/ABCD".into()));
    }

    #[test]
    fn test_operator_checkpoint_inline() {
        let op = OperatorCheckpoint::inline(b"state-data");
        assert!(!op.external);
        assert!(op.state_b64.is_some());
        assert_eq!(op.decode_inline().unwrap(), b"state-data");
    }

    #[test]
    fn test_operator_checkpoint_external() {
        let op = OperatorCheckpoint::external(1024, 256);
        assert!(op.external);
        assert_eq!(op.external_offset, 1024);
        assert_eq!(op.external_length, 256);
        assert!(op.decode_inline().is_none());
    }

    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let op = OperatorCheckpoint::inline(b"");
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_manifest_with_incremental() {
        let mut m = CheckpointManifest::new(5, 5);
        m.is_incremental = true;
        m.parent_id = Some(4);

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert!(restored.is_incremental);
        assert_eq!(restored.parent_id, Some(4));
    }

    #[test]
    fn test_manifest_per_core_wal_positions() {
        let mut m = CheckpointManifest::new(1, 1);
        m.per_core_wal_positions = vec![100, 200, 300, 400];

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.per_core_wal_positions, vec![100, 200, 300, 400]);
    }

    #[test]
    fn test_manifest_table_offsets() {
        let mut m = CheckpointManifest::new(1, 1);
        m.table_offsets.insert(
            "instruments".into(),
            ConnectorCheckpoint::with_offsets(1, HashMap::from([("lsn".into(), "0/ABCD".into())])),
        );
        m.table_store_checkpoint_path = Some("/tmp/rocksdb_cp".into());

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["kafka-clicks".into(), "ws-prices".into()];
        m.sink_names = vec!["pg-sink".into()];

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.source_names, vec!["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, vec!["pg-sink"]);
    }

    #[test]
    fn test_manifest_topology_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 5,
            "epoch": 3,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.source_names.is_empty());
        assert!(m.sink_names.is_empty());
    }

    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["a".into(), "b".into()];
        m.source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));

        let errors = m.validate();
        assert!(
            errors
                .iter()
                .any(|e| e.message.contains("'c' not in source_names")),
            "expected orphaned source offset error: {errors:?}"
        );
    }

    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.pipeline_hash = Some(0xDEAD_BEEF_CAFE_1234);

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.pipeline_hash, Some(0xDEAD_BEEF_CAFE_1234));
    }

    #[test]
    fn test_from_bytes_inline() {
        let data = b"small-state";
        let (op, sidecar) = OperatorCheckpoint::from_bytes(data, 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);
    }

    #[test]
    fn test_from_bytes_external() {
        let data = vec![0xAB; 2048];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 1024, 512);
        assert!(op.external);
        assert_eq!(op.external_offset, 512);
        assert_eq!(op.external_length, 2048);
        assert!(op.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), data);
    }

    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        // Exactly at the threshold stays inline.
        let data = vec![0xFF; 100];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 100, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);

        // One byte over the threshold goes external.
        let data_over = vec![0xFF; 101];
        let (op2, sidecar2) = OperatorCheckpoint::from_bytes(&data_over, 100, 0);
        assert!(op2.external);
        assert!(sidecar2.is_some());
    }

    #[test]
    fn test_from_bytes_empty_data() {
        let (op, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_manifest_inflight_round_trip() {
        use base64::Engine;

        let mut m = CheckpointManifest::new(1, 1);
        let record = InFlightRecord {
            input_id: 2,
            data_b64: base64::engine::general_purpose::STANDARD.encode(b"buffered-event"),
        };
        m.inflight_data.insert("join-op".into(), vec![record]);

        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.inflight_data.len(), 1);
        let records = restored.inflight_data.get("join-op").unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].input_id, 2);

        let decoded = base64::engine::general_purpose::STANDARD
            .decode(&records[0].data_b64)
            .unwrap();
        assert_eq!(decoded, b"buffered-event");
    }

    #[test]
    fn test_manifest_inflight_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.inflight_data.is_empty());
    }

    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.pipeline_hash.is_none());
    }

    #[test]
    fn test_validate_fresh_manifest_is_clean() {
        let m = CheckpointManifest::new(1, 1);
        let errors = m.validate();
        assert!(errors.is_empty(), "unexpected errors: {errors:?}");
    }

    #[test]
    fn test_validate_zero_ids() {
        let m = CheckpointManifest::new(0, 0);
        let errors = m.validate();
        assert_eq!(errors.len(), 2, "errors: {errors:?}");
        assert!(errors.iter().any(|e| e.message == "checkpoint_id is 0"));
        assert!(errors.iter().any(|e| e.message == "epoch is 0"));
        // Display renders the message verbatim.
        assert_eq!(errors[0].to_string(), errors[0].message);
    }

    #[test]
    fn test_validate_incremental_requires_parent() {
        let mut m = CheckpointManifest::new(2, 2);
        m.is_incremental = true;
        let errors = m.validate();
        assert!(
            errors.iter().any(|e| e.message.contains("no parent_id")),
            "expected missing parent error: {errors:?}"
        );

        m.parent_id = Some(1);
        assert!(m.validate().is_empty());
    }

    #[test]
    fn test_validate_sink_missing_commit_status() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_epochs.insert("pg-sink".into(), 1);
        m.sink_epochs.insert("kafka-sink".into(), 1);
        // With no statuses recorded at all, the check is skipped.
        assert!(m.validate().is_empty());

        m.sink_commit_statuses
            .insert("pg-sink".into(), SinkCommitStatus::Committed);
        let errors = m.validate();
        assert_eq!(errors.len(), 1, "errors: {errors:?}");
        assert!(errors[0].message.contains("'kafka-sink'"));
    }

    #[test]
    fn test_sink_commit_statuses_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_commit_statuses
            .insert("a".into(), SinkCommitStatus::Pending);
        m.sink_commit_statuses
            .insert("b".into(), SinkCommitStatus::Failed("timeout".into()));

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored, m);
    }

    #[test]
    fn test_try_decode_inline() {
        let op = OperatorCheckpoint::inline(b"abc");
        assert_eq!(op.try_decode_inline().unwrap(), Some(b"abc".to_vec()));

        let external = OperatorCheckpoint::external(0, 3);
        assert_eq!(external.try_decode_inline().unwrap(), None);

        let corrupt = OperatorCheckpoint {
            state_b64: Some("not base64!".into()),
            external: false,
            external_offset: 0,
            external_length: 0,
        };
        assert!(corrupt.try_decode_inline().unwrap_err().contains("LDB-4004"));
        assert!(corrupt.decode_inline().is_none());
    }
}