1mod cast;
3mod duration_str;
4mod event_time;
5mod filter;
6mod watermark;
7
8pub use cast::{cast_to_millis_array, CastError};
9pub use duration_str::parse_duration_str;
10pub use event_time::{EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField};
11
12pub use filter::{filter_batch_by_timestamp, ThresholdOp};
13
14pub use watermark::{
15 AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, PeriodicGenerator,
16 ProcessingTimeGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
17 WatermarkTracker,
18};
19
20use smallvec::SmallVec;
21use std::cmp::Ordering;
22use std::collections::BinaryHeap;
23
/// Optional key bytes attached to a timer registration.
///
/// Backed by a `SmallVec` with an inline capacity of 16 bytes, so keys of up
/// to 16 bytes are stored without a separate heap allocation.
pub type TimerKey = SmallVec<[u8; 16]>;

/// Collection of timers returned by one `TimerService::poll_timers` call.
///
/// Backed by a `SmallVec` with an inline capacity of 8 registrations; larger
/// batches spill to the heap.
pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
35
/// An event-time watermark.
///
/// A watermark at time `t` classifies any event whose timestamp is strictly
/// below `t` as late (see [`Watermark::is_late`]). Watermarks compare, order
/// and hash exactly like the wrapped `i64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps a raw timestamp as a watermark.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Watermark(timestamp)
    }

    /// Returns the raw timestamp carried by this watermark.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Watermark(raw) = *self;
        raw
    }

    /// Returns `true` when `event_time` lies strictly before this watermark.
    ///
    /// An event stamped exactly at the watermark is *not* late.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.timestamp() > event_time
    }

    /// Returns whichever of the two watermarks is earlier.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        if other.0 < self.0 { other } else { self }
    }

    /// Returns whichever of the two watermarks is later.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        if other.0 > self.0 { other } else { self }
    }
}

/// The default watermark is `i64::MIN`, the earliest representable time, so
/// no event is late against it.
impl Default for Watermark {
    fn default() -> Self {
        Watermark::new(i64::MIN)
    }
}

/// Wraps a raw timestamp; equivalent to [`Watermark::new`].
impl From<i64> for Watermark {
    fn from(timestamp: i64) -> Self {
        Watermark::new(timestamp)
    }
}

