Module time


§Time Module

Event time processing, watermarks, and timer management.

§Concepts

  • Event Time: Timestamp when the event actually occurred
  • Processing Time: Timestamp when the event is processed
  • Watermark: Assertion that no events with timestamp < watermark will arrive
  • Timer: Scheduled callback for window triggers or timeouts
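Watermarks and timers fit together this way: a timer registered for event time t can fire once the watermark reaches t, because no earlier events are expected after that point. The std-only sketch below models this with a `BTreeMap`. `EventTimeTimers`, `register`, and `advance_watermark` are hypothetical names chosen for illustration. They are not the API of this module's `TimerService`.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified event-time timer queue (NOT the crate's `TimerService`).
/// Timers are keyed by the event time (ms) at which they should fire.
pub struct EventTimeTimers {
    // fire time (ms) -> ids of timers scheduled for that time
    pending: BTreeMap<i64, Vec<u64>>,
}

impl EventTimeTimers {
    pub fn new() -> Self {
        Self { pending: BTreeMap::new() }
    }

    /// Schedule timer `id` to fire once the watermark reaches `fire_at`.
    pub fn register(&mut self, fire_at: i64, id: u64) {
        self.pending.entry(fire_at).or_default().push(id);
    }

    /// Advance the watermark and return every timer whose fire time is <= `watermark`,
    /// in fire-time order. The watermark asserts that no earlier events will arrive,
    /// so these timers can fire safely.
    pub fn advance_watermark(&mut self, watermark: i64) -> Vec<u64> {
        let mut fired = Vec::new();
        // Pop entries in ascending fire-time order until the next one is still in the future
        while let Some(entry) = self.pending.first_entry() {
            if *entry.key() > watermark {
                break;
            }
            fired.extend(entry.remove());
        }
        fired
    }
}
```

Keeping timers in an ordered map means each watermark advance only touches the timers that are due, and they come out in fire-time order.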

§Event Time Extraction

Use EventTimeExtractor to extract timestamps from Arrow RecordBatch columns:

use laminar_core::time::{EventTimeExtractor, TimestampFormat, ExtractionMode};

// Extract millisecond timestamps from the `event_time` column, using Max mode
// so a multi-row batch is represented by its maximum (latest) timestamp
let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis)
    .with_mode(ExtractionMode::Max);

// `batch` is an Arrow RecordBatch that contains an `event_time` column
let timestamp = extractor.extract(&batch)?;
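`ExtractionMode` determines which timestamp represents a multi-row batch. The sketch below is a simplified model that reads timestamps from a plain `&[i64]` slice instead of an Arrow column. Only `Max` appears in the example above. The `First`, `Last`, and `Min` variants of the hypothetical `Mode` enum are illustrative assumptions, not the crate's actual variants.

```rust
/// Hypothetical stand-in for `ExtractionMode`. Only `Max` is confirmed by the
/// example above; `First`, `Last`, and `Min` are illustrative assumptions.
#[derive(Clone, Copy, Debug)]
pub enum Mode {
    First,
    Last,
    Max,
    Min,
}

/// Pick one event timestamp (Unix millis) to represent a batch.
/// The timestamp column is modeled as a plain slice rather than an Arrow array.
/// Returns `None` for an empty batch.
pub fn extract(timestamps: &[i64], mode: Mode) -> Option<i64> {
    match mode {
        Mode::First => timestamps.first().copied(),
        Mode::Last => timestamps.last().copied(),
        Mode::Max => timestamps.iter().copied().max(),
        Mode::Min => timestamps.iter().copied().min(),
    }
}
```

With `Max`, a batch is represented by its latest row. `Min` would represent it by its earliest row, which advances a downstream watermark more conservatively.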

§Watermark Generation

Use watermark generators to track event-time progress:

use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};

// Allow events to be up to 1 second late
let mut generator = BoundedOutOfOrdernessGenerator::new(1000);

// Process events
let wm = generator.on_event(5000);
assert_eq!(wm, Some(Watermark::new(4000))); // 5000 - 1000
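Bounded out-of-orderness means the watermark trails the largest timestamp seen so far by a fixed allowance, which produces 5000 - 1000 = 4000 in the example. The sketch below reproduces that arithmetic and assumes a watermark is emitted only when it advances. `BoundedLateness` is a hypothetical std-only stand-in, not the crate's `BoundedOutOfOrdernessGenerator`.

```rust
/// Hypothetical sketch of bounded out-of-orderness (assumed semantics, NOT the crate's code):
/// watermark = max timestamp seen - allowed lateness, emitted only when it advances.
pub struct BoundedLateness {
    max_lateness_ms: i64,
    max_seen: Option<i64>,
    last_emitted: Option<i64>,
}

impl BoundedLateness {
    pub fn new(max_lateness_ms: i64) -> Self {
        Self { max_lateness_ms, max_seen: None, last_emitted: None }
    }

    /// Observe an event timestamp and return a new watermark if it advanced.
    pub fn on_event(&mut self, ts: i64) -> Option<i64> {
        // Track the largest timestamp so far; out-of-order events never lower it
        let max = self.max_seen.map_or(ts, |m| m.max(ts));
        self.max_seen = Some(max);

        let candidate = max - self.max_lateness_ms;
        if self.last_emitted.map_or(true, |last| candidate > last) {
            self.last_emitted = Some(candidate);
            Some(candidate)
        } else {
            None
        }
    }
}
```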

§Multi-Source Watermark Tracking

For operators with multiple inputs, use WatermarkTracker:

use laminar_core::time::{WatermarkTracker, Watermark};

let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 5000);
tracker.update_source(1, 3000);

// Combined watermark is the minimum
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
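Taking the minimum makes the combined watermark safe: the operator cannot claim progress past its slowest input. A std-only sketch follows. `MinTracker` is hypothetical. Its choice to return `None` until every source has reported is an assumption of this sketch, not documented behavior of `WatermarkTracker`.

```rust
/// Hypothetical sketch of min-across-sources tracking (NOT the crate's `WatermarkTracker`).
pub struct MinTracker {
    // Latest watermark per source; `None` until that source reports
    per_source: Vec<Option<i64>>,
}

impl MinTracker {
    pub fn new(num_sources: usize) -> Self {
        Self { per_source: vec![None; num_sources] }
    }

    /// Record a source's watermark. A source's watermark never moves backwards.
    pub fn update_source(&mut self, source: usize, wm: i64) {
        let slot = &mut self.per_source[source];
        *slot = Some(slot.map_or(wm, |old| old.max(wm)));
    }

    /// Combined watermark: the minimum over all sources, or `None` until
    /// every source has reported at least once (an assumption of this sketch).
    pub fn current_watermark(&self) -> Option<i64> {
        let all: Vec<i64> = self.per_source.iter().copied().collect::<Option<Vec<i64>>>()?;
        all.into_iter().min()
    }
}
```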

§Per-Partition Watermark Tracking

For Kafka sources with multiple partitions, use PartitionedWatermarkTracker:

use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};

let mut tracker = PartitionedWatermarkTracker::new();

// Register a Kafka source with 4 partitions
tracker.register_source(0, 4);

// Report a watermark for each of the 4 partitions (every active partition needs a valid watermark)
tracker.update_partition(PartitionId::new(0, 0), 5000);
tracker.update_partition(PartitionId::new(0, 1), 3000);
tracker.update_partition(PartitionId::new(0, 2), 4000);
tracker.update_partition(PartitionId::new(0, 3), 4500);

// Combined watermark is minimum across active partitions
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));

// Marking the slow partition 1 as idle excludes it from the minimum, so the watermark can advance
tracker.mark_partition_idle(PartitionId::new(0, 1));
assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
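Idleness works by removing a partition from the minimum. The std-only sketch below uses a hypothetical `PartitionTracker` that keys partitions by a `(source, partition)` tuple instead of `PartitionId`. In this sketch, a partition that receives new data becomes active again. That reactivation rule is an assumption, not a description of the crate.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of per-partition watermarks with idleness
/// (NOT the crate's `PartitionedWatermarkTracker`).
#[derive(Default)]
pub struct PartitionTracker {
    // (source_id, partition) -> (watermark, is_idle)
    partitions: HashMap<(u32, u32), (i64, bool)>,
}

impl PartitionTracker {
    pub fn update_partition(&mut self, id: (u32, u32), wm: i64) {
        let entry = self.partitions.entry(id).or_insert((wm, false));
        entry.0 = entry.0.max(wm); // watermarks never regress
        entry.1 = false; // assumption: new data makes an idle partition active again
    }

    pub fn mark_partition_idle(&mut self, id: (u32, u32)) {
        if let Some(entry) = self.partitions.get_mut(&id) {
            entry.1 = true;
        }
    }

    /// Minimum watermark over active (non-idle) partitions.
    pub fn current_watermark(&self) -> Option<i64> {
        self.partitions
            .values()
            .filter(|(_, idle)| !idle)
            .map(|(wm, _)| *wm)
            .min()
    }
}
```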

§Per-Key Watermark Tracking

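For multi-tenant workloads, or any workload with significant event-time skew between keys, use KeyedWatermarkTracker. A single global watermark is driven by the fastest key, so on-time events from slower keys are misclassified as late. Tracking a watermark per key avoids this and is reported to reach 99%+ accuracy, versus 63-67% with a global watermark: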

use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
use std::time::Duration;

let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

// Fast tenant advances quickly
tracker.update("tenant_a".to_string(), 15_000);

// Slow tenant at earlier time
tracker.update("tenant_b".to_string(), 5_000);

// Each key has its own watermark: that key's max timestamp minus the 5 s delay
assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));

// Events for tenant_b at 3000 are NOT late (their key watermark is 0)
assert!(!tracker.is_late(&"tenant_b".to_string(), 3000));

// But events for tenant_a at 3000 ARE late (their key watermark is 10000)
assert!(tracker.is_late(&"tenant_a".to_string(), 3000));
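Computing both watermarks for the example above shows how per-key tracking differs from a global watermark. In the std-only sketch below, the hypothetical `KeyedSketch` keeps the maximum timestamp per key and subtracts a fixed delay. Its `global_watermark` method is an illustrative addition that models a single bounded-delay watermark over the merged stream. That watermark would be driven to 10,000 by `tenant_a`, which would make `tenant_b`'s event at 3,000 late.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical sketch of per-key bounded-delay watermarks
/// (NOT the crate's `KeyedWatermarkTracker`).
pub struct KeyedSketch<K> {
    delay_ms: i64,
    max_ts: HashMap<K, i64>, // largest event timestamp seen per key
}

impl<K: Eq + Hash> KeyedSketch<K> {
    pub fn new(delay_ms: i64) -> Self {
        Self { delay_ms, max_ts: HashMap::new() }
    }

    pub fn update(&mut self, key: K, ts: i64) {
        let max = self.max_ts.entry(key).or_insert(ts);
        *max = (*max).max(ts);
    }

    /// Key watermark = max timestamp for this key minus the delay.
    pub fn watermark_for_key(&self, key: &K) -> Option<i64> {
        self.max_ts.get(key).map(|m| m.saturating_sub(self.delay_ms))
    }

    /// An event is late only relative to its own key's watermark;
    /// unseen keys have no watermark, so nothing is late for them.
    pub fn is_late(&self, key: &K, ts: i64) -> bool {
        self.watermark_for_key(key).is_some_and(|wm| ts < wm)
    }

    /// For comparison: one bounded-delay watermark over all keys combined.
    pub fn global_watermark(&self) -> Option<i64> {
        self.max_ts.values().max().map(|m| m.saturating_sub(self.delay_ms))
    }
}
```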

§Watermark Alignment Groups

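For stream-stream joins and other multi-source operators, use WatermarkAlignmentGroup to prevent unbounded state growth when sources progress at different speeds. The operator must buffer the faster source's records until the slower source's watermark catches up, so the group pauses a source whose watermark drifts more than the configured maximum ahead of the others: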

use laminar_core::time::{WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentAction};
use std::time::Duration;

let config = AlignmentGroupConfig::new("orders-payments")
    .with_max_drift(Duration::from_secs(300)); // 5 minute max drift

let mut group = WatermarkAlignmentGroup::new(config);
group.register_source(0); // orders
group.register_source(1); // payments

// Both start at 0
group.report_watermark(0, 0);
group.report_watermark(1, 0);

// Orders advances within limit - OK
let action = group.report_watermark(0, 200_000); // 200 seconds
assert_eq!(action, AlignmentAction::Continue);

// Orders advances beyond limit - PAUSED
let action = group.report_watermark(0, 400_000); // 400 seconds (drift > 300)
assert_eq!(action, AlignmentAction::Pause);
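The pause decision reduces to comparing a source's watermark with the slowest watermark in the group. The std-only sketch below replays the orders/payments example. `AlignmentSketch` and its `Action` enum are hypothetical stand-ins. The rule that a paused source can continue once the drift falls back within the limit is an assumption of this sketch.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Continue,
    Pause,
}

/// Hypothetical sketch of drift-based alignment (NOT the crate's `WatermarkAlignmentGroup`):
/// a source is paused while its watermark is more than `max_drift_ms` ahead of the slowest source.
pub struct AlignmentSketch {
    max_drift_ms: i64,
    watermarks: HashMap<u32, i64>, // latest watermark per source id
}

impl AlignmentSketch {
    pub fn new(max_drift_ms: i64) -> Self {
        Self { max_drift_ms, watermarks: HashMap::new() }
    }

    pub fn report_watermark(&mut self, source: u32, wm: i64) -> Action {
        self.watermarks.insert(source, wm);
        // The map is non-empty after the insert, so `min` always yields a value
        let slowest = self.watermarks.values().copied().min().unwrap_or(wm);
        if wm - slowest > self.max_drift_ms {
            Action::Pause
        } else {
            Action::Continue
        }
    }
}
```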

Structs§

  • AlignmentGroupConfig: Configuration for a watermark alignment group.
  • AlignmentGroupCoordinator: Manages multiple alignment groups.
  • AlignmentGroupId: Identifier for an alignment group.
  • AlignmentGroupMetrics: Metrics for an alignment group.
  • AlignmentSourceState: State for a source within an alignment group.
  • AscendingTimestampsGenerator: Watermark generator for strictly ascending timestamps.
  • BoundedOutOfOrdernessGenerator: Watermark generator with bounded out-of-orderness.
  • CoreWatermarkState: Per-core partition watermark aggregator.
  • EventTimeExtractor: Extracts event timestamps from Arrow RecordBatch columns.
  • GlobalWatermarkCollector: Collects watermarks from multiple cores and computes the global watermark.
  • KeyWatermarkState: Per-key watermark state.
  • KeyedWatermarkConfig: Keyed watermark tracker configuration.
  • KeyedWatermarkMetrics: Metrics for keyed watermark tracking.
  • KeyedWatermarkTracker: Tracks watermarks per logical key.
  • KeyedWatermarkTrackerWithLateHandling: Keyed watermark tracker with late-event handling.
  • MeteredGenerator: Watermark generator wrapper that collects metrics.
  • PartitionId: Partition identifier within a source.
  • PartitionWatermarkState: Per-partition watermark state.
  • PartitionedWatermarkMetrics: Metrics for partitioned watermark tracking.
  • PartitionedWatermarkTracker: Tracks watermarks across partitions within sources.
  • PeriodicGenerator: Periodic watermark generator that emits at fixed wall-clock intervals.
  • ProcessingTimeGenerator: Processing-time watermark generator.
  • PunctuatedGenerator: Punctuated watermark generator that emits based on special events.
  • SourceProvidedGenerator: Watermark generator for sources with embedded watermarks.
  • TimerRegistration: A timer registration for delayed processing.
  • TimerService: Timer service for scheduling and managing timers.
  • Watermark: A watermark indicating event-time progress.
  • WatermarkAlignmentGroup: Manages watermark alignment across sources in a group.
  • WatermarkMetrics: Metrics for watermark tracking.
  • WatermarkTracker: Tracks watermarks across multiple input sources.
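`PeriodicGenerator` and `PunctuatedGenerator` differ in what triggers emission: wall-clock intervals in one case, special events in the stream in the other. A minimal sketch of the punctuated style follows. The `Event` type with an `is_marker` flag and the `Punctuated` generator are hypothetical. This page does not specify how the crate recognizes its special events.

```rust
/// Hypothetical event type: a record that may carry a watermark marker.
pub struct Event {
    pub timestamp: i64,
    pub is_marker: bool,
}

/// Sketch of punctuated watermarking (NOT the crate's `PunctuatedGenerator`):
/// a watermark is emitted only when a marker event arrives, and it never regresses.
#[derive(Default)]
pub struct Punctuated {
    last: Option<i64>,
}

impl Punctuated {
    pub fn on_event(&mut self, event: &Event) -> Option<i64> {
        // Ordinary records never trigger a watermark
        if !event.is_marker {
            return None;
        }
        match self.last {
            // Ignore markers that would move the watermark backwards or not at all
            Some(prev) if event.timestamp <= prev => None,
            _ => {
                self.last = Some(event.timestamp);
                Some(event.timestamp)
            }
        }
    }
}
```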

Enums§

  • AlignmentAction: Action to take for a source based on alignment.
  • AlignmentError: Error type for alignment group operations.
  • EnforcementMode: Enforcement mode for alignment groups.
  • EventTimeError: Errors that can occur during event time extraction.
  • ExtractionMode: Multi-row extraction strategy.
  • KeyEvictionPolicy: Policy for evicting keys when max_keys is reached.
  • KeyedWatermarkError: Errors that can occur in keyed watermark operations.
  • TimeError: Errors that can occur in time operations.
  • TimestampField: Column identifier for the timestamp field.
  • TimestampFormat: Timestamp format variants for extraction.
  • WatermarkError: Errors that can occur in partitioned watermark operations.

Traits§

  • WatermarkGenerator: Trait for generating watermarks from event timestamps.

Type Aliases§

  • FiredTimersVec: Collection type for fired timers.
  • TimerKey: Timer key type optimized for window IDs (16 bytes).
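One way a 16-byte key can identify a window is to pack its start and end times as two 8-byte integers. The sketch below assumes that layout through hypothetical `encode_window_key` and `decode_window_key` helpers. It is not the actual definition of `TimerKey`. It uses big-endian encoding so that byte-wise ordering of keys matches numeric ordering of windows.

```rust
use std::convert::TryInto;

/// Hypothetical 16-byte timer key: window start and end (ms), each stored big-endian
/// so that lexicographic byte order matches numeric order.
pub fn encode_window_key(start_ms: u64, end_ms: u64) -> [u8; 16] {
    let mut key = [0u8; 16];
    key[..8].copy_from_slice(&start_ms.to_be_bytes());
    key[8..].copy_from_slice(&end_ms.to_be_bytes());
    key
}

/// Inverse of `encode_window_key`.
pub fn decode_window_key(key: &[u8; 16]) -> (u64, u64) {
    // Each half is exactly 8 bytes, so the conversions cannot fail
    let start = u64::from_be_bytes(key[..8].try_into().unwrap());
    let end = u64::from_be_bytes(key[8..].try_into().unwrap());
    (start, end)
}
```

Order-preserving keys let a sorted timer store fire windows in time order by scanning keys from the smallest upward.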