// laminar_core/time/keyed_watermark.rs

1//! # Keyed Watermark Tracking
2//!
3//! Per-key watermark tracking to achieve 99%+ data accuracy compared to 63-67%
4//! with traditional global watermarks. This addresses the fundamental problem of
5//! fast-moving keys causing late data drops for slower keys.
6#![deny(clippy::disallowed_types)]
7//!
8//! ## Problem with Global Watermarks
9//!
10//! ```text
11//! Global Watermark Scenario (Traditional):
12//! ┌─────────────────────────────────────────────────────────────────┐
13//! │              Global Watermark = 10:05 (from Key C)              │
14//! │                                                                 │
15//! │  Key A: ████████████░░░░░░ events at 10:03 → DROPPED (late!)   │
16//! │  Key B: ██████░░░░░░░░░░░░ events at 10:01 → DROPPED (late!)   │
17//! │  Key C: █████████████████ events at 10:08 → OK                 │
18//! │                                                                 │
19//! │  Result: Fast-moving Key C advances watermark, slow keys       │
20//! │          have their valid events dropped as "late"             │
21//! └─────────────────────────────────────────────────────────────────┘
22//! ```
23//!
24//! ## Keyed Watermarks Solution
25//!
26//! ```text
27//! Keyed Watermark Scenario (This Feature):
28//! ┌─────────────────────────────────────────────────────────────────┐
29//! │             Per-Key Watermarks                                  │
30//! │                                                                 │
31//! │  Key A: watermark = 10:02 → events at 10:03 → OK               │
32//! │  Key B: watermark = 10:00 → events at 10:01 → OK               │
33//! │  Key C: watermark = 10:07 → events at 10:08 → OK               │
34//! │                                                                 │
35//! │  Global (for ordering): min(A,B,C) = 10:00                     │
36//! │                                                                 │
37//! │  Result: Each key tracks its own progress independently        │
38//! │          99%+ accuracy vs 63-67% with global                   │
39//! └─────────────────────────────────────────────────────────────────┘
40//! ```
41//!
42//! ## Example
43//!
44//! ```rust
45//! use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
46//! use std::time::Duration;
47//!
48//! let config = KeyedWatermarkConfig {
49//!     bounded_delay: Duration::from_secs(5),
50//!     idle_timeout: Duration::from_secs(60),
51//!     ..Default::default()
52//! };
53//!
54//! let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
55//!
56//! // Fast tenant advances quickly
57//! tracker.update("tenant_a".to_string(), 10_000);
58//! tracker.update("tenant_a".to_string(), 15_000);
59//!
60//! // Slow tenant at earlier time
61//! tracker.update("tenant_b".to_string(), 5_000);
62//!
63//! // Per-key watermarks
64//! assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000)); // 15000 - 5000
65//! assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));      // 5000 - 5000
66//!
67//! // Global watermark is minimum across active keys
68//! assert_eq!(tracker.global_watermark(), Some(Watermark::new(0))); // min of 10000, 0
69//!
70//! // Events for tenant_b at 3000 are NOT late (their key watermark allows it)
71//! assert!(!tracker.is_late(&"tenant_b".to_string(), 3000));
72//!
73//! // But events for tenant_a at 3000 ARE late (their key watermark is 10000)
74//! assert!(tracker.is_late(&"tenant_a".to_string(), 3000));
75//! ```
76
77use std::hash::Hash;
78use std::time::{Duration, Instant};
79
80use rustc_hash::FxHashMap;
81
82use super::Watermark;
83
/// Watermark bookkeeping for a single key.
///
/// Remembers the largest event time observed for the key and the watermark
/// derived from it by subtracting the bounded out-of-orderness delay.
#[derive(Debug, Clone)]
pub struct KeyWatermarkState {
    /// Largest event time observed so far (`i64::MIN` until the first update)
    pub max_event_time: i64,
    /// Current watermark, derived as `max_event_time - bounded_delay` (saturating)
    pub watermark: i64,
    /// Processing-time instant of the most recent update
    pub last_activity: Instant,
    /// `true` while the key is flagged as idle
    pub is_idle: bool,
}

impl KeyWatermarkState {
    /// Builds an empty state: no event seen yet, not idle, active "now".
    #[must_use]
    pub fn new() -> Self {
        let unset = i64::MIN;
        Self {
            max_event_time: unset,
            watermark: unset,
            last_activity: Instant::now(),
            is_idle: false,
        }
    }

    /// Feeds one event timestamp into the state.
    ///
    /// Every call counts as activity: the idle flag is cleared and
    /// `last_activity` is set to `now`, even for out-of-order events.
    /// Callers are expected to sample `now` once per batch or poll iteration
    /// rather than once per key.
    ///
    /// Returns `true` only when the watermark moved forward.
    #[inline]
    pub fn update(&mut self, event_time: i64, bounded_delay: i64, now: Instant) -> bool {
        self.is_idle = false;
        self.last_activity = now;

        // Events at or below the current maximum cannot move the watermark.
        if event_time <= self.max_event_time {
            return false;
        }
        self.max_event_time = event_time;

        // Saturating so that timestamps near `i64::MIN` do not overflow.
        let candidate = event_time.saturating_sub(bounded_delay);
        let advanced = candidate > self.watermark;
        if advanced {
            self.watermark = candidate;
        }
        advanced
    }

