// laminar_sql/planner/mod.rs
1//! Query planner for streaming SQL
2//!
3//! This module translates parsed streaming SQL statements into execution plans.
4//! It integrates with the parser and translator modules to produce complete
5//! operator configurations for Ring 0 execution.
6
7pub mod channel_derivation;
8/// Optimizer rules for lookup join rewriting.
9pub mod lookup_join;
10/// Predicate splitting and pushdown for lookup joins.
11pub mod predicate_split;
12/// Physical optimizer rule for streaming plan validation.
13pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] // cold path: query planning
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26    analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::analyze_joins;
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32    CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33    StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36    AnalyticWindowConfig, HavingFilterConfig, JoinOperatorConfig, OrderOperatorConfig,
37    WindowFrameConfig, WindowOperatorConfig,
38};
39
/// Information about a registered lookup table.
///
/// Built by `StreamingPlanner::plan_create_lookup_table` from a validated
/// `CREATE LOOKUP TABLE` statement.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name.
    pub name: String,
    /// Column names and types (as written in the DDL, in declaration order).
    pub columns: Vec<(String, String)>,
    /// Primary key columns.
    pub primary_key: Vec<String>,
    /// Validated properties.
    pub properties: LookupTableProperties,
    /// Pre-computed Arrow schema from column definitions.
    pub arrow_schema: SchemaRef,
    /// Raw WITH options for connector configuration pass-through.
    pub raw_options: HashMap<String, String>,
}
56
/// Streaming query planner
///
/// Tracks DDL-registered objects (sources, sinks, lookup tables) by name and
/// translates parsed streaming statements into [`StreamingPlan`]s.
pub struct StreamingPlanner {
    /// Registered sources, keyed by source name
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks, keyed by sink name
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables, keyed by table name
    lookup_tables: HashMap<String, LookupTableInfo>,
}
66
/// Information about a registered source
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name
    pub name: String,
    /// Watermark column (if configured via `WATERMARK FOR ...` in the DDL)
    pub watermark_column: Option<String>,
    /// Connector options (raw `WITH (...)` key/value pairs)
    pub options: HashMap<String, String>,
}
77
/// Information about a registered sink
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name
    pub name: String,
    /// Source table name; for query-backed sinks this is the synthesized
    /// name `<sink_name>_query`
    pub from: String,
    /// Connector options (raw `WITH (...)` key/value pairs)
    pub options: HashMap<String, String>,
}
88
/// Result of planning a streaming statement
///
/// Produced by [`StreamingPlanner::plan`]; DDL variants carry the registration
/// info that was also recorded in the planner's own maps.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Source registration (DDL)
    RegisterSource(SourceInfo),

    /// Sink registration (DDL)
    RegisterSink(SinkInfo),

    /// Query plan with streaming configurations
    Query(QueryPlan),

    /// Standard SQL statement (pass-through to DataFusion)
    Standard(Box<Statement>),

    /// Lookup table registration (DDL)
    RegisterLookupTable(LookupTableInfo),

    /// Drop a lookup table
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
114
/// A query plan with streaming operator configurations
///
/// Each configuration field is `None` when the corresponding streaming
/// feature is absent from the query.
#[derive(Debug)]
pub struct QueryPlan {
    /// Optional name for the continuous query
    pub name: Option<String>,
    /// Window configuration if the query has windowed aggregation
    pub window_config: Option<WindowOperatorConfig>,
    /// Join configuration(s) if the query has joins (one per join step)
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration if the query has ordering
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic window function configuration (LAG/LEAD/etc.)
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter configuration
    pub having_config: Option<HavingFilterConfig>,
    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN)
    pub frame_config: Option<WindowFrameConfig>,
    /// Emit strategy
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement
    pub statement: Box<Statement>,
}
137
impl StreamingPlanner {
    /// Creates a new streaming planner
    #[must_use]
    pub fn new() -> Self {
        Self {
            sources: HashMap::new(),
            sinks: HashMap::new(),
            lookup_tables: HashMap::new(),
        }
    }

