Skip to main content

KeyedWatermarkTracker

Struct KeyedWatermarkTracker 

Source
pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> { /* private fields */ }
Expand description

Tracks watermarks per logical key.

Provides fine-grained watermark tracking for multi-tenant workloads and scenarios with significant event-time skew between keys.

§Research Background

Based on research (March 2025), keyed watermarks achieve 99%+ accuracy compared to 63-67% with global watermarks.

§Example

use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
use std::time::Duration;

let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

// Fast tenant advances quickly
tracker.update("tenant_a".to_string(), 10_000);
tracker.update("tenant_a".to_string(), 15_000);

// Slow tenant at earlier time
tracker.update("tenant_b".to_string(), 5_000);

// Per-key watermarks differ
assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));

// Global watermark is minimum
assert_eq!(tracker.global_watermark(), Some(Watermark::new(0)));

Implementations§

Source§

impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K>

Source

pub fn new(config: KeyedWatermarkConfig) -> Self

Creates a new keyed watermark tracker with the given configuration.

Source

pub fn with_defaults() -> Self

Creates a tracker with default configuration.

Source

pub fn update( &mut self, key: K, event_time: i64, ) -> Result<Option<Watermark>, KeyedWatermarkError>

Updates the watermark for a specific key.

§Returns
  • Ok(Some(Watermark)) if the global watermark changes
  • Ok(None) if no global change
§Errors

Returns KeyedWatermarkError::MaxKeysReached if max_keys is reached and the RejectNew eviction policy is configured.

§Example
use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig};
use std::time::Duration;

let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

// First update creates the key
let wm = tracker.update("key1".to_string(), 1000).unwrap();
assert!(wm.is_some()); // Global watermark advances
Source

pub fn update_batch( &mut self, events: &[(K, i64)], ) -> Result<Option<Watermark>, KeyedWatermarkError>

Updates the watermarks for multiple events in a single call (more efficient than calling update once per event).

Returns the new global watermark if it changed.

§Errors

Returns KeyedWatermarkError::MaxKeysReached if max_keys is reached and the RejectNew eviction policy is configured.

Source

pub fn watermark_for_key(&self, key: &K) -> Option<i64>

Returns the watermark for a specific key.

Source

pub fn global_watermark(&self) -> Option<Watermark>

Returns the global watermark (minimum across active keys).

Source

pub fn is_late(&self, key: &K, event_time: i64) -> bool

Checks if an event is late for its key.

Uses the key’s individual watermark, not the global watermark. If the key doesn’t exist, returns false (not late).

Source

pub fn is_late_global(&self, event_time: i64) -> bool

Checks if an event is late using the global watermark.

Use this for cross-key ordering guarantees.

Source

pub fn mark_idle(&mut self, key: &K) -> Option<Watermark>

Marks a key as idle, excluding it from global watermark calculation.

Returns Some(Watermark) if the global watermark advances.

Source

pub fn mark_active(&mut self, key: &K)

Marks a key as active again.

Source

pub fn check_idle_keys(&mut self) -> Option<Watermark>

Checks for keys that have been idle longer than the timeout.

Should be called periodically from Ring 1.

Returns Some(Watermark) if marking idle keys causes the global watermark to advance.

Source

pub fn active_key_count(&self) -> usize

Returns the number of active (non-idle) keys.

Source

pub fn total_key_count(&self) -> usize

Returns the total number of tracked keys.

Source

pub fn metrics(&self) -> &KeyedWatermarkMetrics

Returns a reference to the tracker's metrics.

Source

pub fn recalculate_global(&mut self) -> Option<Watermark>

Forces recalculation of global watermark.

Useful after bulk operations or recovery.

Source

pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState>

Removes a key from tracking.

Returns the key’s watermark state if it existed.

Source

pub fn clear(&mut self)

Clears all tracked keys.

Source

pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState>

Returns the state for a specific key.

Source

pub fn config(&self) -> &KeyedWatermarkConfig

Returns the configuration.

Source

pub fn contains_key(&self, key: &K) -> bool

Checks if a key exists in the tracker.

Source

pub fn keys(&self) -> impl Iterator<Item = &K>

Returns an iterator over all keys.

Source

pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)>

Returns an iterator over all key-state pairs.

Source

pub fn bounded_delay_ms(&self) -> i64

Returns the bounded delay in milliseconds.

Trait Implementations§

Source§

impl<K: Debug + Hash + Eq + Clone> Debug for KeyedWatermarkTracker<K>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
§

impl<T> ArchivePointee for T

§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
§

impl<T> LayoutRaw for T

§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched. Read more
§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
§

impl<T> Pointee for T

§

type Metadata = ()

The metadata type for pointers and references to this type.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self.
§

impl<T> Scope for T

§

fn with<F, R>(self, f: F) -> R
where Self: Sized, F: FnOnce(Self) -> R,

Scoped with ownership.
§

fn with_ref<F, R>(&self, f: F) -> R
where F: FnOnce(&Self) -> R,

Scoped with reference.
§

fn with_mut<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut Self) -> R,

Scoped with mutable reference.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
§

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,

§

impl<T> Value for T
where T: Send + Sync + 'static,