laminar_connectors/
checkpoint.rs1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
8
/// Checkpoint state recorded for a source.
///
/// Stores three things: a string-to-string map of offsets, the epoch number
/// the checkpoint was built with, and a string-to-string map of metadata.
/// The type attaches no meaning to individual keys or values; it only stores
/// them and hands them back.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SourceCheckpoint {
    /// Offset values indexed by caller-chosen keys.
    offsets: HashMap<String, String>,

    /// Epoch number supplied at construction time.
    epoch: u64,

    /// Auxiliary key/value pairs kept alongside the offsets.
    metadata: HashMap<String, String>,
}

impl SourceCheckpoint {
    /// Builds a checkpoint for `epoch` that has no offsets and no metadata.
    #[must_use]
    pub fn new(epoch: u64) -> Self {
        Self::with_offsets(epoch, HashMap::new())
    }

    /// Builds a checkpoint for `epoch` that takes ownership of `offsets`.
    ///
    /// The metadata map starts out empty.
    #[must_use]
    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
        Self {
            offsets,
            epoch,
            metadata: HashMap::default(),
        }
    }

    /// Stores `value` as the offset for `key`.
    ///
    /// An offset already stored under the same key is replaced; the previous
    /// value is discarded.
    pub fn set_offset(&mut self, key: impl Into<String>, value: impl Into<String>) {
        let (owned_key, owned_value) = (key.into(), value.into());
        self.offsets.insert(owned_key, owned_value);
    }

    /// Looks up the offset stored under `key`.
    ///
    /// Returns `None` when no offset has been stored for that key.
    #[must_use]
    pub fn get_offset(&self, key: &str) -> Option<&str> {
        let stored = self.offsets.get(key)?;
        Some(stored.as_str())
    }

    /// Borrows the complete offset map.
    #[must_use]
    pub fn offsets(&self) -> &HashMap<String, String> {
        &self.offsets
    }

    /// Returns the epoch number this checkpoint was constructed with.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Stores `value` as the metadata entry for `key`.
    ///
    /// An entry already stored under the same key is replaced; the previous
    /// value is discarded.
    pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        let (owned_key, owned_value) = (key.into(), value.into());
        self.metadata.insert(owned_key, owned_value);
    }

    /// Looks up the metadata entry stored under `key`.
    ///
    /// Returns `None` when no entry has been stored for that key.
    #[must_use]
    pub fn get_metadata(&self, key: &str) -> Option<&str> {
        let stored = self.metadata.get(key)?;
        Some(stored.as_str())
    }

    /// Borrows the complete metadata map.
    #[must_use]
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.metadata
    }

    /// Reports whether the checkpoint holds no offsets.
    ///
    /// Only the offset map is inspected: a checkpoint that carries metadata
    /// but no offsets still counts as empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.offsets.is_empty()
    }
}
95
96#[derive(Debug, Clone)]
99pub struct RuntimeCheckpoint {
100 pub source: SourceCheckpoint,
102
103 pub timestamp_ms: u64,
105
106 pub connector_name: String,
108}
109
110impl RuntimeCheckpoint {
111 #[must_use]
113 pub fn new(
114 source: SourceCheckpoint,
115 connector_name: impl Into<String>,
116 timestamp_ms: u64,
117 ) -> Self {
118 Self {
119 source,
120 timestamp_ms,
121 connector_name: connector_name.into(),
122 }
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Offsets written with `set_offset` can be read back with `get_offset`,
    /// and an unknown key yields `None`.
    #[test]
    fn test_source_checkpoint_basic() {
        let mut checkpoint = SourceCheckpoint::new(1);
        for (partition, position) in [("partition-0", "1234"), ("partition-1", "5678")] {
            checkpoint.set_offset(partition, position);
        }

        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(checkpoint.get_offset("partition-0"), Some("1234"));
        assert_eq!(checkpoint.get_offset("partition-1"), Some("5678"));
        assert!(checkpoint.get_offset("partition-2").is_none());
        assert!(!checkpoint.is_empty());
    }

    /// A checkpoint built from an existing map exposes that map's entries.
    #[test]
    fn test_source_checkpoint_with_offsets() {
        let initial = HashMap::from([("lsn".to_string(), "0/1234ABCD".to_string())]);

        let checkpoint = SourceCheckpoint::with_offsets(5, initial);
        assert_eq!(checkpoint.epoch(), 5);
        assert_eq!(checkpoint.get_offset("lsn"), Some("0/1234ABCD"));
    }

    /// Metadata entries written with `set_metadata` can be read back.
    #[test]
    fn test_source_checkpoint_metadata() {
        let mut checkpoint = SourceCheckpoint::new(1);
        for (name, value) in [("connector", "kafka"), ("topic", "events")] {
            checkpoint.set_metadata(name, value);
        }

        assert_eq!(checkpoint.get_metadata("connector"), Some("kafka"));
        assert_eq!(checkpoint.get_metadata("topic"), Some("events"));
    }

    /// `RuntimeCheckpoint::new` stores each argument in the matching field.
    #[test]
    fn test_runtime_checkpoint() {
        let runtime =
            RuntimeCheckpoint::new(SourceCheckpoint::new(3), "my-kafka-source", 1_700_000_000_000);

        assert_eq!(runtime.source.epoch(), 3);
        assert_eq!(runtime.connector_name, "my-kafka-source");
        assert_eq!(runtime.timestamp_ms, 1_700_000_000_000);
    }

    /// A freshly constructed checkpoint reports itself as empty.
    #[test]
    fn test_empty_checkpoint() {
        assert!(SourceCheckpoint::new(0).is_empty());
    }
}