Skip to main content

laminar_core/time/
watermark.rs

1//! Watermark generators and multi-source tracking.
2
3use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use super::Watermark;
6
7/// Trait for generating watermarks from event timestamps.
8///
9/// Implementations track observed timestamps and produce watermarks that indicate
10/// event-time progress. The watermark is an assertion that no events with timestamps
11/// earlier than the watermark are expected.
12pub trait WatermarkGenerator: Send {
13    /// Process an event timestamp and potentially emit a new watermark.
14    ///
15    /// Called for each event processed. Returns `Some(watermark)` if the watermark
16    /// should advance, `None` otherwise.
17    fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
18
19    /// Called periodically to emit watermarks based on wall-clock time.
20    ///
21    /// Useful for generating watermarks even when no events are arriving.
22    fn on_periodic(&mut self) -> Option<Watermark>;
23
24    /// Returns the current watermark value without advancing it.
25    fn current_watermark(&self) -> i64;
26
27    /// Advances the watermark to at least the given timestamp from an external source.
28    ///
29    /// Called when the source provides an explicit watermark (e.g., `Source::watermark()`).
30    /// Returns `Some(Watermark)` if the watermark advanced, `None` if the timestamp
31    /// was not higher than the current watermark.
32    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
33}
34
/// Watermark generator with bounded out-of-orderness.
///
/// Allows events to arrive out of order by up to `max_out_of_orderness` milliseconds.
/// For event-driven progress the watermark is `max_timestamp_seen - max_out_of_orderness`
/// (saturating at `i64::MIN`). An explicit `advance_watermark` call may push it further.
/// The watermark never moves backwards.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};
///
/// let mut gen = BoundedOutOfOrdernessGenerator::new(100); // 100ms lateness allowed
///
/// // First event at t=1000
/// assert_eq!(gen.on_event(1000), Some(Watermark::new(900)));
///
/// // Out-of-order event at t=800 - no watermark advance
/// assert_eq!(gen.on_event(800), None);
///
/// // New max at t=1200
/// assert_eq!(gen.on_event(1200), Some(Watermark::new(1100)));
/// ```
#[derive(Debug, Clone)]
pub struct BoundedOutOfOrdernessGenerator {
    /// Allowed lateness, in milliseconds.
    max_out_of_orderness: i64,
    /// Largest event timestamp seen (or implied by an external watermark);
    /// `i64::MIN` before any input.
    current_max_timestamp: i64,
    /// Last watermark produced; `i64::MIN` before any input.
    current_watermark: i64,
}

impl BoundedOutOfOrdernessGenerator {
    /// Creates a new generator with the specified maximum out-of-orderness.
    ///
    /// # Arguments
    ///
    /// * `max_out_of_orderness` - Maximum allowed lateness in milliseconds
    #[must_use]
    pub fn new(max_out_of_orderness: i64) -> Self {
        Self {
            max_out_of_orderness,
            current_max_timestamp: i64::MIN,
            current_watermark: i64::MIN,
        }
    }

    /// Creates a new generator from a `Duration`.
    ///
    /// Durations whose millisecond count does not fit in an `i64` saturate to
    /// `i64::MAX`. A plain `as` cast would wrap them to an arbitrary (possibly
    /// negative) lateness, which would push the watermark ahead of the events.
    #[must_use]
    #[allow(clippy::cast_possible_truncation)] // value is clamped to i64::MAX before the cast
    pub fn from_duration(max_out_of_orderness: Duration) -> Self {
        let millis = max_out_of_orderness.as_millis().min(i64::MAX as u128);
        Self::new(millis as i64)
    }

