1use super::window::{
43 Accumulator, Aggregator, ChangelogRecord, EmitStrategy, LateDataConfig, LateDataMetrics,
44 ResultToArrow, WindowCloseMetrics, WindowId,
45};
46use super::{
47 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec,
48 SideOutputData, Timer,
49};
50use crate::state::{StateStore, StateStoreExt};
51use arrow_array::{Array, Int64Array, RecordBatch};
52use arrow_schema::{DataType, Field, Schema, SchemaRef};
53use rkyv::{
54 api::high::{HighDeserializer, HighSerializer, HighValidator},
55 bytecheck::CheckBytes,
56 rancor::Error as RkyvError,
57 ser::allocator::ArenaHandle,
58 util::AlignedVec,
59 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
60};
61use rustc_hash::FxHashMap;
62use smallvec::SmallVec;
63use std::marker::PhantomData;
64use std::sync::atomic::{AtomicU64, Ordering};
65use std::sync::Arc;
66use std::time::Duration;
67
/// State-key prefix for a persisted per-key session index (`six:` + 8-byte key hash).
const SESSION_INDEX_PREFIX: &[u8; 4] = b"six:";

/// State-key prefix for a persisted session accumulator (`sac:` + 8-byte session id).
const SESSION_ACC_PREFIX: &[u8; 4] = b"sac:";

/// Leading byte of every timer key this operator registers; the 8-byte session id follows.
const SESSION_TIMER_PREFIX: u8 = 1;

/// Default upper bound on how many per-key session indices are cached in memory.
const DEFAULT_MAX_CACHED_INDICES: usize = 1 << 14;

