1use std::any::Any;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use arrow_schema::{SchemaRef, SortOptions};
12use datafusion::execution::{SendableRecordBatchStream, TaskContext};
13use datafusion::physical_expr::{
14 expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
15};
16use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType, SchedulingType};
17use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
18use datafusion_common::DataFusionError;
19use datafusion_expr::Expr;
20
21use super::source::{SortColumn, StreamSourceRef};
22
/// Leaf physical-plan node that scans an unbounded [`StreamSourceRef`].
///
/// Produces a single unbounded partition; projection and filters are
/// captured at planning time and pushed down to the source in `execute`.
pub struct StreamingScanExec {
    /// Underlying stream source that produces record batches.
    source: StreamSourceRef,
    /// Output schema: the source schema, narrowed by `projection` when set.
    schema: SchemaRef,
    /// Optional column indices (into the *source* schema) to project.
    projection: Option<Vec<usize>>,
    /// Filter expressions pushed down to the source verbatim.
    filters: Vec<Expr>,
    /// Plan properties cached at construction (partitioning, ordering,
    /// boundedness, scheduling type).
    properties: PlanProperties,
}
46
47impl StreamingScanExec {
48 pub fn new(
54 source: StreamSourceRef,
55 projection: Option<Vec<usize>>,
56 filters: Vec<Expr>,
57 ) -> Self {
58 let source_schema = source.schema();
59 let source_ordering = source.output_ordering();
60
61 let schema = match &projection {
62 Some(indices) => {
63 let fields: Vec<_> = indices
64 .iter()
65 .map(|&i| source_schema.field(i).clone())
66 .collect();
67 Arc::new(arrow_schema::Schema::new(fields))
68 }
69 None => source_schema,
70 };
71
72 let eq_properties = Self::build_equivalence_properties(&schema, source_ordering.as_deref());
73
74 let properties = PlanProperties::new(
78 eq_properties,
79 Partitioning::UnknownPartitioning(1),
80 EmissionType::Incremental,
81 Boundedness::Unbounded {
82 requires_infinite_memory: false,
83 },
84 )
85 .with_scheduling_type(SchedulingType::NonCooperative);
86
87 Self {
88 source,
89 schema,
90 projection,
91 filters,
92 properties,
93 }
94 }
95
96 fn build_equivalence_properties(
101 schema: &SchemaRef,
102 ordering: Option<&[SortColumn]>,
103 ) -> EquivalenceProperties {
104 let mut eq = EquivalenceProperties::new(Arc::clone(schema));
105
106 if let Some(sort_columns) = ordering {
107 let sort_exprs: Vec<PhysicalSortExpr> = sort_columns
108 .iter()
109 .filter_map(|sc| {
110 schema.index_of(&sc.name).ok().map(|idx| {
112 PhysicalSortExpr::new(
113 Arc::new(Column::new(&sc.name, idx)),
114 SortOptions {
115 descending: sc.descending,
116 nulls_first: sc.nulls_first,
117 },
118 )
119 })
120 })
121 .collect();
122
123 if !sort_exprs.is_empty() {
124 eq.add_ordering(sort_exprs);
125 }
126 }
127
128 eq
129 }
130
131 #[must_use]
133 pub fn source(&self) -> &StreamSourceRef {
134 &self.source
135 }
136
137 #[must_use]
139 pub fn projection(&self) -> Option<&[usize]> {
140 self.projection.as_deref()
141 }
142
143 #[must_use]
145 pub fn filters(&self) -> &[Expr] {
146 &self.filters
147 }
148}
149
150impl Debug for StreamingScanExec {
151 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152 f.debug_struct("StreamingScanExec")
153 .field("source", &self.source)
154 .field("schema", &self.schema)
155 .field("projection", &self.projection)
156 .field("filters", &self.filters)
157 .finish_non_exhaustive()
158 }
159}
160
161impl DisplayAs for StreamingScanExec {
162 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
163 match t {
164 DisplayFormatType::Default | DisplayFormatType::Verbose => {
165 write!(f, "StreamingScanExec: ")?;
166 if let Some(proj) = &self.projection {
167 write!(f, "projection=[{proj:?}]")?;
168 } else {
169 write!(f, "projection=[*]")?;
170 }
171 if !self.filters.is_empty() {
172 write!(f, ", filters={:?}", self.filters)?;
173 }
174 Ok(())
175 }
176 DisplayFormatType::TreeRender => {
177 write!(f, "StreamingScanExec")
178 }
179 }
180 }
181}
182
183impl ExecutionPlan for StreamingScanExec {
184 fn name(&self) -> &'static str {
185 "StreamingScanExec"
186 }
187
188 fn as_any(&self) -> &dyn Any {
189 self
190 }
191
192 fn schema(&self) -> SchemaRef {
193 Arc::clone(&self.schema)
194 }
195
196 fn properties(&self) -> &PlanProperties {
197 &self.properties
198 }
199
200 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
201 vec![]
203 }
204
205 fn with_new_children(
206 self: Arc<Self>,
207 children: Vec<Arc<dyn ExecutionPlan>>,
208 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
209 if children.is_empty() {
210 Ok(self)
212 } else {
213 Err(DataFusionError::Plan(
214 "StreamingScanExec cannot have children".to_string(),
215 ))
216 }
217 }
218
219 fn execute(
220 &self,
221 partition: usize,
222 _context: Arc<TaskContext>,
223 ) -> Result<SendableRecordBatchStream, DataFusionError> {
224 if partition != 0 {
225 return Err(DataFusionError::Plan(format!(
226 "StreamingScanExec only supports partition 0, got {partition}"
227 )));
228 }
229
230 self.source
231 .stream(self.projection.clone(), self.filters.clone())
232 }
233}
234
235impl datafusion::physical_plan::ExecutionPlanProperties for StreamingScanExec {
237 fn output_partitioning(&self) -> &Partitioning {
238 self.properties.output_partitioning()
239 }
240
241 fn output_ordering(&self) -> Option<&LexOrdering> {
242 self.properties.output_ordering()
243 }
244
245 fn boundedness(&self) -> Boundedness {
246 Boundedness::Unbounded {
247 requires_infinite_memory: false,
248 }
249 }
250
251 fn pipeline_behavior(&self) -> EmissionType {
252 EmissionType::Incremental
253 }
254
255 fn equivalence_properties(&self) -> &EquivalenceProperties {
256 self.properties.equivalence_properties()
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::source::StreamSource;
    use arrow_schema::{DataType, Field, Schema};
    use async_trait::async_trait;

    /// Minimal [`StreamSource`] stub: reports a schema and an optional
    /// ordering, but its `stream` always fails (plan-shape tests never
    /// execute it).
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
        ordering: Option<Vec<SortColumn>>,
    }

    impl MockSource {
        /// Creates a mock source with the given schema and no ordering.
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                ordering: None,
            }
        }

        /// Builder-style: attaches a reported output ordering.
        fn with_ordering(mut self, ordering: Vec<SortColumn>) -> Self {
            self.ordering = Some(ordering);
            self
        }
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        // Always fails: these tests only inspect plan metadata, never data.
        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }

        fn output_ordering(&self) -> Option<Vec<SortColumn>> {
            self.ordering.clone()
        }
    }

    /// Three-column schema shared by most tests: id (non-null), name, value.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Without a projection, the exec's schema is the source schema.
    #[test]
    fn test_scan_exec_schema() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.schema(), schema);
    }

    /// A projection narrows the output schema to the selected columns,
    /// preserving their order.
    #[test]
    fn test_scan_exec_projection() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let output_schema = exec.schema();
        assert_eq!(output_schema.fields().len(), 2);
        assert_eq!(output_schema.field(0).name(), "id");
        assert_eq!(output_schema.field(1).name(), "value");
    }

    /// Plan properties: unbounded, single unknown partition, no children.
    #[test]
    fn test_scan_exec_properties() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(matches!(exec.boundedness(), Boundedness::Unbounded { .. }));

        let partitioning = exec.properties().output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(1)));

        assert!(exec.children().is_empty());
    }

    /// Debug output includes the node name.
    // NOTE(review): the `exec.name()` assertion here duplicates
    // `test_scan_exec_name` below — consider dropping one of them.
    #[test]
    fn test_scan_exec_display() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, Some(vec![0, 1]), vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
        let debug = format!("{exec:?}");
        assert!(debug.contains("StreamingScanExec"));
    }

    /// `ExecutionPlan::name` reports the fixed node name.
    #[test]
    fn test_scan_exec_name() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
    }

    /// A source without an ordering yields a plan without one.
    #[test]
    fn test_scan_exec_no_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(exec.output_ordering().is_none());
    }

    /// A single-column source ordering is advertised on the plan.
    #[test]
    fn test_scan_exec_with_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
        let lex = ordering.unwrap();
        assert_eq!(lex.len(), 1);
    }

    /// A two-column source ordering is carried through in full when all
    /// its columns are present in the output schema.
    #[test]
    fn test_scan_exec_output_ordering_returns_some() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef =
            Arc::new(MockSource::new(Arc::clone(&schema)).with_ordering(vec![
                SortColumn::ascending("id"),
                SortColumn::descending("value"),
            ]));
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec.output_ordering().unwrap();
        assert_eq!(ordering.len(), 2);
    }

    /// The ordering survives a projection that keeps the ordered column.
    #[test]
    fn test_scan_exec_ordering_with_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
    }

    /// The ordering is dropped when the projection removes the ordered
    /// column ("name" is not among indices 0 and 2).
    #[test]
    fn test_scan_exec_ordering_column_not_in_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("name")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        assert!(exec.output_ordering().is_none());
    }

    /// The constructor marks the plan NonCooperative so DataFusion wraps
    /// it for cooperative scheduling (see the next test).
    #[test]
    fn test_streaming_scan_exec_scheduling_type() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(
            exec.properties().scheduling_type,
            SchedulingType::NonCooperative,
        );
    }

    /// End-to-end planning check: a query over a registered streaming table
    /// produces a physical plan where DataFusion has inserted a
    /// CooperativeExec above the NonCooperative scan.
    #[tokio::test]
    async fn test_cooperative_exec_wraps_streaming_scan() {
        use crate::datafusion::{
            create_streaming_context, ChannelStreamSource, StreamingTableProvider,
        };
        use arrow_schema::{DataType, Field, Schema};

        let ctx = create_streaming_context();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        // Keep the sender alive so the source stays open during planning.
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        let plan_str = format!(
            "{}",
            datafusion::physical_plan::displayable(plan.as_ref()).indent(true)
        );
        assert!(
            plan_str.contains("CooperativeExec"),
            "Expected CooperativeExec wrapper around StreamingScanExec, got:\n{plan_str}"
        );
    }
}