    /// Reports whether `event_time` lies strictly before this key's watermark.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.watermark > event_time
    }
}

impl Default for KeyWatermarkState {
    /// Same as [`KeyWatermarkState::new`].
    fn default() -> Self {
        Self::new()
    }
}
147
148/// Keyed watermark tracker configuration.
149#[derive(Debug, Clone)]
150pub struct KeyedWatermarkConfig {
151    /// Maximum out-of-orderness for watermark calculation (in milliseconds)
152    pub bounded_delay: Duration,
153    /// Timeout before marking a key as idle
154    pub idle_timeout: Duration,
155    /// Maximum number of keys to track (for memory bounds)
156    pub max_keys: Option<usize>,
157    /// Eviction policy when `max_keys` reached
158    pub eviction_policy: KeyEvictionPolicy,
159}
160
161impl Default for KeyedWatermarkConfig {
162    fn default() -> Self {
163        Self {
164            bounded_delay: Duration::from_secs(5),
165            idle_timeout: Duration::from_secs(60),
166            max_keys: None,
167            eviction_policy: KeyEvictionPolicy::LeastRecentlyActive,
168        }
169    }
170}
171
172impl KeyedWatermarkConfig {
173    /// Creates a new configuration with the specified bounded delay.
174    #[must_use]
175    pub fn with_bounded_delay(bounded_delay: Duration) -> Self {
176        Self {
177            bounded_delay,
178            ..Default::default()
179        }
180    }
181
182    /// Sets the idle timeout.
183    #[must_use]
184    pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
185        self.idle_timeout = timeout;
186        self
187    }
188
189    /// Sets the maximum number of keys to track.
190    #[must_use]
191    pub fn with_max_keys(mut self, max_keys: usize) -> Self {
192        self.max_keys = Some(max_keys);
193        self
194    }
195
196    /// Sets the eviction policy.
197    #[must_use]
198    pub fn with_eviction_policy(mut self, policy: KeyEvictionPolicy) -> Self {
199        self.eviction_policy = policy;
200        self
201    }
202}
203
/// Strategy applied when a new key arrives while `max_keys` keys are
/// already being tracked.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum KeyEvictionPolicy {
    /// Drop the key whose `last_activity` is oldest (the default)
    #[default]
    LeastRecentlyActive,
    /// Drop the key with the smallest watermark
    LowestWatermark,
    /// Keep the existing keys and refuse the new one with an error
    RejectNew,
}
215
216/// Errors that can occur in keyed watermark operations.
217#[derive(Debug, Clone, thiserror::Error)]
218pub enum KeyedWatermarkError {
219    /// Maximum number of keys reached with `RejectNew` policy
220    #[error("Maximum keys reached ({max_keys}), cannot add new key")]
221    MaxKeysReached {
222        /// Maximum keys configured
223        max_keys: usize,
224    },
225}
226
/// Counters describing the state and history of a keyed watermark tracker.
#[derive(Debug, Clone, Default)]
pub struct KeyedWatermarkMetrics {
    /// Number of keys currently tracked
    pub total_keys: usize,
    /// Tracked keys that are not idle
    pub active_keys: usize,
    /// Tracked keys that are idle
    pub idle_keys: usize,
    /// Number of keys evicted because of the `max_keys` limit
    pub evicted_keys: u64,
    /// Number of times the global watermark moved forward
    pub global_advances: u64,
    /// Number of times any per-key watermark moved forward
    pub key_advances: u64,
}

