1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use super::Watermark;
6
/// Common interface implemented by every watermark generator in this module.
///
/// Timestamps and watermark values are plain `i64`s. Implementors must be
/// [`Send`].
pub trait WatermarkGenerator: Send {
    /// Feeds one event `timestamp` to the generator.
    ///
    /// Returns `Some(watermark)` when this event caused a new watermark to be
    /// produced, otherwise `None`.
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;

    /// Hook for callers that poll the generator independently of events.
    ///
    /// Returns `Some(watermark)` when a watermark should be emitted now,
    /// otherwise `None`.
    fn on_periodic(&mut self) -> Option<Watermark>;

    /// Returns the generator's current watermark value.
    fn current_watermark(&self) -> i64;

    /// Asks the generator to move its watermark to `timestamp` directly,
    /// without an event.
    ///
    /// Returns `Some(watermark)` when the request produced a watermark,
    /// otherwise `None`.
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
}
34
/// Watermark generator that tolerates a bounded amount of event disorder.
///
/// The watermark trails the largest timestamp observed so far by
/// `max_out_of_orderness`.
pub struct BoundedOutOfOrdernessGenerator {
    /// Distance kept between the largest observed timestamp and the watermark.
    max_out_of_orderness: i64,
    /// Largest timestamp observed so far; `i64::MIN` until the first event.
    current_max_timestamp: i64,
    /// Most recent watermark value; `i64::MIN` until one is produced.
    current_watermark: i64,
}

impl BoundedOutOfOrdernessGenerator {
    /// Creates a generator whose watermark lags the largest observed
    /// timestamp by `max_out_of_orderness`.
    #[must_use]
    pub fn new(max_out_of_orderness: i64) -> Self {
        Self {
            max_out_of_orderness,
            current_max_timestamp: i64::MIN,
            current_watermark: i64::MIN,
        }
    }

    /// Creates a generator from a [`Duration`], converted to whole
    /// milliseconds.
    ///
    /// Durations whose millisecond count does not fit in an `i64` saturate to
    /// `i64::MAX` instead of wrapping around to an arbitrary (possibly
    /// negative) value.
    #[must_use]
    // The value is clamped to `i64::MAX` before the cast, so nothing is lost.
    #[allow(clippy::cast_possible_truncation)]
    pub fn from_duration(max_out_of_orderness: Duration) -> Self {
        let clamped_millis = max_out_of_orderness.as_millis().min(i64::MAX as u128);
        Self::new(clamped_millis as i64)
    }

    /// Returns the configured lag between the largest timestamp and the
    /// watermark.
    #[must_use]
    pub fn max_out_of_orderness(&self) -> i64 {
        self.max_out_of_orderness
    }
}
90
91impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
92 #[inline]
93 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
94 if timestamp > self.current_max_timestamp {
95 self.current_max_timestamp = timestamp;
96 let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
97 if new_watermark > self.current_watermark {
98 self.current_watermark = new_watermark;
99 return Some(Watermark::new(new_watermark));
100 }
101 }
102 None
103 }
104
105 #[inline]
106 fn on_periodic(&mut self) -> Option<Watermark> {
107 None
109 }
110
111 #[inline]
112 fn current_watermark(&self) -> i64 {
113 self.current_watermark
114 }
115
116 #[inline]
117 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
118 if timestamp > self.current_watermark {
119 self.current_watermark = timestamp;
120 let min_max = timestamp.saturating_add(self.max_out_of_orderness);
122 if min_max > self.current_max_timestamp {
123 self.current_max_timestamp = min_max;
124 }
125 Some(Watermark::new(timestamp))
126 } else {
127 None
128 }
129 }
130}
131
/// Watermark generator for streams whose timestamps are expected to ascend:
/// the watermark simply follows the largest timestamp seen.
#[derive(Debug)]
pub struct AscendingTimestampsGenerator {
    /// Most recent watermark value; `i64::MIN` until one is produced.
    current_watermark: i64,
}

impl AscendingTimestampsGenerator {
    /// Creates a generator with no watermark yet (`i64::MIN`).
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: i64::MIN,
        }
    }
}

