1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod join_parser;
13mod late_data_parser;
14pub mod lookup_table;
16pub mod order_analyzer;
17mod sink_parser;
18mod source_parser;
19mod statements;
20mod tokenizer;
21mod window_rewriter;
22
23pub use lookup_table::CreateLookupTableStatement;
24pub use statements::{
25 AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
26 FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
27 WindowFunction,
28};
29pub use window_rewriter::WindowRewriter;
30
31use dialect::LaminarDialect;
32use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
33
34pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
43 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
44}
45
46pub struct StreamingParser;
52
53impl StreamingParser {
54 #[allow(clippy::too_many_lines)]
67 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
68 let sql_trimmed = sql.trim();
69 if sql_trimmed.is_empty() {
70 return Err(sqlparser::parser::ParserError::ParserError(
71 "Empty SQL statement".to_string(),
72 ));
73 }
74
75 let dialect = LaminarDialect::default();
76
77 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
79 .tokenize_with_location()
80 .map_err(|e| {
81 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
82 })?;
83
84 match detect_streaming_ddl(&tokens) {
86 StreamingDdlKind::CreateSource { .. } => {
87 let mut parser =
88 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
89 let source = source_parser::parse_create_source(&mut parser)
90 .map_err(parse_error_to_parser_error)?;
91 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
92 }
93 StreamingDdlKind::CreateSink { .. } => {
94 let mut parser =
95 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
96 let sink = sink_parser::parse_create_sink(&mut parser)
97 .map_err(parse_error_to_parser_error)?;
98 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
99 }
100 StreamingDdlKind::CreateContinuousQuery { .. } => {
101 let mut parser =
102 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
103 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
104 .map_err(parse_error_to_parser_error)?;
105 Ok(vec![stmt])
106 }
107 StreamingDdlKind::DropSource { .. } => {
108 let mut parser =
109 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
110 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
111 Ok(vec![stmt])
112 }
113 StreamingDdlKind::DropSink { .. } => {
114 let mut parser =
115 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
116 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
117 Ok(vec![stmt])
118 }
119 StreamingDdlKind::DropMaterializedView { .. } => {
120 let mut parser =
121 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
122 let stmt = parse_drop_materialized_view(&mut parser)
123 .map_err(parse_error_to_parser_error)?;
124 Ok(vec![stmt])
125 }
126 StreamingDdlKind::ShowSources => {
127 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
128 }
129 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
130 StreamingDdlKind::ShowQueries => {
131 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
132 }
133 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
134 ShowCommand::MaterializedViews,
135 )]),
136 StreamingDdlKind::DescribeSource => {
137 let mut parser =
138 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
139 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
140 Ok(vec![stmt])
141 }
142 StreamingDdlKind::ExplainStreaming => {
143 let mut parser =
144 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
145 let stmt =
146 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
147 Ok(vec![stmt])
148 }
149 StreamingDdlKind::CreateMaterializedView { .. } => {
150 let mut parser =
151 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
152 let stmt = parse_create_materialized_view(&mut parser, sql_trimmed)
153 .map_err(parse_error_to_parser_error)?;
154 Ok(vec![stmt])
155 }
156 StreamingDdlKind::CreateStream { .. } => {
157 let mut parser =
158 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
159 let stmt = parse_create_stream(&mut parser, sql_trimmed)
160 .map_err(parse_error_to_parser_error)?;
161 Ok(vec![stmt])
162 }
163 StreamingDdlKind::DropStream { .. } => {
164 let mut parser =
165 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
166 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
167 Ok(vec![stmt])
168 }
169 StreamingDdlKind::ShowStreams => {
170 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
171 }
172 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
173 StreamingDdlKind::CreateLookupTable { .. } => {
174 let mut parser =
175 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
176 let lt = lookup_table::parse_create_lookup_table(&mut parser)
177 .map_err(parse_error_to_parser_error)?;
178 Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
179 }
180 StreamingDdlKind::DropLookupTable { .. } => {
181 let mut parser =
182 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
183 let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
184 .map_err(parse_error_to_parser_error)?;
185 Ok(vec![StreamingStatement::DropLookupTable {
186 name,
187 if_exists,
188 }])
189 }
190 StreamingDdlKind::AlterSource => {
191 let mut parser =
192 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
193 let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
194 Ok(vec![stmt])
195 }
196 StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
197 ShowCommand::CheckpointStatus,
198 )]),
199 StreamingDdlKind::ShowCreateSource => {
200 let mut parser =
201 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
202 let stmt =
203 parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
204 Ok(vec![stmt])
205 }
206 StreamingDdlKind::ShowCreateSink => {
207 let mut parser =
208 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
209 let stmt =
210 parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
211 Ok(vec![stmt])
212 }
213 StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
214 StreamingDdlKind::RestoreCheckpoint => {
215 let mut parser =
216 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
217 let stmt =
218 parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
219 Ok(vec![stmt])
220 }
221 StreamingDdlKind::None => {
222 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
224 Ok(statements
225 .into_iter()
226 .map(convert_standard_statement)
227 .collect())
228 }
229 }
230 }
231
232 #[must_use]
234 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
235 match expr {
236 sqlparser::ast::Expr::Function(func) => {
237 if let Some(name) = func.name.0.last() {
238 let func_name = name.to_string().to_uppercase();
239 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
240 } else {
241 false
242 }
243 }
244 _ => false,
245 }
246 }
247
248 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
254 emit_parser::parse_emit_clause_from_sql(sql)
255 }
256
257 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
263 late_data_parser::parse_late_data_clause_from_sql(sql)
264 }
265}
266
267fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
269 match e {
270 ParseError::SqlParseError(pe) => pe,
271 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
272 ParseError::WindowError(msg) => {
273 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
274 }
275 ParseError::ValidationError(msg) => {
276 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
277 }
278 }
279}
280
281fn parse_restore_checkpoint(
289 parser: &mut sqlparser::parser::Parser,
290) -> Result<StreamingStatement, ParseError> {
291 tokenizer::expect_custom_keyword(parser, "RESTORE")?;
293 if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
295 return Err(ParseError::StreamingError(
296 "Expected FROM after RESTORE".to_string(),
297 ));
298 }
299 tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
301 let token = parser.next_token();
303 match &token.token {
304 sqlparser::tokenizer::Token::Number(n, _) => {
305 let id: u64 = n
306 .parse()
307 .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
308 Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
309 }
310 other => Err(ParseError::StreamingError(format!(
311 "Expected checkpoint ID (number), found {other}"
312 ))),
313 }
314}
315
316fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
321 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
322 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
324 let table_name = name.clone();
325 let columns = insert.columns.clone();
326
327 if let Some(ref source) = insert.source {
329 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
330 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
331 return StreamingStatement::InsertInto {
332 table_name,
333 columns,
334 values: rows,
335 };
336 }
337 }
338 }
339 }
340 StreamingStatement::Standard(Box::new(stmt))
341}
342
343fn parse_drop_source(
351 parser: &mut sqlparser::parser::Parser,
352) -> Result<StreamingStatement, ParseError> {
353 parser
354 .expect_keyword(sqlparser::keywords::Keyword::DROP)
355 .map_err(ParseError::SqlParseError)?;
356 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
357 let if_exists = parser.parse_keywords(&[
358 sqlparser::keywords::Keyword::IF,
359 sqlparser::keywords::Keyword::EXISTS,
360 ]);
361 let name = parser
362 .parse_object_name(false)
363 .map_err(ParseError::SqlParseError)?;
364 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
365 Ok(StreamingStatement::DropSource {
366 name,
367 if_exists,
368 cascade,
369 })
370}
371
372fn parse_drop_sink(
380 parser: &mut sqlparser::parser::Parser,
381) -> Result<StreamingStatement, ParseError> {
382 parser
383 .expect_keyword(sqlparser::keywords::Keyword::DROP)
384 .map_err(ParseError::SqlParseError)?;
385 tokenizer::expect_custom_keyword(parser, "SINK")?;
386 let if_exists = parser.parse_keywords(&[
387 sqlparser::keywords::Keyword::IF,
388 sqlparser::keywords::Keyword::EXISTS,
389 ]);
390 let name = parser
391 .parse_object_name(false)
392 .map_err(ParseError::SqlParseError)?;
393 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
394 Ok(StreamingStatement::DropSink {
395 name,
396 if_exists,
397 cascade,
398 })
399}
400
401fn parse_drop_materialized_view(
409 parser: &mut sqlparser::parser::Parser,
410) -> Result<StreamingStatement, ParseError> {
411 parser
412 .expect_keyword(sqlparser::keywords::Keyword::DROP)
413 .map_err(ParseError::SqlParseError)?;
414 parser
415 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
416 .map_err(ParseError::SqlParseError)?;
417 parser
418 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
419 .map_err(ParseError::SqlParseError)?;
420 let if_exists = parser.parse_keywords(&[
421 sqlparser::keywords::Keyword::IF,
422 sqlparser::keywords::Keyword::EXISTS,
423 ]);
424 let name = parser
425 .parse_object_name(false)
426 .map_err(ParseError::SqlParseError)?;
427 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
428 Ok(StreamingStatement::DropMaterializedView {
429 name,
430 if_exists,
431 cascade,
432 })
433}
434
435fn parse_create_stream(
443 parser: &mut sqlparser::parser::Parser,
444 original_sql: &str,
445) -> Result<StreamingStatement, ParseError> {
446 parser
447 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
448 .map_err(ParseError::SqlParseError)?;
449
450 let or_replace = parser.parse_keywords(&[
451 sqlparser::keywords::Keyword::OR,
452 sqlparser::keywords::Keyword::REPLACE,
453 ]);
454
455 tokenizer::expect_custom_keyword(parser, "STREAM")?;
456
457 let if_not_exists = parser.parse_keywords(&[
458 sqlparser::keywords::Keyword::IF,
459 sqlparser::keywords::Keyword::NOT,
460 sqlparser::keywords::Keyword::EXISTS,
461 ]);
462
463 let name = parser
464 .parse_object_name(false)
465 .map_err(ParseError::SqlParseError)?;
466
467 parser
468 .expect_keyword(sqlparser::keywords::Keyword::AS)
469 .map_err(ParseError::SqlParseError)?;
470
471 let remaining = collect_remaining_tokens(parser);
473 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
474
475 let query_sql = query_body_sql(original_sql, &query_tokens, &emit_tokens);
476
477 let stream_dialect = LaminarDialect::default();
478
479 let query = if query_tokens.is_empty() {
480 return Err(ParseError::StreamingError(
481 "Expected SELECT query after AS".to_string(),
482 ));
483 } else {
484 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
485 .with_tokens_with_locations(query_tokens);
486 query_parser
487 .parse_query()
488 .map_err(ParseError::SqlParseError)?
489 };
490
491 let query_stmt =
492 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
493
494 let emit_clause = if emit_tokens.is_empty() {
495 None
496 } else {
497 let mut emit_parser =
498 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
499 emit_parser::parse_emit_clause(&mut emit_parser)?
500 };
501
502 Ok(StreamingStatement::CreateStream {
503 name,
504 query: Box::new(query_stmt),
505 emit_clause,
506 or_replace,
507 if_not_exists,
508 query_sql,
509 })
510}
511
512pub(super) fn query_body_sql(
516 original_sql: &str,
517 query_tokens: &[sqlparser::tokenizer::TokenWithSpan],
518 emit_tokens: &[sqlparser::tokenizer::TokenWithSpan],
519) -> String {
520 use sqlparser::tokenizer::Token;
521
522 let from_spans = || -> Option<String> {
523 let first = query_tokens
524 .iter()
525 .find(|t| !matches!(t.token, Token::EOF))?;
526 let start = location_to_byte_offset(original_sql, first.span.start)?;
527 let end = match emit_tokens.first() {
528 Some(t) => location_to_byte_offset(original_sql, t.span.start)?,
529 None => original_sql.len(),
530 };
531 let slice = original_sql.get(start..end)?;
532 Some(
533 slice
534 .trim_end_matches(|c: char| c.is_whitespace() || c == ';')
535 .to_string(),
536 )
537 };
538
539 from_spans().unwrap_or_else(|| {
540 query_tokens
541 .iter()
542 .take_while(|t| !matches!(t.token, Token::EOF))
543 .map(|t| t.token.to_string())
544 .collect::<Vec<_>>()
545 .join(" ")
546 })
547}
548
549fn parse_drop_stream(
557 parser: &mut sqlparser::parser::Parser,
558) -> Result<StreamingStatement, ParseError> {
559 parser
560 .expect_keyword(sqlparser::keywords::Keyword::DROP)
561 .map_err(ParseError::SqlParseError)?;
562 tokenizer::expect_custom_keyword(parser, "STREAM")?;
563 let if_exists = parser.parse_keywords(&[
564 sqlparser::keywords::Keyword::IF,
565 sqlparser::keywords::Keyword::EXISTS,
566 ]);
567 let name = parser
568 .parse_object_name(false)
569 .map_err(ParseError::SqlParseError)?;
570 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
571 Ok(StreamingStatement::DropStream {
572 name,
573 if_exists,
574 cascade,
575 })
576}
577
578fn parse_alter_source(
591 parser: &mut sqlparser::parser::Parser,
592) -> Result<StreamingStatement, ParseError> {
593 parser
594 .expect_keyword(sqlparser::keywords::Keyword::ALTER)
595 .map_err(ParseError::SqlParseError)?;
596 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
597 let name = parser
598 .parse_object_name(false)
599 .map_err(ParseError::SqlParseError)?;
600
601 if parser.parse_keywords(&[
603 sqlparser::keywords::Keyword::ADD,
604 sqlparser::keywords::Keyword::COLUMN,
605 ]) {
606 let col_name = parser
608 .parse_identifier()
609 .map_err(ParseError::SqlParseError)?;
610 let data_type = parser
611 .parse_data_type()
612 .map_err(ParseError::SqlParseError)?;
613 let column_def = sqlparser::ast::ColumnDef {
614 name: col_name,
615 data_type,
616 options: vec![],
617 };
618 Ok(StreamingStatement::AlterSource {
619 name,
620 operation: statements::AlterSourceOperation::AddColumn { column_def },
621 })
622 } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
623 parser
625 .expect_token(&sqlparser::tokenizer::Token::LParen)
626 .map_err(ParseError::SqlParseError)?;
627 #[allow(clippy::disallowed_types)] let mut properties = std::collections::HashMap::new();
629 loop {
630 let key = parser
631 .parse_literal_string()
632 .map_err(ParseError::SqlParseError)?;
633 parser
634 .expect_token(&sqlparser::tokenizer::Token::Eq)
635 .map_err(ParseError::SqlParseError)?;
636 let value = parser
637 .parse_literal_string()
638 .map_err(ParseError::SqlParseError)?;
639 properties.insert(key, value);
640 if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
641 break;
642 }
643 }
644 parser
645 .expect_token(&sqlparser::tokenizer::Token::RParen)
646 .map_err(ParseError::SqlParseError)?;
647 Ok(StreamingStatement::AlterSource {
648 name,
649 operation: statements::AlterSourceOperation::SetProperties { properties },
650 })
651 } else {
652 Err(ParseError::StreamingError(
653 "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
654 ))
655 }
656}
657
658fn parse_describe(
664 parser: &mut sqlparser::parser::Parser,
665) -> Result<StreamingStatement, ParseError> {
666 let token = parser.next_token();
668 match &token.token {
669 sqlparser::tokenizer::Token::Word(w)
670 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
671 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
672 _ => {
673 return Err(ParseError::StreamingError(
674 "Expected DESCRIBE or DESC".to_string(),
675 ));
676 }
677 }
678 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
679 let name = parser
680 .parse_object_name(false)
681 .map_err(ParseError::SqlParseError)?;
682 Ok(StreamingStatement::Describe { name, extended })
683}
684
685fn parse_show_create_source(
687 parser: &mut sqlparser::parser::Parser,
688) -> Result<StreamingStatement, ParseError> {
689 parser
691 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
692 .map_err(ParseError::SqlParseError)?;
693 parser
694 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
695 .map_err(ParseError::SqlParseError)?;
696 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
697 let name = parser
698 .parse_object_name(false)
699 .map_err(ParseError::SqlParseError)?;
700 Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
701}
702
703fn parse_show_create_sink(
705 parser: &mut sqlparser::parser::Parser,
706) -> Result<StreamingStatement, ParseError> {
707 parser
709 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
710 .map_err(ParseError::SqlParseError)?;
711 parser
712 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
713 .map_err(ParseError::SqlParseError)?;
714 tokenizer::expect_custom_keyword(parser, "SINK")?;
715 let name = parser
716 .parse_object_name(false)
717 .map_err(ParseError::SqlParseError)?;
718 Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
719}
720
721fn parse_explain(
729 parser: &mut sqlparser::parser::Parser,
730 original_sql: &str,
731) -> Result<StreamingStatement, ParseError> {
732 parser
733 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
734 .map_err(ParseError::SqlParseError)?;
735
736 let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
738
739 let explain_prefix_upper = original_sql.to_uppercase();
741 let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
742 let inner_start = if analyze {
743 explain_prefix_upper
744 .find("ANALYZE")
745 .map_or(0, |pos| pos + "ANALYZE".len())
746 } else {
747 explain_prefix_upper
748 .find("EXPLAIN")
749 .map_or(0, |pos| pos + "EXPLAIN".len())
750 };
751 let inner_sql = original_sql[inner_start..].trim();
752 let _ = skip_keyword; let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
756 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
757 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
758 })?;
759 Ok(StreamingStatement::Explain {
760 statement: Box::new(inner),
761 analyze,
762 })
763}
764
765fn parse_create_materialized_view(
778 parser: &mut sqlparser::parser::Parser,
779 original_sql: &str,
780) -> Result<StreamingStatement, ParseError> {
781 parser
782 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
783 .map_err(ParseError::SqlParseError)?;
784
785 let or_replace = parser.parse_keywords(&[
786 sqlparser::keywords::Keyword::OR,
787 sqlparser::keywords::Keyword::REPLACE,
788 ]);
789
790 parser
791 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
792 .map_err(ParseError::SqlParseError)?;
793 parser
794 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
795 .map_err(ParseError::SqlParseError)?;
796
797 let if_not_exists = parser.parse_keywords(&[
798 sqlparser::keywords::Keyword::IF,
799 sqlparser::keywords::Keyword::NOT,
800 sqlparser::keywords::Keyword::EXISTS,
801 ]);
802
803 let name = parser
804 .parse_object_name(false)
805 .map_err(ParseError::SqlParseError)?;
806
807 parser
808 .expect_keyword(sqlparser::keywords::Keyword::AS)
809 .map_err(ParseError::SqlParseError)?;
810
811 let remaining = collect_remaining_tokens(parser);
813 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
814
815 let query_sql = query_body_sql(original_sql, &query_tokens, &emit_tokens);
816
817 let mv_dialect = LaminarDialect::default();
818
819 let query = if query_tokens.is_empty() {
820 return Err(ParseError::StreamingError(
821 "Expected SELECT query after AS".to_string(),
822 ));
823 } else {
824 let mut query_parser =
825 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
826 query_parser
827 .parse_query()
828 .map_err(ParseError::SqlParseError)?
829 };
830
831 let query_stmt =
832 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
833
834 let emit_clause = if emit_tokens.is_empty() {
835 None
836 } else {
837 let mut emit_parser =
838 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
839 emit_parser::parse_emit_clause(&mut emit_parser)?
840 };
841
842 Ok(StreamingStatement::CreateMaterializedView {
843 name,
844 query: Box::new(query_stmt),
845 emit_clause,
846 or_replace,
847 if_not_exists,
848 query_sql,
849 })
850}
851
852fn location_to_byte_offset(sql: &str, loc: sqlparser::tokenizer::Location) -> Option<usize> {
854 if loc.line == 0 {
855 return None;
856 }
857 let (mut line, mut col) = (1u64, 1u64);
858 for (idx, ch) in sql.char_indices() {
859 if line == loc.line && col == loc.column {
860 return Some(idx);
861 }
862 if ch == '\n' {
863 line += 1;
864 col = 1;
865 } else {
866 col += 1;
867 }
868 }
869 (line == loc.line && col == loc.column).then_some(sql.len())
870}
871
872fn collect_remaining_tokens(
874 parser: &mut sqlparser::parser::Parser,
875) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
876 let mut tokens = Vec::new();
877 loop {
878 let token = parser.next_token();
879 if token.token == sqlparser::tokenizer::Token::EOF {
880 tokens.push(token);
881 break;
882 }
883 tokens.push(token);
884 }
885 tokens
886}
887
888fn split_at_emit(
893 tokens: &[sqlparser::tokenizer::TokenWithSpan],
894) -> (
895 Vec<sqlparser::tokenizer::TokenWithSpan>,
896 Vec<sqlparser::tokenizer::TokenWithSpan>,
897) {
898 let mut depth: i32 = 0;
899 for (i, token) in tokens.iter().enumerate() {
900 match &token.token {
901 sqlparser::tokenizer::Token::LParen => depth += 1,
902 sqlparser::tokenizer::Token::RParen => {
903 depth -= 1;
904 }
905 sqlparser::tokenizer::Token::Word(w)
906 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
907 {
908 let mut query_tokens = tokens[..i].to_vec();
909 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
910 token: sqlparser::tokenizer::Token::EOF,
911 span: sqlparser::tokenizer::Span::empty(),
912 });
913 let emit_tokens = tokens[i..].to_vec();
914 return (query_tokens, emit_tokens);
915 }
916 _ => {}
917 }
918 }
919 (tokens.to_vec(), vec![])
920}
921
922#[derive(Debug, thiserror::Error)]
924pub enum ParseError {
925 #[error("SQL parse error: {0}")]
927 SqlParseError(#[from] sqlparser::parser::ParserError),
928
929 #[error("Streaming SQL error: {0}")]
931 StreamingError(String),
932
933 #[error("Window function error: {0}")]
935 WindowError(String),
936
937 #[error("Validation error: {0}")]
939 ValidationError(String),
940}
941
942#[cfg(test)]
943mod tests {
944 use super::*;
945
946 fn parse_one(sql: &str) -> StreamingStatement {
948 let stmts = StreamingParser::parse_sql(sql).unwrap();
949 assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
950 stmts.into_iter().next().unwrap()
951 }
952
953 #[test]
954 fn test_parse_drop_source() {
955 let stmt = parse_one("DROP SOURCE events");
956 match stmt {
957 StreamingStatement::DropSource {
958 name,
959 if_exists,
960 cascade,
961 } => {
962 assert_eq!(name.to_string(), "events");
963 assert!(!if_exists);
964 assert!(!cascade);
965 }
966 _ => panic!("Expected DropSource, got {stmt:?}"),
967 }
968 }
969
970 #[test]
971 fn test_parse_drop_source_if_exists() {
972 let stmt = parse_one("DROP SOURCE IF EXISTS events");
973 match stmt {
974 StreamingStatement::DropSource {
975 name,
976 if_exists,
977 cascade,
978 } => {
979 assert_eq!(name.to_string(), "events");
980 assert!(if_exists);
981 assert!(!cascade);
982 }
983 _ => panic!("Expected DropSource, got {stmt:?}"),
984 }
985 }
986
987 #[test]
988 fn test_parse_drop_source_cascade() {
989 let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
990 match stmt {
991 StreamingStatement::DropSource {
992 name,
993 if_exists,
994 cascade,
995 } => {
996 assert_eq!(name.to_string(), "events");
997 assert!(if_exists);
998 assert!(cascade);
999 }
1000 _ => panic!("Expected DropSource, got {stmt:?}"),
1001 }
1002 }
1003
1004 #[test]
1005 fn test_parse_drop_sink() {
1006 let stmt = parse_one("DROP SINK output");
1007 match stmt {
1008 StreamingStatement::DropSink {
1009 name,
1010 if_exists,
1011 cascade,
1012 } => {
1013 assert_eq!(name.to_string(), "output");
1014 assert!(!if_exists);
1015 assert!(!cascade);
1016 }
1017 _ => panic!("Expected DropSink, got {stmt:?}"),
1018 }
1019 }
1020
1021 #[test]
1022 fn test_parse_drop_sink_if_exists() {
1023 let stmt = parse_one("DROP SINK IF EXISTS output");
1024 match stmt {
1025 StreamingStatement::DropSink {
1026 name,
1027 if_exists,
1028 cascade,
1029 } => {
1030 assert_eq!(name.to_string(), "output");
1031 assert!(if_exists);
1032 assert!(!cascade);
1033 }
1034 _ => panic!("Expected DropSink, got {stmt:?}"),
1035 }
1036 }
1037
1038 #[test]
1039 fn test_parse_drop_sink_cascade() {
1040 let stmt = parse_one("DROP SINK output CASCADE");
1041 match stmt {
1042 StreamingStatement::DropSink {
1043 name,
1044 if_exists,
1045 cascade,
1046 } => {
1047 assert_eq!(name.to_string(), "output");
1048 assert!(!if_exists);
1049 assert!(cascade);
1050 }
1051 _ => panic!("Expected DropSink, got {stmt:?}"),
1052 }
1053 }
1054
1055 #[test]
1056 fn test_parse_drop_materialized_view() {
1057 let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
1058 match stmt {
1059 StreamingStatement::DropMaterializedView {
1060 name,
1061 if_exists,
1062 cascade,
1063 } => {
1064 assert_eq!(name.to_string(), "live_stats");
1065 assert!(!if_exists);
1066 assert!(!cascade);
1067 }
1068 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1069 }
1070 }
1071
1072 #[test]
1073 fn test_parse_drop_materialized_view_if_exists_cascade() {
1074 let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1075 match stmt {
1076 StreamingStatement::DropMaterializedView {
1077 name,
1078 if_exists,
1079 cascade,
1080 } => {
1081 assert_eq!(name.to_string(), "live_stats");
1082 assert!(if_exists);
1083 assert!(cascade);
1084 }
1085 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1086 }
1087 }
1088
1089 #[test]
1090 fn test_parse_show_sources() {
1091 let stmt = parse_one("SHOW SOURCES");
1092 assert!(matches!(
1093 stmt,
1094 StreamingStatement::Show(ShowCommand::Sources)
1095 ));
1096 }
1097
1098 #[test]
1099 fn test_parse_show_sinks() {
1100 let stmt = parse_one("SHOW SINKS");
1101 assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1102 }
1103
1104 #[test]
1105 fn test_parse_show_queries() {
1106 let stmt = parse_one("SHOW QUERIES");
1107 assert!(matches!(
1108 stmt,
1109 StreamingStatement::Show(ShowCommand::Queries)
1110 ));
1111 }
1112
1113 #[test]
1114 fn test_parse_show_materialized_views() {
1115 let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1116 assert!(matches!(
1117 stmt,
1118 StreamingStatement::Show(ShowCommand::MaterializedViews)
1119 ));
1120 }
1121
1122 #[test]
1123 fn test_parse_describe() {
1124 let stmt = parse_one("DESCRIBE events");
1125 match stmt {
1126 StreamingStatement::Describe { name, extended } => {
1127 assert_eq!(name.to_string(), "events");
1128 assert!(!extended);
1129 }
1130 _ => panic!("Expected Describe, got {stmt:?}"),
1131 }
1132 }
1133
1134 #[test]
1135 fn test_parse_describe_extended() {
1136 let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1137 match stmt {
1138 StreamingStatement::Describe { name, extended } => {
1139 assert_eq!(name.to_string(), "my_schema.events");
1140 assert!(extended);
1141 }
1142 _ => panic!("Expected Describe, got {stmt:?}"),
1143 }
1144 }
1145
1146 #[test]
1147 fn test_parse_explain_select() {
1148 let stmt = parse_one("EXPLAIN SELECT * FROM events");
1149 match stmt {
1150 StreamingStatement::Explain {
1151 statement, analyze, ..
1152 } => {
1153 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1154 assert!(!analyze);
1155 }
1156 _ => panic!("Expected Explain, got {stmt:?}"),
1157 }
1158 }
1159
1160 #[test]
1161 fn test_parse_explain_create_source() {
1162 let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1163 match stmt {
1164 StreamingStatement::Explain { statement, .. } => {
1165 assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1166 }
1167 _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1168 }
1169 }
1170
1171 #[test]
1172 fn test_parse_explain_analyze_select() {
1173 let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1174 match stmt {
1175 StreamingStatement::Explain {
1176 statement, analyze, ..
1177 } => {
1178 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1179 assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1180 }
1181 _ => panic!("Expected Explain, got {stmt:?}"),
1182 }
1183 }
1184
1185 #[test]
1186 fn test_parse_create_materialized_view() {
1187 let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1188 match stmt {
1189 StreamingStatement::CreateMaterializedView {
1190 name,
1191 emit_clause,
1192 or_replace,
1193 if_not_exists,
1194 ..
1195 } => {
1196 assert_eq!(name.to_string(), "live_stats");
1197 assert!(emit_clause.is_none());
1198 assert!(!or_replace);
1199 assert!(!if_not_exists);
1200 }
1201 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1202 }
1203 }
1204
1205 #[test]
1206 fn test_parse_create_materialized_view_with_emit() {
1207 let stmt = parse_one(
1208 "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1209 );
1210 match stmt {
1211 StreamingStatement::CreateMaterializedView {
1212 name, emit_clause, ..
1213 } => {
1214 assert_eq!(name.to_string(), "live_stats");
1215 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1216 }
1217 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1218 }
1219 }
1220
1221 #[test]
1222 fn test_parse_create_or_replace_materialized_view() {
1223 let stmt = parse_one(
1224 "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1225 );
1226 match stmt {
1227 StreamingStatement::CreateMaterializedView {
1228 name,
1229 or_replace,
1230 if_not_exists,
1231 ..
1232 } => {
1233 assert_eq!(name.to_string(), "live_stats");
1234 assert!(or_replace);
1235 assert!(!if_not_exists);
1236 }
1237 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1238 }
1239 }
1240
1241 #[test]
1242 fn test_parse_create_materialized_view_if_not_exists() {
1243 let stmt = parse_one(
1244 "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1245 );
1246 match stmt {
1247 StreamingStatement::CreateMaterializedView {
1248 name,
1249 or_replace,
1250 if_not_exists,
1251 ..
1252 } => {
1253 assert_eq!(name.to_string(), "live_stats");
1254 assert!(!or_replace);
1255 assert!(if_not_exists);
1256 }
1257 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1258 }
1259 }
1260
1261 #[test]
1262 fn test_parse_insert_into() {
1263 let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1264 match stmt {
1265 StreamingStatement::InsertInto {
1266 table_name,
1267 columns,
1268 values,
1269 } => {
1270 assert_eq!(table_name.to_string(), "events");
1271 assert_eq!(columns.len(), 2);
1272 assert_eq!(columns[0].to_string(), "id");
1273 assert_eq!(columns[1].to_string(), "name");
1274 assert_eq!(values.len(), 1);
1275 assert_eq!(values[0].len(), 2);
1276 }
1277 _ => panic!("Expected InsertInto, got {stmt:?}"),
1278 }
1279 }
1280
1281 #[test]
1282 fn test_parse_insert_into_multiple_rows() {
1283 let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1284 match stmt {
1285 StreamingStatement::InsertInto {
1286 table_name,
1287 columns,
1288 values,
1289 } => {
1290 assert_eq!(table_name.to_string(), "events");
1291 assert!(columns.is_empty());
1292 assert_eq!(values.len(), 3);
1293 }
1294 _ => panic!("Expected InsertInto, got {stmt:?}"),
1295 }
1296 }
1297
1298 #[test]
1301 fn test_parse_create_stream() {
1302 let stmt = parse_one(
1303 "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1304 );
1305 match stmt {
1306 StreamingStatement::CreateStream {
1307 name,
1308 or_replace,
1309 if_not_exists,
1310 emit_clause,
1311 ..
1312 } => {
1313 assert_eq!(name.to_string(), "session_activity");
1314 assert!(!or_replace);
1315 assert!(!if_not_exists);
1316 assert!(emit_clause.is_none());
1317 }
1318 _ => panic!("Expected CreateStream, got {stmt:?}"),
1319 }
1320 }
1321
1322 #[test]
1323 fn test_parse_create_or_replace_stream() {
1324 let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1325 match stmt {
1326 StreamingStatement::CreateStream { or_replace, .. } => {
1327 assert!(or_replace);
1328 }
1329 _ => panic!("Expected CreateStream, got {stmt:?}"),
1330 }
1331 }
1332
1333 #[test]
1334 fn test_parse_create_stream_if_not_exists() {
1335 let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1336 match stmt {
1337 StreamingStatement::CreateStream { if_not_exists, .. } => {
1338 assert!(if_not_exists);
1339 }
1340 _ => panic!("Expected CreateStream, got {stmt:?}"),
1341 }
1342 }
1343
1344 #[test]
1345 fn test_parse_create_stream_with_emit() {
1346 let stmt =
1347 parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1348 match stmt {
1349 StreamingStatement::CreateStream { emit_clause, .. } => {
1350 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1351 }
1352 _ => panic!("Expected CreateStream, got {stmt:?}"),
1353 }
1354 }
1355
1356 #[test]
1357 fn test_parse_drop_stream() {
1358 let stmt = parse_one("DROP STREAM my_stream");
1359 match stmt {
1360 StreamingStatement::DropStream {
1361 name,
1362 if_exists,
1363 cascade,
1364 } => {
1365 assert_eq!(name.to_string(), "my_stream");
1366 assert!(!if_exists);
1367 assert!(!cascade);
1368 }
1369 _ => panic!("Expected DropStream, got {stmt:?}"),
1370 }
1371 }
1372
1373 #[test]
1374 fn test_parse_drop_stream_if_exists() {
1375 let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1376 match stmt {
1377 StreamingStatement::DropStream {
1378 name,
1379 if_exists,
1380 cascade,
1381 } => {
1382 assert_eq!(name.to_string(), "my_stream");
1383 assert!(if_exists);
1384 assert!(!cascade);
1385 }
1386 _ => panic!("Expected DropStream, got {stmt:?}"),
1387 }
1388 }
1389
1390 #[test]
1391 fn test_parse_drop_stream_cascade() {
1392 let stmt = parse_one("DROP STREAM my_stream CASCADE");
1393 match stmt {
1394 StreamingStatement::DropStream {
1395 name,
1396 if_exists,
1397 cascade,
1398 } => {
1399 assert_eq!(name.to_string(), "my_stream");
1400 assert!(!if_exists);
1401 assert!(cascade);
1402 }
1403 _ => panic!("Expected DropStream, got {stmt:?}"),
1404 }
1405 }
1406
1407 #[test]
1408 fn test_parse_show_streams() {
1409 let stmt = parse_one("SHOW STREAMS");
1410 assert!(matches!(
1411 stmt,
1412 StreamingStatement::Show(ShowCommand::Streams)
1413 ));
1414 }
1415
1416 #[test]
1417 fn test_parse_alter_source_add_column() {
1418 let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1419 match stmt {
1420 StreamingStatement::AlterSource { name, operation } => {
1421 assert_eq!(name.to_string(), "events");
1422 match operation {
1423 statements::AlterSourceOperation::AddColumn { column_def } => {
1424 assert_eq!(column_def.name.value, "new_col");
1425 assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1426 }
1427 statements::AlterSourceOperation::SetProperties { .. } => {
1428 panic!("Expected AddColumn")
1429 }
1430 }
1431 }
1432 _ => panic!("Expected AlterSource, got {stmt:?}"),
1433 }
1434 }
1435
1436 #[test]
1437 fn test_parse_alter_source_set_properties() {
1438 let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1439 match stmt {
1440 StreamingStatement::AlterSource { name, operation } => {
1441 assert_eq!(name.to_string(), "events");
1442 match operation {
1443 statements::AlterSourceOperation::SetProperties { properties } => {
1444 assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1445 assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1446 }
1447 statements::AlterSourceOperation::AddColumn { .. } => {
1448 panic!("Expected SetProperties")
1449 }
1450 }
1451 }
1452 _ => panic!("Expected AlterSource, got {stmt:?}"),
1453 }
1454 }
1455
1456 #[test]
1457 fn test_parse_checkpoint() {
1458 let stmt = parse_one("CHECKPOINT");
1459 assert!(
1460 matches!(stmt, StreamingStatement::Checkpoint),
1461 "Expected Checkpoint, got {stmt:?}"
1462 );
1463 }
1464
1465 #[test]
1466 fn test_parse_show_checkpoint_status() {
1467 let stmt = parse_one("SHOW CHECKPOINT STATUS");
1468 assert!(
1469 matches!(
1470 stmt,
1471 StreamingStatement::Show(ShowCommand::CheckpointStatus)
1472 ),
1473 "Expected Show(CheckpointStatus), got {stmt:?}"
1474 );
1475 }
1476
1477 #[test]
1478 fn test_parse_restore_checkpoint() {
1479 let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1480 match stmt {
1481 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1482 assert_eq!(checkpoint_id, 42);
1483 }
1484 _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1485 }
1486 }
1487
1488 #[test]
1489 fn test_parse_restore_checkpoint_large_id() {
1490 let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1491 match stmt {
1492 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1493 assert_eq!(checkpoint_id, 123_456);
1494 }
1495 _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1496 }
1497 }
1498
1499 #[test]
1500 fn test_parse_show_create_source() {
1501 let stmt = parse_one("SHOW CREATE SOURCE events");
1502 match stmt {
1503 StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1504 assert_eq!(name.to_string(), "events");
1505 }
1506 _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1507 }
1508 }
1509
1510 #[test]
1511 fn test_parse_show_create_sink() {
1512 let stmt = parse_one("SHOW CREATE SINK output");
1513 match stmt {
1514 StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
1515 assert_eq!(name.to_string(), "output");
1516 }
1517 _ => panic!("Expected Show(CreateSink), got {stmt:?}"),
1518 }
1519 }
1520
1521 #[test]
1522 fn create_stream_preserves_temporal_probe_join() {
1523 let sql = "CREATE STREAM markouts_long AS \
1524 SELECT t.s, p.offset_ms FROM trade_probe t \
1525 TEMPORAL PROBE JOIN price_ref r \
1526 ON (s) TIMESTAMPS (ts, ts) \
1527 LIST (0s, 1s, 5s, 30s) AS p";
1528 let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
1529 panic!("expected CreateStream");
1530 };
1531 assert!(
1532 query_sql.to_uppercase().contains("TEMPORAL PROBE JOIN"),
1533 "got: {query_sql}"
1534 );
1535 assert!(
1536 query_sql.contains("LIST (0s, 1s, 5s, 30s)"),
1537 "got: {query_sql}"
1538 );
1539 assert!(query_sql.contains("AS p"), "got: {query_sql}");
1540 }
1541
1542 #[test]
1543 fn create_stream_temporal_probe_range_step_preserved() {
1544 let sql = "CREATE STREAM mk AS \
1545 SELECT s FROM trades t TEMPORAL PROBE JOIN prices r \
1546 ON (symbol) TIMESTAMPS (ts, ts) \
1547 RANGE FROM 0s TO 30s STEP 5s AS p";
1548 let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
1549 panic!("expected CreateStream");
1550 };
1551 assert!(
1552 query_sql.contains("RANGE FROM 0s TO 30s STEP 5s"),
1553 "got: {query_sql}"
1554 );
1555 }
1556
1557 #[test]
1558 fn create_mv_preserves_temporal_probe_join() {
1559 let sql = "CREATE MATERIALIZED VIEW mv AS \
1560 SELECT t.s FROM trades t TEMPORAL PROBE JOIN prices r \
1561 ON (s) TIMESTAMPS (ts, ts) LIST (0s, 5s) AS p";
1562 let StreamingStatement::CreateMaterializedView { query_sql, .. } = parse_one(sql) else {
1563 panic!("expected CreateMaterializedView");
1564 };
1565 assert!(
1566 query_sql.to_uppercase().contains("TEMPORAL PROBE JOIN"),
1567 "got: {query_sql}"
1568 );
1569 }
1570
1571 #[test]
1572 fn create_stream_emit_is_not_captured_in_query_sql() {
1573 let sql = "CREATE STREAM s AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
1574 let StreamingStatement::CreateStream {
1575 query_sql,
1576 emit_clause,
1577 ..
1578 } = parse_one(sql)
1579 else {
1580 panic!("expected CreateStream");
1581 };
1582 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1583 assert!(
1584 !query_sql.to_uppercase().contains("EMIT"),
1585 "got: {query_sql}"
1586 );
1587 assert!(query_sql.contains("FROM events"), "got: {query_sql}");
1588 }
1589
1590 #[test]
1591 fn create_stream_plain_select_query_sql_executes() {
1592 let sql = "CREATE STREAM counts AS SELECT COUNT(*) AS c FROM events";
1593 let StreamingStatement::CreateStream { query_sql, .. } = parse_one(sql) else {
1594 panic!("expected CreateStream");
1595 };
1596 assert!(query_sql.to_uppercase().contains("SELECT"));
1597 assert!(query_sql.contains("FROM events"));
1598 }
1599}