Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod join_parser;
13mod late_data_parser;
14/// Parser for CREATE/DROP LOOKUP TABLE DDL statements
15pub mod lookup_table;
16pub mod order_analyzer;
17mod sink_parser;
18mod source_parser;
19mod statements;
20mod tokenizer;
21mod window_rewriter;
22
23pub use lookup_table::CreateLookupTableStatement;
24pub use statements::{
25    AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
26    FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
27    WindowFunction,
28};
29pub use window_rewriter::WindowRewriter;
30
31use dialect::LaminarDialect;
32use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
33
34/// Parses SQL with streaming extensions.
35///
36/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
37/// for structured parsing. Standard SQL is delegated to sqlparser directly.
38///
39/// # Errors
40///
41/// Returns `ParseError` if the SQL syntax is invalid.
42pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
43    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
44}
45
46/// Parser for streaming SQL extensions.
47///
48/// Provides static methods for parsing streaming SQL statements.
49/// Uses sqlparser's `Parser` API internally for structured parsing
50/// of identifiers, data types, expressions, and queries.
51pub struct StreamingParser;
52
53impl StreamingParser {
54    /// Parse a SQL string with streaming extensions.
55    ///
56    /// Tokenizes the input to detect statement type, then routes to the
57    /// appropriate parser:
58    /// - CREATE SOURCE → `source_parser`
59    /// - CREATE SINK → `sink_parser`
60    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
61    /// - Everything else → `sqlparser::parser::Parser`
62    ///
63    /// # Errors
64    ///
65    /// Returns `ParserError` if the SQL syntax is invalid.
66    #[allow(clippy::too_many_lines)]
67    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
68        let sql_trimmed = sql.trim();
69        if sql_trimmed.is_empty() {
70            return Err(sqlparser::parser::ParserError::ParserError(
71                "Empty SQL statement".to_string(),
72            ));
73        }
74
75        let dialect = LaminarDialect::default();
76
77        // Tokenize to detect statement type (with location for better errors)
78        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
79            .tokenize_with_location()
80            .map_err(|e| {
81                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
82            })?;
83
84        // Route based on token-level detection
85        match detect_streaming_ddl(&tokens) {
86            StreamingDdlKind::CreateSource { .. } => {
87                let mut parser =
88                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
89                let source = source_parser::parse_create_source(&mut parser)
90                    .map_err(parse_error_to_parser_error)?;
91                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
92            }
93            StreamingDdlKind::CreateSink { .. } => {
94                let mut parser =
95                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
96                let sink = sink_parser::parse_create_sink(&mut parser)
97                    .map_err(parse_error_to_parser_error)?;
98                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
99            }
100            StreamingDdlKind::CreateContinuousQuery { .. } => {
101                let mut parser =
102                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
103                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
104                    .map_err(parse_error_to_parser_error)?;
105                Ok(vec![stmt])
106            }
107            StreamingDdlKind::DropSource { .. } => {
108                let mut parser =
109                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
110                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
111                Ok(vec![stmt])
112            }
113            StreamingDdlKind::DropSink { .. } => {
114                let mut parser =
115                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
116                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
117                Ok(vec![stmt])
118            }
119            StreamingDdlKind::DropMaterializedView { .. } => {
120                let mut parser =
121                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
122                let stmt = parse_drop_materialized_view(&mut parser)
123                    .map_err(parse_error_to_parser_error)?;
124                Ok(vec![stmt])
125            }
126            StreamingDdlKind::ShowSources => {
127                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
128            }
129            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
130            StreamingDdlKind::ShowQueries => {
131                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
132            }
133            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
134                ShowCommand::MaterializedViews,
135            )]),
136            StreamingDdlKind::DescribeSource => {
137                let mut parser =
138                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
139                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
140                Ok(vec![stmt])
141            }
142            StreamingDdlKind::ExplainStreaming => {
143                let mut parser =
144                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
145                let stmt =
146                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
147                Ok(vec![stmt])
148            }
149            StreamingDdlKind::CreateMaterializedView { .. } => {
150                let mut parser =
151                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
152                let stmt = parse_create_materialized_view(&mut parser, sql_trimmed)
153                    .map_err(parse_error_to_parser_error)?;
154                Ok(vec![stmt])
155            }
156            StreamingDdlKind::CreateStream { .. } => {
157                let mut parser =
158                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
159                let stmt = parse_create_stream(&mut parser, sql_trimmed)
160                    .map_err(parse_error_to_parser_error)?;
161                Ok(vec![stmt])
162            }
163            StreamingDdlKind::DropStream { .. } => {
164                let mut parser =
165                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
166                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
167                Ok(vec![stmt])
168            }
169            StreamingDdlKind::ShowStreams => {
170                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
171            }
172            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
173            StreamingDdlKind::CreateLookupTable { .. } => {
174                let mut parser =
175                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
176                let lt = lookup_table::parse_create_lookup_table(&mut parser)
177                    .map_err(parse_error_to_parser_error)?;
178                Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
179            }
180            StreamingDdlKind::DropLookupTable { .. } => {
181                let mut parser =
182                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
183                let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
184                    .map_err(parse_error_to_parser_error)?;
185                Ok(vec![StreamingStatement::DropLookupTable {
186                    name,
187                    if_exists,
188                }])
189            }
190            StreamingDdlKind::AlterSource => {
191                let mut parser =
192                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
193                let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
194                Ok(vec![stmt])
195            }
196            StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
197                ShowCommand::CheckpointStatus,
198            )]),
199            StreamingDdlKind::ShowCreateSource => {
200                let mut parser =
201                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
202                let stmt =
203                    parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
204                Ok(vec![stmt])
205            }
206            StreamingDdlKind::ShowCreateSink => {
207                let mut parser =
208                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
209                let stmt =
210                    parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
211                Ok(vec![stmt])
212            }
213            StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
214            StreamingDdlKind::RestoreCheckpoint => {
215                let mut parser =
216                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
217                let stmt =
218                    parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
219                Ok(vec![stmt])
220            }
221            StreamingDdlKind::None => {
222                // Standard SQL - check for INSERT INTO and convert
223                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
224                Ok(statements
225                    .into_iter()
226                    .map(convert_standard_statement)
227                    .collect())
228            }
229        }
230    }
231
232    /// Check if an expression contains a window function.
233    #[must_use]
234    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
235        match expr {
236            sqlparser::ast::Expr::Function(func) => {
237                if let Some(name) = func.name.0.last() {
238                    let func_name = name.to_string().to_uppercase();
239                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
240                } else {
241                    false
242                }
243            }
244            _ => false,
245        }
246    }
247
248    /// Parse EMIT clause from SQL string.
249    ///
250    /// # Errors
251    ///
252    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
253    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
254        emit_parser::parse_emit_clause_from_sql(sql)
255    }
256
257    /// Parse late data handling clause from SQL string.
258    ///
259    /// # Errors
260    ///
261    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
262    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
263        late_data_parser::parse_late_data_clause_from_sql(sql)
264    }
265}
266
267/// Convert `ParseError` to `ParserError` for backward compatibility.
268fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
269    match e {
270        ParseError::SqlParseError(pe) => pe,
271        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
272        ParseError::WindowError(msg) => {
273            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
274        }
275        ParseError::ValidationError(msg) => {
276            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
277        }
278    }
279}
280
281/// Parse a RESTORE FROM CHECKPOINT statement.
282///
283/// Syntax: `RESTORE FROM CHECKPOINT <id>`
284///
285/// # Errors
286///
287/// Returns `ParseError` if the statement syntax is invalid.
288fn parse_restore_checkpoint(
289    parser: &mut sqlparser::parser::Parser,
290) -> Result<StreamingStatement, ParseError> {
291    // Consume RESTORE
292    tokenizer::expect_custom_keyword(parser, "RESTORE")?;
293    // Consume FROM
294    if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
295        return Err(ParseError::StreamingError(
296            "Expected FROM after RESTORE".to_string(),
297        ));
298    }
299    // Consume CHECKPOINT
300    tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
301    // Parse checkpoint ID (numeric literal)
302    let token = parser.next_token();
303    match &token.token {
304        sqlparser::tokenizer::Token::Number(n, _) => {
305            let id: u64 = n
306                .parse()
307                .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
308            Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
309        }
310        other => Err(ParseError::StreamingError(format!(
311            "Expected checkpoint ID (number), found {other}"
312        ))),
313    }
314}
315
316/// Convert a standard sqlparser statement to a `StreamingStatement`.
317///
318/// Detects INSERT INTO statements and converts them to the streaming
319/// `InsertInto` variant. All other statements are wrapped as `Standard`.
320fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
321    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
322        // Extract table name from TableObject
323        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
324            let table_name = name.clone();
325            let columns = insert.columns.clone();
326
327            // Try to extract VALUES rows from source query
328            if let Some(ref source) = insert.source {
329                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
330                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
331                    return StreamingStatement::InsertInto {
332                        table_name,
333                        columns,
334                        values: rows,
335                    };
336                }
337            }
338        }
339    }
340    StreamingStatement::Standard(Box::new(stmt))
341}
342
343/// Parse a DROP SOURCE statement.
344///
345/// Syntax: `DROP SOURCE [IF EXISTS] name [CASCADE]`
346///
347/// # Errors
348///
349/// Returns `ParseError` if the statement syntax is invalid.
350fn parse_drop_source(
351    parser: &mut sqlparser::parser::Parser,
352) -> Result<StreamingStatement, ParseError> {
353    parser
354        .expect_keyword(sqlparser::keywords::Keyword::DROP)
355        .map_err(ParseError::SqlParseError)?;
356    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
357    let if_exists = parser.parse_keywords(&[
358        sqlparser::keywords::Keyword::IF,
359        sqlparser::keywords::Keyword::EXISTS,
360    ]);
361    let name = parser
362        .parse_object_name(false)
363        .map_err(ParseError::SqlParseError)?;
364    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
365    Ok(StreamingStatement::DropSource {
366        name,
367        if_exists,
368        cascade,
369    })
370}
371
372/// Parse a DROP SINK statement.
373///
374/// Syntax: `DROP SINK [IF EXISTS] name [CASCADE]`
375///
376/// # Errors
377///
378/// Returns `ParseError` if the statement syntax is invalid.
379fn parse_drop_sink(
380    parser: &mut sqlparser::parser::Parser,
381) -> Result<StreamingStatement, ParseError> {
382    parser
383        .expect_keyword(sqlparser::keywords::Keyword::DROP)
384        .map_err(ParseError::SqlParseError)?;
385    tokenizer::expect_custom_keyword(parser, "SINK")?;
386    let if_exists = parser.parse_keywords(&[
387        sqlparser::keywords::Keyword::IF,
388        sqlparser::keywords::Keyword::EXISTS,
389    ]);
390    let name = parser
391        .parse_object_name(false)
392        .map_err(ParseError::SqlParseError)?;
393    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
394    Ok(StreamingStatement::DropSink {
395        name,
396        if_exists,
397        cascade,
398    })
399}
400
401/// Parse a DROP MATERIALIZED VIEW statement.
402///
403/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
404///
405/// # Errors
406///
407/// Returns `ParseError` if the statement syntax is invalid.
408fn parse_drop_materialized_view(
409    parser: &mut sqlparser::parser::Parser,
410) -> Result<StreamingStatement, ParseError> {
411    parser
412        .expect_keyword(sqlparser::keywords::Keyword::DROP)
413        .map_err(ParseError::SqlParseError)?;
414    parser
415        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
416        .map_err(ParseError::SqlParseError)?;
417    parser
418        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
419        .map_err(ParseError::SqlParseError)?;
420    let if_exists = parser.parse_keywords(&[
421        sqlparser::keywords::Keyword::IF,
422        sqlparser::keywords::Keyword::EXISTS,
423    ]);
424    let name = parser
425        .parse_object_name(false)
426        .map_err(ParseError::SqlParseError)?;
427    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
428    Ok(StreamingStatement::DropMaterializedView {
429        name,
430        if_exists,
431        cascade,
432    })
433}
434
435/// Parse a CREATE STREAM statement.
436///
437/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
438///
439/// # Errors
440///
441/// Returns `ParseError` if the statement syntax is invalid.
442fn parse_create_stream(
443    parser: &mut sqlparser::parser::Parser,
444    original_sql: &str,
445) -> Result<StreamingStatement, ParseError> {
446    parser
447        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
448        .map_err(ParseError::SqlParseError)?;
449
450    let or_replace = parser.parse_keywords(&[
451        sqlparser::keywords::Keyword::OR,
452        sqlparser::keywords::Keyword::REPLACE,
453    ]);
454
455    tokenizer::expect_custom_keyword(parser, "STREAM")?;
456
457    let if_not_exists = parser.parse_keywords(&[
458        sqlparser::keywords::Keyword::IF,
459        sqlparser::keywords::Keyword::NOT,
460        sqlparser::keywords::Keyword::EXISTS,
461    ]);
462
463    let name = parser
464        .parse_object_name(false)
465        .map_err(ParseError::SqlParseError)?;
466
467    parser
468        .expect_keyword(sqlparser::keywords::Keyword::AS)
469        .map_err(ParseError::SqlParseError)?;
470
471    // Collect remaining tokens and split at EMIT boundary
472    let remaining = collect_remaining_tokens(parser);
473    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
474
475    let query_sql = query_body_sql(original_sql, &query_tokens, &emit_tokens);
476
477    let stream_dialect = LaminarDialect::default();
478
479    let query = if query_tokens.is_empty() {
480        return Err(ParseError::StreamingError(
481            "Expected SELECT query after AS".to_string(),
482        ));
483    } else {
484        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
485            .with_tokens_with_locations(query_tokens);
486        query_parser
487            .parse_query()
488            .map_err(ParseError::SqlParseError)?
489    };
490
491    let query_stmt =
492        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
493
494    let emit_clause = if emit_tokens.is_empty() {
495        None
496    } else {
497        let mut emit_parser =
498            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
499        emit_parser::parse_emit_clause(&mut emit_parser)?
500    };
501
502    Ok(StreamingStatement::CreateStream {
503        name,
504        query: Box::new(query_stmt),
505        emit_clause,
506        or_replace,
507        if_not_exists,
508        query_sql,
509    })
510}
511
512/// Slice `original_sql` from the first query token to the start of EMIT
513/// (or end of input). Preserves custom streaming syntax that sqlparser's
514/// AST would drop. Falls back to joining token text if spans are empty.
515pub(super) fn query_body_sql(
516    original_sql: &str,
517    query_tokens: &[sqlparser::tokenizer::TokenWithSpan],
518    emit_tokens: &[sqlparser::tokenizer::TokenWithSpan],
519) -> String {
520    use sqlparser::tokenizer::Token;
521
522    let from_spans = || -> Option<String> {
523        let first = query_tokens
524            .iter()
525            .find(|t| !matches!(t.token, Token::EOF))?;
526        let start = location_to_byte_offset(original_sql, first.span.start)?;
527        let end = match emit_tokens.first() {
528            Some(t) => location_to_byte_offset(original_sql, t.span.start)?,
529            None => original_sql.len(),
530        };
531        let slice = original_sql.get(start..end)?;
532        Some(
533            slice
534                .trim_end_matches(|c: char| c.is_whitespace() || c == ';')
535                .to_string(),
536        )
537    };
538
539    from_spans().unwrap_or_else(|| {
540        query_tokens
541            .iter()
542            .take_while(|t| !matches!(t.token, Token::EOF))
543            .map(|t| t.token.to_string())
544            .collect::<Vec<_>>()
545            .join(" ")
546    })
547}
548
549/// Parse a DROP STREAM statement.
550///
551/// Syntax: `DROP STREAM [IF EXISTS] name [CASCADE]`
552///
553/// # Errors
554///
555/// Returns `ParseError` if the statement syntax is invalid.
556fn parse_drop_stream(
557    parser: &mut sqlparser::parser::Parser,
558) -> Result<StreamingStatement, ParseError> {
559    parser
560        .expect_keyword(sqlparser::keywords::Keyword::DROP)
561        .map_err(ParseError::SqlParseError)?;
562    tokenizer::expect_custom_keyword(parser, "STREAM")?;
563    let if_exists = parser.parse_keywords(&[
564        sqlparser::keywords::Keyword::IF,
565        sqlparser::keywords::Keyword::EXISTS,
566    ]);
567    let name = parser
568        .parse_object_name(false)
569        .map_err(ParseError::SqlParseError)?;
570    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
571    Ok(StreamingStatement::DropStream {
572        name,
573        if_exists,
574        cascade,
575    })
576}
577
578/// Parse a DESCRIBE statement.
579///
580/// Syntax: `DESCRIBE [EXTENDED] name`
581/// Parse an ALTER SOURCE statement.
582///
583/// Syntax:
584/// - `ALTER SOURCE name ADD COLUMN col_name data_type`
585/// - `ALTER SOURCE name SET ('key' = 'value', ...)`
586///
587/// # Errors
588///
589/// Returns `ParseError` if the statement syntax is invalid.
590fn parse_alter_source(
591    parser: &mut sqlparser::parser::Parser,
592) -> Result<StreamingStatement, ParseError> {
593    parser
594        .expect_keyword(sqlparser::keywords::Keyword::ALTER)
595        .map_err(ParseError::SqlParseError)?;
596    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
597    let name = parser
598        .parse_object_name(false)
599        .map_err(ParseError::SqlParseError)?;
600
601    // Determine operation: ADD COLUMN or SET
602    if parser.parse_keywords(&[
603        sqlparser::keywords::Keyword::ADD,
604        sqlparser::keywords::Keyword::COLUMN,
605    ]) {
606        // ALTER SOURCE name ADD COLUMN col_name data_type
607        let col_name = parser
608            .parse_identifier()
609            .map_err(ParseError::SqlParseError)?;
610        let data_type = parser
611            .parse_data_type()
612            .map_err(ParseError::SqlParseError)?;
613        let column_def = sqlparser::ast::ColumnDef {
614            name: col_name,
615            data_type,
616            options: vec![],
617        };
618        Ok(StreamingStatement::AlterSource {
619            name,
620            operation: statements::AlterSourceOperation::AddColumn { column_def },
621        })
622    } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
623        // ALTER SOURCE name SET ('key' = 'value', ...)
624        parser
625            .expect_token(&sqlparser::tokenizer::Token::LParen)
626            .map_err(ParseError::SqlParseError)?;
627        #[allow(clippy::disallowed_types)] // cold path: SQL parsing
628        let mut properties = std::collections::HashMap::new();
629        loop {
630            let key = parser
631                .parse_literal_string()
632                .map_err(ParseError::SqlParseError)?;
633            parser
634                .expect_token(&sqlparser::tokenizer::Token::Eq)
635                .map_err(ParseError::SqlParseError)?;
636            let value = parser
637                .parse_literal_string()
638                .map_err(ParseError::SqlParseError)?;
639            properties.insert(key, value);
640            if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
641                break;
642            }
643        }
644        parser
645            .expect_token(&sqlparser::tokenizer::Token::RParen)
646            .map_err(ParseError::SqlParseError)?;
647        Ok(StreamingStatement::AlterSource {
648            name,
649            operation: statements::AlterSourceOperation::SetProperties { properties },
650        })
651    } else {
652        Err(ParseError::StreamingError(
653            "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
654        ))
655    }
656}
657
658/// Parse a DESCRIBE statement.
659///
660/// # Errors
661///
662/// Returns `ParseError` if the statement syntax is invalid.
663fn parse_describe(
664    parser: &mut sqlparser::parser::Parser,
665) -> Result<StreamingStatement, ParseError> {
666    // Consume DESCRIBE or DESC
667    let token = parser.next_token();
668    match &token.token {
669        sqlparser::tokenizer::Token::Word(w)
670            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
671                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
672        _ => {
673            return Err(ParseError::StreamingError(
674                "Expected DESCRIBE or DESC".to_string(),
675            ));
676        }
677    }
678    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
679    let name = parser
680        .parse_object_name(false)
681        .map_err(ParseError::SqlParseError)?;
682    Ok(StreamingStatement::Describe { name, extended })
683}
684
685/// Parse `SHOW CREATE SOURCE <name>`.
686fn parse_show_create_source(
687    parser: &mut sqlparser::parser::Parser,
688) -> Result<StreamingStatement, ParseError> {
689    // Consume SHOW CREATE SOURCE
690    parser
691        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
692        .map_err(ParseError::SqlParseError)?;
693    parser
694        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
695        .map_err(ParseError::SqlParseError)?;
696    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
697    let name = parser
698        .parse_object_name(false)
699        .map_err(ParseError::SqlParseError)?;
700    Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
701}
702
703/// Parse `SHOW CREATE SINK <name>`.
704fn parse_show_create_sink(
705    parser: &mut sqlparser::parser::Parser,
706) -> Result<StreamingStatement, ParseError> {
707    // Consume SHOW CREATE SINK
708    parser
709        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
710        .map_err(ParseError::SqlParseError)?;
711    parser
712        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
713        .map_err(ParseError::SqlParseError)?;
714    tokenizer::expect_custom_keyword(parser, "SINK")?;
715    let name = parser
716        .parse_object_name(false)
717        .map_err(ParseError::SqlParseError)?;
718    Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
719}
720
721/// Parse an EXPLAIN [ANALYZE] statement wrapping a streaming query.
722///
723/// Syntax: `EXPLAIN [ANALYZE] <streaming_statement>`
724///
725/// # Errors
726///
727/// Returns `ParseError` if the statement syntax is invalid.
728fn parse_explain(
729    parser: &mut sqlparser::parser::Parser,
730    original_sql: &str,
731) -> Result<StreamingStatement, ParseError> {
732    parser
733        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
734        .map_err(ParseError::SqlParseError)?;
735
736    // Check for optional ANALYZE keyword
737    let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
738
739    // Find the position after EXPLAIN [ANALYZE] in the original SQL
740    let explain_prefix_upper = original_sql.to_uppercase();
741    let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
742    let inner_start = if analyze {
743        explain_prefix_upper
744            .find("ANALYZE")
745            .map_or(0, |pos| pos + "ANALYZE".len())
746    } else {
747        explain_prefix_upper
748            .find("EXPLAIN")
749            .map_or(0, |pos| pos + "EXPLAIN".len())
750    };
751    let inner_sql = original_sql[inner_start..].trim();
752    let _ = skip_keyword; // suppress unused warning
753
754    // Parse the inner statement recursively
755    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
756    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
757        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
758    })?;
759    Ok(StreamingStatement::Explain {
760        statement: Box::new(inner),
761        analyze,
762    })
763}
764
765/// Parse a CREATE MATERIALIZED VIEW statement.
766///
767/// Syntax:
768/// ```sql
769/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
770/// AS <select_query>
771/// [EMIT <strategy>]
772/// ```
773///
774/// # Errors
775///
776/// Returns `ParseError` if the statement syntax is invalid.
777fn parse_create_materialized_view(
778    parser: &mut sqlparser::parser::Parser,
779    original_sql: &str,
780) -> Result<StreamingStatement, ParseError> {
781    parser
782        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
783        .map_err(ParseError::SqlParseError)?;
784
785    let or_replace = parser.parse_keywords(&[
786        sqlparser::keywords::Keyword::OR,
787        sqlparser::keywords::Keyword::REPLACE,
788    ]);
789
790    parser
791        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
792        .map_err(ParseError::SqlParseError)?;
793    parser
794        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
795        .map_err(ParseError::SqlParseError)?;
796
797    let if_not_exists = parser.parse_keywords(&[
798        sqlparser::keywords::Keyword::IF,
799        sqlparser::keywords::Keyword::NOT,
800        sqlparser::keywords::Keyword::EXISTS,
801    ]);
802
803    let name = parser
804        .parse_object_name(false)
805        .map_err(ParseError::SqlParseError)?;
806
807    parser
808        .expect_keyword(sqlparser::keywords::Keyword::AS)
809        .map_err(ParseError::SqlParseError)?;
810
811    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
812    let remaining = collect_remaining_tokens(parser);
813    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
814
815    let query_sql = query_body_sql(original_sql, &query_tokens, &emit_tokens);
816
817    let mv_dialect = LaminarDialect::default();
818
819    let query = if query_tokens.is_empty() {
820        return Err(ParseError::StreamingError(
821            "Expected SELECT query after AS".to_string(),
822        ));
823    } else {
824        let mut query_parser =
825            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
826        query_parser
827            .parse_query()
828            .map_err(ParseError::SqlParseError)?
829    };
830
831    let query_stmt =
832        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
833
834    let emit_clause = if emit_tokens.is_empty() {
835        None
836    } else {
837        let mut emit_parser =
838            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
839        emit_parser::parse_emit_clause(&mut emit_parser)?
840    };
841
842    Ok(StreamingStatement::CreateMaterializedView {
843        name,
844        query: Box::new(query_stmt),
845        emit_clause,
846        or_replace,
847        if_not_exists,
848        query_sql,
849    })
850}
851
852/// Byte offset in `sql` for a sqlparser `Location` (1-indexed line/column).
853fn location_to_byte_offset(sql: &str, loc: sqlparser::tokenizer::Location) -> Option<usize> {
854    if loc.line == 0 {
855        return None;
856    }
857    let (mut line, mut col) = (1u64, 1u64);
858    for (idx, ch) in sql.char_indices() {
859        if line == loc.line && col == loc.column {
860            return Some(idx);
861        }
862        if ch == '\n' {
863            line += 1;
864            col = 1;
865        } else {
866            col += 1;
867        }
868    }
869    (line == loc.line && col == loc.column).then_some(sql.len())
870}
871
872/// Collect all remaining tokens from the parser into a Vec.
873fn collect_remaining_tokens(
874    parser: &mut sqlparser::parser::Parser,
875) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
876    let mut tokens = Vec::new();
877    loop {
878        let token = parser.next_token();
879        if token.token == sqlparser::tokenizer::Token::EOF {
880            tokens.push(token);
881            break;
882        }
883        tokens.push(token);
884    }
885    tokens
886}
887
888/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
889///
890/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
891/// (or is empty if no EMIT found).
892fn split_at_emit(
893    tokens: &[sqlparser::tokenizer::TokenWithSpan],
894) -> (
895    Vec<sqlparser::tokenizer::TokenWithSpan>,
896    Vec<sqlparser::tokenizer::TokenWithSpan>,
897) {
898    let mut depth: i32 = 0;
899    for (i, token) in tokens.iter().enumerate() {
900        match &token.token {
901            sqlparser::tokenizer::Token::LParen => depth += 1,
902            sqlparser::tokenizer::Token::RParen => {
903                depth -= 1;
904            }
905            sqlparser::tokenizer::Token::Word(w)
906                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
907            {
908                let mut query_tokens = tokens[..i].to_vec();
909                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
910                    token: sqlparser::tokenizer::Token::EOF,
911                    span: sqlparser::tokenizer::Span::empty(),
912                });
913                let emit_tokens = tokens[i..].to_vec();
914                return (query_tokens, emit_tokens);
915            }
916            _ => {}
917        }
918    }
919    (tokens.to_vec(), vec![])
920}
921
922/// SQL parsing errors
923#[derive(Debug, thiserror::Error)]
924pub enum ParseError {
925    /// Standard SQL parse error
926    #[error("SQL parse error: {0}")]
927    SqlParseError(#[from] sqlparser::parser::ParserError),
928
929    /// Streaming extension parse error
930    #[error("Streaming SQL error: {0}")]
931    StreamingError(String),
932
933    /// Window function error
934    #[error("Window function error: {0}")]
935    WindowError(String),
936
937    /// Validation error (e.g., invalid option values)
938    #[error("Validation error: {0}")]
939    ValidationError(String),
940}
941
942#[cfg(test)]
943mod tests {
944    use super::*;
945
946    /// Helper to parse SQL and return the first statement.
947    fn parse_one(sql: &str) -> StreamingStatement {
948        let stmts = StreamingParser::parse_sql(sql).unwrap();
949        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
950        stmts.into_iter().next().unwrap()
951    }
952
953    #[test]
954    fn test_parse_drop_source() {
955        let stmt = parse_one("DROP SOURCE events");
956        match stmt {
957            StreamingStatement::DropSource {
958                name,
959                if_exists,
960                cascade,
961            } => {
962                assert_eq!(name.to_string(), "events");
963                assert!(!if_exists);
964                assert!(!cascade);
965            }
966            _ => panic!("Expected DropSource, got {stmt:?}"),
967        }
968    }
969
970    #[test]
971    fn test_parse_drop_source_if_exists() {
972        let stmt = parse_one("DROP SOURCE IF EXISTS events");
973        match stmt {
974            StreamingStatement::DropSource {
975                name,
976                if_exists,
977                cascade,
978            } => {
979                assert_eq!(name.to_string(), "events");
980                assert!(if_exists);
981                assert!(!cascade);
982            }
983            _ => panic!("Expected DropSource, got {stmt:?}"),
984        }
985    }
986
987    #[test]
988    fn test_parse_drop_source_cascade() {
989        let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
990        match stmt {
991            StreamingStatement::DropSource {
992                name,
993                if_exists,
994                cascade,
995            } => {
996                assert_eq!(name.to_string(), "events");
997                assert!(if_exists);
998                assert!(cascade);
999            }
1000            _ => panic!("Expected DropSource, got {stmt:?}"),
1001        }
1002    }
1003
1004    #[test]
1005    fn test_parse_drop_sink() {
1006        let stmt = parse_one("DROP SINK output");
1007        match stmt {
1008            StreamingStatement::DropSink {
1009                name,
1010                if_exists,
1011                cascade,
1012            } => {
1013                assert_eq!(name.to_string(), "output");
1014                assert!(!if_exists);
1015                assert!(!cascade);
1016            }
1017            _ => panic!("Expected DropSink, got {stmt:?}"),
1018        }
1019    }
1020
1021    #[test]
1022    fn test_parse_drop_sink_if_exists() {
1023        let stmt = parse_one("DROP SINK IF EXISTS output");
1024        match stmt {
1025            StreamingStatement::DropSink {
1026                name,
1027                if_exists,
1028                cascade,
1029            } => {
1030                assert_eq!(name.to_string(), "output");
1031                assert!(if_exists);
1032                assert!(!cascade);
1033            }
1034            _ => panic!("Expected DropSink, got {stmt:?}"),
1035        }
1036    }
1037
1038    #[test]
1039    fn test_parse_drop_sink_cascade() {
1040        let stmt = parse_one("DROP SINK output CASCADE");
1041        match stmt {
1042            StreamingStatement::DropSink {
1043                name,
1044                if_exists,
1045                cascade,
1046            } => {
1047                assert_eq!(name.to_string(), "output");
1048                assert!(!if_exists);
1049                assert!(cascade);
1050            }
1051            _ => panic!("Expected DropSink, got {stmt:?}"),
1052        }
1053    }
1054
1055    #[test]
1056    fn test_parse_drop_materialized_view() {
1057        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
1058        match stmt {
1059            StreamingStatement::DropMaterializedView {
1060                name,
1061                if_exists,
1062                cascade,
1063            } => {
1064                assert_eq!(name.to_string(), "live_stats");
1065                assert!(!if_exists);
1066                assert!(!cascade);
1067            }
1068            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1069        }
1070    }
1071
1072    #[test]
1073    fn test_parse_drop_materialized_view_if_exists_cascade() {
1074        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1075        match stmt {
1076            StreamingStatement::DropMaterializedView {
1077                name,
1078                if_exists,
1079                cascade,
1080            } => {
1081                assert_eq!(name.to_string(), "live_stats");
1082                assert!(if_exists);
1083                assert!(cascade);
1084            }
1085            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1086        }
1087    }
1088
1089    #[test]
1090    fn test_parse_show_sources() {
1091        let stmt = parse_one("SHOW SOURCES");
1092        assert!(matches!(
1093            stmt,
1094            StreamingStatement::Show(ShowCommand::Sources)
1095        ));
1096    }
1097
1098    #[test]
1099    fn test_parse_show_sinks() {
1100        let stmt = parse_one("SHOW SINKS");
1101        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1102    }
1103
1104    #[test]
1105    fn test_parse_show_queries() {
1106        let stmt = parse_one("SHOW QUERIES");
1107        assert!(matches!(
1108            stmt,
1109            StreamingStatement::Show(ShowCommand::Queries)
1110        ));
1111    }
1112
1113    #[test]
1114    fn test_parse_show_materialized_views() {
1115        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1116        assert!(matches!(
1117            stmt,
1118            StreamingStatement::Show(ShowCommand::MaterializedViews)
1119        ));
1120    }
1121
1122    #[test]
1123    fn test_parse_describe() {
1124        let stmt = parse_one("DESCRIBE events");
1125        match stmt {
1126            StreamingStatement::Describe { name, extended } => {
1127                assert_eq!(name.to_string(), "events");
1128                assert!(!extended);
1129            }
1130            _ => panic!("Expected Describe, got {stmt:?}"),
1131        }
1132    }
1133
1134    #[test]
1135    fn test_parse_describe_extended() {
1136        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1137        match stmt {
1138            StreamingStatement::Describe { name, extended } => {
1139                assert_eq!(name.to_string(), "my_schema.events");
1140                assert!(extended);
1141            }
1142            _ => panic!("Expected Describe, got {stmt:?}"),
1143        }
1144    }
1145
1146    #[test]
1147    fn test_parse_explain_select() {
1148        let stmt = parse_one("EXPLAIN SELECT * FROM events");
1149        match stmt {
1150            StreamingStatement::Explain {
1151                statement, analyze, ..
1152            } => {
1153                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1154                assert!(!analyze);
1155            }
1156            _ => panic!("Expected Explain, got {stmt:?}"),
1157        }
1158    }
1159
1160    #[test]
1161    fn test_parse_explain_create_source() {
1162        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1163        match stmt {
1164            StreamingStatement::Explain { statement, .. } => {
1165                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1166            }
1167            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1168        }
1169    }
1170
1171    #[test]
1172    fn test_parse_explain_analyze_select() {
1173        let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1174        match stmt {
1175            StreamingStatement::Explain {
1176                statement, analyze, ..
1177            } => {
1178                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1179                assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1180            }
1181            _ => panic!("Expected Explain, got {stmt:?}"),
1182        }
1183    }
1184
1185    #[test]
1186    fn test_parse_create_materialized_view() {
1187        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1188        match stmt {
1189            StreamingStatement::CreateMaterializedView {
1190                name,
1191                emit_clause,
1192                or_replace,
1193                if_not_exists,
1194                ..
1195            } => {
1196                assert_eq!(name.to_string(), "live_stats");
1197                assert!(emit_clause.is_none());
1198                assert!(!or_replace);
1199                assert!(!if_not_exists);
1200            }
1201            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1202        }
1203    }
1204
1205    #[test]
1206    fn test_parse_create_materialized_view_with_emit() {
1207        let stmt = parse_one(
1208            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1209        );
1210        match stmt {
1211            StreamingStatement::CreateMaterializedView {
1212                name, emit_clause, ..
1213            } => {
1214                assert_eq!(name.to_string(), "live_stats");
1215                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1216            }
1217            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1218        }
1219    }
1220
1221    #[test]
1222    fn test_parse_create_or_replace_materialized_view() {
1223        let stmt = parse_one(
1224            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1225        );
1226        match stmt {
1227            StreamingStatement::CreateMaterializedView {
1228                name,
1229                or_replace,
1230                if_not_exists,
1231                ..
1232            } => {
1233                assert_eq!(name.to_string(), "live_stats");
1234                assert!(or_replace);
1235                assert!(!if_not_exists);
1236            }
1237            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1238        }
1239    }
1240
1241    #[test]
1242    fn test_parse_create_materialized_view_if_not_exists() {
1243        let stmt = parse_one(
1244            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1245        );
1246        match stmt {
1247            StreamingStatement::CreateMaterializedView {
1248                name,
1249                or_replace,
1250                if_not_exists,
1251                ..
1252            } => {
1253                assert_eq!(name.to_string(), "live_stats");
1254                assert!(!or_replace);
1255                assert!(if_not_exists);
1256            }
1257            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1258        }
1259    }
1260
1261    #[test]
1262    fn test_parse_insert_into() {
1263        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1264        match stmt {
1265            StreamingStatement::InsertInto {
1266                table_name,
1267                columns,
1268                values,
1269            } => {
1270                assert_eq!(table_name.to_string(), "events");
1271                assert_eq!(columns.len(), 2);
1272                assert_eq!(columns[0].to_string(), "id");
1273                assert_eq!(columns[1].to_string(), "name");
1274                assert_eq!(values.len(), 1);
1275                assert_eq!(values[0].len(), 2);
1276            }
1277            _ => panic!("Expected InsertInto, got {stmt:?}"),
1278        }
1279    }
1280
1281    #[test]
1282    fn test_parse_insert_into_multiple_rows() {
1283        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1284        match stmt {
1285            StreamingStatement::InsertInto {
1286                table_name,
1287                columns,
1288                values,
1289            } => {
1290                assert_eq!(table_name.to_string(), "events");
1291                assert!(columns.is_empty());
1292                assert_eq!(values.len(), 3);
1293            }
1294            _ => panic!("Expected InsertInto, got {stmt:?}"),
1295        }
1296    }
1297
1298    // ── CREATE STREAM tests ─────────────────────────────
1299
1300    #[test]
1301    fn test_parse_create_stream() {
1302        let stmt = parse_one(
1303            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1304        );
1305        match stmt {
1306            StreamingStatement::CreateStream {
1307                name,
1308                or_replace,
1309                if_not_exists,
1310                emit_clause,
1311                ..
1312            } => {
1313                assert_eq!(name.to_string(), "session_activity");
1314                assert!(!or_replace);
1315                assert!(!if_not_exists);
1316                assert!(emit_clause.is_none());
1317            }
1318            _ => panic!("Expected CreateStream, got {stmt:?}"),
1319        }
1320    }
1321
1322    #[test]
1323    fn test_parse_create_or_replace_stream() {
1324        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1325        match stmt {
1326            StreamingStatement::CreateStream { or_replace, .. } => {
1327                assert!(or_replace);
1328            }
1329            _ => panic!("Expected CreateStream, got {stmt:?}"),
1330        }
1331    }
1332
1333    #[test]
1334    fn test_parse_create_stream_if_not_exists() {
1335        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1336        match stmt {
1337            StreamingStatement::CreateStream { if_not_exists, .. } => {
1338                assert!(if_not_exists);
1339            }
1340            _ => panic!("Expected CreateStream, got {stmt:?}"),
1341        }
1342    }
1343
1344    #[test]
1345    fn test_parse_create_stream_with_emit() {
1346        let stmt =
1347            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1348        match stmt {
1349            StreamingStatement::CreateStream { emit_clause, .. } => {
1350                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1351            }
1352            _ => panic!("Expected CreateStream, got {stmt:?}"),
1353        }
1354    }
1355
1356    #[test]
1357    fn test_parse_drop_stream() {
1358        let stmt = parse_one("DROP STREAM my_stream");
1359        match stmt {
1360            StreamingStatement::DropStream {
1361                name,
1362                if_exists,
1363                cascade,
1364            } => {
1365                assert_eq!(name.to_string(), "my_stream");
1366                assert!(!if_exists);
1367                assert!(!cascade);
1368            }
1369            _ => panic!("Expected DropStream, got {stmt:?}"),
1370        }
1371    }
1372
1373    #[test]
1374    fn test_parse_drop_stream_if_exists() {
1375        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1376        match stmt {
1377            StreamingStatement::DropStream {
1378                name,
1379                if_exists,
1380                cascade,
1381            } => {
1382                assert_eq!(name.to_string(), "my_stream");
1383                assert!(if_exists);
1384                assert!(!cascade);
1385            }
1386            _ => panic!("Expected DropStream, got {stmt:?}"),
1387        }
1388    }
1389
1390    #[test]
1391    fn test_parse_drop_stream_cascade() {
1392        let stmt = parse_one("DROP STREAM my_stream CASCADE");
1393        match stmt {
1394            StreamingStatement::DropStream {
1395                name,
1396                if_exists,
1397                cascade,
1398            } => {
1399                assert_eq!(name.to_string(), "my_stream");
1400                assert!(!if_exists);
1401                assert!(cascade);
1402            }
1403            _ => panic!("Expected DropStream, got {stmt:?}"),
1404        }
1405    }
1406
1407    #[test]
1408    fn test_parse_show_streams() {
1409        let stmt = parse_one("SHOW STREAMS");
1410        assert!(matches!(
1411            stmt,
1412            StreamingStatement::Show(ShowCommand::Streams)
1413        ));
1414    }
1415
1416    #[test]
1417    fn test_parse_alter_source_add_column() {
1418        let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1419        match stmt {
1420            StreamingStatement::AlterSource { name, operation } => {
1421                assert_eq!(name.to_string(), "events");
1422                match operation {
1423                    statements::AlterSourceOperation::AddColumn { column_def } => {
1424                        assert_eq!(column_def.name.value, "new_col");
1425                        assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1426                    }
1427                    statements::AlterSourceOperation::SetProperties { .. } => {
1428                        panic!("Expected AddColumn")
1429                    }
1430                }
1431            }
1432            _ => panic!("Expected AlterSource, got {stmt:?}"),
1433        }
1434    }
1435
1436    #[test]
1437    fn test_parse_alter_source_set_properties() {
1438        let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1439        match stmt {
1440            StreamingStatement::AlterSource { name, operation } => {
1441                assert_eq!(name.to_string(), "events");
1442                match operation {
1443                    statements::AlterSourceOperation::SetProperties { properties } => {
1444                        assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1445                        assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1446                    }
1447                    statements::AlterSourceOperation::AddColumn { .. } => {
1448                        panic!("Expected SetProperties")
1449                    }
1450                }
1451            }
1452            _ => panic!("Expected AlterSource, got {stmt:?}"),
1453        }
1454    }
1455
1456    #[test]
1457    fn test_parse_checkpoint() {
1458        let stmt = parse_one("CHECKPOINT");
1459        assert!(
1460            matches!(stmt, StreamingStatement::Checkpoint),
1461            "Expected Checkpoint, got {stmt:?}"
1462        );
1463    }
1464
1465    #[test]
1466    fn test_parse_show_checkpoint_status() {
1467        let stmt = parse_one("SHOW CHECKPOINT STATUS");
1468        assert!(
1469            matches!(
1470                stmt,
1471                StreamingStatement::Show(ShowCommand::CheckpointStatus)
1472            ),
1473            "Expected Show(CheckpointStatus), got {stmt:?}"
1474        );
1475    }
1476
1477    #[test]
1478    fn test_parse_restore_checkpoint() {
1479        let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1480        match stmt {
1481            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1482                assert_eq!(checkpoint_id, 42);
1483            }
1484            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1485        }
1486    }
1487
1488    #[test]
1489    fn test_parse_restore_checkpoint_large_id() {
1490        let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1491        match stmt {
1492            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1493                assert_eq!(checkpoint_id, 123_456);
1494            }
1495            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1496        }
1497    }
1498
1499    #[test]
1500    fn test_parse_show_create_source() {
1501        let stmt = parse_one("SHOW CREATE SOURCE events");
1502        match stmt {
1503            StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1504                assert_eq!(name.to_string(), "events");
1505            }
1506            _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1507        }
1508    }
1509
1510    #[test]
1511    fn test_parse_show_create_sink() {
1512        let stmt = parse_one("SHOW CREATE SINK output");
1513        match stmt {
1514            StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
1515                assert_eq!(name.to_string(), "output");
1516            }
1517            _ => panic!("Expected Show(CreateSink), got {stmt:?}"),
1518        }
1519    }
1520
1521    #[test]
1522    fn create_stream_preserves_temporal_probe_join() {
1523        let sql = "CREATE STREAM markouts_long AS \
1524                   SELECT t.s, p.offset_ms FROM trade_probe t \
1525                   TEMPORAL PROBE JOIN price_ref r \
1526                       ON (s) TIMESTAMPS (ts, ts) \
1527                       LIST (0s, 1s, 5s, 30s) AS p";
1528        let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
1529            panic!("expected CreateStream");
1530        };
1531        assert!(
1532            query_sql.to_uppercase().contains("TEMPORAL PROBE JOIN"),
1533            "got: {query_sql}"
1534        );
1535        assert!(
1536            query_sql.contains("LIST (0s, 1s, 5s, 30s)"),
1537            "got: {query_sql}"
1538        );
1539        assert!(query_sql.contains("AS p"), "got: {query_sql}");
1540    }
1541
1542    #[test]
1543    fn create_stream_temporal_probe_range_step_preserved() {
1544        let sql = "CREATE STREAM mk AS \
1545                   SELECT s FROM trades t TEMPORAL PROBE JOIN prices r \
1546                   ON (symbol) TIMESTAMPS (ts, ts) \
1547                   RANGE FROM 0s TO 30s STEP 5s AS p";
1548        let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
1549            panic!("expected CreateStream");
1550        };
1551        assert!(
1552            query_sql.contains("RANGE FROM 0s TO 30s STEP 5s"),
1553            "got: {query_sql}"
1554        );
1555    }
1556
1557    #[test]
1558    fn create_mv_preserves_temporal_probe_join() {
1559        let sql = "CREATE MATERIALIZED VIEW mv AS \
1560                   SELECT t.s FROM trades t TEMPORAL PROBE JOIN prices r \
1561                   ON (s) TIMESTAMPS (ts, ts) LIST (0s, 5s) AS p";
1562        let StreamingStatement::CreateMaterializedView { query_sql, .. } = parse_one(sql) else {
1563            panic!("expected CreateMaterializedView");
1564        };
1565        assert!(
1566            query_sql.to_uppercase().contains("TEMPORAL PROBE JOIN"),
1567            "got: {query_sql}"
1568        );
1569    }
1570
1571    #[test]
1572    fn create_stream_emit_is_not_captured_in_query_sql() {
1573        let sql = "CREATE STREAM s AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
1574        let StreamingStatement::CreateStream {
1575            query_sql,
1576            emit_clause,
1577            ..
1578        } = parse_one(sql)
1579        else {
1580            panic!("expected CreateStream");
1581        };
1582        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1583        assert!(
1584            !query_sql.to_uppercase().contains("EMIT"),
1585            "got: {query_sql}"
1586        );
1587        assert!(query_sql.contains("FROM events"), "got: {query_sql}");
1588    }
1589
1590    #[test]
1591    fn create_stream_plain_select_query_sql_executes() {
1592        let sql = "CREATE STREAM counts AS SELECT COUNT(*) AS c FROM events";
1593        let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
1594            panic!("expected CreateStream");
1595        };
1596        assert!(query_sql.to_uppercase().contains("SELECT"));
1597        assert!(query_sql.contains("FROM events"));
1598    }
1599}