impl Default for AscendingTimestampsGenerator {
    /// Same as [`AscendingTimestampsGenerator::new`].
    ///
    /// Implemented by hand because a derived `Default` would start the
    /// watermark at `0`, silently ignoring events with timestamps `<= 0`.
    fn default() -> Self {
        Self::new()
    }
}
160
161impl WatermarkGenerator for AscendingTimestampsGenerator {
162 #[inline]
163 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
164 if timestamp > self.current_watermark {
165 self.current_watermark = timestamp;
166 Some(Watermark::new(timestamp))
167 } else {
168 None
169 }
170 }
171
172 #[inline]
173 fn on_periodic(&mut self) -> Option<Watermark> {
174 None
175 }
176
177 #[inline]
178 fn current_watermark(&self) -> i64 {
179 self.current_watermark
180 }
181
182 #[inline]
183 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
184 if timestamp > self.current_watermark {
185 self.current_watermark = timestamp;
186 Some(Watermark::new(timestamp))
187 } else {
188 None
189 }
190 }
191}
192
193pub struct PeriodicGenerator<G: WatermarkGenerator> {
218 inner: G,
219 period: Duration,
220 last_emit_time: Instant,
221 last_emitted_watermark: i64,
222}
223
224impl<G: WatermarkGenerator> PeriodicGenerator<G> {
225 #[must_use]
232 pub fn new(inner: G, period: Duration) -> Self {
233 Self {
234 inner,
235 period,
236 last_emit_time: Instant::now(),
237 last_emitted_watermark: i64::MIN,
238 }
239 }
240
241 #[must_use]
243 pub fn inner(&self) -> &G {
244 &self.inner
245 }
246
247 pub fn inner_mut(&mut self) -> &mut G {
249 &mut self.inner
250 }
251}
252
253impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
254 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
255 let wm = self.inner.on_event(timestamp);
256 if let Some(ref w) = wm {
257 self.last_emitted_watermark = w.timestamp();
258 self.last_emit_time = Instant::now();
259 }
260 wm
261 }
262
263 fn on_periodic(&mut self) -> Option<Watermark> {
264 if self.last_emit_time.elapsed() >= self.period {
266 let current = self.inner.current_watermark();
267 if current > self.last_emitted_watermark {
268 self.last_emitted_watermark = current;
269 self.last_emit_time = Instant::now();
270 return Some(Watermark::new(current));
271 }
272 self.last_emit_time = Instant::now();
273 }
274 None
275 }
276
277 fn current_watermark(&self) -> i64 {
278 self.inner.current_watermark()
279 }
280
281 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
282 let wm = self.inner.advance_watermark(timestamp);
283 if let Some(ref w) = wm {
284 self.last_emitted_watermark = w.timestamp();
285 self.last_emit_time = Instant::now();
286 }
287 wm
288 }
289}
290
291pub struct PunctuatedGenerator<F>
314where
315 F: Fn(i64) -> Option<Watermark> + Send,
316{
317 predicate: F,
318 current_watermark: i64,
319}
320
321impl<F> PunctuatedGenerator<F>
322where
323 F: Fn(i64) -> Option<Watermark> + Send,
324{
325 #[must_use]
331 pub fn new(predicate: F) -> Self {
332 Self {
333 predicate,
334 current_watermark: i64::MIN,
335 }
336 }
337}
338
339impl<F> WatermarkGenerator for PunctuatedGenerator<F>
340where
341 F: Fn(i64) -> Option<Watermark> + Send,
342{
343 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
344 if let Some(wm) = (self.predicate)(timestamp) {
345 if wm.timestamp() > self.current_watermark {
346 self.current_watermark = wm.timestamp();
347 return Some(wm);
348 }
349 }
350 None
351 }
352
353 fn on_periodic(&mut self) -> Option<Watermark> {
354 None
355 }
356
357 fn current_watermark(&self) -> i64 {
358 self.current_watermark
359 }
360
361 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
362 if timestamp > self.current_watermark {
363 self.current_watermark = timestamp;
364 Some(Watermark::new(timestamp))
365 } else {
366 None
367 }
368 }
369}
370
371#[derive(Debug)]
396pub struct WatermarkTracker {
397 source_watermarks: Vec<i64>,
399 combined_watermark: i64,
401 idle_sources: Vec<bool>,
403 last_activity: Vec<Instant>,
405 idle_timeout: Duration,
407}
408
409impl WatermarkTracker {
410 #[must_use]
412 pub fn new(num_sources: usize) -> Self {
413 Self {
414 source_watermarks: vec![i64::MIN; num_sources],
415 combined_watermark: i64::MIN,
416 idle_sources: vec![false; num_sources],
417 last_activity: vec![Instant::now(); num_sources],
418 idle_timeout: Duration::from_secs(30), }
420 }
421
422 #[must_use]
424 pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
425 Self {
426 source_watermarks: vec![i64::MIN; num_sources],
427 combined_watermark: i64::MIN,
428 idle_sources: vec![false; num_sources],
429 last_activity: vec![Instant::now(); num_sources],
430 idle_timeout,
431 }
432 }
433
434 pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
438 if source_id >= self.source_watermarks.len() {
439 return None;
440 }
441
442 self.idle_sources[source_id] = false;
444 self.last_activity[source_id] = Instant::now();
445
446 if watermark > self.source_watermarks[source_id] {
448 self.source_watermarks[source_id] = watermark;
449 self.update_combined()
450 } else {
451 None
452 }
453 }
454
455 pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
459 if source_id >= self.idle_sources.len() {
460 return None;
461 }
462
463 self.idle_sources[source_id] = true;
464 self.update_combined()
465 }
466
467 pub fn check_idle_sources(&mut self) -> Option<Watermark> {
471 let mut any_marked = false;
472 for i in 0..self.idle_sources.len() {
473 if !self.idle_sources[i] && self.last_activity[i].elapsed() >= self.idle_timeout {
474 self.idle_sources[i] = true;
475 any_marked = true;
476 }
477 }
478 if any_marked {
479 self.update_combined()
480 } else {
481 None
482 }
483 }
484
485 #[must_use]
487 pub fn current_watermark(&self) -> Option<Watermark> {
488 if self.combined_watermark == i64::MIN {
489 None
490 } else {
491 Some(Watermark::new(self.combined_watermark))
492 }
493 }
494
495 #[must_use]
497 pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
498 self.source_watermarks.get(source_id).copied()
499 }
500
501 #[must_use]
503 pub fn is_idle(&self, source_id: usize) -> bool {
504 self.idle_sources.get(source_id).copied().unwrap_or(false)
505 }
506
507 #[must_use]
509 pub fn num_sources(&self) -> usize {
510 self.source_watermarks.len()
511 }
512
513 #[must_use]
515 pub fn active_source_count(&self) -> usize {
516 self.idle_sources.iter().filter(|&&idle| !idle).count()
517 }
518
519 fn update_combined(&mut self) -> Option<Watermark> {
521 let mut min_watermark = i64::MAX;
523 let mut has_active = false;
524
525 for (i, &wm) in self.source_watermarks.iter().enumerate() {
526 if !self.idle_sources[i] {
527 has_active = true;
528 min_watermark = min_watermark.min(wm);
529 }
530 }
531
532 if !has_active {
534 min_watermark = self
535 .source_watermarks
536 .iter()
537 .copied()
538 .max()
539 .unwrap_or(i64::MIN);
540 }
541
542 if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
543 self.combined_watermark = min_watermark;
544 Some(Watermark::new(min_watermark))
545 } else {
546 None
547 }
548 }
549}
550
551pub struct SourceProvidedGenerator {
556 source_watermark: i64,
558 fallback: BoundedOutOfOrdernessGenerator,
560 prefer_source: bool,
562}
563
564impl SourceProvidedGenerator {
565 #[must_use]
572 pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
573 Self {
574 source_watermark: i64::MIN,
575 fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
576 prefer_source,
577 }
578 }
579
580 pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
584 if watermark > self.source_watermark {
585 self.source_watermark = watermark;
586 if self.prefer_source || watermark > self.fallback.current_watermark() {
587 return Some(Watermark::new(watermark));
588 }
589 }
590 None
591 }
592}
593
594impl WatermarkGenerator for SourceProvidedGenerator {
595 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
596 let fallback_wm = self.fallback.on_event(timestamp);
597
598 if self.prefer_source {
599 if self.source_watermark > i64::MIN {
601 return None; }
603 }
604
605 fallback_wm
606 }
607
608 fn on_periodic(&mut self) -> Option<Watermark> {
609 None
610 }
611
612 fn current_watermark(&self) -> i64 {
613 if self.prefer_source && self.source_watermark > i64::MIN {
614 self.source_watermark
615 } else {
616 self.fallback.current_watermark().max(self.source_watermark)
617 }
618 }
619
620 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
621 self.on_source_watermark(timestamp)
622 }
623}
624
/// Watermark generator that follows the system clock instead of event
/// timestamps.
pub struct ProcessingTimeGenerator {
    /// Most recent watermark value; `i64::MIN` until one is produced.
    current_watermark: i64,
}

