Skip to main content

laminar_connectors/kafka/
watermarks.rs

1//! Kafka watermark integration.
2//!
3//! Integrates `laminar-core` per-partition watermarks and watermark
4//! alignment groups with the Kafka source connector.
5//!
6//! # Per-Partition Watermarks
7//!
8//! Tracks watermarks at Kafka partition granularity, allowing progress even
9//! when some partitions are slow or idle:
10//!
//! ```rust,ignore
//! use laminar_connectors::kafka::watermarks::KafkaWatermarkTracker;
//! use std::time::Duration;
//!
//! // Use a zero out-of-orderness bound so each partition watermark equals
//! // its latest event time (the default bound is 5 seconds).
//! let mut tracker = KafkaWatermarkTracker::new(0, Duration::from_secs(30))
//!     .with_max_out_of_orderness(Duration::ZERO);
//! tracker.register_partitions(4);
//!
//! // Update individual partitions
//! tracker.update_partition(0, 5000);
//! tracker.update_partition(1, 3000); // slow partition
//!
//! // Combined watermark is the minimum (3000)
//! assert_eq!(tracker.current_watermark(), Some(3000));
//!
//! // Mark slow partition as idle
//! tracker.mark_idle(1);
//!
//! // Now the watermark advances (5000)
//! assert_eq!(tracker.current_watermark(), Some(5000));
//! ```
31//!
32//! # Watermark Alignment
33//!
34//! For multi-topic joins, alignment prevents fast topics from building
35//! unbounded state while waiting for slow topics:
36//!
37//! ```rust,ignore
38//! use laminar_connectors::kafka::watermarks::{KafkaAlignmentConfig, KafkaAlignmentMode};
39//! use std::time::Duration;
40//!
41//! let config = KafkaAlignmentConfig {
42//!     group_id: "orders-payments".to_string(),
43//!     max_drift: Duration::from_secs(300), // 5 minutes
44//!     mode: KafkaAlignmentMode::Pause,
45//! };
46//! ```
47
48use std::sync::atomic::{AtomicU64, Ordering};
49use std::time::{Duration, Instant};
50
51/// Kafka-specific watermark tracker wrapping `PartitionedWatermarkTracker`.
52///
53/// Tracks watermarks per Kafka partition and computes a combined watermark
54/// as the minimum across all active (non-idle) partitions.
55#[derive(Debug)]
56pub struct KafkaWatermarkTracker {
57    /// Source ID for this tracker.
58    source_id: usize,
59    /// Per-partition watermarks (indexed by partition number).
60    partition_watermarks: Vec<PartitionState>,
61    /// Combined watermark (minimum of active partitions).
62    combined_watermark: i64,
63    /// Idle timeout for automatic idle detection.
64    idle_timeout: Duration,
65    /// Maximum out-of-orderness for watermark generation.
66    max_out_of_orderness: Duration,
67    /// Metrics.
68    metrics: WatermarkMetrics,
69}
70
/// Watermark bookkeeping for one Kafka partition.
#[derive(Debug, Clone)]
struct PartitionState {
    /// Partition watermark (largest event time minus the out-of-orderness bound).
    watermark: i64,
    /// Largest event time observed so far.
    max_event_time: i64,
    /// Instant of the most recent activity on this partition.
    last_activity: Instant,
    /// `true` while the partition is flagged idle.
    is_idle: bool,
}

impl PartitionState {
    /// Builds the state of a partition that has not reported any event yet:
    /// both timestamps start at `i64::MIN`, the partition is active, and the
    /// activity clock starts at the moment of creation.
    fn new() -> Self {
        let created_at = Instant::now();
        Self {
            is_idle: false,
            last_activity: created_at,
            max_event_time: i64::MIN,
            watermark: i64::MIN,
        }
    }
}
94
/// Counters collected while tracking watermarks.
///
/// Every counter is an [`AtomicU64`], so it can be bumped through a shared
/// reference.
#[derive(Debug, Default)]
pub struct WatermarkMetrics {
    /// Number of partition updates recorded.
    pub updates: AtomicU64,
    /// Number of times the combined watermark advanced.
    pub advances: AtomicU64,
    /// Number of partitions that were marked idle.
    pub idle_transitions: AtomicU64,
    /// Number of partitions that resumed from idle.
    pub active_transitions: AtomicU64,
}

