Skip to main content

laminar_sql/parser/
statements.rs

1//! Streaming SQL statement types
2//!
3//! This module defines AST types for streaming SQL extensions and provides
4//! conversion methods to translate them to runtime operator configurations.
5
6#[allow(clippy::disallowed_types)] // cold path: SQL parsing
7use std::collections::HashMap;
8use std::time::Duration;
9
10use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
11
12use super::window_rewriter::WindowRewriter;
13use super::ParseError;
14
15/// SHOW command variants for listing streaming objects.
16#[derive(Debug, Clone, PartialEq)]
17pub enum ShowCommand {
18    /// SHOW SOURCES - list all registered sources
19    Sources,
20    /// SHOW SINKS - list all registered sinks
21    Sinks,
22    /// SHOW QUERIES - list all running continuous queries
23    Queries,
24    /// SHOW MATERIALIZED VIEWS - list all materialized views
25    MaterializedViews,
26    /// SHOW STREAMS - list all named streams
27    Streams,
28    /// SHOW TABLES - list all reference/dimension tables
29    Tables,
30    /// SHOW CHECKPOINT STATUS - display checkpoint state
31    CheckpointStatus,
32    /// `SHOW CREATE SOURCE` — reconstruct source DDL
33    CreateSource {
34        /// Source name
35        name: ObjectName,
36    },
37    /// `SHOW CREATE SINK` — reconstruct sink DDL
38    CreateSink {
39        /// Sink name
40        name: ObjectName,
41    },
42}
43
44/// Streaming-specific SQL statements
45#[derive(Debug, Clone, PartialEq)]
46pub enum StreamingStatement {
47    /// Standard SQL statement
48    Standard(Box<sqlparser::ast::Statement>),
49
50    /// CREATE SOURCE statement
51    CreateSource(Box<CreateSourceStatement>),
52
53    /// CREATE SINK statement
54    CreateSink(Box<CreateSinkStatement>),
55
56    /// CREATE CONTINUOUS QUERY
57    CreateContinuousQuery {
58        /// Query name
59        name: ObjectName,
60        /// SQL query with streaming extensions
61        query: Box<StreamingStatement>,
62        /// EMIT clause if present
63        emit_clause: Option<EmitClause>,
64    },
65
66    /// DROP SOURCE statement
67    DropSource {
68        /// Source name to drop
69        name: ObjectName,
70        /// Whether IF EXISTS was specified
71        if_exists: bool,
72        /// Whether CASCADE was specified (drops dependent streams/MVs)
73        cascade: bool,
74    },
75
76    /// DROP SINK statement
77    DropSink {
78        /// Sink name to drop
79        name: ObjectName,
80        /// Whether IF EXISTS was specified
81        if_exists: bool,
82        /// Whether CASCADE was specified
83        cascade: bool,
84    },
85
86    /// DROP MATERIALIZED VIEW statement
87    DropMaterializedView {
88        /// View name to drop
89        name: ObjectName,
90        /// Whether IF EXISTS was specified
91        if_exists: bool,
92        /// Whether CASCADE was specified
93        cascade: bool,
94    },
95
96    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
97    Show(ShowCommand),
98
99    /// DESCRIBE source, sink, or other streaming object
100    Describe {
101        /// Object name to describe
102        name: ObjectName,
103        /// Whether EXTENDED was specified for additional detail
104        extended: bool,
105    },
106
107    /// `EXPLAIN [ANALYZE]` a streaming query plan
108    Explain {
109        /// The statement to explain
110        statement: Box<StreamingStatement>,
111        /// Whether ANALYZE was specified (execute and collect metrics)
112        analyze: bool,
113    },
114
115    /// CREATE MATERIALIZED VIEW
116    CreateMaterializedView {
117        /// View name
118        name: ObjectName,
119        /// The backing query
120        query: Box<StreamingStatement>,
121        /// Optional EMIT clause
122        emit_clause: Option<EmitClause>,
123        /// Whether OR REPLACE was specified
124        or_replace: bool,
125        /// Whether IF NOT EXISTS was specified
126        if_not_exists: bool,
127        /// Raw query text between `AS` and `EMIT`.
128        query_sql: String,
129    },
130
131    /// CREATE STREAM — named streaming pipeline
132    CreateStream {
133        /// Stream name
134        name: ObjectName,
135        /// Backing query (AS SELECT ...)
136        query: Box<StreamingStatement>,
137        /// Optional EMIT clause
138        emit_clause: Option<EmitClause>,
139        /// Whether OR REPLACE was specified
140        or_replace: bool,
141        /// Whether IF NOT EXISTS was specified
142        if_not_exists: bool,
143        /// Raw query text between `AS` and `EMIT`.
144        query_sql: String,
145    },
146
147    /// DROP STREAM statement
148    DropStream {
149        /// Stream name to drop
150        name: ObjectName,
151        /// Whether IF EXISTS was specified
152        if_exists: bool,
153        /// Whether CASCADE was specified
154        cascade: bool,
155    },
156
157    /// ALTER SOURCE — modify a source definition
158    AlterSource {
159        /// Source name to alter
160        name: ObjectName,
161        /// The alteration to apply
162        operation: AlterSourceOperation,
163    },
164
165    /// INSERT INTO a streaming source or table
166    InsertInto {
167        /// Target table or source name
168        table_name: ObjectName,
169        /// Column names (empty if not specified)
170        columns: Vec<Ident>,
171        /// Row values
172        values: Vec<Vec<Expr>>,
173    },
174
175    /// CREATE LOOKUP TABLE statement
176    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
177
178    /// DROP LOOKUP TABLE statement
179    DropLookupTable {
180        /// Lookup table name to drop
181        name: ObjectName,
182        /// Whether IF EXISTS was specified
183        if_exists: bool,
184    },
185
186    /// CHECKPOINT — trigger an immediate checkpoint
187    Checkpoint,
188
189    /// `RESTORE FROM CHECKPOINT <id>`
190    RestoreCheckpoint {
191        /// The checkpoint ID to restore from
192        checkpoint_id: u64,
193    },
194}
195
196/// Operations for ALTER SOURCE statements.
197#[derive(Debug, Clone, PartialEq)]
198pub enum AlterSourceOperation {
199    /// Add a new column: `ALTER SOURCE name ADD COLUMN col_name data_type`
200    AddColumn {
201        /// Column definition
202        column_def: ColumnDef,
203    },
204    /// Set source properties: `ALTER SOURCE name SET ('key' = 'value', ...)`
205    SetProperties {
206        /// Key-value pairs
207        properties: HashMap<String, String>,
208    },
209}
210
/// Serialization format specification (e.g., `FORMAT JSON`, `FORMAT AVRO`).
#[derive(Clone, Debug, PartialEq)]
pub struct FormatSpec {
    /// Name of the format (e.g., "JSON", "AVRO", "PROTOBUF").
    pub format_type: String,
    /// Extra format options, taken from the `WITH` clause that follows `FORMAT`.
    pub options: HashMap<String, String>,
}
219
220/// CREATE SOURCE statement
221#[derive(Debug, Clone, PartialEq)]
222pub struct CreateSourceStatement {
223    /// Source name
224    pub name: ObjectName,
225    /// Column definitions
226    pub columns: Vec<ColumnDef>,
227    /// Watermark definition
228    pub watermark: Option<WatermarkDef>,
229    /// Source connector options (from WITH clause)
230    pub with_options: HashMap<String, String>,
231    /// Whether to replace existing source
232    pub or_replace: bool,
233    /// Whether to skip if exists
234    pub if_not_exists: bool,
235    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
236    pub connector_type: Option<String>,
237    /// Connector-specific options (from `FROM KAFKA (...)`)
238    pub connector_options: HashMap<String, String>,
239    /// Format specification (e.g., `FORMAT JSON`)
240    pub format: Option<FormatSpec>,
241    /// Whether the column list includes a `*` wildcard for schema inference.
242    pub has_wildcard: bool,
243    /// Optional prefix for wildcard-expanded columns (from `PREFIX 'str'`).
244    pub wildcard_prefix: Option<String>,
245}
246
247/// CREATE SINK statement
248#[derive(Debug, Clone, PartialEq)]
249pub struct CreateSinkStatement {
250    /// Sink name
251    pub name: ObjectName,
252    /// Input query or table
253    pub from: SinkFrom,
254    /// Sink connector options (from WITH clause)
255    pub with_options: HashMap<String, String>,
256    /// Whether to replace existing sink
257    pub or_replace: bool,
258    /// Whether to skip if exists
259    pub if_not_exists: bool,
260    /// Optional WHERE filter expression
261    pub filter: Option<Expr>,
262    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
263    pub connector_type: Option<String>,
264    /// Connector-specific options (from `INTO KAFKA (...)`)
265    pub connector_options: HashMap<String, String>,
266    /// Format specification (e.g., `FORMAT JSON`)
267    pub format: Option<FormatSpec>,
268    /// Output options (from `WITH (key = ...)` after FORMAT)
269    pub output_options: HashMap<String, String>,
270}
271
272/// Source for a sink
273#[derive(Debug, Clone, PartialEq)]
274pub enum SinkFrom {
275    /// From a table or source
276    Table(ObjectName),
277    /// From a SELECT query
278    Query(Box<StreamingStatement>),
279}
280
281/// Watermark definition
282#[derive(Debug, Clone, PartialEq)]
283pub struct WatermarkDef {
284    /// Column to use for watermark
285    pub column: Ident,
286    /// Watermark expression (e.g., column - INTERVAL '5' SECOND).
287    /// `None` when `WATERMARK FOR col` is used without `AS expr`,
288    /// meaning watermark advances via `source.watermark()` with zero delay.
289    pub expression: Option<Expr>,
290}
291
292/// Late data handling clause.
293///
294/// Controls what happens to events that arrive after their window has closed.
295/// This is the SQL AST representation of late data configuration.
296/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
297#[derive(Debug, Clone, PartialEq, Default)]
298pub struct LateDataClause {
299    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
300    pub allowed_lateness: Option<Box<Expr>>,
301    /// Side output name for late events (e.g., `late_events`)
302    pub side_output: Option<String>,
303}
304
305impl LateDataClause {
306    /// Creates a clause with allowed lateness only.
307    #[must_use]
308    pub fn with_allowed_lateness(lateness: Expr) -> Self {
309        Self {
310            allowed_lateness: Some(Box::new(lateness)),
311            side_output: None,
312        }
313    }
314
315    /// Creates a clause with both allowed lateness and side output.
316    #[must_use]
317    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
318        Self {
319            allowed_lateness: Some(Box::new(lateness)),
320            side_output: Some(side_output),
321        }
322    }
323
324    /// Creates a clause with side output only (uses default lateness).
325    #[must_use]
326    pub fn side_output_only(side_output: String) -> Self {
327        Self {
328            allowed_lateness: None,
329            side_output: Some(side_output),
330        }
331    }
332
333    /// Convert to allowed lateness Duration.
334    ///
335    /// # Errors
336    ///
337    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
338    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
339        match &self.allowed_lateness {
340            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
341            None => Ok(Duration::ZERO),
342        }
343    }
344
345    /// Check if this clause has a side output configured.
346    #[must_use]
347    pub fn has_side_output(&self) -> bool {
348        self.side_output.is_some()
349    }
350
351    /// Get the side output name, if configured.
352    #[must_use]
353    pub fn get_side_output(&self) -> Option<&str> {
354        self.side_output.as_deref()
355    }
356}
357
/// Emit strategy for runtime operator configuration.
///
/// This is the runtime representation that operators use.
///
/// Every payload (`Duration`) is `Eq + Hash`, so the enum derives both in
/// addition to `PartialEq`; this lets a strategy be used as a set or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Emit when the watermark passes the window end.
    OnWatermark,
    /// Emit only when the window closes (no intermediate results).
    OnWindowClose,
    /// Emit at fixed intervals of the given duration.
    Periodic(Duration),
    /// Emit on every state change.
    OnUpdate,
    /// Emit changelog records with Z-set weights.
    Changelog,
    /// Emit only final results, suppressing all intermediate ones.
    FinalOnly,
}
376
377/// EMIT clause for controlling output timing.
378///
379/// This is the SQL AST representation of emit strategies.
380/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
381#[derive(Debug, Clone, PartialEq)]
382pub enum EmitClause {
383    // === Existing ===
384    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
385    ///
386    /// Emit results when the watermark passes the window end.
387    /// This is the most efficient strategy.
388    AfterWatermark,
389
390    /// EMIT ON WINDOW CLOSE
391    ///
392    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
393    /// Only emits when window closes, no intermediate results.
394    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
395    OnWindowClose,
396
397    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
398    ///
399    /// Emit intermediate results at fixed intervals.
400    /// Final results are still emitted on watermark.
401    Periodically {
402        /// The interval expression (e.g., INTERVAL '5' SECOND)
403        interval: Box<Expr>,
404    },
405
406    /// EMIT ON UPDATE
407    ///
408    /// Emit updated results after every state change.
409    /// This provides lowest latency but highest overhead.
410    OnUpdate,
411
412    // === New ===
413    /// EMIT CHANGES
414    ///
415    /// Emit changelog records with Z-set weights for CDC pipelines.
416    /// Every emission includes operation type and weight:
417    /// - Insert (+1 weight)
418    /// - Delete (-1 weight)
419    /// - Update (retraction pair: -1 old, +1 new)
420    ///
421    /// Required for:
422    /// - CDC pipelines
423    /// - Cascading materialized views
424    /// - Downstream consumers that need to track changes
425    Changes,
426
427    /// EMIT FINAL
428    ///
429    /// Suppress ALL intermediate results, emit only finalized.
430    /// Also drops late data entirely after window close.
431    /// Use for BI reporting where only final, exact results matter.
432    Final,
433}
434
435impl std::fmt::Display for EmitClause {
436    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437        match self {
438            EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
439            EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
440            EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
441            EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
442            EmitClause::Changes => write!(f, "EMIT CHANGES"),
443            EmitClause::Final => write!(f, "EMIT FINAL"),
444        }
445    }
446}
447
448impl EmitClause {
449    /// Convert to runtime EmitStrategy.
450    ///
451    /// # Errors
452    ///
453    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
454    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
455        match self {
456            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
457            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
458            EmitClause::Periodically { interval } => {
459                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
460                Ok(EmitStrategy::Periodic(duration))
461            }
462            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
463            EmitClause::Changes => Ok(EmitStrategy::Changelog),
464            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
465        }
466    }
467
468    /// Check if this emit strategy requires changelog/retraction support.
469    #[must_use]
470    pub fn requires_changelog(&self) -> bool {
471        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
472    }
473
474    /// Check if this emit strategy is append-only (no retractions).
475    #[must_use]
476    pub fn is_append_only(&self) -> bool {
477        matches!(
478            self,
479            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
480        )
481    }
482
483    /// Returns true if this emit strategy requires a watermark on the source.
484    ///
485    /// `OnWindowClose`, `Final`, and `AfterWatermark` all depend on watermark
486    /// advancement to trigger window closure. Without a watermark, timers will
487    /// never fire and windows will never close.
488    #[must_use]
489    pub fn requires_watermark(&self) -> bool {
490        matches!(
491            self,
492            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
493        )
494    }
495}
496
497/// Window function types
498#[derive(Debug, Clone, PartialEq)]
499pub enum WindowFunction {
500    /// TUMBLE(column, interval [, offset])
501    Tumble {
502        /// The time column to window on
503        time_column: Box<Expr>,
504        /// The window interval
505        interval: Box<Expr>,
506        /// Optional offset for timezone-aligned windows
507        offset: Option<Box<Expr>>,
508    },
509    /// HOP(column, slide, size [, offset])
510    Hop {
511        /// The time column to window on
512        time_column: Box<Expr>,
513        /// The slide interval (how often to create a new window)
514        slide_interval: Box<Expr>,
515        /// The window size interval
516        window_interval: Box<Expr>,
517        /// Optional offset for timezone-aligned windows
518        offset: Option<Box<Expr>>,
519    },
520    /// SESSION(column, gap)
521    Session {
522        /// The time column to window on
523        time_column: Box<Expr>,
524        /// The gap interval (max gap between events in same session)
525        gap_interval: Box<Expr>,
526    },
527    /// CUMULATE(column, step, size)
528    Cumulate {
529        /// The time column to window on
530        time_column: Box<Expr>,
531        /// The step interval (window growth increment)
532        step_interval: Box<Expr>,
533        /// The max window size interval (epoch size)
534        max_size_interval: Box<Expr>,
535    },
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part `ObjectName` from a bare identifier.
    fn object_name(ident: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(ident))])
    }

    /// Boxes an identifier expression; used as a stand-in for interval arguments.
    fn boxed_ident(ident: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(ident)))
    }

    /// Parses `sql` with the generic dialect and wraps its first statement.
    fn standard(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let parsed = sqlparser::parser::Parser::parse_sql(&dialect, sql).unwrap();
        StreamingStatement::Standard(Box::new(parsed.into_iter().next().unwrap()))
    }

    #[test]
    fn test_create_source_statement() {
        let id_column = ColumnDef {
            name: Ident::new("id"),
            data_type: DataType::BigInt(None),
            options: vec![],
        };
        let timestamp_column = ColumnDef {
            name: Ident::new("timestamp"),
            data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
            options: vec![],
        };
        let watermark = WatermarkDef {
            column: Ident::new("timestamp"),
            expression: Some(Expr::Identifier(Ident::new("timestamp"))),
        };

        let source = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![id_column, timestamp_column],
            watermark: Some(watermark),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        // Spot-check the populated fields.
        assert_eq!(source.columns.len(), 2);
        assert!(source.watermark.is_some());
        assert_eq!(
            source.with_options.get("connector").map(String::as_str),
            Some("kafka")
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let after_watermark = EmitClause::AfterWatermark;
        let on_window_close = EmitClause::OnWindowClose;
        let periodically = EmitClause::Periodically {
            interval: boxed_ident("5_SECONDS"),
        };
        let on_update = EmitClause::OnUpdate;

        assert!(
            matches!(after_watermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(on_window_close, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodically, EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(on_update, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: boxed_ident("event_time"),
            interval: boxed_ident("5_MINUTES"),
            offset: None,
        };
        let hop = WindowFunction::Hop {
            time_column: boxed_ident("event_time"),
            slide_interval: boxed_ident("1_MINUTE"),
            window_interval: boxed_ident("5_MINUTES"),
            offset: None,
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
    }

    #[test]
    fn test_late_data_clause_default() {
        let LateDataClause {
            allowed_lateness,
            side_output,
        } = LateDataClause::default();
        assert!(allowed_lateness.is_none());
        assert!(side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_allowed_lateness(lateness);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_side_output(lateness, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let materialized_views = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(materialized_views, ShowCommand::MaterializedViews);
    }

    #[test]
    fn test_show_command_clone() {
        let original = ShowCommand::Sources;
        let copy = original.clone();
        assert_eq!(original, copy);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        let StreamingStatement::DropSource {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSource")
        };
        assert_eq!(name.to_string(), "events");
        assert!(if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        let StreamingStatement::DropSink {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSink")
        };
        assert_eq!(name.to_string(), "output");
        assert!(!if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropMaterializedView")
        };
        assert_eq!(name.to_string(), "live_stats");
        assert!(if_exists);
        assert!(cascade);
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        let StreamingStatement::Describe { name, extended } = stmt else {
            panic!("Expected Describe")
        };
        assert_eq!(name.to_string(), "events");
        assert!(extended);
    }

    #[test]
    fn test_explain_statement() {
        // The explained statement is a plain SQL statement parsed by sqlparser.
        let stmt = StreamingStatement::Explain {
            statement: Box::new(standard("SELECT 1")),
            analyze: false,
        };
        let StreamingStatement::Explain { statement, .. } = stmt else {
            panic!("Expected Explain")
        };
        assert!(matches!(*statement, StreamingStatement::Standard(_)));
    }

    #[test]
    fn test_create_materialized_view_statement() {
        // The backing query is a plain SQL statement parsed by sqlparser.
        let sql = "SELECT COUNT(*) FROM events";
        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(standard(sql)),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
            query_sql: sql.to_string(),
        };
        let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            ..
        } = stmt
        else {
            panic!("Expected CreateMaterializedView")
        };
        assert_eq!(name.to_string(), "live_stats");
        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
        assert!(!or_replace);
        assert!(if_not_exists);
    }

    #[test]
    fn test_insert_into_statement() {
        let row = vec![
            Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
            Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
        ];
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![row],
        };
        let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        else {
            panic!("Expected InsertInto")
        };
        assert_eq!(table_name.to_string(), "events");
        assert_eq!(columns.len(), 2);
        assert_eq!(values.len(), 1);
        assert_eq!(values[0].len(), 2);
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        // Watermark-dependent strategies
        for clause in [
            EmitClause::OnWindowClose,
            EmitClause::Final,
            EmitClause::AfterWatermark,
        ] {
            assert!(clause.requires_watermark());
        }

        // Non-watermark strategies
        let periodic = EmitClause::Periodically {
            interval: boxed_ident("5_SECONDS"),
        };
        for clause in [EmitClause::OnUpdate, EmitClause::Changes, periodic] {
            assert!(!clause.requires_watermark());
        }
    }
}