Skip to main content

laminar_sql/datafusion/
distributed_scan.rs

1//! Pull-path distributed scan: each node holds only its vnode-owned slice of an
2//! MV result, so a `SELECT` unions the local slice with every peer's.
3
4#![allow(clippy::disallowed_types)] // cold path: ad-hoc query planning, not the streaming hot path
5
6use std::any::Any;
7use std::fmt::{self, Debug, Formatter};
8use std::sync::Arc;
9
10use arrow_array::RecordBatch;
11use arrow_schema::SchemaRef;
12use async_trait::async_trait;
13use datafusion::catalog::Session;
14use datafusion::datasource::{TableProvider, TableType};
15use datafusion::error::{DataFusionError, Result};
16use datafusion::execution::{SendableRecordBatchStream, TaskContext};
17use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
18use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
19use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
20use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
21use datafusion::physical_plan::{
22    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
23};
24use datafusion_common::tree_node::{Transformed, TreeNode};
25use datafusion_expr::{Expr, TableProviderFilterPushDown};
26use datafusion_sql::unparser;
27use futures::stream::StreamExt;
28use laminar_core::cluster::control::{remote_scan_client, ClusterController};
29use laminar_core::cluster::discovery::NodeId;
30
/// Cap on how many `RemoteScan` calls one execution keeps in flight at once;
/// limits the fan-out of a single distributed scan.
const MAX_CONCURRENT_REMOTE: usize = 16;
33
34/// A `TableProvider` unioning a table's node-local rows (via `inner`) with the
35/// rows pulled from every peer.
36pub struct DistributedTableProvider {
37    table_name: String,
38    schema: SchemaRef,
39    inner: Arc<dyn TableProvider>,
40    controller: Arc<ClusterController>,
41}
42
43impl DistributedTableProvider {
44    /// Wrap `inner` so scans fan out across the cluster.
45    #[must_use]
46    pub fn new(
47        table_name: String,
48        schema: SchemaRef,
49        inner: Arc<dyn TableProvider>,
50        controller: Arc<ClusterController>,
51    ) -> Self {
52        Self {
53            table_name,
54            schema,
55            inner,
56            controller,
57        }
58    }
59}
60
61impl Debug for DistributedTableProvider {
62    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
63        f.debug_struct("DistributedTableProvider")
64            .field("table_name", &self.table_name)
65            .finish_non_exhaustive()
66    }
67}
68
69#[async_trait]
70impl TableProvider for DistributedTableProvider {
71    fn as_any(&self) -> &dyn Any {
72        self
73    }
74
75    fn schema(&self) -> SchemaRef {
76        self.schema.clone()
77    }
78
79    fn table_type(&self) -> TableType {
80        self.inner.table_type()
81    }
82
83    fn supports_filters_pushdown(
84        &self,
85        filters: &[&Expr],
86    ) -> Result<Vec<TableProviderFilterPushDown>> {
87        // Renderable filters push to peers as `Inexact` (coordinator's FilterExec
88        // re-applies the exact predicate); the rest stay `Unsupported`.
89        Ok(filters
90            .iter()
91            .map(|f| {
92                if expr_to_sql(f).is_some() {
93                    TableProviderFilterPushDown::Inexact
94                } else {
95                    TableProviderFilterPushDown::Unsupported
96                }
97            })
98            .collect())
99    }
100
101    async fn scan(
102        &self,
103        state: &dyn Session,
104        projection: Option<&Vec<usize>>,
105        filters: &[Expr],
106        _limit: Option<usize>,
107    ) -> Result<Arc<dyn ExecutionPlan>> {
108        // Local slice; filters/limit are re-applied above the union by
109        // DataFusion (pushdown is `Inexact`), so the local scan stays unfiltered.
110        let local = self.inner.scan(state, projection, &[], None).await?;
111
112        // Join renderable filters into one SQL predicate for peers; unrenderable
113        // ones were reported `Unsupported`, so the coordinator still evaluates them.
114        let rendered: Vec<String> = filters.iter().filter_map(expr_to_sql).collect();
115        let filter_sql = (!rendered.is_empty()).then(|| rendered.join(" AND "));
116
117        let me = self.controller.instance_id();
118        let peers: Vec<NodeId> = self
119            .controller
120            .live_instances()
121            .into_iter()
122            .filter(|&id| id != me)
123            .collect();
124
125        Ok(Arc::new(DistributedScanExec::new(
126            local,
127            self.table_name.clone(),
128            projection.cloned(),
129            filter_sql,
130            Arc::clone(&self.controller),
131            peers,
132        )))
133    }
134}
135
136/// Physical plan unioning the local slice (its single child) with the rows
137/// pulled from each peer.
138pub struct DistributedScanExec {
139    local: Arc<dyn ExecutionPlan>,
140    table_name: String,
141    projection: Option<Vec<usize>>,
142    /// Pushed-down predicate (SQL boolean expression) sent to each peer.
143    filter_sql: Option<String>,
144    controller: Arc<ClusterController>,
145    peers: Vec<NodeId>,
146    schema: SchemaRef,
147    properties: PlanProperties,
148    metrics: ExecutionPlanMetricsSet,
149}
150
151impl DistributedScanExec {
152    /// Build the exec over the already-planned local scan.
153    #[must_use]
154    pub fn new(
155        local: Arc<dyn ExecutionPlan>,
156        table_name: String,
157        projection: Option<Vec<usize>>,
158        filter_sql: Option<String>,
159        controller: Arc<ClusterController>,
160        peers: Vec<NodeId>,
161    ) -> Self {
162        let schema = local.schema();
163        let properties = PlanProperties::new(
164            EquivalenceProperties::new(Arc::clone(&schema)),
165            Partitioning::UnknownPartitioning(1),
166            EmissionType::Incremental,
167            Boundedness::Bounded,
168        );
169        Self {
170            local,
171            table_name,
172            projection,
173            filter_sql,
174            controller,
175            peers,
176            schema,
177            properties,
178            metrics: ExecutionPlanMetricsSet::new(),
179        }
180    }
181}
182
183impl Debug for DistributedScanExec {
184    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
185        f.debug_struct("DistributedScanExec")
186            .field("table_name", &self.table_name)
187            .field("peers", &self.peers.len())
188            .finish_non_exhaustive()
189    }
190}
191
192impl DisplayAs for DistributedScanExec {
193    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
194        match t {
195            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
196                f,
197                "DistributedScanExec: table={}, peers={}",
198                self.table_name,
199                self.peers.len()
200            ),
201            DisplayFormatType::TreeRender => write!(f, "DistributedScanExec"),
202        }
203    }
204}
205
206impl ExecutionPlan for DistributedScanExec {
207    fn name(&self) -> &'static str {
208        "DistributedScanExec"
209    }
210
211    fn as_any(&self) -> &dyn Any {
212        self
213    }
214
215    fn schema(&self) -> SchemaRef {
216        Arc::clone(&self.schema)
217    }
218
219    fn properties(&self) -> &PlanProperties {
220        &self.properties
221    }
222
223    fn metrics(&self) -> Option<MetricsSet> {
224        Some(self.metrics.clone_inner())
225    }
226
227    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
228        vec![&self.local]
229    }
230
231    fn with_new_children(
232        self: Arc<Self>,
233        mut children: Vec<Arc<dyn ExecutionPlan>>,
234    ) -> Result<Arc<dyn ExecutionPlan>> {
235        if children.len() != 1 {
236            return Err(DataFusionError::Plan(
237                "DistributedScanExec requires exactly one child".into(),
238            ));
239        }
240        Ok(Arc::new(Self::new(
241            children.swap_remove(0),
242            self.table_name.clone(),
243            self.projection.clone(),
244            self.filter_sql.clone(),
245            Arc::clone(&self.controller),
246            self.peers.clone(),
247        )))
248    }
249
250    fn execute(
251        &self,
252        partition: usize,
253        context: Arc<TaskContext>,
254    ) -> Result<SendableRecordBatchStream> {
255        if partition != 0 {
256            return Err(DataFusionError::Plan(format!(
257                "DistributedScanExec has a single partition; got {partition}"
258            )));
259        }
260
261        // Local slice: run all child partitions concurrently.
262        let local_partitions = self.local.output_partitioning().partition_count();
263        let mut local_streams = Vec::with_capacity(local_partitions);
264        for p in 0..local_partitions {
265            local_streams.push(self.local.execute(p, Arc::clone(&context))?);
266        }
267        let local = if local_streams.is_empty() {
268            futures::stream::empty().boxed()
269        } else {
270            futures::stream::select_all(local_streams).boxed()
271        };
272
273        // Metrics surface in EXPLAIN ANALYZE: peer establishment failures, peers
274        // skipped as unreachable (a partial result), and total rows unioned.
275        let peer_failures = MetricBuilder::new(&self.metrics).global_counter("peer_failures");
276        let peers_skipped = MetricBuilder::new(&self.metrics).global_counter("peers_skipped");
277        let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
278
279        // Remote slices fetched concurrently and flattened; each peer streams in
280        // chunks. A peer error fails the whole scan; an empty slice is valid.
281        let pool = Arc::clone(self.controller.query_client_pool());
282        let kv = Arc::clone(self.controller.kv());
283        let table = self.table_name.clone();
284        let projection = self.projection.clone();
285        let filter_sql = self.filter_sql.clone();
286        let schema = Arc::clone(&self.schema);
287        let remote = futures::stream::iter(self.peers.clone())
288            .map(move |peer| {
289                let pool = Arc::clone(&pool);
290                let kv = Arc::clone(&kv);
291                let table = table.clone();
292                let projection = projection.clone();
293                let filter_sql = filter_sql.clone();
294                let schema = Arc::clone(&schema);
295                let peer_failures = peer_failures.clone();
296                let peers_skipped = peers_skipped.clone();
297                async move {
298                    match remote_scan_client(&pool, &kv, peer, &table, projection, filter_sql).await
299                    {
300                        // Adapt laminar-core's String-error Arrow stream into a
301                        // DataFusion record-batch stream of this scan's schema.
302                        Ok(Some(chunks)) => Box::pin(RecordBatchStreamAdapter::new(
303                            schema,
304                            chunks.map(|batch| batch.map_err(DataFusionError::Execution)),
305                        )) as SendableRecordBatchStream,
306                        // Peer unreachable (down/pruned): skip rather than fail the
307                        // scan, but count + log so the partial result isn't silent.
308                        Ok(None) => {
309                            peers_skipped.add(1);
310                            tracing::warn!(
311                                peer = peer.0,
312                                table = %table,
313                                "distributed scan: peer unreachable, omitting its slice (partial result)"
314                            );
315                            Box::pin(RecordBatchStreamAdapter::new(
316                                schema,
317                                futures::stream::empty::<Result<RecordBatch>>(),
318                            )) as SendableRecordBatchStream
319                        }
320                        // Couldn't even open the stream: surface as a one-item
321                        // error stream so flatten() propagates the failure.
322                        Err(e) => {
323                            peer_failures.add(1);
324                            Box::pin(RecordBatchStreamAdapter::new(
325                                schema,
326                                futures::stream::once(async move {
327                                    Err::<RecordBatch, _>(DataFusionError::Execution(e))
328                                }),
329                            )) as SendableRecordBatchStream
330                        }
331                    }
332                }
333            })
334            .buffer_unordered(MAX_CONCURRENT_REMOTE)
335            .flatten()
336            .boxed();
337
338        // Run local scan and remote peer fetches concurrently, counting rows.
339        let stream = futures::stream::select(local, remote).inspect(move |r| {
340            if let Ok(batch) = r {
341                output_rows.add(batch.num_rows());
342            }
343        });
344        Ok(Box::pin(RecordBatchStreamAdapter::new(
345            Arc::clone(&self.schema),
346            stream,
347        )))
348    }
349}
350
351/// Render a predicate to SQL for a peer to re-compile via DataFusion's own
352/// `Expr` -> SQL unparser, or `None` if the unparser can't represent it (e.g. a
353/// UDF we don't push down). Pushdown is `Inexact`, so the coordinator re-applies
354/// the exact predicate above the union — an over-broad render is harmless.
355fn expr_to_sql(expr: &Expr) -> Option<String> {
356    let unqualified = strip_column_qualifiers(expr);
357    unparser::expr_to_sql(&unqualified)
358        .ok()
359        .map(|ast| ast.to_string())
360}
361
362/// Strip the relation qualifier from every column so the rendered predicate
363/// resolves against the peer's single (differently-named) table rather than the
364/// coordinator's. The transform is infallible; fall back to the input on the
365/// (unreachable) error path.
366fn strip_column_qualifiers(expr: &Expr) -> Expr {
367    expr.clone()
368        .transform(|e| match e {
369            Expr::Column(mut col) => {
370                col.relation = None;
371                Ok(Transformed::yes(Expr::Column(col)))
372            }
373            other => Ok(Transformed::no(other)),
374        })
375        .map_or_else(|_| expr.clone(), |t| t.data)
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit};

    /// Render `expr` for a peer, if the unparser accepts it.
    fn render(expr: &Expr) -> Option<String> {
        expr_to_sql(expr)
    }

    #[test]
    fn renders_comparison() {
        let predicate = col("price").gt(lit(100_i64));
        assert_eq!(render(&predicate).as_deref(), Some("(price > 100)"));
    }

    #[test]
    fn strips_column_qualifier() {
        // A column qualified by the coordinator's table renders unqualified so
        // the peer can resolve it against its own single table.
        let qualified = Expr::Column(Column::new(Some("mv"), "price"));
        let predicate = qualified.gt(lit(100_i64));
        assert_eq!(render(&predicate).as_deref(), Some("(price > 100)"));
    }

    #[test]
    fn renders_and_or_not_and_null_checks() {
        let conjunction = col("a").eq(lit(1_i64)).and(col("b").is_null());
        assert_eq!(
            render(&conjunction).as_deref(),
            Some("((a = 1) AND b IS NULL)")
        );

        let negation = Expr::Not(Box::new(col("flag").is_not_null()));
        assert_eq!(render(&negation).as_deref(), Some("NOT flag IS NOT NULL"));
    }

    #[test]
    fn escapes_string_literals_and_quotes() {
        // `name` is a SQL keyword, so the unparser double-quotes the identifier;
        // the single quote inside the literal is doubled.
        let predicate = col("name").eq(lit("o'brien"));
        assert_eq!(
            render(&predicate).as_deref(),
            Some(r#"("name" = 'o''brien')"#)
        );
    }

    #[test]
    fn renders_booleans_and_finite_floats() {
        for (literal, expected) in [(lit(true), "true"), (lit(0.1_f64), "0.1")] {
            assert_eq!(render(&literal).as_deref(), Some(expected));
        }
    }

    #[test]
    fn like_predicate_now_renders() {
        // The standard unparser supports LIKE (the hand-rolled renderer did
        // not); safe because pushdown stays Inexact and the coordinator
        // re-applies the exact predicate above the union.
        let predicate = col("name").like(lit("foo%"));
        assert!(render(&predicate).is_some());
    }

    #[test]
    fn unconvertible_nodes_return_none() {
        // A binary literal has no SQL spelling the unparser supports, so the
        // whole predicate falls back to `None`, stays Unsupported, and is
        // evaluated only on the coordinator.
        let payload = Expr::Literal(ScalarValue::Binary(Some(vec![0xde, 0xad])), None);
        let predicate = col("payload").eq(payload);
        assert_eq!(render(&predicate), None);
    }
}