laminar_connectors/lookup/
delta_lookup.rs1#[cfg(feature = "delta-lake")]
9use std::sync::Arc;
10
11#[cfg(feature = "delta-lake")]
12use arrow_array::{Array, ArrayRef, RecordBatch};
13#[cfg(feature = "delta-lake")]
14use arrow_row::SortField;
15#[cfg(feature = "delta-lake")]
16use arrow_schema::SchemaRef;
17#[cfg(feature = "delta-lake")]
18use datafusion::common::ScalarValue;
19#[cfg(feature = "delta-lake")]
20use datafusion::prelude::{col, Expr, SessionContext};
21
22#[cfg(feature = "delta-lake")]
23use laminar_core::lookup::predicate::Predicate;
24#[cfg(feature = "delta-lake")]
25use laminar_core::lookup::source::{
26 projection_names, ColumnId, LookupError, LookupSource, LookupSourceCapabilities,
27};
28#[cfg(feature = "delta-lake")]
29use laminar_core::lookup::KeyAligner;
30
/// Settings consumed by [`DeltaLookupSource::open`].
#[cfg(feature = "delta-lake")]
#[derive(Debug, Clone)]
pub struct DeltaLookupSourceConfig {
    /// Location of the Delta table; handed unchanged to the table
    /// registration and table-opening helpers.
    pub table_path: String,
    /// Storage options forwarded whenever the table is registered or opened.
    pub storage_options: std::collections::HashMap<String, String>,
    /// Names of the columns forming the lookup key. Each must exist in the
    /// table schema, otherwise `open` fails with "pk column not found".
    pub primary_key_columns: Vec<String>,
    /// Name under which the table is registered in the DataFusion session.
    pub table_name: String,
}
44
/// Lookup source backed by a Delta table registered in its own DataFusion
/// [`SessionContext`].
#[cfg(feature = "delta-lake")]
pub struct DeltaLookupSource {
    /// Session in which the Delta table is registered.
    ctx: Arc<SessionContext>,
    /// Name the table was registered under in `ctx`.
    table_name: String,
    /// Arrow schema of the table, captured once in `open`.
    schema: SchemaRef,
    /// Decodes incoming key bytes into key columns and aligns the query
    /// results with the requested keys.
    aligner: KeyAligner,
}
53
#[cfg(feature = "delta-lake")]
impl DeltaLookupSource {
    /// Registers the configured Delta table in a fresh session and prepares
    /// the key aligner for it.
    ///
    /// # Errors
    ///
    /// * `LookupError::Connection` when the table cannot be registered.
    /// * `LookupError::Internal` when the registered table cannot be read
    ///   back from the session or a primary-key column is missing from its
    ///   schema.
    /// * Whatever `KeyAligner::new` reports for the key layout.
    pub async fn open(config: DeltaLookupSourceConfig) -> Result<Self, LookupError> {
        let session = SessionContext::new();

        if let Err(e) = crate::lakehouse::delta_table_provider::register_delta_table(
            &session,
            &config.table_name,
            &config.table_path,
            config.storage_options.clone(),
        )
        .await
        {
            return Err(LookupError::Connection(format!(
                "register delta table: {e}"
            )));
        }

        // Capture the Arrow schema once; it is reused for every projection.
        let schema: SchemaRef = match session.table(&config.table_name).await {
            Ok(frame) => Arc::new(frame.schema().as_arrow().clone()),
            Err(e) => return Err(LookupError::Internal(format!("get table: {e}"))),
        };

        let key_fields = pk_sort_fields(&schema, &config.primary_key_columns)?;
        let aligner = KeyAligner::new(key_fields, config.primary_key_columns.clone())?;

        // Best-effort diagnostic only; it never fails the open.
        warn_if_unclustered(&config).await;

        Ok(Self {
            ctx: Arc::new(session),
            table_name: config.table_name,
            schema,
            aligner,
        })
    }
}
92
/// Maps each primary-key column name to a [`SortField`] of that column's
/// data type, in the order the names are given.
///
/// # Errors
///
/// Returns `LookupError::Internal` for the first name that is not present in
/// `schema`.
#[cfg(feature = "delta-lake")]
fn pk_sort_fields(
    schema: &SchemaRef,
    pk_columns: &[String],
) -> Result<Vec<SortField>, LookupError> {
    let mut fields = Vec::with_capacity(pk_columns.len());
    for name in pk_columns {
        let Ok(position) = schema.index_of(name) else {
            return Err(LookupError::Internal(format!(
                "pk column not found: {name}"
            )));
        };
        let data_type = schema.field(position).data_type().clone();
        fields.push(SortField::new(data_type));
    }
    Ok(fields)
}
109
/// Column reference for a key column, addressed by its exact schema name.
#[cfg(feature = "delta-lake")]
fn exact_pk_column(name: &str) -> Expr {
    // `col` parses its argument like a SQL identifier: unquoted names are
    // lower-cased and a `.` is read as a relation qualifier. Key names are
    // exact schema names (they were resolved with `Schema::index_of`), so
    // quote them to keep names such as `Id` or `a.b` intact. Embedded double
    // quotes are escaped by doubling.
    col(format!("\"{}\"", name.replace('"', "\"\"")))
}