impl ProcessingTimeGenerator {
    /// Creates a generator with no watermark yet (`i64::MIN`).
    #[must_use]
    pub fn new() -> Self {
        let current_watermark = i64::MIN;
        Self { current_watermark }
    }

    /// Current system time as milliseconds since the Unix epoch.
    ///
    /// A system clock set before the epoch is reported as `0`.
    #[allow(clippy::cast_possible_truncation)]
    fn now_millis() -> i64 {
        let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed,
            Err(_) => Duration::ZERO,
        };
        since_epoch.as_millis() as i64
    }
}

impl Default for ProcessingTimeGenerator {
    /// Same as [`ProcessingTimeGenerator::new`].
    fn default() -> Self {
        Self::new()
    }
}
674
675impl WatermarkGenerator for ProcessingTimeGenerator {
676 #[inline]
677 fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
678 None
680 }
681
682 #[inline]
683 fn on_periodic(&mut self) -> Option<Watermark> {
684 let now = Self::now_millis();
685 if now > self.current_watermark {
686 self.current_watermark = now;
687 Some(Watermark::new(now))
688 } else {
689 None
690 }
691 }
692
693 #[inline]
694 fn current_watermark(&self) -> i64 {
695 self.current_watermark
696 }
697
698 #[inline]
699 fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
700 if timestamp > self.current_watermark {
701 self.current_watermark = timestamp;
702 Some(Watermark::new(timestamp))
703 } else {
704 None
705 }
706 }
707}
708
#[cfg(test)]
mod tests {
    use super::*;

