1pub mod channel_derivation;
8pub mod lookup_join;
10pub mod predicate_split;
12pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26 analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::analyze_joins;
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32 CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33 StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36 AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
37 OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
38};
39
/// Planner-side registration record produced by `CREATE LOOKUP TABLE`.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name as written in the DDL (rendered via `object_name_to_string`).
    pub name: String,
    /// `(column name, SQL type string)` pairs in declaration order.
    pub columns: Vec<(String, String)>,
    /// Primary-key column names, in declaration order.
    pub primary_key: Vec<String>,
    /// Validated connector properties from the WITH clause.
    pub properties: LookupTableProperties,
    /// Arrow schema derived from the declared columns.
    pub arrow_schema: SchemaRef,
    /// Raw, unvalidated WITH options exactly as written.
    pub raw_options: HashMap<String, String>,
}
56
/// Translates parsed streaming statements into [`StreamingPlan`]s while
/// tracking which sources, sinks, and lookup tables have been registered.
pub struct StreamingPlanner {
    // All three registries are keyed by the rendered object name.
    sources: HashMap<String, SourceInfo>,
    sinks: HashMap<String, SinkInfo>,
    lookup_tables: HashMap<String, LookupTableInfo>,
}
66
/// Registration record for a streaming source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name as written in the DDL.
    pub name: String,
    /// Column named in the WATERMARK clause, if one was declared.
    pub watermark_column: Option<String>,
    /// Raw WITH options.
    pub options: HashMap<String, String>,
}
77
/// Registration record for a streaming sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name as written in the DDL.
    pub name: String,
    /// Upstream name: the FROM table, or a synthesized `<name>_query`
    /// when the sink is defined over an inline query.
    pub from: String,
    /// Raw WITH options.
    pub options: HashMap<String, String>,
}
88
/// Output of the planner: the action the execution layer should take.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Register a streaming source.
    RegisterSource(SourceInfo),

    /// Register a sink.
    RegisterSink(SinkInfo),

    /// Execute an analyzed streaming query.
    Query(QueryPlan),

    /// Pass a plain SQL statement through unchanged.
    Standard(Box<Statement>),

    /// Pre-rendered DAG explain output.
    DagExplain(DagExplainOutput),

    /// Register a lookup table.
    RegisterLookupTable(LookupTableInfo),

    /// Remove a registered lookup table.
    DropLookupTable {
        /// Rendered name of the table to drop.
        name: String,
    },
}
117
/// A fully-analyzed streaming query together with every operator
/// configuration extracted from it. Each `Option` is `Some` only when the
/// corresponding clause/feature appeared in the SQL.
#[derive(Debug)]
pub struct QueryPlan {
    /// Name for named continuous queries/streams; `None` for ad-hoc queries.
    pub name: Option<String>,
    /// GROUP BY window (e.g. TUMBLE) configuration.
    pub window_config: Option<WindowOperatorConfig>,
    /// One configuration per join in the FROM clause.
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration.
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic (LAG/LEAD-style) function configuration.
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING predicate configuration.
    pub having_config: Option<HavingFilterConfig>,
    /// OVER-clause window frame configuration.
    pub frame_config: Option<WindowFrameConfig>,
    /// EMIT clause from the enclosing CREATE statement, if any.
    pub emit_clause: Option<EmitClause>,
    /// The original parsed statement.
    pub statement: Box<Statement>,
}
140
141impl StreamingPlanner {
142 #[must_use]
144 pub fn new() -> Self {
145 Self {
146 sources: HashMap::new(),
147 sinks: HashMap::new(),
148 lookup_tables: HashMap::new(),
149 }
150 }
151
    /// Routes a parsed [`StreamingStatement`] to its planning step.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::UnsupportedSql`] for statement kinds that the
    /// database layer (not the planner) executes, and propagates any error
    /// from the individual planning helpers.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share a planning path.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            // Everything below is handled directly by the database layer.
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
198
199 fn plan_create_source(
201 &mut self,
202 source: &CreateSourceStatement,
203 ) -> Result<StreamingPlan, PlanningError> {
204 let name = object_name_to_string(&source.name);
205
206 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
208 return Err(PlanningError::InvalidQuery(format!(
209 "Source '{}' already exists",
210 name
211 )));
212 }
213
214 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
216
217 let info = SourceInfo {
218 name: name.clone(),
219 watermark_column,
220 options: source.with_options.clone(),
221 };
222
223 self.sources.insert(name, info.clone());
225
226 Ok(StreamingPlan::RegisterSource(info))
227 }
228
229 fn plan_create_sink(
231 &mut self,
232 sink: &CreateSinkStatement,
233 ) -> Result<StreamingPlan, PlanningError> {
234 let name = object_name_to_string(&sink.name);
235
236 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
238 return Err(PlanningError::InvalidQuery(format!(
239 "Sink '{}' already exists",
240 name
241 )));
242 }
243
244 let from = match &sink.from {
246 SinkFrom::Table(table) => object_name_to_string(table),
247 SinkFrom::Query(_) => format!("{}_query", name),
248 };
249
250 let info = SinkInfo {
251 name: name.clone(),
252 from,
253 options: sink.with_options.clone(),
254 };
255
256 self.sinks.insert(name, info.clone());
258
259 Ok(StreamingPlan::RegisterSink(info))
260 }
261
262 #[allow(clippy::unused_self)] fn plan_continuous_query(
265 &mut self,
266 name: &ObjectName,
267 query: &StreamingStatement,
268 emit_clause: Option<&EmitClause>,
269 ) -> Result<StreamingPlan, PlanningError> {
270 let stmt = match query {
272 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
273 _ => {
274 return Err(PlanningError::InvalidQuery(
275 "Continuous query must contain a SELECT statement".to_string(),
276 ))
277 }
278 };
279
280 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
282
283 Ok(StreamingPlan::Query(QueryPlan {
284 name: Some(object_name_to_string(name)),
285 window_config: query_plan.window_config,
286 join_config: query_plan.join_config,
287 order_config: query_plan.order_config,
288 analytic_config: query_plan.analytic_config,
289 having_config: query_plan.having_config,
290 frame_config: query_plan.frame_config,
291 emit_clause: emit_clause.cloned(),
292 statement: Box::new(stmt),
293 }))
294 }
295
296 #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
299 if let Statement::Query(query) = stmt {
301 if let SetExpr::Select(select) = query.body.as_ref() {
302 let window_function = Self::extract_window_from_select(select);
304
305 let join_analysis = analyze_joins(select).map_err(|e| {
307 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
308 })?;
309
310 let order_analysis = analyze_order_by(stmt);
312 let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
313 .map_err(PlanningError::InvalidQuery)?;
314
315 let analytic_analysis = analyze_analytic_functions(stmt);
317 let analytic_config =
318 analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
319
320 let agg_analysis = analyze_aggregates(stmt);
322 let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
323
324 let frame_analysis = analyze_window_frames(stmt);
326 let frame_config = frame_analysis
327 .as_ref()
328 .map(WindowFrameConfig::from_analysis);
329
330 if let Some(fa) = &frame_analysis {
332 for f in &fa.functions {
333 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
334 return Err(PlanningError::InvalidQuery(
335 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
336 .to_string(),
337 ));
338 }
339 }
340 }
341
342 let has_streaming_features = window_function.is_some()
343 || join_analysis.is_some()
344 || order_config.is_some()
345 || analytic_config.is_some()
346 || having_config.is_some()
347 || frame_config.is_some();
348
349 if has_streaming_features {
350 let window_config = match window_function {
351 Some(w) => Some(
352 WindowOperatorConfig::from_window_function(&w)
353 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
354 ),
355 None => None,
356 };
357
358 let join_config =
359 join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));
360
361 return Ok(StreamingPlan::Query(QueryPlan {
362 name: None,
363 window_config,
364 join_config,
365 order_config,
366 analytic_config,
367 having_config,
368 frame_config,
369 emit_clause: None,
370 statement: Box::new(stmt.clone()),
371 }));
372 }
373 }
374 }
375
376 Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
378 }
379
    /// Collects every streaming operator configuration for a continuous query.
    ///
    /// Unlike `plan_standard_statement`, the EMIT clause (if any) is folded
    /// into the window configuration here.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when any analysis step rejects
    /// the statement, including window frames that use `UNBOUNDED FOLLOWING`.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        // Window and join extraction only apply to a plain SELECT body.
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // EMIT adjusts how the window emits results.
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        // The remaining analyses inspect the whole statement.
        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // Frames that look arbitrarily far ahead cannot be evaluated
            // over an unbounded stream.
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
443
444 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
446 use sqlparser::ast::GroupByExpr;
448 match &select.group_by {
449 GroupByExpr::Expressions(exprs, _modifiers) => {
450 for group_by_expr in exprs {
451 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
452 {
453 return Some(window);
454 }
455 }
456 }
457 GroupByExpr::All(_) => {}
458 }
459 None
460 }
461
462 fn plan_create_lookup_table(
464 &mut self,
465 lt: &CreateLookupTableStatement,
466 ) -> Result<StreamingPlan, PlanningError> {
467 let name = object_name_to_string(<.name);
468
469 if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
470 return Err(PlanningError::InvalidQuery(format!(
471 "Lookup table '{}' already exists",
472 name
473 )));
474 }
475
476 let columns: Vec<(String, String)> = lt
477 .columns
478 .iter()
479 .map(|c| (c.name.value.clone(), c.data_type.to_string()))
480 .collect();
481
482 let properties = validate_properties(<.with_options).map_err(|e| {
483 PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
484 })?;
485
486 let arrow_fields: Vec<Field> = lt
488 .columns
489 .iter()
490 .map(|c| {
491 let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
492 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
493 let nullable = !c
494 .options
495 .iter()
496 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
497 Ok(Field::new(&c.name.value, dt, nullable))
498 })
499 .collect::<Result<_, PlanningError>>()?;
500 let arrow_schema = Arc::new(Schema::new(arrow_fields));
501
502 let info = LookupTableInfo {
503 name: name.clone(),
504 columns,
505 primary_key: lt.primary_key.clone(),
506 properties,
507 arrow_schema,
508 raw_options: lt.with_options.clone(),
509 };
510
511 self.lookup_tables.insert(name, info.clone());
512
513 Ok(StreamingPlan::RegisterLookupTable(info))
514 }
515
516 fn plan_drop_lookup_table(
518 &mut self,
519 name: &ObjectName,
520 if_exists: bool,
521 ) -> Result<StreamingPlan, PlanningError> {
522 let name_str = object_name_to_string(name);
523
524 if !if_exists && !self.lookup_tables.contains_key(&name_str) {
525 return Err(PlanningError::InvalidQuery(format!(
526 "Lookup table '{}' does not exist",
527 name_str
528 )));
529 }
530
531 self.lookup_tables.remove(&name_str);
532
533 Ok(StreamingPlan::DropLookupTable { name: name_str })
534 }
535
    /// Returns the registered source with the given name, if any.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Returns the registered sink with the given name, if any.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// Returns all registered sources (iteration order is unspecified).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// Returns all registered sinks (iteration order is unspecified).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Returns the registered lookup table with the given name, if any.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// Returns all registered lookup tables (iteration order is unspecified).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }

    /// Returns an owned copy of the lookup-table registry.
    #[must_use]
    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
        self.lookup_tables.clone()
    }
