1use super::window::{
50 Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
51 ResultToArrow, WindowAssigner, WindowCloseMetrics, WindowId, WindowIdVec,
52};
53use super::{
54 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec,
55 SideOutputData, Timer,
56};
57use crate::state::{StateStore, StateStoreExt};
58use arrow_array::{Int64Array, RecordBatch};
59use arrow_schema::{DataType, Field, Schema, SchemaRef};
60use rkyv::{
61 api::high::{HighDeserializer, HighSerializer, HighValidator},
62 bytecheck::CheckBytes,
63 rancor::Error as RkyvError,
64 ser::allocator::ArenaHandle,
65 util::AlignedVec,
66 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
67};
68use rustc_hash::FxHashMap;
69use smallvec::SmallVec;
70use std::marker::PhantomData;
71use std::sync::atomic::{AtomicU64, Ordering};
72use std::sync::Arc;
73use std::time::Duration;
74
/// Assigns each event timestamp to every fixed-size, overlapping window that contains it.
///
/// Windows are `size_ms` long and a new one starts every `slide_ms` milliseconds, with
/// starts aligned to `offset_ms + k * slide_ms`. The constructors guarantee
/// `0 < slide_ms <= size_ms`.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Length of every window, in milliseconds.
    size_ms: i64,
    /// Distance between the starts of consecutive windows, in milliseconds.
    slide_ms: i64,
    /// Upper bound on how many windows a single event lands in: `ceil(size_ms / slide_ms)`.
    windows_per_event: usize,
    /// Shift applied to window alignment, in milliseconds (0 unless set explicitly).
    offset_ms: i64,
}
117
118impl SlidingWindowAssigner {
119 #[must_use]
133 pub fn new(size: Duration, slide: Duration) -> Self {
134 let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
136 let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");
137
138 assert!(size_ms > 0, "Window size must be positive");
139 assert!(slide_ms > 0, "Slide interval must be positive");
140 assert!(
141 slide_ms <= size_ms,
142 "Slide must not exceed size (use tumbling windows for non-overlapping)"
143 );
144
145 let windows_per_event = usize::try_from((size_ms + slide_ms - 1) / slide_ms)
148 .expect("Windows per event should fit in usize");
149
150 Self {
151 size_ms,
152 slide_ms,
153 windows_per_event,
154 offset_ms: 0,
155 }
156 }
157
158 #[must_use]
164 #[allow(clippy::cast_sign_loss)]
165 pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
166 assert!(size_ms > 0, "Window size must be positive");
167 assert!(slide_ms > 0, "Slide interval must be positive");
168 assert!(
169 slide_ms <= size_ms,
170 "Slide must not exceed size (use tumbling windows for non-overlapping)"
171 );
172
173 let windows_per_event =
175 usize::try_from((size_ms + slide_ms - 1) / slide_ms).unwrap_or(usize::MAX);
176
177 Self {
178 size_ms,
179 slide_ms,
180 windows_per_event,
181 offset_ms: 0,
182 }
183 }
184
185 #[must_use]
187 pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
188 self.offset_ms = offset_ms;
189 self
190 }
191
192 #[must_use]
194 pub fn size_ms(&self) -> i64 {
195 self.size_ms
196 }
197
198 #[must_use]
200 pub fn slide_ms(&self) -> i64 {
201 self.slide_ms
202 }
203
204 #[must_use]
206 pub fn windows_per_event(&self) -> usize {
207 self.windows_per_event
208 }
209
210 #[must_use]
212 pub fn offset_ms(&self) -> i64 {
213 self.offset_ms
214 }
215
216 #[inline]
221 fn last_window_start(&self, timestamp: i64) -> i64 {
222 let adjusted = timestamp - self.offset_ms;
223 let base = if adjusted >= 0 {
224 (adjusted / self.slide_ms) * self.slide_ms
225 } else {
226 (adjusted.saturating_sub(self.slide_ms).saturating_add(1) / self.slide_ms)
228 * self.slide_ms
229 };
230 base + self.offset_ms
231 }
232}
233
234impl WindowAssigner for SlidingWindowAssigner {
235 #[inline]
239 fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
240 let mut windows = WindowIdVec::new();
241
242 let last_start = self.last_window_start(timestamp);
244
245 let mut window_start = last_start;
247 while window_start + self.size_ms > timestamp {
248 let window_end = window_start + self.size_ms;
249 windows.push(WindowId::new(window_start, window_end));
250 let prev = window_start;
251 window_start = window_start.saturating_sub(self.slide_ms);
252 if window_start == prev {
253 break;
254 }
255 }
256
257 windows.reverse();
259 windows
260 }
261
262 fn max_timestamp(&self, window_end: i64) -> i64 {
265 window_end - 1
266 }
267}
268
/// Prefix that namespaces this operator's per-window accumulator entries in the state store.
const WINDOW_STATE_PREFIX: &[u8; 4] = b"slw:";

/// Length of a full state key: the prefix followed by the 16-byte encoded window ID.
const WINDOW_STATE_KEY_SIZE: usize = WINDOW_STATE_PREFIX.len() + 16;

