// laminar_sql/datafusion/exec.rs
//! Streaming scan execution plan for `DataFusion`
//!
//! This module provides `StreamingScanExec`, a `DataFusion` execution plan
//! that reads from a `StreamSource`. It serves as the leaf node in query
//! plans for streaming data.
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use arrow_schema::{SchemaRef, SortOptions};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{
    expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType, SchedulingType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_common::DataFusionError;
use datafusion_expr::Expr;

use super::source::{SortColumn, StreamSourceRef};
/// A `DataFusion` execution plan that scans from a streaming source.
///
/// This is a leaf node in the query plan tree that pulls data from
/// a `StreamSource` implementation. It handles projection and filter
/// pushdown to the source when supported.
///
/// # Properties
///
/// - Single partition (streaming sources are typically not partitioned)
/// - Unbounded execution mode (streaming)
/// - No inherent ordering (unless specified by source)
pub struct StreamingScanExec {
    /// The streaming source to read from
    source: StreamSourceRef,
    /// Output schema (after projection)
    schema: SchemaRef,
    /// Column projection into the source schema (None = all columns)
    projection: Option<Vec<usize>>,
    /// Filters pushed down to source; forwarded verbatim to
    /// `StreamSource::stream` at execution time
    filters: Vec<Expr>,
    /// Cached plan properties (partitioning, boundedness, ordering),
    /// computed once in `new` and served by `ExecutionPlan::properties`
    properties: PlanProperties,
}
46
47impl StreamingScanExec {
48    /// Creates a new streaming scan execution plan.
49    ///
50    /// If the source declares an `output_ordering`, the plan's
51    /// `EquivalenceProperties` will include it so `DataFusion` can elide
52    /// `SortExec` for matching ORDER BY queries.
53    pub fn new(
54        source: StreamSourceRef,
55        projection: Option<Vec<usize>>,
56        filters: Vec<Expr>,
57    ) -> Self {
58        let source_schema = source.schema();
59        let source_ordering = source.output_ordering();
60
61        let schema = match &projection {
62            Some(indices) => {
63                let fields: Vec<_> = indices
64                    .iter()
65                    .map(|&i| source_schema.field(i).clone())
66                    .collect();
67                Arc::new(arrow_schema::Schema::new(fields))
68            }
69            None => source_schema,
70        };
71
72        let eq_properties = Self::build_equivalence_properties(&schema, source_ordering.as_deref());
73
74        // SchedulingType::NonCooperative causes DataFusion's EnsureCooperative
75        // optimizer rule to auto-wrap this leaf with CooperativeExec, which
76        // yields to the Tokio executor periodically.
77        let properties = PlanProperties::new(
78            eq_properties,
79            Partitioning::UnknownPartitioning(1),
80            EmissionType::Incremental,
81            Boundedness::Unbounded {
82                requires_infinite_memory: false,
83            },
84        )
85        .with_scheduling_type(SchedulingType::NonCooperative);
86
87        Self {
88            source,
89            schema,
90            projection,
91            filters,
92            properties,
93        }
94    }
95
96    /// Builds `EquivalenceProperties` with optional source ordering.
97    ///
98    /// Converts `SortColumn` declarations into `DataFusion` `PhysicalSortExpr`
99    /// entries. Only columns present in the output schema are included.
100    fn build_equivalence_properties(
101        schema: &SchemaRef,
102        ordering: Option<&[SortColumn]>,
103    ) -> EquivalenceProperties {
104        let mut eq = EquivalenceProperties::new(Arc::clone(schema));
105
106        if let Some(sort_columns) = ordering {
107            let sort_exprs: Vec<PhysicalSortExpr> = sort_columns
108                .iter()
109                .filter_map(|sc| {
110                    // Find column index in the output schema
111                    schema.index_of(&sc.name).ok().map(|idx| {
112                        PhysicalSortExpr::new(
113                            Arc::new(Column::new(&sc.name, idx)),
114                            SortOptions {
115                                descending: sc.descending,
116                                nulls_first: sc.nulls_first,
117                            },
118                        )
119                    })
120                })
121                .collect();
122
123            if !sort_exprs.is_empty() {
124                eq.add_ordering(sort_exprs);
125            }
126        }
127
128        eq
129    }
130
131    /// Returns the streaming source.
132    #[must_use]
133    pub fn source(&self) -> &StreamSourceRef {
134        &self.source
135    }
136
137    /// Returns the column projection.
138    #[must_use]
139    pub fn projection(&self) -> Option<&[usize]> {
140        self.projection.as_deref()
141    }
142
143    /// Returns the pushed-down filters.
144    #[must_use]
145    pub fn filters(&self) -> &[Expr] {
146        &self.filters
147    }
148}
149
150impl Debug for StreamingScanExec {
151    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152        f.debug_struct("StreamingScanExec")
153            .field("source", &self.source)
154            .field("schema", &self.schema)
155            .field("projection", &self.projection)
156            .field("filters", &self.filters)
157            .finish_non_exhaustive()
158    }
159}
160
161impl DisplayAs for StreamingScanExec {
162    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
163        match t {
164            DisplayFormatType::Default | DisplayFormatType::Verbose => {
165                write!(f, "StreamingScanExec: ")?;
166                if let Some(proj) = &self.projection {
167                    write!(f, "projection=[{proj:?}]")?;
168                } else {
169                    write!(f, "projection=[*]")?;
170                }
171                if !self.filters.is_empty() {
172                    write!(f, ", filters={:?}", self.filters)?;
173                }
174                Ok(())
175            }
176            DisplayFormatType::TreeRender => {
177                write!(f, "StreamingScanExec")
178            }
179        }
180    }
181}
182
183impl ExecutionPlan for StreamingScanExec {
184    fn name(&self) -> &'static str {
185        "StreamingScanExec"
186    }
187
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191
192    fn schema(&self) -> SchemaRef {
193        Arc::clone(&self.schema)
194    }
195
196    fn properties(&self) -> &PlanProperties {
197        &self.properties
198    }
199
200    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
201        // Leaf node - no children
202        vec![]
203    }
204
205    fn with_new_children(
206        self: Arc<Self>,
207        children: Vec<Arc<dyn ExecutionPlan>>,
208    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
209        if children.is_empty() {
210            // No changes needed for leaf node
211            Ok(self)
212        } else {
213            Err(DataFusionError::Plan(
214                "StreamingScanExec cannot have children".to_string(),
215            ))
216        }
217    }
218
219    fn execute(
220        &self,
221        partition: usize,
222        _context: Arc<TaskContext>,
223    ) -> Result<SendableRecordBatchStream, DataFusionError> {
224        if partition != 0 {
225            return Err(DataFusionError::Plan(format!(
226                "StreamingScanExec only supports partition 0, got {partition}"
227            )));
228        }
229
230        self.source
231            .stream(self.projection.clone(), self.filters.clone())
232    }
233}
234
235// Required for `DataFusion` to use this execution plan
236impl datafusion::physical_plan::ExecutionPlanProperties for StreamingScanExec {
237    fn output_partitioning(&self) -> &Partitioning {
238        self.properties.output_partitioning()
239    }
240
241    fn output_ordering(&self) -> Option<&LexOrdering> {
242        self.properties.output_ordering()
243    }
244
245    fn boundedness(&self) -> Boundedness {
246        Boundedness::Unbounded {
247            requires_infinite_memory: false,
248        }
249    }
250
251    fn pipeline_behavior(&self) -> EmissionType {
252        EmissionType::Incremental
253    }
254
255    fn equivalence_properties(&self) -> &EquivalenceProperties {
256        self.properties.equivalence_properties()
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::source::StreamSource;
    use arrow_schema::{DataType, Field, Schema};
    use async_trait::async_trait;

    /// Minimal `StreamSource` for plan-construction tests: exposes a schema
    /// and an optional declared ordering, but `stream` always errors.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
        ordering: Option<Vec<SortColumn>>,
    }

    impl MockSource {
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                ordering: None,
            }
        }

        /// Builder-style helper to declare a source output ordering.
        fn with_ordering(mut self, ordering: Vec<SortColumn>) -> Self {
            self.ordering = Some(ordering);
            self
        }
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        // Never produces data; tests here only exercise plan metadata.
        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }

        fn output_ordering(&self) -> Option<Vec<SortColumn>> {
            self.ordering.clone()
        }
    }

    /// Three-column schema shared by most tests: id (0), name (1), value (2).
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    #[test]
    fn test_scan_exec_schema() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // No projection -> output schema equals the source schema.
        assert_eq!(exec.schema(), schema);
    }

    #[test]
    fn test_scan_exec_projection() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // Projection [0, 2] keeps "id" and "value", dropping "name".
        let output_schema = exec.schema();
        assert_eq!(output_schema.fields().len(), 2);
        assert_eq!(output_schema.field(0).name(), "id");
        assert_eq!(output_schema.field(1).name(), "value");
    }

    #[test]
    fn test_scan_exec_properties() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // Should be unbounded (streaming)
        assert!(matches!(exec.boundedness(), Boundedness::Unbounded { .. }));

        // Should be single partition
        let partitioning = exec.properties().output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(1)));

        // Leaf node has no children
        assert!(exec.children().is_empty());
    }

    #[test]
    fn test_scan_exec_display() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, Some(vec![0, 1]), vec![]);

        // Verify it implements DisplayAs by checking the name
        assert_eq!(exec.name(), "StreamingScanExec");
        // Debug format should contain the struct info
        let debug = format!("{exec:?}");
        assert!(debug.contains("StreamingScanExec"));
    }

    #[test]
    fn test_scan_exec_name() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
    }

    // --- Tier 1 ordering tests ---

    #[test]
    fn test_scan_exec_no_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // No ordering declared -> output_ordering returns None
        assert!(exec.output_ordering().is_none());
    }

    #[test]
    fn test_scan_exec_with_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        // Source ordering declared -> output_ordering returns Some
        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
        let lex = ordering.unwrap();
        assert_eq!(lex.len(), 1);
    }

    #[test]
    fn test_scan_exec_output_ordering_returns_some() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        // Two-key ordering: id ASC, value DESC -> both survive (no projection).
        let source: StreamSourceRef =
            Arc::new(MockSource::new(Arc::clone(&schema)).with_ordering(vec![
                SortColumn::ascending("id"),
                SortColumn::descending("value"),
            ]));
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec.output_ordering().unwrap();
        assert_eq!(ordering.len(), 2);
    }

    #[test]
    fn test_scan_exec_ordering_with_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        // Source ordered by "id" ascending
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        // Project only "id" and "value" (indices 0, 2)
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // "id" is in the projection -> ordering should still be present
        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
    }

    #[test]
    fn test_scan_exec_ordering_column_not_in_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        // Source ordered by "name" ascending
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("name")]),
        );
        // Project only "id" and "value" (indices 0, 2) -- "name" is NOT projected
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // "name" is not in the projection -> ordering should be None
        assert!(exec.output_ordering().is_none());
    }

    // Cooperative scheduling tests

    #[test]
    fn test_streaming_scan_exec_scheduling_type() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // StreamingScanExec declares NonCooperative so that DataFusion's
        // EnsureCooperative optimizer auto-wraps it with CooperativeExec.
        assert_eq!(
            exec.properties().scheduling_type,
            SchedulingType::NonCooperative,
        );
    }

    // End-to-end check through the SQL planner: the optimized physical plan
    // should contain a CooperativeExec wrapper around the streaming scan.
    #[tokio::test]
    async fn test_cooperative_exec_wraps_streaming_scan() {
        use crate::datafusion::{
            create_streaming_context, ChannelStreamSource, StreamingTableProvider,
        };
        use arrow_schema::{DataType, Field, Schema};

        let ctx = create_streaming_context();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        // Keep the sender alive so the channel source stays open during planning.
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a physical plan and verify CooperativeExec wrapping
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        let plan_str = format!(
            "{}",
            datafusion::physical_plan::displayable(plan.as_ref()).indent(true)
        );
        assert!(
            plan_str.contains("CooperativeExec"),
            "Expected CooperativeExec wrapper around StreamingScanExec, got:\n{plan_str}"
        );
    }
}