/// Builds the filter that selects every row whose primary key equals one of
/// the decoded lookup keys.
///
/// `pk_arrays[i]` holds the values of key column `pk_columns[i]`, one entry
/// per requested key.
///
/// * Single-column key: `col IN (v1, v2, ...)`, OR-ed with `col IS NULL` when
///   at least one requested key is null.
/// * Composite key: one `c1 = v1 AND c2 = v2 ...` conjunction per requested
///   key, OR-ed together; a null component is matched with `IS NULL`.
///
/// # Errors
///
/// Returns `LookupError::Internal` when
/// * fewer key arrays than key columns are supplied,
/// * the key arrays do not all have the same length,
/// * a key value cannot be converted into a scalar literal, or
/// * there is no key to look up.
#[cfg(feature = "delta-lake")]
fn build_in_list_filter(
    pk_columns: &[String],
    pk_arrays: &[ArrayRef],
) -> Result<Expr, LookupError> {
    // Validate the shape up front so the indexing below cannot panic.
    if pk_arrays.len() < pk_columns.len() {
        return Err(LookupError::Internal(format!(
            "key arity mismatch: {} key columns but {} key arrays",
            pk_columns.len(),
            pk_arrays.len()
        )));
    }
    let key_arrays = &pk_arrays[..pk_columns.len()];
    let n = key_arrays.first().map_or(0, |arr| arr.len());
    if key_arrays.iter().any(|arr| arr.len() != n) {
        return Err(LookupError::Internal(
            "key arrays have differing lengths".into(),
        ));
    }

    let scalar = |arr: &ArrayRef, row: usize| {
        ScalarValue::try_from_array(arr, row)
            .map(|sv| Expr::Literal(sv, None))
            .map_err(|e| LookupError::Internal(format!("scalar from key: {e}")))
    };

    // Single-column key: one IN list keeps the expression flat.
    if let [name] = pk_columns {
        let column = exact_pk_column(name);
        let arr = &key_arrays[0];
        let mut lits = Vec::with_capacity(n);
        let mut has_null = false;
        for row in 0..n {
            if arr.is_null(row) {
                has_null = true;
            } else {
                lits.push(scalar(arr, row)?);
            }
        }
        let mut filter = (!lits.is_empty()).then(|| column.clone().in_list(lits, false));
        if has_null {
            // `IN` never matches NULL, so null keys need an explicit test.
            let is_null = column.is_null();
            filter = Some(match filter {
                Some(f) => f.or(is_null),
                None => is_null,
            });
        }
        return filter.ok_or_else(|| LookupError::Internal("no keys to look up".into()));
    }

    // Composite key: one conjunction per requested key.
    let mut groups: Vec<Expr> = Vec::with_capacity(n);
    for row in 0..n {
        let mut conj: Option<Expr> = None;
        for (name, arr) in pk_columns.iter().zip(key_arrays) {
            let column = exact_pk_column(name);
            let term = if arr.is_null(row) {
                column.is_null()
            } else {
                column.eq(scalar(arr, row)?)
            };
            conj = Some(match conj {
                Some(c) => c.and(term),
                None => term,
            });
        }
        if let Some(c) = conj {
            groups.push(c);
        }
    }
    groups
        .into_iter()
        .reduce(Expr::or)
        .ok_or_else(|| LookupError::Internal("no keys to look up".into()))
}
175
/// Emits a warning when none of the primary-key columns is a partition
/// column of the Delta table.
///
/// Purely advisory: if the table cannot be opened the check is skipped
/// silently.
#[cfg(feature = "delta-lake")]
async fn warn_if_unclustered(config: &DeltaLookupSourceConfig) {
    let opened = crate::lakehouse::delta_io::open_or_create_table(
        &config.table_path,
        config.storage_options.clone(),
        None,
    )
    .await;
    let table = match opened {
        Ok(table) => table,
        Err(_) => return,
    };

    let partition_columns = crate::lakehouse::delta_io::get_partition_columns(&table);
    let key_is_partitioned = config
        .primary_key_columns
        .iter()
        .any(|k| partition_columns.contains(k));
    if key_is_partitioned {
        return;
    }

    tracing::warn!(
        table = %config.table_path,
        primary_key = ?config.primary_key_columns,
        partition_columns = ?partition_columns,
        "delta lookup table is not partitioned on the lookup key; unless it is \
         Z-ORDER clustered on the key, every cache-miss fetch will full-scan the \
         table. Cluster the dimension on the lookup key for bounded per-fetch cost."
    );
}
206
#[cfg(feature = "delta-lake")]
impl LookupSource for DeltaLookupSource {
    /// Looks up `keys` in the Delta table with a single filtered query.
    ///
    /// The result has one entry per requested key, as produced by the key
    /// aligner. `_predicates` are not used. An empty `projection` returns all
    /// columns; otherwise only the requested columns are returned, in the
    /// requested order.
    ///
    /// # Errors
    ///
    /// * `LookupError::Query` when the table cannot be opened, or the filter,
    ///   projection or collection step fails.
    /// * `LookupError::Internal` when results cannot be aligned or projected.
    /// * Errors from key decoding, filter construction and projection-name
    ///   resolution are propagated unchanged.
    async fn query(
        &self,
        keys: &[&[u8]],
        _predicates: &[Predicate],
        projection: &[ColumnId],
    ) -> Result<Vec<Option<RecordBatch>>, LookupError> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        let pk_arrays = self.aligner.decode_keys(keys)?;
        let filter = build_in_list_filter(self.aligner.pk_columns(), &pk_arrays)?;

