1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
15use std::fmt;
16
17use serde::{Deserialize, Serialize};
18
19use crate::checkpoint::layout::SourceOffsetEntry;
20use crate::checkpoint_manifest::ConnectorCheckpoint;
21
/// Stable, human-readable identifier for a source connector instance.
///
/// Thin newtype over `String`; `Hash`/`Eq` so it can key collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SourceId(pub String);
25
26impl SourceId {
27 #[must_use]
29 pub fn new(id: impl Into<String>) -> Self {
30 Self(id.into())
31 }
32
33 #[must_use]
35 pub fn as_str(&self) -> &str {
36 &self.0
37 }
38}
39
40impl fmt::Display for SourceId {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "{}", self.0)
43 }
44}
45
46impl From<&str> for SourceId {
47 fn from(s: &str) -> Self {
48 Self(s.to_string())
49 }
50}
51
52impl From<String> for SourceId {
53 fn from(s: String) -> Self {
54 Self(s)
55 }
56}
57
/// A checkpointed resume position tagged with the source it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceOffset {
    /// Identifier of the source this offset belongs to.
    pub source_id: SourceId,
    /// Connector-specific position to resume from.
    pub position: SourcePosition,
}
69
/// Offset for a single Kafka topic partition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaPartitionOffset {
    /// Topic name. NOTE(review): topic names may contain `-`; checkpoint
    /// keys of the form `{topic}-{partition}` are split on the LAST dash.
    pub topic: String,
    /// Partition number within the topic.
    pub partition: i32,
    /// Offset value for this partition (stored as a decimal string in
    /// checkpoint maps).
    pub offset: i64,
}
80
/// Kafka resume position: a consumer group plus its per-partition offsets.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaPosition {
    /// Consumer group whose offsets are captured.
    pub group_id: String,
    /// Offsets, one entry per topic partition.
    pub partitions: Vec<KafkaPartitionOffset>,
}
89
/// Postgres logical-replication (CDC) resume position.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PostgresCdcPosition {
    /// Confirmed-flush LSN in textual form (e.g. "0/1234ABCD").
    pub confirmed_flush_lsn: String,
    /// Server write LSN, when known.
    pub write_lsn: Option<String>,
    /// Name of the replication slot this position belongs to.
    pub slot_name: String,
}
100
/// MySQL CDC resume position: GTID-based when available, otherwise a
/// binlog file name plus position.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MysqlCdcPosition {
    /// Executed GTID set, when GTID mode is in use.
    pub gtid_set: Option<String>,
    /// Binlog file name (e.g. "mysql-bin.000003").
    pub binlog_file: Option<String>,
    /// Position within `binlog_file`.
    pub binlog_position: Option<u64>,
}
111
/// Resume position within a file source.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FilePosition {
    /// Path of the file being read.
    pub path: String,
    /// Byte offset to resume reading from.
    pub byte_offset: u64,
}
120
/// Fallback position for connector types without a dedicated variant:
/// an opaque string-to-string offset map interpreted by the connector.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GenericPosition {
    /// Connector type name this offset map belongs to.
    pub connector_type: String,
    /// Raw offset key/value pairs.
    pub offsets: HashMap<String, String>,
}
129
/// Typed, connector-specific resume position.
///
/// Serialized with an internal `"type"` tag so the variant survives a
/// JSON round-trip.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum SourcePosition {
    /// Kafka consumer-group offsets.
    Kafka(KafkaPosition),
    /// Postgres logical-replication position.
    PostgresCdc(PostgresCdcPosition),
    /// MySQL binlog/GTID position.
    MysqlCdc(MysqlCdcPosition),
    /// Plain file byte offset.
    File(FilePosition),
    /// Catch-all for other connector types.
    Generic(GenericPosition),
}
148
149impl SourcePosition {
150 #[must_use]
152 pub fn to_connector_checkpoint(&self, epoch: u64) -> ConnectorCheckpoint {
153 let mut cp = ConnectorCheckpoint::new(epoch);
154 match self {
155 Self::Kafka(pos) => {
156 cp.metadata.insert("connector_type".into(), "kafka".into());
157 cp.metadata.insert("group_id".into(), pos.group_id.clone());
158 for p in &pos.partitions {
159 cp.offsets
160 .insert(format!("{}-{}", p.topic, p.partition), p.offset.to_string());
161 }
162 }
163 Self::PostgresCdc(pos) => {
164 cp.metadata
165 .insert("connector_type".into(), "postgres_cdc".into());
166 cp.offsets.insert(
167 "confirmed_flush_lsn".into(),
168 pos.confirmed_flush_lsn.clone(),
169 );
170 if let Some(ref lsn) = pos.write_lsn {
171 cp.offsets.insert("write_lsn".into(), lsn.clone());
172 }
173 cp.metadata
174 .insert("slot_name".into(), pos.slot_name.clone());
175 }
176 Self::MysqlCdc(pos) => {
177 cp.metadata
178 .insert("connector_type".into(), "mysql_cdc".into());
179 if let Some(ref gtid) = pos.gtid_set {
180 cp.offsets.insert("gtid_set".into(), gtid.clone());
181 }
182 if let Some(ref file) = pos.binlog_file {
183 cp.offsets.insert("binlog_file".into(), file.clone());
184 }
185 if let Some(binlog_pos) = pos.binlog_position {
186 cp.offsets
187 .insert("binlog_position".into(), binlog_pos.to_string());
188 }
189 }
190 Self::File(pos) => {
191 cp.metadata.insert("connector_type".into(), "file".into());
192 cp.offsets.insert("path".into(), pos.path.clone());
193 cp.offsets
194 .insert("byte_offset".into(), pos.byte_offset.to_string());
195 }
196 Self::Generic(pos) => {
197 cp.metadata
198 .insert("connector_type".into(), pos.connector_type.clone());
199 cp.offsets.clone_from(&pos.offsets);
200 }
201 }
202 cp
203 }
204
205 #[must_use]
211 pub fn from_connector_checkpoint(
212 cp: &ConnectorCheckpoint,
213 type_hint: Option<&str>,
214 ) -> Option<Self> {
215 let connector_type =
216 type_hint.or_else(|| cp.metadata.get("connector_type").map(String::as_str))?;
217
218 match connector_type {
219 "kafka" => {
220 let group_id = cp.metadata.get("group_id").cloned().unwrap_or_else(|| {
221 tracing::warn!(
222 "[LDB-6011] Kafka source checkpoint missing 'group_id' metadata; \
223 defaulting to empty — offset recovery may use wrong consumer group"
224 );
225 String::new()
226 });
227 let mut partitions = Vec::new();
228 for (key, value) in &cp.offsets {
229 if let Some(dash_pos) = key.rfind('-') {
231 let topic = key[..dash_pos].to_string();
232 if let Ok(partition) = key[dash_pos + 1..].parse::<i32>() {
233 if let Ok(offset) = value.parse::<i64>() {
234 partitions.push(KafkaPartitionOffset {
235 topic,
236 partition,
237 offset,
238 });
239 }
240 }
241 }
242 }
243 Some(Self::Kafka(KafkaPosition {
244 group_id,
245 partitions,
246 }))
247 }
248 "postgres_cdc" => {
249 let confirmed_flush_lsn = cp.offsets.get("confirmed_flush_lsn")?.clone();
250 let write_lsn = cp.offsets.get("write_lsn").cloned();
251 let slot_name = cp.metadata.get("slot_name").cloned().unwrap_or_else(|| {
252 tracing::warn!(
253 "[LDB-6011] Postgres CDC checkpoint missing 'slot_name' metadata; \
254 defaulting to empty — replication slot recovery may fail"
255 );
256 String::new()
257 });
258 Some(Self::PostgresCdc(PostgresCdcPosition {
259 confirmed_flush_lsn,
260 write_lsn,
261 slot_name,
262 }))
263 }
264 "mysql_cdc" => Some(Self::MysqlCdc(MysqlCdcPosition {
265 gtid_set: cp.offsets.get("gtid_set").cloned(),
266 binlog_file: cp.offsets.get("binlog_file").cloned(),
267 binlog_position: cp
268 .offsets
269 .get("binlog_position")
270 .and_then(|s| s.parse().ok()),
271 })),
272 "file" => {
273 let path = cp.offsets.get("path")?.clone();
274 let byte_offset = cp
275 .offsets
276 .get("byte_offset")
277 .and_then(|s| s.parse().ok())
278 .unwrap_or(0);
279 Some(Self::File(FilePosition { path, byte_offset }))
280 }
281 _ => Some(Self::Generic(GenericPosition {
282 connector_type: connector_type.to_string(),
283 offsets: cp.offsets.clone(),
284 })),
285 }
286 }
287
288 #[must_use]
290 pub fn to_offset_entry(&self, epoch: u64) -> SourceOffsetEntry {
291 let (source_type, offsets) = match self {
292 Self::Kafka(pos) => {
293 let mut offsets = HashMap::new();
294 offsets.insert("group_id".into(), pos.group_id.clone());
295 for p in &pos.partitions {
296 offsets.insert(format!("{}-{}", p.topic, p.partition), p.offset.to_string());
297 }
298 ("kafka".to_string(), offsets)
299 }
300 Self::PostgresCdc(pos) => {
301 let mut offsets = HashMap::new();
302 offsets.insert(
303 "confirmed_flush_lsn".into(),
304 pos.confirmed_flush_lsn.clone(),
305 );
306 if let Some(ref lsn) = pos.write_lsn {
307 offsets.insert("write_lsn".into(), lsn.clone());
308 }
309 offsets.insert("slot_name".into(), pos.slot_name.clone());
310 ("postgres_cdc".to_string(), offsets)
311 }
312 Self::MysqlCdc(pos) => {
313 let mut offsets = HashMap::new();
314 if let Some(ref gtid) = pos.gtid_set {
315 offsets.insert("gtid_set".into(), gtid.clone());
316 }
317 if let Some(ref file) = pos.binlog_file {
318 offsets.insert("binlog_file".into(), file.clone());
319 }
320 if let Some(binlog_pos) = pos.binlog_position {
321 offsets.insert("binlog_position".into(), binlog_pos.to_string());
322 }
323 ("mysql_cdc".to_string(), offsets)
324 }
325 Self::File(pos) => {
326 let mut offsets = HashMap::new();
327 offsets.insert("path".into(), pos.path.clone());
328 offsets.insert("byte_offset".into(), pos.byte_offset.to_string());
329 ("file".to_string(), offsets)
330 }
331 Self::Generic(pos) => (pos.connector_type.clone(), pos.offsets.clone()),
332 };
333 SourceOffsetEntry {
334 source_type,
335 offsets,
336 epoch,
337 }
338 }
339
340 #[must_use]
345 pub fn from_offset_entry(entry: &SourceOffsetEntry) -> Option<Self> {
346 match entry.source_type.as_str() {
347 "kafka" => {
348 let group_id = entry.offsets.get("group_id").cloned().unwrap_or_default();
349 let mut partitions = Vec::new();
350 for (key, value) in &entry.offsets {
351 if key == "group_id" {
352 continue;
353 }
354 if let Some(dash_pos) = key.rfind('-') {
355 let topic = key[..dash_pos].to_string();
356 if let Ok(partition) = key[dash_pos + 1..].parse::<i32>() {
357 if let Ok(offset) = value.parse::<i64>() {
358 partitions.push(KafkaPartitionOffset {
359 topic,
360 partition,
361 offset,
362 });
363 }
364 }
365 }
366 }
367 Some(Self::Kafka(KafkaPosition {
368 group_id,
369 partitions,
370 }))
371 }
372 "postgres_cdc" => {
373 let confirmed_flush_lsn = entry.offsets.get("confirmed_flush_lsn")?.clone();
374 let write_lsn = entry.offsets.get("write_lsn").cloned();
375 let slot_name = entry.offsets.get("slot_name").cloned().unwrap_or_default();
376 Some(Self::PostgresCdc(PostgresCdcPosition {
377 confirmed_flush_lsn,
378 write_lsn,
379 slot_name,
380 }))
381 }
382 "mysql_cdc" => Some(Self::MysqlCdc(MysqlCdcPosition {
383 gtid_set: entry.offsets.get("gtid_set").cloned(),
384 binlog_file: entry.offsets.get("binlog_file").cloned(),
385 binlog_position: entry
386 .offsets
387 .get("binlog_position")
388 .and_then(|s| s.parse().ok()),
389 })),
390 "file" => {
391 let path = entry.offsets.get("path")?.clone();
392 let byte_offset = entry
393 .offsets
394 .get("byte_offset")
395 .and_then(|s| s.parse().ok())
396 .unwrap_or(0);
397 Some(Self::File(FilePosition { path, byte_offset }))
398 }
399 _ => Some(Self::Generic(GenericPosition {
400 connector_type: entry.source_type.clone(),
401 offsets: entry.offsets.clone(),
402 })),
403 }
404 }
405}
406
/// Severity attached to a [`DeterminismWarning`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum WarningSeverity {
    /// Lowest severity: informational.
    Info,
    /// Intermediate severity.
    Warning,
    /// Highest severity.
    Error,
}
417
/// A warning surfaced while building a [`RecoveryPlan`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeterminismWarning {
    /// Name of the source the warning concerns.
    pub source_name: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// How serious the problem is.
    pub severity: WarningSeverity,
}
428
/// Plan describing how sources should resume after recovery.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecoveryPlan {
    /// Typed resume position for each source that could be reconstructed.
    pub source_positions: HashMap<String, SourcePosition>,
    /// Epoch processing resumes from.
    pub resume_epoch: u64,
    /// Problems encountered while reconstructing positions.
    pub warnings: Vec<DeterminismWarning>,
}
442
443impl RecoveryPlan {
444 #[must_use]
446 pub fn from_manifest(
447 source_offsets: &HashMap<String, ConnectorCheckpoint>,
448 epoch: u64,
449 ) -> Self {
450 let mut source_positions = HashMap::new();
451 let mut warnings = Vec::new();
452
453 for (name, cp) in source_offsets {
454 if let Some(pos) = SourcePosition::from_connector_checkpoint(cp, None) {
455 source_positions.insert(name.clone(), pos);
456 } else {
457 warnings.push(DeterminismWarning {
458 source_name: name.clone(),
459 message: "Could not reconstruct typed position; \
460 will use raw offsets for recovery"
461 .into(),
462 severity: WarningSeverity::Warning,
463 });
464 }
465 }
466
467 Self {
468 source_positions,
469 resume_epoch: epoch,
470 warnings,
471 }
472 }
473
474 #[must_use]
476 pub fn from_manifest_v2(
477 source_offsets: &HashMap<String, SourceOffsetEntry>,
478 epoch: u64,
479 ) -> Self {
480 let mut source_positions = HashMap::new();
481 let mut warnings = Vec::new();
482
483 for (name, entry) in source_offsets {
484 if let Some(pos) = SourcePosition::from_offset_entry(entry) {
485 source_positions.insert(name.clone(), pos);
486 } else {
487 warnings.push(DeterminismWarning {
488 source_name: name.clone(),
489 message: "Could not reconstruct typed position from V2 \
490 offset entry; will use raw offsets for recovery"
491 .into(),
492 severity: WarningSeverity::Warning,
493 });
494 }
495 }
496
497 Self {
498 source_positions,
499 resume_epoch: epoch,
500 warnings,
501 }
502 }
503}
504
/// Describes the determinism-relevant properties of an operator.
///
/// Implementors are introspected by [`DeterminismValidator`].
pub trait OperatorDescriptor {
    /// Identifier of the operator (used in warning payloads).
    fn id(&self) -> &str;