/// Process-wide counter used to derive unique default operator IDs.
static SLIDING_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
277
278pub struct SlidingWindowOperator<A: Aggregator> {
310 assigner: SlidingWindowAssigner,
312 aggregator: A,
314 allowed_lateness_ms: i64,
316 registered_windows: rustc_hash::FxHashSet<WindowId>,
318 periodic_timer_windows: rustc_hash::FxHashSet<WindowId>,
320 emit_strategy: EmitStrategy,
322 late_data_config: LateDataConfig,
324 late_data_metrics: LateDataMetrics,
326 window_close_metrics: WindowCloseMetrics,
328 operator_id: String,
330 output_schema: SchemaRef,
332 last_emitted: FxHashMap<WindowId, Event>,
336 _phantom: PhantomData<A::Acc>,
338}
339
340impl<A: Aggregator> SlidingWindowOperator<A>
341where
342 A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
343 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
344 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
345{
346 pub fn new(assigner: SlidingWindowAssigner, aggregator: A, allowed_lateness: Duration) -> Self {
357 let operator_num = SLIDING_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
358 let output_schema = Arc::new(Schema::new(vec![
359 Field::new("window_start", DataType::Int64, false),
360 Field::new("window_end", DataType::Int64, false),
361 Field::new(
362 "result",
363 aggregator.output_data_type(),
364 aggregator.output_nullable(),
365 ),
366 ]));
367 Self {
368 assigner,
369 aggregator,
370 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
371 .expect("Allowed lateness must fit in i64"),
372 registered_windows: rustc_hash::FxHashSet::default(),
373 periodic_timer_windows: rustc_hash::FxHashSet::default(),
374 emit_strategy: EmitStrategy::default(),
375 late_data_config: LateDataConfig::default(),
376 late_data_metrics: LateDataMetrics::new(),
377 window_close_metrics: WindowCloseMetrics::new(),
378 operator_id: format!("sliding_window_{operator_num}"),
379 output_schema,
380 last_emitted: FxHashMap::default(),
381 _phantom: PhantomData,
382 }
383 }
384
385 pub fn with_id(
390 assigner: SlidingWindowAssigner,
391 aggregator: A,
392 allowed_lateness: Duration,
393 operator_id: String,
394 ) -> Self {
395 let output_schema = Arc::new(Schema::new(vec![
396 Field::new("window_start", DataType::Int64, false),
397 Field::new("window_end", DataType::Int64, false),
398 Field::new(
399 "result",
400 aggregator.output_data_type(),
401 aggregator.output_nullable(),
402 ),
403 ]));
404 Self {
405 assigner,
406 aggregator,
407 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
408 .expect("Allowed lateness must fit in i64"),
409 registered_windows: rustc_hash::FxHashSet::default(),
410 periodic_timer_windows: rustc_hash::FxHashSet::default(),
411 emit_strategy: EmitStrategy::default(),
412 late_data_config: LateDataConfig::default(),
413 late_data_metrics: LateDataMetrics::new(),
414 window_close_metrics: WindowCloseMetrics::new(),
415 operator_id,
416 output_schema,
417 last_emitted: FxHashMap::default(),
418 _phantom: PhantomData,
419 }
420 }
421
422 pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
424 self.emit_strategy = strategy;
425 }
426
427 #[must_use]
429 pub fn emit_strategy(&self) -> &EmitStrategy {
430 &self.emit_strategy
431 }
432
433 pub fn set_late_data_config(&mut self, config: LateDataConfig) {
435 self.late_data_config = config;
436 }
437
438 #[must_use]
440 pub fn late_data_config(&self) -> &LateDataConfig {
441 &self.late_data_config
442 }
443
444 #[must_use]
446 pub fn late_data_metrics(&self) -> &LateDataMetrics {
447 &self.late_data_metrics
448 }
449
450 pub fn reset_late_data_metrics(&mut self) {
452 self.late_data_metrics.reset();
453 }
454
455 #[must_use]
457 pub fn window_close_metrics(&self) -> &WindowCloseMetrics {
458 &self.window_close_metrics
459 }
460
461 pub fn reset_window_close_metrics(&mut self) {
463 self.window_close_metrics.reset();
464 }
465
466 #[must_use]
468 pub fn active_windows_count(&self) -> usize {
469 self.registered_windows.len()
470 }
471
472 #[must_use]
474 pub fn assigner(&self) -> &SlidingWindowAssigner {
475 &self.assigner
476 }
477
478 #[must_use]
480 pub fn allowed_lateness_ms(&self) -> i64 {
481 self.allowed_lateness_ms
482 }
483
484 #[inline]
486 fn state_key(window_id: &WindowId) -> [u8; WINDOW_STATE_KEY_SIZE] {
487 let mut key = [0u8; WINDOW_STATE_KEY_SIZE];
488 key[..4].copy_from_slice(WINDOW_STATE_PREFIX);
489 let window_key = window_id.to_key_inline();
490 key[4..20].copy_from_slice(&window_key);
491 key
492 }
493
494 fn get_accumulator(&self, window_id: &WindowId, state: &dyn StateStore) -> A::Acc {
496 let key = Self::state_key(window_id);
497 state
498 .get_typed::<A::Acc>(&key)
499 .ok()
500 .flatten()
501 .unwrap_or_else(|| self.aggregator.create_accumulator())
502 }
503
504 fn put_accumulator(
506 window_id: &WindowId,
507 acc: &A::Acc,
508 state: &mut dyn StateStore,
509 ) -> Result<(), OperatorError> {
510 let key = Self::state_key(window_id);
511 state
512 .put_typed(&key, acc)
513 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
514 }
515
516 fn delete_accumulator(
518 window_id: &WindowId,
519 state: &mut dyn StateStore,
520 ) -> Result<(), OperatorError> {
521 let key = Self::state_key(window_id);
522 state
523 .delete(&key)
524 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
525 }
526
527 fn is_late(&self, event_time: i64, watermark: i64) -> bool {
532 let windows = self.assigner.assign_windows(event_time);
534
535 windows.iter().all(|window_id| {
537 let cleanup_time = window_id.end + self.allowed_lateness_ms;
538 watermark >= cleanup_time
539 })
540 }
541
542 fn maybe_register_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
544 if !self.registered_windows.contains(&window_id) {
545 let trigger_time = window_id.end + self.allowed_lateness_ms;
546 ctx.timers.register_timer(
547 trigger_time,
548 Some(window_id.to_key()),
549 Some(ctx.operator_index),
550 );
551 self.registered_windows.insert(window_id);
552 }
553 }
554
555 fn maybe_register_periodic_timer(&mut self, window_id: WindowId, ctx: &mut OperatorContext) {
557 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
558 if !self.periodic_timer_windows.contains(&window_id) {
559 let interval_ms =
560 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
561 let trigger_time = ctx.processing_time + interval_ms;
562 let key = Self::periodic_timer_key(&window_id);
563 ctx.timers
564 .register_timer(trigger_time, Some(key), Some(ctx.operator_index));
565 self.periodic_timer_windows.insert(window_id);
566 }
567 }
568 }
569
570 #[inline]
572 fn periodic_timer_key(window_id: &WindowId) -> super::TimerKey {
573 let mut key = window_id.to_key();
574 if !key.is_empty() {
575 key[0] |= 0x80;
576 }
577 key
578 }
579
580 #[inline]
582 fn is_periodic_timer_key(key: &[u8]) -> bool {
583 !key.is_empty() && (key[0] & 0x80) != 0
584 }
585
586 #[inline]
588 fn window_id_from_periodic_key(key: &[u8]) -> Option<WindowId> {
589 if key.len() != 16 {
590 return None;
591 }
592 let mut clean_key = [0u8; 16];
593 clean_key.copy_from_slice(key);
594 clean_key[0] &= 0x7F;
595 WindowId::from_key(&clean_key)
596 }
597
598 fn create_intermediate_result(
600 &self,
601 window_id: &WindowId,
602 state: &dyn StateStore,
603 ) -> Option<Event> {
604 let acc = self.get_accumulator(window_id, state);
605
606 if acc.is_empty() {
607 return None;
608 }
609
610 let result = acc.result();
611 let result_array = result.to_arrow_array();
612
613 let batch = RecordBatch::try_new(
614 Arc::clone(&self.output_schema),
615 vec![
616 Arc::new(Int64Array::from(vec![window_id.start])),
617 Arc::new(Int64Array::from(vec![window_id.end])),
618 result_array,
619 ],
620 )
621 .ok()?;
622
623 Some(Event::new(window_id.end, batch))
624 }
625
626 fn handle_periodic_timer(
628 &mut self,
629 window_id: WindowId,
630 ctx: &mut OperatorContext,
631 ) -> OutputVec {
632 let mut output = OutputVec::new();
633
634 if !self.registered_windows.contains(&window_id) {
635 self.periodic_timer_windows.remove(&window_id);
636 return output;
637 }
638
639 if let Some(event) = self.create_intermediate_result(&window_id, ctx.state) {
640 output.push(Output::Event(event));
641 }
642
643 if let EmitStrategy::Periodic(interval) = &self.emit_strategy {
644 let interval_ms =
645 i64::try_from(interval.as_millis()).expect("Interval must fit in i64");
646 let next_trigger = ctx.processing_time + interval_ms;
647 let window_close_time = window_id.end + self.allowed_lateness_ms;
648 if next_trigger < window_close_time {
649 let key = Self::periodic_timer_key(&window_id);
650 ctx.timers
651 .register_timer(next_trigger, Some(key), Some(ctx.operator_index));
652 }
653 }
654
655 output
656 }
657}
658
659impl<A: Aggregator> Operator for SlidingWindowOperator<A>
660where
661 A::Acc: 'static
662 + Archive
663 + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
664 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
665 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
666{
667 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
668 let event_time = event.timestamp;
669
670 let emitted_watermark = ctx.watermark_generator.on_event(event_time);
672
673 let current_wm = ctx.watermark_generator.current_watermark();
675 if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
676 let mut output = OutputVec::new();
677
678 if self.emit_strategy.drops_late_data() {
680 self.late_data_metrics.record_dropped();
681 return output; }
683
684 if let Some(side_output_name) = self.late_data_config.side_output() {
685 self.late_data_metrics.record_side_output();
686 output.push(Output::SideOutput(Box::new(SideOutputData {
687 name: Arc::from(side_output_name),
688 event: event.clone(),
689 })));
690 } else {
691 self.late_data_metrics.record_dropped();
692 output.push(Output::LateEvent(event.clone()));
693 }
694 return output;
695 }
696
697 let windows = self.assigner.assign_windows(event_time);
699
700 let mut updated_windows = SmallVec::<[WindowId; 4]>::new();
702
703 for window_id in &windows {
705 let cleanup_time = window_id.end + self.allowed_lateness_ms;
707 if current_wm > i64::MIN && current_wm >= cleanup_time {
708 continue;
709 }
710
711 let values = self.aggregator.extract_batch(event);
713 if !values.is_empty() {
714 let mut acc = self.get_accumulator(window_id, ctx.state);
715 for value in values {
716 acc.add(value);
717 }
718 if Self::put_accumulator(window_id, &acc, ctx.state).is_ok() {
719 updated_windows.push(*window_id);
720 }
721 }
722
723 self.maybe_register_timer(*window_id, ctx);
725
726 if !self.emit_strategy.suppresses_intermediate() {
728 self.maybe_register_periodic_timer(*window_id, ctx);
729 }
730 }
731
732 let mut output = OutputVec::new();
734
735 if let Some(wm) = emitted_watermark {
737 output.push(Output::Watermark(wm.timestamp()));
738 }
739
740 if !updated_windows.is_empty() {
742 match &self.emit_strategy {
743 EmitStrategy::OnUpdate => {
745 for window_id in &updated_windows {
746 if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
747 output.push(Output::Event(event));
748 }
749 }
750 }
751 EmitStrategy::Changelog => {
754 for window_id in &updated_windows {
755 if let Some(event) = self.create_intermediate_result(window_id, ctx.state) {
756 if let Some(old_event) = self.last_emitted.get(window_id) {
758 let retract =
759 ChangelogRecord::delete(old_event.clone(), ctx.processing_time);
760 output.push(Output::Changelog(retract));
761 }
762 let record =
764 ChangelogRecord::insert(event.clone(), ctx.processing_time);
765 output.push(Output::Changelog(record));
766 self.last_emitted.insert(*window_id, event);
767 if self.last_emitted.len().is_multiple_of(100_000) {
768 tracing::warn!(
769 operator = %self.operator_id,
770 tracked_windows = self.last_emitted.len(),
771 "last_emitted map is large — windows \
772 may not be closing. Check watermark \
773 advancement and window configuration."
774 );
775 }
776 }
777 }
778 }
779 EmitStrategy::OnWatermark
781 | EmitStrategy::Periodic(_)
782 | EmitStrategy::OnWindowClose
783 | EmitStrategy::Final => {}
784 }
785 }
786
787 output
788 }
789
790 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
791 if Self::is_periodic_timer_key(&timer.key) {
793 if self.emit_strategy.suppresses_intermediate() {
795 if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
797 self.periodic_timer_windows.remove(&window_id);
798 }
799 return OutputVec::new();
800 }
801
802 if let Some(window_id) = Self::window_id_from_periodic_key(&timer.key) {
803 return self.handle_periodic_timer(window_id, ctx);
804 }
805 return OutputVec::new();
806 }
807
808 let Some(window_id) = WindowId::from_key(&timer.key) else {
810 return OutputVec::new();
811 };
812
813 let acc = self.get_accumulator(&window_id, ctx.state);
815
816 if acc.is_empty() {
818 let _ = Self::delete_accumulator(&window_id, ctx.state);
819 self.registered_windows.remove(&window_id);
820 self.periodic_timer_windows.remove(&window_id);
821 self.last_emitted.remove(&window_id);
822 return OutputVec::new();
823 }
824
825 let result = acc.result();
827
828 let _ = Self::delete_accumulator(&window_id, ctx.state);
830 self.registered_windows.remove(&window_id);
831 self.periodic_timer_windows.remove(&window_id);
832 let result_array = result.to_arrow_array();
836
837 let batch = RecordBatch::try_new(
839 Arc::clone(&self.output_schema),
840 vec![
841 Arc::new(Int64Array::from(vec![window_id.start])),
842 Arc::new(Int64Array::from(vec![window_id.end])),
843 result_array,
844 ],
845 );
846
847 let mut output = OutputVec::new();
848 match batch {
849 Ok(data) => {
850 let event = Event::new(window_id.end, data);
851
852 self.window_close_metrics
854 .record_close(window_id.end, ctx.processing_time);
855
856 match &self.emit_strategy {
858 EmitStrategy::Changelog => {
860 if let Some(old_event) = self.last_emitted.remove(&window_id) {
861 let retract = ChangelogRecord::delete(old_event, ctx.processing_time);
862 output.push(Output::Changelog(retract));
863 }
864 let record = ChangelogRecord::insert(event, ctx.processing_time);
865 output.push(Output::Changelog(record));
866 }
867 EmitStrategy::OnWatermark
869 | EmitStrategy::Periodic(_)
870 | EmitStrategy::OnUpdate
871 | EmitStrategy::OnWindowClose
872 | EmitStrategy::Final => {
873 output.push(Output::Event(event));
874 }
875 }
876 }
877 Err(e) => {
878 tracing::error!("Failed to create output batch: {e}");
879 }
880 }
881 output
882 }
883
884 fn checkpoint(&self) -> OperatorState {
885 let windows: Vec<_> = self.registered_windows.iter().copied().collect();
886 let periodic_windows: Vec<_> = self.periodic_timer_windows.iter().copied().collect();
887
888 let checkpoint_data = (windows, periodic_windows);
889 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
890 .map(|v| v.to_vec())
891 .unwrap_or_default();
892
893 OperatorState {
894 operator_id: self.operator_id.clone(),
895 data,
896 }
897 }
898
899 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
900 if state.operator_id != self.operator_id {
901 return Err(OperatorError::StateAccessFailed(format!(
902 "Operator ID mismatch: expected {}, got {}",
903 self.operator_id, state.operator_id
904 )));
905 }
906
907 if let Ok(archived) =
909 rkyv::access::<rkyv::Archived<(Vec<WindowId>, Vec<WindowId>)>, RkyvError>(&state.data)
910 {
911 if let Ok((windows, periodic_windows)) =
912 rkyv::deserialize::<(Vec<WindowId>, Vec<WindowId>), RkyvError>(archived)
913 {
914 self.registered_windows = windows.into_iter().collect();
915 self.periodic_timer_windows = periodic_windows.into_iter().collect();
916 return Ok(());
917 }
918 }
919
920 let archived = rkyv::access::<rkyv::Archived<Vec<WindowId>>, RkyvError>(&state.data)
922 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
923 let windows: Vec<WindowId> = rkyv::deserialize::<Vec<WindowId>, RkyvError>(archived)
924 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
925
926 self.registered_windows = windows.into_iter().collect();
927 self.periodic_timer_windows = rustc_hash::FxHashSet::default();
928 Ok(())
929 }
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use crate::operator::window::{CountAccumulator, CountAggregator, SumAggregator};
936 use crate::state::InMemoryStore;
937 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
938 use arrow_array::{Int64Array, RecordBatch};
939 use arrow_schema::{DataType, Field, Schema};
940
941 fn create_test_event(timestamp: i64, value: i64) -> Event {
942 let schema = Arc::new(Schema::new(vec![Field::new(
943 "value",
944 DataType::Int64,
945 false,
946 )]));
947 let batch =
948 RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
949 Event::new(timestamp, batch)
950 }
951
952 fn create_test_context<'a>(
953 timers: &'a mut TimerService,
954 state: &'a mut dyn StateStore,
955 watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
956 ) -> OperatorContext<'a> {
957 OperatorContext {
958 event_time: 0,
959 processing_time: 0,
960 timers,
961 state,
962 watermark_generator: watermark_gen,
963 operator_index: 0,
964 }
965 }
966
967 #[test]
968 fn test_sliding_assigner_creation() {
969 let assigner = SlidingWindowAssigner::new(Duration::from_secs(60), Duration::from_secs(20));
970
971 assert_eq!(assigner.size_ms(), 60_000);
972 assert_eq!(assigner.slide_ms(), 20_000);
973 assert_eq!(assigner.windows_per_event(), 3); }
975
976 #[test]
977 fn test_sliding_assigner_from_millis() {
978 let assigner = SlidingWindowAssigner::from_millis(1000, 200);
979
980 assert_eq!(assigner.size_ms(), 1000);
981 assert_eq!(assigner.slide_ms(), 200);
982 assert_eq!(assigner.windows_per_event(), 5); }
984
985 #[test]
986 #[should_panic(expected = "Window size must be positive")]
987 fn test_sliding_assigner_zero_size() {
988 let _ = SlidingWindowAssigner::from_millis(0, 100);
989 }
990
991 #[test]
992 #[should_panic(expected = "Slide interval must be positive")]
993 fn test_sliding_assigner_zero_slide() {
994 let _ = SlidingWindowAssigner::from_millis(1000, 0);
995 }
996
997 #[test]
998 #[should_panic(expected = "Slide must not exceed size")]
999 fn test_sliding_assigner_slide_exceeds_size() {
1000 let _ = SlidingWindowAssigner::from_millis(100, 200);
1001 }
1002
1003 #[test]
1004 fn test_sliding_assigner_basic_assignment() {
1005 let assigner = SlidingWindowAssigner::from_millis(60_000, 20_000);
1007
1008 let windows = assigner.assign_windows(50_000);
1010
1011 assert_eq!(windows.len(), 3);
1012
1013 assert!(windows.contains(&WindowId::new(0, 60_000)));
1023 assert!(windows.contains(&WindowId::new(20_000, 80_000)));
1024 assert!(windows.contains(&WindowId::new(40_000, 100_000)));
1025 }
1026
1027 #[test]
1028 fn test_sliding_assigner_boundary_event() {
1029 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1030
1031 let windows = assigner.assign_windows(1000);
1033
1034 assert_eq!(windows.len(), 2);
1039 assert!(windows.contains(&WindowId::new(500, 1500)));
1040 assert!(windows.contains(&WindowId::new(1000, 2000)));
1041 }
1042
1043 #[test]
1044 fn test_sliding_assigner_negative_timestamp() {
1045 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1046
1047 let windows = assigner.assign_windows(-500);
1049
1050 assert_eq!(windows.len(), 2);
1056 assert!(windows.contains(&WindowId::new(-1000, 0)));
1057 assert!(windows.contains(&WindowId::new(-500, 500)));
1058 }
1059
1060 #[test]
1061 fn test_sliding_assigner_equal_size_and_slide() {
1062 let assigner = SlidingWindowAssigner::from_millis(1000, 1000);
1064
1065 assert_eq!(assigner.windows_per_event(), 1);
1066
1067 let windows = assigner.assign_windows(500);
1068 assert_eq!(windows.len(), 1);
1069 assert_eq!(windows[0], WindowId::new(0, 1000));
1070 }
1071
1072 #[test]
1073 fn test_sliding_assigner_small_slide() {
1074 let assigner = SlidingWindowAssigner::from_millis(1000, 100);
1076
1077 assert_eq!(assigner.windows_per_event(), 10);
1078
1079 let windows = assigner.assign_windows(500);
1080 assert_eq!(windows.len(), 10);
1081 }
1082
1083 #[test]
1084 fn test_sliding_operator_creation() {
1085 let assigner = SlidingWindowAssigner::from_millis(1000, 200);
1086 let aggregator = CountAggregator::new();
1087 let operator = SlidingWindowOperator::new(assigner, aggregator, Duration::from_millis(100));
1088
1089 assert_eq!(operator.allowed_lateness_ms(), 100);
1090 assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
1091 assert!(operator.late_data_config().should_drop());
1092 }
1093
1094 #[test]
1095 fn test_sliding_operator_with_id() {
1096 let assigner = SlidingWindowAssigner::from_millis(1000, 200);
1097 let aggregator = CountAggregator::new();
1098 let operator = SlidingWindowOperator::with_id(
1099 assigner,
1100 aggregator,
1101 Duration::from_millis(0),
1102 "test_sliding".to_string(),
1103 );
1104
1105 assert_eq!(operator.operator_id, "test_sliding");
1106 }
1107
1108 #[test]
1109 fn test_sliding_operator_process_single_event() {
1110 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1111 let aggregator = CountAggregator::new();
1112 let mut operator = SlidingWindowOperator::with_id(
1113 assigner,
1114 aggregator,
1115 Duration::from_millis(0),
1116 "test_op".to_string(),
1117 );
1118
1119 let mut timers = TimerService::new();
1120 let mut state = InMemoryStore::new();
1121 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1122
1123 let event = create_test_event(600, 1);
1126 {
1127 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1128 operator.process(&event, &mut ctx);
1129 }
1130
1131 assert_eq!(operator.registered_windows.len(), 2);
1133 assert!(operator
1134 .registered_windows
1135 .contains(&WindowId::new(0, 1000)));
1136 assert!(operator
1137 .registered_windows
1138 .contains(&WindowId::new(500, 1500)));
1139 }
1140
1141 #[test]
1142 fn test_sliding_operator_accumulates_correctly() {
1143 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1144 let aggregator = CountAggregator::new();
1145 let mut operator = SlidingWindowOperator::with_id(
1146 assigner.clone(),
1147 aggregator,
1148 Duration::from_millis(0),
1149 "test_op".to_string(),
1150 );
1151
1152 let mut timers = TimerService::new();
1153 let mut state = InMemoryStore::new();
1154 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1155
1156 for ts in [100, 600, 800] {
1161 let event = create_test_event(ts, 1);
1162 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1163 operator.process(&event, &mut ctx);
1164 }
1165
1166 let window_0_1000 = WindowId::new(0, 1000);
1168 let acc: CountAccumulator = operator.get_accumulator(&window_0_1000, &state);
1169 assert_eq!(acc.result(), 3);
1170
1171 let window_500_1500 = WindowId::new(500, 1500);
1173 let acc: CountAccumulator = operator.get_accumulator(&window_500_1500, &state);
1174 assert_eq!(acc.result(), 2);
1175 }
1176
1177 #[test]
1178 fn test_sliding_operator_window_trigger() {
1179 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1180 let aggregator = CountAggregator::new();
1181 let mut operator = SlidingWindowOperator::with_id(
1182 assigner,
1183 aggregator,
1184 Duration::from_millis(0),
1185 "test_op".to_string(),
1186 );
1187
1188 let mut timers = TimerService::new();
1189 let mut state = InMemoryStore::new();
1190 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1191
1192 for ts in [100, 200, 300] {
1194 let event = create_test_event(ts, 1);
1195 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1196 operator.process(&event, &mut ctx);
1197 }
1198
1199 let timer = Timer {
1201 key: WindowId::new(0, 1000).to_key(),
1202 timestamp: 1000,
1203 };
1204
1205 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1206 let outputs = operator.on_timer(timer, &mut ctx);
1207
1208 assert_eq!(outputs.len(), 1);
1209 match &outputs[0] {
1210 Output::Event(event) => {
1211 assert_eq!(event.timestamp, 1000);
1212 let result_col = event.data.column(2);
1213 let result_array = result_col.as_any().downcast_ref::<Int64Array>().unwrap();
1214 assert_eq!(result_array.value(0), 3);
1215 }
1216 _ => panic!("Expected Event output"),
1217 }
1218
1219 assert!(!operator
1221 .registered_windows
1222 .contains(&WindowId::new(0, 1000)));
1223 }
1224
1225 #[test]
1226 fn test_sliding_operator_multiple_window_triggers() {
1227 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1228 let aggregator = SumAggregator::new(0);
1229 let mut operator = SlidingWindowOperator::with_id(
1230 assigner,
1231 aggregator,
1232 Duration::from_millis(0),
1233 "test_op".to_string(),
1234 );
1235
1236 let mut timers = TimerService::new();
1237 let mut state = InMemoryStore::new();
1238 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1239
1240 let event = create_test_event(600, 10);
1243 {
1244 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1245 operator.process(&event, &mut ctx);
1246 }
1247
1248 let t1 = Timer {
1250 key: WindowId::new(0, 1000).to_key(),
1251 timestamp: 1000,
1252 };
1253 let outputs1 = {
1254 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1255 operator.on_timer(t1, &mut ctx)
1256 };
1257
1258 assert_eq!(outputs1.len(), 1);
1259 if let Output::Event(e) = &outputs1[0] {
1260 let result = e
1261 .data
1262 .column(2)
1263 .as_any()
1264 .downcast_ref::<Int64Array>()
1265 .unwrap()
1266 .value(0);
1267 assert_eq!(result, 10);
1268 }
1269
1270 let t2 = Timer {
1272 key: WindowId::new(500, 1500).to_key(),
1273 timestamp: 1500,
1274 };
1275 let outputs2 = {
1276 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1277 operator.on_timer(t2, &mut ctx)
1278 };
1279
1280 assert_eq!(outputs2.len(), 1);
1281 if let Output::Event(e) = &outputs2[0] {
1282 let result = e
1283 .data
1284 .column(2)
1285 .as_any()
1286 .downcast_ref::<Int64Array>()
1287 .unwrap()
1288 .value(0);
1289 assert_eq!(result, 10);
1290 }
1291
1292 assert!(operator.registered_windows.is_empty());
1294 }
1295
1296 #[test]
1297 fn test_sliding_operator_late_event() {
1298 let assigner = SlidingWindowAssigner::from_millis(1000, 500);
1299 let aggregator = CountAggregator::new();
1300 let mut operator = SlidingWindowOperator::with_id(
1301 assigner,
1302 aggregator,
1303 Duration::from_millis(0),
1304 "test_op".to_string(),
1305 );
1306
1307 let mut timers = TimerService::new();
1308 let mut state = InMemoryStore::new();
1309 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
1310
1311 let event1 = create_test_event(2000, 1);
1313 {
1314 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1315 operator.process(&event1, &mut ctx);
1316 }
1317
1318 let late_event = create_test_event(500, 2);
1320 let outputs = {
1321 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1322 operator.process(&late_event, &mut ctx)
1323 };
1324
1325 let is_late = outputs.iter().any(|o| matches!(o, Output::LateEvent(_)));
1327 assert!(is_late);
1328 assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
1329 }
1330
#[test]
fn test_sliding_operator_late_event_side_output() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    // Late events go to the side output named "late" rather than being dropped.
    operator.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    let on_time = create_test_event(2000, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&on_time, &mut ctx);
    }

    let straggler = create_test_event(500, 2);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&straggler, &mut ctx)
    };

    // Name of the first side output emitted, if any.
    let side_output = outputs.iter().find_map(|o| match o {
        Output::SideOutput(data) => Some(data.name.clone()),
        _ => None,
    });
    assert_eq!(side_output.as_deref(), Some("late"));
    assert_eq!(operator.late_data_metrics().late_events_side_output(), 1);
}
1373
#[test]
fn test_sliding_operator_emit_on_update() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnUpdate);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // ts=600 belongs to [0, 1000) and [500, 1500); OnUpdate emits one result per window.
    let event = create_test_event(600, 1);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx)
    };

    let mut emitted = 0;
    for output in outputs.iter() {
        if matches!(output, Output::Event(_)) {
            emitted += 1;
        }
    }
    assert_eq!(emitted, 2);
}
1405
#[test]
fn test_sliding_operator_checkpoint_restore() {
    let assigner = SlidingWindowAssigner::from_millis(1000, 500);
    let aggregator = CountAggregator::new();
    let first_window = WindowId::new(0, 1000);
    let second_window = WindowId::new(500, 1500);

    // Seed the bookkeeping sets directly: two registered windows, one periodic timer.
    let mut original = SlidingWindowOperator::with_id(
        assigner.clone(),
        aggregator.clone(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    original.registered_windows.insert(first_window);
    original.registered_windows.insert(second_window);
    original.periodic_timer_windows.insert(first_window);

    let snapshot = original.checkpoint();

    // A freshly built operator must recover both sets from the snapshot.
    let mut restored = SlidingWindowOperator::with_id(
        assigner,
        aggregator,
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    restored.restore(snapshot).unwrap();

    assert_eq!(restored.registered_windows.len(), 2);
    assert_eq!(restored.periodic_timer_windows.len(), 1);
    assert!(restored.registered_windows.contains(&first_window));
    assert!(restored.registered_windows.contains(&second_window));
    assert!(restored.periodic_timer_windows.contains(&first_window));
}
1448
#[test]
fn test_sliding_operator_empty_window_trigger() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
    let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

    // Fire the timer for window [0, 1000) without ever having processed an event:
    // there is nothing to emit.
    let empty_window = WindowId::new(0, 1000);
    let outputs = operator.on_timer(
        Timer {
            key: empty_window.to_key(),
            timestamp: 1000,
        },
        &mut ctx,
    );

    assert!(outputs.is_empty());
}
1476
#[test]
fn test_sliding_operator_periodic_timer_key() {
    type Op = SlidingWindowOperator<CountAggregator>;
    let window_id = WindowId::new(1000, 2000);

    // A periodic key must be recognised as periodic and round-trip back to its window.
    let periodic_key = Op::periodic_timer_key(&window_id);
    assert!(Op::is_periodic_timer_key(&periodic_key));

    let extracted = Op::window_id_from_periodic_key(&periodic_key);
    assert_eq!(extracted, Some(window_id));

    // The plain window key must not be mistaken for a periodic one.
    // (Previously written as a mojibake `®ular_key`, which did not compile.)
    let regular_key = window_id.to_key();
    assert!(!Op::is_periodic_timer_key(&regular_key));
}
1492
#[test]
fn test_sliding_operator_skips_closed_windows() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    // ts=1100 pushes event time past the end of [0, 1000). The later ts=800 maps to
    // [0, 1000) and [500, 1500), but only the still-open window may be registered.
    for ts in [1100, 800] {
        let event = create_test_event(ts, 1);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    let closed = WindowId::new(0, 1000);
    let open = WindowId::new(500, 1500);
    assert!(!operator.registered_windows.contains(&closed));
    assert!(operator.registered_windows.contains(&open));
}
1531
#[test]
fn test_sliding_assigner_window_assigner_trait() {
    let assigner = SlidingWindowAssigner::from_millis(1000, 500);

    // size / slide = 2 overlapping windows cover any single timestamp.
    let assigned = assigner.assign_windows(600);
    assert_eq!(assigned.len(), 2);

    // Window ends are exclusive: the last timestamp of a window ending at 1000 is 999.
    let last_ts = assigner.max_timestamp(1000);
    assert_eq!(last_ts, 999);
}
1543
#[test]
fn test_sliding_operator_allowed_lateness() {
    // 500 ms of allowed lateness: ts=800 must still be accepted after an event at ts=1200.
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(500),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    let leading = create_test_event(1200, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&leading, &mut ctx);
    }

    let trailing = create_test_event(800, 1);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&trailing, &mut ctx)
    };

    // Neither a late-event nor a side-output record may appear.
    let flagged_late = outputs
        .iter()
        .any(|o| matches!(o, Output::LateEvent(_) | Output::SideOutput(_)));
    assert!(!flagged_late);
    assert_eq!(operator.late_data_metrics().late_events_total(), 0);
}
1581
#[test]
fn test_eowc_sliding_multiple_windows_per_event() {
    // 60 s windows sliding every 20 s, emitted only when each window closes.
    let assigner = SlidingWindowAssigner::from_millis(60_000, 20_000);
    let aggregator = CountAggregator::new();
    let mut operator = SlidingWindowOperator::with_id(
        assigner,
        aggregator,
        Duration::from_millis(0),
        "eowc_slide".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnWindowClose);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    let event = create_test_event(50_000, 1);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx)
    };

    let event_outputs: Vec<_> = outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .collect();
    assert!(
        event_outputs.is_empty(),
        "EOWC sliding should not emit intermediate results"
    );

    // ts=50_000 lies in these three overlapping windows.
    let windows = [
        WindowId::new(0, 60_000),
        WindowId::new(20_000, 80_000),
        WindowId::new(40_000, 100_000),
    ];

    let mut emission_count = 0;
    for wid in &windows {
        let timer = Timer {
            key: wid.to_key(),
            timestamp: wid.end,
        };
        let out = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.on_timer(timer, &mut ctx)
        };
        assert_eq!(
            out.len(),
            1,
            "Each window should emit exactly once (window [{}, {}))",
            wid.start,
            wid.end
        );
        // A non-Event output must fail the test instead of silently skipping the count check.
        let Output::Event(e) = &out[0] else {
            panic!(
                "window [{}, {}) should emit an Output::Event",
                wid.start, wid.end
            );
        };
        let result = e
            .data
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("count column should be Int64");
        assert_eq!(result.value(0), 1, "Each window should have count=1");
        emission_count += 1;
    }
    assert_eq!(
        emission_count, 3,
        "Should have 3 separate emissions for 3 overlapping windows"
    );
}
1662
#[test]
fn test_eowc_sliding_no_intermediate_emissions() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "eowc_slide".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnWindowClose);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Ten events at 0, 200, ..., 1800; process() must never emit a result row itself.
    for ts in (0..2000).step_by(200) {
        let event = create_test_event(ts, 1);
        let outputs = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx)
        };
        let emitted_event = outputs
            .iter()
            .any(|output| matches!(output, Output::Event(_)));
        assert!(
            !emitted_event,
            "process() must not emit Output::Event with OnWindowClose (ts={ts})"
        );
    }
}
1695
#[test]
fn test_eowc_sliding_overlapping_window_close_order() {
    let assigner = SlidingWindowAssigner::from_millis(1000, 500);
    let aggregator = SumAggregator::new(0);
    let mut operator = SlidingWindowOperator::with_id(
        assigner,
        aggregator,
        Duration::from_millis(0),
        "eowc_slide".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnWindowClose);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

    // A single event with value 10 at ts=600 lands in [0, 1000) and [500, 1500).
    let event = create_test_event(600, 10);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    // Close [0, 1000) first.
    let win_timer_1 = Timer {
        key: WindowId::new(0, 1000).to_key(),
        timestamp: 1000,
    };
    let out1 = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(win_timer_1, &mut ctx)
    };
    assert_eq!(out1.len(), 1);
    // Fail loudly on a non-Event output rather than skipping the checks below.
    let Output::Event(e1) = &out1[0] else {
        panic!("window [0, 1000) should emit an Output::Event");
    };
    assert_eq!(e1.timestamp, 1000, "First emission at window_end=1000");
    let sum1 = e1
        .data
        .column(2)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("sum column should be Int64")
        .value(0);
    assert_eq!(sum1, 10, "Window [0, 1000) should contain the event sum=10");

    // Then close [500, 1500).
    let win_timer_2 = Timer {
        key: WindowId::new(500, 1500).to_key(),
        timestamp: 1500,
    };
    let out2 = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(win_timer_2, &mut ctx)
    };
    assert_eq!(out2.len(), 1);
    let Output::Event(e2) = &out2[0] else {
        panic!("window [500, 1500) should emit an Output::Event");
    };
    assert_eq!(e2.timestamp, 1500, "Second emission at window_end=1500");
    let sum2 = e2
        .data
        .column(2)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("sum column should be Int64")
        .value(0);
    assert_eq!(sum2, 10, "Both windows should contain the event sum=10");

    // Both windows have fired, so no window bookkeeping should remain.
    assert!(operator.registered_windows.is_empty());
}
1762
#[test]
fn test_sliding_changelog_retraction_on_update() {
    // slide == size, so every event maps to exactly one window.
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "changelog_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Changelog);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // First event: the window has no prior result, so only an insert is expected.
    let first = create_test_event(100, 1);
    let first_out = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&first, &mut ctx)
    };
    let first_weights: Vec<_> = first_out
        .iter()
        .filter_map(|o| match o {
            Output::Changelog(rec) => Some(rec.weight),
            _ => None,
        })
        .collect();
    assert_eq!(
        first_weights.len(),
        1,
        "first event: only insert, no retraction"
    );
    assert_eq!(first_weights[0], 1, "first event should be insert (+1)");

    // Second event in the same window: retract the old result, then insert the new one.
    let second = create_test_event(200, 1);
    let second_out = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&second, &mut ctx)
    };
    let second_weights: Vec<_> = second_out
        .iter()
        .filter_map(|o| match o {
            Output::Changelog(rec) => Some(rec.weight),
            _ => None,
        })
        .collect();
    assert_eq!(
        second_weights.len(),
        2,
        "second event: retraction + insert = 2 changelog records"
    );
    assert_eq!(second_weights[0], -1, "first record should be retraction (-1)");
    assert_eq!(second_weights[1], 1, "second record should be insert (+1)");
}
1825
#[test]
fn test_sliding_changelog_retraction_on_close() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "changelog_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Changelog);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let event = create_test_event(100, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    // Closing [0, 1000) should retract the interim result and insert the final one.
    let close_out = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(
            Timer {
                key: WindowId::new(0, 1000).to_key(),
                timestamp: 1000,
            },
            &mut ctx,
        )
    };

    let weights: Vec<_> = close_out
        .iter()
        .filter_map(|o| match o {
            Output::Changelog(rec) => Some(rec.weight),
            _ => None,
        })
        .collect();
    assert_eq!(
        weights.len(),
        2,
        "window close: retraction + final insert"
    );
    assert_eq!(weights[0], -1, "first should be retraction");
    assert_eq!(weights[1], 1, "second should be final insert");

    // No last-emitted state may survive the window's close.
    assert!(operator.last_emitted.is_empty());
}
1879
#[test]
fn test_sliding_changelog_z_set_balance() {
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "changelog_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::Changelog);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Net sum of every changelog weight seen, across all updates and the final close.
    let mut total_weight: i32 = 0;

    for ts in [100, 200, 300] {
        let event = create_test_event(ts, 1);
        let out = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx)
        };
        total_weight += out
            .iter()
            .filter_map(|o| match o {
                Output::Changelog(rec) => Some(rec.weight),
                _ => None,
            })
            .sum::<i32>();
    }

    let close_out = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(
            Timer {
                key: WindowId::new(0, 1000).to_key(),
                timestamp: 1000,
            },
            &mut ctx,
        )
    };
    total_weight += close_out
        .iter()
        .filter_map(|o| match o {
            Output::Changelog(rec) => Some(rec.weight),
            _ => None,
        })
        .sum::<i32>();

    // Every insert but the last is cancelled by a retraction.
    assert_eq!(
        total_weight, 1,
        "Z-set balance: net weight should be +1 (final result only)"
    );
}
1933
#[test]
fn test_sliding_non_changelog_unaffected() {
    // With OnUpdate, the changelog machinery must stay completely inactive.
    let mut operator = SlidingWindowOperator::with_id(
        SlidingWindowAssigner::from_millis(1000, 500),
        CountAggregator::new(),
        Duration::from_millis(0),
        "on_update_op".to_string(),
    );
    operator.set_emit_strategy(EmitStrategy::OnUpdate);

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    for ts in [100, 200] {
        let event = create_test_event(ts, 1);
        let out = {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process(&event, &mut ctx)
        };
        let has_changelog = out.iter().any(|o| matches!(o, Output::Changelog(_)));
        assert!(
            !has_changelog,
            "OnUpdate should not produce Changelog outputs"
        );
    }
    assert!(operator.last_emitted.is_empty());
}
1969
#[test]
fn test_negative_near_min_no_overflow() {
    // Timestamps near i64::MIN must not trigger arithmetic overflow (which panics in debug
    // builds). The returned values are not inspected here, only that the calls return.
    let assigner = SlidingWindowAssigner::from_millis(1000, 500);
    let _start = assigner.last_window_start(i64::MIN + 10);
    let _windows = assigner.assign_windows(i64::MIN + 10);

    let assigner2 = SlidingWindowAssigner::from_millis(1000, 1000);
    let _start2 = assigner2.last_window_start(i64::MIN + 10);
    let _windows2 = assigner2.assign_windows(i64::MIN + 10);

    // An ordinary negative timestamp must be covered by exactly size/slide = 2 windows.
    // Each is checked against its own `end`, not a recomputed `start + size`, so a
    // wrong end value is also caught.
    let ts = -5000;
    let assigner3 = SlidingWindowAssigner::from_millis(1000, 500);
    let windows3 = assigner3.assign_windows(ts);
    assert!(
        !windows3.is_empty(),
        "Negative timestamp should still produce windows"
    );
    assert_eq!(
        windows3.len(),
        2,
        "1000 ms windows sliding by 500 ms should yield 2 windows for ts={ts}"
    );
    for w in &windows3 {
        assert_eq!(
            w.end - w.start,
            1000,
            "Window [{}, {}) should span exactly 1000 ms",
            w.start,
            w.end
        );
        assert!(
            w.start <= ts && w.end > ts,
            "Window [{}, {}) should contain timestamp -5000",
            w.start,
            w.end
        );
    }
}
2003
#[test]
fn test_sliding_window_with_offset() {
    // 60 s windows sliding every 30 s, with a 15 s offset applied to the grid.
    let assigner = SlidingWindowAssigner::from_millis(60_000, 30_000).with_offset_ms(15_000);
    let ts = 45_000;

    let windows = assigner.assign_windows(ts);
    assert!(!windows.is_empty());

    // Every assigned window must actually contain the timestamp.
    for w in windows.iter() {
        let contains_ts = w.start <= ts && w.end > ts;
        assert!(
            contains_ts,
            "Window [{}, {}) should contain 45000",
            w.start,
            w.end
        );
    }
    assert_eq!(windows.len(), 2);
}
2026}