// laminar_connectors/kafka/offsets.rs
//! Kafka offset tracking for per-partition consumption progress.
//!
//! [`OffsetTracker`] maintains the latest consumed offset for each
//! topic-partition and supports checkpoint/restore roundtrips via
//! [`SourceCheckpoint`].
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9
10use rdkafka::Offset;
11use rdkafka::TopicPartitionList;
12
13use crate::checkpoint::SourceCheckpoint;
14
/// Tracks consumed offsets per topic-partition.
///
/// Offsets stored are the last-consumed offset (not the next offset to fetch).
/// When committing to Kafka, `to_topic_partition_list()` returns offset+1
/// (the next offset to consume) per Kafka convention.
///
/// `Default` (and therefore `new()`) yields an empty tracker with no topics.
#[derive(Debug, Clone, Default)]
pub struct OffsetTracker {
    /// Two-level map: topic -> (partition -> offset). Uses `Arc<str>` keys
    /// to avoid per-message String allocations on the hot path.
    topics: HashMap<Arc<str>, HashMap<i32, i64>>,
}
26
27impl OffsetTracker {
28    /// Offsets start empty; populated by `update()` / `from_checkpoint()`.
29    #[must_use]
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Updates the offset (monotonic: only advances forward).
35    pub fn update(&mut self, topic: &str, partition: i32, offset: i64) {
36        if let Some(partitions) = self.topics.get_mut(topic as &str) {
37            partitions
38                .entry(partition)
39                .and_modify(|existing| {
40                    if offset > *existing {
41                        *existing = offset;
42                    }
43                })
44                .or_insert(offset);
45        } else {
46            let mut partitions = HashMap::new();
47            partitions.insert(partition, offset);
48            self.topics.insert(Arc::from(topic), partitions);
49        }
50    }
51
52    /// Updates the offset using a pre-interned topic Arc (avoids allocation).
53    pub fn update_arc(&mut self, topic: &Arc<str>, partition: i32, offset: i64) {
54        if let Some(partitions) = self.topics.get_mut(&**topic as &str) {
55            partitions
56                .entry(partition)
57                .and_modify(|existing| {
58                    if offset > *existing {
59                        *existing = offset;
60                    }
61                })
62                .or_insert(offset);
63        } else {
64            let mut partitions = HashMap::new();
65            partitions.insert(partition, offset);
66            self.topics.insert(Arc::clone(topic), partitions);
67        }
68    }
69
70    /// Unconditionally sets the offset for a topic-partition (used by restore).
71    pub fn update_force(&mut self, topic: &str, partition: i32, offset: i64) {
72        self.topics
73            .entry(Arc::from(topic))
74            .or_default()
75            .insert(partition, offset);
76    }
77
78    /// Gets the last-consumed offset for a topic-partition.
79    #[must_use]
80    pub fn get(&self, topic: &str, partition: i32) -> Option<i64> {
81        self.topics
82            .get(topic)
83            .and_then(|p| p.get(&partition))
84            .copied()
85    }
86
87    /// Returns the total number of tracked partitions across all topics.
88    #[must_use]
89    pub fn partition_count(&self) -> usize {
90        self.topics.values().map(HashMap::len).sum()
91    }
92
93    /// Converts tracked offsets to a [`SourceCheckpoint`].
94    ///
95    /// Key format: `"{topic}-{partition}"`, value: offset as string.
96    #[must_use]
97    pub fn to_checkpoint(&self) -> SourceCheckpoint {
98        let offsets: HashMap<String, String> = self
99            .topics
100            .iter()
101            .flat_map(|(topic, partitions)| {
102                partitions.iter().map(move |(partition, offset)| {
103                    (format!("{topic}-{partition}"), offset.to_string())
104                })
105            })
106            .collect();
107        let mut cp = SourceCheckpoint::with_offsets(0, offsets);
108        cp.set_metadata("connector", "kafka");
109        cp
110    }
111
112    /// Restores offset state from a [`SourceCheckpoint`].
113    ///
114    /// Parses keys in `"{topic}-{partition}"` format. Logs warnings for
115    /// unparseable entries rather than silently dropping them.
116    #[must_use]
117    pub fn from_checkpoint(cp: &SourceCheckpoint) -> Self {
118        let mut tracker = Self::new();
119        for (key, value) in cp.offsets() {
120            match value.parse::<i64>() {
121                Ok(offset) => {
122                    if let Some(dash_pos) = key.rfind('-') {
123                        let topic = &key[..dash_pos];
124                        match key[dash_pos + 1..].parse::<i32>() {
125                            Ok(partition) => tracker.update_force(topic, partition, offset),
126                            Err(_) => {
127                                tracing::warn!(
128                                    key,
129                                    "skipping checkpoint entry with unparseable partition"
130                                );
131                            }
132                        }
133                    } else {
134                        tracing::warn!(
135                            key,
136                            "skipping checkpoint entry without topic-partition separator"
137                        );
138                    }
139                }
140                Err(_) => {
141                    tracing::warn!(
142                        key,
143                        value,
144                        "skipping checkpoint entry with unparseable offset"
145                    );
146                }
147            }
148        }
149        tracker
150    }
151
152    /// Builds an rdkafka [`TopicPartitionList`] for committing.
153    ///
154    /// Per Kafka convention, committed offsets are next-to-fetch (offset+1).
155    #[must_use]
156    pub fn to_topic_partition_list(&self) -> TopicPartitionList {
157        let mut tpl = TopicPartitionList::new();
158        for (topic, partitions) in &self.topics {
159            for (&partition, &offset) in partitions {
160                tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
161                    .ok();
162            }
163        }
164        tpl
165    }
166
167    /// Removes partitions that are not in the `assigned` set.
168    ///
169    /// Called after a rebalance revoke to purge offsets for partitions this
170    /// consumer no longer owns. This prevents stale offsets from leaking
171    /// into checkpoints and causing incorrect partition assignment on recovery.
172    pub fn retain_assigned(&mut self, assigned: &HashSet<(String, i32)>) {
173        self.topics.retain(|topic, partitions| {
174            partitions.retain(|&partition, _| assigned.contains(&(topic.to_string(), partition)));
175            !partitions.is_empty()
176        });
177    }
178
179    /// Builds a checkpoint containing only offsets for currently assigned partitions.
180    ///
181    /// Non-mutating alternative to `retain_assigned()` + `to_checkpoint()`.
182    /// When `assigned` is empty (either before first rebalance or after full
183    /// revocation), returns an empty checkpoint — this is correct because
184    /// there are no partitions this consumer owns.
185    #[must_use]
186    pub fn to_checkpoint_filtered(&self, assigned: &HashSet<(String, i32)>) -> SourceCheckpoint {
187        let offsets: HashMap<String, String> = self
188            .topics
189            .iter()
190            .flat_map(|(topic, partitions)| {
191                partitions.iter().filter_map(move |(partition, offset)| {
192                    if assigned.contains(&(topic.to_string(), *partition)) {
193                        Some((format!("{topic}-{partition}"), offset.to_string()))
194                    } else {
195                        None
196                    }
197                })
198            })
199            .collect();
200        let mut cp = SourceCheckpoint::with_offsets(0, offsets);
201        cp.set_metadata("connector", "kafka");
202        cp
203    }
204
205    /// Builds an rdkafka [`TopicPartitionList`] for only assigned partitions.
206    ///
207    /// Like [`Self::to_topic_partition_list`] but filtered to the `assigned` set.
208    /// When `assigned` is empty, returns an empty TPL.
209    #[must_use]
210    pub fn to_topic_partition_list_filtered(
211        &self,
212        assigned: &HashSet<(String, i32)>,
213    ) -> TopicPartitionList {
214        let mut tpl = TopicPartitionList::new();
215        for (topic, partitions) in &self.topics {
216            for (&partition, &offset) in partitions {
217                if assigned.contains(&(topic.to_string(), partition)) {
218                    tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
219                        .ok();
220                }
221            }
222        }
223        tpl
224    }
225
226    /// Clears all tracked offsets.
227    pub fn clear(&mut self) {
228        self.topics.clear();
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tracker pre-populated via `update` with the given
    /// (topic, partition, offset) triples, in order.
    fn tracker_with(entries: &[(&str, i32, i64)]) -> OffsetTracker {
        let mut t = OffsetTracker::new();
        for &(topic, partition, offset) in entries {
            t.update(topic, partition, offset);
        }
        t
    }

    #[test]
    fn test_update_and_get() {
        let t = tracker_with(&[("events", 0, 100), ("events", 1, 200)]);

        assert_eq!(t.partition_count(), 2);
        assert_eq!(t.get("events", 0), Some(100));
        assert_eq!(t.get("events", 1), Some(200));
        assert_eq!(t.get("events", 2), None);
    }

    #[test]
    fn test_update_advances_forward() {
        let t = tracker_with(&[("events", 0, 100), ("events", 0, 200)]);
        assert_eq!(t.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_regression() {
        // The second update moves backwards and must be ignored.
        let t = tracker_with(&[("events", 0, 200), ("events", 0, 100)]);
        assert_eq!(t.get("events", 0), Some(200));
    }

    #[test]
    fn test_update_rejects_equal() {
        // Re-sending the same offset is a no-op.
        let t = tracker_with(&[("events", 0, 100), ("events", 0, 100)]);
        assert_eq!(t.get("events", 0), Some(100));
    }

    #[test]
    fn test_update_force_overwrites() {
        let mut t = tracker_with(&[("events", 0, 200)]);
        t.update_force("events", 0, 50); // force path permits regression
        assert_eq!(t.get("events", 0), Some(50));
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let expected = [("events", 0, 100), ("events", 1, 200), ("orders", 0, 50)];
        let t = tracker_with(&expected);

        let restored = OffsetTracker::from_checkpoint(&t.to_checkpoint());

        assert_eq!(restored.partition_count(), 3);
        for (topic, partition, offset) in expected {
            assert_eq!(restored.get(topic, partition), Some(offset));
        }
    }

    #[test]
    fn test_empty_tracker() {
        let t = OffsetTracker::new();
        assert_eq!(t.partition_count(), 0);
        assert!(t.to_checkpoint().is_empty());
    }

    #[test]
    fn test_topic_partition_list() {
        let t = tracker_with(&[("events", 0, 99), ("events", 1, 199)]);

        let tpl = t.to_topic_partition_list();
        let elems = tpl.elements();
        assert_eq!(elems.len(), 2);

        // Committed offsets are last-consumed + 1 (next-to-fetch).
        for elem in &elems {
            let expected = match elem.partition() {
                0 => 100,
                1 => 200,
                _ => panic!("unexpected partition"),
            };
            assert_eq!(elem.offset(), Offset::Offset(expected));
        }
    }

    #[test]
    fn test_clear() {
        let mut t = tracker_with(&[("events", 0, 100)]);
        t.clear();
        assert_eq!(t.get("events", 0), None);
        assert_eq!(t.partition_count(), 0);
    }

    #[test]
    fn test_multi_topic_checkpoint() {
        // Topic names containing '-' must survive the roundtrip.
        let t = tracker_with(&[("topic-a", 0, 10), ("topic-b", 0, 20)]);

        let restored = OffsetTracker::from_checkpoint(&t.to_checkpoint());

        assert_eq!(restored.get("topic-a", 0), Some(10));
        assert_eq!(restored.get("topic-b", 0), Some(20));
    }

    #[test]
    fn test_checkpoint_metadata() {
        let cp = OffsetTracker::new().to_checkpoint();
        assert_eq!(cp.get_metadata("connector"), Some("kafka"));
    }

    #[test]
    fn test_retain_assigned() {
        let mut t = tracker_with(&[
            ("events", 0, 100),
            ("events", 1, 200),
            ("events", 2, 300),
            ("orders", 0, 50),
        ]);
        // events-1 and orders-0 are NOT assigned (revoked).
        let assigned: HashSet<(String, i32)> =
            [("events".to_string(), 0), ("events".to_string(), 2)].into();

        t.retain_assigned(&assigned);

        assert_eq!(t.partition_count(), 2);
        assert_eq!(t.get("events", 0), Some(100));
        assert_eq!(t.get("events", 2), Some(300));
        assert_eq!(t.get("events", 1), None); // removed
        assert_eq!(t.get("orders", 0), None); // removed
    }

    #[test]
    fn test_retain_assigned_empty_set_clears_all() {
        let mut t = tracker_with(&[("events", 0, 100)]);
        t.retain_assigned(&HashSet::new());
        assert_eq!(t.partition_count(), 0);
    }

    #[test]
    fn test_to_checkpoint_filtered() {
        let t = tracker_with(&[("events", 0, 100), ("events", 1, 200), ("orders", 0, 50)]);
        let assigned: HashSet<(String, i32)> =
            [("events".to_string(), 0), ("orders".to_string(), 0)].into();

        let cp = t.to_checkpoint_filtered(&assigned);

        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("orders-0"), Some("50"));
        assert_eq!(cp.get_offset("events-1"), None); // filtered out
    }

    #[test]
    fn test_to_checkpoint_filtered_empty_returns_empty() {
        let t = tracker_with(&[("events", 0, 100), ("events", 1, 200)]);
        // Empty assigned set → no owned partitions → empty checkpoint.
        assert!(t.to_checkpoint_filtered(&HashSet::new()).is_empty());
    }
}