Skip to main content

laminar_sql/planner/
mod.rs

1//! Query planner for streaming SQL
2//!
3//! This module translates parsed streaming SQL statements into execution plans.
4//! It integrates with the parser and translator modules to produce complete
5//! operator configurations for Ring 0 execution.
6
7pub mod channel_derivation;
8/// Optimizer rules for lookup join rewriting.
9pub mod lookup_join;
10/// Predicate splitting and pushdown for lookup joins.
11pub mod predicate_split;
12/// Physical optimizer rule for streaming plan validation.
13pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] // cold path: query planning
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26    analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::{analyze_joins, MultiJoinAnalysis};
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32    CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33    StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36    AnalyticWindowConfig, HavingFilterConfig, JoinOperatorConfig, OrderOperatorConfig,
37    WindowFrameConfig, WindowOperatorConfig,
38};
39
40/// Information about a registered lookup table.
41#[derive(Debug, Clone)]
42pub struct LookupTableInfo {
43    /// Table name.
44    pub name: String,
45    /// Column names and types.
46    pub columns: Vec<(String, String)>,
47    /// Primary key columns.
48    pub primary_key: Vec<String>,
49    /// Validated properties.
50    pub properties: LookupTableProperties,
51    /// Pre-computed Arrow schema from column definitions.
52    pub arrow_schema: SchemaRef,
53    /// Raw WITH options for connector configuration pass-through.
54    pub raw_options: HashMap<String, String>,
55}
56
57/// Streaming query planner
58pub struct StreamingPlanner {
59    /// Registered sources
60    sources: HashMap<String, SourceInfo>,
61    /// Registered sinks
62    sinks: HashMap<String, SinkInfo>,
63    /// Registered lookup tables
64    lookup_tables: HashMap<String, LookupTableInfo>,
65    /// Names of windowed views/streams (created with a windowed aggregation).
66    /// Their output is bounded per window close, so an equi-join between two of
67    /// them is a processing-time batch join, not an unbounded stream-stream join.
68    windowed_views: std::collections::HashSet<String>,
69}
70
/// Planner-side record describing one registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Name the source was registered under.
    pub name: String,
    /// Column carrying the watermark, when one was declared.
    pub watermark_column: Option<String>,
    /// Connector options given for the source.
    pub options: HashMap<String, String>,
}
81
/// Planner-side record describing one registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Name the sink was registered under.
    pub name: String,
    /// Name of the table or query the sink reads from.
    pub from: String,
    /// Connector options given for the sink.
    pub options: HashMap<String, String>,
}
92
93/// Result of planning a streaming statement
94#[derive(Debug)]
95#[allow(clippy::large_enum_variant)]
96pub enum StreamingPlan {
97    /// Source registration (DDL)
98    RegisterSource(SourceInfo),
99
100    /// Sink registration (DDL)
101    RegisterSink(SinkInfo),
102
103    /// Query plan with streaming configurations
104    Query(QueryPlan),
105
106    /// Standard SQL statement (pass-through to DataFusion)
107    Standard(Box<Statement>),
108
109    /// Lookup table registration (DDL)
110    RegisterLookupTable(LookupTableInfo),
111
112    /// Drop a lookup table
113    DropLookupTable {
114        /// Name of the lookup table to drop.
115        name: String,
116    },
117}
118
119/// A query plan with streaming operator configurations
120#[derive(Debug)]
121pub struct QueryPlan {
122    /// Optional name for the continuous query
123    pub name: Option<String>,
124    /// Window configuration if the query has windowed aggregation
125    pub window_config: Option<WindowOperatorConfig>,
126    /// Join configuration(s) if the query has joins (one per join step)
127    pub join_config: Option<Vec<JoinOperatorConfig>>,
128    /// ORDER BY configuration if the query has ordering
129    pub order_config: Option<OrderOperatorConfig>,
130    /// Analytic window function configuration (LAG/LEAD/etc.)
131    pub analytic_config: Option<AnalyticWindowConfig>,
132    /// HAVING clause filter configuration
133    pub having_config: Option<HavingFilterConfig>,
134    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN)
135    pub frame_config: Option<WindowFrameConfig>,
136    /// Emit strategy
137    pub emit_clause: Option<EmitClause>,
138    /// The underlying SQL statement
139    pub statement: Box<Statement>,
140}
141
142impl StreamingPlanner {
143    /// Creates a new streaming planner
144    #[must_use]
145    pub fn new() -> Self {
146        Self {
147            sources: HashMap::new(),
148            sinks: HashMap::new(),
149            lookup_tables: HashMap::new(),
150            windowed_views: std::collections::HashSet::new(),
151        }
152    }
153
154    /// Plans a streaming statement.
155    ///
156    /// # Errors
157    ///
158    /// Returns `PlanningError` if the statement cannot be planned.
159    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
160        match statement {
161            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
162            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
163            StreamingStatement::CreateContinuousQuery {
164                name,
165                query,
166                emit_clause,
167                ..
168            }
169            | StreamingStatement::CreateStream {
170                name,
171                query,
172                emit_clause,
173                ..
174            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
175            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
176            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
177            StreamingStatement::DropLookupTable { name, if_exists } => {
178                self.plan_drop_lookup_table(name, *if_exists)
179            }
180            StreamingStatement::DropSource { .. }
181            | StreamingStatement::DropSink { .. }
182            | StreamingStatement::DropStream { .. }
183            | StreamingStatement::DropMaterializedView { .. }
184            | StreamingStatement::Show(_)
185            | StreamingStatement::Describe { .. }
186            | StreamingStatement::Explain { .. }
187            | StreamingStatement::CreateMaterializedView { .. }
188            | StreamingStatement::InsertInto { .. }
189            | StreamingStatement::AlterSource { .. }
190            | StreamingStatement::Checkpoint
191            | StreamingStatement::RestoreCheckpoint { .. }
192            | StreamingStatement::Subscribe(_)
193            | StreamingStatement::DeclareCursorForSubscribe { .. } => {
194                // These statements are handled directly by the database facade
195                // and don't need query planning. Return as Standard pass-through.
196                Err(PlanningError::UnsupportedSql(format!(
197                    "Statement type {:?} is handled by the database layer, not the planner",
198                    std::mem::discriminant(statement)
199                )))
200            }
201        }
202    }
203
204    /// Plans a CREATE SOURCE statement.
205    fn plan_create_source(
206        &mut self,
207        source: &CreateSourceStatement,
208    ) -> Result<StreamingPlan, PlanningError> {
209        let name = object_name_to_string(&source.name);
210
211        // Check for existing source
212        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
213            return Err(PlanningError::InvalidQuery(format!(
214                "Source '{}' already exists",
215                name
216            )));
217        }
218
219        // Extract watermark column
220        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
221
222        let info = SourceInfo {
223            name: name.clone(),
224            watermark_column,
225            options: source.with_options.clone(),
226        };
227
228        // Register the source
229        self.sources.insert(name, info.clone());
230
231        Ok(StreamingPlan::RegisterSource(info))
232    }
233
234    /// Plans a CREATE SINK statement.
235    fn plan_create_sink(
236        &mut self,
237        sink: &CreateSinkStatement,
238    ) -> Result<StreamingPlan, PlanningError> {
239        let name = object_name_to_string(&sink.name);
240
241        // Check for existing sink
242        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
243            return Err(PlanningError::InvalidQuery(format!(
244                "Sink '{}' already exists",
245                name
246            )));
247        }
248
249        // Determine the source
250        let from = match &sink.from {
251            SinkFrom::Table(table) => object_name_to_string(table),
252            SinkFrom::Query(_) => format!("{}_query", name),
253        };
254
255        let info = SinkInfo {
256            name: name.clone(),
257            from,
258            options: sink.with_options.clone(),
259        };
260
261        // Register the sink
262        self.sinks.insert(name, info.clone());
263
264        Ok(StreamingPlan::RegisterSink(info))
265    }
266
267    /// Plans a CREATE CONTINUOUS QUERY statement.
268    fn plan_continuous_query(
269        &mut self,
270        name: &ObjectName,
271        query: &StreamingStatement,
272        emit_clause: Option<&EmitClause>,
273    ) -> Result<StreamingPlan, PlanningError> {
274        // The query inside should be a standard SELECT
275        let stmt = match query {
276            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
277            _ => {
278                return Err(PlanningError::InvalidQuery(
279                    "Continuous query must contain a SELECT statement".to_string(),
280                ))
281            }
282        };
283
284        // Analyze the query for streaming features
285        let query_plan = self.analyze_query(&stmt, emit_clause)?;
286
287        // A windowed aggregation is a bounded input to a processing-time join.
288        // Insert *and* remove so a CREATE OR REPLACE that makes the view
289        // non-windowed clears the stale entry (else a join on it skips the guard).
290        let view_name = object_name_to_string(name);
291        if query_plan.window_config.is_some() {
292            self.windowed_views.insert(view_name);
293        } else {
294            self.windowed_views.remove(&view_name);
295        }
296
297        Ok(StreamingPlan::Query(QueryPlan {
298            name: Some(object_name_to_string(name)),
299            window_config: query_plan.window_config,
300            join_config: query_plan.join_config,
301            order_config: query_plan.order_config,
302            analytic_config: query_plan.analytic_config,
303            having_config: query_plan.having_config,
304            frame_config: query_plan.frame_config,
305            emit_clause: emit_clause.cloned(),
306            statement: Box::new(stmt),
307        }))
308    }
309
310    /// Plans a standard SQL statement.
311    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
312    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
313        // Check if it's a query that might have streaming features
314        if let Statement::Query(query) = stmt {
315            if let SetExpr::Select(select) = query.body.as_ref() {
316                // Check for window functions in GROUP BY
317                let window_function = Self::extract_window_from_select(select);
318
319                // Check for joins (multi-way)
320                let join_analysis = analyze_joins(select).map_err(|e| {
321                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
322                })?;
323
324                if let Some(ref multi) = join_analysis {
325                    reject_unbounded_streaming_join(
326                        multi,
327                        &self.lookup_tables,
328                        &self.windowed_views,
329                    )?;
330                }
331
332                // Check for ORDER BY
333                let order_analysis = analyze_order_by(stmt);
334                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
335                    .map_err(PlanningError::InvalidQuery)?;
336
337                // Check for analytic functions (LAG/LEAD/etc.)
338                let analytic_analysis = analyze_analytic_functions(stmt);
339                let analytic_config =
340                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
341
342                // Check for HAVING clause
343                let agg_analysis = analyze_aggregates(stmt);
344                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
345
346                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
347                let frame_analysis = analyze_window_frames(stmt);
348                let frame_config = frame_analysis
349                    .as_ref()
350                    .map(WindowFrameConfig::from_analysis);
351
352                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer infinite future)
353                if let Some(fa) = &frame_analysis {
354                    for f in &fa.functions {
355                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
356                            return Err(PlanningError::InvalidQuery(
357                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
358                                    .to_string(),
359                            ));
360                        }
361                    }
362                }
363
364                let has_streaming_features = window_function.is_some()
365                    || join_analysis.is_some()
366                    || order_config.is_some()
367                    || analytic_config.is_some()
368                    || having_config.is_some()
369                    || frame_config.is_some();
370
371                if has_streaming_features {
372                    let window_config = match window_function {
373                        Some(w) => Some(
374                            WindowOperatorConfig::from_window_function(&w)
375                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
376                        ),
377                        None => None,
378                    };
379
380                    // A windowed-view equi-join carries no join_config: it routes
381                    // to the processing-time join, re-detected in `add_query`.
382                    // (A config here would suppress that detection.)
383                    let join_config = join_analysis
384                        .filter(|m| !is_windowed_view_join(m, &self.windowed_views))
385                        .map(|m| JoinOperatorConfig::from_multi_analysis(&m));
386
387                    return Ok(StreamingPlan::Query(QueryPlan {
388                        name: None,
389                        window_config,
390                        join_config,
391                        order_config,
392                        analytic_config,
393                        having_config,
394                        frame_config,
395                        emit_clause: None,
396                        statement: Box::new(stmt.clone()),
397                    }));
398                }
399            }
400        }
401
402        // Pass through standard SQL
403        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
404    }
405
406    /// Analyzes a query for streaming features.
407    fn analyze_query(
408        &self,
409        stmt: &Statement,
410        emit_clause: Option<&EmitClause>,
411    ) -> Result<QueryAnalysis, PlanningError> {
412        let mut analysis = QueryAnalysis::default();
413
414        if let Statement::Query(query) = stmt {
415            if let SetExpr::Select(select) = query.body.as_ref() {
416                // Extract window function
417                if let Some(window) = Self::extract_window_from_select(select) {
418                    let mut config = WindowOperatorConfig::from_window_function(&window)
419                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
420
421                    // Apply emit clause if present
422                    if let Some(emit) = emit_clause {
423                        config = config
424                            .with_emit_clause(emit)
425                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
426                    }
427
428                    analysis.window_config = Some(config);
429                }
430
431                // Extract join info (multi-way)
432                if let Some(multi) = analyze_joins(select).map_err(|e| {
433                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
434                })? {
435                    reject_unbounded_streaming_join(
436                        &multi,
437                        &self.lookup_tables,
438                        &self.windowed_views,
439                    )?;
440                    // A windowed-view equi-join routes to the processing-time join
441                    // (re-detected in `add_query`); a config here would suppress it.
442                    if !is_windowed_view_join(&multi, &self.windowed_views) {
443                        analysis.join_config =
444                            Some(JoinOperatorConfig::from_multi_analysis(&multi));
445                    }
446                }
447            }
448        }
449
450        // Extract ORDER BY info
451        let order_analysis = analyze_order_by(stmt);
452        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
453            .map_err(PlanningError::InvalidQuery)?;
454
455        // Extract analytic function info (LAG/LEAD/etc.)
456        if let Some(analytic) = analyze_analytic_functions(stmt) {
457            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
458        }
459
460        // Extract HAVING clause
461        let agg_analysis = analyze_aggregates(stmt);
462        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
463
464        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
465        if let Some(frame_analysis) = analyze_window_frames(stmt) {
466            // Validate: reject UNBOUNDED FOLLOWING
467            for f in &frame_analysis.functions {
468                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
469                    return Err(PlanningError::InvalidQuery(
470                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
471                            .to_string(),
472                    ));
473                }
474            }
475            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
476        }
477
478        Ok(analysis)
479    }
480
481    /// Extracts window function from a SELECT.
482    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
483        // Check GROUP BY for window functions
484        use sqlparser::ast::GroupByExpr;
485        match &select.group_by {
486            GroupByExpr::Expressions(exprs, _modifiers) => {
487                for group_by_expr in exprs {
488                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
489                    {
490                        return Some(window);
491                    }
492                }
493            }
494            GroupByExpr::All(_) => {}
495        }
496        None
497    }
498
499    /// Plans a CREATE LOOKUP TABLE statement.
500    fn plan_create_lookup_table(
501        &mut self,
502        lt: &CreateLookupTableStatement,
503    ) -> Result<StreamingPlan, PlanningError> {
504        let name = object_name_to_string(&lt.name);
505
506        if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
507            return Err(PlanningError::InvalidQuery(format!(
508                "Lookup table '{}' already exists",
509                name
510            )));
511        }
512
513        let columns: Vec<(String, String)> = lt
514            .columns
515            .iter()
516            .map(|c| (c.name.value.clone(), c.data_type.to_string()))
517            .collect();
518
519        let properties = validate_properties(&lt.with_options).map_err(|e| {
520            PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
521        })?;
522
523        // Compute Arrow schema from column definitions
524        let arrow_fields: Vec<Field> = lt
525            .columns
526            .iter()
527            .map(|c| {
528                let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
529                    .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
530                let nullable = !c
531                    .options
532                    .iter()
533                    .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
534                Ok(Field::new(&c.name.value, dt, nullable))
535            })
536            .collect::<Result<_, PlanningError>>()?;
537        let arrow_schema = Arc::new(Schema::new(arrow_fields));
538
539        let info = LookupTableInfo {
540            name: name.clone(),
541            columns,
542            primary_key: lt.primary_key.clone(),
543            properties,
544            arrow_schema,
545            raw_options: lt.with_options.clone(),
546        };
547
548        self.lookup_tables.insert(name, info.clone());
549
550        Ok(StreamingPlan::RegisterLookupTable(info))
551    }
552
553    /// Plans a DROP LOOKUP TABLE statement.
554    fn plan_drop_lookup_table(
555        &mut self,
556        name: &ObjectName,
557        if_exists: bool,
558    ) -> Result<StreamingPlan, PlanningError> {
559        let name_str = object_name_to_string(name);
560
561        if !if_exists && !self.lookup_tables.contains_key(&name_str) {
562            return Err(PlanningError::InvalidQuery(format!(
563                "Lookup table '{}' does not exist",
564                name_str
565            )));
566        }
567
568        self.lookup_tables.remove(&name_str);
569
570        Ok(StreamingPlan::DropLookupTable { name: name_str })
571    }
572
573    /// Gets a registered source by name.
574    #[must_use]
575    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
576        self.sources.get(name)
577    }
578
579    /// Gets a registered sink by name.
580    #[must_use]
581    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
582        self.sinks.get(name)
583    }
584
585    /// Lists all registered sources.
586    #[must_use]
587    pub fn list_sources(&self) -> Vec<&SourceInfo> {
588        self.sources.values().collect()
589    }
590
591    /// Lists all registered sinks.
592    #[must_use]
593    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
594        self.sinks.values().collect()
595    }
596
597    /// Gets a registered lookup table by name.
598    #[must_use]
599    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
600        self.lookup_tables.get(name)
601    }
602
603    /// Lists all registered lookup tables.
604    #[must_use]
605    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
606        self.lookup_tables.values().collect()
607    }
608
609    /// Returns a clone of the lookup tables map for optimizer rule construction.
610    #[must_use]
611    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
612        self.lookup_tables.clone()
613    }
614
615    /// Converts a query plan's SQL statement into a `DataFusion`
616    /// `LogicalPlan`. Window UDFs (TUMBLE, HOP, SESSION) must be registered
617    /// on `ctx` via
618    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
619    /// for windowed queries to resolve correctly.
620    ///
621    /// # Errors
622    ///
623    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
624    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
625    pub async fn to_logical_plan(
626        &self,
627        plan: &QueryPlan,
628        ctx: &SessionContext,
629    ) -> Result<LogicalPlan, PlanningError> {
630        // Convert the AST statement back to SQL and let DataFusion re-parse
631        // it with its own sqlparser version. This avoids version mismatches
632        // between our sqlparser (0.60) and DataFusion's (0.59).
633        let sql = plan.statement.to_string();
634        ctx.state()
635            .create_logical_plan(&sql)
636            .await
637            .map_err(PlanningError::DataFusion)
638    }
639}
640
641impl Default for StreamingPlanner {
642    fn default() -> Self {
643        Self::new()
644    }
645}
646
647/// Intermediate query analysis result
648#[derive(Debug, Default)]
649#[allow(clippy::struct_field_names)]
650struct QueryAnalysis {
651    window_config: Option<WindowOperatorConfig>,
652    join_config: Option<Vec<JoinOperatorConfig>>,
653    order_config: Option<OrderOperatorConfig>,
654    analytic_config: Option<AnalyticWindowConfig>,
655    having_config: Option<HavingFilterConfig>,
656    frame_config: Option<WindowFrameConfig>,
657}
658
659/// Helper to convert `ObjectName` to String
660fn object_name_to_string(name: &ObjectName) -> String {
661    name.to_string()
662}
663
664/// Reject unbounded joins between two streaming sources.
665fn reject_unbounded_streaming_join(
666    multi: &MultiJoinAnalysis,
667    lookup_tables: &HashMap<String, LookupTableInfo>,
668    windowed_views: &std::collections::HashSet<String>,
669) -> Result<(), PlanningError> {
670    for step in &multi.joins {
671        if step.is_bounded() {
672            continue;
673        }
674        let left_lookup = lookup_tables.contains_key(&step.left_table);
675        let right_lookup = lookup_tables.contains_key(&step.right_table);
676        // A join between two windowed views is bounded per cycle (each emits
677        // aligned closed windows) — it runs as a processing-time batch join.
678        let both_windowed =
679            windowed_views.contains(&step.left_table) && windowed_views.contains(&step.right_table);
680        if !left_lookup && !right_lookup && !both_windowed {
681            return Err(PlanningError::InvalidQuery(format!(
682                "unbounded join between streaming sources '{}' and '{}'; \
683                 add a temporal predicate or use a lookup table",
684                step.left_table, step.right_table,
685            )));
686        }
687    }
688    Ok(())
689}
690
691/// A single non-temporal equi-join between two windowed views — the
692/// processing-time batch-join shape (`create_operator` builds it from the
693/// re-detected config, so its `join_config` is left empty here).
694fn is_windowed_view_join(
695    multi: &MultiJoinAnalysis,
696    windowed_views: &std::collections::HashSet<String>,
697) -> bool {
698    multi.joins.len() == 1
699        && multi.joins.iter().all(|j| {
700            !j.is_bounded()
701                && windowed_views.contains(&j.left_table)
702                && windowed_views.contains(&j.right_table)
703        })
704}
705
706/// Planning errors
707#[derive(Debug, thiserror::Error)]
708pub enum PlanningError {
709    /// Unsupported SQL feature
710    UnsupportedSql(String),
711
712    /// Invalid query
713    InvalidQuery(String),
714
715    /// Source not found
716    SourceNotFound(String),
717
718    /// Sink not found
719    SinkNotFound(String),
720
721    /// `DataFusion` error during logical plan creation (translated on display)
722    DataFusion(#[from] datafusion_common::DataFusionError),
723}
724
725impl std::fmt::Display for PlanningError {
726    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727        match self {
728            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
729            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
730            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
731            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
732            Self::DataFusion(e) => {
733                let translated = crate::error::translate_datafusion_error(&e.to_string());
734                write!(f, "{translated}")
735            }
736        }
737    }
738}
739
740#[cfg(test)]
741mod tests {
742    use super::*;
743    use crate::parser::StreamingParser;
744
745    #[test]
746    fn test_plan_create_source() {
747        let mut planner = StreamingPlanner::new();
748        let statements =
749            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
750
751        let plan = planner.plan(&statements[0]).unwrap();
752        match plan {
753            StreamingPlan::RegisterSource(info) => {
754                assert_eq!(info.name, "events");
755            }
756            _ => panic!("Expected RegisterSource plan"),
757        }
758    }
759
760    #[test]
761    fn test_plan_create_sink() {
762        let mut planner = StreamingPlanner::new();
763        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
764
765        let plan = planner.plan(&statements[0]).unwrap();
766        match plan {
767            StreamingPlan::RegisterSink(info) => {
768                assert_eq!(info.name, "output");
769                assert_eq!(info.from, "events");
770            }
771            _ => panic!("Expected RegisterSink plan"),
772        }
773    }
774
775    #[test]
776    fn test_plan_duplicate_source() {
777        let mut planner = StreamingPlanner::new();
778
779        // First source
780        let statements =
781            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
782        planner.plan(&statements[0]).unwrap();
783
784        // Duplicate should fail
785        let result = planner.plan(&statements[0]);
786        assert!(result.is_err());
787    }
788
/// `CREATE SOURCE IF NOT EXISTS` on an already-registered name must plan
/// without error.
#[test]
fn test_plan_source_if_not_exists() {
    let mut planner = StreamingPlanner::new();

    // Register `events` once with a plain CREATE SOURCE.
    let initial =
        StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
    planner.plan(&initial[0]).unwrap();

    // The guarded re-creation of the same name is accepted.
    let guarded =
        StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
            .unwrap();
    let outcome = planner.plan(&guarded[0]);
    assert!(outcome.is_ok());
}
805
/// `CREATE OR REPLACE SOURCE` on an already-registered name must plan
/// without error.
#[test]
fn test_plan_source_or_replace() {
    let mut planner = StreamingPlanner::new();

    // Register `events` once with a plain CREATE SOURCE.
    let initial =
        StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
    planner.plan(&initial[0]).unwrap();

    // Replacing the existing definition is accepted.
    let replacement =
        StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
            .unwrap();
    let outcome = planner.plan(&replacement[0]);
    assert!(outcome.is_ok());
}
822
/// A `WATERMARK FOR ts ...` clause in `CREATE SOURCE` must surface `ts` as
/// the registered source's `watermark_column`.
#[test]
fn test_plan_source_with_watermark() {
    let mut planner = StreamingPlanner::new();
    // The literal below spans lines without `\` continuations, so its inner
    // whitespace is part of the SQL text handed to the parser.
    let parsed = StreamingParser::parse_sql(
        "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
    )
    .unwrap();

    let StreamingPlan::RegisterSource(source) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected RegisterSource plan");
    };
    assert_eq!(source.name, "events");
    assert_eq!(source.watermark_column, Some("ts".to_string()));
}
844
/// A plain `SELECT *` must plan to the `Standard` variant.
#[test]
fn test_plan_standard_select() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

    let plan = planner.plan(&parsed[0]).unwrap();
    assert!(
        matches!(plan, StreamingPlan::Standard(_)),
        "Expected Standard plan for simple SELECT"
    );
}
856
/// After registering two sources and one sink, the planner's listing and
/// lookup accessors must reflect them.
#[test]
fn test_list_sources_and_sinks() {
    let mut planner = StreamingPlanner::new();

    // Parse both source definitions first, then register them in order.
    let source_ddl = ["CREATE SOURCE src1 (id INT)", "CREATE SOURCE src2 (id INT)"]
        .map(|sql| StreamingParser::parse_sql(sql).unwrap());
    for stmts in &source_ddl {
        planner.plan(&stmts[0]).unwrap();
    }

    // A single sink reading from the first source.
    let sink_ddl = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
    planner.plan(&sink_ddl[0]).unwrap();

    assert_eq!(planner.list_sources().len(), 2);
    assert_eq!(planner.list_sinks().len(), 1);
    assert!(planner.get_source("src1").is_some());
    assert!(planner.get_sink("sink1").is_some());
}
876
/// Grouping by `TUMBLE(event_time, INTERVAL '5' MINUTE)` must yield a Query
/// plan whose window config names `event_time` with a 300-second size.
#[test]
fn test_plan_query_with_window() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    assert!(query.window_config.is_some());
    let window = query.window_config.unwrap();
    assert_eq!(window.time_column, "event_time");
    assert_eq!(window.size.as_secs(), 300);
}
896
/// A time-bounded equi-join must yield a Query plan with exactly one join
/// config keyed on `order_id` on both sides.
#[test]
fn test_plan_query_with_join() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id \
         AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    assert!(query.join_config.is_some());
    let joins = query.join_config.unwrap();
    assert_eq!(joins.len(), 1);
    assert_eq!(joins[0].left_key(), "order_id");
    assert_eq!(joins[0].right_key(), "order_id");
}
918
/// An equi-join with no time bound must be rejected, and the error text must
/// mention both "unbounded join" and "lookup table".
#[test]
fn test_plan_rejects_unbounded_streaming_join() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
    )
    .unwrap();

    let msg = planner.plan(&parsed[0]).unwrap_err().to_string();
    // Check the two phrases in the same order as before.
    for expected in ["unbounded join", "lookup table"] {
        assert!(msg.contains(expected), "got: {msg}");
    }
}
931
/// An equi-join between two previously planned windowed `CREATE STREAM` views
/// must be accepted and produce a Query plan without a `join_config`.
#[test]
fn test_plan_allows_join_between_windowed_views() {
    let mut planner = StreamingPlanner::new();

    // Two windowed views register as bounded inputs.
    let windowed_views = [
        "CREATE STREAM price_1m AS SELECT TUMBLE(ts, INTERVAL '1' MINUTE) AS bucket, \
         AVG(p) AS price FROM trades GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)",
        "CREATE STREAM sent_1m AS SELECT TUMBLE(ts, INTERVAL '1' MINUTE) AS bucket, \
         AVG(s) AS ms FROM posts GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)",
    ];
    for ddl in windowed_views {
        let parsed = StreamingParser::parse_sql(ddl).unwrap();
        planner.plan(&parsed[0]).unwrap();
    }

    // The equi-join between them is accepted (not rejected as unbounded) and
    // carries no join_config, so add_query routes it to the process-time join.
    let join_ddl = StreamingParser::parse_sql(
        "CREATE STREAM joined AS SELECT a.bucket, a.price, b.ms \
         FROM price_1m a JOIN sent_1m b ON a.bucket = b.bucket",
    )
    .unwrap();

    let qp = match planner.plan(&join_ddl[0]).unwrap() {
        StreamingPlan::Query(qp) => qp,
        other => panic!("expected a Query plan, got {other:?}"),
    };
    assert!(
        qp.join_config.is_none(),
        "windowed-view join carries no join_config"
    );
}
962
/// A `LAG(...) OVER (PARTITION BY symbol ...)` query must yield a Query plan
/// with one analytic function partitioned by `symbol`.
#[test]
fn test_plan_query_with_lag() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan with analytic config");
    };
    assert!(query.analytic_config.is_some());
    let analytic = query.analytic_config.unwrap();
    assert_eq!(analytic.functions.len(), 1);
    assert_eq!(analytic.partition_columns, vec!["symbol".to_string()]);
}
982
/// A windowed aggregate with `HAVING COUNT(*) > 10` must yield a Query plan
/// with both a window config and a having config whose predicate text
/// contains `COUNT(*)`.
#[test]
fn test_plan_query_with_having() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT symbol, COUNT(*) AS cnt FROM trades \
         GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
         HAVING COUNT(*) > 10",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan with having config");
    };
    assert!(query.window_config.is_some());
    assert!(query.having_config.is_some());
    let having = query.having_config.unwrap();
    assert!(
        having.predicate().contains("COUNT(*)"),
        "predicate was: {}",
        having.predicate()
    );
}
1008
/// A windowed aggregate with no `HAVING` clause must leave `having_config`
/// unset on the Query plan.
#[test]
fn test_plan_query_without_having() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    assert!(query.having_config.is_none());
}
1025
/// `HAVING` on a non-windowed `GROUP BY` must still produce a Query plan:
/// the having config is set and the window config is not.
#[test]
fn test_plan_having_only_produces_query_plan() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan for HAVING-only query");
    };
    assert!(query.having_config.is_some());
    assert!(query.window_config.is_none());
}
1044
/// A `HAVING` clause joining two conditions with `AND` must keep `AND` in the
/// planned predicate text.
#[test]
fn test_plan_having_compound_predicate() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
         FROM trades GROUP BY symbol \
         HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    let having = query.having_config.unwrap();
    let pred = having.predicate();
    assert!(pred.contains("AND"), "predicate was: {pred}");
}
1065
/// `LEAD(price, 2)` must yield an analytic config that reports lookahead and
/// records an offset of 2 on its first function.
#[test]
fn test_plan_query_with_lead() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan with analytic config");
    };
    assert!(query.analytic_config.is_some());
    let analytic = query.analytic_config.unwrap();
    assert!(analytic.has_lookahead());
    assert_eq!(analytic.functions[0].offset, 2);
}
1085
1086    // -- Multi-way join planner tests --
1087
/// A query with a single time-bounded join must produce a `join_config`
/// holding exactly one entry.
#[test]
fn test_plan_single_join_produces_vec_of_one() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT * FROM a JOIN b ON a.id = b.a_id \
         AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    let joins = query.join_config.unwrap();
    assert_eq!(joins.len(), 1);
}
1106
/// Two chained time-bounded joins must produce two join configs, in query
/// order, each with the expected left/right key columns.
#[test]
fn test_plan_two_way_join() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT * FROM a JOIN b ON a.id = b.a_id \
             AND b.ts BETWEEN a.ts AND a.ts + INTERVAL '1' HOUR \
         JOIN c ON b.id = c.b_id \
             AND c.ts BETWEEN b.ts AND b.ts + INTERVAL '1' HOUR",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    let joins = query.join_config.unwrap();
    assert_eq!(joins.len(), 2);

    // First hop: a -> b.
    assert_eq!(joins[0].left_key(), "id");
    assert_eq!(joins[0].right_key(), "a_id");
    // Second hop: b -> c.
    assert_eq!(joins[1].left_key(), "id");
    assert_eq!(joins[1].right_key(), "b_id");
}
1131
/// A stream-stream join followed by a join against a registered lookup table
/// must produce two join configs: the first stream-stream, the second lookup.
#[test]
fn test_plan_mixed_join_types() {
    let mut planner = StreamingPlanner::new();

    // The second JOIN below would be unbounded if `customers` were a
    // streaming source. Register it as a lookup table so the rejection
    // rule passes.
    let lookup_ddl = StreamingParser::parse_sql(
        "CREATE LOOKUP TABLE customers (id BIGINT NOT NULL, name VARCHAR, \
         PRIMARY KEY (id)) WITH (connector = 'parquet', path = '/tmp/x.parquet')",
    )
    .unwrap();
    // NOTE(review): the registration result is discarded, so a failure here
    // would only show up in the assertions further down — confirm intended.
    let _ = planner.plan(&lookup_ddl[0]);

    let parsed = StreamingParser::parse_sql(
        "SELECT * FROM orders o \
         JOIN payments p ON o.id = p.order_id \
             AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
         JOIN customers c ON p.cust_id = c.id",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan");
    };
    let joins = query.join_config.unwrap();
    assert_eq!(joins.len(), 2);
    assert!(joins[0].is_stream_stream());
    assert!(joins[1].is_lookup());
}
1164
/// A join-free `SELECT *` must still plan to the `Standard` variant.
#[test]
fn test_plan_backward_compat_no_join() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();

    // No join → pass-through.
    let plan = planner.plan(&parsed[0]).unwrap();
    assert!(
        matches!(plan, StreamingPlan::Standard(_)),
        "Expected Standard plan for simple SELECT"
    );
}
1176
1177    // -- Window Frame planner tests --
1178
/// An aggregate over a `ROWS BETWEEN ... PRECEDING AND CURRENT ROW` frame
/// must yield a frame config with one function reading the `price` column.
#[test]
fn test_plan_query_with_rows_frame() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT AVG(price) OVER (ORDER BY ts \
         ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan with frame_config");
    };
    assert!(query.frame_config.is_some());
    let frame = query.frame_config.unwrap();
    assert_eq!(frame.functions.len(), 1);
    assert_eq!(frame.functions[0].source_column, "price");
}
1199
/// A framed aggregate with `PARTITION BY symbol ORDER BY ts` must record
/// those columns on the frame config.
#[test]
fn test_plan_frame_with_partition() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
         ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
    )
    .unwrap();

    let StreamingPlan::Query(query) = planner.plan(&parsed[0]).unwrap() else {
        panic!("Expected Query plan with frame_config");
    };
    let frame = query.frame_config.unwrap();
    assert_eq!(frame.partition_columns, vec!["symbol".to_string()]);
    assert_eq!(frame.order_columns, vec!["ts".to_string()]);
}
1219
/// A `SELECT *` with no window frame must plan to the `Standard` variant.
#[test]
fn test_plan_no_frame_is_standard() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();

    // No frame → pass-through.
    let plan = planner.plan(&parsed[0]).unwrap();
    assert!(
        matches!(plan, StreamingPlan::Standard(_)),
        "Expected Standard plan for simple SELECT"
    );
}
1231
/// A frame ending at `UNBOUNDED FOLLOWING` must be rejected with an error
/// whose text names `UNBOUNDED FOLLOWING`.
#[test]
fn test_plan_unbounded_following_rejected() {
    let mut planner = StreamingPlanner::new();
    let parsed = StreamingParser::parse_sql(
        "SELECT SUM(amount) OVER (ORDER BY id \
         ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
         FROM orders",
    )
    .unwrap();

    let outcome = planner.plan(&parsed[0]);
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
}
1247}