Skip to main content

laminar_core/operator/
session_window.rs

1//! # Session Window Operators
2//!
3//! Implementation of session windows for stream processing.
4//!
5//! Session windows are dynamic windows that group events by activity periods
6//! separated by gaps. Unlike tumbling and sliding windows which have fixed
7//! boundaries, session windows grow with activity and close after inactivity.
8//!
9//! ## Key Characteristics
10//!
11//! - **Dynamic boundaries**: Sessions start with the first event and extend
12//!   with each new event within the gap period
13//! - **Per-key tracking**: Each key maintains independent session state
14//! - **Gap-based closure**: Sessions close when no events arrive within the gap
15//! - **Session merging**: Late data can merge previously separate sessions
16//!
17//! ## Example
18//!
19//! ```text
20//! Gap: 30 seconds
21//!
22//! Events: [t=0] [t=10] [t=20]  ...gap...  [t=100] [t=110]
23//!         |<---- Session 1 ---->|          |<- Session 2 ->|
24//!         [0, 50)                          [100, 140)
25//! ```
26//!
27//! ## Usage
28//!
29//! ```rust,no_run
30//! use laminar_core::operator::session_window::SessionWindowOperator;
31//! use laminar_core::operator::window::CountAggregator;
32//! use std::time::Duration;
33//!
34//! // Create a session window with 30-second gap
35//! let operator = SessionWindowOperator::new(
36//!     Duration::from_secs(30),    // gap timeout
37//!     CountAggregator::new(),
38//!     Duration::from_secs(60),    // allowed lateness
39//! );
40//! ```
41
42use super::window::{
43    Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
44    ResultToArrow, WindowCloseMetrics, WindowId,
45};
46use super::{
47    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec,
48    SideOutputData, Timer,
49};
50use crate::state::{StateStore, StateStoreExt};
51use arrow_array::{Array, Int64Array, RecordBatch};
52use arrow_schema::{DataType, Field, Schema, SchemaRef};
53use rkyv::{
54    api::high::{HighDeserializer, HighSerializer, HighValidator},
55    bytecheck::CheckBytes,
56    rancor::Error as RkyvError,
57    ser::allocator::ArenaHandle,
58    util::AlignedVec,
59    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
60};
61use rustc_hash::FxHashMap;
62use smallvec::SmallVec;
63use std::marker::PhantomData;
64use std::sync::atomic::{AtomicU64, Ordering};
65use std::sync::Arc;
66use std::time::Duration;
67
/// State key prefix for a per-key session index (4 bytes).
///
/// The full state key is this prefix followed by the 8-byte big-endian key hash.
const SESSION_INDEX_PREFIX: &[u8; 4] = b"six:";

/// State key prefix for a per-session accumulator (4 bytes).
///
/// The full state key is this prefix followed by the 8-byte big-endian session ID.
const SESSION_ACC_PREFIX: &[u8; 4] = b"sac:";

/// Timer key prefix for session closure (1 byte).
///
/// A session timer key is this byte followed by the 8-byte big-endian session ID.
const SESSION_TIMER_PREFIX: u8 = 0x01;

/// Default maximum number of session indices to cache in-memory.
///
/// Once the cache holds this many entries, the next cache miss clears it
/// entirely to prevent unbounded memory growth.
const DEFAULT_MAX_CACHED_INDICES: usize = 16_384;

/// Static counter for generating unique operator IDs.
///
/// Each operator constructed without an explicit ID takes the next value.
static SESSION_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
83
84/// Unique identifier for a session instance.
85///
86/// Generated by XOR-ing a hash of the operator ID with a monotonic counter,
87/// ensuring uniqueness within and across operators.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Archive, RkyvSerialize, RkyvDeserialize)]
89pub struct SessionId(u64);
90
91impl SessionId {
92    /// Generates a new unique session ID.
93    ///
94    /// Combines a hash of the operator ID with a monotonic counter to produce
95    /// IDs that are unique across operators and invocations.
96    pub fn generate(operator_id: &str, counter: &AtomicU64) -> Self {
97        let op_hash = {
98            use std::hash::{Hash, Hasher};
99            let mut hasher = rustc_hash::FxHasher::default();
100            operator_id.as_bytes().hash(&mut hasher);
101            hasher.finish()
102        };
103        let seq = counter.fetch_add(1, Ordering::Relaxed);
104        Self(op_hash ^ seq)
105    }
106
107    /// Serializes this session ID to a big-endian byte array.
108    #[must_use]
109    pub fn to_bytes(self) -> [u8; 8] {
110        self.0.to_be_bytes()
111    }
112
113    /// Deserializes a session ID from a big-endian byte slice.
114    ///
115    /// Returns `None` if the slice is not exactly 8 bytes.
116    #[must_use]
117    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
118        let arr: [u8; 8] = bytes.try_into().ok()?;
119        Some(Self(u64::from_be_bytes(arr)))
120    }
121
122    /// Returns the raw `u64` value.
123    #[must_use]
124    pub fn as_u64(self) -> u64 {
125        self.0
126    }
127}
128
129/// Metadata for a single session within a key's session index.
130///
131/// Tracks the temporal bounds and emission state of a session. The accumulator
132/// state is stored separately under `sac:<session_id>`.
133#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
134pub struct SessionMetadata {
135    /// Unique session identifier.
136    pub id: SessionId,
137    /// Session start timestamp (inclusive).
138    pub start: i64,
139    /// Session end timestamp (exclusive, = last event time + gap).
140    pub end: i64,
141    /// Whether this session has been emitted (used for retraction tracking).
142    pub emitted: bool,
143}
144
145impl SessionMetadata {
146    /// Creates a new session from a first event timestamp.
147    ///
148    /// The session spans `[timestamp, timestamp + gap_ms)`.
149    fn new(id: SessionId, timestamp: i64, gap_ms: i64) -> Self {
150        Self {
151            id,
152            start: timestamp,
153            end: timestamp + gap_ms,
154            emitted: false,
155        }
156    }
157
158    /// Returns the window ID for this session.
159    #[must_use]
160    pub fn window_id(&self) -> WindowId {
161        WindowId::new(self.start, self.end)
162    }
163
164    /// Checks whether an event at `timestamp` would overlap this session.
165    ///
166    /// An event overlaps if its potential session `[timestamp, timestamp + gap_ms)`
167    /// intersects with this session's bounds `[start, end)`.
168    fn overlaps(&self, timestamp: i64, gap_ms: i64) -> bool {
169        let event_start = timestamp;
170        let event_end = timestamp + gap_ms;
171        // Two intervals [a, b) and [c, d) overlap when a < d && c < b
172        event_start < self.end && self.start < event_end
173    }
174
175    /// Extends this session to include a new event timestamp.
176    ///
177    /// Widens the bounds: start becomes `min(start, timestamp)`,
178    /// end becomes `max(end, timestamp + gap_ms)`.
179    fn extend(&mut self, timestamp: i64, gap_ms: i64) {
180        self.start = self.start.min(timestamp);
181        self.end = self.end.max(timestamp + gap_ms);
182    }
183
184    /// Merges another session into this one.
185    ///
186    /// Takes the union of the two sessions' bounds. Used for
187    /// session merging when late data bridges two previously separate sessions.
188    fn merge(&mut self, other: &SessionMetadata) {
189        self.start = self.start.min(other.start);
190        self.end = self.end.max(other.end);
191    }
192}
193
194/// Index of all active sessions for a single key.
195///
196/// Sessions are kept sorted by start time for efficient overlap queries.
197/// Stored in the state store under `six:<key_hash>`.
198#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
199pub struct SessionIndex {
200    /// Active sessions sorted by start time.
201    pub sessions: Vec<SessionMetadata>,
202}
203
204impl SessionIndex {
205    /// Inserts a session and maintains sorted order by start time.
206    fn insert(&mut self, session: SessionMetadata) {
207        let pos = self
208            .sessions
209            .binary_search_by_key(&session.start, |s| s.start)
210            .unwrap_or_else(|pos| pos);
211        self.sessions.insert(pos, session);
212    }
213
214    /// Removes a session by ID, returning it if found.
215    fn remove(&mut self, id: SessionId) -> Option<SessionMetadata> {
216        if let Some(pos) = self.sessions.iter().position(|s| s.id == id) {
217            Some(self.sessions.remove(pos))
218        } else {
219            None
220        }
221    }
222
223    /// Finds all sessions that overlap with an event at `timestamp`.
224    fn find_overlapping(&self, timestamp: i64, gap_ms: i64) -> SmallVec<[SessionId; 4]> {
225        self.sessions
226            .iter()
227            .filter(|s| s.overlaps(timestamp, gap_ms))
228            .map(|s| s.id)
229            .collect()
230    }
231
232    /// Returns a reference to the session with the given ID.
233    fn get(&self, id: SessionId) -> Option<&SessionMetadata> {
234        self.sessions.iter().find(|s| s.id == id)
235    }
236
237    /// Returns a mutable reference to the session with the given ID.
238    fn get_mut(&mut self, id: SessionId) -> Option<&mut SessionMetadata> {
239        self.sessions.iter_mut().find(|s| s.id == id)
240    }
241
242    /// Returns the number of active sessions.
243    #[must_use]
244    fn len(&self) -> usize {
245        self.sessions.len()
246    }
247
248    /// Returns `true` if there are no active sessions.
249    #[must_use]
250    fn is_empty(&self) -> bool {
251        self.sessions.is_empty()
252    }
253}
254
/// Session window operator.
///
/// Groups events by activity periods separated by gaps. Each unique key
/// maintains its own session state independently, supporting **multiple
/// concurrent sessions per key**.
///
/// # Session Lifecycle
///
/// 1. **Start**: First event for a key creates a new session
/// 2. **Extend**: Events within gap period extend the session
/// 3. **Close**: Timer fires when gap expires, emitting results
/// 4. **Merge**: Late data may merge previously separate sessions
///
/// # State Management
///
/// Session state is stored using prefixed keys:
/// - `six:<key_hash>` - [`SessionIndex`] (all sessions for a key)
/// - `sac:<session_id>` - Per-session accumulator state
///
/// # Emit Strategies
///
/// - `OnWatermark`: Emit when watermark passes session end
/// - `OnUpdate`: Emit after every state update
/// - `OnWindowClose`: Only emit on final closure
/// - `Changelog`: Emit CDC records with Z-set weights
/// - `Final`: Suppress all intermediate, drop late data
pub struct SessionWindowOperator<A: Aggregator> {
    /// Gap timeout in milliseconds
    gap_ms: i64,
    /// Aggregator function
    aggregator: A,
    /// Allowed lateness for late data, in milliseconds
    allowed_lateness_ms: i64,
    /// Monotonic counter for generating unique session IDs
    session_id_counter: AtomicU64,
    /// Per-key session indices (in-memory cache, backed by state store).
    ///
    /// Keyed by key hash. Bounded by `max_cached_indices` to prevent
    /// unbounded memory growth in high-cardinality key spaces. Evicted
    /// entries are reloaded from the state store on demand.
    session_indices: FxHashMap<u64, SessionIndex>,
    /// Maximum number of session indices to keep in-memory.
    max_cached_indices: usize,
    /// Pending timers: `session_id` -> (`timer_time`, `key_hash`)
    pending_timers: FxHashMap<u64, (i64, u64)>,
    /// Emit strategy
    emit_strategy: EmitStrategy,
    /// Late data configuration
    late_data_config: LateDataConfig,
    /// Late data metrics
    late_data_metrics: LateDataMetrics,
    /// Window close metrics
    window_close_metrics: WindowCloseMetrics,
    /// Operator ID for checkpointing; also hashed into generated session IDs
    operator_id: String,
    /// Cached output schema (`window_start`, `window_end`, `result`)
    output_schema: SchemaRef,
    /// Key column index for partitioning (None = global session)
    key_column: Option<usize>,
    /// Whether timers need re-registration with `TimerService` after restore
    needs_timer_reregistration: bool,
    /// Phantom data for accumulator type
    _phantom: PhantomData<A::Acc>,
}
320
321impl<A: Aggregator> SessionWindowOperator<A>
322where
323    A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
324    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
325        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
326{
327    /// Creates a new session window operator.
328    ///
329    /// # Arguments
330    ///
331    /// * `gap` - The inactivity gap that closes a session
332    /// * `aggregator` - Aggregation function to apply within sessions
333    /// * `allowed_lateness` - Grace period for late data after session close
334    ///
335    /// # Panics
336    ///
337    /// Panics if gap or allowed lateness does not fit in i64.
338    pub fn new(gap: Duration, aggregator: A, allowed_lateness: Duration) -> Self {
339        let operator_num = SESSION_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
340        let output_schema = Arc::new(Schema::new(vec![
341            Field::new("window_start", DataType::Int64, false),
342            Field::new("window_end", DataType::Int64, false),
343            Field::new(
344                "result",
345                aggregator.output_data_type(),
346                aggregator.output_nullable(),
347            ),
348        ]));
349        Self {
350            gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
351            aggregator,
352            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
353                .expect("Allowed lateness must fit in i64"),
354            session_id_counter: AtomicU64::new(0),
355            session_indices: FxHashMap::default(),
356            max_cached_indices: DEFAULT_MAX_CACHED_INDICES,
357            pending_timers: FxHashMap::default(),
358            emit_strategy: EmitStrategy::default(),
359            late_data_config: LateDataConfig::default(),
360            late_data_metrics: LateDataMetrics::new(),
361            window_close_metrics: WindowCloseMetrics::new(),
362            operator_id: format!("session_window_{operator_num}"),
363            output_schema,
364            key_column: None,
365            needs_timer_reregistration: false,
366            _phantom: PhantomData,
367        }
368    }
369
370    /// Creates a new session window operator with a custom operator ID.
371    ///
372    /// # Panics
373    ///
374    /// Panics if gap or allowed lateness does not fit in i64.
375    pub fn with_id(
376        gap: Duration,
377        aggregator: A,
378        allowed_lateness: Duration,
379        operator_id: String,
380    ) -> Self {
381        let output_schema = Arc::new(Schema::new(vec![
382            Field::new("window_start", DataType::Int64, false),
383            Field::new("window_end", DataType::Int64, false),
384            Field::new(
385                "result",
386                aggregator.output_data_type(),
387                aggregator.output_nullable(),
388            ),
389        ]));
390        Self {
391            gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
392            aggregator,
393            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
394                .expect("Allowed lateness must fit in i64"),
395            session_id_counter: AtomicU64::new(0),
396            session_indices: FxHashMap::default(),
397            max_cached_indices: DEFAULT_MAX_CACHED_INDICES,
398            pending_timers: FxHashMap::default(),
399            emit_strategy: EmitStrategy::default(),
400            late_data_config: LateDataConfig::default(),
401            late_data_metrics: LateDataMetrics::new(),
402            window_close_metrics: WindowCloseMetrics::new(),
403            operator_id,
404            output_schema,
405            key_column: None,
406            needs_timer_reregistration: false,
407            _phantom: PhantomData,
408        }
409    }
410
    /// Sets the maximum number of session indices to cache in-memory.
    ///
    /// The limit is applied on a cache miss: if the cache already holds at
    /// least `max` entries it is cleared in full, and entries are reloaded
    /// from the persistent state store on demand. Default: 16,384.
    pub fn set_max_cached_indices(&mut self, max: usize) {
        self.max_cached_indices = max;
    }
418
    /// Sets the key column for per-key session tracking.
    ///
    /// `column_index` is the position of the key column in each event's
    /// record batch. If not set, a single global session is maintained.
    pub fn set_key_column(&mut self, column_index: usize) {
        self.key_column = Some(column_index);
    }
425
    /// Returns the key column index, or `None` if no key column has been set.
    #[must_use]
    pub fn key_column(&self) -> Option<usize> {
        self.key_column
    }
431
    /// Sets the emit strategy for this operator, replacing the current one.
    pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
        self.emit_strategy = strategy;
    }
