Skip to main content

laminar_connectors/lookup/
delta_lookup.rs

1//! Delta Lake on-demand lookup source for cache-miss fallback.
2//!
3//! Implements `LookupSource` backed by a `DataFusion` `TableProvider`. A
4//! batched, typed `pk IN (...)` filter folds all missed keys of a probe into
5//! one file-/partition-pruned scan; [`KeyAligner`](laminar_core::lookup::KeyAligner) handles key decode and
6//! result realignment.
7
8#[cfg(feature = "delta-lake")]
9use std::sync::Arc;
10
11#[cfg(feature = "delta-lake")]
12use arrow_array::{Array, ArrayRef, RecordBatch};
13#[cfg(feature = "delta-lake")]
14use arrow_row::SortField;
15#[cfg(feature = "delta-lake")]
16use arrow_schema::SchemaRef;
17#[cfg(feature = "delta-lake")]
18use datafusion::common::ScalarValue;
19#[cfg(feature = "delta-lake")]
20use datafusion::prelude::{col, Expr, SessionContext};
21
22#[cfg(feature = "delta-lake")]
23use laminar_core::lookup::predicate::Predicate;
24#[cfg(feature = "delta-lake")]
25use laminar_core::lookup::source::{
26    projection_names, ColumnId, LookupError, LookupSource, LookupSourceCapabilities,
27};
28#[cfg(feature = "delta-lake")]
29use laminar_core::lookup::KeyAligner;
30
/// Settings consumed by [`DeltaLookupSource::open`] to locate, open and
/// register a Delta Lake dimension table.
#[cfg(feature = "delta-lake")]
#[derive(Debug, Clone)]
pub struct DeltaLookupSourceConfig {
    /// Fully resolved table location (any catalog resolution already applied).
    pub table_path: String,
    /// Object-store options (credentials, endpoints, ...) forwarded when the
    /// table is opened.
    pub storage_options: std::collections::HashMap<String, String>,
    /// Names of the columns forming the lookup key, in key order.
    pub primary_key_columns: Vec<String>,
    /// Name the table is registered under in the `DataFusion` session context.
    pub table_name: String,
}
44
/// Delta Lake lookup source used for on-demand / partial cache mode.
///
/// Cache misses are resolved by querying the table through a `DataFusion`
/// table provider registered in a private session.
#[cfg(feature = "delta-lake")]
pub struct DeltaLookupSource {
    /// Session holding the registered Delta table provider.
    ctx: Arc<SessionContext>,
    /// Name the table is registered under in `ctx`.
    table_name: String,
    /// Arrow schema of the full (unprojected) table.
    schema: SchemaRef,
    /// Decodes row-encoded probe keys and realigns fetched rows to them.
    aligner: KeyAligner,
}
53
#[cfg(feature = "delta-lake")]
impl DeltaLookupSource {
    /// Registers the Delta table at `config.table_path` in a fresh `DataFusion`
    /// session and prepares key decoding for `config.primary_key_columns`.
    ///
    /// As a best-effort diagnostic, this also warns when the table is not
    /// partitioned on any of the key columns.
    ///
    /// # Errors
    ///
    /// Returns `LookupError` if the table cannot be opened/registered or a
    /// primary key column is missing from the schema.
    pub async fn open(config: DeltaLookupSourceConfig) -> Result<Self, LookupError> {
        let session = SessionContext::new();

        let registration = crate::lakehouse::delta_table_provider::register_delta_table(
            &session,
            &config.table_name,
            &config.table_path,
            config.storage_options.clone(),
        )
        .await;
        if let Err(err) = registration {
            return Err(LookupError::Connection(format!("register delta table: {err}")));
        }

        let frame = match session.table(&config.table_name).await {
            Ok(frame) => frame,
            Err(err) => return Err(LookupError::Internal(format!("get table: {err}"))),
        };
        let schema: SchemaRef = Arc::new(frame.schema().as_arrow().clone());

        let sort_fields = pk_sort_fields(&schema, &config.primary_key_columns)?;
        let aligner = KeyAligner::new(sort_fields, config.primary_key_columns.clone())?;

        // Diagnostic only; never fails the open.
        warn_if_unclustered(&config).await;

        let DeltaLookupSourceConfig { table_name, .. } = config;
        Ok(Self {
            ctx: Arc::new(session),
            table_name,
            schema,
            aligner,
        })
    }
}
92
/// Builds the `RowConverter` sort fields for the primary key.
///
/// One field is produced per key column, in key order, using that column's
/// data type from `schema`.
///
/// # Errors
///
/// Returns `LookupError::Internal` naming the first key column that is not
/// present in `schema`.
#[cfg(feature = "delta-lake")]
fn pk_sort_fields(
    schema: &SchemaRef,
    pk_columns: &[String],
) -> Result<Vec<SortField>, LookupError> {
    let mut fields = Vec::with_capacity(pk_columns.len());
    for column in pk_columns {
        let Ok(field) = schema.field_with_name(column) else {
            return Err(LookupError::Internal(format!("pk column not found: {column}")));
        };
        fields.push(SortField::new(field.data_type().clone()));
    }
    Ok(fields)
}
109
/// Builds the scan filter that selects every decoded primary key.
///
/// * Single-column key: `pk IN (v1, v2, ...)`, OR-ed with `pk IS NULL` when
///   any probed key is null (an `IN` list never matches NULL).
/// * Composite key: a disjunction of per-key conjunctions,
///   `(a = x1 AND b = y1) OR (a = x2 AND b = y2) ...`, using `col IS NULL`
///   for null key components.
///
/// Typed `Expr` literals (not string SQL) keep type handling and escaping
/// correct. `pk_arrays[i]` must hold the decoded values of `pk_columns[i]`,
/// with one row per probed key.
///
/// # Errors
///
/// Returns `LookupError::Internal` in any of these cases:
/// * `pk_columns` is empty;
/// * the decoded arrays do not line up with the key columns (count or length);
/// * there are no keys to look up;
/// * a key value cannot be turned into a literal.
#[cfg(feature = "delta-lake")]
fn build_in_list_filter(
    pk_columns: &[String],
    pk_arrays: &[ArrayRef],
) -> Result<Expr, LookupError> {
    // Validate the shape up front. Indexing `pk_arrays` below would otherwise
    // panic on a short slice (e.g. one key column but no decoded arrays).
    if pk_columns.is_empty() || pk_arrays.len() != pk_columns.len() {
        return Err(LookupError::Internal(format!(
            "key shape mismatch: {} key columns, {} decoded arrays",
            pk_columns.len(),
            pk_arrays.len()
        )));
    }
    let n = pk_arrays[0].len();
    if pk_arrays.iter().any(|arr| arr.len() != n) {
        return Err(LookupError::Internal(
            "decoded key arrays have differing lengths".into(),
        ));
    }
    let scalar = |arr: &ArrayRef, row: usize| {
        ScalarValue::try_from_array(arr, row)
            .map(|sv| Expr::Literal(sv, None))
            .map_err(|e| LookupError::Internal(format!("scalar from key: {e}")))
    };

    if let [name] = pk_columns {
        let column = col(name);
        let arr = &pk_arrays[0];
        let mut literals = Vec::with_capacity(n);
        let mut has_null = false;
        for row in 0..n {
            if arr.is_null(row) {
                has_null = true;
            } else {
                literals.push(scalar(arr, row)?);
            }
        }
        // `IN` never matches NULL, so null keys need an explicit `IS NULL`.
        let in_list = (!literals.is_empty()).then(|| column.clone().in_list(literals, false));
        let null_match = has_null.then(|| column.is_null());
        return match (in_list, null_match) {
            (Some(in_list), Some(null_match)) => Ok(in_list.or(null_match)),
            (Some(only), None) | (None, Some(only)) => Ok(only),
            (None, None) => Err(LookupError::Internal("no keys to look up".into())),
        };
    }

    let mut groups = Vec::with_capacity(n);
    for row in 0..n {
        let mut terms = Vec::with_capacity(pk_columns.len());
        for (name, arr) in pk_columns.iter().zip(pk_arrays) {
            terms.push(if arr.is_null(row) {
                col(name).is_null()
            } else {
                col(name).eq(scalar(arr, row)?)
            });
        }
        // Only a handful of key columns, so a plain left fold is fine here.
        if let Some(group) = terms.into_iter().reduce(Expr::and) {
            groups.push(group);
        }
    }
    // One group per probed key. Balance the OR tree so its depth stays
    // logarithmic in the probe size.
    balanced_reduce(groups, Expr::or)
        .ok_or_else(|| LookupError::Internal("no keys to look up".into()))
}

