1pub mod channel_derivation;
8pub mod lookup_join;
10pub mod predicate_split;
12pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26 analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::{analyze_joins, MultiJoinAnalysis};
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32 CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33 StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36 AnalyticWindowConfig, HavingFilterConfig, JoinOperatorConfig, OrderOperatorConfig,
37 WindowFrameConfig, WindowOperatorConfig,
38};
39
40#[derive(Debug, Clone)]
42pub struct LookupTableInfo {
43 pub name: String,
45 pub columns: Vec<(String, String)>,
47 pub primary_key: Vec<String>,
49 pub properties: LookupTableProperties,
51 pub arrow_schema: SchemaRef,
53 pub raw_options: HashMap<String, String>,
55}
56
57pub struct StreamingPlanner {
59 sources: HashMap<String, SourceInfo>,
61 sinks: HashMap<String, SinkInfo>,
63 lookup_tables: HashMap<String, LookupTableInfo>,
65 windowed_views: std::collections::HashSet<String>,
69}
70
/// Registration data for a source created through `CREATE SOURCE`.
#[derive(Clone, Debug)]
pub struct SourceInfo {
    /// Source name as rendered from the statement's object name.
    pub name: String,
    /// Column named by the statement's watermark definition, if it has one.
    pub watermark_column: Option<String>,
    /// The statement's options, copied verbatim.
    pub options: HashMap<String, String>,
}
81
/// Registration data for a sink created through `CREATE SINK`.
#[derive(Clone, Debug)]
pub struct SinkInfo {
    /// Sink name as rendered from the statement's object name.
    pub name: String,
    /// Upstream the sink reads from: the table name for `FROM <table>`, or
    /// `<sink name>_query` when the sink is defined over an inline query.
    pub from: String,
    /// The statement's options, copied verbatim.
    pub options: HashMap<String, String>,
}
92
93#[derive(Debug)]
95#[allow(clippy::large_enum_variant)]
96pub enum StreamingPlan {
97 RegisterSource(SourceInfo),
99
100 RegisterSink(SinkInfo),
102
103 Query(QueryPlan),
105
106 Standard(Box<Statement>),
108
109 RegisterLookupTable(LookupTableInfo),
111
112 DropLookupTable {
114 name: String,
116 },
117}
118
119#[derive(Debug)]
121pub struct QueryPlan {
122 pub name: Option<String>,
124 pub window_config: Option<WindowOperatorConfig>,
126 pub join_config: Option<Vec<JoinOperatorConfig>>,
128 pub order_config: Option<OrderOperatorConfig>,
130 pub analytic_config: Option<AnalyticWindowConfig>,
132 pub having_config: Option<HavingFilterConfig>,
134 pub frame_config: Option<WindowFrameConfig>,
136 pub emit_clause: Option<EmitClause>,
138 pub statement: Box<Statement>,
140}
141
142impl StreamingPlanner {
143 #[must_use]
145 pub fn new() -> Self {
146 Self {
147 sources: HashMap::new(),
148 sinks: HashMap::new(),
149 lookup_tables: HashMap::new(),
150 windowed_views: std::collections::HashSet::new(),
151 }
152 }
153
154 pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
160 match statement {
161 StreamingStatement::CreateSource(source) => self.plan_create_source(source),
162 StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
163 StreamingStatement::CreateContinuousQuery {
164 name,
165 query,
166 emit_clause,
167 ..
168 }
169 | StreamingStatement::CreateStream {
170 name,
171 query,
172 emit_clause,
173 ..
174 } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
175 StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
176 StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
177 StreamingStatement::DropLookupTable { name, if_exists } => {
178 self.plan_drop_lookup_table(name, *if_exists)
179 }
180 StreamingStatement::DropSource { .. }
181 | StreamingStatement::DropSink { .. }
182 | StreamingStatement::DropStream { .. }
183 | StreamingStatement::DropMaterializedView { .. }
184 | StreamingStatement::Show(_)
185 | StreamingStatement::Describe { .. }
186 | StreamingStatement::Explain { .. }
187 | StreamingStatement::CreateMaterializedView { .. }
188 | StreamingStatement::InsertInto { .. }
189 | StreamingStatement::AlterSource { .. }
190 | StreamingStatement::Checkpoint
191 | StreamingStatement::RestoreCheckpoint { .. }
192 | StreamingStatement::Subscribe(_)
193 | StreamingStatement::DeclareCursorForSubscribe { .. } => {
194 Err(PlanningError::UnsupportedSql(format!(
197 "Statement type {:?} is handled by the database layer, not the planner",
198 std::mem::discriminant(statement)
199 )))
200 }
201 }
202 }
203
204 fn plan_create_source(
206 &mut self,
207 source: &CreateSourceStatement,
208 ) -> Result<StreamingPlan, PlanningError> {
209 let name = object_name_to_string(&source.name);
210
211 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
213 return Err(PlanningError::InvalidQuery(format!(
214 "Source '{}' already exists",
215 name
216 )));
217 }
218
219 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
221
222 let info = SourceInfo {
223 name: name.clone(),
224 watermark_column,
225 options: source.with_options.clone(),
226 };
227
228 self.sources.insert(name, info.clone());
230
231 Ok(StreamingPlan::RegisterSource(info))
232 }
233
234 fn plan_create_sink(
236 &mut self,
237 sink: &CreateSinkStatement,
238 ) -> Result<StreamingPlan, PlanningError> {
239 let name = object_name_to_string(&sink.name);
240
241 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
243 return Err(PlanningError::InvalidQuery(format!(
244 "Sink '{}' already exists",
245 name
246 )));
247 }
248
249 let from = match &sink.from {
251 SinkFrom::Table(table) => object_name_to_string(table),
252 SinkFrom::Query(_) => format!("{}_query", name),
253 };
254
255 let info = SinkInfo {
256 name: name.clone(),
257 from,
258 options: sink.with_options.clone(),
259 };
260
261 self.sinks.insert(name, info.clone());
263
264 Ok(StreamingPlan::RegisterSink(info))
265 }
266
267 fn plan_continuous_query(
269 &mut self,
270 name: &ObjectName,
271 query: &StreamingStatement,
272 emit_clause: Option<&EmitClause>,
273 ) -> Result<StreamingPlan, PlanningError> {
274 let stmt = match query {
276 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
277 _ => {
278 return Err(PlanningError::InvalidQuery(
279 "Continuous query must contain a SELECT statement".to_string(),
280 ))
281 }
282 };
283
284 let query_plan = self.analyze_query(&stmt, emit_clause)?;
286
287 let view_name = object_name_to_string(name);
291 if query_plan.window_config.is_some() {
292 self.windowed_views.insert(view_name);
293 } else {
294 self.windowed_views.remove(&view_name);
295 }
296
297 Ok(StreamingPlan::Query(QueryPlan {
298 name: Some(object_name_to_string(name)),
299 window_config: query_plan.window_config,
300 join_config: query_plan.join_config,
301 order_config: query_plan.order_config,
302 analytic_config: query_plan.analytic_config,
303 having_config: query_plan.having_config,
304 frame_config: query_plan.frame_config,
305 emit_clause: emit_clause.cloned(),
306 statement: Box::new(stmt),
307 }))
308 }
309
310 #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
313 if let Statement::Query(query) = stmt {
315 if let SetExpr::Select(select) = query.body.as_ref() {
316 let window_function = Self::extract_window_from_select(select);
318
319 let join_analysis = analyze_joins(select).map_err(|e| {
321 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
322 })?;
323
324 if let Some(ref multi) = join_analysis {
325 reject_unbounded_streaming_join(
326 multi,
327 &self.lookup_tables,
328 &self.windowed_views,
329 )?;
330 }
331
332 let order_analysis = analyze_order_by(stmt);
334 let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
335 .map_err(PlanningError::InvalidQuery)?;
336
337 let analytic_analysis = analyze_analytic_functions(stmt);
339 let analytic_config =
340 analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
341
342 let agg_analysis = analyze_aggregates(stmt);
344 let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
345
346 let frame_analysis = analyze_window_frames(stmt);
348 let frame_config = frame_analysis
349 .as_ref()
350 .map(WindowFrameConfig::from_analysis);
351
352 if let Some(fa) = &frame_analysis {
354 for f in &fa.functions {
355 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
356 return Err(PlanningError::InvalidQuery(
357 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
358 .to_string(),
359 ));
360 }
361 }
362 }
363
364 let has_streaming_features = window_function.is_some()
365 || join_analysis.is_some()
366 || order_config.is_some()
367 || analytic_config.is_some()
368 || having_config.is_some()
369 || frame_config.is_some();
370
371 if has_streaming_features {
372 let window_config = match window_function {
373 Some(w) => Some(
374 WindowOperatorConfig::from_window_function(&w)
375 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
376 ),
377 None => None,
378 };
379
380 let join_config = join_analysis
384 .filter(|m| !is_windowed_view_join(m, &self.windowed_views))
385 .map(|m| JoinOperatorConfig::from_multi_analysis(&m));
386
387 return Ok(StreamingPlan::Query(QueryPlan {
388 name: None,
389 window_config,
390 join_config,
391 order_config,
392 analytic_config,
393 having_config,
394 frame_config,
395 emit_clause: None,
396 statement: Box::new(stmt.clone()),
397 }));
398 }
399 }
400 }
401
402 Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
404 }
405
406 fn analyze_query(
408 &self,
409 stmt: &Statement,
410 emit_clause: Option<&EmitClause>,
411 ) -> Result<QueryAnalysis, PlanningError> {
412 let mut analysis = QueryAnalysis::default();
413
414 if let Statement::Query(query) = stmt {
415 if let SetExpr::Select(select) = query.body.as_ref() {
416 if let Some(window) = Self::extract_window_from_select(select) {
418 let mut config = WindowOperatorConfig::from_window_function(&window)
419 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
420
421 if let Some(emit) = emit_clause {
423 config = config
424 .with_emit_clause(emit)
425 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
426 }
427
428 analysis.window_config = Some(config);
429 }
430
431 if let Some(multi) = analyze_joins(select).map_err(|e| {
433 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
434 })? {
435 reject_unbounded_streaming_join(
436 &multi,
437 &self.lookup_tables,
438 &self.windowed_views,
439 )?;
440 if !is_windowed_view_join(&multi, &self.windowed_views) {
443 analysis.join_config =
444 Some(JoinOperatorConfig::from_multi_analysis(&multi));
445 }
446 }
447 }
448 }
449
450 let order_analysis = analyze_order_by(stmt);
452 analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
453 .map_err(PlanningError::InvalidQuery)?;
454
455 if let Some(analytic) = analyze_analytic_functions(stmt) {
457 analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
458 }
459
460 let agg_analysis = analyze_aggregates(stmt);
462 analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
463
464 if let Some(frame_analysis) = analyze_window_frames(stmt) {
466 for f in &frame_analysis.functions {
468 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
469 return Err(PlanningError::InvalidQuery(
470 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
471 .to_string(),
472 ));
473 }
474 }
475 analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
476 }
477
478 Ok(analysis)
479 }
480
481 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
483 use sqlparser::ast::GroupByExpr;
485 match &select.group_by {
486 GroupByExpr::Expressions(exprs, _modifiers) => {
487 for group_by_expr in exprs {
488 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
489 {
490 return Some(window);
491 }
492 }
493 }
494 GroupByExpr::All(_) => {}
495 }
496 None
497 }
498
499 fn plan_create_lookup_table(
501 &mut self,
502 lt: &CreateLookupTableStatement,
503 ) -> Result<StreamingPlan, PlanningError> {
504 let name = object_name_to_string(<.name);
505
506 if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
507 return Err(PlanningError::InvalidQuery(format!(
508 "Lookup table '{}' already exists",
509 name
510 )));
511 }
512
513 let columns: Vec<(String, String)> = lt
514 .columns
515 .iter()
516 .map(|c| (c.name.value.clone(), c.data_type.to_string()))
517 .collect();
518
519 let properties = validate_properties(<.with_options).map_err(|e| {
520 PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
521 })?;
522
523 let arrow_fields: Vec<Field> = lt
525 .columns
526 .iter()
527 .map(|c| {
528 let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
529 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
530 let nullable = !c
531 .options
532 .iter()
533 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
534 Ok(Field::new(&c.name.value, dt, nullable))
535 })
536 .collect::<Result<_, PlanningError>>()?;
537 let arrow_schema = Arc::new(Schema::new(arrow_fields));
538
539 let info = LookupTableInfo {
540 name: name.clone(),
541 columns,
542 primary_key: lt.primary_key.clone(),
543 properties,
544 arrow_schema,
545 raw_options: lt.with_options.clone(),
546 };
547
548 self.lookup_tables.insert(name, info.clone());
549
550 Ok(StreamingPlan::RegisterLookupTable(info))
551 }
552
553 fn plan_drop_lookup_table(
555 &mut self,
556 name: &ObjectName,
557 if_exists: bool,
558 ) -> Result<StreamingPlan, PlanningError> {
559 let name_str = object_name_to_string(name);
560
561 if !if_exists && !self.lookup_tables.contains_key(&name_str) {
562 return Err(PlanningError::InvalidQuery(format!(
563 "Lookup table '{}' does not exist",
564 name_str
565 )));
566 }
567
568 self.lookup_tables.remove(&name_str);
569
570 Ok(StreamingPlan::DropLookupTable { name: name_str })
571 }
572
573 #[must_use]
575 pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
576 self.sources.get(name)
577 }
578
579 #[must_use]
581 pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
582 self.sinks.get(name)
583 }
584
585 #[must_use]
587 pub fn list_sources(&self) -> Vec<&SourceInfo> {
588 self.sources.values().collect()
589 }
590
591 #[must_use]
593 pub fn list_sinks(&self) -> Vec<&SinkInfo> {
594 self.sinks.values().collect()
595 }
596
597 #[must_use]
599 pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
600 self.lookup_tables.get(name)
601 }
602
603 #[must_use]
605 pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
606 self.lookup_tables.values().collect()
607 }
608
609 #[must_use]
611 pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
612 self.lookup_tables.clone()
613 }
614
615 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
626 &self,
627 plan: &QueryPlan,
628 ctx: &SessionContext,
629 ) -> Result<LogicalPlan, PlanningError> {
630 let sql = plan.statement.to_string();
634 ctx.state()
635 .create_logical_plan(&sql)
636 .await
637 .map_err(PlanningError::DataFusion)
638 }
639}
640
641impl Default for StreamingPlanner {
642 fn default() -> Self {
643 Self::new()
644 }
645}
646
647#[derive(Debug, Default)]
649#[allow(clippy::struct_field_names)]
650struct QueryAnalysis {
651 window_config: Option<WindowOperatorConfig>,
652 join_config: Option<Vec<JoinOperatorConfig>>,
653 order_config: Option<OrderOperatorConfig>,
654 analytic_config: Option<AnalyticWindowConfig>,
655 having_config: Option<HavingFilterConfig>,
656 frame_config: Option<WindowFrameConfig>,
657}
658
659fn object_name_to_string(name: &ObjectName) -> String {
661 name.to_string()
662}
663
664fn reject_unbounded_streaming_join(
666 multi: &MultiJoinAnalysis,
667 lookup_tables: &HashMap<String, LookupTableInfo>,
668 windowed_views: &std::collections::HashSet<String>,
669) -> Result<(), PlanningError> {
670 for step in &multi.joins {
671 if step.is_bounded() {
672 continue;
673 }
674 let left_lookup = lookup_tables.contains_key(&step.left_table);
675 let right_lookup = lookup_tables.contains_key(&step.right_table);
676 let both_windowed =
679 windowed_views.contains(&step.left_table) && windowed_views.contains(&step.right_table);
680 if !left_lookup && !right_lookup && !both_windowed {
681 return Err(PlanningError::InvalidQuery(format!(
682 "unbounded join between streaming sources '{}' and '{}'; \
683 add a temporal predicate or use a lookup table",
684 step.left_table, step.right_table,
685 )));
686 }
687 }
688 Ok(())
689}
690
691fn is_windowed_view_join(
695 multi: &MultiJoinAnalysis,
696 windowed_views: &std::collections::HashSet<String>,
697) -> bool {
698 multi.joins.len() == 1
699 && multi.joins.iter().all(|j| {
700 !j.is_bounded()
701 && windowed_views.contains(&j.left_table)
702 && windowed_views.contains(&j.right_table)
703 })
704}
705
706#[derive(Debug, thiserror::Error)]
708pub enum PlanningError {
709 UnsupportedSql(String),
711
712 InvalidQuery(String),
714
715 SourceNotFound(String),
717
718 SinkNotFound(String),
720
721 DataFusion(#[from] datafusion_common::DataFusionError),
723}
724
725impl std::fmt::Display for PlanningError {
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 match self {
728 Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
729 Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
730 Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
731 Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
732 Self::DataFusion(e) => {
733 let translated = crate::error::translate_datafusion_error(&e.to_string());
734 write!(f, "{translated}")
735 }
736 }
737 }
738}
739
740#[cfg(test)]
741mod tests {
742 use super::*;
743 use crate::parser::StreamingParser;
744
745 #[test]
746 fn test_plan_create_source() {
747 let mut planner = StreamingPlanner::new();
748 let statements =
749 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
750
751 let plan = planner.plan(&statements[0]).unwrap();
752 match plan {
753 StreamingPlan::RegisterSource(info) => {
754 assert_eq!(info.name, "events");
755 }
756 _ => panic!("Expected RegisterSource plan"),
757 }
758 }
759
760 #[test]
761 fn test_plan_create_sink() {
762 let mut planner = StreamingPlanner::new();
763 let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
764
765 let plan = planner.plan(&statements[0]).unwrap();
766 match plan {
767 StreamingPlan::RegisterSink(info) => {
768 assert_eq!(info.name, "output");
769 assert_eq!(info.from, "events");
770 }
771 _ => panic!("Expected RegisterSink plan"),
772 }
773 }
774
775 #[test]
776 fn test_plan_duplicate_source() {
777 let mut planner = StreamingPlanner::new();
778
779 let statements =
781 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
782 planner.plan(&statements[0]).unwrap();
783
784 let result = planner.plan(&statements[0]);
786 assert!(result.is_err());
787 }
788
789 #[test]
790 fn test_plan_source_if_not_exists() {
791 let mut planner = StreamingPlanner::new();
792
793 let statements =
795 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
796 planner.plan(&statements[0]).unwrap();
797
798 let statements =
800 StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
801 .unwrap();
802 let result = planner.plan(&statements[0]);
803 assert!(result.is_ok());
804 }
805
806 #[test]
807 fn test_plan_source_or_replace() {
808 let mut planner = StreamingPlanner::new();
809
810 let statements =
812 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
813 planner.plan(&statements[0]).unwrap();
814
815 let statements =
817 StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
818 .unwrap();
819 let result = planner.plan(&statements[0]);
820 assert!(result.is_ok());
821 }
822
823 #[test]
824 fn test_plan_source_with_watermark() {
825 let mut planner = StreamingPlanner::new();
826 let statements = StreamingParser::parse_sql(
827 "CREATE SOURCE events (
828 id INT,
829 ts TIMESTAMP,
830 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
831 )",
832 )
833 .unwrap();
834
835 let plan = planner.plan(&statements[0]).unwrap();
836 match plan {
837 StreamingPlan::RegisterSource(info) => {
838 assert_eq!(info.name, "events");
839 assert_eq!(info.watermark_column, Some("ts".to_string()));
840 }
841 _ => panic!("Expected RegisterSource plan"),
842 }
843 }
844
845 #[test]
846 fn test_plan_standard_select() {
847 let mut planner = StreamingPlanner::new();
848 let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
849
850 let plan = planner.plan(&statements[0]).unwrap();
851 match plan {
852 StreamingPlan::Standard(_) => {}
853 _ => panic!("Expected Standard plan for simple SELECT"),
854 }
855 }
856
857 #[test]
858 fn test_list_sources_and_sinks() {
859 let mut planner = StreamingPlanner::new();
860
861 let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
863 let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
864 planner.plan(&s1[0]).unwrap();
865 planner.plan(&s2[0]).unwrap();
866
867 let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
869 planner.plan(&k1[0]).unwrap();
870
871 assert_eq!(planner.list_sources().len(), 2);
872 assert_eq!(planner.list_sinks().len(), 1);
873 assert!(planner.get_source("src1").is_some());
874 assert!(planner.get_sink("sink1").is_some());
875 }
876
877 #[test]
878 fn test_plan_query_with_window() {
879 let mut planner = StreamingPlanner::new();
880 let statements = StreamingParser::parse_sql(
881 "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
882 )
883 .unwrap();
884
885 let plan = planner.plan(&statements[0]).unwrap();
886 match plan {
887 StreamingPlan::Query(query_plan) => {
888 assert!(query_plan.window_config.is_some());
889 let config = query_plan.window_config.unwrap();
890 assert_eq!(config.time_column, "event_time");
891 assert_eq!(config.size.as_secs(), 300);
892 }
893 _ => panic!("Expected Query plan"),
894 }
895 }
896
897 #[test]
898 fn test_plan_query_with_join() {
899 let mut planner = StreamingPlanner::new();
900 let statements = StreamingParser::parse_sql(
901 "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id \
902 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR",
903 )
904 .unwrap();
905
906 let plan = planner.plan(&statements[0]).unwrap();
907 match plan {
908 StreamingPlan::Query(query_plan) => {
909 assert!(query_plan.join_config.is_some());
910 let configs = query_plan.join_config.unwrap();
911 assert_eq!(configs.len(), 1);
912 assert_eq!(configs[0].left_key(), "order_id");
913 assert_eq!(configs[0].right_key(), "order_id");
914 }
915 _ => panic!("Expected Query plan"),
916 }
917 }
918
919 #[test]
920 fn test_plan_rejects_unbounded_streaming_join() {
921 let mut planner = StreamingPlanner::new();
922 let statements = StreamingParser::parse_sql(
923 "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
924 )
925 .unwrap();
926 let err = planner.plan(&statements[0]).unwrap_err();
927 let msg = format!("{err}");
928 assert!(msg.contains("unbounded join"), "got: {msg}");
929 assert!(msg.contains("lookup table"), "got: {msg}");
930 }
931
932 #[test]
933 fn test_plan_allows_join_between_windowed_views() {
934 let mut planner = StreamingPlanner::new();
935 for sql in [
937 "CREATE STREAM price_1m AS SELECT TUMBLE(ts, INTERVAL '1' MINUTE) AS bucket, \
938 AVG(p) AS price FROM trades GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)",
939 "CREATE STREAM sent_1m AS SELECT TUMBLE(ts, INTERVAL '1' MINUTE) AS bucket, \
940 AVG(s) AS ms FROM posts GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)",
941 ] {
942 let st = StreamingParser::parse_sql(sql).unwrap();
943 planner.plan(&st[0]).unwrap();
944 }
945 let st = StreamingParser::parse_sql(
948 "CREATE STREAM joined AS SELECT a.bucket, a.price, b.ms \
949 FROM price_1m a JOIN sent_1m b ON a.bucket = b.bucket",
950 )
951 .unwrap();
952 match planner.plan(&st[0]).unwrap() {
953 StreamingPlan::Query(qp) => {
954 assert!(
955 qp.join_config.is_none(),
956 "windowed-view join carries no join_config"
957 );
958 }
959 other => panic!("expected a Query plan, got {other:?}"),
960 }
961 }
962
963 #[test]
964 fn test_plan_query_with_lag() {
965 let mut planner = StreamingPlanner::new();
966 let statements = StreamingParser::parse_sql(
967 "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
968 )
969 .unwrap();
970
971 let plan = planner.plan(&statements[0]).unwrap();
972 match plan {
973 StreamingPlan::Query(query_plan) => {
974 assert!(query_plan.analytic_config.is_some());
975 let config = query_plan.analytic_config.unwrap();
976 assert_eq!(config.functions.len(), 1);
977 assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
978 }
979 _ => panic!("Expected Query plan with analytic config"),
980 }
981 }
982
983 #[test]
984 fn test_plan_query_with_having() {
985 let mut planner = StreamingPlanner::new();
986 let statements = StreamingParser::parse_sql(
987 "SELECT symbol, COUNT(*) AS cnt FROM trades \
988 GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
989 HAVING COUNT(*) > 10",
990 )
991 .unwrap();
992
993 let plan = planner.plan(&statements[0]).unwrap();
994 match plan {
995 StreamingPlan::Query(query_plan) => {
996 assert!(query_plan.window_config.is_some());
997 assert!(query_plan.having_config.is_some());
998 let config = query_plan.having_config.unwrap();
999 assert!(
1000 config.predicate().contains("COUNT(*)"),
1001 "predicate was: {}",
1002 config.predicate()
1003 );
1004 }
1005 _ => panic!("Expected Query plan with having config"),
1006 }
1007 }
1008
1009 #[test]
1010 fn test_plan_query_without_having() {
1011 let mut planner = StreamingPlanner::new();
1012 let statements = StreamingParser::parse_sql(
1013 "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
1014 )
1015 .unwrap();
1016
1017 let plan = planner.plan(&statements[0]).unwrap();
1018 match plan {
1019 StreamingPlan::Query(query_plan) => {
1020 assert!(query_plan.having_config.is_none());
1021 }
1022 _ => panic!("Expected Query plan"),
1023 }
1024 }
1025
1026 #[test]
1027 fn test_plan_having_only_produces_query_plan() {
1028 let mut planner = StreamingPlanner::new();
1030 let statements = StreamingParser::parse_sql(
1031 "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
1032 )
1033 .unwrap();
1034
1035 let plan = planner.plan(&statements[0]).unwrap();
1036 match plan {
1037 StreamingPlan::Query(query_plan) => {
1038 assert!(query_plan.having_config.is_some());
1039 assert!(query_plan.window_config.is_none());
1040 }
1041 _ => panic!("Expected Query plan for HAVING-only query"),
1042 }
1043 }
1044
1045 #[test]
1046 fn test_plan_having_compound_predicate() {
1047 let mut planner = StreamingPlanner::new();
1048 let statements = StreamingParser::parse_sql(
1049 "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
1050 FROM trades GROUP BY symbol \
1051 HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
1052 )
1053 .unwrap();
1054
1055 let plan = planner.plan(&statements[0]).unwrap();
1056 match plan {
1057 StreamingPlan::Query(query_plan) => {
1058 let config = query_plan.having_config.unwrap();
1059 let pred = config.predicate();
1060 assert!(pred.contains("AND"), "predicate was: {pred}");
1061 }
1062 _ => panic!("Expected Query plan"),
1063 }
1064 }
1065
1066 #[test]
1067 fn test_plan_query_with_lead() {
1068 let mut planner = StreamingPlanner::new();
1069 let statements = StreamingParser::parse_sql(
1070 "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
1071 )
1072 .unwrap();
1073
1074 let plan = planner.plan(&statements[0]).unwrap();
1075 match plan {
1076 StreamingPlan::Query(query_plan) => {
1077 assert!(query_plan.analytic_config.is_some());
1078 let config = query_plan.analytic_config.unwrap();
1079 assert!(config.has_lookahead());
1080 assert_eq!(config.functions[0].offset, 2);
1081 }
1082 _ => panic!("Expected Query plan with analytic config"),
1083 }
1084 }
1085
1086 #[test]
1089 fn test_plan_single_join_produces_vec_of_one() {
1090 let mut planner = StreamingPlanner::new();
1091 let statements = StreamingParser::parse_sql(
1092 "SELECT * FROM a JOIN b ON a.id = b.a_id \
1093 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR",
1094 )
1095 .unwrap();
1096
1097 let plan = planner.plan(&statements[0]).unwrap();
1098 match plan {
1099 StreamingPlan::Query(qp) => {
1100 let configs = qp.join_config.unwrap();
1101 assert_eq!(configs.len(), 1);
1102 }
1103 _ => panic!("Expected Query plan"),
1104 }
1105 }
1106
1107 #[test]
1108 fn test_plan_two_way_join() {
1109 let mut planner = StreamingPlanner::new();
1110 let statements = StreamingParser::parse_sql(
1111 "SELECT * FROM a JOIN b ON a.id = b.a_id \
1112 AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR \
1113 JOIN c ON b.id = c.b_id \
1114 AND c.ts BETWEEN b.ts AND b.ts + INTERVAL '1' HOUR",
1115 )
1116 .unwrap();
1117
1118 let plan = planner.plan(&statements[0]).unwrap();
1119 match plan {
1120 StreamingPlan::Query(qp) => {
1121 let configs = qp.join_config.unwrap();
1122 assert_eq!(configs.len(), 2);
1123 assert_eq!(configs[0].left_key(), "id");
1124 assert_eq!(configs[0].right_key(), "a_id");
1125 assert_eq!(configs[1].left_key(), "id");
1126 assert_eq!(configs[1].right_key(), "b_id");
1127 }
1128 _ => panic!("Expected Query plan"),
1129 }
1130 }
1131
1132 #[test]
1133 fn test_plan_mixed_join_types() {
1134 let mut planner = StreamingPlanner::new();
1135 let _ = planner.plan(
1139 &StreamingParser::parse_sql(
1140 "CREATE LOOKUP TABLE customers (id BIGINT NOT NULL, name VARCHAR, \
1141 PRIMARY KEY (id)) WITH (connector = 'parquet', path = '/tmp/x.parquet')",
1142 )
1143 .unwrap()[0],
1144 );
1145 let statements = StreamingParser::parse_sql(
1146 "SELECT * FROM orders o \
1147 JOIN payments p ON o.id = p.order_id \
1148 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
1149 JOIN customers c ON p.cust_id = c.id",
1150 )
1151 .unwrap();
1152
1153 let plan = planner.plan(&statements[0]).unwrap();
1154 match plan {
1155 StreamingPlan::Query(qp) => {
1156 let configs = qp.join_config.unwrap();
1157 assert_eq!(configs.len(), 2);
1158 assert!(configs[0].is_stream_stream());
1159 assert!(configs[1].is_lookup());
1160 }
1161 _ => panic!("Expected Query plan"),
1162 }
1163 }
1164
1165 #[test]
1166 fn test_plan_backward_compat_no_join() {
1167 let mut planner = StreamingPlanner::new();
1168 let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
1169
1170 let plan = planner.plan(&statements[0]).unwrap();
1171 match plan {
1172 StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
1174 }
1175 }
1176
1177 #[test]
1180 fn test_plan_query_with_rows_frame() {
1181 let mut planner = StreamingPlanner::new();
1182 let statements = StreamingParser::parse_sql(
1183 "SELECT AVG(price) OVER (ORDER BY ts \
1184 ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1185 )
1186 .unwrap();
1187
1188 let plan = planner.plan(&statements[0]).unwrap();
1189 match plan {
1190 StreamingPlan::Query(qp) => {
1191 assert!(qp.frame_config.is_some());
1192 let fc = qp.frame_config.unwrap();
1193 assert_eq!(fc.functions.len(), 1);
1194 assert_eq!(fc.functions[0].source_column, "price");
1195 }
1196 _ => panic!("Expected Query plan with frame_config"),
1197 }
1198 }
1199
1200 #[test]
1201 fn test_plan_frame_with_partition() {
1202 let mut planner = StreamingPlanner::new();
1203 let statements = StreamingParser::parse_sql(
1204 "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
1205 ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1206 )
1207 .unwrap();
1208
1209 let plan = planner.plan(&statements[0]).unwrap();
1210 match plan {
1211 StreamingPlan::Query(qp) => {
1212 let fc = qp.frame_config.unwrap();
1213 assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
1214 assert_eq!(fc.order_columns, vec!["ts".to_string()]);
1215 }
1216 _ => panic!("Expected Query plan with frame_config"),
1217 }
1218 }
1219
1220 #[test]
1221 fn test_plan_no_frame_is_standard() {
1222 let mut planner = StreamingPlanner::new();
1223 let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
1224
1225 let plan = planner.plan(&statements[0]).unwrap();
1226 match plan {
1227 StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
1229 }
1230 }
1231
1232 #[test]
1233 fn test_plan_unbounded_following_rejected() {
1234 let mut planner = StreamingPlanner::new();
1235 let statements = StreamingParser::parse_sql(
1236 "SELECT SUM(amount) OVER (ORDER BY id \
1237 ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
1238 FROM orders",
1239 )
1240 .unwrap();
1241
1242 let result = planner.plan(&statements[0]);
1243 assert!(result.is_err());
1244 let err = result.unwrap_err().to_string();
1245 assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
1246 }
1247}