// laminar_core/operator/window.rs

//! Window assignment, emit strategies, and CDC types for stream processing.
//!
//! - [`TumblingWindowAssigner`]: Fixed-size, non-overlapping windows
//! - [`EmitStrategy`]: Controls when window results are emitted
//! - [`ChangelogRecord`]: CDC records with Z-set weights

7use super::Event;
8use smallvec::SmallVec;
9use std::time::Duration;
10
/// Policy controlling when a windowed operator emits its results.
///
/// The default is [`EmitStrategy::OnWatermark`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum EmitStrategy {
    /// Emit once the watermark passes the window end (default, most efficient).
    #[default]
    OnWatermark,
    /// Emit intermediate results at the given interval, plus a final result on watermark.
    Periodic(Duration),
    /// Emit after every state change (lowest latency, highest overhead).
    OnUpdate,
    /// Emit only when the window closes; safe for append-only sinks (no retractions).
    ///
    /// SQL: `EMIT ON WINDOW CLOSE`
    OnWindowClose,
    /// Emit changelog records carrying Z-set weights, for CDC pipelines.
    ///
    /// SQL: `EMIT CHANGES`
    Changelog,
    /// Suppress all intermediate results and emit only finalized ones.
    ///
    /// SQL: `EMIT FINAL`
    Final,
}

32impl EmitStrategy {
33    /// Returns true if this strategy requires periodic timer registration.
34    #[must_use]
35    pub fn needs_periodic_timer(&self) -> bool {
36        matches!(self, Self::Periodic(_))
37    }
38
39    /// Returns the periodic interval if this is a periodic strategy.
40    #[must_use]
41    pub fn periodic_interval(&self) -> Option<Duration> {
42        match self {
43            Self::Periodic(d) => Some(*d),
44            _ => None,
45        }
46    }
47
48    /// Returns true if results should be emitted on every update.
49    #[must_use]
50    pub fn emits_on_update(&self) -> bool {
51        matches!(self, Self::OnUpdate)
52    }
53
54    /// Returns true if this strategy emits intermediate results before window close.
55    #[must_use]
56    pub fn emits_intermediate(&self) -> bool {
57        matches!(self, Self::OnUpdate | Self::Periodic(_))
58    }
59
60    /// Returns true if this strategy requires changelog/Z-set support.
61    #[must_use]
62    pub fn requires_changelog(&self) -> bool {
63        matches!(self, Self::Changelog)
64    }
65
66    /// Returns true if safe for append-only sinks (no retractions).
67    #[must_use]
68    pub fn is_append_only_compatible(&self) -> bool {
69        matches!(self, Self::OnWindowClose | Self::Final)
70    }
71
72    /// Returns true if late data should generate retractions.
73    #[must_use]
74    pub fn generates_retractions(&self) -> bool {
75        matches!(self, Self::OnWatermark | Self::OnUpdate | Self::Changelog)
76    }
77
78    /// Returns true if this strategy should suppress intermediate emissions.
79    #[must_use]
80    pub fn suppresses_intermediate(&self) -> bool {
81        matches!(self, Self::OnWindowClose | Self::Final)
82    }
83
84    /// Returns true if late data should be dropped entirely.
85    #[must_use]
86    pub fn drops_late_data(&self) -> bool {
87        matches!(self, Self::Final)
88    }
89}
90
/// Identifies a window by its bounds in milliseconds: `start` is inclusive and
/// `end` is exclusive.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct WindowId {
    /// Inclusive lower bound of the window (milliseconds).
    pub start: i64,
    /// Exclusive upper bound of the window (milliseconds).
    pub end: i64,
}

