1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
44
45use super::Watermark;
46
47pub trait WatermarkGenerator: Send {
53 fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
58
59 fn on_periodic(&mut self) -> Option<Watermark>;
63
64 fn current_watermark(&self) -> i64;
66
67 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
73}
74
75pub struct BoundedOutOfOrdernessGenerator {
97 max_out_of_orderness: i64,
98 current_max_timestamp: i64,
99 current_watermark: i64,
100}
101
102impl BoundedOutOfOrdernessGenerator {
103 #[must_use]
109 pub fn new(max_out_of_orderness: i64) -> Self {
110 Self {
111 max_out_of_orderness,
112 current_max_timestamp: i64::MIN,
113 current_watermark: i64::MIN,
114 }
115 }
116
117 #[must_use]
119 #[allow(clippy::cast_possible_truncation)] pub fn from_duration(max_out_of_orderness: Duration) -> Self {
121 Self::new(max_out_of_orderness.as_millis() as i64)
122 }
123
124 #[must_use]
126 pub fn max_out_of_orderness(&self) -> i64 {
127 self.max_out_of_orderness
128 }
129}
130
131impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
132 #[inline]
133 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
134 if timestamp > self.current_max_timestamp {
135 self.current_max_timestamp = timestamp;
136 let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
137 if new_watermark > self.current_watermark {
138 self.current_watermark = new_watermark;
139 return Some(Watermark::new(new_watermark));
140 }
141 }
142 None
143 }
144
145 #[inline]
146 fn on_periodic(&mut self) -> Option<Watermark> {
147 None
149 }
150
151 #[inline]
152 fn current_watermark(&self) -> i64 {
153 self.current_watermark
154 }
155
156 #[inline]
157 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
158 if timestamp > self.current_watermark {
159 self.current_watermark = timestamp;
160 let min_max = timestamp.saturating_add(self.max_out_of_orderness);
162 if min_max > self.current_max_timestamp {
163 self.current_max_timestamp = min_max;
164 }
165 Some(Watermark::new(timestamp))
166 } else {
167 None
168 }
169 }
170}
171
172#[derive(Debug, Default)]
187pub struct AscendingTimestampsGenerator {
188 current_watermark: i64,
189}
190
191impl AscendingTimestampsGenerator {
192 #[must_use]
194 pub fn new() -> Self {
195 Self {
196 current_watermark: i64::MIN,
197 }
198 }
199}
200
201impl WatermarkGenerator for AscendingTimestampsGenerator {
202 #[inline]
203 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
204 if timestamp > self.current_watermark {
205 self.current_watermark = timestamp;
206 Some(Watermark::new(timestamp))
207 } else {
208 None
209 }
210 }
211
212 #[inline]
213 fn on_periodic(&mut self) -> Option<Watermark> {
214 None
215 }
216
217 #[inline]
218 fn current_watermark(&self) -> i64 {
219 self.current_watermark
220 }
221
222 #[inline]
223 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
224 if timestamp > self.current_watermark {
225 self.current_watermark = timestamp;
226 Some(Watermark::new(timestamp))
227 } else {
228 None
229 }
230 }
231}
232
233pub struct PeriodicGenerator<G: WatermarkGenerator> {
258 inner: G,
259 period: Duration,
260 last_emit_time: Instant,
261 last_emitted_watermark: i64,
262}
263
264impl<G: WatermarkGenerator> PeriodicGenerator<G> {
265 #[must_use]
272 pub fn new(inner: G, period: Duration) -> Self {
273 Self {
274 inner,
275 period,
276 last_emit_time: Instant::now(),
277 last_emitted_watermark: i64::MIN,
278 }
279 }
280
281 #[must_use]
283 pub fn inner(&self) -> &G {
284 &self.inner
285 }
286
287 pub fn inner_mut(&mut self) -> &mut G {
289 &mut self.inner
290 }
291}
292
293impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
294 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
295 let wm = self.inner.on_event(timestamp);
296 if let Some(ref w) = wm {
297 self.last_emitted_watermark = w.timestamp();
298 self.last_emit_time = Instant::now();
299 }
300 wm
301 }
302
303 fn on_periodic(&mut self) -> Option<Watermark> {
304 if self.last_emit_time.elapsed() >= self.period {
306 let current = self.inner.current_watermark();
307 if current > self.last_emitted_watermark {
308 self.last_emitted_watermark = current;
309 self.last_emit_time = Instant::now();
310 return Some(Watermark::new(current));
311 }
312 self.last_emit_time = Instant::now();
313 }
314 None
315 }
316
317 fn current_watermark(&self) -> i64 {
318 self.inner.current_watermark()
319 }
320
321 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
322 let wm = self.inner.advance_watermark(timestamp);
323 if let Some(ref w) = wm {
324 self.last_emitted_watermark = w.timestamp();
325 self.last_emit_time = Instant::now();
326 }
327 wm
328 }
329}
330
331pub struct PunctuatedGenerator<F>
354where
355 F: Fn(i64) -> Option<Watermark> + Send,
356{
357 predicate: F,
358 current_watermark: i64,
359}
360
361impl<F> PunctuatedGenerator<F>
362where
363 F: Fn(i64) -> Option<Watermark> + Send,
364{
365 #[must_use]
371 pub fn new(predicate: F) -> Self {
372 Self {
373 predicate,
374 current_watermark: i64::MIN,
375 }
376 }
377}
378
379impl<F> WatermarkGenerator for PunctuatedGenerator<F>
380where
381 F: Fn(i64) -> Option<Watermark> + Send,
382{
383 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
384 if let Some(wm) = (self.predicate)(timestamp) {
385 if wm.timestamp() > self.current_watermark {
386 self.current_watermark = wm.timestamp();
387 return Some(wm);
388 }
389 }
390 None
391 }
392
393 fn on_periodic(&mut self) -> Option<Watermark> {
394 None
395 }
396
397 fn current_watermark(&self) -> i64 {
398 self.current_watermark
399 }
400
401 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
402 if timestamp > self.current_watermark {
403 self.current_watermark = timestamp;
404 Some(Watermark::new(timestamp))
405 } else {
406 None
407 }
408 }
409}
410
411#[derive(Debug)]
436pub struct WatermarkTracker {
437 source_watermarks: Vec<i64>,
439 combined_watermark: i64,
441 idle_sources: Vec<bool>,
443 last_activity: Vec<Instant>,
445 idle_timeout: Duration,
447}
448
449impl WatermarkTracker {
450 #[must_use]
452 pub fn new(num_sources: usize) -> Self {
453 Self {
454 source_watermarks: vec![i64::MIN; num_sources],
455 combined_watermark: i64::MIN,
456 idle_sources: vec![false; num_sources],
457 last_activity: vec![Instant::now(); num_sources],
458 idle_timeout: Duration::from_secs(30), }
460 }
461
462 #[must_use]
464 pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
465 Self {
466 source_watermarks: vec![i64::MIN; num_sources],
467 combined_watermark: i64::MIN,
468 idle_sources: vec![false; num_sources],
469 last_activity: vec![Instant::now(); num_sources],
470 idle_timeout,
471 }
472 }
473
474 pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
478 if source_id >= self.source_watermarks.len() {
479 return None;
480 }
481
482 self.idle_sources[source_id] = false;
484 self.last_activity[source_id] = Instant::now();
485
486 if watermark > self.source_watermarks[source_id] {
488 self.source_watermarks[source_id] = watermark;
489 self.update_combined()
490 } else {
491 None
492 }
493 }
494
495 pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
499 if source_id >= self.idle_sources.len() {
500 return None;
501 }
502
503 self.idle_sources[source_id] = true;
504 self.update_combined()
505 }
506
507 pub fn check_idle_sources(&mut self) -> Option<Watermark> {
511 let mut any_marked = false;
512 for i in 0..self.idle_sources.len() {
513 if !self.idle_sources[i] && self.last_activity[i].elapsed() >= self.idle_timeout {
514 self.idle_sources[i] = true;
515 any_marked = true;
516 }
517 }
518 if any_marked {
519 self.update_combined()
520 } else {
521 None
522 }
523 }
524
525 #[must_use]
527 pub fn current_watermark(&self) -> Option<Watermark> {
528 if self.combined_watermark == i64::MIN {
529 None
530 } else {
531 Some(Watermark::new(self.combined_watermark))
532 }
533 }
534
535 #[must_use]
537 pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
538 self.source_watermarks.get(source_id).copied()
539 }
540
541 #[must_use]
543 pub fn is_idle(&self, source_id: usize) -> bool {
544 self.idle_sources.get(source_id).copied().unwrap_or(false)
545 }
546
547 #[must_use]
549 pub fn num_sources(&self) -> usize {
550 self.source_watermarks.len()
551 }
552
553 #[must_use]
555 pub fn active_source_count(&self) -> usize {
556 self.idle_sources.iter().filter(|&&idle| !idle).count()
557 }
558
559 fn update_combined(&mut self) -> Option<Watermark> {
561 let mut min_watermark = i64::MAX;
563 let mut has_active = false;
564
565 for (i, &wm) in self.source_watermarks.iter().enumerate() {
566 if !self.idle_sources[i] {
567 has_active = true;
568 min_watermark = min_watermark.min(wm);
569 }
570 }
571
572 if !has_active {
574 min_watermark = self
575 .source_watermarks
576 .iter()
577 .copied()
578 .max()
579 .unwrap_or(i64::MIN);
580 }
581
582 if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
583 self.combined_watermark = min_watermark;
584 Some(Watermark::new(min_watermark))
585 } else {
586 None
587 }
588 }
589}
590
591pub struct SourceProvidedGenerator {
596 source_watermark: i64,
598 fallback: BoundedOutOfOrdernessGenerator,
600 prefer_source: bool,
602}
603
604impl SourceProvidedGenerator {
605 #[must_use]
612 pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
613 Self {
614 source_watermark: i64::MIN,
615 fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
616 prefer_source,
617 }
618 }
619
620 pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
624 if watermark > self.source_watermark {
625 self.source_watermark = watermark;
626 if self.prefer_source || watermark > self.fallback.current_watermark() {
627 return Some(Watermark::new(watermark));
628 }
629 }
630 None
631 }
632}
633
634impl WatermarkGenerator for SourceProvidedGenerator {
635 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
636 let fallback_wm = self.fallback.on_event(timestamp);
637
638 if self.prefer_source {
639 if self.source_watermark > i64::MIN {
641 return None; }
643 }
644
645 fallback_wm
646 }
647
648 fn on_periodic(&mut self) -> Option<Watermark> {
649 None
650 }
651
652 fn current_watermark(&self) -> i64 {
653 if self.prefer_source && self.source_watermark > i64::MIN {
654 self.source_watermark
655 } else {
656 self.fallback.current_watermark().max(self.source_watermark)
657 }
658 }
659
660 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
661 self.on_source_watermark(timestamp)
662 }
663}
664
665pub struct ProcessingTimeGenerator {
687 current_watermark: i64,
688}
689
690impl ProcessingTimeGenerator {
691 #[must_use]
693 pub fn new() -> Self {
694 Self {
695 current_watermark: i64::MIN,
696 }
697 }
698
699 #[allow(clippy::cast_possible_truncation)]
701 fn now_millis() -> i64 {
702 SystemTime::now()
703 .duration_since(UNIX_EPOCH)
704 .unwrap_or(Duration::ZERO)
705 .as_millis() as i64
706 }
707}
708
709impl Default for ProcessingTimeGenerator {
710 fn default() -> Self {
711 Self::new()
712 }
713}
714
715impl WatermarkGenerator for ProcessingTimeGenerator {
716 #[inline]
717 fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
718 None
720 }
721
722 #[inline]
723 fn on_periodic(&mut self) -> Option<Watermark> {
724 let now = Self::now_millis();
725 if now > self.current_watermark {
726 self.current_watermark = now;
727 Some(Watermark::new(now))
728 } else {
729 None
730 }
731 }
732
733 #[inline]
734 fn current_watermark(&self) -> i64 {
735 self.current_watermark
736 }
737
738 #[inline]
739 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
740 if timestamp > self.current_watermark {
741 self.current_watermark = timestamp;
742 Some(Watermark::new(timestamp))
743 } else {
744 None
745 }
746 }
747}
748
/// Counters describing the progress of a watermark generator.
///
/// All fields start at zero.
#[derive(Debug, Clone, Default)]
pub struct WatermarkMetrics {
    /// Timestamp of the most recently emitted watermark.
    pub current_watermark: i64,
    /// Largest event timestamp recorded.
    pub max_event_timestamp: i64,
    /// Number of watermarks emitted.
    pub watermarks_emitted: u64,
    /// Number of events recorded as late.
    pub late_events: u64,
}

