Skip to main content

laminar_connectors/lookup/
iceberg_lookup.rs

1//! Iceberg on-demand lookup source for cache-miss fallback.
2//!
3//! Implements `LookupSource` via the native Iceberg scan with a batched
4//! `pk IN (...)` filter pushed down (`with_filter`), so all missed keys of a
5//! probe fold into one manifest-pruned scan. [`KeyAligner`](laminar_core::lookup::KeyAligner) handles key decode
6//! and result realignment.
7
8#[cfg(feature = "iceberg")]
9use std::sync::Arc;
10
11#[cfg(feature = "iceberg")]
12use arrow_array::{Array, ArrayRef, RecordBatch};
13#[cfg(feature = "iceberg")]
14use arrow_row::SortField;
15#[cfg(feature = "iceberg")]
16use arrow_schema::SchemaRef;
17#[cfg(feature = "iceberg")]
18use iceberg::expr::{Predicate as IcebergPredicate, Reference};
19#[cfg(feature = "iceberg")]
20use iceberg::spec::Datum;
21#[cfg(feature = "iceberg")]
22use iceberg::Catalog;
23
24#[cfg(feature = "iceberg")]
25use laminar_core::lookup::predicate::Predicate;
26#[cfg(feature = "iceberg")]
27use laminar_core::lookup::source::{
28    projection_names, ColumnId, LookupError, LookupSource, LookupSourceCapabilities,
29};
30#[cfg(feature = "iceberg")]
31use laminar_core::lookup::KeyAligner;
32
33#[cfg(feature = "iceberg")]
34use crate::lakehouse::iceberg_config::IcebergCatalogConfig;
35
/// Settings consumed by [`IcebergLookupSource::open`].
///
/// Bundles the catalog connection (which also names the lookup table) with the
/// list of primary key columns used to decode and realign lookup keys.
#[derive(Debug, Clone)]
#[cfg(feature = "iceberg")]
pub struct IcebergLookupSourceConfig {
    /// Catalog connection settings; the lookup table's namespace and table name
    /// are read from here too.
    pub catalog: IcebergCatalogConfig,
    /// Names of the primary key columns. Their order is the order in which key
    /// sort fields are built when the source is opened.
    pub primary_key_columns: Vec<String>,
}
45
/// Lookup source that answers cache misses by scanning an Iceberg table.
///
/// Used in on-demand/partial cache mode: each query reloads the table by
/// `namespace`/`table_name` from `catalog` and filters on the primary key.
#[cfg(feature = "iceberg")]
pub struct IcebergLookupSource {
    /// Catalog the table is (re)loaded from.
    catalog: Arc<dyn Catalog>,
    /// Arrow view of the table schema captured when the source was opened.
    schema: SchemaRef,
    /// Decodes encoded keys into arrays and realigns scan results to keys.
    aligner: KeyAligner,
    /// Namespace that holds the lookup table.
    namespace: String,
    /// Name of the lookup table within `namespace`.
    table_name: String,
}
55
#[cfg(feature = "iceberg")]
impl IcebergLookupSource {
    /// Opens the catalog, loads the table, and derives the Arrow schema.
    ///
    /// The Arrow schema is taken from the table's current Iceberg schema at open
    /// time. The data types of the primary key columns in that schema become the
    /// sort fields the [`KeyAligner`] uses to decode lookup keys.
    ///
    /// # Errors
    ///
    /// Returns `LookupError` if the catalog/table cannot be opened, the Iceberg
    /// schema cannot be converted to Arrow, or a primary key column is missing
    /// from the table schema. Errors from `KeyAligner::new` are propagated.
    pub async fn open(config: IcebergLookupSourceConfig) -> Result<Self, LookupError> {
        let catalog = crate::lakehouse::iceberg_io::build_catalog(&config.catalog)
            .await
            .map_err(|e| LookupError::Connection(format!("iceberg catalog: {e}")))?;
        let table = crate::lakehouse::iceberg_io::load_table(
            catalog.as_ref(),
            &config.catalog.namespace,
            &config.catalog.table_name,
        )
        .await
        .map_err(|e| LookupError::Connection(format!("load iceberg table: {e}")))?;

        let iceberg_schema = table.current_schema_ref();
        let schema: SchemaRef = Arc::new(
            iceberg::arrow::schema_to_arrow_schema(&iceberg_schema)
                .map_err(|e| LookupError::Internal(format!("iceberg schema to arrow: {e}")))?,
        );

        // One sort field per PK column, in configured order, typed from the table schema.
        let pk_sort_fields = config
            .primary_key_columns
            .iter()
            .map(|name| {
                let idx = schema
                    .index_of(name)
                    .map_err(|_| LookupError::Internal(format!("pk column not found: {name}")))?;
                Ok(SortField::new(schema.field(idx).data_type().clone()))
            })
            .collect::<Result<Vec<_>, LookupError>>()?;
        let aligner = KeyAligner::new(pk_sort_fields, config.primary_key_columns)?;

        Ok(Self {
            catalog,
            namespace: config.catalog.namespace,
            table_name: config.catalog.table_name,
            schema,
            aligner,
        })
    }

