1use std::collections::HashMap;
22use std::fmt;
23use std::future::Future;
24use std::sync::Arc;
25
26use arrow_array::RecordBatch;
27use arrow_cast::display::ArrayFormatter;
28use arrow_schema::SchemaRef;
29use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
30
31use laminar_core::lookup::predicate::Predicate;
32use laminar_core::lookup::source::{ColumnId, LookupError, LookupSource, LookupSourceCapabilities};
33
/// Configuration for building a [`ParquetLookupSource`].
#[derive(Debug, Clone)]
pub struct ParquetLookupSourceConfig {
    /// Filesystem path of the Parquet file to load.
    pub path: String,
    /// Column names that together form the primary key used for lookups.
    pub primary_key_columns: Vec<String>,
    /// Row-batch size passed to the Parquet reader while loading the file.
    pub batch_size: usize,
}
44
45impl Default for ParquetLookupSourceConfig {
46 fn default() -> Self {
47 Self {
48 path: String::new(),
49 primary_key_columns: Vec::new(),
50 batch_size: 8192,
51 }
52 }
53}
54
/// In-memory lookup source backed by a single Parquet file.
///
/// The entire file is materialized at construction time into a map from
/// serialized primary key to a one-row [`RecordBatch`].
pub struct ParquetLookupSource {
    /// Configuration this source was built from.
    config: ParquetLookupSourceConfig,
    /// Serialized primary key -> matching single-row batch.
    data: HashMap<Vec<u8>, RecordBatch>,
    /// Arrow schema of the underlying Parquet file.
    schema: SchemaRef,
    /// Total number of rows loaded from the file.
    row_count: u64,
}
72
73impl fmt::Debug for ParquetLookupSource {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct("ParquetLookupSource")
76 .field("config", &self.config)
77 .field("data_len", &self.data.len())
78 .field("schema", &self.schema)
79 .field("row_count", &self.row_count)
80 .finish()
81 }
82}
83
84fn serialize_pk(
89 batch: &RecordBatch,
90 pk_indices: &[usize],
91 row: usize,
92) -> Result<Vec<u8>, LookupError> {
93 let mut key = Vec::new();
94 for (i, &col_idx) in pk_indices.iter().enumerate() {
95 if i > 0 {
96 key.push(0); }
98 let col = batch.column(col_idx);
99 let formatter =
100 ArrayFormatter::try_new(col.as_ref(), &arrow_cast::display::FormatOptions::default())
101 .map_err(|e| LookupError::Internal(format!("format pk: {e}")))?;
102 let display = formatter.value(row);
103 key.extend_from_slice(format!("{display}").as_bytes());
104 }
105 Ok(key)
106}
107
108fn extract_row(batch: &RecordBatch, row: usize) -> Result<RecordBatch, LookupError> {
110 use arrow_select::take::take;
111 #[allow(clippy::cast_possible_truncation)] let indices = arrow_array::UInt32Array::from(vec![row as u32]);
113 let columns: Vec<_> = batch
114 .columns()
115 .iter()
116 .map(|col| take(col.as_ref(), &indices, None))
117 .collect::<Result<_, _>>()
118 .map_err(|e| LookupError::Internal(format!("extract row: {e}")))?;
119 RecordBatch::try_new(batch.schema(), columns)
120 .map_err(|e| LookupError::Internal(format!("build row batch: {e}")))
121}
122
123impl ParquetLookupSource {
124 pub fn from_path(config: ParquetLookupSourceConfig) -> Result<Self, LookupError> {
134 let file = std::fs::File::open(&config.path)
135 .map_err(|e| LookupError::Internal(format!("open {}: {e}", config.path)))?;
136
137 let builder = ParquetRecordBatchReaderBuilder::try_new(file)
138 .map_err(|e| LookupError::Internal(format!("parquet reader: {e}")))?;
139
140 let schema = builder.schema().clone();
141
142 let pk_indices: Vec<usize> = config
144 .primary_key_columns
145 .iter()
146 .map(|name| {
147 schema
148 .index_of(name)
149 .map_err(|_| LookupError::Internal(format!("pk column not found: {name}")))
150 })
151 .collect::<Result<Vec<_>, _>>()?;
152
153 let reader = builder
154 .with_batch_size(config.batch_size)
155 .build()
156 .map_err(|e| LookupError::Internal(format!("build reader: {e}")))?;
157
158 let mut data = HashMap::new();
159 let mut row_count = 0u64;
160
161 for batch_result in reader {
162 let batch =
163 batch_result.map_err(|e| LookupError::Internal(format!("read batch: {e}")))?;
164
165 for row in 0..batch.num_rows() {
166 let key = serialize_pk(&batch, &pk_indices, row)?;
167 let value = extract_row(&batch, row)?;
168 data.insert(key, value);
169 row_count += 1;
170 }
171 }
172
173 Ok(Self {
174 config,
175 data,
176 schema: Arc::clone(&schema),
177 row_count,
178 })
179 }
180
181 #[must_use]
183 pub fn schema(&self) -> &SchemaRef {
184 &self.schema
185 }
186}
187
188impl LookupSource for ParquetLookupSource {
189 fn query(
190 &self,
191 keys: &[&[u8]],
192 _predicates: &[Predicate],
193 _projection: &[ColumnId],
194 ) -> impl Future<Output = Result<Vec<Option<RecordBatch>>, LookupError>> + Send {
195 let results: Vec<Option<RecordBatch>> = keys
196 .iter()
197 .map(|k| self.data.get::<[u8]>(k).cloned())
198 .collect();
199 async move { Ok(results) }
200 }
201
202 fn capabilities(&self) -> LookupSourceCapabilities {
203 LookupSourceCapabilities {
204 supports_predicate_pushdown: false,
205 supports_projection_pushdown: false,
206 supports_batch_lookup: true,
207 max_batch_size: 0,
208 }
209 }
210
211 #[allow(clippy::unnecessary_literal_bound)]
212 fn source_name(&self) -> &str {
213 "parquet"
214 }
215
216 fn schema(&self) -> SchemaRef {
217 Arc::clone(&self.schema)
218 }
219
220 fn estimated_row_count(&self) -> Option<u64> {
221 Some(self.row_count)
222 }
223
224 async fn health_check(&self) -> Result<(), LookupError> {
225 Ok(())
226 }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;

    // Writes `batches` to a fresh temp file in Parquet format and returns the
    // handle; the file is deleted when the handle is dropped, so tests must
    // keep it alive while the source reads it.
    fn write_temp_parquet(schema: &SchemaRef, batches: &[RecordBatch]) -> tempfile::NamedTempFile {
        let file = tempfile::NamedTempFile::new().expect("temp file");
        let writer_file = file.reopen().expect("reopen");
        let mut writer =
            ArrowWriter::try_new(writer_file, Arc::clone(schema), None).expect("arrow writer");
        for batch in batches {
            writer.write(batch).expect("write batch");
        }
        writer.close().expect("close writer");
        file
    }

    // Shared three-column schema used by the single-key tests.
    fn customer_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("customer_id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ]))
    }

    // Builds a customer batch; the three slices must be the same length.
    fn customer_batch(ids: &[i64], names: &[&str], tiers: &[&str]) -> RecordBatch {
        let schema = customer_schema();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
                Arc::new(StringArray::from(tiers.to_vec())),
            ],
        )
        .unwrap()
    }

    // A present key returns Some; an absent key returns None (not an error).
    #[tokio::test]
    async fn test_single_pk_hit_miss() {
        let batch = customer_batch(
            &[1, 2, 3],
            &["Alice", "Bob", "Carol"],
            &["gold", "silver", "bronze"],
        );
        let schema = customer_schema();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        // Hit: id 1 exists, serialized as the display string "1".
        let results = source.query(&[b"1"], &[], &[]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].is_some());

        // Miss: id 999 was never loaded.
        let results = source.query(&[b"999"], &[], &[]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].is_none());
    }

    // Composite keys join column values with a NUL byte, in the order the
    // primary-key columns were configured.
    #[tokio::test]
    async fn test_composite_pk() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("city", DataType::Utf8, false),
            Field::new("population", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(StringArray::from(vec!["US", "US", "UK"])),
                Arc::new(StringArray::from(vec!["NYC", "LA", "London"])),
                Arc::new(Int64Array::from(vec![8_000_000, 4_000_000, 9_000_000])),
            ],
        )
        .unwrap();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["region".into(), "city".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        // "US" + NUL + "NYC" matches the first row.
        let key = b"US\0NYC";
        let results = source.query(&[key.as_slice()], &[], &[]).await.unwrap();
        assert!(results[0].is_some());

        // Valid region but unknown city: a miss, not an error.
        let key = b"US\0Chicago";
        let results = source.query(&[key.as_slice()], &[], &[]).await.unwrap();
        assert!(results[0].is_none());
    }

    // A zero-row file loads cleanly, reports zero rows, and misses every key.
    #[tokio::test]
    async fn test_empty_parquet() {
        let schema = customer_schema();
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int64Array::from(Vec::<i64>::new())),
                Arc::new(StringArray::from(Vec::<&str>::new())),
                Arc::new(StringArray::from(Vec::<&str>::new())),
            ],
        )
        .unwrap();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        assert_eq!(source.estimated_row_count(), Some(0));

        let results = source.query(&[b"1"], &[], &[]).await.unwrap();
        assert!(results[0].is_none());
    }

    // One query call with several keys returns one Option per key, in order,
    // mixing hits and misses.
    #[tokio::test]
    async fn test_batch_query() {
        let batch = customer_batch(
            &[1, 2, 3],
            &["Alice", "Bob", "Carol"],
            &["gold", "silver", "bronze"],
        );
        let schema = customer_schema();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        let keys: Vec<&[u8]> = vec![b"1", b"999", b"3"];
        let results = source.query(&keys, &[], &[]).await.unwrap();
        assert_eq!(results.len(), 3);
        assert!(results[0].is_some());
        assert!(results[1].is_none());
        assert!(results[2].is_some());
    }

    // estimated_row_count reflects the number of rows physically loaded.
    #[test]
    fn test_row_count_estimate() {
        let batch = customer_batch(
            &[1, 2, 3],
            &["Alice", "Bob", "Carol"],
            &["gold", "silver", "bronze"],
        );
        let schema = customer_schema();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        assert_eq!(source.estimated_row_count(), Some(3));
    }

    // The source identifies itself as "parquet".
    #[test]
    fn test_source_name() {
        let batch = customer_batch(&[1], &["Alice"], &["gold"]);
        let schema = customer_schema();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        assert_eq!(source.source_name(), "parquet");
    }

    // Capabilities advertise batch lookup only — no pushdown of any kind.
    #[test]
    fn test_capabilities() {
        let batch = customer_batch(&[1], &["Alice"], &["gold"]);
        let schema = customer_schema();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        let caps = source.capabilities();
        assert!(!caps.supports_predicate_pushdown);
        assert!(!caps.supports_projection_pushdown);
        assert!(caps.supports_batch_lookup);
    }

    // health_check always succeeds for an in-memory source.
    #[tokio::test]
    async fn test_health_check() {
        let batch = customer_batch(&[1], &["Alice"], &["gold"]);
        let schema = customer_schema();
        let file = write_temp_parquet(&schema, &[batch]);

        let config = ParquetLookupSourceConfig {
            path: file.path().to_string_lossy().into_owned(),
            primary_key_columns: vec!["customer_id".into()],
            batch_size: 8192,
        };
        let source = ParquetLookupSource::from_path(config).unwrap();

        assert!(source.health_check().await.is_ok());
    }
}