Skip to main content

laminar_core/operator/
sliding_window.rs

1//! Sliding (hopping) window assigner.
2
3use super::window::{WindowAssigner, WindowId, WindowIdVec};
4use std::time::Duration;
5
/// Sliding window assigner.
///
/// Each event is assigned to one or more overlapping windows of `size_ms`
/// milliseconds whose start times are `slide_ms` milliseconds apart.
/// An event belongs to at most `ceil(size / slide)` windows; when `size` is
/// not a multiple of `slide`, some events fall into one window fewer.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Window size in milliseconds
    size_ms: i64,
    /// Slide interval in milliseconds
    slide_ms: i64,
    /// Upper bound on the number of windows per event (cached for performance)
    windows_per_event: usize,
    /// Offset in milliseconds for timezone-aligned windows
    offset_ms: i64,
}

impl SlidingWindowAssigner {
    /// Creates a new sliding window assigner.
    ///
    /// Both durations are truncated to whole milliseconds, so a
    /// sub-millisecond duration counts as zero.
    ///
    /// # Panics
    ///
    /// Panics if size or slide is zero (after truncation), if either does not
    /// fit in an `i64` number of milliseconds, or if slide > size.
    #[must_use]
    pub fn new(size: Duration, slide: Duration) -> Self {
        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
        let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");

        // Delegate so validation and the window-count computation live in
        // exactly one place and both constructors behave the same.
        Self::from_millis(size_ms, slide_ms)
    }

    /// Creates a new sliding window assigner with sizes in milliseconds.
    ///
    /// The offset starts at zero; use [`Self::with_offset_ms`] to change it.
    ///
    /// # Panics
    ///
    /// Panics if size or slide is zero/negative, or if slide > size.
    #[must_use]
    pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
        assert!(size_ms > 0, "Window size must be positive");
        assert!(slide_ms > 0, "Slide interval must be positive");
        assert!(
            slide_ms <= size_ms,
            "Slide must not exceed size (use tumbling windows for non-overlapping)"
        );

        // ceil(size / slide) written as (size - 1) / slide + 1. Unlike
        // (size + slide - 1) / slide this cannot overflow: with size >= 1 and
        // slide >= 1 the result never exceeds `size`. A count that does not
        // fit in `usize` (only possible on narrow targets) saturates.
        let windows_per_event =
            usize::try_from((size_ms - 1) / slide_ms + 1).unwrap_or(usize::MAX);

        Self {
            size_ms,
            slide_ms,
            windows_per_event,
            offset_ms: 0,
        }
    }

    /// Set window offset in milliseconds.
    ///
    /// Window starts are aligned to `offset_ms + k * slide_ms` for integer `k`.
    #[must_use]
    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
        self.offset_ms = offset_ms;
        self
    }

    /// Returns the window size in milliseconds.
    #[must_use]
    pub fn size_ms(&self) -> i64 {
        self.size_ms
    }

    /// Returns the slide interval in milliseconds.
    #[must_use]
    pub fn slide_ms(&self) -> i64 {
        self.slide_ms
    }

    /// Returns the cached upper bound, `ceil(size / slide)`, on the number of
    /// windows an event can belong to.
    #[must_use]
    pub fn windows_per_event(&self) -> usize {
        self.windows_per_event
    }

    /// Returns the window offset in milliseconds.
    #[must_use]
    pub fn offset_ms(&self) -> i64 {
        self.offset_ms
    }

    /// Computes the last window start that could contain this timestamp.
    ///
    /// This is the largest value of the form `offset_ms + k * slide_ms` that
    /// is `<= timestamp`. All arithmetic saturates at the `i64` bounds instead
    /// of overflowing, so for timestamps within one slide of `i64::MIN` the
    /// result is clamped to `i64::MIN` rather than being slide-aligned.
    #[inline]
    fn last_window_start(&self, timestamp: i64) -> i64 {
        let adjusted = timestamp.saturating_sub(self.offset_ms);
        // `rem_euclid` is always in `[0, slide_ms)`, so subtracting it rounds
        // towards negative infinity for negative values as well.
        let base = adjusted.saturating_sub(adjusted.rem_euclid(self.slide_ms));
        base.saturating_add(self.offset_ms)
    }
}
121
122impl WindowAssigner for SlidingWindowAssigner {
123    /// Assigns a timestamp to all overlapping windows.
124    ///
125    /// Returns windows in order from earliest to latest start time.
126    #[inline]
127    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
128        let mut windows = WindowIdVec::new();
129
130        let last_start = self.last_window_start(timestamp);
131
132        let mut window_start = last_start;
133        while window_start + self.size_ms > timestamp {
134            let window_end = window_start + self.size_ms;
135            windows.push(WindowId::new(window_start, window_end));
136            let prev = window_start;
137            window_start = window_start.saturating_sub(self.slide_ms);
138            if window_start == prev {
139                break;
140            }
141        }
142
143        windows.reverse();
144        windows
145    }
146
147    fn max_timestamp(&self, window_end: i64) -> i64 {
148        window_end - 1
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// An event at 7s with 10s windows sliding every 5s belongs to
    /// `[0, 10s)` and `[5s, 15s)`, returned earliest first.
    #[test]
    fn test_sliding_assigner_basic() {
        let assigner = SlidingWindowAssigner::from_millis(10_000, 5_000);
        let windows = assigner.assign_windows(7_000);

        let expected = [(0, 10_000), (5_000, 15_000)];
        assert_eq!(windows.len(), expected.len());
        for (i, &(start, end)) in expected.iter().enumerate() {
            assert_eq!(windows[i].start, start);
            assert_eq!(windows[i].end, end);
        }
    }

    /// The cached window count is `size / slide` when size is a multiple of slide.
    #[test]
    fn test_sliding_assigner_windows_per_event() {
        for (size_ms, slide_ms, expected) in [(10_000, 5_000, 2), (15_000, 5_000, 3)] {
            let assigner = SlidingWindowAssigner::from_millis(size_ms, slide_ms);
            assert_eq!(assigner.windows_per_event(), expected);
        }
    }
}