// laminar_connectors/lookup/parquet_source.rs

1//! Parquet file lookup source.
2//!
3//! Loads a Parquet file into an in-memory `HashMap` at construction time,
4//! providing a `LookupSource` implementation suitable for static or
5//! slowly-changing dimension tables stored as Parquet.
6//!
7//! ## Usage
8//!
9//! ```rust,no_run
10//! use laminar_connectors::lookup::parquet_source::{ParquetLookupSource, ParquetLookupSourceConfig};
11//!
12//! let config = ParquetLookupSourceConfig {
13//!     path: "/data/customers.parquet".into(),
14//!     primary_key_columns: vec!["customer_id".into()],
15//!     batch_size: 8192,
16//! };
17//!
18//! let source = ParquetLookupSource::from_path(config).unwrap();
19//! ```
20
21use std::collections::HashMap;
22use std::fmt;
23use std::future::Future;
24use std::sync::Arc;
25
26use arrow_array::RecordBatch;
27use arrow_cast::display::ArrayFormatter;
28use arrow_schema::SchemaRef;
29use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
30
31use laminar_core::lookup::predicate::Predicate;
32use laminar_core::lookup::source::{ColumnId, LookupError, LookupSource, LookupSourceCapabilities};
33
/// Configuration for [`ParquetLookupSource`].
#[derive(Debug, Clone)]
pub struct ParquetLookupSourceConfig {
    /// Filesystem path of the Parquet file to load.
    pub path: String,
    /// Names of the columns that together form the primary key.
    pub primary_key_columns: Vec<String>,
    /// Number of rows per batch when reading the file (default: 8192).
    pub batch_size: usize,
}
44
45impl Default for ParquetLookupSourceConfig {
46    fn default() -> Self {
47        Self {
48            path: String::new(),
49            primary_key_columns: Vec::new(),
50            batch_size: 8192,
51        }
52    }
53}
54
55/// A `LookupSource` that loads a Parquet file into memory.
56///
57/// All data is loaded eagerly at construction time. Lookups are served
58/// from an in-memory `HashMap` keyed by the serialized primary key
59/// columns. The values are single-row `RecordBatch` instances.
60///
61/// This is appropriate for dimension tables that fit in memory and are
62/// read-mostly or static.
63pub struct ParquetLookupSource {
64    config: ParquetLookupSourceConfig,
65    /// Primary key bytes → single-row `RecordBatch`.
66    data: HashMap<Vec<u8>, RecordBatch>,
67    /// Schema of the loaded Parquet file.
68    schema: SchemaRef,
69    /// Number of rows loaded.
70    row_count: u64,
71}
72
73impl fmt::Debug for ParquetLookupSource {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.debug_struct("ParquetLookupSource")
76            .field("config", &self.config)
77            .field("data_len", &self.data.len())
78            .field("schema", &self.schema)
79            .field("row_count", &self.row_count)
80            .finish()
81    }
82}
83
84/// Serialize primary key columns from a `RecordBatch` row into bytes.
85///
86/// Concatenates the display representation of each PK column for the
87/// given row index, separated by `\0` to avoid key collisions.
88fn serialize_pk(
89    batch: &RecordBatch,
90    pk_indices: &[usize],
91    row: usize,
92) -> Result<Vec<u8>, LookupError> {
93    let mut key = Vec::new();
94    for (i, &col_idx) in pk_indices.iter().enumerate() {
95        if i > 0 {
96            key.push(0); // separator
97        }
98        let col = batch.column(col_idx);
99        let formatter =
100            ArrayFormatter::try_new(col.as_ref(), &arrow_cast::display::FormatOptions::default())
101                .map_err(|e| LookupError::Internal(format!("format pk: {e}")))?;
102        let display = formatter.value(row);
103        key.extend_from_slice(format!("{display}").as_bytes());
104    }
105    Ok(key)
106}
107
108/// Extract a single row from a `RecordBatch` as a new single-row batch.
109fn extract_row(batch: &RecordBatch, row: usize) -> Result<RecordBatch, LookupError> {
110    use arrow_select::take::take;
111    #[allow(clippy::cast_possible_truncation)] // row index within a RecordBatch fits u32
112    let indices = arrow_array::UInt32Array::from(vec![row as u32]);
113    let columns: Vec<_> = batch
114        .columns()
115        .iter()
116        .map(|col| take(col.as_ref(), &indices, None))
117        .collect::<Result<_, _>>()
118        .map_err(|e| LookupError::Internal(format!("extract row: {e}")))?;
119    RecordBatch::try_new(batch.schema(), columns)
120        .map_err(|e| LookupError::Internal(format!("build row batch: {e}")))
121}
122
123impl ParquetLookupSource {
124    /// Load a Parquet file from the local filesystem.
125    ///
126    /// Reads all record batches, extracts primary key columns, and stores
127    /// each row in an in-memory `HashMap`.
128    ///
129    /// # Errors
130    ///
131    /// Returns [`LookupError::Internal`] if the file cannot be opened,
132    /// read, or if the primary key columns are not found in the schema.
133    pub fn from_path(config: ParquetLookupSourceConfig) -> Result<Self, LookupError> {
134        let file = std::fs::File::open(&config.path)
135            .map_err(|e| LookupError::Internal(format!("open {}: {e}", config.path)))?;
136
137        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
138            .map_err(|e| LookupError::Internal(format!("parquet reader: {e}")))?;
139
140        let schema = builder.schema().clone();
141
142        // Resolve PK column indices
143        let pk_indices: Vec<usize> = config
144            .primary_key_columns
145            .iter()
146            .map(|name| {
147                schema
148                    .index_of(name)
149                    .map_err(|_| LookupError::Internal(format!("pk column not found: {name}")))
150            })
151            .collect::<Result<Vec<_>, _>>()?;
152
153        let reader = builder
154            .with_batch_size(config.batch_size)
155            .build()
156            .map_err(|e| LookupError::Internal(format!("build reader: {e}")))?;
157
158        let mut data = HashMap::new();
159        let mut row_count = 0u64;
160
161        for batch_result in reader {
162            let batch =
163                batch_result.map_err(|e| LookupError::Internal(format!("read batch: {e}")))?;
164
165            for row in 0..batch.num_rows() {
166                let key = serialize_pk(&batch, &pk_indices, row)?;
167                let value = extract_row(&batch, row)?;
168                data.insert(key, value);
169                row_count += 1;
170            }
171        }
172
173        Ok(Self {
174            config,
175            data,
176            schema: Arc::clone(&schema),
177            row_count,
178        })
179    }
180
181    /// Returns the Arrow schema of the loaded Parquet file.
182    #[must_use]
183    pub fn schema(&self) -> &SchemaRef {
184        &self.schema
185    }
186}
187
188impl LookupSource for ParquetLookupSource {
189    fn query(
190        &self,
191        keys: &[&[u8]],
192        _predicates: &[Predicate],
193        _projection: &[ColumnId],
194    ) -> impl Future<Output = Result<Vec<Option<RecordBatch>>, LookupError>> + Send {
195        let results: Vec<Option<RecordBatch>> = keys
196            .iter()
197            .map(|k| self.data.get::<[u8]>(k).cloned())
198            .collect();
199        async move { Ok(results) }
200    }
201
202    fn capabilities(&self) -> LookupSourceCapabilities {
203        LookupSourceCapabilities {
204            supports_predicate_pushdown: false,
205            supports_projection_pushdown: false,
206            supports_batch_lookup: true,
207            max_batch_size: 0,
208        }
209    }
210
211    #[allow(clippy::unnecessary_literal_bound)]
212    fn source_name(&self) -> &str {
213        "parquet"
214    }
215
216    fn schema(&self) -> SchemaRef {
217        Arc::clone(&self.schema)
218    }
219
220    fn estimated_row_count(&self) -> Option<u64> {
221        Some(self.row_count)
222    }
223
224    async fn health_check(&self) -> Result<(), LookupError> {
225        Ok(())
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;

    /// Write the given batches to a fresh temp Parquet file and return it.
    fn write_temp_parquet(schema: &SchemaRef, batches: &[RecordBatch]) -> tempfile::NamedTempFile {
        let tmp = tempfile::NamedTempFile::new().expect("temp file");
        let handle = tmp.reopen().expect("reopen");
        let mut writer =
            ArrowWriter::try_new(handle, Arc::clone(schema), None).expect("arrow writer");
        for b in batches {
            writer.write(b).expect("write batch");
        }
        writer.close().expect("close writer");
        tmp
    }

    /// Construct a source over `file`, keyed by the given columns.
    fn open_source(file: &tempfile::NamedTempFile, pk: &[&str]) -> ParquetLookupSource {
        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: pk.iter().map(|s| (*s).to_string()).collect(),
            batch_size: 8192,
        };
        ParquetLookupSource::from_path(config).expect("load source")
    }

    fn customer_schema() -> SchemaRef {
        let fields = vec![
            Field::new("customer_id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn customer_batch(ids: &[i64], names: &[&str], tiers: &[&str]) -> RecordBatch {
        RecordBatch::try_new(
            customer_schema(),
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
                Arc::new(StringArray::from(tiers.to_vec())),
            ],
        )
        .expect("customer batch")
    }

    fn three_customers() -> RecordBatch {
        customer_batch(
            &[1, 2, 3],
            &["Alice", "Bob", "Carol"],
            &["gold", "silver", "bronze"],
        )
    }

    #[tokio::test]
    async fn test_single_pk_hit_miss() {
        let file = write_temp_parquet(&customer_schema(), &[three_customers()]);
        let source = open_source(&file, &["customer_id"]);

        // Existing key resolves to a row.
        let hit = source.query(&[b"1"], &[], &[]).await.unwrap();
        assert_eq!(hit.len(), 1);
        assert!(hit[0].is_some());

        // Unknown key resolves to None.
        let miss = source.query(&[b"999"], &[], &[]).await.unwrap();
        assert_eq!(miss.len(), 1);
        assert!(miss[0].is_none());
    }

    #[tokio::test]
    async fn test_composite_pk() {
        let schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("city", DataType::Utf8, false),
            Field::new("population", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(StringArray::from(vec!["US", "US", "UK"])),
                Arc::new(StringArray::from(vec!["NYC", "LA", "London"])),
                Arc::new(Int64Array::from(vec![8_000_000, 4_000_000, 9_000_000])),
            ],
        )
        .expect("city batch");
        let file = write_temp_parquet(&schema, &[batch]);
        let source = open_source(&file, &["region", "city"]);

        // Present row: serialized composite key is "US\0NYC".
        let found = source
            .query(&[b"US\0NYC".as_slice()], &[], &[])
            .await
            .unwrap();
        assert!(found[0].is_some());

        // Absent city under an existing region.
        let missing = source
            .query(&[b"US\0Chicago".as_slice()], &[], &[])
            .await
            .unwrap();
        assert!(missing[0].is_none());
    }

    #[tokio::test]
    async fn test_empty_parquet() {
        let schema = customer_schema();
        // A batch with zero rows still carries the full schema.
        let empty = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(Vec::<i64>::new())),
                Arc::new(StringArray::from(Vec::<&str>::new())),
                Arc::new(StringArray::from(Vec::<&str>::new())),
            ],
        )
        .expect("empty batch");
        let file = write_temp_parquet(&schema, &[empty]);
        let source = open_source(&file, &["customer_id"]);

        assert_eq!(source.estimated_row_count(), Some(0));

        let results = source.query(&[b"1"], &[], &[]).await.unwrap();
        assert!(results[0].is_none());
    }

    #[tokio::test]
    async fn test_batch_query() {
        let file = write_temp_parquet(&customer_schema(), &[three_customers()]);
        let source = open_source(&file, &["customer_id"]);

        let keys: Vec<&[u8]> = vec![b"1", b"999", b"3"];
        let results = source.query(&keys, &[], &[]).await.unwrap();
        assert_eq!(results.len(), 3);
        assert!(results[0].is_some()); // 1 exists
        assert!(results[1].is_none()); // 999 missing
        assert!(results[2].is_some()); // 3 exists
    }

    #[test]
    fn test_row_count_estimate() {
        let file = write_temp_parquet(&customer_schema(), &[three_customers()]);
        let source = open_source(&file, &["customer_id"]);

        assert_eq!(source.estimated_row_count(), Some(3));
    }

    #[test]
    fn test_source_name() {
        let batch = customer_batch(&[1], &["Alice"], &["gold"]);
        let file = write_temp_parquet(&customer_schema(), &[batch]);
        let source = open_source(&file, &["customer_id"]);

        assert_eq!(source.source_name(), "parquet");
    }

    #[test]
    fn test_capabilities() {
        let batch = customer_batch(&[1], &["Alice"], &["gold"]);
        let file = write_temp_parquet(&customer_schema(), &[batch]);
        let source = open_source(&file, &["customer_id"]);

        let caps = source.capabilities();
        assert!(!caps.supports_predicate_pushdown);
        assert!(!caps.supports_projection_pushdown);
        assert!(caps.supports_batch_lookup);
    }

    #[tokio::test]
    async fn test_health_check() {
        let batch = customer_batch(&[1], &["Alice"], &["gold"]);
        let file = write_temp_parquet(&customer_schema(), &[batch]);
        let source = open_source(&file, &["customer_id"]);

        assert!(source.health_check().await.is_ok());
    }
}