1use std::time::{Duration, Instant};
4
5use super::Watermark;
6
7pub trait WatermarkGenerator: Send {
13 fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
18
19 fn on_periodic(&mut self) -> Option<Watermark>;
23
24 fn current_watermark(&self) -> i64;
26
27 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
33
34 fn is_processing_time(&self) -> bool {
40 false
41 }
42}
43
44pub const DEFAULT_MAX_FUTURE_SKEW_MS: i64 = 5 * 60 * 1000;
46
47#[inline]
50fn is_grossly_future(timestamp: i64, skew_ms: i64) -> bool {
51 if skew_ms <= 0 {
52 return false;
53 }
54 let now = super::now_unix_millis();
55 now > 0 && timestamp > now.saturating_add(skew_ms)
56}
57
58pub struct BoundedOutOfOrdernessGenerator {
61 max_out_of_orderness: i64,
62 current_max_timestamp: i64,
63 current_watermark: i64,
64 max_future_skew_ms: i64,
66}
67
68impl BoundedOutOfOrdernessGenerator {
69 #[must_use]
75 pub fn new(max_out_of_orderness: i64) -> Self {
76 Self {
77 max_out_of_orderness,
78 current_max_timestamp: i64::MIN,
79 current_watermark: i64::MIN,
80 max_future_skew_ms: DEFAULT_MAX_FUTURE_SKEW_MS,
81 }
82 }
83
84 #[must_use]
86 pub fn with_max_future_skew(mut self, skew_ms: i64) -> Self {
87 self.max_future_skew_ms = skew_ms;
88 self
89 }
90
91 #[must_use]
93 #[allow(clippy::cast_possible_truncation)] pub fn from_duration(max_out_of_orderness: Duration) -> Self {
95 Self::new(max_out_of_orderness.as_millis() as i64)
96 }
97
98 #[must_use]
100 pub fn max_out_of_orderness(&self) -> i64 {
101 self.max_out_of_orderness
102 }
103}
104
105impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
106 #[inline]
107 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
108 if is_grossly_future(timestamp, self.max_future_skew_ms) {
109 return None;
110 }
111 if timestamp > self.current_max_timestamp {
112 self.current_max_timestamp = timestamp;
113 let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
114 if new_watermark > self.current_watermark {
115 self.current_watermark = new_watermark;
116 return Some(Watermark::new(new_watermark));
117 }
118 }
119 None
120 }
121
122 #[inline]
123 fn on_periodic(&mut self) -> Option<Watermark> {
124 None
126 }
127
128 #[inline]
129 fn current_watermark(&self) -> i64 {
130 self.current_watermark
131 }
132
133 #[inline]
134 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
135 if is_grossly_future(timestamp, self.max_future_skew_ms) {
136 return None;
137 }
138 if timestamp > self.current_watermark {
139 self.current_watermark = timestamp;
140 let min_max = timestamp.saturating_add(self.max_out_of_orderness);
142 if min_max > self.current_max_timestamp {
143 self.current_max_timestamp = min_max;
144 }
145 Some(Watermark::new(timestamp))
146 } else {
147 None
148 }
149 }
150}
151
152#[derive(Debug)]
155pub struct AscendingTimestampsGenerator {
156 current_watermark: i64,
157 max_future_skew_ms: i64,
159}
160
161impl Default for AscendingTimestampsGenerator {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl AscendingTimestampsGenerator {
168 #[must_use]
170 pub fn new() -> Self {
171 Self {
172 current_watermark: i64::MIN,
173 max_future_skew_ms: DEFAULT_MAX_FUTURE_SKEW_MS,
174 }
175 }
176
177 #[must_use]
179 pub fn with_max_future_skew(mut self, skew_ms: i64) -> Self {
180 self.max_future_skew_ms = skew_ms;
181 self
182 }
183}
184
185impl WatermarkGenerator for AscendingTimestampsGenerator {
186 #[inline]
187 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
188 if is_grossly_future(timestamp, self.max_future_skew_ms) {
189 return None;
190 }
191 if timestamp > self.current_watermark {
192 self.current_watermark = timestamp;
193 Some(Watermark::new(timestamp))
194 } else {
195 None
196 }
197 }
198
199 #[inline]
200 fn on_periodic(&mut self) -> Option<Watermark> {
201 None
202 }
203
204 #[inline]
205 fn current_watermark(&self) -> i64 {
206 self.current_watermark
207 }
208
209 #[inline]
210 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
211 if is_grossly_future(timestamp, self.max_future_skew_ms) {
212 return None;
213 }
214 if timestamp > self.current_watermark {
215 self.current_watermark = timestamp;
216 Some(Watermark::new(timestamp))
217 } else {
218 None
219 }
220 }
221}
222
223pub struct PeriodicGenerator<G: WatermarkGenerator> {
226 inner: G,
227 period: Duration,
228 last_emit_time: Instant,
229 last_emitted_watermark: i64,
230}
231
232impl<G: WatermarkGenerator> PeriodicGenerator<G> {
233 #[must_use]
240 pub fn new(inner: G, period: Duration) -> Self {
241 Self {
242 inner,
243 period,
244 last_emit_time: Instant::now(),
245 last_emitted_watermark: i64::MIN,
246 }
247 }
248
249 #[must_use]
251 pub fn inner(&self) -> &G {
252 &self.inner
253 }
254
255 pub fn inner_mut(&mut self) -> &mut G {
257 &mut self.inner
258 }
259}
260
261impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
262 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
263 let wm = self.inner.on_event(timestamp);
264 if let Some(ref w) = wm {
265 self.last_emitted_watermark = w.timestamp();
266 self.last_emit_time = Instant::now();
267 }
268 wm
269 }
270
271 fn on_periodic(&mut self) -> Option<Watermark> {
272 if self.last_emit_time.elapsed() >= self.period {
274 let current = self.inner.current_watermark();
275 if current > self.last_emitted_watermark {
276 self.last_emitted_watermark = current;
277 self.last_emit_time = Instant::now();
278 return Some(Watermark::new(current));
279 }
280 self.last_emit_time = Instant::now();
281 }
282 None
283 }
284
285 fn current_watermark(&self) -> i64 {
286 self.inner.current_watermark()
287 }
288
289 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
290 let wm = self.inner.advance_watermark(timestamp);
291 if let Some(ref w) = wm {
292 self.last_emitted_watermark = w.timestamp();
293 self.last_emit_time = Instant::now();
294 }
295 wm
296 }
297
298 fn is_processing_time(&self) -> bool {
299 self.inner.is_processing_time()
300 }
301}
302
303pub struct PunctuatedGenerator<F>
326where
327 F: Fn(i64) -> Option<Watermark> + Send,
328{
329 predicate: F,
330 current_watermark: i64,
331}
332
333impl<F> PunctuatedGenerator<F>
334where
335 F: Fn(i64) -> Option<Watermark> + Send,
336{
337 #[must_use]
343 pub fn new(predicate: F) -> Self {
344 Self {
345 predicate,
346 current_watermark: i64::MIN,
347 }
348 }
349}
350
351impl<F> WatermarkGenerator for PunctuatedGenerator<F>
352where
353 F: Fn(i64) -> Option<Watermark> + Send,
354{
355 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
356 if let Some(wm) = (self.predicate)(timestamp) {
357 if wm.timestamp() > self.current_watermark {
358 self.current_watermark = wm.timestamp();
359 return Some(wm);
360 }
361 }
362 None
363 }
364
365 fn on_periodic(&mut self) -> Option<Watermark> {
366 None
367 }
368
369 fn current_watermark(&self) -> i64 {
370 self.current_watermark
371 }
372
373 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
374 if timestamp > self.current_watermark {
375 self.current_watermark = timestamp;
376 Some(Watermark::new(timestamp))
377 } else {
378 None
379 }
380 }
381}
382
383#[derive(Debug)]
408pub struct WatermarkTracker {
409 source_watermarks: Vec<i64>,
411 combined_watermark: i64,
413 idle_sources: Vec<bool>,
415 last_activity: Vec<Instant>,
417 idle_timeout: Vec<Option<Duration>>,
420}
421
422impl WatermarkTracker {
423 #[must_use]
427 pub fn new(num_sources: usize) -> Self {
428 Self {
429 source_watermarks: vec![i64::MIN; num_sources],
430 combined_watermark: i64::MIN,
431 idle_sources: vec![false; num_sources],
432 last_activity: vec![Instant::now(); num_sources],
433 idle_timeout: vec![None; num_sources],
434 }
435 }
436
437 #[must_use]
439 pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
440 Self {
441 source_watermarks: vec![i64::MIN; num_sources],
442 combined_watermark: i64::MIN,
443 idle_sources: vec![false; num_sources],
444 last_activity: vec![Instant::now(); num_sources],
445 idle_timeout: vec![Some(idle_timeout); num_sources],
446 }
447 }
448
449 pub fn set_idle_timeout(&mut self, source_id: usize, timeout: Option<Duration>) {
451 if let Some(slot) = self.idle_timeout.get_mut(source_id) {
452 *slot = timeout;
453 }
454 }
455
456 pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
460 if source_id >= self.source_watermarks.len() {
461 return None;
462 }
463
464 self.idle_sources[source_id] = false;
466 self.last_activity[source_id] = Instant::now();
467
468 if watermark > self.source_watermarks[source_id] {
470 self.source_watermarks[source_id] = watermark;
471 self.update_combined()
472 } else {
473 None
474 }
475 }
476
477 pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
481 if source_id >= self.idle_sources.len() {
482 return None;
483 }
484
485 self.idle_sources[source_id] = true;
486 self.update_combined()
487 }
488
489 pub fn check_idle_sources(&mut self) -> Option<Watermark> {
493 let mut any_marked = false;
494 for i in 0..self.idle_sources.len() {
495 let Some(timeout) = self.idle_timeout[i] else {
496 continue; };
498 if !self.idle_sources[i] && self.last_activity[i].elapsed() >= timeout {
499 self.idle_sources[i] = true;
500 any_marked = true;
501 }
502 }
503 if any_marked {
504 self.update_combined()
505 } else {
506 None
507 }
508 }
509
510 #[must_use]
512 pub fn current_watermark(&self) -> Option<Watermark> {
513 if self.combined_watermark == i64::MIN {
514 None
515 } else {
516 Some(Watermark::new(self.combined_watermark))
517 }
518 }
519
520 #[must_use]
522 pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
523 self.source_watermarks.get(source_id).copied()
524 }
525
526 #[must_use]
528 pub fn is_idle(&self, source_id: usize) -> bool {
529 self.idle_sources.get(source_id).copied().unwrap_or(false)
530 }
531
532 #[must_use]
534 pub fn num_sources(&self) -> usize {
535 self.source_watermarks.len()
536 }
537
538 #[must_use]
540 pub fn active_source_count(&self) -> usize {
541 self.idle_sources.iter().filter(|&&idle| !idle).count()
542 }
543
544 fn update_combined(&mut self) -> Option<Watermark> {
546 let mut min_watermark = i64::MAX;
548 let mut has_active = false;
549
550 for (i, &wm) in self.source_watermarks.iter().enumerate() {
551 if !self.idle_sources[i] {
552 has_active = true;
553 min_watermark = min_watermark.min(wm);
554 }
555 }
556
557 if !has_active {
559 min_watermark = self
560 .source_watermarks
561 .iter()
562 .copied()
563 .max()
564 .unwrap_or(i64::MIN);
565 }
566
567 if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
568 self.combined_watermark = min_watermark;
569 Some(Watermark::new(min_watermark))
570 } else {
571 None
572 }
573 }
574}
575
576pub struct SourceProvidedGenerator {
581 source_watermark: i64,
583 fallback: BoundedOutOfOrdernessGenerator,
585 prefer_source: bool,
587}
588
589impl SourceProvidedGenerator {
590 #[must_use]
597 pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
598 Self {
599 source_watermark: i64::MIN,
600 fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
601 prefer_source,
602 }
603 }
604
605 pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
609 if watermark > self.source_watermark {
610 self.source_watermark = watermark;
611 if self.prefer_source || watermark > self.fallback.current_watermark() {
612 return Some(Watermark::new(watermark));
613 }
614 }
615 None
616 }
617}
618
619impl WatermarkGenerator for SourceProvidedGenerator {
620 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
621 let fallback_wm = self.fallback.on_event(timestamp);
622
623 if self.prefer_source {
624 if self.source_watermark > i64::MIN {
626 return None; }
628 }
629
630 fallback_wm
631 }
632
633 fn on_periodic(&mut self) -> Option<Watermark> {
634 None
635 }
636
637 fn current_watermark(&self) -> i64 {
638 if self.prefer_source && self.source_watermark > i64::MIN {
639 self.source_watermark
640 } else {
641 self.fallback.current_watermark().max(self.source_watermark)
642 }
643 }
644
645 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
646 self.on_source_watermark(timestamp)
647 }
648}
649
650pub struct ProcessingTimeGenerator {
672 current_watermark: i64,
673}
674
675impl ProcessingTimeGenerator {
676 #[must_use]
678 pub fn new() -> Self {
679 Self {
680 current_watermark: i64::MIN,
681 }
682 }
683}
684
685impl Default for ProcessingTimeGenerator {
686 fn default() -> Self {
687 Self::new()
688 }
689}
690
691impl WatermarkGenerator for ProcessingTimeGenerator {
692 #[inline]
693 fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
694 None
696 }
697
698 #[inline]
699 fn on_periodic(&mut self) -> Option<Watermark> {
700 let now = super::now_unix_millis();
701 if now > self.current_watermark {
702 self.current_watermark = now;
703 Some(Watermark::new(now))
704 } else {
705 None
706 }
707 }
708
709 #[inline]
710 fn current_watermark(&self) -> i64 {
711 self.current_watermark
712 }
713
714 #[inline]
715 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
716 if timestamp > self.current_watermark {
717 self.current_watermark = timestamp;
718 Some(Watermark::new(timestamp))
719 } else {
720 None
721 }
722 }
723
724 #[inline]
725 fn is_processing_time(&self) -> bool {
726 true
727 }
728}
729
730#[cfg(test)]
731mod tests {
732 use super::*;
733
734 #[test]
735 fn test_bounded_generator_first_event() {
736 let mut gen = BoundedOutOfOrdernessGenerator::new(100);
737 let wm = gen.on_event(1000);
738 assert_eq!(wm, Some(Watermark::new(900)));
739 assert_eq!(gen.current_watermark(), 900);
740 }
741
742 #[test]
743 fn processing_time_domain_is_reported_for_late_filter_skip() {
744 assert!(ProcessingTimeGenerator::new().is_processing_time());
747 assert!(!BoundedOutOfOrdernessGenerator::new(100).is_processing_time());
748 let p = Duration::from_millis(1);
750 assert!(PeriodicGenerator::new(ProcessingTimeGenerator::new(), p).is_processing_time());
751 assert!(
752 !PeriodicGenerator::new(BoundedOutOfOrdernessGenerator::new(100), p)
753 .is_processing_time()
754 );
755 }
756
757 #[test]
758 fn test_bounded_generator_out_of_order() {
759 let mut gen = BoundedOutOfOrdernessGenerator::new(100);
760
761 gen.on_event(1000);
763
764 let wm = gen.on_event(800);
766 assert_eq!(wm, None);
767 assert_eq!(gen.current_watermark(), 900); }
769
770 #[test]
771 fn test_bounded_generator_advancement() {
772 let mut gen = BoundedOutOfOrdernessGenerator::new(100);
773
774 gen.on_event(1000);
775 let wm = gen.on_event(1200);
776
777 assert_eq!(wm, Some(Watermark::new(1100)));
778 }
779
780 #[test]
781 fn test_bounded_generator_from_duration() {
782 let gen = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
783 assert_eq!(gen.max_out_of_orderness(), 5000);
784 }
785
786 #[test]
787 fn test_bounded_generator_no_periodic() {
788 let mut gen = BoundedOutOfOrdernessGenerator::new(100);
789 assert_eq!(gen.on_periodic(), None);
790 }
791
792 #[test]
793 fn test_ascending_generator_advances_on_each_event() {
794 let mut gen = AscendingTimestampsGenerator::new();
795
796 let wm1 = gen.on_event(1000);
797 assert_eq!(wm1, Some(Watermark::new(1000)));
798
799 let wm2 = gen.on_event(2000);
800 assert_eq!(wm2, Some(Watermark::new(2000)));
801 }
802
803 #[test]
804 fn test_ascending_generator_ignores_backwards() {
805 let mut gen = AscendingTimestampsGenerator::new();
806
807 gen.on_event(2000);
808 let wm = gen.on_event(1000); assert_eq!(wm, None);
811 assert_eq!(gen.current_watermark(), 2000);
812 }
813
814 #[test]
815 fn test_periodic_generator_passes_through() {
816 let inner = BoundedOutOfOrdernessGenerator::new(100);
817 let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
818
819 let wm = gen.on_event(1000);
820 assert_eq!(wm, Some(Watermark::new(900)));
821 }
822
823 #[test]
824 fn test_periodic_generator_inner_access() {
825 let inner = BoundedOutOfOrdernessGenerator::new(100);
826 let gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
827
828 assert_eq!(gen.inner().max_out_of_orderness(), 100);
829 }
830
831 #[test]
832 fn test_punctuated_generator_predicate() {
833 let mut gen = PunctuatedGenerator::new(|ts| {
834 if ts % 1000 == 0 {
835 Some(Watermark::new(ts))
836 } else {
837 None
838 }
839 });
840
841 assert_eq!(gen.on_event(500), None);
842 assert_eq!(gen.on_event(999), None);
843 assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
844 assert_eq!(gen.on_event(1500), None);
845 assert_eq!(gen.on_event(2000), Some(Watermark::new(2000)));
846 }
847
848 #[test]
849 fn test_punctuated_generator_no_regression() {
850 let mut gen = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));
851
852 gen.on_event(2000);
853 let wm = gen.on_event(1000); assert_eq!(wm, None);
856 assert_eq!(gen.current_watermark(), 2000);
857 }
858
859 #[test]
860 fn test_tracker_single_source() {
861 let mut tracker = WatermarkTracker::new(1);
862
863 let wm = tracker.update_source(0, 1000);
864 assert_eq!(wm, Some(Watermark::new(1000)));
865 assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
866 }
867
868 #[test]
869 fn test_tracker_multiple_sources() {
870 let mut tracker = WatermarkTracker::new(3);
871
872 tracker.update_source(0, 1000);
874 tracker.update_source(1, 2000);
875 let wm = tracker.update_source(2, 500);
876
877 assert_eq!(wm, Some(Watermark::new(500))); }
879
880 #[test]
881 fn test_tracker_min_watermark() {
882 let mut tracker = WatermarkTracker::new(2);
883
884 tracker.update_source(0, 5000);
885 tracker.update_source(1, 3000);
886
887 assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
888
889 tracker.update_source(1, 4000);
891 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
892 }
893
894 #[test]
895 fn test_tracker_idle_source() {
896 let mut tracker = WatermarkTracker::new(2);
897
898 tracker.update_source(0, 5000);
899 tracker.update_source(1, 1000);
900
901 let wm = tracker.mark_idle(1);
903
904 assert_eq!(wm, Some(Watermark::new(5000)));
906 }
907
908 #[test]
909 fn check_idle_sources_advances_then_reactivation_is_monotone() {
910 let mut tracker = WatermarkTracker::with_idle_timeout(2, Duration::ZERO);
915 tracker.update_source(0, 5_000); tracker.update_source(1, 1_000); assert_eq!(tracker.current_watermark(), Some(Watermark::new(1_000)));
918
919 let advanced = tracker.check_idle_sources();
921 assert_eq!(advanced, Some(Watermark::new(5_000)));
922 assert_eq!(tracker.current_watermark(), Some(Watermark::new(5_000)));
923
924 let res = tracker.update_source(1, 1_500);
926 assert_eq!(res, None, "stale reactivation must not emit a regress");
927 assert_eq!(tracker.current_watermark(), Some(Watermark::new(5_000)));
928
929 tracker.update_source(0, 9_000);
931 tracker.update_source(1, 8_000);
932 assert_eq!(tracker.current_watermark(), Some(Watermark::new(8_000)));
933 }
934
935 #[test]
936 fn test_tracker_all_idle() {
937 let mut tracker = WatermarkTracker::new(2);
938
939 tracker.update_source(0, 5000);
940 tracker.update_source(1, 3000);
941
942 tracker.mark_idle(0);
943 let wm = tracker.mark_idle(1);
944
945 assert_eq!(wm, Some(Watermark::new(5000)));
947 }
948
949 #[test]
950 fn test_tracker_source_watermark() {
951 let mut tracker = WatermarkTracker::new(2);
952
953 tracker.update_source(0, 1000);
954 tracker.update_source(1, 2000);
955
956 assert_eq!(tracker.source_watermark(0), Some(1000));
957 assert_eq!(tracker.source_watermark(1), Some(2000));
958 assert_eq!(tracker.source_watermark(5), None); }
960
961 #[test]
962 fn test_tracker_active_source_count() {
963 let mut tracker = WatermarkTracker::new(3);
964
965 assert_eq!(tracker.active_source_count(), 3);
966
967 tracker.mark_idle(0);
968 assert_eq!(tracker.active_source_count(), 2);
969
970 tracker.mark_idle(2);
971 assert_eq!(tracker.active_source_count(), 1);
972
973 tracker.update_source(0, 1000);
975 assert_eq!(tracker.active_source_count(), 2);
976 }
977
978 #[test]
979 fn test_tracker_invalid_source() {
980 let mut tracker = WatermarkTracker::new(2);
981
982 let wm = tracker.update_source(5, 1000); assert_eq!(wm, None);
984
985 let wm = tracker.mark_idle(5);
986 assert_eq!(wm, None);
987 }
988
989 #[test]
990 fn test_source_provided_fallback() {
991 let mut gen = SourceProvidedGenerator::new(100, false);
992
993 let wm = gen.on_event(1000);
994 assert_eq!(wm, Some(Watermark::new(900))); }
996
997 #[test]
998 fn test_source_provided_explicit_watermark() {
999 let mut gen = SourceProvidedGenerator::new(100, true);
1000
1001 let wm = gen.on_source_watermark(500);
1002 assert_eq!(wm, Some(Watermark::new(500)));
1003 assert_eq!(gen.current_watermark(), 500);
1004 }
1005
1006 #[test]
1009 fn test_advance_watermark_bounded_generator() {
1010 let mut gen = BoundedOutOfOrdernessGenerator::new(100);
1011
1012 let wm = gen.advance_watermark(500);
1014 assert_eq!(wm, Some(Watermark::new(500)));
1015 assert_eq!(gen.current_watermark(), 500);
1016
1017 let wm = gen.advance_watermark(800);
1019 assert_eq!(wm, Some(Watermark::new(800)));
1020 assert_eq!(gen.current_watermark(), 800);
1021
1022 let wm = gen.advance_watermark(600);
1024 assert_eq!(wm, None);
1025 assert_eq!(gen.current_watermark(), 800);
1026 }
1027
1028 #[test]
1029 fn test_advance_watermark_maintains_invariant() {
1030 let mut gen = BoundedOutOfOrdernessGenerator::new(100);
1031
1032 gen.on_event(1000); gen.advance_watermark(1200);
1037 assert_eq!(gen.current_watermark(), 1200);
1038
1039 let wm = gen.on_event(1250);
1042 assert_eq!(wm, None);
1043 assert_eq!(gen.current_watermark(), 1200);
1044
1045 let wm = gen.on_event(1400);
1047 assert_eq!(wm, Some(Watermark::new(1300)));
1048 }
1049
1050 #[test]
1051 fn test_advance_watermark_ascending_generator() {
1052 let mut gen = AscendingTimestampsGenerator::new();
1053
1054 let wm = gen.advance_watermark(500);
1055 assert_eq!(wm, Some(Watermark::new(500)));
1056 assert_eq!(gen.current_watermark(), 500);
1057
1058 let wm = gen.advance_watermark(300);
1060 assert_eq!(wm, None);
1061 assert_eq!(gen.current_watermark(), 500);
1062
1063 let wm = gen.advance_watermark(1000);
1065 assert_eq!(wm, Some(Watermark::new(1000)));
1066 }
1067
1068 #[test]
1069 fn test_advance_watermark_periodic_generator() {
1070 let inner = BoundedOutOfOrdernessGenerator::new(100);
1071 let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
1072
1073 let wm = gen.advance_watermark(500);
1074 assert_eq!(wm, Some(Watermark::new(500)));
1075 assert_eq!(gen.current_watermark(), 500);
1076
1077 let wm = gen.advance_watermark(300);
1079 assert_eq!(wm, None);
1080 }
1081
1082 #[test]
1083 fn test_advance_watermark_punctuated_generator() {
1084 let mut gen = PunctuatedGenerator::new(|ts| {
1085 if ts % 1000 == 0 {
1086 Some(Watermark::new(ts))
1087 } else {
1088 None
1089 }
1090 });
1091
1092 let wm = gen.advance_watermark(500);
1094 assert_eq!(wm, Some(Watermark::new(500)));
1095 assert_eq!(gen.current_watermark(), 500);
1096
1097 let wm = gen.advance_watermark(200);
1099 assert_eq!(wm, None);
1100 }
1101
1102 #[test]
1103 fn test_advance_watermark_source_provided_generator() {
1104 let mut gen = SourceProvidedGenerator::new(100, true);
1105
1106 let wm = gen.advance_watermark(500);
1107 assert_eq!(wm, Some(Watermark::new(500)));
1108 assert_eq!(gen.current_watermark(), 500);
1109
1110 let wm = gen.advance_watermark(300);
1112 assert_eq!(wm, None);
1113 }
1114
1115 #[test]
1118 fn test_processing_time_generator_ignores_events() {
1119 let mut gen = ProcessingTimeGenerator::new();
1120 assert_eq!(gen.on_event(1000), None);
1121 assert_eq!(gen.on_event(2000), None);
1122 assert_eq!(gen.current_watermark(), i64::MIN);
1123 }
1124
1125 #[test]
1126 fn test_processing_time_generator_periodic() {
1127 let mut gen = ProcessingTimeGenerator::new();
1128 let wm = gen.on_periodic();
1129 assert!(wm.is_some());
1130 let ts = wm.unwrap().timestamp();
1131 assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
1133 }
1134
1135 #[test]
1136 fn test_processing_time_generator_advance_watermark() {
1137 let mut gen = ProcessingTimeGenerator::new();
1138
1139 let wm = gen.advance_watermark(500);
1140 assert_eq!(wm, Some(Watermark::new(500)));
1141 assert_eq!(gen.current_watermark(), 500);
1142
1143 let wm = gen.advance_watermark(300);
1145 assert_eq!(wm, None);
1146 assert_eq!(gen.current_watermark(), 500);
1147
1148 let wm = gen.advance_watermark(1000);
1150 assert_eq!(wm, Some(Watermark::new(1000)));
1151 }
1152
1153 #[test]
1154 fn test_processing_time_generator_default() {
1155 let gen = ProcessingTimeGenerator::default();
1156 assert_eq!(gen.current_watermark(), i64::MIN);
1157 }
1158
1159 #[test]
1162 fn future_skew_event_does_not_advance_watermark() {
1163 let mut gen = BoundedOutOfOrdernessGenerator::new(0);
1164 let now = crate::time::now_unix_millis();
1165 assert_eq!(gen.on_event(now + 2 * 60 * 60 * 1000), None);
1167 assert_eq!(gen.current_watermark(), i64::MIN);
1168 assert_eq!(gen.on_event(now), Some(Watermark::new(now)));
1170 }
1171}