Skip to main content

laminar_core/operator/
sliding_window.rs

1//! # Sliding Window Operators
2//!
3//! Implementation of sliding (hopping) windows for stream processing.
4//!
5//! Sliding windows are fixed-size windows that overlap. Each event can belong
6//! to multiple windows. The window size defines the duration, and the slide
7//! defines how much each window advances.
8//!
9//! ## Example
10//!
11//! ```text
12//! Window size: 1 hour, Slide: 15 minutes
13//!
14//! Window 1: [00:00, 01:00)
15//! Window 2: [00:15, 01:15)
16//! Window 3: [00:30, 01:30)
17//! Window 4: [00:45, 01:45)
18//!
19//! An event at 00:40 belongs to windows 1, 2, 3 and to the preceding window [23:45, 00:45)
20//! ```
21//!
22//! ## Performance
23//!
24//! - Each event is assigned to at most `ceil(size / slide)` windows
25//! - Uses `SmallVec<[WindowId; 4]>` to avoid heap allocation for common cases
26//! - State is stored per-window, so memory usage scales with active windows
27//!
28//! ## Usage
29//!
30//! ```rust,no_run
31//! use laminar_core::operator::sliding_window::{
32//!     SlidingWindowAssigner, SlidingWindowOperator,
33//! };
34//! use laminar_core::operator::window::CountAggregator;
35//! use std::time::Duration;
36//!
37//! // Create a 1-hour sliding window with 15-minute slide
38//! let assigner = SlidingWindowAssigner::new(
39//!     Duration::from_secs(3600),  // 1 hour window
40//!     Duration::from_secs(900),   // 15 minute slide
41//! );
42//! let operator = SlidingWindowOperator::new(
43//!     assigner,
44//!     CountAggregator::new(),
45//!     Duration::from_secs(60), // 1 minute grace period
46//! );
47//! ```
48
49use super::window::{
50    Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
51    ResultToArrow, WindowAssigner, WindowCloseMetrics, WindowId, WindowIdVec,
52};
53use super::{
54    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec,
55    SideOutputData, Timer,
56};
57use crate::state::{StateStore, StateStoreExt};
58use arrow_array::{Int64Array, RecordBatch};
59use arrow_schema::{DataType, Field, Schema, SchemaRef};
60use rkyv::{
61    api::high::{HighDeserializer, HighSerializer, HighValidator},
62    bytecheck::CheckBytes,
63    rancor::Error as RkyvError,
64    ser::allocator::ArenaHandle,
65    util::AlignedVec,
66    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
67};
68use rustc_hash::FxHashMap;
69use smallvec::SmallVec;
70use std::marker::PhantomData;
71use std::sync::atomic::{AtomicU64, Ordering};
72use std::sync::Arc;
73use std::time::Duration;
74
/// Sliding window assigner.
///
/// Assigns each event to multiple overlapping windows based on its timestamp.
/// Windows are aligned to epoch (timestamp 0) unless an offset is configured
/// via [`SlidingWindowAssigner::with_offset_ms`].
///
/// # Parameters
///
/// - `size_ms`: The duration of each window in milliseconds
/// - `slide_ms`: The advance interval between windows in milliseconds
///
/// # Window Assignment
///
/// An event at timestamp `t` belongs to all windows `[start, start + size)`
/// where `start <= t < start + size` and `start - offset` is a multiple of
/// `slide`.
///
/// Each event belongs to at most `ceil(size / slide)` windows (exactly that
/// many when `size` is a multiple of `slide`).
///
/// # Example
///
/// ```rust,no_run
/// use laminar_core::operator::sliding_window::SlidingWindowAssigner;
/// use std::time::Duration;
///
/// // 1-minute window with 20-second slide
/// let assigner = SlidingWindowAssigner::new(
///     Duration::from_secs(60),
///     Duration::from_secs(20),
/// );
///
/// // Event at t=50 belongs to windows: [0,60), [20,80), [40,100)
/// ```
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Window size in milliseconds
    size_ms: i64,
    /// Slide interval in milliseconds
    slide_ms: i64,
    /// Upper bound on the number of windows per event (cached for performance)
    windows_per_event: usize,
    /// Offset in milliseconds for timezone-aligned windows
    offset_ms: i64,
}

impl SlidingWindowAssigner {
    /// Creates a new sliding window assigner.
    ///
    /// # Arguments
    ///
    /// * `size` - The duration of each window
    /// * `slide` - The advance interval between windows
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Size or slide (in milliseconds) does not fit in `i64`
    /// - Size is zero
    /// - Slide is zero
    /// - Slide is greater than size (use tumbling windows instead)
    /// - `ceil(size / slide)` does not fit in `usize`
    #[must_use]
    pub fn new(size: Duration, slide: Duration) -> Self {
        // Ensure size and slide fit in i64
        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
        let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");

        Self::assert_valid(size_ms, slide_ms);

        let windows_per_event = usize::try_from(Self::ceil_div(size_ms, slide_ms))
            .expect("Windows per event should fit in usize");

        Self {
            size_ms,
            slide_ms,
            windows_per_event,
            offset_ms: 0,
        }
    }

    /// Creates a new sliding window assigner with sizes in milliseconds.
    ///
    /// If `ceil(size_ms / slide_ms)` does not fit in `usize`, the cached
    /// windows-per-event count saturates at `usize::MAX` instead of panicking.
    ///
    /// # Panics
    ///
    /// Panics if size or slide is zero/negative, or if slide > size.
    #[must_use]
    pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
        Self::assert_valid(size_ms, slide_ms);

        // Saturate rather than panic when the count exceeds the platform's usize.
        let windows_per_event =
            usize::try_from(Self::ceil_div(size_ms, slide_ms)).unwrap_or(usize::MAX);

        Self {
            size_ms,
            slide_ms,
            windows_per_event,
            offset_ms: 0,
        }
    }

    /// Checks the constructor preconditions shared by `new` and `from_millis`.
    fn assert_valid(size_ms: i64, slide_ms: i64) {
        assert!(size_ms > 0, "Window size must be positive");
        assert!(slide_ms > 0, "Slide interval must be positive");
        assert!(
            slide_ms <= size_ms,
            "Slide must not exceed size (use tumbling windows for non-overlapping)"
        );
    }

    /// Computes `ceil(size_ms / slide_ms)` for positive operands.
    ///
    /// Written as quotient plus a remainder correction so that it cannot
    /// overflow, unlike `(size_ms + slide_ms - 1) / slide_ms`.
    fn ceil_div(size_ms: i64, slide_ms: i64) -> i64 {
        size_ms / slide_ms + i64::from(size_ms % slide_ms != 0)
    }

    /// Sets the window offset in milliseconds, shifting every window boundary
    /// by `offset_ms`.
    #[must_use]
    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
        self.offset_ms = offset_ms;
        self
    }

    /// Returns the window size in milliseconds.
    #[must_use]
    pub fn size_ms(&self) -> i64 {
        self.size_ms
    }

    /// Returns the slide interval in milliseconds.
    #[must_use]
    pub fn slide_ms(&self) -> i64 {
        self.slide_ms
    }

    /// Returns the maximum number of windows an event can belong to,
    /// i.e. `ceil(size / slide)`.
    #[must_use]
    pub fn windows_per_event(&self) -> usize {
        self.windows_per_event
    }

    /// Returns the window offset in milliseconds.
    #[must_use]
    pub fn offset_ms(&self) -> i64 {
        self.offset_ms
    }

    /// Computes the last window start that could contain this timestamp.
    ///
    /// This is the window with the largest start time where start <= timestamp.
    /// When an offset is set, window boundaries are shifted accordingly.
    #[inline]
    fn last_window_start(&self, timestamp: i64) -> i64 {
        // Work in offset-free coordinates, then shift the result back.
        let adjusted = timestamp - self.offset_ms;
        let base = if adjusted >= 0 {
            (adjusted / self.slide_ms) * self.slide_ms
        } else {
            // Integer division truncates toward zero, so bias the numerator by
            // `-(slide - 1)` to obtain floor division for negative values.
            // Saturating arithmetic keeps the bias from overflowing near i64::MIN.
            (adjusted.saturating_sub(self.slide_ms).saturating_add(1) / self.slide_ms)
                * self.slide_ms
        };
        base + self.offset_ms
    }
}
233
234impl WindowAssigner for SlidingWindowAssigner {
235    /// Assigns a timestamp to all overlapping windows.
236    ///
237    /// Returns windows in order from earliest to latest start time.
238    #[inline]
239    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
240        let mut windows = WindowIdVec::new();
241
242        // Find the last window that could contain this timestamp
243        let last_start = self.last_window_start(timestamp);
244
245        // Walk backwards through all windows that contain this timestamp
246        let mut window_start = last_start;
247        while window_start + self.size_ms > timestamp {
248            let window_end = window_start + self.size_ms;
249            windows.push(WindowId::new(window_start, window_end));
250            let prev = window_start;
251            window_start = window_start.saturating_sub(self.slide_ms);
252            if window_start == prev {
253                break;
254            }
255        }
256
257        // Reverse to get windows in chronological order (earliest first)
258        windows.reverse();
259        windows
260    }
261
262    /// Returns the maximum timestamp that could still be assigned to a window
263    /// ending at `window_end`.
264    fn max_timestamp(&self, window_end: i64) -> i64 {
265        window_end - 1
266    }
267}
268
/// Prefix (4 bytes) prepended to every window accumulator state key.
const WINDOW_STATE_PREFIX: &[u8; 4] = b"slw:";

