//! Kafka per-partition watermark tracking.
//!
//! Source file: `laminar_connectors/kafka/watermarks.rs`.
2
3use std::time::{Duration, Instant};
4
/// Kafka-specific per-partition watermark tracker.
///
/// Tracks watermarks per Kafka partition and computes a combined watermark
/// as the minimum across all active (non-idle) partitions.
// NOTE(review): the previous summary said this type wraps
// `PartitionedWatermarkTracker`, but no such field is visible in this file;
// per-partition state is held directly in `partition_watermarks` — confirm.
#[derive(Debug)]
pub struct KafkaWatermarkTracker {
    /// Source ID for this tracker, as passed to `new` and returned by
    /// `source_id()`.
    source_id: usize,
    /// Per-partition watermark state; the Vec index is the Kafka partition
    /// number.
    partition_watermarks: Vec<PartitionState>,
    /// Combined watermark (minimum over active partitions that have a
    /// watermark). `i64::MIN` is the sentinel for "no combined watermark";
    /// `current_watermark()` maps it to `None`.
    combined_watermark: i64,
    /// Inactivity span after which `check_idle_partitions` marks a partition
    /// idle.
    idle_timeout: Duration,
    /// Maximum out-of-orderness: subtracted (in milliseconds) from a
    /// partition's event time to derive its watermark. `new` sets it to
    /// 5 seconds; override with `with_max_out_of_orderness`.
    max_out_of_orderness: Duration,
    /// Counters describing tracker activity.
    metrics: WatermarkMetrics,
}
24
/// Per-partition watermark state.
///
/// `i64::MIN` in `watermark` / `max_event_time` means "nothing observed yet";
/// such partitions are skipped when the combined watermark is computed.
#[derive(Debug, Clone)]
struct PartitionState {
    /// Current watermark (event time minus out-of-orderness bound).
    watermark: i64,
    /// Maximum event time seen so far on this partition.
    max_event_time: i64,
    /// Monotonic timestamp of the last update; compared against the idle
    /// timeout by the tracker.
    last_activity: Instant,
    /// Whether the partition is currently marked idle (excluded from the
    /// combined watermark).
    is_idle: bool,
}
37
38impl PartitionState {
39    fn new() -> Self {
40        Self {
41            watermark: i64::MIN,
42            max_event_time: i64::MIN,
43            last_activity: Instant::now(),
44            is_idle: false,
45        }
46    }
47}
48
/// Metrics for watermark tracking.
///
/// The tracker is single-threaded (owned by `KafkaSource`), so plain
/// `u64` counters are used instead of atomics.
#[derive(Debug, Clone, Copy, Default)]
pub struct WatermarkMetrics {
    /// Total watermark updates: incremented once per `update_partition` call
    /// on a non-negative partition, whether or not any watermark moved.
    pub updates: u64,
    /// Watermark advances: incremented each time a recomputation finds the
    /// combined watermark strictly above a previously available value.
    pub advances: u64,
    /// Partitions marked idle, either explicitly via `mark_idle` or by the
    /// timeout scan in `check_idle_partitions`.
    pub idle_transitions: u64,
    /// Partitions resumed from idle by an incoming update.
    pub active_transitions: u64,
}
64
65impl KafkaWatermarkTracker {
66    /// Creates a new Kafka watermark tracker.
67    ///
68    /// # Arguments
69    ///
70    /// * `source_id` - Unique identifier for this source
71    /// * `idle_timeout` - Duration after which an inactive partition is marked idle
72    #[must_use]
73    pub fn new(source_id: usize, idle_timeout: Duration) -> Self {
74        Self {
75            source_id,
76            partition_watermarks: Vec::new(),
77            combined_watermark: i64::MIN,
78            idle_timeout,
79            max_out_of_orderness: Duration::from_secs(5),
80            metrics: WatermarkMetrics::default(),
81        }
82    }
83
84    /// Creates a tracker with custom out-of-orderness bound.
85    #[must_use]
86    pub fn with_max_out_of_orderness(mut self, max_out_of_orderness: Duration) -> Self {
87        self.max_out_of_orderness = max_out_of_orderness;
88        self
89    }
90
91    /// Returns the source ID.
92    #[must_use]
93    pub fn source_id(&self) -> usize {
94        self.source_id
95    }
96
97    /// Registers partitions for tracking.
98    ///
99    /// Call this when partitions are assigned during Kafka rebalance.
100    pub fn register_partitions(&mut self, num_partitions: usize) {
101        self.partition_watermarks
102            .resize_with(num_partitions, PartitionState::new);
103    }
104
105    /// Adds a partition (e.g., during Kafka rebalance).
106    pub fn add_partition(&mut self, partition: i32) {
107        let Some(idx) = usize::try_from(partition).ok() else {
108            return; // Ignore negative partitions
109        };
110        if idx >= self.partition_watermarks.len() {
111            self.partition_watermarks
112                .resize_with(idx + 1, PartitionState::new);
113        }
114    }
115
116    /// Removes a partition (e.g., during Kafka rebalance).
117    pub fn remove_partition(&mut self, partition: i32) {
118        let Some(idx) = usize::try_from(partition).ok() else {
119            return; // Ignore negative partitions
120        };
121        if idx < self.partition_watermarks.len() {
122            // Mark as idle rather than removing to maintain indices
123            self.partition_watermarks[idx].is_idle = true;
124            self.partition_watermarks[idx].watermark = i64::MAX; // Exclude from min
125        }
126        // Truncate trailing idle entries so the Vec doesn't retain
127        // capacity for revoked high-numbered partitions.
128        while self.partition_watermarks.last().is_some_and(|s| s.is_idle) {
129            self.partition_watermarks.pop();
130        }
131        self.recompute_combined();
132    }
133
134    /// Updates the watermark for a partition.
135    ///
136    /// # Arguments
137    ///
138    /// * `partition` - Kafka partition number
139    /// * `event_time` - Event timestamp in milliseconds
140    ///
141    /// # Returns
142    ///
143    /// `true` if the combined watermark advanced.
144    pub fn update_partition(&mut self, partition: i32, event_time: i64) -> bool {
145        let Some(idx) = usize::try_from(partition).ok() else {
146            return false; // Ignore negative partitions
147        };
148        if idx >= self.partition_watermarks.len() {
149            self.partition_watermarks
150                .resize_with(idx + 1, PartitionState::new);
151        }
152
153        let state = &mut self.partition_watermarks[idx];
154        state.last_activity = Instant::now();
155        self.metrics.updates += 1;
156
157        // Resume if was idle
158        if state.is_idle {
159            state.is_idle = false;
160            self.metrics.active_transitions += 1;
161        }
162
163        // Update max event time
164        if event_time > state.max_event_time {
165            state.max_event_time = event_time;
166            // Watermark = max_event_time - max_out_of_orderness
167            // Saturate to i64::MAX if duration is too large (extremely unlikely in practice)
168            let out_of_order_ms =
169                i64::try_from(self.max_out_of_orderness.as_millis()).unwrap_or(i64::MAX);
170            let new_watermark = event_time.saturating_sub(out_of_order_ms);
171            if new_watermark > state.watermark {
172                state.watermark = new_watermark;
173            }
174        }
175
176        self.recompute_combined()
177    }
178
179    /// Marks a partition as idle.
180    ///
181    /// Idle partitions are excluded from watermark computation.
182    pub fn mark_idle(&mut self, partition: i32) {
183        let Some(idx) = usize::try_from(partition).ok() else {
184            return; // Ignore negative partitions
185        };
186        if idx < self.partition_watermarks.len() && !self.partition_watermarks[idx].is_idle {
187            self.partition_watermarks[idx].is_idle = true;
188            self.metrics.idle_transitions += 1;
189            self.recompute_combined();
190        }
191    }
192
193    /// Checks for idle partitions based on timeout and marks them.
194    ///
195    /// Call this periodically (e.g., every poll cycle).
196    pub fn check_idle_partitions(&mut self) {
197        let now = Instant::now();
198        let mut any_changed = false;
199
200        for state in &mut self.partition_watermarks {
201            if !state.is_idle && now.duration_since(state.last_activity) > self.idle_timeout {
202                state.is_idle = true;
203                self.metrics.idle_transitions += 1;
204                any_changed = true;
205            }
206        }
207
208        if any_changed {
209            self.recompute_combined();
210        }
211    }
212
213    /// Returns the current combined watermark.
214    ///
215    /// Returns `None` if no partitions are registered or all are idle.
216    #[must_use]
217    pub fn current_watermark(&self) -> Option<i64> {
218        if self.combined_watermark == i64::MIN {
219            None
220        } else {
221            Some(self.combined_watermark)
222        }
223    }
224
225    /// Returns the number of active (non-idle) partitions.
226    #[must_use]
227    pub fn active_partition_count(&self) -> usize {
228        self.partition_watermarks
229            .iter()
230            .filter(|s| !s.is_idle)
231            .count()
232    }
233
234    /// Returns the number of idle partitions.
235    #[must_use]
236    pub fn idle_partition_count(&self) -> usize {
237        self.partition_watermarks
238            .iter()
239            .filter(|s| s.is_idle)
240            .count()
241    }
242
243    /// Returns the total number of registered partitions.
244    #[must_use]
245    pub fn partition_count(&self) -> usize {
246        self.partition_watermarks.len()
247    }
248
249    /// Returns metrics for this tracker.
250    #[must_use]
251    pub fn metrics(&self) -> &WatermarkMetrics {
252        &self.metrics
253    }
254
255    /// Returns the watermark for a specific partition.
256    #[must_use]
257    pub fn partition_watermark(&self, partition: i32) -> Option<i64> {
258        let idx = usize::try_from(partition).ok()?;
259        self.partition_watermarks.get(idx).and_then(|s| {
260            if s.watermark == i64::MIN {
261                None
262            } else {
263                Some(s.watermark)
264            }
265        })
266    }
267
268    /// Returns whether a partition is idle.
269    #[must_use]
270    pub fn is_partition_idle(&self, partition: i32) -> bool {
271        let Some(idx) = usize::try_from(partition).ok() else {
272            return false;
273        };
274        self.partition_watermarks
275            .get(idx)
276            .is_some_and(|s| s.is_idle)
277    }
278
279    /// Recomputes the combined watermark from all active partitions.
280    fn recompute_combined(&mut self) -> bool {
281        let old = self.combined_watermark;
282
283        // Minimum watermark across active partitions
284        let min = self
285            .partition_watermarks
286            .iter()
287            .filter(|s| !s.is_idle && s.watermark != i64::MIN)
288            .map(|s| s.watermark)
289            .min();
290
291        self.combined_watermark = min.unwrap_or(i64::MIN);
292
293        let advanced = self.combined_watermark > old && old != i64::MIN;
294        if advanced {
295            self.metrics.advances += 1;
296        }
297        advanced
298    }
299}
300
301impl Default for KafkaWatermarkTracker {
302    fn default() -> Self {
303        Self::new(0, Duration::from_secs(30))
304    }
305}
306
// Unit tests for `KafkaWatermarkTracker`. Unless a test overrides it, the
// tracker uses the 5-second out-of-orderness bound set by `new`.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh tracker has no partitions and no watermark.
    #[test]
    fn test_tracker_new() {
        let tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        assert_eq!(tracker.source_id(), 0);
        assert_eq!(tracker.partition_count(), 0);
        assert!(tracker.current_watermark().is_none());
    }

    // Registered partitions start out active.
    #[test]
    fn test_register_partitions() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(4);
        assert_eq!(tracker.partition_count(), 4);
        assert_eq!(tracker.active_partition_count(), 4);
        assert_eq!(tracker.idle_partition_count(), 0);
    }

    // The combined watermark is the minimum of the per-partition watermarks.
    #[test]
    fn test_update_partition() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        // Watermark = min(5000-1000, 3000-1000) = 2000
        assert_eq!(tracker.current_watermark(), Some(2000));
    }

    // Marking the slowest partition idle lets the combined watermark move up.
    #[test]
    fn test_idle_partition() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        // Mark slow partition as idle
        tracker.mark_idle(1);

        // Watermark now advances (only considers partition 0): 5000-1000
        assert_eq!(tracker.current_watermark(), Some(4000));
        assert_eq!(tracker.active_partition_count(), 1);
        assert_eq!(tracker.idle_partition_count(), 1);
    }

    // An update on an idle partition reactivates it, and its watermark is
    // included in the minimum again.
    #[test]
    fn test_resume_from_idle() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.mark_idle(1);
        assert_eq!(tracker.active_partition_count(), 1);

        // Update idle partition - should resume
        tracker.update_partition(1, 4000);
        assert_eq!(tracker.active_partition_count(), 2);
        // Watermark = min(5000-1000, 4000-1000) = min(4000, 3000) = 3000
        assert_eq!(tracker.current_watermark(), Some(3000));
    }

    // Adding a high-numbered partition grows the table to cover every index
    // up to it; partitions without events do not hold the watermark back.
    #[test]
    fn test_add_partition_dynamically() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1));

        tracker.update_partition(0, 5000);
        tracker.add_partition(5);
        tracker.update_partition(5, 3000);

        assert_eq!(tracker.partition_count(), 6); // 0-5
        // Watermark = min(5000-1000, 3000-1000) = 2000
        assert_eq!(tracker.current_watermark(), Some(2000));
    }

    // Removing the slowest partition excludes it from the minimum.
    #[test]
    fn test_remove_partition() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        tracker.remove_partition(1);

        // Watermark advances (partition 1 excluded)
        assert_eq!(tracker.current_watermark(), Some(4000));
    }

    // Per-partition watermarks are reported individually; unknown partitions
    // yield `None`.
    #[test]
    fn test_partition_watermark() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
            .with_max_out_of_orderness(Duration::from_secs(1));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);

        assert_eq!(tracker.partition_watermark(0), Some(4000));
        assert_eq!(tracker.partition_watermark(1), Some(2000));
        assert!(tracker.partition_watermark(99).is_none());
    }

    // Counters: three updates, one idle transition, one resume.
    #[test]
    fn test_metrics() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(1);
        tracker.update_partition(1, 4000); // resume

        let m = tracker.metrics();
        assert_eq!(m.updates, 3);
        assert_eq!(m.idle_transitions, 1);
        assert_eq!(m.active_transitions, 1);
    }

    // With every partition idle there is nothing to take a minimum over.
    #[test]
    fn test_all_partitions_idle() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(2);

        tracker.update_partition(0, 5000);
        tracker.update_partition(1, 3000);
        tracker.mark_idle(0);
        tracker.mark_idle(1);

        // No active partitions - no watermark
        assert!(tracker.current_watermark().is_none());
    }

    // Removing partitions only shrinks the table from the end.
    #[test]
    fn test_remove_partition_truncates_trailing() {
        let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30));
        tracker.register_partitions(4); // 0, 1, 2, 3
        tracker.update_partition(0, 1000);
        tracker.update_partition(1, 1000);
        tracker.update_partition(3, 2000);

        // Remove trailing partitions: the Vec should shrink.
        // Removing partition 3 marks the last entry idle, and the trailing
        // idle entry is popped, so len becomes 3.
        // Partition 2 never received an update, but `register_partitions`
        // created it as active (idle=false), so it stops the truncation
        // until it is removed explicitly as well; then len becomes 2.
        tracker.remove_partition(3);
        assert_eq!(tracker.partition_count(), 3, "trailing idle p3 truncated");
        tracker.remove_partition(2);
        assert_eq!(tracker.partition_count(), 2, "trailing idle p2 truncated");

        // Remove middle partition: the Vec should NOT shrink past the
        // active partition 3 that follows it.
        tracker.register_partitions(4);
        tracker.update_partition(3, 2000);
        tracker.remove_partition(1);
        assert_eq!(
            tracker.partition_count(),
            4,
            "middle idle does not truncate"
        );
    }
}