    /// Convert one PK cell into an Iceberg [`Datum`], or `None` when NULL.
    ///
    /// Supported Arrow types: Int8/Int16/Int32 (as Iceberg `int`), Int64
    /// (`long`), Float32 (`float`), Float64 (`double`), Boolean, Utf8/LargeUtf8/
    /// Utf8View (`string`), Date32 (`date`), and microsecond Timestamp
    /// (`timestamp`, or `timestamptz` when the Arrow type carries a time zone).
    ///
    /// # Errors
    ///
    /// Returns `LookupError::Internal` for any other data type, or if the array
    /// cannot be downcast to the type its `data_type()` reports.
    fn cell_to_datum(
        col_name: &str,
        array: &dyn Array,
        row: usize,
    ) -> Result<Option<Datum>, LookupError> {
        use arrow_array::{
            BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, Int32Array,
            Int64Array, Int8Array, LargeStringArray, StringArray, StringViewArray,
            TimestampMicrosecondArray,
        };
        use arrow_schema::{DataType, TimeUnit};

        if array.is_null(row) {
            return Ok(None);
        }

        macro_rules! downcast {
            ($ty:ty) => {
                array.as_any().downcast_ref::<$ty>().ok_or_else(|| {
                    LookupError::Internal(format!("pk column '{col_name}' downcast failed"))
                })?
            };
        }

        let datum = match array.data_type() {
            DataType::Int8 => Datum::int(i32::from(downcast!(Int8Array).value(row))),
            DataType::Int16 => Datum::int(i32::from(downcast!(Int16Array).value(row))),
            DataType::Int32 => Datum::int(downcast!(Int32Array).value(row)),
            DataType::Int64 => Datum::long(downcast!(Int64Array).value(row)),
            DataType::Float32 => Datum::float(downcast!(Float32Array).value(row)),
            DataType::Float64 => Datum::double(downcast!(Float64Array).value(row)),
            DataType::Boolean => Datum::bool(downcast!(BooleanArray).value(row)),
            DataType::Utf8 => Datum::string(downcast!(StringArray).value(row)),
            DataType::LargeUtf8 => Datum::string(downcast!(LargeStringArray).value(row)),
            DataType::Utf8View => Datum::string(downcast!(StringViewArray).value(row)),
            // Days since the Unix epoch.
            DataType::Date32 => Datum::date(downcast!(Date32Array).value(row)),
            // Microseconds since the Unix epoch; a zoned Arrow type maps to timestamptz.
            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
                let micros = downcast!(TimestampMicrosecondArray).value(row);
                if tz.is_some() {
                    Datum::timestamptz_micros(micros)
                } else {
                    Datum::timestamp_micros(micros)
                }
            }
            dt => {
                return Err(LookupError::Internal(format!(
                    "unsupported PK data type for iceberg lookup: {dt} (column \"{col_name}\")"
                )));
            }
        };
        Ok(Some(datum))
    }

    /// Build a single Iceberg predicate over the decoded key columns.
    ///
    /// A single-column PK folds into `pk IN (...)`, OR-ed with `pk IS NULL` when
    /// any key is NULL. A composite PK becomes an OR of per-key AND-groups. The OR
    /// is built as a balanced tree so its depth stays logarithmic in `n_keys`.
    ///
    /// # Errors
    ///
    /// Returns `LookupError::Internal` if `pk_cols` and `pk_arrays` differ in
    /// length, if any array holds fewer than `n_keys` values, if a cell has an
    /// unsupported type, or if there are no keys to look up.
    fn build_key_predicate(
        pk_cols: &[String],
        pk_arrays: &[ArrayRef],
        n_keys: usize,
    ) -> Result<IcebergPredicate, LookupError> {
        // Guard the invariants the loops below rely on instead of panicking on
        // out-of-bounds indexing.
        if pk_cols.len() != pk_arrays.len() {
            return Err(LookupError::Internal(format!(
                "pk column/array count mismatch: {} columns, {} arrays",
                pk_cols.len(),
                pk_arrays.len()
            )));
        }
        if let Some((col, array)) = pk_cols
            .iter()
            .zip(pk_arrays)
            .find(|(_, array)| array.len() < n_keys)
        {
            return Err(LookupError::Internal(format!(
                "pk column '{col}' has {} values, expected at least {n_keys}",
                array.len()
            )));
        }

        if let ([col], [array]) = (pk_cols, pk_arrays) {
            let array = array.as_ref();
            let mut datums = Vec::with_capacity(n_keys);
            let mut has_null = false;
            for row in 0..n_keys {
                match Self::cell_to_datum(col, array, row)? {
                    Some(d) => datums.push(d),
                    None => has_null = true,
                }
            }
            let mut pred: Option<IcebergPredicate> =
                (!datums.is_empty()).then(|| Reference::new(col.clone()).is_in(datums));
            if has_null {
                let null_pred = Reference::new(col.clone()).is_null();
                pred = Some(match pred {
                    Some(p) => p.or(null_pred),
                    None => null_pred,
                });
            }
            return pred.ok_or_else(|| LookupError::Internal("no keys to look up".into()));
        }

        let mut groups: Vec<IcebergPredicate> = Vec::with_capacity(n_keys);
        for row in 0..n_keys {
            let mut conj: Option<IcebergPredicate> = None;
            for (col, array) in pk_cols.iter().zip(pk_arrays) {
                let term = match Self::cell_to_datum(col, array.as_ref(), row)? {
                    Some(d) => Reference::new(col.clone()).equal_to(d),
                    None => Reference::new(col.clone()).is_null(),
                };
                conj = Some(match conj {
                    Some(c) => c.and(term),
                    None => term,
                });
            }
            if let Some(c) = conj {
                groups.push(c);
            }
        }
        Self::balanced_or(groups)
            .ok_or_else(|| LookupError::Internal("no keys to look up".into()))
    }

    /// OR all predicates together as a balanced binary tree; `None` if empty.
    ///
    /// A left-deep fold nests one level per key, so a large batch of composite
    /// keys would produce a tree as deep as the batch. Pairing neighbours each
    /// round keeps the depth at about `log2(len)`.
    fn balanced_or(mut preds: Vec<IcebergPredicate>) -> Option<IcebergPredicate> {
        while preds.len() > 1 {
            let mut merged = Vec::with_capacity(preds.len().div_ceil(2));
            let mut pending: Option<IcebergPredicate> = None;
            for pred in preds {
                match pending.take() {
                    Some(left) => merged.push(left.or(pred)),
                    None => pending = Some(pred),
                }
            }
            // An odd element out is carried into the next round unchanged.
            merged.extend(pending);
            preds = merged;
        }
        preds.pop()
    }
}
201
#[cfg(feature = "iceberg")]
impl LookupSource for IcebergLookupSource {
    /// Look up `keys` with one batched, filter-pushed-down Iceberg scan.
    ///
    /// Keys are decoded by the aligner and turned into a single predicate. The
    /// table is reloaded so the scan sees its latest snapshot, and the scan's
    /// batches are realigned to `keys` by the aligner. Extra `_predicates` are
    /// not pushed down (see [`LookupSource::capabilities`]).
    ///
    /// # Errors
    ///
    /// Propagates key-decoding and predicate-building errors. Returns
    /// `LookupError::Query` if the table cannot be loaded or the scan fails to
    /// build or stream.
    async fn query(
        &self,
        keys: &[&[u8]],
        _predicates: &[Predicate],
        projection: &[ColumnId],
    ) -> Result<Vec<Option<RecordBatch>>, LookupError> {
        use tokio_stream::StreamExt;

        if keys.is_empty() {
            return Ok(Vec::new());
        }

        let pk_arrays = self.aligner.decode_keys(keys)?;
        let predicate =
            Self::build_key_predicate(self.aligner.pk_columns(), &pk_arrays, keys.len())?;

        // Reload the table per query so lookups see the latest snapshot
        // (eventually-consistent freshness; the cache layer adds TTL on top).
        let table = crate::lakehouse::iceberg_io::load_table(
            self.catalog.as_ref(),
            &self.namespace,
            &self.table_name,
        )
        .await
        .map_err(|e| LookupError::Query(format!("load iceberg table: {e}")))?;

        // Projection pushdown: select only the requested columns, else every
        // column. PK columns are appended when the projection omits them, so the
        // aligner can always realign scan rows to the requested keys.
        let builder = table.scan().with_filter(predicate);
        let builder = if projection.is_empty() {
            builder.select_all()
        } else {
            let mut columns: Vec<String> = projection_names(&self.schema, projection)?
                .into_iter()
                .map(|name| name.to_string())
                .collect();
            for pk in self.aligner.pk_columns() {
                if !columns.iter().any(|c| c == pk) {
                    columns.push(pk.clone());
                }
            }
            builder.select(columns)
        };
        let scan = builder
            .build()
            .map_err(|e| LookupError::Query(format!("build iceberg scan: {e}")))?;
        let stream = scan
            .to_arrow()
            .await
            .map_err(|e| LookupError::Query(format!("iceberg scan to arrow: {e}")))?;

        let mut batches = Vec::new();
        let mut stream = std::pin::pin!(stream);
        while let Some(result) = stream.next().await {
            batches
                .push(result.map_err(|e| LookupError::Query(format!("read iceberg batch: {e}")))?);
        }

        self.aligner.align(keys, &batches)
    }