100impl WindowId {
101    /// Creates a new window ID.
102    #[must_use]
103    pub fn new(start: i64, end: i64) -> Self {
104        Self { start, end }
105    }
106
107    /// Window duration in milliseconds.
108    #[must_use]
109    pub fn duration_ms(&self) -> i64 {
110        self.end - self.start
111    }
112
113    /// Converts to a 16-byte big-endian key for state storage.
114    #[inline]
115    #[must_use]
116    pub fn to_key(&self) -> super::TimerKey {
117        super::TimerKey::from(self.to_key_inline())
118    }
119
120    /// Stack-allocated 16-byte key (zero-allocation hot path).
121    #[inline]
122    #[must_use]
123    pub fn to_key_inline(&self) -> [u8; 16] {
124        let mut key = [0u8; 16];
125        key[..8].copy_from_slice(&self.start.to_be_bytes());
126        key[8..16].copy_from_slice(&self.end.to_be_bytes());
127        key
128    }
129
130    /// Parses from a 16-byte big-endian key. Returns `None` if wrong length.
131    #[must_use]
132    pub fn from_key(key: &[u8]) -> Option<Self> {
133        if key.len() != 16 {
134            return None;
135        }
136        let start = i64::from_be_bytes(key[0..8].try_into().ok()?);
137        let end = i64::from_be_bytes(key[8..16].try_into().ok()?);
138        Some(Self { start, end })
139    }
140}
141
142/// Window assignment results. Inline storage for up to 4 windows (avoids heap).
143pub type WindowIdVec = SmallVec<[WindowId; 4]>;
144
/// Kind of change carried by a CDC record; every kind maps to a Z-set weight.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// Plain insertion (weight +1).
    Insert,
    /// Plain deletion (weight -1).
    Delete,
    /// Retraction of the old row that precedes an update (weight -1).
    UpdateBefore,
    /// New row emitted after an update (weight +1).
    UpdateAfter,
}

impl CdcOperation {
    /// Z-set weight of this operation: `+1` for insert-type, `-1` for delete-type.
    ///
    /// This exhaustive match is the single source of truth for the insert/delete
    /// classification used by [`CdcOperation::is_insert`] and
    /// [`CdcOperation::is_delete`].
    #[must_use]
    pub fn weight(&self) -> i32 {
        match self {
            Self::Delete | Self::UpdateBefore => -1,
            Self::Insert | Self::UpdateAfter => 1,
        }
    }

    /// `true` for `Insert` and `UpdateAfter`.
    #[must_use]
    pub fn is_insert(&self) -> bool {
        self.weight() > 0
    }

    /// `true` for `Delete` and `UpdateBefore`.
    #[must_use]
    pub fn is_delete(&self) -> bool {
        self.weight() < 0
    }

    /// One-byte storage tag (`0..=3`); the inverse of [`CdcOperation::from_u8`].
    ///
    /// The values are spelled out explicitly so that reordering the variants
    /// cannot silently change the stored encoding.
    #[inline]
    #[must_use]
    pub fn to_u8(self) -> u8 {
        match self {
            Self::Insert => 0,
            Self::Delete => 1,
            Self::UpdateBefore => 2,
            Self::UpdateAfter => 3,
        }
    }

    /// Decodes a tag written by [`CdcOperation::to_u8`]; any value above 3 yields `None`.
    #[inline]
    #[must_use]
    pub fn from_u8(value: u8) -> Option<Self> {
        let op = match value {
            0 => Self::Insert,
            1 => Self::Delete,
            2 => Self::UpdateBefore,
            3 => Self::UpdateAfter,
            _ => return None,
        };
        Some(op)
    }
}

