Skip to main content

laminar_core/time/
mod.rs

1//! Event time, watermarks, and timer management.
2mod cast;
3mod duration_str;
4mod event_time;
5mod filter;
6mod watermark;
7
8pub use cast::{cast_to_millis_array, CastError};
9pub use duration_str::parse_duration_str;
10pub use event_time::{EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField};
11
12pub use filter::{filter_batch_by_timestamp, ThresholdOp};
13
14pub use watermark::{
15    AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, PeriodicGenerator,
16    ProcessingTimeGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
17    WatermarkTracker,
18};
19
20use smallvec::SmallVec;
21use std::cmp::Ordering;
22use std::collections::BinaryHeap;
23
24/// Timer key type optimized for window IDs (16 bytes).
25///
26/// Uses `SmallVec` to avoid heap allocation for keys up to 16 bytes,
27/// which covers the common case of `WindowId` keys.
28pub type TimerKey = SmallVec<[u8; 16]>;
29
30/// Collection type for fired timers.
31///
32/// Uses `SmallVec` to avoid heap allocation when few timers fire per poll.
33/// Size 8 covers most practical cases where timers fire in small batches.
34pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
35
/// Event-time progress marker.
///
/// A watermark is a monotonically increasing assertion that no further events
/// with a timestamp earlier than its value will arrive. It is used to:
///
/// - trigger window emissions,
/// - detect late events,
/// - coordinate time progress across operators.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::Watermark;
///
/// let watermark = Watermark::new(1000);
///
/// // Lateness is strict: only timestamps below the watermark are late.
/// assert!(watermark.is_late(999));   // before the watermark
/// assert!(!watermark.is_late(1000)); // exactly at the watermark
/// assert!(!watermark.is_late(1001)); // after the watermark
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps `timestamp` (milliseconds) in a watermark.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Watermark(timestamp)
    }

    /// The raw watermark value, in milliseconds.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Watermark(millis) = *self;
        millis
    }

    /// Reports whether an event at `event_time` arrives behind this watermark.
    ///
    /// Only timestamps strictly below the watermark count as late; an event
    /// exactly at the watermark is on time.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.0 > event_time
    }

    /// The earlier of `self` and `other`.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        // The derived `Ord` compares the inner timestamp, so this is the
        // smaller timestamp.
        Ord::min(self, other)
    }

    /// The later of `self` and `other`.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        // The derived `Ord` compares the inner timestamp, so this is the
        // larger timestamp.
        Ord::max(self, other)
    }
}

impl Default for Watermark {
    /// The lowest possible watermark (`i64::MIN`), before any progress.
    fn default() -> Self {
        Watermark::new(i64::MIN)
    }
}

impl From<i64> for Watermark {
    fn from(timestamp: i64) -> Self {
        Watermark::new(timestamp)
    }
}