    /// Returns the maximum out-of-orderness in milliseconds.
    #[must_use]
    pub fn max_out_of_orderness(&self) -> i64 {
        self.max_out_of_orderness
    }
}
90
91impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
92    #[inline]
93    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
94        if timestamp > self.current_max_timestamp {
95            self.current_max_timestamp = timestamp;
96            let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
97            if new_watermark > self.current_watermark {
98                self.current_watermark = new_watermark;
99                return Some(Watermark::new(new_watermark));
100            }
101        }
102        None
103    }
104
105    #[inline]
106    fn on_periodic(&mut self) -> Option<Watermark> {
107        // Bounded out-of-orderness doesn't emit periodic watermarks
108        None
109    }
110
111    #[inline]
112    fn current_watermark(&self) -> i64 {
113        self.current_watermark
114    }
115
116    #[inline]
117    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
118        if timestamp > self.current_watermark {
119            self.current_watermark = timestamp;
120            // Maintain invariant: current_max_timestamp >= current_watermark + max_out_of_orderness
121            let min_max = timestamp.saturating_add(self.max_out_of_orderness);
122            if min_max > self.current_max_timestamp {
123                self.current_max_timestamp = min_max;
124            }
125            Some(Watermark::new(timestamp))
126        } else {
127            None
128        }
129    }
130}
131
/// Watermark generator for strictly ascending timestamps.
///
/// Assumes events arrive in timestamp order with no lateness. The watermark
/// equals the current timestamp. Use this for sources that guarantee ordering.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{AscendingTimestampsGenerator, WatermarkGenerator, Watermark};
///
/// let mut gen = AscendingTimestampsGenerator::new();
/// assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
/// assert_eq!(gen.on_event(2000), Some(Watermark::new(2000)));
/// ```
#[derive(Debug)]
pub struct AscendingTimestampsGenerator {
    /// Last watermark produced; `i64::MIN` until the first event.
    current_watermark: i64,
}

impl AscendingTimestampsGenerator {
    /// Creates a new ascending timestamps generator.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: i64::MIN,
        }
    }
}