577
    /// Lowers a planned query to a DataFusion [`LogicalPlan`].
    ///
    /// The statement is rendered back to SQL text and re-parsed through the
    /// session context, rather than being converted AST-to-AST.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::DataFusion`] if DataFusion rejects the SQL.
    #[allow(clippy::unused_self)] pub async fn to_logical_plan(
        &self,
        plan: &QueryPlan,
        ctx: &SessionContext,
    ) -> Result<LogicalPlan, PlanningError> {
        let sql = plan.statement.to_string();
        ctx.state()
            .create_logical_plan(&sql)
            .await
            .map_err(PlanningError::DataFusion)
    }
609}
610
impl Default for StreamingPlanner {
    /// Equivalent to [`StreamingPlanner::new`].
    fn default() -> Self {
        Self::new()
    }
}
616
/// Intermediate bundle of operator configurations gathered by
/// `StreamingPlanner::analyze_query`; each field feeds the matching field of
/// [`QueryPlan`].
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    window_config: Option<WindowOperatorConfig>,
    join_config: Option<Vec<JoinOperatorConfig>>,
    order_config: Option<OrderOperatorConfig>,
    analytic_config: Option<AnalyticWindowConfig>,
    having_config: Option<HavingFilterConfig>,
    frame_config: Option<WindowFrameConfig>,
}
628
/// Renders an [`ObjectName`] to its display string; single point of truth
/// for how planner registries key object names.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
633
/// Errors produced while planning streaming statements.
///
/// `Display` is implemented by hand (no `#[error]` attributes) so that
/// DataFusion messages can be translated before rendering.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Statement kind the planner deliberately does not handle.
    UnsupportedSql(String),

    /// Statement was recognized but is semantically invalid.
    InvalidQuery(String),

    /// Referenced source has not been registered.
    SourceNotFound(String),

    /// Referenced sink has not been registered.
    SinkNotFound(String),

    /// Error bubbled up from DataFusion planning.
    DataFusion(#[from] datafusion_common::DataFusionError),
}
652
653impl std::fmt::Display for PlanningError {
654 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
655 match self {
656 Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
657 Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
658 Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
659 Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
660 Self::DataFusion(e) => {
661 let translated = crate::error::translate_datafusion_error(&e.to_string());
662 write!(f, "{translated}")
663 }
664 }
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    // CREATE SOURCE registers the source under its declared name.
    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    // CREATE SINK records both the sink name and its upstream table.
    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }

    // Re-creating an existing source without OR REPLACE / IF NOT EXISTS fails.
    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
    }

    // IF NOT EXISTS suppresses the duplicate-name error.
    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    // OR REPLACE also suppresses the duplicate-name error.
    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let statements =
            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    // A WATERMARK clause surfaces as the watermark column name.
    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                assert_eq!(info.watermark_column, Some("ts".to_string()));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    // A SELECT with no streaming features stays a pass-through Standard plan.
    #[test]
    fn test_plan_standard_select() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {}
            _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // Registries track everything planned so far and answer lookups.
    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
        planner.plan(&s1[0]).unwrap();
        planner.plan(&s2[0]).unwrap();

        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
        planner.plan(&k1[0]).unwrap();

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    // GROUP BY TUMBLE(...) yields a Query plan with a window config.
    #[test]
    fn test_plan_query_with_window() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                let config = query_plan.window_config.unwrap();
                assert_eq!(config.time_column, "event_time");
                assert_eq!(config.size.as_secs(), 300);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // A JOIN yields a Query plan with the join keys extracted.
    #[test]
    fn test_plan_query_with_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.join_config.is_some());
                let configs = query_plan.join_config.unwrap();
                assert_eq!(configs.len(), 1);
                assert_eq!(configs[0].left_key(), "order_id");
                assert_eq!(configs[0].right_key(), "order_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // LAG(...) OVER (...) yields an analytic config with partition columns.
    #[test]
    fn test_plan_query_with_lag() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert_eq!(config.functions.len(), 1);
                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    // HAVING alongside a window produces both configs.
    #[test]
    fn test_plan_query_with_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt FROM trades \
             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
             HAVING COUNT(*) > 10",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                assert!(query_plan.having_config.is_some());
                let config = query_plan.having_config.unwrap();
                assert!(
                    config.predicate().contains("COUNT(*)"),
                    "predicate was: {}",
                    config.predicate()
                );
            }
            _ => panic!("Expected Query plan with having config"),
        }
    }

    // No HAVING clause means no having config.
    #[test]
    fn test_plan_query_without_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_none());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // HAVING alone (no window) is enough to promote the query to a Query plan.
    #[test]
    fn test_plan_having_only_produces_query_plan() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_some());
                assert!(query_plan.window_config.is_none());
            }
            _ => panic!("Expected Query plan for HAVING-only query"),
        }
    }

    // Compound HAVING predicates are preserved verbatim.
    #[test]
    fn test_plan_having_compound_predicate() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
             FROM trades GROUP BY symbol \
             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                let config = query_plan.having_config.unwrap();
                let pred = config.predicate();
                assert!(pred.contains("AND"), "predicate was: {pred}");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // LEAD with an explicit offset sets the lookahead flag and offset.
    #[test]
    fn test_plan_query_with_lead() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert!(config.has_lookahead());
                assert_eq!(config.functions[0].offset, 2);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    // Even a single join produces a Vec of configs (length one).
    #[test]
    fn test_plan_single_join_produces_vec_of_one() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 1);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // Chained joins produce one config per JOIN, in order.
    #[test]
    fn test_plan_two_way_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert_eq!(configs[0].left_key(), "id");
                assert_eq!(configs[0].right_key(), "a_id");
                assert_eq!(configs[1].left_key(), "id");
                assert_eq!(configs[1].right_key(), "b_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // A time-bounded join is classified stream-stream; a plain one as lookup.
    #[test]
    fn test_plan_mixed_join_types() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o \
             JOIN payments p ON o.id = p.order_id \
             AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
             JOIN customers c ON p.cust_id = c.id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert!(configs[0].is_stream_stream());
                assert!(configs[1].is_lookup());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // Join-free queries keep producing Standard plans (backward compat).
    #[test]
    fn test_plan_backward_compat_no_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // A ROWS frame produces a frame config naming the aggregated column.
    #[test]
    fn test_plan_query_with_rows_frame() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (ORDER BY ts \
             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                assert!(qp.frame_config.is_some());
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.functions.len(), 1);
                assert_eq!(fc.functions[0].source_column, "price");
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    // PARTITION BY / ORDER BY columns are carried into the frame config.
    #[test]
    fn test_plan_frame_with_partition() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    // No OVER clause at all keeps the query a Standard plan.
    #[test]
    fn test_plan_no_frame_is_standard() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // UNBOUNDED FOLLOWING frames are rejected with a descriptive error.
    #[test]
    fn test_plan_unbounded_following_rejected() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT SUM(amount) OVER (ORDER BY id \
             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
             FROM orders",
        )
        .unwrap();

        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
    }
}