    /// Batch lookup and projection pushdown are supported; everything else is off.
    fn capabilities(&self) -> LookupSourceCapabilities {
        LookupSourceCapabilities {
            supports_batch_lookup: true,
            supports_projection_pushdown: true,
            ..LookupSourceCapabilities::none()
        }
    }

    // The trait signature ties the returned `&str` to `&self`; returning a
    // literal is intentional, so the lint suggesting `&'static str` is silenced.
    #[allow(clippy::unnecessary_literal_bound)]
    fn source_name(&self) -> &str {
        "iceberg"
    }

    /// Arrow schema captured from the table when the source was opened.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Healthy when the table can be loaded from the catalog.
    ///
    /// # Errors
    ///
    /// Returns `LookupError::Connection` if loading the table fails.
    async fn health_check(&self) -> Result<(), LookupError> {
        crate::lakehouse::iceberg_io::load_table(
            self.catalog.as_ref(),
            &self.namespace,
            &self.table_name,
        )
        .await
        .map(|_| ())
        .map_err(|e| LookupError::Connection(format!("health check: {e}")))
    }
}
284
#[cfg(all(test, feature = "iceberg"))]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};

    /// Builds the key predicate for `cols`/`arrays` and renders it upper-cased.
    ///
    /// The key count is taken from the first array's length.
    fn render_predicate(cols: &[&str], arrays: &[ArrayRef]) -> String {
        let owned_cols: Vec<String> = cols.iter().map(|c| (*c).to_owned()).collect();
        let n_keys = arrays.first().map_or(0, |a| a.len());
        IcebergLookupSource::build_key_predicate(&owned_cols, arrays, n_keys)
            .unwrap()
            .to_string()
            .to_uppercase()
    }

    #[test]
    fn cell_to_datum_null_and_unsupported() {
        let ids = Int64Array::from(vec![None, Some(1)]);
        let null_cell = IcebergLookupSource::cell_to_datum("id", &ids, 0).unwrap();
        let value_cell = IcebergLookupSource::cell_to_datum("id", &ids, 1).unwrap();
        assert!(null_cell.is_none());
        assert!(value_cell.is_some());

        // Binary keys are not supported as Iceberg predicates.
        let binary = arrow_array::BinaryArray::from(vec![b"x".as_ref()]);
        assert!(IcebergLookupSource::cell_to_datum("k", &binary, 0).is_err());
    }

    #[test]
    fn single_col_predicate_is_in_list() {
        let arrays: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2, 3]))];
        let rendered = render_predicate(&["id"], &arrays);
        assert!(rendered.contains("IN") && rendered.contains("ID"));
    }

    #[test]
    fn composite_predicate_is_or_of_and() {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["x", "y"])),
        ];
        let rendered = render_predicate(&["a", "b"], &arrays);
        assert!(rendered.contains("AND") && rendered.contains("OR"));
    }

    #[test]
    fn null_key_adds_is_null_term() {
        let arrays: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![Some(1), None]))];
        let rendered = render_predicate(&["id"], &arrays);
        assert!(rendered.contains("NULL"));
    }
}
342}