1pub mod channel_derivation;
8pub mod lookup_join;
10pub mod predicate_split;
12pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26 analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::analyze_joins;
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32 CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33 StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36 AnalyticWindowConfig, HavingFilterConfig, JoinOperatorConfig, OrderOperatorConfig,
37 WindowFrameConfig, WindowOperatorConfig,
38};
39
/// Catalog entry describing a registered lookup (dimension) table.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name as written in the CREATE LOOKUP TABLE statement.
    pub name: String,
    /// Declared columns as (column name, SQL type string) pairs.
    pub columns: Vec<(String, String)>,
    /// Primary-key column names.
    pub primary_key: Vec<String>,
    /// Validated connector properties derived from the WITH options.
    pub properties: LookupTableProperties,
    /// Arrow schema built from the declared columns.
    pub arrow_schema: SchemaRef,
    /// The raw, unvalidated WITH options as written in the DDL.
    pub raw_options: HashMap<String, String>,
}
56
/// Plans streaming SQL statements and tracks registered catalog objects
/// (sources, sinks, lookup tables) across statements.
pub struct StreamingPlanner {
    /// Sources registered via CREATE SOURCE, keyed by name.
    sources: HashMap<String, SourceInfo>,
    /// Sinks registered via CREATE SINK, keyed by name.
    sinks: HashMap<String, SinkInfo>,
    /// Lookup tables registered via CREATE LOOKUP TABLE, keyed by name.
    lookup_tables: HashMap<String, LookupTableInfo>,
}
66
/// Catalog entry for a registered streaming source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name as written in the CREATE SOURCE statement.
    pub name: String,
    /// Column the WATERMARK clause was declared on, if any.
    pub watermark_column: Option<String>,
    /// Connector options from the WITH clause.
    pub options: HashMap<String, String>,
}
77
/// Catalog entry for a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name as written in the CREATE SINK statement.
    pub name: String,
    /// Name of the table/source the sink consumes; a synthetic
    /// `"<name>_query"` placeholder when the sink is fed by an inline query.
    pub from: String,
    /// Connector options from the WITH clause.
    pub options: HashMap<String, String>,
}
88
/// The planner's output: either a catalog registration or an executable plan.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Register a streaming source in the catalog.
    RegisterSource(SourceInfo),

    /// Register a sink that consumes from a source or query.
    RegisterSink(SinkInfo),

    /// An analyzed streaming query plus its extracted operator configs.
    Query(QueryPlan),

    /// A plain SQL statement with no streaming features; executed as-is.
    Standard(Box<Statement>),

    /// Register a lookup (dimension) table in the catalog.
    RegisterLookupTable(LookupTableInfo),

    /// Remove a lookup table from the catalog.
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
114
/// An analyzed streaming query together with the operator configurations
/// extracted from its SQL text.
#[derive(Debug)]
pub struct QueryPlan {
    /// Query name for CREATE CONTINUOUS QUERY / CREATE STREAM; `None` for
    /// ad-hoc SELECTs.
    pub name: Option<String>,
    /// Window config from a GROUP BY window function (e.g. TUMBLE).
    pub window_config: Option<WindowOperatorConfig>,
    /// One config per JOIN in the FROM clause, in join order.
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY handling for streaming output.
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic (LAG/LEAD-style) function config.
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING predicate applied after aggregation.
    pub having_config: Option<HavingFilterConfig>,
    /// ROWS/RANGE window-frame config from OVER clauses.
    pub frame_config: Option<WindowFrameConfig>,
    /// EMIT clause controlling result emission, if present.
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement.
    pub statement: Box<Statement>,
}
137
138impl StreamingPlanner {
139 #[must_use]
141 pub fn new() -> Self {
142 Self {
143 sources: HashMap::new(),
144 sinks: HashMap::new(),
145 lookup_tables: HashMap::new(),
146 }
147 }
148
    /// Convert a parsed [`StreamingStatement`] into a [`StreamingPlan`].
    ///
    /// Registration statements (sources, sinks, lookup tables) also mutate
    /// the planner's catalog as a side effect; query statements are analyzed
    /// for streaming features.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::UnsupportedSql`] for statement kinds that are
    /// handled by the database layer rather than this planner, or propagates
    /// the error from the specific planning routine.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // Continuous queries and streams share the same planning path.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
                ..
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            // Everything below is executed by the database layer directly;
            // the planner deliberately refuses it.
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
196
197 fn plan_create_source(
199 &mut self,
200 source: &CreateSourceStatement,
201 ) -> Result<StreamingPlan, PlanningError> {
202 let name = object_name_to_string(&source.name);
203
204 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
206 return Err(PlanningError::InvalidQuery(format!(
207 "Source '{}' already exists",
208 name
209 )));
210 }
211
212 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
214
215 let info = SourceInfo {
216 name: name.clone(),
217 watermark_column,
218 options: source.with_options.clone(),
219 };
220
221 self.sources.insert(name, info.clone());
223
224 Ok(StreamingPlan::RegisterSource(info))
225 }
226
227 fn plan_create_sink(
229 &mut self,
230 sink: &CreateSinkStatement,
231 ) -> Result<StreamingPlan, PlanningError> {
232 let name = object_name_to_string(&sink.name);
233
234 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
236 return Err(PlanningError::InvalidQuery(format!(
237 "Sink '{}' already exists",
238 name
239 )));
240 }
241
242 let from = match &sink.from {
244 SinkFrom::Table(table) => object_name_to_string(table),
245 SinkFrom::Query(_) => format!("{}_query", name),
246 };
247
248 let info = SinkInfo {
249 name: name.clone(),
250 from,
251 options: sink.with_options.clone(),
252 };
253
254 self.sinks.insert(name, info.clone());
256
257 Ok(StreamingPlan::RegisterSink(info))
258 }
259
260 #[allow(clippy::unused_self)] fn plan_continuous_query(
263 &mut self,
264 name: &ObjectName,
265 query: &StreamingStatement,
266 emit_clause: Option<&EmitClause>,
267 ) -> Result<StreamingPlan, PlanningError> {
268 let stmt = match query {
270 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
271 _ => {
272 return Err(PlanningError::InvalidQuery(
273 "Continuous query must contain a SELECT statement".to_string(),
274 ))
275 }
276 };
277
278 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
280
281 Ok(StreamingPlan::Query(QueryPlan {
282 name: Some(object_name_to_string(name)),
283 window_config: query_plan.window_config,
284 join_config: query_plan.join_config,
285 order_config: query_plan.order_config,
286 analytic_config: query_plan.analytic_config,
287 having_config: query_plan.having_config,
288 frame_config: query_plan.frame_config,
289 emit_clause: emit_clause.cloned(),
290 statement: Box::new(stmt),
291 }))
292 }
293
    /// Plan an ad-hoc SQL statement: if the SELECT uses any streaming feature
    /// (window, join, ORDER BY, analytic function, HAVING, window frame) it
    /// becomes an unnamed [`QueryPlan`]; otherwise it passes through as
    /// [`StreamingPlan::Standard`].
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when an analyzer rejects the
    /// statement or a window frame ends with UNBOUNDED FOLLOWING.
    #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // GROUP BY window function (e.g. TUMBLE), if any.
                let window_function = Self::extract_window_from_select(select);

                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // UNBOUNDED FOLLOWING would require the end of the stream
                // before emitting anything, so reject it up front.
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // Any one streaming feature promotes the statement to a
                // streaming Query plan.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // No streaming features detected: execute as a plain statement.
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
377
    /// Run every feature analyzer over `stmt` and collect the resulting
    /// operator configs into a [`QueryAnalysis`].
    ///
    /// Used for named continuous queries; `plan_standard_statement` performs
    /// the equivalent analysis for ad-hoc SELECTs.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when an analyzer rejects the
    /// statement or a window frame ends with UNBOUNDED FOLLOWING.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // GROUP BY window function (e.g. TUMBLE) -> window operator.
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // The EMIT clause only applies to the window operator.
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // UNBOUNDED FOLLOWING would require the end of the stream before
            // emitting anything, so reject it up front.
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
441
442 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
444 use sqlparser::ast::GroupByExpr;
446 match &select.group_by {
447 GroupByExpr::Expressions(exprs, _modifiers) => {
448 for group_by_expr in exprs {
449 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
450 {
451 return Some(window);
452 }
453 }
454 }
455 GroupByExpr::All(_) => {}
456 }
457 None
458 }
459
460 fn plan_create_lookup_table(
462 &mut self,
463 lt: &CreateLookupTableStatement,
464 ) -> Result<StreamingPlan, PlanningError> {
465 let name = object_name_to_string(<.name);
466
467 if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
468 return Err(PlanningError::InvalidQuery(format!(
469 "Lookup table '{}' already exists",
470 name
471 )));
472 }
473
474 let columns: Vec<(String, String)> = lt
475 .columns
476 .iter()
477 .map(|c| (c.name.value.clone(), c.data_type.to_string()))
478 .collect();
479
480 let properties = validate_properties(<.with_options).map_err(|e| {
481 PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
482 })?;
483
484 let arrow_fields: Vec<Field> = lt
486 .columns
487 .iter()
488 .map(|c| {
489 let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
490 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
491 let nullable = !c
492 .options
493 .iter()
494 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
495 Ok(Field::new(&c.name.value, dt, nullable))
496 })
497 .collect::<Result<_, PlanningError>>()?;
498 let arrow_schema = Arc::new(Schema::new(arrow_fields));
499
500 let info = LookupTableInfo {
501 name: name.clone(),
502 columns,
503 primary_key: lt.primary_key.clone(),
504 properties,
505 arrow_schema,
506 raw_options: lt.with_options.clone(),
507 };
508
509 self.lookup_tables.insert(name, info.clone());
510
511 Ok(StreamingPlan::RegisterLookupTable(info))
512 }
513
514 fn plan_drop_lookup_table(
516 &mut self,
517 name: &ObjectName,
518 if_exists: bool,
519 ) -> Result<StreamingPlan, PlanningError> {
520 let name_str = object_name_to_string(name);
521
522 if !if_exists && !self.lookup_tables.contains_key(&name_str) {
523 return Err(PlanningError::InvalidQuery(format!(
524 "Lookup table '{}' does not exist",
525 name_str
526 )));
527 }
528
529 self.lookup_tables.remove(&name_str);
530
531 Ok(StreamingPlan::DropLookupTable { name: name_str })
532 }
533
    /// Look up a registered source by name.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Look up a registered sink by name.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// All registered sources (iteration order is unspecified).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// All registered sinks (iteration order is unspecified).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Look up a registered lookup table by name.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// All registered lookup tables (iteration order is unspecified).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }

    /// A deep copy of the lookup-table catalog, keyed by name.
    #[must_use]
    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
        self.lookup_tables.clone()
    }
