1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9
10use rdkafka::Offset;
11use rdkafka::TopicPartitionList;
12
13use crate::checkpoint::SourceCheckpoint;
14
/// Tracks the last processed offset for every Kafka topic-partition.
///
/// Offsets are stored exactly as processed; the commit/seek list builders add
/// one when converting them. Topic names are held as `Arc<str>` so that
/// callers which already own an `Arc<str>` can insert without allocating
/// (see `update_arc`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OffsetTracker {
    /// Topic name -> (partition id -> last processed offset).
    topics: HashMap<Arc<str>, HashMap<i32, i64>>,
}
26
27impl OffsetTracker {
28 #[must_use]
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn update(&mut self, topic: &str, partition: i32, offset: i64) {
36 if let Some(partitions) = self.topics.get_mut(topic as &str) {
37 partitions
38 .entry(partition)
39 .and_modify(|existing| {
40 if offset > *existing {
41 *existing = offset;
42 }
43 })
44 .or_insert(offset);
45 } else {
46 let mut partitions = HashMap::new();
47 partitions.insert(partition, offset);
48 self.topics.insert(Arc::from(topic), partitions);
49 }
50 }
51
52 pub fn update_arc(&mut self, topic: &Arc<str>, partition: i32, offset: i64) {
54 if let Some(partitions) = self.topics.get_mut(&**topic as &str) {
55 partitions
56 .entry(partition)
57 .and_modify(|existing| {
58 if offset > *existing {
59 *existing = offset;
60 }
61 })
62 .or_insert(offset);
63 } else {
64 let mut partitions = HashMap::new();
65 partitions.insert(partition, offset);
66 self.topics.insert(Arc::clone(topic), partitions);
67 }
68 }
69
70 pub fn update_force(&mut self, topic: &str, partition: i32, offset: i64) {
72 self.topics
73 .entry(Arc::from(topic))
74 .or_default()
75 .insert(partition, offset);
76 }
77
78 #[must_use]
80 pub fn get(&self, topic: &str, partition: i32) -> Option<i64> {
81 self.topics
82 .get(topic)
83 .and_then(|p| p.get(&partition))
84 .copied()
85 }
86
87 #[must_use]
89 pub fn partition_count(&self) -> usize {
90 self.topics.values().map(HashMap::len).sum()
91 }
92
93 #[must_use]
97 pub fn to_checkpoint(&self) -> SourceCheckpoint {
98 let offsets: HashMap<String, String> = self
99 .topics
100 .iter()
101 .flat_map(|(topic, partitions)| {
102 partitions.iter().map(move |(partition, offset)| {
103 (format!("{topic}-{partition}"), offset.to_string())
104 })
105 })
106 .collect();
107 let mut cp = SourceCheckpoint::with_offsets(0, offsets);
108 cp.set_metadata("connector", "kafka");
109 cp
110 }
111
112 #[must_use]
117 pub fn from_checkpoint(cp: &SourceCheckpoint) -> Self {
118 let mut tracker = Self::new();
119 for (key, value) in cp.offsets() {
120 match value.parse::<i64>() {
121 Ok(offset) => {
122 if let Some(dash_pos) = key.rfind('-') {
123 let topic = &key[..dash_pos];
124 match key[dash_pos + 1..].parse::<i32>() {
125 Ok(partition) => tracker.update_force(topic, partition, offset),
126 Err(_) => {
127 tracing::warn!(
128 key,
129 "skipping checkpoint entry with unparseable partition"
130 );
131 }
132 }
133 } else {
134 tracing::warn!(
135 key,
136 "skipping checkpoint entry without topic-partition separator"
137 );
138 }
139 }
140 Err(_) => {
141 tracing::warn!(
142 key,
143 value,
144 "skipping checkpoint entry with unparseable offset"
145 );
146 }
147 }
148 }
149 tracker
150 }
151
152 #[must_use]
156 pub fn to_topic_partition_list(&self) -> TopicPartitionList {
157 let mut tpl = TopicPartitionList::new();
158 for (topic, partitions) in &self.topics {
159 for (&partition, &offset) in partitions {
160 if let Err(e) =
161 tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
162 {
163 tracing::warn!(
164 %topic, partition, offset,
165 error = %e,
166 "failed to add partition offset to commit list"
167 );
168 }
169 }
170 }
171 tpl
172 }
173
174 #[must_use]
181 pub fn to_seek_tpl(&self, assigned: &[(String, i32)]) -> TopicPartitionList {
182 let mut tpl = TopicPartitionList::new();
183 for (topic, partition) in assigned {
184 if let Some(off) = self.get(topic, *partition) {
185 if let Err(e) = tpl.add_partition_offset(topic, *partition, Offset::Offset(off + 1))
186 {
187 tracing::warn!(
188 %topic, partition, offset = off,
189 error = %e,
190 "failed to add partition to rebalance seek list"
191 );
192 }
193 }
194 }
195 tpl
196 }
197
198 pub fn retain_assigned(&mut self, assigned: &HashSet<(String, i32)>) {
204 self.topics.retain(|topic, partitions| {
205 partitions.retain(|&partition, _| assigned.contains(&(topic.to_string(), partition)));
206 !partitions.is_empty()
207 });
208 }
209
210 #[must_use]
217 pub fn to_checkpoint_filtered(&self, assigned: &HashSet<(String, i32)>) -> SourceCheckpoint {
218 let offsets: HashMap<String, String> = self
219 .topics
220 .iter()
221 .flat_map(|(topic, partitions)| {
222 partitions.iter().filter_map(move |(partition, offset)| {
223 if assigned.contains(&(topic.to_string(), *partition)) {
224 Some((format!("{topic}-{partition}"), offset.to_string()))
225 } else {
226 None
227 }
228 })
229 })
230 .collect();
231 let mut cp = SourceCheckpoint::with_offsets(0, offsets);
232 cp.set_metadata("connector", "kafka");
233 cp
234 }
235
236 #[must_use]
241 pub fn to_topic_partition_list_filtered(
242 &self,
243 assigned: &HashSet<(String, i32)>,
244 ) -> TopicPartitionList {
245 let mut tpl = TopicPartitionList::new();
246 for (topic, partitions) in &self.topics {
247 for (&partition, &offset) in partitions {
248 if assigned.contains(&(topic.to_string(), partition)) {
249 if let Err(e) =
250 tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
251 {
252 tracing::warn!(
253 %topic, partition, offset,
254 error = %e,
255 "failed to add partition offset to filtered commit list"
256 );
257 }
258 }
259 }
260 }
261 tpl
262 }
263
264 pub fn clear(&mut self) {
266 self.topics.clear();
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_update_and_get() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);

        assert_eq!(tracker.get("events", 0), Some(100));
        assert_eq!(tracker.get("events", 1), Some(200));
        assert_eq!(tracker.get("events", 2), None);
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_update_advances_forward() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 0, 200);
        assert_eq!(tracker.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_regression() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 200);
        // A lower offset must not move the tracked position backwards.
        tracker.update("events", 0, 100);
        assert_eq!(tracker.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_equal() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 0, 100);
        assert_eq!(tracker.get("events", 0), Some(100));
    }

    #[test]
    fn test_update_force_overwrites() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 200);
        // Unlike `update`, `update_force` may move the offset backwards.
        tracker.update_force("events", 0, 50);
        assert_eq!(tracker.get("events", 0), Some(50));
    }

    #[test]
    fn test_update_arc_shares_state_with_update() {
        let topic: Arc<str> = Arc::from("events");
        let mut tracker = OffsetTracker::new();
        tracker.update_arc(&topic, 0, 100);
        // Regressions are ignored exactly as in `update`.
        tracker.update_arc(&topic, 0, 50);
        assert_eq!(tracker.get("events", 0), Some(100));

        tracker.update("events", 0, 150);
        tracker.update_arc(&topic, 1, 7);
        assert_eq!(tracker.get("events", 0), Some(150));
        assert_eq!(tracker.get("events", 1), Some(7));
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);
        tracker.update("orders", 0, 50);

        let cp = tracker.to_checkpoint();
        let restored = OffsetTracker::from_checkpoint(&cp);

        assert_eq!(restored.get("events", 0), Some(100));
        assert_eq!(restored.get("events", 1), Some(200));
        assert_eq!(restored.get("orders", 0), Some(50));
        assert_eq!(restored.partition_count(), 3);
    }

    #[test]
    fn test_from_checkpoint_skips_malformed_entries() {
        let mut offsets = HashMap::new();
        offsets.insert("events-0".to_string(), "10".to_string());
        // No topic-partition separator.
        offsets.insert("noseparator".to_string(), "5".to_string());
        // Partition is not an integer.
        offsets.insert("events-x".to_string(), "7".to_string());
        // Offset is not an integer.
        offsets.insert("events-1".to_string(), "not-a-number".to_string());
        let cp = SourceCheckpoint::with_offsets(0, offsets);

        let restored = OffsetTracker::from_checkpoint(&cp);

        assert_eq!(restored.get("events", 0), Some(10));
        assert_eq!(restored.get("events", 1), None);
        assert_eq!(restored.partition_count(), 1);
    }

    #[test]
    fn test_empty_tracker() {
        let tracker = OffsetTracker::new();
        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.to_checkpoint().is_empty());
    }

    #[test]
    fn test_topic_partition_list() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 99);
        tracker.update("events", 1, 199);

        let tpl = tracker.to_topic_partition_list();
        let elements = tpl.elements();
        assert_eq!(elements.len(), 2);

        // Committed offsets point at the next record: tracked + 1.
        for elem in &elements {
            match elem.partition() {
                0 => assert_eq!(elem.offset(), Offset::Offset(100)),
                1 => assert_eq!(elem.offset(), Offset::Offset(200)),
                _ => panic!("unexpected partition"),
            }
        }
    }

    #[test]
    fn test_topic_partition_list_filtered() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 99);
        tracker.update("events", 1, 199);
        tracker.update("orders", 0, 9);

        let mut assigned = HashSet::new();
        assigned.insert(("events".to_string(), 1));
        assigned.insert(("orders".to_string(), 0));

        let tpl = tracker.to_topic_partition_list_filtered(&assigned);
        assert_eq!(tpl.count(), 2);

        for elem in tpl.elements() {
            match (elem.topic(), elem.partition()) {
                ("events", 1) => assert_eq!(elem.offset(), Offset::Offset(200)),
                ("orders", 0) => assert_eq!(elem.offset(), Offset::Offset(10)),
                _ => panic!("unexpected partition {}-{}", elem.topic(), elem.partition()),
            }
        }
    }

    #[test]
    fn test_clear() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.clear();
        assert_eq!(tracker.partition_count(), 0);
        assert_eq!(tracker.get("events", 0), None);
    }

    #[test]
    fn test_multi_topic_checkpoint() {
        // Topic names containing dashes must survive the "{topic}-{partition}" keying.
        let mut tracker = OffsetTracker::new();
        tracker.update("topic-a", 0, 10);
        tracker.update("topic-b", 0, 20);

        let cp = tracker.to_checkpoint();
        let restored = OffsetTracker::from_checkpoint(&cp);

        assert_eq!(restored.get("topic-a", 0), Some(10));
        assert_eq!(restored.get("topic-b", 0), Some(20));
    }

    #[test]
    fn test_checkpoint_metadata() {
        let tracker = OffsetTracker::new();
        let cp = tracker.to_checkpoint();
        assert_eq!(cp.get_metadata("connector"), Some("kafka"));
    }

    #[test]
    fn test_retain_assigned() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);
        tracker.update("events", 2, 300);
        tracker.update("orders", 0, 50);

        let mut assigned = HashSet::new();
        assigned.insert(("events".to_string(), 0));
        assigned.insert(("events".to_string(), 2));
        tracker.retain_assigned(&assigned);

        assert_eq!(tracker.get("events", 0), Some(100));
        // Revoked partition is dropped.
        assert_eq!(tracker.get("events", 1), None);
        assert_eq!(tracker.get("events", 2), Some(300));
        // Topic with no remaining assigned partitions is dropped entirely.
        assert_eq!(tracker.get("orders", 0), None);
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_retain_assigned_empty_set_clears_all() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);

        tracker.retain_assigned(&HashSet::new());

        assert_eq!(tracker.partition_count(), 0);
    }

    #[test]
    fn test_to_checkpoint_filtered() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);
        tracker.update("orders", 0, 50);

        let mut assigned = HashSet::new();
        assigned.insert(("events".to_string(), 0));
        assigned.insert(("orders".to_string(), 0));

        let cp = tracker.to_checkpoint_filtered(&assigned);

        assert_eq!(cp.get_offset("events-0"), Some("100"));
        // Unassigned partition is excluded from the checkpoint.
        assert_eq!(cp.get_offset("events-1"), None);
        assert_eq!(cp.get_offset("orders-0"), Some("50"));
    }

    #[test]
    fn test_to_checkpoint_filtered_empty_returns_empty() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 100);
        tracker.update("events", 1, 200);

        let cp = tracker.to_checkpoint_filtered(&HashSet::new());

        assert!(cp.is_empty());
    }

    #[test]
    fn test_to_seek_tpl_known_and_unknown() {
        let mut tracker = OffsetTracker::new();
        tracker.update("events", 0, 99);
        tracker.update("events", 1, 199);

        let assigned = vec![
            ("events".to_string(), 0),
            ("events".to_string(), 1),
            // Never tracked: must be omitted from the seek list.
            ("events".to_string(), 2),
        ];
        let tpl = tracker.to_seek_tpl(&assigned);
        assert_eq!(tpl.count(), 2);

        for elem in tpl.elements() {
            match (elem.topic(), elem.partition()) {
                ("events", 0) => assert_eq!(elem.offset(), Offset::Offset(100)),
                ("events", 1) => assert_eq!(elem.offset(), Offset::Offset(200)),
                _ => panic!("unexpected partition {}-{}", elem.topic(), elem.partition()),
            }
        }
    }

    #[test]
    fn test_to_seek_tpl_empty_tracker() {
        let tracker = OffsetTracker::new();
        let assigned = vec![("events".to_string(), 0)];
        let tpl = tracker.to_seek_tpl(&assigned);
        assert_eq!(tpl.count(), 0);
    }
}