/// Process-wide counter used to build default operator IDs (`session_window_<n>`).
static SESSION_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Archive, RkyvSerialize, RkyvDeserialize)]
89pub struct SessionId(u64);
90
91impl SessionId {
92 pub fn generate(operator_id: &str, counter: &AtomicU64) -> Self {
97 let op_hash = {
98 use std::hash::{Hash, Hasher};
99 let mut hasher = rustc_hash::FxHasher::default();
100 operator_id.as_bytes().hash(&mut hasher);
101 hasher.finish()
102 };
103 let seq = counter.fetch_add(1, Ordering::Relaxed);
104 Self(op_hash ^ seq)
105 }
106
107 #[must_use]
109 pub fn to_bytes(self) -> [u8; 8] {
110 self.0.to_be_bytes()
111 }
112
113 #[must_use]
117 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
118 let arr: [u8; 8] = bytes.try_into().ok()?;
119 Some(Self(u64::from_be_bytes(arr)))
120 }
121
122 #[must_use]
124 pub fn as_u64(self) -> u64 {
125 self.0
126 }
127}
128
129#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
134pub struct SessionMetadata {
135 pub id: SessionId,
137 pub start: i64,
139 pub end: i64,
141 pub emitted: bool,
143}
144
145impl SessionMetadata {
146 fn new(id: SessionId, timestamp: i64, gap_ms: i64) -> Self {
150 Self {
151 id,
152 start: timestamp,
153 end: timestamp + gap_ms,
154 emitted: false,
155 }
156 }
157
158 #[must_use]
160 pub fn window_id(&self) -> WindowId {
161 WindowId::new(self.start, self.end)
162 }
163
164 fn overlaps(&self, timestamp: i64, gap_ms: i64) -> bool {
169 let event_start = timestamp;
170 let event_end = timestamp + gap_ms;
171 event_start < self.end && self.start < event_end
173 }
174
175 fn extend(&mut self, timestamp: i64, gap_ms: i64) {
180 self.start = self.start.min(timestamp);
181 self.end = self.end.max(timestamp + gap_ms);
182 }
183
184 fn merge(&mut self, other: &SessionMetadata) {
189 self.start = self.start.min(other.start);
190 self.end = self.end.max(other.end);
191 }
192}
193
194#[derive(Debug, Clone, Default, Archive, RkyvSerialize, RkyvDeserialize)]
199pub struct SessionIndex {
200 pub sessions: Vec<SessionMetadata>,
202}
203
204impl SessionIndex {
205 fn insert(&mut self, session: SessionMetadata) {
207 let pos = self
208 .sessions
209 .binary_search_by_key(&session.start, |s| s.start)
210 .unwrap_or_else(|pos| pos);
211 self.sessions.insert(pos, session);
212 }
213
214 fn remove(&mut self, id: SessionId) -> Option<SessionMetadata> {
216 if let Some(pos) = self.sessions.iter().position(|s| s.id == id) {
217 Some(self.sessions.remove(pos))
218 } else {
219 None
220 }
221 }
222
223 fn find_overlapping(&self, timestamp: i64, gap_ms: i64) -> SmallVec<[SessionId; 4]> {
225 self.sessions
226 .iter()
227 .filter(|s| s.overlaps(timestamp, gap_ms))
228 .map(|s| s.id)
229 .collect()
230 }
231
232 fn get(&self, id: SessionId) -> Option<&SessionMetadata> {
234 self.sessions.iter().find(|s| s.id == id)
235 }
236
237 fn get_mut(&mut self, id: SessionId) -> Option<&mut SessionMetadata> {
239 self.sessions.iter_mut().find(|s| s.id == id)
240 }
241
242 #[must_use]
244 fn len(&self) -> usize {
245 self.sessions.len()
246 }
247
248 #[must_use]
250 fn is_empty(&self) -> bool {
251 self.sessions.is_empty()
252 }
253}
254
255pub struct SessionWindowOperator<A: Aggregator> {
283 gap_ms: i64,
285 aggregator: A,
287 allowed_lateness_ms: i64,
289 session_id_counter: AtomicU64,
291 session_indices: FxHashMap<u64, SessionIndex>,
297 max_cached_indices: usize,
299 pending_timers: FxHashMap<u64, (i64, u64)>,
301 emit_strategy: EmitStrategy,
303 late_data_config: LateDataConfig,
305 late_data_metrics: LateDataMetrics,
307 window_close_metrics: WindowCloseMetrics,
309 operator_id: String,
311 output_schema: SchemaRef,
313 key_column: Option<usize>,
315 needs_timer_reregistration: bool,
317 _phantom: PhantomData<A::Acc>,
319}
320
321impl<A: Aggregator> SessionWindowOperator<A>
322where
323 A::Acc: Archive + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
324 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
325 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
326{
327 pub fn new(gap: Duration, aggregator: A, allowed_lateness: Duration) -> Self {
339 let operator_num = SESSION_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
340 let output_schema = Arc::new(Schema::new(vec![
341 Field::new("window_start", DataType::Int64, false),
342 Field::new("window_end", DataType::Int64, false),
343 Field::new(
344 "result",
345 aggregator.output_data_type(),
346 aggregator.output_nullable(),
347 ),
348 ]));
349 Self {
350 gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
351 aggregator,
352 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
353 .expect("Allowed lateness must fit in i64"),
354 session_id_counter: AtomicU64::new(0),
355 session_indices: FxHashMap::default(),
356 max_cached_indices: DEFAULT_MAX_CACHED_INDICES,
357 pending_timers: FxHashMap::default(),
358 emit_strategy: EmitStrategy::default(),
359 late_data_config: LateDataConfig::default(),
360 late_data_metrics: LateDataMetrics::new(),
361 window_close_metrics: WindowCloseMetrics::new(),
362 operator_id: format!("session_window_{operator_num}"),
363 output_schema,
364 key_column: None,
365 needs_timer_reregistration: false,
366 _phantom: PhantomData,
367 }
368 }
369
370 pub fn with_id(
376 gap: Duration,
377 aggregator: A,
378 allowed_lateness: Duration,
379 operator_id: String,
380 ) -> Self {
381 let output_schema = Arc::new(Schema::new(vec![
382 Field::new("window_start", DataType::Int64, false),
383 Field::new("window_end", DataType::Int64, false),
384 Field::new(
385 "result",
386 aggregator.output_data_type(),
387 aggregator.output_nullable(),
388 ),
389 ]));
390 Self {
391 gap_ms: i64::try_from(gap.as_millis()).expect("Gap must fit in i64"),
392 aggregator,
393 allowed_lateness_ms: i64::try_from(allowed_lateness.as_millis())
394 .expect("Allowed lateness must fit in i64"),
395 session_id_counter: AtomicU64::new(0),
396 session_indices: FxHashMap::default(),
397 max_cached_indices: DEFAULT_MAX_CACHED_INDICES,
398 pending_timers: FxHashMap::default(),
399 emit_strategy: EmitStrategy::default(),
400 late_data_config: LateDataConfig::default(),
401 late_data_metrics: LateDataMetrics::new(),
402 window_close_metrics: WindowCloseMetrics::new(),
403 operator_id,
404 output_schema,
405 key_column: None,
406 needs_timer_reregistration: false,
407 _phantom: PhantomData,
408 }
409 }
410
411 pub fn set_max_cached_indices(&mut self, max: usize) {
416 self.max_cached_indices = max;
417 }
418
419 pub fn set_key_column(&mut self, column_index: usize) {
423 self.key_column = Some(column_index);
424 }
425
426 #[must_use]
428 pub fn key_column(&self) -> Option<usize> {
429 self.key_column
430 }
431
432 pub fn set_emit_strategy(&mut self, strategy: EmitStrategy) {
434 self.emit_strategy = strategy;
435 }
436
437 #[must_use]
439 pub fn emit_strategy(&self) -> &EmitStrategy {
440 &self.emit_strategy
441 }
442
443 pub fn set_late_data_config(&mut self, config: LateDataConfig) {
445 self.late_data_config = config;
446 }
447
448 #[must_use]
450 pub fn late_data_config(&self) -> &LateDataConfig {
451 &self.late_data_config
452 }
453
454 #[must_use]
456 pub fn late_data_metrics(&self) -> &LateDataMetrics {
457 &self.late_data_metrics
458 }
459
460 pub fn reset_late_data_metrics(&mut self) {
462 self.late_data_metrics.reset();
463 }
464
465 #[must_use]
467 pub fn window_close_metrics(&self) -> &WindowCloseMetrics {
468 &self.window_close_metrics
469 }
470
471 pub fn reset_window_close_metrics(&mut self) {
473 self.window_close_metrics.reset();
474 }
475
476 #[must_use]
478 pub fn gap_ms(&self) -> i64 {
479 self.gap_ms
480 }
481
482 #[must_use]
484 pub fn allowed_lateness_ms(&self) -> i64 {
485 self.allowed_lateness_ms
486 }
487
488 #[must_use]
490 pub fn active_session_count(&self) -> usize {
491 self.session_indices.values().map(SessionIndex::len).sum()
492 }
493
494 fn extract_key(&self, event: &Event) -> SmallVec<[u8; 16]> {
496 use arrow_array::cast::AsArray;
497 use arrow_array::types::Int64Type;
498
499 if let Some(col_idx) = self.key_column {
500 if col_idx < event.data.num_columns() {
501 let column = event.data.column(col_idx);
502 if let Some(array) = column.as_primitive_opt::<Int64Type>() {
503 if !array.is_empty() && !array.is_null(0) {
504 return SmallVec::from_slice(&array.value(0).to_be_bytes());
505 }
506 }
507 if let Some(array) = column.as_string_opt::<i32>() {
509 if !array.is_empty() && !array.is_null(0) {
510 return SmallVec::from_slice(array.value(0).as_bytes());
511 }
512 }
513 }
514 }
515 SmallVec::new()
517 }
518
519 fn key_hash(key: &[u8]) -> u64 {
521 use std::hash::{Hash, Hasher};
522 let mut hasher = rustc_hash::FxHasher::default();
523 key.hash(&mut hasher);
524 hasher.finish()
525 }
526
527 fn session_index_key(key_hash: u64) -> [u8; 12] {
529 let mut key = [0u8; 12];
530 key[..4].copy_from_slice(SESSION_INDEX_PREFIX);
531 key[4..12].copy_from_slice(&key_hash.to_be_bytes());
532 key
533 }
534
535 fn session_acc_key(session_id: SessionId) -> [u8; 12] {
537 let mut key = [0u8; 12];
538 key[..4].copy_from_slice(SESSION_ACC_PREFIX);
539 key[4..12].copy_from_slice(&session_id.to_bytes());
540 key
541 }
542
543 fn timer_key(session_id: SessionId) -> super::TimerKey {
545 let mut key = super::TimerKey::new();
546 key.push(SESSION_TIMER_PREFIX);
547 key.extend_from_slice(&session_id.to_bytes());
548 key
549 }
550
551 fn session_id_from_timer(timer_key: &[u8]) -> Option<SessionId> {
553 if timer_key.len() != 9 || timer_key[0] != SESSION_TIMER_PREFIX {
554 return None;
555 }
556 SessionId::from_bytes(&timer_key[1..9])
557 }
558
559 fn load_session_index(&mut self, key_hash: u64, state: &dyn StateStore) -> SessionIndex {
566 if let Some(index) = self.session_indices.get(&key_hash) {
568 return index.clone();
569 }
570
571 if self.session_indices.len() >= self.max_cached_indices {
574 self.session_indices.clear();
575 }
576
577 let state_key = Self::session_index_key(key_hash);
579 if let Ok(Some(index)) = state.get_typed::<SessionIndex>(&state_key) {
580 self.session_indices.insert(key_hash, index.clone());
581 return index;
582 }
583
584 SessionIndex::default()
585 }
586
587 fn store_session_index(
589 &mut self,
590 key_hash: u64,
591 index: &SessionIndex,
592 state: &mut dyn StateStore,
593 ) -> Result<(), OperatorError> {
594 let state_key = Self::session_index_key(key_hash);
595 if index.is_empty() {
596 state
597 .delete(&state_key)
598 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
599 self.session_indices.remove(&key_hash);
600 } else {
601 state
602 .put_typed(&state_key, index)
603 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))?;
604 self.session_indices.insert(key_hash, index.clone());
605 }
606 Ok(())
607 }
608
609 fn load_accumulator(&self, session_id: SessionId, state: &dyn StateStore) -> A::Acc {
611 let acc_key = Self::session_acc_key(session_id);
612 state
613 .get_typed::<A::Acc>(&acc_key)
614 .ok()
615 .flatten()
616 .unwrap_or_else(|| self.aggregator.create_accumulator())
617 }
618
619 fn store_accumulator(
621 session_id: SessionId,
622 acc: &A::Acc,
623 state: &mut dyn StateStore,
624 ) -> Result<(), OperatorError> {
625 let acc_key = Self::session_acc_key(session_id);
626 state
627 .put_typed(&acc_key, acc)
628 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
629 }
630
631 fn delete_accumulator(
633 session_id: SessionId,
634 state: &mut dyn StateStore,
635 ) -> Result<(), OperatorError> {
636 let acc_key = Self::session_acc_key(session_id);
637 state
638 .delete(&acc_key)
639 .map_err(|e| OperatorError::StateAccessFailed(e.to_string()))
640 }
641
642 fn register_timer(
644 &mut self,
645 session_id: SessionId,
646 key_hash: u64,
647 session: &SessionMetadata,
648 ctx: &mut OperatorContext,
649 ) {
650 let trigger_time = session.end + self.allowed_lateness_ms;
651
652 if let Some(&(old_time, _)) = self.pending_timers.get(&session_id.as_u64()) {
654 if old_time == trigger_time {
655 return; }
657 }
659
660 let timer_key = Self::timer_key(session_id);
661 ctx.timers
662 .register_timer(trigger_time, Some(timer_key), Some(ctx.operator_index));
663 self.pending_timers
664 .insert(session_id.as_u64(), (trigger_time, key_hash));
665 }
666
667 fn is_late(&self, timestamp: i64, watermark: i64) -> bool {
669 let potential_cleanup = timestamp + self.gap_ms + self.allowed_lateness_ms;
672 watermark >= potential_cleanup
673 }
674
675 fn create_output(&self, session: &SessionMetadata, acc: &A::Acc) -> Option<Event> {
677 if acc.is_empty() {
678 return None;
679 }
680
681 let result = acc.result();
682 let result_array = result.to_arrow_array();
683
684 let batch = RecordBatch::try_new(
685 Arc::clone(&self.output_schema),
686 vec![
687 Arc::new(Int64Array::from(vec![session.start])),
688 Arc::new(Int64Array::from(vec![session.end])),
689 result_array,
690 ],
691 )
692 .ok()?;
693
694 Some(Event::new(session.end, batch))
695 }
696
697 fn merge_sessions(
708 &mut self,
709 index: &mut SessionIndex,
710 overlapping: &[SessionId],
711 ctx: &mut OperatorContext,
712 output: &mut OutputVec,
713 ) -> SessionId {
714 let winner_id = overlapping[0];
715
716 let mut winner_acc = self.load_accumulator(winner_id, ctx.state);
718
719 if matches!(self.emit_strategy, EmitStrategy::Changelog) {
722 if let Some(winner_meta) = index.get(winner_id) {
723 if winner_meta.emitted {
724 if let Some(old_evt) = self.create_output(winner_meta, &winner_acc) {
725 output.push(Output::Changelog(ChangelogRecord::delete(
726 old_evt,
727 ctx.processing_time,
728 )));
729 }
730 }
731 }
732 }
733
734 for &loser_id in &overlapping[1..] {
736 let loser_acc = self.load_accumulator(loser_id, ctx.state);
737
738 if matches!(self.emit_strategy, EmitStrategy::Changelog) {
740 if let Some(loser_meta) = index.get(loser_id) {
741 if loser_meta.emitted {
742 if let Some(old_evt) = self.create_output(loser_meta, &loser_acc) {
743 output.push(Output::Changelog(ChangelogRecord::delete(
744 old_evt,
745 ctx.processing_time,
746 )));
747 }
748 }
749 }
750 }
751
752 winner_acc.merge(&loser_acc);
754
755 if let Some(loser_meta) = index.get(loser_id).cloned() {
757 if let Some(winner_meta) = index.get_mut(winner_id) {
758 winner_meta.merge(&loser_meta);
759 }
760 }
761
762 let _ = Self::delete_accumulator(loser_id, ctx.state);
764 self.pending_timers.remove(&loser_id.as_u64());
765 index.remove(loser_id);
766 }
767
768 let _ = Self::store_accumulator(winner_id, &winner_acc, ctx.state);
770
771 winner_id
772 }
773}
774
775impl<A: Aggregator> Operator for SessionWindowOperator<A>
776where
777 A::Acc: 'static
778 + Archive
779 + for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
780 <A::Acc as Archive>::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
781 + RkyvDeserialize<A::Acc, HighDeserializer<RkyvError>>,
782{
783 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
784 if self.needs_timer_reregistration {
786 self.needs_timer_reregistration = false;
787 for (&sid_raw, &(trigger_time, _key_hash)) in &self.pending_timers {
788 let timer_key = Self::timer_key(SessionId(sid_raw));
789 ctx.timers
790 .register_timer(trigger_time, Some(timer_key), Some(ctx.operator_index));
791 }
792 }
793
794 let event_time = event.timestamp;
795 let mut output = OutputVec::new();
796
797 let emitted_watermark = ctx.watermark_generator.on_event(event_time);
799
800 let current_wm = ctx.watermark_generator.current_watermark();
802 if current_wm > i64::MIN && self.is_late(event_time, current_wm) {
803 if self.emit_strategy.drops_late_data() {
805 self.late_data_metrics.record_dropped();
806 return output;
807 }
808
809 if let Some(side_output_name) = self.late_data_config.side_output() {
810 self.late_data_metrics.record_side_output();
811 output.push(Output::SideOutput(Box::new(SideOutputData {
812 name: Arc::from(side_output_name),
813 event: event.clone(),
814 })));
815 } else {
816 self.late_data_metrics.record_dropped();
817 output.push(Output::LateEvent(event.clone()));
818 }
819 return output;
820 }
821
822 let key = self.extract_key(event);
824 let key_hash = Self::key_hash(&key);
825
826 let mut index = self.load_session_index(key_hash, ctx.state);
828 let overlapping = index.find_overlapping(event_time, self.gap_ms);
829
830 let session_id;
831 match overlapping.len() {
832 0 => {
833 let id = SessionId::generate(&self.operator_id, &self.session_id_counter);
835 let session = SessionMetadata::new(id, event_time, self.gap_ms);
836 index.insert(session);
837 session_id = id;
838 }
839 1 => {
840 session_id = overlapping[0];
842 if let Some(session) = index.get_mut(session_id) {
843 session.extend(event_time, self.gap_ms);
844 }
845 }
846 _ => {
847 session_id = self.merge_sessions(&mut index, &overlapping, ctx, &mut output);
849 if let Some(session) = index.get_mut(session_id) {
851 session.extend(event_time, self.gap_ms);
852 }
853 }
854 }
855
856 let mut acc = self.load_accumulator(session_id, ctx.state);
858 let values = self.aggregator.extract_batch(event);
859 for value in values {
860 acc.add(value);
861 }
862
863 let _ = Self::store_accumulator(session_id, &acc, ctx.state);
865
866 if let Some(session) = index.get(session_id) {
868 self.register_timer(session_id, key_hash, session, ctx);
869
870 match &self.emit_strategy {
872 EmitStrategy::OnUpdate => {
873 if let Some(evt) = self.create_output(session, &acc) {
874 output.push(Output::Event(evt));
875 }
876 if let Some(s) = index.get_mut(session_id) {
877 s.emitted = true;
878 }
879 }
880 EmitStrategy::Changelog => {
881 if let Some(evt) = self.create_output(session, &acc) {
882 let record = ChangelogRecord::insert(evt, ctx.processing_time);
883 output.push(Output::Changelog(record));
884 }
885 if let Some(s) = index.get_mut(session_id) {
886 s.emitted = true;
887 }
888 }
889 EmitStrategy::OnWatermark
891 | EmitStrategy::Periodic(_)
892 | EmitStrategy::OnWindowClose
893 | EmitStrategy::Final => {}
894 }
895 }
896
897 let _ = self.store_session_index(key_hash, &index, ctx.state);
899
900 if let Some(wm) = emitted_watermark {
902 output.push(Output::Watermark(wm.timestamp()));
903 }
904
905 output
906 }
907
908 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
909 let mut output = OutputVec::new();
910
911 let Some(session_id) = Self::session_id_from_timer(&timer.key) else {
913 return output;
914 };
915
916 let Some(&(expected_time, key_hash)) = self.pending_timers.get(&session_id.as_u64()) else {
918 return output; };
920
921 if expected_time != timer.timestamp {
922 return output; }
924
925 let mut index = self.load_session_index(key_hash, ctx.state);
927 let Some(session) = index.get(session_id).cloned() else {
928 self.pending_timers.remove(&session_id.as_u64());
929 return output;
930 };
931
932 let acc = self.load_accumulator(session_id, ctx.state);
934 if let Some(event) = self.create_output(&session, &acc) {
935 self.window_close_metrics
937 .record_close(session.end, ctx.processing_time);
938
939 match &self.emit_strategy {
940 EmitStrategy::Changelog => {
941 let record = ChangelogRecord::insert(event, ctx.processing_time);
942 output.push(Output::Changelog(record));
943 }
944 _ => {
945 output.push(Output::Event(event));
946 }
947 }
948 }
949
950 index.remove(session_id);
952 let _ = Self::delete_accumulator(session_id, ctx.state);
953 let _ = self.store_session_index(key_hash, &index, ctx.state);
954 self.pending_timers.remove(&session_id.as_u64());
955
956 output
957 }
958
959 fn checkpoint(&self) -> OperatorState {
960 let timer_entries: Vec<(u64, i64, u64)> = self
962 .pending_timers
963 .iter()
964 .map(|(&sid, &(time, kh))| (sid, time, kh))
965 .collect();
966 let counter_val = self.session_id_counter.load(Ordering::Relaxed);
967
968 let checkpoint_data: (Vec<(u64, i64, u64)>, u64) = (timer_entries, counter_val);
970
971 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
972 .map(|v| v.to_vec())
973 .unwrap_or_default();
974
975 OperatorState {
976 operator_id: self.operator_id.clone(),
977 data,
978 }
979 }
980
981 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
982 if state.operator_id != self.operator_id {
983 return Err(OperatorError::StateAccessFailed(format!(
984 "Operator ID mismatch: expected {}, got {}",
985 self.operator_id, state.operator_id
986 )));
987 }
988
989 if state.data.is_empty() {
990 return Ok(());
991 }
992
993 let archived =
994 rkyv::access::<rkyv::Archived<(Vec<(u64, i64, u64)>, u64)>, RkyvError>(&state.data)
995 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
996 let (timers, counter_val) =
997 rkyv::deserialize::<(Vec<(u64, i64, u64)>, u64), RkyvError>(archived)
998 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
999
1000 self.pending_timers = timers
1001 .into_iter()
1002 .map(|(sid, time, kh)| (sid, (time, kh)))
1003 .collect();
1004 self.session_id_counter
1005 .store(counter_val, Ordering::Relaxed);
1006 self.needs_timer_reregistration = !self.pending_timers.is_empty();
1007 Ok(())
1010 }
1011}
1012
/// Plain counters describing session-window activity.
///
/// NOTE(review): none of the operator code visible here updates these counters.
/// Confirm where (or whether) they are populated.
#[derive(Debug, Clone, Default)]
pub struct SessionMetrics {
    /// Sessions opened.
    pub sessions_created: u64,
    /// Sessions finalized.
    pub sessions_closed: u64,
    /// Sessions absorbed by a merge.
    pub sessions_merged: u64,
    /// Sessions currently open.
    pub active_sessions: u64,
}
1025
1026#[cfg(test)]
1027mod tests {
1028 use super::*;
1029 use crate::operator::window::{CountAggregator, SumAggregator};
1030 use crate::state::InMemoryStore;
1031 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
1032 use arrow_array::{Int64Array, RecordBatch};
1033 use arrow_schema::{DataType, Field, Schema};
1034
1035 fn create_test_event(timestamp: i64, value: i64) -> Event {
1036 let schema = Arc::new(Schema::new(vec![Field::new(
1037 "value",
1038 DataType::Int64,
1039 false,
1040 )]));
1041 let batch =
1042 RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![value]))]).unwrap();
1043 Event::new(timestamp, batch)
1044 }
1045
1046 fn create_keyed_event(timestamp: i64, key: i64, value: i64) -> Event {
1047 let schema = Arc::new(Schema::new(vec![
1048 Field::new("key", DataType::Int64, false),
1049 Field::new("value", DataType::Int64, false),
1050 ]));
1051 let batch = RecordBatch::try_new(
1052 schema,
1053 vec![
1054 Arc::new(Int64Array::from(vec![key])),
1055 Arc::new(Int64Array::from(vec![value])),
1056 ],
1057 )
1058 .unwrap();
1059 Event::new(timestamp, batch)
1060 }
1061
1062 fn create_test_context<'a>(
1063 timers: &'a mut TimerService,
1064 state: &'a mut dyn StateStore,
1065 watermark_gen: &'a mut dyn crate::time::WatermarkGenerator,
1066 ) -> OperatorContext<'a> {
1067 OperatorContext {
1068 event_time: 0,
1069 processing_time: 0,
1070 timers,
1071 state,
1072 watermark_generator: watermark_gen,
1073 operator_index: 0,
1074 }
1075 }
1076
1077 fn first_pending_timer(op: &SessionWindowOperator<impl Aggregator>) -> (SessionId, i64, u64) {
1079 let (&sid_raw, &(time, kh)) = op.pending_timers.iter().next().expect("no pending timer");
1080 (SessionId(sid_raw), time, kh)
1081 }
1082
1083 #[test]
1088 fn test_session_id_generation() {
1089 let counter = AtomicU64::new(0);
1090 let id1 = SessionId::generate("op_a", &counter);
1091 let id2 = SessionId::generate("op_a", &counter);
1092 let id3 = SessionId::generate("op_b", &counter);
1093
1094 assert_ne!(id1, id2);
1096 assert_ne!(id1, id3);
1098 }
1099
1100 #[test]
1101 fn test_session_id_bytes_roundtrip() {
1102 let original = SessionId(0x1234_5678_9ABC_DEF0);
1103 let bytes = original.to_bytes();
1104 let restored = SessionId::from_bytes(&bytes);
1105
1106 assert_eq!(restored, Some(original));
1107 assert_eq!(SessionId::from_bytes(&[0u8; 4]), None); }
1109
1110 #[test]
1115 fn test_session_metadata_creation() {
1116 let id = SessionId(42);
1117 let meta = SessionMetadata::new(id, 1000, 5000);
1118
1119 assert_eq!(meta.id, id);
1120 assert_eq!(meta.start, 1000);
1121 assert_eq!(meta.end, 6000); assert!(!meta.emitted);
1123 }
1124
1125 #[test]
1126 fn test_session_metadata_overlaps() {
1127 let id = SessionId(1);
1128 let meta = SessionMetadata::new(id, 1000, 5000); assert!(meta.overlaps(1000, 5000)); assert!(meta.overlaps(3000, 5000)); assert!(meta.overlaps(5999, 5000)); assert!(meta.overlaps(500, 5000)); assert!(meta.overlaps(-1000, 5000));
1138
1139 assert!(!meta.overlaps(-5001, 5000));
1141
1142 assert!(!meta.overlaps(6000, 5000));
1144 }
1145
1146 #[test]
1147 fn test_session_metadata_extend() {
1148 let id = SessionId(1);
1149 let mut meta = SessionMetadata::new(id, 1000, 5000); meta.extend(8000, 5000);
1153 assert_eq!(meta.start, 1000);
1154 assert_eq!(meta.end, 13000); meta.extend(500, 5000);
1158 assert_eq!(meta.start, 500);
1159 assert_eq!(meta.end, 13000); }
1161
1162 #[test]
1163 fn test_session_metadata_merge() {
1164 let mut s1 = SessionMetadata::new(SessionId(1), 1000, 5000); let s2 = SessionMetadata::new(SessionId(2), 8000, 5000); s1.merge(&s2);
1168 assert_eq!(s1.start, 1000);
1169 assert_eq!(s1.end, 13000);
1170 }
1171
1172 #[test]
1173 fn test_session_metadata_window_id() {
1174 let meta = SessionMetadata::new(SessionId(1), 1000, 5000);
1175 let wid = meta.window_id();
1176 assert_eq!(wid.start, 1000);
1177 assert_eq!(wid.end, 6000);
1178 }
1179
1180 #[test]
1185 fn test_session_index_sorted_insert() {
1186 let mut idx = SessionIndex::default();
1187
1188 let s3 = SessionMetadata::new(SessionId(3), 3000, 1000);
1189 let s1 = SessionMetadata::new(SessionId(1), 1000, 1000);
1190 let s2 = SessionMetadata::new(SessionId(2), 2000, 1000);
1191
1192 idx.insert(s3);
1193 idx.insert(s1);
1194 idx.insert(s2);
1195
1196 assert_eq!(idx.len(), 3);
1197 assert_eq!(idx.sessions[0].start, 1000);
1198 assert_eq!(idx.sessions[1].start, 2000);
1199 assert_eq!(idx.sessions[2].start, 3000);
1200 }
1201
1202 #[test]
1203 fn test_session_index_find_overlapping() {
1204 let mut idx = SessionIndex::default();
1205
1206 idx.insert(SessionMetadata::new(SessionId(1), 100, 500));
1208 idx.insert(SessionMetadata::new(SessionId(2), 2000, 500));
1209
1210 let hits = idx.find_overlapping(300, 500);
1212 assert_eq!(hits.as_slice(), &[SessionId(1)]);
1213
1214 let hits = idx.find_overlapping(1500, 500);
1216 assert!(hits.is_empty());
1217
1218 let hits = idx.find_overlapping(1800, 500);
1220 assert_eq!(hits.as_slice(), &[SessionId(2)]);
1221 }
1222
1223 #[test]
1224 fn test_session_index_remove() {
1225 let mut idx = SessionIndex::default();
1226 idx.insert(SessionMetadata::new(SessionId(1), 100, 500));
1227 idx.insert(SessionMetadata::new(SessionId(2), 2000, 500));
1228
1229 assert_eq!(idx.len(), 2);
1230
1231 let removed = idx.remove(SessionId(1));
1232 assert!(removed.is_some());
1233 assert_eq!(removed.unwrap().start, 100);
1234 assert_eq!(idx.len(), 1);
1235
1236 assert!(idx.remove(SessionId(99)).is_none());
1238 }
1239
1240 #[test]
1241 fn test_session_index_get_and_get_mut() {
1242 let mut idx = SessionIndex::default();
1243 idx.insert(SessionMetadata::new(SessionId(1), 100, 500));
1244
1245 assert!(idx.get(SessionId(1)).is_some());
1246 assert!(idx.get(SessionId(99)).is_none());
1247
1248 idx.get_mut(SessionId(1)).unwrap().emitted = true;
1249 assert!(idx.get(SessionId(1)).unwrap().emitted);
1250 }
1251
1252 #[test]
1257 fn test_session_operator_creation() {
1258 let aggregator = CountAggregator::new();
1259 let operator = SessionWindowOperator::new(
1260 Duration::from_secs(30),
1261 aggregator,
1262 Duration::from_secs(60),
1263 );
1264
1265 assert_eq!(operator.gap_ms(), 30_000);
1266 assert_eq!(operator.allowed_lateness_ms(), 60_000);
1267 assert_eq!(operator.active_session_count(), 0);
1268 assert_eq!(*operator.emit_strategy(), EmitStrategy::OnWatermark);
1269 }
1270
1271 #[test]
1272 fn test_session_operator_with_id() {
1273 let aggregator = CountAggregator::new();
1274 let operator = SessionWindowOperator::with_id(
1275 Duration::from_secs(30),
1276 aggregator,
1277 Duration::from_secs(0),
1278 "test_session".to_string(),
1279 );
1280
1281 assert_eq!(operator.operator_id, "test_session");
1282 }
1283
1284 #[test]
1289 fn test_timer_key_roundtrip() {
1290 let sid = SessionId(0x1234_5678_9ABC_DEF0);
1291 let timer_key = SessionWindowOperator::<CountAggregator>::timer_key(sid);
1292 let parsed = SessionWindowOperator::<CountAggregator>::session_id_from_timer(&timer_key);
1293 assert_eq!(parsed, Some(sid));
1294 }
1295
1296 #[test]
1297 fn test_timer_key_invalid() {
1298 let invalid1 = vec![0x02, 0, 0, 0, 0, 0, 0, 0, 0];
1300 assert!(
1301 SessionWindowOperator::<CountAggregator>::session_id_from_timer(&invalid1).is_none()
1302 );
1303
1304 let invalid2 = vec![SESSION_TIMER_PREFIX, 0, 0, 0];
1306 assert!(
1307 SessionWindowOperator::<CountAggregator>::session_id_from_timer(&invalid2).is_none()
1308 );
1309 }
1310
1311 #[test]
1316 fn test_session_single_event() {
1317 let aggregator = CountAggregator::new();
1318 let mut operator = SessionWindowOperator::with_id(
1319 Duration::from_millis(1000),
1320 aggregator,
1321 Duration::from_millis(0),
1322 "test_op".to_string(),
1323 );
1324
1325 let mut timers = TimerService::new();
1326 let mut state = InMemoryStore::new();
1327 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1328
1329 let event = create_test_event(500, 1);
1330 {
1331 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1332 operator.process(&event, &mut ctx);
1333 }
1334
1335 assert_eq!(operator.active_session_count(), 1);
1336 assert_eq!(operator.pending_timers.len(), 1);
1337 }
1338
1339 #[test]
1344 fn test_session_multiple_events_same_session() {
1345 let aggregator = CountAggregator::new();
1346 let mut operator = SessionWindowOperator::with_id(
1347 Duration::from_millis(1000),
1348 aggregator,
1349 Duration::from_millis(0),
1350 "test_op".to_string(),
1351 );
1352
1353 let mut timers = TimerService::new();
1354 let mut state = InMemoryStore::new();
1355 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1356
1357 for ts in [100, 500, 900, 1500] {
1359 let event = create_test_event(ts, 1);
1360 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1361 operator.process(&event, &mut ctx);
1362 }
1363
1364 assert_eq!(operator.active_session_count(), 1);
1366
1367 let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);
1369 let index = operator.load_session_index(key_hash, &state);
1370 assert_eq!(index.len(), 1);
1371 let sid = index.sessions[0].id;
1372 let acc = operator.load_accumulator(sid, &state);
1373 assert_eq!(acc.result(), 4);
1374 }
1375
/// An event further away than the gap opens a second session; with `OnUpdate`
/// that second event produces exactly one `Output::Event`.
#[test]
fn test_session_gap_creates_new_session() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::OnUpdate);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(100);
    // 3000 is far beyond 100 + 1000, so it cannot join the first session.
    let outputs = feed!(3000);

    assert_eq!(op.active_session_count(), 2);
    let emitted = outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(emitted, 1);
}
1419
/// Firing the session timer emits the final aggregate and retires the session.
#[test]
fn test_session_timer_triggers_emission() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    let ev = create_test_event(500, 1);
    {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(&ev, &mut ctx);
    }

    let (sid, fire_at, _key_hash) = first_pending_timer(&op);
    let due = Timer {
        key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
        timestamp: fire_at,
    };
    let emitted = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.on_timer(due, &mut ctx)
    };

    assert_eq!(emitted.len(), 1);
    if let Output::Event(e) = &emitted[0] {
        // Emitted at the session end: last event (500) + gap (1000).
        assert_eq!(e.timestamp, 1500);
        let count = e
            .data
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0);
        assert_eq!(count, 1);
    } else {
        panic!("Expected Event output");
    }

    assert_eq!(op.active_session_count(), 0);
}
1478
/// With a key column configured, interleaved events for two keys are tracked as
/// two separate sessions even though their timestamps overlap.
#[test]
fn test_session_keyed_tracking() {
    // Aggregates column index 1; partitions by column index 0.
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        SumAggregator::new(1),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    op.set_key_column(0);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Two events for key 1 followed by two events for key 2.
    let events = [
        create_keyed_event(100, 1, 10),
        create_keyed_event(500, 1, 20),
        create_keyed_event(200, 2, 100),
        create_keyed_event(600, 2, 200),
    ];
    for ev in &events {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(ev, &mut ctx);
    }

    assert_eq!(op.active_session_count(), 2);
}
1514
/// With zero out-of-orderness, an event far behind the watermark is reported as
/// `Output::LateEvent` and counted as dropped under the default late-data config.
#[test]
fn test_session_late_event_dropped() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(0);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    // Advances the watermark, then sends an event from far in the past.
    feed!(10000);
    let outputs = feed!(100);

    assert!(outputs.iter().any(|o| matches!(o, Output::LateEvent(_))));
    assert_eq!(op.late_data_metrics().late_events_dropped(), 1);
}
1551
/// When a side output is configured, late events are routed there and counted.
#[test]
fn test_session_late_event_side_output() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    op.set_late_data_config(LateDataConfig::with_side_output("late".to_string()));

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(0);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(10000);
    let outputs = feed!(100);

    let routed_to = outputs.iter().find_map(|o| match o {
        Output::SideOutput(data) => Some(data.name.clone()),
        _ => None,
    });
    assert_eq!(routed_to.as_deref(), Some("late"));
    assert_eq!(op.late_data_metrics().late_events_side_output(), 1);
}
1591
/// Under `OnUpdate`, the very first event already produces one `Output::Event`.
#[test]
fn test_session_emit_on_update() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::OnUpdate);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    let ev = create_test_event(500, 1);
    let produced = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(&ev, &mut ctx)
    };

    let n_events = produced
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(n_events, 1);
}
1623
/// Under `Changelog`, the first event produces exactly one changelog record.
#[test]
fn test_session_emit_changelog() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::Changelog);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    let ev = create_test_event(500, 1);
    let produced = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(&ev, &mut ctx)
    };

    let n_changelog = produced
        .iter()
        .filter(|o| matches!(o, Output::Changelog(_)))
        .count();
    assert_eq!(n_changelog, 1);
}
1651
/// Under `Final`, a late event yields no output at all but is still counted as dropped.
#[test]
fn test_session_emit_final_drops_late() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::Final);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(0);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(10000);
    let outputs = feed!(100);

    assert!(outputs.is_empty());
    assert_eq!(op.late_data_metrics().late_events_dropped(), 1);
}
1684
/// A checkpoint carries the pending timers and the session-id counter into a
/// freshly constructed operator.
#[test]
fn test_session_checkpoint_restore() {
    let aggregator = CountAggregator::new();
    // Both operators share the same configuration.
    let build = |agg: CountAggregator| {
        SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            agg,
            Duration::from_millis(0),
            "test_op".to_string(),
        )
    };
    let mut op = build(aggregator.clone());

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    for &ts in &[100, 500] {
        let ev = create_test_event(ts, 1);
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(&ev, &mut ctx);
    }

    let expected_counter = op.session_id_counter.load(Ordering::Relaxed);
    let snapshot = op.checkpoint();

    let mut restored = build(aggregator);
    restored.restore(snapshot).unwrap();

    assert_eq!(restored.pending_timers.len(), 1);
    assert_eq!(
        restored.session_id_counter.load(Ordering::Relaxed),
        expected_counter,
    );
}
1731
/// After a session is extended, a timer carrying its old deadline is a no-op:
/// nothing is emitted and the session stays active.
#[test]
fn test_session_stale_timer_ignored() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(500);
    let (sid, stale_at, _key_hash) = first_pending_timer(&op);

    // 1200 is within the gap, so the session grows and its deadline moves.
    feed!(1200);

    let stale = Timer {
        key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
        timestamp: stale_at,
    };
    let outputs = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.on_timer(stale, &mut ctx)
    };

    assert!(outputs.is_empty());
    assert_eq!(op.active_session_count(), 1);
}
1780
/// A `SumAggregator` over column 0 adds every value folded into one session.
///
/// Three events spaced under the gap form one session whose sum (10 + 20 + 30)
/// is emitted when its timer fires.
#[test]
fn test_session_sum_aggregation() {
    let aggregator = SumAggregator::new(0);
    let mut operator = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        aggregator,
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    for (ts, value) in [(100, 10), (500, 20), (800, 30)] {
        let event = create_test_event(ts, value);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    let (sid, timer_time, _kh) = first_pending_timer(&operator);
    let timer = Timer {
        key: SessionWindowOperator::<SumAggregator>::timer_key(sid),
        timestamp: timer_time,
    };

    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(timer, &mut ctx)
    };

    // Assert the count first so a missing emission fails with a clear message
    // instead of an index-out-of-bounds panic on `outputs[0]`.
    assert_eq!(
        outputs.len(),
        1,
        "expected exactly one emission for the closed session"
    );
    match &outputs[0] {
        Output::Event(e) => {
            let result = e
                .data
                .column(2)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap()
                .value(0);
            // 10 + 20 + 30
            assert_eq!(result, 60);
        }
        _ => panic!("Expected Event output"),
    }
}
1831
/// The batch emitted when a session closes exposes an Int64 aggregate in column 2.
///
/// The earlier version of this test built a local `Schema` and asserted on that
/// same value, so it never exercised the operator. This version drives one
/// session to completion and inspects the schema of the batch the operator
/// actually emits.
///
/// NOTE(review): column names are not asserted because this part of the file
/// does not establish what the operator names them. Confirm the names and add
/// assertions for them.
#[test]
fn test_session_output_schema() {
    let mut operator = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let event = create_test_event(500, 1);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    let (sid, timer_time, _kh) = first_pending_timer(&operator);
    let timer = Timer {
        key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
        timestamp: timer_time,
    };
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.on_timer(timer, &mut ctx)
    };

    assert_eq!(outputs.len(), 1);
    match &outputs[0] {
        Output::Event(e) => {
            let schema = e.data.schema();
            assert!(
                schema.fields().len() >= 3,
                "expected at least 3 output columns, got {}",
                schema.fields().len()
            );
            assert_eq!(
                schema.field(2).data_type(),
                &arrow_schema::DataType::Int64
            );
        }
        _ => panic!("Expected Event output"),
    }
}
1850
/// A single key can hold several disjoint sessions, stored in start order.
#[test]
fn test_multi_session_per_key() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(100);
    assert_eq!(op.active_session_count(), 1);

    feed!(5000);
    assert_eq!(op.active_session_count(), 2);

    let index = op.load_session_index(
        SessionWindowOperator::<CountAggregator>::key_hash(&[]),
        &store,
    );
    assert_eq!(index.len(), 2);
    let starts: Vec<_> = index.sessions.iter().map(|s| s.start).collect();
    assert_eq!(starts, vec![100, 5000]);
}
1892
/// A session survives loss of the in-memory index cache: the next event reloads
/// it from the state store and extends it rather than starting a new one.
#[test]
fn test_session_persists_across_batch_boundary() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_op".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(500);
    assert_eq!(op.active_session_count(), 1);

    // Simulate a batch boundary by discarding the cached session indices.
    op.session_indices.clear();
    assert_eq!(op.active_session_count(), 0);

    feed!(800);
    assert_eq!(op.active_session_count(), 1);

    let index = op.load_session_index(
        SessionWindowOperator::<CountAggregator>::key_hash(&[]),
        &store,
    );
    assert_eq!(index.len(), 1);
    let session = &index.sessions[0];
    // The start is preserved; the end moves to 800 + 1000.
    assert_eq!(session.start, 500);
    assert_eq!(session.end, 1800);

    let acc = op.load_accumulator(session.id, &store);
    assert_eq!(acc.result(), 2);
}
1945
/// A bridging event that overlaps two existing sessions merges them into one
/// that spans both and carries the combined count.
#[test]
fn test_two_way_session_merge() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_merge".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(100);
    feed!(1200);
    assert_eq!(op.active_session_count(), 2);

    // 1050 touches both sessions, so they collapse into one.
    feed!(1050);
    assert_eq!(op.active_session_count(), 1);

    let index = op.load_session_index(
        SessionWindowOperator::<CountAggregator>::key_hash(&[]),
        &store,
    );
    assert_eq!(index.len(), 1);

    let merged = &index.sessions[0];
    assert_eq!(merged.start, 100);
    assert_eq!(merged.end, 2200);

    let acc = op.load_accumulator(merged.id, &store);
    assert_eq!(acc.result(), 3);
}
2004
/// Two bridging events successively fuse three sessions into one.
#[test]
fn test_three_way_session_merge() {
    // 500 ms gap; the 5000 ms lateness keeps the bridging events from being
    // classified as late.
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(500),
        CountAggregator::new(),
        Duration::from_millis(5000),
        "test_merge3".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    for &ts in &[100, 700, 1300] {
        feed!(ts);
    }
    assert_eq!(op.active_session_count(), 3);

    let key_hash = SessionWindowOperator::<CountAggregator>::key_hash(&[]);

    // First bridge fuses the first two sessions.
    feed!(550);
    assert_eq!(op.active_session_count(), 2);
    assert_eq!(op.load_session_index(key_hash, &store).len(), 2);

    // Second bridge fuses the result with the third session.
    feed!(1150);
    assert_eq!(op.active_session_count(), 1);

    let index = op.load_session_index(key_hash, &store);
    assert_eq!(index.len(), 1);

    let merged = &index.sessions[0];
    assert_eq!(merged.start, 100);
    assert_eq!(merged.end, 1800);

    let acc = op.load_accumulator(merged.id, &store);
    assert_eq!(acc.result(), 5);
}
2065
/// Under `Changelog`, a merge retracts both prior results (weight -1) and
/// inserts the merged one (weight +1).
#[test]
fn test_merge_emits_changelog_retractions() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_cl_merge".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::Changelog);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    // Counts changelog records in `$outs` whose weight equals `$w`.
    macro_rules! with_weight {
        ($outs:expr, $w:expr) => {
            $outs
                .iter()
                .filter(|o| matches!(o, Output::Changelog(r) if r.weight == $w))
                .count()
        };
    }

    let first = feed!(100);
    assert_eq!(with_weight!(first, 1), 1);

    let second = feed!(1200);
    assert_eq!(with_weight!(second, 1), 1);
    assert_eq!(op.active_session_count(), 2);

    let merged = feed!(1050);
    assert_eq!(with_weight!(merged, -1), 2, "Expected 2 delete retractions");
    assert_eq!(
        with_weight!(merged, 1),
        1,
        "Expected 1 insert for merged result"
    );

    assert_eq!(op.active_session_count(), 1);
}
2132
/// Merging sessions combines their `SumAggregator` accumulators exactly.
///
/// Session A holds 10 + 20 and session B holds 50. A bridging event worth 5
/// merges them, so the result must be 10 + 20 + 50 + 5 = 85.
#[test]
fn test_merge_accumulator_correctness_sum() {
    let aggregator = SumAggregator::new(0);
    let mut operator = SessionWindowOperator::with_id(
        Duration::from_millis(500),
        aggregator,
        Duration::from_millis(0),
        "test_sum_merge".to_string(),
    );

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    for (ts, val) in [(100, 10), (300, 20)] {
        let event = create_test_event(ts, val);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event, &mut ctx);
    }

    let event_b = create_test_event(900, 50);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&event_b, &mut ctx);
    }
    assert_eq!(operator.active_session_count(), 2);

    let bridge = create_test_event(750, 5);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process(&bridge, &mut ctx);
    }
    assert_eq!(operator.active_session_count(), 1);

    let key_hash = SessionWindowOperator::<SumAggregator>::key_hash(&[]);
    let index = operator.load_session_index(key_hash, &state);
    // Guard the index access below so a failed merge reports a clear mismatch
    // rather than an index-out-of-bounds panic.
    assert_eq!(index.len(), 1);
    let sid = index.sessions[0].id;
    let acc = operator.load_accumulator(sid, &state);
    assert_eq!(acc.result(), 85);
}
2184
/// A merge leaves only one pending timer: the absorbed session's timer is removed.
#[test]
fn test_merge_cleans_up_loser_timers() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_timer_merge".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(100);
    feed!(1200);
    assert_eq!(op.pending_timers.len(), 2);

    feed!(1050);
    assert_eq!(op.pending_timers.len(), 1);
    assert_eq!(op.active_session_count(), 1);
}
2227
/// With the default emit strategy (no `set_emit_strategy` call), a merge
/// produces no changelog records.
#[test]
fn test_merge_no_retractions_for_on_watermark() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_wm_merge".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    feed!(100);
    feed!(1200);
    let after_merge = feed!(1050);

    let n_changelog = after_merge
        .iter()
        .filter(|o| matches!(o, Output::Changelog(_)))
        .count();
    assert_eq!(n_changelog, 0);
    assert_eq!(op.active_session_count(), 1);
}
2272
/// With the default emit strategy, processing emits no events or changelog
/// records; only the timer emits, and it carries the full count.
#[test]
fn test_on_watermark_no_intermediate_emits_on_timer() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_wm_emit".to_string(),
    );
    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    let first = feed!(500);
    let second = feed!(800);

    let is_result = |o: &&Output| matches!(o, Output::Event(_) | Output::Changelog(_));
    assert!(!first.iter().any(|o| is_result(&o)));
    assert!(!second.iter().any(|o| is_result(&o)));

    let (sid, fire_at, _key_hash) = first_pending_timer(&op);
    let due = Timer {
        key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
        timestamp: fire_at,
    };
    let fired = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.on_timer(due, &mut ctx)
    };

    assert_eq!(fired.len(), 1);
    if let Output::Event(e) = &fired[0] {
        let count = e
            .data
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0);
        assert_eq!(count, 2);
    } else {
        panic!("Expected Event output from on_timer");
    }
}
2338
/// Under `OnWindowClose`, processing emits nothing; the timer emits one `Event`.
#[test]
fn test_on_window_close_no_intermediate_emits_on_timer() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_owc_emit".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::OnWindowClose);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    let ev = create_test_event(500, 1);
    let produced = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(&ev, &mut ctx)
    };

    let has_result = produced
        .iter()
        .any(|o| matches!(o, Output::Event(_) | Output::Changelog(_)));
    assert!(!has_result);

    let (sid, fire_at, _key_hash) = first_pending_timer(&op);
    let due = Timer {
        key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
        timestamp: fire_at,
    };
    let fired = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.on_timer(due, &mut ctx)
    };

    assert_eq!(fired.len(), 1);
    assert!(matches!(&fired[0], Output::Event(_)));
}
2381
/// Under `Changelog`, the timer's final emission is a changelog insert (weight 1).
#[test]
fn test_changelog_timer_emits_changelog_record() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_cl_timer".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::Changelog);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    let ev = create_test_event(500, 1);
    {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.process(&ev, &mut ctx);
    }

    let (sid, fire_at, _key_hash) = first_pending_timer(&op);
    let due = Timer {
        key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
        timestamp: fire_at,
    };
    let fired = {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        op.on_timer(due, &mut ctx)
    };

    assert_eq!(fired.len(), 1);
    if let Output::Changelog(record) = &fired[0] {
        assert_eq!(record.weight, 1, "Expected insert weight");
    } else {
        panic!("Expected Changelog output from on_timer");
    }
}
2425
/// Under `OnUpdate`, every event emits exactly one `Event` carrying the running count.
#[test]
fn test_on_update_emits_updated_count() {
    let mut op = SessionWindowOperator::with_id(
        Duration::from_millis(1000),
        CountAggregator::new(),
        Duration::from_millis(0),
        "test_ou_emit".to_string(),
    );
    op.set_emit_strategy(EmitStrategy::OnUpdate);

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    // Pushes one event (value 1) through `op` with a fresh context.
    macro_rules! feed {
        ($ts:expr) => {{
            let ev = create_test_event($ts, 1);
            let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
            op.process(&ev, &mut ctx)
        }};
    }

    // Asserts `$outs` holds exactly one Event and returns its column-2 value.
    macro_rules! sole_event_count {
        ($outs:expr) => {{
            let events: Vec<_> = $outs
                .iter()
                .filter_map(|o| match o {
                    Output::Event(e) => Some(e),
                    _ => None,
                })
                .collect();
            assert_eq!(events.len(), 1);
            events[0]
                .data
                .column(2)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap()
                .value(0)
        }};
    }

    let first = feed!(500);
    assert_eq!(sole_event_count!(first), 1);

    let second = feed!(800);
    assert_eq!(sole_event_count!(second), 2);
}
2490
/// After `restore`, timers are not pushed to the timer service immediately.
///
/// They are re-registered on the next `process` call, after which the restored
/// timer fires and emits the pre-checkpoint result.
#[test]
fn test_timer_reregistration_after_restore() {
    let aggregator = CountAggregator::new();
    // Both operators share the same configuration.
    let build = |agg: CountAggregator| {
        SessionWindowOperator::with_id(
            Duration::from_millis(1000),
            agg,
            Duration::from_millis(0),
            "test_restore_timers".to_string(),
        )
    };
    let mut original = build(aggregator.clone());

    let mut timer_svc = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm = BoundedOutOfOrdernessGenerator::new(100);

    let ev = create_test_event(500, 1);
    {
        let mut ctx = create_test_context(&mut timer_svc, &mut store, &mut wm);
        original.process(&ev, &mut ctx);
    }
    assert_eq!(original.pending_timers.len(), 1);
    let (_, registered_at, _) = first_pending_timer(&original);
    assert_eq!(registered_at, 1500);

    let snapshot = original.checkpoint();

    let mut restored = build(aggregator);
    let mut fresh_timers = TimerService::new();
    assert_eq!(fresh_timers.pending_count(), 0);

    restored.restore(snapshot).unwrap();
    assert!(restored.needs_timer_reregistration);
    assert_eq!(restored.pending_timers.len(), 1);
    // Restoring alone does not touch the timer service.
    assert_eq!(fresh_timers.pending_count(), 0);

    let later = create_test_event(5000, 1);
    {
        let mut ctx = create_test_context(&mut fresh_timers, &mut store, &mut wm);
        restored.process(&later, &mut ctx);
    }

    // At least the restored timer plus the one for the new session.
    let registered = fresh_timers.pending_count();
    assert!(
        registered >= 2,
        "Expected at least 2 timers, got {}",
        registered
    );
    assert!(!restored.needs_timer_reregistration);

    let due = fresh_timers.poll_timers(1500);
    assert_eq!(due.len(), 1);
    assert_eq!(due[0].timestamp, 1500);

    let replay = Timer {
        key: due[0].key.clone().unwrap(),
        timestamp: due[0].timestamp,
    };
    let fired = {
        let mut ctx = create_test_context(&mut fresh_timers, &mut store, &mut wm);
        restored.on_timer(replay, &mut ctx)
    };

    assert_eq!(fired.len(), 1);
    if let Output::Event(e) = &fired[0] {
        let count = e
            .data
            .column(2)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0);
        assert_eq!(count, 1);
    } else {
        panic!("Expected Event output from restored timer");
    }
}
2592
2593 #[test]
2598 fn test_eowc_session_basic_close() {
2599 let aggregator = CountAggregator::new();
2602 let mut operator = SessionWindowOperator::with_id(
2603 Duration::from_millis(30_000), aggregator,
2605 Duration::from_millis(0),
2606 "eowc_session".to_string(),
2607 );
2608 operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2609
2610 let mut timers = TimerService::new();
2611 let mut state = InMemoryStore::new();
2612 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
2613
2614 for ts in [0, 10_000, 20_000] {
2616 let event = create_test_event(ts, 1);
2617 let outputs = {
2618 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2619 operator.process(&event, &mut ctx)
2620 };
2621 let event_count = outputs
2623 .iter()
2624 .filter(|o| matches!(o, Output::Event(_)))
2625 .count();
2626 assert_eq!(
2627 event_count, 0,
2628 "EOWC session should not emit intermediate results"
2629 );
2630 }
2631
2632 assert_eq!(operator.active_session_count(), 1);
2634
2635 let (sid, timer_time, _kh) = first_pending_timer(&operator);
2637 let timer = Timer {
2638 key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2639 timestamp: timer_time,
2640 };
2641 let outputs = {
2642 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2643 operator.on_timer(timer, &mut ctx)
2644 };
2645
2646 assert_eq!(outputs.len(), 1, "Should emit exactly once");
2647 match &outputs[0] {
2648 Output::Event(e) => {
2649 let result = e
2650 .data
2651 .column(2)
2652 .as_any()
2653 .downcast_ref::<Int64Array>()
2654 .unwrap();
2655 assert_eq!(result.value(0), 3, "Session count should be 3");
2656 }
2657 other => panic!("Expected Output::Event, got: {other:?}"),
2658 }
2659
2660 assert_eq!(operator.active_session_count(), 0);
2662 }
2663
2664 #[test]
2665 fn test_eowc_session_no_intermediate_on_extend() {
2666 let aggregator = CountAggregator::new();
2669 let mut operator = SessionWindowOperator::with_id(
2670 Duration::from_millis(1000),
2671 aggregator,
2672 Duration::from_millis(0),
2673 "eowc_extend".to_string(),
2674 );
2675 operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2676
2677 let mut timers = TimerService::new();
2678 let mut state = InMemoryStore::new();
2679 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2680
2681 for ts in (0..5).map(|i| i * 500) {
2683 let event = create_test_event(ts, 1);
2684 let outputs = {
2685 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2686 operator.process(&event, &mut ctx)
2687 };
2688 for output in &outputs {
2689 assert!(
2690 !matches!(output, Output::Event(_)),
2691 "No intermediate Event on session extend (ts={ts})"
2692 );
2693 assert!(
2694 !matches!(output, Output::Changelog(_)),
2695 "No intermediate Changelog on session extend (ts={ts})"
2696 );
2697 }
2698 }
2699
2700 assert_eq!(operator.active_session_count(), 1);
2702 }
2703
2704 #[test]
2705 fn test_eowc_session_merge_before_close() {
2706 let aggregator = CountAggregator::new();
2709 let mut operator = SessionWindowOperator::with_id(
2710 Duration::from_millis(1000), aggregator,
2712 Duration::from_millis(0),
2713 "eowc_merge".to_string(),
2714 );
2715 operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2716
2717 let mut timers = TimerService::new();
2718 let mut state = InMemoryStore::new();
2719 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2720
2721 let e1 = create_test_event(100, 1);
2723 {
2724 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2725 operator.process(&e1, &mut ctx);
2726 }
2727
2728 let e2 = create_test_event(1200, 1);
2730 {
2731 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2732 operator.process(&e2, &mut ctx);
2733 }
2734 assert_eq!(
2735 operator.active_session_count(),
2736 2,
2737 "Should have 2 sessions before merge"
2738 );
2739
2740 let bridge = create_test_event(1050, 1);
2742 let bridge_outputs = {
2743 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2744 operator.process(&bridge, &mut ctx)
2745 };
2746
2747 let event_count = bridge_outputs
2749 .iter()
2750 .filter(|o| matches!(o, Output::Event(_)))
2751 .count();
2752 assert_eq!(
2753 event_count, 0,
2754 "Merge should not emit intermediate results with EOWC"
2755 );
2756 assert_eq!(
2757 operator.active_session_count(),
2758 1,
2759 "Should have 1 merged session"
2760 );
2761
2762 let (sid, timer_time, _kh) = first_pending_timer(&operator);
2764 let timer = Timer {
2765 key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2766 timestamp: timer_time,
2767 };
2768 let outputs = {
2769 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2770 operator.on_timer(timer, &mut ctx)
2771 };
2772
2773 assert_eq!(outputs.len(), 1, "Merged session should emit once");
2774 if let Output::Event(e) = &outputs[0] {
2775 let result = e
2776 .data
2777 .column(2)
2778 .as_any()
2779 .downcast_ref::<Int64Array>()
2780 .unwrap();
2781 assert_eq!(
2782 result.value(0),
2783 3,
2784 "Merged session count should be 3 (1+1+1)"
2785 );
2786 } else {
2787 panic!("Expected Event output for merged session");
2788 }
2789 }
2790
2791 #[test]
2792 fn test_eowc_session_late_data_after_close() {
2793 let aggregator = CountAggregator::new();
2796 let mut operator = SessionWindowOperator::with_id(
2797 Duration::from_millis(1000),
2798 aggregator,
2799 Duration::from_millis(0),
2800 "eowc_late".to_string(),
2801 );
2802 operator.set_emit_strategy(EmitStrategy::OnWindowClose);
2803
2804 let mut timers = TimerService::new();
2805 let mut state = InMemoryStore::new();
2806 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
2807
2808 let e1 = create_test_event(500, 1);
2810 {
2811 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2812 operator.process(&e1, &mut ctx);
2813 }
2814
2815 let (sid, timer_time, _kh) = first_pending_timer(&operator);
2817 let timer = Timer {
2818 key: SessionWindowOperator::<CountAggregator>::timer_key(sid),
2819 timestamp: timer_time,
2820 };
2821 {
2822 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2823 operator.on_timer(timer, &mut ctx);
2824 }
2825 assert_eq!(operator.active_session_count(), 0);
2826
2827 let advance = create_test_event(5000, 1);
2829 {
2830 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2831 operator.process(&advance, &mut ctx);
2832 }
2833
2834 let late = create_test_event(600, 99);
2836 let late_outputs = {
2837 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2838 operator.process(&late, &mut ctx)
2839 };
2840
2841 let is_late = late_outputs
2843 .iter()
2844 .any(|o| matches!(o, Output::LateEvent(_)));
2845 assert!(is_late, "Late event after session close should be detected");
2846
2847 let is_event = late_outputs.iter().any(|o| matches!(o, Output::Event(_)));
2849 assert!(!is_event, "Late event should not re-open closed session");
2850
2851 assert_eq!(operator.late_data_metrics().late_events_dropped(), 1);
2852 }
2853}