Skip to main content

laminar_core/time/
watermark.rs

1//! # Watermark Generation and Tracking
2//!
3//! Watermarks indicate event-time progress through a streaming system. They are assertions
4//! that no events with timestamps earlier than the watermark are expected to arrive.
5//!
6//! ## Watermark Generation Strategies
7//!
8//! - [`BoundedOutOfOrdernessGenerator`]: Allows events to be late by a fixed duration
9//! - [`AscendingTimestampsGenerator`]: Assumes strictly increasing timestamps (no lateness)
10//! - [`PeriodicGenerator`]: Emits watermarks at fixed wall-clock intervals
11//! - [`PunctuatedGenerator`]: Emits watermarks based on special marker events
12//!
13//! ## Multi-Source Alignment
14//!
15//! When processing multiple input streams (e.g., joins), use [`WatermarkTracker`] to
16//! track the minimum watermark across all sources.
17//!
18//! ## Idle Source Handling
19//!
20//! Sources that stop producing events can block watermark progress. The
21//! [`IdleSourceDetector`] marks sources as idle after a configurable timeout.
22//!
23//! # Example
24//!
25//! ```rust
26//! use laminar_core::time::{
27//!     WatermarkGenerator, BoundedOutOfOrdernessGenerator,
28//!     WatermarkTracker, Watermark,
29//! };
30//!
31//! // Single source with bounded out-of-orderness
32//! let mut generator = BoundedOutOfOrdernessGenerator::new(1000); // 1 second
33//! let wm = generator.on_event(5000);
34//! assert_eq!(wm, Some(Watermark::new(4000)));
35//!
36//! // Multi-source tracking
37//! let mut tracker = WatermarkTracker::new(2);
38//! tracker.update_source(0, 5000);
39//! tracker.update_source(1, 3000);
40//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
41//! ```
42
43use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
44
45use super::Watermark;
46
47/// Trait for generating watermarks from event timestamps.
48///
49/// Implementations track observed timestamps and produce watermarks that indicate
50/// event-time progress. The watermark is an assertion that no events with timestamps
51/// earlier than the watermark are expected.
52pub trait WatermarkGenerator: Send {
53    /// Process an event timestamp and potentially emit a new watermark.
54    ///
55    /// Called for each event processed. Returns `Some(watermark)` if the watermark
56    /// should advance, `None` otherwise.
57    fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
58
59    /// Called periodically to emit watermarks based on wall-clock time.
60    ///
61    /// Useful for generating watermarks even when no events are arriving.
62    fn on_periodic(&mut self) -> Option<Watermark>;
63
64    /// Returns the current watermark value without advancing it.
65    fn current_watermark(&self) -> i64;
66
67    /// Advances the watermark to at least the given timestamp from an external source.
68    ///
69    /// Called when the source provides an explicit watermark (e.g., `Source::watermark()`).
70    /// Returns `Some(Watermark)` if the watermark advanced, `None` if the timestamp
71    /// was not higher than the current watermark.
72    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
73}
74
/// Watermark generator with bounded out-of-orderness.
///
/// Allows events to arrive out of order by up to `max_out_of_orderness`
/// milliseconds. Driven by events alone, the watermark is
/// `max_timestamp_seen - max_out_of_orderness` (saturating at `i64::MIN`); an
/// explicit [`advance_watermark`](WatermarkGenerator::advance_watermark) call can
/// move it further ahead.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};
///
/// let mut generator = BoundedOutOfOrdernessGenerator::new(100); // 100ms lateness allowed
///
/// // First event at t=1000
/// assert_eq!(generator.on_event(1000), Some(Watermark::new(900)));
///
/// // Out-of-order event at t=800 - no watermark advance
/// assert_eq!(generator.on_event(800), None);
///
/// // New max at t=1200
/// assert_eq!(generator.on_event(1200), Some(Watermark::new(1100)));
/// ```
#[derive(Debug, Clone)]
pub struct BoundedOutOfOrdernessGenerator {
    /// Maximum allowed lateness, in milliseconds.
    max_out_of_orderness: i64,
    /// Largest event timestamp seen (raised further by explicit advances);
    /// `i64::MIN` initially.
    current_max_timestamp: i64,
    /// Most recent watermark; `i64::MIN` until the first advance.
    current_watermark: i64,
}

impl BoundedOutOfOrdernessGenerator {
    /// Creates a new generator with the specified maximum out-of-orderness.
    ///
    /// # Arguments
    ///
    /// * `max_out_of_orderness` - Maximum allowed lateness in milliseconds.
    ///   NOTE(review): a negative value is accepted as-is and would place the
    ///   watermark *ahead* of the events seen — confirm callers never pass one.
    #[must_use]
    pub fn new(max_out_of_orderness: i64) -> Self {
        Self {
            max_out_of_orderness,
            current_max_timestamp: i64::MIN,
            current_watermark: i64::MIN,
        }
    }

    /// Creates a new generator from a `Duration`.
    ///
    /// Durations longer than `i64::MAX` milliseconds saturate to `i64::MAX`.
    /// A plain `as` cast would wrap them to a negative lateness instead.
    #[must_use]
    pub fn from_duration(max_out_of_orderness: Duration) -> Self {
        let millis = i64::try_from(max_out_of_orderness.as_millis()).unwrap_or(i64::MAX);
        Self::new(millis)
    }