/// Combines `exprs` pairwise with `op` into a balanced binary tree.
///
/// `op` must be associative (e.g. `Expr::or`), so the result is logically the
/// same as a left fold. The tree depth, however, grows as O(log n) rather than
/// O(n), which keeps recursive expression-tree passes shallow for large probes.
/// Returns `None` for an empty input.
#[cfg(feature = "delta-lake")]
fn balanced_reduce(mut exprs: Vec<Expr>, op: fn(Expr, Expr) -> Expr) -> Option<Expr> {
    while exprs.len() > 1 {
        let mut merged = Vec::with_capacity(exprs.len().div_ceil(2));
        let mut pending = exprs.into_iter();
        while let Some(left) = pending.next() {
            merged.push(match pending.next() {
                Some(right) => op(left, right),
                None => left,
            });
        }
        exprs = merged;
    }
    exprs.pop()
}
175
/// Best-effort clustering diagnostic.
///
/// An on-demand lookup is only cheap if the dimension is partitioned or
/// clustered on the key. Delta exposes partition columns (not Z-ORDER), so
/// this emits a warning, never an error, when none of the key columns is a
/// partition column.
///
/// If the table cannot be opened for the check, the check is skipped and the
/// reason is logged at debug level; the caller is never failed.
#[cfg(feature = "delta-lake")]
async fn warn_if_unclustered(config: &DeltaLookupSourceConfig) {
    let table = match crate::lakehouse::delta_io::open_or_create_table(
        &config.table_path,
        config.storage_options.clone(),
        None,
    )
    .await
    {
        Ok(table) => table,
        Err(e) => {
            // Deliberately non-fatal, but leave a trace instead of silently
            // skipping the diagnostic.
            tracing::debug!(
                table = %config.table_path,
                error = ?e,
                "skipping delta lookup clustering check: could not open table"
            );
            return;
        }
    };
    let partition_columns = crate::lakehouse::delta_io::get_partition_columns(&table);
    let key_is_partitioned = config
        .primary_key_columns
        .iter()
        .any(|k| partition_columns.contains(k));
    if !key_is_partitioned {
        tracing::warn!(
            table = %config.table_path,
            primary_key = ?config.primary_key_columns,
            partition_columns = ?partition_columns,
            "delta lookup table is not partitioned on the lookup key; unless it is \
             Z-ORDER clustered on the key, every cache-miss fetch will full-scan the \
             table. Cluster the dimension on the lookup key for bounded per-fetch cost."
        );
    }
}
206
/// Selects `names` (in that order) from `batch`, dropping every other column.
///
/// # Errors
///
/// Returns `LookupError::Internal` if a name is missing from the batch schema
/// or the projection fails.
#[cfg(feature = "delta-lake")]
fn project_by_name(batch: &RecordBatch, names: &[String]) -> Result<RecordBatch, LookupError> {
    let schema = batch.schema();
    let indices = names
        .iter()
        .map(|name| {
            schema.index_of(name).map_err(|e| {
                LookupError::Internal(format!("column not found in aligned schema: {e}"))
            })
        })
        .collect::<Result<Vec<usize>, LookupError>>()?;
    batch
        .project(&indices)
        .map_err(|e| LookupError::Internal(format!("project aligned batch: {e}")))
}

