§Time Module
Event time processing, watermarks, and timer management.
§Concepts
- Event Time: Timestamp when the event actually occurred
- Processing Time: Timestamp when the event is processed
- Watermark: Assertion that no events with timestamp < watermark will arrive
- Timer: Scheduled callback for window triggers or timeouts
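How these concepts interact can be sketched in plain Rust. This is a minimal illustration using only the standard library, not laminar_core's implementation: `is_late`, `due_timers`, the `i64` millisecond timestamps, and the rule that a timer fires once the watermark reaches its fire time are all assumptions made for the example.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: an event is late when its event time is
/// strictly below the current watermark.
fn is_late(event_time_ms: i64, watermark_ms: i64) -> bool {
    event_time_ms < watermark_ms
}

/// Hypothetical helper: removes and returns the timers whose fire time
/// is at or below the watermark, in ascending fire-time order.
fn due_timers(timers: &mut BTreeMap<i64, &'static str>, watermark_ms: i64) -> Vec<&'static str> {
    // split_off returns the entries with key >= watermark + 1 (still pending)
    // and leaves the due entries behind in `timers`.
    let pending = timers.split_off(&watermark_ms.saturating_add(1));
    let due = std::mem::replace(timers, pending);
    due.into_values().collect()
}

/// Walks through one watermark advance and returns what it observed.
fn concepts_example() -> (bool, bool, Vec<&'static str>) {
    let watermark_ms = 4_000;
    let mut timers = BTreeMap::new();
    timers.insert(3_000, "close window [0, 3000)");
    timers.insert(6_000, "close window [3000, 6000)");
    (
        is_late(3_500, watermark_ms), // 3500 < 4000
        is_late(4_000, watermark_ms), // equal to the watermark, not below it
        due_timers(&mut timers, watermark_ms),
    )
}
```

Processing time does not appear in the sketch: lateness and timer firing are decided purely by event time and the watermark, so replaying the same input yields the same results.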
§Event Time Extraction
Use EventTimeExtractor to extract timestamps from Arrow RecordBatch columns:
use laminar_core::time::{EventTimeExtractor, TimestampFormat, ExtractionMode};
// Extract millisecond timestamps from a column
let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
// Use Max mode for multi-row batches
let extractor = extractor.with_mode(ExtractionMode::Max);
let timestamp = extractor.extract(&batch)?;
§Watermark Generation
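A bounded out-of-orderness watermark trails the largest event time seen so far by a fixed delay. The sketch below shows that arithmetic with the standard library only. `BoundedDelaySketch` is a hypothetical stand-in, not the real `BoundedOutOfOrdernessGenerator`, and emitting a value only when the watermark moves forward is an assumption.

```rust
/// Illustrative stand-in for a bounded out-of-orderness generator.
struct BoundedDelaySketch {
    max_delay_ms: i64,
    max_seen_ms: i64,
    last_emitted_ms: Option<i64>,
}

impl BoundedDelaySketch {
    fn new(max_delay_ms: i64) -> Self {
        Self { max_delay_ms, max_seen_ms: i64::MIN, last_emitted_ms: None }
    }

    /// Returns a new watermark only when it advances (assumed behavior).
    fn on_event(&mut self, event_time_ms: i64) -> Option<i64> {
        self.max_seen_ms = self.max_seen_ms.max(event_time_ms);
        // Watermark candidate: largest event time minus the allowed delay.
        let candidate = self.max_seen_ms.saturating_sub(self.max_delay_ms);
        match self.last_emitted_ms {
            Some(prev) if candidate <= prev => None,
            _ => {
                self.last_emitted_ms = Some(candidate);
                Some(candidate)
            }
        }
    }
}

/// Feeds three events, the second one out of order.
fn bounded_delay_example() -> Vec<Option<i64>> {
    let mut generator = BoundedDelaySketch::new(1_000);
    vec![
        generator.on_event(5_000), // advances to 5000 - 1000
        generator.on_event(4_500), // out of order, watermark unchanged
        generator.on_event(7_000), // advances to 7000 - 1000
    ]
}
```

The out-of-order event at 4500 does not move the watermark backwards, so downstream operators only ever observe a non-decreasing sequence.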
Use watermark generators to track event-time progress:
use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};
// Allow events to be up to 1 second late
let mut generator = BoundedOutOfOrdernessGenerator::new(1000);
// Process events
let wm = generator.on_event(5000);
assert_eq!(wm, Some(Watermark::new(4000))); // 5000 - 1000
§Multi-Source Watermark Tracking
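The combined watermark of a multi-input operator is the minimum over its inputs, because the slowest input may still deliver older events. A minimal standard-library sketch of that rule follows. `MinTrackerSketch` is hypothetical; returning `None` until every source has reported and ignoring regressions are both assumptions, not documented behavior of `WatermarkTracker`.

```rust
/// Illustrative stand-in for a multi-source watermark tracker.
struct MinTrackerSketch {
    per_source_ms: Vec<Option<i64>>,
}

impl MinTrackerSketch {
    fn new(num_sources: usize) -> Self {
        Self { per_source_ms: vec![None; num_sources] }
    }

    /// Records a source watermark; regressions are ignored (assumption).
    fn update_source(&mut self, source: usize, watermark_ms: i64) {
        let slot = &mut self.per_source_ms[source];
        *slot = Some(slot.map_or(watermark_ms, |prev| prev.max(watermark_ms)));
    }

    /// Minimum over all sources, or None until every source has reported.
    fn current_watermark(&self) -> Option<i64> {
        let all: Option<Vec<i64>> = self.per_source_ms.iter().copied().collect();
        all?.into_iter().min()
    }
}

/// Returns the combined watermark after one source, after both sources,
/// and after the slower source catches up.
fn min_tracker_example() -> (Option<i64>, Option<i64>, Option<i64>) {
    let mut tracker = MinTrackerSketch::new(2);
    tracker.update_source(0, 5_000);
    let one_reported = tracker.current_watermark();
    tracker.update_source(1, 3_000);
    let both_reported = tracker.current_watermark();
    tracker.update_source(1, 6_000);
    let caught_up = tracker.current_watermark();
    (one_reported, both_reported, caught_up)
}
```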
For operators with multiple inputs, use WatermarkTracker:
use laminar_core::time::{WatermarkTracker, Watermark};
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 5000);
tracker.update_source(1, 3000);
// Combined watermark is the minimum
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
§Per-Partition Watermark Tracking
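Idle marking matters because a partition that stops producing would otherwise pin the minimum forever. The sketch below models that with the standard library. `PartitionedSketch` and its `(source, partition)` tuple key are hypothetical, and it simplifies the real tracker: there is no source registration, and any update marks the partition active again.

```rust
use std::collections::HashMap;

/// Illustrative per-partition state.
struct PartitionSketch {
    watermark_ms: i64,
    idle: bool,
}

/// Illustrative stand-in for a partitioned watermark tracker.
struct PartitionedSketch {
    partitions: HashMap<(u32, u32), PartitionSketch>,
}

impl PartitionedSketch {
    fn new() -> Self {
        Self { partitions: HashMap::new() }
    }

    /// Stores the watermark and marks the partition active (assumption).
    fn update_partition(&mut self, id: (u32, u32), watermark_ms: i64) {
        self.partitions.insert(id, PartitionSketch { watermark_ms, idle: false });
    }

    fn mark_idle(&mut self, id: (u32, u32)) {
        if let Some(partition) = self.partitions.get_mut(&id) {
            partition.idle = true;
        }
    }

    /// Minimum over active partitions; idle ones are skipped.
    fn current_watermark(&self) -> Option<i64> {
        self.partitions
            .values()
            .filter(|p| !p.idle)
            .map(|p| p.watermark_ms)
            .min()
    }
}

/// Returns the combined watermark before and after idling the slowest partition.
fn partitioned_example() -> (Option<i64>, Option<i64>) {
    let mut tracker = PartitionedSketch::new();
    tracker.update_partition((0, 0), 5_000);
    tracker.update_partition((0, 1), 3_000);
    tracker.update_partition((0, 2), 4_000);
    tracker.update_partition((0, 3), 4_500);
    let before = tracker.current_watermark();
    tracker.mark_idle((0, 1));
    let after = tracker.current_watermark();
    (before, after)
}
```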
For Kafka sources with multiple partitions, use PartitionedWatermarkTracker:
use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
let mut tracker = PartitionedWatermarkTracker::new();
// Register a Kafka source with 4 partitions
tracker.register_source(0, 4);
// Update ALL partitions (all must have valid watermarks)
tracker.update_partition(PartitionId::new(0, 0), 5000);
tracker.update_partition(PartitionId::new(0, 1), 3000);
tracker.update_partition(PartitionId::new(0, 2), 4000);
tracker.update_partition(PartitionId::new(0, 3), 4500);
// Combined watermark is minimum across active partitions
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
// Mark slow partition as idle to allow progress
tracker.mark_partition_idle(PartitionId::new(0, 1));
assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
§Per-Key Watermark Tracking
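The motivation for per-key tracking is that a single watermark driven by the fastest key misclassifies on-time events of slower keys as late. The following standard-library sketch contrasts the two. `KeyedSketch` is hypothetical, and defining the global watermark as the largest event time across all keys minus the delay is an assumption made for the comparison.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for a keyed watermark tracker.
struct KeyedSketch {
    delay_ms: i64,
    max_seen_ms: HashMap<String, i64>,
}

impl KeyedSketch {
    fn new(delay_ms: i64) -> Self {
        Self { delay_ms, max_seen_ms: HashMap::new() }
    }

    /// Tracks the largest event time seen for each key.
    fn update(&mut self, key: &str, event_time_ms: i64) {
        let max = self.max_seen_ms.entry(key.to_string()).or_insert(event_time_ms);
        *max = (*max).max(event_time_ms);
    }

    /// Per-key watermark: that key's largest event time minus the delay.
    fn watermark_for_key(&self, key: &str) -> Option<i64> {
        self.max_seen_ms.get(key).map(|max| *max - self.delay_ms)
    }

    /// Single watermark driven by the fastest key (assumed definition).
    fn global_watermark(&self) -> Option<i64> {
        self.max_seen_ms.values().max().map(|max| *max - self.delay_ms)
    }
}

/// Classifies a tenant_b event at 3000 ms under both schemes.
/// Returns (late under per-key watermark, late under global watermark).
fn keyed_example() -> (bool, bool) {
    let mut tracker = KeyedSketch::new(5_000);
    tracker.update("tenant_a", 15_000);
    tracker.update("tenant_b", 5_000);
    let event_time_ms = 3_000;
    let late_per_key = tracker
        .watermark_for_key("tenant_b")
        .map_or(false, |wm| event_time_ms < wm);
    let late_global = tracker
        .global_watermark()
        .map_or(false, |wm| event_time_ms < wm);
    (late_per_key, late_global)
}
```

With a 5-second delay, tenant_b's own watermark is 0 while the global one is 10000, so the same event is on time under per-key tracking and late under the global scheme.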
For multi-tenant workloads or scenarios with significant event-time skew between
keys, use KeyedWatermarkTracker, which achieves 99%+ accuracy versus 63-67% with a single global watermark:
use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
use std::time::Duration;
let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
// Fast tenant advances quickly
tracker.update("tenant_a".to_string(), 15_000);
// Slow tenant at earlier time
tracker.update("tenant_b".to_string(), 5_000);
// Per-key watermarks differ - each key has independent tracking
assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));
// Events for tenant_b at 3000 are NOT late (their key watermark is 0)
assert!(!tracker.is_late(&"tenant_b".to_string(), 3000));
// But events for tenant_a at 3000 ARE late (their key watermark is 10000)
assert!(tracker.is_late(&"tenant_a".to_string(), 3000));
§Watermark Alignment Groups
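Alignment bounds how far a fast source may run ahead of the slowest one, which in turn bounds how much state a join must buffer. A minimal standard-library sketch of the drift check follows. `AlignmentSketch` and `ActionSketch` are hypothetical, and measuring drift against the minimum watermark in the group is an assumption about how the real group computes it.

```rust
/// Illustrative stand-in for an alignment action.
#[derive(Debug, PartialEq)]
enum ActionSketch {
    Continue,
    Pause,
}

/// Illustrative stand-in for a watermark alignment group.
struct AlignmentSketch {
    max_drift_ms: i64,
    watermarks_ms: Vec<i64>,
}

impl AlignmentSketch {
    /// All sources start at watermark 0.
    fn new(num_sources: usize, max_drift_ms: i64) -> Self {
        Self { max_drift_ms, watermarks_ms: vec![0; num_sources] }
    }

    /// Pauses the reporting source when it is more than `max_drift_ms`
    /// ahead of the slowest source in the group.
    fn report_watermark(&mut self, source: usize, watermark_ms: i64) -> ActionSketch {
        self.watermarks_ms[source] = watermark_ms;
        let slowest = self.watermarks_ms.iter().copied().min().unwrap_or(watermark_ms);
        if watermark_ms - slowest > self.max_drift_ms {
            ActionSketch::Pause
        } else {
            ActionSketch::Continue
        }
    }
}

/// Source 0 runs ahead, gets paused, then resumes once source 1 catches up.
fn alignment_example() -> Vec<ActionSketch> {
    let mut group = AlignmentSketch::new(2, 300_000);
    vec![
        group.report_watermark(0, 200_000), // drift 200 s, within limit
        group.report_watermark(0, 400_000), // drift 400 s, over limit
        group.report_watermark(1, 150_000), // slowest source, never ahead
        group.report_watermark(0, 400_000), // drift now 250 s, within limit
    ]
}
```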
For stream-stream joins and multi-source operators, use WatermarkAlignmentGroup
to prevent unbounded state growth when sources have different processing speeds:
use laminar_core::time::{
    WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
    EnforcementMode, AlignmentAction,
};
use std::time::Duration;
let config = AlignmentGroupConfig::new("orders-payments")
    .with_max_drift(Duration::from_secs(300)); // 5-minute max drift
let mut group = WatermarkAlignmentGroup::new(config);
group.register_source(0); // orders
group.register_source(1); // payments
// Both start at 0
group.report_watermark(0, 0);
group.report_watermark(1, 0);
// Orders advances within limit - OK
let action = group.report_watermark(0, 200_000); // 200 seconds
assert_eq!(action, AlignmentAction::Continue);
// Orders advances beyond limit - PAUSED
let action = group.report_watermark(0, 400_000); // 400 seconds (drift > 300)
assert_eq!(action, AlignmentAction::Pause);
Structs§
- AlignmentGroupConfig - Configuration for a watermark alignment group.
- AlignmentGroupCoordinator - Manages multiple alignment groups.
- AlignmentGroupId - Identifier for an alignment group.
- AlignmentGroupMetrics - Metrics for an alignment group.
- AlignmentSourceState - State for a source within an alignment group.
- AscendingTimestampsGenerator - Watermark generator for strictly ascending timestamps.
- BoundedOutOfOrdernessGenerator - Watermark generator with bounded out-of-orderness.
- CoreWatermarkState - Per-core partition watermark aggregator.
- EventTimeExtractor - Extracts event timestamps from Arrow RecordBatch columns.
- GlobalWatermarkCollector - Collects watermarks from multiple cores and computes the global watermark.
- KeyWatermarkState - Per-key watermark state.
- KeyedWatermarkConfig - Keyed watermark tracker configuration.
- KeyedWatermarkMetrics - Metrics for keyed watermark tracking.
- KeyedWatermarkTracker - Tracks watermarks per logical key.
- KeyedWatermarkTrackerWithLateHandling - Keyed watermark tracker with late event handling.
- MeteredGenerator - Watermark generator wrapper that collects metrics.
- PartitionId - Partition identifier within a source.
- PartitionWatermarkState - Per-partition watermark state.
- PartitionedWatermarkMetrics - Metrics for partitioned watermark tracking.
- PartitionedWatermarkTracker - Tracks watermarks across partitions within sources.
- PeriodicGenerator - Periodic watermark generator that emits at fixed wall-clock intervals.
- ProcessingTimeGenerator - Processing-time watermark generator.
- PunctuatedGenerator - Punctuated watermark generator that emits based on special events.
- SourceProvidedGenerator - Watermark generator for sources with embedded watermarks.
- TimerRegistration - A timer registration for delayed processing.
- TimerService - Timer service for scheduling and managing timers.
- Watermark - A watermark indicating event time progress.
- WatermarkAlignmentGroup - Manages watermark alignment across sources in a group.
- WatermarkMetrics - Metrics for watermark tracking.
- WatermarkTracker - Tracks watermarks across multiple input sources.
Enums§
- AlignmentAction - Action to take for a source based on alignment.
- AlignmentError - Error type for alignment group operations.
- EnforcementMode - Enforcement mode for alignment groups.
- EventTimeError - Errors that can occur during event time extraction.
- ExtractionMode - Multi-row extraction strategy.
- KeyEvictionPolicy - Policy for evicting keys when max_keys is reached.
- KeyedWatermarkError - Errors that can occur in keyed watermark operations.
- TimeError - Errors that can occur in time operations.
- TimestampField - Column identifier for timestamp field.
- TimestampFormat - Timestamp format variants for extraction.
- WatermarkError - Errors that can occur in partitioned watermark operations.
Traits§
- WatermarkGenerator - Trait for generating watermarks from event timestamps.
Type Aliases§
- FiredTimersVec - Collection type for fired timers.
- TimerKey - Timer key type optimized for window IDs (16 bytes).