laminar_connectors/lookup/
iceberg_lookup.rs1#[cfg(feature = "iceberg")]
9use std::sync::Arc;
10
11#[cfg(feature = "iceberg")]
12use arrow_array::{Array, ArrayRef, RecordBatch};
13#[cfg(feature = "iceberg")]
14use arrow_row::SortField;
15#[cfg(feature = "iceberg")]
16use arrow_schema::SchemaRef;
17#[cfg(feature = "iceberg")]
18use iceberg::expr::{Predicate as IcebergPredicate, Reference};
19#[cfg(feature = "iceberg")]
20use iceberg::spec::Datum;
21#[cfg(feature = "iceberg")]
22use iceberg::Catalog;
23
24#[cfg(feature = "iceberg")]
25use laminar_core::lookup::predicate::Predicate;
26#[cfg(feature = "iceberg")]
27use laminar_core::lookup::source::{
28 projection_names, ColumnId, LookupError, LookupSource, LookupSourceCapabilities,
29};
30#[cfg(feature = "iceberg")]
31use laminar_core::lookup::KeyAligner;
32
33#[cfg(feature = "iceberg")]
34use crate::lakehouse::iceberg_config::IcebergCatalogConfig;
35
/// Settings needed to open an [`IcebergLookupSource`].
#[cfg(feature = "iceberg")]
#[derive(Clone, Debug)]
pub struct IcebergLookupSourceConfig {
    /// Catalog connection settings. The namespace and table name of the
    /// lookup table are read from this value as well.
    pub catalog: IcebergCatalogConfig,
    /// Names of the columns that make up the lookup key. Each name must be
    /// present in the table schema, otherwise opening the source fails.
    pub primary_key_columns: Vec<String>,
}
45
/// Lookup source that answers key lookups by scanning an Iceberg table.
#[cfg(feature = "iceberg")]
pub struct IcebergLookupSource {
    /// Catalog handle used to (re)load the table.
    catalog: Arc<dyn Catalog>,
    /// Namespace of the lookup table inside the catalog.
    namespace: String,
    /// Name of the lookup table.
    table_name: String,
    /// Arrow schema derived from the table's Iceberg schema when the source
    /// was opened.
    schema: SchemaRef,
    /// Decodes encoded keys into primary-key arrays and aligns scan results
    /// with the requested keys.
    aligner: KeyAligner,
}
55
#[cfg(feature = "iceberg")]
impl IcebergLookupSource {
    /// Opens a lookup source for the table described by `config`.
    ///
    /// Builds the catalog, loads the table, converts its current Iceberg
    /// schema to an Arrow schema, and resolves every configured primary-key
    /// column against that schema to construct the [`KeyAligner`].
    ///
    /// # Errors
    ///
    /// * [`LookupError::Connection`] when the catalog cannot be built or the
    ///   table cannot be loaded.
    /// * [`LookupError::Internal`] when the schema conversion fails or a
    ///   primary-key column is missing from the table schema.
    /// * Any error reported by [`KeyAligner::new`].
    pub async fn open(config: IcebergLookupSourceConfig) -> Result<Self, LookupError> {
        let catalog = crate::lakehouse::iceberg_io::build_catalog(&config.catalog)
            .await
            .map_err(|e| LookupError::Connection(format!("iceberg catalog: {e}")))?;
        let table = crate::lakehouse::iceberg_io::load_table(
            catalog.as_ref(),
            &config.catalog.namespace,
            &config.catalog.table_name,
        )
        .await
        .map_err(|e| LookupError::Connection(format!("load iceberg table: {e}")))?;

        let iceberg_schema = table.current_schema_ref();
        let schema: SchemaRef = Arc::new(
            iceberg::arrow::schema_to_arrow_schema(&iceberg_schema)
                .map_err(|e| LookupError::Internal(format!("iceberg schema to arrow: {e}")))?,
        );

        // One sort field per primary-key column, in the configured order.
        let mut pk_sort_fields = Vec::with_capacity(config.primary_key_columns.len());
        for name in &config.primary_key_columns {
            let idx = schema
                .index_of(name)
                .map_err(|_| LookupError::Internal(format!("pk column not found: {name}")))?;
            pk_sort_fields.push(SortField::new(schema.field(idx).data_type().clone()));
        }
        let aligner = KeyAligner::new(pk_sort_fields, config.primary_key_columns)?;

        Ok(Self {
            catalog,
            namespace: config.catalog.namespace,
            table_name: config.catalog.table_name,
            schema,
            aligner,
        })
    }