#[cfg(feature = "delta-lake")]
impl LookupSource for DeltaLookupSource {
    /// Fetches all `keys` with one filtered scan and returns one slot per key,
    /// in probe order (`None` for keys with no matching row).
    ///
    /// An empty `projection` returns all columns. Otherwise only the requested
    /// columns are returned, in the requested order. `_predicates` is not
    /// evaluated by this source.
    async fn query(
        &self,
        keys: &[&[u8]],
        _predicates: &[Predicate],
        projection: &[ColumnId],
    ) -> Result<Vec<Option<RecordBatch>>, LookupError> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        let pk_arrays = self.aligner.decode_keys(keys)?;
        let filter = build_in_list_filter(self.aligner.pk_columns(), &pk_arrays)?;

        let mut df = self
            .ctx
            .table(&self.table_name)
            .await
            .map_err(|e| LookupError::Query(format!("open delta table: {e}")))?
            .filter(filter)
            .map_err(|e| LookupError::Query(format!("apply lookup filter: {e}")))?;

        // Resolve the requested column names once. They are reused for both
        // the scan projection and the final re-projection.
        let requested = if projection.is_empty() {
            None
        } else {
            Some(projection_names(&self.schema, projection)?)
        };

        // Projection pushdown: select only the requested columns (the optimizer
        // pushes this into the Parquet scan). The scan must also carry the key
        // columns so realignment works; unrequested keys are dropped afterwards.
        if let Some(requested) = &requested {
            let mut scan_columns = requested.clone();
            for pk in self.aligner.pk_columns() {
                if !scan_columns.contains(pk) {
                    scan_columns.push(pk.clone());
                }
            }
            let refs: Vec<&str> = scan_columns.iter().map(String::as_str).collect();
            df = df
                .select_columns(&refs)
                .map_err(|e| LookupError::Query(format!("apply lookup projection: {e}")))?;
        }