436
    /// Returns the current emit strategy.
    ///
    /// This is `EmitStrategy::default()` until changed with
    /// [`Self::set_emit_strategy`].
    #[must_use]
    pub fn emit_strategy(&self) -> &EmitStrategy {
        &self.emit_strategy
    }
442
    /// Sets the late data handling configuration, replacing the current one.
    pub fn set_late_data_config(&mut self, config: LateDataConfig) {
        self.late_data_config = config;
    }
447
    /// Returns the current late data configuration.
    ///
    /// This is `LateDataConfig::default()` until changed with
    /// [`Self::set_late_data_config`].
    #[must_use]
    pub fn late_data_config(&self) -> &LateDataConfig {
        &self.late_data_config
    }
453
    /// Returns a shared reference to the late data metrics collected by this
    /// operator.
    #[must_use]
    pub fn late_data_metrics(&self) -> &LateDataMetrics {
        &self.late_data_metrics
    }
459
    /// Resets the late data metrics counters by delegating to
    /// `LateDataMetrics::reset`.
    pub fn reset_late_data_metrics(&mut self) {
        self.late_data_metrics.reset();
    }
464
    /// Returns a shared reference to the window close metrics collected by
    /// this operator.
    #[must_use]
    pub fn window_close_metrics(&self) -> &WindowCloseMetrics {
        &self.window_close_metrics
    }
470
    /// Resets the window close metrics counters by delegating to
    /// `WindowCloseMetrics::reset`.
    pub fn reset_window_close_metrics(&mut self) {
        self.window_close_metrics.reset();
    }
475
    /// Returns the gap timeout in milliseconds, as derived from the `gap`
    /// duration passed at construction.
    #[must_use]
    pub fn gap_ms(&self) -> i64 {
        self.gap_ms
    }
481
    /// Returns the allowed lateness in milliseconds, as derived from the
    /// `allowed_lateness` duration passed at construction.
    #[must_use]
    pub fn allowed_lateness_ms(&self) -> i64 {
        self.allowed_lateness_ms
    }
487
488    /// Returns the number of active sessions across all keys.
489    #[must_use]
490    pub fn active_session_count(&self) -> usize {
491        self.session_indices.values().map(SessionIndex::len).sum()
492    }
493
494    /// Extracts the key from an event.
495    fn extract_key(&self, event: &Event) -> SmallVec<[u8; 16]> {
496        use arrow_array::cast::AsArray;
497        use arrow_array::types::Int64Type;
498
499        if let Some(col_idx) = self.key_column {
500            if col_idx < event.data.num_columns() {
501                let column = event.data.column(col_idx);
502                if let Some(array) = column.as_primitive_opt::<Int64Type>() {
503                    if !array.is_empty() && !array.is_null(0) {
504                        return SmallVec::from_slice(&array.value(0).to_be_bytes());
505                    }
506                }
507                // Try string column
508                if let Some(array) = column.as_string_opt::<i32>() {
509                    if !array.is_empty() && !array.is_null(0) {
510                        return SmallVec::from_slice(array.value(0).as_bytes());
511                    }
512                }
513            }
514        }
515        // Default: global session (empty key)
516        SmallVec::new()
517    }
518
519    /// Computes a hash for the key.
520    fn key_hash(key: &[u8]) -> u64 {
521        use std::hash::{Hash, Hasher};
522        let mut hasher = rustc_hash::FxHasher::default();
523        key.hash(&mut hasher);
524        hasher.finish()
525    }
526
527    /// Generates the state key for a session index (per key hash).
528    fn session_index_key(key_hash: u64) -> [u8; 12] {
529        let mut key = [0u8; 12];
530        key[..4].copy_from_slice(SESSION_INDEX_PREFIX);
531        key[4..12].copy_from_slice(&key_hash.to_be_bytes());
532        key
533    }
534
535    /// Generates the state key for a session accumulator (per session ID).
536    fn session_acc_key(session_id: SessionId) -> [u8; 12] {
537        let mut key = [0u8; 12];
538        key[..4].copy_from_slice(SESSION_ACC_PREFIX);
539        key[4..12].copy_from_slice(&session_id.to_bytes());
540        key
541    }
542
    /// Generates the timer key for session closure (keyed by session ID).
    ///
    /// Layout: the `SESSION_TIMER_PREFIX` byte followed by the 8 big-endian
    /// bytes of the session ID.
    fn timer_key(session_id: SessionId) -> super::TimerKey {
        let mut key = super::TimerKey::new();
        key.push(SESSION_TIMER_PREFIX);
        key.extend_from_slice(&session_id.to_bytes());
        key
    }