        let mut df = self
            .ctx
            .table(&self.table_name)
            .await
            .map_err(|e| LookupError::Query(format!("open delta table: {e}")))?
            .filter(filter)
            .map_err(|e| LookupError::Query(format!("apply lookup filter: {e}")))?;

        // Resolve the requested column names once; they drive both the
        // pushed-down projection and the final trimming step.
        let requested = if projection.is_empty() {
            None
        } else {
            Some(projection_names(&self.schema, projection)?)
        };

        if let Some(requested) = &requested {
            // The aligner matches rows to keys by primary key, so the key
            // columns must be fetched even when the caller did not ask for them.
            let mut fetch: Vec<&str> = requested.iter().map(String::as_str).collect();
            for pk in self.aligner.pk_columns() {
                if !fetch.contains(&pk.as_str()) {
                    fetch.push(pk.as_str());
                }
            }
            df = df
                .select_columns(&fetch)
                .map_err(|e| LookupError::Query(format!("apply lookup projection: {e}")))?;
        }

        let batches = df
            .collect()
            .await
            .map_err(|e| LookupError::Query(format!("collect lookup results: {e}")))?;

        let aligned = self
            .aligner
            .align(keys, &batches)
            .map_err(|e| LookupError::Internal(format!("align lookup results: {e}")))?;