206/// Changelog record with Z-set weight for CDC pipelines.
207#[derive(Debug, Clone)]
208pub struct ChangelogRecord {
209    /// The CDC operation type.
210    pub operation: CdcOperation,
211    /// Z-set weight (+1 for insert, -1 for delete).
212    pub weight: i32,
213    /// Timestamp when this change was emitted.
214    pub emit_timestamp: i64,
215    /// The event data.
216    pub event: Event,
217}
218
219impl ChangelogRecord {
220    /// Creates an insert record (+1 weight).
221    #[must_use]
222    pub fn insert(event: Event, emit_timestamp: i64) -> Self {
223        Self {
224            operation: CdcOperation::Insert,
225            weight: 1,
226            emit_timestamp,
227            event,
228        }
229    }
230
231    /// Creates a delete record (-1 weight).
232    #[must_use]
233    pub fn delete(event: Event, emit_timestamp: i64) -> Self {
234        Self {
235            operation: CdcOperation::Delete,
236            weight: -1,
237            emit_timestamp,
238            event,
239        }
240    }
241
242    /// Creates an update retraction pair (`UpdateBefore`, `UpdateAfter`).
243    #[must_use]
244    pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
245        let before = Self {
246            operation: CdcOperation::UpdateBefore,
247            weight: -1,
248            emit_timestamp,
249            event: old_event,
250        };
251        let after = Self {
252            operation: CdcOperation::UpdateAfter,
253            weight: 1,
254            emit_timestamp,
255            event: new_event,
256        };
257        (before, after)
258    }
259
260    /// Creates a record from raw parts.
261    #[must_use]
262    pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
263        Self {
264            operation,
265            weight: operation.weight(),
266            emit_timestamp,
267            event,
268        }
269    }
270
271    /// Returns true for insert-type records.
272    #[must_use]
273    pub fn is_insert(&self) -> bool {
274        self.operation.is_insert()
275    }
276
277    /// Returns true for delete-type records.
278    #[must_use]
279    pub fn is_delete(&self) -> bool {
280        self.operation.is_delete()
281    }
282}
283
284/// Trait for assigning events to windows.
285pub trait WindowAssigner: Send {
286    /// Assigns a timestamp to one or more windows.
287    fn assign_windows(&self, timestamp: i64) -> WindowIdVec;
288
289    /// Maximum timestamp assignable to a window ending at `window_end`.
290    fn max_timestamp(&self, window_end: i64) -> i64 {
291        window_end - 1
292    }
293}
294
295/// Tumbling window assigner: fixed-size, non-overlapping windows aligned to epoch.
296#[derive(Debug, Clone)]
297pub struct TumblingWindowAssigner {
298    size_ms: i64,
299    offset_ms: i64,
300}
301
302impl TumblingWindowAssigner {
303    /// # Panics
304    /// Panics if size is zero.
305    #[must_use]
306    pub fn new(size: Duration) -> Self {
307        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
308        assert!(size_ms > 0, "Window size must be positive");
309        Self {
310            size_ms,
311            offset_ms: 0,
312        }
313    }
314
315    /// # Panics
316    /// Panics if `size_ms` is zero or negative.
317    #[must_use]
318    pub fn from_millis(size_ms: i64) -> Self {
319        assert!(size_ms > 0, "Window size must be positive");
320        Self {
321            size_ms,
322            offset_ms: 0,
323        }
324    }
325
326    /// Sets window offset in milliseconds for timezone-aligned windows.
327    #[must_use]
328    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
329        self.offset_ms = offset_ms;
330        self
331    }
332
333    /// Window size in milliseconds.
334    #[must_use]
335    pub fn size_ms(&self) -> i64 {
336        self.size_ms
337    }
338
339    /// Window offset in milliseconds.
340    #[must_use]
341    pub fn offset_ms(&self) -> i64 {
342        self.offset_ms
343    }
344
345    /// O(1) window assignment. Floor-divides timestamp into window boundaries.
346    #[inline]
347    #[must_use]
348    pub fn assign(&self, timestamp: i64) -> WindowId {
349        let adjusted = timestamp - self.offset_ms;
350        let window_start = if adjusted >= 0 {
351            (adjusted / self.size_ms) * self.size_ms
352        } else {
353            ((adjusted - self.size_ms + 1) / self.size_ms) * self.size_ms
354        };
355        let window_start = window_start + self.offset_ms;
356        WindowId::new(window_start, window_start + self.size_ms)
357    }
358}
359
360impl WindowAssigner for TumblingWindowAssigner {
361    #[inline]
362    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
363        let mut windows = WindowIdVec::new();
364        windows.push(self.assign(timestamp));
365        windows
366    }
367}
368
// Unit tests are compiled only for test builds and live in a separate `tests`
// module file.
#[cfg(test)]
mod tests;