    /// Returns the maximum out-of-orderness in milliseconds.
    #[must_use]
    pub fn max_out_of_orderness(&self) -> i64 {
        self.max_out_of_orderness
    }
}
130
131impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
132    #[inline]
133    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
134        if timestamp > self.current_max_timestamp {
135            self.current_max_timestamp = timestamp;
136            let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
137            if new_watermark > self.current_watermark {
138                self.current_watermark = new_watermark;
139                return Some(Watermark::new(new_watermark));
140            }
141        }
142        None
143    }
144
145    #[inline]
146    fn on_periodic(&mut self) -> Option<Watermark> {
147        // Bounded out-of-orderness doesn't emit periodic watermarks
148        None
149    }
150
151    #[inline]
152    fn current_watermark(&self) -> i64 {
153        self.current_watermark
154    }
155
156    #[inline]
157    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
158        if timestamp > self.current_watermark {
159            self.current_watermark = timestamp;
160            // Maintain invariant: current_max_timestamp >= current_watermark + max_out_of_orderness
161            let min_max = timestamp.saturating_add(self.max_out_of_orderness);
162            if min_max > self.current_max_timestamp {
163                self.current_max_timestamp = min_max;
164            }
165            Some(Watermark::new(timestamp))
166        } else {
167            None
168        }
169    }
170}
171
/// Watermark generator for strictly ascending timestamps.
///
/// Assumes events arrive in timestamp order with no lateness, so the watermark
/// equals the largest timestamp seen. Timestamps that do not exceed the current
/// watermark are ignored. Use this for sources that guarantee ordering.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{AscendingTimestampsGenerator, WatermarkGenerator, Watermark};
///
/// let mut generator = AscendingTimestampsGenerator::new();
/// assert_eq!(generator.on_event(1000), Some(Watermark::new(1000)));
/// assert_eq!(generator.on_event(2000), Some(Watermark::new(2000)));
/// ```
#[derive(Debug, Clone)]
pub struct AscendingTimestampsGenerator {
    /// Largest timestamp seen; `i64::MIN` before the first event.
    current_watermark: i64,
}

impl AscendingTimestampsGenerator {
    /// Creates a new ascending timestamps generator.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: i64::MIN,
        }
    }
}