    // ---- BoundedOutOfOrdernessGenerator ----

    #[test]
    fn test_bounded_generator_first_event() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);
        assert_eq!(generator.on_event(1000), Some(Watermark::new(900)));
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_out_of_order() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        // The first event establishes watermark 900.
        generator.on_event(1000);

        // An older event must neither emit nor move the watermark.
        assert_eq!(generator.on_event(800), None);
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_advancement() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        generator.on_event(1000);

        assert_eq!(generator.on_event(1200), Some(Watermark::new(1100)));
    }

    #[test]
    fn test_bounded_generator_from_duration() {
        let generator = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
        assert_eq!(generator.max_out_of_orderness(), 5000);
    }

    #[test]
    fn test_bounded_generator_no_periodic() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);
        assert_eq!(generator.on_periodic(), None);
    }

    // ---- AscendingTimestampsGenerator ----

    #[test]
    fn test_ascending_generator_advances_on_each_event() {
        let mut generator = AscendingTimestampsGenerator::new();

        assert_eq!(generator.on_event(1000), Some(Watermark::new(1000)));
        assert_eq!(generator.on_event(2000), Some(Watermark::new(2000)));
    }

    #[test]
    fn test_ascending_generator_ignores_backwards() {
        let mut generator = AscendingTimestampsGenerator::new();

        generator.on_event(2000);

        assert_eq!(generator.on_event(1000), None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    // ---- PeriodicGenerator ----

    #[test]
    fn test_periodic_generator_passes_through() {
        let wrapped = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = PeriodicGenerator::new(wrapped, Duration::from_millis(100));

        assert_eq!(generator.on_event(1000), Some(Watermark::new(900)));
    }

    #[test]
    fn test_periodic_generator_inner_access() {
        let wrapped = BoundedOutOfOrdernessGenerator::new(100);
        let generator = PeriodicGenerator::new(wrapped, Duration::from_millis(100));

        assert_eq!(generator.inner().max_out_of_orderness(), 100);
    }

    // ---- PunctuatedGenerator ----

    #[test]
    fn test_punctuated_generator_predicate() {
        // Only timestamps on a 1000 boundary carry a watermark.
        let mut generator = PunctuatedGenerator::new(|ts| {
            if ts % 1000 == 0 {
                Some(Watermark::new(ts))
            } else {
                None
            }
        });

        assert_eq!(generator.on_event(500), None);
        assert_eq!(generator.on_event(999), None);
        assert_eq!(generator.on_event(1000), Some(Watermark::new(1000)));
        assert_eq!(generator.on_event(1500), None);
        assert_eq!(generator.on_event(2000), Some(Watermark::new(2000)));
    }

    #[test]
    fn test_punctuated_generator_no_regression() {
        let mut generator = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));

        generator.on_event(2000);

        assert_eq!(generator.on_event(1000), None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    // ---- WatermarkTracker ----

    #[test]
    fn test_tracker_single_source() {
        let mut tracker = WatermarkTracker::new(1);

        assert_eq!(tracker.update_source(0, 1000), Some(Watermark::new(1000)));
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
    }

    #[test]
    fn test_tracker_multiple_sources() {
        let mut tracker = WatermarkTracker::new(3);

        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);

        // Once every source has reported, the minimum becomes the watermark.
        assert_eq!(tracker.update_source(2, 500), Some(Watermark::new(500)));
    }

    #[test]
    fn test_tracker_min_watermark() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));

        tracker.update_source(1, 4000);
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
    }

    #[test]
    fn test_tracker_idle_source() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // With the slow source idle, only source 0 counts.
        assert_eq!(tracker.mark_idle(1), Some(Watermark::new(5000)));
    }

    #[test]
    fn test_tracker_all_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);

        tracker.mark_idle(0);

        // With every source idle the maximum is used.
        assert_eq!(tracker.mark_idle(1), Some(Watermark::new(5000)));
    }

    #[test]
    fn test_tracker_source_watermark() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);

        assert_eq!(tracker.source_watermark(0), Some(1000));
        assert_eq!(tracker.source_watermark(1), Some(2000));
        assert_eq!(tracker.source_watermark(5), None);
    }

    #[test]
    fn test_tracker_active_source_count() {
        let mut tracker = WatermarkTracker::new(3);
        assert_eq!(tracker.active_source_count(), 3);

        tracker.mark_idle(0);
        assert_eq!(tracker.active_source_count(), 2);

        tracker.mark_idle(2);
        assert_eq!(tracker.active_source_count(), 1);

        // An update reactivates an idle source.
        tracker.update_source(0, 1000);
        assert_eq!(tracker.active_source_count(), 2);
    }

    #[test]
    fn test_tracker_invalid_source() {
        let mut tracker = WatermarkTracker::new(2);

        assert_eq!(tracker.update_source(5, 1000), None);
        assert_eq!(tracker.mark_idle(5), None);
    }

    // ---- SourceProvidedGenerator ----

    #[test]
    fn test_source_provided_fallback() {
        let mut generator = SourceProvidedGenerator::new(100, false);

        assert_eq!(generator.on_event(1000), Some(Watermark::new(900)));
    }

    #[test]
    fn test_source_provided_explicit_watermark() {
        let mut generator = SourceProvidedGenerator::new(100, true);

        assert_eq!(generator.on_source_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);
    }

    // ---- advance_watermark across generators ----

    #[test]
    fn test_advance_watermark_bounded_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        assert_eq!(generator.advance_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(800), Some(Watermark::new(800)));
        assert_eq!(generator.current_watermark(), 800);

        // Going backwards is rejected.
        assert_eq!(generator.advance_watermark(600), None);
        assert_eq!(generator.current_watermark(), 800);
    }

    #[test]
    fn test_advance_watermark_maintains_invariant() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        generator.on_event(1000);
        generator.advance_watermark(1200);
        assert_eq!(generator.current_watermark(), 1200);

        // 1250 - 100 is behind the advanced watermark, so nothing is emitted.
        assert_eq!(generator.on_event(1250), None);
        assert_eq!(generator.current_watermark(), 1200);

        assert_eq!(generator.on_event(1400), Some(Watermark::new(1300)));
    }

    #[test]
    fn test_advance_watermark_ascending_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        assert_eq!(generator.advance_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(300), None);
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(1000), Some(Watermark::new(1000)));
    }

    #[test]
    fn test_advance_watermark_periodic_generator() {
        let wrapped = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = PeriodicGenerator::new(wrapped, Duration::from_millis(100));

        assert_eq!(generator.advance_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(300), None);
    }

    #[test]
    fn test_advance_watermark_punctuated_generator() {
        let mut generator = PunctuatedGenerator::new(|ts| {
            if ts % 1000 == 0 {
                Some(Watermark::new(ts))
            } else {
                None
            }
        });

        // advance_watermark bypasses the predicate.
        assert_eq!(generator.advance_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(200), None);
    }

    #[test]
    fn test_advance_watermark_source_provided_generator() {
        let mut generator = SourceProvidedGenerator::new(100, true);

        assert_eq!(generator.advance_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(300), None);
    }

    // ---- ProcessingTimeGenerator ----

    #[test]
    fn test_processing_time_generator_ignores_events() {
        let mut generator = ProcessingTimeGenerator::new();
        assert_eq!(generator.on_event(1000), None);
        assert_eq!(generator.on_event(2000), None);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    #[test]
    fn test_processing_time_generator_periodic() {
        let mut generator = ProcessingTimeGenerator::new();
        let emitted = generator.on_periodic();
        assert!(emitted.is_some());

        // 1_577_836_800_000 ms is 2020-01-01T00:00:00Z.
        let ts = emitted.unwrap().timestamp();
        assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
    }

    #[test]
    fn test_processing_time_generator_advance_watermark() {
        let mut generator = ProcessingTimeGenerator::new();

        assert_eq!(generator.advance_watermark(500), Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(300), None);
        assert_eq!(generator.current_watermark(), 500);

        assert_eq!(generator.advance_watermark(1000), Some(Watermark::new(1000)));
    }

    #[test]
    fn test_processing_time_generator_default() {
        let generator = ProcessingTimeGenerator::default();
        assert_eq!(generator.current_watermark(), i64::MIN);
    }
}