        let Some(requested) = requested else {
            return Ok(aligned);
        };

        // Drop any key columns that were fetched only for alignment and
        // restore the caller's column order.
        aligned
            .into_iter()
            .map(|maybe_batch| {
                maybe_batch
                    .map(|batch| -> Result<RecordBatch, LookupError> {
                        let batch_schema = batch.schema();
                        let indices = requested
                            .iter()
                            .map(|name| {
                                batch_schema.index_of(name).map_err(|e| {
                                    LookupError::Internal(format!(
                                        "column not found in aligned schema: {e}"
                                    ))
                                })
                            })
                            .collect::<Result<Vec<usize>, LookupError>>()?;
                        batch.project(&indices).map_err(|e| {
                            LookupError::Internal(format!("project aligned batch: {e}"))
                        })
                    })
                    .transpose()
            })
            .collect()
    }

    /// Advertises batch lookup and projection pushdown; every other
    /// capability keeps the value from `LookupSourceCapabilities::none()`.
    fn capabilities(&self) -> LookupSourceCapabilities {
        LookupSourceCapabilities {
            supports_batch_lookup: true,
            supports_projection_pushdown: true,
            ..LookupSourceCapabilities::none()
        }
    }

    /// Fixed identifier of this source.
    // NOTE(review): the lint is allowed presumably because the trait fixes
    // the `&str` return type, so `&'static str` cannot be spelled here.
    #[allow(clippy::unnecessary_literal_bound)]
    fn source_name(&self) -> &str {
        "delta-lake"
    }

    /// Arrow schema of the table as captured when the source was opened.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Succeeds when the registered table can still be resolved in the
    /// session; reports `LookupError::Connection` otherwise.
    async fn health_check(&self) -> Result<(), LookupError> {
        self.ctx
            .table(&self.table_name)
            .await
            .map(|_| ())
            .map_err(|e| LookupError::Connection(format!("health check: {e}")))
    }
}
311
// Integration-style tests that write a real Delta table into a temporary
// directory and query it through `DeltaLookupSource`.
#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_row::RowConverter;
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Two-column schema: non-nullable `id: Int64` and nullable `name: Utf8`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a batch with `test_schema()` from parallel `ids` / `names` slices.
    fn test_batch(ids: &[i64], names: &[&str]) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
            ],
        )
        .unwrap()
    }

    /// Encodes each id with a single-`Int64` `RowConverter` and returns one
    /// byte vector per id, in input order.
    fn int_keys(ids: &[i64]) -> Vec<Vec<u8>> {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(Int64Array::from(ids.to_vec()))])
            .unwrap();
        (0..ids.len())
            .map(|i| rows.row(i).as_ref().to_vec())
            .collect()
    }

    /// Opens (or creates) a table at `path` with `test_schema()` and appends
    /// `batches` to it.
    async fn create_delta_table(path: &str, batches: Vec<RecordBatch>) {
        use crate::lakehouse::delta_io;
        use deltalake::protocol::SaveMode;

        let table = delta_io::open_or_create_table(path, HashMap::new(), Some(&test_schema()))
            .await
            .unwrap();
        // NOTE(review): the meaning of the positional arguments after
        // `SaveMode::Append` is defined by `delta_io::write_batches`, which is
        // not visible here — confirm against its signature before changing.
        delta_io::write_batches(
            table,
            batches,
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
    }

    /// Opens a lookup source over `path` keyed on the `id` column.
    async fn open_source(path: &str, table_name: &str) -> DeltaLookupSource {
        DeltaLookupSource::open(DeltaLookupSourceConfig {
            table_path: path.to_string(),
            storage_options: HashMap::new(),
            primary_key_columns: vec!["id".into()],
            table_name: table_name.to_string(),
        })
        .await
        .unwrap()
    }

    /// Reads row 0 of column 0 as an `i64`; panics if that column is not an
    /// `Int64Array`.
    fn id_at(batch: &RecordBatch) -> i64 {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    }

    /// A batched lookup returns one entry per key, in request order, with
    /// `None` for the key that is absent from the table.
    #[tokio::test]
    async fn batched_lookup_aligns_hits_and_misses() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();
        create_delta_table(path, vec![test_batch(&[1, 2, 3], &["A", "B", "C"])]).await;
        let source = open_source(path, "lk").await;

        // Deliberately out of table order, with 999 missing from the table.
        let keys = int_keys(&[3, 1, 999, 2]);
        let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
        let results = source.query(&key_refs, &[], &[]).await.unwrap();

        assert_eq!(results.len(), 4);
        assert_eq!(id_at(results[0].as_ref().unwrap()), 3);
        assert_eq!(id_at(results[1].as_ref().unwrap()), 1);
        assert!(results[2].is_none());
        assert_eq!(id_at(results[3].as_ref().unwrap()), 2);
    }

    /// An `IN` filter on `id` scans some, but fewer than eight, files of a
    /// table holding eight distinct ids.
    #[tokio::test]
    async fn in_list_prunes_partition_files() {
        use datafusion::physical_plan::collect;

        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        {
            // NOTE(review): `Some(&["id".to_string()])` is presumably the
            // partition-column list (per the test name) — confirm against
            // `delta_io::write_batches`. The table is opened without an
            // explicit schema here, unlike `create_delta_table`.
            use crate::lakehouse::delta_io;
            use deltalake::protocol::SaveMode;
            let t = delta_io::open_or_create_table(path, HashMap::new(), None)
                .await
                .unwrap();
            delta_io::write_batches(
                t,
                vec![test_batch(
                    &[0, 1, 2, 3, 4, 5, 6, 7],
                    &["a", "b", "c", "d", "e", "f", "g", "h"],
                )],
                "w",
                1,
                SaveMode::Append,
                Some(&["id".to_string()]),
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
        }

        let source = open_source(path, "lk").await;
        // Two present keys and one absent key.
        let keys = int_keys(&[5, 2, 100]);
        let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
        let results = source.query(&key_refs, &[], &[]).await.unwrap();
        assert!(results[0].is_some() && results[1].is_some() && results[2].is_none());

        // The file-scan metric is read from a separately built physical plan
        // in a fresh session, using the equivalent SQL `IN` query.
        let ctx = SessionContext::new();
        crate::lakehouse::delta_table_provider::register_delta_table(
            &ctx,
            "lk",
            path,
            HashMap::new(),
        )
        .await
        .unwrap();
        let plan = ctx
            .sql("SELECT * FROM \"lk\" WHERE \"id\" IN (2, 5)")
            .await
            .unwrap()
            .create_physical_plan()
            .await
            .unwrap();
        // Execute the plan so its metrics are populated before reading them.
        let _ = collect(Arc::clone(&plan), ctx.task_ctx()).await.unwrap();
        let scanned = sum_plan_metric(&plan, "count_files_scanned");
        assert!(
            scanned > 0 && scanned < 8,
            "expected pruning, scanned={scanned}"
        );
    }

    /// Recursively sums the metric called `name` over `plan` and all of its
    /// children; nodes without that metric contribute 0.
    fn sum_plan_metric(
        plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
        name: &str,
    ) -> usize {
        let mut total = plan
            .metrics()
            .and_then(|m| m.sum_by_name(name))
            .map_or(0, |v| v.as_usize());
        for child in plan.children() {
            total += sum_plan_metric(child, name);
        }
        total
    }
}