    /// Converts the value at `row` of a primary-key `array` into an Iceberg
    /// [`Datum`].
    ///
    /// Returns `Ok(None)` when the cell is null. `Int8`/`Int16` are widened
    /// to an Iceberg `int`; `Utf8`, `LargeUtf8` and `Utf8View` all become an
    /// Iceberg `string`.
    ///
    /// # Errors
    ///
    /// [`LookupError::Internal`] when the array's data type is not one of the
    /// supported key types, or when the array cannot be downcast to the
    /// concrete type announced by its data type. `col_name` is only used in
    /// these messages.
    ///
    /// # Panics
    ///
    /// `row` must be within the bounds of `array`; the underlying Arrow
    /// accessors are expected to panic otherwise.
    fn cell_to_datum(
        col_name: &str,
        array: &dyn Array,
        row: usize,
    ) -> Result<Option<Datum>, LookupError> {
        use arrow_array::{
            BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
            Int8Array, LargeStringArray, StringArray, StringViewArray,
        };
        use arrow_schema::DataType;

        /// Downcasts `array` to the concrete array type `T`.
        fn typed<'a, T: 'static>(
            array: &'a dyn Array,
            col_name: &str,
        ) -> Result<&'a T, LookupError> {
            array.as_any().downcast_ref::<T>().ok_or_else(|| {
                LookupError::Internal(format!("pk column '{col_name}' downcast failed"))
            })
        }

        if array.is_null(row) {
            return Ok(None);
        }