impl WatermarkMetrics {
    /// Creates a metrics record with every field set to zero.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Distance between the largest recorded event timestamp and the current
    /// watermark, saturating at the `i64` bounds instead of overflowing.
    #[must_use]
    pub fn lag(&self) -> i64 {
        let newest_event = self.max_event_timestamp;
        let watermark = self.current_watermark;
        newest_event.saturating_sub(watermark)
    }
}
776
777pub struct MeteredGenerator<G: WatermarkGenerator> {
779 inner: G,
780 metrics: WatermarkMetrics,
781}
782
783impl<G: WatermarkGenerator> MeteredGenerator<G> {
784 #[must_use]
786 pub fn new(inner: G) -> Self {
787 Self {
788 inner,
789 metrics: WatermarkMetrics::new(),
790 }
791 }
792
793 #[must_use]
795 pub fn metrics(&self) -> &WatermarkMetrics {
796 &self.metrics
797 }
798
799 pub fn inner_mut(&mut self) -> &mut G {
801 &mut self.inner
802 }
803
804 pub fn record_late_event(&mut self) {
806 self.metrics.late_events += 1;
807 }
808}
809
810impl<G: WatermarkGenerator> WatermarkGenerator for MeteredGenerator<G> {
811 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
812 if timestamp > self.metrics.max_event_timestamp {
814 self.metrics.max_event_timestamp = timestamp;
815 }
816
817 let wm = self.inner.on_event(timestamp);
818 if let Some(ref w) = wm {
819 self.metrics.current_watermark = w.timestamp();
820 self.metrics.watermarks_emitted += 1;
821 }
822 wm
823 }
824
825 fn on_periodic(&mut self) -> Option<Watermark> {
826 let wm = self.inner.on_periodic();
827 if let Some(ref w) = wm {
828 self.metrics.current_watermark = w.timestamp();
829 self.metrics.watermarks_emitted += 1;
830 }
831 wm
832 }
833
834 fn current_watermark(&self) -> i64 {
835 self.inner.current_watermark()
836 }
837
838 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
839 let wm = self.inner.advance_watermark(timestamp);
840 if let Some(ref w) = wm {
841 self.metrics.current_watermark = w.timestamp();
842 self.metrics.watermarks_emitted += 1;
843 }
844 wm
845 }
846}
847
// Unit tests for the generators, tracker and metrics defined above.
//
// Locals are named `generator` rather than `gen`: `gen` is a reserved keyword
// in the Rust 2024 edition and would stop this module from compiling there.
#[cfg(test)]
mod tests {
    use super::*;

