1#![deny(clippy::disallowed_types)]
7use std::hash::Hash;
78use std::time::{Duration, Instant};
79
80use rustc_hash::FxHashMap;
81
82use super::Watermark;
83
84#[derive(Debug, Clone)]
89pub struct KeyWatermarkState {
90 pub max_event_time: i64,
92 pub watermark: i64,
94 pub last_activity: Instant,
96 pub is_idle: bool,
98}
99
100impl KeyWatermarkState {
101 #[must_use]
103 pub fn new() -> Self {
104 Self {
105 max_event_time: i64::MIN,
106 watermark: i64::MIN,
107 last_activity: Instant::now(),
108 is_idle: false,
109 }
110 }
111
112 #[inline]
119 pub fn update(&mut self, event_time: i64, bounded_delay: i64, now: Instant) -> bool {
120 self.last_activity = now;
121 self.is_idle = false;
122
123 if event_time > self.max_event_time {
124 self.max_event_time = event_time;
125 let new_watermark = event_time.saturating_sub(bounded_delay);
126 if new_watermark > self.watermark {
127 self.watermark = new_watermark;
128 return true;
129 }
130 }
131 false
132 }
133
134 #[inline]
136 #[must_use]
137 pub fn is_late(&self, event_time: i64) -> bool {
138 event_time < self.watermark
139 }
140}
141
142impl Default for KeyWatermarkState {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct KeyedWatermarkConfig {
151 pub bounded_delay: Duration,
153 pub idle_timeout: Duration,
155 pub max_keys: Option<usize>,
157 pub eviction_policy: KeyEvictionPolicy,
159}
160
161impl Default for KeyedWatermarkConfig {
162 fn default() -> Self {
163 Self {
164 bounded_delay: Duration::from_secs(5),
165 idle_timeout: Duration::from_secs(60),
166 max_keys: None,
167 eviction_policy: KeyEvictionPolicy::LeastRecentlyActive,
168 }
169 }
170}
171
172impl KeyedWatermarkConfig {
173 #[must_use]
175 pub fn with_bounded_delay(bounded_delay: Duration) -> Self {
176 Self {
177 bounded_delay,
178 ..Default::default()
179 }
180 }
181
182 #[must_use]
184 pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
185 self.idle_timeout = timeout;
186 self
187 }
188
189 #[must_use]
191 pub fn with_max_keys(mut self, max_keys: usize) -> Self {
192 self.max_keys = Some(max_keys);
193 self
194 }
195
196 #[must_use]
198 pub fn with_eviction_policy(mut self, policy: KeyEvictionPolicy) -> Self {
199 self.eviction_policy = policy;
200 self
201 }
202}
203
204#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
206pub enum KeyEvictionPolicy {
207 #[default]
209 LeastRecentlyActive,
210 LowestWatermark,
212 RejectNew,
214}
215
216#[derive(Debug, Clone, thiserror::Error)]
218pub enum KeyedWatermarkError {
219 #[error("Maximum keys reached ({max_keys}), cannot add new key")]
221 MaxKeysReached {
222 max_keys: usize,
224 },
225}
226
227#[derive(Debug, Clone, Default)]
229pub struct KeyedWatermarkMetrics {
230 pub total_keys: usize,
232 pub active_keys: usize,
234 pub idle_keys: usize,
236 pub evicted_keys: u64,
238 pub global_advances: u64,
240 pub key_advances: u64,
242}
243
244impl KeyedWatermarkMetrics {
245 #[must_use]
247 pub fn new() -> Self {
248 Self::default()
249 }
250}
251
252#[derive(Debug)]
286pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> {
287 key_states: FxHashMap<K, KeyWatermarkState>,
289
290 global_watermark: i64,
292
293 config: KeyedWatermarkConfig,
295
296 bounded_delay_ms: i64,
298
299 metrics: KeyedWatermarkMetrics,
301}
302
303impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K> {
304 #[must_use]
306 #[allow(clippy::cast_possible_truncation)] pub fn new(config: KeyedWatermarkConfig) -> Self {
308 let bounded_delay_ms = config.bounded_delay.as_millis() as i64;
309 Self {
310 key_states: FxHashMap::default(),
311 global_watermark: i64::MIN,
312 config,
313 bounded_delay_ms,
314 metrics: KeyedWatermarkMetrics::new(),
315 }
316 }
317
318 #[must_use]
320 pub fn with_defaults() -> Self {
321 Self::new(KeyedWatermarkConfig::default())
322 }
323
324 #[allow(clippy::missing_panics_doc)] #[allow(clippy::needless_pass_by_value)] pub fn update(
352 &mut self,
353 key: K,
354 event_time: i64,
355 ) -> Result<Option<Watermark>, KeyedWatermarkError> {
356 if !self.key_states.contains_key(&key) {
358 if let Some(max_keys) = self.config.max_keys {
360 if self.key_states.len() >= max_keys {
361 match self.config.eviction_policy {
362 KeyEvictionPolicy::RejectNew => {
363 return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
364 }
365 KeyEvictionPolicy::LeastRecentlyActive => {
366 self.evict_least_recently_active();
367 }
368 KeyEvictionPolicy::LowestWatermark => {
369 self.evict_lowest_watermark();
370 }
371 }
372 }
373 }
374 self.key_states
375 .insert(key.clone(), KeyWatermarkState::new());
376 self.metrics.total_keys = self.key_states.len();
377 }
378
379 let now = Instant::now();
381 let state = self.key_states.get_mut(&key).expect("key just inserted");
382 let watermark_advanced = state.update(event_time, self.bounded_delay_ms, now);
383
384 if watermark_advanced {
385 self.metrics.key_advances += 1;
386 }
387
388 self.update_metrics_counts();
390
391 Ok(self.try_advance_global())
393 }
394
395 #[allow(clippy::missing_panics_doc)] pub fn update_batch(
405 &mut self,
406 events: &[(K, i64)],
407 ) -> Result<Option<Watermark>, KeyedWatermarkError> {
408 let now = Instant::now();
410 for (key, event_time) in events {
411 if !self.key_states.contains_key(key) {
413 if let Some(max_keys) = self.config.max_keys {
414 if self.key_states.len() >= max_keys {
415 match self.config.eviction_policy {
416 KeyEvictionPolicy::RejectNew => {
417 return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
418 }
419 KeyEvictionPolicy::LeastRecentlyActive => {
420 self.evict_least_recently_active();
421 }
422 KeyEvictionPolicy::LowestWatermark => {
423 self.evict_lowest_watermark();
424 }
425 }
426 }
427 }
428 self.key_states
429 .insert(key.clone(), KeyWatermarkState::new());
430 }
431
432 let state = self.key_states.get_mut(key).expect("key just inserted");
433 if state.update(*event_time, self.bounded_delay_ms, now) {
434 self.metrics.key_advances += 1;
435 }
436 }
437
438 self.metrics.total_keys = self.key_states.len();
439 self.update_metrics_counts();
440
441 Ok(self.try_advance_global())
442 }
443
444 #[must_use]
446 pub fn watermark_for_key(&self, key: &K) -> Option<i64> {
447 self.key_states.get(key).map(|s| s.watermark)
448 }
449
450 #[must_use]
452 pub fn global_watermark(&self) -> Option<Watermark> {
453 if self.global_watermark == i64::MIN {
454 None
455 } else {
456 Some(Watermark::new(self.global_watermark))
457 }
458 }
459
460 #[must_use]
465 pub fn is_late(&self, key: &K, event_time: i64) -> bool {
466 self.key_states
467 .get(key)
468 .is_some_and(|s| s.is_late(event_time))
469 }
470
471 #[must_use]
475 pub fn is_late_global(&self, event_time: i64) -> bool {
476 event_time < self.global_watermark
477 }
478
479 pub fn mark_idle(&mut self, key: &K) -> Option<Watermark> {
483 if let Some(state) = self.key_states.get_mut(key) {
484 if !state.is_idle {
485 state.is_idle = true;
486 self.update_metrics_counts();
487 return self.try_advance_global();
488 }
489 }
490 None
491 }
492
493 pub fn mark_active(&mut self, key: &K) {
495 if let Some(state) = self.key_states.get_mut(key) {
496 if state.is_idle {
497 state.is_idle = false;
498 state.last_activity = Instant::now();
499 self.update_metrics_counts();
500 }
501 }
502 }
503
504 pub fn check_idle_keys(&mut self) -> Option<Watermark> {
510 let idle_timeout = self.config.idle_timeout;
511 let mut any_marked = false;
512
513 for state in self.key_states.values_mut() {
514 if !state.is_idle && state.last_activity.elapsed() >= idle_timeout {
515 state.is_idle = true;
516 any_marked = true;
517 }
518 }
519
520 if any_marked {
521 self.update_metrics_counts();
522 self.try_advance_global()
523 } else {
524 None
525 }
526 }
527
528 #[must_use]
530 pub fn active_key_count(&self) -> usize {
531 self.key_states.values().filter(|s| !s.is_idle).count()
532 }
533
534 #[must_use]
536 pub fn total_key_count(&self) -> usize {
537 self.key_states.len()
538 }
539
540 #[must_use]
542 pub fn metrics(&self) -> &KeyedWatermarkMetrics {
543 &self.metrics
544 }
545
546 pub fn recalculate_global(&mut self) -> Option<Watermark> {
550 let new_global = self.calculate_global();
551 if new_global != i64::MAX && new_global != i64::MIN {
552 self.global_watermark = new_global;
553 Some(Watermark::new(new_global))
554 } else {
555 None
556 }
557 }
558
559 pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState> {
563 let state = self.key_states.remove(key);
564 if state.is_some() {
565 self.metrics.total_keys = self.key_states.len();
566 self.update_metrics_counts();
567 let new_global = self.calculate_global();
569 if new_global > self.global_watermark && new_global != i64::MAX {
570 self.global_watermark = new_global;
571 }
572 }
573 state
574 }
575
576 pub fn clear(&mut self) {
578 self.key_states.clear();
579 self.global_watermark = i64::MIN;
580 self.metrics = KeyedWatermarkMetrics::new();
581 }
582
583 #[must_use]
585 pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState> {
586 self.key_states.get(key)
587 }
588
589 #[must_use]
591 pub fn config(&self) -> &KeyedWatermarkConfig {
592 &self.config
593 }
594
595 #[must_use]
597 pub fn contains_key(&self, key: &K) -> bool {
598 self.key_states.contains_key(key)
599 }
600
601 pub fn keys(&self) -> impl Iterator<Item = &K> {
603 self.key_states.keys()
604 }
605
606 pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)> {
608 self.key_states.iter()
609 }
610
611 #[must_use]
613 pub fn bounded_delay_ms(&self) -> i64 {
614 self.bounded_delay_ms
615 }
616
617 fn try_advance_global(&mut self) -> Option<Watermark> {
624 let new_global = self.calculate_global();
625
626 if new_global != i64::MAX && new_global != i64::MIN && new_global != self.global_watermark {
627 let old_global = self.global_watermark;
628 self.global_watermark = new_global;
629 if new_global > old_global {
630 self.metrics.global_advances += 1;
631 }
632 Some(Watermark::new(new_global))
633 } else {
634 None
635 }
636 }
637
638 fn calculate_global(&self) -> i64 {
640 let mut min_watermark = i64::MAX;
641 let mut has_active = false;
642
643 for state in self.key_states.values() {
644 if !state.is_idle {
645 has_active = true;
646 min_watermark = min_watermark.min(state.watermark);
647 }
648 }
649
650 if !has_active {
652 min_watermark = self
653 .key_states
654 .values()
655 .map(|s| s.watermark)
656 .max()
657 .unwrap_or(i64::MIN);
658 }
659
660 min_watermark
661 }
662
663 fn update_metrics_counts(&mut self) {
665 self.metrics.idle_keys = self.key_states.values().filter(|s| s.is_idle).count();
666 self.metrics.active_keys = self.metrics.total_keys - self.metrics.idle_keys;
667 }
668
669 fn evict_least_recently_active(&mut self) {
671 if let Some(key_to_evict) = self
672 .key_states
673 .iter()
674 .min_by_key(|(_, state)| state.last_activity)
675 .map(|(k, _)| k.clone())
676 {
677 self.key_states.remove(&key_to_evict);
678 self.metrics.evicted_keys += 1;
679 }
680 }
681
682 fn evict_lowest_watermark(&mut self) {
684 if let Some(key_to_evict) = self
685 .key_states
686 .iter()
687 .min_by_key(|(_, state)| state.watermark)
688 .map(|(k, _)| k.clone())
689 {
690 self.key_states.remove(&key_to_evict);
691 self.metrics.evicted_keys += 1;
692 }
693 }
694}
695
696#[derive(Debug)]
701pub struct KeyedWatermarkTrackerWithLateHandling<K: Hash + Eq + Clone> {
702 tracker: KeyedWatermarkTracker<K>,
704 late_events_per_key: FxHashMap<K, u64>,
706 total_late_events: u64,
708}
709
710impl<K: Hash + Eq + Clone> KeyedWatermarkTrackerWithLateHandling<K> {
711 #[must_use]
713 pub fn new(config: KeyedWatermarkConfig) -> Self {
714 Self {
715 tracker: KeyedWatermarkTracker::new(config),
716 late_events_per_key: FxHashMap::default(),
717 total_late_events: 0,
718 }
719 }
720
721 pub fn update_with_late_check(
730 &mut self,
731 key: K,
732 event_time: i64,
733 ) -> Result<(Option<Watermark>, bool), KeyedWatermarkError> {
734 let is_late = self.tracker.is_late(&key, event_time);
735
736 if is_late {
737 *self.late_events_per_key.entry(key.clone()).or_insert(0) += 1;
738 self.total_late_events += 1;
739 }
740
741 let wm = self.tracker.update(key, event_time)?;
742 Ok((wm, is_late))
743 }
744
745 #[must_use]
747 pub fn late_events_for_key(&self, key: &K) -> u64 {
748 self.late_events_per_key.get(key).copied().unwrap_or(0)
749 }
750
751 #[must_use]
753 pub fn total_late_events(&self) -> u64 {
754 self.total_late_events
755 }
756
757 #[must_use]
759 pub fn inner(&self) -> &KeyedWatermarkTracker<K> {
760 &self.tracker
761 }
762
763 pub fn inner_mut(&mut self) -> &mut KeyedWatermarkTracker<K> {
765 &mut self.tracker
766 }
767}
768
769#[cfg(test)]
770mod tests {
771 use super::*;
772
773 #[test]
774 fn test_key_watermark_state_creation() {
775 let state = KeyWatermarkState::new();
776 assert_eq!(state.max_event_time, i64::MIN);
777 assert_eq!(state.watermark, i64::MIN);
778 assert!(!state.is_idle);
779 }
780
781 #[test]
782 fn test_key_watermark_state_update() {
783 let mut state = KeyWatermarkState::new();
784 let now = Instant::now();
785
786 let advanced = state.update(1000, 100, now);
788 assert!(advanced);
789 assert_eq!(state.max_event_time, 1000);
790 assert_eq!(state.watermark, 900); let advanced = state.update(800, 100, now);
794 assert!(!advanced);
795 assert_eq!(state.max_event_time, 1000); let advanced = state.update(1500, 100, now);
799 assert!(advanced);
800 assert_eq!(state.watermark, 1400);
801 }
802
803 #[test]
804 fn test_key_watermark_state_is_late() {
805 let mut state = KeyWatermarkState::new();
806 state.update(1000, 100, Instant::now()); assert!(state.is_late(800)); assert!(state.is_late(899)); assert!(!state.is_late(900)); assert!(!state.is_late(1000)); }
813
814 #[test]
815 fn test_config_defaults() {
816 let config = KeyedWatermarkConfig::default();
817 assert_eq!(config.bounded_delay, Duration::from_secs(5));
818 assert_eq!(config.idle_timeout, Duration::from_secs(60));
819 assert!(config.max_keys.is_none());
820 assert_eq!(
821 config.eviction_policy,
822 KeyEvictionPolicy::LeastRecentlyActive
823 );
824 }
825
826 #[test]
827 fn test_config_builder() {
828 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(10))
829 .with_idle_timeout(Duration::from_secs(30))
830 .with_max_keys(1000)
831 .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);
832
833 assert_eq!(config.bounded_delay, Duration::from_secs(10));
834 assert_eq!(config.idle_timeout, Duration::from_secs(30));
835 assert_eq!(config.max_keys, Some(1000));
836 assert_eq!(config.eviction_policy, KeyEvictionPolicy::LowestWatermark);
837 }
838
839 #[test]
840 fn test_keyed_tracker_single_key_updates_watermark() {
841 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
842 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
843
844 let wm = tracker.update("key1".to_string(), 1000).unwrap();
845 assert_eq!(wm, Some(Watermark::new(900)));
846 assert_eq!(tracker.watermark_for_key(&"key1".to_string()), Some(900));
847 assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
848 }
849
850 #[test]
851 fn test_keyed_tracker_multiple_keys_independent_watermarks() {
852 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
853 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
854
855 tracker.update("fast".to_string(), 5000).unwrap();
856 tracker.update("slow".to_string(), 1000).unwrap();
857
858 assert_eq!(tracker.watermark_for_key(&"fast".to_string()), Some(4900));
860 assert_eq!(tracker.watermark_for_key(&"slow".to_string()), Some(900));
861
862 assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
864 }
865
866 #[test]
867 fn test_keyed_tracker_global_is_minimum_of_active_keys() {
868 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
869 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
870
871 tracker.update("a".to_string(), 5000).unwrap();
872 tracker.update("b".to_string(), 3000).unwrap();
873 tracker.update("c".to_string(), 7000).unwrap();
874
875 assert_eq!(tracker.global_watermark(), Some(Watermark::new(3000)));
876 }
877
878 #[test]
879 fn test_keyed_tracker_fast_key_does_not_affect_slow_key() {
880 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
881 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
882
883 tracker.update("slow".to_string(), 1000).unwrap();
885
886 tracker.update("fast".to_string(), 5000).unwrap();
888 tracker.update("fast".to_string(), 10000).unwrap();
889
890 assert_eq!(tracker.watermark_for_key(&"slow".to_string()), Some(900));
892
893 assert!(!tracker.is_late(&"slow".to_string(), 950));
895
896 assert!(tracker.is_late(&"fast".to_string(), 950));
898 }
899
900 #[test]
901 fn test_keyed_tracker_is_late_uses_key_watermark_not_global() {
902 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
903 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
904
905 tracker.update("fast".to_string(), 10000).unwrap(); tracker.update("slow".to_string(), 1000).unwrap(); assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
910
911 assert!(!tracker.is_late(&"slow".to_string(), 5000));
913
914 assert!(tracker.is_late(&"fast".to_string(), 5000));
916 }
917
918 #[test]
919 fn test_keyed_tracker_idle_key_excluded_from_global() {
920 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
921 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
922
923 tracker.update("fast".to_string(), 5000).unwrap();
924 tracker.update("slow".to_string(), 1000).unwrap();
925
926 assert_eq!(tracker.global_watermark(), Some(Watermark::new(1000)));
927
928 let wm = tracker.mark_idle(&"slow".to_string());
930 assert_eq!(wm, Some(Watermark::new(5000)));
931 assert_eq!(tracker.global_watermark(), Some(Watermark::new(5000)));
932 }
933
934 #[test]
935 fn test_keyed_tracker_all_idle_uses_max() {
936 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
937 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
938
939 tracker.update("a".to_string(), 5000).unwrap();
940 tracker.update("b".to_string(), 3000).unwrap();
941
942 tracker.mark_idle(&"a".to_string());
943 let wm = tracker.mark_idle(&"b".to_string());
944
945 assert_eq!(wm, Some(Watermark::new(5000)));
947 }
948
949 #[test]
950 fn test_keyed_tracker_key_eviction_lru() {
951 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
952 .with_max_keys(2)
953 .with_eviction_policy(KeyEvictionPolicy::LeastRecentlyActive);
954
955 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
956
957 tracker.update("a".to_string(), 1000).unwrap();
958 std::thread::sleep(Duration::from_millis(10));
959 tracker.update("b".to_string(), 2000).unwrap();
960
961 assert_eq!(tracker.total_key_count(), 2);
962
963 tracker.update("c".to_string(), 3000).unwrap();
965
966 assert_eq!(tracker.total_key_count(), 2);
967 assert!(!tracker.contains_key(&"a".to_string())); assert!(tracker.contains_key(&"b".to_string()));
969 assert!(tracker.contains_key(&"c".to_string()));
970 assert_eq!(tracker.metrics().evicted_keys, 1);
971 }
972
973 #[test]
974 fn test_keyed_tracker_key_eviction_lowest_watermark() {
975 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
976 .with_max_keys(2)
977 .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);
978
979 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
980
981 tracker.update("high".to_string(), 5000).unwrap();
982 tracker.update("low".to_string(), 1000).unwrap();
983
984 tracker.update("mid".to_string(), 3000).unwrap();
986
987 assert!(!tracker.contains_key(&"low".to_string())); assert!(tracker.contains_key(&"high".to_string()));
989 assert!(tracker.contains_key(&"mid".to_string()));
990 }
991
992 #[test]
993 fn test_keyed_tracker_key_eviction_reject_new() {
994 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
995 .with_max_keys(2)
996 .with_eviction_policy(KeyEvictionPolicy::RejectNew);
997
998 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
999
1000 tracker.update("a".to_string(), 1000).unwrap();
1001 tracker.update("b".to_string(), 2000).unwrap();
1002
1003 let result = tracker.update("c".to_string(), 3000);
1005 assert!(matches!(
1006 result,
1007 Err(KeyedWatermarkError::MaxKeysReached { max_keys: 2 })
1008 ));
1009
1010 assert!(tracker.update("a".to_string(), 1500).is_ok());
1012 }
1013
1014 #[test]
1015 fn test_keyed_tracker_batch_update_efficient() {
1016 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
1017 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1018
1019 let events = vec![
1020 ("a".to_string(), 1000),
1021 ("b".to_string(), 2000),
1022 ("a".to_string(), 1500),
1023 ("c".to_string(), 3000),
1024 ];
1025
1026 let wm = tracker.update_batch(&events).unwrap();
1027 assert!(wm.is_some());
1028
1029 assert_eq!(tracker.total_key_count(), 3);
1030 assert_eq!(tracker.watermark_for_key(&"a".to_string()), Some(1400)); assert_eq!(tracker.watermark_for_key(&"b".to_string()), Some(1900));
1032 assert_eq!(tracker.watermark_for_key(&"c".to_string()), Some(2900));
1033 }
1034
1035 #[test]
1036 fn test_keyed_tracker_remove_key_recalculates_global() {
1037 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1038 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1039
1040 tracker.update("fast".to_string(), 5000).unwrap();
1041 tracker.update("slow".to_string(), 1000).unwrap();
1042
1043 assert_eq!(tracker.global_watermark(), Some(Watermark::new(1000)));
1044
1045 let state = tracker.remove_key(&"slow".to_string());
1047 assert!(state.is_some());
1048 assert_eq!(state.unwrap().watermark, 1000);
1049
1050 assert_eq!(tracker.global_watermark(), Some(Watermark::new(5000)));
1052 }
1053
1054 #[test]
1055 fn test_keyed_tracker_check_idle_keys() {
1056 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
1057 .with_idle_timeout(Duration::from_millis(10));
1058
1059 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1060
1061 tracker.update("fast".to_string(), 5000).unwrap();
1062 tracker.update("slow".to_string(), 1000).unwrap();
1063
1064 std::thread::sleep(Duration::from_millis(20));
1066
1067 tracker.update("fast".to_string(), 6000).unwrap();
1069
1070 let wm = tracker.check_idle_keys();
1072
1073 assert!(tracker.key_state(&"slow".to_string()).unwrap().is_idle);
1075
1076 assert!(wm.is_some() || tracker.global_watermark() == Some(Watermark::new(6000)));
1078 }
1079
1080 #[test]
1081 fn test_keyed_tracker_metrics() {
1082 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
1083 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1084
1085 tracker.update("a".to_string(), 1000).unwrap();
1086 tracker.update("b".to_string(), 2000).unwrap();
1087 tracker.update("a".to_string(), 1500).unwrap(); let metrics = tracker.metrics();
1090 assert_eq!(metrics.total_keys, 2);
1091 assert_eq!(metrics.active_keys, 2);
1092 assert_eq!(metrics.idle_keys, 0);
1093 assert!(metrics.key_advances >= 3); assert!(metrics.global_advances >= 1);
1095
1096 tracker.mark_idle(&"b".to_string());
1097
1098 let metrics = tracker.metrics();
1099 assert_eq!(metrics.active_keys, 1);
1100 assert_eq!(metrics.idle_keys, 1);
1101 }
1102
1103 #[test]
1104 fn test_keyed_tracker_clear() {
1105 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1106 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1107
1108 tracker.update("a".to_string(), 1000).unwrap();
1109 tracker.update("b".to_string(), 2000).unwrap();
1110
1111 tracker.clear();
1112
1113 assert_eq!(tracker.total_key_count(), 0);
1114 assert_eq!(tracker.global_watermark(), None);
1115 }
1116
1117 #[test]
1118 fn test_keyed_tracker_is_late_global() {
1119 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1120 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1121
1122 tracker.update("fast".to_string(), 5000).unwrap();
1123 tracker.update("slow".to_string(), 1000).unwrap();
1124
1125 assert!(!tracker.is_late_global(1000));
1127 assert!(tracker.is_late_global(999));
1128 }
1129
1130 #[test]
1131 fn test_keyed_tracker_iteration() {
1132 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1133 let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1134
1135 tracker.update("a".to_string(), 1000).unwrap();
1136 tracker.update("b".to_string(), 2000).unwrap();
1137
1138 let keys: Vec<_> = tracker.keys().collect();
1139 assert_eq!(keys.len(), 2);
1140
1141 let pairs: Vec<_> = tracker.iter().collect();
1142 assert_eq!(pairs.len(), 2);
1143 }
1144
1145 #[test]
1146 fn test_late_handling_tracker() {
1147 let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
1148 let mut tracker: KeyedWatermarkTrackerWithLateHandling<String> =
1149 KeyedWatermarkTrackerWithLateHandling::new(config);
1150
1151 let (wm, is_late) = tracker
1153 .update_with_late_check("key1".to_string(), 1000)
1154 .unwrap();
1155 assert!(wm.is_some());
1156 assert!(!is_late);
1157
1158 let (_, is_late) = tracker
1160 .update_with_late_check("key1".to_string(), 950)
1161 .unwrap();
1162 assert!(!is_late); let (_, is_late) = tracker
1166 .update_with_late_check("key1".to_string(), 800)
1167 .unwrap();
1168 assert!(is_late); assert_eq!(tracker.late_events_for_key(&"key1".to_string()), 1);
1171 assert_eq!(tracker.total_late_events(), 1);
1172 }
1173}