1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10mod declare_parser;
11pub(crate) mod dialect;
12mod emit_parser;
13pub mod join_parser;
14mod late_data_parser;
15pub mod lookup_table;
17pub mod order_analyzer;
18mod sink_parser;
19mod source_parser;
20mod statements;
21mod subscribe_parser;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27 AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28 FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, SubscribeStatement,
29 WatermarkDef, WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
/// Stateless entry point for parsing streaming SQL.
///
/// The type carries no data; parsing is exposed through its associated
/// functions (for example `StreamingParser::parse_sql`).
pub struct StreamingParser;
54
55impl StreamingParser {
56 #[allow(clippy::too_many_lines)]
69 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70 let sql_trimmed = sql.trim();
71 if sql_trimmed.is_empty() {
72 return Err(sqlparser::parser::ParserError::ParserError(
73 "Empty SQL statement".to_string(),
74 ));
75 }
76
77 let dialect = LaminarDialect::default();
78
79 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81 .tokenize_with_location()
82 .map_err(|e| {
83 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84 })?;
85
86 match detect_streaming_ddl(&tokens) {
88 StreamingDdlKind::CreateSource { .. } => {
89 let mut parser =
90 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91 let source = source_parser::parse_create_source(&mut parser)
92 .map_err(parse_error_to_parser_error)?;
93 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94 }
95 StreamingDdlKind::CreateSink { .. } => {
96 let mut parser =
97 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98 let sink = sink_parser::parse_create_sink(&mut parser)
99 .map_err(parse_error_to_parser_error)?;
100 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101 }
102 StreamingDdlKind::CreateContinuousQuery { .. } => {
103 let mut parser =
104 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106 .map_err(parse_error_to_parser_error)?;
107 Ok(vec![stmt])
108 }
109 StreamingDdlKind::DropSource { .. } => {
110 let mut parser =
111 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113 Ok(vec![stmt])
114 }
115 StreamingDdlKind::DropSink { .. } => {
116 let mut parser =
117 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119 Ok(vec![stmt])
120 }
121 StreamingDdlKind::DropMaterializedView { .. } => {
122 let mut parser =
123 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124 let stmt = parse_drop_materialized_view(&mut parser)
125 .map_err(parse_error_to_parser_error)?;
126 Ok(vec![stmt])
127 }
128 StreamingDdlKind::ShowSources => {
129 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130 }
131 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132 StreamingDdlKind::ShowQueries => {
133 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134 }
135 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136 ShowCommand::MaterializedViews,
137 )]),
138 StreamingDdlKind::DescribeSource => {
139 let mut parser =
140 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142 Ok(vec![stmt])
143 }
144 StreamingDdlKind::ExplainStreaming => {
145 let mut parser =
146 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147 let stmt =
148 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149 Ok(vec![stmt])
150 }
151 StreamingDdlKind::CreateMaterializedView { .. } => {
152 let mut parser =
153 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154 let stmt = parse_create_materialized_view(&mut parser, sql_trimmed)
155 .map_err(parse_error_to_parser_error)?;
156 Ok(vec![stmt])
157 }
158 StreamingDdlKind::CreateStream { .. } => {
159 let mut parser =
160 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161 let stmt = parse_create_stream(&mut parser, sql_trimmed)
162 .map_err(parse_error_to_parser_error)?;
163 Ok(vec![stmt])
164 }
165 StreamingDdlKind::DropStream { .. } => {
166 let mut parser =
167 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169 Ok(vec![stmt])
170 }
171 StreamingDdlKind::ShowStreams => {
172 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173 }
174 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175 StreamingDdlKind::CreateLookupTable { .. } => {
176 let mut parser =
177 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178 let lt = lookup_table::parse_create_lookup_table(&mut parser)
179 .map_err(parse_error_to_parser_error)?;
180 Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181 }
182 StreamingDdlKind::DropLookupTable { .. } => {
183 let mut parser =
184 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185 let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186 .map_err(parse_error_to_parser_error)?;
187 Ok(vec![StreamingStatement::DropLookupTable {
188 name,
189 if_exists,
190 }])
191 }
192 StreamingDdlKind::AlterSource => {
193 let mut parser =
194 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195 let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196 Ok(vec![stmt])
197 }
198 StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
199 ShowCommand::CheckpointStatus,
200 )]),
201 StreamingDdlKind::ShowCreateSource => {
202 let mut parser =
203 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
204 let stmt =
205 parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
206 Ok(vec![stmt])
207 }
208 StreamingDdlKind::ShowCreateSink => {
209 let mut parser =
210 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
211 let stmt =
212 parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
213 Ok(vec![stmt])
214 }
215 StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
216 StreamingDdlKind::Subscribe => {
217 let mut parser =
218 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
219 let stmt = subscribe_parser::parse_subscribe(&mut parser)
220 .map_err(parse_error_to_parser_error)?;
221 Ok(vec![StreamingStatement::Subscribe(Box::new(stmt))])
222 }
223 StreamingDdlKind::DeclareCursor => {
224 let mut parser =
225 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
226 let stmt = declare_parser::parse_declare_cursor(&mut parser)
227 .map_err(parse_error_to_parser_error)?;
228 Ok(vec![stmt])
229 }
230 StreamingDdlKind::RestoreCheckpoint => {
231 let mut parser =
232 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
233 let stmt =
234 parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
235 Ok(vec![stmt])
236 }
237 StreamingDdlKind::None => {
238 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
240 Ok(statements
241 .into_iter()
242 .map(convert_standard_statement)
243 .collect())
244 }
245 }
246 }
247
248 #[must_use]
250 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
251 match expr {
252 sqlparser::ast::Expr::Function(func) => {
253 if let Some(name) = func.name.0.last() {
254 let func_name = name.to_string().to_uppercase();
255 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
256 } else {
257 false
258 }
259 }
260 _ => false,
261 }
262 }
263
264 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
270 emit_parser::parse_emit_clause_from_sql(sql)
271 }
272
273 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
279 late_data_parser::parse_late_data_clause_from_sql(sql)
280 }
281}
282
283fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
285 match e {
286 ParseError::SqlParseError(pe) => pe,
287 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
288 ParseError::WindowError(msg) => {
289 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
290 }
291 ParseError::ValidationError(msg) => {
292 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
293 }
294 }
295}
296
297fn parse_restore_checkpoint(
305 parser: &mut sqlparser::parser::Parser,
306) -> Result<StreamingStatement, ParseError> {
307 tokenizer::expect_custom_keyword(parser, "RESTORE")?;
309 if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
311 return Err(ParseError::StreamingError(
312 "Expected FROM after RESTORE".to_string(),
313 ));
314 }
315 tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
317 let token = parser.next_token();
319 match &token.token {
320 sqlparser::tokenizer::Token::Number(n, _) => {
321 let id: u64 = n
322 .parse()
323 .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
324 Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
325 }
326 other => Err(ParseError::StreamingError(format!(
327 "Expected checkpoint ID (number), found {other}"
328 ))),
329 }
330}
331
332fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
337 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
338 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
340 let table_name = name.clone();
341 let columns = insert.columns.clone();
342
343 if let Some(ref source) = insert.source {
345 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
346 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
347 return StreamingStatement::InsertInto {
348 table_name,
349 columns,
350 values: rows,
351 };
352 }
353 }
354 }
355 }
356 StreamingStatement::Standard(Box::new(stmt))
357}
358
359fn parse_drop_source(
367 parser: &mut sqlparser::parser::Parser,
368) -> Result<StreamingStatement, ParseError> {
369 parser
370 .expect_keyword(sqlparser::keywords::Keyword::DROP)
371 .map_err(ParseError::SqlParseError)?;
372 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
373 let if_exists = parser.parse_keywords(&[
374 sqlparser::keywords::Keyword::IF,
375 sqlparser::keywords::Keyword::EXISTS,
376 ]);
377 let name = parser
378 .parse_object_name(false)
379 .map_err(ParseError::SqlParseError)?;
380 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
381 Ok(StreamingStatement::DropSource {
382 name,
383 if_exists,
384 cascade,
385 })
386}
387
388fn parse_drop_sink(
396 parser: &mut sqlparser::parser::Parser,
397) -> Result<StreamingStatement, ParseError> {
398 parser
399 .expect_keyword(sqlparser::keywords::Keyword::DROP)
400 .map_err(ParseError::SqlParseError)?;
401 tokenizer::expect_custom_keyword(parser, "SINK")?;
402 let if_exists = parser.parse_keywords(&[
403 sqlparser::keywords::Keyword::IF,
404 sqlparser::keywords::Keyword::EXISTS,
405 ]);
406 let name = parser
407 .parse_object_name(false)
408 .map_err(ParseError::SqlParseError)?;
409 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
410 Ok(StreamingStatement::DropSink {
411 name,
412 if_exists,
413 cascade,
414 })
415}
416
417fn parse_drop_materialized_view(
425 parser: &mut sqlparser::parser::Parser,
426) -> Result<StreamingStatement, ParseError> {
427 parser
428 .expect_keyword(sqlparser::keywords::Keyword::DROP)
429 .map_err(ParseError::SqlParseError)?;
430 parser
431 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
432 .map_err(ParseError::SqlParseError)?;
433 parser
434 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
435 .map_err(ParseError::SqlParseError)?;
436 let if_exists = parser.parse_keywords(&[
437 sqlparser::keywords::Keyword::IF,
438 sqlparser::keywords::Keyword::EXISTS,
439 ]);
440 let name = parser
441 .parse_object_name(false)
442 .map_err(ParseError::SqlParseError)?;
443 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
444 Ok(StreamingStatement::DropMaterializedView {
445 name,
446 if_exists,
447 cascade,
448 })
449}
450
451fn parse_create_stream(
459 parser: &mut sqlparser::parser::Parser,
460 original_sql: &str,
461) -> Result<StreamingStatement, ParseError> {
462 parser
463 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
464 .map_err(ParseError::SqlParseError)?;
465
466 let or_replace = parser.parse_keywords(&[
467 sqlparser::keywords::Keyword::OR,
468 sqlparser::keywords::Keyword::REPLACE,
469 ]);
470
471 tokenizer::expect_custom_keyword(parser, "STREAM")?;
472
473 let if_not_exists = parser.parse_keywords(&[
474 sqlparser::keywords::Keyword::IF,
475 sqlparser::keywords::Keyword::NOT,
476 sqlparser::keywords::Keyword::EXISTS,
477 ]);
478
479 let name = parser
480 .parse_object_name(false)
481 .map_err(ParseError::SqlParseError)?;
482
483 parser
484 .expect_keyword(sqlparser::keywords::Keyword::AS)
485 .map_err(ParseError::SqlParseError)?;
486
487 let remaining = collect_remaining_tokens(parser);
490 let (head_tokens, with_tokens) = split_off_trailing_with(&remaining);
491 let (query_tokens, emit_tokens) = split_at_emit(&head_tokens);
492
493 let query_sql = query_body_sql(
494 original_sql,
495 &query_tokens,
496 &emit_tokens,
497 with_tokens.as_deref(),
498 );
499
500 let stream_dialect = LaminarDialect::default();
501
502 let query = if query_tokens.is_empty() {
503 return Err(ParseError::StreamingError(
504 "Expected SELECT query after AS".to_string(),
505 ));
506 } else {
507 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
508 .with_tokens_with_locations(query_tokens);
509 query_parser
510 .parse_query()
511 .map_err(ParseError::SqlParseError)?
512 };
513
514 let query_stmt =
515 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
516
517 let emit_clause = if emit_tokens.is_empty() {
518 None
519 } else {
520 let mut emit_parser =
521 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
522 emit_parser::parse_emit_clause(&mut emit_parser)?
523 };
524
525 let retention_bytes = match with_tokens {
526 None => None,
527 Some(tokens) => {
528 let mut with_parser =
529 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(tokens);
530 let opts = tokenizer::parse_with_options(&mut with_parser)?;
531 extract_retention_bytes(&opts)?
532 }
533 };
534
535 Ok(StreamingStatement::CreateStream {
536 name,
537 query: Box::new(query_stmt),
538 emit_clause,
539 or_replace,
540 if_not_exists,
541 query_sql,
542 retention_bytes,
543 })
544}
545
546fn split_off_trailing_with(
549 tokens: &[sqlparser::tokenizer::TokenWithSpan],
550) -> (
551 Vec<sqlparser::tokenizer::TokenWithSpan>,
552 Option<Vec<sqlparser::tokenizer::TokenWithSpan>>,
553) {
554 use sqlparser::tokenizer::Token;
555 let mut depth: i32 = 0;
556 let mut last_with_idx: Option<usize> = None;
557 for (i, t) in tokens.iter().enumerate() {
558 match &t.token {
559 Token::LParen => depth += 1,
560 Token::RParen => depth -= 1,
561 Token::Word(w) if depth == 0 && w.value.eq_ignore_ascii_case("WITH") => {
562 if matches!(tokens.get(i + 1).map(|t| &t.token), Some(Token::LParen)) {
563 last_with_idx = Some(i);
564 }
565 }
566 _ => {}
567 }
568 }
569 match last_with_idx {
570 None => (tokens.to_vec(), None),
571 Some(i) => {
572 let mut head = tokens[..i].to_vec();
573 head.push(sqlparser::tokenizer::TokenWithSpan {
574 token: Token::EOF,
575 span: sqlparser::tokenizer::Span::empty(),
576 });
577 let tail = tokens[i..].to_vec();
578 (head, Some(tail))
579 }
580 }
581}
582
583fn extract_retention_bytes(
585 opts: &std::collections::HashMap<String, String>,
586) -> Result<Option<u64>, ParseError> {
587 for key in opts.keys() {
588 if !key.eq_ignore_ascii_case("retain_history") {
589 return Err(ParseError::StreamingError(format!(
590 "unknown CREATE STREAM option '{key}' (expected RETAIN_HISTORY)"
591 )));
592 }
593 }
594 match opts
595 .iter()
596 .find(|(k, _)| k.eq_ignore_ascii_case("retain_history"))
597 {
598 None => Ok(None),
599 Some((_, v)) => {
600 let bytes = lookup_table::ByteSize::parse(v)?;
601 Ok(Some(bytes.as_bytes()))
602 }
603 }
604}
605
606pub(super) fn query_body_sql(
611 original_sql: &str,
612 query_tokens: &[sqlparser::tokenizer::TokenWithSpan],
613 emit_tokens: &[sqlparser::tokenizer::TokenWithSpan],
614 with_tokens: Option<&[sqlparser::tokenizer::TokenWithSpan]>,
615) -> String {
616 use sqlparser::tokenizer::Token;
617
618 let from_spans = || -> Option<String> {
619 let first = query_tokens
620 .iter()
621 .find(|t| !matches!(t.token, Token::EOF))?;
622 let start = location_to_byte_offset(original_sql, first.span.start)?;
623 let trailer_starts = [emit_tokens.first(), with_tokens.and_then(|t| t.first())]
624 .into_iter()
625 .flatten()
626 .filter_map(|t| location_to_byte_offset(original_sql, t.span.start));
627 let end = trailer_starts.min().unwrap_or(original_sql.len());
628 let slice = original_sql.get(start..end)?;
629 Some(
630 slice
631 .trim_end_matches(|c: char| c.is_whitespace() || c == ';')
632 .to_string(),
633 )
634 };
635
636 from_spans().unwrap_or_else(|| {
637 query_tokens
638 .iter()
639 .take_while(|t| !matches!(t.token, Token::EOF))
640 .map(|t| t.token.to_string())
641 .collect::<Vec<_>>()
642 .join(" ")
643 })
644}
645
646fn parse_drop_stream(
654 parser: &mut sqlparser::parser::Parser,
655) -> Result<StreamingStatement, ParseError> {
656 parser
657 .expect_keyword(sqlparser::keywords::Keyword::DROP)
658 .map_err(ParseError::SqlParseError)?;
659 tokenizer::expect_custom_keyword(parser, "STREAM")?;
660 let if_exists = parser.parse_keywords(&[
661 sqlparser::keywords::Keyword::IF,
662 sqlparser::keywords::Keyword::EXISTS,
663 ]);
664 let name = parser
665 .parse_object_name(false)
666 .map_err(ParseError::SqlParseError)?;
667 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
668 Ok(StreamingStatement::DropStream {
669 name,
670 if_exists,
671 cascade,
672 })
673}
674
675fn parse_alter_source(
688 parser: &mut sqlparser::parser::Parser,
689) -> Result<StreamingStatement, ParseError> {
690 parser
691 .expect_keyword(sqlparser::keywords::Keyword::ALTER)
692 .map_err(ParseError::SqlParseError)?;
693 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
694 let name = parser
695 .parse_object_name(false)
696 .map_err(ParseError::SqlParseError)?;
697
698 if parser.parse_keywords(&[
700 sqlparser::keywords::Keyword::ADD,
701 sqlparser::keywords::Keyword::COLUMN,
702 ]) {
703 let col_name = parser
705 .parse_identifier()
706 .map_err(ParseError::SqlParseError)?;
707 let data_type = parser
708 .parse_data_type()
709 .map_err(ParseError::SqlParseError)?;
710 let column_def = sqlparser::ast::ColumnDef {
711 name: col_name,
712 data_type,
713 options: vec![],
714 };
715 Ok(StreamingStatement::AlterSource {
716 name,
717 operation: statements::AlterSourceOperation::AddColumn { column_def },
718 })
719 } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
720 parser
722 .expect_token(&sqlparser::tokenizer::Token::LParen)
723 .map_err(ParseError::SqlParseError)?;
724 #[allow(clippy::disallowed_types)] let mut properties = std::collections::HashMap::new();
726 loop {
727 let key = parser
728 .parse_literal_string()
729 .map_err(ParseError::SqlParseError)?;
730 parser
731 .expect_token(&sqlparser::tokenizer::Token::Eq)
732 .map_err(ParseError::SqlParseError)?;
733 let value = parser
734 .parse_literal_string()
735 .map_err(ParseError::SqlParseError)?;
736 properties.insert(key, value);
737 if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
738 break;
739 }
740 }
741 parser
742 .expect_token(&sqlparser::tokenizer::Token::RParen)
743 .map_err(ParseError::SqlParseError)?;
744 Ok(StreamingStatement::AlterSource {
745 name,
746 operation: statements::AlterSourceOperation::SetProperties { properties },
747 })
748 } else {
749 Err(ParseError::StreamingError(
750 "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
751 ))
752 }
753}
754
755fn parse_describe(
761 parser: &mut sqlparser::parser::Parser,
762) -> Result<StreamingStatement, ParseError> {
763 let token = parser.next_token();
765 match &token.token {
766 sqlparser::tokenizer::Token::Word(w)
767 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
768 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
769 _ => {
770 return Err(ParseError::StreamingError(
771 "Expected DESCRIBE or DESC".to_string(),
772 ));
773 }
774 }
775 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
776 let name = parser
777 .parse_object_name(false)
778 .map_err(ParseError::SqlParseError)?;
779 Ok(StreamingStatement::Describe { name, extended })
780}
781
782fn parse_show_create_source(
784 parser: &mut sqlparser::parser::Parser,
785) -> Result<StreamingStatement, ParseError> {
786 parser
788 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
789 .map_err(ParseError::SqlParseError)?;
790 parser
791 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
792 .map_err(ParseError::SqlParseError)?;
793 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
794 let name = parser
795 .parse_object_name(false)
796 .map_err(ParseError::SqlParseError)?;
797 Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
798}
799
800fn parse_show_create_sink(
802 parser: &mut sqlparser::parser::Parser,
803) -> Result<StreamingStatement, ParseError> {
804 parser
806 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
807 .map_err(ParseError::SqlParseError)?;
808 parser
809 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
810 .map_err(ParseError::SqlParseError)?;
811 tokenizer::expect_custom_keyword(parser, "SINK")?;
812 let name = parser
813 .parse_object_name(false)
814 .map_err(ParseError::SqlParseError)?;
815 Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
816}
817
818fn parse_explain(
826 parser: &mut sqlparser::parser::Parser,
827 original_sql: &str,
828) -> Result<StreamingStatement, ParseError> {
829 parser
830 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
831 .map_err(ParseError::SqlParseError)?;
832
833 let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
835
836 let explain_prefix_upper = original_sql.to_uppercase();
838 let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
839 let inner_start = if analyze {
840 explain_prefix_upper
841 .find("ANALYZE")
842 .map_or(0, |pos| pos + "ANALYZE".len())
843 } else {
844 explain_prefix_upper
845 .find("EXPLAIN")
846 .map_or(0, |pos| pos + "EXPLAIN".len())
847 };
848 let inner_sql = original_sql[inner_start..].trim();
849 let _ = skip_keyword; let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
853 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
854 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
855 })?;
856 Ok(StreamingStatement::Explain {
857 statement: Box::new(inner),
858 analyze,
859 })
860}
861
862fn parse_create_materialized_view(
875 parser: &mut sqlparser::parser::Parser,
876 original_sql: &str,
877) -> Result<StreamingStatement, ParseError> {
878 parser
879 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
880 .map_err(ParseError::SqlParseError)?;
881
882 let or_replace = parser.parse_keywords(&[
883 sqlparser::keywords::Keyword::OR,
884 sqlparser::keywords::Keyword::REPLACE,
885 ]);
886
887 parser
888 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
889 .map_err(ParseError::SqlParseError)?;
890 parser
891 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
892 .map_err(ParseError::SqlParseError)?;
893
894 let if_not_exists = parser.parse_keywords(&[
895 sqlparser::keywords::Keyword::IF,
896 sqlparser::keywords::Keyword::NOT,
897 sqlparser::keywords::Keyword::EXISTS,
898 ]);
899
900 let name = parser
901 .parse_object_name(false)
902 .map_err(ParseError::SqlParseError)?;
903
904 parser
905 .expect_keyword(sqlparser::keywords::Keyword::AS)
906 .map_err(ParseError::SqlParseError)?;
907
908 let remaining = collect_remaining_tokens(parser);
910 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
911
912 let query_sql = query_body_sql(original_sql, &query_tokens, &emit_tokens, None);
913
914 let mv_dialect = LaminarDialect::default();
915
916 let query = if query_tokens.is_empty() {
917 return Err(ParseError::StreamingError(
918 "Expected SELECT query after AS".to_string(),
919 ));
920 } else {
921 let mut query_parser =
922 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
923 query_parser
924 .parse_query()
925 .map_err(ParseError::SqlParseError)?
926 };
927
928 let query_stmt =
929 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
930
931 let emit_clause = if emit_tokens.is_empty() {
932 None
933 } else {
934 let mut emit_parser =
935 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
936 emit_parser::parse_emit_clause(&mut emit_parser)?
937 };
938
939 Ok(StreamingStatement::CreateMaterializedView {
940 name,
941 query: Box::new(query_stmt),
942 emit_clause,
943 or_replace,
944 if_not_exists,
945 query_sql,
946 })
947}
948
949fn location_to_byte_offset(sql: &str, loc: sqlparser::tokenizer::Location) -> Option<usize> {
951 if loc.line == 0 {
952 return None;
953 }
954 let (mut line, mut col) = (1u64, 1u64);
955 for (idx, ch) in sql.char_indices() {
956 if line == loc.line && col == loc.column {
957 return Some(idx);
958 }
959 if ch == '\n' {
960 line += 1;
961 col = 1;
962 } else {
963 col += 1;
964 }
965 }
966 (line == loc.line && col == loc.column).then_some(sql.len())
967}
968
969fn collect_remaining_tokens(
971 parser: &mut sqlparser::parser::Parser,
972) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
973 let mut tokens = Vec::new();
974 loop {
975 let token = parser.next_token();
976 if token.token == sqlparser::tokenizer::Token::EOF {
977 tokens.push(token);
978 break;
979 }
980 tokens.push(token);
981 }
982 tokens
983}
984
985fn split_at_emit(
990 tokens: &[sqlparser::tokenizer::TokenWithSpan],
991) -> (
992 Vec<sqlparser::tokenizer::TokenWithSpan>,
993 Vec<sqlparser::tokenizer::TokenWithSpan>,
994) {
995 let mut depth: i32 = 0;
996 for (i, token) in tokens.iter().enumerate() {
997 match &token.token {
998 sqlparser::tokenizer::Token::LParen => depth += 1,
999 sqlparser::tokenizer::Token::RParen => {
1000 depth -= 1;
1001 }
1002 sqlparser::tokenizer::Token::Word(w)
1003 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
1004 {
1005 let mut query_tokens = tokens[..i].to_vec();
1006 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
1007 token: sqlparser::tokenizer::Token::EOF,
1008 span: sqlparser::tokenizer::Span::empty(),
1009 });
1010 let emit_tokens = tokens[i..].to_vec();
1011 return (query_tokens, emit_tokens);
1012 }
1013 _ => {}
1014 }
1015 }
1016 (tokens.to_vec(), vec![])
1017}
1018
/// Errors produced while parsing streaming SQL in this module.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// An error reported by the underlying sqlparser parser. Thanks to
    /// `#[from]`, `?` converts a `ParserError` into this variant.
    #[error("SQL parse error: {0}")]
    SqlParseError(#[from] sqlparser::parser::ParserError),

    /// A streaming-specific syntax problem, described by the message.
    #[error("Streaming SQL error: {0}")]
    StreamingError(String),

    /// A problem related to a window function, described by the message.
    #[error("Window function error: {0}")]
    WindowError(String),

    /// A validation failure, described by the message.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
1038
1039#[cfg(test)]
1040mod tests {
1041 use super::*;
1042
1043 fn parse_one(sql: &str) -> StreamingStatement {
1045 let stmts = StreamingParser::parse_sql(sql).unwrap();
1046 assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
1047 stmts.into_iter().next().unwrap()
1048 }
1049
1050 #[test]
1051 fn test_parse_drop_source() {
1052 let stmt = parse_one("DROP SOURCE events");
1053 match stmt {
1054 StreamingStatement::DropSource {
1055 name,
1056 if_exists,
1057 cascade,
1058 } => {
1059 assert_eq!(name.to_string(), "events");
1060 assert!(!if_exists);
1061 assert!(!cascade);
1062 }
1063 _ => panic!("Expected DropSource, got {stmt:?}"),
1064 }
1065 }
1066
1067 #[test]
1068 fn test_parse_drop_source_if_exists() {
1069 let stmt = parse_one("DROP SOURCE IF EXISTS events");
1070 match stmt {
1071 StreamingStatement::DropSource {
1072 name,
1073 if_exists,
1074 cascade,
1075 } => {
1076 assert_eq!(name.to_string(), "events");
1077 assert!(if_exists);
1078 assert!(!cascade);
1079 }
1080 _ => panic!("Expected DropSource, got {stmt:?}"),
1081 }
1082 }
1083
1084 #[test]
1085 fn test_parse_drop_source_cascade() {
1086 let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
1087 match stmt {
1088 StreamingStatement::DropSource {
1089 name,
1090 if_exists,
1091 cascade,
1092 } => {
1093 assert_eq!(name.to_string(), "events");
1094 assert!(if_exists);
1095 assert!(cascade);
1096 }
1097 _ => panic!("Expected DropSource, got {stmt:?}"),
1098 }
1099 }
1100
1101 #[test]
1102 fn test_parse_drop_sink() {
1103 let stmt = parse_one("DROP SINK output");
1104 match stmt {
1105 StreamingStatement::DropSink {
1106 name,
1107 if_exists,
1108 cascade,
1109 } => {
1110 assert_eq!(name.to_string(), "output");
1111 assert!(!if_exists);
1112 assert!(!cascade);
1113 }
1114 _ => panic!("Expected DropSink, got {stmt:?}"),
1115 }
1116 }
1117
1118 #[test]
1119 fn test_parse_drop_sink_if_exists() {
1120 let stmt = parse_one("DROP SINK IF EXISTS output");
1121 match stmt {
1122 StreamingStatement::DropSink {
1123 name,
1124 if_exists,
1125 cascade,
1126 } => {
1127 assert_eq!(name.to_string(), "output");
1128 assert!(if_exists);
1129 assert!(!cascade);
1130 }
1131 _ => panic!("Expected DropSink, got {stmt:?}"),
1132 }
1133 }
1134
1135 #[test]
1136 fn test_parse_drop_sink_cascade() {
1137 let stmt = parse_one("DROP SINK output CASCADE");
1138 match stmt {
1139 StreamingStatement::DropSink {
1140 name,
1141 if_exists,
1142 cascade,
1143 } => {
1144 assert_eq!(name.to_string(), "output");
1145 assert!(!if_exists);
1146 assert!(cascade);
1147 }
1148 _ => panic!("Expected DropSink, got {stmt:?}"),
1149 }
1150 }
1151
1152 #[test]
1153 fn test_parse_drop_materialized_view() {
1154 let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
1155 match stmt {
1156 StreamingStatement::DropMaterializedView {
1157 name,
1158 if_exists,
1159 cascade,
1160 } => {
1161 assert_eq!(name.to_string(), "live_stats");
1162 assert!(!if_exists);
1163 assert!(!cascade);
1164 }
1165 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1166 }
1167 }
1168
1169 #[test]
1170 fn test_parse_drop_materialized_view_if_exists_cascade() {
1171 let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1172 match stmt {
1173 StreamingStatement::DropMaterializedView {
1174 name,
1175 if_exists,
1176 cascade,
1177 } => {
1178 assert_eq!(name.to_string(), "live_stats");
1179 assert!(if_exists);
1180 assert!(cascade);
1181 }
1182 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1183 }
1184 }
1185
1186 #[test]
1187 fn test_parse_show_sources() {
1188 let stmt = parse_one("SHOW SOURCES");
1189 assert!(matches!(
1190 stmt,
1191 StreamingStatement::Show(ShowCommand::Sources)
1192 ));
1193 }
1194
/// `SHOW SINKS` must parse to `Show(ShowCommand::Sinks)`.
///
/// The assertion carries a message with the parsed statement so a failure
/// shows what the parser actually produced.
#[test]
fn test_parse_show_sinks() {
    let stmt = parse_one("SHOW SINKS");
    assert!(
        matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)),
        "Expected Show(Sinks), got {stmt:?}"
    );
}
1200
/// `SHOW QUERIES` must parse to `Show(ShowCommand::Queries)`.
///
/// The assertion carries a message with the parsed statement so a failure
/// shows what the parser actually produced.
#[test]
fn test_parse_show_queries() {
    let stmt = parse_one("SHOW QUERIES");
    assert!(
        matches!(stmt, StreamingStatement::Show(ShowCommand::Queries)),
        "Expected Show(Queries), got {stmt:?}"
    );
}
1209
/// `SHOW MATERIALIZED VIEWS` must parse to `Show(ShowCommand::MaterializedViews)`.
///
/// The assertion carries a message with the parsed statement so a failure
/// shows what the parser actually produced.
#[test]
fn test_parse_show_materialized_views() {
    let stmt = parse_one("SHOW MATERIALIZED VIEWS");
    assert!(
        matches!(
            stmt,
            StreamingStatement::Show(ShowCommand::MaterializedViews)
        ),
        "Expected Show(MaterializedViews), got {stmt:?}"
    );
}
1218
/// Plain `DESCRIBE <name>` reports the object name with `extended` unset.
#[test]
fn test_parse_describe() {
    let stmt = parse_one("DESCRIBE events");
    let StreamingStatement::Describe { name, extended } = stmt else {
        panic!("Expected Describe, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "events");
    assert!(!extended);
}
1230
/// `DESCRIBE EXTENDED` sets `extended` and keeps a schema-qualified name intact.
#[test]
fn test_parse_describe_extended() {
    let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
    let StreamingStatement::Describe { name, extended } = stmt else {
        panic!("Expected Describe, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "my_schema.events");
    assert!(extended);
}
1242
/// `EXPLAIN SELECT ...` wraps a `Standard` statement and leaves `analyze` unset.
#[test]
fn test_parse_explain_select() {
    let stmt = parse_one("EXPLAIN SELECT * FROM events");
    let StreamingStatement::Explain {
        statement, analyze, ..
    } = stmt
    else {
        panic!("Expected Explain, got {stmt:?}");
    };
    assert!(matches!(*statement, StreamingStatement::Standard(_)));
    assert!(!analyze);
}
1256
/// `EXPLAIN` over streaming DDL wraps the `CreateSource` statement.
#[test]
fn test_parse_explain_create_source() {
    let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
    let StreamingStatement::Explain { statement, .. } = stmt else {
        panic!("Expected Explain wrapping CreateSource, got {stmt:?}");
    };
    assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
}
1267
/// `EXPLAIN ANALYZE SELECT ...` wraps a `Standard` statement with `analyze` set.
#[test]
fn test_parse_explain_analyze_select() {
    let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
    let StreamingStatement::Explain {
        statement, analyze, ..
    } = stmt
    else {
        panic!("Expected Explain, got {stmt:?}");
    };
    assert!(matches!(*statement, StreamingStatement::Standard(_)));
    assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
}
1281
/// A minimal `CREATE MATERIALIZED VIEW ... AS SELECT` has no emit clause and
/// neither `OR REPLACE` nor `IF NOT EXISTS` set.
#[test]
fn test_parse_create_materialized_view() {
    let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
    let StreamingStatement::CreateMaterializedView {
        name,
        emit_clause,
        or_replace,
        if_not_exists,
        ..
    } = stmt
    else {
        panic!("Expected CreateMaterializedView, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "live_stats");
    assert!(emit_clause.is_none());
    assert!(!or_replace);
    assert!(!if_not_exists);
}
1301
/// A trailing `EMIT ON WINDOW CLOSE` on a materialized view is surfaced as
/// `EmitClause::OnWindowClose`.
#[test]
fn test_parse_create_materialized_view_with_emit() {
    let stmt = parse_one(
        "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
    );
    let StreamingStatement::CreateMaterializedView {
        name, emit_clause, ..
    } = stmt
    else {
        panic!("Expected CreateMaterializedView, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "live_stats");
    assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
}
1317
/// `CREATE OR REPLACE MATERIALIZED VIEW` sets `or_replace` only.
#[test]
fn test_parse_create_or_replace_materialized_view() {
    let stmt = parse_one(
        "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
    );
    let StreamingStatement::CreateMaterializedView {
        name,
        or_replace,
        if_not_exists,
        ..
    } = stmt
    else {
        panic!("Expected CreateMaterializedView, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "live_stats");
    assert!(or_replace);
    assert!(!if_not_exists);
}
1337
/// `CREATE MATERIALIZED VIEW IF NOT EXISTS` sets `if_not_exists` only.
#[test]
fn test_parse_create_materialized_view_if_not_exists() {
    let stmt = parse_one(
        "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
    );
    let StreamingStatement::CreateMaterializedView {
        name,
        or_replace,
        if_not_exists,
        ..
    } = stmt
    else {
        panic!("Expected CreateMaterializedView, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "live_stats");
    assert!(!or_replace);
    assert!(if_not_exists);
}
1357
/// `INSERT INTO` with an explicit column list yields the table name, both
/// column identifiers in order, and one value row of matching width.
#[test]
fn test_parse_insert_into() {
    let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
    let StreamingStatement::InsertInto {
        table_name,
        columns,
        values,
    } = stmt
    else {
        panic!("Expected InsertInto, got {stmt:?}");
    };
    assert_eq!(table_name.to_string(), "events");
    assert_eq!(columns.len(), 2);
    assert_eq!(columns[0].to_string(), "id");
    assert_eq!(columns[1].to_string(), "name");
    assert_eq!(values.len(), 1);
    assert_eq!(values[0].len(), 2);
}
1377
/// `INSERT INTO` without a column list leaves `columns` empty and keeps every
/// `VALUES` tuple as its own row.
#[test]
fn test_parse_insert_into_multiple_rows() {
    let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
    let StreamingStatement::InsertInto {
        table_name,
        columns,
        values,
    } = stmt
    else {
        panic!("Expected InsertInto, got {stmt:?}");
    };
    assert_eq!(table_name.to_string(), "events");
    assert!(columns.is_empty());
    assert_eq!(values.len(), 3);
}
1394
/// A plain `CREATE STREAM ... AS SELECT` carries the stream name and leaves
/// `or_replace`, `if_not_exists` and the emit clause at their unset values.
#[test]
fn test_parse_create_stream() {
    let stmt = parse_one(
        "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
    );
    let StreamingStatement::CreateStream {
        name,
        or_replace,
        if_not_exists,
        emit_clause,
        ..
    } = stmt
    else {
        panic!("Expected CreateStream, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "session_activity");
    assert!(!or_replace);
    assert!(!if_not_exists);
    assert!(emit_clause.is_none());
}
1418
/// `CREATE OR REPLACE STREAM` sets `or_replace`.
#[test]
fn test_parse_create_or_replace_stream() {
    let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
    let StreamingStatement::CreateStream { or_replace, .. } = stmt else {
        panic!("Expected CreateStream, got {stmt:?}");
    };
    assert!(or_replace);
}
1429
/// `CREATE STREAM IF NOT EXISTS` sets `if_not_exists`.
#[test]
fn test_parse_create_stream_if_not_exists() {
    let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
    let StreamingStatement::CreateStream { if_not_exists, .. } = stmt else {
        panic!("Expected CreateStream, got {stmt:?}");
    };
    assert!(if_not_exists);
}
1440
/// A trailing `EMIT ON WINDOW CLOSE` on a stream is surfaced as
/// `EmitClause::OnWindowClose`.
#[test]
fn test_parse_create_stream_with_emit() {
    let stmt =
        parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
    let StreamingStatement::CreateStream { emit_clause, .. } = stmt else {
        panic!("Expected CreateStream, got {stmt:?}");
    };
    assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
}
1452
/// `WITH ('retain_history' = '64mb')` is expected to surface as
/// `retention_bytes == Some(64 * 1024 * 1024)`.
#[test]
fn create_stream_with_retain_history() {
    let stmt = parse_one(
        "CREATE STREAM trades AS SELECT * FROM events WITH ('retain_history' = '64mb')",
    );
    match stmt {
        StreamingStatement::CreateStream {
            retention_bytes, ..
        } => {
            assert_eq!(retention_bytes, Some(64 * 1024 * 1024));
        }
        _ => panic!("expected CreateStream"),
    }
}
1466
/// A `WITH` options list placed after the `EMIT` clause must populate both
/// `retention_bytes` and `emit_clause`.
#[test]
fn create_stream_with_retain_history_after_emit() {
    let stmt = parse_one(
        "CREATE STREAM trades AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE \
         WITH ('retain_history' = '8mb')",
    );
    match stmt {
        StreamingStatement::CreateStream {
            retention_bytes,
            emit_clause,
            ..
        } => {
            assert_eq!(retention_bytes, Some(8 * 1024 * 1024));
            assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
        }
        _ => panic!("expected CreateStream"),
    }
}
1484
/// An unrecognised key in the `WITH` list must be rejected, and the error
/// text must mention the offending key.
#[test]
fn create_stream_rejects_unknown_with_option() {
    let err = parse_streaming_sql("CREATE STREAM s AS SELECT * FROM e WITH ('bogus' = '1')")
        .expect_err("unknown WITH key must error");
    let rendered = err.to_string();
    assert!(rendered.contains("bogus"), "got: {err}");
}
1491
/// The captured `query_sql` must contain the SELECT body but none of the
/// trailing `WITH (...)` options.
#[test]
fn create_stream_with_retain_history_excludes_with_from_query_sql() {
    let stmt = parse_one(
        "CREATE STREAM trades AS SELECT * FROM events WITH ('retain_history' = '64mb')",
    );
    match stmt {
        StreamingStatement::CreateStream { query_sql, .. } => {
            assert!(
                !query_sql.to_uppercase().contains("RETAIN_HISTORY"),
                "query_sql leaked WITH clause: {query_sql}"
            );
            assert!(
                query_sql.contains("FROM events"),
                "query_sql should still hold the SELECT body, got: {query_sql}"
            );
        }
        _ => panic!("expected CreateStream"),
    }
}
1512
/// A bare `DROP STREAM <name>` yields both flags unset.
#[test]
fn test_parse_drop_stream() {
    let stmt = parse_one("DROP STREAM my_stream");
    let StreamingStatement::DropStream {
        name,
        if_exists,
        cascade,
    } = stmt
    else {
        panic!("Expected DropStream, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "my_stream");
    assert!(!if_exists);
    assert!(!cascade);
}
1529
/// `DROP STREAM IF EXISTS <name>` sets `if_exists` and leaves `cascade` unset.
#[test]
fn test_parse_drop_stream_if_exists() {
    let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
    let StreamingStatement::DropStream {
        name,
        if_exists,
        cascade,
    } = stmt
    else {
        panic!("Expected DropStream, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "my_stream");
    assert!(if_exists);
    assert!(!cascade);
}
1546
/// `DROP STREAM <name> CASCADE` sets `cascade` and leaves `if_exists` unset.
#[test]
fn test_parse_drop_stream_cascade() {
    let stmt = parse_one("DROP STREAM my_stream CASCADE");
    let StreamingStatement::DropStream {
        name,
        if_exists,
        cascade,
    } = stmt
    else {
        panic!("Expected DropStream, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "my_stream");
    assert!(!if_exists);
    assert!(cascade);
}
1563
/// `SHOW STREAMS` must parse to `Show(ShowCommand::Streams)`.
///
/// The assertion carries a message with the parsed statement so a failure
/// shows what the parser actually produced.
#[test]
fn test_parse_show_streams() {
    let stmt = parse_one("SHOW STREAMS");
    assert!(
        matches!(stmt, StreamingStatement::Show(ShowCommand::Streams)),
        "Expected Show(Streams), got {stmt:?}"
    );
}
1572
/// `ALTER SOURCE ... ADD COLUMN` yields an `AddColumn` operation carrying the
/// new column's name and data type.
#[test]
fn test_parse_alter_source_add_column() {
    let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
    let StreamingStatement::AlterSource { name, operation } = stmt else {
        panic!("Expected AlterSource, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "events");
    // Kept as an exhaustive match so a newly added operation variant forces
    // this test to be revisited at compile time.
    match operation {
        statements::AlterSourceOperation::AddColumn { column_def } => {
            assert_eq!(column_def.name.value, "new_col");
            assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
        }
        statements::AlterSourceOperation::SetProperties { .. } => {
            panic!("Expected AddColumn")
        }
    }
}
1592
/// `ALTER SOURCE ... SET (...)` yields a `SetProperties` operation whose map
/// holds every key/value pair from the statement.
#[test]
fn test_parse_alter_source_set_properties() {
    let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
    let StreamingStatement::AlterSource { name, operation } = stmt else {
        panic!("Expected AlterSource, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "events");
    // Kept as an exhaustive match so a newly added operation variant forces
    // this test to be revisited at compile time.
    match operation {
        statements::AlterSourceOperation::SetProperties { properties } => {
            assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
            assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
        }
        statements::AlterSourceOperation::AddColumn { .. } => {
            panic!("Expected SetProperties")
        }
    }
}
1612
/// The bare `CHECKPOINT` keyword must parse to `StreamingStatement::Checkpoint`.
#[test]
fn test_parse_checkpoint() {
    let stmt = parse_one("CHECKPOINT");
    let is_checkpoint = matches!(stmt, StreamingStatement::Checkpoint);
    assert!(is_checkpoint, "Expected Checkpoint, got {stmt:?}");
}
1621
/// `SHOW CHECKPOINT STATUS` must parse to `Show(ShowCommand::CheckpointStatus)`.
#[test]
fn test_parse_show_checkpoint_status() {
    let stmt = parse_one("SHOW CHECKPOINT STATUS");
    let is_checkpoint_status = matches!(
        stmt,
        StreamingStatement::Show(ShowCommand::CheckpointStatus)
    );
    assert!(
        is_checkpoint_status,
        "Expected Show(CheckpointStatus), got {stmt:?}"
    );
}
1633
/// `RESTORE FROM CHECKPOINT <n>` carries the numeric checkpoint id.
#[test]
fn test_parse_restore_checkpoint() {
    let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
    let StreamingStatement::RestoreCheckpoint { checkpoint_id } = stmt else {
        panic!("Expected RestoreCheckpoint, got {stmt:?}");
    };
    assert_eq!(checkpoint_id, 42);
}
1644
/// A multi-digit checkpoint id is parsed to its full numeric value.
#[test]
fn test_parse_restore_checkpoint_large_id() {
    let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
    let StreamingStatement::RestoreCheckpoint { checkpoint_id } = stmt else {
        panic!("Expected RestoreCheckpoint, got {stmt:?}");
    };
    assert_eq!(checkpoint_id, 123_456);
}
1655
/// `SHOW CREATE SOURCE <name>` carries the source name.
#[test]
fn test_parse_show_create_source() {
    let stmt = parse_one("SHOW CREATE SOURCE events");
    let StreamingStatement::Show(ShowCommand::CreateSource { name }) = stmt else {
        panic!("Expected Show(CreateSource), got {stmt:?}");
    };
    assert_eq!(name.to_string(), "events");
}
1666
/// `SHOW CREATE SINK <name>` carries the sink name.
#[test]
fn test_parse_show_create_sink() {
    let stmt = parse_one("SHOW CREATE SINK output");
    let StreamingStatement::Show(ShowCommand::CreateSink { name }) = stmt else {
        panic!("Expected Show(CreateSink), got {stmt:?}");
    };
    assert_eq!(name.to_string(), "output");
}
1677
/// The `TEMPORAL PROBE JOIN` clause, its `LIST (...)` offsets and the
/// trailing alias must all survive into the captured `query_sql`.
#[test]
fn create_stream_preserves_temporal_probe_join() {
    let sql = "CREATE STREAM markouts_long AS \
               SELECT t.s, p.offset_ms FROM trade_probe t \
               TEMPORAL PROBE JOIN price_ref r \
               ON (s) TIMESTAMPS (ts, ts) \
               LIST (0s, 1s, 5s, 30s) AS p";
    match parse_one(sql) {
        StreamingStatement::CreateStream { query_sql, .. } => {
            assert!(
                query_sql.to_uppercase().contains("TEMPORAL PROBE JOIN"),
                "got: {query_sql}"
            );
            assert!(
                query_sql.contains("LIST (0s, 1s, 5s, 30s)"),
                "got: {query_sql}"
            );
            assert!(query_sql.contains("AS p"), "got: {query_sql}");
        }
        _ => panic!("expected CreateStream"),
    }
}
1698
/// The `RANGE FROM ... TO ... STEP ...` form of the probe spec must appear
/// verbatim in the captured `query_sql`.
#[test]
fn create_stream_temporal_probe_range_step_preserved() {
    let sql = "CREATE STREAM mk AS \
               SELECT s FROM trades t TEMPORAL PROBE JOIN prices r \
               ON (symbol) TIMESTAMPS (ts, ts) \
               RANGE FROM 0s TO 30s STEP 5s AS p";
    match parse_one(sql) {
        StreamingStatement::CreateStream { query_sql, .. } => {
            assert!(
                query_sql.contains("RANGE FROM 0s TO 30s STEP 5s"),
                "got: {query_sql}"
            );
        }
        _ => panic!("expected CreateStream"),
    }
}
1713
/// A materialized view's captured `query_sql` must also keep the
/// `TEMPORAL PROBE JOIN` clause.
#[test]
fn create_mv_preserves_temporal_probe_join() {
    let sql = "CREATE MATERIALIZED VIEW mv AS \
               SELECT t.s FROM trades t TEMPORAL PROBE JOIN prices r \
               ON (s) TIMESTAMPS (ts, ts) LIST (0s, 5s) AS p";
    match parse_one(sql) {
        StreamingStatement::CreateMaterializedView { query_sql, .. } => {
            assert!(
                query_sql.to_uppercase().contains("TEMPORAL PROBE JOIN"),
                "got: {query_sql}"
            );
        }
        _ => panic!("expected CreateMaterializedView"),
    }
}
1727
/// The `EMIT` clause is reported through `emit_clause` and must not appear in
/// the captured `query_sql`, which still holds the SELECT body.
#[test]
fn create_stream_emit_is_not_captured_in_query_sql() {
    let sql = "CREATE STREAM s AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
    match parse_one(sql) {
        StreamingStatement::CreateStream {
            query_sql,
            emit_clause,
            ..
        } => {
            assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            assert!(
                !query_sql.to_uppercase().contains("EMIT"),
                "got: {query_sql}"
            );
            assert!(query_sql.contains("FROM events"), "got: {query_sql}");
        }
        _ => panic!("expected CreateStream"),
    }
}
1746
/// For a plain `CREATE STREAM ... AS SELECT`, the captured `query_sql` must
/// contain the `SELECT` keyword and the `FROM events` clause.
///
/// Both assertions print the captured SQL on failure, matching the other
/// `query_sql` tests in this module.
#[test]
fn create_stream_plain_select_query_sql_executes() {
    let sql = "CREATE STREAM counts AS SELECT COUNT(*) AS c FROM events";
    let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
        panic!("expected CreateStream");
    };
    assert!(
        query_sql.to_uppercase().contains("SELECT"),
        "got: {query_sql}"
    );
    assert!(query_sql.contains("FROM events"), "got: {query_sql}");
}
1756}