laminar_connectors/
checkpoint.rs1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
6
7#[derive(Debug, Clone, Default, PartialEq, Eq)]
15pub struct SourceCheckpoint {
16 offsets: HashMap<String, String>,
18
19 epoch: u64,
21
22 metadata: HashMap<String, String>,
24}
25
26impl SourceCheckpoint {
27 #[must_use]
29 pub fn new(epoch: u64) -> Self {
30 Self {
31 offsets: HashMap::new(),
32 epoch,
33 metadata: HashMap::new(),
34 }
35 }
36
37 #[must_use]
39 pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
40 Self {
41 offsets,
42 epoch,
43 metadata: HashMap::new(),
44 }
45 }
46
47 pub fn set_offset(&mut self, key: impl Into<String>, value: impl Into<String>) {
49 self.offsets.insert(key.into(), value.into());
50 }
51
52 #[must_use]
54 pub fn get_offset(&self, key: &str) -> Option<&str> {
55 self.offsets.get(key).map(String::as_str)
56 }
57
58 #[must_use]
60 pub fn offsets(&self) -> &HashMap<String, String> {
61 &self.offsets
62 }
63
64 #[must_use]
66 pub fn epoch(&self) -> u64 {
67 self.epoch
68 }
69
70 pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
72 self.metadata.insert(key.into(), value.into());
73 }
74
75 #[must_use]
77 pub fn get_metadata(&self, key: &str) -> Option<&str> {
78 self.metadata.get(key).map(String::as_str)
79 }
80
81 #[must_use]
83 pub fn metadata(&self) -> &HashMap<String, String> {
84 &self.metadata
85 }
86
87 #[must_use]
89 pub fn is_empty(&self) -> bool {
90 self.offsets.is_empty()
91 }
92}
93
94#[derive(Debug, Clone)]
97pub struct RuntimeCheckpoint {
98 pub source: SourceCheckpoint,
100
101 pub timestamp_ms: u64,
103
104 pub connector_name: String,
106}
107
108impl RuntimeCheckpoint {
109 #[must_use]
111 pub fn new(
112 source: SourceCheckpoint,
113 connector_name: impl Into<String>,
114 timestamp_ms: u64,
115 ) -> Self {
116 Self {
117 source,
118 timestamp_ms,
119 connector_name: connector_name.into(),
120 }
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127
128 #[test]
129 fn test_source_checkpoint_basic() {
130 let mut cp = SourceCheckpoint::new(1);
131 cp.set_offset("partition-0", "1234");
132 cp.set_offset("partition-1", "5678");
133
134 assert_eq!(cp.epoch(), 1);
135 assert_eq!(cp.get_offset("partition-0"), Some("1234"));
136 assert_eq!(cp.get_offset("partition-1"), Some("5678"));
137 assert_eq!(cp.get_offset("partition-2"), None);
138 assert!(!cp.is_empty());
139 }
140
141 #[test]
142 fn test_source_checkpoint_with_offsets() {
143 let mut offsets = HashMap::new();
144 offsets.insert("lsn".to_string(), "0/1234ABCD".to_string());
145
146 let cp = SourceCheckpoint::with_offsets(5, offsets);
147 assert_eq!(cp.epoch(), 5);
148 assert_eq!(cp.get_offset("lsn"), Some("0/1234ABCD"));
149 }
150
151 #[test]
152 fn test_source_checkpoint_metadata() {
153 let mut cp = SourceCheckpoint::new(1);
154 cp.set_metadata("connector", "kafka");
155 cp.set_metadata("topic", "events");
156
157 assert_eq!(cp.get_metadata("connector"), Some("kafka"));
158 assert_eq!(cp.get_metadata("topic"), Some("events"));
159 }
160
161 #[test]
162 fn test_runtime_checkpoint() {
163 let source = SourceCheckpoint::new(3);
164 let rt = RuntimeCheckpoint::new(source, "my-kafka-source", 1_700_000_000_000);
165
166 assert_eq!(rt.source.epoch(), 3);
167 assert_eq!(rt.connector_name, "my-kafka-source");
168 assert_eq!(rt.timestamp_ms, 1_700_000_000_000);
169 }
170
171 #[test]
172 fn test_empty_checkpoint() {
173 let cp = SourceCheckpoint::new(0);
174 assert!(cp.is_empty());
175 }
176}