1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
8
9#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
16pub enum SinkCommitStatus {
17 Pending,
19 Committed,
21 Failed(String),
23}
24
/// Vnode count that [`CheckpointManifest::new`] stamps on a manifest when the
/// caller does not supply one explicitly.
pub const DEFAULT_VNODE_COUNT: u16 = 256;
32
33#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
35pub struct CheckpointManifest {
36 pub version: u32,
38 pub checkpoint_id: u64,
40 pub epoch: u64,
42 pub timestamp_ms: u64,
44
45 #[serde(default)]
48 pub source_offsets: HashMap<String, ConnectorCheckpoint>,
49 #[serde(default)]
51 pub sink_epochs: HashMap<String, u64>,
52 #[serde(default)]
58 pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
59 #[serde(default)]
61 pub table_offsets: HashMap<String, ConnectorCheckpoint>,
62
63 #[serde(default)]
69 pub operator_states: HashMap<String, OperatorCheckpoint>,
70
71 #[serde(default)]
74 pub table_store_checkpoint_path: Option<String>,
75 #[serde(default)]
78 pub watermark: Option<i64>,
79 #[serde(default)]
81 pub source_watermarks: HashMap<String, i64>,
82
83 #[serde(default)]
89 pub source_names: Vec<String>,
90 #[serde(default)]
92 pub sink_names: Vec<String>,
93
94 #[serde(default)]
101 pub pipeline_hash: Option<u64>,
102
103 #[serde(default)]
106 pub vnode_count: u16,
107
108 #[serde(default)]
114 pub state_checksum: Option<String>,
115}
116
/// A single problem found while validating a checkpoint manifest.
///
/// Implements [`std::error::Error`] so it can be boxed as `dyn Error` or
/// propagated with `?` like any other error value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the problem.
    pub message: String,
}