// A derived `Default` would start the watermark at 0, silently ignoring every
// event with a timestamp <= 0. Start from the same `i64::MIN` sentinel as `new()`.
impl Default for AscendingTimestampsGenerator {
    fn default() -> Self {
        Self::new()
    }
}
200
201impl WatermarkGenerator for AscendingTimestampsGenerator {
202    #[inline]
203    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
204        if timestamp > self.current_watermark {
205            self.current_watermark = timestamp;
206            Some(Watermark::new(timestamp))
207        } else {
208            None
209        }
210    }
211
212    #[inline]
213    fn on_periodic(&mut self) -> Option<Watermark> {
214        None
215    }
216
217    #[inline]
218    fn current_watermark(&self) -> i64 {
219        self.current_watermark
220    }
221
222    #[inline]
223    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
224        if timestamp > self.current_watermark {
225            self.current_watermark = timestamp;
226            Some(Watermark::new(timestamp))
227        } else {
228            None
229        }
230    }
231}
232
233/// Periodic watermark generator that emits at fixed wall-clock intervals.
234///
235/// Wraps another generator and emits watermarks periodically even when no
236/// events are arriving. Useful for handling idle sources and ensuring
237/// time-based windows eventually trigger.
238///
239/// # Example
240///
241/// ```rust,no_run
242/// use laminar_core::time::{
243///     PeriodicGenerator, BoundedOutOfOrdernessGenerator, WatermarkGenerator,
244/// };
245/// use std::time::Duration;
246///
247/// let inner = BoundedOutOfOrdernessGenerator::new(100);
248/// let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(500));
249///
250/// // First event
251/// gen.on_event(1000);
252///
253/// // Later, periodic check may emit watermark
254/// // (depends on wall-clock time elapsed)
255/// let wm = gen.on_periodic();
256/// ```
257pub struct PeriodicGenerator<G: WatermarkGenerator> {
258    inner: G,
259    period: Duration,
260    last_emit_time: Instant,
261    last_emitted_watermark: i64,
262}
263
264impl<G: WatermarkGenerator> PeriodicGenerator<G> {
265    /// Creates a new periodic generator wrapping another generator.
266    ///
267    /// # Arguments
268    ///
269    /// * `inner` - The underlying watermark generator
270    /// * `period` - How often to emit watermarks (wall-clock time)
271    #[must_use]
272    pub fn new(inner: G, period: Duration) -> Self {
273        Self {
274            inner,
275            period,
276            last_emit_time: Instant::now(),
277            last_emitted_watermark: i64::MIN,
278        }
279    }
280
281    /// Returns a reference to the inner generator.
282    #[must_use]
283    pub fn inner(&self) -> &G {
284        &self.inner
285    }
286
287    /// Returns a mutable reference to the inner generator.
288    pub fn inner_mut(&mut self) -> &mut G {
289        &mut self.inner
290    }
291}
292
293impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
294    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
295        let wm = self.inner.on_event(timestamp);
296        if let Some(ref w) = wm {
297            self.last_emitted_watermark = w.timestamp();
298            self.last_emit_time = Instant::now();
299        }
300        wm
301    }
302
303    fn on_periodic(&mut self) -> Option<Watermark> {
304        // Check if enough wall-clock time has passed
305        if self.last_emit_time.elapsed() >= self.period {
306            let current = self.inner.current_watermark();
307            if current > self.last_emitted_watermark {
308                self.last_emitted_watermark = current;
309                self.last_emit_time = Instant::now();
310                return Some(Watermark::new(current));
311            }
312            self.last_emit_time = Instant::now();
313        }
314        None
315    }
316
317    fn current_watermark(&self) -> i64 {
318        self.inner.current_watermark()
319    }
320
321    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
322        let wm = self.inner.advance_watermark(timestamp);
323        if let Some(ref w) = wm {
324            self.last_emitted_watermark = w.timestamp();
325            self.last_emit_time = Instant::now();
326        }
327        wm
328    }
329}
330
331/// Punctuated watermark generator that emits based on special events.
332///
333/// Uses a predicate function to identify watermark-carrying events. When the
334/// predicate returns `Some(watermark)`, that watermark is emitted.
335///
336/// # Example
337///
338/// ```rust
339/// use laminar_core::time::{PunctuatedGenerator, WatermarkGenerator, Watermark};
340///
341/// // Emit watermark on every 1000ms boundary
342/// let mut gen = PunctuatedGenerator::new(|ts| {
343///     if ts % 1000 == 0 {
344///         Some(Watermark::new(ts))
345///     } else {
346///         None
347///     }
348/// });
349///
350/// assert_eq!(gen.on_event(999), None);
351/// assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
352/// ```
353pub struct PunctuatedGenerator<F>
354where
355    F: Fn(i64) -> Option<Watermark> + Send,
356{
357    predicate: F,
358    current_watermark: i64,
359}
360
361impl<F> PunctuatedGenerator<F>
362where
363    F: Fn(i64) -> Option<Watermark> + Send,
364{
365    /// Creates a new punctuated generator with the given predicate.
366    ///
367    /// # Arguments
368    ///
369    /// * `predicate` - Function that returns `Some(Watermark)` for watermark events
370    #[must_use]
371    pub fn new(predicate: F) -> Self {
372        Self {
373            predicate,
374            current_watermark: i64::MIN,
375        }
376    }
377}
378
379impl<F> WatermarkGenerator for PunctuatedGenerator<F>
380where
381    F: Fn(i64) -> Option<Watermark> + Send,
382{
383    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
384        if let Some(wm) = (self.predicate)(timestamp) {
385            if wm.timestamp() > self.current_watermark {
386                self.current_watermark = wm.timestamp();
387                return Some(wm);
388            }
389        }
390        None
391    }
392
393    fn on_periodic(&mut self) -> Option<Watermark> {
394        None
395    }
396
397    fn current_watermark(&self) -> i64 {
398        self.current_watermark
399    }
400
401    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
402        if timestamp > self.current_watermark {
403            self.current_watermark = timestamp;
404            Some(Watermark::new(timestamp))
405        } else {
406            None
407        }
408    }
409}
410
411/// Tracks watermarks across multiple input sources.
412///
413/// For operators with multiple inputs (e.g., joins, unions), the combined
414/// watermark is the minimum across all sources. This ensures no late events
415/// from any source are missed.
416///
417/// # Example
418///
419/// ```rust
420/// use laminar_core::time::{WatermarkTracker, Watermark};
421///
422/// let mut tracker = WatermarkTracker::new(3); // 3 sources
423///
424/// // Source 0 advances to 1000
425/// let wm = tracker.update_source(0, 1000);
426/// assert_eq!(wm, None); // Other sources still at MIN
427///
428/// // Source 1 advances to 2000
429/// tracker.update_source(1, 2000);
430///
431/// // Source 2 advances to 500
432/// let wm = tracker.update_source(2, 500);
433/// assert_eq!(wm, Some(Watermark::new(500))); // Min of all sources
434/// ```
435#[derive(Debug)]
436pub struct WatermarkTracker {
437    /// Watermark for each source
438    source_watermarks: Vec<i64>,
439    /// Combined minimum watermark
440    combined_watermark: i64,
441    /// Idle status for each source
442    idle_sources: Vec<bool>,
443    /// Last activity time for each source
444    last_activity: Vec<Instant>,
445    /// Idle timeout duration
446    idle_timeout: Duration,
447}
448
449impl WatermarkTracker {
450    /// Creates a new tracker for the specified number of sources.
451    #[must_use]
452    pub fn new(num_sources: usize) -> Self {
453        Self {
454            source_watermarks: vec![i64::MIN; num_sources],
455            combined_watermark: i64::MIN,
456            idle_sources: vec![false; num_sources],
457            last_activity: vec![Instant::now(); num_sources],
458            idle_timeout: Duration::from_secs(30), // Default 30 second idle timeout
459        }
460    }
461
462    /// Creates a new tracker with a custom idle timeout.
463    #[must_use]
464    pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
465        Self {
466            source_watermarks: vec![i64::MIN; num_sources],
467            combined_watermark: i64::MIN,
468            idle_sources: vec![false; num_sources],
469            last_activity: vec![Instant::now(); num_sources],
470            idle_timeout,
471        }
472    }
473
474    /// Updates the watermark for a specific source.
475    ///
476    /// Returns `Some(Watermark)` if the combined watermark advances.
477    pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
478        if source_id >= self.source_watermarks.len() {
479            return None;
480        }
481
482        // Mark source as active
483        self.idle_sources[source_id] = false;
484        self.last_activity[source_id] = Instant::now();
485
486        // Update source watermark
487        if watermark > self.source_watermarks[source_id] {
488            self.source_watermarks[source_id] = watermark;
489            self.update_combined()
490        } else {
491            None
492        }
493    }
494
495    /// Marks a source as idle, excluding it from watermark calculation.
496    ///
497    /// Idle sources don't hold back the combined watermark.
498    pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
499        if source_id >= self.idle_sources.len() {
500            return None;
501        }
502
503        self.idle_sources[source_id] = true;
504        self.update_combined()
505    }
506
507    /// Checks for sources that have been idle longer than the timeout.
508    ///
509    /// Should be called periodically to detect stalled sources.
510    pub fn check_idle_sources(&mut self) -> Option<Watermark> {
511        let mut any_marked = false;
512        for i in 0..self.idle_sources.len() {
513            if !self.idle_sources[i] && self.last_activity[i].elapsed() >= self.idle_timeout {
514                self.idle_sources[i] = true;
515                any_marked = true;
516            }
517        }
518        if any_marked {
519            self.update_combined()
520        } else {
521            None
522        }
523    }
524
525    /// Returns the current combined watermark.
526    #[must_use]
527    pub fn current_watermark(&self) -> Option<Watermark> {
528        if self.combined_watermark == i64::MIN {
529            None
530        } else {
531            Some(Watermark::new(self.combined_watermark))
532        }
533    }
534
535    /// Returns the watermark for a specific source.
536    #[must_use]
537    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
538        self.source_watermarks.get(source_id).copied()
539    }
540
541    /// Returns whether a source is marked as idle.
542    #[must_use]
543    pub fn is_idle(&self, source_id: usize) -> bool {
544        self.idle_sources.get(source_id).copied().unwrap_or(false)
545    }
546
547    /// Returns the number of sources being tracked.
548    #[must_use]
549    pub fn num_sources(&self) -> usize {
550        self.source_watermarks.len()
551    }
552
553    /// Returns the number of active (non-idle) sources.
554    #[must_use]
555    pub fn active_source_count(&self) -> usize {
556        self.idle_sources.iter().filter(|&&idle| !idle).count()
557    }
558
559    /// Updates the combined watermark based on all active sources.
560    fn update_combined(&mut self) -> Option<Watermark> {
561        // Calculate minimum across active sources only
562        let mut min_watermark = i64::MAX;
563        let mut has_active = false;
564
565        for (i, &wm) in self.source_watermarks.iter().enumerate() {
566            if !self.idle_sources[i] {
567                has_active = true;
568                min_watermark = min_watermark.min(wm);
569            }
570        }
571
572        // If all sources are idle, use the max watermark
573        if !has_active {
574            min_watermark = self
575                .source_watermarks
576                .iter()
577                .copied()
578                .max()
579                .unwrap_or(i64::MIN);
580        }
581
582        if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
583            self.combined_watermark = min_watermark;
584            Some(Watermark::new(min_watermark))
585        } else {
586            None
587        }
588    }
589}
590
591/// Watermark generator for sources with embedded watermarks.
592///
593/// Some sources (like Kafka with EOS) may provide watermarks directly.
594/// This generator tracks both event timestamps and explicit watermarks.
595pub struct SourceProvidedGenerator {
596    /// Last watermark from the source
597    source_watermark: i64,
598    /// Fallback generator for when source doesn't provide watermarks
599    fallback: BoundedOutOfOrdernessGenerator,
600    /// Whether to use source watermarks when available
601    prefer_source: bool,
602}
603
604impl SourceProvidedGenerator {
605    /// Creates a new source-provided generator.
606    ///
607    /// # Arguments
608    ///
609    /// * `fallback_lateness` - Lateness for fallback bounded generator
610    /// * `prefer_source` - If true, source watermarks take precedence
611    #[must_use]
612    pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
613        Self {
614            source_watermark: i64::MIN,
615            fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
616            prefer_source,
617        }
618    }
619
620    /// Updates the watermark from the source.
621    ///
622    /// Call this when the source provides an explicit watermark.
623    pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
624        if watermark > self.source_watermark {
625            self.source_watermark = watermark;
626            if self.prefer_source || watermark > self.fallback.current_watermark() {
627                return Some(Watermark::new(watermark));
628            }
629        }
630        None
631    }
632}
633
634impl WatermarkGenerator for SourceProvidedGenerator {
635    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
636        let fallback_wm = self.fallback.on_event(timestamp);
637
638        if self.prefer_source {
639            // Only emit if source watermark allows and it's an advancement
640            if self.source_watermark > i64::MIN {
641                return None; // Wait for source watermark
642            }
643        }
644
645        fallback_wm
646    }
647
648    fn on_periodic(&mut self) -> Option<Watermark> {
649        None
650    }
651
652    fn current_watermark(&self) -> i64 {
653        if self.prefer_source && self.source_watermark > i64::MIN {
654            self.source_watermark
655        } else {
656            self.fallback.current_watermark().max(self.source_watermark)
657        }
658    }
659
660    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
661        self.on_source_watermark(timestamp)
662    }
663}
664
/// Processing-time watermark generator.
///
/// Ignores event timestamps entirely and advances the watermark based on
/// wall-clock time. Use with `PROCTIME()` sources where events are processed
/// in arrival order without event-time semantics.
///
/// - [`on_event`](WatermarkGenerator::on_event) returns `None` (event timestamps are ignored)
/// - [`on_periodic`](WatermarkGenerator::on_periodic) returns the current wall-clock time
///   (milliseconds since the Unix epoch) as a watermark whenever it is above the
///   current watermark, and `None` otherwise
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{ProcessingTimeGenerator, WatermarkGenerator};
///
/// let mut generator = ProcessingTimeGenerator::new();
/// // on_event ignores the timestamp
/// assert_eq!(generator.on_event(1000), None);
/// // on_periodic returns wall-clock time
/// let wm = generator.on_periodic();
/// assert!(wm.is_some());
/// ```
#[derive(Debug, Clone)]
pub struct ProcessingTimeGenerator {
    /// Last watermark emitted or explicitly advanced to; `i64::MIN` initially.
    current_watermark: i64,
}

