Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10mod declare_parser;
11pub(crate) mod dialect;
12mod emit_parser;
13pub mod join_parser;
14mod late_data_parser;
15/// Parser for CREATE/DROP LOOKUP TABLE DDL statements
16pub mod lookup_table;
17pub mod order_analyzer;
18mod sink_parser;
19mod source_parser;
20mod statements;
21mod subscribe_parser;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27    AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28    FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, SubscribeStatement,
29    WatermarkDef, WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36/// Parses SQL with streaming extensions.
37///
38/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
39/// for structured parsing. Standard SQL is delegated to sqlparser directly.
40///
41/// # Errors
42///
43/// Returns `ParseError` if the SQL syntax is invalid.
44pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48/// Parser for streaming SQL extensions.
49///
50/// Provides static methods for parsing streaming SQL statements.
51/// Uses sqlparser's `Parser` API internally for structured parsing
52/// of identifiers, data types, expressions, and queries.
53pub struct StreamingParser;
54
55impl StreamingParser {
56    /// Parse a SQL string with streaming extensions.
57    ///
58    /// Tokenizes the input to detect statement type, then routes to the
59    /// appropriate parser:
60    /// - CREATE SOURCE → `source_parser`
61    /// - CREATE SINK → `sink_parser`
62    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
63    /// - Everything else → `sqlparser::parser::Parser`
64    ///
65    /// # Errors
66    ///
67    /// Returns `ParserError` if the SQL syntax is invalid.
68    #[allow(clippy::too_many_lines)]
69    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70        let sql_trimmed = sql.trim();
71        if sql_trimmed.is_empty() {
72            return Err(sqlparser::parser::ParserError::ParserError(
73                "Empty SQL statement".to_string(),
74            ));
75        }
76
77        let dialect = LaminarDialect::default();
78
79        // Tokenize to detect statement type (with location for better errors)
80        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81            .tokenize_with_location()
82            .map_err(|e| {
83                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84            })?;
85
86        // Route based on token-level detection
87        match detect_streaming_ddl(&tokens) {
88            StreamingDdlKind::CreateSource { .. } => {
89                let mut parser =
90                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91                let source = source_parser::parse_create_source(&mut parser)
92                    .map_err(parse_error_to_parser_error)?;
93                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94            }
95            StreamingDdlKind::CreateSink { .. } => {
96                let mut parser =
97                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98                let sink = sink_parser::parse_create_sink(&mut parser)
99                    .map_err(parse_error_to_parser_error)?;
100                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101            }
102            StreamingDdlKind::CreateContinuousQuery { .. } => {
103                let mut parser =
104                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106                    .map_err(parse_error_to_parser_error)?;
107                Ok(vec![stmt])
108            }
109            StreamingDdlKind::DropSource { .. } => {
110                let mut parser =
111                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113                Ok(vec![stmt])
114            }
115            StreamingDdlKind::DropSink { .. } => {
116                let mut parser =
117                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119                Ok(vec![stmt])
120            }
121            StreamingDdlKind::DropMaterializedView { .. } => {
122                let mut parser =
123                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124                let stmt = parse_drop_materialized_view(&mut parser)
125                    .map_err(parse_error_to_parser_error)?;
126                Ok(vec![stmt])
127            }
128            StreamingDdlKind::ShowSources => {
129                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130            }
131            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132            StreamingDdlKind::ShowQueries => {
133                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134            }
135            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136                ShowCommand::MaterializedViews,
137            )]),
138            StreamingDdlKind::DescribeSource => {
139                let mut parser =
140                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142                Ok(vec![stmt])
143            }
144            StreamingDdlKind::ExplainStreaming => {
145                let mut parser =
146                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147                let stmt =
148                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149                Ok(vec![stmt])
150            }
151            StreamingDdlKind::CreateMaterializedView { .. } => {
152                let mut parser =
153                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154                let stmt = parse_create_materialized_view(&mut parser, sql_trimmed)
155                    .map_err(parse_error_to_parser_error)?;
156                Ok(vec![stmt])
157            }
158            StreamingDdlKind::CreateStream { .. } => {
159                let mut parser =
160                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161                let stmt = parse_create_stream(&mut parser, sql_trimmed)
162                    .map_err(parse_error_to_parser_error)?;
163                Ok(vec![stmt])
164            }
165            StreamingDdlKind::DropStream { .. } => {
166                let mut parser =
167                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169                Ok(vec![stmt])
170            }
171            StreamingDdlKind::ShowStreams => {
172                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173            }
174            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175            StreamingDdlKind::CreateLookupTable { .. } => {
176                let mut parser =
177                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178                let lt = lookup_table::parse_create_lookup_table(&mut parser)
179                    .map_err(parse_error_to_parser_error)?;
180                Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181            }
182            StreamingDdlKind::DropLookupTable { .. } => {
183                let mut parser =
184                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185                let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186                    .map_err(parse_error_to_parser_error)?;
187                Ok(vec![StreamingStatement::DropLookupTable {
188                    name,
189                    if_exists,
190                }])
191            }
192            StreamingDdlKind::AlterSource => {
193                let mut parser =
194                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195                let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196                Ok(vec![stmt])
197            }
198            StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
199                ShowCommand::CheckpointStatus,
200            )]),
201            StreamingDdlKind::ShowCreateSource => {
202                let mut parser =
203                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
204                let stmt =
205                    parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
206                Ok(vec![stmt])
207            }
208            StreamingDdlKind::ShowCreateSink => {
209                let mut parser =
210                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
211                let stmt =
212                    parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
213                Ok(vec![stmt])
214            }
215            StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
216            StreamingDdlKind::Subscribe => {
217                let mut parser =
218                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
219                let stmt = subscribe_parser::parse_subscribe(&mut parser)
220                    .map_err(parse_error_to_parser_error)?;
221                Ok(vec![StreamingStatement::Subscribe(Box::new(stmt))])
222            }
223            StreamingDdlKind::DeclareCursor => {
224                let mut parser =
225                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
226                let stmt = declare_parser::parse_declare_cursor(&mut parser)
227                    .map_err(parse_error_to_parser_error)?;
228                Ok(vec![stmt])
229            }
230            StreamingDdlKind::RestoreCheckpoint => {
231                let mut parser =
232                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
233                let stmt =
234                    parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
235                Ok(vec![stmt])
236            }
237            StreamingDdlKind::None => {
238                // Standard SQL - check for INSERT INTO and convert
239                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
240                Ok(statements
241                    .into_iter()
242                    .map(convert_standard_statement)
243                    .collect())
244            }
245        }
246    }
247
248    /// Check if an expression contains a window function.
249    #[must_use]
250    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
251        match expr {
252            sqlparser::ast::Expr::Function(func) => {
253                if let Some(name) = func.name.0.last() {
254                    let func_name = name.to_string().to_uppercase();
255                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
256                } else {
257                    false
258                }
259            }
260            _ => false,
261        }
262    }
263
264    /// Parse EMIT clause from SQL string.
265    ///
266    /// # Errors
267    ///
268    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
269    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
270        emit_parser::parse_emit_clause_from_sql(sql)
271    }
272
273    /// Parse late data handling clause from SQL string.
274    ///
275    /// # Errors
276    ///
277    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
278    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
279        late_data_parser::parse_late_data_clause_from_sql(sql)
280    }
281}
282
283/// Convert `ParseError` to `ParserError` for backward compatibility.
284fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
285    match e {
286        ParseError::SqlParseError(pe) => pe,
287        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
288        ParseError::WindowError(msg) => {
289            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
290        }
291        ParseError::ValidationError(msg) => {
292            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
293        }
294    }
295}
296
297/// Parse a RESTORE FROM CHECKPOINT statement.
298///
299/// Syntax: `RESTORE FROM CHECKPOINT <id>`
300///
301/// # Errors
302///
303/// Returns `ParseError` if the statement syntax is invalid.
304fn parse_restore_checkpoint(
305    parser: &mut sqlparser::parser::Parser,
306) -> Result<StreamingStatement, ParseError> {
307    // Consume RESTORE
308    tokenizer::expect_custom_keyword(parser, "RESTORE")?;
309    // Consume FROM
310    if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
311        return Err(ParseError::StreamingError(
312            "Expected FROM after RESTORE".to_string(),
313        ));
314    }
315    // Consume CHECKPOINT
316    tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
317    // Parse checkpoint ID (numeric literal)
318    let token = parser.next_token();
319    match &token.token {
320        sqlparser::tokenizer::Token::Number(n, _) => {
321            let id: u64 = n
322                .parse()
323                .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
324            Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
325        }
326        other => Err(ParseError::StreamingError(format!(
327            "Expected checkpoint ID (number), found {other}"
328        ))),
329    }
330}
331
332/// Convert a standard sqlparser statement to a `StreamingStatement`.
333///
334/// Detects INSERT INTO statements and converts them to the streaming
335/// `InsertInto` variant. All other statements are wrapped as `Standard`.
336fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
337    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
338        // Extract table name from TableObject
339        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
340            let table_name = name.clone();
341            let columns = insert.columns.clone();
342
343            // Try to extract VALUES rows from source query
344            if let Some(ref source) = insert.source {
345                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
346                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
347                    return StreamingStatement::InsertInto {
348                        table_name,
349                        columns,
350                        values: rows,
351                    };
352                }
353            }
354        }
355    }
356    StreamingStatement::Standard(Box::new(stmt))
357}
358
359/// Parse a DROP SOURCE statement.
360///
361/// Syntax: `DROP SOURCE [IF EXISTS] name [CASCADE]`
362///
363/// # Errors
364///
365/// Returns `ParseError` if the statement syntax is invalid.
366fn parse_drop_source(
367    parser: &mut sqlparser::parser::Parser,
368) -> Result<StreamingStatement, ParseError> {
369    parser
370        .expect_keyword(sqlparser::keywords::Keyword::DROP)
371        .map_err(ParseError::SqlParseError)?;
372    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
373    let if_exists = parser.parse_keywords(&[
374        sqlparser::keywords::Keyword::IF,
375        sqlparser::keywords::Keyword::EXISTS,
376    ]);
377    let name = parser
378        .parse_object_name(false)
379        .map_err(ParseError::SqlParseError)?;
380    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
381    Ok(StreamingStatement::DropSource {
382        name,
383        if_exists,
384        cascade,
385    })
386}
387
388/// Parse a DROP SINK statement.
389///
390/// Syntax: `DROP SINK [IF EXISTS] name [CASCADE]`
391///
392/// # Errors
393///
394/// Returns `ParseError` if the statement syntax is invalid.
395fn parse_drop_sink(
396    parser: &mut sqlparser::parser::Parser,
397) -> Result<StreamingStatement, ParseError> {
398    parser
399        .expect_keyword(sqlparser::keywords::Keyword::DROP)
400        .map_err(ParseError::SqlParseError)?;
401    tokenizer::expect_custom_keyword(parser, "SINK")?;
402    let if_exists = parser.parse_keywords(&[
403        sqlparser::keywords::Keyword::IF,
404        sqlparser::keywords::Keyword::EXISTS,
405    ]);
406    let name = parser
407        .parse_object_name(false)
408        .map_err(ParseError::SqlParseError)?;
409    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
410    Ok(StreamingStatement::DropSink {
411        name,
412        if_exists,
413        cascade,
414    })
415}
416
417/// Parse a DROP MATERIALIZED VIEW statement.
418///
419/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
420///
421/// # Errors
422///
423/// Returns `ParseError` if the statement syntax is invalid.
424fn parse_drop_materialized_view(
425    parser: &mut sqlparser::parser::Parser,
426) -> Result<StreamingStatement, ParseError> {
427    parser
428        .expect_keyword(sqlparser::keywords::Keyword::DROP)
429        .map_err(ParseError::SqlParseError)?;
430    parser
431        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
432        .map_err(ParseError::SqlParseError)?;
433    parser
434        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
435        .map_err(ParseError::SqlParseError)?;
436    let if_exists = parser.parse_keywords(&[
437        sqlparser::keywords::Keyword::IF,
438        sqlparser::keywords::Keyword::EXISTS,
439    ]);
440    let name = parser
441        .parse_object_name(false)
442        .map_err(ParseError::SqlParseError)?;
443    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
444    Ok(StreamingStatement::DropMaterializedView {
445        name,
446        if_exists,
447        cascade,
448    })
449}
450
451/// Parse a CREATE STREAM statement.
452///
453/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
454///
455/// # Errors
456///
457/// Returns `ParseError` if the statement syntax is invalid.
458fn parse_create_stream(
459    parser: &mut sqlparser::parser::Parser,
460    original_sql: &str,
461) -> Result<StreamingStatement, ParseError> {
462    parser
463        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
464        .map_err(ParseError::SqlParseError)?;
465
466    let or_replace = parser.parse_keywords(&[
467        sqlparser::keywords::Keyword::OR,
468        sqlparser::keywords::Keyword::REPLACE,
469    ]);
470
471    tokenizer::expect_custom_keyword(parser, "STREAM")?;
472
473    let if_not_exists = parser.parse_keywords(&[
474        sqlparser::keywords::Keyword::IF,
475        sqlparser::keywords::Keyword::NOT,
476        sqlparser::keywords::Keyword::EXISTS,
477    ]);
478
479    let name = parser
480        .parse_object_name(false)
481        .map_err(ParseError::SqlParseError)?;
482
483    parser
484        .expect_keyword(sqlparser::keywords::Keyword::AS)
485        .map_err(ParseError::SqlParseError)?;
486
487    // Collect remaining tokens, then peel off the optional trailing
488    // `WITH (...)` before splitting query / EMIT.
489    let remaining = collect_remaining_tokens(parser);
490    let (head_tokens, with_tokens) = split_off_trailing_with(&remaining);
491    let (query_tokens, emit_tokens) = split_at_emit(&head_tokens);
492
493    let query_sql = query_body_sql(
494        original_sql,
495        &query_tokens,
496        &emit_tokens,
497        with_tokens.as_deref(),
498    );
499
500    let stream_dialect = LaminarDialect::default();
501
502    let query = if query_tokens.is_empty() {
503        return Err(ParseError::StreamingError(
504            "Expected SELECT query after AS".to_string(),
505        ));
506    } else {
507        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
508            .with_tokens_with_locations(query_tokens);
509        query_parser
510            .parse_query()
511            .map_err(ParseError::SqlParseError)?
512    };
513
514    let query_stmt =
515        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
516
517    let emit_clause = if emit_tokens.is_empty() {
518        None
519    } else {
520        let mut emit_parser =
521            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
522        emit_parser::parse_emit_clause(&mut emit_parser)?
523    };
524
525    let retention_bytes = match with_tokens {
526        None => None,
527        Some(tokens) => {
528            let mut with_parser =
529                sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(tokens);
530            let opts = tokenizer::parse_with_options(&mut with_parser)?;
531            extract_retention_bytes(&opts)?
532        }
533    };
534
535    Ok(StreamingStatement::CreateStream {
536        name,
537        query: Box::new(query_stmt),
538        emit_clause,
539        or_replace,
540        if_not_exists,
541        query_sql,
542        retention_bytes,
543    })
544}
545
546/// Split off a trailing `WITH (` at depth 0. CTE-style `WITH ident AS (...)`
547/// is ignored because it's followed by an identifier, not `(`.
548fn split_off_trailing_with(
549    tokens: &[sqlparser::tokenizer::TokenWithSpan],
550) -> (
551    Vec<sqlparser::tokenizer::TokenWithSpan>,
552    Option<Vec<sqlparser::tokenizer::TokenWithSpan>>,
553) {
554    use sqlparser::tokenizer::Token;
555    let mut depth: i32 = 0;
556    let mut last_with_idx: Option<usize> = None;
557    for (i, t) in tokens.iter().enumerate() {
558        match &t.token {
559            Token::LParen => depth += 1,
560            Token::RParen => depth -= 1,
561            Token::Word(w) if depth == 0 && w.value.eq_ignore_ascii_case("WITH") => {
562                if matches!(tokens.get(i + 1).map(|t| &t.token), Some(Token::LParen)) {
563                    last_with_idx = Some(i);
564                }
565            }
566            _ => {}
567        }
568    }
569    match last_with_idx {
570        None => (tokens.to_vec(), None),
571        Some(i) => {
572            let mut head = tokens[..i].to_vec();
573            head.push(sqlparser::tokenizer::TokenWithSpan {
574                token: Token::EOF,
575                span: sqlparser::tokenizer::Span::empty(),
576            });
577            let tail = tokens[i..].to_vec();
578            (head, Some(tail))
579        }
580    }
581}
582
583/// Unknown keys are rejected so typos surface immediately.
584fn extract_retention_bytes(
585    opts: &std::collections::HashMap<String, String>,
586) -> Result<Option<u64>, ParseError> {
587    for key in opts.keys() {
588        if !key.eq_ignore_ascii_case("retain_history") {
589            return Err(ParseError::StreamingError(format!(
590                "unknown CREATE STREAM option '{key}' (expected RETAIN_HISTORY)"
591            )));
592        }
593    }
594    match opts
595        .iter()
596        .find(|(k, _)| k.eq_ignore_ascii_case("retain_history"))
597    {
598        None => Ok(None),
599        Some((_, v)) => {
600            let bytes = lookup_table::ByteSize::parse(v)?;
601            Ok(Some(bytes.as_bytes()))
602        }
603    }
604}
605
606/// Slice `original_sql` from the first query token to whichever trailing
607/// clause comes next (`EMIT` or `WITH`), or end of input if neither is
608/// present. Preserves custom streaming syntax that sqlparser's AST would
609/// drop. Falls back to joining token text if spans are empty.
610pub(super) fn query_body_sql(
611    original_sql: &str,
612    query_tokens: &[sqlparser::tokenizer::TokenWithSpan],
613    emit_tokens: &[sqlparser::tokenizer::TokenWithSpan],
614    with_tokens: Option<&[sqlparser::tokenizer::TokenWithSpan]>,
615) -> String {
616    use sqlparser::tokenizer::Token;
617
618    let from_spans = || -> Option<String> {
619        let first = query_tokens
620            .iter()
621            .find(|t| !matches!(t.token, Token::EOF))?;
622        let start = location_to_byte_offset(original_sql, first.span.start)?;
623        let trailer_starts = [emit_tokens.first(), with_tokens.and_then(|t| t.first())]
624            .into_iter()
625            .flatten()
626            .filter_map(|t| location_to_byte_offset(original_sql, t.span.start));
627        let end = trailer_starts.min().unwrap_or(original_sql.len());
628        let slice = original_sql.get(start..end)?;
629        Some(
630            slice
631                .trim_end_matches(|c: char| c.is_whitespace() || c == ';')
632                .to_string(),
633        )
634    };
635
636    from_spans().unwrap_or_else(|| {
637        query_tokens
638            .iter()
639            .take_while(|t| !matches!(t.token, Token::EOF))
640            .map(|t| t.token.to_string())
641            .collect::<Vec<_>>()
642            .join(" ")
643    })
644}
645
646/// Parse a DROP STREAM statement.
647///
648/// Syntax: `DROP STREAM [IF EXISTS] name [CASCADE]`
649///
650/// # Errors
651///
652/// Returns `ParseError` if the statement syntax is invalid.
653fn parse_drop_stream(
654    parser: &mut sqlparser::parser::Parser,
655) -> Result<StreamingStatement, ParseError> {
656    parser
657        .expect_keyword(sqlparser::keywords::Keyword::DROP)
658        .map_err(ParseError::SqlParseError)?;
659    tokenizer::expect_custom_keyword(parser, "STREAM")?;
660    let if_exists = parser.parse_keywords(&[
661        sqlparser::keywords::Keyword::IF,
662        sqlparser::keywords::Keyword::EXISTS,
663    ]);
664    let name = parser
665        .parse_object_name(false)
666        .map_err(ParseError::SqlParseError)?;
667    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
668    Ok(StreamingStatement::DropStream {
669        name,
670        if_exists,
671        cascade,
672    })
673}
674
675/// Parse a DESCRIBE statement.
676///
677/// Syntax: `DESCRIBE [EXTENDED] name`
678/// Parse an ALTER SOURCE statement.
679///
680/// Syntax:
681/// - `ALTER SOURCE name ADD COLUMN col_name data_type`
682/// - `ALTER SOURCE name SET ('key' = 'value', ...)`
683///
684/// # Errors
685///
686/// Returns `ParseError` if the statement syntax is invalid.
687fn parse_alter_source(
688    parser: &mut sqlparser::parser::Parser,
689) -> Result<StreamingStatement, ParseError> {
690    parser
691        .expect_keyword(sqlparser::keywords::Keyword::ALTER)
692        .map_err(ParseError::SqlParseError)?;
693    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
694    let name = parser
695        .parse_object_name(false)
696        .map_err(ParseError::SqlParseError)?;
697
698    // Determine operation: ADD COLUMN or SET
699    if parser.parse_keywords(&[
700        sqlparser::keywords::Keyword::ADD,
701        sqlparser::keywords::Keyword::COLUMN,
702    ]) {
703        // ALTER SOURCE name ADD COLUMN col_name data_type
704        let col_name = parser
705            .parse_identifier()
706            .map_err(ParseError::SqlParseError)?;
707        let data_type = parser
708            .parse_data_type()
709            .map_err(ParseError::SqlParseError)?;
710        let column_def = sqlparser::ast::ColumnDef {
711            name: col_name,
712            data_type,
713            options: vec![],
714        };
715        Ok(StreamingStatement::AlterSource {
716            name,
717            operation: statements::AlterSourceOperation::AddColumn { column_def },
718        })
719    } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
720        // ALTER SOURCE name SET ('key' = 'value', ...)
721        parser
722            .expect_token(&sqlparser::tokenizer::Token::LParen)
723            .map_err(ParseError::SqlParseError)?;
724        #[allow(clippy::disallowed_types)] // cold path: SQL parsing
725        let mut properties = std::collections::HashMap::new();
726        loop {
727            let key = parser
728                .parse_literal_string()
729                .map_err(ParseError::SqlParseError)?;
730            parser
731                .expect_token(&sqlparser::tokenizer::Token::Eq)
732                .map_err(ParseError::SqlParseError)?;
733            let value = parser
734                .parse_literal_string()
735                .map_err(ParseError::SqlParseError)?;
736            properties.insert(key, value);
737            if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
738                break;
739            }
740        }
741        parser
742            .expect_token(&sqlparser::tokenizer::Token::RParen)
743            .map_err(ParseError::SqlParseError)?;
744        Ok(StreamingStatement::AlterSource {
745            name,
746            operation: statements::AlterSourceOperation::SetProperties { properties },
747        })
748    } else {
749        Err(ParseError::StreamingError(
750            "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
751        ))
752    }
753}
754
755/// Parse a DESCRIBE statement.
756///
757/// # Errors
758///
759/// Returns `ParseError` if the statement syntax is invalid.
760fn parse_describe(
761    parser: &mut sqlparser::parser::Parser,
762) -> Result<StreamingStatement, ParseError> {
763    // Consume DESCRIBE or DESC
764    let token = parser.next_token();
765    match &token.token {
766        sqlparser::tokenizer::Token::Word(w)
767            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
768                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
769        _ => {
770            return Err(ParseError::StreamingError(
771                "Expected DESCRIBE or DESC".to_string(),
772            ));
773        }
774    }
775    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
776    let name = parser
777        .parse_object_name(false)
778        .map_err(ParseError::SqlParseError)?;
779    Ok(StreamingStatement::Describe { name, extended })
780}
781
782/// Parse `SHOW CREATE SOURCE <name>`.
783fn parse_show_create_source(
784    parser: &mut sqlparser::parser::Parser,
785) -> Result<StreamingStatement, ParseError> {
786    // Consume SHOW CREATE SOURCE
787    parser
788        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
789        .map_err(ParseError::SqlParseError)?;
790    parser
791        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
792        .map_err(ParseError::SqlParseError)?;
793    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
794    let name = parser
795        .parse_object_name(false)
796        .map_err(ParseError::SqlParseError)?;
797    Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
798}
799
800/// Parse `SHOW CREATE SINK <name>`.
801fn parse_show_create_sink(
802    parser: &mut sqlparser::parser::Parser,
803) -> Result<StreamingStatement, ParseError> {
804    // Consume SHOW CREATE SINK
805    parser
806        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
807        .map_err(ParseError::SqlParseError)?;
808    parser
809        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
810        .map_err(ParseError::SqlParseError)?;
811    tokenizer::expect_custom_keyword(parser, "SINK")?;
812    let name = parser
813        .parse_object_name(false)
814        .map_err(ParseError::SqlParseError)?;
815    Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
816}
817
818/// Parse an EXPLAIN [ANALYZE] statement wrapping a streaming query.
819///
820/// Syntax: `EXPLAIN [ANALYZE] <streaming_statement>`
821///
822/// # Errors
823///
824/// Returns `ParseError` if the statement syntax is invalid.
825fn parse_explain(
826    parser: &mut sqlparser::parser::Parser,
827    original_sql: &str,
828) -> Result<StreamingStatement, ParseError> {
829    parser
830        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
831        .map_err(ParseError::SqlParseError)?;
832
833    // Check for optional ANALYZE keyword
834    let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
835
836    // Find the position after EXPLAIN [ANALYZE] in the original SQL
837    let explain_prefix_upper = original_sql.to_uppercase();
838    let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
839    let inner_start = if analyze {
840        explain_prefix_upper
841            .find("ANALYZE")
842            .map_or(0, |pos| pos + "ANALYZE".len())
843    } else {
844        explain_prefix_upper
845            .find("EXPLAIN")
846            .map_or(0, |pos| pos + "EXPLAIN".len())
847    };
848    let inner_sql = original_sql[inner_start..].trim();
849    let _ = skip_keyword; // suppress unused warning
850
851    // Parse the inner statement recursively
852    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
853    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
854        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
855    })?;
856    Ok(StreamingStatement::Explain {
857        statement: Box::new(inner),
858        analyze,
859    })
860}
861
862/// Parse a CREATE MATERIALIZED VIEW statement.
863///
864/// Syntax:
865/// ```sql
866/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
867/// AS <select_query>
868/// [EMIT <strategy>]
869/// ```
870///
871/// # Errors
872///
873/// Returns `ParseError` if the statement syntax is invalid.
874fn parse_create_materialized_view(
875    parser: &mut sqlparser::parser::Parser,
876    original_sql: &str,
877) -> Result<StreamingStatement, ParseError> {
878    parser
879        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
880        .map_err(ParseError::SqlParseError)?;
881
882    let or_replace = parser.parse_keywords(&[
883        sqlparser::keywords::Keyword::OR,
884        sqlparser::keywords::Keyword::REPLACE,
885    ]);
886
887    parser
888        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
889        .map_err(ParseError::SqlParseError)?;
890    parser
891        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
892        .map_err(ParseError::SqlParseError)?;
893
894    let if_not_exists = parser.parse_keywords(&[
895        sqlparser::keywords::Keyword::IF,
896        sqlparser::keywords::Keyword::NOT,
897        sqlparser::keywords::Keyword::EXISTS,
898    ]);
899
900    let name = parser
901        .parse_object_name(false)
902        .map_err(ParseError::SqlParseError)?;
903
904    parser
905        .expect_keyword(sqlparser::keywords::Keyword::AS)
906        .map_err(ParseError::SqlParseError)?;
907
908    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
909    let remaining = collect_remaining_tokens(parser);
910    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
911
912    let query_sql = query_body_sql(original_sql, &query_tokens, &emit_tokens, None);
913
914    let mv_dialect = LaminarDialect::default();
915
916    let query = if query_tokens.is_empty() {
917        return Err(ParseError::StreamingError(
918            "Expected SELECT query after AS".to_string(),
919        ));
920    } else {
921        let mut query_parser =
922            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
923        query_parser
924            .parse_query()
925            .map_err(ParseError::SqlParseError)?
926    };
927
928    let query_stmt =
929        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
930
931    let emit_clause = if emit_tokens.is_empty() {
932        None
933    } else {
934        let mut emit_parser =
935            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
936        emit_parser::parse_emit_clause(&mut emit_parser)?
937    };
938
939    Ok(StreamingStatement::CreateMaterializedView {
940        name,
941        query: Box::new(query_stmt),
942        emit_clause,
943        or_replace,
944        if_not_exists,
945        query_sql,
946    })
947}
948
949/// Byte offset in `sql` for a sqlparser `Location` (1-indexed line/column).
950fn location_to_byte_offset(sql: &str, loc: sqlparser::tokenizer::Location) -> Option<usize> {
951    if loc.line == 0 {
952        return None;
953    }
954    let (mut line, mut col) = (1u64, 1u64);
955    for (idx, ch) in sql.char_indices() {
956        if line == loc.line && col == loc.column {
957            return Some(idx);
958        }
959        if ch == '\n' {
960            line += 1;
961            col = 1;
962        } else {
963            col += 1;
964        }
965    }
966    (line == loc.line && col == loc.column).then_some(sql.len())
967}
968
969/// Collect all remaining tokens from the parser into a Vec.
970fn collect_remaining_tokens(
971    parser: &mut sqlparser::parser::Parser,
972) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
973    let mut tokens = Vec::new();
974    loop {
975        let token = parser.next_token();
976        if token.token == sqlparser::tokenizer::Token::EOF {
977            tokens.push(token);
978            break;
979        }
980        tokens.push(token);
981    }
982    tokens
983}
984
985/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
986///
987/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
988/// (or is empty if no EMIT found).
989fn split_at_emit(
990    tokens: &[sqlparser::tokenizer::TokenWithSpan],
991) -> (
992    Vec<sqlparser::tokenizer::TokenWithSpan>,
993    Vec<sqlparser::tokenizer::TokenWithSpan>,
994) {
995    let mut depth: i32 = 0;
996    for (i, token) in tokens.iter().enumerate() {
997        match &token.token {
998            sqlparser::tokenizer::Token::LParen => depth += 1,
999            sqlparser::tokenizer::Token::RParen => {
1000                depth -= 1;
1001            }
1002            sqlparser::tokenizer::Token::Word(w)
1003                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
1004            {
1005                let mut query_tokens = tokens[..i].to_vec();
1006                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
1007                    token: sqlparser::tokenizer::Token::EOF,
1008                    span: sqlparser::tokenizer::Span::empty(),
1009                });
1010                let emit_tokens = tokens[i..].to_vec();
1011                return (query_tokens, emit_tokens);
1012            }
1013            _ => {}
1014        }
1015    }
1016    (tokens.to_vec(), vec![])
1017}
1018
1019/// SQL parsing errors
1020#[derive(Debug, thiserror::Error)]
1021pub enum ParseError {
1022    /// Standard SQL parse error
1023    #[error("SQL parse error: {0}")]
1024    SqlParseError(#[from] sqlparser::parser::ParserError),
1025
1026    /// Streaming extension parse error
1027    #[error("Streaming SQL error: {0}")]
1028    StreamingError(String),
1029
1030    /// Window function error
1031    #[error("Window function error: {0}")]
1032    WindowError(String),
1033
1034    /// Validation error (e.g., invalid option values)
1035    #[error("Validation error: {0}")]
1036    ValidationError(String),
1037}
1038
1039#[cfg(test)]
1040mod tests {
1041    use super::*;
1042
1043    /// Helper to parse SQL and return the first statement.
1044    fn parse_one(sql: &str) -> StreamingStatement {
1045        let stmts = StreamingParser::parse_sql(sql).unwrap();
1046        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
1047        stmts.into_iter().next().unwrap()
1048    }
1049
1050    #[test]
1051    fn test_parse_drop_source() {
1052        let stmt = parse_one("DROP SOURCE events");
1053        match stmt {
1054            StreamingStatement::DropSource {
1055                name,
1056                if_exists,
1057                cascade,
1058            } => {
1059                assert_eq!(name.to_string(), "events");
1060                assert!(!if_exists);
1061                assert!(!cascade);
1062            }
1063            _ => panic!("Expected DropSource, got {stmt:?}"),
1064        }
1065    }
1066
1067    #[test]
1068    fn test_parse_drop_source_if_exists() {
1069        let stmt = parse_one("DROP SOURCE IF EXISTS events");
1070        match stmt {
1071            StreamingStatement::DropSource {
1072                name,
1073                if_exists,
1074                cascade,
1075            } => {
1076                assert_eq!(name.to_string(), "events");
1077                assert!(if_exists);
1078                assert!(!cascade);
1079            }
1080            _ => panic!("Expected DropSource, got {stmt:?}"),
1081        }
1082    }
1083
1084    #[test]
1085    fn test_parse_drop_source_cascade() {
1086        let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
1087        match stmt {
1088            StreamingStatement::DropSource {
1089                name,
1090                if_exists,
1091                cascade,
1092            } => {
1093                assert_eq!(name.to_string(), "events");
1094                assert!(if_exists);
1095                assert!(cascade);
1096            }
1097            _ => panic!("Expected DropSource, got {stmt:?}"),
1098        }
1099    }
1100
1101    #[test]
1102    fn test_parse_drop_sink() {
1103        let stmt = parse_one("DROP SINK output");
1104        match stmt {
1105            StreamingStatement::DropSink {
1106                name,
1107                if_exists,
1108                cascade,
1109            } => {
1110                assert_eq!(name.to_string(), "output");
1111                assert!(!if_exists);
1112                assert!(!cascade);
1113            }
1114            _ => panic!("Expected DropSink, got {stmt:?}"),
1115        }
1116    }
1117
1118    #[test]
1119    fn test_parse_drop_sink_if_exists() {
1120        let stmt = parse_one("DROP SINK IF EXISTS output");
1121        match stmt {
1122            StreamingStatement::DropSink {
1123                name,
1124                if_exists,
1125                cascade,
1126            } => {
1127                assert_eq!(name.to_string(), "output");
1128                assert!(if_exists);
1129                assert!(!cascade);
1130            }
1131            _ => panic!("Expected DropSink, got {stmt:?}"),
1132        }
1133    }
1134
1135    #[test]
1136    fn test_parse_drop_sink_cascade() {
1137        let stmt = parse_one("DROP SINK output CASCADE");
1138        match stmt {
1139            StreamingStatement::DropSink {
1140                name,
1141                if_exists,
1142                cascade,
1143            } => {
1144                assert_eq!(name.to_string(), "output");
1145                assert!(!if_exists);
1146                assert!(cascade);
1147            }
1148            _ => panic!("Expected DropSink, got {stmt:?}"),
1149        }
1150    }
1151
1152    #[test]
1153    fn test_parse_drop_materialized_view() {
1154        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
1155        match stmt {
1156            StreamingStatement::DropMaterializedView {
1157                name,
1158                if_exists,
1159                cascade,
1160            } => {
1161                assert_eq!(name.to_string(), "live_stats");
1162                assert!(!if_exists);
1163                assert!(!cascade);
1164            }
1165            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1166        }
1167    }
1168
1169    #[test]
1170    fn test_parse_drop_materialized_view_if_exists_cascade() {
1171        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1172        match stmt {
1173            StreamingStatement::DropMaterializedView {
1174                name,
1175                if_exists,
1176                cascade,
1177            } => {
1178                assert_eq!(name.to_string(), "live_stats");
1179                assert!(if_exists);
1180                assert!(cascade);
1181            }
1182            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1183        }
1184    }
1185
1186    #[test]
1187    fn test_parse_show_sources() {
1188        let stmt = parse_one("SHOW SOURCES");
1189        assert!(matches!(
1190            stmt,
1191            StreamingStatement::Show(ShowCommand::Sources)
1192        ));
1193    }
1194
1195    #[test]
1196    fn test_parse_show_sinks() {
1197        let stmt = parse_one("SHOW SINKS");
1198        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1199    }
1200
1201    #[test]
1202    fn test_parse_show_queries() {
1203        let stmt = parse_one("SHOW QUERIES");
1204        assert!(matches!(
1205            stmt,
1206            StreamingStatement::Show(ShowCommand::Queries)
1207        ));
1208    }
1209
1210    #[test]
1211    fn test_parse_show_materialized_views() {
1212        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1213        assert!(matches!(
1214            stmt,
1215            StreamingStatement::Show(ShowCommand::MaterializedViews)
1216        ));
1217    }
1218
1219    #[test]
1220    fn test_parse_describe() {
1221        let stmt = parse_one("DESCRIBE events");
1222        match stmt {
1223            StreamingStatement::Describe { name, extended } => {
1224                assert_eq!(name.to_string(), "events");
1225                assert!(!extended);
1226            }
1227            _ => panic!("Expected Describe, got {stmt:?}"),
1228        }
1229    }
1230
1231    #[test]
1232    fn test_parse_describe_extended() {
1233        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1234        match stmt {
1235            StreamingStatement::Describe { name, extended } => {
1236                assert_eq!(name.to_string(), "my_schema.events");
1237                assert!(extended);
1238            }
1239            _ => panic!("Expected Describe, got {stmt:?}"),
1240        }
1241    }
1242
1243    #[test]
1244    fn test_parse_explain_select() {
1245        let stmt = parse_one("EXPLAIN SELECT * FROM events");
1246        match stmt {
1247            StreamingStatement::Explain {
1248                statement, analyze, ..
1249            } => {
1250                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1251                assert!(!analyze);
1252            }
1253            _ => panic!("Expected Explain, got {stmt:?}"),
1254        }
1255    }
1256
1257    #[test]
1258    fn test_parse_explain_create_source() {
1259        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1260        match stmt {
1261            StreamingStatement::Explain { statement, .. } => {
1262                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1263            }
1264            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1265        }
1266    }
1267
1268    #[test]
1269    fn test_parse_explain_analyze_select() {
1270        let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1271        match stmt {
1272            StreamingStatement::Explain {
1273                statement, analyze, ..
1274            } => {
1275                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1276                assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1277            }
1278            _ => panic!("Expected Explain, got {stmt:?}"),
1279        }
1280    }
1281
1282    #[test]
1283    fn test_parse_create_materialized_view() {
1284        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1285        match stmt {
1286            StreamingStatement::CreateMaterializedView {
1287                name,
1288                emit_clause,
1289                or_replace,
1290                if_not_exists,
1291                ..
1292            } => {
1293                assert_eq!(name.to_string(), "live_stats");
1294                assert!(emit_clause.is_none());
1295                assert!(!or_replace);
1296                assert!(!if_not_exists);
1297            }
1298            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1299        }
1300    }
1301
1302    #[test]
1303    fn test_parse_create_materialized_view_with_emit() {
1304        let stmt = parse_one(
1305            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1306        );
1307        match stmt {
1308            StreamingStatement::CreateMaterializedView {
1309                name, emit_clause, ..
1310            } => {
1311                assert_eq!(name.to_string(), "live_stats");
1312                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1313            }
1314            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1315        }
1316    }
1317
1318    #[test]
1319    fn test_parse_create_or_replace_materialized_view() {
1320        let stmt = parse_one(
1321            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1322        );
1323        match stmt {
1324            StreamingStatement::CreateMaterializedView {
1325                name,
1326                or_replace,
1327                if_not_exists,
1328                ..
1329            } => {
1330                assert_eq!(name.to_string(), "live_stats");
1331                assert!(or_replace);
1332                assert!(!if_not_exists);
1333            }
1334            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1335        }
1336    }
1337
1338    #[test]
1339    fn test_parse_create_materialized_view_if_not_exists() {
1340        let stmt = parse_one(
1341            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1342        );
1343        match stmt {
1344            StreamingStatement::CreateMaterializedView {
1345                name,
1346                or_replace,
1347                if_not_exists,
1348                ..
1349            } => {
1350                assert_eq!(name.to_string(), "live_stats");
1351                assert!(!or_replace);
1352                assert!(if_not_exists);
1353            }
1354            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1355        }
1356    }
1357
1358    #[test]
1359    fn test_parse_insert_into() {
1360        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1361        match stmt {
1362            StreamingStatement::InsertInto {
1363                table_name,
1364                columns,
1365                values,
1366            } => {
1367                assert_eq!(table_name.to_string(), "events");
1368                assert_eq!(columns.len(), 2);
1369                assert_eq!(columns[0].to_string(), "id");
1370                assert_eq!(columns[1].to_string(), "name");
1371                assert_eq!(values.len(), 1);
1372                assert_eq!(values[0].len(), 2);
1373            }
1374            _ => panic!("Expected InsertInto, got {stmt:?}"),
1375        }
1376    }
1377
1378    #[test]
1379    fn test_parse_insert_into_multiple_rows() {
1380        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1381        match stmt {
1382            StreamingStatement::InsertInto {
1383                table_name,
1384                columns,
1385                values,
1386            } => {
1387                assert_eq!(table_name.to_string(), "events");
1388                assert!(columns.is_empty());
1389                assert_eq!(values.len(), 3);
1390            }
1391            _ => panic!("Expected InsertInto, got {stmt:?}"),
1392        }
1393    }
1394
1395    // ── CREATE STREAM tests ─────────────────────────────
1396
1397    #[test]
1398    fn test_parse_create_stream() {
1399        let stmt = parse_one(
1400            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1401        );
1402        match stmt {
1403            StreamingStatement::CreateStream {
1404                name,
1405                or_replace,
1406                if_not_exists,
1407                emit_clause,
1408                ..
1409            } => {
1410                assert_eq!(name.to_string(), "session_activity");
1411                assert!(!or_replace);
1412                assert!(!if_not_exists);
1413                assert!(emit_clause.is_none());
1414            }
1415            _ => panic!("Expected CreateStream, got {stmt:?}"),
1416        }
1417    }
1418
1419    #[test]
1420    fn test_parse_create_or_replace_stream() {
1421        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1422        match stmt {
1423            StreamingStatement::CreateStream { or_replace, .. } => {
1424                assert!(or_replace);
1425            }
1426            _ => panic!("Expected CreateStream, got {stmt:?}"),
1427        }
1428    }
1429
1430    #[test]
1431    fn test_parse_create_stream_if_not_exists() {
1432        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1433        match stmt {
1434            StreamingStatement::CreateStream { if_not_exists, .. } => {
1435                assert!(if_not_exists);
1436            }
1437            _ => panic!("Expected CreateStream, got {stmt:?}"),
1438        }
1439    }
1440
1441    #[test]
1442    fn test_parse_create_stream_with_emit() {
1443        let stmt =
1444            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1445        match stmt {
1446            StreamingStatement::CreateStream { emit_clause, .. } => {
1447                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1448            }
1449            _ => panic!("Expected CreateStream, got {stmt:?}"),
1450        }
1451    }
1452
1453    #[test]
1454    fn create_stream_with_retain_history() {
1455        let stmt = parse_one(
1456            "CREATE STREAM trades AS SELECT * FROM events WITH ('retain_history' = '64mb')",
1457        );
1458        let StreamingStatement::CreateStream {
1459            retention_bytes, ..
1460        } = stmt
1461        else {
1462            panic!("expected CreateStream");
1463        };
1464        assert_eq!(retention_bytes, Some(64 * 1024 * 1024));
1465    }
1466
1467    #[test]
1468    fn create_stream_with_retain_history_after_emit() {
1469        let stmt = parse_one(
1470            "CREATE STREAM trades AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE \
1471             WITH ('retain_history' = '8mb')",
1472        );
1473        let StreamingStatement::CreateStream {
1474            retention_bytes,
1475            emit_clause,
1476            ..
1477        } = stmt
1478        else {
1479            panic!("expected CreateStream");
1480        };
1481        assert_eq!(retention_bytes, Some(8 * 1024 * 1024));
1482        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1483    }
1484
1485    #[test]
1486    fn create_stream_rejects_unknown_with_option() {
1487        let res = parse_streaming_sql("CREATE STREAM s AS SELECT * FROM e WITH ('bogus' = '1')");
1488        let err = res.expect_err("unknown WITH key must error");
1489        assert!(err.to_string().contains("bogus"), "got: {err}");
1490    }
1491
1492    #[test]
1493    fn create_stream_with_retain_history_excludes_with_from_query_sql() {
1494        // Regression: query_body_sql used to extend to original_sql.len() when
1495        // EMIT was absent, which sucked the trailing WITH clause into query_sql
1496        // and broke planner re-parsing.
1497        let stmt = parse_one(
1498            "CREATE STREAM trades AS SELECT * FROM events WITH ('retain_history' = '64mb')",
1499        );
1500        let StreamingStatement::CreateStream { query_sql, .. } = stmt else {
1501            panic!("expected CreateStream");
1502        };
1503        assert!(
1504            !query_sql.to_uppercase().contains("RETAIN_HISTORY"),
1505            "query_sql leaked WITH clause: {query_sql}"
1506        );
1507        assert!(
1508            query_sql.contains("FROM events"),
1509            "query_sql should still hold the SELECT body, got: {query_sql}"
1510        );
1511    }
1512
1513    #[test]
1514    fn test_parse_drop_stream() {
1515        let stmt = parse_one("DROP STREAM my_stream");
1516        match stmt {
1517            StreamingStatement::DropStream {
1518                name,
1519                if_exists,
1520                cascade,
1521            } => {
1522                assert_eq!(name.to_string(), "my_stream");
1523                assert!(!if_exists);
1524                assert!(!cascade);
1525            }
1526            _ => panic!("Expected DropStream, got {stmt:?}"),
1527        }
1528    }
1529
1530    #[test]
1531    fn test_parse_drop_stream_if_exists() {
1532        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1533        match stmt {
1534            StreamingStatement::DropStream {
1535                name,
1536                if_exists,
1537                cascade,
1538            } => {
1539                assert_eq!(name.to_string(), "my_stream");
1540                assert!(if_exists);
1541                assert!(!cascade);
1542            }
1543            _ => panic!("Expected DropStream, got {stmt:?}"),
1544        }
1545    }
1546
1547    #[test]
1548    fn test_parse_drop_stream_cascade() {
1549        let stmt = parse_one("DROP STREAM my_stream CASCADE");
1550        match stmt {
1551            StreamingStatement::DropStream {
1552                name,
1553                if_exists,
1554                cascade,
1555            } => {
1556                assert_eq!(name.to_string(), "my_stream");
1557                assert!(!if_exists);
1558                assert!(cascade);
1559            }
1560            _ => panic!("Expected DropStream, got {stmt:?}"),
1561        }
1562    }
1563
1564    #[test]
1565    fn test_parse_show_streams() {
1566        let stmt = parse_one("SHOW STREAMS");
1567        assert!(matches!(
1568            stmt,
1569            StreamingStatement::Show(ShowCommand::Streams)
1570        ));
1571    }
1572
1573    #[test]
1574    fn test_parse_alter_source_add_column() {
1575        let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1576        match stmt {
1577            StreamingStatement::AlterSource { name, operation } => {
1578                assert_eq!(name.to_string(), "events");
1579                match operation {
1580                    statements::AlterSourceOperation::AddColumn { column_def } => {
1581                        assert_eq!(column_def.name.value, "new_col");
1582                        assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1583                    }
1584                    statements::AlterSourceOperation::SetProperties { .. } => {
1585                        panic!("Expected AddColumn")
1586                    }
1587                }
1588            }
1589            _ => panic!("Expected AlterSource, got {stmt:?}"),
1590        }
1591    }
1592
1593    #[test]
1594    fn test_parse_alter_source_set_properties() {
1595        let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1596        match stmt {
1597            StreamingStatement::AlterSource { name, operation } => {
1598                assert_eq!(name.to_string(), "events");
1599                match operation {
1600                    statements::AlterSourceOperation::SetProperties { properties } => {
1601                        assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1602                        assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1603                    }
1604                    statements::AlterSourceOperation::AddColumn { .. } => {
1605                        panic!("Expected SetProperties")
1606                    }
1607                }
1608            }
1609            _ => panic!("Expected AlterSource, got {stmt:?}"),
1610        }
1611    }
1612
1613    #[test]
1614    fn test_parse_checkpoint() {
1615        let stmt = parse_one("CHECKPOINT");
1616        assert!(
1617            matches!(stmt, StreamingStatement::Checkpoint),
1618            "Expected Checkpoint, got {stmt:?}"
1619        );
1620    }
1621
1622    #[test]
1623    fn test_parse_show_checkpoint_status() {
1624        let stmt = parse_one("SHOW CHECKPOINT STATUS");
1625        assert!(
1626            matches!(
1627                stmt,
1628                StreamingStatement::Show(ShowCommand::CheckpointStatus)
1629            ),
1630            "Expected Show(CheckpointStatus), got {stmt:?}"
1631        );
1632    }
1633
1634    #[test]
1635    fn test_parse_restore_checkpoint() {
1636        let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1637        match stmt {
1638            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1639                assert_eq!(checkpoint_id, 42);
1640            }
1641            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1642        }
1643    }
1644
1645    #[test]
1646    fn test_parse_restore_checkpoint_large_id() {
1647        let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1648        match stmt {
1649            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1650                assert_eq!(checkpoint_id, 123_456);
1651            }
1652            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1653        }
1654    }
1655
1656    #[test]
1657    fn test_parse_show_create_source() {
1658        let stmt = parse_one("SHOW CREATE SOURCE events");
1659        match stmt {
1660            StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1661                assert_eq!(name.to_string(), "events");
1662            }
1663            _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1664        }
1665    }
1666
/// `SHOW CREATE SINK output` parses to `ShowCommand::CreateSink` whose
/// name renders as `output`.
#[test]
fn test_parse_show_create_sink() {
    let stmt = parse_one("SHOW CREATE SINK output");
    if let StreamingStatement::Show(ShowCommand::CreateSink { name }) = stmt {
        let rendered = name.to_string();
        assert_eq!(rendered, "output");
    } else {
        panic!("Expected Show(CreateSink), got {stmt:?}");
    }
}
1677
/// The query text captured for `CREATE STREAM ... AS` still contains the
/// `TEMPORAL PROBE JOIN` clause, its `LIST (...)` offsets and the trailing `AS p`.
#[test]
fn create_stream_preserves_temporal_probe_join() {
    let sql = "CREATE STREAM markouts_long AS \
               SELECT t.s, p.offset_ms FROM trade_probe t \
               TEMPORAL PROBE JOIN price_ref r \
                   ON (s) TIMESTAMPS (ts, ts) \
                   LIST (0s, 1s, 5s, 30s) AS p";
    let query_sql = match parse_one(sql) {
        StreamingStatement::CreateStream { query_sql, .. } => query_sql,
        _ => panic!("expected CreateStream"),
    };

    // The join keyword is checked case-insensitively.
    let upper = query_sql.to_uppercase();
    assert!(upper.contains("TEMPORAL PROBE JOIN"), "got: {query_sql}");

    // These fragments must appear exactly as written in the input.
    for fragment in ["LIST (0s, 1s, 5s, 30s)", "AS p"] {
        assert!(query_sql.contains(fragment), "got: {query_sql}");
    }
}
1698
/// The `RANGE FROM ... TO ... STEP ...` form of a temporal probe join appears
/// unchanged in the query text captured for `CREATE STREAM`.
#[test]
fn create_stream_temporal_probe_range_step_preserved() {
    let sql = "CREATE STREAM mk AS \
               SELECT s FROM trades t TEMPORAL PROBE JOIN prices r \
               ON (symbol) TIMESTAMPS (ts, ts) \
               RANGE FROM 0s TO 30s STEP 5s AS p";
    let query_sql = match parse_one(sql) {
        StreamingStatement::CreateStream { query_sql, .. } => query_sql,
        _ => panic!("expected CreateStream"),
    };

    let keeps_range_spec = query_sql.contains("RANGE FROM 0s TO 30s STEP 5s");
    assert!(keeps_range_spec, "got: {query_sql}");
}
1713
/// The query text captured for `CREATE MATERIALIZED VIEW ... AS` still
/// contains the `TEMPORAL PROBE JOIN` clause (checked case-insensitively).
#[test]
fn create_mv_preserves_temporal_probe_join() {
    let sql = "CREATE MATERIALIZED VIEW mv AS \
               SELECT t.s FROM trades t TEMPORAL PROBE JOIN prices r \
               ON (s) TIMESTAMPS (ts, ts) LIST (0s, 5s) AS p";
    let query_sql = match parse_one(sql) {
        StreamingStatement::CreateMaterializedView { query_sql, .. } => query_sql,
        _ => panic!("expected CreateMaterializedView"),
    };

    let upper = query_sql.to_uppercase();
    assert!(upper.contains("TEMPORAL PROBE JOIN"), "got: {query_sql}");
}
1727
/// A trailing `EMIT ON WINDOW CLOSE` is reported through `emit_clause` and
/// does not appear in the captured `query_sql`.
#[test]
fn create_stream_emit_is_not_captured_in_query_sql() {
    let sql = "CREATE STREAM s AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
    let (query_sql, emit_clause) = match parse_one(sql) {
        StreamingStatement::CreateStream {
            query_sql,
            emit_clause,
            ..
        } => (query_sql, emit_clause),
        _ => panic!("expected CreateStream"),
    };

    assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));

    // The EMIT keyword must be absent in any casing.
    let mentions_emit = query_sql.to_uppercase().contains("EMIT");
    assert!(!mentions_emit, "got: {query_sql}");

    let keeps_from = query_sql.contains("FROM events");
    assert!(keeps_from, "got: {query_sql}");
}
1746
/// A plain `CREATE STREAM ... AS SELECT` exposes the SELECT text via `query_sql`.
///
/// Only the captured text is inspected here: it must mention `SELECT`
/// (in any casing) and contain `FROM events`.
#[test]
fn create_stream_plain_select_query_sql_executes() {
    let sql = "CREATE STREAM counts AS SELECT COUNT(*) AS c FROM events";
    let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
        panic!("expected CreateStream");
    };
    // Report the captured text on failure, matching the other CREATE STREAM
    // tests in this module; a bare `assert!` would only print the expression.
    assert!(
        query_sql.to_uppercase().contains("SELECT"),
        "got: {query_sql}"
    );
    assert!(query_sql.contains("FROM events"), "got: {query_sql}");
}
1756}