// laminar_sql/datafusion/execute.rs
1//! End-to-end streaming SQL execution
2//!
3//! Provides [`execute_streaming_sql`] for parsing, planning, and executing
4//! streaming SQL statements through the `DataFusion` engine.
5//!
6//! # Architecture
7//!
8//! ```text
9//! SQL string
10//!     │
11//!     ▼
12//! parse_streaming_sql()  →  StreamingStatement
13//!     │
14//!     ▼
15//! StreamingPlanner::plan()  →  StreamingPlan
16//!     │
17//!     ├─ DDL (CREATE SOURCE/SINK)  →  DdlResult
18//!     │
19//!     ├─ Query (windows/joins)     →  LogicalPlan → DataFrame → stream
20//!     │                                + QueryPlan metadata
21//!     │
22//!     └─ Standard SQL              →  DataFusion ctx.sql() → stream
23//! ```
24
25use datafusion::execution::SendableRecordBatchStream;
26use datafusion::prelude::SessionContext;
27
28use crate::parser::interval_rewriter::rewrite_interval_arithmetic;
29use crate::parser::parse_streaming_sql;
30use crate::planner::{QueryPlan, StreamingPlan, StreamingPlanner};
31use crate::Error;
32
/// Result of executing a streaming SQL statement.
///
/// Returned by [`execute_streaming_sql`]; callers match on this to
/// distinguish DDL-style acknowledgements from executable query streams.
#[derive(Debug)]
pub enum StreamingSqlResult {
    /// DDL-style statement result (CREATE SOURCE/SINK, lookup-table
    /// DDL, and DAG EXPLAIN all map here)
    Ddl(DdlResult),
    /// Query execution result with optional streaming metadata
    Query(QueryResult),
}
41
/// Result of a DDL statement execution.
///
/// Also wraps non-query plans such as DAG EXPLAIN and lookup-table
/// registration/drop, which carry no record batch stream.
#[derive(Debug)]
pub struct DdlResult {
    /// The streaming plan describing what was created or registered
    pub plan: StreamingPlan,
}
48
/// Result of a query execution.
///
/// Contains both the `DataFusion` record batch stream and optional
/// streaming metadata (window config, join config, emit clause) from
/// the `QueryPlan`. Ring 0 operators use the `query_plan` to configure
/// windowing and join behavior.
///
/// `Debug` is implemented manually because the stream type is not
/// `Debug`; see the `impl` below.
pub struct QueryResult {
    /// Record batch stream from `DataFusion` execution
    pub stream: SendableRecordBatchStream,
    /// Streaming query metadata (window config, join config, etc.)
    ///
    /// `None` for standard SQL pass-through queries.
    /// `Some` for queries with streaming features (windows, joins).
    pub query_plan: Option<QueryPlan>,
}
64
65impl std::fmt::Debug for QueryResult {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("QueryResult")
68            .field("query_plan", &self.query_plan)
69            .field("stream", &"<SendableRecordBatchStream>")
70            .finish()
71    }
72}
73
74/// Executes a streaming SQL statement end-to-end.
75///
76/// This function performs the full pipeline:
77/// 1. Parse SQL with streaming extensions (CREATE SOURCE/SINK, windows, etc.)
78/// 2. Plan via [`StreamingPlanner`]
79/// 3. For DDL: return the streaming plan as [`DdlResult`]
80/// 4. For queries with streaming features: create `LogicalPlan` via
81///    `DataFusion`, execute, and return stream + [`QueryPlan`] metadata
82/// 5. For standard SQL: pass through to `DataFusion` directly
83///
84/// # Arguments
85///
86/// * `sql` - The SQL statement to execute
87/// * `ctx` - `DataFusion` session context (should have streaming functions registered)
88/// * `planner` - Streaming planner with registered sources/sinks
89///
90/// # Errors
91///
92/// Returns [`Error`] if parsing, planning, or execution fails.
93pub async fn execute_streaming_sql(
94    sql: &str,
95    ctx: &SessionContext,
96    planner: &mut StreamingPlanner,
97) -> std::result::Result<StreamingSqlResult, Error> {
98    let statements = parse_streaming_sql(sql)?;
99
100    if statements.is_empty() {
101        return Err(Error::ParseError(
102            crate::parser::ParseError::StreamingError("Empty SQL statement".to_string()),
103        ));
104    }
105
106    // Process the first statement
107    let statement = &statements[0];
108    let plan = planner.plan(statement)?;
109
110    match plan {
111        StreamingPlan::RegisterSource(_) | StreamingPlan::RegisterSink(_) => {
112            Ok(StreamingSqlResult::Ddl(DdlResult { plan }))
113        }
114        StreamingPlan::Query(mut query_plan) => {
115            // Rewrite INTERVAL arithmetic for BIGINT timestamp columns
116            rewrite_interval_arithmetic(&mut query_plan.statement);
117            let logical_plan = planner.to_logical_plan(&query_plan, ctx).await?;
118            let df = ctx.execute_logical_plan(logical_plan).await?;
119            let stream = df.execute_stream().await?;
120
121            Ok(StreamingSqlResult::Query(QueryResult {
122                stream,
123                query_plan: Some(query_plan),
124            }))
125        }
126        StreamingPlan::Standard(mut stmt) => {
127            // Rewrite INTERVAL arithmetic for BIGINT timestamp columns
128            rewrite_interval_arithmetic(&mut stmt);
129            let sql_str = stmt.to_string();
130            let df = ctx.sql(&sql_str).await?;
131            let stream = df.execute_stream().await?;
132
133            Ok(StreamingSqlResult::Query(QueryResult {
134                stream,
135                query_plan: None,
136            }))
137        }
138        StreamingPlan::DagExplain(_)
139        | StreamingPlan::RegisterLookupTable(_)
140        | StreamingPlan::DropLookupTable { .. } => Ok(StreamingSqlResult::Ddl(DdlResult { plan })),
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::create_streaming_context;

    #[tokio::test]
    async fn test_execute_ddl_source() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        let outcome = execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .expect("CREATE SOURCE should succeed");

        assert!(matches!(outcome, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_ddl_sink() {
        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // A sink requires its source to be registered first.
        execute_streaming_sql(
            "CREATE SOURCE events (id INT, name VARCHAR)",
            &ctx,
            &mut planner,
        )
        .await
        .expect("CREATE SOURCE should succeed");

        let outcome = execute_streaming_sql("CREATE SINK output FROM events", &ctx, &mut planner)
            .await
            .expect("CREATE SINK should succeed");

        assert!(matches!(outcome, StreamingSqlResult::Ddl(_)));
    }

    #[tokio::test]
    async fn test_execute_empty_sql_error() {
        let ctx = create_streaming_context();
        let mut planner = StreamingPlanner::new();

        // Empty input must surface as an error, not an empty result.
        let outcome = execute_streaming_sql("", &ctx, &mut planner).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_execute_standard_passthrough() {
        use futures::StreamExt;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);
        let mut planner = StreamingPlanner::new();

        // A plain SELECT with no streaming features goes straight to DataFusion.
        let outcome = execute_streaming_sql("SELECT 1 as value", &ctx, &mut planner)
            .await
            .unwrap();

        let StreamingSqlResult::Query(qr) = outcome else {
            panic!("Expected Query result");
        };
        assert!(qr.query_plan.is_none());
        let mut stream = qr.stream;
        let first = stream.next().await.unwrap().unwrap();
        assert_eq!(first.num_rows(), 1);
    }

    #[tokio::test]
    async fn test_execute_standard_query_with_table() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use futures::StreamExt;
        use std::sync::Arc;

        let ctx = create_streaming_context();
        crate::datafusion::register_streaming_functions(&ctx);

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let source = Arc::new(crate::datafusion::ChannelStreamSource::new(Arc::clone(
            &schema,
        )));
        let sender = source.take_sender().expect("sender available");
        let provider = crate::datafusion::StreamingTableProvider::new("users", source);
        ctx.register_table("users", Arc::new(provider)).unwrap();

        // Feed one batch, then close the channel so the stream terminates.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["alice", "bob"])),
            ],
        )
        .unwrap();
        sender.send(batch).await.unwrap();
        drop(sender);

        let mut planner = StreamingPlanner::new();
        let outcome = execute_streaming_sql("SELECT id, name FROM users", &ctx, &mut planner)
            .await
            .unwrap();

        let StreamingSqlResult::Query(qr) = outcome else {
            panic!("Expected Query result");
        };
        // Standard query: no streaming metadata attached.
        assert!(qr.query_plan.is_none());
        let mut stream = qr.stream;
        let mut row_count = 0;
        while let Some(item) = stream.next().await {
            row_count += item.unwrap().num_rows();
        }
        assert_eq!(row_count, 2);
    }
}