1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9
10use rdkafka::Offset;
11use rdkafka::TopicPartitionList;
12
13use crate::checkpoint::SourceCheckpoint;
14
15#[derive(Debug, Clone, Default)]
21pub struct OffsetTracker {
22 topics: HashMap<Arc<str>, HashMap<i32, i64>>,
25}
26
27impl OffsetTracker {
28 #[must_use]
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn update(&mut self, topic: &str, partition: i32, offset: i64) {
36 if let Some(partitions) = self.topics.get_mut(topic as &str) {
37 partitions
38 .entry(partition)
39 .and_modify(|existing| {
40 if offset > *existing {
41 *existing = offset;
42 }
43 })
44 .or_insert(offset);
45 } else {
46 let mut partitions = HashMap::new();
47 partitions.insert(partition, offset);
48 self.topics.insert(Arc::from(topic), partitions);
49 }
50 }
51
52 pub fn update_arc(&mut self, topic: &Arc<str>, partition: i32, offset: i64) {
54 if let Some(partitions) = self.topics.get_mut(&**topic as &str) {
55 partitions
56 .entry(partition)
57 .and_modify(|existing| {
58 if offset > *existing {
59 *existing = offset;
60 }
61 })
62 .or_insert(offset);
63 } else {
64 let mut partitions = HashMap::new();
65 partitions.insert(partition, offset);
66 self.topics.insert(Arc::clone(topic), partitions);
67 }
68 }
69
70 pub fn update_force(&mut self, topic: &str, partition: i32, offset: i64) {
72 self.topics
73 .entry(Arc::from(topic))
74 .or_default()
75 .insert(partition, offset);
76 }
77
78 #[must_use]
80 pub fn get(&self, topic: &str, partition: i32) -> Option<i64> {
81 self.topics
82 .get(topic)
83 .and_then(|p| p.get(&partition))
84 .copied()
85 }
86
87 #[must_use]
89 pub fn partition_count(&self) -> usize {
90 self.topics.values().map(HashMap::len).sum()
91 }
92
93 #[must_use]
97 pub fn to_checkpoint(&self) -> SourceCheckpoint {
98 let offsets: HashMap<String, String> = self
99 .topics
100 .iter()
101 .flat_map(|(topic, partitions)| {
102 partitions.iter().map(move |(partition, offset)| {
103 (format!("{topic}-{partition}"), offset.to_string())
104 })
105 })
106 .collect();
107 let mut cp = SourceCheckpoint::with_offsets(0, offsets);
108 cp.set_metadata("connector", "kafka");
109 cp
110 }
111
112 #[must_use]
117 pub fn from_checkpoint(cp: &SourceCheckpoint) -> Self {
118 let mut tracker = Self::new();
119 for (key, value) in cp.offsets() {
120 match value.parse::<i64>() {
121 Ok(offset) => {
122 if let Some(dash_pos) = key.rfind('-') {
123 let topic = &key[..dash_pos];
124 match key[dash_pos + 1..].parse::<i32>() {
125 Ok(partition) => tracker.update_force(topic, partition, offset),
126 Err(_) => {
127 tracing::warn!(
128 key,
129 "skipping checkpoint entry with unparseable partition"
130 );
131 }
132 }
133 } else {
134 tracing::warn!(
135 key,
136 "skipping checkpoint entry without topic-partition separator"
137 );
138 }
139 }
140 Err(_) => {
141 tracing::warn!(
142 key,
143 value,
144 "skipping checkpoint entry with unparseable offset"
145 );
146 }
147 }
148 }
149 tracker
150 }
151
152 #[must_use]
156 pub fn to_topic_partition_list(&self) -> TopicPartitionList {
157 let mut tpl = TopicPartitionList::new();
158 for (topic, partitions) in &self.topics {
159 for (&partition, &offset) in partitions {
160 tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
161 .ok();
162 }
163 }
164 tpl
165 }
166
167 pub fn retain_assigned(&mut self, assigned: &HashSet<(String, i32)>) {
173 self.topics.retain(|topic, partitions| {
174 partitions.retain(|&partition, _| assigned.contains(&(topic.to_string(), partition)));
175 !partitions.is_empty()
176 });
177 }
178
179 #[must_use]
186 pub fn to_checkpoint_filtered(&self, assigned: &HashSet<(String, i32)>) -> SourceCheckpoint {
187 let offsets: HashMap<String, String> = self
188 .topics
189 .iter()
190 .flat_map(|(topic, partitions)| {
191 partitions.iter().filter_map(move |(partition, offset)| {
192 if assigned.contains(&(topic.to_string(), *partition)) {
193 Some((format!("{topic}-{partition}"), offset.to_string()))
194 } else {
195 None
196 }
197 })
198 })
199 .collect();
200 let mut cp = SourceCheckpoint::with_offsets(0, offsets);
201 cp.set_metadata("connector", "kafka");
202 cp
203 }
204
205 #[must_use]
210 pub fn to_topic_partition_list_filtered(
211 &self,
212 assigned: &HashSet<(String, i32)>,
213 ) -> TopicPartitionList {
214 let mut tpl = TopicPartitionList::new();
215 for (topic, partitions) in &self.topics {
216 for (&partition, &offset) in partitions {
217 if assigned.contains(&(topic.to_string(), partition)) {
218 tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
219 .ok();
220 }
221 }
222 }
223 tpl
224 }
225
226 pub fn clear(&mut self) {
228 self.topics.clear();
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_update_and_get() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);

        assert_eq!(tracker.get("events", 0), Some(100));
        assert_eq!(tracker.get("events", 1), Some(200));
        assert_eq!(tracker.get("events", 2), None);
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_update_advances_forward() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 0, 200);
        assert_eq!(tracker.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_regression() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 200);
        // A lower offset must not move the tracked position backwards.
        tracker.update("events", 0, 100);
        assert_eq!(tracker.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_equal() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 0, 100);
        assert_eq!(tracker.get("events", 0), Some(100));
    }

    #[test]
    fn test_update_force_overwrites() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 200);
        // Unlike `update`, `update_force` may rewind the position.
        tracker.update_force("events", 0, 50);
        assert_eq!(tracker.get("events", 0), Some(50));
    }

    #[test]
    fn test_update_after_force_resumes_monotonic() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 200);
        tracker.update_force("events", 0, 50);
        tracker.update("events", 0, 60);
        assert_eq!(tracker.get("events", 0), Some(60));
        tracker.update("events", 0, 55);
        assert_eq!(tracker.get("events", 0), Some(60));
    }

    #[test]
    fn test_update_arc_is_monotonic() {
        let mut tracker = OffsetTracker::new();
        let topic: Arc<str> = Arc::from("events");
        tracker.update_arc(&topic, 0, 10);
        tracker.update_arc(&topic, 0, 5);
        tracker.update_arc(&topic, 1, 7);
        // `update` and `update_arc` must agree on the same topic entry.
        tracker.update("events", 1, 8);

        assert_eq!(tracker.get("events", 0), Some(10));
        assert_eq!(tracker.get("events", 1), Some(8));
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);
        tracker.update("orders", 0, 50);

        let cp = tracker.to_checkpoint();
        let restored = OffsetTracker::from_checkpoint(&cp);

        assert_eq!(restored.get("events", 0), Some(100));
        assert_eq!(restored.get("events", 1), Some(200));
        assert_eq!(restored.get("orders", 0), Some(50));
        assert_eq!(restored.partition_count(), 3);
    }

    #[test]
    fn test_from_checkpoint_skips_malformed_entries() {
        let offsets = HashMap::from([
            ("my-topic-3".to_string(), "42".to_string()),
            ("nodash".to_string(), "1".to_string()),
            ("events-x".to_string(), "1".to_string()),
            ("events-0".to_string(), "abc".to_string()),
        ]);
        let cp = SourceCheckpoint::with_offsets(0, offsets);

        let restored = OffsetTracker::from_checkpoint(&cp);

        // Only the well-formed entry survives; the dashed topic splits at the last `-`.
        assert_eq!(restored.get("my-topic", 3), Some(42));
        assert_eq!(restored.get("events", 0), None);
        assert_eq!(restored.partition_count(), 1);
    }

    #[test]
    fn test_empty_tracker() {
        let tracker = OffsetTracker::new();
        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.to_checkpoint().is_empty());
    }

    #[test]
    fn test_topic_partition_list() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 99);
        tracker.update("events", 1, 199);

        let tpl = tracker.to_topic_partition_list();
        let elements = tpl.elements();
        assert_eq!(elements.len(), 2);

        // Commit positions are one past the recorded offset.
        for elem in &elements {
            match elem.partition() {
                0 => assert_eq!(elem.offset(), Offset::Offset(100)),
                1 => assert_eq!(elem.offset(), Offset::Offset(200)),
                _ => panic!("unexpected partition"),
            }
        }
    }

    #[test]
    fn test_topic_partition_list_filtered() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 9);
        tracker.update("events", 1, 19);
        tracker.update("orders", 0, 4);

        let mut assigned = HashSet::new();
        assigned.insert(("events".to_string(), 1));

        let tpl = tracker.to_topic_partition_list_filtered(&assigned);
        let elements = tpl.elements();

        assert_eq!(elements.len(), 1);
        assert_eq!(elements[0].topic(), "events");
        assert_eq!(elements[0].partition(), 1);
        assert_eq!(elements[0].offset(), Offset::Offset(20));
        // Filtering must not mutate the tracker.
        assert_eq!(tracker.partition_count(), 3);
    }

    #[test]
    fn test_clear() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.clear();
        assert_eq!(tracker.partition_count(), 0);
        assert_eq!(tracker.get("events", 0), None);
    }

    #[test]
    fn test_multi_topic_checkpoint() {
        let mut tracker = OffsetTracker::new();
        tracker.update("topic-a", 0, 10);
        tracker.update("topic-b", 0, 20);

        let cp = tracker.to_checkpoint();
        let restored = OffsetTracker::from_checkpoint(&cp);

        assert_eq!(restored.get("topic-a", 0), Some(10));
        assert_eq!(restored.get("topic-b", 0), Some(20));
    }

    #[test]
    fn test_checkpoint_metadata() {
        let tracker = OffsetTracker::new();
        let cp = tracker.to_checkpoint();
        assert_eq!(cp.get_metadata("connector"), Some("kafka"));
    }

    #[test]
    fn test_retain_assigned() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);
        tracker.update("events", 2, 300);
        tracker.update("orders", 0, 50);

        let mut assigned = HashSet::new();
        assigned.insert(("events".to_string(), 0));
        assigned.insert(("events".to_string(), 2));
        tracker.retain_assigned(&assigned);

        assert_eq!(tracker.get("events", 0), Some(100));
        assert_eq!(tracker.get("events", 1), None);
        assert_eq!(tracker.get("events", 2), Some(300));
        // A topic with no assigned partitions is dropped entirely.
        assert_eq!(tracker.get("orders", 0), None);
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_retain_assigned_empty_set_clears_all() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);

        tracker.retain_assigned(&HashSet::new());

        assert_eq!(tracker.partition_count(), 0);
    }

    #[test]
    fn test_to_checkpoint_filtered() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);
        tracker.update("orders", 0, 50);

        let mut assigned = HashSet::new();
        assigned.insert(("events".to_string(), 0));
        assigned.insert(("orders".to_string(), 0));

        let cp = tracker.to_checkpoint_filtered(&assigned);

        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None);
        assert_eq!(cp.get_offset("orders-0"), Some("50"));
    }

    #[test]
    fn test_to_checkpoint_filtered_empty_returns_empty() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);

        let cp = tracker.to_checkpoint_filtered(&HashSet::new());

        assert!(cp.is_empty());
    }
}