1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
8use std::time::Duration;
9
10use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
11
12use super::window_rewriter::WindowRewriter;
13use super::ParseError;
14
/// Payload of [`StreamingStatement::Show`]: what a `SHOW` statement asks for.
///
/// The unit variants carry no data; the two struct variants carry the name
/// of a single object.
// NOTE(review): the per-variant SQL spellings below are inferred from the
// variant names — confirm against the parser that builds these values.
#[derive(Debug, Clone, PartialEq)]
pub enum ShowCommand {
    /// Sources (presumably `SHOW SOURCES`).
    Sources,
    /// Sinks (presumably `SHOW SINKS`).
    Sinks,
    /// Queries (presumably `SHOW QUERIES`).
    Queries,
    /// Materialized views (presumably `SHOW MATERIALIZED VIEWS`).
    MaterializedViews,
    /// Streams (presumably `SHOW STREAMS`).
    Streams,
    /// Tables (presumably `SHOW TABLES`).
    Tables,
    /// Checkpoint status (presumably `SHOW CHECKPOINT STATUS`).
    CheckpointStatus,
    /// Definition of one source (presumably `SHOW CREATE SOURCE <name>`).
    CreateSource {
        /// Name of the source.
        name: ObjectName,
    },
    /// Definition of one sink (presumably `SHOW CREATE SINK <name>`).
    CreateSink {
        /// Name of the sink.
        name: ObjectName,
    },
}
43
/// A parsed statement: either a plain `sqlparser` statement or one of the
/// streaming-specific statements defined in this module.
///
/// Several variants nest another `StreamingStatement` (boxed, since the type
/// is recursive) as the query they wrap.
// NOTE(review): the SQL forms mentioned on individual variants are inferred
// from variant and field names — confirm against the parser.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamingStatement {
    /// A statement from `sqlparser`, carried as-is.
    Standard(Box<sqlparser::ast::Statement>),

    /// A `CREATE SOURCE` statement; see [`CreateSourceStatement`].
    CreateSource(Box<CreateSourceStatement>),

    /// A `CREATE SINK` statement; see [`CreateSinkStatement`].
    CreateSink(Box<CreateSinkStatement>),

    /// Creation of a named continuous query.
    CreateContinuousQuery {
        /// Name given to the query.
        name: ObjectName,
        /// The statement that is run continuously.
        query: Box<StreamingStatement>,
        /// Optional `EMIT` clause attached to the query.
        emit_clause: Option<EmitClause>,
    },

    /// Removal of a source.
    DropSource {
        /// Name of the source to drop.
        name: ObjectName,
        /// Set when the statement carried an `IF EXISTS` qualifier (presumed).
        if_exists: bool,
        /// Set when the statement carried a `CASCADE` qualifier (presumed).
        cascade: bool,
    },

    /// Removal of a sink.
    DropSink {
        /// Name of the sink to drop.
        name: ObjectName,
        /// Set when the statement carried an `IF EXISTS` qualifier (presumed).
        if_exists: bool,
        /// Set when the statement carried a `CASCADE` qualifier (presumed).
        cascade: bool,
    },

    /// Removal of a materialized view.
    DropMaterializedView {
        /// Name of the materialized view to drop.
        name: ObjectName,
        /// Set when the statement carried an `IF EXISTS` qualifier (presumed).
        if_exists: bool,
        /// Set when the statement carried a `CASCADE` qualifier (presumed).
        cascade: bool,
    },

    /// A `SHOW` statement; the payload says what to show.
    Show(ShowCommand),

    /// Description of a named object.
    Describe {
        /// Name of the object to describe.
        name: ObjectName,
        /// Whether the extended form was requested (presumably `DESCRIBE EXTENDED`).
        extended: bool,
    },

    /// An `EXPLAIN` wrapper around another statement.
    Explain {
        /// The statement being explained.
        statement: Box<StreamingStatement>,
        /// Whether the `ANALYZE` form was requested (presumed).
        analyze: bool,
    },

    /// Creation of a materialized view.
    CreateMaterializedView {
        /// Name of the view.
        name: ObjectName,
        /// Parsed defining query.
        query: Box<StreamingStatement>,
        /// Optional `EMIT` clause attached to the view.
        emit_clause: Option<EmitClause>,
        /// Set when the statement carried `OR REPLACE` (presumed).
        or_replace: bool,
        /// Set when the statement carried `IF NOT EXISTS` (presumed).
        if_not_exists: bool,
        /// SQL text stored alongside `query` (presumably the original text
        /// of the defining query — confirm with the parser).
        query_sql: String,
    },

    /// Creation of a stream; carries the same fields as
    /// [`StreamingStatement::CreateMaterializedView`].
    CreateStream {
        /// Name of the stream.
        name: ObjectName,
        /// Parsed defining query.
        query: Box<StreamingStatement>,
        /// Optional `EMIT` clause attached to the stream.
        emit_clause: Option<EmitClause>,
        /// Set when the statement carried `OR REPLACE` (presumed).
        or_replace: bool,
        /// Set when the statement carried `IF NOT EXISTS` (presumed).
        if_not_exists: bool,
        /// SQL text stored alongside `query` (presumably the original text
        /// of the defining query — confirm with the parser).
        query_sql: String,
    },

    /// Removal of a stream.
    DropStream {
        /// Name of the stream to drop.
        name: ObjectName,
        /// Set when the statement carried an `IF EXISTS` qualifier (presumed).
        if_exists: bool,
        /// Set when the statement carried a `CASCADE` qualifier (presumed).
        cascade: bool,
    },

    /// Alteration of an existing source.
    AlterSource {
        /// Name of the source to alter.
        name: ObjectName,
        /// The change to apply.
        operation: AlterSourceOperation,
    },

    /// Insertion of literal rows into a named target.
    InsertInto {
        /// Name of the insert target.
        table_name: ObjectName,
        /// Column identifiers listed in the statement.
        columns: Vec<Ident>,
        /// Rows to insert: the outer `Vec` is one entry per row, the inner
        /// `Vec` holds that row's value expressions.
        values: Vec<Vec<Expr>>,
    },

    /// Creation of a lookup table; the payload type lives in the sibling
    /// `lookup_table` module.
    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),

    /// Removal of a lookup table. Unlike the other drop variants this one
    /// has no `cascade` flag.
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: ObjectName,
        /// Set when the statement carried an `IF EXISTS` qualifier (presumed).
        if_exists: bool,
    },

    /// A checkpoint request; carries no arguments.
    Checkpoint,

    /// A request to restore from a specific checkpoint.
    RestoreCheckpoint {
        /// Numeric identifier of the checkpoint to restore.
        checkpoint_id: u64,
    },
}
195
/// The change requested by [`StreamingStatement::AlterSource`].
#[derive(Debug, Clone, PartialEq)]
pub enum AlterSourceOperation {
    /// Add one column to the source.
    AddColumn {
        /// Definition of the column to add.
        column_def: ColumnDef,
    },
    /// Set string-valued properties on the source.
    SetProperties {
        /// Property names mapped to their new values.
        properties: HashMap<String, String>,
    },
}
210
/// A data format together with its options, as attached to
/// [`CreateSourceStatement::format`] and [`CreateSinkStatement::format`].
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Name of the format, stored as free-form text.
    // NOTE(review): accepted values and case normalisation are not visible
    // from this file — confirm with the parser.
    pub format_type: String,
    /// Format-specific options as key/value strings.
    pub options: HashMap<String, String>,
}
219
/// Parsed form of a `CREATE SOURCE` statement.
// NOTE(review): the clause each field originates from is inferred from the
// field names — confirm against the parser.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSourceStatement {
    /// Name of the source.
    pub name: ObjectName,
    /// Declared columns.
    pub columns: Vec<ColumnDef>,
    /// Optional watermark definition.
    pub watermark: Option<WatermarkDef>,
    /// Key/value options (presumably from a `WITH (...)` clause).
    pub with_options: HashMap<String, String>,
    /// Set when the statement carried `OR REPLACE` (presumed).
    pub or_replace: bool,
    /// Set when the statement carried `IF NOT EXISTS` (presumed).
    pub if_not_exists: bool,
    /// Connector type name, when one was specified.
    pub connector_type: Option<String>,
    /// Connector-specific options as key/value strings.
    pub connector_options: HashMap<String, String>,
    /// Optional data format specification.
    pub format: Option<FormatSpec>,
    /// Whether the definition used a wildcard.
    // NOTE(review): what the wildcard expands to is not visible here.
    pub has_wildcard: bool,
    /// Optional prefix associated with the wildcard.
    pub wildcard_prefix: Option<String>,
}
246
/// Parsed form of a `CREATE SINK` statement.
// NOTE(review): the clause each field originates from is inferred from the
// field names — confirm against the parser.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSinkStatement {
    /// Name of the sink.
    pub name: ObjectName,
    /// Where the sink reads from: a named object or an inline query.
    pub from: SinkFrom,
    /// Key/value options (presumably from a `WITH (...)` clause).
    pub with_options: HashMap<String, String>,
    /// Set when the statement carried `OR REPLACE` (presumed).
    pub or_replace: bool,
    /// Set when the statement carried `IF NOT EXISTS` (presumed).
    pub if_not_exists: bool,
    /// Optional filter expression (presumably a `WHERE` predicate).
    pub filter: Option<Expr>,
    /// Connector type name, when one was specified.
    pub connector_type: Option<String>,
    /// Connector-specific options as key/value strings.
    pub connector_options: HashMap<String, String>,
    /// Optional data format specification.
    pub format: Option<FormatSpec>,
    /// Output-related options as key/value strings.
    pub output_options: HashMap<String, String>,
}
271
/// Input of a sink, as stored in [`CreateSinkStatement::from`].
#[derive(Debug, Clone, PartialEq)]
pub enum SinkFrom {
    /// Read from an existing object referenced by name.
    Table(ObjectName),
    /// Read from the result of an inline statement.
    Query(Box<StreamingStatement>),
}
280
/// Watermark definition attached to a source
/// (see [`CreateSourceStatement::watermark`]).
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkDef {
    /// Column the watermark is defined on.
    pub column: Ident,
    /// Optional watermark expression.
    // NOTE(review): the meaning of `None` (e.g. "use the column value
    // directly") is not visible from this file — confirm with the consumer.
    pub expression: Option<Expr>,
}
291
/// Late-data configuration: an optional allowed-lateness interval and an
/// optional side-output name.
///
/// The derived `Default` leaves both fields as `None`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct LateDataClause {
    /// Unparsed interval expression for the allowed lateness. When `None`,
    /// [`LateDataClause::to_allowed_lateness`] yields `Duration::ZERO`.
    pub allowed_lateness: Option<Box<Expr>>,
    /// Name of the side output, if one was given (presumably where late
    /// rows are routed — confirm with the consumer).
    pub side_output: Option<String>,
}
304
305impl LateDataClause {
306 #[must_use]
308 pub fn with_allowed_lateness(lateness: Expr) -> Self {
309 Self {
310 allowed_lateness: Some(Box::new(lateness)),
311 side_output: None,
312 }
313 }
314
315 #[must_use]
317 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
318 Self {
319 allowed_lateness: Some(Box::new(lateness)),
320 side_output: Some(side_output),
321 }
322 }
323
324 #[must_use]
326 pub fn side_output_only(side_output: String) -> Self {
327 Self {
328 allowed_lateness: None,
329 side_output: Some(side_output),
330 }
331 }
332
333 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
339 match &self.allowed_lateness {
340 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
341 None => Ok(Duration::ZERO),
342 }
343 }
344
345 #[must_use]
347 pub fn has_side_output(&self) -> bool {
348 self.side_output.is_some()
349 }
350
351 #[must_use]
353 pub fn get_side_output(&self) -> Option<&str> {
354 self.side_output.as_deref()
355 }
356}
357
/// Resolved emit behaviour produced by [`EmitClause::to_emit_strategy`].
///
/// Each variant corresponds to exactly one [`EmitClause`] variant; the only
/// difference in payload is that the periodic interval is already a
/// [`Duration`] rather than an unparsed expression.
#[derive(Debug, Clone, PartialEq)]
pub enum EmitStrategy {
    /// Produced from [`EmitClause::AfterWatermark`].
    OnWatermark,
    /// Produced from [`EmitClause::OnWindowClose`].
    OnWindowClose,
    /// Produced from [`EmitClause::Periodically`]; holds the parsed interval.
    Periodic(Duration),
    /// Produced from [`EmitClause::OnUpdate`].
    OnUpdate,
    /// Produced from [`EmitClause::Changes`].
    Changelog,
    /// Produced from [`EmitClause::Final`].
    FinalOnly,
}
376
/// An `EMIT` clause as written in SQL, before its interval (if any) has been
/// parsed.
///
/// The `Display` impl renders the clause back to SQL text, and
/// [`EmitClause::to_emit_strategy`] converts it to an [`EmitStrategy`].
#[derive(Debug, Clone, PartialEq)]
pub enum EmitClause {
    /// Rendered as `EMIT AFTER WATERMARK`.
    ///
    /// Reported as append-only and as requiring a watermark.
    AfterWatermark,

