Skip to main content

SessionWindowOperator

Struct SessionWindowOperator 

Source
pub struct SessionWindowOperator<A: Aggregator> { /* private fields */ }
Expand description

Session window operator.

Groups events by activity periods separated by gaps. Each unique key maintains its own session state independently, supporting multiple concurrent sessions per key.

§Session Lifecycle

  1. Start: First event for a key creates a new session
  2. Extend: Events within gap period extend the session
  3. Close: Timer fires when gap expires, emitting results
  4. Merge: Late data may merge previously separate sessions

§State Management

Session state is stored using prefixed keys:

  • six:<key_hash> - SessionIndex (all sessions for a key)
  • sac:<session_id> - Per-session accumulator state

§Emit Strategies

  • OnWatermark: Emit when watermark passes session end
  • OnUpdate: Emit after every state update
  • OnWindowClose: Only emit on final closure
  • Changelog: Emit CDC records with Z-set weights
  • Final: Suppress all intermediate results and drop late data

Implementations§

Source§

impl<A: Aggregator> SessionWindowOperator<A>
where A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, Error>>, <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, Error>> + RkyvDeserialize<A::Acc, HighDeserializer<Error>>,

Source

pub fn new(gap: Duration, aggregator: A, allowed_lateness: Duration) -> Self

Creates a new session window operator.

§Arguments
  • gap - The inactivity gap that closes a session
  • aggregator - Aggregation function to apply within sessions
  • allowed_lateness - Grace period for late data after session close
§Panics

Panics if gap or allowed_lateness, expressed in milliseconds, does not fit in an i64.

Source

pub fn with_id( gap: Duration, aggregator: A, allowed_lateness: Duration, operator_id: String, ) -> Self

Creates a new session window operator with a custom operator ID.

§Panics

Panics if gap or allowed_lateness, expressed in milliseconds, does not fit in an i64.

Source

pub fn set_max_cached_indices(&mut self, max: usize)

Sets the maximum number of session indices to cache in memory.

When the cache exceeds this limit, it is cleared and entries are reloaded from the persistent state store on demand. Default: 16,384.

Source

pub fn set_key_column(&mut self, column_index: usize)

Sets the key column for per-key session tracking.

If not set, a single global session is maintained.

Source

pub fn key_column(&self) -> Option<usize>

Returns the key column index if set.

Source

pub fn set_emit_strategy(&mut self, strategy: EmitStrategy)

Sets the emit strategy for this operator.

Source

pub fn emit_strategy(&self) -> &EmitStrategy

Returns the current emit strategy.

Source

pub fn set_late_data_config(&mut self, config: LateDataConfig)

Sets the late data handling configuration.

Source

pub fn late_data_config(&self) -> &LateDataConfig

Returns the current late data configuration.

Source

pub fn late_data_metrics(&self) -> &LateDataMetrics

Returns the late data metrics.

Source

pub fn reset_late_data_metrics(&mut self)

Resets the late data metrics counters.

Source

pub fn window_close_metrics(&self) -> &WindowCloseMetrics

Returns the window close metrics.

Source

pub fn reset_window_close_metrics(&mut self)

Resets the window close metrics counters.

Source

pub fn gap_ms(&self) -> i64

Returns the gap timeout in milliseconds.

Source

pub fn allowed_lateness_ms(&self) -> i64

Returns the allowed lateness in milliseconds.

Source

pub fn active_session_count(&self) -> usize

Returns the number of active sessions across all keys.

Trait Implementations§

Source§

impl<A: Aggregator> Operator for SessionWindowOperator<A>
where A::Acc: 'static + Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, Error>>, <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, Error>> + RkyvDeserialize<A::Acc, HighDeserializer<Error>>,

Source§

fn process(&mut self, event: &Event, ctx: &mut OperatorContext<'_>) -> OutputVec

Process an incoming event
Source§

fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext<'_>) -> OutputVec

Handle timer expiration
Source§

fn checkpoint(&self) -> OperatorState

Checkpoint the operator’s state
Source§

fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>

Restore from a checkpoint Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
§

impl<T> ArchivePointee for T

§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
§

impl<T> LayoutRaw for T

§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched. Read more
§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
§

impl<T> Pointee for T

§

type Metadata = ()

The metadata type for pointers and references to this type.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
§

impl<T> Scope for T

§

fn with<F, R>(self, f: F) -> R
where Self: Sized, F: FnOnce(Self) -> R,

Scoped with ownership.
§

fn with_ref<F, R>(&self, f: F) -> R
where F: FnOnce(&Self) -> R,

Scoped with reference.
§

fn with_mut<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut Self) -> R,

Scoped with mutable reference.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
§

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,

§

impl<T> Value for T
where T: Send + Sync + 'static,