    /// Whether the operator reads wall-clock / processing time.
    fn uses_wall_clock(&self) -> bool;

    /// Whether the operator draws from a random source.
    fn uses_random(&self) -> bool;

    /// Whether the operator performs externally visible side effects.
    fn has_external_side_effects(&self) -> bool;
}
524
/// A determinism hazard detected on a single operator.
///
/// Each variant carries the offending operator's id plus a suggested fix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OperatorDeterminismWarning {
    /// The operator depends on wall-clock time.
    WallClockUsage {
        /// Id of the offending operator.
        operator_id: String,
        /// Suggested remediation.
        suggestion: String,
    },
    /// The operator uses a random source.
    RandomUsage {
        /// Id of the offending operator.
        operator_id: String,
        /// Suggested remediation.
        suggestion: String,
    },
    /// The operator performs external side effects.
    ExternalSideEffect {
        /// Id of the offending operator.
        operator_id: String,
        /// Suggested remediation.
        suggestion: String,
    },
}
550
551pub struct DeterminismValidator;
557
558impl DeterminismValidator {
559 #[must_use]
564 pub fn validate(operator: &dyn OperatorDescriptor) -> Vec<OperatorDeterminismWarning> {
565 let mut warnings = Vec::new();
566
567 if operator.uses_wall_clock() {
568 warnings.push(OperatorDeterminismWarning::WallClockUsage {
569 operator_id: operator.id().to_string(),
570 suggestion: "Use event-time instead of processing-time".into(),
571 });
572 }
573
574 if operator.uses_random() {
575 warnings.push(OperatorDeterminismWarning::RandomUsage {
576 operator_id: operator.id().to_string(),
577 suggestion: "Use deterministic seed or remove randomness".into(),
578 });
579 }
580
581 if operator.has_external_side_effects() {
582 warnings.push(OperatorDeterminismWarning::ExternalSideEffect {
583 operator_id: operator.id().to_string(),
584 suggestion: "Move side effects to a sink connector".into(),
585 });
586 }
587
588 warnings
589 }
590
591 #[must_use]
595 pub fn validate_all(operators: &[&dyn OperatorDescriptor]) -> Vec<OperatorDeterminismWarning> {
596 operators
597 .iter()
598 .flat_map(|op| Self::validate(*op))
599 .collect()
600 }
601}
602
#[cfg(test)]
mod tests {
    use super::*;

