1mod cast;
3mod duration_str;
4mod event_time;
5mod filter;
6mod watermark;
7
8pub use cast::{cast_to_millis_array, CastError};
9pub use duration_str::parse_duration_str;
10pub use event_time::{EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField};
11
12pub use filter::{filter_batch_by_timestamp, FilterError, ThresholdOp};
13
14pub use watermark::{
15 AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, PeriodicGenerator,
16 ProcessingTimeGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
17 WatermarkTracker, DEFAULT_MAX_FUTURE_SKEW_MS,
18};
19
20use smallvec::SmallVec;
21use std::cmp::Ordering;
22use std::collections::BinaryHeap;
23use std::time::{SystemTime, UNIX_EPOCH};
24
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch,
/// and saturates at `i64::MAX` if the millisecond count does not fit in an `i64`.
#[must_use]
pub fn now_unix_millis() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; clamp instead of truncating.
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        // Clock is set before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
32
/// Byte key that can be attached to a timer (see `TimerRegistration::key`).
///
/// Backed by a `SmallVec` with an inline capacity of 16 bytes.
pub type TimerKey = SmallVec<[u8; 16]>;
38
/// Collection of fired timers returned by `TimerService::poll_timers`.
///
/// Backed by a `SmallVec` with an inline capacity of 8 registrations.
pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
44
/// A watermark: a newtype around an `i64` timestamp.
///
/// Equality, ordering and hashing are all derived from the wrapped value, so
/// watermarks compare exactly like their timestamps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps `timestamp` in a `Watermark`.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Watermark(timestamp)
    }

    /// Returns the wrapped timestamp.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Watermark(ts) = *self;
        ts
    }

    /// Returns `true` when `event_time` is strictly earlier than this watermark.
    ///
    /// An event whose time equals the watermark is *not* considered late.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.0 > event_time
    }

    /// Returns whichever of the two watermarks has the smaller timestamp.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        if other.0 < self.0 {
            other
        } else {
            self
        }
    }

    /// Returns whichever of the two watermarks has the larger timestamp.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        if other.0 > self.0 {
            other
        } else {
            self
        }
    }
}
88
89impl Default for Watermark {
90 fn default() -> Self {
91 Self(i64::MIN)
92 }
93}
94
95impl From<i64> for Watermark {
96 fn from(timestamp: i64) -> Self {
97 Self(timestamp)
98 }
99}
100
101impl From<Watermark> for i64 {
102 fn from(watermark: Watermark) -> Self {
103 watermark.0
104 }
105}
106
/// A single timer entry, as stored in `TimerService`'s heap and handed back
/// by `TimerService::poll_timers`.
///
/// The derived `PartialEq`/`Eq` compare every field, whereas the hand-written
/// `Ord` implementation only looks at `timestamp` and `id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerRegistration {
    /// Identifier assigned by `TimerService::register_timer`.
    pub id: u64,
    /// Time at which the timer becomes due; compared against the
    /// `current_time` passed to `TimerService::poll_timers`.
    pub timestamp: i64,
    /// Optional key bytes supplied at registration; stored as given.
    pub key: Option<TimerKey>,
    /// Optional operator index supplied at registration; stored as given.
    pub operator_index: Option<usize>,
}
123
124impl Ord for TimerRegistration {
125 fn cmp(&self, other: &Self) -> Ordering {
126 match other.timestamp.cmp(&self.timestamp) {
128 Ordering::Equal => other.id.cmp(&self.id),
129 ord => ord,
130 }
131 }
132}
133
134impl PartialOrd for TimerRegistration {
135 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
136 Some(self.cmp(other))
137 }
138}
139
/// Pending-timer count at which `TimerService::register_timer` logs a warning.
///
/// The check there is an exact equality, so the warning is emitted when the
/// heap size lands on this value, not on every registration beyond it.
const TIMER_WARN_THRESHOLD: usize = 100_000;
143
144pub struct TimerService {
150 timers: BinaryHeap<TimerRegistration>,
151 next_timer_id: u64,
152}
153
154impl TimerService {
155 #[must_use]
157 pub fn new() -> Self {
158 Self {
159 timers: BinaryHeap::new(),
160 next_timer_id: 0,
161 }
162 }
163
164 #[must_use]
166 pub fn with_capacity(capacity: usize) -> Self {
167 Self {
168 timers: BinaryHeap::with_capacity(capacity),
169 next_timer_id: 0,
170 }
171 }
172
173 pub fn register_timer(
183 &mut self,
184 timestamp: i64,
185 key: Option<TimerKey>,
186 operator_index: Option<usize>,
187 ) -> u64 {
188 let id = self.next_timer_id;
189 self.next_timer_id += 1;
190
191 self.timers.push(TimerRegistration {
192 id,
193 timestamp,
194 key,
195 operator_index,
196 });
197
198 if self.timers.len() == TIMER_WARN_THRESHOLD {
199 tracing::warn!(
200 pending = self.timers.len(),
201 "Timer heap reached {} pending timers — watermark may be stalled",
202 TIMER_WARN_THRESHOLD,
203 );
204 }
205
206 id
207 }
208
209 #[inline]
219 pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
220 let mut fired = FiredTimersVec::new();
221
222 while let Some(timer) = self.timers.peek() {
223 if timer.timestamp <= current_time {
224 fired.push(self.timers.pop().expect("heap should not be empty"));
226 } else {
227 break;
228 }
229 }
230
231 fired
232 }
233
234 pub fn cancel_timer(&mut self, id: u64) -> bool {
238 let count_before = self.timers.len();
239 self.timers.retain(|t| t.id != id);
240 self.timers.len() < count_before
241 }
242
243 #[must_use]
245 pub fn pending_count(&self) -> usize {
246 self.timers.len()
247 }
248
249 #[must_use]
251 pub fn next_timer_timestamp(&self) -> Option<i64> {
252 self.timers.peek().map(|t| t.timestamp)
253 }
254
255 pub fn clear(&mut self) {
257 self.timers.clear();
258 }
259}
260
261impl Default for TimerService {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
/// Errors for time-related operations.
///
/// `Display` and `std::error::Error` are generated by `thiserror` from the
/// `#[error(...)]` messages below.
#[derive(Debug, thiserror::Error)]
pub enum TimeError {
    /// A timestamp was rejected as invalid; carries the offending value.
    #[error("Invalid timestamp: {0}")]
    InvalidTimestamp(i64),

    /// No timer with the given id was found; carries that id.
    #[error("Timer not found: {0}")]
    TimerNotFound(u64),

    /// A watermark update would regress relative to the current watermark.
    #[error("Watermark regression: current={current}, new={new}")]
    WatermarkRegression {
        /// The watermark value currently in effect.
        current: i64,
        /// The proposed watermark value.
        new: i64,
    },
}
287
// Unit tests for `Watermark`, `TimerService`, and a few smoke tests for the
// re-exported watermark generators and tracker.
#[cfg(test)]
mod tests {
    use super::*;

    // `new` stores the timestamp and `timestamp` returns it.
    #[test]
    fn test_watermark_creation() {
        let watermark = Watermark::new(1000);
        assert_eq!(watermark.timestamp(), 1000);
    }

    // Only event times strictly below the watermark count as late.
    #[test]
    fn test_watermark_late_detection() {
        let watermark = Watermark::new(1000);
        assert!(watermark.is_late(999));
        assert!(!watermark.is_late(1000));
        assert!(!watermark.is_late(1001));
    }

    // `min`/`max` pick the watermark with the smaller/larger timestamp.
    #[test]
    fn test_watermark_min_max() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert_eq!(w1.min(w2), Watermark::new(1000));
        assert_eq!(w1.max(w2), Watermark::new(2000));
    }

    // Derived comparison operators follow the wrapped timestamp.
    #[test]
    fn test_watermark_ordering() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert!(w1 < w2);
        assert!(w2 > w1);
        assert_eq!(w1, Watermark::new(1000));
    }

    // Round-trip through `From<i64>` and `From<Watermark> for i64`.
    #[test]
    fn test_watermark_conversions() {
        let wm = Watermark::from(1000i64);
        assert_eq!(wm.timestamp(), 1000);

        let ts: i64 = wm.into();
        assert_eq!(ts, 1000);
    }

    // The default watermark is the minimum `i64`.
    #[test]
    fn test_watermark_default() {
        let wm = Watermark::default();
        assert_eq!(wm.timestamp(), i64::MIN);
    }

    // A fresh service has no pending timers.
    #[test]
    fn test_timer_service_creation() {
        let service = TimerService::new();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    // Each registration is counted and receives a distinct id.
    #[test]
    fn test_timer_registration() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(1));

        assert_eq!(service.pending_count(), 2);
        assert_ne!(id1, id2);
    }

    // Timers fire by timestamp, not by registration order.
    #[test]
    fn test_timer_poll_order() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(0));
        let _id3 = service.register_timer(150, None, None);

        let fired = service.poll_timers(75);
        // Only the timer at 50 is due at time 75; its key comes back intact.
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id2);
        assert_eq!(fired[0].key, Some(TimerKey::from_slice(&[1, 2, 3])));

        let fired = service.poll_timers(125);
        // The timer at 100 is now due.
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id1);

        let fired = service.poll_timers(200);
        // The last remaining timer (at 150) fires.
        assert_eq!(fired.len(), 1);

        assert_eq!(service.pending_count(), 0);
    }

    // A single poll returns every due timer, earliest first.
    #[test]
    fn test_timer_poll_multiple() {
        let mut service = TimerService::new();

        service.register_timer(50, None, None);
        service.register_timer(75, None, None);
        service.register_timer(100, None, None);

        let fired = service.poll_timers(80);
        // Timers at 50 and 75 are due at time 80; the one at 100 is not.
        assert_eq!(fired.len(), 2);
        assert_eq!(fired[0].timestamp, 50);
        assert_eq!(fired[1].timestamp, 75);
    }

    // Cancelling removes the timer; cancelling the same id again reports `false`.
    #[test]
    fn test_timer_cancel() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(200, None, None);

        assert!(service.cancel_timer(id1));
        assert_eq!(service.pending_count(), 1);

        // Already cancelled, so nothing is removed this time.
        assert!(!service.cancel_timer(id1));

        assert!(service.cancel_timer(id2));
        assert_eq!(service.pending_count(), 0);
    }

    // `next_timer_timestamp` tracks the earliest pending timer.
    #[test]
    fn test_timer_next_timestamp() {
        let mut service = TimerService::new();

        assert_eq!(service.next_timer_timestamp(), None);

        service.register_timer(100, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(100));

        service.register_timer(50, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(50));
    }

    // `clear` drops every pending timer.
    #[test]
    fn test_timer_clear() {
        let mut service = TimerService::new();

        service.register_timer(100, None, None);
        service.register_timer(200, None, None);
        service.register_timer(300, None, None);

        service.clear();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    // Smoke test for the re-exported `BoundedOutOfOrdernessGenerator`.
    // NOTE(review): the generator lives in the `watermark` module; the
    // expectations below are what this test pins, not a full specification.
    #[test]
    fn test_bounded_watermark_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        let wm1 = generator.on_event(1000);
        // Constructed with 100: an event at 1000 is expected to yield 900.
        assert_eq!(wm1, Some(Watermark::new(900)));

        let wm2 = generator.on_event(800);
        // An event older than one already seen is expected to yield nothing.
        assert!(wm2.is_none());

        let wm3 = generator.on_event(1200);
        // A newer event is expected to yield 1200 - 100.
        assert_eq!(wm3, Some(Watermark::new(1100)));
    }

    // Smoke test for the re-exported `AscendingTimestampsGenerator`.
    #[test]
    fn test_ascending_watermark_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));

        let wm3 = generator.on_event(1500);
        // An event behind the previous one is expected to yield nothing.
        assert_eq!(wm3, None);
    }

    // Smoke test for `WatermarkTracker`: after both sources report, the
    // expected result is the smaller of the two reported values.
    // NOTE(review): presumably `new(2)` means two sources — confirm in `watermark`.
    #[test]
    fn test_watermark_tracker_basic() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        let wm = tracker.update_source(1, 500);

        assert_eq!(wm, Some(Watermark::new(500)));
    }

    // Marking the slower source idle is expected to surface the other
    // source's value, and only the marked source reports as idle.
    #[test]
    fn test_watermark_tracker_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        let wm = tracker.mark_idle(1);
        // Source 1 (at 1000) no longer counts, leaving source 0's 5000.
        assert_eq!(wm, Some(Watermark::new(5000)));

        assert!(tracker.is_idle(1));
        assert!(!tracker.is_idle(0));
    }
}