Skip to main content

laminar_sql/parser/
statements.rs

//! Streaming SQL statement types
//!
//! This module defines AST types for streaming SQL extensions and provides
//! conversion methods to translate them to runtime operator configurations.
5
6#[allow(clippy::disallowed_types)] // cold path: SQL parsing
7use std::collections::HashMap;
8use std::time::Duration;
9
10use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
11
12use super::window_rewriter::WindowRewriter;
13use super::ParseError;
14
15/// SHOW command variants for listing streaming objects.
16#[derive(Debug, Clone, PartialEq)]
17pub enum ShowCommand {
18    /// SHOW SOURCES - list all registered sources
19    Sources,
20    /// SHOW SINKS - list all registered sinks
21    Sinks,
22    /// SHOW QUERIES - list all running continuous queries
23    Queries,
24    /// SHOW MATERIALIZED VIEWS - list all materialized views
25    MaterializedViews,
26    /// SHOW STREAMS - list all named streams
27    Streams,
28    /// SHOW TABLES - list all reference/dimension tables
29    Tables,
30    /// SHOW CHECKPOINT STATUS - display checkpoint state
31    CheckpointStatus,
32    /// `SHOW CREATE SOURCE` — reconstruct source DDL
33    CreateSource {
34        /// Source name
35        name: ObjectName,
36    },
37    /// `SHOW CREATE SINK` — reconstruct sink DDL
38    CreateSink {
39        /// Sink name
40        name: ObjectName,
41    },
42}
43
44/// Streaming-specific SQL statements
45#[derive(Debug, Clone, PartialEq)]
46pub enum StreamingStatement {
47    /// Standard SQL statement
48    Standard(Box<sqlparser::ast::Statement>),
49
50    /// CREATE SOURCE statement
51    CreateSource(Box<CreateSourceStatement>),
52
53    /// CREATE SINK statement
54    CreateSink(Box<CreateSinkStatement>),
55
56    /// CREATE CONTINUOUS QUERY
57    CreateContinuousQuery {
58        /// Query name
59        name: ObjectName,
60        /// SQL query with streaming extensions
61        query: Box<StreamingStatement>,
62        /// EMIT clause if present
63        emit_clause: Option<EmitClause>,
64    },
65
66    /// DROP SOURCE statement
67    DropSource {
68        /// Source name to drop
69        name: ObjectName,
70        /// Whether IF EXISTS was specified
71        if_exists: bool,
72        /// Whether CASCADE was specified (drops dependent streams/MVs)
73        cascade: bool,
74    },
75
76    /// DROP SINK statement
77    DropSink {
78        /// Sink name to drop
79        name: ObjectName,
80        /// Whether IF EXISTS was specified
81        if_exists: bool,
82        /// Whether CASCADE was specified
83        cascade: bool,
84    },
85
86    /// DROP MATERIALIZED VIEW statement
87    DropMaterializedView {
88        /// View name to drop
89        name: ObjectName,
90        /// Whether IF EXISTS was specified
91        if_exists: bool,
92        /// Whether CASCADE was specified
93        cascade: bool,
94    },
95
96    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
97    Show(ShowCommand),
98
99    /// DESCRIBE source, sink, or other streaming object
100    Describe {
101        /// Object name to describe
102        name: ObjectName,
103        /// Whether EXTENDED was specified for additional detail
104        extended: bool,
105    },
106
107    /// `EXPLAIN [ANALYZE]` a streaming query plan
108    Explain {
109        /// The statement to explain
110        statement: Box<StreamingStatement>,
111        /// Whether ANALYZE was specified (execute and collect metrics)
112        analyze: bool,
113    },
114
115    /// CREATE MATERIALIZED VIEW
116    CreateMaterializedView {
117        /// View name
118        name: ObjectName,
119        /// The backing query
120        query: Box<StreamingStatement>,
121        /// Optional EMIT clause
122        emit_clause: Option<EmitClause>,
123        /// Whether OR REPLACE was specified
124        or_replace: bool,
125        /// Whether IF NOT EXISTS was specified
126        if_not_exists: bool,
127    },
128
129    /// CREATE STREAM — named streaming pipeline
130    CreateStream {
131        /// Stream name
132        name: ObjectName,
133        /// Backing query (AS SELECT ...)
134        query: Box<StreamingStatement>,
135        /// Optional EMIT clause
136        emit_clause: Option<EmitClause>,
137        /// Whether OR REPLACE was specified
138        or_replace: bool,
139        /// Whether IF NOT EXISTS was specified
140        if_not_exists: bool,
141    },
142
143    /// DROP STREAM statement
144    DropStream {
145        /// Stream name to drop
146        name: ObjectName,
147        /// Whether IF EXISTS was specified
148        if_exists: bool,
149        /// Whether CASCADE was specified
150        cascade: bool,
151    },
152
153    /// ALTER SOURCE — modify a source definition
154    AlterSource {
155        /// Source name to alter
156        name: ObjectName,
157        /// The alteration to apply
158        operation: AlterSourceOperation,
159    },
160
161    /// INSERT INTO a streaming source or table
162    InsertInto {
163        /// Target table or source name
164        table_name: ObjectName,
165        /// Column names (empty if not specified)
166        columns: Vec<Ident>,
167        /// Row values
168        values: Vec<Vec<Expr>>,
169    },
170
171    /// CREATE LOOKUP TABLE statement
172    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
173
174    /// DROP LOOKUP TABLE statement
175    DropLookupTable {
176        /// Lookup table name to drop
177        name: ObjectName,
178        /// Whether IF EXISTS was specified
179        if_exists: bool,
180    },
181
182    /// CHECKPOINT — trigger an immediate checkpoint
183    Checkpoint,
184
185    /// `RESTORE FROM CHECKPOINT <id>`
186    RestoreCheckpoint {
187        /// The checkpoint ID to restore from
188        checkpoint_id: u64,
189    },
190}
191
192/// Operations for ALTER SOURCE statements.
193#[derive(Debug, Clone, PartialEq)]
194pub enum AlterSourceOperation {
195    /// Add a new column: `ALTER SOURCE name ADD COLUMN col_name data_type`
196    AddColumn {
197        /// Column definition
198        column_def: ColumnDef,
199    },
200    /// Set source properties: `ALTER SOURCE name SET ('key' = 'value', ...)`
201    SetProperties {
202        /// Key-value pairs
203        properties: HashMap<String, String>,
204    },
205}
206
/// Format specification for serialization (e.g., `FORMAT JSON`, `FORMAT AVRO`).
///
/// Both fields are plain standard-library types, so the type derives `Eq`
/// in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FormatSpec {
    /// Format type (e.g., "JSON", "AVRO", "PROTOBUF").
    pub format_type: String,
    /// Additional format options (from WITH clause after FORMAT).
    pub options: HashMap<String, String>,
}
215
216/// CREATE SOURCE statement
217#[derive(Debug, Clone, PartialEq)]
218pub struct CreateSourceStatement {
219    /// Source name
220    pub name: ObjectName,
221    /// Column definitions
222    pub columns: Vec<ColumnDef>,
223    /// Watermark definition
224    pub watermark: Option<WatermarkDef>,
225    /// Source connector options (from WITH clause)
226    pub with_options: HashMap<String, String>,
227    /// Whether to replace existing source
228    pub or_replace: bool,
229    /// Whether to skip if exists
230    pub if_not_exists: bool,
231    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
232    pub connector_type: Option<String>,
233    /// Connector-specific options (from `FROM KAFKA (...)`)
234    pub connector_options: HashMap<String, String>,
235    /// Format specification (e.g., `FORMAT JSON`)
236    pub format: Option<FormatSpec>,
237    /// Whether the column list includes a `*` wildcard for schema inference.
238    pub has_wildcard: bool,
239    /// Optional prefix for wildcard-expanded columns (from `PREFIX 'str'`).
240    pub wildcard_prefix: Option<String>,
241}
242
243/// CREATE SINK statement
244#[derive(Debug, Clone, PartialEq)]
245pub struct CreateSinkStatement {
246    /// Sink name
247    pub name: ObjectName,
248    /// Input query or table
249    pub from: SinkFrom,
250    /// Sink connector options (from WITH clause)
251    pub with_options: HashMap<String, String>,
252    /// Whether to replace existing sink
253    pub or_replace: bool,
254    /// Whether to skip if exists
255    pub if_not_exists: bool,
256    /// Optional WHERE filter expression
257    pub filter: Option<Expr>,
258    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
259    pub connector_type: Option<String>,
260    /// Connector-specific options (from `INTO KAFKA (...)`)
261    pub connector_options: HashMap<String, String>,
262    /// Format specification (e.g., `FORMAT JSON`)
263    pub format: Option<FormatSpec>,
264    /// Output options (from `WITH (key = ...)` after FORMAT)
265    pub output_options: HashMap<String, String>,
266}
267
268/// Source for a sink
269#[derive(Debug, Clone, PartialEq)]
270pub enum SinkFrom {
271    /// From a table or source
272    Table(ObjectName),
273    /// From a SELECT query
274    Query(Box<StreamingStatement>),
275}
276
277/// Watermark definition
278#[derive(Debug, Clone, PartialEq)]
279pub struct WatermarkDef {
280    /// Column to use for watermark
281    pub column: Ident,
282    /// Watermark expression (e.g., column - INTERVAL '5' SECOND).
283    /// `None` when `WATERMARK FOR col` is used without `AS expr`,
284    /// meaning watermark advances via `source.watermark()` with zero delay.
285    pub expression: Option<Expr>,
286}
287
288/// Late data handling clause.
289///
290/// Controls what happens to events that arrive after their window has closed.
291/// This is the SQL AST representation of late data configuration.
292/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
293#[derive(Debug, Clone, PartialEq, Default)]
294pub struct LateDataClause {
295    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
296    pub allowed_lateness: Option<Box<Expr>>,
297    /// Side output name for late events (e.g., `late_events`)
298    pub side_output: Option<String>,
299}
300
301impl LateDataClause {
302    /// Creates a clause with allowed lateness only.
303    #[must_use]
304    pub fn with_allowed_lateness(lateness: Expr) -> Self {
305        Self {
306            allowed_lateness: Some(Box::new(lateness)),
307            side_output: None,
308        }
309    }
310
311    /// Creates a clause with both allowed lateness and side output.
312    #[must_use]
313    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
314        Self {
315            allowed_lateness: Some(Box::new(lateness)),
316            side_output: Some(side_output),
317        }
318    }
319
320    /// Creates a clause with side output only (uses default lateness).
321    #[must_use]
322    pub fn side_output_only(side_output: String) -> Self {
323        Self {
324            allowed_lateness: None,
325            side_output: Some(side_output),
326        }
327    }
328
329    /// Convert to allowed lateness Duration.
330    ///
331    /// # Errors
332    ///
333    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
334    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
335        match &self.allowed_lateness {
336            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
337            None => Ok(Duration::ZERO),
338        }
339    }
340
341    /// Check if this clause has a side output configured.
342    #[must_use]
343    pub fn has_side_output(&self) -> bool {
344        self.side_output.is_some()
345    }
346
347    /// Get the side output name, if configured.
348    #[must_use]
349    pub fn get_side_output(&self) -> Option<&str> {
350        self.side_output.as_deref()
351    }
352}
353
/// Emit strategy for runtime operator configuration.
///
/// This is the runtime representation that operators use.
///
/// The only payload is a `Duration`, so the type derives `Eq` and `Hash`
/// in addition to `PartialEq`, which lets it be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Emit when watermark passes window end
    OnWatermark,
    /// Emit only when window closes (no intermediate results)
    OnWindowClose,
    /// Emit at fixed intervals
    Periodic(Duration),
    /// Emit on every state change
    OnUpdate,
    /// Emit changelog records with Z-set weights
    Changelog,
    /// Emit only final results, suppress all intermediate
    FinalOnly,
}
372
373/// EMIT clause for controlling output timing.
374///
375/// This is the SQL AST representation of emit strategies.
376/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
377#[derive(Debug, Clone, PartialEq)]
378pub enum EmitClause {
379    // === Existing ===
380    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
381    ///
382    /// Emit results when the watermark passes the window end.
383    /// This is the most efficient strategy.
384    AfterWatermark,
385
386    /// EMIT ON WINDOW CLOSE
387    ///
388    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
389    /// Only emits when window closes, no intermediate results.
390    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
391    OnWindowClose,
392
393    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
394    ///
395    /// Emit intermediate results at fixed intervals.
396    /// Final results are still emitted on watermark.
397    Periodically {
398        /// The interval expression (e.g., INTERVAL '5' SECOND)
399        interval: Box<Expr>,
400    },
401
402    /// EMIT ON UPDATE
403    ///
404    /// Emit updated results after every state change.
405    /// This provides lowest latency but highest overhead.
406    OnUpdate,
407
408    // === New ===
409    /// EMIT CHANGES
410    ///
411    /// Emit changelog records with Z-set weights for CDC pipelines.
412    /// Every emission includes operation type and weight:
413    /// - Insert (+1 weight)
414    /// - Delete (-1 weight)
415    /// - Update (retraction pair: -1 old, +1 new)
416    ///
417    /// Required for:
418    /// - CDC pipelines
419    /// - Cascading materialized views
420    /// - Downstream consumers that need to track changes
421    Changes,
422
423    /// EMIT FINAL
424    ///
425    /// Suppress ALL intermediate results, emit only finalized.
426    /// Also drops late data entirely after window close.
427    /// Use for BI reporting where only final, exact results matter.
428    Final,
429}
430
431impl std::fmt::Display for EmitClause {
432    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
433        match self {
434            EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
435            EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
436            EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
437            EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
438            EmitClause::Changes => write!(f, "EMIT CHANGES"),
439            EmitClause::Final => write!(f, "EMIT FINAL"),
440        }
441    }
442}
443
444impl EmitClause {
445    /// Convert to runtime EmitStrategy.
446    ///
447    /// # Errors
448    ///
449    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
450    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
451        match self {
452            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
453            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
454            EmitClause::Periodically { interval } => {
455                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
456                Ok(EmitStrategy::Periodic(duration))
457            }
458            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
459            EmitClause::Changes => Ok(EmitStrategy::Changelog),
460            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
461        }
462    }
463
464    /// Check if this emit strategy requires changelog/retraction support.
465    #[must_use]
466    pub fn requires_changelog(&self) -> bool {
467        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
468    }
469
470    /// Check if this emit strategy is append-only (no retractions).
471    #[must_use]
472    pub fn is_append_only(&self) -> bool {
473        matches!(
474            self,
475            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
476        )
477    }
478
479    /// Returns true if this emit strategy requires a watermark on the source.
480    ///
481    /// `OnWindowClose`, `Final`, and `AfterWatermark` all depend on watermark
482    /// advancement to trigger window closure. Without a watermark, timers will
483    /// never fire and windows will never close.
484    #[must_use]
485    pub fn requires_watermark(&self) -> bool {
486        matches!(
487            self,
488            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
489        )
490    }
491}
492
493/// Window function types
494#[derive(Debug, Clone, PartialEq)]
495pub enum WindowFunction {
496    /// TUMBLE(column, interval [, offset])
497    Tumble {
498        /// The time column to window on
499        time_column: Box<Expr>,
500        /// The window interval
501        interval: Box<Expr>,
502        /// Optional offset for timezone-aligned windows
503        offset: Option<Box<Expr>>,
504    },
505    /// HOP(column, slide, size [, offset])
506    Hop {
507        /// The time column to window on
508        time_column: Box<Expr>,
509        /// The slide interval (how often to create a new window)
510        slide_interval: Box<Expr>,
511        /// The window size interval
512        window_interval: Box<Expr>,
513        /// Optional offset for timezone-aligned windows
514        offset: Option<Box<Expr>>,
515    },
516    /// SESSION(column, gap)
517    Session {
518        /// The time column to window on
519        time_column: Box<Expr>,
520        /// The gap interval (max gap between events in same session)
521        gap_interval: Box<Expr>,
522    },
523    /// CUMULATE(column, step, size)
524    Cumulate {
525        /// The time column to window on
526        time_column: Box<Expr>,
527        /// The step interval (window growth increment)
528        step_interval: Box<Expr>,
529        /// The max window size interval (epoch size)
530        max_size_interval: Box<Expr>,
531    },
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part object name from a bare identifier.
    fn object_name(ident: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(ident))])
    }

    /// Wraps a bare identifier in a boxed expression.
    ///
    /// The tests using this only inspect enum shape; the placeholder is never
    /// converted to a real interval.
    fn placeholder_expr(ident: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(ident)))
    }

    /// Parses `sql` with the generic dialect and wraps its first statement.
    fn standard_statement(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let parsed =
            sqlparser::parser::Parser::parse_sql(&dialect, sql).expect("test SQL should parse");
        let first = parsed
            .into_iter()
            .next()
            .expect("test SQL should contain a statement");
        StreamingStatement::Standard(Box::new(first))
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(Expr::Identifier(Ident::new("timestamp"))),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        // Check the statement fields
        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector").map(String::as_str),
            Some("kafka")
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let periodic = EmitClause::Periodically {
            interval: placeholder_expr("5_SECONDS"),
        };

        assert!(
            matches!(EmitClause::AfterWatermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(EmitClause::OnWindowClose, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodic, EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(EmitClause::OnUpdate, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
        assert!(
            matches!(EmitClause::Changes, EmitClause::Changes),
            "Expected Changes"
        );
        assert!(
            matches!(EmitClause::Final, EmitClause::Final),
            "Expected Final"
        );
    }

    #[test]
    fn test_emit_clause_display_fixed_variants() {
        assert_eq!(
            EmitClause::AfterWatermark.to_string(),
            "EMIT AFTER WATERMARK"
        );
        assert_eq!(
            EmitClause::OnWindowClose.to_string(),
            "EMIT ON WINDOW CLOSE"
        );
        assert_eq!(EmitClause::OnUpdate.to_string(), "EMIT ON UPDATE");
        assert_eq!(EmitClause::Changes.to_string(), "EMIT CHANGES");
        assert_eq!(EmitClause::Final.to_string(), "EMIT FINAL");
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_without_interval() {
        assert!(matches!(
            EmitClause::AfterWatermark.to_emit_strategy(),
            Ok(EmitStrategy::OnWatermark)
        ));
        assert!(matches!(
            EmitClause::OnWindowClose.to_emit_strategy(),
            Ok(EmitStrategy::OnWindowClose)
        ));
        assert!(matches!(
            EmitClause::OnUpdate.to_emit_strategy(),
            Ok(EmitStrategy::OnUpdate)
        ));
        assert!(matches!(
            EmitClause::Changes.to_emit_strategy(),
            Ok(EmitStrategy::Changelog)
        ));
        assert!(matches!(
            EmitClause::Final.to_emit_strategy(),
            Ok(EmitStrategy::FinalOnly)
        ));
    }

    #[test]
    fn test_emit_clause_changelog_and_append_only() {
        let periodic = EmitClause::Periodically {
            interval: placeholder_expr("5_SECONDS"),
        };

        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        assert!(!periodic.requires_changelog());

        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(EmitClause::AfterWatermark.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());
        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!periodic.is_append_only());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: placeholder_expr("event_time"),
            interval: placeholder_expr("5_MINUTES"),
            offset: None,
        };

        let hop = WindowFunction::Hop {
            time_column: placeholder_expr("event_time"),
            slide_interval: placeholder_expr("1_MINUTE"),
            window_interval: placeholder_expr("5_MINUTES"),
            offset: None,
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_late_data_clause_accessors() {
        let with_output = LateDataClause::side_output_only("late_events".to_string());
        assert!(with_output.has_side_output());
        assert_eq!(with_output.get_side_output(), Some("late_events"));

        let without_output = LateDataClause::default();
        assert!(!without_output.has_side_output());
        assert_eq!(without_output.get_side_output(), None);
        // No lateness expression converts to a zero duration.
        assert!(matches!(
            without_output.to_allowed_lateness(),
            Ok(lateness) if lateness == Duration::ZERO
        ));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);

        // Distinct variants must not compare equal.
        assert_ne!(ShowCommand::Streams, ShowCommand::Tables);
        assert_ne!(ShowCommand::CheckpointStatus, ShowCommand::Sources);
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        let StreamingStatement::DropSource {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSource");
        };
        assert_eq!(name.to_string(), "events");
        assert!(if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        let StreamingStatement::DropSink {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSink");
        };
        assert_eq!(name.to_string(), "output");
        assert!(!if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert!(if_exists);
        assert!(cascade);
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        let StreamingStatement::Describe { name, extended } = stmt else {
            panic!("Expected Describe");
        };
        assert_eq!(name.to_string(), "events");
        assert!(extended);
    }

    #[test]
    fn test_explain_statement() {
        // Build an inner Standard statement using sqlparser
        let stmt = StreamingStatement::Explain {
            statement: Box::new(standard_statement("SELECT 1")),
            analyze: false,
        };
        let StreamingStatement::Explain { statement, .. } = stmt else {
            panic!("Expected Explain");
        };
        assert!(matches!(*statement, StreamingStatement::Standard(_)));
    }

    #[test]
    fn test_create_materialized_view_statement() {
        // Build a query statement using sqlparser
        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(standard_statement("SELECT COUNT(*) FROM events")),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            ..
        } = stmt
        else {
            panic!("Expected CreateMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
        assert!(!or_replace);
        assert!(if_not_exists);
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        else {
            panic!("Expected InsertInto");
        };
        assert_eq!(table_name.to_string(), "events");
        assert_eq!(columns.len(), 2);
        assert_eq!(values.len(), 1);
        assert_eq!(values[0].len(), 2);
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        // Watermark-dependent strategies
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        // Non-watermark strategies
        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: placeholder_expr("5_SECONDS"),
        };
        assert!(!periodic.requires_watermark());
    }
}