/// Length of a full window state key: the prefix followed by the 16-byte
/// `WindowId` key, 20 bytes in total.
const WINDOW_STATE_KEY_SIZE: usize = WINDOW_STATE_PREFIX.len() + 16;

/// Process-wide counter used to derive unique default operator IDs.
static SLIDING_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
277
278/// Sliding window operator.
279///
280/// Processes events through overlapping, fixed-size time windows.
281/// Each event is assigned to multiple windows based on the slide interval.
282/// Results are emitted based on the configured [`EmitStrategy`].
283///
284/// # Emit Strategies
285///
286/// - `OnWatermark` (default): Emit when watermark passes window end
287/// - `Periodic`: Emit intermediate results at intervals, final on watermark
288/// - `OnUpdate`: Emit after every state update (can produce many outputs)
289///
290/// # Late Data Handling
291///
292/// Events that arrive after `window_end + allowed_lateness` are considered late.
293/// Their behavior is controlled by [`LateDataConfig`]:
294/// - Drop the event (default)
295/// - Route to a named side output for separate processing
296///
297/// # State Management
298///
299/// Window state is stored in the operator context's state store using
300/// prefixed keys: `slw:<window_id>` - Accumulator state
301///
302/// # Performance Considerations
303///
304/// Each event updates `ceil(size / slide)` windows. For example:
305/// - 1-hour window, 15-minute slide = 4 windows per event
306/// - 1-minute window, 10-second slide = 6 windows per event
307///
308/// State usage scales linearly with active windows.
309pub struct SlidingWindowOperator<A: Aggregator> {
310    /// Window assigner
311    assigner: SlidingWindowAssigner,
312    /// Aggregator function
313    aggregator: A,
314    /// Allowed lateness for late data
315    allowed_lateness_ms: i64,
316    /// Track registered timers to avoid duplicates
317    registered_windows: rustc_hash::FxHashSet<WindowId>,
318    /// Track windows with registered periodic timers
319    periodic_timer_windows: rustc_hash::FxHashSet<WindowId>,
320    /// Emit strategy for controlling when results are output
321    emit_strategy: EmitStrategy,
322    /// Late data handling configuration
323    late_data_config: LateDataConfig,
324    /// Metrics for late data tracking
325    late_data_metrics: LateDataMetrics,
326    /// Metrics for window close tracking
327    window_close_metrics: WindowCloseMetrics,
328    /// Operator ID for checkpointing
329    operator_id: String,
330    /// Cached output schema (avoids allocation on every emit)
331    output_schema: SchemaRef,
332    /// Last emitted intermediate result per window (for Changelog retraction).
333    /// Only populated when `emit_strategy` is `Changelog`.
334    /// Enables Z-set balance: retract (-1) old value before inserting (+1) new.
335    last_emitted: FxHashMap<WindowId, Event>,
336    /// Phantom data for accumulator type
337    _phantom: PhantomData<A::Acc>,
338}
339
340impl<A: Aggregator> SlidingWindowOperator<A>
341where
342    A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
343    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
344        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
345{
346    /// Creates a new sliding window operator.
347    ///
348    /// # Arguments
349    ///
350    /// * `assigner` - Window assigner for determining window boundaries
351    /// * `aggregator` - Aggregation function to apply within windows
352    /// * `allowed_lateness` - Grace period for late data after window close
353    /// # Panics
354    ///
355    /// Panics if allowed lateness does not fit in i64.
356    pub fn new(assigner: SlidingWindowAssigner, aggregator: A, allowed_lateness: Duration) -> Self {
357        let operator_num = SLIDING_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
358        let output_schema = Arc::new(Schema::new(vec![
359            Field::new("window_start", DataType::Int64, false),
360            Field::new("window_end", DataType::Int64, false),
361            Field::new(
362                "result",
363                aggregator.output_data_type(),
364                aggregator.output_nullable(),
365            ),
366        ]));
367        Self {
368            assigner,
369            aggregator,
370            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
371                .expect("Allowed lateness must fit in i64"),
372            registered_windows: rustc_hash::FxHashSet::default(),
373            periodic_timer_windows: rustc_hash::FxHashSet::default(),
374            emit_strategy: EmitStrategy::default(),
375            late_data_config: LateDataConfig::default(),
376            late_data_metrics: LateDataMetrics::new(),
377            window_close_metrics: WindowCloseMetrics::new(),
378            operator_id: format!("sliding_window_{operator_num}"),
379            output_schema,
380            last_emitted: FxHashMap::default(),
381            _phantom: PhantomData,
382        }
383    }
384
385    /// Creates a new sliding window operator with a custom operator ID.
386    /// # Panics
387    ///
388    /// Panics if allowed lateness does not fit in i64.
389    pub fn with_id(
390        assigner: SlidingWindowAssigner,
391        aggregator: A,
392        allowed_lateness: Duration,
393        operator_id: String,
394    ) -> Self {
395        let output_schema = Arc::new(Schema::new(vec![
396            Field::new("window_start", DataType::Int64, false),
397            Field::new("window_end", DataType::Int64, false),
398            Field::new(
399                "result",
400                aggregator.output_data_type(),
401                aggregator.output_nullable(),
402            ),
403        ]));
404        Self {
405            assigner,
406            aggregator,
407            allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
408                .expect("Allowed lateness must fit in i64"),
409            registered_windows: rustc_hash::FxHashSet::default(),
410            periodic_timer_windows: rustc_hash::FxHashSet::default(),
411            emit_strategy: EmitStrategy::default(),
412            late_data_config: LateDataConfig::default(),
413            late_data_metrics: LateDataMetrics::new(),
414            window_close_metrics: WindowCloseMetrics::new(),
415            operator_id,
416            output_schema,
417            last_emitted: FxHashMap::default(),
418            _phantom: PhantomData,
419        }
420    }
421
422    /// Sets the emit strategy for this window operator.
423    pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
424        self.emit_strategy = strategy;
425    }
426
427    /// Returns the current emit strategy.
428    #[must_use]
429    pub fn emit_strategy(&self) -> &EmitStrategy {
430        &self.emit_strategy
431    }
432
433    /// Sets the late data handling configuration.
434    pub fn set_late_data_config(&mut self, config: LateDataConfig) {
435        self.late_data_config = config;
436    }
437
438    /// Returns the current late data configuration.
439    #[must_use]
440    pub fn late_data_config(&self) -> &LateDataConfig {
441        &self.late_data_config
442    }
443
444    /// Returns the late data metrics.
445    #[must_use]
446    pub fn late_data_metrics(&self) -> &LateDataMetrics {
447        &self.late_data_metrics
448    }
449
450    /// Resets the late data metrics counters.
451    pub fn reset_late_data_metrics(&mut self) {
452        self.late_data_metrics.reset();
453    }
454
455    /// Returns the window close metrics.
456    #[must_use]
457    pub fn window_close_metrics(&self) -> &WindowCloseMetrics {
458        &self.window_close_metrics
459    }
460
461    /// Resets the window close metrics counters.
462    pub fn reset_window_close_metrics(&mut self) {
463        self.window_close_metrics.reset();
464    }
465
466    /// Returns the number of windows currently accumulating events.
467    #[must_use]
468    pub fn active_windows_count(&self) -> usize {
469        self.registered_windows.len()
470    }
471
472    /// Returns the window assigner.
473    #[must_use]
474    pub fn assigner(&self) -> &SlidingWindowAssigner {
475        &self.assigner
476    }
477
478    /// Returns the allowed lateness in milliseconds.
479    #[must_use]
480    pub fn allowed_lateness_ms(&self) -> i64 {
481        self.allowed_lateness_ms
482    }
483
484    /// Generates the state key for a window's accumulator.
485    #[inline]
486    fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
487        let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
488        key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
489        let window_key = window_id.to_key_inline();
490        key[4..20].copy_from_slice(&window_key);
491        key
492    }
493
494    /// Gets the accumulator for a window, creating a new one if needed.
495    fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
496        let key = Self::state_key(window_id);
497        state
498            .get_typed::<A::Acc>(&key)
499            .ok()
500            .flatten()
501            .unwrap_or_else(|| self.aggregator.create_accumulator())
502    }
503
504    /// Stores the accumulator for a window.
505    fn put_accumulator(
506        window_id: &WindowId,
507        acc: &A::Acc,
508        state: &mut dyn StateStore,
509    ) -> Result<(), OperatorError> {
510        let key = Self::state_key(window_id);
511        state
512            .put_typed(&key, acc)
513            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
514    }
515
516    /// Deletes the accumulator for a window.
517    fn delete_accumulator(
518        window_id: &WindowId,
519        state: &mut dyn StateStore,
520    ) -> Result<(), OperatorError> {
521        let key = Self::state_key(window_id);
522        state
523            .delete(&key)
524            .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
525    }
526
527    /// Checks if an event is late for all possible windows.
528    ///
529    /// An event is late if all windows it would belong to have already closed
530    /// (watermark has passed their end + allowed lateness).
531    fn is_late(&self, event_time: i64, watermark: i64) -> bool {
532        // Get all windows this event would belong to
533        let windows = self.assigner.assign_windows(event_time);
534
535        // Event is late only if ALL its windows have closed
536        windows.iter().all(|window_id| {
537            let cleanup_time = window_id.end + self.allowed_lateness_ms;
538            watermark >= cleanup_time
539        })
540    }
541
542    /// Registers a timer for window triggering if not already registered.
543    fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
544        if !self.registered_windows.contains(&window_id) {
545            let trigger_time = window_id.end + self.allowed_lateness_ms;
546            ctx.timers.register_timer(
547                trigger_time,
548                Some(window_id.to_key()),
549                Some(ctx.operator_index),
550            );
551            self.registered_windows.insert(window_id);
552        }
553    }
554
555    /// Registers a periodic timer for intermediate emissions.
556    fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
557        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
558            if !self.periodic_timer_windows.contains(&window_id) {
559                let interval_ms =
560                    i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
561                let trigger_time = ctx.processing_time + interval_ms;
562                let key = Self::periodic_timer_key(&window_id);
563                ctx.timers
564                    .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
565                self.periodic_timer_windows.insert(window_id);
566            }
567        }
568    }
569
570    /// Creates a periodic timer key from a window ID.
571    #[inline]
572    fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
573        let mut key = window_id.to_key();
574        if !key.is_empty() {
575            key[0] |= 0x80;
576        }
577        key
578    }
579
580    /// Checks if a timer key is for a periodic timer.
581    #[inline]
582    fn is_periodic_timer_key(key: &[u8]) -> bool {
583        !key.is_empty() && (key[0] & 0x80) != 0
584    }
585
586    /// Extracts the window ID from a periodic timer key.
587    #[inline]
588    fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
589        if key.len() != 16 {
590            return None;
591        }
592        let mut clean_key = [0u8; 16];
593        clean_key.copy_from_slice(key);
594        clean_key[0] &= 0x7F;
595        WindowId::from_key(&clean_key)
596    }
597
598    /// Creates an intermediate result for a window without cleaning up state.
599    fn create_intermediate_result(
600        &self,
601        window_id: &WindowId,
602        state: &dyn StateStore,
603    ) -> Option<Event> {
604        let acc = self.get_accumulator(window_id, state);
605
606        if acc.is_empty() {
607            return None;
608        }
609
610        let result = acc.result();
611        let result_array = result.to_arrow_array();
612
613        let batch = RecordBatch::try_new(
614            Arc::clone(&self.output_schema),
615            vec![
616                Arc::new(Int64Array::from(vec![window_id.start])),
617                Arc::new(Int64Array::from(vec![window_id.end])),
618                result_array,
619            ],
620        )
621        .ok()?;
622
623        Some(Event::new(window_id.end, batch))
624    }
625
626    /// Handles periodic timer expiration for intermediate emissions.
627    fn handle_periodic_timer(
628        &mut self,
629        window_id: WindowId,
630        ctx: &mut OperatorContext,
631    ) -> OutputVec {
632        let mut output = OutputVec::new();
633
634        if !self.registered_windows.contains(&window_id) {
635            self.periodic_timer_windows.remove(&window_id);
636            return output;
637        }
638
639        if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
640            output.push(Output::Event(event));
641        }
642
643        if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
644            let interval_ms =
645                i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
646            let next_trigger = ctx.processing_time + interval_ms;
647            let window_close_time = window_id.end + self.allowed_lateness_ms;
648            if next_trigger < window_close_time {
649                let key = Self::periodic_timer_key(&window_id);
650                ctx.timers
651                    .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
652            }
653        }
654
655        output
656    }
657}
658
659impl<A: Aggregator> Operator for SlidingWindowOperator<A>
660where
661    A::Acc: 'static
662        + Archive
663        + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
664    <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
665        + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
666{
667    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
668        let event_time = event.timestamp;
669
670        // Update watermark with the new event
671        let emitted_watermark = ctx.watermark_generator.on_event(event_time);
672
673        // Check if this event is too late (all windows closed)
674        let current_wm = ctx.watermark_generator.current_watermark();
675        if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
676            let mut output = OutputVec::new();
677
678            // EMIT FINAL drops late data entirely
679            if self.emit_strategy.drops_late_data() {
680                self.late_data_metrics.record_dropped();
681                return output; // Silently drop - no LateEvent output
682            }
683
684            if let Some(side_output_name) = self.late_data_config.side_output() {
685                self.late_data_metrics.record_side_output();
686                output.push(Output::SideOutput(Box::new(SideOutputData {
687                    name: Arc::from(side_output_name),
688                    event: event.clone(),
689                })));
690            } else {
691                self.late_data_metrics.record_dropped();
692                output.push(Output::LateEvent(event.clone()));
693            }
694            return output;
695        }
696
697        // Assign event to all overlapping windows
698        let windows = self.assigner.assign_windows(event_time);
699
700        // Track windows that were updated (for OnUpdate and Changelog strategies)
701        let mut updated_windows = SmallVec::<[WindowId; 4]>::new();
702
703        // Update accumulator for each window
704        for window_id in &windows {
705            // Skip windows that have already closed
706            let cleanup_time = window_id.end + self.allowed_lateness_ms;
707            if current_wm > i64::MIN && current_wm >= cleanup_time {
708                continue;
709            }
710
711            // Extract values for each window (handles multi-row batches)
712            let values = self.aggregator.extract_batch(event);
713            if !values.is_empty() {
714                let mut acc = self.get_accumulator(window_id, ctx.state);
715                for value in values {
716                    acc.add(value);
717                }
718                if Self::put_accumulator(window_id, &acc, ctx.state).is_ok() {
719                    updated_windows.push(*window_id);
720                }
721            }
722
723            // Register timers for this window
724            self.maybe_register_timer(*window_id, ctx);
725
726            // OnWindowClose and Final suppress intermediate emissions
727            if !self.emit_strategy.suppresses_intermediate() {
728                self.maybe_register_periodic_timer(*window_id, ctx);
729            }
730        }
731
732        // Build output
733        let mut output = OutputVec::new();
734
735        // Emit watermark update if generated
736        if let Some(wm) = emitted_watermark {
737            output.push(Output::Watermark(wm.timestamp()));
738        }
739
740        // Handle different emit strategies for intermediate emissions
741        if !updated_windows.is_empty() {
742            match &self.emit_strategy {
743                // OnUpdate: emit intermediate result as regular event
744                EmitStrategy::OnUpdate => {
745                    for window_id in &updated_windows {
746                        if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
747                            output.push(Output::Event(event));
748                        }
749                    }
750                }
751                // Changelog: emit retraction of previous value, then insert new.
752                // This maintains Z-set balance: net weight per window = +1 (final).
753                EmitStrategy::Changelog => {
754                    for window_id in &updated_windows {
755                        if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
756                            // Retract previous intermediate if one was emitted
757                            if let Some(old_event) = self.last_emitted.get(window_id) {
758                                let retract =
759                                    ChangelogRecord::delete(old_event.clone(), ctx.processing_time);
760                                output.push(Output::Changelog(retract));
761                            }
762                            // Emit new intermediate
763                            let record =
764                                ChangelogRecord::insert(event.clone(), ctx.processing_time);
765                            output.push(Output::Changelog(record));
766                            self.last_emitted.insert(*window_id, event);
767                            if self.last_emitted.len().is_multiple_of(100_000) {
768                                tracing::warn!(
769                                    operator = %self.operator_id,
770                                    tracked_windows = self.last_emitted.len(),
771                                    "last_emitted map is large — windows \
772                                     may not be closing. Check watermark \
773                                     advancement and window configuration."
774                                );
775                            }
776                        }
777                    }
778                }
779                // Other strategies: no intermediate emission
780                EmitStrategy::OnWatermark
781                | EmitStrategy::Periodic(_)
782                | EmitStrategy::OnWindowClose
783                | EmitStrategy::Final => {}
784            }
785        }
786
787        output
788    }
789
790    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
791        // Check if this is a periodic timer
792        if Self::is_periodic_timer_key(&timer.key) {
793            // OnWindowClose and Final suppress periodic emissions
794            if self.emit_strategy.suppresses_intermediate() {
795                // Don't emit, just clean up the periodic timer tracking
796                if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
797                    self.periodic_timer_windows.remove(&window_id);
798                }
799                return OutputVec::new();
800            }
801
802            if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
803                return self.handle_periodic_timer(window_id, ctx);
804            }
805            return OutputVec::new();
806        }
807
808        // Parse window ID from timer key (final emission timer)
809        let Some(window_id) = WindowId::from_key(&timer.key) else {
810            return OutputVec::new();
811        };
812
813        // Get the accumulator
814        let acc = self.get_accumulator(&window_id, ctx.state);
815
816        // Skip empty windows
817        if acc.is_empty() {
818            let _ = Self::delete_accumulator(&window_id, ctx.state);
819            self.registered_windows.remove(&window_id);
820            self.periodic_timer_windows.remove(&window_id);
821            self.last_emitted.remove(&window_id);
822            return OutputVec::new();
823        }
824
825        // Get the result
826        let result = acc.result();
827
828        // Clean up window state (last_emitted retraction handled above in Changelog branch)
829        let _ = Self::delete_accumulator(&window_id, ctx.state);
830        self.registered_windows.remove(&window_id);
831        self.periodic_timer_windows.remove(&window_id);
832        // Note: last_emitted for this window_id already removed in Changelog branch above
833
834        // Convert result to Arrow array (preserves native type)
835        let result_array = result.to_arrow_array();
836
837        // Create output batch
838        let batch = RecordBatch::try_new(
839            Arc::clone(&self.output_schema),
840            vec![
841                Arc::new(Int64Array::from(vec![window_id.start])),
842                Arc::new(Int64Array::from(vec![window_id.end])),
843                result_array,
844            ],
845        );
846
847        let mut output = OutputVec::new();
848        match batch {
849            Ok(data) => {
850                let event = Event::new(window_id.end, data);
851
852                // Record window close metrics
853                self.window_close_metrics
854                    .record_close(window_id.end, ctx.processing_time);
855
856                // Emit based on strategy
857                match &self.emit_strategy {
858                    // Changelog: retract last intermediate, then emit final insert
859                    EmitStrategy::Changelog => {
860                        if let Some(old_event) = self.last_emitted.remove(&window_id) {
861                            let retract = ChangelogRecord::delete(old_event, ctx.processing_time);
862                            output.push(Output::Changelog(retract));
863                        }
864                        let record = ChangelogRecord::insert(event, ctx.processing_time);
865                        output.push(Output::Changelog(record));
866                    }
867                    // All other strategies: emit as regular event
868                    EmitStrategy::OnWatermark
869                    | EmitStrategy::Periodic(_)
870                    | EmitStrategy::OnUpdate
871                    | EmitStrategy::OnWindowClose
872                    | EmitStrategy::Final => {
873                        output.push(Output::Event(event));
874                    }
875                }
876            }
877            Err(e) => {
878                tracing::error!("Failed to create output batch: {e}");
879            }
880        }
881        output
882    }
883
884    fn checkpoint(&self) -> OperatorState {
885        let windows: Vec<_> = self.registered_windows.iter().copied().collect();
886        let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
887
888        let checkpoint_data = (windows, periodic_windows);
889        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
890            .map(|v| v.to_vec())
891            .unwrap_or_default();
892
893        OperatorState {
894            operator_id: self.operator_id.clone(),
895            data,
896        }
897    }
898
899    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
900        if state.operator_id != self.operator_id {
901            return Err(OperatorError::StateAccessFailed(format!(
902                "Operator ID mismatch: expected {}, got {}",
903                self.operator_id, state.operator_id
904            )));
905        }
906
907        // Try to deserialize as the new format (tuple of two vectors)
908        if let Ok(archived) =
909            rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
910        {
911            if let Ok((windows, periodic_windows)) =
912                rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
913            {
914                self.registered_windows = windows.into_iter().collect();
915                self.periodic_timer_windows = periodic_windows.into_iter().collect();
916                return Ok(());
917            }
918        }
919
920        // Fall back to old format for backwards compatibility
921        let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
922            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
923        let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
924            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
925
926        self.registered_windows = windows.into_iter().collect();
927        self.periodic_timer_windows = rustc_hash::FxHashSet::default();
928        Ok(())
929    }
930}
931
932#[cfg(test)]
933mod tests {
934    use super::*;
935    use crate::operator::window::{CountAccumulator, CountAggregator, SumAggregator};
936    use crate::state::InMemoryStore;
937    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
938    use arrow_array::{Int64Array, RecordBatch};
939    use arrow_schema::{DataType, Field, Schema};
940
941    fn create_test_event(timestamp: i64, value: i64) -> Event {
942        let schema = Arc::new(Schema::new(vec![Field::new(
943            "value",
944            DataType::Int64,
945            false,
946        )]));
947        let batch =
948            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
949        Event::new(timestamp, batch)
950    }
951
952    fn create_test_context<'a>(
953        timers: &'a mut TimerService,
954        state: &'a mut dyn StateStore,
955        watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
956    ) -> OperatorContext<'a> {
957        OperatorContext {
958            event_time: 0,
959            processing_time: 0,
960            timers,
961            state,
962            watermark_generator: watermark_gen,
963            operator_index: 0,
964        }
965    }
966
967    #[test]
968    fn test_sliding_assigner_creation() {
969        let assigner = SlidingWindowAssigner::new(Duration::from_secs(60), Duration::from_secs(20));
970
971        assert_eq!(assigner.size_ms(), 60_000);
972        assert_eq!(assigner.slide_ms(), 20_000);
973        assert_eq!(assigner.windows_per_event(), 3); // ceil(60/20) = 3
974    }
975
976    #[test]
977    fn test_sliding_assigner_from_millis() {
978        let assigner = SlidingWindowAssigner::from_millis(1000, 200);
979
980        assert_eq!(assigner.size_ms(), 1000);
981        assert_eq!(assigner.slide_ms(), 200);
982        assert_eq!(assigner.windows_per_event(), 5); // ceil(1000/200) = 5
983    }
984
985    #[test]
986    #[should_panic(expected = "Window size must be positive")]
987    fn test_sliding_assigner_zero_size() {
988        let _ = SlidingWindowAssigner::from_millis(0, 100);
989    }
990
991    #[test]
992    #[should_panic(expected = "Slide interval must be positive")]
993    fn test_sliding_assigner_zero_slide() {
994        let _ = SlidingWindowAssigner::from_millis(1000, 0);
995    }
996
997    #[test]
998    #[should_panic(expected = "Slide must not exceed size")]
999    fn test_sliding_assigner_slide_exceeds_size() {
1000        let _ = SlidingWindowAssigner::from_millis(100, 200);
1001    }
1002
1003    #[test]
1004    fn test_sliding_assigner_basic_assignment() {
1005        // 1-minute window with 20-second slide
1006        let assigner = SlidingWindowAssigner::from_millis(60_000, 20_000);
1007
1008        // Event at t=50000 should belong to 3 windows
1009        let windows = assigner.assign_windows(50_000);
1010
1011        assert_eq!(windows.len(), 3);
1012
1013        // Windows should be (in chronological order):
1014        // [20000, 80000), [40000, 100000), [60000, 120000) - but wait, 50000 is NOT in [60000, 120000)
1015        // Let me recalculate:
1016        // last_window_start(50000) = (50000 / 20000) * 20000 = 40000
1017        // Window [40000, 100000) contains 50000? Yes (40000 <= 50000 < 100000)
1018        // Window [20000, 80000) contains 50000? Yes (20000 <= 50000 < 80000)
1019        // Window [0, 60000) contains 50000? Yes (0 <= 50000 < 60000)
1020        // Window [-20000, 40000) contains 50000? No (50000 >= 40000)
1021
1022        assert!(windows.contains(&WindowId::new(0, 60_000)));
1023        assert!(windows.contains(&WindowId::new(20_000, 80_000)));
1024        assert!(windows.contains(&WindowId::new(40_000, 100_000)));
1025    }
1026
1027    #[test]
1028    fn test_sliding_assigner_boundary_event() {
1029        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1030
1031        // Event exactly at window boundary (t=1000)
1032        let windows = assigner.assign_windows(1000);
1033
1034        // Should belong to windows starting at 500 and 1000
1035        // Window [1000, 2000) contains 1000? Yes
1036        // Window [500, 1500) contains 1000? Yes
1037        // Window [0, 1000) contains 1000? No (end is exclusive)
1038        assert_eq!(windows.len(), 2);
1039        assert!(windows.contains(&WindowId::new(500, 1500)));
1040        assert!(windows.contains(&WindowId::new(1000, 2000)));
1041    }
1042
1043    #[test]
1044    fn test_sliding_assigner_negative_timestamp() {
1045        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1046
1047        // Event at t=-500
1048        let windows = assigner.assign_windows(-500);
1049
1050        // Should belong to windows containing -500
1051        // last_window_start(-500) = floor(-500 / 500) * 500 = -500
1052        // Window [-500, 500) contains -500? Yes
1053        // Window [-1000, 0) contains -500? Yes
1054        // Window [-1500, -500) contains -500? No (end is exclusive)
1055        assert_eq!(windows.len(), 2);
1056        assert!(windows.contains(&WindowId::new(-1000, 0)));
1057        assert!(windows.contains(&WindowId::new(-500, 500)));
1058    }
1059
1060    #[test]
1061    fn test_sliding_assigner_equal_size_and_slide() {
1062        // When size == slide, should behave like tumbling windows
1063        let assigner = SlidingWindowAssigner::from_millis(1000, 1000);
1064
1065        assert_eq!(assigner.windows_per_event(), 1);
1066
1067        let windows = assigner.assign_windows(500);
1068        assert_eq!(windows.len(), 1);
1069        assert_eq!(windows[0], WindowId::new(0, 1000));
1070    }
1071
1072    #[test]
1073    fn test_sliding_assigner_small_slide() {
1074        // 1 second window, 100ms slide = 10 windows per event
1075        let assigner = SlidingWindowAssigner::from_millis(1000, 100);
1076
1077        assert_eq!(assigner.windows_per_event(), 10);
1078
1079        let windows = assigner.assign_windows(500);
1080        assert_eq!(windows.len(), 10);
1081    }
1082
1083    #[test]
1084    fn test_sliding_operator_creation() {
1085        let assigner = SlidingWindowAssigner::from_millis(1000, 200);
1086        let aggregator = CountAggregator::new();
1087        let operator = SlidingWindowOperator::new(assigner, aggregator, Duration::from_millis(100));
1088
1089        assert_eq!(operator.allowed_lateness_ms(), 100);
1090        assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
1091        assert!(operator.late_data_config().should_drop());
1092    }
1093
1094    #[test]
1095    fn test_sliding_operator_with_id() {
1096        let assigner = SlidingWindowAssigner::from_millis(1000, 200);
1097        let aggregator = CountAggregator::new();
1098        let operator = SlidingWindowOperator::with_id(
1099            assigner,
1100            aggregator,
1101            Duration::from_millis(0),
1102            "test_sliding".to_string(),
1103        );
1104
1105        assert_eq!(operator.operator_id, "test_sliding");
1106    }
1107
1108    #[test]
1109    fn test_sliding_operator_process_single_event() {
1110        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1111        let aggregator = CountAggregator::new();
1112        let mut operator = SlidingWindowOperator::with_id(
1113            assigner,
1114            aggregator,
1115            Duration::from_millis(0),
1116            "test_op".to_string(),
1117        );
1118
1119        let mut timers = TimerService::new();
1120        let mut state = InMemoryStore::new();
1121        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1122
1123        // Process event at t=600
1124        // Should belong to windows [0, 1000) and [500, 1500)
1125        let event = create_test_event(600, 1);
1126        {
1127            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1128            operator.process(&event, &mut ctx);
1129        }
1130
1131        // Should have 2 registered windows
1132        assert_eq!(operator.registered_windows.len(), 2);
1133        assert!(operator
1134            .registered_windows
1135            .contains(&WindowId::new(0, 1000)));
1136        assert!(operator
1137            .registered_windows
1138            .contains(&WindowId::new(500, 1500)));
1139    }
1140
1141    #[test]
1142    fn test_sliding_operator_accumulates_correctly() {
1143        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1144        let aggregator = CountAggregator::new();
1145        let mut operator = SlidingWindowOperator::with_id(
1146            assigner.clone(),
1147            aggregator,
1148            Duration::from_millis(0),
1149            "test_op".to_string(),
1150        );
1151
1152        let mut timers = TimerService::new();
1153        let mut state = InMemoryStore::new();
1154        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1155
1156        // Process events at t=100, t=600, t=800
1157        // t=100: belongs to [0, 1000)
1158        // t=600: belongs to [0, 1000), [500, 1500)
1159        // t=800: belongs to [0, 1000), [500, 1500)
1160        for ts in [100, 600, 800] {
1161            let event = create_test_event(ts, 1);
1162            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1163            operator.process(&event, &mut ctx);
1164        }
1165
1166        // Window [0, 1000) should have count = 3
1167        let window_0_1000 = WindowId::new(0, 1000);
1168        let acc: CountAccumulator = operator.get_accumulator(&window_0_1000, &state);
1169        assert_eq!(acc.result(), 3);
1170
1171        // Window [500, 1500) should have count = 2
1172        let window_500_1500 = WindowId::new(500, 1500);
1173        let acc: CountAccumulator = operator.get_accumulator(&window_500_1500, &state);
1174        assert_eq!(acc.result(), 2);
1175    }
1176
1177    #[test]
1178    fn test_sliding_operator_window_trigger() {
1179        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1180        let aggregator = CountAggregator::new();
1181        let mut operator = SlidingWindowOperator::with_id(
1182            assigner,
1183            aggregator,
1184            Duration::from_millis(0),
1185            "test_op".to_string(),
1186        );
1187
1188        let mut timers = TimerService::new();
1189        let mut state = InMemoryStore::new();
1190        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1191
1192        // Process 3 events in window [0, 1000)
1193        for ts in [100, 200, 300] {
1194            let event = create_test_event(ts, 1);
1195            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1196            operator.process(&event, &mut ctx);
1197        }
1198
1199        // Trigger window [0, 1000)
1200        let timer = Timer {
1201            key: WindowId::new(0, 1000).to_key(),
1202            timestamp: 1000,
1203        };
1204
1205        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1206        let outputs = operator.on_timer(timer, &mut ctx);
1207
1208        assert_eq!(outputs.len(), 1);
1209        match &outputs[0] {
1210            Output::Event(event) => {
1211                assert_eq!(event.timestamp, 1000);
1212                let result_col = event.data.column(2);
1213                let result_array = result_col.as_any().downcast_ref::<Int64Array>().unwrap();
1214                assert_eq!(result_array.value(0), 3);
1215            }
1216            _ => panic!("Expected Event output"),
1217        }
1218
1219        // Window should be cleaned up
1220        assert!(!operator
1221            .registered_windows
1222            .contains(&WindowId::new(0, 1000)));
1223    }
1224
1225    #[test]
1226    fn test_sliding_operator_multiple_window_triggers() {
1227        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1228        let aggregator = SumAggregator::new(0);
1229        let mut operator = SlidingWindowOperator::with_id(
1230            assigner,
1231            aggregator,
1232            Duration::from_millis(0),
1233            "test_op".to_string(),
1234        );
1235
1236        let mut timers = TimerService::new();
1237        let mut state = InMemoryStore::new();
1238        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1239
1240        // Process event at t=600 with value 10
1241        // Belongs to [0, 1000) and [500, 1500)
1242        let event = create_test_event(600, 10);
1243        {
1244            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1245            operator.process(&event, &mut ctx);
1246        }
1247
1248        // Trigger first window [0, 1000)
1249        let t1 = Timer {
1250            key: WindowId::new(0, 1000).to_key(),
1251            timestamp: 1000,
1252        };
1253        let outputs1 = {
1254            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1255            operator.on_timer(t1, &mut ctx)
1256        };
1257
1258        assert_eq!(outputs1.len(), 1);
1259        if let Output::Event(e) = &outputs1[0] {
1260            let result = e
1261                .data
1262                .column(2)
1263                .as_any()
1264                .downcast_ref::<Int64Array>()
1265                .unwrap()
1266                .value(0);
1267            assert_eq!(result, 10);
1268        }
1269
1270        // Trigger second window [500, 1500)
1271        let t2 = Timer {
1272            key: WindowId::new(500, 1500).to_key(),
1273            timestamp: 1500,
1274        };
1275        let outputs2 = {
1276            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1277            operator.on_timer(t2, &mut ctx)
1278        };
1279
1280        assert_eq!(outputs2.len(), 1);
1281        if let Output::Event(e) = &outputs2[0] {
1282            let result = e
1283                .data
1284                .column(2)
1285                .as_any()
1286                .downcast_ref::<Int64Array>()
1287                .unwrap()
1288                .value(0);
1289            assert_eq!(result, 10);
1290        }
1291
1292        // Both windows should be cleaned up
1293        assert!(operator.registered_windows.is_empty());
1294    }
1295
1296    #[test]
1297    fn test_sliding_operator_late_event() {
1298        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1299        let aggregator = CountAggregator::new();
1300        let mut operator = SlidingWindowOperator::with_id(
1301            assigner,
1302            aggregator,
1303            Duration::from_millis(0),
1304            "test_op".to_string(),
1305        );
1306
1307        let mut timers = TimerService::new();
1308        let mut state = InMemoryStore::new();
1309        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1310
1311        // Advance watermark to 2000
1312        let event1 = create_test_event(2000, 1);
1313        {
1314            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1315            operator.process(&event1, &mut ctx);
1316        }
1317
1318        // Process late event at t=500 (all windows closed)
1319        let late_event = create_test_event(500, 2);
1320        let outputs = {
1321            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1322            operator.process(&late_event, &mut ctx)
1323        };
1324
1325        // Should emit LateEvent
1326        let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
1327        assert!(is_late);
1328        assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1329    }
1330
1331    #[test]
1332    fn test_sliding_operator_late_event_side_output() {
1333        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1334        let aggregator = CountAggregator::new();
1335        let mut operator = SlidingWindowOperator::with_id(
1336            assigner,
1337            aggregator,
1338            Duration::from_millis(0),
1339            "test_op".to_string(),
1340        );
1341
1342        operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));
1343
1344        let mut timers = TimerService::new();
1345        let mut state = InMemoryStore::new();
1346        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1347
1348        // Advance watermark
1349        let event1 = create_test_event(2000, 1);
1350        {
1351            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1352            operator.process(&event1, &mut ctx);
1353        }
1354
1355        // Process late event
1356        let late_event = create_test_event(500, 2);
1357        let outputs = {
1358            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1359            operator.process(&late_event, &mut ctx)
1360        };
1361
1362        // Should emit SideOutput
1363        let side_output = outputs.iter().find_map(|o| {
1364            if let Output::SideOutput(data) = o {
1365                Some(data.name.clone())
1366            } else {
1367                None
1368            }
1369        });
1370        assert_eq!(side_output.as_deref(), Some("late"));
1371        assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
1372    }
1373
1374    #[test]
1375    fn test_sliding_operator_emit_on_update() {
1376        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1377        let aggregator = CountAggregator::new();
1378        let mut operator = SlidingWindowOperator::with_id(
1379            assigner,
1380            aggregator,
1381            Duration::from_millis(0),
1382            "test_op".to_string(),
1383        );
1384
1385        operator.set_emit_strategy(EmitStrategy::OnUpdate);
1386
1387        let mut timers = TimerService::new();
1388        let mut state = InMemoryStore::new();
1389        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1390
1391        // Process event - should emit intermediate results for both windows
1392        let event = create_test_event(600, 1);
1393        let outputs = {
1394            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1395            operator.process(&event, &mut ctx)
1396        };
1397
1398        // Should have 2 Event outputs (one per window)
1399        let event_count = outputs
1400            .iter()
1401            .filter(|o| matches!(o, Output::Event(_)))
1402            .count();
1403        assert_eq!(event_count, 2);
1404    }
1405
1406    #[test]
1407    fn test_sliding_operator_checkpoint_restore() {
1408        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1409        let aggregator = CountAggregator::new();
1410        let mut operator = SlidingWindowOperator::with_id(
1411            assigner.clone(),
1412            aggregator.clone(),
1413            Duration::from_millis(0),
1414            "test_op".to_string(),
1415        );
1416
1417        // Register some windows
1418        operator.registered_windows.insert(WindowId::new(0, 1000));
1419        operator.registered_windows.insert(WindowId::new(500, 1500));
1420        operator
1421            .periodic_timer_windows
1422            .insert(WindowId::new(0, 1000));
1423
1424        // Checkpoint
1425        let checkpoint = operator.checkpoint();
1426
1427        // Create new operator and restore
1428        let mut restored = SlidingWindowOperator::with_id(
1429            assigner,
1430            aggregator,
1431            Duration::from_millis(0),
1432            "test_op".to_string(),
1433        );
1434        restored.restore(checkpoint).unwrap();
1435
1436        assert_eq!(restored.registered_windows.len(), 2);
1437        assert_eq!(restored.periodic_timer_windows.len(), 1);
1438        assert!(restored
1439            .registered_windows
1440            .contains(&WindowId::new(0, 1000)));
1441        assert!(restored
1442            .registered_windows
1443            .contains(&WindowId::new(500, 1500)));
1444        assert!(restored
1445            .periodic_timer_windows
1446            .contains(&WindowId::new(0, 1000)));
1447    }
1448
1449    #[test]
1450    fn test_sliding_operator_empty_window_trigger() {
1451        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1452        let aggregator = CountAggregator::new();
1453        let mut operator = SlidingWindowOperator::with_id(
1454            assigner,
1455            aggregator,
1456            Duration::from_millis(0),
1457            "test_op".to_string(),
1458        );
1459
1460        let mut timers = TimerService::new();
1461        let mut state = InMemoryStore::new();
1462        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1463
1464        // Trigger without any events
1465        let timer = Timer {
1466            key: WindowId::new(0, 1000).to_key(),
1467            timestamp: 1000,
1468        };
1469
1470        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1471        let outputs = operator.on_timer(timer, &mut ctx);
1472
1473        // Empty window should produce no output
1474        assert!(outputs.is_empty());
1475    }
1476
1477    #[test]
1478    fn test_sliding_operator_periodic_timer_key() {
1479        let window_id = WindowId::new(1000, 2000);
1480
1481        let periodic_key = SlidingWindowOperator::<CountAggregator>::periodic_timer_key(&window_id);
1482        assert!(SlidingWindowOperator::<CountAggregator>::is_periodic_timer_key(&periodic_key));
1483
1484        let extracted =
1485            SlidingWindowOperator::<CountAggregator>::window_id_from_periodic_key(&periodic_key);
1486        assert_eq!(extracted, Some(window_id));
1487
1488        // Regular key should not be detected as periodic
1489        let regular_key = window_id.to_key();
1490        assert!(!SlidingWindowOperator::<CountAggregator>::is_periodic_timer_key(&regular_key));
1491    }
1492
1493    #[test]
1494    fn test_sliding_operator_skips_closed_windows() {
1495        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1496        let aggregator = CountAggregator::new();
1497        let mut operator = SlidingWindowOperator::with_id(
1498            assigner,
1499            aggregator,
1500            Duration::from_millis(0),
1501            "test_op".to_string(),
1502        );
1503
1504        let mut timers = TimerService::new();
1505        let mut state = InMemoryStore::new();
1506        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1507
1508        // Advance watermark to 1100 (window [0, 1000) is closed, [500, 1500) is open)
1509        let event1 = create_test_event(1100, 1);
1510        {
1511            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1512            operator.process(&event1, &mut ctx);
1513        }
1514
1515        // Process event at t=800 - belongs to [0, 1000) and [500, 1500)
1516        // But [0, 1000) is closed, so should only update [500, 1500)
1517        let event2 = create_test_event(800, 1);
1518        {
1519            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1520            operator.process(&event2, &mut ctx);
1521        }
1522
1523        // Only [500, 1500) and [1000, 2000) should be registered (not [0, 1000))
1524        assert!(!operator
1525            .registered_windows
1526            .contains(&WindowId::new(0, 1000)));
1527        assert!(operator
1528            .registered_windows
1529            .contains(&WindowId::new(500, 1500)));
1530    }
1531
1532    #[test]
1533    fn test_sliding_assigner_window_assigner_trait() {
1534        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1535
1536        // Test the WindowAssigner trait method
1537        let windows = assigner.assign_windows(600);
1538        assert_eq!(windows.len(), 2);
1539
1540        // Test max_timestamp
1541        assert_eq!(assigner.max_timestamp(1000), 999);
1542    }
1543
1544    #[test]
1545    fn test_sliding_operator_allowed_lateness() {
1546        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1547        let aggregator = CountAggregator::new();
1548        let mut operator = SlidingWindowOperator::with_id(
1549            assigner,
1550            aggregator,
1551            Duration::from_millis(500), // 500ms allowed lateness
1552            "test_op".to_string(),
1553        );
1554
1555        let mut timers = TimerService::new();
1556        let mut state = InMemoryStore::new();
1557        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1558
1559        // Advance watermark to 1200
1560        let event1 = create_test_event(1200, 1);
1561        {
1562            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1563            operator.process(&event1, &mut ctx);
1564        }
1565
1566        // Process event at t=800 - window [0, 1000) cleanup is at 1500
1567        // Watermark (1200) < cleanup time (1500), so NOT late
1568        let event2 = create_test_event(800, 1);
1569        let outputs = {
1570            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1571            operator.process(&event2, &mut ctx)
1572        };
1573
1574        // Should NOT be late
1575        let is_late = outputs
1576            .iter()
1577            .any(|o| matches!(o, Output::LateEvent(_) | Output::SideOutput(_)));
1578        assert!(!is_late);
1579        assert_eq!(operator.late_data_metrics().late_events_total(), 0);
1580    }
1581
1582    // ========================================================================
1583    // EMIT ON WINDOW CLOSE (EOWC) — Sliding Window Tests (Issue #52)
1584    // ========================================================================
1585
1586    #[test]
1587    fn test_eowc_sliding_multiple_windows_per_event() {
1588        // An event at t=50000 with size=60s, slide=20s belongs to 3 windows.
1589        // Each window should emit independently when its timer fires.
1590        let assigner = SlidingWindowAssigner::from_millis(60_000, 20_000);
1591        let aggregator = CountAggregator::new();
1592        let mut operator = SlidingWindowOperator::with_id(
1593            assigner,
1594            aggregator,
1595            Duration::from_millis(0),
1596            "eowc_slide".to_string(),
1597        );
1598        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
1599
1600        let mut timers = TimerService::new();
1601        let mut state = InMemoryStore::new();
1602        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1603
1604        // Single event at t=50000
1605        // Belongs to windows: [0, 60000), [20000, 80000), [40000, 100000)
1606        let event = create_test_event(50_000, 1);
1607        let outputs = {
1608            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1609            operator.process(&event, &mut ctx)
1610        };
1611
1612        // No intermediate emissions
1613        let event_outputs: Vec<_> = outputs
1614            .iter()
1615            .filter(|o| matches!(o, Output::Event(_)))
1616            .collect();
1617        assert!(
1618            event_outputs.is_empty(),
1619            "EOWC sliding should not emit intermediate results"
1620        );
1621
1622        // Fire each window's timer and collect emissions
1623        let windows = [
1624            WindowId::new(0, 60_000),
1625            WindowId::new(20_000, 80_000),
1626            WindowId::new(40_000, 100_000),
1627        ];
1628
1629        let mut emission_count = 0;
1630        for wid in &windows {
1631            let timer = Timer {
1632                key: wid.to_key(),
1633                timestamp: wid.end,
1634            };
1635            let out = {
1636                let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1637                operator.on_timer(timer, &mut ctx)
1638            };
1639            assert_eq!(
1640                out.len(),
1641                1,
1642                "Each window should emit exactly once (window [{}, {}))",
1643                wid.start,
1644                wid.end
1645            );
1646            if let Output::Event(e) = &out[0] {
1647                let result = e
1648                    .data
1649                    .column(2)
1650                    .as_any()
1651                    .downcast_ref::<Int64Array>()
1652                    .unwrap();
1653                assert_eq!(result.value(0), 1, "Each window should have count=1");
1654            }
1655            emission_count += 1;
1656        }
1657        assert_eq!(
1658            emission_count, 3,
1659            "Should have 3 separate emissions for 3 overlapping windows"
1660        );
1661    }
1662
1663    #[test]
1664    fn test_eowc_sliding_no_intermediate_emissions() {
1665        // Verify process() never returns Output::Event for OnWindowClose.
1666        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1667        let aggregator = CountAggregator::new();
1668        let mut operator = SlidingWindowOperator::with_id(
1669            assigner,
1670            aggregator,
1671            Duration::from_millis(0),
1672            "eowc_slide".to_string(),
1673        );
1674        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
1675
1676        let mut timers = TimerService::new();
1677        let mut state = InMemoryStore::new();
1678        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1679
1680        // Process events across multiple windows
1681        for ts in (0..10).map(|i| i * 200) {
1682            let event = create_test_event(ts, 1);
1683            let outputs = {
1684                let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1685                operator.process(&event, &mut ctx)
1686            };
1687            for output in &outputs {
1688                assert!(
1689                    !matches!(output, Output::Event(_)),
1690                    "process() must not emit Output::Event with OnWindowClose (ts={ts})"
1691                );
1692            }
1693        }
1694    }
1695
1696    #[test]
1697    fn test_eowc_sliding_overlapping_window_close_order() {
1698        // Verify windows close in chronological order (earliest window_end first).
1699        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1700        let aggregator = SumAggregator::new(0);
1701        let mut operator = SlidingWindowOperator::with_id(
1702            assigner,
1703            aggregator,
1704            Duration::from_millis(0),
1705            "eowc_slide".to_string(),
1706        );
1707        operator.set_emit_strategy(EmitStrategy::OnWindowClose);
1708
1709        let mut timers = TimerService::new();
1710        let mut state = InMemoryStore::new();
1711        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1712
1713        // Event at t=600 belongs to [0, 1000) and [500, 1500)
1714        let event = create_test_event(600, 10);
1715        {
1716            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1717            operator.process(&event, &mut ctx);
1718        }
1719
1720        // Fire window [0, 1000) first (earlier end)
1721        let win_timer_1 = Timer {
1722            key: WindowId::new(0, 1000).to_key(),
1723            timestamp: 1000,
1724        };
1725        let out1 = {
1726            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1727            operator.on_timer(win_timer_1, &mut ctx)
1728        };
1729        assert_eq!(out1.len(), 1);
1730        if let Output::Event(e) = &out1[0] {
1731            assert_eq!(e.timestamp, 1000, "First emission at window_end=1000");
1732        }
1733
1734        // Fire window [500, 1500) second (later end)
1735        let win_timer_2 = Timer {
1736            key: WindowId::new(500, 1500).to_key(),
1737            timestamp: 1500,
1738        };
1739        let out2 = {
1740            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1741            operator.on_timer(win_timer_2, &mut ctx)
1742        };
1743        assert_eq!(out2.len(), 1);
1744        if let Output::Event(e) = &out2[0] {
1745            assert_eq!(e.timestamp, 1500, "Second emission at window_end=1500");
1746            let result = e
1747                .data
1748                .column(2)
1749                .as_any()
1750                .downcast_ref::<Int64Array>()
1751                .unwrap();
1752            assert_eq!(
1753                result.value(0),
1754                10,
1755                "Both windows should contain the event sum=10"
1756            );
1757        }
1758
1759        // All windows should be cleaned up
1760        assert!(operator.registered_windows.is_empty());
1761    }
1762
1763    // ========================================================================
1764    // Changelog Retraction Tests
1765    // ========================================================================
1766
1767    #[test]
1768    fn test_sliding_changelog_retraction_on_update() {
1769        // Two events to the same window → second should emit delete(-1) then insert(+1)
1770        let assigner = SlidingWindowAssigner::from_millis(1000, 1000); // tumbling-like
1771        let aggregator = CountAggregator::new();
1772        let mut operator = SlidingWindowOperator::with_id(
1773            assigner,
1774            aggregator,
1775            Duration::from_millis(0),
1776            "changelog_op".to_string(),
1777        );
1778        operator.set_emit_strategy(EmitStrategy::Changelog);
1779
1780        let mut timers = TimerService::new();
1781        let mut state = InMemoryStore::new();
1782        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1783
1784        // First event at t=100 → insert only (no previous to retract)
1785        let event1 = create_test_event(100, 1);
1786        let out1 = {
1787            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1788            operator.process(&event1, &mut ctx)
1789        };
1790        let changelog1: Vec<_> = out1
1791            .iter()
1792            .filter(|o| matches!(o, Output::Changelog(_)))
1793            .collect();
1794        assert_eq!(
1795            changelog1.len(),
1796            1,
1797            "first event: only insert, no retraction"
1798        );
1799        if let Output::Changelog(rec) = &changelog1[0] {
1800            assert_eq!(rec.weight, 1, "first event should be insert (+1)");
1801        }
1802
1803        // Second event at t=200 in same window → retract previous, then insert new
1804        let event2 = create_test_event(200, 1);
1805        let out2 = {
1806            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1807            operator.process(&event2, &mut ctx)
1808        };
1809        let changelog2: Vec<_> = out2
1810            .iter()
1811            .filter(|o| matches!(o, Output::Changelog(_)))
1812            .collect();
1813        assert_eq!(
1814            changelog2.len(),
1815            2,
1816            "second event: retraction + insert = 2 changelog records"
1817        );
1818        if let Output::Changelog(retract) = &changelog2[0] {
1819            assert_eq!(retract.weight, -1, "first record should be retraction (-1)");
1820        }
1821        if let Output::Changelog(insert) = &changelog2[1] {
1822            assert_eq!(insert.weight, 1, "second record should be insert (+1)");
1823        }
1824    }
1825
1826    #[test]
1827    fn test_sliding_changelog_retraction_on_close() {
1828        // Window close should retract last intermediate before emitting final
1829        let assigner = SlidingWindowAssigner::from_millis(1000, 1000);
1830        let aggregator = CountAggregator::new();
1831        let mut operator = SlidingWindowOperator::with_id(
1832            assigner,
1833            aggregator,
1834            Duration::from_millis(0),
1835            "changelog_op".to_string(),
1836        );
1837        operator.set_emit_strategy(EmitStrategy::Changelog);
1838
1839        let mut timers = TimerService::new();
1840        let mut state = InMemoryStore::new();
1841        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1842
1843        // Process an event to populate last_emitted
1844        let event = create_test_event(100, 1);
1845        {
1846            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1847            operator.process(&event, &mut ctx);
1848        }
1849
1850        // Fire the window timer → should emit retraction + final insert
1851        let timer = Timer {
1852            key: WindowId::new(0, 1000).to_key(),
1853            timestamp: 1000,
1854        };
1855        let out = {
1856            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1857            operator.on_timer(timer, &mut ctx)
1858        };
1859
1860        let changelog: Vec<_> = out
1861            .iter()
1862            .filter(|o| matches!(o, Output::Changelog(_)))
1863            .collect();
1864        assert_eq!(
1865            changelog.len(),
1866            2,
1867            "window close: retraction + final insert"
1868        );
1869        if let Output::Changelog(retract) = &changelog[0] {
1870            assert_eq!(retract.weight, -1, "first should be retraction");
1871        }
1872        if let Output::Changelog(insert) = &changelog[1] {
1873            assert_eq!(insert.weight, 1, "second should be final insert");
1874        }
1875
1876        // last_emitted should be cleaned up
1877        assert!(operator.last_emitted.is_empty());
1878    }
1879
1880    #[test]
1881    fn test_sliding_changelog_z_set_balance() {
1882        // Full lifecycle: 3 updates + close → net weight = +1 (final result)
1883        let assigner = SlidingWindowAssigner::from_millis(1000, 1000);
1884        let aggregator = CountAggregator::new();
1885        let mut operator = SlidingWindowOperator::with_id(
1886            assigner,
1887            aggregator,
1888            Duration::from_millis(0),
1889            "changelog_op".to_string(),
1890        );
1891        operator.set_emit_strategy(EmitStrategy::Changelog);
1892
1893        let mut timers = TimerService::new();
1894        let mut state = InMemoryStore::new();
1895        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1896
1897        let mut total_weight: i32 = 0;
1898
1899        // 3 events in same window
1900        for ts in [100, 200, 300] {
1901            let event = create_test_event(ts, 1);
1902            let out = {
1903                let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1904                operator.process(&event, &mut ctx)
1905            };
1906            for output in &out {
1907                if let Output::Changelog(rec) = output {
1908                    total_weight += rec.weight;
1909                }
1910            }
1911        }
1912
1913        // Close the window
1914        let timer = Timer {
1915            key: WindowId::new(0, 1000).to_key(),
1916            timestamp: 1000,
1917        };
1918        let out = {
1919            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1920            operator.on_timer(timer, &mut ctx)
1921        };
1922        for output in &out {
1923            if let Output::Changelog(rec) = output {
1924                total_weight += rec.weight;
1925            }
1926        }
1927
1928        assert_eq!(
1929            total_weight, 1,
1930            "Z-set balance: net weight should be +1 (final result only)"
1931        );
1932    }
1933
1934    #[test]
1935    fn test_sliding_non_changelog_unaffected() {
1936        // OnUpdate strategy should NOT emit retractions
1937        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1938        let aggregator = CountAggregator::new();
1939        let mut operator = SlidingWindowOperator::with_id(
1940            assigner,
1941            aggregator,
1942            Duration::from_millis(0),
1943            "on_update_op".to_string(),
1944        );
1945        operator.set_emit_strategy(EmitStrategy::OnUpdate);
1946
1947        let mut timers = TimerService::new();
1948        let mut state = InMemoryStore::new();
1949        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1950
1951        // Two events in same window
1952        for ts in [100, 200] {
1953            let event = create_test_event(ts, 1);
1954            let out = {
1955                let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1956                operator.process(&event, &mut ctx)
1957            };
1958            // OnUpdate should only emit Event outputs, never Changelog
1959            for output in &out {
1960                assert!(
1961                    !matches!(output, Output::Changelog(_)),
1962                    "OnUpdate should not produce Changelog outputs"
1963                );
1964            }
1965        }
1966        // last_emitted should be empty (only used for Changelog)
1967        assert!(operator.last_emitted.is_empty());
1968    }
1969
1970    #[test]
1971    fn test_negative_near_min_no_overflow() {
1972        // The primary goal: saturating arithmetic prevents overflow panics near i64::MIN.
1973        // Before the fix, this would panic with "attempt to subtract with overflow".
1974        let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1975
1976        // last_window_start must not panic
1977        let _start = assigner.last_window_start(i64::MIN + 10);
1978
1979        // assign_windows must not panic
1980        let _windows = assigner.assign_windows(i64::MIN + 10);
1981
1982        // Also verify with slide_ms == size_ms (tumbling-like sliding)
1983        let assigner2 = SlidingWindowAssigner::from_millis(1000, 1000);
1984        let _start2 = assigner2.last_window_start(i64::MIN + 10);
1985        let _windows2 = assigner2.assign_windows(i64::MIN + 10);
1986
1987        // Verify slightly less extreme value still produces correct windows
1988        let assigner3 = SlidingWindowAssigner::from_millis(1000, 500);
1989        let windows3 = assigner3.assign_windows(-5000);
1990        assert!(
1991            !windows3.is_empty(),
1992            "Negative timestamp should still produce windows"
1993        );
1994        for w in &windows3 {
1995            assert!(
1996                w.start <= -5000 && w.start + 1000 > -5000,
1997                "Window [{}, {}) should contain timestamp -5000",
1998                w.start,
1999                w.end
2000            );
2001        }
2002    }
2003
2004    #[test]
2005    fn test_sliding_window_with_offset() {
2006        // 60s window, 30s slide, 15s offset
2007        let assigner = SlidingWindowAssigner::from_millis(60_000, 30_000).with_offset_ms(15_000);
2008
2009        // ts=45_000 → adjusted = 30_000
2010        // last_window_start: floor(30_000 / 30_000) * 30_000 = 30_000 + 15_000 = 45_000
2011        let windows = assigner.assign_windows(45_000);
2012        assert!(!windows.is_empty());
2013        // All windows should contain the timestamp
2014        for w in &windows {
2015            assert!(
2016                w.start <= 45_000 && w.end > 45_000,
2017                "Window [{}, {}) should contain 45000",
2018                w.start,
2019                w.end
2020            );
2021        }
2022        // With offset 15s, windows start at 15_000, 45_000, etc.
2023        // Windows containing 45_000: [45_000, 105_000) and [15_000, 75_000)
2024        assert_eq!(windows.len(), 2);
2025    }
2026}