Skip to main content

laminar_connectors/kafka/
offsets.rs

1//! Kafka offset tracking for per-partition consumption progress.
2//!
3//! [`OffsetTracker`] maintains the latest consumed offset for each
4//! topic-partition and supports checkpoint/restore roundtrips via
5//! [`SourceCheckpoint`].
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9
10use rdkafka::Offset;
11use rdkafka::TopicPartitionList;
12
13use crate::checkpoint::SourceCheckpoint;
14
/// Per topic-partition record of consumption progress.
///
/// Every stored value is the offset of the last record actually consumed,
/// not the next one to fetch. The methods that hand offsets back to Kafka
/// (`to_topic_partition_list()` and friends) add one to convert to Kafka's
/// next-to-fetch commit convention.
#[derive(Clone, Debug, Default)]
pub struct OffsetTracker {
    /// `topic -> (partition -> last-consumed offset)`.
    ///
    /// Topic keys are `Arc<str>` so hot-path updates for an already-known
    /// topic (or one passed in as a pre-interned `Arc`) never allocate a `String`.
    topics: HashMap<Arc<str>, HashMap<i32, i64>>,
}
26
27impl OffsetTracker {
28    /// Offsets start empty; populated by `update()` / `from_checkpoint()`.
29    #[must_use]
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Updates the offset (monotonic: only advances forward).
35    pub fn update(&mut self, topic: &str, partition: i32, offset: i64) {
36        if let Some(partitions) = self.topics.get_mut(topic as &str) {
37            partitions
38                .entry(partition)
39                .and_modify(|existing| {
40                    if offset > *existing {
41                        *existing = offset;
42                    }
43                })
44                .or_insert(offset);
45        } else {
46            let mut partitions = HashMap::new();
47            partitions.insert(partition, offset);
48            self.topics.insert(Arc::from(topic), partitions);
49        }
50    }
51
52    /// Updates the offset using a pre-interned topic Arc (avoids allocation).
53    pub fn update_arc(&mut self, topic: &Arc<str>, partition: i32, offset: i64) {
54        if let Some(partitions) = self.topics.get_mut(&**topic as &str) {
55            partitions
56                .entry(partition)
57                .and_modify(|existing| {
58                    if offset > *existing {
59                        *existing = offset;
60                    }
61                })
62                .or_insert(offset);
63        } else {
64            let mut partitions = HashMap::new();
65            partitions.insert(partition, offset);
66            self.topics.insert(Arc::clone(topic), partitions);
67        }
68    }
69
70    /// Unconditionally sets the offset for a topic-partition (used by restore).
71    pub fn update_force(&mut self, topic: &str, partition: i32, offset: i64) {
72        self.topics
73            .entry(Arc::from(topic))
74            .or_default()
75            .insert(partition, offset);
76    }
77
78    /// Gets the last-consumed offset for a topic-partition.
79    #[must_use]
80    pub fn get(&self, topic: &str, partition: i32) -> Option<i64> {
81        self.topics
82            .get(topic)
83            .and_then(|p| p.get(&partition))
84            .copied()
85    }
86
87    /// Returns the total number of tracked partitions across all topics.
88    #[must_use]
89    pub fn partition_count(&self) -> usize {
90        self.topics.values().map(HashMap::len).sum()
91    }
92
93    /// Converts tracked offsets to a [`SourceCheckpoint`].
94    ///
95    /// Key format: `"{topic}-{partition}"`, value: offset as string.
96    #[must_use]
97    pub fn to_checkpoint(&self) -> SourceCheckpoint {
98        let offsets: HashMap<String, String> = self
99            .topics
100            .iter()
101            .flat_map(|(topic, partitions)| {
102                partitions.iter().map(move |(partition, offset)| {
103                    (format!("{topic}-{partition}"), offset.to_string())
104                })
105            })
106            .collect();
107        let mut cp = SourceCheckpoint::with_offsets(0, offsets);
108        cp.set_metadata("connector", "kafka");
109        cp
110    }
111
112    /// Restores offset state from a [`SourceCheckpoint`].
113    ///
114    /// Parses keys in `"{topic}-{partition}"` format. Logs warnings for
115    /// unparseable entries rather than silently dropping them.
116    #[must_use]
117    pub fn from_checkpoint(cp: &SourceCheckpoint) -> Self {
118        let mut tracker = Self::new();
119        for (key, value) in cp.offsets() {
120            match value.parse::<i64>() {
121                Ok(offset) => {
122                    if let Some(dash_pos) = key.rfind('-') {
123                        let topic = &key[..dash_pos];
124                        match key[dash_pos + 1..].parse::<i32>() {
125                            Ok(partition) => tracker.update_force(topic, partition, offset),
126                            Err(_) => {
127                                tracing::warn!(
128                                    key,
129                                    "skipping checkpoint entry with unparseable partition"
130                                );
131                            }
132                        }
133                    } else {
134                        tracing::warn!(
135                            key,
136                            "skipping checkpoint entry without topic-partition separator"
137                        );
138                    }
139                }
140                Err(_) => {
141                    tracing::warn!(
142                        key,
143                        value,
144                        "skipping checkpoint entry with unparseable offset"
145                    );
146                }
147            }
148        }
149        tracker
150    }
151
152    /// Builds an rdkafka [`TopicPartitionList`] for committing.
153    ///
154    /// Per Kafka convention, committed offsets are next-to-fetch (offset+1).
155    #[must_use]
156    pub fn to_topic_partition_list(&self) -> TopicPartitionList {
157        let mut tpl = TopicPartitionList::new();
158        for (topic, partitions) in &self.topics {
159            for (&partition, &offset) in partitions {
160                if let Err(e) =
161                    tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
162                {
163                    tracing::warn!(
164                        %topic, partition, offset,
165                        error = %e,
166                        "failed to add partition offset to commit list"
167                    );
168                }
169            }
170        }
171        tpl
172    }
173
174    /// Builds a [`TopicPartitionList`] for seeking after a rebalance assign.
175    ///
176    /// Only includes partitions where the tracker holds a known offset,
177    /// using `Offset::Offset(offset + 1)` (next-to-fetch). Partitions NOT
178    /// in the tracker are omitted — callers should let `auto.offset.reset`
179    /// handle those.
180    #[must_use]
181    pub fn to_seek_tpl(&self, assigned: &[(String, i32)]) -> TopicPartitionList {
182        let mut tpl = TopicPartitionList::new();
183        for (topic, partition) in assigned {
184            if let Some(off) = self.get(topic, *partition) {
185                if let Err(e) = tpl.add_partition_offset(topic, *partition, Offset::Offset(off + 1))
186                {
187                    tracing::warn!(
188                        %topic, partition, offset = off,
189                        error = %e,
190                        "failed to add partition to rebalance seek list"
191                    );
192                }
193            }
194        }
195        tpl
196    }
197
198    /// Removes partitions that are not in the `assigned` set.
199    ///
200    /// Called after a rebalance revoke to purge offsets for partitions this
201    /// consumer no longer owns. This prevents stale offsets from leaking
202    /// into checkpoints and causing incorrect partition assignment on recovery.
203    pub fn retain_assigned(&mut self, assigned: &HashSet<(String, i32)>) {
204        self.topics.retain(|topic, partitions| {
205            partitions.retain(|&partition, _| assigned.contains(&(topic.to_string(), partition)));
206            !partitions.is_empty()
207        });
208    }
209
210    /// Builds a checkpoint containing only offsets for currently assigned partitions.
211    ///
212    /// Non-mutating alternative to `retain_assigned()` + `to_checkpoint()`.
213    /// When `assigned` is empty (either before first rebalance or after full
214    /// revocation), returns an empty checkpoint — this is correct because
215    /// there are no partitions this consumer owns.
216    #[must_use]
217    pub fn to_checkpoint_filtered(&self, assigned: &HashSet<(String, i32)>) -> SourceCheckpoint {
218        let offsets: HashMap<String, String> = self
219            .topics
220            .iter()
221            .flat_map(|(topic, partitions)| {
222                partitions.iter().filter_map(move |(partition, offset)| {
223                    if assigned.contains(&(topic.to_string(), *partition)) {
224                        Some((format!("{topic}-{partition}"), offset.to_string()))
225                    } else {
226                        None
227                    }
228                })
229            })
230            .collect();
231        let mut cp = SourceCheckpoint::with_offsets(0, offsets);
232        cp.set_metadata("connector", "kafka");
233        cp
234    }
235
236    /// Builds an rdkafka [`TopicPartitionList`] for only assigned partitions.
237    ///
238    /// Like [`Self::to_topic_partition_list`] but filtered to the `assigned` set.
239    /// When `assigned` is empty, returns an empty TPL.
240    #[must_use]
241    pub fn to_topic_partition_list_filtered(
242        &self,
243        assigned: &HashSet<(String, i32)>,
244    ) -> TopicPartitionList {
245        let mut tpl = TopicPartitionList::new();
246        for (topic, partitions) in &self.topics {
247            for (&partition, &offset) in partitions {
248                if assigned.contains(&(topic.to_string(), partition)) {
249                    if let Err(e) =
250                        tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
251                    {
252                        tracing::warn!(
253                            %topic, partition, offset,
254                            error = %e,
255                            "failed to add partition offset to filtered commit list"
256                        );
257                    }
258                }
259            }
260        }
261        tpl
262    }
263
264    /// Clears all tracked offsets.
265    pub fn clear(&mut self) {
266        self.topics.clear();
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a tracker fed each `(topic, partition, offset)` through `update`, in order.
    fn tracker_with(entries: &[(&str, i32, i64)]) -> OffsetTracker {
        let mut tracker = OffsetTracker::new();
        for &(topic, partition, offset) in entries {
            tracker.update(topic, partition, offset);
        }
        tracker
    }

    #[test]
    fn test_update_and_get() {
        let tracker = tracker_with(&[("events", 0, 100), ("events", 1, 200)]);

        let expectations = [(0, Some(100)), (1, Some(200)), (2, None)];
        for &(partition, expected) in &expectations {
            assert_eq!(tracker.get("events", partition), expected);
        }
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_update_advances_forward() {
        let tracker = tracker_with(&[("events", 0, 100), ("events", 0, 200)]);
        assert_eq!(tracker.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_regression() {
        // A lower offset arriving later must not move the tracker backwards.
        let tracker = tracker_with(&[("events", 0, 200), ("events", 0, 100)]);
        assert_eq!(tracker.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_equal() {
        // Repeating the same offset is a no-op.
        let tracker = tracker_with(&[("events", 0, 100), ("events", 0, 100)]);
        assert_eq!(tracker.get("events", 0), Some(100));
    }

    #[test]
    fn test_update_force_overwrites() {
        let mut tracker = tracker_with(&[("events", 0, 200)]);
        tracker.update_force("events", 0, 50); // force is allowed to regress
        assert_eq!(tracker.get("events", 0), Some(50));
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let entries = [("events", 0, 100), ("events", 1, 200), ("orders", 0, 50)];
        let restored = OffsetTracker::from_checkpoint(&tracker_with(&entries).to_checkpoint());

        for &(topic, partition, offset) in &entries {
            assert_eq!(restored.get(topic, partition), Some(offset));
        }
        assert_eq!(restored.partition_count(), entries.len());
    }

    #[test]
    fn test_empty_tracker() {
        let tracker = OffsetTracker::new();
        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.to_checkpoint().is_empty());
    }

    #[test]
    fn test_topic_partition_list() {
        let tracker = tracker_with(&[("events", 0, 99), ("events", 1, 199)]);

        let tpl = tracker.to_topic_partition_list();
        let elements = tpl.elements();
        assert_eq!(elements.len(), 2);

        // Committed offsets are next-to-fetch, i.e. last-consumed + 1.
        let committed: HashMap<i32, Offset> = elements
            .iter()
            .map(|elem| (elem.partition(), elem.offset()))
            .collect();
        assert_eq!(
            committed,
            HashMap::from([(0, Offset::Offset(100)), (1, Offset::Offset(200))])
        );
    }

    #[test]
    fn test_clear() {
        let mut tracker = tracker_with(&[("events", 0, 100)]);
        tracker.clear();
        assert_eq!(tracker.partition_count(), 0);
        assert_eq!(tracker.get("events", 0), None);
    }

    #[test]
    fn test_multi_topic_checkpoint() {
        // Dashes inside topic names must survive the "{topic}-{partition}" key format.
        let tracker = tracker_with(&[("topic-a", 0, 10), ("topic-b", 0, 20)]);
        let restored = OffsetTracker::from_checkpoint(&tracker.to_checkpoint());

        assert_eq!(restored.get("topic-a", 0), Some(10));
        assert_eq!(restored.get("topic-b", 0), Some(20));
    }

    #[test]
    fn test_checkpoint_metadata() {
        let cp = OffsetTracker::new().to_checkpoint();
        assert_eq!(cp.get_metadata("connector"), Some("kafka"));
    }

    #[test]
    fn test_retain_assigned() {
        let mut tracker = tracker_with(&[
            ("events", 0, 100),
            ("events", 1, 200),
            ("events", 2, 300),
            ("orders", 0, 50),
        ]);
        // events-1 and orders-0 have been revoked.
        let assigned = HashSet::from([("events".to_string(), 0), ("events".to_string(), 2)]);

        tracker.retain_assigned(&assigned);

        assert_eq!(tracker.get("events", 0), Some(100));
        assert_eq!(tracker.get("events", 1), None);
        assert_eq!(tracker.get("events", 2), Some(300));
        assert_eq!(tracker.get("orders", 0), None);
        assert_eq!(tracker.partition_count(), 2);
    }

    #[test]
    fn test_retain_assigned_empty_set_clears_all() {
        let mut tracker = tracker_with(&[("events", 0, 100)]);
        tracker.retain_assigned(&HashSet::new());
        assert_eq!(tracker.partition_count(), 0);
    }

    #[test]
    fn test_to_checkpoint_filtered() {
        let tracker = tracker_with(&[("events", 0, 100), ("events", 1, 200), ("orders", 0, 50)]);
        let assigned = HashSet::from([("events".to_string(), 0), ("orders".to_string(), 0)]);

        let cp = tracker.to_checkpoint_filtered(&assigned);

        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None); // not assigned
        assert_eq!(cp.get_offset("orders-0"), Some("50"));
    }

    #[test]
    fn test_to_checkpoint_filtered_empty_returns_empty() {
        let tracker = tracker_with(&[("events", 0, 100), ("events", 1, 200)]);
        // Owning no partitions means there is nothing to checkpoint.
        assert!(tracker.to_checkpoint_filtered(&HashSet::new()).is_empty());
    }

    #[test]
    fn test_to_seek_tpl_known_and_unknown() {
        let tracker = tracker_with(&[("events", 0, 99), ("events", 1, 199)]);
        // events-2 has no tracked offset, so it must be left out of the seek list.
        let assigned: Vec<(String, i32)> = (0..3).map(|p| ("events".to_string(), p)).collect();

        let tpl = tracker.to_seek_tpl(&assigned);
        assert_eq!(tpl.count(), 2);

        let seeks: HashMap<(String, i32), Offset> = tpl
            .elements()
            .iter()
            .map(|elem| ((elem.topic().to_string(), elem.partition()), elem.offset()))
            .collect();
        assert_eq!(
            seeks,
            HashMap::from([
                (("events".to_string(), 0), Offset::Offset(100)),
                (("events".to_string(), 1), Offset::Offset(200)),
            ])
        );
    }

    #[test]
    fn test_to_seek_tpl_empty_tracker() {
        // No known offsets, so nothing to seek.
        let tpl = OffsetTracker::new().to_seek_tpl(&[("events".to_string(), 0)]);
        assert_eq!(tpl.count(), 0);
    }
}