Skip to main content

laminar_sql/translator/
window_translator.rs

1//! Window operator configuration builder
2//!
3//! Translates parsed window functions and EMIT/late data clauses
4//! into complete operator configurations.
5
6use std::time::Duration;
7
8use arrow_schema::{DataType, Field};
9
10use crate::parser::{
11    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
12};
13
/// Kind of windowing applied to a stream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowType {
    /// Fixed-size windows that never overlap
    Tumbling,
    /// Fixed-size windows that overlap, advancing by a slide interval
    Sliding,
    /// Variable-size windows delimited by gaps in activity
    Session,
    /// Windows that grow in fixed steps until an epoch boundary is reached
    Cumulate,
}
26
27/// Complete configuration for instantiating a window operator.
28///
29/// This structure holds all the information needed to create and configure
30/// a window operator in Ring 0.
31///
32/// # EMIT ON WINDOW CLOSE
33///
34/// When `emit_strategy` is `OnWindowClose` or `FinalOnly`, use [`validate()`](Self::validate)
35/// to ensure the configuration is valid. These strategies require:
36/// - A watermark definition on the source (timers are driven by watermark)
37/// - A windowed aggregation context (non-windowed queries cannot use EOWC)
38#[derive(Debug, Clone)]
39pub struct WindowOperatorConfig {
40    /// The type of window (tumbling, sliding, session)
41    pub window_type: WindowType,
42    /// The time column name used for windowing
43    pub time_column: String,
44    /// Window size (for tumbling and sliding)
45    pub size: Duration,
46    /// Slide interval for sliding windows
47    pub slide: Option<Duration>,
48    /// Gap interval for session windows
49    pub gap: Option<Duration>,
50    /// Window offset in milliseconds for timezone-aligned windows
51    pub offset_ms: i64,
52    /// Maximum allowed lateness for late events
53    pub allowed_lateness: Duration,
54    /// Emit strategy (when to output results)
55    pub emit_strategy: EmitStrategy,
56    /// Side output name for late data (if configured)
57    pub late_data_side_output: Option<String>,
58}
59
/// Format a Duration as a compact, human-readable string (e.g. "60s", "5m", "1h").
///
/// Whole-second durations use the largest unit that divides them exactly
/// (`h`, then `m`, then `s`). Durations that are zero or carry a fractional
/// second are rendered exactly in the coarsest sub-second unit that fits
/// (`ms`, then `us`, then `ns`), so no precision is silently dropped
/// (e.g. 1500 ms renders as "1500ms", not "1s").
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    if secs == 0 || d.subsec_nanos() != 0 {
        // Sub-second component present (or zero): pick the coarsest exact unit.
        let total_nanos = d.as_nanos();
        return if total_nanos.is_multiple_of(1_000_000) {
            format!("{}ms", total_nanos / 1_000_000)
        } else if total_nanos.is_multiple_of(1_000) {
            format!("{}us", total_nanos / 1_000)
        } else {
            format!("{total_nanos}ns")
        };
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
74
75impl std::fmt::Display for WindowType {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        match self {
78            WindowType::Tumbling => write!(f, "TUMBLE"),
79            WindowType::Sliding => write!(f, "HOP"),
80            WindowType::Session => write!(f, "SESSION"),
81            WindowType::Cumulate => write!(f, "CUMULATE"),
82        }
83    }
84}
85
86impl std::fmt::Display for WindowOperatorConfig {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self.window_type {
89            WindowType::Tumbling => {
90                write!(
91                    f,
92                    "TUMBLE({}, {})",
93                    self.time_column,
94                    format_duration(self.size)
95                )
96            }
97            WindowType::Sliding => {
98                let slide = self.slide.unwrap_or(self.size);
99                write!(
100                    f,
101                    "HOP({}, {} SLIDE {})",
102                    self.time_column,
103                    format_duration(self.size),
104                    format_duration(slide)
105                )
106            }
107            WindowType::Session => {
108                let gap = self.gap.unwrap_or(Duration::ZERO);
109                write!(
110                    f,
111                    "SESSION({}, GAP {})",
112                    self.time_column,
113                    format_duration(gap)
114                )
115            }
116            WindowType::Cumulate => {
117                let step = self.slide.unwrap_or(self.size);
118                write!(
119                    f,
120                    "CUMULATE({}, STEP {} SIZE {})",
121                    self.time_column,
122                    format_duration(step),
123                    format_duration(self.size)
124                )
125            }
126        }
127    }
128}
129
130impl WindowOperatorConfig {
131    /// Fields the window operator prepends to every output batch.
132    /// Single source of truth for the windowed-stream output contract.
133    #[must_use]
134    pub fn output_prefix_fields() -> Vec<Field> {
135        vec![
136            Field::new("window_start", DataType::Int64, false),
137            Field::new("window_end", DataType::Int64, false),
138        ]
139    }
140
141    /// Create a new tumbling window configuration.
142    #[must_use]
143    pub fn tumbling(time_column: String, size: Duration) -> Self {
144        Self {
145            window_type: WindowType::Tumbling,
146            time_column,
147            size,
148            slide: None,
149            gap: None,
150            offset_ms: 0,
151            allowed_lateness: Duration::ZERO,
152            emit_strategy: EmitStrategy::OnWatermark,
153            late_data_side_output: None,
154        }
155    }
156
157    /// Create a new sliding window configuration.
158    #[must_use]
159    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
160        Self {
161            window_type: WindowType::Sliding,
162            time_column,
163            size,
164            slide: Some(slide),
165            gap: None,
166            offset_ms: 0,
167            allowed_lateness: Duration::ZERO,
168            emit_strategy: EmitStrategy::OnWatermark,
169            late_data_side_output: None,
170        }
171    }
172
173    /// Create a new session window configuration.
174    #[must_use]
175    pub fn session(time_column: String, gap: Duration) -> Self {
176        Self {
177            window_type: WindowType::Session,
178            time_column,
179            size: Duration::ZERO, // Not used for session windows
180            slide: None,
181            gap: Some(gap),
182            offset_ms: 0,
183            allowed_lateness: Duration::ZERO,
184            emit_strategy: EmitStrategy::OnWatermark,
185            late_data_side_output: None,
186        }
187    }
188
189    /// Create a new cumulate window configuration.
190    ///
191    /// `step` is the window growth increment and `max_size` is the epoch
192    /// size. The `slide` field is reused to store the step interval.
193    #[must_use]
194    pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
195        Self {
196            window_type: WindowType::Cumulate,
197            time_column,
198            size: max_size,
199            slide: Some(step),
200            gap: None,
201            offset_ms: 0,
202            allowed_lateness: Duration::ZERO,
203            emit_strategy: EmitStrategy::OnWatermark,
204            late_data_side_output: None,
205        }
206    }
207
208    /// Set window offset in milliseconds.
209    #[must_use]
210    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
211        self.offset_ms = offset_ms;
212        self
213    }
214
215    /// Build configuration from a parsed `WindowFunction`.
216    ///
217    /// # Errors
218    ///
219    /// Returns `ParseError::WindowError` if:
220    /// - Time column cannot be extracted
221    /// - Interval cannot be parsed
222    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
223        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
224            ParseError::WindowError("Cannot extract time column name".to_string())
225        })?;
226
227        match window {
228            WindowFunction::Tumble {
229                interval, offset, ..
230            } => {
231                let size = WindowRewriter::parse_interval_to_duration(interval)?;
232                let mut config = Self::tumbling(time_column, size);
233                if let Some(ref off) = offset {
234                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
235                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
236                }
237                Ok(config)
238            }
239            WindowFunction::Hop {
240                slide_interval,
241                window_interval,
242                offset,
243                ..
244            } => {
245                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
246                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
247                let mut config = Self::sliding(time_column, size, slide);
248                if let Some(ref off) = offset {
249                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
250                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
251                }
252                Ok(config)
253            }
254            WindowFunction::Session { gap_interval, .. } => {
255                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
256                Ok(Self::session(time_column, gap))
257            }
258            WindowFunction::Cumulate {
259                step_interval,
260                max_size_interval,
261                ..
262            } => {
263                let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
264                let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
265                Ok(Self::cumulate(time_column, step, max_size))
266            }
267        }
268    }
269
270    /// Apply EMIT clause configuration.
271    ///
272    /// # Errors
273    ///
274    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
275    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
276        self.emit_strategy = emit_clause.to_emit_strategy()?;
277        Ok(self)
278    }
279
280    /// Apply late data clause configuration.
281    ///
282    /// # Errors
283    ///
284    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
285    pub fn with_late_data_clause(
286        mut self,
287        late_data_clause: &LateDataClause,
288    ) -> Result<Self, ParseError> {
289        if late_data_clause.side_output.is_some() {
290            return Err(ParseError::WindowError(
291                "LATE DATA SIDE OUTPUT is not yet supported in pipeline mode; \
292                 use ALLOWED LATENESS without a side output, or omit the clause"
293                    .to_string(),
294            ));
295        }
296        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
297        self.late_data_side_output
298            .clone_from(&late_data_clause.side_output);
299        Ok(self)
300    }
301
302    /// Set allowed lateness duration.
303    #[must_use]
304    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
305        self.allowed_lateness = lateness;
306        self
307    }
308
309    /// Set emit strategy.
310    #[must_use]
311    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
312        self.emit_strategy = strategy;
313        self
314    }
315
316    /// Set late data side output.
317    #[must_use]
318    pub fn with_late_data_side_output(mut self, name: String) -> Self {
319        self.late_data_side_output = Some(name);
320        self
321    }
322
323    /// Validates that the window operator configuration is used in a valid context.
324    /// Specifically, `EMIT ON WINDOW CLOSE` and `EMIT FINAL` both require a
325    /// watermark on the source *and* a windowed aggregation in the query.
326    ///
327    /// # Errors
328    ///
329    /// Returns `ParseError::WindowError` if validation fails.
330    pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
331        if matches!(
332            self.emit_strategy,
333            EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
334        ) {
335            if !has_watermark {
336                return Err(ParseError::WindowError(
337                    "EMIT ON WINDOW CLOSE requires a watermark definition \
338                     on the source. Add WATERMARK FOR <column> AS <expr> \
339                     to the CREATE SOURCE statement."
340                        .to_string(),
341                ));
342            }
343            if !has_window {
344                return Err(ParseError::WindowError(
345                    "EMIT ON WINDOW CLOSE is only valid with windowed \
346                     aggregation queries. Use EMIT ON UPDATE for \
347                     non-windowed queries."
348                        .to_string(),
349                ));
350            }
351        }
352        Ok(())
353    }
354
355    /// Check if this configuration supports append-only output.
356    ///
357    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
358    /// that don't produce retractions.
359    #[must_use]
360    pub fn is_append_only_compatible(&self) -> bool {
361        matches!(
362            self.emit_strategy,
363            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
364        )
365    }
366
367    /// Check if late data handling is configured.
368    #[must_use]
369    pub fn has_late_data_handling(&self) -> bool {
370        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// A simple `TUMBLE(event_time, 5 MINUTE)` window function for testing.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
            offset: None,
        }
    }

    /// A 5-minute tumbling window on column `ts` with default settings.
    fn tumbling_5m() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
    }

    #[test]
    fn test_output_prefix_fields() {
        let fields = WindowOperatorConfig::output_prefix_fields();
        let names: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, ["window_start", "window_end"]);
        for field in &fields {
            assert_eq!(field.data_type(), &DataType::Int64);
            assert!(!field.is_nullable());
        }
    }

    #[test]
    fn test_tumbling_config() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert!(config.slide.is_none());
        assert!(config.gap.is_none());
        assert_eq!(config.offset_ms, 0);
    }

    #[test]
    fn test_with_offset_ms() {
        let config = tumbling_5m().with_offset_ms(-3_600_000);
        assert_eq!(config.offset_ms, -3_600_000);
        assert_eq!(config.size, Duration::from_secs(300));
    }

    #[test]
    fn test_sliding_config() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );

        assert_eq!(config.window_type, WindowType::Sliding);
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));

        assert_eq!(config.window_type, WindowType::Session);
        assert_eq!(config.gap, Some(Duration::from_secs(1800)));
    }

    #[test]
    fn test_from_window_function() {
        let window = make_tumble_window();
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let config = tumbling_5m()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(config.emit_strategy, EmitStrategy::OnWindowClose);

        let config2 = tumbling_5m().with_emit_clause(&EmitClause::Changes).unwrap();
        assert_eq!(config2.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause_side_output_rejected() {
        // Side output is not yet wired in pipeline mode — must be rejected
        let late_clause = LateDataClause::side_output_only("late_events".to_string());
        let result = tumbling_5m().with_late_data_clause(&late_clause);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not yet supported"));
    }

    #[test]
    fn test_with_late_data_clause_lateness_only_accepted() {
        // Allowed lateness without side output is fine
        let late_clause = LateDataClause::with_allowed_lateness(Expr::Value(
            sqlparser::ast::Value::SingleQuotedString("5 SECONDS".to_string()).into(),
        ));
        let config = tumbling_5m().with_late_data_clause(&late_clause).unwrap();
        assert_eq!(config.allowed_lateness, Duration::from_secs(5));
        assert!(config.late_data_side_output.is_none());
    }

    #[test]
    fn test_append_only_compatible() {
        let config = tumbling_5m();

        // Default emit strategy (OnWatermark) is append-only compatible
        assert!(config.is_append_only_compatible());

        // OnUpdate is NOT append-only compatible (produces retractions)
        let config2 = config.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!config2.is_append_only_compatible());

        // Changelog is NOT append-only compatible
        let config3 = tumbling_5m().with_emit_strategy(EmitStrategy::Changelog);
        assert!(!config3.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let config = tumbling_5m();

        // No late data handling by default
        assert!(!config.has_late_data_handling());

        // With allowed lateness
        let config2 = config
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(config2.has_late_data_handling());

        // With side output
        let config3 = config.with_late_data_side_output("late".to_string());
        assert!(config3.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(false, true);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(true, false);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(config.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::FinalOnly);

        let result = config.validate(false, true);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_final_without_window_errors() {
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::FinalOnly);

        let result = config.validate(true, false);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_final_with_watermark_and_window_passes() {
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::FinalOnly);

        assert!(config.validate(true, true).is_ok());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        // OnUpdate does not require watermark
        let config = tumbling_5m().with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(config.validate(false, false).is_ok());

        // Periodic does not require watermark
        let config2 =
            tumbling_5m().with_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(5)));
        assert!(config2.validate(false, false).is_ok());

        // Changelog does not require watermark
        let config3 = tumbling_5m().with_emit_strategy(EmitStrategy::Changelog);
        assert!(config3.validate(false, false).is_ok());
    }

    #[test]
    fn test_display_tumbling_window() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(format!("{config}"), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(format!("{config}"), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(format!("{config}"), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "ts");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            step_interval: Box::new(Expr::Identifier(Ident::new("1 MINUTE"))),
            max_size_interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
        };
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );
        assert_eq!(format!("{config}"), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        assert_eq!(format!("{}", WindowType::Tumbling), "TUMBLE");
        assert_eq!(format!("{}", WindowType::Sliding), "HOP");
        assert_eq!(format!("{}", WindowType::Session), "SESSION");
        assert_eq!(format!("{}", WindowType::Cumulate), "CUMULATE");
    }

    #[test]
    fn test_display_duration_formatting() {
        // Hours
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(3600));
        assert_eq!(format!("{config}"), "TUMBLE(ts, 1h)");

        // Seconds (non-round minutes)
        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(45));
        assert_eq!(format!("{config2}"), "TUMBLE(ts, 45s)");

        // Milliseconds (sub-second)
        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_millis(500));
        assert_eq!(format!("{config3}"), "TUMBLE(ts, 500ms)");
    }
}