1#![allow(clippy::disallowed_types)] use std::any::Any;
7use std::fmt::{self, Debug, Formatter};
8use std::sync::Arc;
9
10use arrow_array::RecordBatch;
11use arrow_schema::SchemaRef;
12use async_trait::async_trait;
13use datafusion::catalog::Session;
14use datafusion::datasource::{TableProvider, TableType};
15use datafusion::error::{DataFusionError, Result};
16use datafusion::execution::{SendableRecordBatchStream, TaskContext};
17use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
18use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
19use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
20use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
21use datafusion::physical_plan::{
22 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
23};
24use datafusion_common::tree_node::{Transformed, TreeNode};
25use datafusion_expr::{Expr, TableProviderFilterPushDown};
26use datafusion_sql::unparser;
27use futures::stream::StreamExt;
28use laminar_core::cluster::control::{remote_scan_client, ClusterController};
29use laminar_core::cluster::discovery::NodeId;
30
/// Concurrency limit applied to the remote-peer fan-out in
/// `DistributedScanExec::execute`.
const MAX_CONCURRENT_REMOTE: usize = 16;
33
34pub struct DistributedTableProvider {
37 table_name: String,
38 schema: SchemaRef,
39 inner: Arc<dyn TableProvider>,
40 controller: Arc<ClusterController>,
41}
42
43impl DistributedTableProvider {
44 #[must_use]
46 pub fn new(
47 table_name: String,
48 schema: SchemaRef,
49 inner: Arc<dyn TableProvider>,
50 controller: Arc<ClusterController>,
51 ) -> Self {
52 Self {
53 table_name,
54 schema,
55 inner,
56 controller,
57 }
58 }
59}
60
61impl Debug for DistributedTableProvider {
62 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
63 f.debug_struct("DistributedTableProvider")
64 .field("table_name", &self.table_name)
65 .finish_non_exhaustive()
66 }
67}
68
69#[async_trait]
70impl TableProvider for DistributedTableProvider {
71 fn as_any(&self) -> &dyn Any {
72 self
73 }
74
75 fn schema(&self) -> SchemaRef {
76 self.schema.clone()
77 }
78
79 fn table_type(&self) -> TableType {
80 self.inner.table_type()
81 }
82
83 fn supports_filters_pushdown(
84 &self,
85 filters: &[&Expr],
86 ) -> Result<Vec<TableProviderFilterPushDown>> {
87 Ok(filters
90 .iter()
91 .map(|f| {
92 if expr_to_sql(f).is_some() {
93 TableProviderFilterPushDown::Inexact
94 } else {
95 TableProviderFilterPushDown::Unsupported
96 }
97 })
98 .collect())
99 }
100
101 async fn scan(
102 &self,
103 state: &dyn Session,
104 projection: Option<&Vec<usize>>,
105 filters: &[Expr],
106 _limit: Option<usize>,
107 ) -> Result<Arc<dyn ExecutionPlan>> {
108 let local = self.inner.scan(state, projection, &[], None).await?;
111
112 let rendered: Vec<String> = filters.iter().filter_map(expr_to_sql).collect();
115 let filter_sql = (!rendered.is_empty()).then(|| rendered.join(" AND "));
116
117 let me = self.controller.instance_id();
118 let peers: Vec<NodeId> = self
119 .controller
120 .live_instances()
121 .into_iter()
122 .filter(|&id| id != me)
123 .collect();
124
125 Ok(Arc::new(DistributedScanExec::new(
126 local,
127 self.table_name.clone(),
128 projection.cloned(),
129 filter_sql,
130 Arc::clone(&self.controller),
131 peers,
132 )))
133 }
134}
135
136pub struct DistributedScanExec {
139 local: Arc<dyn ExecutionPlan>,
140 table_name: String,
141 projection: Option<Vec<usize>>,
142 filter_sql: Option<String>,
144 controller: Arc<ClusterController>,
145 peers: Vec<NodeId>,
146 schema: SchemaRef,
147 properties: PlanProperties,
148 metrics: ExecutionPlanMetricsSet,
149}
150
151impl DistributedScanExec {
152 #[must_use]
154 pub fn new(
155 local: Arc<dyn ExecutionPlan>,
156 table_name: String,
157 projection: Option<Vec<usize>>,
158 filter_sql: Option<String>,
159 controller: Arc<ClusterController>,
160 peers: Vec<NodeId>,
161 ) -> Self {
162 let schema = local.schema();
163 let properties = PlanProperties::new(
164 EquivalenceProperties::new(Arc::clone(&schema)),
165 Partitioning::UnknownPartitioning(1),
166 EmissionType::Incremental,
167 Boundedness::Bounded,
168 );
169 Self {
170 local,
171 table_name,
172 projection,
173 filter_sql,
174 controller,
175 peers,
176 schema,
177 properties,
178 metrics: ExecutionPlanMetricsSet::new(),
179 }
180 }
181}
182
183impl Debug for DistributedScanExec {
184 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
185 f.debug_struct("DistributedScanExec")
186 .field("table_name", &self.table_name)
187 .field("peers", &self.peers.len())
188 .finish_non_exhaustive()
189 }
190}
191
192impl DisplayAs for DistributedScanExec {
193 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
194 match t {
195 DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
196 f,
197 "DistributedScanExec: table={}, peers={}",
198 self.table_name,
199 self.peers.len()
200 ),
201 DisplayFormatType::TreeRender => write!(f, "DistributedScanExec"),
202 }
203 }
204}
205
206impl ExecutionPlan for DistributedScanExec {
207 fn name(&self) -> &'static str {
208 "DistributedScanExec"
209 }
210
211 fn as_any(&self) -> &dyn Any {
212 self
213 }
214
215 fn schema(&self) -> SchemaRef {
216 Arc::clone(&self.schema)
217 }
218
219 fn properties(&self) -> &PlanProperties {
220 &self.properties
221 }
222
223 fn metrics(&self) -> Option<MetricsSet> {
224 Some(self.metrics.clone_inner())
225 }
226
227 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
228 vec![&self.local]
229 }
230
231 fn with_new_children(
232 self: Arc<Self>,
233 mut children: Vec<Arc<dyn ExecutionPlan>>,
234 ) -> Result<Arc<dyn ExecutionPlan>> {
235 if children.len() != 1 {
236 return Err(DataFusionError::Plan(
237 "DistributedScanExec requires exactly one child".into(),
238 ));
239 }
240 Ok(Arc::new(Self::new(
241 children.swap_remove(0),
242 self.table_name.clone(),
243 self.projection.clone(),
244 self.filter_sql.clone(),
245 Arc::clone(&self.controller),
246 self.peers.clone(),
247 )))
248 }
249
250 fn execute(
251 &self,
252 partition: usize,
253 context: Arc<TaskContext>,
254 ) -> Result<SendableRecordBatchStream> {
255 if partition != 0 {
256 return Err(DataFusionError::Plan(format!(
257 "DistributedScanExec has a single partition; got {partition}"
258 )));
259 }
260
261 let local_partitions = self.local.output_partitioning().partition_count();
263 let mut local_streams = Vec::with_capacity(local_partitions);
264 for p in 0..local_partitions {
265 local_streams.push(self.local.execute(p, Arc::clone(&context))?);
266 }
267 let local = if local_streams.is_empty() {
268 futures::stream::empty().boxed()
269 } else {
270 futures::stream::select_all(local_streams).boxed()
271 };
272
273 let peer_failures = MetricBuilder::new(&self.metrics).global_counter("peer_failures");
276 let peers_skipped = MetricBuilder::new(&self.metrics).global_counter("peers_skipped");
277 let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
278
279 let pool = Arc::clone(self.controller.query_client_pool());
282 let kv = Arc::clone(self.controller.kv());
283 let table = self.table_name.clone();
284 let projection = self.projection.clone();
285 let filter_sql = self.filter_sql.clone();
286 let schema = Arc::clone(&self.schema);
287 let remote = futures::stream::iter(self.peers.clone())
288 .map(move |peer| {
289 let pool = Arc::clone(&pool);
290 let kv = Arc::clone(&kv);
291 let table = table.clone();
292 let projection = projection.clone();
293 let filter_sql = filter_sql.clone();
294 let schema = Arc::clone(&schema);
295 let peer_failures = peer_failures.clone();
296 let peers_skipped = peers_skipped.clone();
297 async move {
298 match remote_scan_client(&pool, &kv, peer, &table, projection, filter_sql).await
299 {
300 Ok(Some(chunks)) => Box::pin(RecordBatchStreamAdapter::new(
303 schema,
304 chunks.map(|batch| batch.map_err(DataFusionError::Execution)),
305 )) as SendableRecordBatchStream,
306 Ok(None) => {
309 peers_skipped.add(1);
310 tracing::warn!(
311 peer = peer.0,
312 table = %table,
313 "distributed scan: peer unreachable, omitting its slice (partial result)"
314 );
315 Box::pin(RecordBatchStreamAdapter::new(
316 schema,
317 futures::stream::empty::<Result<RecordBatch>>(),
318 )) as SendableRecordBatchStream
319 }
320 Err(e) => {
323 peer_failures.add(1);
324 Box::pin(RecordBatchStreamAdapter::new(
325 schema,
326 futures::stream::once(async move {
327 Err::<RecordBatch, _>(DataFusionError::Execution(e))
328 }),
329 )) as SendableRecordBatchStream
330 }
331 }
332 }
333 })
334 .buffer_unordered(MAX_CONCURRENT_REMOTE)
335 .flatten()
336 .boxed();
337
338 let stream = futures::stream::select(local, remote).inspect(move |r| {
340 if let Ok(batch) = r {
341 output_rows.add(batch.num_rows());
342 }
343 });
344 Ok(Box::pin(RecordBatchStreamAdapter::new(
345 Arc::clone(&self.schema),
346 stream,
347 )))
348 }
349}
350
351fn expr_to_sql(expr: &Expr) -> Option<String> {
356 let unqualified = strip_column_qualifiers(expr);
357 unparser::expr_to_sql(&unqualified)
358 .ok()
359 .map(|ast| ast.to_string())
360}
361
362fn strip_column_qualifiers(expr: &Expr) -> Expr {
367 expr.clone()
368 .transform(|e| match e {
369 Expr::Column(mut col) => {
370 col.relation = None;
371 Ok(Transformed::yes(Expr::Column(col)))
372 }
373 other => Ok(Transformed::no(other)),
374 })
375 .map_or_else(|_| expr.clone(), |t| t.data)
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{col, lit};

    /// Asserts that `expr` renders to exactly `expected`.
    #[track_caller]
    fn assert_renders(expr: &Expr, expected: &str) {
        assert_eq!(expr_to_sql(expr).as_deref(), Some(expected));
    }

    #[test]
    fn renders_comparison() {
        assert_renders(&col("price").gt(lit(100_i64)), "(price > 100)");
    }

    #[test]
    fn strips_column_qualifier() {
        // The `mv.` qualifier must not appear in the rendered text.
        let qualified = Expr::Column(Column::new(Some("mv"), "price"));
        assert_renders(&qualified.gt(lit(100_i64)), "(price > 100)");
    }

    #[test]
    fn renders_and_or_not_and_null_checks() {
        let conjunction = col("a").eq(lit(1_i64)).and(col("b").is_null());
        assert_renders(&conjunction, "((a = 1) AND b IS NULL)");

        let negation = Expr::Not(Box::new(col("flag").is_not_null()));
        assert_renders(&negation, "NOT flag IS NOT NULL");
    }

    #[test]
    fn escapes_string_literals_and_quotes() {
        // The embedded single quote is doubled and the identifier is quoted.
        let e = col("name").eq(lit("o'brien"));
        assert_renders(&e, "(\"name\" = 'o''brien')");
    }

    #[test]
    fn renders_booleans_and_finite_floats() {
        assert_renders(&lit(true), "true");
        assert_renders(&lit(0.1_f64), "0.1");
    }

    #[test]
    fn like_predicate_now_renders() {
        let e = col("name").like(lit("foo%"));
        assert!(expr_to_sql(&e).is_some());
    }

    #[test]
    fn unconvertible_nodes_return_none() {
        // A binary literal is expected to be rejected by the unparser.
        let payload = Expr::Literal(ScalarValue::Binary(Some(vec![0xde, 0xad])), None);
        let e = col("payload").eq(payload);
        assert_eq!(expr_to_sql(&e), None);
    }
}