1use std::time::Duration;
7
8use arrow_schema::{DataType, Field};
9
10use crate::parser::{
11 EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
12};
13
/// Which family of time window an operator computes.
///
/// Variants are listed in a fixed order; do not reorder them, as that would
/// change their implicit discriminant values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowType {
    /// SQL `TUMBLE`: configured with a single window `size`.
    Tumbling,
    /// SQL `HOP`: configured with a window `size` and a `slide`.
    Sliding,
    /// SQL `SESSION`: configured with a `gap` rather than a size.
    Session,
    /// SQL `CUMULATE`: configured with a `step` and a maximum window size.
    Cumulate,
}
26
/// Configuration for a windowed operator, built either directly via the
/// constructors (`tumbling`, `sliding`, `session`, `cumulate`) or from a parsed
/// SQL window function via `from_window_function`.
#[derive(Debug, Clone)]
pub struct WindowOperatorConfig {
    /// Which kind of window this is.
    pub window_type: WindowType,
    /// Name of the column the window is computed over (extracted from the
    /// window function's time-column argument).
    pub time_column: String,
    /// Window length. For `Cumulate` this is the maximum window size; for
    /// `Session` the constructor leaves it at `Duration::ZERO`.
    pub size: Duration,
    /// Slide for `Sliding` windows, or step for `Cumulate` windows; `None` for
    /// the other window types.
    pub slide: Option<Duration>,
    /// Session gap for `Session` windows; `None` for the other window types.
    pub gap: Option<Duration>,
    /// Window offset in milliseconds; `0` unless an offset was supplied.
    pub offset_ms: i64,
    /// How late data may arrive and still be accepted; `Duration::ZERO` by default.
    pub allowed_lateness: Duration,
    /// When results are emitted; constructors default to `EmitStrategy::OnWatermark`.
    pub emit_strategy: EmitStrategy,
    /// Name of the side output that receives late data, if any.
    pub late_data_side_output: Option<String>,
}
59
/// Formats a duration compactly for display, e.g. `5m`, `1h`, `45s`, `500ms`.
///
/// Whole hours render as `Nh`, whole minutes as `Nm` and other whole seconds as
/// `Ns`. Durations shorter than a second, or with a non-zero millisecond
/// fraction, render as total milliseconds (`Nms`) so the fractional part is not
/// silently dropped (previously `1500ms` rendered as `1s`). Precision below one
/// millisecond is truncated.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
74
75impl std::fmt::Display for WindowType {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 WindowType::Tumbling => write!(f, "TUMBLE"),
79 WindowType::Sliding => write!(f, "HOP"),
80 WindowType::Session => write!(f, "SESSION"),
81 WindowType::Cumulate => write!(f, "CUMULATE"),
82 }
83 }
84}
85
86impl std::fmt::Display for WindowOperatorConfig {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match self.window_type {
89 WindowType::Tumbling => {
90 write!(
91 f,
92 "TUMBLE({}, {})",
93 self.time_column,
94 format_duration(self.size)
95 )
96 }
97 WindowType::Sliding => {
98 let slide = self.slide.unwrap_or(self.size);
99 write!(
100 f,
101 "HOP({}, {} SLIDE {})",
102 self.time_column,
103 format_duration(self.size),
104 format_duration(slide)
105 )
106 }
107 WindowType::Session => {
108 let gap = self.gap.unwrap_or(Duration::ZERO);
109 write!(
110 f,
111 "SESSION({}, GAP {})",
112 self.time_column,
113 format_duration(gap)
114 )
115 }
116 WindowType::Cumulate => {
117 let step = self.slide.unwrap_or(self.size);
118 write!(
119 f,
120 "CUMULATE({}, STEP {} SIZE {})",
121 self.time_column,
122 format_duration(step),
123 format_duration(self.size)
124 )
125 }
126 }
127 }
128}
129
130impl WindowOperatorConfig {
131 #[must_use]
134 pub fn output_prefix_fields() -> Vec<Field> {
135 vec![
136 Field::new("window_start", DataType::Int64, false),
137 Field::new("window_end", DataType::Int64, false),
138 ]
139 }
140
141 #[must_use]
143 pub fn tumbling(time_column: String, size: Duration) -> Self {
144 Self {
145 window_type: WindowType::Tumbling,
146 time_column,
147 size,
148 slide: None,
149 gap: None,
150 offset_ms: 0,
151 allowed_lateness: Duration::ZERO,
152 emit_strategy: EmitStrategy::OnWatermark,
153 late_data_side_output: None,
154 }
155 }
156
157 #[must_use]
159 pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
160 Self {
161 window_type: WindowType::Sliding,
162 time_column,
163 size,
164 slide: Some(slide),
165 gap: None,
166 offset_ms: 0,
167 allowed_lateness: Duration::ZERO,
168 emit_strategy: EmitStrategy::OnWatermark,
169 late_data_side_output: None,
170 }
171 }
172
173 #[must_use]
175 pub fn session(time_column: String, gap: Duration) -> Self {
176 Self {
177 window_type: WindowType::Session,
178 time_column,
179 size: Duration::ZERO, slide: None,
181 gap: Some(gap),
182 offset_ms: 0,
183 allowed_lateness: Duration::ZERO,
184 emit_strategy: EmitStrategy::OnWatermark,
185 late_data_side_output: None,
186 }
187 }
188
189 #[must_use]
194 pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
195 Self {
196 window_type: WindowType::Cumulate,
197 time_column,
198 size: max_size,
199 slide: Some(step),
200 gap: None,
201 offset_ms: 0,
202 allowed_lateness: Duration::ZERO,
203 emit_strategy: EmitStrategy::OnWatermark,
204 late_data_side_output: None,
205 }
206 }
207
208 #[must_use]
210 pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
211 self.offset_ms = offset_ms;
212 self
213 }
214
215 pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
223 let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
224 ParseError::WindowError("Cannot extract time column name".to_string())
225 })?;
226
227 match window {
228 WindowFunction::Tumble {
229 interval, offset, ..
230 } => {
231 let size = WindowRewriter::parse_interval_to_duration(interval)?;
232 let mut config = Self::tumbling(time_column, size);
233 if let Some(ref off) = offset {
234 let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
235 config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
236 }
237 Ok(config)
238 }
239 WindowFunction::Hop {
240 slide_interval,
241 window_interval,
242 offset,
243 ..
244 } => {
245 let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
246 let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
247 let mut config = Self::sliding(time_column, size, slide);
248 if let Some(ref off) = offset {
249 let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
250 config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
251 }
252 Ok(config)
253 }
254 WindowFunction::Session { gap_interval, .. } => {
255 let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
256 Ok(Self::session(time_column, gap))
257 }
258 WindowFunction::Cumulate {
259 step_interval,
260 max_size_interval,
261 ..
262 } => {
263 let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
264 let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
265 Ok(Self::cumulate(time_column, step, max_size))
266 }
267 }
268 }
269
270 pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
276 self.emit_strategy = emit_clause.to_emit_strategy()?;
277 Ok(self)
278 }
279
280 pub fn with_late_data_clause(
286 mut self,
287 late_data_clause: &LateDataClause,
288 ) -> Result<Self, ParseError> {
289 if late_data_clause.side_output.is_some() {
290 return Err(ParseError::WindowError(
291 "LATE DATA SIDE OUTPUT is not yet supported in pipeline mode; \
292 use ALLOWED LATENESS without a side output, or omit the clause"
293 .to_string(),
294 ));
295 }
296 self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
297 self.late_data_side_output
298 .clone_from(&late_data_clause.side_output);
299 Ok(self)
300 }
301
302 #[must_use]
304 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
305 self.allowed_lateness = lateness;
306 self
307 }
308
309 #[must_use]
311 pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
312 self.emit_strategy = strategy;
313 self
314 }
315
316 #[must_use]
318 pub fn with_late_data_side_output(mut self, name: String) -> Self {
319 self.late_data_side_output = Some(name);
320 self
321 }
322
323 pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
331 if matches!(
332 self.emit_strategy,
333 EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
334 ) {
335 if !has_watermark {
336 return Err(ParseError::WindowError(
337 "EMIT ON WINDOW CLOSE requires a watermark definition \
338 on the source. Add WATERMARK FOR <column> AS <expr> \
339 to the CREATE SOURCE statement."
340 .to_string(),
341 ));
342 }
343 if !has_window {
344 return Err(ParseError::WindowError(
345 "EMIT ON WINDOW CLOSE is only valid with windowed \
346 aggregation queries. Use EMIT ON UPDATE for \
347 non-windowed queries."
348 .to_string(),
349 ));
350 }
351 }
352 Ok(())
353 }
354
355 #[must_use]
360 pub fn is_append_only_compatible(&self) -> bool {
361 matches!(
362 self.emit_strategy,
363 EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
364 )
365 }
366
367 #[must_use]
369 pub fn has_late_data_handling(&self) -> bool {
370 self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Builds `TUMBLE(event_time, 5 MINUTE)` with no offset.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
            offset: None,
        }
    }

    #[test]
    fn test_tumbling_config() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert!(config.slide.is_none());
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );

        assert_eq!(config.window_type, WindowType::Sliding);
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));

        assert_eq!(config.window_type, WindowType::Session);
        assert_eq!(config.gap, Some(Duration::from_secs(1800)));
    }

    #[test]
    fn test_from_window_function() {
        let window = make_tumble_window();
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.offset_ms, 0);
    }

    /// An explicit TUMBLE offset must be carried into `offset_ms`.
    #[test]
    fn test_tumble_offset_from_window_function() {
        let window = WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
            offset: Some(Expr::Identifier(Ident::new("1 MINUTE")).into()),
        };
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.offset_ms, 60_000);
    }

    #[test]
    fn test_with_offset_ms() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(60))
            .with_offset_ms(15_000);
        assert_eq!(config.offset_ms, 15_000);
    }

    #[test]
    fn test_output_prefix_fields() {
        let fields = WindowOperatorConfig::output_prefix_fields();
        let names: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, ["window_start", "window_end"]);
        assert!(fields
            .iter()
            .all(|f| f.data_type() == &DataType::Int64 && !f.is_nullable()));
    }

    #[test]
    fn test_with_emit_clause() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let config = config.with_emit_clause(&EmitClause::OnWindowClose).unwrap();
        assert_eq!(config.emit_strategy, EmitStrategy::OnWindowClose);

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));
        let config2 = config2.with_emit_clause(&EmitClause::Changes).unwrap();
        assert_eq!(config2.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause_side_output_rejected() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let late_clause = LateDataClause::side_output_only("late_events".to_string());
        let result = config.with_late_data_clause(&late_clause);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not yet supported"));
    }

    #[test]
    fn test_with_late_data_clause_lateness_only_accepted() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let late_clause = LateDataClause::with_allowed_lateness(Expr::Value(
            sqlparser::ast::Value::SingleQuotedString("5 SECONDS".to_string()).into(),
        ));
        let config = config.with_late_data_clause(&late_clause).unwrap();
        assert_eq!(config.allowed_lateness, Duration::from_secs(5));
        assert!(config.late_data_side_output.is_none());
    }

    #[test]
    fn test_append_only_compatible() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        assert!(config.is_append_only_compatible());

        let config2 = config.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!config2.is_append_only_compatible());

        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(!config3.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        assert!(!config.has_late_data_handling());

        let config2 = config
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(config2.has_late_data_handling());

        let config3 = config.with_late_data_side_output("late".to_string());
        assert!(config3.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(false, true);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(true, false);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(config.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::FinalOnly);

        let result = config.validate(false, true);
        assert!(result.is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(config.validate(false, false).is_ok());

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(5)));
        assert!(config2.validate(false, false).is_ok());

        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(config3.validate(false, false).is_ok());
    }

    #[test]
    fn test_display_tumbling_window() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(format!("{config}"), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(format!("{config}"), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(format!("{config}"), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "ts");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            step_interval: Box::new(Expr::Identifier(Ident::new("1 MINUTE"))),
            max_size_interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
        };
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );
        assert_eq!(format!("{config}"), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        assert_eq!(format!("{}", WindowType::Tumbling), "TUMBLE");
        assert_eq!(format!("{}", WindowType::Sliding), "HOP");
        assert_eq!(format!("{}", WindowType::Session), "SESSION");
        assert_eq!(format!("{}", WindowType::Cumulate), "CUMULATE");
    }

    #[test]
    fn test_display_duration_formatting() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(3600));
        assert_eq!(format!("{config}"), "TUMBLE(ts, 1h)");

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(45));
        assert_eq!(format!("{config2}"), "TUMBLE(ts, 45s)");

        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_millis(500));
        assert_eq!(format!("{config3}"), "TUMBLE(ts, 500ms)");
    }
}