impl From<Watermark> for i64 {
    fn from(watermark: Watermark) -> Self {
        watermark.timestamp()
    }
}
115
116/// A timer registration for delayed processing.
117///
118/// Timers are used by operators to schedule future actions, typically for
119/// window triggering or timeouts.
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct TimerRegistration {
122    /// Unique timer ID
123    pub id: u64,
124    /// Scheduled timestamp (event time, in milliseconds)
125    pub timestamp: i64,
126    /// Timer key (for keyed operators).
127    /// Uses `TimerKey` (`SmallVec`) to avoid heap allocation for keys up to 16 bytes.
128    pub key: Option<TimerKey>,
129    /// The index of the operator that registered this timer
130    pub operator_index: Option<usize>,
131}
132
133impl Ord for TimerRegistration {
134    fn cmp(&self, other: &Self) -> Ordering {
135        // Reverse ordering for min-heap behavior (earliest first)
136        match other.timestamp.cmp(&self.timestamp) {
137            Ordering::Equal => other.id.cmp(&self.id),
138            ord => ord,
139        }
140    }
141}
142
143impl PartialOrd for TimerRegistration {
144    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
145        Some(self.cmp(other))
146    }
147}
148
149/// Timer service for scheduling and managing timers.
150///
151/// The timer service maintains a priority queue of timer registrations,
152/// ordered by timestamp. Operators can register timers to be fired at
153/// specific event times.
154///
155/// # Example
156///
157/// ```rust
158/// use laminar_core::time::{TimerService, TimerKey};
159///
160/// let mut service = TimerService::new();
161///
162/// // Register timers at different times
163/// let id1 = service.register_timer(100, None, None);
164/// let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), None);
165///
166/// // Poll for timers that should fire at time 75
167/// let fired = service.poll_timers(75);
168/// assert_eq!(fired.len(), 1);
169/// assert_eq!(fired[0].id, id2); // Timer at t=50 fires first
170/// ```
171// Threshold at which the timer service logs a warning about accumulated timers.
172// This typically indicates a stalled watermark preventing timer firing.
173const TIMER_WARN_THRESHOLD: usize = 100_000;
174
175/// Timer service for scheduling and managing timers.
176///
177/// The timer service maintains a priority queue of timer registrations,
178/// ordered by timestamp. Operators can register timers to be fired at
179/// specific event times.
180pub struct TimerService {
181    timers: BinaryHeap<TimerRegistration>,
182    next_timer_id: u64,
183}
184
185impl TimerService {
186    /// Creates a new timer service.
187    #[must_use]
188    pub fn new() -> Self {
189        Self {
190            timers: BinaryHeap::new(),
191            next_timer_id: 0,
192        }
193    }
194
195    /// Creates a new timer service with pre-allocated capacity.
196    #[must_use]
197    pub fn with_capacity(capacity: usize) -> Self {
198        Self {
199            timers: BinaryHeap::with_capacity(capacity),
200            next_timer_id: 0,
201        }
202    }
203
204    /// Registers a new timer.
205    ///
206    /// Returns the unique timer ID that can be used to cancel the timer.
207    ///
208    /// # Arguments
209    ///
210    /// * `timestamp` - The event time at which the timer should fire
211    /// * `key` - Optional key for keyed operators
212    /// * `operator_index` - Optional index of the operator registering the timer(must match the index in the reactor)
213    pub fn register_timer(
214        &mut self,
215        timestamp: i64,
216        key: Option<TimerKey>,
217        operator_index: Option<usize>,
218    ) -> u64 {
219        let id = self.next_timer_id;
220        self.next_timer_id += 1;
221
222        self.timers.push(TimerRegistration {
223            id,
224            timestamp,
225            key,
226            operator_index,
227        });
228
229        if self.timers.len() == TIMER_WARN_THRESHOLD {
230            tracing::warn!(
231                pending = self.timers.len(),
232                "Timer heap reached {} pending timers — watermark may be stalled",
233                TIMER_WARN_THRESHOLD,
234            );
235        }
236
237        id
238    }
239
240    /// Polls for timers that should fire at or before the given timestamp.
241    ///
242    /// Returns all timers with timestamps <= `current_time`, in order.
243    /// Uses `FiredTimersVec` (`SmallVec`) to avoid heap allocation when few timers fire.
244    ///
245    /// # Panics
246    ///
247    /// This function should not panic under normal circumstances. The internal
248    /// `expect` is only called after verifying the heap is not empty via `peek`.
249    #[inline]
250    pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
251        let mut fired = FiredTimersVec::new();
252
253        while let Some(timer) = self.timers.peek() {
254            if timer.timestamp <= current_time {
255                // SAFETY: We just peeked and confirmed the heap is not empty
256                fired.push(self.timers.pop().expect("heap should not be empty"));
257            } else {
258                break;
259            }
260        }
261
262        fired
263    }
264
265    /// Cancels a timer by ID.
266    ///
267    /// Returns `true` if the timer was found and cancelled.
268    pub fn cancel_timer(&mut self, id: u64) -> bool {
269        let count_before = self.timers.len();
270        self.timers.retain(|t| t.id != id);
271        self.timers.len() < count_before
272    }
273
274    /// Returns the number of pending timers.
275    #[must_use]
276    pub fn pending_count(&self) -> usize {
277        self.timers.len()
278    }
279
280    /// Returns the timestamp of the next timer to fire, if any.
281    #[must_use]
282    pub fn next_timer_timestamp(&self) -> Option<i64> {
283        self.timers.peek().map(|t| t.timestamp)
284    }
285
286    /// Clears all pending timers.
287    pub fn clear(&mut self) {
288        self.timers.clear();
289    }
290}
291
292impl Default for TimerService {
293    fn default() -> Self {
294        Self::new()
295    }
296}
297
298/// Errors that can occur in time operations.
299#[derive(Debug, thiserror::Error)]
300pub enum TimeError {
301    /// Invalid timestamp value
302    #[error("Invalid timestamp: {0}")]
303    InvalidTimestamp(i64),
304
305    /// Timer not found
306    #[error("Timer not found: {0}")]
307    TimerNotFound(u64),
308
309    /// Watermark regression (going backwards)
310    #[error("Watermark regression: current={current}, new={new}")]
311    WatermarkRegression {
312        /// Current watermark value
313        current: i64,
314        /// Attempted new watermark value
315        new: i64,
316    },
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_watermark_creation() {
        let watermark = Watermark::new(1000);
        assert_eq!(watermark.timestamp(), 1000);
    }

    #[test]
    fn test_watermark_late_detection() {
        let watermark = Watermark::new(1000);
        assert!(watermark.is_late(999));
        assert!(!watermark.is_late(1000));
        assert!(!watermark.is_late(1001));
    }

    #[test]
    fn test_watermark_min_max() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert_eq!(w1.min(w2), Watermark::new(1000));
        assert_eq!(w1.max(w2), Watermark::new(2000));
    }

    #[test]
    fn test_watermark_ordering() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert!(w1 < w2);
        assert!(w2 > w1);
        assert_eq!(w1, Watermark::new(1000));
    }

    #[test]
    fn test_watermark_conversions() {
        let wm = Watermark::from(1000i64);
        assert_eq!(wm.timestamp(), 1000);

        let ts: i64 = wm.into();
        assert_eq!(ts, 1000);
    }

    #[test]
    fn test_watermark_default() {
        let wm = Watermark::default();
        assert_eq!(wm.timestamp(), i64::MIN);
        // Nothing can be late relative to the lowest possible watermark.
        assert!(!wm.is_late(i64::MIN));
    }

    #[test]
    fn test_timer_service_creation() {
        let service = TimerService::new();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_timer_service_with_capacity() {
        let service = TimerService::with_capacity(16);
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_timer_registration() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(1));

        assert_eq!(service.pending_count(), 2);
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_timer_poll_order() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(0));
        let _id3 = service.register_timer(150, None, None);

        // Poll at time 75 - should get timer at t=50
        let fired = service.poll_timers(75);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id2);
        assert_eq!(fired[0].key, Some(TimerKey::from_slice(&[1, 2, 3])));

        // Poll at time 125 - should get timer at t=100
        let fired = service.poll_timers(125);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id1);

        // Poll at time 200 - should get timer at t=150
        let fired = service.poll_timers(200);
        assert_eq!(fired.len(), 1);

        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_poll_multiple() {
        let mut service = TimerService::new();

        service.register_timer(50, None, None);
        service.register_timer(75, None, None);
        service.register_timer(100, None, None);

        // Poll at time 80 - should get timers at t=50 and t=75
        let fired = service.poll_timers(80);
        assert_eq!(fired.len(), 2);
        // Should be in timestamp order
        assert_eq!(fired[0].timestamp, 50);
        assert_eq!(fired[1].timestamp, 75);
    }

    #[test]
    fn test_timer_same_timestamp_fires_in_registration_order() {
        let mut service = TimerService::new();

        let first = service.register_timer(100, None, None);
        let second = service.register_timer(100, None, None);
        let third = service.register_timer(100, None, None);

        let fired = service.poll_timers(100);
        let ids: Vec<u64> = fired.iter().map(|t| t.id).collect();
        assert_eq!(ids, vec![first, second, third]);
    }

    #[test]
    fn test_timer_poll_boundary_and_empty() {
        let mut service = TimerService::new();

        // Polling an empty service yields nothing.
        assert!(service.poll_timers(i64::MAX).is_empty());

        service.register_timer(100, None, Some(3));

        // Strictly before the timer's timestamp: nothing fires.
        assert!(service.poll_timers(99).is_empty());
        assert_eq!(service.pending_count(), 1);

        // Exactly at the timestamp: the timer fires (inclusive bound).
        let fired = service.poll_timers(100);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].timestamp, 100);
        assert_eq!(fired[0].operator_index, Some(3));
        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_cancel() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(200, None, None);

        assert!(service.cancel_timer(id1));
        assert_eq!(service.pending_count(), 1);

        // Should not be able to cancel again
        assert!(!service.cancel_timer(id1));

        // Cancel the remaining timer
        assert!(service.cancel_timer(id2));
        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_cancel_unknown_id() {
        let mut service = TimerService::new();
        assert!(!service.cancel_timer(42));

        let id = service.register_timer(10, None, None);
        assert!(!service.cancel_timer(id + 1));
        assert_eq!(service.pending_count(), 1);
    }

    #[test]
    fn test_timer_registration_orders_earliest_first() {
        let early = TimerRegistration {
            id: 1,
            timestamp: 50,
            key: None,
            operator_index: None,
        };
        let late = TimerRegistration {
            id: 0,
            timestamp: 100,
            key: None,
            operator_index: None,
        };
        // Reversed ordering: the earlier timer compares as greater so the
        // max-heap pops it first.
        assert!(early > late);
    }

    #[test]
    fn test_timer_next_timestamp() {
        let mut service = TimerService::new();

        assert_eq!(service.next_timer_timestamp(), None);

        service.register_timer(100, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(100));

        service.register_timer(50, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(50));
    }

    #[test]
    fn test_timer_clear() {
        let mut service = TimerService::new();

        service.register_timer(100, None, None);
        service.register_timer(200, None, None);
        service.register_timer(300, None, None);

        service.clear();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_bounded_watermark_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        // First event
        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(900)));

        // Out of order event - no new watermark
        let wm2 = generator.on_event(800);
        assert!(wm2.is_none());

        // New max timestamp
        let wm3 = generator.on_event(1200);
        assert_eq!(wm3, Some(Watermark::new(1100)));
    }

    #[test]
    fn test_ascending_watermark_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));

        // Out of order - no watermark
        let wm3 = generator.on_event(1500);
        assert_eq!(wm3, None);
    }

    #[test]
    fn test_watermark_tracker_basic() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        let wm = tracker.update_source(1, 500);

        assert_eq!(wm, Some(Watermark::new(500)));
    }

    #[test]
    fn test_watermark_tracker_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // Mark slow source as idle
        let wm = tracker.mark_idle(1);
        assert_eq!(wm, Some(Watermark::new(5000)));

        assert!(tracker.is_idle(1));
        assert!(!tracker.is_idle(0));
    }
}