impl std::fmt::Display for ManifestValidationError {
    /// Writes the bare `message`, with no prefix or decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

// No `source()`: the error wraps no underlying cause, only a message.
impl std::error::Error for ManifestValidationError {}
129
130impl CheckpointManifest {
131 #[must_use]
142 pub fn validate(&self, expected_vnode_count: u16) -> Vec<ManifestValidationError> {
143 let mut errors = Vec::new();
144
145 if self.version == 0 {
146 errors.push(ManifestValidationError {
147 message: "manifest version is 0".into(),
148 });
149 }
150
151 if self.checkpoint_id == 0 {
152 errors.push(ManifestValidationError {
153 message: "checkpoint_id is 0".into(),
154 });
155 }
156
157 if self.epoch == 0 {
158 errors.push(ManifestValidationError {
159 message: "epoch is 0".into(),
160 });
161 }
162
163 if self.timestamp_ms == 0 {
164 errors.push(ManifestValidationError {
165 message: "timestamp_ms is 0 (missing creation time)".into(),
166 });
167 }
168
169 for sink_name in self.sink_epochs.keys() {
171 if !self.sink_commit_statuses.is_empty()
172 && !self.sink_commit_statuses.contains_key(sink_name)
173 {
174 errors.push(ManifestValidationError {
175 message: format!("sink '{sink_name}' has epoch but no commit status"),
176 });
177 }
178 }
179
180 if !self.source_names.is_empty() {
182 for name in self.source_offsets.keys() {
183 if !self.source_names.contains(name) {
184 errors.push(ManifestValidationError {
185 message: format!("source_offsets contains '{name}' not in source_names"),
186 });
187 }
188 }
189 }
190
191 if self.vnode_count == 0 {
192 errors.push(ManifestValidationError {
193 message: "vnode_count is 0 (missing or legacy checkpoint)".into(),
194 });
195 } else if self.vnode_count != expected_vnode_count {
196 errors.push(ManifestValidationError {
197 message: format!(
198 "vnode_count mismatch: checkpoint has {}, runtime expects {expected_vnode_count}",
199 self.vnode_count,
200 ),
201 });
202 }
203
204 errors
205 }
206
207 #[must_use]
211 pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
212 Self::new_with_vnode_count(checkpoint_id, epoch, DEFAULT_VNODE_COUNT)
213 }
214
215 #[must_use]
217 pub fn new_with_vnode_count(checkpoint_id: u64, epoch: u64, vnode_count: u16) -> Self {
218 #[allow(clippy::cast_possible_truncation)] let timestamp_ms = std::time::SystemTime::now()
220 .duration_since(std::time::UNIX_EPOCH)
221 .unwrap_or_default()
222 .as_millis() as u64;
223
224 Self {
225 version: 1,
226 checkpoint_id,
227 epoch,
228 timestamp_ms,
229 source_offsets: HashMap::new(),
230 sink_epochs: HashMap::new(),
231 sink_commit_statuses: HashMap::new(),
232 table_offsets: HashMap::new(),
233 operator_states: HashMap::new(),
234 table_store_checkpoint_path: None,
235 watermark: None,
236 source_watermarks: HashMap::new(),
237 source_names: Vec::new(),
238 sink_names: Vec::new(),
239 pipeline_hash: None,
240 vnode_count,
241 state_checksum: None,
242 }
243 }
244}
245
246#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
254pub struct ConnectorCheckpoint {
255 pub offsets: HashMap<String, String>,
257 pub epoch: u64,
259 #[serde(default)]
261 pub metadata: HashMap<String, String>,
262}
263
264impl ConnectorCheckpoint {
265 #[must_use]
267 pub fn new(epoch: u64) -> Self {
268 Self {
269 offsets: HashMap::new(),
270 epoch,
271 metadata: HashMap::new(),
272 }
273 }
274
275 #[must_use]
277 pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
278 Self {
279 offsets,
280 epoch,
281 metadata: HashMap::new(),
282 }
283 }
284}
285
286#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
288pub struct OperatorCheckpoint {
289 #[serde(default)]
291 pub state_b64: Option<String>,
292 #[serde(default)]
294 pub external: bool,
295 #[serde(default)]
297 pub external_offset: u64,
298 #[serde(default)]
300 pub external_length: u64,
301}
302
303impl OperatorCheckpoint {
304 #[must_use]
308 pub fn inline(data: &[u8]) -> Self {
309 use base64::Engine;
310 Self {
311 state_b64: Some(base64::engine::general_purpose::STANDARD.encode(data)),
312 external: false,
313 external_offset: 0,
314 external_length: 0,
315 }
316 }
317
318 #[must_use]
320 pub fn external(offset: u64, length: u64) -> Self {
321 Self {
322 state_b64: None,
323 external: true,
324 external_offset: offset,
325 external_length: length,
326 }
327 }
328
329 #[must_use]
334 pub fn decode_inline(&self) -> Option<Vec<u8>> {
335 use base64::Engine;
336 self.state_b64.as_ref().and_then(|b64| {
337 match base64::engine::general_purpose::STANDARD.decode(b64) {
338 Ok(data) => Some(data),
339 Err(e) => {
340 tracing::warn!(
341 error = %e,
342 b64_len = b64.len(),
343 "[LDB-4004] Failed to decode inline operator state from base64 — \
344 operator will start from scratch"
345 );
346 None
347 }
348 }
349 })
350 }
351
352 pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
362 use base64::Engine;
363 match &self.state_b64 {
364 None => Ok(None),
365 Some(b64) => base64::engine::general_purpose::STANDARD
366 .decode(b64)
367 .map(Some)
368 .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}")),
369 }
370 }
371
372 #[must_use]
388 #[allow(clippy::cast_possible_truncation)]
389 pub fn from_bytes(
390 data: &[u8],
391 threshold: usize,
392 current_offset: u64,
393 ) -> (Self, Option<Vec<u8>>) {
394 if data.len() <= threshold {
395 (Self::inline(data), None)
396 } else {
397 let length = data.len() as u64;
398 (Self::external(current_offset, length), Some(data.to_vec()))
399 }
400 }
401
402 #[must_use]
410 #[allow(clippy::cast_possible_truncation)]
411 pub fn from_bytes_shared(
412 data: bytes::Bytes,
413 threshold: usize,
414 current_offset: u64,
415 ) -> (Self, Option<bytes::Bytes>) {
416 if data.len() <= threshold {
417 (Self::inline(&data), None)
418 } else {
419 let length = data.len() as u64;
420 (Self::external(current_offset, length), Some(data))
421 }
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `manifest` to compact JSON and parses it back.
    fn json_round_trip(manifest: &CheckpointManifest) -> CheckpointManifest {
        let encoded = serde_json::to_string(manifest).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    // A new manifest carries the given ids, a timestamp, and empty maps.
    #[test]
    fn test_manifest_new() {
        let manifest = CheckpointManifest::new(1, 5);

        assert_eq!(manifest.version, 1);
        assert_eq!((manifest.checkpoint_id, manifest.epoch), (1, 5));
        assert!(manifest.timestamp_ms > 0);
        assert!(manifest.source_offsets.is_empty());
        assert!(manifest.sink_epochs.is_empty());
        assert!(manifest.operator_states.is_empty());
    }

    // A populated manifest survives a pretty-printed JSON round trip.
    #[test]
    fn test_manifest_json_round_trip() {
        let kafka_offsets = HashMap::from([
            ("partition-0".to_string(), "1234".to_string()),
            ("partition-1".to_string(), "5678".to_string()),
        ]);

        let mut original = CheckpointManifest::new(42, 10);
        original.source_offsets.insert(
            "kafka-src".to_string(),
            ConnectorCheckpoint::with_offsets(10, kafka_offsets),
        );
        original.sink_epochs.insert("pg-sink".to_string(), 9);
        original.watermark = Some(999_000);
        original.operator_states.insert(
            "window-agg".to_string(),
            OperatorCheckpoint::inline(b"hello"),
        );

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: CheckpointManifest = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.checkpoint_id, 42);
        assert_eq!(decoded.epoch, 10);
        assert_eq!(decoded.watermark, Some(999_000));

        let source = &decoded.source_offsets["kafka-src"];
        assert_eq!(
            source.offsets.get("partition-0").map(String::as_str),
            Some("1234")
        );
        assert_eq!(decoded.sink_epochs.get("pg-sink").copied(), Some(9));

        let operator = &decoded.operator_states["window-agg"];
        assert_eq!(operator.decode_inline().as_deref(), Some(&b"hello"[..]));
    }

    // Only the four required fields are present; everything else defaults.
    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        let legacy = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;

        let manifest: CheckpointManifest = serde_json::from_str(legacy).unwrap();

        assert_eq!(manifest.version, 1);
        assert!(manifest.source_offsets.is_empty());
        assert!(manifest.sink_epochs.is_empty());
        assert!(manifest.operator_states.is_empty());
        assert_eq!(manifest.watermark, None);
    }

    #[test]
    fn test_connector_checkpoint_new() {
        let checkpoint = ConnectorCheckpoint::new(5);

        assert_eq!(checkpoint.epoch, 5);
        assert!(checkpoint.offsets.is_empty());
        assert!(checkpoint.metadata.is_empty());
    }

    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let lsn = HashMap::from([("lsn".to_string(), "0/ABCD".to_string())]);

        let checkpoint = ConnectorCheckpoint::with_offsets(3, lsn);

        assert_eq!(checkpoint.epoch, 3);
        assert_eq!(
            checkpoint.offsets.get("lsn").map(String::as_str),
            Some("0/ABCD")
        );
    }

    #[test]
    fn test_operator_checkpoint_inline() {
        let checkpoint = OperatorCheckpoint::inline(b"state-data");

        assert!(!checkpoint.external);
        assert!(checkpoint.state_b64.is_some());
        assert_eq!(
            checkpoint.decode_inline().as_deref(),
            Some(&b"state-data"[..])
        );
    }

    #[test]
    fn test_operator_checkpoint_external() {
        let checkpoint = OperatorCheckpoint::external(1024, 256);

        assert!(checkpoint.external);
        assert_eq!(
            (checkpoint.external_offset, checkpoint.external_length),
            (1024, 256)
        );
        assert_eq!(checkpoint.decode_inline(), None);
    }

    // Empty inline state decodes to an empty buffer, not to `None`.
    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let checkpoint = OperatorCheckpoint::inline(b"");

        assert_eq!(checkpoint.decode_inline(), Some(Vec::new()));
    }

    #[test]
    fn test_manifest_table_offsets() {
        let lsn = HashMap::from([("lsn".to_string(), "0/ABCD".to_string())]);

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_offsets.insert(
            "instruments".to_string(),
            ConnectorCheckpoint::with_offsets(1, lsn),
        );
        manifest.table_store_checkpoint_path = Some("/tmp/rocksdb_cp".to_string());

        let restored = json_round_trip(&manifest);

        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.source_names = vec!["kafka-clicks".to_string(), "ws-prices".to_string()];
        manifest.sink_names = vec!["pg-sink".to_string()];

        let restored = json_round_trip(&manifest);

        assert_eq!(restored.source_names, ["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, ["pg-sink"]);
    }

    // A manifest without topology fields loads with empty name lists.
    #[test]
    fn test_manifest_topology_backward_compat() {
        let legacy = r#"{
            "version": 1,
            "checkpoint_id": 5,
            "epoch": 3,
            "timestamp_ms": 1000
        }"#;

        let manifest: CheckpointManifest = serde_json::from_str(legacy).unwrap();

        assert!(manifest.source_names.is_empty());
        assert!(manifest.sink_names.is_empty());
    }

    // An offset for a source that is not in `source_names` must be reported.
    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.source_names = vec!["a".to_string(), "b".to_string()];
        manifest
            .source_offsets
            .insert("c".to_string(), ConnectorCheckpoint::new(1));

        let problems = manifest.validate(DEFAULT_VNODE_COUNT);
        let flagged = problems
            .iter()
            .any(|p| p.message.contains("'c' not in source_names"));

        assert!(
            flagged,
            "expected orphaned source offset error: {problems:?}"
        );
    }

    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        const HASH: u64 = 0xDEAD_BEEF_CAFE_1234;

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.pipeline_hash = Some(HASH);

        let restored = json_round_trip(&manifest);

        assert_eq!(restored.pipeline_hash, Some(HASH));
    }

    #[test]
    fn test_from_bytes_inline() {
        let payload = b"small-state";

        let (checkpoint, sidecar) = OperatorCheckpoint::from_bytes(payload, 1024, 0);

        assert!(!checkpoint.external);
        assert_eq!(sidecar, None);
        assert_eq!(checkpoint.decode_inline().as_deref(), Some(&payload[..]));
    }

    #[test]
    fn test_from_bytes_external() {
        let payload = vec![0xAB; 2048];

        let (checkpoint, sidecar) = OperatorCheckpoint::from_bytes(&payload, 1024, 512);

        assert!(checkpoint.external);
        assert_eq!(checkpoint.external_offset, 512);
        assert_eq!(checkpoint.external_length, 2048);
        assert_eq!(checkpoint.decode_inline(), None);
        assert_eq!(sidecar.as_deref(), Some(payload.as_slice()));
    }

    // Exactly `threshold` bytes stays inline; one byte more goes external.
    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        let at_limit = vec![0xFF; 100];
        let (inline_cp, inline_sidecar) = OperatorCheckpoint::from_bytes(&at_limit, 100, 0);
        assert!(!inline_cp.external);
        assert_eq!(inline_sidecar, None);
        assert_eq!(
            inline_cp.decode_inline().as_deref(),
            Some(at_limit.as_slice())
        );

        let over_limit = vec![0xFF; 101];
        let (external_cp, external_sidecar) = OperatorCheckpoint::from_bytes(&over_limit, 100, 0);
        assert!(external_cp.external);
        assert!(external_sidecar.is_some());
    }

    #[test]
    fn test_from_bytes_empty_data() {
        let (checkpoint, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);

        assert!(!checkpoint.external);
        assert_eq!(sidecar, None);
        assert_eq!(checkpoint.decode_inline(), Some(Vec::new()));
    }

    // A manifest without `pipeline_hash` loads with `None`.
    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let legacy = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;

        let manifest: CheckpointManifest = serde_json::from_str(legacy).unwrap();

        assert_eq!(manifest.pipeline_hash, None);
    }
}