        let datum = match array.data_type() {
            DataType::Int8 => {
                Datum::int(i32::from(typed::<Int8Array>(array, col_name)?.value(row)))
            }
            DataType::Int16 => {
                Datum::int(i32::from(typed::<Int16Array>(array, col_name)?.value(row)))
            }
            DataType::Int32 => Datum::int(typed::<Int32Array>(array, col_name)?.value(row)),
            DataType::Int64 => Datum::long(typed::<Int64Array>(array, col_name)?.value(row)),
            DataType::Float32 => Datum::float(typed::<Float32Array>(array, col_name)?.value(row)),
            DataType::Float64 => Datum::double(typed::<Float64Array>(array, col_name)?.value(row)),
            DataType::Boolean => Datum::bool(typed::<BooleanArray>(array, col_name)?.value(row)),
            DataType::Utf8 => Datum::string(typed::<StringArray>(array, col_name)?.value(row)),
            DataType::LargeUtf8 => {
                Datum::string(typed::<LargeStringArray>(array, col_name)?.value(row))
            }
            DataType::Utf8View => {
                Datum::string(typed::<StringViewArray>(array, col_name)?.value(row))
            }
            dt => {
                return Err(LookupError::Internal(format!(
                    "unsupported PK data type for iceberg lookup: {dt} (column \"{col_name}\")"
                )));
            }
        };
        Ok(Some(datum))
    }

    /// Builds the scan filter that selects the rows matching the first
    /// `n_keys` keys held in `pk_arrays`.
    ///
    /// `pk_cols[i]` names the column whose key values are stored in
    /// `pk_arrays[i]`.
    ///
    /// * Single-column key: `col IN (values...)`, OR-ed with `col IS NULL`
    ///   when at least one key is null.
    /// * Composite key: one conjunction of per-column equality (or `IS NULL`)
    ///   terms per key, all OR-ed together.
    ///
    /// # Errors
    ///
    /// [`LookupError::Internal`] when
    /// * `pk_cols` is empty,
    /// * there are fewer arrays than key columns,
    /// * an array holds fewer than `n_keys` rows,
    /// * `n_keys` is zero, or
    /// * a key cell cannot be converted (see [`Self::cell_to_datum`]).
    fn build_key_predicate(
        pk_cols: &[String],
        pk_arrays: &[ArrayRef],
        n_keys: usize,
    ) -> Result<IcebergPredicate, LookupError> {
        // Validate the shape up front so that inconsistent inputs surface as
        // an error instead of an index-out-of-bounds panic further down.
        if pk_cols.is_empty() {
            return Err(LookupError::Internal(
                "no primary key columns to build a lookup predicate from".into(),
            ));
        }
        if pk_arrays.len() < pk_cols.len() {
            return Err(LookupError::Internal(format!(
                "expected {} pk arrays, got {}",
                pk_cols.len(),
                pk_arrays.len()
            )));
        }
        for (col, array) in pk_cols.iter().zip(pk_arrays) {
            if array.len() < n_keys {
                return Err(LookupError::Internal(format!(
                    "pk column '{col}' has {} rows but {n_keys} keys were requested",
                    array.len()
                )));
            }
        }

        if let [col] = pk_cols {
            let array = pk_arrays[0].as_ref();
            let mut datums = Vec::with_capacity(n_keys);
            let mut has_null = false;
            for row in 0..n_keys {
                match Self::cell_to_datum(col, array, row)? {
                    Some(datum) => datums.push(datum),
                    None => has_null = true,
                }
            }

            let in_list =
                (!datums.is_empty()).then(|| Reference::new(col.clone()).is_in(datums));
            let null_check = has_null.then(|| Reference::new(col.clone()).is_null());
            return match (in_list, null_check) {
                (Some(values), Some(nulls)) => Ok(values.or(nulls)),
                (Some(only), None) | (None, Some(only)) => Ok(only),
                (None, None) => Err(LookupError::Internal("no keys to look up".into())),
            };
        }

        // Composite key: (a = .. AND b = ..) OR (a = .. AND b = ..) OR ...
        let mut disjunction: Option<IcebergPredicate> = None;
        for row in 0..n_keys {
            let mut conjunction: Option<IcebergPredicate> = None;
            for (col, array) in pk_cols.iter().zip(pk_arrays) {
                let term = match Self::cell_to_datum(col, array.as_ref(), row)? {
                    Some(datum) => Reference::new(col.clone()).equal_to(datum),
                    None => Reference::new(col.clone()).is_null(),
                };
                conjunction = Some(match conjunction {
                    Some(acc) => acc.and(term),
                    None => term,
                });
            }
            if let Some(group) = conjunction {
                disjunction = Some(match disjunction {
                    Some(acc) => acc.or(group),
                    None => group,
                });
            }
        }
        disjunction.ok_or_else(|| LookupError::Internal("no keys to look up".into()))
    }
}
201
#[cfg(feature = "iceberg")]
impl LookupSource for IcebergLookupSource {
    /// Looks up `keys` by scanning the Iceberg table with a key filter.
    ///
    /// The encoded keys are decoded into primary-key arrays, turned into a
    /// filter predicate, and pushed into a table scan. The table is loaded
    /// again on every call (presumably so the scan sees the latest snapshot;
    /// TODO confirm). All scanned batches are collected and handed to the
    /// aligner together with `keys` to produce the result.
    ///
    /// * `_predicates` is not used by this source.
    /// * An empty `projection` selects all columns; otherwise only the
    ///   projected columns are scanned.
    ///
    /// NOTE(review): when `projection` leaves out the primary-key columns the
    /// scanned batches will not carry them; confirm that the aligner does not
    /// need them to match rows to keys.
    ///
    /// # Errors
    ///
    /// [`LookupError::Query`] when loading the table, building the scan, or
    /// reading a batch fails, plus any error from key decoding, predicate
    /// construction, projection resolution, or alignment.
    async fn query(
        &self,
        keys: &[&[u8]],
        _predicates: &[Predicate],
        projection: &[ColumnId],
    ) -> Result<Vec<Option<RecordBatch>>, LookupError> {
        use tokio_stream::StreamExt;

        // Nothing to look up: skip the catalog round trip entirely.
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        let pk_arrays = self.aligner.decode_keys(keys)?;
        let key_filter =
            Self::build_key_predicate(self.aligner.pk_columns(), &pk_arrays, keys.len())?;

        let table = crate::lakehouse::iceberg_io::load_table(
            self.catalog.as_ref(),
            &self.namespace,
            &self.table_name,
        )
        .await
        .map_err(|e| LookupError::Query(format!("load iceberg table: {e}")))?;

        let filtered = table.scan().with_filter(key_filter);
        let selected = if projection.is_empty() {
            filtered.select_all()
        } else {
            filtered.select(projection_names(&self.schema, projection)?)
        };
        let scan = selected
            .build()
            .map_err(|e| LookupError::Query(format!("build iceberg scan: {e}")))?;
        let stream = scan
            .to_arrow()
            .await
            .map_err(|e| LookupError::Query(format!("iceberg scan to arrow: {e}")))?;

        // Drain the stream, stopping at the first failed batch.
        let mut batches = Vec::new();
        let mut stream = std::pin::pin!(stream);
        loop {
            match stream.next().await {
                Some(Ok(batch)) => batches.push(batch),
                Some(Err(e)) => {
                    return Err(LookupError::Query(format!("read iceberg batch: {e}")));
                }
                None => break,
            }
        }

        self.aligner.align(keys, &batches)
    }