impl Default for AscendingTimestampsGenerator {
    /// Same as [`AscendingTimestampsGenerator::new`].
    ///
    /// Implemented by hand because a derived `Default` would start the
    /// watermark at `0`, silently dropping every event with a timestamp `<= 0`.
    fn default() -> Self {
        Self::new()
    }
}
160
161impl WatermarkGenerator for AscendingTimestampsGenerator {
162    #[inline]
163    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
164        if timestamp > self.current_watermark {
165            self.current_watermark = timestamp;
166            Some(Watermark::new(timestamp))
167        } else {
168            None
169        }
170    }
171
172    #[inline]
173    fn on_periodic(&mut self) -> Option<Watermark> {
174        None
175    }
176
177    #[inline]
178    fn current_watermark(&self) -> i64 {
179        self.current_watermark
180    }
181
182    #[inline]
183    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
184        if timestamp > self.current_watermark {
185            self.current_watermark = timestamp;
186            Some(Watermark::new(timestamp))
187        } else {
188            None
189        }
190    }
191}
192
193/// Periodic watermark generator that emits at fixed wall-clock intervals.
194///
195/// Wraps another generator and emits watermarks periodically even when no
196/// events are arriving. Useful for handling idle sources and ensuring
197/// time-based windows eventually trigger.
198///
199/// # Example
200///
201/// ```rust,no_run
202/// use laminar_core::time::{
203///     PeriodicGenerator, BoundedOutOfOrdernessGenerator, WatermarkGenerator,
204/// };
205/// use std::time::Duration;
206///
207/// let inner = BoundedOutOfOrdernessGenerator::new(100);
208/// let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(500));
209///
210/// // First event
211/// gen.on_event(1000);
212///
213/// // Later, periodic check may emit watermark
214/// // (depends on wall-clock time elapsed)
215/// let wm = gen.on_periodic();
216/// ```
217pub struct PeriodicGenerator<G: WatermarkGenerator> {
218    inner: G,
219    period: Duration,
220    last_emit_time: Instant,
221    last_emitted_watermark: i64,
222}
223
224impl<G: WatermarkGenerator> PeriodicGenerator<G> {
225    /// Creates a new periodic generator wrapping another generator.
226    ///
227    /// # Arguments
228    ///
229    /// * `inner` - The underlying watermark generator
230    /// * `period` - How often to emit watermarks (wall-clock time)
231    #[must_use]
232    pub fn new(inner: G, period: Duration) -> Self {
233        Self {
234            inner,
235            period,
236            last_emit_time: Instant::now(),
237            last_emitted_watermark: i64::MIN,
238        }
239    }
240
241    /// Returns a reference to the inner generator.
242    #[must_use]
243    pub fn inner(&self) -> &G {
244        &self.inner
245    }
246
247    /// Returns a mutable reference to the inner generator.
248    pub fn inner_mut(&mut self) -> &mut G {
249        &mut self.inner
250    }
251}
252
253impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
254    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
255        let wm = self.inner.on_event(timestamp);
256        if let Some(ref w) = wm {
257            self.last_emitted_watermark = w.timestamp();
258            self.last_emit_time = Instant::now();
259        }
260        wm
261    }
262
263    fn on_periodic(&mut self) -> Option<Watermark> {
264        // Check if enough wall-clock time has passed
265        if self.last_emit_time.elapsed() >= self.period {
266            let current = self.inner.current_watermark();
267            if current > self.last_emitted_watermark {
268                self.last_emitted_watermark = current;
269                self.last_emit_time = Instant::now();
270                return Some(Watermark::new(current));
271            }
272            self.last_emit_time = Instant::now();
273        }
274        None
275    }
276
277    fn current_watermark(&self) -> i64 {
278        self.inner.current_watermark()
279    }
280
281    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
282        let wm = self.inner.advance_watermark(timestamp);
283        if let Some(ref w) = wm {
284            self.last_emitted_watermark = w.timestamp();
285            self.last_emit_time = Instant::now();
286        }
287        wm
288    }
289}
290
291/// Punctuated watermark generator that emits based on special events.
292///
293/// Uses a predicate function to identify watermark-carrying events. When the
294/// predicate returns `Some(watermark)`, that watermark is emitted.
295///
296/// # Example
297///
298/// ```rust
299/// use laminar_core::time::{PunctuatedGenerator, WatermarkGenerator, Watermark};
300///
301/// // Emit watermark on every 1000ms boundary
302/// let mut gen = PunctuatedGenerator::new(|ts| {
303///     if ts % 1000 == 0 {
304///         Some(Watermark::new(ts))
305///     } else {
306///         None
307///     }
308/// });
309///
310/// assert_eq!(gen.on_event(999), None);
311/// assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
312/// ```
313pub struct PunctuatedGenerator<F>
314where
315    F: Fn(i64) -> Option<Watermark> + Send,
316{
317    predicate: F,
318    current_watermark: i64,
319}
320
321impl<F> PunctuatedGenerator<F>
322where
323    F: Fn(i64) -> Option<Watermark> + Send,
324{
325    /// Creates a new punctuated generator with the given predicate.
326    ///
327    /// # Arguments
328    ///
329    /// * `predicate` - Function that returns `Some(Watermark)` for watermark events
330    #[must_use]
331    pub fn new(predicate: F) -> Self {
332        Self {
333            predicate,
334            current_watermark: i64::MIN,
335        }
336    }
337}
338
339impl<F> WatermarkGenerator for PunctuatedGenerator<F>
340where
341    F: Fn(i64) -> Option<Watermark> + Send,
342{
343    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
344        if let Some(wm) = (self.predicate)(timestamp) {
345            if wm.timestamp() > self.current_watermark {
346                self.current_watermark = wm.timestamp();
347                return Some(wm);
348            }
349        }
350        None
351    }
352
353    fn on_periodic(&mut self) -> Option<Watermark> {
354        None
355    }
356
357    fn current_watermark(&self) -> i64 {
358        self.current_watermark
359    }
360
361    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
362        if timestamp > self.current_watermark {
363            self.current_watermark = timestamp;
364            Some(Watermark::new(timestamp))
365        } else {
366            None
367        }
368    }
369}
370
371/// Tracks watermarks across multiple input sources.
372///
373/// For operators with multiple inputs (e.g., joins, unions), the combined
374/// watermark is the minimum across all sources. This ensures no late events
375/// from any source are missed.
376///
377/// # Example
378///
379/// ```rust
380/// use laminar_core::time::{WatermarkTracker, Watermark};
381///
382/// let mut tracker = WatermarkTracker::new(3); // 3 sources
383///
384/// // Source 0 advances to 1000
385/// let wm = tracker.update_source(0, 1000);
386/// assert_eq!(wm, None); // Other sources still at MIN
387///
388/// // Source 1 advances to 2000
389/// tracker.update_source(1, 2000);
390///
391/// // Source 2 advances to 500
392/// let wm = tracker.update_source(2, 500);
393/// assert_eq!(wm, Some(Watermark::new(500))); // Min of all sources
394/// ```
395#[derive(Debug)]
396pub struct WatermarkTracker {
397    /// Watermark for each source
398    source_watermarks: Vec<i64>,
399    /// Combined minimum watermark
400    combined_watermark: i64,
401    /// Idle status for each source
402    idle_sources: Vec<bool>,
403    /// Last activity time for each source
404    last_activity: Vec<Instant>,
405    /// Idle timeout duration
406    idle_timeout: Duration,
407}
408
409impl WatermarkTracker {
410    /// Creates a new tracker for the specified number of sources.
411    #[must_use]
412    pub fn new(num_sources: usize) -> Self {
413        Self {
414            source_watermarks: vec![i64::MIN; num_sources],
415            combined_watermark: i64::MIN,
416            idle_sources: vec![false; num_sources],
417            last_activity: vec![Instant::now(); num_sources],
418            idle_timeout: Duration::from_secs(30), // Default 30 second idle timeout
419        }
420    }
421
422    /// Creates a new tracker with a custom idle timeout.
423    #[must_use]
424    pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
425        Self {
426            source_watermarks: vec![i64::MIN; num_sources],
427            combined_watermark: i64::MIN,
428            idle_sources: vec![false; num_sources],
429            last_activity: vec![Instant::now(); num_sources],
430            idle_timeout,
431        }
432    }
433
434    /// Updates the watermark for a specific source.
435    ///
436    /// Returns `Some(Watermark)` if the combined watermark advances.
437    pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
438        if source_id >= self.source_watermarks.len() {
439            return None;
440        }
441
442        // Mark source as active
443        self.idle_sources[source_id] = false;
444        self.last_activity[source_id] = Instant::now();
445
446        // Update source watermark
447        if watermark > self.source_watermarks[source_id] {
448            self.source_watermarks[source_id] = watermark;
449            self.update_combined()
450        } else {
451            None
452        }
453    }
454
455    /// Marks a source as idle, excluding it from watermark calculation.
456    ///
457    /// Idle sources don't hold back the combined watermark.
458    pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
459        if source_id >= self.idle_sources.len() {
460            return None;
461        }
462
463        self.idle_sources[source_id] = true;
464        self.update_combined()
465    }
466
467    /// Checks for sources that have been idle longer than the timeout.
468    ///
469    /// Should be called periodically to detect stalled sources.
470    pub fn check_idle_sources(&mut self) -> Option<Watermark> {
471        let mut any_marked = false;
472        for i in 0..self.idle_sources.len() {
473            if !self.idle_sources[i] && self.last_activity[i].elapsed() >= self.idle_timeout {
474                self.idle_sources[i] = true;
475                any_marked = true;
476            }
477        }
478        if any_marked {
479            self.update_combined()
480        } else {
481            None
482        }
483    }
484
485    /// Returns the current combined watermark.
486    #[must_use]
487    pub fn current_watermark(&self) -> Option<Watermark> {
488        if self.combined_watermark == i64::MIN {
489            None
490        } else {
491            Some(Watermark::new(self.combined_watermark))
492        }
493    }
494
495    /// Returns the watermark for a specific source.
496    #[must_use]
497    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
498        self.source_watermarks.get(source_id).copied()
499    }
500
501    /// Returns whether a source is marked as idle.
502    #[must_use]
503    pub fn is_idle(&self, source_id: usize) -> bool {
504        self.idle_sources.get(source_id).copied().unwrap_or(false)
505    }
506
507    /// Returns the number of sources being tracked.
508    #[must_use]
509    pub fn num_sources(&self) -> usize {
510        self.source_watermarks.len()
511    }
512
513    /// Returns the number of active (non-idle) sources.
514    #[must_use]
515    pub fn active_source_count(&self) -> usize {
516        self.idle_sources.iter().filter(|&&idle| !idle).count()
517    }
518
519    /// Updates the combined watermark based on all active sources.
520    fn update_combined(&mut self) -> Option<Watermark> {
521        // Calculate minimum across active sources only
522        let mut min_watermark = i64::MAX;
523        let mut has_active = false;
524
525        for (i, &wm) in self.source_watermarks.iter().enumerate() {
526            if !self.idle_sources[i] {
527                has_active = true;
528                min_watermark = min_watermark.min(wm);
529            }
530        }
531
532        // If all sources are idle, use the max watermark
533        if !has_active {
534            min_watermark = self
535                .source_watermarks
536                .iter()
537                .copied()
538                .max()
539                .unwrap_or(i64::MIN);
540        }
541
542        if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
543            self.combined_watermark = min_watermark;
544            Some(Watermark::new(min_watermark))
545        } else {
546            None
547        }
548    }
549}
550
551/// Watermark generator for sources with embedded watermarks.
552///
553/// Some sources (like Kafka with EOS) may provide watermarks directly.
554/// This generator tracks both event timestamps and explicit watermarks.
555pub struct SourceProvidedGenerator {
556    /// Last watermark from the source
557    source_watermark: i64,
558    /// Fallback generator for when source doesn't provide watermarks
559    fallback: BoundedOutOfOrdernessGenerator,
560    /// Whether to use source watermarks when available
561    prefer_source: bool,
562}
563
564impl SourceProvidedGenerator {
565    /// Creates a new source-provided generator.
566    ///
567    /// # Arguments
568    ///
569    /// * `fallback_lateness` - Lateness for fallback bounded generator
570    /// * `prefer_source` - If true, source watermarks take precedence
571    #[must_use]
572    pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
573        Self {
574            source_watermark: i64::MIN,
575            fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
576            prefer_source,
577        }
578    }
579
580    /// Updates the watermark from the source.
581    ///
582    /// Call this when the source provides an explicit watermark.
583    pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
584        if watermark > self.source_watermark {
585            self.source_watermark = watermark;
586            if self.prefer_source || watermark > self.fallback.current_watermark() {
587                return Some(Watermark::new(watermark));
588            }
589        }
590        None
591    }
592}
593
594impl WatermarkGenerator for SourceProvidedGenerator {
595    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
596        let fallback_wm = self.fallback.on_event(timestamp);
597
598        if self.prefer_source {
599            // Only emit if source watermark allows and it's an advancement
600            if self.source_watermark > i64::MIN {
601                return None; // Wait for source watermark
602            }
603        }
604
605        fallback_wm
606    }
607
608    fn on_periodic(&mut self) -> Option<Watermark> {
609        None
610    }
611
612    fn current_watermark(&self) -> i64 {
613        if self.prefer_source && self.source_watermark > i64::MIN {
614            self.source_watermark
615        } else {
616            self.fallback.current_watermark().max(self.source_watermark)
617        }
618    }
619
620    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
621        self.on_source_watermark(timestamp)
622    }
623}
624
/// Processing-time watermark generator.
///
/// Ignores event timestamps entirely and advances the watermark based on
/// wall-clock time. Use with `PROCTIME()` sources where events are processed
/// in arrival order without event-time semantics.
///
/// - [`on_event`](WatermarkGenerator::on_event) returns `None` (event timestamps are ignored)
/// - [`on_periodic`](WatermarkGenerator::on_periodic) returns `Some(Watermark::new(now))`,
///   where `now` is wall-clock milliseconds since the Unix epoch, but only when
///   `now` is above the current watermark. It returns `None` when the clock has
///   not moved past it (same millisecond, clock stepped back, or the watermark
///   was pushed ahead via `advance_watermark`).
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{ProcessingTimeGenerator, WatermarkGenerator};
///
/// let mut gen = ProcessingTimeGenerator::new();
/// // on_event ignores the timestamp
/// assert_eq!(gen.on_event(1000), None);
/// // on_periodic returns wall-clock time
/// let wm = gen.on_periodic();
/// assert!(wm.is_some());
/// ```
#[derive(Debug)]
pub struct ProcessingTimeGenerator {
    /// Last watermark produced; `i64::MIN` before the first tick.
    current_watermark: i64,
}

