1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
9
10#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
17pub enum SinkCommitStatus {
18 Pending,
20 Committed,
22 Failed(String),
24}
25
/// Virtual-node count stamped into manifests built with
/// `CheckpointManifest::new`, and the usual value passed to `validate`.
pub const DEFAULT_VNODE_COUNT: u16 = 256;
33
34#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
36pub struct CheckpointManifest {
37 pub version: u32,
39 pub checkpoint_id: u64,
41 pub epoch: u64,
43 pub timestamp_ms: u64,
45
46 #[serde(default)]
49 pub source_offsets: HashMap<String, ConnectorCheckpoint>,
50 #[serde(default)]
52 pub sink_epochs: HashMap<String, u64>,
53 #[serde(default)]
59 pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
60 #[serde(default)]
62 pub table_offsets: HashMap<String, ConnectorCheckpoint>,
63
64 #[serde(default)]
70 pub operator_states: HashMap<String, OperatorCheckpoint>,
71
72 #[serde(default)]
75 pub table_store_checkpoint_path: Option<String>,
76 #[serde(default)]
79 pub watermark: Option<i64>,
80 #[serde(default)]
82 pub source_watermarks: HashMap<String, i64>,
83
84 #[serde(default)]
90 pub source_names: Vec<String>,
91 #[serde(default)]
93 pub sink_names: Vec<String>,
94
95 #[serde(default)]
102 pub pipeline_hash: Option<u64>,
103
104 #[serde(default)]
107 pub vnode_count: u16,
108
109 #[serde(default)]
115 pub state_checksum: Option<String>,
116}
117
/// A single problem found by `CheckpointManifest::validate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the problem.
    pub message: String,
}