impl ProcessingTimeGenerator {
    /// Creates a new processing-time watermark generator.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: i64::MIN,
        }
    }

    /// Returns the current wall-clock time in milliseconds since Unix epoch.
    ///
    /// A clock set before the epoch yields 0. A value beyond `i64::MAX`
    /// milliseconds saturates instead of wrapping (as an `as` cast would).
    fn now_millis() -> i64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO);
        i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
    }
}
708
709impl Default for ProcessingTimeGenerator {
710    fn default() -> Self {
711        Self::new()
712    }
713}
714
715impl WatermarkGenerator for ProcessingTimeGenerator {
716    #[inline]
717    fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
718        // Processing-time mode ignores event timestamps
719        None
720    }
721
722    #[inline]
723    fn on_periodic(&mut self) -> Option<Watermark> {
724        let now = Self::now_millis();
725        if now > self.current_watermark {
726            self.current_watermark = now;
727            Some(Watermark::new(now))
728        } else {
729            None
730        }
731    }
732
733    #[inline]
734    fn current_watermark(&self) -> i64 {
735        self.current_watermark
736    }
737
738    #[inline]
739    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
740        if timestamp > self.current_watermark {
741            self.current_watermark = timestamp;
742            Some(Watermark::new(timestamp))
743        } else {
744            None
745        }
746    }
747}
748
/// Counters and gauges describing a generator's watermark progress.
///
/// All fields start at zero (derived `Default`).
#[derive(Debug, Clone, Default)]
pub struct WatermarkMetrics {
    /// Most recently emitted watermark timestamp
    pub current_watermark: i64,
    /// Largest event timestamp observed so far
    pub max_event_timestamp: i64,
    /// How many watermarks have been emitted
    pub watermarks_emitted: u64,
    /// How many late events have been recorded
    pub late_events: u64,
}

