// laminar_sql/datafusion/source.rs
//! Stream source trait for `DataFusion` integration
//!
//! This module defines the `StreamSource` trait that bridges `LaminarDB`'s
//! push-based streaming model with `DataFusion`'s pull-based query execution.

use std::fmt::Debug;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::DataFusionError;
use datafusion_expr::Expr;
/// Describes the sort ordering of one column in a streaming source.
///
/// When a source declares its output ordering, `DataFusion` can elide
/// redundant `SortExec` nodes from the physical plan, which makes ORDER BY
/// queries practical on pre-sorted unbounded streams.
///
/// # Example
///
/// ```rust,ignore
/// // Source sorted by event_time ascending
/// let ordering = vec![SortColumn {
///     name: "event_time".to_string(),
///     descending: false,
///     nulls_first: false,
/// }];
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortColumn {
    /// Column name to sort by
    pub name: String,
    /// Whether the sort is descending (false = ascending)
    pub descending: bool,
    /// Whether nulls sort first (before non-null values)
    pub nulls_first: bool,
}

impl SortColumn {
    /// Builds an ascending ordering on `name`; nulls sort last.
    #[must_use]
    pub fn ascending(name: impl Into<String>) -> Self {
        // Ascending with nulls last is the common default for event-time columns.
        Self { name: name.into(), descending: false, nulls_first: false }
    }

    /// Builds a descending ordering on `name`; nulls sort last.
    #[must_use]
    pub fn descending(name: impl Into<String>) -> Self {
        Self { descending: true, nulls_first: false, name: name.into() }
    }

    /// Overrides where nulls sort relative to non-null values.
    ///
    /// Consumes and returns `self` so it chains off the constructors.
    #[must_use]
    pub fn with_nulls_first(mut self, nulls_first: bool) -> Self {
        self.nulls_first = nulls_first;
        self
    }
}
70/// A source of streaming data for `DataFusion` queries.
71///
72/// This trait enables integration between `LaminarDB`'s push-based event
73/// processing and `DataFusion`'s pull-based query execution model.
74///
75/// Implementations must be thread-safe as `DataFusion` may access them
76/// from multiple threads during query planning and execution.
77///
78/// # Filter Pushdown
79///
80/// Sources can optionally support filter pushdown by implementing
81/// `supports_filters`. When filters are pushed down, the source should
82/// apply them before yielding batches to reduce data transfer.
83///
84/// # Projection Pushdown
85///
86/// Sources should respect the `projection` parameter in `stream()` to
87/// only read columns that are needed by the query, improving performance.
88#[async_trait]
89pub trait StreamSource: Send + Sync + Debug {
90    /// Returns the schema of records produced by this source.
91    ///
92    /// The schema must be consistent across all calls and must match
93    /// the schema of `RecordBatch` instances yielded by `stream()`.
94    fn schema(&self) -> SchemaRef;
95
96    /// Creates a stream of `RecordBatch` instances.
97    ///
98    /// # Arguments
99    ///
100    /// * `projection` - Optional column indices to project. If `None`,
101    ///   all columns are returned. Indices refer to the source schema.
102    /// * `filters` - Filter expressions that can be applied at the source.
103    ///   The source may partially or fully apply these filters.
104    ///
105    /// # Returns
106    ///
107    /// A stream that yields `RecordBatch` instances asynchronously.
108    ///
109    /// # Errors
110    ///
111    /// Returns `DataFusionError` if the stream cannot be created.
112    fn stream(
113        &self,
114        projection: Option<Vec<usize>>,
115        filters: Vec<Expr>,
116    ) -> Result<SendableRecordBatchStream, DataFusionError>;
117
118    /// Returns which filters this source can apply.
119    ///
120    /// For each filter in `filters`, returns `true` if the source will
121    /// apply that filter, `false` otherwise. `DataFusion` uses this to
122    /// know which filters still need to be applied after the scan.
123    ///
124    /// The default implementation returns all `false`, indicating no
125    /// filter pushdown support.
126    ///
127    /// # Arguments
128    ///
129    /// * `filters` - The filters being considered for pushdown.
130    ///
131    /// # Returns
132    ///
133    /// A vector of booleans, one per filter, indicating support.
134    fn supports_filters(&self, filters: &[Expr]) -> Vec<bool> {
135        vec![false; filters.len()]
136    }
137
138    /// Returns the output ordering of this source, if any.
139    ///
140    /// When a source is pre-sorted (e.g., by event time from an ordered
141    /// Kafka partition), declaring the ordering allows `DataFusion` to
142    /// elide `SortExec` from the physical plan for ORDER BY queries that
143    /// match the declared ordering.
144    ///
145    /// The default implementation returns `None`, indicating the source
146    /// has no guaranteed output ordering.
147    fn output_ordering(&self) -> Option<Vec<SortColumn>> {
148        None
149    }
150}
151
/// A shared, thread-safe reference to a [`StreamSource`] trait object.
pub type StreamSourceRef = Arc<dyn StreamSource>;

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Builds the single-column Int64 schema used by several tests.
    fn id_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Minimal source whose `stream()` always errors; it exists only to
    /// exercise the trait's required and default methods.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            // Only the trait wiring is under test, not actual streaming.
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }
    }

    #[test]
    fn test_stream_source_schema() {
        let expected = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]));
        let source = MockSource {
            schema: Arc::clone(&expected),
        };

        // schema() must hand back the exact schema the source was built with.
        assert_eq!(source.schema(), expected);
        assert_eq!(source.schema().fields().len(), 2);
    }

    #[test]
    fn test_supports_filters_default() {
        let source = MockSource { schema: id_schema() };

        // The default implementation declines every filter.
        let candidates = vec![Expr::Literal(
            datafusion_common::ScalarValue::Int64(Some(1)),
            None,
        )];
        assert_eq!(source.supports_filters(&candidates), vec![false]);
    }

    #[test]
    fn test_sort_column_creation() {
        let asc = SortColumn::ascending("event_time");
        assert_eq!(asc.name, "event_time");
        assert!(!asc.descending);
        assert!(!asc.nulls_first);

        let desc = SortColumn::descending("price");
        assert_eq!(desc.name, "price");
        assert!(desc.descending);
        assert!(!desc.nulls_first);

        let nulls_first = SortColumn::ascending("ts").with_nulls_first(true);
        assert!(!nulls_first.descending);
        assert!(nulls_first.nulls_first);
    }

    #[test]
    fn test_stream_source_default_ordering() {
        let source = MockSource { schema: id_schema() };

        // Without an override, no ordering is declared.
        assert!(source.output_ordering().is_none());
    }

    #[test]
    fn test_stream_source_with_ordering() {
        /// Source that declares a single ascending ordering column.
        #[derive(Debug)]
        struct OrderedSource {
            schema: SchemaRef,
        }

        #[async_trait]
        impl StreamSource for OrderedSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn stream(
                &self,
                _projection: Option<Vec<usize>>,
                _filters: Vec<Expr>,
            ) -> Result<SendableRecordBatchStream, DataFusionError> {
                Err(DataFusionError::NotImplemented("mock".to_string()))
            }

            fn output_ordering(&self) -> Option<Vec<SortColumn>> {
                Some(vec![SortColumn::ascending("event_time")])
            }
        }

        let source = OrderedSource {
            schema: Arc::new(Schema::new(vec![Field::new(
                "event_time",
                DataType::Int64,
                false,
            )])),
        };

        let ordering = source.output_ordering().unwrap();
        assert_eq!(ordering.len(), 1);
        assert_eq!(ordering[0].name, "event_time");
        assert!(!ordering[0].descending);
    }
}