575
576 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
587 &self,
588 plan: &QueryPlan,
589 ctx: &SessionContext,
590 ) -> Result<LogicalPlan, PlanningError> {
591 let sql = plan.statement.to_string();
595 ctx.state()
596 .create_logical_plan(&sql)
597 .await
598 .map_err(PlanningError::DataFusion)
599 }
600}
601
impl Default for StreamingPlanner {
    /// Same as [`StreamingPlanner::new`]: an empty catalog.
    fn default() -> Self {
        Self::new()
    }
}
607
/// Intermediate result of `StreamingPlanner::analyze_query`: the operator
/// configs extracted from a statement before they are moved into a
/// `QueryPlan`.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    window_config: Option<WindowOperatorConfig>,
    join_config: Option<Vec<JoinOperatorConfig>>,
    order_config: Option<OrderOperatorConfig>,
    analytic_config: Option<AnalyticWindowConfig>,
    having_config: Option<HavingFilterConfig>,
    frame_config: Option<WindowFrameConfig>,
}
619
/// Render a (possibly qualified) SQL object name as the plain string key
/// used throughout the planner's catalogs.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
624
/// Errors produced while planning streaming statements.
///
/// No `#[error(...)]` attributes are used, so the `thiserror` derive only
/// supplies the `Error` impl and the `#[from]` conversion; `Display` is
/// implemented manually below so DataFusion messages can be translated.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// SQL that parses but is not handled by this planner.
    UnsupportedSql(String),

    /// Structurally or semantically invalid statement.
    InvalidQuery(String),

    /// Referenced source is not registered.
    SourceNotFound(String),

    /// Referenced sink is not registered.
    SinkNotFound(String),

    /// Error bubbled up from DataFusion planning.
    DataFusion(#[from] datafusion_common::DataFusionError),
}
643
// Manual Display (instead of thiserror `#[error]` attributes) so the
// DataFusion variant can route its message through the crate's translator.
impl std::fmt::Display for PlanningError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
            Self::DataFusion(e) => {
                // Rewrite DataFusion's raw message into this crate's
                // user-facing wording before display.
                let translated = crate::error::translate_datafusion_error(&e.to_string());
                write!(f, "{translated}")
            }
        }
    }
}
658
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    // --- Source / sink DDL registration ---

    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }

    // Re-planning the same CREATE SOURCE must fail without OR REPLACE /
    // IF NOT EXISTS, and succeed with either modifier.

    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
    }

    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let statements =
            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                assert_eq!(info.watermark_column, Some("ts".to_string()));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    // --- Plain statements and catalog accessors ---

    #[test]
    fn test_plan_standard_select() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {}
            _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
        planner.plan(&s1[0]).unwrap();
        planner.plan(&s2[0]).unwrap();

        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
        planner.plan(&k1[0]).unwrap();

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    // --- Streaming feature detection in ad-hoc queries ---

    #[test]
    fn test_plan_query_with_window() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                let config = query_plan.window_config.unwrap();
                assert_eq!(config.time_column, "event_time");
                assert_eq!(config.size.as_secs(), 300);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.join_config.is_some());
                let configs = query_plan.join_config.unwrap();
                assert_eq!(configs.len(), 1);
                assert_eq!(configs[0].left_key(), "order_id");
                assert_eq!(configs[0].right_key(), "order_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_lag() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert_eq!(config.functions.len(), 1);
                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    // --- HAVING handling ---

    #[test]
    fn test_plan_query_with_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt FROM trades \
             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
             HAVING COUNT(*) > 10",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                assert!(query_plan.having_config.is_some());
                let config = query_plan.having_config.unwrap();
                assert!(
                    config.predicate().contains("COUNT(*)"),
                    "predicate was: {}",
                    config.predicate()
                );
            }
            _ => panic!("Expected Query plan with having config"),
        }
    }

    #[test]
    fn test_plan_query_without_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_none());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_having_only_produces_query_plan() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_some());
                assert!(query_plan.window_config.is_none());
            }
            _ => panic!("Expected Query plan for HAVING-only query"),
        }
    }

    #[test]
    fn test_plan_having_compound_predicate() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
             FROM trades GROUP BY symbol \
             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                let config = query_plan.having_config.unwrap();
                let pred = config.predicate();
                assert!(pred.contains("AND"), "predicate was: {pred}");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // --- LEAD (lookahead) analytics ---

    #[test]
    fn test_plan_query_with_lead() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert!(config.has_lookahead());
                assert_eq!(config.functions[0].offset, 2);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    // --- Multi-join planning ---

    #[test]
    fn test_plan_single_join_produces_vec_of_one() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 1);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_two_way_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert_eq!(configs[0].left_key(), "id");
                assert_eq!(configs[0].right_key(), "a_id");
                assert_eq!(configs[1].left_key(), "id");
                assert_eq!(configs[1].right_key(), "b_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_mixed_join_types() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o \
             JOIN payments p ON o.id = p.order_id \
             AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
             JOIN customers c ON p.cust_id = c.id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert!(configs[0].is_stream_stream());
                assert!(configs[1].is_lookup());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_backward_compat_no_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // --- Window-frame (ROWS BETWEEN ...) planning ---

    #[test]
    fn test_plan_query_with_rows_frame() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (ORDER BY ts \
             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                assert!(qp.frame_config.is_some());
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.functions.len(), 1);
                assert_eq!(fc.functions[0].source_column, "price");
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    #[test]
    fn test_plan_frame_with_partition() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    #[test]
    fn test_plan_no_frame_is_standard() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // UNBOUNDED FOLLOWING frames must be rejected with a descriptive error.
    #[test]
    fn test_plan_unbounded_following_rejected() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT SUM(amount) OVER (ORDER BY id \
             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
             FROM orders",
        )
        .unwrap();

        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
    }
}