impl WatermarkMetrics {
    /// Returns zeroed metrics; same as `WatermarkMetrics::default()`.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// How far the watermark trails the newest event time
    /// (`max_event_timestamp - current_watermark`, saturating on overflow).
    #[must_use]
    pub fn lag(&self) -> i64 {
        i64::saturating_sub(self.max_event_timestamp, self.current_watermark)
    }
}
776
777/// Watermark generator wrapper that collects metrics.
778pub struct MeteredGenerator<G: WatermarkGenerator> {
779    inner: G,
780    metrics: WatermarkMetrics,
781}
782
783impl<G: WatermarkGenerator> MeteredGenerator<G> {
784    /// Creates a new metered generator.
785    #[must_use]
786    pub fn new(inner: G) -> Self {
787        Self {
788            inner,
789            metrics: WatermarkMetrics::new(),
790        }
791    }
792
793    /// Returns the current metrics.
794    #[must_use]
795    pub fn metrics(&self) -> &WatermarkMetrics {
796        &self.metrics
797    }
798
799    /// Returns a mutable reference to the inner generator.
800    pub fn inner_mut(&mut self) -> &mut G {
801        &mut self.inner
802    }
803
804    /// Records a late event.
805    pub fn record_late_event(&mut self) {
806        self.metrics.late_events += 1;
807    }
808}
809
810impl<G: WatermarkGenerator> WatermarkGenerator for MeteredGenerator<G> {
811    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
812        // Track max event timestamp
813        if timestamp > self.metrics.max_event_timestamp {
814            self.metrics.max_event_timestamp = timestamp;
815        }
816
817        let wm = self.inner.on_event(timestamp);
818        if let Some(ref w) = wm {
819            self.metrics.current_watermark = w.timestamp();
820            self.metrics.watermarks_emitted += 1;
821        }
822        wm
823    }
824
825    fn on_periodic(&mut self) -> Option<Watermark> {
826        let wm = self.inner.on_periodic();
827        if let Some(ref w) = wm {
828            self.metrics.current_watermark = w.timestamp();
829            self.metrics.watermarks_emitted += 1;
830        }
831        wm
832    }
833
834    fn current_watermark(&self) -> i64 {
835        self.inner.current_watermark()
836    }
837
838    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
839        let wm = self.inner.advance_watermark(timestamp);
840        if let Some(ref w) = wm {
841            self.metrics.current_watermark = w.timestamp();
842            self.metrics.watermarks_emitted += 1;
843        }
844        wm
845    }
846}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851
852    #[test]
853    fn test_bounded_generator_first_event() {
854        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
855        let wm = gen.on_event(1000);
856        assert_eq!(wm, Some(Watermark::new(900)));
857        assert_eq!(gen.current_watermark(), 900);
858    }
859
860    #[test]
861    fn test_bounded_generator_out_of_order() {
862        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
863
864        // First event
865        gen.on_event(1000);
866
867        // Out of order - should not emit new watermark
868        let wm = gen.on_event(800);
869        assert_eq!(wm, None);
870        assert_eq!(gen.current_watermark(), 900); // Still at 1000 - 100
871    }
872
873    #[test]
874    fn test_bounded_generator_advancement() {
875        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
876
877        gen.on_event(1000);
878        let wm = gen.on_event(1200);
879
880        assert_eq!(wm, Some(Watermark::new(1100)));
881    }
882
883    #[test]
884    fn test_bounded_generator_from_duration() {
885        let gen = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
886        assert_eq!(gen.max_out_of_orderness(), 5000);
887    }
888
889    #[test]
890    fn test_bounded_generator_no_periodic() {
891        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
892        assert_eq!(gen.on_periodic(), None);
893    }
894
895    #[test]
896    fn test_ascending_generator_advances_on_each_event() {
897        let mut gen = AscendingTimestampsGenerator::new();
898
899        let wm1 = gen.on_event(1000);
900        assert_eq!(wm1, Some(Watermark::new(1000)));
901
902        let wm2 = gen.on_event(2000);
903        assert_eq!(wm2, Some(Watermark::new(2000)));
904    }
905
906    #[test]
907    fn test_ascending_generator_ignores_backwards() {
908        let mut gen = AscendingTimestampsGenerator::new();
909
910        gen.on_event(2000);
911        let wm = gen.on_event(1000); // Earlier timestamp
912
913        assert_eq!(wm, None);
914        assert_eq!(gen.current_watermark(), 2000);
915    }
916
917    #[test]
918    fn test_periodic_generator_passes_through() {
919        let inner = BoundedOutOfOrdernessGenerator::new(100);
920        let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
921
922        let wm = gen.on_event(1000);
923        assert_eq!(wm, Some(Watermark::new(900)));
924    }
925
926    #[test]
927    fn test_periodic_generator_inner_access() {
928        let inner = BoundedOutOfOrdernessGenerator::new(100);
929        let gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
930
931        assert_eq!(gen.inner().max_out_of_orderness(), 100);
932    }
933
934    #[test]
935    fn test_punctuated_generator_predicate() {
936        let mut gen = PunctuatedGenerator::new(|ts| {
937            if ts % 1000 == 0 {
938                Some(Watermark::new(ts))
939            } else {
940                None
941            }
942        });
943
944        assert_eq!(gen.on_event(500), None);
945        assert_eq!(gen.on_event(999), None);
946        assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
947        assert_eq!(gen.on_event(1500), None);
948        assert_eq!(gen.on_event(2000), Some(Watermark::new(2000)));
949    }
950
951    #[test]
952    fn test_punctuated_generator_no_regression() {
953        let mut gen = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));
954
955        gen.on_event(2000);
956        let wm = gen.on_event(1000); // Lower watermark
957
958        assert_eq!(wm, None);
959        assert_eq!(gen.current_watermark(), 2000);
960    }
961
962    #[test]
963    fn test_tracker_single_source() {
964        let mut tracker = WatermarkTracker::new(1);
965
966        let wm = tracker.update_source(0, 1000);
967        assert_eq!(wm, Some(Watermark::new(1000)));
968        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
969    }
970
971    #[test]
972    fn test_tracker_multiple_sources() {
973        let mut tracker = WatermarkTracker::new(3);
974
975        // All sources need to report before watermark advances
976        tracker.update_source(0, 1000);
977        tracker.update_source(1, 2000);
978        let wm = tracker.update_source(2, 500);
979
980        assert_eq!(wm, Some(Watermark::new(500))); // Minimum
981    }
982
983    #[test]
984    fn test_tracker_min_watermark() {
985        let mut tracker = WatermarkTracker::new(2);
986
987        tracker.update_source(0, 5000);
988        tracker.update_source(1, 3000);
989
990        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
991
992        // Source 1 advances
993        tracker.update_source(1, 4000);
994        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
995    }
996
997    #[test]
998    fn test_tracker_idle_source() {
999        let mut tracker = WatermarkTracker::new(2);
1000
1001        tracker.update_source(0, 5000);
1002        tracker.update_source(1, 1000);
1003
1004        // Source 1 is slow, mark it idle
1005        let wm = tracker.mark_idle(1);
1006
1007        // Now only source 0's watermark counts
1008        assert_eq!(wm, Some(Watermark::new(5000)));
1009    }
1010
1011    #[test]
1012    fn test_tracker_all_idle() {
1013        let mut tracker = WatermarkTracker::new(2);
1014
1015        tracker.update_source(0, 5000);
1016        tracker.update_source(1, 3000);
1017
1018        tracker.mark_idle(0);
1019        let wm = tracker.mark_idle(1);
1020
1021        // Use max when all idle
1022        assert_eq!(wm, Some(Watermark::new(5000)));
1023    }
1024
1025    #[test]
1026    fn test_tracker_source_watermark() {
1027        let mut tracker = WatermarkTracker::new(2);
1028
1029        tracker.update_source(0, 1000);
1030        tracker.update_source(1, 2000);
1031
1032        assert_eq!(tracker.source_watermark(0), Some(1000));
1033        assert_eq!(tracker.source_watermark(1), Some(2000));
1034        assert_eq!(tracker.source_watermark(5), None); // Out of bounds
1035    }
1036
1037    #[test]
1038    fn test_tracker_active_source_count() {
1039        let mut tracker = WatermarkTracker::new(3);
1040
1041        assert_eq!(tracker.active_source_count(), 3);
1042
1043        tracker.mark_idle(0);
1044        assert_eq!(tracker.active_source_count(), 2);
1045
1046        tracker.mark_idle(2);
1047        assert_eq!(tracker.active_source_count(), 1);
1048
1049        // Reactivate by updating
1050        tracker.update_source(0, 1000);
1051        assert_eq!(tracker.active_source_count(), 2);
1052    }
1053
1054    #[test]
1055    fn test_tracker_invalid_source() {
1056        let mut tracker = WatermarkTracker::new(2);
1057
1058        let wm = tracker.update_source(5, 1000); // Invalid source ID
1059        assert_eq!(wm, None);
1060
1061        let wm = tracker.mark_idle(5);
1062        assert_eq!(wm, None);
1063    }
1064
1065    #[test]
1066    fn test_source_provided_fallback() {
1067        let mut gen = SourceProvidedGenerator::new(100, false);
1068
1069        let wm = gen.on_event(1000);
1070        assert_eq!(wm, Some(Watermark::new(900))); // Fallback behavior
1071    }
1072
1073    #[test]
1074    fn test_source_provided_explicit_watermark() {
1075        let mut gen = SourceProvidedGenerator::new(100, true);
1076
1077        let wm = gen.on_source_watermark(500);
1078        assert_eq!(wm, Some(Watermark::new(500)));
1079        assert_eq!(gen.current_watermark(), 500);
1080    }
1081
1082    #[test]
1083    fn test_metered_generator_tracks_metrics() {
1084        let inner = BoundedOutOfOrdernessGenerator::new(100);
1085        let mut gen = MeteredGenerator::new(inner);
1086
1087        gen.on_event(1000);
1088        gen.on_event(2000);
1089        gen.on_event(1500); // Out of order
1090
1091        let metrics = gen.metrics();
1092        assert_eq!(metrics.max_event_timestamp, 2000);
1093        assert_eq!(metrics.watermarks_emitted, 2); // 1000 and 2000 advanced
1094    }
1095
1096    #[test]
1097    fn test_metered_generator_lag() {
1098        let inner = BoundedOutOfOrdernessGenerator::new(100);
1099        let mut gen = MeteredGenerator::new(inner);
1100
1101        gen.on_event(1000);
1102
1103        let metrics = gen.metrics();
1104        assert_eq!(metrics.lag(), 100); // max_event (1000) - watermark (900)
1105    }
1106
1107    #[test]
1108    fn test_metered_generator_late_events() {
1109        let inner = BoundedOutOfOrdernessGenerator::new(100);
1110        let mut gen = MeteredGenerator::new(inner);
1111
1112        gen.record_late_event();
1113        gen.record_late_event();
1114
1115        assert_eq!(gen.metrics().late_events, 2);
1116    }
1117
1118    #[test]
1119    fn test_watermark_metrics_default() {
1120        let metrics = WatermarkMetrics::new();
1121        assert_eq!(metrics.current_watermark, 0);
1122        assert_eq!(metrics.max_event_timestamp, 0);
1123        assert_eq!(metrics.watermarks_emitted, 0);
1124        assert_eq!(metrics.late_events, 0);
1125    }
1126
1127    // --- advance_watermark() tests ---
1128
1129    #[test]
1130    fn test_advance_watermark_bounded_generator() {
1131        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
1132
1133        // Advance from initial state
1134        let wm = gen.advance_watermark(500);
1135        assert_eq!(wm, Some(Watermark::new(500)));
1136        assert_eq!(gen.current_watermark(), 500);
1137
1138        // Advance further
1139        let wm = gen.advance_watermark(800);
1140        assert_eq!(wm, Some(Watermark::new(800)));
1141        assert_eq!(gen.current_watermark(), 800);
1142
1143        // No regression
1144        let wm = gen.advance_watermark(600);
1145        assert_eq!(wm, None);
1146        assert_eq!(gen.current_watermark(), 800);
1147    }
1148
1149    #[test]
1150    fn test_advance_watermark_maintains_invariant() {
1151        let mut gen = BoundedOutOfOrdernessGenerator::new(100);
1152
1153        // Process an event to set initial state
1154        gen.on_event(1000); // wm=900, max_ts=1000
1155
1156        // Advance watermark beyond current
1157        gen.advance_watermark(1200);
1158        assert_eq!(gen.current_watermark(), 1200);
1159
1160        // Now on_event at 1250 should work correctly: max_ts should be >= 1300
1161        // wm = 1250 - 100 = 1150 which is < 1200, so no new watermark from on_event
1162        let wm = gen.on_event(1250);
1163        assert_eq!(wm, None);
1164        assert_eq!(gen.current_watermark(), 1200);
1165
1166        // But event at 1400: max_ts = 1400, wm = 1300 > 1200
1167        let wm = gen.on_event(1400);
1168        assert_eq!(wm, Some(Watermark::new(1300)));
1169    }
1170
1171    #[test]
1172    fn test_advance_watermark_ascending_generator() {
1173        let mut gen = AscendingTimestampsGenerator::new();
1174
1175        let wm = gen.advance_watermark(500);
1176        assert_eq!(wm, Some(Watermark::new(500)));
1177        assert_eq!(gen.current_watermark(), 500);
1178
1179        // No regression
1180        let wm = gen.advance_watermark(300);
1181        assert_eq!(wm, None);
1182        assert_eq!(gen.current_watermark(), 500);
1183
1184        // Further advance
1185        let wm = gen.advance_watermark(1000);
1186        assert_eq!(wm, Some(Watermark::new(1000)));
1187    }
1188
1189    #[test]
1190    fn test_advance_watermark_periodic_generator() {
1191        let inner = BoundedOutOfOrdernessGenerator::new(100);
1192        let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
1193
1194        let wm = gen.advance_watermark(500);
1195        assert_eq!(wm, Some(Watermark::new(500)));
1196        assert_eq!(gen.current_watermark(), 500);
1197
1198        // No regression
1199        let wm = gen.advance_watermark(300);
1200        assert_eq!(wm, None);
1201    }
1202
1203    #[test]
1204    fn test_advance_watermark_punctuated_generator() {
1205        let mut gen = PunctuatedGenerator::new(|ts| {
1206            if ts % 1000 == 0 {
1207                Some(Watermark::new(ts))
1208            } else {
1209                None
1210            }
1211        });
1212
1213        // External advance (does not invoke predicate)
1214        let wm = gen.advance_watermark(500);
1215        assert_eq!(wm, Some(Watermark::new(500)));
1216        assert_eq!(gen.current_watermark(), 500);
1217
1218        // No regression
1219        let wm = gen.advance_watermark(200);
1220        assert_eq!(wm, None);
1221    }
1222
1223    #[test]
1224    fn test_advance_watermark_source_provided_generator() {
1225        let mut gen = SourceProvidedGenerator::new(100, true);
1226
1227        let wm = gen.advance_watermark(500);
1228        assert_eq!(wm, Some(Watermark::new(500)));
1229        assert_eq!(gen.current_watermark(), 500);
1230
1231        // No regression
1232        let wm = gen.advance_watermark(300);
1233        assert_eq!(wm, None);
1234    }
1235
1236    #[test]
1237    fn test_advance_watermark_metered_generator() {
1238        let inner = BoundedOutOfOrdernessGenerator::new(100);
1239        let mut gen = MeteredGenerator::new(inner);
1240
1241        let wm = gen.advance_watermark(500);
1242        assert_eq!(wm, Some(Watermark::new(500)));
1243        assert_eq!(gen.metrics().current_watermark, 500);
1244        assert_eq!(gen.metrics().watermarks_emitted, 1);
1245
1246        // Advance further
1247        gen.advance_watermark(800);
1248        assert_eq!(gen.metrics().current_watermark, 800);
1249        assert_eq!(gen.metrics().watermarks_emitted, 2);
1250
1251        // No regression doesn't bump metrics
1252        gen.advance_watermark(600);
1253        assert_eq!(gen.metrics().watermarks_emitted, 2);
1254    }
1255
1256    // --- ProcessingTimeGenerator tests ---
1257
1258    #[test]
1259    fn test_processing_time_generator_ignores_events() {
1260        let mut gen = ProcessingTimeGenerator::new();
1261        assert_eq!(gen.on_event(1000), None);
1262        assert_eq!(gen.on_event(2000), None);
1263        assert_eq!(gen.current_watermark(), i64::MIN);
1264    }
1265
1266    #[test]
1267    fn test_processing_time_generator_periodic() {
1268        let mut gen = ProcessingTimeGenerator::new();
1269        let wm = gen.on_periodic();
1270        assert!(wm.is_some());
1271        let ts = wm.unwrap().timestamp();
1272        // Should be a reasonable timestamp (after 2020-01-01)
1273        assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
1274    }
1275
1276    #[test]
1277    fn test_processing_time_generator_advance_watermark() {
1278        let mut gen = ProcessingTimeGenerator::new();
1279
1280        let wm = gen.advance_watermark(500);
1281        assert_eq!(wm, Some(Watermark::new(500)));
1282        assert_eq!(gen.current_watermark(), 500);
1283
1284        // No regression
1285        let wm = gen.advance_watermark(300);
1286        assert_eq!(wm, None);
1287        assert_eq!(gen.current_watermark(), 500);
1288
1289        // Further advance
1290        let wm = gen.advance_watermark(1000);
1291        assert_eq!(wm, Some(Watermark::new(1000)));
1292    }
1293
1294    #[test]
1295    fn test_processing_time_generator_default() {
1296        let gen = ProcessingTimeGenerator::default();
1297        assert_eq!(gen.current_watermark(), i64::MIN);
1298    }
1299}