    // ---- serde round-trips for every SourcePosition variant ----
    // These pin the internally-tagged ("type") JSON representation.

    #[test]
    fn test_kafka_position_serde_roundtrip() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "my-group".into(),
            partitions: vec![
                KafkaPartitionOffset {
                    topic: "events".into(),
                    partition: 0,
                    offset: 1234,
                },
                KafkaPartitionOffset {
                    topic: "events".into(),
                    partition: 1,
                    offset: 5678,
                },
            ],
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_postgres_cdc_position_serde_roundtrip() {
        let pos = SourcePosition::PostgresCdc(PostgresCdcPosition {
            confirmed_flush_lsn: "0/1234ABCD".into(),
            write_lsn: Some("0/1234ABCE".into()),
            slot_name: "laminar_slot".into(),
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_mysql_cdc_position_serde_roundtrip() {
        let pos = SourcePosition::MysqlCdc(MysqlCdcPosition {
            gtid_set: Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".into()),
            binlog_file: Some("mysql-bin.000003".into()),
            binlog_position: Some(154),
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_file_position_serde_roundtrip() {
        let pos = SourcePosition::File(FilePosition {
            path: "/data/events.csv".into(),
            byte_offset: 4096,
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_generic_position_serde_roundtrip() {
        let pos = SourcePosition::Generic(GenericPosition {
            connector_type: "custom".into(),
            offsets: HashMap::from([("cursor".into(), "abc123".into())]),
        });
        let json = serde_json::to_string(&pos).unwrap();
        let restored: SourcePosition = serde_json::from_str(&json).unwrap();
        assert_eq!(pos, restored);
    }

    // ---- V1 ConnectorCheckpoint conversions ----

    #[test]
    fn test_kafka_to_connector_checkpoint() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "my-group".into(),
            partitions: vec![KafkaPartitionOffset {
                topic: "events".into(),
                partition: 0,
                offset: 1234,
            }],
        });
        let cp = pos.to_connector_checkpoint(5);
        assert_eq!(cp.epoch, 5);
        // Partition offsets are flattened to "{topic}-{partition}" keys.
        assert_eq!(cp.offsets.get("events-0"), Some(&"1234".to_string()));
        assert_eq!(
            cp.metadata.get("connector_type"),
            Some(&"kafka".to_string())
        );
        assert_eq!(cp.metadata.get("group_id"), Some(&"my-group".to_string()));
    }

    #[test]
    fn test_postgres_to_from_checkpoint() {
        let original = SourcePosition::PostgresCdc(PostgresCdcPosition {
            confirmed_flush_lsn: "0/ABCD".into(),
            write_lsn: Some("0/ABCE".into()),
            slot_name: "test_slot".into(),
        });
        let cp = original.to_connector_checkpoint(10);
        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
        assert_eq!(original, restored);
    }

    #[test]
    fn test_mysql_to_from_checkpoint() {
        let original = SourcePosition::MysqlCdc(MysqlCdcPosition {
            gtid_set: Some("uuid:1-5".into()),
            binlog_file: Some("mysql-bin.000003".into()),
            binlog_position: Some(154),
        });
        let cp = original.to_connector_checkpoint(3);
        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
        assert_eq!(original, restored);
    }

    #[test]
    fn test_file_to_from_checkpoint() {
        let original = SourcePosition::File(FilePosition {
            path: "/data/file.csv".into(),
            byte_offset: 4096,
        });
        let cp = original.to_connector_checkpoint(1);
        let restored = SourcePosition::from_connector_checkpoint(&cp, None).unwrap();
        assert_eq!(original, restored);
    }

    #[test]
    fn test_from_checkpoint_with_type_hint() {
        // No connector_type metadata: the explicit hint drives decoding.
        let mut cp = ConnectorCheckpoint::new(1);
        cp.offsets.insert("events-0".into(), "100".into());
        let pos = SourcePosition::from_connector_checkpoint(&cp, Some("kafka")).unwrap();
        match pos {
            SourcePosition::Kafka(k) => {
                assert_eq!(k.partitions.len(), 1);
                assert_eq!(k.partitions[0].offset, 100);
            }
            _ => panic!("Expected Kafka position"),
        }
    }

    #[test]
    fn test_from_checkpoint_no_type_returns_none() {
        // Neither hint nor metadata: the connector type is unknowable.
        let cp = ConnectorCheckpoint::new(1);
        assert!(SourcePosition::from_connector_checkpoint(&cp, None).is_none());
    }

    #[test]
    fn test_recovery_plan_from_manifest() {
        let mut source_offsets = HashMap::new();

        let mut kafka_cp = ConnectorCheckpoint::new(5);
        kafka_cp
            .metadata
            .insert("connector_type".into(), "kafka".into());
        kafka_cp.metadata.insert("group_id".into(), "g1".into());
        kafka_cp.offsets.insert("topic-0".into(), "100".into());
        source_offsets.insert("kafka-src".into(), kafka_cp);

        // A checkpoint with no connector_type cannot be typed -> warning.
        let empty_cp = ConnectorCheckpoint::new(5);
        source_offsets.insert("unknown-src".into(), empty_cp);

        let plan = RecoveryPlan::from_manifest(&source_offsets, 5);
        assert_eq!(plan.resume_epoch, 5);
        assert_eq!(plan.source_positions.len(), 1);
        assert!(plan.source_positions.contains_key("kafka-src"));
        assert_eq!(plan.warnings.len(), 1);
        assert_eq!(plan.warnings[0].source_name, "unknown-src");
    }

    // ---- SourceId basics ----

    #[test]
    fn test_source_id_display() {
        let id = SourceId::new("kafka-orders");
        assert_eq!(id.to_string(), "kafka-orders");
        assert_eq!(id.as_str(), "kafka-orders");
    }

    #[test]
    fn test_source_id_from_str() {
        let id: SourceId = "my-source".into();
        assert_eq!(id.0, "my-source");
    }

    #[test]
    fn test_source_offset_serde_roundtrip() {
        let offset = SourceOffset {
            source_id: SourceId::new("kafka-src"),
            position: SourcePosition::Kafka(KafkaPosition {
                group_id: "g1".into(),
                partitions: vec![KafkaPartitionOffset {
                    topic: "events".into(),
                    partition: 0,
                    offset: 100,
                }],
            }),
        };
        let json = serde_json::to_string(&offset).unwrap();
        let restored: SourceOffset = serde_json::from_str(&json).unwrap();
        assert_eq!(offset, restored);
    }

    // ---- V2 SourceOffsetEntry conversions ----

    #[test]
    fn test_kafka_to_offset_entry() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "g1".into(),
            partitions: vec![KafkaPartitionOffset {
                topic: "events".into(),
                partition: 0,
                offset: 1234,
            }],
        });
        let entry = pos.to_offset_entry(5);
        assert_eq!(entry.source_type, "kafka");
        assert_eq!(entry.epoch, 5);
        // V2 keeps group_id inside the flat offsets map.
        assert_eq!(entry.offsets.get("group_id"), Some(&"g1".to_string()));
        assert_eq!(entry.offsets.get("events-0"), Some(&"1234".to_string()));
    }

    #[test]
    fn test_kafka_offset_entry_roundtrip() {
        let pos = SourcePosition::Kafka(KafkaPosition {
            group_id: "g1".into(),
            partitions: vec![KafkaPartitionOffset {
                topic: "events".into(),
                partition: 0,
                offset: 1234,
            }],
        });
        let entry = pos.to_offset_entry(5);
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_postgres_offset_entry_roundtrip() {
        let pos = SourcePosition::PostgresCdc(PostgresCdcPosition {
            confirmed_flush_lsn: "0/ABCD".into(),
            write_lsn: Some("0/ABCE".into()),
            slot_name: "test_slot".into(),
        });
        let entry = pos.to_offset_entry(10);
        assert_eq!(entry.source_type, "postgres_cdc");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_mysql_offset_entry_roundtrip() {
        let pos = SourcePosition::MysqlCdc(MysqlCdcPosition {
            gtid_set: Some("uuid:1-5".into()),
            binlog_file: Some("mysql-bin.000003".into()),
            binlog_position: Some(154),
        });
        let entry = pos.to_offset_entry(3);
        assert_eq!(entry.source_type, "mysql_cdc");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_file_offset_entry_roundtrip() {
        let pos = SourcePosition::File(FilePosition {
            path: "/data/file.csv".into(),
            byte_offset: 4096,
        });
        let entry = pos.to_offset_entry(1);
        assert_eq!(entry.source_type, "file");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_generic_offset_entry_roundtrip() {
        let pos = SourcePosition::Generic(GenericPosition {
            connector_type: "custom".into(),
            offsets: HashMap::from([("cursor".into(), "abc123".into())]),
        });
        let entry = pos.to_offset_entry(1);
        assert_eq!(entry.source_type, "custom");
        let restored = SourcePosition::from_offset_entry(&entry).unwrap();
        assert_eq!(pos, restored);
    }

    #[test]
    fn test_recovery_plan_from_manifest_v2() {
        use crate::checkpoint::layout::SourceOffsetEntry;

        let mut source_offsets = HashMap::new();
        source_offsets.insert(
            "kafka-src".into(),
            SourceOffsetEntry {
                source_type: "kafka".into(),
                offsets: HashMap::from([
                    ("group_id".into(), "g1".into()),
                    ("topic-0".into(), "100".into()),
                ]),
                epoch: 5,
            },
        );
        // An unrecognized source type falls back to Generic, not a warning.
        source_offsets.insert(
            "custom-src".into(),
            SourceOffsetEntry {
                source_type: "redis".into(),
                offsets: HashMap::from([("cursor".into(), "42".into())]),
                epoch: 5,
            },
        );

        let plan = RecoveryPlan::from_manifest_v2(&source_offsets, 5);
        assert_eq!(plan.resume_epoch, 5);
        assert_eq!(plan.source_positions.len(), 2);
        assert!(plan.source_positions.contains_key("kafka-src"));
        assert!(plan.source_positions.contains_key("custom-src"));
        assert!(plan.warnings.is_empty());
    }

    #[test]
    fn test_warning_severity_serde() {
        let warning = DeterminismWarning {
            source_name: "src".into(),
            message: "test".into(),
            severity: WarningSeverity::Warning,
        };
        let json = serde_json::to_string(&warning).unwrap();
        let restored: DeterminismWarning = serde_json::from_str(&json).unwrap();
        assert_eq!(warning, restored);
    }

    // ---- DeterminismValidator fixtures and tests ----

    // Minimal OperatorDescriptor stub with individually togglable hazards.
    struct TestOperator {
        id: String,
        wall_clock: bool,
        random: bool,
        side_effects: bool,
    }

    impl TestOperator {
        // Baseline operator with no hazards; tests flip single flags via
        // struct-update syntax.
        fn deterministic(id: &str) -> Self {
            Self {
                id: id.into(),
                wall_clock: false,
                random: false,
                side_effects: false,
            }
        }
    }

    impl OperatorDescriptor for TestOperator {
        fn id(&self) -> &str {
            &self.id
        }
        fn uses_wall_clock(&self) -> bool {
            self.wall_clock
        }
        fn uses_random(&self) -> bool {
            self.random
        }
        fn has_external_side_effects(&self) -> bool {
            self.side_effects
        }
    }

    #[test]
    fn test_determinism_validator_clean_operator() {
        let op = TestOperator::deterministic("agg-1");
        let warnings = DeterminismValidator::validate(&op);
        assert!(warnings.is_empty());
    }

    #[test]
    fn test_determinism_validator_wall_clock_warning() {
        let op = TestOperator {
            wall_clock: true,
            ..TestOperator::deterministic("timer-op")
        };
        let warnings = DeterminismValidator::validate(&op);
        assert_eq!(warnings.len(), 1);
        assert!(matches!(
            &warnings[0],
            OperatorDeterminismWarning::WallClockUsage { operator_id, .. }
                if operator_id == "timer-op"
        ));
    }

    #[test]
    fn test_determinism_validator_random_warning() {
        let op = TestOperator {
            random: true,
            ..TestOperator::deterministic("shuffle-op")
        };
        let warnings = DeterminismValidator::validate(&op);
        assert_eq!(warnings.len(), 1);
        assert!(matches!(
            &warnings[0],
            OperatorDeterminismWarning::RandomUsage { operator_id, .. }
                if operator_id == "shuffle-op"
        ));
    }

    #[test]
    fn test_determinism_validator_side_effect_warning() {
        let op = TestOperator {
            side_effects: true,
            ..TestOperator::deterministic("http-op")
        };
        let warnings = DeterminismValidator::validate(&op);
        assert_eq!(warnings.len(), 1);
        assert!(matches!(
            &warnings[0],
            OperatorDeterminismWarning::ExternalSideEffect { operator_id, .. }
                if operator_id == "http-op"
        ));
    }

    #[test]
    fn test_determinism_validator_multiple_warnings() {
        // All three hazards set -> one warning each.
        let op = TestOperator {
            id: "bad-op".into(),
            wall_clock: true,
            random: true,
            side_effects: true,
        };
        let warnings = DeterminismValidator::validate(&op);
        assert_eq!(warnings.len(), 3);
    }

    #[test]
    fn test_determinism_validator_validate_all() {
        let clean = TestOperator::deterministic("clean");
        let bad = TestOperator {
            wall_clock: true,
            ..TestOperator::deterministic("bad")
        };
        let warnings = DeterminismValidator::validate_all(&[&clean, &bad]);
        assert_eq!(warnings.len(), 1);
        assert!(matches!(
            &warnings[0],
            OperatorDeterminismWarning::WallClockUsage { operator_id, .. }
                if operator_id == "bad"
        ));
    }
}