1use super::Event;
8use smallvec::SmallVec;
9use std::time::Duration;
10
/// Policy that decides when a windowed operator hands results downstream.
///
/// The predicate methods on this type are the authoritative description of
/// each variant; the per-variant notes only restate what those predicates
/// report.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum EmitStrategy {
    /// The default strategy. Reported as retraction-generating.
    ///
    /// NOTE(review): presumably emits once the watermark passes the window
    /// end — confirm against the operator that consumes this enum.
    #[default]
    OnWatermark,
    /// Emits intermediate results on a timer; the payload is the timer
    /// interval.
    Periodic(Duration),
    /// Emits intermediate results on update. Reported as
    /// retraction-generating.
    OnUpdate,
    /// Suppresses intermediate results; compatible with append-only sinks.
    OnWindowClose,
    /// Requires changelog output. Reported as retraction-generating.
    Changelog,
    /// Suppresses intermediate results, is compatible with append-only sinks,
    /// and additionally drops late data.
    Final,
}

impl EmitStrategy {
    /// Returns `true` when the strategy carries a periodic interval and
    /// therefore needs a timer to drive emission.
    #[must_use]
    pub fn needs_periodic_timer(&self) -> bool {
        self.periodic_interval().is_some()
    }

    /// Returns the emission interval for [`EmitStrategy::Periodic`], or
    /// `None` for every other strategy.
    #[must_use]
    pub fn periodic_interval(&self) -> Option<Duration> {
        if let Self::Periodic(interval) = self {
            Some(*interval)
        } else {
            None
        }
    }

    /// Returns `true` only for [`EmitStrategy::OnUpdate`].
    #[must_use]
    pub fn emits_on_update(&self) -> bool {
        match self {
            Self::OnUpdate => true,
            Self::OnWatermark
            | Self::Periodic(_)
            | Self::OnWindowClose
            | Self::Changelog
            | Self::Final => false,
        }
    }

    /// Returns `true` for the strategies that emit intermediate results:
    /// `OnUpdate` and `Periodic`.
    #[must_use]
    pub fn emits_intermediate(&self) -> bool {
        match self {
            Self::OnUpdate | Self::Periodic(_) => true,
            Self::OnWatermark | Self::OnWindowClose | Self::Changelog | Self::Final => false,
        }
    }

    /// Returns `true` only for [`EmitStrategy::Changelog`].
    #[must_use]
    pub fn requires_changelog(&self) -> bool {
        match self {
            Self::Changelog => true,
            Self::OnWatermark
            | Self::Periodic(_)
            | Self::OnUpdate
            | Self::OnWindowClose
            | Self::Final => false,
        }
    }

    /// Returns `true` for the strategies usable with append-only sinks:
    /// `OnWindowClose` and `Final`.
    #[must_use]
    pub fn is_append_only_compatible(&self) -> bool {
        match self {
            Self::OnWindowClose | Self::Final => true,
            Self::OnWatermark | Self::Periodic(_) | Self::OnUpdate | Self::Changelog => false,
        }
    }

    /// Returns `true` for the strategies that may produce retractions:
    /// `OnWatermark`, `OnUpdate` and `Changelog`.
    #[must_use]
    pub fn generates_retractions(&self) -> bool {
        match self {
            Self::OnWatermark | Self::OnUpdate | Self::Changelog => true,
            Self::Periodic(_) | Self::OnWindowClose | Self::Final => false,
        }
    }

    /// Returns `true` for the strategies that hold back intermediate
    /// results: `OnWindowClose` and `Final`.
    #[must_use]
    pub fn suppresses_intermediate(&self) -> bool {
        match self {
            Self::OnWindowClose | Self::Final => true,
            Self::OnWatermark | Self::Periodic(_) | Self::OnUpdate | Self::Changelog => false,
        }
    }

    /// Returns `true` only for [`EmitStrategy::Final`], the one strategy that
    /// discards late data.
    #[must_use]
    pub fn drops_late_data(&self) -> bool {
        match self {
            Self::Final => true,
            Self::OnWatermark
            | Self::Periodic(_)
            | Self::OnUpdate
            | Self::OnWindowClose
            | Self::Changelog => false,
        }
    }
}
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
93pub struct WindowId {
94 pub start: i64,
96 pub end: i64,
98}
99
100impl WindowId {
101 #[must_use]
103 pub fn new(start: i64, end: i64) -> Self {
104 Self { start, end }
105 }
106
107 #[must_use]
109 pub fn duration_ms(&self) -> i64 {
110 self.end - self.start
111 }
112
113 #[inline]
115 #[must_use]
116 pub fn to_key(&self) -> super::TimerKey {
117 super::TimerKey::from(self.to_key_inline())
118 }
119
120 #[inline]
122 #[must_use]
123 pub fn to_key_inline(&self) -> [u8; 16] {
124 let mut key = [0u8; 16];
125 key[..8].copy_from_slice(&self.start.to_be_bytes());
126 key[8..16].copy_from_slice(&self.end.to_be_bytes());
127 key
128 }
129
130 #[must_use]
132 pub fn from_key(key: &[u8]) -> Option<Self> {
133 if key.len() != 16 {
134 return None;
135 }
136 let start = i64::from_be_bytes(key[0..8].try_into().ok()?);
137 let end = i64::from_be_bytes(key[8..16].try_into().ok()?);
138 Some(Self { start, end })
139 }
140}
141
/// Collection of [`WindowId`]s returned by window assignment.
///
/// Backed by a `SmallVec` with inline capacity for four ids, so results of
/// up to four windows need no heap allocation.
pub type WindowIdVec = SmallVec<[WindowId; 4]>;
144
/// Kind of change carried by a changelog record.
///
/// The explicit discriminants are the byte codes produced by
/// [`CdcOperation::to_u8`] and accepted by [`CdcOperation::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A new row; weight `+1`.
    Insert = 0,
    /// A removed row; weight `-1`.
    Delete = 1,
    /// The old image of an updated row; weight `-1`.
    UpdateBefore = 2,
    /// The new image of an updated row; weight `+1`.
    UpdateAfter = 3,
}