        let batches = df
            .collect()
            .await
            .map_err(|e| LookupError::Query(format!("collect lookup results: {e}")))?;

        let aligned = self
            .aligner
            .align(keys, &batches)
            .map_err(|e| LookupError::Internal(format!("align lookup results: {e}")))?;

        let Some(requested) = requested else {
            return Ok(aligned);
        };
        // Restore exactly the requested columns, in the requested order.
        aligned
            .into_iter()
            .map(|slot| {
                slot.map(|batch| project_by_name(&batch, &requested))
                    .transpose()
            })
            .collect()
    }

    fn capabilities(&self) -> LookupSourceCapabilities {
        LookupSourceCapabilities {
            supports_batch_lookup: true,
            supports_projection_pushdown: true,
            ..LookupSourceCapabilities::none()
        }
    }

    // The trait ties the returned `&str` to `&self`, so the literal cannot be
    // returned as `&'static str` here.
    #[allow(clippy::unnecessary_literal_bound)]
    fn source_name(&self) -> &str {
        "delta-lake"
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Succeeds if the registered table can still be resolved from the session.
    async fn health_check(&self) -> Result<(), LookupError> {
        self.ctx
            .table(&self.table_name)
            .await
            .map(|_| ())
            .map_err(|e| LookupError::Connection(format!("health check: {e}")))
    }
}
311
#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_row::RowConverter;
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;
    use tempfile::TempDir;

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    fn test_batch(ids: &[i64], names: &[&str]) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
            ],
        )
        .unwrap()
    }

    /// Row-encodes `ids` the same way single-column `Int64` probe keys are.
    fn int_keys(ids: &[i64]) -> Vec<Vec<u8>> {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(Int64Array::from(ids.to_vec()))])
            .unwrap();
        (0..ids.len())
            .map(|i| rows.row(i).as_ref().to_vec())
            .collect()
    }

    async fn create_delta_table(path: &str, batches: Vec<RecordBatch>) {
        use crate::lakehouse::delta_io;
        use deltalake::protocol::SaveMode;

        let table = delta_io::open_or_create_table(path, HashMap::new(), Some(&test_schema()))
            .await
            .unwrap();
        delta_io::write_batches(
            table,
            batches,
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
    }

    async fn open_source(path: &str, table_name: &str) -> DeltaLookupSource {
        DeltaLookupSource::open(DeltaLookupSourceConfig {
            table_path: path.to_string(),
            storage_options: HashMap::new(),
            primary_key_columns: vec!["id".into()],
            table_name: table_name.to_string(),
        })
        .await
        .unwrap()
    }

    fn id_at(batch: &RecordBatch) -> i64 {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    }

    #[test]
    fn in_list_filter_rejects_empty_key_set() {
        let empty: ArrayRef = Arc::new(Int64Array::from(Vec::<i64>::new()));
        assert!(build_in_list_filter(&["id".to_string()], &[empty]).is_err());
    }

    #[test]
    fn in_list_filter_matches_null_keys_with_is_null() {
        let ids: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None]));
        let rendered = build_in_list_filter(&["id".to_string()], &[ids])
            .unwrap()
            .to_string();
        assert!(rendered.contains(" IN "), "{rendered}");
        assert!(rendered.contains("IS NULL"), "{rendered}");
    }

    #[test]
    fn composite_filter_has_one_and_group_per_key() {
        let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["x", "y", "z"]));
        let rendered = build_in_list_filter(&["a".to_string(), "b".to_string()], &[a, b])
            .unwrap()
            .to_string();
        // 3 keys → 3 AND-groups joined by 2 ORs.
        assert_eq!(rendered.matches(" AND ").count(), 3, "{rendered}");
        assert_eq!(rendered.matches(" OR ").count(), 2, "{rendered}");
    }

    #[tokio::test]
    async fn batched_lookup_aligns_hits_and_misses() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();
        create_delta_table(path, vec![test_batch(&[1, 2, 3], &["A", "B", "C"])]).await;
        let source = open_source(path, "lk").await;

        // Out-of-table-order with a miss; one batched fetch.
        let keys = int_keys(&[3, 1, 999, 2]);
        let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
        let results = source.query(&key_refs, &[], &[]).await.unwrap();

        assert_eq!(results.len(), 4);
        assert_eq!(id_at(results[0].as_ref().unwrap()), 3);
        assert_eq!(id_at(results[1].as_ref().unwrap()), 1);
        assert!(results[2].is_none());
        assert_eq!(id_at(results[3].as_ref().unwrap()), 2);
    }

    /// The Phase 3 exit criterion: a batched `pk IN (...)` must prune
    /// non-matching partition files (per-key cost O(matching files), not
    /// O(table)) on a table clustered on the key.
    #[tokio::test]
    async fn in_list_prunes_partition_files() {
        use datafusion::physical_plan::collect;

        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        // 8 distinct keys → 8 partition directories (one Parquet file each).
        {
            use crate::lakehouse::delta_io;
            use deltalake::protocol::SaveMode;
            let t = delta_io::open_or_create_table(path, HashMap::new(), None)
                .await
                .unwrap();
            delta_io::write_batches(
                t,
                vec![test_batch(
                    &[0, 1, 2, 3, 4, 5, 6, 7],
                    &["a", "b", "c", "d", "e", "f", "g", "h"],
                )],
                "w",
                1,
                SaveMode::Append,
                Some(&["id".to_string()]),
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
        }

        // Correctness across partition files via the source.
        let source = open_source(path, "lk").await;
        let keys = int_keys(&[5, 2, 100]);
        let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
        let results = source.query(&key_refs, &[], &[]).await.unwrap();
        assert!(results[0].is_some() && results[1].is_some() && results[2].is_none());

        // Pruning: the IN-list query must read fewer than all 8 files. The
        // Delta scan reports the number of data files it read through the
        // `count_files_scanned` metric, which is summed over the plan tree.
        let ctx = SessionContext::new();
        crate::lakehouse::delta_table_provider::register_delta_table(
            &ctx,
            "lk",
            path,
            HashMap::new(),
        )
        .await
        .unwrap();
        let plan = ctx
            .sql("SELECT * FROM \"lk\" WHERE \"id\" IN (2, 5)")
            .await
            .unwrap()
            .create_physical_plan()
            .await
            .unwrap();
        let _ = collect(Arc::clone(&plan), ctx.task_ctx()).await.unwrap();
        let scanned = sum_plan_metric(&plan, "count_files_scanned");
        assert!(
            scanned > 0 && scanned < 8,
            "expected pruning, scanned={scanned}"
        );
    }

    /// Sums the metric `name` over `plan` and all of its descendants.
    fn sum_plan_metric(
        plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
        name: &str,
    ) -> usize {
        let mut total = plan
            .metrics()
            .and_then(|m| m.sum_by_name(name))
            .map_or(0, |v| v.as_usize());
        for child in plan.children() {
            total += sum_plan_metric(child, name);
        }
        total
    }
}