550
551    /// Parses a [`SessionId`] from a timer key.
552    fn session_id_from_timer(timer_key: &[u8]) -> Option<SessionId> {
553        if timer_key.len() != 9 || timer_key[0] != SESSION_TIMER_PREFIX {
554            return None;
555        }
556        SessionId::from_bytes(&timer_key[1..9])
557    }
558
559    /// Loads the session index for a key, checking the in-memory cache first
560    /// and falling back to the persistent state store.
561    ///
562    /// If the cache exceeds `max_cached_indices`, it is cleared before
563    /// inserting the new entry. This prevents unbounded memory growth
564    /// in high-cardinality key spaces while keeping hot entries fast.
565    fn load_session_index(&mut self, key_hash: u64, state: &dyn StateStore) -> SessionIndex {
566        // Check in-memory cache first
567        if let Some(index) = self.session_indices.get(&key_hash) {
568            return index.clone();
569        }
570
571        // Evict the cache if it exceeds the limit. All entries are backed
572        // by the persistent state store, so they can be reloaded on demand.
573        if self.session_indices.len() >= self.max_cached_indices {
574            self.session_indices.clear();
575        }
576
577        // Check persistent state
578        let state_key = Self::session_index_key(key_hash);
579        if let Ok(Some(index)) = state.get_typed::<SessionIndex>(&state_key) {
580            self.session_indices.insert(key_hash, index.clone());
581            return index;
582        }
583
584        SessionIndex::default()
585    }
586
587    /// Persists the session index for a key. Removes the state entry if empty.
588    fn store_session_index(
589        &mut self,
590        key_hash: u64,
591        index: &SessionIndex,
592        state: &mut dyn StateStore,
593    ) -> Result<(), OperatorError> {
594        let state_key = Self::session_index_key(key_hash);
595        if index.is_empty() {
596            state
597                .delete(&state_key)
598                .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
599            self.session_indices.remove(&key_hash);
600        } else {
601            state
602                .put_typed(&state_key, index)
603                .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
604            self.session_indices.insert(key_hash, index.clone());
605        }
606        Ok(())
607    }
608
609    /// Loads the accumulator for a session, creating a fresh one if absent.
610    fn load_accumulator(&self, session_id: SessionId, state: &dyn StateStore) -> A::Acc {
611        let acc_key = Self::session_acc_key(session_id);
612        state
613            .get_typed::<A::Acc>(&acc_key)
614            .ok()
615            .flatten()
616            .unwrap_or_else(|| self.aggregator.create_accumulator())
617    }
618
619    /// Persists the accumulator for a session.
620    fn store_accumulator(
621        session_id: SessionId,
622        acc: &A::Acc,
623        state: &mut dyn StateStore,
624    ) -> Result<(), OperatorError> {
625        let acc_key = Self::session_acc_key(session_id);
626        state
627            .put_typed(&acc_key, acc)
628            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
629    }
630
631    /// Deletes the accumulator for a session.
632    fn delete_accumulator(
633        session_id: SessionId,
634        state: &mut dyn StateStore,
635    ) -> Result<(), OperatorError> {
636        let acc_key = Self::session_acc_key(session_id);
637        state
638            .delete(&acc_key)
639            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
640    }
641
642    /// Registers or updates a timer for session closure.
643    fn register_timer(
644        &mut self,
645        session_id: SessionId,
646        key_hash: u64,
647        session: &SessionMetadata,
648        ctx: &mut OperatorContext,
649    ) {
650        let trigger_time = session.end + self.allowed_lateness_ms;
651
652        // Cancel previous timer if different
653        if let Some(&(old_time, _)) = self.pending_timers.get(&session_id.as_u64()) {
654            if old_time == trigger_time {
655                return; // Timer already set for correct time
656            }
657            // Note: We can't cancel timers, but the handler will check staleness
658        }
659
660        let timer_key = Self::timer_key(session_id);
661        ctx.timers
662            .register_timer(trigger_time, Some(timer_key), Some(ctx.operator_index));
663        self.pending_timers
664            .insert(session_id.as_u64(), (trigger_time, key_hash));
665    }
666
667    /// Checks if an event is late for its potential session.
668    fn is_late(&self, timestamp: i64, watermark: i64) -> bool {
669        // An event is late if its session would have already closed
670        // Session end = timestamp + gap, cleanup = session end + allowed lateness
671        let potential_cleanup = timestamp + self.gap_ms + self.allowed_lateness_ms;
672        watermark >= potential_cleanup
673    }
674
675    /// Creates an output event from a session.
676    fn create_output(&self, session: &SessionMetadata, acc: &A::Acc) -> Option<Event> {
677        if acc.is_empty() {
678            return None;
679        }
680
681        let result = acc.result();
682        let result_array = result.to_arrow_array();
683
684        let batch = RecordBatch::try_new(
685            Arc::clone(&self.output_schema),
686            vec![
687                Arc::new(Int64Array::from(vec![session.start])),
688                Arc::new(Int64Array::from(vec![session.end])),
689                result_array,
690            ],
691        )
692        .ok()?;
693
694        Some(Event::new(session.end, batch))
695    }
696
697    /// Merges multiple overlapping sessions into one.
698    ///
699    /// The first session in `overlapping` becomes the "winner" that absorbs all
700    /// others. Each "loser" session has its accumulator merged into the winner,
701    /// its timer removed, its state cleaned up, and (for `Changelog` strategy)
702    /// a retraction emitted if it was previously emitted.
703    ///
704    /// Returns the winner's [`SessionId`]. The caller is responsible for
705    /// extending the winner to include the new event's timestamp and adding
706    /// the current event's value to the accumulator.
707    fn merge_sessions(
708        &mut self,
709        index: &mut SessionIndex,
710        overlapping: &[SessionId],
711        ctx: &mut OperatorContext,
712        output: &mut OutputVec,
713    ) -> SessionId {
714        let winner_id = overlapping[0];
715
716        // Load the winner's accumulator so we can merge losers into it
717        let mut winner_acc = self.load_accumulator(winner_id, ctx.state);
718
719        // If the winner was previously emitted and strategy is Changelog,
720        // retract the winner's old state (since the merged result will differ)
721        if matches!(self.emit_strategy, EmitStrategy::Changelog) {
722            if let Some(winner_meta) = index.get(winner_id) {
723                if winner_meta.emitted {
724                    if let Some(old_evt) = self.create_output(winner_meta, &winner_acc) {
725                        output.push(Output::Changelog(ChangelogRecord::delete(
726                            old_evt,
727                            ctx.processing_time,
728                        )));
729                    }
730                }
731            }
732        }
733
734        // Process each loser: merge accumulator, emit retraction, clean up
735        for &loser_id in &overlapping[1..] {
736            let loser_acc = self.load_accumulator(loser_id, ctx.state);
737
738            // Emit retraction for the loser if it was previously emitted
739            if matches!(self.emit_strategy, EmitStrategy::Changelog) {
740                if let Some(loser_meta) = index.get(loser_id) {
741                    if loser_meta.emitted {
742                        if let Some(old_evt) = self.create_output(loser_meta, &loser_acc) {
743                            output.push(Output::Changelog(ChangelogRecord::delete(
744                                old_evt,
745                                ctx.processing_time,
746                            )));
747                        }
748                    }
749                }
750            }
751
752            // Merge loser's accumulator into winner
753            winner_acc.merge(&loser_acc);
754
755            // Merge loser's metadata bounds into winner
756            if let Some(loser_meta) = index.get(loser_id).cloned() {
757                if let Some(winner_meta) = index.get_mut(winner_id) {
758                    winner_meta.merge(&loser_meta);
759                }
760            }
761
762            // Clean up loser: delete accumulator, remove timer, remove from index
763            let _ = Self::delete_accumulator(loser_id, ctx.state);
764            self.pending_timers.remove(&loser_id.as_u64());
765            index.remove(loser_id);
766        }
767
768        // Store the merged accumulator for the winner
769        let _ = Self::store_accumulator(winner_id, &winner_acc, ctx.state);
770
771        winner_id
772    }
773}
774
775impl<A: Aggregator> Operator for SessionWindowOperator<A>
776where
777    A::Acc: 'static
778        + Archive
779        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
780    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
781        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
782{
783    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
784        // Lazy timer re-registration after restore (restore() has no OperatorContext)
785        if self.needs_timer_reregistration {
786            self.needs_timer_reregistration = false;
787            for (&sid_raw, &(trigger_time, _key_hash)) in &self.pending_timers {
788                let timer_key = Self::timer_key(SessionId(sid_raw));
789                ctx.timers
790                    .register_timer(trigger_time, Some(timer_key), Some(ctx.operator_index));
791            }
792        }
793
794        let event_time = event.timestamp;
795        let mut output = OutputVec::new();
796
797        // Update watermark
798        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
799
800        // Check if event is late
801        let current_wm = ctx.watermark_generator.current_watermark();
802        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
803            // EMIT FINAL drops late data entirely
804            if self.emit_strategy.drops_late_data() {
805                self.late_data_metrics.record_dropped();
806                return output;
807            }
808
809            if let Some(side_output_name) = self.late_data_config.side_output() {
810                self.late_data_metrics.record_side_output();
811                output.push(Output::SideOutput(Box::new(SideOutputData {
812                    name: Arc::from(side_output_name),
813                    event: event.clone(),
814                })));
815            } else {
816                self.late_data_metrics.record_dropped();
817                output.push(Output::LateEvent(event.clone()));
818            }
819            return output;
820        }
821
822        // Extract key and compute hash
823        let key = self.extract_key(event);
824        let key_hash = Self::key_hash(&key);
825
826        // Load session index for this key
827        let mut index = self.load_session_index(key_hash, ctx.state);
828        let overlapping = index.find_overlapping(event_time, self.gap_ms);
829
830        let session_id;
831        match overlapping.len() {
832            0 => {
833                // No overlapping session — create a new one
834                let id = SessionId::generate(&self.operator_id, &self.session_id_counter);
835                let session = SessionMetadata::new(id, event_time, self.gap_ms);
836                index.insert(session);
837                session_id = id;
838            }
839            1 => {
840                // Exactly one overlap — extend it
841                session_id = overlapping[0];
842                if let Some(session) = index.get_mut(session_id) {
843                    session.extend(event_time, self.gap_ms);
844                }
845            }
846            _ => {
847                // Multiple overlaps — merge all into one session
848                session_id = self.merge_sessions(&mut index, &overlapping, ctx, &mut output);
849                // Extend the merged winner to include this event
850                if let Some(session) = index.get_mut(session_id) {
851                    session.extend(event_time, self.gap_ms);
852                }
853            }
854        }
855
856        // Load and update accumulator (handles multi-row batches)
857        let mut acc = self.load_accumulator(session_id, ctx.state);
858        let values = self.aggregator.extract_batch(event);
859        for value in values {
860            acc.add(value);
861        }
862
863        // Persist accumulator
864        let _ = Self::store_accumulator(session_id, &acc, ctx.state);
865
866        // Register timer for the affected session
867        if let Some(session) = index.get(session_id) {
868            self.register_timer(session_id, key_hash, session, ctx);
869
870            // Handle emit strategy
871            match &self.emit_strategy {
872                EmitStrategy::OnUpdate => {
873                    if let Some(evt) = self.create_output(session, &acc) {
874                        output.push(Output::Event(evt));
875                    }
876                    if let Some(s) = index.get_mut(session_id) {
877                        s.emitted = true;
878                    }
879                }
880                EmitStrategy::Changelog => {
881                    if let Some(evt) = self.create_output(session, &acc) {
882                        let record = ChangelogRecord::insert(evt, ctx.processing_time);
883                        output.push(Output::Changelog(record));
884                    }
885                    if let Some(s) = index.get_mut(session_id) {
886                        s.emitted = true;
887                    }
888                }
889                // Other strategies: no intermediate emission
890                EmitStrategy::OnWatermark
891                | EmitStrategy::Periodic(_)
892                | EmitStrategy::OnWindowClose
893                | EmitStrategy::Final => {}
894            }
895        }
896
897        // Persist session index after emit (so emitted flag is saved)
898        let _ = self.store_session_index(key_hash, &index, ctx.state);
899
900        // Emit watermark if generated
901        if let Some(wm) = emitted_watermark {
902            output.push(Output::Watermark(wm.timestamp()));
903        }
904
905        output
906    }
907
908    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
909        let mut output = OutputVec::new();
910
911        // Parse session ID from timer key
912        let Some(session_id) = Self::session_id_from_timer(&timer.key) else {
913            return output;
914        };
915
916        // Look up the pending timer entry: (expected_time, key_hash)
917        let Some(&(expected_time, key_hash)) = self.pending_timers.get(&session_id.as_u64()) else {
918            return output; // Timer was cancelled or session no longer exists
919        };
920
921        if expected_time != timer.timestamp {
922            return output; // Stale timer, session was extended
923        }
924
925        // Load session index for the owning key
926        let mut index = self.load_session_index(key_hash, ctx.state);
927        let Some(session) = index.get(session_id).cloned() else {
928            self.pending_timers.remove(&session_id.as_u64());
929            return output;
930        };
931
932        // Load accumulator and create output
933        let acc = self.load_accumulator(session_id, ctx.state);
934        if let Some(event) = self.create_output(&session, &acc) {
935            // Record window close metrics
936            self.window_close_metrics
937                .record_close(session.end, ctx.processing_time);
938
939            match &self.emit_strategy {
940                EmitStrategy::Changelog => {
941                    let record = ChangelogRecord::insert(event, ctx.processing_time);
942                    output.push(Output::Changelog(record));
943                }
944                _ => {
945                    output.push(Output::Event(event));
946                }
947            }
948        }
949
950        // Clean up: remove session from index, delete accumulator, update state
951        index.remove(session_id);
952        let _ = Self::delete_accumulator(session_id, ctx.state);
953        let _ = self.store_session_index(key_hash, &index, ctx.state);
954        self.pending_timers.remove(&session_id.as_u64());
955
956        output
957    }
958
959    fn checkpoint(&self) -> OperatorState {
960        // Serialize pending timers: (session_id, timer_time, key_hash)
961        let timer_entries: Vec<(u64, i64, u64)> = self
962            .pending_timers
963            .iter()
964            .map(|(&sid, &(time, kh))| (sid, time, kh))
965            .collect();
966        let counter_val = self.session_id_counter.load(Ordering::Relaxed);
967
968        // Checkpoint format: (timer_entries, session_id_counter)
969        let checkpoint_data: (Vec<(u64, i64, u64)>, u64) = (timer_entries, counter_val);
970
971        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
972            .map(|v| v.to_vec())
973            .unwrap_or_default();
974
975        OperatorState {
976            operator_id: self.operator_id.clone(),
977            data,
978        }
979    }
980
981    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
982        if state.operator_id != self.operator_id {
983            return Err(OperatorError::StateAccessFailed(format!(
984                "Operator ID mismatch: expected {}, got {}",
985                self.operator_id, state.operator_id
986            )));
987        }
988
989        if state.data.is_empty() {
990            return Ok(());
991        }
992
993        let archived =
994            rkyv::access::<rkyv::Archived<(Vec<(u64, i64, u64)>, u64)>, RkyvError>(&state.data)
995                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
996        let (timers, counter_val) =
997            rkyv::deserialize::<(Vec<(u64, i64, u64)>, u64), RkyvError>(archived)
998                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
999
1000        self.pending_timers = timers
1001            .into_iter()
1002            .map(|(sid, time, kh)| (sid, (time, kh)))
1003            .collect();
1004        self.session_id_counter
1005            .store(counter_val, Ordering::Relaxed);
1006        self.needs_timer_reregistration = !self.pending_timers.is_empty();
1007        // Note: session_indices will be populated lazily from state store
1008
1009        Ok(())
1010    }
1011}
1012
/// Counters describing session activity, intended for monitoring.
///
/// All fields are plain `u64` counters; `Default` yields all zeros.
#[derive(Debug, Clone, Default)]
pub struct SessionMetrics {
    /// Number of sessions created in total.
    pub sessions_created: u64,
    /// Number of sessions closed in total.
    pub sessions_closed: u64,
    /// Number of sessions merged in total.
    pub sessions_merged: u64,
    /// Number of sessions currently active.
    pub active_sessions: u64,
}
1025
1026#[cfg(test)]
1027mod tests {
1028    use super::*;
1029    use crate::operator::window::{CountAggregator, SumAggregator};
1030    use crate::state::InMemoryStore;
1031    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
1032    use arrow_array::{Int64Array, RecordBatch};
1033    use arrow_schema::{DataType, Field, Schema};
1034
1035    fn create_test_event(timestamp: i64, value: i64) -> Event {
1036        let schema = Arc::new(Schema::new(vec![Field::new(
1037            "value",
1038            DataType::Int64,
1039            false,
1040        )]));
1041        let batch =
1042            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
1043        Event::new(timestamp, batch)
1044    }
1045
1046    fn create_keyed_event(timestamp: i64, key: i64, value: i64) -> Event {
1047        let schema = Arc::new(Schema::new(vec![
1048            Field::new("key", DataType::Int64, false),
1049            Field::new("value", DataType::Int64, false),
1050        ]));
1051        let batch = RecordBatch::try_new(
1052            schema,
1053            vec![
1054                Arc::new(Int64Array::from(vec![key])),
1055                Arc::new(Int64Array::from(vec![value])),
1056            ],
1057        )
1058        .unwrap();
1059        Event::new(timestamp, batch)
1060    }
1061
1062    fn create_test_context<'a>(
1063        timers: &'a mut TimerService,
1064        state: &'a mut dyn StateStore,
1065        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
1066    ) -> OperatorContext<'a> {
1067        OperatorContext {
1068            event_time: 0,
1069            processing_time: 0,
1070            timers,
1071            state,
1072            watermark_generator: watermark_gen,
1073            operator_index: 0,
1074        }
1075    }
1076
1077    /// Helper: get the first (`session_id`, `timer_time`, `key_hash`) from `pending_timers`.
1078    fn first_pending_timer(op: &SessionWindowOperator<impl Aggregator>) -> (SessionId, i64, u64) {
1079        let (&sid_raw, &(time, kh)) = op.pending_timers.iter().next().expect("no pending timer");
1080        (SessionId(sid_raw), time, kh)
1081    }
1082
1083    // ---------------------------------------------------------------
1084    //  SessionId tests
1085    // ---------------------------------------------------------------
1086
1087    #[test]
1088    fn test_session_id_generation() {
1089        let counter = AtomicU64::new(0);
1090        let id1 = SessionId::generate("op_a", &counter);
1091        let id2 = SessionId::generate("op_a", &counter);
1092        let id3 = SessionId::generate("op_b", &counter);
1093
1094        // Same operator, different counter → different IDs
1095        assert_ne!(id1, id2);
1096        // Different operator → different IDs (overwhelmingly likely)
1097        assert_ne!(id1, id3);
1098    }
1099
1100    #[test]
1101    fn test_session_id_bytes_roundtrip() {
1102        let original = SessionId(0x1234_5678_9ABC_DEF0);
1103        let bytes = original.to_bytes();
1104        let restored = SessionId::from_bytes(&bytes);
1105
1106        assert_eq!(restored, Some(original));
1107        assert_eq!(SessionId::from_bytes(&[0u8; 4]), None); // wrong length
1108    }
1109
1110    // ---------------------------------------------------------------
1111    //  SessionMetadata tests
1112    // ---------------------------------------------------------------
1113
1114    #[test]
1115    fn test_session_metadata_creation() {
1116        let id = SessionId(42);
1117        let meta = SessionMetadata::new(id, 1000, 5000);
1118
1119        assert_eq!(meta.id, id);
1120        assert_eq!(meta.start, 1000);
1121        assert_eq!(meta.end, 6000); // start + gap
1122        assert!(!meta.emitted);
1123    }
1124
1125    #[test]
1126    fn test_session_metadata_overlaps() {
1127        let id = SessionId(1);
1128        let meta = SessionMetadata::new(id, 1000, 5000); // [1000, 6000)
1129
1130        // Event whose session [ts, ts+gap) overlaps [1000, 6000)
1131        assert!(meta.overlaps(1000, 5000)); // [1000, 6000) exact match
1132        assert!(meta.overlaps(3000, 5000)); // [3000, 8000) overlaps
1133        assert!(meta.overlaps(5999, 5000)); // [5999, 10999)
1134        assert!(meta.overlaps(500, 5000)); // [500, 5500) overlaps
1135
1136        // Event at -1000 with gap 5000 → [-1000, 4000) overlaps [1000, 6000)
1137        assert!(meta.overlaps(-1000, 5000));
1138
1139        // Truly before: [-5001, -1) does not reach 1000
1140        assert!(!meta.overlaps(-5001, 5000));
1141
1142        // After: [6000, 11000) starts at session end → no overlap (half-open)
1143        assert!(!meta.overlaps(6000, 5000));
1144    }
1145
1146    #[test]
1147    fn test_session_metadata_extend() {
1148        let id = SessionId(1);
1149        let mut meta = SessionMetadata::new(id, 1000, 5000); // [1000, 6000)
1150
1151        // Extend forward
1152        meta.extend(8000, 5000);
1153        assert_eq!(meta.start, 1000);
1154        assert_eq!(meta.end, 13000); // 8000 + 5000
1155
1156        // Extend backward
1157        meta.extend(500, 5000);
1158        assert_eq!(meta.start, 500);
1159        assert_eq!(meta.end, 13000); // Unchanged (max)
1160    }
1161
1162    #[test]
1163    fn test_session_metadata_merge() {
1164        let mut s1 = SessionMetadata::new(SessionId(1), 1000, 5000); // [1000, 6000)
1165        let s2 = SessionMetadata::new(SessionId(2), 8000, 5000); // [8000, 13000)
1166
1167        s1.merge(&s2);
1168        assert_eq!(s1.start, 1000);
1169        assert_eq!(s1.end, 13000);
1170    }
1171
1172    #[test]
1173    fn test_session_metadata_window_id() {
1174        let meta = SessionMetadata::new(SessionId(1), 1000, 5000);
1175        let wid = meta.window_id();
1176        assert_eq!(wid.start, 1000);
1177        assert_eq!(wid.end, 6000);
1178    }
1179
1180    // ---------------------------------------------------------------
1181    //  SessionIndex tests
1182    // ---------------------------------------------------------------
1183
1184    #[test]
1185    fn test_session_index_sorted_insert() {
1186        let mut idx = SessionIndex::default();
1187
1188        let s3 = SessionMetadata::new(SessionId(3), 3000, 1000);
1189        let s1 = SessionMetadata::new(SessionId(1), 1000, 1000);
1190        let s2 = SessionMetadata::new(SessionId(2), 2000, 1000);
1191
1192        idx.insert(s3);
1193        idx.insert(s1);
1194        idx.insert(s2);
1195
1196        assert_eq!(idx.len(), 3);
1197        assert_eq!(idx.sessions[0].start, 1000);
1198        assert_eq!(idx.sessions[1].start, 2000);
1199        assert_eq!(idx.sessions[2].start, 3000);
1200    }
1201
1202    #[test]
1203    fn test_session_index_find_overlapping() {
1204        let mut idx = SessionIndex::default();
1205
1206        // Two non-overlapping sessions: [100, 600) and [2000, 2500)
1207        idx.insert(SessionMetadata::new(SessionId(1), 100, 500));
1208        idx.insert(SessionMetadata::new(SessionId(2), 2000, 500));
1209
1210        // Event at 300 with gap=500 → [300, 800) overlaps [100, 600)
1211        let hits = idx.find_overlapping(300, 500);
1212        assert_eq!(hits.as_slice(), &[SessionId(1)]);
1213
1214        // Event at 1500 with gap=500 → [1500, 2000) doesn't overlap either
1215        let hits = idx.find_overlapping(1500, 500);
1216        assert!(hits.is_empty());
1217
1218        // Event at 1800 with gap=500 → [1800, 2300) overlaps [2000, 2500)
1219        let hits = idx.find_overlapping(1800, 500);
1220        assert_eq!(hits.as_slice(), &[SessionId(2)]);
1221    }
1222
1223    #[test]
1224    fn test_session_index_remove() {
1225        let mut idx = SessionIndex::default();
1226        idx.insert(SessionMetadata::new(SessionId(1), 100, 500));
1227        idx.insert(SessionMetadata::new(SessionId(2), 2000, 500));
1228
1229        assert_eq!(idx.len(), 2);
1230
1231        let removed = idx.remove(SessionId(1));
1232        assert!(removed.is_some());
1233        assert_eq!(removed.unwrap().start, 100);
1234        assert_eq!(idx.len(), 1);
1235
1236        // Removing non-existent ID
1237        assert!(idx.remove(SessionId(99)).is_none());
1238    }
1239
1240    #[test]
1241    fn test_session_index_get_and_get_mut() {
1242        let mut idx = SessionIndex::default();
1243        idx.insert(SessionMetadata::new(SessionId(1), 100, 500));
1244
1245        assert!(idx.get(SessionId(1)).is_some());
1246        assert!(idx.get(SessionId(99)).is_none());
1247
1248        idx.get_mut(SessionId(1)).unwrap().emitted = true;
1249        assert!(idx.get(SessionId(1)).unwrap().emitted);
1250    }
1251
1252    // ---------------------------------------------------------------
1253    //  Operator creation
1254    // ---------------------------------------------------------------
1255
1256    #[test]
1257    fn test_session_operator_creation() {
1258        let aggregator = CountAggregator::new();
1259        let operator = SessionWindowOperator::new(
1260            Duration::from_secs(30),
1261            aggregator,
1262            Duration::from_secs(60),
1263        );
1264
1265        assert_eq!(operator.gap_ms(), 30_000);
1266        assert_eq!(operator.allowed_lateness_ms(), 60_000);
1267        assert_eq!(operator.active_session_count(), 0);
1268        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
1269    }
1270
1271    #[test]
1272    fn test_session_operator_with_id() {
1273        let aggregator = CountAggregator::new();
1274        let operator = SessionWindowOperator::with_id(
1275            Duration::from_secs(30),
1276            aggregator,
1277            Duration::from_secs(0),
1278            "test_session".to_string(),
1279        );
1280
1281        assert_eq!(operator.operator_id, "test_session");
1282    }
1283
1284    // ---------------------------------------------------------------
1285    //  Timer key roundtrip (now uses SessionId)
1286    // ---------------------------------------------------------------
1287
1288    #[test]
1289    fn test_timer_key_roundtrip() {
1290        let sid = SessionId(0x1234_5678_9ABC_DEF0);
1291        let timer_key = SessionWindowOperator::<CountAggregator>::timer_key(sid);
1292        let parsed = SessionWindowOperator::<CountAggregator>::session_id_from_timer(&timer_key);
1293        assert_eq!(parsed, Some(sid));
1294    }
1295
1296    #[test]
1297    fn test_timer_key_invalid() {
1298        // Wrong prefix
1299        let invalid1 = vec![0x02, 0, 0, 0, 0, 0, 0, 0, 0];
1300        assert!(
1301            SessionWindowOperator::<CountAggregator>::session_id_from_timer(&invalid1).is_none()
1302        );
1303
1304        // Wrong length
1305        let invalid2 = vec![SESSION_TIMER_PREFIX, 0, 0, 0];
1306        assert!(
1307            SessionWindowOperator::<CountAggregator>::session_id_from_timer(&invalid2).is_none()
1308        );
1309    }
1310
1311    // ---------------------------------------------------------------
1312    //  Single event
1313    // ---------------------------------------------------------------
1314
1315    #[test]
1316    fn test_session_single_event() {
1317        let aggregator = CountAggregator::new();
1318        let mut operator = SessionWindowOperator::with_id(
1319            Duration::from_millis(1000),
1320            aggregator,
1321            Duration::from_millis(0),
1322            "test_op".to_string(),
1323        );
1324
1325        let mut timers = TimerService::new();
1326        let mut state = InMemoryStore::new();
1327        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1328
1329        let event = create_test_event(500, 1);
1330        {
1331            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1332            operator.process(&event, &mut ctx);
1333        }
1334
1335        assert_eq!(operator.active_session_count(), 1);
1336        assert_eq!(operator.pending_timers.len(), 1);
1337    }
1338
1339    // ---------------------------------------------------------------
1340    //  Multiple events in the same session
1341    // ---------------------------------------------------------------
1342
1343    #[test]
1344    fn test_session_multiple_events_same_session() {
1345        let aggregator = CountAggregator::new();
1346        let mut operator = SessionWindowOperator::with_id(
1347            Duration::from_millis(1000),
1348            aggregator,
1349            Duration::from_millis(0),
1350            "test_op".to_string(),
1351        );
1352
1353        let mut timers = TimerService::new();
1354        let mut state = InMemoryStore::new();
1355        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1356
1357        // Events within gap (1000ms)
1358        for ts in [100, 500, 900, 1500] {
1359            let event = create_test_event(ts, 1);
1360            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1361            operator.process(&event, &mut ctx);
1362        }
1363
1364        // All events should be in the same session
1365        assert_eq!(operator.active_session_count(), 1);
1366
1367        // Verify accumulator via the session index
1368        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
1369        let index = operator.load_session_index(key_hash, &state);
1370        assert_eq!(index.len(), 1);
1371        let sid = index.sessions[0].id;
1372        let acc = operator.load_accumulator(sid, &state);
1373        assert_eq!(acc.result(), 4);
1374    }
1375
1376    // ---------------------------------------------------------------
1377    //  Gap creates a new session (multi-session per key)
1378    // ---------------------------------------------------------------
1379
1380    #[test]
1381    fn test_session_gap_creates_new_session() {
1382        let aggregator = CountAggregator::new();
1383        let mut operator = SessionWindowOperator::with_id(
1384            Duration::from_millis(1000),
1385            aggregator,
1386            Duration::from_millis(0),
1387            "test_op".to_string(),
1388        );
1389        operator.set_emit_strategy(EmitStrategy::OnUpdate);
1390
1391        let mut timers = TimerService::new();
1392        let mut state = InMemoryStore::new();
1393        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1394
1395        // First session
1396        let event1 = create_test_event(100, 1);
1397        {
1398            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1399            operator.process(&event1, &mut ctx);
1400        }
1401
1402        // Gap > 1000ms ⇒ creates a *second* session in the same index
1403        let event2 = create_test_event(3000, 1);
1404        let outputs = {
1405            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1406            operator.process(&event2, &mut ctx)
1407        };
1408
1409        // Two sessions now exist for the same key
1410        assert_eq!(operator.active_session_count(), 2);
1411
1412        // OnUpdate emits for the new session
1413        let event_count = outputs
1414            .iter()
1415            .filter(|o| matches!(o, Output::Event(_)))
1416            .count();
1417        assert_eq!(event_count, 1);
1418    }
1419
1420    // ---------------------------------------------------------------
1421    //  Timer triggers emission
1422    // ---------------------------------------------------------------
1423
1424    #[test]
1425    fn test_session_timer_triggers_emission() {
1426        let aggregator = CountAggregator::new();
1427        let mut operator = SessionWindowOperator::with_id(
1428            Duration::from_millis(1000),
1429            aggregator,
1430            Duration::from_millis(0),
1431            "test_op".to_string(),
1432        );
1433
1434        let mut timers = TimerService::new();
1435        let mut state = InMemoryStore::new();
1436        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1437
1438        // Create session
1439        let event = create_test_event(500, 1);
1440        {
1441            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1442            operator.process(&event, &mut ctx);
1443        }
1444
1445        // Get the pending timer (session_id, timer_time, _key_hash)
1446        let (sid, timer_time, _kh) = first_pending_timer(&operator);
1447
1448        // Fire timer
1449        let timer = Timer {
1450            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
1451            timestamp: timer_time,
1452        };
1453
1454        let outputs = {
1455            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1456            operator.on_timer(timer, &mut ctx)
1457        };
1458
1459        assert_eq!(outputs.len(), 1);
1460        match &outputs[0] {
1461            Output::Event(e) => {
1462                assert_eq!(e.timestamp, 1500); // 500 + gap (1000)
1463                let result = e
1464                    .data
1465                    .column(2)
1466                    .as_any()
1467                    .downcast_ref::<Int64Array>()
1468                    .unwrap()
1469                    .value(0);
1470                assert_eq!(result, 1);
1471            }
1472            _ => panic!("Expected Event output"),
1473        }
1474
1475        // Session should be cleaned up
1476        assert_eq!(operator.active_session_count(), 0);
1477    }
1478
1479    // ---------------------------------------------------------------
1480    //  Keyed session tracking
1481    // ---------------------------------------------------------------
1482
1483    #[test]
1484    fn test_session_keyed_tracking() {
1485        let aggregator = SumAggregator::new(1); // Sum column 1 (value)
1486        let mut operator = SessionWindowOperator::with_id(
1487            Duration::from_millis(1000),
1488            aggregator,
1489            Duration::from_millis(0),
1490            "test_op".to_string(),
1491        );
1492        operator.set_key_column(0); // Key by column 0
1493
1494        let mut timers = TimerService::new();
1495        let mut state = InMemoryStore::new();
1496        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1497
1498        // Events for key 1
1499        let event1 = create_keyed_event(100, 1, 10);
1500        let event2 = create_keyed_event(500, 1, 20);
1501
1502        // Events for key 2
1503        let event3 = create_keyed_event(200, 2, 100);
1504        let event4 = create_keyed_event(600, 2, 200);
1505
1506        for event in [event1, event2, event3, event4] {
1507            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1508            operator.process(&event, &mut ctx);
1509        }
1510
1511        // Should have 2 active sessions (one per key)
1512        assert_eq!(operator.active_session_count(), 2);
1513    }
1514
1515    // ---------------------------------------------------------------
1516    //  Late events
1517    // ---------------------------------------------------------------
1518
1519    #[test]
1520    fn test_session_late_event_dropped() {
1521        let aggregator = CountAggregator::new();
1522        let mut operator = SessionWindowOperator::with_id(
1523            Duration::from_millis(1000),
1524            aggregator,
1525            Duration::from_millis(0),
1526            "test_op".to_string(),
1527        );
1528
1529        let mut timers = TimerService::new();
1530        let mut state = InMemoryStore::new();
1531        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1532
1533        // Advance watermark far ahead
1534        let event1 = create_test_event(10000, 1);
1535        {
1536            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1537            operator.process(&event1, &mut ctx);
1538        }
1539
1540        // Process late event
1541        let late_event = create_test_event(100, 1);
1542        let outputs = {
1543            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1544            operator.process(&late_event, &mut ctx)
1545        };
1546
1547        let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
1548        assert!(is_late);
1549        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1550    }
1551
1552    #[test]
1553    fn test_session_late_event_side_output() {
1554        let aggregator = CountAggregator::new();
1555        let mut operator = SessionWindowOperator::with_id(
1556            Duration::from_millis(1000),
1557            aggregator,
1558            Duration::from_millis(0),
1559            "test_op".to_string(),
1560        );
1561        operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));
1562
1563        let mut timers = TimerService::new();
1564        let mut state = InMemoryStore::new();
1565        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1566
1567        // Advance watermark
1568        let event1 = create_test_event(10000, 1);
1569        {
1570            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1571            operator.process(&event1, &mut ctx);
1572        }
1573
1574        // Process late event
1575        let late_event = create_test_event(100, 1);
1576        let outputs = {
1577            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1578            operator.process(&late_event, &mut ctx)
1579        };
1580
1581        let side_output = outputs.iter().find_map(|o| {
1582            if let Output::SideOutput(data) = o {
1583                Some(data.name.clone())
1584            } else {
1585                None
1586            }
1587        });
1588        assert_eq!(side_output.as_deref(), Some("late"));
1589        assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
1590    }
1591
1592    // ---------------------------------------------------------------
1593    //  Emit strategies
1594    // ---------------------------------------------------------------
1595
1596    #[test]
1597    fn test_session_emit_on_update() {
1598        let aggregator = CountAggregator::new();
1599        let mut operator = SessionWindowOperator::with_id(
1600            Duration::from_millis(1000),
1601            aggregator,
1602            Duration::from_millis(0),
1603            "test_op".to_string(),
1604        );
1605        operator.set_emit_strategy(EmitStrategy::OnUpdate);
1606
1607        let mut timers = TimerService::new();
1608        let mut state = InMemoryStore::new();
1609        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1610
1611        let event = create_test_event(500, 1);
1612        let outputs = {
1613            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1614            operator.process(&event, &mut ctx)
1615        };
1616
1617        let event_count = outputs
1618            .iter()
1619            .filter(|o| matches!(o, Output::Event(_)))
1620            .count();
1621        assert_eq!(event_count, 1);
1622    }
1623
1624    #[test]
1625    fn test_session_emit_changelog() {
1626        let aggregator = CountAggregator::new();
1627        let mut operator = SessionWindowOperator::with_id(
1628            Duration::from_millis(1000),
1629            aggregator,
1630            Duration::from_millis(0),
1631            "test_op".to_string(),
1632        );
1633        operator.set_emit_strategy(EmitStrategy::Changelog);
1634
1635        let mut timers = TimerService::new();
1636        let mut state = InMemoryStore::new();
1637        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1638
1639        let event = create_test_event(500, 1);
1640        let outputs = {
1641            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1642            operator.process(&event, &mut ctx)
1643        };
1644
1645        let changelog_count = outputs
1646            .iter()
1647            .filter(|o| matches!(o, Output::Changelog(_)))
1648            .count();
1649        assert_eq!(changelog_count, 1);
1650    }
1651
1652    #[test]
1653    fn test_session_emit_final_drops_late() {
1654        let aggregator = CountAggregator::new();
1655        let mut operator = SessionWindowOperator::with_id(
1656            Duration::from_millis(1000),
1657            aggregator,
1658            Duration::from_millis(0),
1659            "test_op".to_string(),
1660        );
1661        operator.set_emit_strategy(EmitStrategy::Final);
1662
1663        let mut timers = TimerService::new();
1664        let mut state = InMemoryStore::new();
1665        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1666
1667        // Advance watermark
1668        let event1 = create_test_event(10000, 1);
1669        {
1670            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1671            operator.process(&event1, &mut ctx);
1672        }
1673
1674        // Process late event - should be silently dropped
1675        let late_event = create_test_event(100, 1);
1676        let outputs = {
1677            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1678            operator.process(&late_event, &mut ctx)
1679        };
1680
1681        assert!(outputs.is_empty());
1682        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1683    }
1684
1685    // ---------------------------------------------------------------
1686    //  Checkpoint / restore
1687    // ---------------------------------------------------------------
1688
1689    #[test]
1690    fn test_session_checkpoint_restore() {
1691        let aggregator = CountAggregator::new();
1692        let mut operator = SessionWindowOperator::with_id(
1693            Duration::from_millis(1000),
1694            aggregator.clone(),
1695            Duration::from_millis(0),
1696            "test_op".to_string(),
1697        );
1698
1699        let mut timers = TimerService::new();
1700        let mut state = InMemoryStore::new();
1701        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1702
1703        // Create some sessions
1704        for ts in [100, 500] {
1705            let event = create_test_event(ts, 1);
1706            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1707            operator.process(&event, &mut ctx);
1708        }
1709
1710        let counter_before = operator.session_id_counter.load(Ordering::Relaxed);
1711
1712        // Checkpoint
1713        let checkpoint = operator.checkpoint();
1714
1715        // Create new operator and restore
1716        let mut restored = SessionWindowOperator::with_id(
1717            Duration::from_millis(1000),
1718            aggregator,
1719            Duration::from_millis(0),
1720            "test_op".to_string(),
1721        );
1722        restored.restore(checkpoint).unwrap();
1723
1724        // Pending timers and counter should be restored
1725        assert_eq!(restored.pending_timers.len(), 1);
1726        assert_eq!(
1727            restored.session_id_counter.load(Ordering::Relaxed),
1728            counter_before,
1729        );
1730    }
1731
1732    // ---------------------------------------------------------------
1733    //  Stale timer
1734    // ---------------------------------------------------------------
1735
1736    #[test]
1737    fn test_session_stale_timer_ignored() {
1738        let aggregator = CountAggregator::new();
1739        let mut operator = SessionWindowOperator::with_id(
1740            Duration::from_millis(1000),
1741            aggregator,
1742            Duration::from_millis(0),
1743            "test_op".to_string(),
1744        );
1745
1746        let mut timers = TimerService::new();
1747        let mut state = InMemoryStore::new();
1748        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1749
1750        // Create session
1751        let event1 = create_test_event(500, 1);
1752        {
1753            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1754            operator.process(&event1, &mut ctx);
1755        }
1756
1757        let (sid, old_timer_time, _kh) = first_pending_timer(&operator);
1758
1759        // Extend session — timer time changes
1760        let event2 = create_test_event(1200, 1);
1761        {
1762            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1763            operator.process(&event2, &mut ctx);
1764        }
1765
1766        // Fire the *old* timer (stale)
1767        let stale_timer = Timer {
1768            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
1769            timestamp: old_timer_time,
1770        };
1771
1772        let outputs = {
1773            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1774            operator.on_timer(stale_timer, &mut ctx)
1775        };
1776
1777        assert!(outputs.is_empty());
1778        assert_eq!(operator.active_session_count(), 1);
1779    }
1780
1781    // ---------------------------------------------------------------
1782    //  Sum aggregation end-to-end
1783    // ---------------------------------------------------------------
1784
1785    #[test]
1786    fn test_session_sum_aggregation() {
1787        let aggregator = SumAggregator::new(0);
1788        let mut operator = SessionWindowOperator::with_id(
1789            Duration::from_millis(1000),
1790            aggregator,
1791            Duration::from_millis(0),
1792            "test_op".to_string(),
1793        );
1794
1795        let mut timers = TimerService::new();
1796        let mut state = InMemoryStore::new();
1797        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1798
1799        for (ts, value) in [(100, 10), (500, 20), (800, 30)] {
1800            let event = create_test_event(ts, value);
1801            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1802            operator.process(&event, &mut ctx);
1803        }
1804
1805        // Fire timer
1806        let (sid, timer_time, _kh) = first_pending_timer(&operator);
1807        let timer = Timer {
1808            key: SessionWindowOperator::<SumAggregator>::timer_key(sid),
1809            timestamp: timer_time,
1810        };
1811
1812        let outputs = {
1813            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1814            operator.on_timer(timer, &mut ctx)
1815        };
1816
1817        match &outputs[0] {
1818            Output::Event(e) => {
1819                let result = e
1820                    .data
1821                    .column(2)
1822                    .as_any()
1823                    .downcast_ref::<Int64Array>()
1824                    .unwrap()
1825                    .value(0);
1826                assert_eq!(result, 60); // 10 + 20 + 30
1827            }
1828            _ => panic!("Expected Event output"),
1829        }
1830    }
1831
1832    // ---------------------------------------------------------------
1833    //  Output schema
1834    // ---------------------------------------------------------------
1835
1836    #[test]
1837    fn test_session_output_schema() {
1838        use arrow_schema::Schema;
1839        let schema = Schema::new(vec![
1840            arrow_schema::Field::new("window_start", arrow_schema::DataType::Int64, false),
1841            arrow_schema::Field::new("window_end", arrow_schema::DataType::Int64, false),
1842            arrow_schema::Field::new("result", arrow_schema::DataType::Int64, false),
1843        ]);
1844
1845        assert_eq!(schema.fields().len(), 3);
1846        assert_eq!(schema.field(0).name(), "window_start");
1847        assert_eq!(schema.field(1).name(), "window_end");
1848        assert_eq!(schema.field(2).name(), "result");
1849    }
1850
1851    // ---------------------------------------------------------------
1852    //  NEW: Multi-session per key
1853    // ---------------------------------------------------------------
1854
1855    #[test]
1856    fn test_multi_session_per_key() {
1857        let aggregator = CountAggregator::new();
1858        let mut operator = SessionWindowOperator::with_id(
1859            Duration::from_millis(1000),
1860            aggregator,
1861            Duration::from_millis(0),
1862            "test_op".to_string(),
1863        );
1864
1865        let mut timers = TimerService::new();
1866        let mut state = InMemoryStore::new();
1867        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1868
1869        // First event creates session at [100, 1100)
1870        let event1 = create_test_event(100, 1);
1871        {
1872            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1873            operator.process(&event1, &mut ctx);
1874        }
1875        assert_eq!(operator.active_session_count(), 1);
1876
1877        // Second event at 5000 — gap > 1000 ⇒ new session at [5000, 6000)
1878        let event2 = create_test_event(5000, 1);
1879        {
1880            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1881            operator.process(&event2, &mut ctx);
1882        }
1883        assert_eq!(operator.active_session_count(), 2);
1884
1885        // Both sessions exist in the same index
1886        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
1887        let index = operator.load_session_index(key_hash, &state);
1888        assert_eq!(index.len(), 2);
1889        assert_eq!(index.sessions[0].start, 100);
1890        assert_eq!(index.sessions[1].start, 5000);
1891    }
1892
1893    // ---------------------------------------------------------------
1894    //  NEW: Session persists across batch boundaries
1895    // ---------------------------------------------------------------
1896
1897    #[test]
1898    fn test_session_persists_across_batch_boundary() {
1899        let aggregator = CountAggregator::new();
1900        let mut operator = SessionWindowOperator::with_id(
1901            Duration::from_millis(1000),
1902            aggregator,
1903            Duration::from_millis(0),
1904            "test_op".to_string(),
1905        );
1906
1907        let mut timers = TimerService::new();
1908        let mut state = InMemoryStore::new();
1909        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1910
1911        // Batch 1: create session at [500, 1500)
1912        let event1 = create_test_event(500, 1);
1913        {
1914            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1915            operator.process(&event1, &mut ctx);
1916        }
1917        assert_eq!(operator.active_session_count(), 1);
1918
1919        // Simulate batch boundary: clear in-memory cache
1920        operator.session_indices.clear();
1921        assert_eq!(operator.active_session_count(), 0); // cache cleared
1922
1923        // Batch 2: event at 800 should reload from state and extend
1924        let event2 = create_test_event(800, 1);
1925        {
1926            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1927            operator.process(&event2, &mut ctx);
1928        }
1929
1930        // Session should have been loaded from state store and extended
1931        assert_eq!(operator.active_session_count(), 1);
1932
1933        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
1934        let index = operator.load_session_index(key_hash, &state);
1935        assert_eq!(index.len(), 1);
1936        // start stays 500, end extends to max(1500, 800+1000=1800)
1937        assert_eq!(index.sessions[0].start, 500);
1938        assert_eq!(index.sessions[0].end, 1800);
1939
1940        // Accumulator should reflect both events
1941        let sid = index.sessions[0].id;
1942        let acc = operator.load_accumulator(sid, &state);
1943        assert_eq!(acc.result(), 2);
1944    }
1945
1946    // ---------------------------------------------------------------
1947    //  Session merging tests
1948    // ---------------------------------------------------------------
1949
1950    #[test]
1951    fn test_two_way_session_merge() {
1952        // gap=1000. Create 2 sessions: [100, 1100) and [1200, 2200).
1953        // Bridge event at 1050 → potential [1050, 2050) overlaps both → merge.
1954        let aggregator = CountAggregator::new();
1955        let mut operator = SessionWindowOperator::with_id(
1956            Duration::from_millis(1000),
1957            aggregator,
1958            Duration::from_millis(0),
1959            "test_merge".to_string(),
1960        );
1961
1962        let mut timers = TimerService::new();
1963        let mut state = InMemoryStore::new();
1964        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1965
1966        // Create first session [100, 1100)
1967        let event1 = create_test_event(100, 1);
1968        {
1969            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1970            operator.process(&event1, &mut ctx);
1971        }
1972
1973        // Create second session [1200, 2200) (gap of 100ms between sessions)
1974        let event2 = create_test_event(1200, 1);
1975        {
1976            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1977            operator.process(&event2, &mut ctx);
1978        }
1979        assert_eq!(operator.active_session_count(), 2);
1980
1981        // Bridge at 1050 → [1050, 2050) overlaps [100,1100) and [1200,2200)
1982        let bridge = create_test_event(1050, 1);
1983        {
1984            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1985            operator.process(&bridge, &mut ctx);
1986        }
1987
1988        // Should have merged into a single session
1989        assert_eq!(operator.active_session_count(), 1);
1990
1991        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
1992        let index = operator.load_session_index(key_hash, &state);
1993        assert_eq!(index.len(), 1);
1994
1995        // Merged bounds: start=100, end=max(1100, 2200, 2050)=2200
1996        assert_eq!(index.sessions[0].start, 100);
1997        assert_eq!(index.sessions[0].end, 2200);
1998
1999        // Accumulator: 3 events total
2000        let sid = index.sessions[0].id;
2001        let acc = operator.load_accumulator(sid, &state);
2002        assert_eq!(acc.result(), 3);
2003    }
2004
2005    #[test]
2006    fn test_three_way_session_merge() {
2007        // gap=500. Create 3 sessions with small gaps between them.
2008        // Use large allowed_lateness to prevent late data dropping on bridge events.
2009        // Session 1: [100, 600), Session 2: [700, 1200), Session 3: [1300, 1800)
2010        // Bridge at 550 → [550, 1050) overlaps sessions 1 and 2 → merge to [100, 1200)
2011        // Bridge at 1150 → [1150, 1650) overlaps [100, 1200) and [1300, 1800) → merge all
2012        let aggregator = CountAggregator::new();
2013        let mut operator = SessionWindowOperator::with_id(
2014            Duration::from_millis(500),
2015            aggregator,
2016            Duration::from_millis(5000),
2017            "test_merge3".to_string(),
2018        );
2019
2020        let mut timers = TimerService::new();
2021        let mut state = InMemoryStore::new();
2022        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2023
2024        // Create 3 sessions with 100ms gaps between them
2025        for ts in [100, 700, 1300] {
2026            let event = create_test_event(ts, 1);
2027            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2028            operator.process(&event, &mut ctx);
2029        }
2030        assert_eq!(operator.active_session_count(), 3);
2031
2032        // Bridge at 550 → [550, 1050) overlaps [100,600) and [700,1200)
2033        let bridge1 = create_test_event(550, 1);
2034        {
2035            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2036            operator.process(&bridge1, &mut ctx);
2037        }
2038        assert_eq!(operator.active_session_count(), 2);
2039
2040        // Check intermediate state
2041        let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
2042        let index = operator.load_session_index(key_hash, &state);
2043        assert_eq!(index.len(), 2);
2044
2045        // Bridge at 1150 → [1150, 1650) overlaps merged [100,1200) and [1300,1800)
2046        let bridge2 = create_test_event(1150, 1);
2047        {
2048            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2049            operator.process(&bridge2, &mut ctx);
2050        }
2051        assert_eq!(operator.active_session_count(), 1);
2052
2053        let index = operator.load_session_index(key_hash, &state);
2054        assert_eq!(index.len(), 1);
2055
2056        // Merged bounds: [100, 1800)
2057        assert_eq!(index.sessions[0].start, 100);
2058        assert_eq!(index.sessions[0].end, 1800);
2059
2060        // 5 events total
2061        let sid = index.sessions[0].id;
2062        let acc = operator.load_accumulator(sid, &state);
2063        assert_eq!(acc.result(), 5);
2064    }
2065
2066    #[test]
2067    fn test_merge_emits_changelog_retractions() {
2068        // Use Changelog strategy. Create 2 sessions (2 inserts emitted).
2069        // Merge them → should emit deletes for both old states + insert for merged.
2070        // gap=1000, sessions [100, 1100) and [1200, 2200) — 100ms gap between them.
2071        let aggregator = CountAggregator::new();
2072        let mut operator = SessionWindowOperator::with_id(
2073            Duration::from_millis(1000),
2074            aggregator,
2075            Duration::from_millis(0),
2076            "test_cl_merge".to_string(),
2077        );
2078        operator.set_emit_strategy(EmitStrategy::Changelog);
2079
2080        let mut timers = TimerService::new();
2081        let mut state = InMemoryStore::new();
2082        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2083
2084        // Create first session [100, 1100) — emits Changelog insert
2085        let event1 = create_test_event(100, 1);
2086        let out1 = {
2087            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2088            operator.process(&event1, &mut ctx)
2089        };
2090        let insert_count_1 = out1
2091            .iter()
2092            .filter(|o| matches!(o, Output::Changelog(r) if r.weight == 1))
2093            .count();
2094        assert_eq!(insert_count_1, 1);
2095
2096        // Create second session [1200, 2200) — emits Changelog insert
2097        let event2 = create_test_event(1200, 1);
2098        let out2 = {
2099            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2100            operator.process(&event2, &mut ctx)
2101        };
2102        let insert_count_2 = out2
2103            .iter()
2104            .filter(|o| matches!(o, Output::Changelog(r) if r.weight == 1))
2105            .count();
2106        assert_eq!(insert_count_2, 1);
2107        assert_eq!(operator.active_session_count(), 2);
2108
2109        // Bridge at 1050 → [1050, 2050) overlaps both sessions → merge
2110        let bridge = create_test_event(1050, 1);
2111        let out3 = {
2112            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2113            operator.process(&bridge, &mut ctx)
2114        };
2115
2116        // Expect: 2 deletes (one for winner's old state, one for loser) + 1 insert (merged)
2117        let deletes: Vec<_> = out3
2118            .iter()
2119            .filter(|o| matches!(o, Output::Changelog(r) if r.weight == -1))
2120            .collect();
2121        let inserts: Vec<_> = out3
2122            .iter()
2123            .filter(|o| matches!(o, Output::Changelog(r) if r.weight == 1))
2124            .collect();
2125
2126        assert_eq!(deletes.len(), 2, "Expected 2 delete retractions");
2127        assert_eq!(inserts.len(), 1, "Expected 1 insert for merged result");
2128
2129        // Session count should be 1 after merge
2130        assert_eq!(operator.active_session_count(), 1);
2131    }
2132
2133    #[test]
2134    fn test_merge_accumulator_correctness_sum() {
2135        // Use SumAggregator. Session A: events 10, 20 (sum=30).
2136        // Session B: event 50 (sum=50). Bridge with value 5.
2137        // gap=500 to keep sessions tight.
2138        // Session A: events at 100 and 300 → extends to [100, 800).
2139        // Session B: event at 900 → [900, 1400). Gap of 100ms.
2140        // Bridge at 750 → [750, 1250) overlaps both → merge.
2141        // Merged sum = 10 + 20 + 50 + 5 = 85.
2142        let aggregator = SumAggregator::new(0);
2143        let mut operator = SessionWindowOperator::with_id(
2144            Duration::from_millis(500),
2145            aggregator,
2146            Duration::from_millis(0),
2147            "test_sum_merge".to_string(),
2148        );
2149
2150        let mut timers = TimerService::new();
2151        let mut state = InMemoryStore::new();
2152        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2153
2154        // Session A: [100, 800) with sum=30
2155        for (ts, val) in [(100, 10), (300, 20)] {
2156            let event = create_test_event(ts, val);
2157            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2158            operator.process(&event, &mut ctx);
2159        }
2160
2161        // Session B: [900, 1400) with sum=50
2162        let event_b = create_test_event(900, 50);
2163        {
2164            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2165            operator.process(&event_b, &mut ctx);
2166        }
2167        assert_eq!(operator.active_session_count(), 2);
2168
2169        // Bridge at 750 with value 5 → [750, 1250) overlaps both → merge
2170        let bridge = create_test_event(750, 5);
2171        {
2172            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2173            operator.process(&bridge, &mut ctx);
2174        }
2175        assert_eq!(operator.active_session_count(), 1);
2176
2177        // Verify merged sum
2178        let key_hash = SessionWindowOperator::<SumAggregator>::key_hash(&[]);
2179        let index = operator.load_session_index(key_hash, &state);
2180        let sid = index.sessions[0].id;
2181        let acc = operator.load_accumulator(sid, &state);
2182        assert_eq!(acc.result(), 85); // 10 + 20 + 50 + 5
2183    }
2184
2185    #[test]
2186    fn test_merge_cleans_up_loser_timers() {
2187        // Create 2 sessions with pending timers. Merge them.
2188        // gap=1000, sessions [100, 1100) and [1200, 2200). Bridge at 1050.
2189        let aggregator = CountAggregator::new();
2190        let mut operator = SessionWindowOperator::with_id(
2191            Duration::from_millis(1000),
2192            aggregator,
2193            Duration::from_millis(0),
2194            "test_timer_merge".to_string(),
2195        );
2196
2197        let mut timers = TimerService::new();
2198        let mut state = InMemoryStore::new();
2199        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2200
2201        // Create first session [100, 1100)
2202        let event1 = create_test_event(100, 1);
2203        {
2204            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2205            operator.process(&event1, &mut ctx);
2206        }
2207
2208        // Create second session [1200, 2200)
2209        let event2 = create_test_event(1200, 1);
2210        {
2211            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2212            operator.process(&event2, &mut ctx);
2213        }
2214        assert_eq!(operator.pending_timers.len(), 2);
2215
2216        // Bridge at 1050 → [1050, 2050) merges both sessions
2217        let bridge = create_test_event(1050, 1);
2218        {
2219            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2220            operator.process(&bridge, &mut ctx);
2221        }
2222
2223        // Only 1 timer should remain (for the winner)
2224        assert_eq!(operator.pending_timers.len(), 1);
2225        assert_eq!(operator.active_session_count(), 1);
2226    }
2227
2228    #[test]
2229    fn test_merge_no_retractions_for_on_watermark() {
2230        // OnWatermark strategy should NOT emit retractions on merge
2231        // (no intermediate output was emitted).
2232        // gap=1000, sessions [100, 1100) and [1200, 2200). Bridge at 1050.
2233        let aggregator = CountAggregator::new();
2234        let mut operator = SessionWindowOperator::with_id(
2235            Duration::from_millis(1000),
2236            aggregator,
2237            Duration::from_millis(0),
2238            "test_wm_merge".to_string(),
2239        );
2240        // Default strategy is OnWatermark
2241
2242        let mut timers = TimerService::new();
2243        let mut state = InMemoryStore::new();
2244        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2245
2246        // Create 2 sessions
2247        let event1 = create_test_event(100, 1);
2248        {
2249            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2250            operator.process(&event1, &mut ctx);
2251        }
2252        let event2 = create_test_event(1200, 1);
2253        {
2254            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2255            operator.process(&event2, &mut ctx);
2256        }
2257
2258        // Bridge at 1050 → merge. No changelog outputs expected.
2259        let bridge = create_test_event(1050, 1);
2260        let outputs = {
2261            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2262            operator.process(&bridge, &mut ctx)
2263        };
2264
2265        let changelog_count = outputs
2266            .iter()
2267            .filter(|o| matches!(o, Output::Changelog(_)))
2268            .count();
2269        assert_eq!(changelog_count, 0);
2270        assert_eq!(operator.active_session_count(), 1);
2271    }
2272
2273    // ---------------------------------------------------------------
2274    //  Emit strategy verification tests
2275    // ---------------------------------------------------------------
2276
2277    #[test]
2278    fn test_on_watermark_no_intermediate_emits_on_timer() {
2279        // OnWatermark (default): process() should NOT emit Event/Changelog,
2280        // only on_timer() should emit.
2281        let aggregator = CountAggregator::new();
2282        let mut operator = SessionWindowOperator::with_id(
2283            Duration::from_millis(1000),
2284            aggregator,
2285            Duration::from_millis(0),
2286            "test_wm_emit".to_string(),
2287        );
2288        // Default strategy is OnWatermark
2289
2290        let mut timers = TimerService::new();
2291        let mut state = InMemoryStore::new();
2292        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2293
2294        // Process 2 events in the same session
2295        let event1 = create_test_event(500, 1);
2296        let out1 = {
2297            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2298            operator.process(&event1, &mut ctx)
2299        };
2300
2301        let event2 = create_test_event(800, 1);
2302        let out2 = {
2303            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2304            operator.process(&event2, &mut ctx)
2305        };
2306
2307        // No Event or Changelog output from process() (only Watermark allowed)
2308        let event_or_changelog = |o: &&Output| matches!(o, Output::Event(_) | Output::Changelog(_));
2309        assert_eq!(out1.iter().filter(event_or_changelog).count(), 0);
2310        assert_eq!(out2.iter().filter(event_or_changelog).count(), 0);
2311
2312        // Fire timer → should emit Event with count=2
2313        let (sid, timer_time, _kh) = first_pending_timer(&operator);
2314        let timer = Timer {
2315            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2316            timestamp: timer_time,
2317        };
2318        let timer_out = {
2319            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2320            operator.on_timer(timer, &mut ctx)
2321        };
2322
2323        assert_eq!(timer_out.len(), 1);
2324        match &timer_out[0] {
2325            Output::Event(e) => {
2326                let result = e
2327                    .data
2328                    .column(2)
2329                    .as_any()
2330                    .downcast_ref::<Int64Array>()
2331                    .unwrap()
2332                    .value(0);
2333                assert_eq!(result, 2);
2334            }
2335            _ => panic!("Expected Event output from on_timer"),
2336        }
2337    }
2338
2339    #[test]
2340    fn test_on_window_close_no_intermediate_emits_on_timer() {
2341        // OnWindowClose: process() should NOT emit Event/Changelog,
2342        // only on_timer() should emit.
2343        let aggregator = CountAggregator::new();
2344        let mut operator = SessionWindowOperator::with_id(
2345            Duration::from_millis(1000),
2346            aggregator,
2347            Duration::from_millis(0),
2348            "test_owc_emit".to_string(),
2349        );
2350        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2351
2352        let mut timers = TimerService::new();
2353        let mut state = InMemoryStore::new();
2354        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2355
2356        // Process event
2357        let event = create_test_event(500, 1);
2358        let out = {
2359            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2360            operator.process(&event, &mut ctx)
2361        };
2362
2363        // No Event or Changelog from process()
2364        let event_or_changelog = |o: &&Output| matches!(o, Output::Event(_) | Output::Changelog(_));
2365        assert_eq!(out.iter().filter(event_or_changelog).count(), 0);
2366
2367        // Fire timer → should emit Event
2368        let (sid, timer_time, _kh) = first_pending_timer(&operator);
2369        let timer = Timer {
2370            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2371            timestamp: timer_time,
2372        };
2373        let timer_out = {
2374            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2375            operator.on_timer(timer, &mut ctx)
2376        };
2377
2378        assert_eq!(timer_out.len(), 1);
2379        assert!(matches!(&timer_out[0], Output::Event(_)));
2380    }
2381
2382    #[test]
2383    fn test_changelog_timer_emits_changelog_record() {
2384        // Changelog strategy: on_timer() should emit Output::Changelog
2385        // with weight == 1 (insert).
2386        let aggregator = CountAggregator::new();
2387        let mut operator = SessionWindowOperator::with_id(
2388            Duration::from_millis(1000),
2389            aggregator,
2390            Duration::from_millis(0),
2391            "test_cl_timer".to_string(),
2392        );
2393        operator.set_emit_strategy(EmitStrategy::Changelog);
2394
2395        let mut timers = TimerService::new();
2396        let mut state = InMemoryStore::new();
2397        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2398
2399        // Process event (emits intermediate Changelog insert)
2400        let event = create_test_event(500, 1);
2401        {
2402            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2403            operator.process(&event, &mut ctx);
2404        }
2405
2406        // Fire timer → should emit Changelog insert
2407        let (sid, timer_time, _kh) = first_pending_timer(&operator);
2408        let timer = Timer {
2409            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2410            timestamp: timer_time,
2411        };
2412        let timer_out = {
2413            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2414            operator.on_timer(timer, &mut ctx)
2415        };
2416
2417        assert_eq!(timer_out.len(), 1);
2418        match &timer_out[0] {
2419            Output::Changelog(record) => {
2420                assert_eq!(record.weight, 1, "Expected insert weight");
2421            }
2422            _ => panic!("Expected Changelog output from on_timer"),
2423        }
2424    }
2425
2426    #[test]
2427    fn test_on_update_emits_updated_count() {
2428        // OnUpdate: each process() call should emit Event with the latest count.
2429        let aggregator = CountAggregator::new();
2430        let mut operator = SessionWindowOperator::with_id(
2431            Duration::from_millis(1000),
2432            aggregator,
2433            Duration::from_millis(0),
2434            "test_ou_emit".to_string(),
2435        );
2436        operator.set_emit_strategy(EmitStrategy::OnUpdate);
2437
2438        let mut timers = TimerService::new();
2439        let mut state = InMemoryStore::new();
2440        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2441
2442        // First event → count=1
2443        let event1 = create_test_event(500, 1);
2444        let out1 = {
2445            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2446            operator.process(&event1, &mut ctx)
2447        };
2448
2449        let emitted1: Vec<_> = out1
2450            .iter()
2451            .filter_map(|o| match o {
2452                Output::Event(e) => Some(e),
2453                _ => None,
2454            })
2455            .collect();
2456        assert_eq!(emitted1.len(), 1);
2457        let result1 = emitted1[0]
2458            .data
2459            .column(2)
2460            .as_any()
2461            .downcast_ref::<Int64Array>()
2462            .unwrap()
2463            .value(0);
2464        assert_eq!(result1, 1);
2465
2466        // Second event in same session → count=2
2467        let event2 = create_test_event(800, 1);
2468        let out2 = {
2469            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2470            operator.process(&event2, &mut ctx)
2471        };
2472
2473        let emitted2: Vec<_> = out2
2474            .iter()
2475            .filter_map(|o| match o {
2476                Output::Event(e) => Some(e),
2477                _ => None,
2478            })
2479            .collect();
2480        assert_eq!(emitted2.len(), 1);
2481        let result2 = emitted2[0]
2482            .data
2483            .column(2)
2484            .as_any()
2485            .downcast_ref::<Int64Array>()
2486            .unwrap()
2487            .value(0);
2488        assert_eq!(result2, 2);
2489    }
2490
2491    // ---------------------------------------------------------------
2492    //  Timer persistence after checkpoint/restore
2493    // ---------------------------------------------------------------
2494
2495    #[test]
2496    fn test_timer_reregistration_after_restore() {
2497        // Create session, checkpoint, restore to fresh operator + fresh TimerService.
2498        // Verify needs_timer_reregistration is true.
2499        // Process a new event (triggers lazy re-registration).
2500        // Verify TimerService has both the restored timer and the new timer.
2501        // Poll timers at restored timer time → verify timer fires.
2502        let aggregator = CountAggregator::new();
2503        let mut operator = SessionWindowOperator::with_id(
2504            Duration::from_millis(1000),
2505            aggregator.clone(),
2506            Duration::from_millis(0),
2507            "test_restore_timers".to_string(),
2508        );
2509
2510        let mut timers = TimerService::new();
2511        let mut state = InMemoryStore::new();
2512        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2513
2514        // Create a session at t=500 → timer at 500+1000+0 = 1500
2515        let event = create_test_event(500, 1);
2516        {
2517            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2518            operator.process(&event, &mut ctx);
2519        }
2520        assert_eq!(operator.pending_timers.len(), 1);
2521        let (_, original_timer_time, _) = first_pending_timer(&operator);
2522        assert_eq!(original_timer_time, 1500);
2523
2524        // Checkpoint
2525        let checkpoint = operator.checkpoint();
2526
2527        // Create fresh operator and fresh TimerService (simulating restart)
2528        let mut restored = SessionWindowOperator::with_id(
2529            Duration::from_millis(1000),
2530            aggregator,
2531            Duration::from_millis(0),
2532            "test_restore_timers".to_string(),
2533        );
2534        let mut fresh_timers = TimerService::new();
2535        assert_eq!(fresh_timers.pending_count(), 0);
2536
2537        // Restore
2538        restored.restore(checkpoint).unwrap();
2539        assert!(restored.needs_timer_reregistration);
2540        assert_eq!(restored.pending_timers.len(), 1);
2541
2542        // Fresh TimerService still has no timers (not re-registered yet)
2543        assert_eq!(fresh_timers.pending_count(), 0);
2544
2545        // Process a new event → triggers lazy re-registration + creates new session
2546        let event2 = create_test_event(5000, 1);
2547        {
2548            let mut ctx = create_test_context(&mut fresh_timers, &mut state, &mut watermark_gen);
2549            restored.process(&event2, &mut ctx);
2550        }
2551
2552        // TimerService should now have at least 2 timers:
2553        // restored timer (1500) + new timer (5000+1000=6000)
2554        assert!(
2555            fresh_timers.pending_count() >= 2,
2556            "Expected at least 2 timers, got {}",
2557            fresh_timers.pending_count()
2558        );
2559        assert!(!restored.needs_timer_reregistration);
2560
2561        // Poll timers at the restored timer time → should fire
2562        let fired = fresh_timers.poll_timers(1500);
2563        assert_eq!(fired.len(), 1);
2564        assert_eq!(fired[0].timestamp, 1500);
2565
2566        // Use the fired timer to invoke on_timer
2567        let timer = Timer {
2568            key: fired[0].key.clone().unwrap(),
2569            timestamp: fired[0].timestamp,
2570        };
2571        let timer_out = {
2572            let mut ctx = create_test_context(&mut fresh_timers, &mut state, &mut watermark_gen);
2573            restored.on_timer(timer, &mut ctx)
2574        };
2575
2576        // Should emit the restored session's result (count=1 from the original event)
2577        assert_eq!(timer_out.len(), 1);
2578        match &timer_out[0] {
2579            Output::Event(e) => {
2580                let result = e
2581                    .data
2582                    .column(2)
2583                    .as_any()
2584                    .downcast_ref::<Int64Array>()
2585                    .unwrap()
2586                    .value(0);
2587                assert_eq!(result, 1);
2588            }
2589            _ => panic!("Expected Event output from restored timer"),
2590        }
2591    }
2592
2593    // ========================================================================
2594    // EMIT ON WINDOW CLOSE (EOWC) — Session Window Tests (Issue #52)
2595    // ========================================================================
2596
2597    #[test]
2598    fn test_eowc_session_basic_close() {
2599        // Create session with 30s gap, send events at t=0, 10000, 20000.
2600        // Session = [0, 50000). Fire timer. Verify single emission.
2601        let aggregator = CountAggregator::new();
2602        let mut operator = SessionWindowOperator::with_id(
2603            Duration::from_millis(30_000), // 30s gap
2604            aggregator,
2605            Duration::from_millis(0),
2606            "eowc_session".to_string(),
2607        );
2608        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2609
2610        let mut timers = TimerService::new();
2611        let mut state = InMemoryStore::new();
2612        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
2613
2614        // Send 3 events within the same session
2615        for ts in [0, 10_000, 20_000] {
2616            let event = create_test_event(ts, 1);
2617            let outputs = {
2618                let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2619                operator.process(&event, &mut ctx)
2620            };
2621            // No intermediate Event emissions
2622            let event_count = outputs
2623                .iter()
2624                .filter(|o| matches!(o, Output::Event(_)))
2625                .count();
2626            assert_eq!(
2627                event_count, 0,
2628                "EOWC session should not emit intermediate results"
2629            );
2630        }
2631
2632        // Session should be [0, 50000) (last event at 20000 + gap 30000)
2633        assert_eq!(operator.active_session_count(), 1);
2634
2635        // Get the pending timer and fire it
2636        let (sid, timer_time, _kh) = first_pending_timer(&operator);
2637        let timer = Timer {
2638            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2639            timestamp: timer_time,
2640        };
2641        let outputs = {
2642            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2643            operator.on_timer(timer, &mut ctx)
2644        };
2645
2646        assert_eq!(outputs.len(), 1, "Should emit exactly once");
2647        match &outputs[0] {
2648            Output::Event(e) => {
2649                let result = e
2650                    .data
2651                    .column(2)
2652                    .as_any()
2653                    .downcast_ref::<Int64Array>()
2654                    .unwrap();
2655                assert_eq!(result.value(0), 3, "Session count should be 3");
2656            }
2657            other => panic!("Expected Output::Event, got: {other:?}"),
2658        }
2659
2660        // Session should be cleaned up
2661        assert_eq!(operator.active_session_count(), 0);
2662    }
2663
2664    #[test]
2665    fn test_eowc_session_no_intermediate_on_extend() {
2666        // Extending a session with new events should not produce
2667        // intermediate emissions with OnWindowClose.
2668        let aggregator = CountAggregator::new();
2669        let mut operator = SessionWindowOperator::with_id(
2670            Duration::from_millis(1000),
2671            aggregator,
2672            Duration::from_millis(0),
2673            "eowc_extend".to_string(),
2674        );
2675        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2676
2677        let mut timers = TimerService::new();
2678        let mut state = InMemoryStore::new();
2679        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2680
2681        // Repeatedly extend the session
2682        for ts in (0..5).map(|i| i * 500) {
2683            let event = create_test_event(ts, 1);
2684            let outputs = {
2685                let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2686                operator.process(&event, &mut ctx)
2687            };
2688            for output in &outputs {
2689                assert!(
2690                    !matches!(output, Output::Event(_)),
2691                    "No intermediate Event on session extend (ts={ts})"
2692                );
2693                assert!(
2694                    !matches!(output, Output::Changelog(_)),
2695                    "No intermediate Changelog on session extend (ts={ts})"
2696                );
2697            }
2698        }
2699
2700        // Still 1 session, not yet emitted
2701        assert_eq!(operator.active_session_count(), 1);
2702    }
2703
2704    #[test]
2705    fn test_eowc_session_merge_before_close() {
2706        // Create two separate sessions, then bridge them.
2707        // Merged session should emit once with combined count.
2708        let aggregator = CountAggregator::new();
2709        let mut operator = SessionWindowOperator::with_id(
2710            Duration::from_millis(1000), // 1s gap
2711            aggregator,
2712            Duration::from_millis(0),
2713            "eowc_merge".to_string(),
2714        );
2715        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2716
2717        let mut timers = TimerService::new();
2718        let mut state = InMemoryStore::new();
2719        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2720
2721        // Session 1: event at t=100 → session [100, 1100)
2722        let e1 = create_test_event(100, 1);
2723        {
2724            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2725            operator.process(&e1, &mut ctx);
2726        }
2727
2728        // Session 2: event at t=1200 → session [1200, 2200)
2729        let e2 = create_test_event(1200, 1);
2730        {
2731            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2732            operator.process(&e2, &mut ctx);
2733        }
2734        assert_eq!(
2735            operator.active_session_count(),
2736            2,
2737            "Should have 2 sessions before merge"
2738        );
2739
2740        // Bridge: event at t=1050 → session [1050, 2050) overlaps both
2741        let bridge = create_test_event(1050, 1);
2742        let bridge_outputs = {
2743            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2744            operator.process(&bridge, &mut ctx)
2745        };
2746
2747        // No intermediate Event from merge with OnWindowClose
2748        let event_count = bridge_outputs
2749            .iter()
2750            .filter(|o| matches!(o, Output::Event(_)))
2751            .count();
2752        assert_eq!(
2753            event_count, 0,
2754            "Merge should not emit intermediate results with EOWC"
2755        );
2756        assert_eq!(
2757            operator.active_session_count(),
2758            1,
2759            "Should have 1 merged session"
2760        );
2761
2762        // Fire the merged session's timer
2763        let (sid, timer_time, _kh) = first_pending_timer(&operator);
2764        let timer = Timer {
2765            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2766            timestamp: timer_time,
2767        };
2768        let outputs = {
2769            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2770            operator.on_timer(timer, &mut ctx)
2771        };
2772
2773        assert_eq!(outputs.len(), 1, "Merged session should emit once");
2774        if let Output::Event(e) = &outputs[0] {
2775            let result = e
2776                .data
2777                .column(2)
2778                .as_any()
2779                .downcast_ref::<Int64Array>()
2780                .unwrap();
2781            assert_eq!(
2782                result.value(0),
2783                3,
2784                "Merged session count should be 3 (1+1+1)"
2785            );
2786        } else {
2787            panic!("Expected Event output for merged session");
2788        }
2789    }
2790
2791    #[test]
2792    fn test_eowc_session_late_data_after_close() {
2793        // Close a session, then send late event for the same time range.
2794        // Late event should be dropped (or routed to side output).
2795        let aggregator = CountAggregator::new();
2796        let mut operator = SessionWindowOperator::with_id(
2797            Duration::from_millis(1000),
2798            aggregator,
2799            Duration::from_millis(0),
2800            "eowc_late".to_string(),
2801        );
2802        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2803
2804        let mut timers = TimerService::new();
2805        let mut state = InMemoryStore::new();
2806        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
2807
2808        // Create session at t=500 → session [500, 1500)
2809        let e1 = create_test_event(500, 1);
2810        {
2811            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2812            operator.process(&e1, &mut ctx);
2813        }
2814
2815        // Fire timer to close the session
2816        let (sid, timer_time, _kh) = first_pending_timer(&operator);
2817        let timer = Timer {
2818            key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2819            timestamp: timer_time,
2820        };
2821        {
2822            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2823            operator.on_timer(timer, &mut ctx);
2824        }
2825        assert_eq!(operator.active_session_count(), 0);
2826
2827        // Advance watermark past the session close
2828        let advance = create_test_event(5000, 1);
2829        {
2830            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2831            operator.process(&advance, &mut ctx);
2832        }
2833
2834        // Send late event for the closed session's time range
2835        let late = create_test_event(600, 99);
2836        let late_outputs = {
2837            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2838            operator.process(&late, &mut ctx)
2839        };
2840
2841        // Should be classified as late
2842        let is_late = late_outputs
2843            .iter()
2844            .any(|o| matches!(o, Output::LateEvent(_)));
2845        assert!(is_late, "Late event after session close should be detected");
2846
2847        // Should NOT produce a new Event
2848        let is_event = late_outputs.iter().any(|o| matches!(o, Output::Event(_)));
2849        assert!(!is_event, "Late event should not re-open closed session");
2850
2851        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
2852    }
2853}