    // --- BoundedOutOfOrdernessGenerator ---

    #[test]
    fn test_bounded_generator_first_event() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);
        let wm = generator.on_event(1000);
        // Watermark trails the event by the configured bound: 1000 - 100.
        assert_eq!(wm, Some(Watermark::new(900)));
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_out_of_order() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        generator.on_event(1000);

        // An older event neither emits nor moves the watermark.
        let wm = generator.on_event(800);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_advancement() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        generator.on_event(1000);
        let wm = generator.on_event(1200);

        assert_eq!(wm, Some(Watermark::new(1100)));
    }

    #[test]
    fn test_bounded_generator_from_duration() {
        let generator = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
        // The duration is converted to milliseconds.
        assert_eq!(generator.max_out_of_orderness(), 5000);
    }

    #[test]
    fn test_bounded_generator_no_periodic() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);
        assert_eq!(generator.on_periodic(), None);
    }

    // --- AscendingTimestampsGenerator ---

    #[test]
    fn test_ascending_generator_advances_on_each_event() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));
    }

    #[test]
    fn test_ascending_generator_ignores_backwards() {
        let mut generator = AscendingTimestampsGenerator::new();

        generator.on_event(2000);
        // A timestamp behind the watermark is ignored.
        let wm = generator.on_event(1000);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    // --- PeriodicGenerator ---

    #[test]
    fn test_periodic_generator_passes_through() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = PeriodicGenerator::new(inner, Duration::from_millis(100));

        let wm = generator.on_event(1000);
        assert_eq!(wm, Some(Watermark::new(900)));
    }

    #[test]
    fn test_periodic_generator_inner_access() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let generator = PeriodicGenerator::new(inner, Duration::from_millis(100));

        assert_eq!(generator.inner().max_out_of_orderness(), 100);
    }

    // --- PunctuatedGenerator ---

    #[test]
    fn test_punctuated_generator_predicate() {
        // Only timestamps that are multiples of 1000 carry a watermark.
        let mut generator = PunctuatedGenerator::new(|ts| {
            if ts % 1000 == 0 {
                Some(Watermark::new(ts))
            } else {
                None
            }
        });

        assert_eq!(generator.on_event(500), None);
        assert_eq!(generator.on_event(999), None);
        assert_eq!(generator.on_event(1000), Some(Watermark::new(1000)));
        assert_eq!(generator.on_event(1500), None);
        assert_eq!(generator.on_event(2000), Some(Watermark::new(2000)));
    }

    #[test]
    fn test_punctuated_generator_no_regression() {
        let mut generator = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));

        generator.on_event(2000);
        // The predicate proposes 1000, which is behind the current watermark.
        let wm = generator.on_event(1000);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    // --- WatermarkTracker ---

    #[test]
    fn test_tracker_single_source() {
        let mut tracker = WatermarkTracker::new(1);

        let wm = tracker.update_source(0, 1000);
        assert_eq!(wm, Some(Watermark::new(1000)));
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
    }

    #[test]
    fn test_tracker_multiple_sources() {
        let mut tracker = WatermarkTracker::new(3);

        // Nothing is emitted until every source has reported.
        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);
        let wm = tracker.update_source(2, 500);

        // The combined watermark is the minimum across sources.
        assert_eq!(wm, Some(Watermark::new(500)));
    }

    #[test]
    fn test_tracker_min_watermark() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);

        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));

        // Raising the slower source raises the combined minimum.
        tracker.update_source(1, 4000);
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
    }

    #[test]
    fn test_tracker_idle_source() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // An idle source no longer holds back the combined watermark.
        let wm = tracker.mark_idle(1);

        assert_eq!(wm, Some(Watermark::new(5000)));
    }

    #[test]
    fn test_tracker_all_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);

        tracker.mark_idle(0);
        let wm = tracker.mark_idle(1);

        // With every source idle, the maximum source watermark is used.
        assert_eq!(wm, Some(Watermark::new(5000)));
    }

    #[test]
    fn test_tracker_source_watermark() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);

        assert_eq!(tracker.source_watermark(0), Some(1000));
        assert_eq!(tracker.source_watermark(1), Some(2000));
        // Out-of-range source id.
        assert_eq!(tracker.source_watermark(5), None);
    }

    #[test]
    fn test_tracker_active_source_count() {
        let mut tracker = WatermarkTracker::new(3);

        assert_eq!(tracker.active_source_count(), 3);

        tracker.mark_idle(0);
        assert_eq!(tracker.active_source_count(), 2);

        tracker.mark_idle(2);
        assert_eq!(tracker.active_source_count(), 1);

        // Reporting a watermark reactivates an idle source.
        tracker.update_source(0, 1000);
        assert_eq!(tracker.active_source_count(), 2);
    }

    #[test]
    fn test_tracker_invalid_source() {
        let mut tracker = WatermarkTracker::new(2);

        // Out-of-range source ids are ignored.
        let wm = tracker.update_source(5, 1000);
        assert_eq!(wm, None);

        let wm = tracker.mark_idle(5);
        assert_eq!(wm, None);
    }

    // --- SourceProvidedGenerator ---

    #[test]
    fn test_source_provided_fallback() {
        let mut generator = SourceProvidedGenerator::new(100, false);

        let wm = generator.on_event(1000);
        // No source watermark yet, so the fallback produces 1000 - 100.
        assert_eq!(wm, Some(Watermark::new(900)));
    }

    #[test]
    fn test_source_provided_explicit_watermark() {
        let mut generator = SourceProvidedGenerator::new(100, true);

        let wm = generator.on_source_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);
    }

    // --- MeteredGenerator / WatermarkMetrics ---

    #[test]
    fn test_metered_generator_tracks_metrics() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        generator.on_event(1000);
        generator.on_event(2000);
        // Out of order: does not produce a watermark.
        generator.on_event(1500);

        let metrics = generator.metrics();
        assert_eq!(metrics.max_event_timestamp, 2000);
        // Only the first two events produced watermarks.
        assert_eq!(metrics.watermarks_emitted, 2);
    }

    #[test]
    fn test_metered_generator_lag() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        generator.on_event(1000);

        let metrics = generator.metrics();
        // Largest event 1000, watermark 900.
        assert_eq!(metrics.lag(), 100);
    }

    #[test]
    fn test_metered_generator_late_events() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        generator.record_late_event();
        generator.record_late_event();

        assert_eq!(generator.metrics().late_events, 2);
    }

    #[test]
    fn test_watermark_metrics_default() {
        let metrics = WatermarkMetrics::new();
        assert_eq!(metrics.current_watermark, 0);
        assert_eq!(metrics.max_event_timestamp, 0);
        assert_eq!(metrics.watermarks_emitted, 0);
        assert_eq!(metrics.late_events, 0);
    }

    // --- advance_watermark ---

    #[test]
    fn test_advance_watermark_bounded_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        // First advance from the initial state.
        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        // Forward advance.
        let wm = generator.advance_watermark(800);
        assert_eq!(wm, Some(Watermark::new(800)));
        assert_eq!(generator.current_watermark(), 800);

        // Backward advance is rejected.
        let wm = generator.advance_watermark(600);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 800);
    }

    #[test]
    fn test_advance_watermark_maintains_invariant() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        // Event at 1000 gives watermark 900; then advance externally to 1200.
        generator.on_event(1000);
        generator.advance_watermark(1200);
        assert_eq!(generator.current_watermark(), 1200);

        // 1250 would only justify a watermark of 1150, so nothing is emitted.
        let wm = generator.on_event(1250);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 1200);

        // 1400 justifies 1300, which is ahead of the advanced watermark.
        let wm = generator.on_event(1400);
        assert_eq!(wm, Some(Watermark::new(1300)));
    }

    #[test]
    fn test_advance_watermark_ascending_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        // Backward advance is rejected.
        let wm = generator.advance_watermark(300);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 500);

        // Forward advance is accepted.
        let wm = generator.advance_watermark(1000);
        assert_eq!(wm, Some(Watermark::new(1000)));
    }

    #[test]
    fn test_advance_watermark_periodic_generator() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = PeriodicGenerator::new(inner, Duration::from_millis(100));

        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        // Backward advance is rejected.
        let wm = generator.advance_watermark(300);
        assert_eq!(wm, None);
    }

    #[test]
    fn test_advance_watermark_punctuated_generator() {
        let mut generator = PunctuatedGenerator::new(|ts| {
            if ts % 1000 == 0 {
                Some(Watermark::new(ts))
            } else {
                None
            }
        });

        // advance_watermark bypasses the predicate (500 is not a multiple of 1000).
        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        // Backward advance is rejected.
        let wm = generator.advance_watermark(200);
        assert_eq!(wm, None);
    }

    #[test]
    fn test_advance_watermark_source_provided_generator() {
        let mut generator = SourceProvidedGenerator::new(100, true);

        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        // Backward advance is rejected.
        let wm = generator.advance_watermark(300);
        assert_eq!(wm, None);
    }

    #[test]
    fn test_advance_watermark_metered_generator() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.metrics().current_watermark, 500);
        assert_eq!(generator.metrics().watermarks_emitted, 1);

        // A second forward advance is counted as well.
        generator.advance_watermark(800);
        assert_eq!(generator.metrics().current_watermark, 800);
        assert_eq!(generator.metrics().watermarks_emitted, 2);

        // A rejected advance does not touch the counter.
        generator.advance_watermark(600);
        assert_eq!(generator.metrics().watermarks_emitted, 2);
    }

    // --- ProcessingTimeGenerator ---

    #[test]
    fn test_processing_time_generator_ignores_events() {
        let mut generator = ProcessingTimeGenerator::new();
        assert_eq!(generator.on_event(1000), None);
        assert_eq!(generator.on_event(2000), None);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    #[test]
    fn test_processing_time_generator_periodic() {
        let mut generator = ProcessingTimeGenerator::new();
        let wm = generator.on_periodic();
        assert!(wm.is_some());
        let ts = wm.unwrap().timestamp();
        // Sanity bound: 2020-01-01T00:00:00Z in milliseconds since the epoch.
        assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
    }

    #[test]
    fn test_processing_time_generator_advance_watermark() {
        let mut generator = ProcessingTimeGenerator::new();

        let wm = generator.advance_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        // Backward advance is rejected.
        let wm = generator.advance_watermark(300);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 500);

        // Forward advance is accepted.
        let wm = generator.advance_watermark(1000);
        assert_eq!(wm, Some(Watermark::new(1000)));
    }

    #[test]
    fn test_processing_time_generator_default() {
        let generator = ProcessingTimeGenerator::default();
        assert_eq!(generator.current_watermark(), i64::MIN);
    }
}