// laminar_sql/datafusion/source.rs
1//! Stream source trait for `DataFusion` integration
2//!
3//! This module defines the `StreamSource` trait that bridges `LaminarDB`'s
4//! push-based streaming model with `DataFusion`'s pull-based query execution.
5
6use std::fmt::Debug;
7use std::sync::Arc;
8
9use arrow_schema::SchemaRef;
10use async_trait::async_trait;
11use datafusion::physical_plan::SendableRecordBatchStream;
12use datafusion_common::DataFusionError;
13use datafusion_expr::Expr;
14
/// Declares a column's sort ordering for a streaming source.
///
/// When a source declares output ordering, `DataFusion` can elide unnecessary
/// `SortExec` nodes from the physical plan, enabling ORDER BY queries on
/// pre-sorted unbounded streams.
///
/// # Example
///
/// ```rust,ignore
/// // Source sorted by event_time ascending, nulls last
/// let ordering = vec![SortColumn::ascending("event_time")];
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortColumn {
    /// Column name to sort by
    pub name: String,
    /// Whether the sort is descending (false = ascending)
    pub descending: bool,
    /// Whether nulls sort first (before non-null values)
    pub nulls_first: bool,
}

impl SortColumn {
    /// Creates a new ascending sort column with nulls last.
    #[must_use]
    pub fn ascending(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            descending: false,
            nulls_first: false,
        }
    }

    /// Creates a new descending sort column with nulls last.
    #[must_use]
    pub fn descending(name: impl Into<String>) -> Self {
        // Same defaults as `ascending`, with the direction flipped.
        Self {
            descending: true,
            ..Self::ascending(name)
        }
    }

    /// Sets whether nulls sort first.
    #[must_use]
    pub fn with_nulls_first(self, nulls_first: bool) -> Self {
        Self { nulls_first, ..self }
    }
}
69
70/// A source of streaming data for `DataFusion` queries.
71///
72/// This trait enables integration between `LaminarDB`'s push-based event
73/// processing and `DataFusion`'s pull-based query execution model.
74///
75/// Implementations must be thread-safe as `DataFusion` may access them
76/// from multiple threads during query planning and execution.
77///
78/// # Filter Pushdown
79///
80/// Sources can optionally support filter pushdown by implementing
81/// `supports_filters`. When filters are pushed down, the source should
82/// apply them before yielding batches to reduce data transfer.
83///
84/// # Projection Pushdown
85///
86/// Sources should respect the `projection` parameter in `stream()` to
87/// only read columns that are needed by the query, improving performance.
88#[async_trait]
89pub trait StreamSource: Send + Sync + Debug {
90 /// Returns the schema of records produced by this source.
91 ///
92 /// The schema must be consistent across all calls and must match
93 /// the schema of `RecordBatch` instances yielded by `stream()`.
94 fn schema(&self) -> SchemaRef;
95
96 /// Creates a stream of `RecordBatch` instances.
97 ///
98 /// # Arguments
99 ///
100 /// * `projection` - Optional column indices to project. If `None`,
101 /// all columns are returned. Indices refer to the source schema.
102 /// * `filters` - Filter expressions that can be applied at the source.
103 /// The source may partially or fully apply these filters.
104 ///
105 /// # Returns
106 ///
107 /// A stream that yields `RecordBatch` instances asynchronously.
108 ///
109 /// # Errors
110 ///
111 /// Returns `DataFusionError` if the stream cannot be created.
112 fn stream(
113 &self,
114 projection: Option<Vec<usize>>,
115 filters: Vec<Expr>,
116 ) -> Result<SendableRecordBatchStream, DataFusionError>;
117
118 /// Returns which filters this source can apply.
119 ///
120 /// For each filter in `filters`, returns `true` if the source will
121 /// apply that filter, `false` otherwise. `DataFusion` uses this to
122 /// know which filters still need to be applied after the scan.
123 ///
124 /// The default implementation returns all `false`, indicating no
125 /// filter pushdown support.
126 ///
127 /// # Arguments
128 ///
129 /// * `filters` - The filters being considered for pushdown.
130 ///
131 /// # Returns
132 ///
133 /// A vector of booleans, one per filter, indicating support.
134 fn supports_filters(&self, filters: &[Expr]) -> Vec<bool> {
135 vec![false; filters.len()]
136 }
137
138 /// Returns the output ordering of this source, if any.
139 ///
140 /// When a source is pre-sorted (e.g., by event time from an ordered
141 /// Kafka partition), declaring the ordering allows `DataFusion` to
142 /// elide `SortExec` from the physical plan for ORDER BY queries that
143 /// match the declared ordering.
144 ///
145 /// The default implementation returns `None`, indicating the source
146 /// has no guaranteed output ordering.
147 fn output_ordering(&self) -> Option<Vec<SortColumn>> {
148 None
149 }
150}
151
/// A shared, thread-safe reference to a [`StreamSource`] trait object.
pub type StreamSourceRef = Arc<dyn StreamSource>;
154
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Minimal source exposing only a schema; `stream()` is deliberately
    /// unimplemented because only the trait wiring is under test.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            // Just testing trait implementation
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }
    }

    /// Builds a one-column, non-nullable Int64 schema for the simple cases.
    fn single_int64_schema(name: &str) -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(name, DataType::Int64, false)]))
    }

    #[test]
    fn test_stream_source_schema() {
        let schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]));
        let source = MockSource {
            schema: Arc::clone(&schema),
        };

        // The source hands back the exact schema it was constructed with.
        assert_eq!(source.schema(), schema);
        assert_eq!(source.schema().fields().len(), 2);
    }

    #[test]
    fn test_supports_filters_default() {
        let source = MockSource {
            schema: single_int64_schema("id"),
        };

        // Default implementation declines every filter.
        let filters = vec![Expr::Literal(
            datafusion_common::ScalarValue::Int64(Some(1)),
            None,
        )];
        assert_eq!(source.supports_filters(&filters), vec![false]);
    }

    #[test]
    fn test_sort_column_creation() {
        let asc = SortColumn::ascending("event_time");
        assert_eq!(asc.name, "event_time");
        assert!(!asc.descending);
        assert!(!asc.nulls_first);

        let desc = SortColumn::descending("price");
        assert_eq!(desc.name, "price");
        assert!(desc.descending);
        assert!(!desc.nulls_first);

        let nulls_first = SortColumn::ascending("ts").with_nulls_first(true);
        assert!(!nulls_first.descending);
        assert!(nulls_first.nulls_first);
    }

    #[test]
    fn test_stream_source_default_ordering() {
        let source = MockSource {
            schema: single_int64_schema("id"),
        };

        // Default implementation declares no ordering.
        assert!(source.output_ordering().is_none());
    }

    #[test]
    fn test_stream_source_with_ordering() {
        /// Source that declares an ascending `event_time` ordering.
        #[derive(Debug)]
        struct OrderedSource {
            schema: SchemaRef,
        }

        #[async_trait]
        impl StreamSource for OrderedSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn stream(
                &self,
                _projection: Option<Vec<usize>>,
                _filters: Vec<Expr>,
            ) -> Result<SendableRecordBatchStream, DataFusionError> {
                Err(DataFusionError::NotImplemented("mock".to_string()))
            }

            fn output_ordering(&self) -> Option<Vec<SortColumn>> {
                Some(vec![SortColumn::ascending("event_time")])
            }
        }

        let source = OrderedSource {
            schema: single_int64_schema("event_time"),
        };
        let ordering = source.output_ordering().expect("ordering was declared");
        assert_eq!(ordering.len(), 1);
        assert_eq!(ordering[0].name, "event_time");
        assert!(!ordering[0].descending);
    }
}
273}