1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
8use std::time::Duration;
9
10use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
11
12use super::window_rewriter::WindowRewriter;
13use super::ParseError;
14
15#[derive(Debug, Clone, PartialEq)]
17pub enum ShowCommand {
18 Sources,
20 Sinks,
22 Queries,
24 MaterializedViews,
26 Streams,
28 Tables,
30 CheckpointStatus,
32 CreateSource {
34 name: ObjectName,
36 },
37 CreateSink {
39 name: ObjectName,
41 },
42}
43
44#[derive(Debug, Clone, PartialEq)]
46pub enum StreamingStatement {
47 Standard(Box<sqlparser::ast::Statement>),
49
50 CreateSource(Box<CreateSourceStatement>),
52
53 CreateSink(Box<CreateSinkStatement>),
55
56 CreateContinuousQuery {
58 name: ObjectName,
60 query: Box<StreamingStatement>,
62 emit_clause: Option<EmitClause>,
64 },
65
66 DropSource {
68 name: ObjectName,
70 if_exists: bool,
72 cascade: bool,
74 },
75
76 DropSink {
78 name: ObjectName,
80 if_exists: bool,
82 cascade: bool,
84 },
85
86 DropMaterializedView {
88 name: ObjectName,
90 if_exists: bool,
92 cascade: bool,
94 },
95
96 Show(ShowCommand),
98
99 Describe {
101 name: ObjectName,
103 extended: bool,
105 },
106
107 Explain {
109 statement: Box<StreamingStatement>,
111 analyze: bool,
113 },
114
115 CreateMaterializedView {
117 name: ObjectName,
119 query: Box<StreamingStatement>,
121 emit_clause: Option<EmitClause>,
123 or_replace: bool,
125 if_not_exists: bool,
127 query_sql: String,
129 },
130
131 CreateStream {
133 name: ObjectName,
135 query: Box<StreamingStatement>,
137 emit_clause: Option<EmitClause>,
139 or_replace: bool,
141 if_not_exists: bool,
143 query_sql: String,
145 retention_bytes: Option<u64>,
147 },
148
149 DropStream {
151 name: ObjectName,
153 if_exists: bool,
155 cascade: bool,
157 },
158
159 AlterSource {
161 name: ObjectName,
163 operation: AlterSourceOperation,
165 },
166
167 InsertInto {
169 table_name: ObjectName,
171 columns: Vec<Ident>,
173 values: Vec<Vec<Expr>>,
175 },
176
177 CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
179
180 DropLookupTable {
182 name: ObjectName,
184 if_exists: bool,
186 },
187
188 Checkpoint,
190
191 RestoreCheckpoint {
193 checkpoint_id: u64,
195 },
196
197 Subscribe(Box<SubscribeStatement>),
199
200 DeclareCursorForSubscribe {
206 name: Ident,
208 no_scroll: bool,
210 subscribe: Box<SubscribeStatement>,
212 },
213}
214
215#[derive(Debug, Clone, PartialEq)]
217pub struct SubscribeStatement {
218 pub name: ObjectName,
220 pub filter_sql: Option<String>,
222 pub as_of_epoch: Option<u64>,
224 pub options: HashMap<String, String>,
226}
227
228#[derive(Debug, Clone, PartialEq)]
230pub enum AlterSourceOperation {
231 AddColumn {
233 column_def: ColumnDef,
235 },
236 SetProperties {
238 properties: HashMap<String, String>,
240 },
241}
242
243#[derive(Debug, Clone, PartialEq)]
245pub struct FormatSpec {
246 pub format_type: String,
248 pub options: HashMap<String, String>,
250}
251
252#[derive(Debug, Clone, PartialEq)]
254pub struct CreateSourceStatement {
255 pub name: ObjectName,
257 pub columns: Vec<ColumnDef>,
259 pub watermark: Option<WatermarkDef>,
261 pub with_options: HashMap<String, String>,
263 pub or_replace: bool,
265 pub if_not_exists: bool,
267 pub connector_type: Option<String>,
269 pub connector_options: HashMap<String, String>,
271 pub format: Option<FormatSpec>,
273 pub has_wildcard: bool,
275 pub wildcard_prefix: Option<String>,
277}
278
279#[derive(Debug, Clone, PartialEq)]
281pub struct CreateSinkStatement {
282 pub name: ObjectName,
284 pub from: SinkFrom,
286 pub with_options: HashMap<String, String>,
288 pub or_replace: bool,
290 pub if_not_exists: bool,
292 pub filter: Option<Expr>,
294 pub connector_type: Option<String>,
296 pub connector_options: HashMap<String, String>,
298 pub format: Option<FormatSpec>,
300 pub output_options: HashMap<String, String>,
302}
303
304#[derive(Debug, Clone, PartialEq)]
306pub enum SinkFrom {
307 Table(ObjectName),
309 Query(Box<StreamingStatement>),
311}
312
313#[derive(Debug, Clone, PartialEq)]
315pub struct WatermarkDef {
316 pub column: Ident,
318 pub expression: Option<Expr>,
322}
323
324#[derive(Debug, Clone, PartialEq, Default)]
330pub struct LateDataClause {
331 pub allowed_lateness: Option<Box<Expr>>,
333 pub side_output: Option<String>,
335}
336
337impl LateDataClause {
338 #[must_use]
340 pub fn with_allowed_lateness(lateness: Expr) -> Self {
341 Self {
342 allowed_lateness: Some(Box::new(lateness)),
343 side_output: None,
344 }
345 }
346
347 #[must_use]
349 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
350 Self {
351 allowed_lateness: Some(Box::new(lateness)),
352 side_output: Some(side_output),
353 }
354 }
355
356 #[must_use]
358 pub fn side_output_only(side_output: String) -> Self {
359 Self {
360 allowed_lateness: None,
361 side_output: Some(side_output),
362 }
363 }
364
365 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
371 match &self.allowed_lateness {
372 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
373 None => Ok(Duration::ZERO),
374 }
375 }
376
377 #[must_use]
379 pub fn has_side_output(&self) -> bool {
380 self.side_output.is_some()
381 }
382
383 #[must_use]
385 pub fn get_side_output(&self) -> Option<&str> {
386 self.side_output.as_deref()
387 }
388}
389
390#[derive(Debug, Clone, PartialEq)]
394pub enum EmitStrategy {
395 OnWatermark,
397 OnWindowClose,
399 Periodic(Duration),
401 OnUpdate,
403 Changelog,
405 FinalOnly,
407}
408
409#[derive(Debug, Clone, PartialEq)]
414pub enum EmitClause {
415 AfterWatermark,
421
422 OnWindowClose,
428
429 Periodically {
434 interval: Box<Expr>,
436 },
437
438 OnUpdate,
443
444 Changes,
458
459 Final,
465}
466
467impl std::fmt::Display for EmitClause {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 match self {
470 EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
471 EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
472 EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
473 EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
474 EmitClause::Changes => write!(f, "EMIT CHANGES"),
475 EmitClause::Final => write!(f, "EMIT FINAL"),
476 }
477 }
478}
479
480impl EmitClause {
481 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
487 match self {
488 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
489 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
490 EmitClause::Periodically { interval } => {
491 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
492 Ok(EmitStrategy::Periodic(duration))
493 }
494 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
495 EmitClause::Changes => Ok(EmitStrategy::Changelog),
496 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
497 }
498 }
499
500 #[must_use]
502 pub fn requires_changelog(&self) -> bool {
503 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
504 }
505
506 #[must_use]
508 pub fn is_append_only(&self) -> bool {
509 matches!(
510 self,
511 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
512 )
513 }
514
515 #[must_use]
521 pub fn requires_watermark(&self) -> bool {
522 matches!(
523 self,
524 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
525 )
526 }
527}
528
529#[derive(Debug, Clone, PartialEq)]
531pub enum WindowFunction {
532 Tumble {
534 time_column: Box<Expr>,
536 interval: Box<Expr>,
538 offset: Option<Box<Expr>>,
540 },
541 Hop {
543 time_column: Box<Expr>,
545 slide_interval: Box<Expr>,
547 window_interval: Box<Expr>,
549 offset: Option<Box<Expr>>,
551 },
552 Session {
554 time_column: Box<Expr>,
556 gap_interval: Box<Expr>,
558 },
559 Cumulate {
561 time_column: Box<Expr>,
563 step_interval: Box<Expr>,
565 max_size_interval: Box<Expr>,
567 },
568}
569
570#[cfg(test)]
571mod tests {
572 use super::*;
573 use sqlparser::ast::{DataType, Expr, ObjectNamePart};
574
575 #[test]
576 fn test_create_source_statement() {
577 let stmt = CreateSourceStatement {
578 name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
579 columns: vec![
580 ColumnDef {
581 name: Ident::new("id"),
582 data_type: DataType::BigInt(None),
583 options: vec![],
584 },
585 ColumnDef {
586 name: Ident::new("timestamp"),
587 data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
588 options: vec![],
589 },
590 ],
591 watermark: Some(WatermarkDef {
592 column: Ident::new("timestamp"),
593 expression: Some(Expr::Identifier(Ident::new("timestamp"))),
594 }),
595 with_options: HashMap::from([
596 ("connector".to_string(), "kafka".to_string()),
597 ("topic".to_string(), "events".to_string()),
598 ]),
599 or_replace: false,
600 if_not_exists: true,
601 connector_type: None,
602 connector_options: HashMap::new(),
603 format: None,
604 has_wildcard: false,
605 wildcard_prefix: None,
606 };
607
608 assert_eq!(stmt.columns.len(), 2);
610 assert!(stmt.watermark.is_some());
611 assert_eq!(
612 stmt.with_options.get("connector"),
613 Some(&"kafka".to_string())
614 );
615 }
616
617 #[test]
618 fn test_emit_clause_variants() {
619 let emit1 = EmitClause::AfterWatermark;
620 let emit2 = EmitClause::OnWindowClose;
621 let emit3 = EmitClause::Periodically {
622 interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
623 };
624 let emit4 = EmitClause::OnUpdate;
625
626 match emit1 {
627 EmitClause::AfterWatermark => (),
628 _ => panic!("Expected AfterWatermark"),
629 }
630
631 match emit2 {
632 EmitClause::OnWindowClose => (),
633 _ => panic!("Expected OnWindowClose"),
634 }
635
636 match emit3 {
637 EmitClause::Periodically { .. } => (),
638 _ => panic!("Expected Periodically"),
639 }
640
641 match emit4 {
642 EmitClause::OnUpdate => (),
643 _ => panic!("Expected OnUpdate"),
644 }
645 }
646
647 #[test]
648 fn test_window_functions() {
649 let tumble = WindowFunction::Tumble {
650 time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
651 interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
652 offset: None,
653 };
654
655 let hop = WindowFunction::Hop {
656 time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
657 slide_interval: Box::new(Expr::Identifier(Ident::new("1_MINUTE"))),
658 window_interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
659 offset: None,
660 };
661
662 match tumble {
663 WindowFunction::Tumble { .. } => (),
664 _ => panic!("Expected Tumble"),
665 }
666
667 match hop {
668 WindowFunction::Hop { .. } => (),
669 _ => panic!("Expected Hop"),
670 }
671 }
672
673 #[test]
674 fn test_late_data_clause_default() {
675 let clause = LateDataClause::default();
676 assert!(clause.allowed_lateness.is_none());
677 assert!(clause.side_output.is_none());
678 }
679
680 #[test]
681 fn test_late_data_clause_with_allowed_lateness() {
682 let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
683 let clause = LateDataClause::with_allowed_lateness(lateness_expr);
684 assert!(clause.allowed_lateness.is_some());
685 assert!(clause.side_output.is_none());
686 }
687
688 #[test]
689 fn test_late_data_clause_with_side_output() {
690 let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
691 let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
692 assert!(clause.allowed_lateness.is_some());
693 assert_eq!(clause.side_output, Some("late_events".to_string()));
694 }
695
696 #[test]
697 fn test_late_data_clause_side_output_only() {
698 let clause = LateDataClause::side_output_only("late_events".to_string());
699 assert!(clause.allowed_lateness.is_none());
700 assert_eq!(clause.side_output, Some("late_events".to_string()));
701 }
702
703 #[test]
704 fn test_show_command_variants() {
705 let sources = ShowCommand::Sources;
706 let sinks = ShowCommand::Sinks;
707 let queries = ShowCommand::Queries;
708 let mvs = ShowCommand::MaterializedViews;
709
710 assert_eq!(sources, ShowCommand::Sources);
711 assert_eq!(sinks, ShowCommand::Sinks);
712 assert_eq!(queries, ShowCommand::Queries);
713 assert_eq!(mvs, ShowCommand::MaterializedViews);
714 }
715
716 #[test]
717 fn test_show_command_clone() {
718 let cmd = ShowCommand::Sources;
719 let cloned = cmd.clone();
720 assert_eq!(cmd, cloned);
721 }
722
723 #[test]
724 fn test_drop_source_statement() {
725 let stmt = StreamingStatement::DropSource {
726 name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
727 if_exists: true,
728 cascade: false,
729 };
730 match stmt {
731 StreamingStatement::DropSource {
732 name,
733 if_exists,
734 cascade,
735 } => {
736 assert_eq!(name.to_string(), "events");
737 assert!(if_exists);
738 assert!(!cascade);
739 }
740 _ => panic!("Expected DropSource"),
741 }
742 }
743
744 #[test]
745 fn test_drop_sink_statement() {
746 let stmt = StreamingStatement::DropSink {
747 name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("output"))]),
748 if_exists: false,
749 cascade: false,
750 };
751 match stmt {
752 StreamingStatement::DropSink {
753 name,
754 if_exists,
755 cascade,
756 } => {
757 assert_eq!(name.to_string(), "output");
758 assert!(!if_exists);
759 assert!(!cascade);
760 }
761 _ => panic!("Expected DropSink"),
762 }
763 }
764
765 #[test]
766 fn test_drop_materialized_view_statement() {
767 let stmt = StreamingStatement::DropMaterializedView {
768 name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("live_stats"))]),
769 if_exists: true,
770 cascade: true,
771 };
772 match stmt {
773 StreamingStatement::DropMaterializedView {
774 name,
775 if_exists,
776 cascade,
777 } => {
778 assert_eq!(name.to_string(), "live_stats");
779 assert!(if_exists);
780 assert!(cascade);
781 }
782 _ => panic!("Expected DropMaterializedView"),
783 }
784 }
785
786 #[test]
787 fn test_show_statement() {
788 let stmt = StreamingStatement::Show(ShowCommand::Sources);
789 match stmt {
790 StreamingStatement::Show(ShowCommand::Sources) => (),
791 _ => panic!("Expected Show(Sources)"),
792 }
793 }
794
795 #[test]
796 fn test_describe_statement() {
797 let stmt = StreamingStatement::Describe {
798 name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
799 extended: true,
800 };
801 match stmt {
802 StreamingStatement::Describe { name, extended } => {
803 assert_eq!(name.to_string(), "events");
804 assert!(extended);
805 }
806 _ => panic!("Expected Describe"),
807 }
808 }
809
810 #[test]
811 fn test_explain_statement() {
812 let dialect = sqlparser::dialect::GenericDialect {};
814 let stmts = sqlparser::parser::Parser::parse_sql(&dialect, "SELECT 1").unwrap();
815 let inner = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));
816
817 let stmt = StreamingStatement::Explain {
818 statement: Box::new(inner),
819 analyze: false,
820 };
821 match stmt {
822 StreamingStatement::Explain { statement, .. } => {
823 assert!(matches!(*statement, StreamingStatement::Standard(_)));
824 }
825 _ => panic!("Expected Explain"),
826 }
827 }
828
829 #[test]
830 fn test_create_materialized_view_statement() {
831 let dialect = sqlparser::dialect::GenericDialect {};
833 let stmts =
834 sqlparser::parser::Parser::parse_sql(&dialect, "SELECT COUNT(*) FROM events").unwrap();
835 let query = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));
836
837 let stmt = StreamingStatement::CreateMaterializedView {
838 name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("live_stats"))]),
839 query: Box::new(query),
840 emit_clause: Some(EmitClause::OnWindowClose),
841 or_replace: false,
842 if_not_exists: true,
843 query_sql: "SELECT COUNT(*) FROM events".to_string(),
844 };
845 match stmt {
846 StreamingStatement::CreateMaterializedView {
847 name,
848 emit_clause,
849 or_replace,
850 if_not_exists,
851 ..
852 } => {
853 assert_eq!(name.to_string(), "live_stats");
854 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
855 assert!(!or_replace);
856 assert!(if_not_exists);
857 }
858 _ => panic!("Expected CreateMaterializedView"),
859 }
860 }
861
862 #[test]
863 fn test_insert_into_statement() {
864 let stmt = StreamingStatement::InsertInto {
865 table_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
866 columns: vec![Ident::new("id"), Ident::new("name")],
867 values: vec![vec![
868 Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
869 Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
870 ]],
871 };
872 match stmt {
873 StreamingStatement::InsertInto {
874 table_name,
875 columns,
876 values,
877 } => {
878 assert_eq!(table_name.to_string(), "events");
879 assert_eq!(columns.len(), 2);
880 assert_eq!(values.len(), 1);
881 assert_eq!(values[0].len(), 2);
882 }
883 _ => panic!("Expected InsertInto"),
884 }
885 }
886
887 #[test]
888 fn test_eowc_requires_watermark_helper() {
889 assert!(EmitClause::OnWindowClose.requires_watermark());
891 assert!(EmitClause::Final.requires_watermark());
892 assert!(EmitClause::AfterWatermark.requires_watermark());
893
894 assert!(!EmitClause::OnUpdate.requires_watermark());
896 assert!(!EmitClause::Changes.requires_watermark());
897 let periodic = EmitClause::Periodically {
898 interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
899 };
900 assert!(!periodic.requires_watermark());
901 }
902}