impl ProcessingTimeGenerator {
    /// Creates a new processing-time watermark generator.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: i64::MIN,
        }
    }

    /// Returns the current wall-clock time in milliseconds since Unix epoch.
    ///
    /// A clock set before the epoch yields `0`. A value too large for `i64`
    /// saturates to `i64::MAX` instead of wrapping.
    #[allow(clippy::cast_possible_truncation)] // clamped to i64::MAX before the cast
    fn now_millis() -> i64 {
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis();
        millis.min(i64::MAX as u128) as i64
    }
}

impl Default for ProcessingTimeGenerator {
    fn default() -> Self {
        Self::new()
    }
}
674
675impl WatermarkGenerator for ProcessingTimeGenerator {
676    #[inline]
677    fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
678        // Processing-time mode ignores event timestamps
679        None
680    }
681
682    #[inline]
683    fn on_periodic(&mut self) -> Option<Watermark> {
684        let now = Self::now_millis();
685        if now > self.current_watermark {
686            self.current_watermark = now;
687            Some(Watermark::new(now))
688        } else {
689            None
690        }
691    }
692
693    #[inline]
694    fn current_watermark(&self) -> i64 {
695        self.current_watermark
696    }
697
698    #[inline]
699    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
700        if timestamp > self.current_watermark {
701            self.current_watermark = timestamp;
702            Some(Watermark::new(timestamp))
703        } else {
704            None
705        }
706    }
707}
708
709#[cfg(test)]
710mod tests {
711    use super::*;
712
713    #[test]
714    fn test_bounded_generator_first_event() {
715        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
716        let wm = gen.on_event(1000);
717        assert_eq!(wm, Some(Watermark::new(900)));
718        assert_eq!(gen.current_watermark(), 900);
719    }
720
721    #[test]
722    fn test_bounded_generator_out_of_order() {
723        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
724
725        // First event
726        gen.on_event(1000);
727
728        // Out of order - should not emit new watermark
729        let wm = gen.on_event(800);
730        assert_eq!(wm, None);
731        assert_eq!(gen.current_watermark(), 900); // Still at 1000 - 100
732    }
733
734    #[test]
735    fn test_bounded_generator_advancement() {
736        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
737
738        gen.on_event(1000);
739        let wm = gen.on_event(1200);
740
741        assert_eq!(wm, Some(Watermark::new(1100)));
742    }
743
744    #[test]
745    fn test_bounded_generator_from_duration() {
746        let gen = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
747        assert_eq!(gen.max_out_of_orderness(), 5000);
748    }
749
750    #[test]
751    fn test_bounded_generator_no_periodic() {
752        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
753        assert_eq!(gen.on_periodic(), None);
754    }
755
756    #[test]
757    fn test_ascending_generator_advances_on_each_event() {
758        let mut gen = AscendingTimestampsGenerator::new();
759
760        let wm1 = gen.on_event(1000);
761        assert_eq!(wm1, Some(Watermark::new(1000)));
762
763        let wm2 = gen.on_event(2000);
764        assert_eq!(wm2, Some(Watermark::new(2000)));
765    }
766
767    #[test]
768    fn test_ascending_generator_ignores_backwards() {
769        let mut gen = AscendingTimestampsGenerator::new();
770
771        gen.on_event(2000);
772        let wm = gen.on_event(1000); // Earlier timestamp
773
774        assert_eq!(wm, None);
775        assert_eq!(gen.current_watermark(), 2000);
776    }
777
778    #[test]
779    fn test_periodic_generator_passes_through() {
780        let inner = BoundedOutOfOrdernessGenerator::new(100);
781        let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
782
783        let wm = gen.on_event(1000);
784        assert_eq!(wm, Some(Watermark::new(900)));
785    }
786
787    #[test]
788    fn test_periodic_generator_inner_access() {
789        let inner = BoundedOutOfOrdernessGenerator::new(100);
790        let gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
791
792        assert_eq!(gen.inner().max_out_of_orderness(), 100);
793    }
794
795    #[test]
796    fn test_punctuated_generator_predicate() {
797        let mut gen = PunctuatedGenerator::new(|ts| {
798            if ts % 1000 == 0 {
799                Some(Watermark::new(ts))
800            } else {
801                None
802            }
803        });
804
805        assert_eq!(gen.on_event(500), None);
806        assert_eq!(gen.on_event(999), None);
807        assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
808        assert_eq!(gen.on_event(1500), None);
809        assert_eq!(gen.on_event(2000), Some(Watermark::new(2000)));
810    }
811
812    #[test]
813    fn test_punctuated_generator_no_regression() {
814        let mut gen = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));
815
816        gen.on_event(2000);
817        let wm = gen.on_event(1000); // Lower watermark
818
819        assert_eq!(wm, None);
820        assert_eq!(gen.current_watermark(), 2000);
821    }
822
823    #[test]
824    fn test_tracker_single_source() {
825        let mut tracker = WatermarkTracker::new(1);
826
827        let wm = tracker.update_source(0, 1000);
828        assert_eq!(wm, Some(Watermark::new(1000)));
829        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
830    }
831
832    #[test]
833    fn test_tracker_multiple_sources() {
834        let mut tracker = WatermarkTracker::new(3);
835
836        // All sources need to report before watermark advances
837        tracker.update_source(0, 1000);
838        tracker.update_source(1, 2000);
839        let wm = tracker.update_source(2, 500);
840
841        assert_eq!(wm, Some(Watermark::new(500))); // Minimum
842    }
843
844    #[test]
845    fn test_tracker_min_watermark() {
846        let mut tracker = WatermarkTracker::new(2);
847
848        tracker.update_source(0, 5000);
849        tracker.update_source(1, 3000);
850
851        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
852
853        // Source 1 advances
854        tracker.update_source(1, 4000);
855        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
856    }
857
858    #[test]
859    fn test_tracker_idle_source() {
860        let mut tracker = WatermarkTracker::new(2);
861
862        tracker.update_source(0, 5000);
863        tracker.update_source(1, 1000);
864
865        // Source 1 is slow, mark it idle
866        let wm = tracker.mark_idle(1);
867
868        // Now only source 0's watermark counts
869        assert_eq!(wm, Some(Watermark::new(5000)));
870    }
871
872    #[test]
873    fn test_tracker_all_idle() {
874        let mut tracker = WatermarkTracker::new(2);
875
876        tracker.update_source(0, 5000);
877        tracker.update_source(1, 3000);
878
879        tracker.mark_idle(0);
880        let wm = tracker.mark_idle(1);
881
882        // Use max when all idle
883        assert_eq!(wm, Some(Watermark::new(5000)));
884    }
885
886    #[test]
887    fn test_tracker_source_watermark() {
888        let mut tracker = WatermarkTracker::new(2);
889
890        tracker.update_source(0, 1000);
891        tracker.update_source(1, 2000);
892
893        assert_eq!(tracker.source_watermark(0), Some(1000));
894        assert_eq!(tracker.source_watermark(1), Some(2000));
895        assert_eq!(tracker.source_watermark(5), None); // Out of bounds
896    }
897
898    #[test]
899    fn test_tracker_active_source_count() {
900        let mut tracker = WatermarkTracker::new(3);
901
902        assert_eq!(tracker.active_source_count(), 3);
903
904        tracker.mark_idle(0);
905        assert_eq!(tracker.active_source_count(), 2);
906
907        tracker.mark_idle(2);
908        assert_eq!(tracker.active_source_count(), 1);
909
910        // Reactivate by updating
911        tracker.update_source(0, 1000);
912        assert_eq!(tracker.active_source_count(), 2);
913    }
914
915    #[test]
916    fn test_tracker_invalid_source() {
917        let mut tracker = WatermarkTracker::new(2);
918
919        let wm = tracker.update_source(5, 1000); // Invalid source ID
920        assert_eq!(wm, None);
921
922        let wm = tracker.mark_idle(5);
923        assert_eq!(wm, None);
924    }
925
926    #[test]
927    fn test_source_provided_fallback() {
928        let mut gen = SourceProvidedGenerator::new(100, false);
929
930        let wm = gen.on_event(1000);
931        assert_eq!(wm, Some(Watermark::new(900))); // Fallback behavior
932    }
933
934    #[test]
935    fn test_source_provided_explicit_watermark() {
936        let mut gen = SourceProvidedGenerator::new(100, true);
937
938        let wm = gen.on_source_watermark(500);
939        assert_eq!(wm, Some(Watermark::new(500)));
940        assert_eq!(gen.current_watermark(), 500);
941    }
942
943    // --- advance_watermark() tests ---
944
945    #[test]
946    fn test_advance_watermark_bounded_generator() {
947        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
948
949        // Advance from initial state
950        let wm = gen.advance_watermark(500);
951        assert_eq!(wm, Some(Watermark::new(500)));
952        assert_eq!(gen.current_watermark(), 500);
953
954        // Advance further
955        let wm = gen.advance_watermark(800);
956        assert_eq!(wm, Some(Watermark::new(800)));
957        assert_eq!(gen.current_watermark(), 800);
958
959        // No regression
960        let wm = gen.advance_watermark(600);
961        assert_eq!(wm, None);
962        assert_eq!(gen.current_watermark(), 800);
963    }
964
965    #[test]
966    fn test_advance_watermark_maintains_invariant() {
967        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
968
969        // Process an event to set initial state
970        gen.on_event(1000); // wm=900, max_ts=1000
971
972        // Advance watermark beyond current
973        gen.advance_watermark(1200);
974        assert_eq!(gen.current_watermark(), 1200);
975
976        // Now on_event at 1250 should work correctly: max_ts should be >= 1300
977        // wm = 1250 - 100 = 1150 which is < 1200, so no new watermark from on_event
978        let wm = gen.on_event(1250);
979        assert_eq!(wm, None);
980        assert_eq!(gen.current_watermark(), 1200);
981
982        // But event at 1400: max_ts = 1400, wm = 1300 > 1200
983        let wm = gen.on_event(1400);
984        assert_eq!(wm, Some(Watermark::new(1300)));
985    }
986
987    #[test]
988    fn test_advance_watermark_ascending_generator() {
989        let mut gen = AscendingTimestampsGenerator::new();
990
991        let wm = gen.advance_watermark(500);
992        assert_eq!(wm, Some(Watermark::new(500)));
993        assert_eq!(gen.current_watermark(), 500);
994
995        // No regression
996        let wm = gen.advance_watermark(300);
997        assert_eq!(wm, None);
998        assert_eq!(gen.current_watermark(), 500);
999
1000        // Further advance
1001        let wm = gen.advance_watermark(1000);
1002        assert_eq!(wm, Some(Watermark::new(1000)));
1003    }
1004
1005    #[test]
1006    fn test_advance_watermark_periodic_generator() {
1007        let inner = BoundedOutOfOrdernessGenerator::new(100);
1008        let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
1009
1010        let wm = gen.advance_watermark(500);
1011        assert_eq!(wm, Some(Watermark::new(500)));
1012        assert_eq!(gen.current_watermark(), 500);
1013
1014        // No regression
1015        let wm = gen.advance_watermark(300);
1016        assert_eq!(wm, None);
1017    }
1018
1019    #[test]
1020    fn test_advance_watermark_punctuated_generator() {
1021        let mut gen = PunctuatedGenerator::new(|ts| {
1022            if ts % 1000 == 0 {
1023                Some(Watermark::new(ts))
1024            } else {
1025                None
1026            }
1027        });
1028
1029        // External advance (does not invoke predicate)
1030        let wm = gen.advance_watermark(500);
1031        assert_eq!(wm, Some(Watermark::new(500)));
1032        assert_eq!(gen.current_watermark(), 500);
1033
1034        // No regression
1035        let wm = gen.advance_watermark(200);
1036        assert_eq!(wm, None);
1037    }
1038
1039    #[test]
1040    fn test_advance_watermark_source_provided_generator() {
1041        let mut gen = SourceProvidedGenerator::new(100, true);
1042
1043        let wm = gen.advance_watermark(500);
1044        assert_eq!(wm, Some(Watermark::new(500)));
1045        assert_eq!(gen.current_watermark(), 500);
1046
1047        // No regression
1048        let wm = gen.advance_watermark(300);
1049        assert_eq!(wm, None);
1050    }
1051
1052    // --- ProcessingTimeGenerator tests ---
1053
1054    #[test]
1055    fn test_processing_time_generator_ignores_events() {
1056        let mut gen = ProcessingTimeGenerator::new();
1057        assert_eq!(gen.on_event(1000), None);
1058        assert_eq!(gen.on_event(2000), None);
1059        assert_eq!(gen.current_watermark(), i64::MIN);
1060    }
1061
1062    #[test]
1063    fn test_processing_time_generator_periodic() {
1064        let mut gen = ProcessingTimeGenerator::new();
1065        let wm = gen.on_periodic();
1066        assert!(wm.is_some());
1067        let ts = wm.unwrap().timestamp();
1068        // Should be a reasonable timestamp (after 2020-01-01)
1069        assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
1070    }
1071
1072    #[test]
1073    fn test_processing_time_generator_advance_watermark() {
1074        let mut gen = ProcessingTimeGenerator::new();
1075
1076        let wm = gen.advance_watermark(500);
1077        assert_eq!(wm, Some(Watermark::new(500)));
1078        assert_eq!(gen.current_watermark(), 500);
1079
1080        // No regression
1081        let wm = gen.advance_watermark(300);
1082        assert_eq!(wm, None);
1083        assert_eq!(gen.current_watermark(), 500);
1084
1085        // Further advance
1086        let wm = gen.advance_watermark(1000);
1087        assert_eq!(wm, Some(Watermark::new(1000)));
1088    }
1089
1090    #[test]
1091    fn test_processing_time_generator_default() {
1092        let gen = ProcessingTimeGenerator::default();
1093        assert_eq!(gen.current_watermark(), i64::MIN);
1094    }
1095}