    /// Plans a streaming statement.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if the statement cannot be planned.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share one planning path.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
                ..
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                // These statements are handled directly by the database facade
                // and don't need query planning; reaching the planner with one
                // of them is therefore an error, not a pass-through.
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }

    /// Plans a CREATE SOURCE statement.
    fn plan_create_source(
        &mut self,
        source: &CreateSourceStatement,
    ) -> Result<StreamingPlan, PlanningError> {
        let name = object_name_to_string(&source.name);

        // Check for existing source.
        // Either OR REPLACE or IF NOT EXISTS bypasses the duplicate check.
        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
            return Err(PlanningError::InvalidQuery(format!(
                "Source '{}' already exists",
                name
            )));
        }

        // Extract watermark column
        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());

        let info = SourceInfo {
            name: name.clone(),
            watermark_column,
            options: source.with_options.clone(),
        };

        // Register the source (replaces any existing entry under this name)
        self.sources.insert(name, info.clone());

        Ok(StreamingPlan::RegisterSource(info))
    }

    /// Plans a CREATE SINK statement.
    fn plan_create_sink(
        &mut self,
        sink: &CreateSinkStatement,
    ) -> Result<StreamingPlan, PlanningError> {
        let name = object_name_to_string(&sink.name);

        // Check for existing sink.
        // Either OR REPLACE or IF NOT EXISTS bypasses the duplicate check.
        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
            return Err(PlanningError::InvalidQuery(format!(
                "Sink '{}' already exists",
                name
            )));
        }

        // Determine the source: a named table directly, or a synthesized
        // `<name>_query` identifier for query-backed sinks.
        let from = match &sink.from {
            SinkFrom::Table(table) => object_name_to_string(table),
            SinkFrom::Query(_) => format!("{}_query", name),
        };

        let info = SinkInfo {
            name: name.clone(),
            from,
            options: sink.with_options.clone(),
        };

        // Register the sink (replaces any existing entry under this name)
        self.sinks.insert(name, info.clone());

        Ok(StreamingPlan::RegisterSink(info))
    }

    /// Plans a CREATE CONTINUOUS QUERY statement.
    #[allow(clippy::unused_self)] // Will use planner state for query registration
    fn plan_continuous_query(
        &mut self,
        name: &ObjectName,
        query: &StreamingStatement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<StreamingPlan, PlanningError> {
        // The query inside should be a standard SELECT
        let stmt = match query {
            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
            _ => {
                return Err(PlanningError::InvalidQuery(
                    "Continuous query must contain a SELECT statement".to_string(),
                ))
            }
        };

        // Analyze the query for streaming features
        let query_plan = Self::analyze_query(&stmt, emit_clause)?;

        Ok(StreamingPlan::Query(QueryPlan {
            name: Some(object_name_to_string(name)),
            window_config: query_plan.window_config,
            join_config: query_plan.join_config,
            order_config: query_plan.order_config,
            analytic_config: query_plan.analytic_config,
            having_config: query_plan.having_config,
            frame_config: query_plan.frame_config,
            // The emit clause is both folded into the window config (inside
            // analyze_query) and kept on the plan for downstream consumers.
            emit_clause: emit_clause.cloned(),
            statement: Box::new(stmt),
        }))
    }

    /// Plans a standard SQL statement.
    ///
    /// SELECTs are inspected for streaming features (windows, joins,
    /// ORDER BY, analytics, HAVING, frames); anything without such features
    /// — and any non-query statement — is passed through unchanged.
    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        // Check if it's a query that might have streaming features
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Check for window functions in GROUP BY
                let window_function = Self::extract_window_from_select(select);

                // Check for joins (multi-way)
                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                // Check for ORDER BY
                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                // Check for analytic functions (LAG/LEAD/etc.)
                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                // Check for HAVING clause
                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer infinite future)
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // Pass through standard SQL
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }

    /// Analyzes a query for streaming features.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Extract window function
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // Apply emit clause if present
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                // Extract join info (multi-way)
                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        // Extract ORDER BY info
        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        // Extract analytic function info (LAG/LEAD/etc.)
        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        // Extract HAVING clause
        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // Validate: reject UNBOUNDED FOLLOWING
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }

    /// Extracts window function from a SELECT.
    ///
    /// Returns the first window function found among the GROUP BY
    /// expressions, or `None` if there is none.
    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
        // Check GROUP BY for window functions
        use sqlparser::ast::GroupByExpr;
        match &select.group_by {
            GroupByExpr::Expressions(exprs, _modifiers) => {
                for group_by_expr in exprs {
                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
                    {
                        return Some(window);
                    }
                }
            }
            // GROUP BY ALL carries no explicit expressions to scan.
            GroupByExpr::All(_) => {}
        }
        None
    }

    /// Plans a CREATE LOOKUP TABLE statement.
    fn plan_create_lookup_table(
        &mut self,
        lt: &CreateLookupTableStatement,
    ) -> Result<StreamingPlan, PlanningError> {
        let name = object_name_to_string(&lt.name);

        // Either OR REPLACE or IF NOT EXISTS bypasses the duplicate check.
        if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
            return Err(PlanningError::InvalidQuery(format!(
                "Lookup table '{}' already exists",
                name
            )));
        }

        let columns: Vec<(String, String)> = lt
            .columns
            .iter()
            .map(|c| (c.name.value.clone(), c.data_type.to_string()))
            .collect();

        let properties = validate_properties(&lt.with_options).map_err(|e| {
            PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
        })?;

        // Compute Arrow schema from column definitions
        let arrow_fields: Vec<Field> = lt
            .columns
            .iter()
            .map(|c| {
                let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
                    .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                // A column is nullable unless it carries an explicit NOT NULL option.
                let nullable = !c
                    .options
                    .iter()
                    .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
                Ok(Field::new(&c.name.value, dt, nullable))
            })
            .collect::<Result<_, PlanningError>>()?;
        let arrow_schema = Arc::new(Schema::new(arrow_fields));

        let info = LookupTableInfo {
            name: name.clone(),
            columns,
            primary_key: lt.primary_key.clone(),
            properties,
            arrow_schema,
            raw_options: lt.with_options.clone(),
        };

        self.lookup_tables.insert(name, info.clone());

        Ok(StreamingPlan::RegisterLookupTable(info))
    }

    /// Plans a DROP LOOKUP TABLE statement.
    fn plan_drop_lookup_table(
        &mut self,
        name: &ObjectName,
        if_exists: bool,
    ) -> Result<StreamingPlan, PlanningError> {
        let name_str = object_name_to_string(name);

        if !if_exists && !self.lookup_tables.contains_key(&name_str) {
            return Err(PlanningError::InvalidQuery(format!(
                "Lookup table '{}' does not exist",
                name_str
            )));
        }

        // No-op when IF EXISTS was given and the table is absent.
        self.lookup_tables.remove(&name_str);

        Ok(StreamingPlan::DropLookupTable { name: name_str })
    }

    /// Gets a registered source by name.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Gets a registered sink by name.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// Lists all registered sources.
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// Lists all registered sinks.
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Gets a registered lookup table by name.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// Lists all registered lookup tables.
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }

    /// Returns a clone of the lookup tables map for optimizer rule construction.
    #[must_use]
    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
        self.lookup_tables.clone()
    }

    /// Converts a query plan's SQL statement into a `DataFusion`
    /// `LogicalPlan`. Window UDFs (TUMBLE, HOP, SESSION) must be registered
    /// on `ctx` via
    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
    /// for windowed queries to resolve correctly.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
    pub async fn to_logical_plan(
        &self,
        plan: &QueryPlan,
        ctx: &SessionContext,
    ) -> Result<LogicalPlan, PlanningError> {
        // Convert the AST statement back to SQL and let DataFusion re-parse
        // it with its own sqlparser version. This avoids version mismatches
        // between our sqlparser (0.60) and DataFusion's (0.59).
        let sql = plan.statement.to_string();
        ctx.state()
            .create_logical_plan(&sql)
            .await
            .map_err(PlanningError::DataFusion)
    }
}
601
602impl Default for StreamingPlanner {
603    fn default() -> Self {
604        Self::new()
605    }
606}
607
/// Intermediate query analysis result
///
/// Filled in by `StreamingPlanner::analyze_query` and then moved field-by-field
/// into a [`QueryPlan`]; each field is `None` when the corresponding streaming
/// feature is absent from the query.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    /// Windowed aggregation configuration (from GROUP BY window functions).
    window_config: Option<WindowOperatorConfig>,
    /// One configuration per join step for multi-way joins.
    join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration.
    order_config: Option<OrderOperatorConfig>,
    /// Analytic window function (LAG/LEAD/etc.) configuration.
    analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter configuration.
    having_config: Option<HavingFilterConfig>,
    /// Window frame (ROWS/RANGE BETWEEN) configuration.
    frame_config: Option<WindowFrameConfig>,
}
619
620/// Helper to convert `ObjectName` to String
621fn object_name_to_string(name: &ObjectName) -> String {
622    name.to_string()
623}
624
/// Planning errors
///
/// NOTE(review): this enum derives `thiserror::Error` but none of the
/// variants carry `#[error("...")]` display attributes, and a manual
/// `Display` impl exists below. thiserror's derive normally generates
/// `Display` itself and requires those attributes, so verify this compiles
/// as intended — the attributes may have been dropped in an edit.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Unsupported SQL feature
    UnsupportedSql(String),

    /// Invalid query
    InvalidQuery(String),

    /// Source not found
    SourceNotFound(String),

    /// Sink not found
    SinkNotFound(String),

    /// `DataFusion` error during logical plan creation (translated on display)
    /// — `#[from]` gives the `?` conversion used by `to_logical_plan`.
    DataFusion(#[from] datafusion_common::DataFusionError),
}
643
644impl std::fmt::Display for PlanningError {
645    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646        match self {
647            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
648            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
649            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
650            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
651            Self::DataFusion(e) => {
652                let translated = crate::error::translate_datafusion_error(&e.to_string());
653                write!(f, "{translated}")
654            }
655        }
656    }
657}
658
659#[cfg(test)]
660mod tests {
661    use super::*;
662    use crate::parser::StreamingParser;
663
664    #[test]
665    fn test_plan_create_source() {
666        let mut planner = StreamingPlanner::new();
667        let statements =
668            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
669
670        let plan = planner.plan(&statements[0]).unwrap();
671        match plan {
672            StreamingPlan::RegisterSource(info) => {
673                assert_eq!(info.name, "events");
674            }
675            _ => panic!("Expected RegisterSource plan"),
676        }
677    }
678
679    #[test]
680    fn test_plan_create_sink() {
681        let mut planner = StreamingPlanner::new();
682        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
683
684        let plan = planner.plan(&statements[0]).unwrap();
685        match plan {
686            StreamingPlan::RegisterSink(info) => {
687                assert_eq!(info.name, "output");
688                assert_eq!(info.from, "events");
689            }
690            _ => panic!("Expected RegisterSink plan"),
691        }
692    }
693
694    #[test]
695    fn test_plan_duplicate_source() {
696        let mut planner = StreamingPlanner::new();
697
698        // First source
699        let statements =
700            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
701        planner.plan(&statements[0]).unwrap();
702
703        // Duplicate should fail
704        let result = planner.plan(&statements[0]);
705        assert!(result.is_err());
706    }
707
708    #[test]
709    fn test_plan_source_if_not_exists() {
710        let mut planner = StreamingPlanner::new();
711
712        // First source
713        let statements =
714            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
715        planner.plan(&statements[0]).unwrap();
716
717        // IF NOT EXISTS should succeed
718        let statements =
719            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
720                .unwrap();
721        let result = planner.plan(&statements[0]);
722        assert!(result.is_ok());
723    }
724
725    #[test]
726    fn test_plan_source_or_replace() {
727        let mut planner = StreamingPlanner::new();
728
729        // First source
730        let statements =
731            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
732        planner.plan(&statements[0]).unwrap();
733
734        // OR REPLACE should succeed
735        let statements =
736            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
737                .unwrap();
738        let result = planner.plan(&statements[0]);
739        assert!(result.is_ok());
740    }
741
742    #[test]
743    fn test_plan_source_with_watermark() {
744        let mut planner = StreamingPlanner::new();
745        let statements = StreamingParser::parse_sql(
746            "CREATE SOURCE events (
747                id INT,
748                ts TIMESTAMP,
749                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
750            )",
751        )
752        .unwrap();
753
754        let plan = planner.plan(&statements[0]).unwrap();
755        match plan {
756            StreamingPlan::RegisterSource(info) => {
757                assert_eq!(info.name, "events");
758                assert_eq!(info.watermark_column, Some("ts".to_string()));
759            }
760            _ => panic!("Expected RegisterSource plan"),
761        }
762    }
763
764    #[test]
765    fn test_plan_standard_select() {
766        let mut planner = StreamingPlanner::new();
767        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
768
769        let plan = planner.plan(&statements[0]).unwrap();
770        match plan {
771            StreamingPlan::Standard(_) => {}
772            _ => panic!("Expected Standard plan for simple SELECT"),
773        }
774    }
775
776    #[test]
777    fn test_list_sources_and_sinks() {
778        let mut planner = StreamingPlanner::new();
779
780        // Create sources
781        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
782        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
783        planner.plan(&s1[0]).unwrap();
784        planner.plan(&s2[0]).unwrap();
785
786        // Create sinks
787        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
788        planner.plan(&k1[0]).unwrap();
789
790        assert_eq!(planner.list_sources().len(), 2);
791        assert_eq!(planner.list_sinks().len(), 1);
792        assert!(planner.get_source("src1").is_some());
793        assert!(planner.get_sink("sink1").is_some());
794    }
795
796    #[test]
797    fn test_plan_query_with_window() {
798        let mut planner = StreamingPlanner::new();
799        let statements = StreamingParser::parse_sql(
800            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
801        )
802        .unwrap();
803
804        let plan = planner.plan(&statements[0]).unwrap();
805        match plan {
806            StreamingPlan::Query(query_plan) => {
807                assert!(query_plan.window_config.is_some());
808                let config = query_plan.window_config.unwrap();
809                assert_eq!(config.time_column, "event_time");
810                assert_eq!(config.size.as_secs(), 300);
811            }
812            _ => panic!("Expected Query plan"),
813        }
814    }
815
816    #[test]
817    fn test_plan_query_with_join() {
818        let mut planner = StreamingPlanner::new();
819        let statements = StreamingParser::parse_sql(
820            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
821        )
822        .unwrap();
823
824        let plan = planner.plan(&statements[0]).unwrap();
825        match plan {
826            StreamingPlan::Query(query_plan) => {
827                assert!(query_plan.join_config.is_some());
828                let configs = query_plan.join_config.unwrap();
829                assert_eq!(configs.len(), 1);
830                assert_eq!(configs[0].left_key(), "order_id");
831                assert_eq!(configs[0].right_key(), "order_id");
832            }
833            _ => panic!("Expected Query plan"),
834        }
835    }
836
837    #[test]
838    fn test_plan_query_with_lag() {
839        let mut planner = StreamingPlanner::new();
840        let statements = StreamingParser::parse_sql(
841            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
842        )
843        .unwrap();
844
845        let plan = planner.plan(&statements[0]).unwrap();
846        match plan {
847            StreamingPlan::Query(query_plan) => {
848                assert!(query_plan.analytic_config.is_some());
849                let config = query_plan.analytic_config.unwrap();
850                assert_eq!(config.functions.len(), 1);
851                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
852            }
853            _ => panic!("Expected Query plan with analytic config"),
854        }
855    }
856
857    #[test]
858    fn test_plan_query_with_having() {
859        let mut planner = StreamingPlanner::new();
860        let statements = StreamingParser::parse_sql(
861            "SELECT symbol, COUNT(*) AS cnt FROM trades \
862             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
863             HAVING COUNT(*) > 10",
864        )
865        .unwrap();
866
867        let plan = planner.plan(&statements[0]).unwrap();
868        match plan {
869            StreamingPlan::Query(query_plan) => {
870                assert!(query_plan.window_config.is_some());
871                assert!(query_plan.having_config.is_some());
872                let config = query_plan.having_config.unwrap();
873                assert!(
874                    config.predicate().contains("COUNT(*)"),
875                    "predicate was: {}",
876                    config.predicate()
877                );
878            }
879            _ => panic!("Expected Query plan with having config"),
880        }
881    }
882
883    #[test]
884    fn test_plan_query_without_having() {
885        let mut planner = StreamingPlanner::new();
886        let statements = StreamingParser::parse_sql(
887            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
888        )
889        .unwrap();
890
891        let plan = planner.plan(&statements[0]).unwrap();
892        match plan {
893            StreamingPlan::Query(query_plan) => {
894                assert!(query_plan.having_config.is_none());
895            }
896            _ => panic!("Expected Query plan"),
897        }
898    }
899
900    #[test]
901    fn test_plan_having_only_produces_query_plan() {
902        // HAVING without window function still produces a Query plan
903        let mut planner = StreamingPlanner::new();
904        let statements = StreamingParser::parse_sql(
905            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
906        )
907        .unwrap();
908
909        let plan = planner.plan(&statements[0]).unwrap();
910        match plan {
911            StreamingPlan::Query(query_plan) => {
912                assert!(query_plan.having_config.is_some());
913                assert!(query_plan.window_config.is_none());
914            }
915            _ => panic!("Expected Query plan for HAVING-only query"),
916        }
917    }
918
919    #[test]
920    fn test_plan_having_compound_predicate() {
921        let mut planner = StreamingPlanner::new();
922        let statements = StreamingParser::parse_sql(
923            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
924             FROM trades GROUP BY symbol \
925             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
926        )
927        .unwrap();
928
929        let plan = planner.plan(&statements[0]).unwrap();
930        match plan {
931            StreamingPlan::Query(query_plan) => {
932                let config = query_plan.having_config.unwrap();
933                let pred = config.predicate();
934                assert!(pred.contains("AND"), "predicate was: {pred}");
935            }
936            _ => panic!("Expected Query plan"),
937        }
938    }
939
940    #[test]
941    fn test_plan_query_with_lead() {
942        let mut planner = StreamingPlanner::new();
943        let statements = StreamingParser::parse_sql(
944            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
945        )
946        .unwrap();
947
948        let plan = planner.plan(&statements[0]).unwrap();
949        match plan {
950            StreamingPlan::Query(query_plan) => {
951                assert!(query_plan.analytic_config.is_some());
952                let config = query_plan.analytic_config.unwrap();
953                assert!(config.has_lookahead());
954                assert_eq!(config.functions[0].offset, 2);
955            }
956            _ => panic!("Expected Query plan with analytic config"),
957        }
958    }
959
960    // -- Multi-way join planner tests --
961
962    #[test]
963    fn test_plan_single_join_produces_vec_of_one() {
964        let mut planner = StreamingPlanner::new();
965        let statements =
966            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
967
968        let plan = planner.plan(&statements[0]).unwrap();
969        match plan {
970            StreamingPlan::Query(qp) => {
971                let configs = qp.join_config.unwrap();
972                assert_eq!(configs.len(), 1);
973            }
974            _ => panic!("Expected Query plan"),
975        }
976    }
977
978    #[test]
979    fn test_plan_two_way_join() {
980        let mut planner = StreamingPlanner::new();
981        let statements = StreamingParser::parse_sql(
982            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
983        )
984        .unwrap();
985
986        let plan = planner.plan(&statements[0]).unwrap();
987        match plan {
988            StreamingPlan::Query(qp) => {
989                let configs = qp.join_config.unwrap();
990                assert_eq!(configs.len(), 2);
991                assert_eq!(configs[0].left_key(), "id");
992                assert_eq!(configs[0].right_key(), "a_id");
993                assert_eq!(configs[1].left_key(), "id");
994                assert_eq!(configs[1].right_key(), "b_id");
995            }
996            _ => panic!("Expected Query plan"),
997        }
998    }
999
1000    #[test]
1001    fn test_plan_mixed_join_types() {
1002        let mut planner = StreamingPlanner::new();
1003        let statements = StreamingParser::parse_sql(
1004            "SELECT * FROM orders o \
1005             JOIN payments p ON o.id = p.order_id \
1006                 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
1007             JOIN customers c ON p.cust_id = c.id",
1008        )
1009        .unwrap();
1010
1011        let plan = planner.plan(&statements[0]).unwrap();
1012        match plan {
1013            StreamingPlan::Query(qp) => {
1014                let configs = qp.join_config.unwrap();
1015                assert_eq!(configs.len(), 2);
1016                assert!(configs[0].is_stream_stream());
1017                assert!(configs[1].is_lookup());
1018            }
1019            _ => panic!("Expected Query plan"),
1020        }
1021    }
1022
1023    #[test]
1024    fn test_plan_backward_compat_no_join() {
1025        let mut planner = StreamingPlanner::new();
1026        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
1027
1028        let plan = planner.plan(&statements[0]).unwrap();
1029        match plan {
1030            StreamingPlan::Standard(_) => {} // No join → pass-through
1031            _ => panic!("Expected Standard plan for simple SELECT"),
1032        }
1033    }
1034
1035    // -- Window Frame planner tests --
1036
1037    #[test]
1038    fn test_plan_query_with_rows_frame() {
1039        let mut planner = StreamingPlanner::new();
1040        let statements = StreamingParser::parse_sql(
1041            "SELECT AVG(price) OVER (ORDER BY ts \
1042             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1043        )
1044        .unwrap();
1045
1046        let plan = planner.plan(&statements[0]).unwrap();
1047        match plan {
1048            StreamingPlan::Query(qp) => {
1049                assert!(qp.frame_config.is_some());
1050                let fc = qp.frame_config.unwrap();
1051                assert_eq!(fc.functions.len(), 1);
1052                assert_eq!(fc.functions[0].source_column, "price");
1053            }
1054            _ => panic!("Expected Query plan with frame_config"),
1055        }
1056    }
1057
1058    #[test]
1059    fn test_plan_frame_with_partition() {
1060        let mut planner = StreamingPlanner::new();
1061        let statements = StreamingParser::parse_sql(
1062            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
1063             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1064        )
1065        .unwrap();
1066
1067        let plan = planner.plan(&statements[0]).unwrap();
1068        match plan {
1069            StreamingPlan::Query(qp) => {
1070                let fc = qp.frame_config.unwrap();
1071                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
1072                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
1073            }
1074            _ => panic!("Expected Query plan with frame_config"),
1075        }
1076    }
1077
1078    #[test]
1079    fn test_plan_no_frame_is_standard() {
1080        let mut planner = StreamingPlanner::new();
1081        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
1082
1083        let plan = planner.plan(&statements[0]).unwrap();
1084        match plan {
1085            StreamingPlan::Standard(_) => {} // No frame → pass-through
1086            _ => panic!("Expected Standard plan for simple SELECT"),
1087        }
1088    }
1089
1090    #[test]
1091    fn test_plan_unbounded_following_rejected() {
1092        let mut planner = StreamingPlanner::new();
1093        let statements = StreamingParser::parse_sql(
1094            "SELECT SUM(amount) OVER (ORDER BY id \
1095             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
1096             FROM orders",
1097        )
1098        .unwrap();
1099
1100        let result = planner.plan(&statements[0]);
1101        assert!(result.is_err());
1102        let err = result.unwrap_err().to_string();
1103        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
1104    }
1105}