laminar_core/operator/
sliding_window.rs1use super::window::{WindowAssigner, WindowId, WindowIdVec};
4use std::time::Duration;
5
/// Assigner for overlapping, fixed-size sliding windows over millisecond timestamps.
///
/// Windows are `size_ms` long and a new window starts every `slide_ms`
/// milliseconds. Window starts lie on the grid `offset_ms + k * slide_ms`
/// for integer `k`.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Window length in milliseconds; the constructors enforce `size_ms > 0`.
    size_ms: i64,
    /// Distance between consecutive window starts in milliseconds; the
    /// constructors enforce `0 < slide_ms <= size_ms`.
    slide_ms: i64,
    /// `ceil(size_ms / slide_ms)`, computed once at construction.
    windows_per_event: usize,
    /// Shift applied to the window-start grid, in milliseconds (defaults to 0).
    offset_ms: i64,
}

impl SlidingWindowAssigner {
    /// Creates an assigner from [`Duration`] values.
    ///
    /// Both durations are converted to whole milliseconds and then validated
    /// by [`SlidingWindowAssigner::from_millis`], so the two constructors
    /// always agree on what a valid configuration is.
    ///
    /// # Panics
    ///
    /// Panics if either duration does not fit in an `i64` number of
    /// milliseconds, or under any of the conditions listed for
    /// [`SlidingWindowAssigner::from_millis`].
    #[must_use]
    pub fn new(size: Duration, slide: Duration) -> Self {
        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
        let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");

        Self::from_millis(size_ms, slide_ms)
    }

    /// Creates an assigner from a window size and slide given in milliseconds.
    ///
    /// The offset starts at 0; use [`SlidingWindowAssigner::with_offset_ms`]
    /// to change it.
    ///
    /// # Panics
    ///
    /// Panics if `size_ms` or `slide_ms` is not positive, or if `slide_ms`
    /// is larger than `size_ms`.
    #[must_use]
    pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
        assert!(size_ms > 0, "Window size must be positive");
        assert!(slide_ms > 0, "Slide interval must be positive");
        assert!(
            slide_ms <= size_ms,
            "Slide must not exceed size (use tumbling windows for non-overlapping)"
        );

        // ceil(size / slide), written as (size - 1) / slide + 1. The textbook
        // form (size + slide - 1) / slide overflows i64 when size is close to
        // i64::MAX; this form cannot, because size >= 1 and slide >= 1.
        let per_event = (size_ms - 1) / slide_ms + 1;
        // Only reachable on targets where usize is narrower than i64: saturate
        // rather than refuse an otherwise valid configuration.
        let windows_per_event = usize::try_from(per_event).unwrap_or(usize::MAX);

        Self {
            size_ms,
            slide_ms,
            windows_per_event,
            offset_ms: 0,
        }
    }

    /// Returns the assigner with its window-start grid shifted by `offset_ms`.
    #[must_use]
    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
        self.offset_ms = offset_ms;
        self
    }

    /// Window length in milliseconds.
    #[must_use]
    pub fn size_ms(&self) -> i64 {
        self.size_ms
    }

    /// Distance between consecutive window starts, in milliseconds.
    #[must_use]
    pub fn slide_ms(&self) -> i64 {
        self.slide_ms
    }

    /// `ceil(size_ms / slide_ms)` as computed at construction time.
    #[must_use]
    pub fn windows_per_event(&self) -> usize {
        self.windows_per_event
    }

    /// Shift applied to the window-start grid, in milliseconds.
    #[must_use]
    pub fn offset_ms(&self) -> i64 {
        self.offset_ms
    }

    /// Returns the start of the latest window whose start is at or before
    /// `timestamp`, i.e. `timestamp` rounded down to the window-start grid.
    #[inline]
    fn last_window_start(&self, timestamp: i64) -> i64 {
        // Work relative to the offset so the grid is plain multiples of slide.
        let adjusted = timestamp - self.offset_ms;
        let base = if adjusted >= 0 {
            (adjusted / self.slide_ms) * self.slide_ms
        } else {
            // Integer division truncates toward zero, so negative values are
            // biased by -(slide - 1) first to get floor semantics. The
            // saturating steps keep the bias from overflowing near i64::MIN.
            (adjusted.saturating_sub(self.slide_ms).saturating_add(1) / self.slide_ms)
                * self.slide_ms
        };
        base + self.offset_ms
    }
}
121
122impl WindowAssigner for SlidingWindowAssigner {
123 #[inline]
127 fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
128 let mut windows = WindowIdVec::new();
129
130 let last_start = self.last_window_start(timestamp);
131
132 let mut window_start = last_start;
133 while window_start + self.size_ms > timestamp {
134 let window_end = window_start + self.size_ms;
135 windows.push(WindowId::new(window_start, window_end));
136 let prev = window_start;
137 window_start = window_start.saturating_sub(self.slide_ms);
138 if window_start == prev {
139 break;
140 }
141 }
142
143 windows.reverse();
144 windows
145 }
146
147 fn max_timestamp(&self, window_end: i64) -> i64 {
148 window_end - 1
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// A 10s window sliding every 5s puts timestamp 7s into exactly two
    /// windows, returned oldest-first.
    #[test]
    fn test_sliding_assigner_basic() {
        let assigned = SlidingWindowAssigner::from_millis(10_000, 5_000).assign_windows(7_000);

        assert_eq!(assigned.len(), 2);

        let (earlier, later) = (&assigned[0], &assigned[1]);
        assert_eq!((earlier.start, earlier.end), (0, 10_000));
        assert_eq!((later.start, later.end), (5_000, 15_000));
    }

    /// `windows_per_event` is the size divided by the slide for evenly
    /// dividing configurations.
    #[test]
    fn test_sliding_assigner_windows_per_event() {
        for (size_ms, slide_ms, expected) in [(10_000, 5_000, 2), (15_000, 5_000, 3)] {
            let assigner = SlidingWindowAssigner::from_millis(size_ms, slide_ms);
            assert_eq!(assigner.windows_per_event(), expected);
        }
    }
}