Skip to main content

laminar_sql/parser/
statements.rs

1//! Streaming SQL statement types
2//!
3//! This module defines AST types for streaming SQL extensions and provides
4//! conversion methods to translate them to runtime operator configurations.
5
6#[allow(clippy::disallowed_types)] // cold path: SQL parsing
7use std::collections::HashMap;
8use std::time::Duration;
9
10use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
11
12use super::window_rewriter::WindowRewriter;
13use super::ParseError;
14
15/// SHOW command variants for listing streaming objects.
16#[derive(Debug, Clone, PartialEq)]
17pub enum ShowCommand {
18    /// SHOW SOURCES - list all registered sources
19    Sources,
20    /// SHOW SINKS - list all registered sinks
21    Sinks,
22    /// SHOW QUERIES - list all running continuous queries
23    Queries,
24    /// SHOW MATERIALIZED VIEWS - list all materialized views
25    MaterializedViews,
26    /// SHOW STREAMS - list all named streams
27    Streams,
28    /// SHOW TABLES - list all reference/dimension tables
29    Tables,
30    /// SHOW CHECKPOINT STATUS - display checkpoint state
31    CheckpointStatus,
32    /// `SHOW CREATE SOURCE` — reconstruct source DDL
33    CreateSource {
34        /// Source name
35        name: ObjectName,
36    },
37    /// `SHOW CREATE SINK` — reconstruct sink DDL
38    CreateSink {
39        /// Sink name
40        name: ObjectName,
41    },
42}
43
44/// Streaming-specific SQL statements
45#[derive(Debug, Clone, PartialEq)]
46pub enum StreamingStatement {
47    /// Standard SQL statement
48    Standard(Box<sqlparser::ast::Statement>),
49
50    /// CREATE SOURCE statement
51    CreateSource(Box<CreateSourceStatement>),
52
53    /// CREATE SINK statement
54    CreateSink(Box<CreateSinkStatement>),
55
56    /// CREATE CONTINUOUS QUERY
57    CreateContinuousQuery {
58        /// Query name
59        name: ObjectName,
60        /// SQL query with streaming extensions
61        query: Box<StreamingStatement>,
62        /// EMIT clause if present
63        emit_clause: Option<EmitClause>,
64    },
65
66    /// DROP SOURCE statement
67    DropSource {
68        /// Source name to drop
69        name: ObjectName,
70        /// Whether IF EXISTS was specified
71        if_exists: bool,
72        /// Whether CASCADE was specified (drops dependent streams/MVs)
73        cascade: bool,
74    },
75
76    /// DROP SINK statement
77    DropSink {
78        /// Sink name to drop
79        name: ObjectName,
80        /// Whether IF EXISTS was specified
81        if_exists: bool,
82        /// Whether CASCADE was specified
83        cascade: bool,
84    },
85
86    /// DROP MATERIALIZED VIEW statement
87    DropMaterializedView {
88        /// View name to drop
89        name: ObjectName,
90        /// Whether IF EXISTS was specified
91        if_exists: bool,
92        /// Whether CASCADE was specified
93        cascade: bool,
94    },
95
96    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
97    Show(ShowCommand),
98
99    /// DESCRIBE source, sink, or other streaming object
100    Describe {
101        /// Object name to describe
102        name: ObjectName,
103        /// Whether EXTENDED was specified for additional detail
104        extended: bool,
105    },
106
107    /// `EXPLAIN [ANALYZE]` a streaming query plan
108    Explain {
109        /// The statement to explain
110        statement: Box<StreamingStatement>,
111        /// Whether ANALYZE was specified (execute and collect metrics)
112        analyze: bool,
113    },
114
115    /// CREATE MATERIALIZED VIEW
116    CreateMaterializedView {
117        /// View name
118        name: ObjectName,
119        /// The backing query
120        query: Box<StreamingStatement>,
121        /// Optional EMIT clause
122        emit_clause: Option<EmitClause>,
123        /// Whether OR REPLACE was specified
124        or_replace: bool,
125        /// Whether IF NOT EXISTS was specified
126        if_not_exists: bool,
127        /// Raw query text between `AS` and `EMIT`.
128        query_sql: String,
129    },
130
131    /// CREATE STREAM — named streaming pipeline
132    CreateStream {
133        /// Stream name
134        name: ObjectName,
135        /// Backing query (AS SELECT ...)
136        query: Box<StreamingStatement>,
137        /// Optional EMIT clause
138        emit_clause: Option<EmitClause>,
139        /// Whether OR REPLACE was specified
140        or_replace: bool,
141        /// Whether IF NOT EXISTS was specified
142        if_not_exists: bool,
143        /// Raw query text between `AS` and `EMIT`.
144        query_sql: String,
145        /// `RETAIN_HISTORY` cap in bytes from the trailing `WITH (...)`.
146        retention_bytes: Option<u64>,
147    },
148
149    /// DROP STREAM statement
150    DropStream {
151        /// Stream name to drop
152        name: ObjectName,
153        /// Whether IF EXISTS was specified
154        if_exists: bool,
155        /// Whether CASCADE was specified
156        cascade: bool,
157    },
158
159    /// ALTER SOURCE — modify a source definition
160    AlterSource {
161        /// Source name to alter
162        name: ObjectName,
163        /// The alteration to apply
164        operation: AlterSourceOperation,
165    },
166
167    /// INSERT INTO a streaming source or table
168    InsertInto {
169        /// Target table or source name
170        table_name: ObjectName,
171        /// Column names (empty if not specified)
172        columns: Vec<Ident>,
173        /// Row values
174        values: Vec<Vec<Expr>>,
175    },
176
177    /// CREATE LOOKUP TABLE statement
178    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
179
180    /// DROP LOOKUP TABLE statement
181    DropLookupTable {
182        /// Lookup table name to drop
183        name: ObjectName,
184        /// Whether IF EXISTS was specified
185        if_exists: bool,
186    },
187
188    /// CHECKPOINT — trigger an immediate checkpoint
189    Checkpoint,
190
191    /// `RESTORE FROM CHECKPOINT <id>`
192    RestoreCheckpoint {
193        /// The checkpoint ID to restore from
194        checkpoint_id: u64,
195    },
196
197    /// `SUBSCRIBE <name> [WHERE ...] [WITH (...)]`.
198    Subscribe(Box<SubscribeStatement>),
199
200    /// `DECLARE <name> [NO SCROLL] CURSOR [WITHOUT HOLD] FOR SUBSCRIBE …`
201    ///
202    /// Forward-only cursor over a SUBSCRIBE, scoped to the current SimpleQuery
203    /// connection. SCROLL, BINARY, WITH HOLD, INSENSITIVE, and ASENSITIVE are
204    /// rejected at parse time.
205    DeclareCursorForSubscribe {
206        /// Cursor identifier as supplied by the client.
207        name: Ident,
208        /// `NO SCROLL` was explicit in the source. We never emit SCROLL.
209        no_scroll: bool,
210        /// The SUBSCRIBE body the cursor wraps.
211        subscribe: Box<SubscribeStatement>,
212    },
213}
214
215/// `SUBSCRIBE <name> [AS OF EPOCH n] [WHERE <fragment>] [WITH (...)]`.
216#[derive(Debug, Clone, PartialEq)]
217pub struct SubscribeStatement {
218    /// Target stream or materialized view name.
219    pub name: ObjectName,
220    /// Raw WHERE fragment, compiled by the engine against the target schema.
221    pub filter_sql: Option<String>,
222    /// `AS OF EPOCH n`: replay everything emitted strictly after barrier `n`.
223    pub as_of_epoch: Option<u64>,
224    /// Reserved for future WITH options.
225    pub options: HashMap<String, String>,
226}
227
228/// Operations for ALTER SOURCE statements.
229#[derive(Debug, Clone, PartialEq)]
230pub enum AlterSourceOperation {
231    /// Add a new column: `ALTER SOURCE name ADD COLUMN col_name data_type`
232    AddColumn {
233        /// Column definition
234        column_def: ColumnDef,
235    },
236    /// Set source properties: `ALTER SOURCE name SET ('key' = 'value', ...)`
237    SetProperties {
238        /// Key-value pairs
239        properties: HashMap<String, String>,
240    },
241}
242
/// Serialization format specification (e.g., `FORMAT JSON`, `FORMAT AVRO`).
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Name of the format, e.g. "JSON", "AVRO", or "PROTOBUF".
    pub format_type: String,
    /// Extra format options taken from the WITH clause that follows FORMAT.
    pub options: HashMap<String, String>,
}
251
252/// CREATE SOURCE statement
253#[derive(Debug, Clone, PartialEq)]
254pub struct CreateSourceStatement {
255    /// Source name
256    pub name: ObjectName,
257    /// Column definitions
258    pub columns: Vec<ColumnDef>,
259    /// Watermark definition
260    pub watermark: Option<WatermarkDef>,
261    /// Source connector options (from WITH clause)
262    pub with_options: HashMap<String, String>,
263    /// Whether to replace existing source
264    pub or_replace: bool,
265    /// Whether to skip if exists
266    pub if_not_exists: bool,
267    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
268    pub connector_type: Option<String>,
269    /// Connector-specific options (from `FROM KAFKA (...)`)
270    pub connector_options: HashMap<String, String>,
271    /// Format specification (e.g., `FORMAT JSON`)
272    pub format: Option<FormatSpec>,
273    /// Whether the column list includes a `*` wildcard for schema inference.
274    pub has_wildcard: bool,
275    /// Optional prefix for wildcard-expanded columns (from `PREFIX 'str'`).
276    pub wildcard_prefix: Option<String>,
277}
278
279/// CREATE SINK statement
280#[derive(Debug, Clone, PartialEq)]
281pub struct CreateSinkStatement {
282    /// Sink name
283    pub name: ObjectName,
284    /// Input query or table
285    pub from: SinkFrom,
286    /// Sink connector options (from WITH clause)
287    pub with_options: HashMap<String, String>,
288    /// Whether to replace existing sink
289    pub or_replace: bool,
290    /// Whether to skip if exists
291    pub if_not_exists: bool,
292    /// Optional WHERE filter expression
293    pub filter: Option<Expr>,
294    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
295    pub connector_type: Option<String>,
296    /// Connector-specific options (from `INTO KAFKA (...)`)
297    pub connector_options: HashMap<String, String>,
298    /// Format specification (e.g., `FORMAT JSON`)
299    pub format: Option<FormatSpec>,
300    /// Output options (from `WITH (key = ...)` after FORMAT)
301    pub output_options: HashMap<String, String>,
302}
303
304/// Source for a sink
305#[derive(Debug, Clone, PartialEq)]
306pub enum SinkFrom {
307    /// From a table or source
308    Table(ObjectName),
309    /// From a SELECT query
310    Query(Box<StreamingStatement>),
311}
312
313/// Watermark definition
314#[derive(Debug, Clone, PartialEq)]
315pub struct WatermarkDef {
316    /// Column to use for watermark
317    pub column: Ident,
318    /// Watermark expression (e.g., column - INTERVAL '5' SECOND).
319    /// `None` when `WATERMARK FOR col` is used without `AS expr`,
320    /// meaning watermark advances via `source.watermark()` with zero delay.
321    pub expression: Option<Expr>,
322}
323
324/// Late data handling clause.
325///
326/// Controls what happens to events that arrive after their window has closed.
327/// This is the SQL AST representation of late data configuration.
328/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
329#[derive(Debug, Clone, PartialEq, Default)]
330pub struct LateDataClause {
331    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
332    pub allowed_lateness: Option<Box<Expr>>,
333    /// Side output name for late events (e.g., `late_events`)
334    pub side_output: Option<String>,
335}
336
337impl LateDataClause {
338    /// Creates a clause with allowed lateness only.
339    #[must_use]
340    pub fn with_allowed_lateness(lateness: Expr) -> Self {
341        Self {
342            allowed_lateness: Some(Box::new(lateness)),
343            side_output: None,
344        }
345    }
346
347    /// Creates a clause with both allowed lateness and side output.
348    #[must_use]
349    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
350        Self {
351            allowed_lateness: Some(Box::new(lateness)),
352            side_output: Some(side_output),
353        }
354    }
355
356    /// Creates a clause with side output only (uses default lateness).
357    #[must_use]
358    pub fn side_output_only(side_output: String) -> Self {
359        Self {
360            allowed_lateness: None,
361            side_output: Some(side_output),
362        }
363    }
364
365    /// Convert to allowed lateness Duration.
366    ///
367    /// # Errors
368    ///
369    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
370    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
371        match &self.allowed_lateness {
372            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
373            None => Ok(Duration::ZERO),
374        }
375    }
376
377    /// Check if this clause has a side output configured.
378    #[must_use]
379    pub fn has_side_output(&self) -> bool {
380        self.side_output.is_some()
381    }
382
383    /// Get the side output name, if configured.
384    #[must_use]
385    pub fn get_side_output(&self) -> Option<&str> {
386        self.side_output.as_deref()
387    }
388}
389
/// Emit strategy for runtime operator configuration.
///
/// This is the runtime representation that operators use.
///
/// Every payload is a plain standard-library value, so the type derives `Eq`
/// and `Hash` in addition to `PartialEq`; strategies can therefore be used as
/// set members or map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Emit when the watermark passes the window end.
    OnWatermark,
    /// Emit only when the window closes (no intermediate results).
    OnWindowClose,
    /// Emit at fixed intervals of the given duration.
    Periodic(Duration),
    /// Emit on every state change.
    OnUpdate,
    /// Emit changelog records with Z-set weights.
    Changelog,
    /// Emit only final results, suppressing all intermediate ones.
    FinalOnly,
}
408
409/// EMIT clause for controlling output timing.
410///
411/// This is the SQL AST representation of emit strategies.
412/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
413#[derive(Debug, Clone, PartialEq)]
414pub enum EmitClause {
415    // === Existing ===
416    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
417    ///
418    /// Emit results when the watermark passes the window end.
419    /// This is the most efficient strategy.
420    AfterWatermark,
421
422    /// EMIT ON WINDOW CLOSE
423    ///
424    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
425    /// Only emits when window closes, no intermediate results.
426    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
427    OnWindowClose,
428
429    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
430    ///
431    /// Emit intermediate results at fixed intervals.
432    /// Final results are still emitted on watermark.
433    Periodically {
434        /// The interval expression (e.g., INTERVAL '5' SECOND)
435        interval: Box<Expr>,
436    },
437
438    /// EMIT ON UPDATE
439    ///
440    /// Emit updated results after every state change.
441    /// This provides lowest latency but highest overhead.
442    OnUpdate,
443
444    // === New ===
445    /// EMIT CHANGES
446    ///
447    /// Emit changelog records with Z-set weights for CDC pipelines.
448    /// Every emission includes operation type and weight:
449    /// - Insert (+1 weight)
450    /// - Delete (-1 weight)
451    /// - Update (retraction pair: -1 old, +1 new)
452    ///
453    /// Required for:
454    /// - CDC pipelines
455    /// - Cascading materialized views
456    /// - Downstream consumers that need to track changes
457    Changes,
458
459    /// EMIT FINAL
460    ///
461    /// Suppress ALL intermediate results, emit only finalized.
462    /// Also drops late data entirely after window close.
463    /// Use for BI reporting where only final, exact results matter.
464    Final,
465}
466
467impl std::fmt::Display for EmitClause {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        match self {
470            EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
471            EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
472            EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
473            EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
474            EmitClause::Changes => write!(f, "EMIT CHANGES"),
475            EmitClause::Final => write!(f, "EMIT FINAL"),
476        }
477    }
478}
479
480impl EmitClause {
481    /// Convert to runtime EmitStrategy.
482    ///
483    /// # Errors
484    ///
485    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
486    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
487        match self {
488            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
489            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
490            EmitClause::Periodically { interval } => {
491                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
492                Ok(EmitStrategy::Periodic(duration))
493            }
494            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
495            EmitClause::Changes => Ok(EmitStrategy::Changelog),
496            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
497        }
498    }
499
500    /// Check if this emit strategy requires changelog/retraction support.
501    #[must_use]
502    pub fn requires_changelog(&self) -> bool {
503        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
504    }
505
506    /// Check if this emit strategy is append-only (no retractions).
507    #[must_use]
508    pub fn is_append_only(&self) -> bool {
509        matches!(
510            self,
511            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
512        )
513    }
514
515    /// Returns true if this emit strategy requires a watermark on the source.
516    ///
517    /// `OnWindowClose`, `Final`, and `AfterWatermark` all depend on watermark
518    /// advancement to trigger window closure. Without a watermark, timers will
519    /// never fire and windows will never close.
520    #[must_use]
521    pub fn requires_watermark(&self) -> bool {
522        matches!(
523            self,
524            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
525        )
526    }
527}
528
529/// Window function types
530#[derive(Debug, Clone, PartialEq)]
531pub enum WindowFunction {
532    /// TUMBLE(column, interval [, offset])
533    Tumble {
534        /// The time column to window on
535        time_column: Box<Expr>,
536        /// The window interval
537        interval: Box<Expr>,
538        /// Optional offset for timezone-aligned windows
539        offset: Option<Box<Expr>>,
540    },
541    /// HOP(column, slide, size [, offset])
542    Hop {
543        /// The time column to window on
544        time_column: Box<Expr>,
545        /// The slide interval (how often to create a new window)
546        slide_interval: Box<Expr>,
547        /// The window size interval
548        window_interval: Box<Expr>,
549        /// Optional offset for timezone-aligned windows
550        offset: Option<Box<Expr>>,
551    },
552    /// SESSION(column, gap)
553    Session {
554        /// The time column to window on
555        time_column: Box<Expr>,
556        /// The gap interval (max gap between events in same session)
557        gap_interval: Box<Expr>,
558    },
559    /// CUMULATE(column, step, size)
560    Cumulate {
561        /// The time column to window on
562        time_column: Box<Expr>,
563        /// The step interval (window growth increment)
564        step_interval: Box<Expr>,
565        /// The max window size interval (epoch size)
566        max_size_interval: Box<Expr>,
567    },
568}
569
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part, unquoted object name.
    fn object_name(name: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(name))])
    }

    /// Wraps `name` in a bare identifier expression. Used wherever a test only
    /// needs *some* expression and never evaluates it.
    fn ident_expr(name: &str) -> Expr {
        Expr::Identifier(Ident::new(name))
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(ident_expr("timestamp")),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };

        assert!(matches!(
            EmitClause::AfterWatermark,
            EmitClause::AfterWatermark
        ));
        assert!(matches!(
            EmitClause::OnWindowClose,
            EmitClause::OnWindowClose
        ));
        assert!(matches!(periodic, EmitClause::Periodically { .. }));
        assert!(matches!(EmitClause::OnUpdate, EmitClause::OnUpdate));
        assert!(matches!(EmitClause::Changes, EmitClause::Changes));
        assert!(matches!(EmitClause::Final, EmitClause::Final));

        // `OnWindowClose` is documented as distinct from `AfterWatermark`.
        assert_ne!(EmitClause::AfterWatermark, EmitClause::OnWindowClose);
    }

    #[test]
    fn test_emit_clause_display() {
        assert_eq!(
            EmitClause::AfterWatermark.to_string(),
            "EMIT AFTER WATERMARK"
        );
        assert_eq!(
            EmitClause::OnWindowClose.to_string(),
            "EMIT ON WINDOW CLOSE"
        );
        assert_eq!(EmitClause::OnUpdate.to_string(), "EMIT ON UPDATE");
        assert_eq!(EmitClause::Changes.to_string(), "EMIT CHANGES");
        assert_eq!(EmitClause::Final.to_string(), "EMIT FINAL");

        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("five_seconds")),
        };
        assert_eq!(periodic.to_string(), "EMIT EVERY five_seconds");
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_non_periodic() {
        assert!(matches!(
            EmitClause::AfterWatermark.to_emit_strategy(),
            Ok(EmitStrategy::OnWatermark)
        ));
        assert!(matches!(
            EmitClause::OnWindowClose.to_emit_strategy(),
            Ok(EmitStrategy::OnWindowClose)
        ));
        assert!(matches!(
            EmitClause::OnUpdate.to_emit_strategy(),
            Ok(EmitStrategy::OnUpdate)
        ));
        assert!(matches!(
            EmitClause::Changes.to_emit_strategy(),
            Ok(EmitStrategy::Changelog)
        ));
        assert!(matches!(
            EmitClause::Final.to_emit_strategy(),
            Ok(EmitStrategy::FinalOnly)
        ));
    }

    #[test]
    fn test_emit_clause_requires_changelog() {
        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());

        assert!(!EmitClause::AfterWatermark.requires_changelog());
        assert!(!EmitClause::OnWindowClose.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert!(!periodic.requires_changelog());
    }

    #[test]
    fn test_emit_clause_is_append_only() {
        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(EmitClause::AfterWatermark.is_append_only());

        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert!(!periodic.is_append_only());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };

        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };

        assert!(matches!(tumble, WindowFunction::Tumble { .. }));
        assert!(matches!(hop, WindowFunction::Hop { .. }));
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_accessors() {
        let without_output = LateDataClause::default();
        assert!(!without_output.has_side_output());
        assert_eq!(without_output.get_side_output(), None);

        let with_output = LateDataClause::side_output_only("late_events".to_string());
        assert!(with_output.has_side_output());
        assert_eq!(with_output.get_side_output(), Some("late_events"));
    }

    #[test]
    fn test_late_data_clause_unset_lateness_is_zero() {
        // With no lateness expression nothing is parsed, so this cannot fail.
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(matches!(
            clause.to_allowed_lateness(),
            Ok(lateness) if lateness == Duration::ZERO
        ));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);
        assert_eq!(ShowCommand::Streams, ShowCommand::Streams);
        assert_eq!(ShowCommand::Tables, ShowCommand::Tables);
        assert_eq!(ShowCommand::CheckpointStatus, ShowCommand::CheckpointStatus);

        // Struct variants compare by kind first, then by name.
        let create_source = ShowCommand::CreateSource {
            name: object_name("events"),
        };
        let create_sink = ShowCommand::CreateSink {
            name: object_name("events"),
        };
        assert_ne!(create_source, create_sink);
        assert_eq!(
            create_source,
            ShowCommand::CreateSource {
                name: object_name("events"),
            }
        );
        assert_ne!(
            create_source,
            ShowCommand::CreateSource {
                name: object_name("other"),
            }
        );
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        let StreamingStatement::DropSource {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSource");
        };
        assert_eq!(name.to_string(), "events");
        assert!(if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        let StreamingStatement::DropSink {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSink");
        };
        assert_eq!(name.to_string(), "output");
        assert!(!if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert!(if_exists);
        assert!(cascade);
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(matches!(
            stmt,
            StreamingStatement::Show(ShowCommand::Sources)
        ));
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        let StreamingStatement::Describe { name, extended } = stmt else {
            panic!("Expected Describe");
        };
        assert_eq!(name.to_string(), "events");
        assert!(extended);
    }

    #[test]
    fn test_explain_statement() {
        // Build an inner Standard statement using sqlparser.
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, "SELECT 1").unwrap();
        let inner = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::Explain {
            statement: Box::new(inner),
            analyze: false,
        };
        let StreamingStatement::Explain { statement, analyze } = stmt else {
            panic!("Expected Explain");
        };
        assert!(matches!(*statement, StreamingStatement::Standard(_)));
        assert!(!analyze);
    }

    #[test]
    fn test_create_materialized_view_statement() {
        // Build a query statement using sqlparser.
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts =
            sqlparser::parser::Parser::parse_sql(&dialect, "SELECT COUNT(*) FROM events").unwrap();
        let query = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(query),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
            query_sql: "SELECT COUNT(*) FROM events".to_string(),
        };
        let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            query_sql,
            ..
        } = stmt
        else {
            panic!("Expected CreateMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
        assert!(!or_replace);
        assert!(if_not_exists);
        assert_eq!(query_sql, "SELECT COUNT(*) FROM events");
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        else {
            panic!("Expected InsertInto");
        };
        assert_eq!(table_name.to_string(), "events");
        assert_eq!(columns.len(), 2);
        assert_eq!(values.len(), 1);
        assert_eq!(values[0].len(), 2);
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        // Watermark-dependent strategies
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        // Non-watermark strategies
        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert!(!periodic.requires_watermark());
    }
}