1use std::sync::Arc;
25use std::time::Instant;
26
27use datafusion_expr::LogicalPlan;
28
29use super::cache::CompilerCache;
30use super::error::CompileError;
31use super::extractor::PipelineExtractor;
32use super::fallback::ExecutablePipeline;
33use super::metrics::{QueryConfig, QueryMetadata};
34use super::pipeline_bridge::create_pipeline_bridge;
35use super::query::{StreamingQuery, StreamingQueryBuilder};
36use super::row::RowSchema;
37
38pub struct CompiledStreamingQuery {
40 pub query: StreamingQuery,
42 pub source_plan: LogicalPlan,
45 pub input_schema: Arc<RowSchema>,
47 pub output_schema: Arc<RowSchema>,
49 pub metadata: QueryMetadata,
51}
52
53impl std::fmt::Debug for CompiledStreamingQuery {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("CompiledStreamingQuery")
56 .field("query", &self.query)
57 .field("input_fields", &self.input_schema.field_count())
58 .field("output_fields", &self.output_schema.field_count())
59 .field("metadata", &self.metadata)
60 .finish_non_exhaustive()
61 }
62}
63
64pub fn compile_streaming_query(
80 sql: &str,
81 plan: &LogicalPlan,
82 cache: &mut CompilerCache,
83 config: &QueryConfig,
84) -> Result<Option<CompiledStreamingQuery>, CompileError> {
85 let extract_start = Instant::now();
87 let Ok(extracted) = PipelineExtractor::extract(plan) else {
88 return Ok(None);
89 };
90 let extract_time = extract_start.elapsed();
91
92 if !extracted.breakers.is_empty() {
94 return Ok(None);
95 }
96
97 let Some(source_plan) = extract_table_scan(plan) else {
99 return Ok(None);
100 };
101
102 let source_arrow_schema = Arc::new(source_plan.schema().as_arrow().clone());
104 let input_schema = Arc::new(
105 RowSchema::from_arrow(&source_arrow_schema)
106 .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?,
107 );
108
109 let compile_start = Instant::now();
111 let mut builder = StreamingQueryBuilder::new(sql);
112 let mut compiled_count = 0_usize;
113 let mut fallback_count = 0_usize;
114 let mut final_output_schema = Arc::clone(&input_schema);
115
116 for pipeline in &extracted.pipelines {
117 let exec = ExecutablePipeline::try_compile(cache, pipeline);
118
119 if exec.is_compiled() {
120 compiled_count += 1;
121 } else {
122 fallback_count += 1;
123 }
124
125 let pipeline_output_arrow = Arc::clone(&pipeline.output_schema);
127 let pipeline_output_row = Arc::new(
128 RowSchema::from_arrow(&pipeline_output_arrow)
129 .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?,
130 );
131
132 let (bridge, consumer) = create_pipeline_bridge(
133 Arc::clone(&pipeline_output_row),
134 config.queue_capacity,
135 config.output_buffer_size,
136 config.batch_policy.clone(),
137 config.backpressure.clone(),
138 )
139 .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?;
140
141 builder = builder.add_pipeline(exec, bridge, consumer, Arc::clone(&pipeline_output_row));
142 final_output_schema = pipeline_output_row;
143 }
144 let compile_time = compile_start.elapsed();
145
146 let metadata = QueryMetadata {
148 extract_time,
149 compile_time,
150 compiled_pipeline_count: compiled_count,
151 fallback_pipeline_count: fallback_count,
152 jit_enabled: true,
153 ..Default::default()
154 };
155
156 let query = builder
158 .with_metadata(metadata.clone())
159 .build()
160 .map_err(|e| CompileError::UnsupportedExpr(e.to_string()))?;
161
162 Ok(Some(CompiledStreamingQuery {
163 query,
164 source_plan,
165 input_schema,
166 output_schema: final_output_schema,
167 metadata,
168 }))
169}
170
171fn extract_table_scan(plan: &LogicalPlan) -> Option<LogicalPlan> {
176 match plan {
177 LogicalPlan::TableScan(_) | LogicalPlan::EmptyRelation(_) => Some(plan.clone()),
178 _ => plan.inputs().into_iter().find_map(extract_table_scan),
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::DFSchema;
    use datafusion_expr::{col, lit, LogicalPlanBuilder};

    // ---------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------

    /// Builds a leaf plan with the given nullable columns.
    ///
    /// Despite the name this is an `EmptyRelation`, which
    /// `extract_table_scan` accepts as a source node.
    fn table_scan_plan(fields: Vec<(&str, DataType)>) -> LogicalPlan {
        let columns: Vec<Field> = fields
            .into_iter()
            .map(|(name, dt)| Field::new(name, dt, true))
            .collect();
        let df_schema = DFSchema::try_from(Schema::new(columns)).unwrap();
        LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(df_schema),
        })
    }

    /// Leaf plan with a single nullable `Int64` column named `x`.
    fn x_scan() -> LogicalPlan {
        table_scan_plan(vec![("x", DataType::Int64)])
    }

    /// Wraps `input` in a filter node using `predicate`.
    fn with_filter(input: LogicalPlan, predicate: datafusion_expr::Expr) -> LogicalPlan {
        LogicalPlanBuilder::from(input)
            .filter(predicate)
            .unwrap()
            .build()
            .unwrap()
    }

    fn new_cache() -> CompilerCache {
        CompilerCache::new(64).unwrap()
    }

    /// Compiles `plan` with the default config, panicking on a compile error.
    ///
    /// The cache is borrowed so that it stays owned by the test and outlives
    /// the compiled query.
    fn compile(
        sql: &str,
        plan: &LogicalPlan,
        cache: &mut CompilerCache,
    ) -> Option<CompiledStreamingQuery> {
        let config = QueryConfig::default();
        compile_streaming_query(sql, plan, cache, &config).unwrap()
    }

    // ---------------------------------------------------------------------
    // Plans that compile
    // ---------------------------------------------------------------------

    #[test]
    fn compile_simple_filter() {
        let plan = with_filter(x_scan(), col("x").gt(lit(10_i64)));
        let mut cache = new_cache();

        let compiled = compile("SELECT x FROM t WHERE x > 10", &plan, &mut cache).unwrap();

        assert_eq!(compiled.input_schema.field_count(), 1);
        assert_eq!(compiled.output_schema.field_count(), 1);
        let meta = &compiled.metadata;
        assert!(meta.compiled_pipeline_count > 0 || meta.fallback_pipeline_count > 0);
        assert!(meta.jit_enabled);
    }

    #[test]
    fn compile_filter_and_project() {
        let scan = table_scan_plan(vec![("x", DataType::Int64), ("y", DataType::Int64)]);
        let filtered = with_filter(scan, col("x").gt(lit(0_i64)));
        let plan = LogicalPlanBuilder::from(filtered)
            .project(vec![col("x"), col("y") + lit(1_i64)])
            .unwrap()
            .build()
            .unwrap();
        let mut cache = new_cache();

        let compiled = compile("SELECT x, y + 1 FROM t WHERE x > 0", &plan, &mut cache).unwrap();

        assert_eq!(compiled.input_schema.field_count(), 2);
        assert_eq!(compiled.output_schema.field_count(), 2);
    }

    #[test]
    fn compile_passthrough() {
        let scan = x_scan();
        let mut cache = new_cache();

        let compiled = compile("SELECT x FROM t", &scan, &mut cache).unwrap();

        assert_eq!(compiled.query.pipeline_count(), 1);
    }

    // ---------------------------------------------------------------------
    // Plans for which the compiler returns `None`
    // ---------------------------------------------------------------------

    #[test]
    fn aggregate_returns_none() {
        let scan = table_scan_plan(vec![("key", DataType::Int64), ("val", DataType::Int64)]);
        let key_only = Schema::new(vec![Field::new("key", DataType::Int64, true)]);
        let agg_schema = DFSchema::try_from(key_only).unwrap();
        let aggregate = datafusion_expr::Aggregate::try_new_with_schema(
            Arc::new(scan),
            vec![col("key")],
            vec![],
            Arc::new(agg_schema),
        )
        .unwrap();
        let plan = LogicalPlan::Aggregate(aggregate);
        let mut cache = new_cache();

        let outcome = compile("SELECT key FROM t GROUP BY key", &plan, &mut cache);

        assert!(outcome.is_none());
    }

    #[test]
    fn sort_returns_none() {
        let plan = LogicalPlanBuilder::from(x_scan())
            .sort(vec![col("x").sort(true, true)])
            .unwrap()
            .build()
            .unwrap();
        let mut cache = new_cache();

        let outcome = compile("SELECT x FROM t ORDER BY x", &plan, &mut cache);

        assert!(outcome.is_none());
    }

    #[test]
    fn limit_returns_none() {
        let plan = LogicalPlanBuilder::from(x_scan())
            .limit(0, Some(10))
            .unwrap()
            .build()
            .unwrap();
        let mut cache = new_cache();

        let outcome = compile("SELECT x FROM t LIMIT 10", &plan, &mut cache);

        assert!(outcome.is_none());
    }

    // ---------------------------------------------------------------------
    // extract_table_scan
    // ---------------------------------------------------------------------

    #[test]
    fn extract_scan_from_simple_plan() {
        let leaf = x_scan();
        assert!(extract_table_scan(&leaf).is_some());
    }

    #[test]
    fn extract_scan_from_nested_plan() {
        let plan = with_filter(x_scan(), col("x").gt(lit(0_i64)));

        let found = extract_table_scan(&plan);

        assert!(found.is_some());
        assert!(matches!(found, Some(LogicalPlan::EmptyRelation(_))));
    }

    // ---------------------------------------------------------------------
    // Cache behaviour
    // ---------------------------------------------------------------------

    #[test]
    fn repeat_compilation_uses_cache() {
        let sql = "SELECT x FROM t WHERE x > 10";
        let plan = with_filter(x_scan(), col("x").gt(lit(10_i64)));
        let mut cache = new_cache();

        let _ = compile(sql, &plan, &mut cache);
        let entries_after_first = cache.len();

        // Compiling the identical plan again must not add cache entries.
        let _ = compile(sql, &plan, &mut cache);
        assert_eq!(cache.len(), entries_after_first);
    }

    #[test]
    fn different_query_different_cache_entry() {
        let greater_than = with_filter(x_scan(), col("x").gt(lit(10_i64)));
        let less_than = with_filter(x_scan(), col("x").lt(lit(100_i64)));
        let mut cache = new_cache();

        let _ = compile("q1", &greater_than, &mut cache);
        let _ = compile("q2", &less_than, &mut cache);

        assert!(cache.len() >= 2);
    }

    // ---------------------------------------------------------------------
    // Metadata and lifecycle
    // ---------------------------------------------------------------------

    #[test]
    fn metadata_populated() {
        let plan = with_filter(x_scan(), col("x").gt(lit(5_i64)));
        let mut cache = new_cache();

        let compiled = compile("SELECT x WHERE x > 5", &plan, &mut cache).unwrap();

        let meta = &compiled.metadata;
        assert!(meta.jit_enabled);
        assert!(meta.total_pipelines() > 0);
        assert!(!meta.extract_time.is_zero());
    }

    #[test]
    fn compiled_query_starts() {
        let plan = with_filter(x_scan(), col("x").gt(lit(0_i64)));
        let mut cache = new_cache();

        let mut compiled = compile("SELECT x WHERE x > 0", &plan, &mut cache).unwrap();

        assert!(compiled.query.start().is_ok());
        assert!(compiled.query.stop().is_ok());
    }
}