impl KeyedWatermarkMetrics {
    /// Returns metrics with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            total_keys: 0,
            active_keys: 0,
            idle_keys: 0,
            evicted_keys: 0,
            global_advances: 0,
            key_advances: 0,
        }
    }
}
251
252/// Tracks watermarks per logical key.
253///
254/// Provides fine-grained watermark tracking for multi-tenant workloads
255/// and scenarios with significant event-time skew between keys.
256///
257/// # Research Background
258///
259/// Based on research (March 2025), keyed watermarks achieve
260/// **99%+ accuracy** compared to **63-67%** with global watermarks.
261///
262/// # Example
263///
264/// ```rust
265/// use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
266/// use std::time::Duration;
267///
268/// let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
269/// let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
270///
271/// // Fast tenant advances quickly
272/// tracker.update("tenant_a".to_string(), 10_000);
273/// tracker.update("tenant_a".to_string(), 15_000);
274///
275/// // Slow tenant at earlier time
276/// tracker.update("tenant_b".to_string(), 5_000);
277///
278/// // Per-key watermarks differ
279/// assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
280/// assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));
281///
282/// // Global watermark is minimum
283/// assert_eq!(tracker.global_watermark(), Some(Watermark::new(0)));
284/// ```
285#[derive(Debug)]
286pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> {
287    /// Per-key watermark state
288    key_states: FxHashMap<K, KeyWatermarkState>,
289
290    /// Global watermark (minimum across all active keys)
291    global_watermark: i64,
292
293    /// Configuration
294    config: KeyedWatermarkConfig,
295
296    /// Bounded delay in milliseconds (cached from config)
297    bounded_delay_ms: i64,
298
299    /// Metrics
300    metrics: KeyedWatermarkMetrics,
301}
302
303impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K> {
304    /// Creates a new keyed watermark tracker with the given configuration.
305    #[must_use]
306    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
307    pub fn new(config: KeyedWatermarkConfig) -> Self {
308        let bounded_delay_ms = config.bounded_delay.as_millis() as i64;
309        Self {
310            key_states: FxHashMap::default(),
311            global_watermark: i64::MIN,
312            config,
313            bounded_delay_ms,
314            metrics: KeyedWatermarkMetrics::new(),
315        }
316    }
317
318    /// Creates a tracker with default configuration.
319    #[must_use]
320    pub fn with_defaults() -> Self {
321        Self::new(KeyedWatermarkConfig::default())
322    }
323
324    /// Updates the watermark for a specific key.
325    ///
326    /// # Returns
327    ///
328    /// - `Ok(Some(Watermark))` if the global watermark changes
329    /// - `Ok(None)` if no global change
330    ///
331    /// # Errors
332    ///
333    /// Returns `KeyedWatermarkError::MaxKeysReached` if `max_keys` is reached
334    /// and the `RejectNew` eviction policy is configured.
335    ///
336    /// # Example
337    ///
338    /// ```rust
339    /// use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig};
340    /// use std::time::Duration;
341    ///
342    /// let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
343    /// let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
344    ///
345    /// // First update creates the key
346    /// let wm = tracker.update("key1".to_string(), 1000).unwrap();
347    /// assert!(wm.is_some()); // Global watermark advances
348    /// ```
349    #[allow(clippy::missing_panics_doc)] // Internal invariant: key is always present after insert
350    #[allow(clippy::needless_pass_by_value)] // Key must be owned for FxHashMap insertion
351    pub fn update(
352        &mut self,
353        key: K,
354        event_time: i64,
355    ) -> Result<Option<Watermark>, KeyedWatermarkError> {
356        // Check if we need to create a new key
357        if !self.key_states.contains_key(&key) {
358            // Check max_keys limit
359            if let Some(max_keys) = self.config.max_keys {
360                if self.key_states.len() >= max_keys {
361                    match self.config.eviction_policy {
362                        KeyEvictionPolicy::RejectNew => {
363                            return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
364                        }
365                        KeyEvictionPolicy::LeastRecentlyActive => {
366                            self.evict_least_recently_active();
367                        }
368                        KeyEvictionPolicy::LowestWatermark => {
369                            self.evict_lowest_watermark();
370                        }
371                    }
372                }
373            }
374            self.key_states
375                .insert(key.clone(), KeyWatermarkState::new());
376            self.metrics.total_keys = self.key_states.len();
377        }
378
379        // Update the key's watermark (cache Instant::now() once per update call)
380        let now = Instant::now();
381        let state = self.key_states.get_mut(&key).expect("key just inserted");
382        let watermark_advanced = state.update(event_time, self.bounded_delay_ms, now);
383
384        if watermark_advanced {
385            self.metrics.key_advances += 1;
386        }
387
388        // Update metrics
389        self.update_metrics_counts();
390
391        // Try to advance global watermark
392        Ok(self.try_advance_global())
393    }
394
395    /// Batch update for multiple events (more efficient).
396    ///
397    /// Returns the new global watermark if it changed.
398    ///
399    /// # Errors
400    ///
401    /// Returns `KeyedWatermarkError::MaxKeysReached` if `max_keys` is reached
402    /// and the `RejectNew` eviction policy is configured.
403    #[allow(clippy::missing_panics_doc)] // Internal invariant: key is always present after insert
404    pub fn update_batch(
405        &mut self,
406        events: &[(K, i64)],
407    ) -> Result<Option<Watermark>, KeyedWatermarkError> {
408        // Cache Instant::now() once for the entire batch
409        let now = Instant::now();
410        for (key, event_time) in events {
411            // Check if we need to create a new key
412            if !self.key_states.contains_key(key) {
413                if let Some(max_keys) = self.config.max_keys {
414                    if self.key_states.len() >= max_keys {
415                        match self.config.eviction_policy {
416                            KeyEvictionPolicy::RejectNew => {
417                                return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
418                            }
419                            KeyEvictionPolicy::LeastRecentlyActive => {
420                                self.evict_least_recently_active();
421                            }
422                            KeyEvictionPolicy::LowestWatermark => {
423                                self.evict_lowest_watermark();
424                            }
425                        }
426                    }
427                }
428                self.key_states
429                    .insert(key.clone(), KeyWatermarkState::new());
430            }
431
432            let state = self.key_states.get_mut(key).expect("key just inserted");
433            if state.update(*event_time, self.bounded_delay_ms, now) {
434                self.metrics.key_advances += 1;
435            }
436        }
437
438        self.metrics.total_keys = self.key_states.len();
439        self.update_metrics_counts();
440
441        Ok(self.try_advance_global())
442    }
443
444    /// Returns the watermark for a specific key.
445    #[must_use]
446    pub fn watermark_for_key(&self, key: &K) -> Option<i64> {
447        self.key_states.get(key).map(|s| s.watermark)
448    }
449
450    /// Returns the global watermark (minimum across active keys).
451    #[must_use]
452    pub fn global_watermark(&self) -> Option<Watermark> {
453        if self.global_watermark == i64::MIN {
454            None
455        } else {
456            Some(Watermark::new(self.global_watermark))
457        }
458    }
459
460    /// Checks if an event is late for its key.
461    ///
462    /// Uses the key's individual watermark, not the global watermark.
463    /// If the key doesn't exist, returns `false` (not late).
464    #[must_use]
465    pub fn is_late(&self, key: &K, event_time: i64) -> bool {
466        self.key_states
467            .get(key)
468            .is_some_and(|s| s.is_late(event_time))
469    }
470
471    /// Checks if an event is late using the global watermark.
472    ///
473    /// Use this for cross-key ordering guarantees.
474    #[must_use]
475    pub fn is_late_global(&self, event_time: i64) -> bool {
476        event_time < self.global_watermark
477    }
478
479    /// Marks a key as idle, excluding it from global watermark calculation.
480    ///
481    /// Returns `Some(Watermark)` if the global watermark advances.
482    pub fn mark_idle(&mut self, key: &K) -> Option<Watermark> {
483        if let Some(state) = self.key_states.get_mut(key) {
484            if !state.is_idle {
485                state.is_idle = true;
486                self.update_metrics_counts();
487                return self.try_advance_global();
488            }
489        }
490        None
491    }
492
493    /// Marks a key as active again.
494    pub fn mark_active(&mut self, key: &K) {
495        if let Some(state) = self.key_states.get_mut(key) {
496            if state.is_idle {
497                state.is_idle = false;
498                state.last_activity = Instant::now();
499                self.update_metrics_counts();
500            }
501        }
502    }
503
504    /// Checks for keys that have been idle longer than the timeout.
505    ///
506    /// Should be called periodically from Ring 1.
507    ///
508    /// Returns `Some(Watermark)` if marking idle keys causes the global watermark to advance.
509    pub fn check_idle_keys(&mut self) -> Option<Watermark> {
510        let idle_timeout = self.config.idle_timeout;
511        let mut any_marked = false;
512
513        for state in self.key_states.values_mut() {
514            if !state.is_idle && state.last_activity.elapsed() >= idle_timeout {
515                state.is_idle = true;
516                any_marked = true;
517            }
518        }
519
520        if any_marked {
521            self.update_metrics_counts();
522            self.try_advance_global()
523        } else {
524            None
525        }
526    }
527
528    /// Returns the number of active (non-idle) keys.
529    #[must_use]
530    pub fn active_key_count(&self) -> usize {
531        self.key_states.values().filter(|s| !s.is_idle).count()
532    }
533
534    /// Returns the total number of tracked keys.
535    #[must_use]
536    pub fn total_key_count(&self) -> usize {
537        self.key_states.len()
538    }
539
540    /// Returns metrics.
541    #[must_use]
542    pub fn metrics(&self) -> &KeyedWatermarkMetrics {
543        &self.metrics
544    }
545
546    /// Forces recalculation of global watermark.
547    ///
548    /// Useful after bulk operations or recovery.
549    pub fn recalculate_global(&mut self) -> Option<Watermark> {
550        let new_global = self.calculate_global();
551        if new_global != i64::MAX && new_global != i64::MIN {
552            self.global_watermark = new_global;
553            Some(Watermark::new(new_global))
554        } else {
555            None
556        }
557    }
558
559    /// Removes a key from tracking.
560    ///
561    /// Returns the key's watermark state if it existed.
562    pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState> {
563        let state = self.key_states.remove(key);
564        if state.is_some() {
565            self.metrics.total_keys = self.key_states.len();
566            self.update_metrics_counts();
567            // Recalculate global since we removed a key
568            let new_global = self.calculate_global();
569            if new_global > self.global_watermark && new_global != i64::MAX {
570                self.global_watermark = new_global;
571            }
572        }
573        state
574    }
575
576    /// Clears all tracked keys.
577    pub fn clear(&mut self) {
578        self.key_states.clear();
579        self.global_watermark = i64::MIN;
580        self.metrics = KeyedWatermarkMetrics::new();
581    }
582
583    /// Returns the state for a specific key.
584    #[must_use]
585    pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState> {
586        self.key_states.get(key)
587    }
588
589    /// Returns the configuration.
590    #[must_use]
591    pub fn config(&self) -> &KeyedWatermarkConfig {
592        &self.config
593    }
594
595    /// Checks if a key exists in the tracker.
596    #[must_use]
597    pub fn contains_key(&self, key: &K) -> bool {
598        self.key_states.contains_key(key)
599    }
600
601    /// Returns an iterator over all keys.
602    pub fn keys(&self) -> impl Iterator<Item = &K> {
603        self.key_states.keys()
604    }
605
606    /// Returns an iterator over all key-state pairs.
607    pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)> {
608        self.key_states.iter()
609    }
610
611    /// Returns the bounded delay in milliseconds.
612    #[must_use]
613    pub fn bounded_delay_ms(&self) -> i64 {
614        self.bounded_delay_ms
615    }
616
617    /// Tries to advance the global watermark.
618    ///
619    /// Note: Unlike partition-based tracking where global only advances,
620    /// keyed watermarks can regress when new keys are added (since a new
621    /// key starts with MIN watermark). This is intentional - the global
622    /// watermark always reflects the minimum across active keys.
623    fn try_advance_global(&mut self) -> Option<Watermark> {
624        let new_global = self.calculate_global();
625
626        if new_global != i64::MAX && new_global != i64::MIN && new_global != self.global_watermark {
627            let old_global = self.global_watermark;
628            self.global_watermark = new_global;
629            if new_global > old_global {
630                self.metrics.global_advances += 1;
631            }
632            Some(Watermark::new(new_global))
633        } else {
634            None
635        }
636    }
637
638    /// Calculates the global watermark.
639    fn calculate_global(&self) -> i64 {
640        let mut min_watermark = i64::MAX;
641        let mut has_active = false;
642
643        for state in self.key_states.values() {
644            if !state.is_idle {
645                has_active = true;
646                min_watermark = min_watermark.min(state.watermark);
647            }
648        }
649
650        // If all keys are idle, use the max watermark to allow progress
651        if !has_active {
652            min_watermark = self
653                .key_states
654                .values()
655                .map(|s| s.watermark)
656                .max()
657                .unwrap_or(i64::MIN);
658        }
659
660        min_watermark
661    }
662
663    /// Updates the metrics counts.
664    fn update_metrics_counts(&mut self) {
665        self.metrics.idle_keys = self.key_states.values().filter(|s| s.is_idle).count();
666        self.metrics.active_keys = self.metrics.total_keys - self.metrics.idle_keys;
667    }
668
669    /// Evicts the least recently active key.
670    fn evict_least_recently_active(&mut self) {
671        if let Some(key_to_evict) = self
672            .key_states
673            .iter()
674            .min_by_key(|(_, state)| state.last_activity)
675            .map(|(k, _)| k.clone())
676        {
677            self.key_states.remove(&key_to_evict);
678            self.metrics.evicted_keys += 1;
679        }
680    }
681
682    /// Evicts the key with the lowest watermark.
683    fn evict_lowest_watermark(&mut self) {
684        if let Some(key_to_evict) = self
685            .key_states
686            .iter()
687            .min_by_key(|(_, state)| state.watermark)
688            .map(|(k, _)| k.clone())
689        {
690            self.key_states.remove(&key_to_evict);
691            self.metrics.evicted_keys += 1;
692        }
693    }
694}
695
696/// Keyed watermark tracker with late event handling.
697///
698/// Wraps `KeyedWatermarkTracker` and provides utilities for handling late events,
699/// including counting and optional side-output.
700#[derive(Debug)]
701pub struct KeyedWatermarkTrackerWithLateHandling<K: Hash + Eq + Clone> {
702    /// Inner tracker
703    tracker: KeyedWatermarkTracker<K>,
704    /// Count of late events per key
705    late_events_per_key: FxHashMap<K, u64>,
706    /// Total late events
707    total_late_events: u64,
708}
709
710impl<K: Hash + Eq + Clone> KeyedWatermarkTrackerWithLateHandling<K> {
711    /// Creates a new tracker with late event handling.
712    #[must_use]
713    pub fn new(config: KeyedWatermarkConfig) -> Self {
714        Self {
715            tracker: KeyedWatermarkTracker::new(config),
716            late_events_per_key: FxHashMap::default(),
717            total_late_events: 0,
718        }
719    }
720
721    /// Updates the watermark and checks for late events.
722    ///
723    /// Returns `(watermark_result, is_late)`.
724    ///
725    /// # Errors
726    ///
727    /// Returns `KeyedWatermarkError::MaxKeysReached` if the maximum number of keys
728    /// is reached and the eviction policy is `RejectNew`.
729    pub fn update_with_late_check(
730        &mut self,
731        key: K,
732        event_time: i64,
733    ) -> Result<(Option<Watermark>, bool), KeyedWatermarkError> {
734        let is_late = self.tracker.is_late(&key, event_time);
735
736        if is_late {
737            *self.late_events_per_key.entry(key.clone()).or_insert(0) += 1;
738            self.total_late_events += 1;
739        }
740
741        let wm = self.tracker.update(key, event_time)?;
742        Ok((wm, is_late))
743    }
744
745    /// Returns late event count for a key.
746    #[must_use]
747    pub fn late_events_for_key(&self, key: &K) -> u64 {
748        self.late_events_per_key.get(key).copied().unwrap_or(0)
749    }
750
751    /// Returns total late event count.
752    #[must_use]
753    pub fn total_late_events(&self) -> u64 {
754        self.total_late_events
755    }
756
757    /// Returns a reference to the inner tracker.
758    #[must_use]
759    pub fn inner(&self) -> &KeyedWatermarkTracker<K> {
760        &self.tracker
761    }
762
763    /// Returns a mutable reference to the inner tracker.
764    pub fn inner_mut(&mut self) -> &mut KeyedWatermarkTracker<K> {
765        &mut self.tracker
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772
773    #[test]
774    fn test_key_watermark_state_creation() {
775        let state = KeyWatermarkState::new();
776        assert_eq!(state.max_event_time, i64::MIN);
777        assert_eq!(state.watermark, i64::MIN);
778        assert!(!state.is_idle);
779    }
780
781    #[test]
782    fn test_key_watermark_state_update() {
783        let mut state = KeyWatermarkState::new();
784        let now = Instant::now();
785
786        // First update
787        let advanced = state.update(1000, 100, now);
788        assert!(advanced);
789        assert_eq!(state.max_event_time, 1000);
790        assert_eq!(state.watermark, 900); // 1000 - 100
791
792        // Out-of-order event
793        let advanced = state.update(800, 100, now);
794        assert!(!advanced);
795        assert_eq!(state.max_event_time, 1000); // Unchanged
796
797        // New max
798        let advanced = state.update(1500, 100, now);
799        assert!(advanced);
800        assert_eq!(state.watermark, 1400);
801    }
802
803    #[test]
804    fn test_key_watermark_state_is_late() {
805        let mut state = KeyWatermarkState::new();
806        state.update(1000, 100, Instant::now()); // watermark = 900
807
808        assert!(state.is_late(800)); // Before watermark
809        assert!(state.is_late(899)); // Just before watermark
810        assert!(!state.is_late(900)); // At watermark
811        assert!(!state.is_late(1000)); // After watermark
812    }
813
814    #[test]
815    fn test_config_defaults() {
816        let config = KeyedWatermarkConfig::default();
817        assert_eq!(config.bounded_delay, Duration::from_secs(5));
818        assert_eq!(config.idle_timeout, Duration::from_secs(60));
819        assert!(config.max_keys.is_none());
820        assert_eq!(
821            config.eviction_policy,
822            KeyEvictionPolicy::LeastRecentlyActive
823        );
824    }
825
826    #[test]
827    fn test_config_builder() {
828        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(10))
829            .with_idle_timeout(Duration::from_secs(30))
830            .with_max_keys(1000)
831            .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);
832
833        assert_eq!(config.bounded_delay, Duration::from_secs(10));
834        assert_eq!(config.idle_timeout, Duration::from_secs(30));
835        assert_eq!(config.max_keys, Some(1000));
836        assert_eq!(config.eviction_policy, KeyEvictionPolicy::LowestWatermark);
837    }
838
839    #[test]
840    fn test_keyed_tracker_single_key_updates_watermark() {
841        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
842        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
843
844        let wm = tracker.update("key1".to_string(), 1000).unwrap();
845        assert_eq!(wm, Some(Watermark::new(900)));
846        assert_eq!(tracker.watermark_for_key(&"key1".to_string()), Some(900));
847        assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
848    }
849
850    #[test]
851    fn test_keyed_tracker_multiple_keys_independent_watermarks() {
852        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
853        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
854
855        tracker.update("fast".to_string(), 5000).unwrap();
856        tracker.update("slow".to_string(), 1000).unwrap();
857
858        // Each key has its own watermark
859        assert_eq!(tracker.watermark_for_key(&"fast".to_string()), Some(4900));
860        assert_eq!(tracker.watermark_for_key(&"slow".to_string()), Some(900));
861
862        // Global is minimum
863        assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
864    }
865
/// With no bounded delay, the global watermark is expected to be the
/// smallest timestamp among the three keys that were updated.
#[test]
fn test_keyed_tracker_global_is_minimum_of_active_keys() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );

    // Feed the keys in an order where the minimum is neither first nor last.
    for (key, ts) in [("a", 5000), ("b", 3000), ("c", 7000)] {
        tracker.update(key.to_string(), ts).unwrap();
    }

    assert_eq!(tracker.global_watermark(), Some(Watermark::new(3000)));
}
877
/// Rapid progress on one key leaves another key's watermark untouched.
///
/// After "fast" has been pushed to timestamp 10000, "slow" (last seen at
/// 1000, delay 100 ms) is still expected to report watermark 900, so the
/// timestamp 950 is on time for "slow" yet late for "fast".
#[test]
fn test_keyed_tracker_fast_key_does_not_affect_slow_key() {
    let slow = "slow".to_string();
    let fast = "fast".to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100)),
    );

    // "slow" is seen first; "fast" then races ahead with two events.
    tracker.update(slow.clone(), 1000).unwrap();
    for ts in [5000, 10000] {
        tracker.update(fast.clone(), ts).unwrap();
    }

    // The slow key's watermark did not move.
    assert_eq!(tracker.watermark_for_key(&slow), Some(900));

    // One probe timestamp, judged against each key separately.
    let probe = 950;
    assert!(!tracker.is_late(&slow, probe));
    assert!(tracker.is_late(&fast, probe));
}
899
/// `is_late` is expected to compare against the key's own watermark rather
/// than the global one.
///
/// With watermarks of 9900 ("fast") and 900 ("slow"), timestamp 5000 must
/// be late only for "fast", even though the global watermark is 900.
#[test]
fn test_keyed_tracker_is_late_uses_key_watermark_not_global() {
    let fast = "fast".to_string();
    let slow = "slow".to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100)),
    );

    tracker.update(fast.clone(), 10000).unwrap(); // fast: 10000 - 100 = 9900
    tracker.update(slow.clone(), 1000).unwrap(); // slow: 1000 - 100 = 900

    // The global watermark follows the slower key.
    assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));

    let probe = 5000;
    // 5000 >= 900, so the event is on time for "slow".
    assert!(!tracker.is_late(&slow, probe));
    // 5000 < 9900, so the same event is late for "fast".
    assert!(tracker.is_late(&fast, probe));
}
917
/// Marking the slowest key idle is expected to lift the global watermark to
/// the remaining active key's watermark.
#[test]
fn test_keyed_tracker_idle_key_excluded_from_global() {
    let slow = "slow".to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );

    tracker.update("fast".to_string(), 5000).unwrap();
    tracker.update(slow.clone(), 1000).unwrap();

    // While both keys are active the slow one pins the global watermark.
    assert_eq!(tracker.global_watermark(), Some(Watermark::new(1000)));

    // Once "slow" is idle, only "fast" (5000) contributes.
    let after_idle = tracker.mark_idle(&slow);
    let expected = Some(Watermark::new(5000));
    assert_eq!(after_idle, expected);
    assert_eq!(tracker.global_watermark(), expected);
}
933
/// When every key has been marked idle, the watermark returned by the last
/// `mark_idle` call is expected to be the maximum across keys (5000 here).
#[test]
fn test_keyed_tracker_all_idle_uses_max() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );
    let (key_a, key_b) = ("a".to_string(), "b".to_string());

    tracker.update(key_a.clone(), 5000).unwrap();
    tracker.update(key_b.clone(), 3000).unwrap();

    // Idle both keys; only the result of the final call is inspected.
    tracker.mark_idle(&key_a);
    let last = tracker.mark_idle(&key_b);

    // With nothing active, the larger watermark wins so progress continues.
    assert_eq!(last, Some(Watermark::new(5000)));
}
948
/// With `max_keys = 2` and the `LeastRecentlyActive` policy, inserting a
/// third key is expected to evict the key that was updated longest ago.
#[test]
fn test_keyed_tracker_key_eviction_lru() {
    let key = |name: &str| name.to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
            .with_max_keys(2)
            .with_eviction_policy(KeyEvictionPolicy::LeastRecentlyActive),
    );

    tracker.update(key("a"), 1000).unwrap();
    // Presumably separates the activity times of "a" and "b" so that "a"
    // is unambiguously the older one.
    std::thread::sleep(Duration::from_millis(10));
    tracker.update(key("b"), 2000).unwrap();

    // The tracker is now exactly at capacity.
    assert_eq!(tracker.total_key_count(), 2);

    // A third key forces an eviction; "a" is the expected victim.
    tracker.update(key("c"), 3000).unwrap();

    assert_eq!(tracker.total_key_count(), 2);
    assert!(!tracker.contains_key(&key("a")));
    assert!(tracker.contains_key(&key("b")));
    assert!(tracker.contains_key(&key("c")));
    assert_eq!(tracker.metrics().evicted_keys, 1);
}
972
/// With `max_keys = 2` and the `LowestWatermark` policy, inserting a third
/// key is expected to evict the key holding the smallest watermark.
#[test]
fn test_keyed_tracker_key_eviction_lowest_watermark() {
    let key = |name: &str| name.to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
            .with_max_keys(2)
            .with_eviction_policy(KeyEvictionPolicy::LowestWatermark),
    );

    // Fill the tracker to capacity.
    tracker.update(key("high"), 5000).unwrap();
    tracker.update(key("low"), 1000).unwrap();

    // "mid" needs a slot; "low" (1000) is the expected victim.
    tracker.update(key("mid"), 3000).unwrap();

    assert!(!tracker.contains_key(&key("low")));
    assert!(tracker.contains_key(&key("high")));
    assert!(tracker.contains_key(&key("mid")));
}
991
/// With `max_keys = 2` and the `RejectNew` policy, a third distinct key must
/// fail with `MaxKeysReached { max_keys: 2 }` while already-tracked keys
/// keep accepting updates.
#[test]
fn test_keyed_tracker_key_eviction_reject_new() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
            .with_max_keys(2)
            .with_eviction_policy(KeyEvictionPolicy::RejectNew),
    );

    // Fill the tracker to capacity.
    for (key, ts) in [("a", 1000), ("b", 2000)] {
        tracker.update(key.to_string(), ts).unwrap();
    }

    // A new key beyond the limit is refused with the configured maximum.
    let rejected = matches!(
        tracker.update("c".to_string(), 3000),
        Err(KeyedWatermarkError::MaxKeysReached { max_keys: 2 })
    );
    assert!(rejected);

    // A key that is already tracked is unaffected by the limit.
    let retry = tracker.update("a".to_string(), 1500);
    assert!(retry.is_ok());
}
1013
/// `update_batch` over four events spanning three keys is expected to
/// register three keys and leave each at its largest timestamp minus the
/// 100 ms bounded delay.
#[test]
fn test_keyed_tracker_batch_update_efficient() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100)),
    );

    // Key "a" appears twice; its later event (1500) should win.
    let batch = vec![
        ("a".to_string(), 1000),
        ("b".to_string(), 2000),
        ("a".to_string(), 1500),
        ("c".to_string(), 3000),
    ];

    let returned = tracker.update_batch(&batch).unwrap();
    assert!(returned.is_some());

    assert_eq!(tracker.total_key_count(), 3);

    // Expected per-key watermark = max timestamp - 100.
    for (key, expected) in [("a", 1400), ("b", 1900), ("c", 2900)] {
        assert_eq!(tracker.watermark_for_key(&key.to_string()), Some(expected));
    }
}
1034
/// Removing the key that pins the global watermark must hand back that
/// key's state and let the global watermark move to the remaining key.
#[test]
fn test_keyed_tracker_remove_key_recalculates_global() {
    let slow = "slow".to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );

    tracker.update("fast".to_string(), 5000).unwrap();
    tracker.update(slow.clone(), 1000).unwrap();

    // Before removal the slow key determines the global watermark.
    assert_eq!(tracker.global_watermark(), Some(Watermark::new(1000)));

    // The removed state carries the watermark the key had reached.
    let removed = tracker.remove_key(&slow);
    assert!(removed.is_some());
    assert_eq!(removed.unwrap().watermark, 1000);

    // Only "fast" is left, so the global watermark is now 5000.
    assert_eq!(tracker.global_watermark(), Some(Watermark::new(5000)));
}
1053
/// A key that receives no update for longer than the 10 ms idle timeout is
/// expected to be flagged idle by `check_idle_keys`.
///
/// The closing assertion is deliberately permissive: it accepts either a
/// watermark returned by `check_idle_keys` or a global watermark of 6000.
#[test]
fn test_keyed_tracker_check_idle_keys() {
    let slow = "slow".to_string();
    let fast = "fast".to_string();
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
            .with_idle_timeout(Duration::from_millis(10)),
    );

    tracker.update(fast.clone(), 5000).unwrap();
    tracker.update(slow.clone(), 1000).unwrap();

    // Sleep for twice the idle timeout so both keys exceed it.
    std::thread::sleep(Duration::from_millis(20));

    // Only "fast" gets a new event; "slow" stays silent.
    tracker.update(fast, 6000).unwrap();

    let reported = tracker.check_idle_keys();

    // "slow" must now carry the idle flag.
    assert!(tracker.key_state(&slow).unwrap().is_idle);

    // Either the check reported a watermark, or the global one sits at 6000.
    let advanced =
        reported.is_some() || tracker.global_watermark() == Some(Watermark::new(6000));
    assert!(advanced);
}
1079
/// Checks the counters exposed by `metrics()` after three updates on two
/// keys, and again after one of the keys has been marked idle.
#[test]
fn test_keyed_tracker_metrics() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100)),
    );

    // The third event moves "a" forward from 1000 to 1500.
    for (key, ts) in [("a", 1000), ("b", 2000), ("a", 1500)] {
        tracker.update(key.to_string(), ts).unwrap();
    }

    {
        let before = tracker.metrics();
        assert_eq!(before.total_keys, 2);
        assert_eq!(before.active_keys, 2);
        assert_eq!(before.idle_keys, 0);
        // Every update above is expected to count as a per-key advance.
        assert!(before.key_advances >= 3);
        assert!(before.global_advances >= 1);
    }

    tracker.mark_idle(&"b".to_string());

    // One key moves from the active count to the idle count.
    let after = tracker.metrics();
    assert_eq!(after.active_keys, 1);
    assert_eq!(after.idle_keys, 1);
}
1102
/// After `clear()` the tracker must report zero keys and no global
/// watermark.
#[test]
fn test_keyed_tracker_clear() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );

    // Populate the tracker so there is something to clear.
    for (key, ts) in [("a", 1000), ("b", 2000)] {
        tracker.update(key.to_string(), ts).unwrap();
    }

    tracker.clear();

    assert_eq!(tracker.total_key_count(), 0);
    assert_eq!(tracker.global_watermark(), None);
}
1116
/// `is_late_global` is checked at the boundary of the expected global
/// watermark (1000): the watermark value itself is on time, one below is
/// late.
#[test]
fn test_keyed_tracker_is_late_global() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );

    // "slow" (1000) is the smaller timestamp, so the global watermark
    // is expected to be 1000.
    for (key, ts) in [("fast", 5000), ("slow", 1000)] {
        tracker.update(key.to_string(), ts).unwrap();
    }

    let at_watermark = tracker.is_late_global(1000);
    let just_below = tracker.is_late_global(999);
    assert!(!at_watermark);
    assert!(just_below);
}
1129
/// Both `keys()` and `iter()` must yield exactly one item per tracked key.
#[test]
fn test_keyed_tracker_iteration() {
    let mut tracker = KeyedWatermarkTracker::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0)),
    );

    for (key, ts) in [("a", 1000), ("b", 2000)] {
        tracker.update(key.to_string(), ts).unwrap();
    }

    // Only the number of yielded items matters, so count them directly.
    assert_eq!(tracker.keys().count(), 2);
    assert_eq!(tracker.iter().count(), 2);
}
1144
/// Drives `KeyedWatermarkTrackerWithLateHandling` through a first event, an
/// on-time event and a late event for the same key, then checks that
/// exactly one late event was counted for the key and in total.
#[test]
fn test_late_handling_tracker() {
    let key = "key1".to_string();
    let mut tracker = KeyedWatermarkTrackerWithLateHandling::<String>::new(
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100)),
    );

    // First event for the key: a watermark is produced and nothing is late.
    let first = tracker.update_with_late_check(key.clone(), 1000).unwrap();
    assert!(first.0.is_some());
    assert!(!first.1);

    // 950 >= 900 (the expected watermark), so this event is on time.
    let on_time = tracker.update_with_late_check(key.clone(), 950).unwrap();
    assert!(!on_time.1);

    // 800 < 900, so this event is late.
    let late = tracker.update_with_late_check(key.clone(), 800).unwrap();
    assert!(late.1);

    // Only the last event should have been counted as late.
    assert_eq!(tracker.late_events_for_key(&key), 1);
    assert_eq!(tracker.total_late_events(), 1);
}
1173}