impl CdcOperation {
    /// Returns `+1` for insert-like operations and `-1` for delete-like ones.
    #[must_use]
    pub fn weight(&self) -> i32 {
        if self.is_insert() {
            1
        } else {
            -1
        }
    }

    /// Returns `true` for `Insert` and `UpdateAfter`.
    #[must_use]
    pub fn is_insert(&self) -> bool {
        match self {
            Self::Insert | Self::UpdateAfter => true,
            Self::Delete | Self::UpdateBefore => false,
        }
    }

    /// Returns `true` for `Delete` and `UpdateBefore`.
    #[must_use]
    pub fn is_delete(&self) -> bool {
        // Every variant is either insert-like or delete-like.
        !self.is_insert()
    }

    /// Encodes the operation as its one-byte code (0 through 3).
    #[inline]
    #[must_use]
    pub fn to_u8(self) -> u8 {
        self as u8
    }

    /// Decodes a one-byte code; returns `None` for any value outside 0..=3.
    #[inline]
    #[must_use]
    pub fn from_u8(value: u8) -> Option<Self> {
        // Decoding through `to_u8` keeps the two directions in lock-step.
        [
            Self::Insert,
            Self::Delete,
            Self::UpdateBefore,
            Self::UpdateAfter,
        ]
        .iter()
        .copied()
        .find(|op| op.to_u8() == value)
    }
}
205
206#[derive(Debug, Clone)]
208pub struct ChangelogRecord {
209 pub operation: CdcOperation,
211 pub weight: i32,
213 pub emit_timestamp: i64,
215 pub event: Event,
217}
218
219impl ChangelogRecord {
220 #[must_use]
222 pub fn insert(event: Event, emit_timestamp: i64) -> Self {
223 Self {
224 operation: CdcOperation::Insert,
225 weight: 1,
226 emit_timestamp,
227 event,
228 }
229 }
230
231 #[must_use]
233 pub fn delete(event: Event, emit_timestamp: i64) -> Self {
234 Self {
235 operation: CdcOperation::Delete,
236 weight: -1,
237 emit_timestamp,
238 event,
239 }
240 }
241
242 #[must_use]
244 pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
245 let before = Self {
246 operation: CdcOperation::UpdateBefore,
247 weight: -1,
248 emit_timestamp,
249 event: old_event,
250 };
251 let after = Self {
252 operation: CdcOperation::UpdateAfter,
253 weight: 1,
254 emit_timestamp,
255 event: new_event,
256 };
257 (before, after)
258 }
259
260 #[must_use]
262 pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
263 Self {
264 operation,
265 weight: operation.weight(),
266 emit_timestamp,
267 event,
268 }
269 }
270
271 #[must_use]
273 pub fn is_insert(&self) -> bool {
274 self.operation.is_insert()
275 }
276
277 #[must_use]
279 pub fn is_delete(&self) -> bool {
280 self.operation.is_delete()
281 }
282}
283
/// Maps an event timestamp to the window or windows it belongs to.
///
/// Implementors must be `Send`.
pub trait WindowAssigner: Send {
    /// Returns every window that `timestamp` is assigned to.
    fn assign_windows(&self, timestamp: i64) -> WindowIdVec;

    /// Returns the largest timestamp that still belongs to a window ending at
    /// `window_end`.
    ///
    /// The default treats `window_end` as exclusive and returns
    /// `window_end - 1`.
    fn max_timestamp(&self, window_end: i64) -> i64 {
        window_end - 1
    }
}
294
295#[derive(Debug, Clone)]
297pub struct TumblingWindowAssigner {
298 size_ms: i64,
299 offset_ms: i64,
300}
301
302impl TumblingWindowAssigner {
303 #[must_use]
306 pub fn new(size: Duration) -> Self {
307 let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
308 assert!(size_ms > 0, "Window size must be positive");
309 Self {
310 size_ms,
311 offset_ms: 0,
312 }
313 }
314
315 #[must_use]
318 pub fn from_millis(size_ms: i64) -> Self {
319 assert!(size_ms > 0, "Window size must be positive");
320 Self {
321 size_ms,
322 offset_ms: 0,
323 }
324 }
325
326 #[must_use]
328 pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
329 self.offset_ms = offset_ms;
330 self
331 }
332
333 #[must_use]
335 pub fn size_ms(&self) -> i64 {
336 self.size_ms
337 }
338
339 #[must_use]
341 pub fn offset_ms(&self) -> i64 {
342 self.offset_ms
343 }
344
345 #[inline]
347 #[must_use]
348 pub fn assign(&self, timestamp: i64) -> WindowId {
349 let adjusted = timestamp - self.offset_ms;
350 let window_start = if adjusted >= 0 {
351 (adjusted / self.size_ms) * self.size_ms
352 } else {
353 ((adjusted - self.size_ms + 1) / self.size_ms) * self.size_ms
354 };
355 let window_start = window_start + self.offset_ms;
356 WindowId::new(window_start, window_start + self.size_ms)
357 }
358}
359
360impl WindowAssigner for TumblingWindowAssigner {
361 #[inline]
362 fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
363 let mut windows = WindowIdVec::new();
364 windows.push(self.assign(timestamp));
365 windows
366 }
367}
368
369#[cfg(test)]
370mod tests;