Skip to main content

laminar_core/time/
mod.rs

1//! Event time, watermarks, and timer management.
2mod cast;
3mod duration_str;
4mod event_time;
5mod filter;
6mod watermark;
7
8pub use cast::{cast_to_millis_array, CastError};
9pub use duration_str::parse_duration_str;
10pub use event_time::{EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField};
11
12pub use filter::{filter_batch_by_timestamp, FilterError, ThresholdOp};
13
14pub use watermark::{
15    AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, PeriodicGenerator,
16    ProcessingTimeGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
17    WatermarkTracker, DEFAULT_MAX_FUTURE_SKEW_MS,
18};
19
20use smallvec::SmallVec;
21use std::cmp::Ordering;
22use std::collections::BinaryHeap;
23use std::time::{SystemTime, UNIX_EPOCH};
24
/// Reads the wall clock as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, so
/// callers fail open instead of handling an error. A millisecond count that
/// does not fit in `i64` saturates to `i64::MAX`.
#[must_use]
pub fn now_unix_millis() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        // Clock is set before 1970-01-01: report the sentinel value.
        Err(_) => 0,
    }
}
32
33/// Timer key type optimized for window IDs (16 bytes).
34///
35/// Uses `SmallVec` to avoid heap allocation for keys up to 16 bytes,
36/// which covers the common case of `WindowId` keys.
37pub type TimerKey = SmallVec<[u8; 16]>;
38
39/// Collection type for fired timers.
40///
41/// Uses `SmallVec` to avoid heap allocation when few timers fire per poll.
42/// Size 8 covers most practical cases where timers fire in small batches.
43pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
44
/// An event-time watermark: a monotonically-increasing assertion that no
/// event with a timestamp earlier than this value will arrive.
///
/// Watermarks drive window emission, late-event detection, and
/// cross-operator time alignment. The wrapped value is a timestamp in
/// milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps `timestamp` in a watermark.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Self(timestamp)
    }

    /// Returns the watermark timestamp in milliseconds.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Self(millis) = *self;
        millis
    }

    /// Reports whether an event with `event_time` is late.
    ///
    /// Late means strictly earlier than the watermark; an event whose
    /// timestamp equals the watermark is still on time.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.timestamp() > event_time
    }

    /// Returns the earlier of the two watermarks.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        if other.0 < self.0 {
            other
        } else {
            self
        }
    }

    /// Returns the later of the two watermarks.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        if other.0 > self.0 {
            other
        } else {
            self
        }
    }
}

impl Default for Watermark {
    /// The default watermark is `i64::MIN`, for which no event time is late.
    fn default() -> Self {
        Self::new(i64::MIN)
    }
}

impl From<i64> for Watermark {
    /// Treats a raw timestamp as a watermark.
    fn from(timestamp: i64) -> Self {
        Self::new(timestamp)
    }
}