    /// Rendered as `EMIT ON WINDOW CLOSE`.
    ///
    /// Reported as append-only and as requiring a watermark.
    OnWindowClose,

    /// Rendered as `EMIT EVERY <interval>`.
    ///
    /// Reported as neither append-only, watermark-requiring, nor
    /// changelog-requiring.
    Periodically {
        /// Unparsed interval expression; converted to a `Duration` by
        /// [`EmitClause::to_emit_strategy`].
        interval: Box<Expr>,
    },

    /// Rendered as `EMIT ON UPDATE`.
    ///
    /// Reported as requiring a changelog.
    OnUpdate,

    /// Rendered as `EMIT CHANGES`.
    ///
    /// Reported as requiring a changelog.
    Changes,

    /// Rendered as `EMIT FINAL`.
    ///
    /// Reported as append-only and as requiring a watermark.
    Final,
}
434
435impl std::fmt::Display for EmitClause {
436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437 match self {
438 EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
439 EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
440 EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
441 EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
442 EmitClause::Changes => write!(f, "EMIT CHANGES"),
443 EmitClause::Final => write!(f, "EMIT FINAL"),
444 }
445 }
446}
447
448impl EmitClause {
449 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
455 match self {
456 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
457 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
458 EmitClause::Periodically { interval } => {
459 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
460 Ok(EmitStrategy::Periodic(duration))
461 }
462 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
463 EmitClause::Changes => Ok(EmitStrategy::Changelog),
464 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
465 }
466 }
467
468 #[must_use]
470 pub fn requires_changelog(&self) -> bool {
471 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
472 }
473
474 #[must_use]
476 pub fn is_append_only(&self) -> bool {
477 matches!(
478 self,
479 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
480 )
481 }
482
483 #[must_use]
489 pub fn requires_watermark(&self) -> bool {
490 matches!(
491 self,
492 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
493 )
494 }
495}
496
/// A window function call with its arguments kept as unparsed expressions.
// NOTE(review): the windowing semantics of each variant are inferred from
// the variant and field names — confirm against the window rewriter.
#[derive(Debug, Clone, PartialEq)]
pub enum WindowFunction {
    /// `TUMBLE` window.
    Tumble {
        /// Expression identifying the time column.
        time_column: Box<Expr>,
        /// Window size expression.
        interval: Box<Expr>,
        /// Optional offset expression.
        offset: Option<Box<Expr>>,
    },
    /// `HOP` window.
    Hop {
        /// Expression identifying the time column.
        time_column: Box<Expr>,
        /// Slide interval expression.
        slide_interval: Box<Expr>,
        /// Window size expression.
        window_interval: Box<Expr>,
        /// Optional offset expression.
        offset: Option<Box<Expr>>,
    },
    /// `SESSION` window.
    Session {
        /// Expression identifying the time column.
        time_column: Box<Expr>,
        /// Gap interval expression.
        gap_interval: Box<Expr>,
    },
    /// `CUMULATE` window.
    Cumulate {
        /// Expression identifying the time column.
        time_column: Box<Expr>,
        /// Step interval expression.
        step_interval: Box<Expr>,
        /// Maximum window size expression.
        max_size_interval: Box<Expr>,
    },
}
537
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part, unquoted object name.
    fn object_name(name: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(name))])
    }

    /// Builds a bare identifier expression, used as a stand-in wherever a
    /// test only needs *some* expression.
    fn ident_expr(name: &str) -> Expr {
        Expr::Identifier(Ident::new(name))
    }

    /// Parses `sql` with the generic dialect and wraps its first statement
    /// in `StreamingStatement::Standard`.
    fn parse_standard(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, sql).unwrap();
        StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()))
    }

    /// Builds a periodic emit clause with a placeholder interval.
    fn periodic_clause() -> EmitClause {
        EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        }
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(ident_expr("timestamp")),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        assert!(
            matches!(EmitClause::AfterWatermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(EmitClause::OnWindowClose, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodic_clause(), EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(EmitClause::OnUpdate, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
        assert!(
            matches!(EmitClause::Changes, EmitClause::Changes),
            "Expected Changes"
        );
        assert!(
            matches!(EmitClause::Final, EmitClause::Final),
            "Expected Final"
        );
    }

    #[test]
    fn test_emit_clause_display() {
        assert_eq!(
            EmitClause::AfterWatermark.to_string(),
            "EMIT AFTER WATERMARK"
        );
        assert_eq!(
            EmitClause::OnWindowClose.to_string(),
            "EMIT ON WINDOW CLOSE"
        );
        assert_eq!(periodic_clause().to_string(), "EMIT EVERY 5_SECONDS");
        assert_eq!(EmitClause::OnUpdate.to_string(), "EMIT ON UPDATE");
        assert_eq!(EmitClause::Changes.to_string(), "EMIT CHANGES");
        assert_eq!(EmitClause::Final.to_string(), "EMIT FINAL");
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_without_interval() {
        // The periodic variant is excluded: its result depends on the
        // interval parser, which is tested with the window rewriter.
        assert!(matches!(
            EmitClause::AfterWatermark.to_emit_strategy(),
            Ok(EmitStrategy::OnWatermark)
        ));
        assert!(matches!(
            EmitClause::OnWindowClose.to_emit_strategy(),
            Ok(EmitStrategy::OnWindowClose)
        ));
        assert!(matches!(
            EmitClause::OnUpdate.to_emit_strategy(),
            Ok(EmitStrategy::OnUpdate)
        ));
        assert!(matches!(
            EmitClause::Changes.to_emit_strategy(),
            Ok(EmitStrategy::Changelog)
        ));
        assert!(matches!(
            EmitClause::Final.to_emit_strategy(),
            Ok(EmitStrategy::FinalOnly)
        ));
    }

    #[test]
    fn test_emit_clause_requires_changelog() {
        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());

        assert!(!EmitClause::AfterWatermark.requires_changelog());
        assert!(!EmitClause::OnWindowClose.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        assert!(!periodic_clause().requires_changelog());
    }

    #[test]
    fn test_emit_clause_is_append_only() {
        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(EmitClause::AfterWatermark.is_append_only());

        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());
        assert!(!periodic_clause().is_append_only());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };
        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };
        let session = WindowFunction::Session {
            time_column: Box::new(ident_expr("event_time")),
            gap_interval: Box::new(ident_expr("30_SECONDS")),
        };
        let cumulate = WindowFunction::Cumulate {
            time_column: Box::new(ident_expr("event_time")),
            step_interval: Box::new(ident_expr("1_MINUTE")),
            max_size_interval: Box::new(ident_expr("5_MINUTES")),
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
        assert!(
            matches!(session, WindowFunction::Session { .. }),
            "Expected Session"
        );
        assert!(
            matches!(cumulate, WindowFunction::Cumulate { .. }),
            "Expected Cumulate"
        );
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_default_lateness_is_zero() {
        // With no lateness expression stored, the interval parser is never
        // consulted and the result is exactly zero.
        let clause = LateDataClause::default();
        assert!(matches!(
            clause.to_allowed_lateness(),
            Ok(lateness) if lateness == Duration::ZERO
        ));
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_accessors() {
        let without = LateDataClause::default();
        assert!(!without.has_side_output());
        assert_eq!(without.get_side_output(), None);

        let with = LateDataClause::side_output_only("late_events".to_string());
        assert!(with.has_side_output());
        assert_eq!(with.get_side_output(), Some("late_events"));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);

        // Distinct variants must not compare equal.
        assert_ne!(ShowCommand::Sources, ShowCommand::Sinks);
        assert_ne!(ShowCommand::Streams, ShowCommand::Tables);
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        match stmt {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
                assert!(!cascade);
            }
            _ => panic!("Expected DropSource"),
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        match stmt {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
                assert!(!cascade);
            }
            _ => panic!("Expected DropSink"),
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            _ => panic!("Expected DropMaterializedView"),
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(extended);
            }
            _ => panic!("Expected Describe"),
        }
    }

    #[test]
    fn test_explain_statement() {
        let stmt = StreamingStatement::Explain {
            statement: Box::new(parse_standard("SELECT 1")),
            analyze: false,
        };
        match stmt {
            StreamingStatement::Explain { statement, analyze } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
                assert!(!analyze);
            }
            _ => panic!("Expected Explain"),
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        let query_sql = "SELECT COUNT(*) FROM events";

        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(parse_standard(query_sql)),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
            query_sql: query_sql.to_string(),
        };
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView"),
        }
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            _ => panic!("Expected InsertInto"),
        }
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        assert!(!periodic_clause().requires_watermark());
    }
}