/// Unwraps a watermark into its raw timestamp; equivalent to
/// [`Watermark::timestamp`].
impl From<Watermark> for i64 {
    fn from(watermark: Watermark) -> Self {
        watermark.timestamp()
    }
}
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct TimerRegistration {
122 pub id: u64,
124 pub timestamp: i64,
126 pub key: Option<TimerKey>,
129 pub operator_index: Option<usize>,
131}
132
133impl Ord for TimerRegistration {
134 fn cmp(&self, other: &Self) -> Ordering {
135 match other.timestamp.cmp(&self.timestamp) {
137 Ordering::Equal => other.id.cmp(&self.id),
138 ord => ord,
139 }
140 }
141}
142
143impl PartialOrd for TimerRegistration {
144 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
145 Some(self.cmp(other))
146 }
147}
148
/// Number of pending timers at which `TimerService::register_timer` logs a
/// `tracing` warning.
///
/// A heap this large indicates timers are not being drained. The logged
/// message suggests the watermark may be stalled.
const TIMER_WARN_THRESHOLD: usize = 100_000;
174
175pub struct TimerService {
181 timers: BinaryHeap<TimerRegistration>,
182 next_timer_id: u64,
183}
184
185impl TimerService {
186 #[must_use]
188 pub fn new() -> Self {
189 Self {
190 timers: BinaryHeap::new(),
191 next_timer_id: 0,
192 }
193 }
194
195 #[must_use]
197 pub fn with_capacity(capacity: usize) -> Self {
198 Self {
199 timers: BinaryHeap::with_capacity(capacity),
200 next_timer_id: 0,
201 }
202 }
203
204 pub fn register_timer(
214 &mut self,
215 timestamp: i64,
216 key: Option<TimerKey>,
217 operator_index: Option<usize>,
218 ) -> u64 {
219 let id = self.next_timer_id;
220 self.next_timer_id += 1;
221
222 self.timers.push(TimerRegistration {
223 id,
224 timestamp,
225 key,
226 operator_index,
227 });
228
229 if self.timers.len() == TIMER_WARN_THRESHOLD {
230 tracing::warn!(
231 pending = self.timers.len(),
232 "Timer heap reached {} pending timers — watermark may be stalled",
233 TIMER_WARN_THRESHOLD,
234 );
235 }
236
237 id
238 }
239
240 #[inline]
250 pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
251 let mut fired = FiredTimersVec::new();
252
253 while let Some(timer) = self.timers.peek() {
254 if timer.timestamp <= current_time {
255 fired.push(self.timers.pop().expect("heap should not be empty"));
257 } else {
258 break;
259 }
260 }
261
262 fired
263 }
264
265 pub fn cancel_timer(&mut self, id: u64) -> bool {
269 let count_before = self.timers.len();
270 self.timers.retain(|t| t.id != id);
271 self.timers.len() < count_before
272 }
273
274 #[must_use]
276 pub fn pending_count(&self) -> usize {
277 self.timers.len()
278 }
279
280 #[must_use]
282 pub fn next_timer_timestamp(&self) -> Option<i64> {
283 self.timers.peek().map(|t| t.timestamp)
284 }
285
286 pub fn clear(&mut self) {
288 self.timers.clear();
289 }
290}
291
292impl Default for TimerService {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
/// Errors for time-related operations (timestamps, timers, watermarks).
#[derive(Debug, thiserror::Error)]
pub enum TimeError {
    /// A timestamp value was rejected as invalid; carries the offending value.
    #[error("Invalid timestamp: {0}")]
    InvalidTimestamp(i64),

    /// No timer exists with the given id; carries the id that was looked up.
    #[error("Timer not found: {0}")]
    TimerNotFound(u64),

    /// A watermark update that would move the watermark backwards (going by
    /// the variant name — presumably raised when `new < current`).
    #[error("Watermark regression: current={current}, new={new}")]
    WatermarkRegression {
        /// The watermark value currently in effect.
        current: i64,
        /// The proposed watermark value that triggered the error.
        new: i64,
    },
}
318
/// Unit tests for `Watermark`, `TimerService`, and the re-exported watermark
/// generators and tracker.
#[cfg(test)]
mod tests {
    use super::*;

    // `new` round-trips the raw timestamp.
    #[test]
    fn test_watermark_creation() {
        let watermark = Watermark::new(1000);
        assert_eq!(watermark.timestamp(), 1000);
    }

    // Only events strictly before the watermark are late; equality is on time.
    #[test]
    fn test_watermark_late_detection() {
        let watermark = Watermark::new(1000);
        assert!(watermark.is_late(999));
        assert!(!watermark.is_late(1000));
        assert!(!watermark.is_late(1001));
    }

    #[test]
    fn test_watermark_min_max() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert_eq!(w1.min(w2), Watermark::new(1000));
        assert_eq!(w1.max(w2), Watermark::new(2000));
    }

    // Derived ordering follows the wrapped timestamp.
    #[test]
    fn test_watermark_ordering() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert!(w1 < w2);
        assert!(w2 > w1);
        assert_eq!(w1, Watermark::new(1000));
    }

    // `From<i64>` and `From<Watermark> for i64` are inverse conversions.
    #[test]
    fn test_watermark_conversions() {
        let wm = Watermark::from(1000i64);
        assert_eq!(wm.timestamp(), 1000);

        let ts: i64 = wm.into();
        assert_eq!(ts, 1000);
    }

    // The default watermark is the minimum representable timestamp.
    #[test]
    fn test_watermark_default() {
        let wm = Watermark::default();
        assert_eq!(wm.timestamp(), i64::MIN);
    }

    #[test]
    fn test_timer_service_creation() {
        let service = TimerService::new();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    // Each registration gets a distinct id and is counted as pending.
    #[test]
    fn test_timer_registration() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(1));

        assert_eq!(service.pending_count(), 2);
        assert_ne!(id1, id2);
    }

    // Timers fire in timestamp order regardless of registration order, and
    // the registered key is returned unchanged.
    #[test]
    fn test_timer_poll_order() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(0));
        let _id3 = service.register_timer(150, None, None);

        // Only the ts=50 timer is due at 75.
        let fired = service.poll_timers(75);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id2);
        assert_eq!(fired[0].key, Some(TimerKey::from_slice(&[1, 2, 3])));

        // Only the ts=100 timer is due at 125.
        let fired = service.poll_timers(125);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id1);

        // The last (ts=150) timer is due at 200.
        let fired = service.poll_timers(200);
        assert_eq!(fired.len(), 1);

        assert_eq!(service.pending_count(), 0);
    }

    // A single poll can fire several timers; they come back earliest first.
    #[test]
    fn test_timer_poll_multiple() {
        let mut service = TimerService::new();

        service.register_timer(50, None, None);
        service.register_timer(75, None, None);
        service.register_timer(100, None, None);

        let fired = service.poll_timers(80);
        assert_eq!(fired.len(), 2);
        assert_eq!(fired[0].timestamp, 50);
        assert_eq!(fired[1].timestamp, 75);
    }

    // Cancelling succeeds once per timer; a second cancel of the same id
    // reports `false`.
    #[test]
    fn test_timer_cancel() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(200, None, None);

        assert!(service.cancel_timer(id1));
        assert_eq!(service.pending_count(), 1);

        assert!(!service.cancel_timer(id1));

        assert!(service.cancel_timer(id2));
        assert_eq!(service.pending_count(), 0);
    }

    // `next_timer_timestamp` tracks the earliest pending timer.
    #[test]
    fn test_timer_next_timestamp() {
        let mut service = TimerService::new();

        assert_eq!(service.next_timer_timestamp(), None);

        service.register_timer(100, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(100));

        service.register_timer(50, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(50));
    }

    #[test]
    fn test_timer_clear() {
        let mut service = TimerService::new();

        service.register_timer(100, None, None);
        service.register_timer(200, None, None);
        service.register_timer(300, None, None);

        service.clear();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    // With a bound of 100 the watermark trails the maximum seen event time by
    // 100. An out-of-order event that does not raise the maximum yields no
    // new watermark.
    #[test]
    fn test_bounded_watermark_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(900)));

        let wm2 = generator.on_event(800);
        assert!(wm2.is_none());

        let wm3 = generator.on_event(1200);
        assert_eq!(wm3, Some(Watermark::new(1100)));
    }

    // The watermark equals the latest event time; a regressing event produces
    // no watermark.
    #[test]
    fn test_ascending_watermark_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));

        let wm3 = generator.on_event(1500);
        assert_eq!(wm3, None);
    }

    // The combined watermark is the minimum across sources.
    #[test]
    fn test_watermark_tracker_basic() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        let wm = tracker.update_source(1, 500);

        assert_eq!(wm, Some(Watermark::new(500)));
    }

    // Marking the lagging source idle lets the combined watermark advance to
    // the remaining active source's value.
    #[test]
    fn test_watermark_tracker_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        let wm = tracker.mark_idle(1);
        assert_eq!(wm, Some(Watermark::new(5000)));

        assert!(tracker.is_idle(1));
        assert!(!tracker.is_idle(0));
    }
}
531}