impl From<Watermark> for i64 {
    /// Extracts the raw timestamp from a watermark.
    fn from(watermark: Watermark) -> Self {
        watermark.timestamp()
    }
}
106
107/// A timer registration for delayed processing.
108///
109/// Timers are used by operators to schedule future actions, typically for
110/// window triggering or timeouts.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct TimerRegistration {
113    /// Unique timer ID
114    pub id: u64,
115    /// Scheduled timestamp (event time, in milliseconds)
116    pub timestamp: i64,
117    /// Timer key (for keyed operators).
118    /// Uses `TimerKey` (`SmallVec`) to avoid heap allocation for keys up to 16 bytes.
119    pub key: Option<TimerKey>,
120    /// The index of the operator that registered this timer
121    pub operator_index: Option<usize>,
122}
123
124impl Ord for TimerRegistration {
125    fn cmp(&self, other: &Self) -> Ordering {
126        // Reverse ordering for min-heap behavior (earliest first)
127        match other.timestamp.cmp(&self.timestamp) {
128            Ordering::Equal => other.id.cmp(&self.id),
129            ord => ord,
130        }
131    }
132}
133
134impl PartialOrd for TimerRegistration {
135    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
136        Some(self.cmp(other))
137    }
138}
139
/// Number of pending timers at which the timer service logs a warning.
///
/// An accumulation this large typically indicates a stalled watermark that
/// is preventing timers from firing.
const TIMER_WARN_THRESHOLD: usize = 100_000;
143
144/// Timer service for scheduling and managing timers.
145///
146/// The timer service maintains a priority queue of timer registrations,
147/// ordered by timestamp. Operators can register timers to be fired at
148/// specific event times.
149pub struct TimerService {
150    timers: BinaryHeap<TimerRegistration>,
151    next_timer_id: u64,
152}
153
154impl TimerService {
155    /// Creates a new timer service.
156    #[must_use]
157    pub fn new() -> Self {
158        Self {
159            timers: BinaryHeap::new(),
160            next_timer_id: 0,
161        }
162    }
163
164    /// Creates a new timer service with pre-allocated capacity.
165    #[must_use]
166    pub fn with_capacity(capacity: usize) -> Self {
167        Self {
168            timers: BinaryHeap::with_capacity(capacity),
169            next_timer_id: 0,
170        }
171    }
172
173    /// Registers a new timer.
174    ///
175    /// Returns the unique timer ID that can be used to cancel the timer.
176    ///
177    /// # Arguments
178    ///
179    /// * `timestamp` - The event time at which the timer should fire
180    /// * `key` - Optional key for keyed operators
181    /// * `operator_index` - Optional index of the operator registering the timer(must match the index in the reactor)
182    pub fn register_timer(
183        &mut self,
184        timestamp: i64,
185        key: Option<TimerKey>,
186        operator_index: Option<usize>,
187    ) -> u64 {
188        let id = self.next_timer_id;
189        self.next_timer_id += 1;
190
191        self.timers.push(TimerRegistration {
192            id,
193            timestamp,
194            key,
195            operator_index,
196        });
197
198        if self.timers.len() == TIMER_WARN_THRESHOLD {
199            tracing::warn!(
200                pending = self.timers.len(),
201                "Timer heap reached {} pending timers — watermark may be stalled",
202                TIMER_WARN_THRESHOLD,
203            );
204        }
205
206        id
207    }
208
209    /// Polls for timers that should fire at or before the given timestamp.
210    ///
211    /// Returns all timers with timestamps <= `current_time`, in order.
212    /// Uses `FiredTimersVec` (`SmallVec`) to avoid heap allocation when few timers fire.
213    ///
214    /// # Panics
215    ///
216    /// This function should not panic under normal circumstances. The internal
217    /// `expect` is only called after verifying the heap is not empty via `peek`.
218    #[inline]
219    pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
220        let mut fired = FiredTimersVec::new();
221
222        while let Some(timer) = self.timers.peek() {
223            if timer.timestamp <= current_time {
224                // SAFETY: We just peeked and confirmed the heap is not empty
225                fired.push(self.timers.pop().expect("heap should not be empty"));
226            } else {
227                break;
228            }
229        }
230
231        fired
232    }
233
234    /// Cancels a timer by ID.
235    ///
236    /// Returns `true` if the timer was found and cancelled.
237    pub fn cancel_timer(&mut self, id: u64) -> bool {
238        let count_before = self.timers.len();
239        self.timers.retain(|t| t.id != id);
240        self.timers.len() < count_before
241    }
242
243    /// Returns the number of pending timers.
244    #[must_use]
245    pub fn pending_count(&self) -> usize {
246        self.timers.len()
247    }
248
249    /// Returns the timestamp of the next timer to fire, if any.
250    #[must_use]
251    pub fn next_timer_timestamp(&self) -> Option<i64> {
252        self.timers.peek().map(|t| t.timestamp)
253    }
254
255    /// Clears all pending timers.
256    pub fn clear(&mut self) {
257        self.timers.clear();
258    }
259}
260
261impl Default for TimerService {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267/// Errors that can occur in time operations.
268#[derive(Debug, thiserror::Error)]
269pub enum TimeError {
270    /// Invalid timestamp value
271    #[error("Invalid timestamp: {0}")]
272    InvalidTimestamp(i64),
273
274    /// Timer not found
275    #[error("Timer not found: {0}")]
276    TimerNotFound(u64),
277
278    /// Watermark regression (going backwards)
279    #[error("Watermark regression: current={current}, new={new}")]
280    WatermarkRegression {
281        /// Current watermark value
282        current: i64,
283        /// Attempted new watermark value
284        new: i64,
285    },
286}
287
#[cfg(test)]
mod tests {
    //! Unit tests for `Watermark`, `TimerService`, and smoke tests for the
    //! re-exported watermark generators and tracker.

    use super::*;

    #[test]
    fn test_watermark_creation() {
        let watermark = Watermark::new(1000);
        assert_eq!(watermark.timestamp(), 1000);
    }

    #[test]
    fn test_watermark_late_detection() {
        let watermark = Watermark::new(1000);
        // Lateness is strict: an event exactly at the watermark is on time.
        assert!(watermark.is_late(999));
        assert!(!watermark.is_late(1000));
        assert!(!watermark.is_late(1001));
    }

    #[test]
    fn test_watermark_min_max() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert_eq!(w1.min(w2), Watermark::new(1000));
        assert_eq!(w1.max(w2), Watermark::new(2000));
    }

    #[test]
    fn test_watermark_ordering() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert!(w1 < w2);
        assert!(w2 > w1);
        assert_eq!(w1, Watermark::new(1000));
    }

    #[test]
    fn test_watermark_conversions() {
        let wm = Watermark::from(1000i64);
        assert_eq!(wm.timestamp(), 1000);

        let ts: i64 = wm.into();
        assert_eq!(ts, 1000);
    }

    #[test]
    fn test_watermark_default() {
        let wm = Watermark::default();
        assert_eq!(wm.timestamp(), i64::MIN);
    }

    #[test]
    fn test_timer_service_creation() {
        let service = TimerService::new();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_timer_registration() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(1));

        assert_eq!(service.pending_count(), 2);
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_timer_poll_order() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(0));
        let id3 = service.register_timer(150, None, None);

        // Poll at time 75 - should get timer at t=50
        let fired = service.poll_timers(75);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id2);
        assert_eq!(fired[0].key, Some(TimerKey::from_slice(&[1, 2, 3])));

        // Poll at time 125 - should get timer at t=100
        let fired = service.poll_timers(125);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id1);

        // Poll at time 200 - should get timer at t=150
        let fired = service.poll_timers(200);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id3);

        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_poll_multiple() {
        let mut service = TimerService::new();

        service.register_timer(50, None, None);
        service.register_timer(75, None, None);
        service.register_timer(100, None, None);

        // Poll at time 80 - should get timers at t=50 and t=75
        let fired = service.poll_timers(80);
        assert_eq!(fired.len(), 2);
        // Should be in timestamp order
        assert_eq!(fired[0].timestamp, 50);
        assert_eq!(fired[1].timestamp, 75);
    }

    #[test]
    fn test_timer_poll_ties_and_boundary() {
        let mut service = TimerService::new();

        let first = service.register_timer(100, None, None);
        let second = service.register_timer(100, None, None);

        // One millisecond early: nothing is due yet.
        assert!(service.poll_timers(99).is_empty());
        assert_eq!(service.pending_count(), 2);

        // The bound is inclusive, and equal timestamps fire in ascending id order.
        let fired = service.poll_timers(100);
        assert_eq!(fired.len(), 2);
        assert_eq!(fired[0].id, first);
        assert_eq!(fired[1].id, second);
        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_cancel() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(200, None, None);

        assert!(service.cancel_timer(id1));
        assert_eq!(service.pending_count(), 1);

        // Should not be able to cancel again
        assert!(!service.cancel_timer(id1));

        // Cancel the remaining timer
        assert!(service.cancel_timer(id2));
        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_next_timestamp() {
        let mut service = TimerService::new();

        assert_eq!(service.next_timer_timestamp(), None);

        service.register_timer(100, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(100));

        service.register_timer(50, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(50));
    }

    #[test]
    fn test_timer_clear() {
        let mut service = TimerService::new();

        service.register_timer(100, None, None);
        service.register_timer(200, None, None);
        service.register_timer(300, None, None);

        service.clear();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_bounded_watermark_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        // First event
        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(900)));

        // Out of order event - no new watermark
        let wm2 = generator.on_event(800);
        assert!(wm2.is_none());

        // New max timestamp
        let wm3 = generator.on_event(1200);
        assert_eq!(wm3, Some(Watermark::new(1100)));
    }

    #[test]
    fn test_ascending_watermark_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));

        // Out of order - no watermark
        let wm3 = generator.on_event(1500);
        assert_eq!(wm3, None);
    }

    #[test]
    fn test_watermark_tracker_basic() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        let wm = tracker.update_source(1, 500);

        assert_eq!(wm, Some(Watermark::new(500)));
    }

    #[test]
    fn test_watermark_tracker_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // Mark slow source as idle
        let wm = tracker.mark_idle(1);
        assert_eq!(wm, Some(Watermark::new(5000)));

        assert!(tracker.is_idle(1));
        assert!(!tracker.is_idle(0));
    }
}