impl std::fmt::Display for ManifestValidationError {
    /// Writes the message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

/// Lets a validation failure be returned through `?` into
/// `Box<dyn std::error::Error>` and other standard error-handling paths.
impl std::error::Error for ManifestValidationError {}
130
131impl CheckpointManifest {
132 #[must_use]
143 pub fn validate(&self, expected_vnode_count: u16) -> Vec<ManifestValidationError> {
144 let mut errors = Vec::new();
145
146 if self.version == 0 {
147 errors.push(ManifestValidationError {
148 message: "manifest version is 0".into(),
149 });
150 }
151
152 if self.checkpoint_id == 0 {
153 errors.push(ManifestValidationError {
154 message: "checkpoint_id is 0".into(),
155 });
156 }
157
158 if self.epoch == 0 {
159 errors.push(ManifestValidationError {
160 message: "epoch is 0".into(),
161 });
162 }
163
164 if self.timestamp_ms == 0 {
165 errors.push(ManifestValidationError {
166 message: "timestamp_ms is 0 (missing creation time)".into(),
167 });
168 }
169
170 for sink_name in self.sink_epochs.keys() {
172 if !self.sink_commit_statuses.is_empty()
173 && !self.sink_commit_statuses.contains_key(sink_name)
174 {
175 errors.push(ManifestValidationError {
176 message: format!("sink '{sink_name}' has epoch but no commit status"),
177 });
178 }
179 }
180
181 if !self.source_names.is_empty() {
183 for name in self.source_offsets.keys() {
184 if !self.source_names.contains(name) {
185 errors.push(ManifestValidationError {
186 message: format!("source_offsets contains '{name}' not in source_names"),
187 });
188 }
189 }
190 }
191
192 if self.vnode_count == 0 {
193 errors.push(ManifestValidationError {
194 message: "vnode_count is 0 (missing or legacy checkpoint)".into(),
195 });
196 } else if self.vnode_count != expected_vnode_count {
197 errors.push(ManifestValidationError {
198 message: format!(
199 "vnode_count mismatch: checkpoint has {}, runtime expects {expected_vnode_count}",
200 self.vnode_count,
201 ),
202 });
203 }
204
205 errors
206 }
207
208 #[must_use]
212 pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
213 Self::new_with_vnode_count(checkpoint_id, epoch, DEFAULT_VNODE_COUNT)
214 }
215
216 #[must_use]
218 pub fn new_with_vnode_count(checkpoint_id: u64, epoch: u64, vnode_count: u16) -> Self {
219 #[allow(clippy::cast_possible_truncation)] let timestamp_ms = std::time::SystemTime::now()
221 .duration_since(std::time::UNIX_EPOCH)
222 .unwrap_or_default()
223 .as_millis() as u64;
224
225 Self {
226 version: 1,
227 checkpoint_id,
228 epoch,
229 timestamp_ms,
230 source_offsets: HashMap::new(),
231 sink_epochs: HashMap::new(),
232 sink_commit_statuses: HashMap::new(),
233 table_offsets: HashMap::new(),
234 operator_states: HashMap::new(),
235 table_store_checkpoint_path: None,
236 watermark: None,
237 source_watermarks: HashMap::new(),
238 source_names: Vec::new(),
239 sink_names: Vec::new(),
240 pipeline_hash: None,
241 vnode_count,
242 state_checksum: None,
243 }
244 }
245}
246
247#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
255pub struct ConnectorCheckpoint {
256 pub offsets: HashMap<String, String>,
258 pub epoch: u64,
260 #[serde(default)]
262 pub metadata: HashMap<String, String>,
263}
264
265impl ConnectorCheckpoint {
266 #[must_use]
268 pub fn new(epoch: u64) -> Self {
269 Self {
270 offsets: HashMap::new(),
271 epoch,
272 metadata: HashMap::new(),
273 }
274 }
275
276 #[must_use]
278 pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
279 Self {
280 offsets,
281 epoch,
282 metadata: HashMap::new(),
283 }
284 }
285}
286
287#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
289pub struct OperatorCheckpoint {
290 #[serde(default)]
292 pub state_b64: Option<String>,
293 #[serde(default)]
295 pub external: bool,
296 #[serde(default)]
298 pub external_offset: u64,
299 #[serde(default)]
301 pub external_length: u64,
302}
303
304impl OperatorCheckpoint {
305 #[must_use]
309 pub fn inline(data: &[u8]) -> Self {
310 use base64::Engine;
311 Self {
312 state_b64: Some(base64::engine::general_purpose::STANDARD.encode(data)),
313 external: false,
314 external_offset: 0,
315 external_length: 0,
316 }
317 }
318
319 #[must_use]
321 pub fn external(offset: u64, length: u64) -> Self {
322 Self {
323 state_b64: None,
324 external: true,
325 external_offset: offset,
326 external_length: length,
327 }
328 }
329
330 #[must_use]
335 pub fn decode_inline(&self) -> Option<Vec<u8>> {
336 use base64::Engine;
337 self.state_b64.as_ref().and_then(|b64| {
338 match base64::engine::general_purpose::STANDARD.decode(b64) {
339 Ok(data) => Some(data),
340 Err(e) => {
341 tracing::warn!(
342 error = %e,
343 b64_len = b64.len(),
344 "[LDB-4004] Failed to decode inline operator state from base64 — \
345 operator will start from scratch"
346 );
347 None
348 }
349 }
350 })
351 }
352
353 pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
363 use base64::Engine;
364 match &self.state_b64 {
365 None => Ok(None),
366 Some(b64) => base64::engine::general_purpose::STANDARD
367 .decode(b64)
368 .map(Some)
369 .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}")),
370 }
371 }
372
373 #[must_use]
389 #[allow(clippy::cast_possible_truncation)]
390 pub fn from_bytes(
391 data: &[u8],
392 threshold: usize,
393 current_offset: u64,
394 ) -> (Self, Option<Vec<u8>>) {
395 if data.len() <= threshold {
396 (Self::inline(data), None)
397 } else {
398 let length = data.len() as u64;
399 (Self::external(current_offset, length), Some(data.to_vec()))
400 }
401 }
402
403 #[must_use]
411 #[allow(clippy::cast_possible_truncation)]
412 pub fn from_bytes_shared(
413 data: bytes::Bytes,
414 threshold: usize,
415 current_offset: u64,
416 ) -> (Self, Option<bytes::Bytes>) {
417 if data.len() <= threshold {
418 (Self::inline(&data), None)
419 } else {
420 let length = data.len() as u64;
421 (Self::external(current_offset, length), Some(data))
422 }
423 }
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_manifest_new() {
        let m = CheckpointManifest::new(1, 5);
        assert_eq!(m.version, 1);
        assert_eq!(m.checkpoint_id, 1);
        assert_eq!(m.epoch, 5);
        assert!(m.timestamp_ms > 0);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
    }

    #[test]
    fn test_manifest_json_round_trip() {
        let mut m = CheckpointManifest::new(42, 10);
        m.source_offsets.insert(
            "kafka-src".into(),
            ConnectorCheckpoint::with_offsets(
                10,
                HashMap::from([
                    ("partition-0".into(), "1234".into()),
                    ("partition-1".into(), "5678".into()),
                ]),
            ),
        );
        m.sink_epochs.insert("pg-sink".into(), 9);
        m.watermark = Some(999_000);
        m.operator_states
            .insert("window-agg".into(), OperatorCheckpoint::inline(b"hello"));

        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.checkpoint_id, 42);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(999_000));
        let src = restored.source_offsets.get("kafka-src").unwrap();
        assert_eq!(src.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(restored.sink_epochs.get("pg-sink"), Some(&9));

        let op = restored.operator_states.get("window-agg").unwrap();
        assert_eq!(op.decode_inline().unwrap(), b"hello");
    }

    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;

        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(m.version, 1);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(m.watermark.is_none());
    }

    #[test]
    fn test_connector_checkpoint_new() {
        let cp = ConnectorCheckpoint::new(5);
        assert_eq!(cp.epoch, 5);
        assert!(cp.offsets.is_empty());
        assert!(cp.metadata.is_empty());
    }

    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        let cp = ConnectorCheckpoint::with_offsets(3, offsets);
        assert_eq!(cp.epoch, 3);
        assert_eq!(cp.offsets.get("lsn"), Some(&"0/ABCD".into()));
    }

    #[test]
    fn test_operator_checkpoint_inline() {
        let op = OperatorCheckpoint::inline(b"state-data");
        assert!(!op.external);
        assert!(op.state_b64.is_some());
        assert_eq!(op.decode_inline().unwrap(), b"state-data");
    }

    #[test]
    fn test_operator_checkpoint_external() {
        let op = OperatorCheckpoint::external(1024, 256);
        assert!(op.external);
        assert_eq!(op.external_offset, 1024);
        assert_eq!(op.external_length, 256);
        assert!(op.decode_inline().is_none());
    }

    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let op = OperatorCheckpoint::inline(b"");
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_manifest_table_offsets() {
        let mut m = CheckpointManifest::new(1, 1);
        m.table_offsets.insert(
            "instruments".into(),
            ConnectorCheckpoint::with_offsets(1, HashMap::from([("lsn".into(), "0/ABCD".into())])),
        );
        m.table_store_checkpoint_path = Some("/tmp/table_store_cp".into());

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/table_store_cp")
        );
    }

    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["kafka-clicks".into(), "ws-prices".into()];
        m.sink_names = vec!["pg-sink".into()];

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.source_names, vec!["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, vec!["pg-sink"]);
    }

    #[test]
    fn test_manifest_topology_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 5,
            "epoch": 3,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.source_names.is_empty());
        assert!(m.sink_names.is_empty());
    }

    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["a".into(), "b".into()];
        m.source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));

        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(
            errors
                .iter()
                .any(|e| e.message.contains("'c' not in source_names")),
            "expected orphaned source offset error: {errors:?}"
        );
    }

    #[test]
    fn test_validate_fresh_manifest_is_clean() {
        let m = CheckpointManifest::new(1, 1);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(errors.is_empty(), "unexpected errors: {errors:?}");
    }

    #[test]
    fn test_validate_zero_identity_fields() {
        let mut m = CheckpointManifest::new(0, 0);
        m.version = 0;
        m.timestamp_ms = 0;

        let messages: Vec<String> = m
            .validate(DEFAULT_VNODE_COUNT)
            .into_iter()
            .map(|e| e.message)
            .collect();
        for needle in [
            "manifest version is 0",
            "checkpoint_id is 0",
            "epoch is 0",
            "timestamp_ms is 0",
        ] {
            assert!(
                messages.iter().any(|msg| msg.contains(needle)),
                "missing {needle:?} in {messages:?}"
            );
        }
    }

    #[test]
    fn test_validate_vnode_count_mismatch() {
        let m = CheckpointManifest::new_with_vnode_count(1, 1, 128);
        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert_eq!(errors.len(), 1, "{errors:?}");
        assert!(errors[0].message.contains("vnode_count mismatch"));
    }

    #[test]
    fn test_validate_legacy_zero_vnode_count() {
        let json = r#"{"version": 1, "checkpoint_id": 1, "epoch": 1, "timestamp_ms": 1000}"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(m.vnode_count, 0);

        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert!(
            errors.iter().any(|e| e.message.contains("vnode_count is 0")),
            "{errors:?}"
        );
    }

    #[test]
    fn test_validate_sink_missing_commit_status() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_epochs.insert("a".into(), 1);
        m.sink_epochs.insert("b".into(), 1);
        m.sink_commit_statuses
            .insert("b".into(), SinkCommitStatus::Committed);

        let errors = m.validate(DEFAULT_VNODE_COUNT);
        assert_eq!(errors.len(), 1, "{errors:?}");
        assert!(errors[0]
            .message
            .contains("sink 'a' has epoch but no commit status"));
    }

    #[test]
    fn test_validate_sink_check_skipped_without_statuses() {
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_epochs.insert("a".into(), 1);
        assert!(m.validate(DEFAULT_VNODE_COUNT).is_empty());
    }

    #[test]
    fn test_validation_error_display() {
        let e = ManifestValidationError {
            message: "epoch is 0".into(),
        };
        assert_eq!(e.to_string(), "epoch is 0");
    }

    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.pipeline_hash = Some(0xDEAD_BEEF_CAFE_1234);

        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.pipeline_hash, Some(0xDEAD_BEEF_CAFE_1234));
    }

    #[test]
    fn test_try_decode_inline() {
        assert_eq!(
            OperatorCheckpoint::inline(b"abc").try_decode_inline(),
            Ok(Some(b"abc".to_vec()))
        );
        assert_eq!(
            OperatorCheckpoint::external(0, 3).try_decode_inline(),
            Ok(None)
        );

        let corrupt = OperatorCheckpoint {
            state_b64: Some("not base64!".into()),
            external: false,
            external_offset: 0,
            external_length: 0,
        };
        let err = corrupt.try_decode_inline().unwrap_err();
        assert!(err.starts_with("[LDB-4004]"), "{err}");
        assert!(corrupt.decode_inline().is_none());
    }

    #[test]
    fn test_from_bytes_inline() {
        let data = b"small-state";
        let (op, sidecar) = OperatorCheckpoint::from_bytes(data, 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);
    }

    #[test]
    fn test_from_bytes_external() {
        let data = vec![0xAB; 2048];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 1024, 512);
        assert!(op.external);
        assert_eq!(op.external_offset, 512);
        assert_eq!(op.external_length, 2048);
        assert!(op.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), data);
    }

    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        // Exactly at the threshold: still inline.
        let data = vec![0xFF; 100];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 100, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);

        // One byte over the threshold: external.
        let data_over = vec![0xFF; 101];
        let (op2, sidecar2) = OperatorCheckpoint::from_bytes(&data_over, 100, 0);
        assert!(op2.external);
        assert!(sidecar2.is_some());
    }

    #[test]
    fn test_from_bytes_empty_data() {
        let (op, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    #[test]
    fn test_from_bytes_shared() {
        let small = bytes::Bytes::from_static(b"tiny");
        let (op, sidecar) = OperatorCheckpoint::from_bytes_shared(small, 16, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"tiny");

        let big = bytes::Bytes::from(vec![7u8; 32]);
        let (op, sidecar) = OperatorCheckpoint::from_bytes_shared(big.clone(), 16, 64);
        assert!(op.external);
        assert_eq!(op.external_offset, 64);
        assert_eq!(op.external_length, 32);
        assert_eq!(sidecar, Some(big));
    }

    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.pipeline_hash.is_none());
    }
}