impl WatermarkMetrics {
    /// Creates a metrics set with every counter at zero.
    fn new() -> Self {
        Self {
            updates: AtomicU64::new(0),
            advances: AtomicU64::new(0),
            idle_transitions: AtomicU64::new(0),
            active_transitions: AtomicU64::new(0),
        }
    }

    /// Copies the current counter values into a plain-data snapshot.
    ///
    /// Each counter is read with relaxed ordering, one after the other.
    #[must_use]
    pub fn snapshot(&self) -> WatermarkMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let updates = read(&self.updates);
        let advances = read(&self.advances);
        let idle_transitions = read(&self.idle_transitions);
        let active_transitions = read(&self.active_transitions);

        WatermarkMetricsSnapshot {
            updates,
            advances,
            idle_transitions,
            active_transitions,
        }
    }
}

/// Point-in-time copy of [`WatermarkMetrics`] as plain integers.
#[derive(Debug, Clone, Default)]
pub struct WatermarkMetricsSnapshot {
    /// Number of partition updates recorded.
    pub updates: u64,
    /// Number of times the combined watermark advanced.
    pub advances: u64,
    /// Number of partitions that were marked idle.
    pub idle_transitions: u64,
    /// Number of partitions that resumed from idle.
    pub active_transitions: u64,
}
137
138impl KafkaWatermarkTracker {
139    /// Creates a new Kafka watermark tracker.
140    ///
141    /// # Arguments
142    ///
143    /// * `source_id` - Unique identifier for this source
144    /// * `idle_timeout` - Duration after which an inactive partition is marked idle
145    #[must_use]
146    pub fn new(source_id: usize, idle_timeout: Duration) -> Self {
147        Self {
148            source_id,
149            partition_watermarks: Vec::new(),
150            combined_watermark: i64::MIN,
151            idle_timeout,
152            max_out_of_orderness: Duration::from_secs(5),
153            metrics: WatermarkMetrics::new(),
154        }
155    }
156
157    /// Creates a tracker with custom out-of-orderness bound.
158    #[must_use]
159    pub fn with_max_out_of_orderness(mut self, max_out_of_orderness: Duration) -> Self {
160        self.max_out_of_orderness = max_out_of_orderness;
161        self
162    }
163
164    /// Returns the source ID.
165    #[must_use]
166    pub fn source_id(&self) -> usize {
167        self.source_id
168    }
169
170    /// Registers partitions for tracking.
171    ///
172    /// Call this when partitions are assigned during Kafka rebalance.
173    pub fn register_partitions(&mut self, num_partitions: usize) {
174        self.partition_watermarks
175            .resize_with(num_partitions, PartitionState::new);
176    }
177
178    /// Adds a partition (e.g., during Kafka rebalance).
179    pub fn add_partition(&mut self, partition: i32) {
180        let Some(idx) = usize::try_from(partition).ok() else {
181            return; // Ignore negative partitions
182        };
183        if idx >= self.partition_watermarks.len() {
184            self.partition_watermarks
185                .resize_with(idx + 1, PartitionState::new);
186        }
187    }
188
189    /// Removes a partition (e.g., during Kafka rebalance).
190    pub fn remove_partition(&mut self, partition: i32) {
191        let Some(idx) = usize::try_from(partition).ok() else {
192            return; // Ignore negative partitions
193        };
194        if idx < self.partition_watermarks.len() {
195            // Mark as idle rather than removing to maintain indices
196            self.partition_watermarks[idx].is_idle = true;
197            self.partition_watermarks[idx].watermark = i64::MAX; // Exclude from min
198        }
199        self.recompute_combined();
200    }
201
202    /// Updates the watermark for a partition.
203    ///
204    /// # Arguments
205    ///
206    /// * `partition` - Kafka partition number
207    /// * `event_time` - Event timestamp in milliseconds
208    ///
209    /// # Returns
210    ///
211    /// `true` if the combined watermark advanced.
212    pub fn update_partition(&mut self, partition: i32, event_time: i64) -> bool {
213        let Some(idx) = usize::try_from(partition).ok() else {
214            return false; // Ignore negative partitions
215        };
216        if idx >= self.partition_watermarks.len() {
217            self.partition_watermarks
218                .resize_with(idx + 1, PartitionState::new);
219        }
220
221        let state = &mut self.partition_watermarks[idx];
222        state.last_activity = Instant::now();
223        self.metrics.updates.fetch_add(1, Ordering::Relaxed);
224
225        // Resume if was idle
226        if state.is_idle {
227            state.is_idle = false;
228            self.metrics
229                .active_transitions
230                .fetch_add(1, Ordering::Relaxed);
231        }
232
233        // Update max event time
234        if event_time > state.max_event_time {
235            state.max_event_time = event_time;
236            // Watermark = max_event_time - max_out_of_orderness
237            // Saturate to i64::MAX if duration is too large (extremely unlikely in practice)
238            let out_of_order_ms =
239                i64::try_from(self.max_out_of_orderness.as_millis()).unwrap_or(i64::MAX);
240            let new_watermark = event_time.saturating_sub(out_of_order_ms);
241            if new_watermark > state.watermark {
242                state.watermark = new_watermark;
243            }
244        }
245
246        self.recompute_combined()
247    }
248
249    /// Marks a partition as idle.
250    ///
251    /// Idle partitions are excluded from watermark computation.
252    pub fn mark_idle(&mut self, partition: i32) {
253        let Some(idx) = usize::try_from(partition).ok() else {
254            return; // Ignore negative partitions
255        };
256        if idx < self.partition_watermarks.len() && !self.partition_watermarks[idx].is_idle {
257            self.partition_watermarks[idx].is_idle = true;
258            self.metrics
259                .idle_transitions
260                .fetch_add(1, Ordering::Relaxed);
261            self.recompute_combined();
262        }
263    }
264
265    /// Checks for idle partitions based on timeout and marks them.
266    ///
267    /// Call this periodically (e.g., every poll cycle).
268    pub fn check_idle_partitions(&mut self) {
269        let now = Instant::now();
270        let mut any_changed = false;
271
272        for state in &mut self.partition_watermarks {
273            if !state.is_idle && now.duration_since(state.last_activity) > self.idle_timeout {
274                state.is_idle = true;
275                self.metrics
276                    .idle_transitions
277                    .fetch_add(1, Ordering::Relaxed);
278                any_changed = true;
279            }
280        }
281
282        if any_changed {
283            self.recompute_combined();
284        }
285    }
286
287    /// Returns the current combined watermark.
288    ///
289    /// Returns `None` if no partitions are registered or all are idle.
290    #[must_use]
291    pub fn current_watermark(&self) -> Option<i64> {
292        if self.combined_watermark == i64::MIN {
293            None
294        } else {
295            Some(self.combined_watermark)
296        }
297    }
298
299    /// Returns the number of active (non-idle) partitions.
300    #[must_use]
301    pub fn active_partition_count(&self) -> usize {
302        self.partition_watermarks
303            .iter()
304            .filter(|s| !s.is_idle)
305            .count()
306    }
307
308    /// Returns the number of idle partitions.
309    #[must_use]
310    pub fn idle_partition_count(&self) -> usize {
311        self.partition_watermarks
312            .iter()
313            .filter(|s| s.is_idle)
314            .count()
315    }
316
317    /// Returns the total number of registered partitions.
318    #[must_use]
319    pub fn partition_count(&self) -> usize {
320        self.partition_watermarks.len()
321    }
322
323    /// Returns metrics for this tracker.
324    #[must_use]
325    pub fn metrics(&self) -> &WatermarkMetrics {
326        &self.metrics
327    }
328
329    /// Returns the watermark for a specific partition.
330    #[must_use]
331    pub fn partition_watermark(&self, partition: i32) -> Option<i64> {
332        let idx = usize::try_from(partition).ok()?;
333        self.partition_watermarks.get(idx).and_then(|s| {
334            if s.watermark == i64::MIN {
335                None
336            } else {
337                Some(s.watermark)
338            }
339        })
340    }
341
342    /// Returns whether a partition is idle.
343    #[must_use]
344    pub fn is_partition_idle(&self, partition: i32) -> bool {
345        let Some(idx) = usize::try_from(partition).ok() else {
346            return false;
347        };
348        self.partition_watermarks
349            .get(idx)
350            .is_some_and(|s| s.is_idle)
351    }
352
353    /// Recomputes the combined watermark from all active partitions.
354    fn recompute_combined(&mut self) -> bool {
355        let old = self.combined_watermark;
356
357        // Minimum watermark across active partitions
358        let min = self
359            .partition_watermarks
360            .iter()
361            .filter(|s| !s.is_idle && s.watermark != i64::MIN)
362            .map(|s| s.watermark)
363            .min();
364
365        self.combined_watermark = min.unwrap_or(i64::MIN);
366
367        let advanced = self.combined_watermark > old && old != i64::MIN;
368        if advanced {
369            self.metrics.advances.fetch_add(1, Ordering::Relaxed);
370        }
371        advanced
372    }
373}
374
375impl Default for KafkaWatermarkTracker {
376    fn default() -> Self {
377        Self::new(0, Duration::from_secs(30))
378    }
379}
380
381/// Alignment mode for multi-source watermark coordination.
382#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
383pub enum KafkaAlignmentMode {
384    /// Pause fast sources until slow sources catch up.
385    #[default]
386    Pause,
387    /// Emit warnings but don't pause.
388    WarnOnly,
389    /// Drop events from fast sources that exceed drift.
390    DropExcess,
391}
392
393str_enum!(KafkaAlignmentMode, lowercase_udash, String, "invalid alignment mode",
394    Pause => "pause";
395    WarnOnly => "warn-only", "warnonly", "warn";
396    DropExcess => "drop-excess", "dropexcess", "drop"
397);
398
399/// Configuration for Kafka source watermark alignment.
400///
401/// When multiple Kafka sources are used in a join, alignment ensures
402/// fast sources don't get too far ahead of slow sources.
403#[derive(Debug, Clone)]
404pub struct KafkaAlignmentConfig {
405    /// Alignment group identifier (sources with same ID coordinate).
406    pub group_id: String,
407    /// Maximum allowed drift between fastest and slowest source.
408    pub max_drift: Duration,
409    /// Enforcement mode.
410    pub mode: KafkaAlignmentMode,
411}
412
413impl KafkaAlignmentConfig {
414    /// Creates a new alignment config with defaults.
415    #[must_use]
416    pub fn new(group_id: impl Into<String>) -> Self {
417        Self {
418            group_id: group_id.into(),
419            max_drift: Duration::from_secs(300), // 5 minutes
420            mode: KafkaAlignmentMode::Pause,
421        }
422    }
423
424    /// Sets the maximum drift.
425    #[must_use]
426    pub fn with_max_drift(mut self, max_drift: Duration) -> Self {
427        self.max_drift = max_drift;
428        self
429    }
430
431    /// Sets the enforcement mode.
432    #[must_use]
433    pub fn with_mode(mut self, mode: KafkaAlignmentMode) -> Self {
434        self.mode = mode;
435        self
436    }
437}
438
439impl Default for KafkaAlignmentConfig {
440    fn default() -> Self {
441        Self::new("default")
442    }
443}
444
/// Outcome of checking a source against its alignment group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlignmentCheckResult {
    /// Nothing to do; keep processing normally.
    Continue,
    /// The source is too far ahead and should pause.
    Pause,
    /// The source has caught up and can resume.
    Resume,
    /// The event should be dropped (`DropExcess` mode).
    Drop,
}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// Idle timeout used by every tracker built in these tests.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(30);

    /// Source-0 tracker with the default out-of-orderness bound.
    fn default_bound_tracker() -> KafkaWatermarkTracker {
        KafkaWatermarkTracker::new(0, IDLE_TIMEOUT)
    }

    /// Source-0 tracker whose out-of-orderness bound is one second.
    fn one_second_bound_tracker() -> KafkaWatermarkTracker {
        default_bound_tracker().with_max_out_of_orderness(Duration::from_millis(1000))
    }

    /// Parses `text` as an alignment mode, discarding the error payload.
    fn parse_mode(text: &str) -> Option<KafkaAlignmentMode> {
        text.parse::<KafkaAlignmentMode>().ok()
    }

    #[test]
    fn test_tracker_new() {
        let fresh = default_bound_tracker();

        assert_eq!(fresh.source_id(), 0);
        assert_eq!(fresh.partition_count(), 0);
        assert!(fresh.current_watermark().is_none());
    }

    #[test]
    fn test_register_partitions() {
        let mut t = default_bound_tracker();
        t.register_partitions(4);

        let counts = (
            t.partition_count(),
            t.active_partition_count(),
            t.idle_partition_count(),
        );
        assert_eq!(counts.0, 4);
        assert_eq!(counts.1, 4);
        assert_eq!(counts.2, 0);
    }

    #[test]
    fn test_update_partition() {
        let mut t = one_second_bound_tracker();
        t.register_partitions(2);

        for (partition, event_time) in [(0, 5000), (1, 3000)] {
            t.update_partition(partition, event_time);
        }

        // Watermark = min(5000-1000, 3000-1000) = 2000
        assert_eq!(t.current_watermark(), Some(2000));
    }

    #[test]
    fn test_idle_partition() {
        let mut t = one_second_bound_tracker();
        t.register_partitions(2);

        for (partition, event_time) in [(0, 5000), (1, 3000)] {
            t.update_partition(partition, event_time);
        }

        // Take the slow partition out of the computation.
        t.mark_idle(1);

        // Only partition 0 counts now, so the watermark moves forward.
        assert_eq!(t.current_watermark(), Some(4000));
        assert_eq!(t.active_partition_count(), 1);
        assert_eq!(t.idle_partition_count(), 1);
    }

    #[test]
    fn test_resume_from_idle() {
        let mut t = one_second_bound_tracker();
        t.register_partitions(2);

        t.update_partition(0, 5000);
        t.mark_idle(1);
        assert_eq!(t.active_partition_count(), 1);

        // New data on the idle partition brings it back.
        t.update_partition(1, 4000);
        assert_eq!(t.active_partition_count(), 2);
        // Watermark = min(4000, 3000) = 3000
        assert_eq!(t.current_watermark(), Some(3000));
    }

    #[test]
    fn test_add_partition_dynamically() {
        let mut t = one_second_bound_tracker();

        t.update_partition(0, 5000);
        t.add_partition(5);
        t.update_partition(5, 3000);

        // Slots 0 through 5 exist after adding partition 5.
        assert_eq!(t.partition_count(), 6);
        assert_eq!(t.current_watermark(), Some(2000));
    }

    #[test]
    fn test_remove_partition() {
        let mut t = one_second_bound_tracker();
        t.register_partitions(2);

        for (partition, event_time) in [(0, 5000), (1, 3000)] {
            t.update_partition(partition, event_time);
        }

        t.remove_partition(1);

        // Partition 1 no longer holds the watermark back.
        assert_eq!(t.current_watermark(), Some(4000));
    }

    #[test]
    fn test_partition_watermark() {
        let mut t = one_second_bound_tracker();
        t.register_partitions(2);

        for (partition, event_time) in [(0, 5000), (1, 3000)] {
            t.update_partition(partition, event_time);
        }

        assert_eq!(t.partition_watermark(0), Some(4000));
        assert_eq!(t.partition_watermark(1), Some(2000));
        assert!(t.partition_watermark(99).is_none());
    }

    #[test]
    fn test_metrics() {
        let mut t = default_bound_tracker();
        t.register_partitions(2);

        t.update_partition(0, 5000);
        t.update_partition(1, 3000);
        t.mark_idle(1);
        t.update_partition(1, 4000); // brings partition 1 back from idle

        let counters = t.metrics().snapshot();
        assert_eq!(counters.updates, 3);
        assert_eq!(counters.idle_transitions, 1);
        assert_eq!(counters.active_transitions, 1);
    }

    #[test]
    fn test_alignment_mode_parsing() {
        let expectations = [
            ("pause", KafkaAlignmentMode::Pause),
            ("warn-only", KafkaAlignmentMode::WarnOnly),
            ("drop-excess", KafkaAlignmentMode::DropExcess),
        ];
        for (text, mode) in expectations {
            assert_eq!(parse_mode(text), Some(mode));
        }

        assert!(parse_mode("invalid").is_none());
    }

    #[test]
    fn test_alignment_config() {
        let drift = Duration::from_secs(60);
        let cfg = KafkaAlignmentConfig::new("test-group")
            .with_max_drift(drift)
            .with_mode(KafkaAlignmentMode::WarnOnly);

        assert_eq!(cfg.group_id, "test-group");
        assert_eq!(cfg.max_drift, Duration::from_secs(60));
        assert_eq!(cfg.mode, KafkaAlignmentMode::WarnOnly);
    }

    #[test]
    fn test_all_partitions_idle() {
        let mut t = default_bound_tracker();
        t.register_partitions(2);

        t.update_partition(0, 5000);
        t.update_partition(1, 3000);
        for partition in [0, 1] {
            t.mark_idle(partition);
        }

        // With every partition idle there is nothing to take a minimum over.
        assert!(t.current_watermark().is_none());
    }
}