    /// Reports batch lookup and projection pushdown as supported; every other
    /// capability keeps the value from [`LookupSourceCapabilities::none`].
    fn capabilities(&self) -> LookupSourceCapabilities {
        let mut caps = LookupSourceCapabilities::none();
        caps.supports_batch_lookup = true;
        caps.supports_projection_pushdown = true;
        caps
    }

    /// Returns the fixed source name, `"iceberg"`.
    // The lint suggests a `&'static str` return type; presumably the trait
    // fixes the signature, so it is silenced here.
    #[allow(clippy::unnecessary_literal_bound)]
    fn source_name(&self) -> &str {
        "iceberg"
    }

    /// Returns a shared handle to the Arrow schema captured when the source
    /// was opened.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Checks that the table can still be loaded from the catalog.
    ///
    /// # Errors
    ///
    /// [`LookupError::Connection`] when loading the table fails.
    async fn health_check(&self) -> Result<(), LookupError> {
        let loaded = crate::lakehouse::iceberg_io::load_table(
            self.catalog.as_ref(),
            &self.namespace,
            &self.table_name,
        )
        .await;
        match loaded {
            Ok(_) => Ok(()),
            Err(e) => Err(LookupError::Connection(format!("health check: {e}"))),
        }
    }
}
284
#[cfg(all(test, feature = "iceberg"))]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};

    /// Builds the key predicate for the given inputs and renders it as
    /// upper-case text so assertions are case-insensitive.
    fn rendered_upper(cols: &[String], arrays: &[ArrayRef], n_keys: usize) -> String {
        IcebergLookupSource::build_key_predicate(cols, arrays, n_keys)
            .unwrap()
            .to_string()
            .to_uppercase()
    }

    /// Null cells map to `None`, supported cells to `Some`, and an
    /// unsupported array type is rejected.
    #[test]
    fn cell_to_datum_null_and_unsupported() {
        let ids = Int64Array::from(vec![None, Some(1)]);

        let null_cell = IcebergLookupSource::cell_to_datum("id", &ids, 0).unwrap();
        assert!(null_cell.is_none());

        let value_cell = IcebergLookupSource::cell_to_datum("id", &ids, 1).unwrap();
        assert!(value_cell.is_some());

        let bin = arrow_array::BinaryArray::from(vec![b"x".as_ref()]);
        let unsupported = IcebergLookupSource::cell_to_datum("k", &bin, 0);
        assert!(unsupported.is_err());
    }

    /// A single-column key is rendered as an IN list over that column.
    #[test]
    fn single_col_predicate_is_in_list() {
        let cols = vec!["id".to_string()];
        let arrays: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2, 3]))];

        let text = rendered_upper(&cols, &arrays, 3);

        assert!(text.contains("IN"));
        assert!(text.contains("ID"));
    }

    /// A composite key is rendered as an OR of per-key AND groups.
    #[test]
    fn composite_predicate_is_or_of_and() {
        let cols = vec!["a".to_string(), "b".to_string()];
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["x", "y"])),
        ];

        let text = rendered_upper(&cols, &arrays, 2);

        assert!(text.contains("AND"));
        assert!(text.contains("OR"));
    }

    /// A null key contributes a NULL check to the predicate.
    #[test]
    fn null_key_adds_is_null_term() {
        let cols = vec!["id".to_string()];
        let arrays: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![Some(1), None]))];

        let text = rendered_upper(&cols, &arrays, 2);

        assert!(text.contains("NULL"));
    }
}