Skip to main content

laminar_core/time/
watermark.rs

1//! Watermark generators and multi-source tracking.
2
3use std::time::{Duration, Instant};
4
5use super::Watermark;
6
7/// Trait for generating watermarks from event timestamps.
8///
9/// Implementations track observed timestamps and produce watermarks that indicate
10/// event-time progress. The watermark is an assertion that no events with timestamps
11/// earlier than the watermark are expected.
12pub trait WatermarkGenerator: Send {
13    /// Process an event timestamp and potentially emit a new watermark.
14    ///
15    /// Called for each event processed. Returns `Some(watermark)` if the watermark
16    /// should advance, `None` otherwise.
17    fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
18
19    /// Called periodically to emit watermarks based on wall-clock time.
20    ///
21    /// Useful for generating watermarks even when no events are arriving.
22    fn on_periodic(&mut self) -> Option<Watermark>;
23
24    /// Returns the current watermark value without advancing it.
25    fn current_watermark(&self) -> i64;
26
27    /// Advances the watermark to at least the given timestamp from an external source.
28    ///
29    /// Called when the source provides an explicit watermark (e.g., `Source::watermark()`).
30    /// Returns `Some(Watermark)` if the watermark advanced, `None` if the timestamp
31    /// was not higher than the current watermark.
32    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
33
34    /// Whether the watermark is processing-time based (wall clock), rather than
35    /// derived from the event-time column. Such a watermark lives in a different
36    /// time domain than the event timestamps, so comparing the two to drop "late"
37    /// rows would discard every event — callers skip source-side late-filtering
38    /// when this is `true`. Defaults to `false` (event-time generators).
39    fn is_processing_time(&self) -> bool {
40        false
41    }
42}
43
/// Default `max_future_skew_ms`: five minutes, i.e. 300 000 milliseconds.
pub const DEFAULT_MAX_FUTURE_SKEW_MS: i64 = 300_000;
46
47/// `true` if `timestamp` is more than `skew_ms` beyond wall-clock now.
48/// `skew_ms <= 0` or an unset system clock disables the guard.
49#[inline]
50fn is_grossly_future(timestamp: i64, skew_ms: i64) -> bool {
51    if skew_ms <= 0 {
52        return false;
53    }
54    let now = super::now_unix_millis();
55    now > 0 && timestamp > now.saturating_add(skew_ms)
56}
57
58/// Watermark = `max_timestamp_seen - max_out_of_orderness`. `on_event`
59/// ignores timestamps far beyond wall clock for advancement.
60pub struct BoundedOutOfOrdernessGenerator {
61    max_out_of_orderness: i64,
62    current_max_timestamp: i64,
63    current_watermark: i64,
64    /// `0` (or negative) disables the guard (unbounded — legacy behaviour).
65    max_future_skew_ms: i64,
66}
67
68impl BoundedOutOfOrdernessGenerator {
69    /// Creates a new generator with the specified maximum out-of-orderness.
70    ///
71    /// # Arguments
72    ///
73    /// * `max_out_of_orderness` - Maximum allowed lateness in milliseconds
74    #[must_use]
75    pub fn new(max_out_of_orderness: i64) -> Self {
76        Self {
77            max_out_of_orderness,
78            current_max_timestamp: i64::MIN,
79            current_watermark: i64::MIN,
80            max_future_skew_ms: DEFAULT_MAX_FUTURE_SKEW_MS,
81        }
82    }
83
84    /// Sets the future-skew ceiling; `<= 0` disables the guard.
85    #[must_use]
86    pub fn with_max_future_skew(mut self, skew_ms: i64) -> Self {
87        self.max_future_skew_ms = skew_ms;
88        self
89    }
90
91    /// Creates a new generator from a `Duration`.
92    #[must_use]
93    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
94    pub fn from_duration(max_out_of_orderness: Duration) -> Self {
95        Self::new(max_out_of_orderness.as_millis() as i64)
96    }
97
98    /// Returns the maximum out-of-orderness in milliseconds.
99    #[must_use]
100    pub fn max_out_of_orderness(&self) -> i64 {
101        self.max_out_of_orderness
102    }
103}
104
105impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
106    #[inline]
107    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
108        if is_grossly_future(timestamp, self.max_future_skew_ms) {
109            return None;
110        }
111        if timestamp > self.current_max_timestamp {
112            self.current_max_timestamp = timestamp;
113            let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
114            if new_watermark > self.current_watermark {
115                self.current_watermark = new_watermark;
116                return Some(Watermark::new(new_watermark));
117            }
118        }
119        None
120    }
121
122    #[inline]
123    fn on_periodic(&mut self) -> Option<Watermark> {
124        // Bounded out-of-orderness doesn't emit periodic watermarks
125        None
126    }
127
128    #[inline]
129    fn current_watermark(&self) -> i64 {
130        self.current_watermark
131    }
132
133    #[inline]
134    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
135        if is_grossly_future(timestamp, self.max_future_skew_ms) {
136            return None;
137        }
138        if timestamp > self.current_watermark {
139            self.current_watermark = timestamp;
140            // Maintain invariant: current_max_timestamp >= current_watermark + max_out_of_orderness
141            let min_max = timestamp.saturating_add(self.max_out_of_orderness);
142            if min_max > self.current_max_timestamp {
143                self.current_max_timestamp = min_max;
144            }
145            Some(Watermark::new(timestamp))
146        } else {
147            None
148        }
149    }
150}
151
152/// Watermark generator for strictly ascending timestamps; the watermark
153/// equals the current timestamp.
154#[derive(Debug)]
155pub struct AscendingTimestampsGenerator {
156    current_watermark: i64,
157    /// `0` ⇒ disabled.
158    max_future_skew_ms: i64,
159}
160
161impl Default for AscendingTimestampsGenerator {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167impl AscendingTimestampsGenerator {
168    /// Creates a new ascending timestamps generator.
169    #[must_use]
170    pub fn new() -> Self {
171        Self {
172            current_watermark: i64::MIN,
173            max_future_skew_ms: DEFAULT_MAX_FUTURE_SKEW_MS,
174        }
175    }
176
177    /// Override the future-skew ceiling (`0` disables).
178    #[must_use]
179    pub fn with_max_future_skew(mut self, skew_ms: i64) -> Self {
180        self.max_future_skew_ms = skew_ms;
181        self
182    }
183}
184
185impl WatermarkGenerator for AscendingTimestampsGenerator {
186    #[inline]
187    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
188        if is_grossly_future(timestamp, self.max_future_skew_ms) {
189            return None;
190        }
191        if timestamp > self.current_watermark {
192            self.current_watermark = timestamp;
193            Some(Watermark::new(timestamp))
194        } else {
195            None
196        }
197    }
198
199    #[inline]
200    fn on_periodic(&mut self) -> Option<Watermark> {
201        None
202    }
203
204    #[inline]
205    fn current_watermark(&self) -> i64 {
206        self.current_watermark
207    }
208
209    #[inline]
210    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
211        if is_grossly_future(timestamp, self.max_future_skew_ms) {
212            return None;
213        }
214        if timestamp > self.current_watermark {
215            self.current_watermark = timestamp;
216            Some(Watermark::new(timestamp))
217        } else {
218            None
219        }
220    }
221}
222
223/// Wraps another generator and emits watermarks at fixed wall-clock
224/// intervals so idle sources don't stall time-based windows.
225pub struct PeriodicGenerator<G: WatermarkGenerator> {
226    inner: G,
227    period: Duration,
228    last_emit_time: Instant,
229    last_emitted_watermark: i64,
230}
231
232impl<G: WatermarkGenerator> PeriodicGenerator<G> {
233    /// Creates a new periodic generator wrapping another generator.
234    ///
235    /// # Arguments
236    ///
237    /// * `inner` - The underlying watermark generator
238    /// * `period` - How often to emit watermarks (wall-clock time)
239    #[must_use]
240    pub fn new(inner: G, period: Duration) -> Self {
241        Self {
242            inner,
243            period,
244            last_emit_time: Instant::now(),
245            last_emitted_watermark: i64::MIN,
246        }
247    }
248
249    /// Returns a reference to the inner generator.
250    #[must_use]
251    pub fn inner(&self) -> &G {
252        &self.inner
253    }
254
255    /// Returns a mutable reference to the inner generator.
256    pub fn inner_mut(&mut self) -> &mut G {
257        &mut self.inner
258    }
259}
260
261impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
262    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
263        let wm = self.inner.on_event(timestamp);
264        if let Some(ref w) = wm {
265            self.last_emitted_watermark = w.timestamp();
266            self.last_emit_time = Instant::now();
267        }
268        wm
269    }
270
271    fn on_periodic(&mut self) -> Option<Watermark> {
272        // Check if enough wall-clock time has passed
273        if self.last_emit_time.elapsed() >= self.period {
274            let current = self.inner.current_watermark();
275            if current > self.last_emitted_watermark {
276                self.last_emitted_watermark = current;
277                self.last_emit_time = Instant::now();
278                return Some(Watermark::new(current));
279            }
280            self.last_emit_time = Instant::now();
281        }
282        None
283    }
284
285    fn current_watermark(&self) -> i64 {
286        self.inner.current_watermark()
287    }
288
289    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
290        let wm = self.inner.advance_watermark(timestamp);
291        if let Some(ref w) = wm {
292            self.last_emitted_watermark = w.timestamp();
293            self.last_emit_time = Instant::now();
294        }
295        wm
296    }
297
298    fn is_processing_time(&self) -> bool {
299        self.inner.is_processing_time()
300    }
301}
302
303/// Punctuated watermark generator that emits based on special events.
304///
305/// Uses a predicate function to identify watermark-carrying events. When the
306/// predicate returns `Some(watermark)`, that watermark is emitted.
307///
308/// # Example
309///
310/// ```rust
311/// use laminar_core::time::{PunctuatedGenerator, WatermarkGenerator, Watermark};
312///
313/// // Emit watermark on every 1000ms boundary
314/// let mut gen = PunctuatedGenerator::new(|ts| {
315///     if ts % 1000 == 0 {
316///         Some(Watermark::new(ts))
317///     } else {
318///         None
319///     }
320/// });
321///
322/// assert_eq!(gen.on_event(999), None);
323/// assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
324/// ```
325pub struct PunctuatedGenerator<F>
326where
327    F: Fn(i64) -> Option<Watermark> + Send,
328{
329    predicate: F,
330    current_watermark: i64,
331}
332
333impl<F> PunctuatedGenerator<F>
334where
335    F: Fn(i64) -> Option<Watermark> + Send,
336{
337    /// Creates a new punctuated generator with the given predicate.
338    ///
339    /// # Arguments
340    ///
341    /// * `predicate` - Function that returns `Some(Watermark)` for watermark events
342    #[must_use]
343    pub fn new(predicate: F) -> Self {
344        Self {
345            predicate,
346            current_watermark: i64::MIN,
347        }
348    }
349}
350
351impl<F> WatermarkGenerator for PunctuatedGenerator<F>
352where
353    F: Fn(i64) -> Option<Watermark> + Send,
354{
355    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
356        if let Some(wm) = (self.predicate)(timestamp) {
357            if wm.timestamp() > self.current_watermark {
358                self.current_watermark = wm.timestamp();
359                return Some(wm);
360            }
361        }
362        None
363    }
364
365    fn on_periodic(&mut self) -> Option<Watermark> {
366        None
367    }
368
369    fn current_watermark(&self) -> i64 {
370        self.current_watermark
371    }
372
373    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
374        if timestamp > self.current_watermark {
375            self.current_watermark = timestamp;
376            Some(Watermark::new(timestamp))
377        } else {
378            None
379        }
380    }
381}
382
383/// Tracks watermarks across multiple input sources.
384///
385/// For operators with multiple inputs (e.g., joins, unions), the combined
386/// watermark is the minimum across all sources. This ensures no late events
387/// from any source are missed.
388///
389/// # Example
390///
391/// ```rust
392/// use laminar_core::time::{WatermarkTracker, Watermark};
393///
394/// let mut tracker = WatermarkTracker::new(3); // 3 sources
395///
396/// // Source 0 advances to 1000
397/// let wm = tracker.update_source(0, 1000);
398/// assert_eq!(wm, None); // Other sources still at MIN
399///
400/// // Source 1 advances to 2000
401/// tracker.update_source(1, 2000);
402///
403/// // Source 2 advances to 500
404/// let wm = tracker.update_source(2, 500);
405/// assert_eq!(wm, Some(Watermark::new(500))); // Min of all sources
406/// ```
407#[derive(Debug)]
408pub struct WatermarkTracker {
409    /// Watermark for each source
410    source_watermarks: Vec<i64>,
411    /// Combined minimum watermark
412    combined_watermark: i64,
413    /// Idle status for each source
414    idle_sources: Vec<bool>,
415    /// Last activity time for each source
416    last_activity: Vec<Instant>,
417    /// Per-source idle timeout; `None` ⇒ that source never auto-idles
418    /// (the default — idleness is opt-in, like Flink `withIdleness`).
419    idle_timeout: Vec<Option<Duration>>,
420}
421
422impl WatermarkTracker {
423    /// Creates a tracker with idleness **disabled** for all sources
424    /// (strict correctness; configure per source via
425    /// [`Self::set_idle_timeout`]).
426    #[must_use]
427    pub fn new(num_sources: usize) -> Self {
428        Self {
429            source_watermarks: vec![i64::MIN; num_sources],
430            combined_watermark: i64::MIN,
431            idle_sources: vec![false; num_sources],
432            last_activity: vec![Instant::now(); num_sources],
433            idle_timeout: vec![None; num_sources],
434        }
435    }
436
437    /// Creates a tracker with the same idle timeout for every source.
438    #[must_use]
439    pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
440        Self {
441            source_watermarks: vec![i64::MIN; num_sources],
442            combined_watermark: i64::MIN,
443            idle_sources: vec![false; num_sources],
444            last_activity: vec![Instant::now(); num_sources],
445            idle_timeout: vec![Some(idle_timeout); num_sources],
446        }
447    }
448
449    /// Sets (or clears, with `None`) the idle timeout for one source.
450    pub fn set_idle_timeout(&mut self, source_id: usize, timeout: Option<Duration>) {
451        if let Some(slot) = self.idle_timeout.get_mut(source_id) {
452            *slot = timeout;
453        }
454    }
455
456    /// Updates the watermark for a specific source.
457    ///
458    /// Returns `Some(Watermark)` if the combined watermark advances.
459    pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
460        if source_id >= self.source_watermarks.len() {
461            return None;
462        }
463
464        // Mark source as active
465        self.idle_sources[source_id] = false;
466        self.last_activity[source_id] = Instant::now();
467
468        // Update source watermark
469        if watermark > self.source_watermarks[source_id] {
470            self.source_watermarks[source_id] = watermark;
471            self.update_combined()
472        } else {
473            None
474        }
475    }
476
477    /// Marks a source as idle, excluding it from watermark calculation.
478    ///
479    /// Idle sources don't hold back the combined watermark.
480    pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
481        if source_id >= self.idle_sources.len() {
482            return None;
483        }
484
485        self.idle_sources[source_id] = true;
486        self.update_combined()
487    }
488
489    /// Checks for sources that have been idle longer than the timeout.
490    ///
491    /// Should be called periodically to detect stalled sources.
492    pub fn check_idle_sources(&mut self) -> Option<Watermark> {
493        let mut any_marked = false;
494        for i in 0..self.idle_sources.len() {
495            let Some(timeout) = self.idle_timeout[i] else {
496                continue; // idleness disabled for this source
497            };
498            if !self.idle_sources[i] && self.last_activity[i].elapsed() >= timeout {
499                self.idle_sources[i] = true;
500                any_marked = true;
501            }
502        }
503        if any_marked {
504            self.update_combined()
505        } else {
506            None
507        }
508    }
509
510    /// Returns the current combined watermark.
511    #[must_use]
512    pub fn current_watermark(&self) -> Option<Watermark> {
513        if self.combined_watermark == i64::MIN {
514            None
515        } else {
516            Some(Watermark::new(self.combined_watermark))
517        }
518    }
519
520    /// Returns the watermark for a specific source.
521    #[must_use]
522    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
523        self.source_watermarks.get(source_id).copied()
524    }
525
526    /// Returns whether a source is marked as idle.
527    #[must_use]
528    pub fn is_idle(&self, source_id: usize) -> bool {
529        self.idle_sources.get(source_id).copied().unwrap_or(false)
530    }
531
532    /// Returns the number of sources being tracked.
533    #[must_use]
534    pub fn num_sources(&self) -> usize {
535        self.source_watermarks.len()
536    }
537
538    /// Returns the number of active (non-idle) sources.
539    #[must_use]
540    pub fn active_source_count(&self) -> usize {
541        self.idle_sources.iter().filter(|&&idle| !idle).count()
542    }
543
544    /// Updates the combined watermark based on all active sources.
545    fn update_combined(&mut self) -> Option<Watermark> {
546        // Calculate minimum across active sources only
547        let mut min_watermark = i64::MAX;
548        let mut has_active = false;
549
550        for (i, &wm) in self.source_watermarks.iter().enumerate() {
551            if !self.idle_sources[i] {
552                has_active = true;
553                min_watermark = min_watermark.min(wm);
554            }
555        }
556
557        // If all sources are idle, use the max watermark
558        if !has_active {
559            min_watermark = self
560                .source_watermarks
561                .iter()
562                .copied()
563                .max()
564                .unwrap_or(i64::MIN);
565        }
566
567        if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
568            self.combined_watermark = min_watermark;
569            Some(Watermark::new(min_watermark))
570        } else {
571            None
572        }
573    }
574}
575
576/// Watermark generator for sources with embedded watermarks.
577///
578/// Some sources (like Kafka with EOS) may provide watermarks directly.
579/// This generator tracks both event timestamps and explicit watermarks.
580pub struct SourceProvidedGenerator {
581    /// Last watermark from the source
582    source_watermark: i64,
583    /// Fallback generator for when source doesn't provide watermarks
584    fallback: BoundedOutOfOrdernessGenerator,
585    /// Whether to use source watermarks when available
586    prefer_source: bool,
587}
588
589impl SourceProvidedGenerator {
590    /// Creates a new source-provided generator.
591    ///
592    /// # Arguments
593    ///
594    /// * `fallback_lateness` - Lateness for fallback bounded generator
595    /// * `prefer_source` - If true, source watermarks take precedence
596    #[must_use]
597    pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
598        Self {
599            source_watermark: i64::MIN,
600            fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
601            prefer_source,
602        }
603    }
604
605    /// Updates the watermark from the source.
606    ///
607    /// Call this when the source provides an explicit watermark.
608    pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
609        if watermark > self.source_watermark {
610            self.source_watermark = watermark;
611            if self.prefer_source || watermark > self.fallback.current_watermark() {
612                return Some(Watermark::new(watermark));
613            }
614        }
615        None
616    }
617}
618
619impl WatermarkGenerator for SourceProvidedGenerator {
620    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
621        let fallback_wm = self.fallback.on_event(timestamp);
622
623        if self.prefer_source {
624            // Only emit if source watermark allows and it's an advancement
625            if self.source_watermark > i64::MIN {
626                return None; // Wait for source watermark
627            }
628        }
629
630        fallback_wm
631    }
632
633    fn on_periodic(&mut self) -> Option<Watermark> {
634        None
635    }
636
637    fn current_watermark(&self) -> i64 {
638        if self.prefer_source && self.source_watermark > i64::MIN {
639            self.source_watermark
640        } else {
641            self.fallback.current_watermark().max(self.source_watermark)
642        }
643    }
644
645    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
646        self.on_source_watermark(timestamp)
647    }
648}
649
650/// Processing-time watermark generator.
651///
652/// Ignores event timestamps entirely and advances the watermark based on
653/// wall-clock time. Use with `PROCTIME()` sources where events are processed
654/// in arrival order without event-time semantics.
655///
656/// - [`on_event`](WatermarkGenerator::on_event) returns `None` (event timestamps are ignored)
657/// - [`on_periodic`](WatermarkGenerator::on_periodic) returns `Some(Watermark::new(now_millis()))`
658///
659/// # Example
660///
661/// ```rust
662/// use laminar_core::time::{ProcessingTimeGenerator, WatermarkGenerator};
663///
664/// let mut gen = ProcessingTimeGenerator::new();
665/// // on_event ignores the timestamp
666/// assert_eq!(gen.on_event(1000), None);
667/// // on_periodic returns wall-clock time
668/// let wm = gen.on_periodic();
669/// assert!(wm.is_some());
670/// ```
671pub struct ProcessingTimeGenerator {
672    current_watermark: i64,
673}
674
675impl ProcessingTimeGenerator {
676    /// Creates a new processing-time watermark generator.
677    #[must_use]
678    pub fn new() -> Self {
679        Self {
680            current_watermark: i64::MIN,
681        }
682    }
683}
684
685impl Default for ProcessingTimeGenerator {
686    fn default() -> Self {
687        Self::new()
688    }
689}
690
691impl WatermarkGenerator for ProcessingTimeGenerator {
692    #[inline]
693    fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
694        // Processing-time mode ignores event timestamps
695        None
696    }
697
698    #[inline]
699    fn on_periodic(&mut self) -> Option<Watermark> {
700        let now = super::now_unix_millis();
701        if now > self.current_watermark {
702            self.current_watermark = now;
703            Some(Watermark::new(now))
704        } else {
705            None
706        }
707    }
708
709    #[inline]
710    fn current_watermark(&self) -> i64 {
711        self.current_watermark
712    }
713
714    #[inline]
715    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
716        if timestamp > self.current_watermark {
717            self.current_watermark = timestamp;
718            Some(Watermark::new(timestamp))
719        } else {
720            None
721        }
722    }
723
724    #[inline]
725    fn is_processing_time(&self) -> bool {
726        true
727    }
728}
729
730#[cfg(test)]
731mod tests {
732    use super::*;
733
734    #[test]
735    fn test_bounded_generator_first_event() {
736        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
737        let wm = gen.on_event(1000);
738        assert_eq!(wm, Some(Watermark::new(900)));
739        assert_eq!(gen.current_watermark(), 900);
740    }
741
742    #[test]
743    fn processing_time_domain_is_reported_for_late_filter_skip() {
744        // Source-side late-filtering keys off this: a wall-clock watermark must
745        // not be compared against event-time timestamps, or it drops every row.
746        assert!(ProcessingTimeGenerator::new().is_processing_time());
747        assert!(!BoundedOutOfOrdernessGenerator::new(100).is_processing_time());
748        // The periodic wrapper reports its inner generator's time domain.
749        let p = Duration::from_millis(1);
750        assert!(PeriodicGenerator::new(ProcessingTimeGenerator::new(), p).is_processing_time());
751        assert!(
752            !PeriodicGenerator::new(BoundedOutOfOrdernessGenerator::new(100), p)
753                .is_processing_time()
754        );
755    }
756
757    #[test]
758    fn test_bounded_generator_out_of_order() {
759        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
760
761        // First event
762        gen.on_event(1000);
763
764        // Out of order - should not emit new watermark
765        let wm = gen.on_event(800);
766        assert_eq!(wm, None);
767        assert_eq!(gen.current_watermark(), 900); // Still at 1000 - 100
768    }
769
770    #[test]
771    fn test_bounded_generator_advancement() {
772        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
773
774        gen.on_event(1000);
775        let wm = gen.on_event(1200);
776
777        assert_eq!(wm, Some(Watermark::new(1100)));
778    }
779
780    #[test]
781    fn test_bounded_generator_from_duration() {
782        let gen = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
783        assert_eq!(gen.max_out_of_orderness(), 5000);
784    }
785
786    #[test]
787    fn test_bounded_generator_no_periodic() {
788        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
789        assert_eq!(gen.on_periodic(), None);
790    }
791
792    #[test]
793    fn test_ascending_generator_advances_on_each_event() {
794        let mut gen = AscendingTimestampsGenerator::new();
795
796        let wm1 = gen.on_event(1000);
797        assert_eq!(wm1, Some(Watermark::new(1000)));
798
799        let wm2 = gen.on_event(2000);
800        assert_eq!(wm2, Some(Watermark::new(2000)));
801    }
802
803    #[test]
804    fn test_ascending_generator_ignores_backwards() {
805        let mut gen = AscendingTimestampsGenerator::new();
806
807        gen.on_event(2000);
808        let wm = gen.on_event(1000); // Earlier timestamp
809
810        assert_eq!(wm, None);
811        assert_eq!(gen.current_watermark(), 2000);
812    }
813
814    #[test]
815    fn test_periodic_generator_passes_through() {
816        let inner = BoundedOutOfOrdernessGenerator::new(100);
817        let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
818
819        let wm = gen.on_event(1000);
820        assert_eq!(wm, Some(Watermark::new(900)));
821    }
822
823    #[test]
824    fn test_periodic_generator_inner_access() {
825        let inner = BoundedOutOfOrdernessGenerator::new(100);
826        let gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
827
828        assert_eq!(gen.inner().max_out_of_orderness(), 100);
829    }
830
831    #[test]
832    fn test_punctuated_generator_predicate() {
833        let mut gen = PunctuatedGenerator::new(|ts| {
834            if ts % 1000 == 0 {
835                Some(Watermark::new(ts))
836            } else {
837                None
838            }
839        });
840
841        assert_eq!(gen.on_event(500), None);
842        assert_eq!(gen.on_event(999), None);
843        assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
844        assert_eq!(gen.on_event(1500), None);
845        assert_eq!(gen.on_event(2000), Some(Watermark::new(2000)));
846    }
847
848    #[test]
849    fn test_punctuated_generator_no_regression() {
850        let mut gen = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));
851
852        gen.on_event(2000);
853        let wm = gen.on_event(1000); // Lower watermark
854
855        assert_eq!(wm, None);
856        assert_eq!(gen.current_watermark(), 2000);
857    }
858
859    #[test]
860    fn test_tracker_single_source() {
861        let mut tracker = WatermarkTracker::new(1);
862
863        let wm = tracker.update_source(0, 1000);
864        assert_eq!(wm, Some(Watermark::new(1000)));
865        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
866    }
867
868    #[test]
869    fn test_tracker_multiple_sources() {
870        let mut tracker = WatermarkTracker::new(3);
871
872        // All sources need to report before watermark advances
873        tracker.update_source(0, 1000);
874        tracker.update_source(1, 2000);
875        let wm = tracker.update_source(2, 500);
876
877        assert_eq!(wm, Some(Watermark::new(500))); // Minimum
878    }
879
880    #[test]
881    fn test_tracker_min_watermark() {
882        let mut tracker = WatermarkTracker::new(2);
883
884        tracker.update_source(0, 5000);
885        tracker.update_source(1, 3000);
886
887        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
888
889        // Source 1 advances
890        tracker.update_source(1, 4000);
891        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
892    }
893
894    #[test]
895    fn test_tracker_idle_source() {
896        let mut tracker = WatermarkTracker::new(2);
897
898        tracker.update_source(0, 5000);
899        tracker.update_source(1, 1000);
900
901        // Source 1 is slow, mark it idle
902        let wm = tracker.mark_idle(1);
903
904        // Now only source 0's watermark counts
905        assert_eq!(wm, Some(Watermark::new(5000)));
906    }
907
908    #[test]
909    fn check_idle_sources_advances_then_reactivation_is_monotone() {
910        // A quiet watermarked source must not pin the combined-min, and a
911        // later-reactivating source with an OLD watermark must not regress
912        // it. `idle_timeout = 0` makes any source immediately eligible for
913        // `check_idle_sources` without sleeping.
914        let mut tracker = WatermarkTracker::with_idle_timeout(2, Duration::ZERO);
915        tracker.update_source(0, 5_000); // active, fast
916        tracker.update_source(1, 1_000); // will go quiet
917        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1_000)));
918
919        // Source 1 idle past timeout → excluded → combined jumps to s0.
920        let advanced = tracker.check_idle_sources();
921        assert_eq!(advanced, Some(Watermark::new(5_000)));
922        assert_eq!(tracker.current_watermark(), Some(Watermark::new(5_000)));
923
924        // Source 1 reactivates with a STALE watermark: must not regress.
925        let res = tracker.update_source(1, 1_500);
926        assert_eq!(res, None, "stale reactivation must not emit a regress");
927        assert_eq!(tracker.current_watermark(), Some(Watermark::new(5_000)));
928
929        // Once it catches up past the combined, progress resumes from min.
930        tracker.update_source(0, 9_000);
931        tracker.update_source(1, 8_000);
932        assert_eq!(tracker.current_watermark(), Some(Watermark::new(8_000)));
933    }
934
935    #[test]
936    fn test_tracker_all_idle() {
937        let mut tracker = WatermarkTracker::new(2);
938
939        tracker.update_source(0, 5000);
940        tracker.update_source(1, 3000);
941
942        tracker.mark_idle(0);
943        let wm = tracker.mark_idle(1);
944
945        // Use max when all idle
946        assert_eq!(wm, Some(Watermark::new(5000)));
947    }
948
949    #[test]
950    fn test_tracker_source_watermark() {
951        let mut tracker = WatermarkTracker::new(2);
952
953        tracker.update_source(0, 1000);
954        tracker.update_source(1, 2000);
955
956        assert_eq!(tracker.source_watermark(0), Some(1000));
957        assert_eq!(tracker.source_watermark(1), Some(2000));
958        assert_eq!(tracker.source_watermark(5), None); // Out of bounds
959    }
960
961    #[test]
962    fn test_tracker_active_source_count() {
963        let mut tracker = WatermarkTracker::new(3);
964
965        assert_eq!(tracker.active_source_count(), 3);
966
967        tracker.mark_idle(0);
968        assert_eq!(tracker.active_source_count(), 2);
969
970        tracker.mark_idle(2);
971        assert_eq!(tracker.active_source_count(), 1);
972
973        // Reactivate by updating
974        tracker.update_source(0, 1000);
975        assert_eq!(tracker.active_source_count(), 2);
976    }
977
978    #[test]
979    fn test_tracker_invalid_source() {
980        let mut tracker = WatermarkTracker::new(2);
981
982        let wm = tracker.update_source(5, 1000); // Invalid source ID
983        assert_eq!(wm, None);
984
985        let wm = tracker.mark_idle(5);
986        assert_eq!(wm, None);
987    }
988
989    #[test]
990    fn test_source_provided_fallback() {
991        let mut gen = SourceProvidedGenerator::new(100, false);
992
993        let wm = gen.on_event(1000);
994        assert_eq!(wm, Some(Watermark::new(900))); // Fallback behavior
995    }
996
997    #[test]
998    fn test_source_provided_explicit_watermark() {
999        let mut gen = SourceProvidedGenerator::new(100, true);
1000
1001        let wm = gen.on_source_watermark(500);
1002        assert_eq!(wm, Some(Watermark::new(500)));
1003        assert_eq!(gen.current_watermark(), 500);
1004    }
1005
1006    // --- advance_watermark() tests ---
1007
1008    #[test]
1009    fn test_advance_watermark_bounded_generator() {
1010        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
1011
1012        // Advance from initial state
1013        let wm = gen.advance_watermark(500);
1014        assert_eq!(wm, Some(Watermark::new(500)));
1015        assert_eq!(gen.current_watermark(), 500);
1016
1017        // Advance further
1018        let wm = gen.advance_watermark(800);
1019        assert_eq!(wm, Some(Watermark::new(800)));
1020        assert_eq!(gen.current_watermark(), 800);
1021
1022        // No regression
1023        let wm = gen.advance_watermark(600);
1024        assert_eq!(wm, None);
1025        assert_eq!(gen.current_watermark(), 800);
1026    }
1027
1028    #[test]
1029    fn test_advance_watermark_maintains_invariant() {
1030        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
1031
1032        // Process an event to set initial state
1033        gen.on_event(1000); // wm=900, max_ts=1000
1034
1035        // Advance watermark beyond current
1036        gen.advance_watermark(1200);
1037        assert_eq!(gen.current_watermark(), 1200);
1038
1039        // Now on_event at 1250 should work correctly: max_ts should be >= 1300
1040        // wm = 1250 - 100 = 1150 which is < 1200, so no new watermark from on_event
1041        let wm = gen.on_event(1250);
1042        assert_eq!(wm, None);
1043        assert_eq!(gen.current_watermark(), 1200);
1044
1045        // But event at 1400: max_ts = 1400, wm = 1300 > 1200
1046        let wm = gen.on_event(1400);
1047        assert_eq!(wm, Some(Watermark::new(1300)));
1048    }
1049
1050    #[test]
1051    fn test_advance_watermark_ascending_generator() {
1052        let mut gen = AscendingTimestampsGenerator::new();
1053
1054        let wm = gen.advance_watermark(500);
1055        assert_eq!(wm, Some(Watermark::new(500)));
1056        assert_eq!(gen.current_watermark(), 500);
1057
1058        // No regression
1059        let wm = gen.advance_watermark(300);
1060        assert_eq!(wm, None);
1061        assert_eq!(gen.current_watermark(), 500);
1062
1063        // Further advance
1064        let wm = gen.advance_watermark(1000);
1065        assert_eq!(wm, Some(Watermark::new(1000)));
1066    }
1067
1068    #[test]
1069    fn test_advance_watermark_periodic_generator() {
1070        let inner = BoundedOutOfOrdernessGenerator::new(100);
1071        let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
1072
1073        let wm = gen.advance_watermark(500);
1074        assert_eq!(wm, Some(Watermark::new(500)));
1075        assert_eq!(gen.current_watermark(), 500);
1076
1077        // No regression
1078        let wm = gen.advance_watermark(300);
1079        assert_eq!(wm, None);
1080    }
1081
1082    #[test]
1083    fn test_advance_watermark_punctuated_generator() {
1084        let mut gen = PunctuatedGenerator::new(|ts| {
1085            if ts % 1000 == 0 {
1086                Some(Watermark::new(ts))
1087            } else {
1088                None
1089            }
1090        });
1091
1092        // External advance (does not invoke predicate)
1093        let wm = gen.advance_watermark(500);
1094        assert_eq!(wm, Some(Watermark::new(500)));
1095        assert_eq!(gen.current_watermark(), 500);
1096
1097        // No regression
1098        let wm = gen.advance_watermark(200);
1099        assert_eq!(wm, None);
1100    }
1101
1102    #[test]
1103    fn test_advance_watermark_source_provided_generator() {
1104        let mut gen = SourceProvidedGenerator::new(100, true);
1105
1106        let wm = gen.advance_watermark(500);
1107        assert_eq!(wm, Some(Watermark::new(500)));
1108        assert_eq!(gen.current_watermark(), 500);
1109
1110        // No regression
1111        let wm = gen.advance_watermark(300);
1112        assert_eq!(wm, None);
1113    }
1114
1115    // --- ProcessingTimeGenerator tests ---
1116
1117    #[test]
1118    fn test_processing_time_generator_ignores_events() {
1119        let mut gen = ProcessingTimeGenerator::new();
1120        assert_eq!(gen.on_event(1000), None);
1121        assert_eq!(gen.on_event(2000), None);
1122        assert_eq!(gen.current_watermark(), i64::MIN);
1123    }
1124
1125    #[test]
1126    fn test_processing_time_generator_periodic() {
1127        let mut gen = ProcessingTimeGenerator::new();
1128        let wm = gen.on_periodic();
1129        assert!(wm.is_some());
1130        let ts = wm.unwrap().timestamp();
1131        // Should be a reasonable timestamp (after 2020-01-01)
1132        assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
1133    }
1134
1135    #[test]
1136    fn test_processing_time_generator_advance_watermark() {
1137        let mut gen = ProcessingTimeGenerator::new();
1138
1139        let wm = gen.advance_watermark(500);
1140        assert_eq!(wm, Some(Watermark::new(500)));
1141        assert_eq!(gen.current_watermark(), 500);
1142
1143        // No regression
1144        let wm = gen.advance_watermark(300);
1145        assert_eq!(wm, None);
1146        assert_eq!(gen.current_watermark(), 500);
1147
1148        // Further advance
1149        let wm = gen.advance_watermark(1000);
1150        assert_eq!(wm, Some(Watermark::new(1000)));
1151    }
1152
1153    #[test]
1154    fn test_processing_time_generator_default() {
1155        let gen = ProcessingTimeGenerator::default();
1156        assert_eq!(gen.current_watermark(), i64::MIN);
1157    }
1158
1159    // --- future-skew guard ---
1160
1161    #[test]
1162    fn future_skew_event_does_not_advance_watermark() {
1163        let mut gen = BoundedOutOfOrdernessGenerator::new(0);
1164        let now = crate::time::now_unix_millis();
1165        // A ~2h-future event must not poison the watermark...
1166        assert_eq!(gen.on_event(now + 2 * 60 * 60 * 1000), None);
1167        assert_eq!(gen.current_watermark(), i64::MIN);
1168        // ...but a normal event still advances it.
1169        assert_eq!(gen.on_event(now), Some(Watermark::new(now)));
1170    }
1171}