// File: laminar_core/compiler/orchestrate.rs
//! SQL compiler orchestrator — single entry point for `LogicalPlan` → [`StreamingQuery`].
//!
//! [`compile_streaming_query`] coordinates the full compilation pipeline:
//! 1. Extract pipelines from a `DataFusion` [`LogicalPlan`]
//! 2. Detect breakers (stateful operators) → return `None` for plans that can't be fully compiled
//! 3. Compile each pipeline segment via [`ExecutablePipeline::try_compile`]
//! 4. Wire each compiled pipeline to a [`super::PipelineBridge`] + [`super::BridgeConsumer`]
//! 5. Assemble into a [`StreamingQuery`] ready for execution
//!
//! # Usage
//!
//! ```ignore
//! use laminar_core::compiler::orchestrate::compile_streaming_query;
//! use laminar_core::compiler::{CompilerCache, QueryConfig};
//!
//! let mut cache = CompilerCache::new(64)?;
//! let config = QueryConfig::default();
//! match compile_streaming_query(sql, &logical_plan, &mut cache, &config)? {
//!     Some(compiled) => { /* use compiled.query, compiled.source_plan, etc. */ }
//!     None => { /* fall back to DataFusion interpreted execution */ }
//! }
//! ```
24use std::sync::Arc;
25use std::time::Instant;
26
27use datafusion_expr::LogicalPlan;
28
29use super::cache::CompilerCache;
30use super::error::CompileError;
31use super::extractor::PipelineExtractor;
32use super::fallback::ExecutablePipeline;
33use super::metrics::{QueryConfig, QueryMetadata};
34use super::pipeline_bridge::create_pipeline_bridge;
35use super::query::{StreamingQuery, StreamingQueryBuilder};
36use super::row::RowSchema;
37
38/// Result of successful compilation — ready for execution.
39pub struct CompiledStreamingQuery {
40    /// The compiled [`StreamingQuery`], in `Ready` state. Call `.start()` before use.
41    pub query: StreamingQuery,
42    /// [`LogicalPlan`] containing only the source scan (TableScan/EmptyRelation).
43    /// The caller executes this via `DataFusion` to get the input record batch stream.
44    pub source_plan: LogicalPlan,
45    /// Input [`RowSchema`] for `RecordBatch` → `EventRow` conversion.
46    pub input_schema: Arc<RowSchema>,
47    /// Output [`RowSchema`] (may differ from input if projection changes columns).
48    pub output_schema: Arc<RowSchema>,
49    /// Compilation metadata for diagnostics.
50    pub metadata: QueryMetadata,
51}
52
53impl std::fmt::Debug for CompiledStreamingQuery {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("CompiledStreamingQuery")
56            .field("query", &self.query)
57            .field("input_fields", &self.input_schema.field_count())
58            .field("output_fields", &self.output_schema.field_count())
59            .field("metadata", &self.metadata)
60            .finish_non_exhaustive()
61    }
62}
63
64/// Attempts to compile a `DataFusion` [`LogicalPlan`] into a [`StreamingQuery`].
65///
66/// Returns `Ok(None)` when the plan contains stateful operators (aggregations,
67/// sorts, joins) that cannot be compiled — the caller should fall back to
68/// `DataFusion` interpreted execution.
69///
70/// Returns `Ok(Some(compiled))` with a ready-to-start [`StreamingQuery`], the
71/// source scan plan for `DataFusion` to execute, and the input/output schemas
72/// for `RecordBatch` ↔ `EventRow` conversion.
73///
74/// # Errors
75///
76/// Returns [`CompileError`] only on hard failures (e.g., Cranelift context
77/// initialization failure, or the plan has no source scan). Soft failures
78/// (unsupported expressions) result in fallback pipelines within the query.
79pub fn compile_streaming_query(
80    sql: &str,
81    plan: &LogicalPlan,
82    cache: &mut CompilerCache,
83    config: &QueryConfig,
84) -> Result<Option<CompiledStreamingQuery>, CompileError> {
85    // 1. Extract pipelines from the logical plan.
86    let extract_start = Instant::now();
87    let Ok(extracted) = PipelineExtractor::extract(plan) else {
88        return Ok(None);
89    };
90    let extract_time = extract_start.elapsed();
91
92    // 2. If there are breakers (stateful operators), return None — can't fully compile.
93    if !extracted.breakers.is_empty() {
94        return Ok(None);
95    }
96
97    // 3. Extract the source scan from the plan.
98    let Some(source_plan) = extract_table_scan(plan) else {
99        return Ok(None);
100    };
101
102    // 4. Build input schema from the source plan's output.
103    let source_arrow_schema = Arc::new(source_plan.schema().as_arrow().clone());
104    let input_schema = Arc::new(
105        RowSchema::from_arrow(&source_arrow_schema)
106            .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?,
107    );
108
109    // 5. Compile each pipeline and wire bridges.
110    let compile_start = Instant::now();
111    let mut builder = StreamingQueryBuilder::new(sql);
112    let mut compiled_count = 0_usize;
113    let mut fallback_count = 0_usize;
114    let mut final_output_schema = Arc::clone(&input_schema);
115
116    for pipeline in &extracted.pipelines {
117        let exec = ExecutablePipeline::try_compile(cache, pipeline);
118
119        if exec.is_compiled() {
120            compiled_count += 1;
121        } else {
122            fallback_count += 1;
123        }
124
125        // Determine the output schema for this pipeline's bridge.
126        let pipeline_output_arrow = Arc::clone(&pipeline.output_schema);
127        let pipeline_output_row = Arc::new(
128            RowSchema::from_arrow(&pipeline_output_arrow)
129                .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?,
130        );
131
132        let (bridge, consumer) = create_pipeline_bridge(
133            Arc::clone(&pipeline_output_row),
134            config.queue_capacity,
135            config.output_buffer_size,
136            config.batch_policy.clone(),
137            config.backpressure.clone(),
138        )
139        .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?;
140
141        builder = builder.add_pipeline(exec, bridge, consumer, Arc::clone(&pipeline_output_row));
142        final_output_schema = pipeline_output_row;
143    }
144    let compile_time = compile_start.elapsed();
145
146    // 6. Build metadata.
147    let metadata = QueryMetadata {
148        extract_time,
149        compile_time,
150        compiled_pipeline_count: compiled_count,
151        fallback_pipeline_count: fallback_count,
152        jit_enabled: true,
153        ..Default::default()
154    };
155
156    // 7. Build the StreamingQuery.
157    let query = builder
158        .with_metadata(metadata.clone())
159        .build()
160        .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?;
161
162    Ok(Some(CompiledStreamingQuery {
163        query,
164        source_plan,
165        input_schema,
166        output_schema: final_output_schema,
167        metadata,
168    }))
169}
170
171/// Recursively walks a [`LogicalPlan`] to extract the first source scan node.
172///
173/// Returns the scan as a standalone [`LogicalPlan`] suitable for execution
174/// via `DataFusion`. Recognizes `TableScan` and `EmptyRelation` as sources.
175fn extract_table_scan(plan: &LogicalPlan) -> Option<LogicalPlan> {
176    match plan {
177        LogicalPlan::TableScan(_) | LogicalPlan::EmptyRelation(_) => Some(plan.clone()),
178        _ => plan.inputs().into_iter().find_map(extract_table_scan),
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::DFSchema;
    use datafusion_expr::{col, lit, LogicalPlanBuilder};

    /// Builds an `EmptyRelation` plan exposing the given columns (all nullable),
    /// standing in for a table scan in these tests.
    fn table_scan_plan(fields: Vec<(&str, DataType)>) -> LogicalPlan {
        let columns: Vec<Field> = fields
            .into_iter()
            .map(|(name, dt)| Field::new(name, dt, true))
            .collect();
        let arrow_schema = Arc::new(Schema::new(columns));
        let df_schema = DFSchema::try_from(arrow_schema.as_ref().clone()).unwrap();
        LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(df_schema),
        })
    }

    /// Fresh compiler cache with room for 64 entries.
    fn new_cache() -> CompilerCache {
        CompilerCache::new(64).unwrap()
    }

    // ── Compilation tests ───────────────────────────────────────────

    #[test]
    fn compile_simple_filter() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").gt(lit(10_i64)))
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let compiled =
            compile_streaming_query("SELECT x FROM t WHERE x > 10", &plan, &mut cache, &config)
                .unwrap()
                .unwrap();

        assert_eq!(compiled.input_schema.field_count(), 1);
        assert_eq!(compiled.output_schema.field_count(), 1);
        // Every pipeline lands in exactly one of the two buckets.
        let total = compiled.metadata.compiled_pipeline_count
            + compiled.metadata.fallback_pipeline_count;
        assert!(total > 0);
        assert!(compiled.metadata.jit_enabled);
    }

    #[test]
    fn compile_filter_and_project() {
        let scan = table_scan_plan(vec![("x", DataType::Int64), ("y", DataType::Int64)]);
        let plan = LogicalPlanBuilder::from(scan)
            .filter(col("x").gt(lit(0_i64)))
            .unwrap()
            .project(vec![col("x"), col("y") + lit(1_i64)])
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let compiled = compile_streaming_query(
            "SELECT x, y + 1 FROM t WHERE x > 0",
            &plan,
            &mut cache,
            &config,
        )
        .unwrap()
        .unwrap();
        assert_eq!(compiled.input_schema.field_count(), 2);
        assert_eq!(compiled.output_schema.field_count(), 2);
    }

    #[test]
    fn compile_passthrough() {
        let scan = table_scan_plan(vec![("x", DataType::Int64)]);
        let mut cache = new_cache();
        let config = QueryConfig::default();
        let compiled = compile_streaming_query("SELECT x FROM t", &scan, &mut cache, &config)
            .unwrap()
            .unwrap();
        assert_eq!(compiled.query.pipeline_count(), 1);
    }

    // ── Breaker detection tests ─────────────────────────────────────

    #[test]
    fn aggregate_returns_none() {
        let scan = table_scan_plan(vec![("key", DataType::Int64), ("val", DataType::Int64)]);
        let agg_schema = Arc::new(Schema::new(vec![Field::new("key", DataType::Int64, true)]));
        let df_schema = DFSchema::try_from(agg_schema.as_ref().clone()).unwrap();
        let agg = datafusion_expr::Aggregate::try_new_with_schema(
            Arc::new(scan),
            vec![col("key")],
            vec![],
            Arc::new(df_schema),
        )
        .unwrap();
        let plan = LogicalPlan::Aggregate(agg);

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let result =
            compile_streaming_query("SELECT key FROM t GROUP BY key", &plan, &mut cache, &config);
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn sort_returns_none() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .sort(vec![col("x").sort(true, true)])
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let result =
            compile_streaming_query("SELECT x FROM t ORDER BY x", &plan, &mut cache, &config);
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn limit_returns_none() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .limit(0, Some(10))
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let result =
            compile_streaming_query("SELECT x FROM t LIMIT 10", &plan, &mut cache, &config);
        assert!(result.unwrap().is_none());
    }

    // ── Source extraction tests ──────────────────────────────────────

    #[test]
    fn extract_scan_from_simple_plan() {
        let scan = table_scan_plan(vec![("x", DataType::Int64)]);
        assert!(extract_table_scan(&scan).is_some());
    }

    #[test]
    fn extract_scan_from_nested_plan() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").gt(lit(0_i64)))
            .unwrap()
            .build()
            .unwrap();
        let found = extract_table_scan(&plan);
        assert!(found.is_some());
        assert!(matches!(found.unwrap(), LogicalPlan::EmptyRelation(_)));
    }

    // ── Cache interaction tests ─────────────────────────────────────

    #[test]
    fn repeat_compilation_uses_cache() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").gt(lit(10_i64)))
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();

        // First compilation populates the cache.
        let _ = compile_streaming_query("SELECT x FROM t WHERE x > 10", &plan, &mut cache, &config)
            .unwrap();
        let entries_after_first = cache.len();

        // Re-compiling the same plan must not add a new cache entry.
        let _ = compile_streaming_query("SELECT x FROM t WHERE x > 10", &plan, &mut cache, &config)
            .unwrap();
        assert_eq!(cache.len(), entries_after_first);
    }

    #[test]
    fn different_query_different_cache_entry() {
        let plan1 = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").gt(lit(10_i64)))
            .unwrap()
            .build()
            .unwrap();

        let plan2 = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").lt(lit(100_i64)))
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();

        let _ = compile_streaming_query("q1", &plan1, &mut cache, &config).unwrap();
        let _ = compile_streaming_query("q2", &plan2, &mut cache, &config).unwrap();
        // Distinct predicates must occupy distinct cache slots.
        assert!(cache.len() >= 2);
    }

    // ── Integration tests ───────────────────────────────────────────

    #[test]
    fn metadata_populated() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").gt(lit(5_i64)))
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let compiled = compile_streaming_query("SELECT x WHERE x > 5", &plan, &mut cache, &config)
            .unwrap()
            .unwrap();

        assert!(compiled.metadata.jit_enabled);
        assert!(compiled.metadata.total_pipelines() > 0);
        assert!(!compiled.metadata.extract_time.is_zero());
    }

    #[test]
    fn compiled_query_starts() {
        let plan = LogicalPlanBuilder::from(table_scan_plan(vec![("x", DataType::Int64)]))
            .filter(col("x").gt(lit(0_i64)))
            .unwrap()
            .build()
            .unwrap();

        let mut cache = new_cache();
        let config = QueryConfig::default();
        let mut compiled =
            compile_streaming_query("SELECT x WHERE x > 0", &plan, &mut cache, &config)
                .unwrap()
                .unwrap();

        // A freshly compiled query must start and stop cleanly.
        assert!(compiled.query.start().is_ok());
        assert!(compiled.query.stop().is_ok());
    }
}