Skip to main content

laminar_connectors/schema/parquet/
decoder.rs

//! Parquet format decoder implementing [`FormatDecoder`].
//!
//! Each [`RawRecord`] value contains complete Parquet file bytes.
//! The decoder uses `ParquetRecordBatchReaderBuilder` with optional
//! projection pushdown and row-group filtering.

7use arrow_array::RecordBatch;
8use arrow_schema::SchemaRef;
9use bytes::Bytes;
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11use parquet::arrow::ProjectionMask;
12
13use crate::schema::error::{SchemaError, SchemaResult};
14use crate::schema::traits::FormatDecoder;
15use crate::schema::types::RawRecord;
16
/// Predicate for row-group level filtering.
///
/// Evaluated against row-group statistics to skip entire row groups
/// that cannot contain matching rows.
///
/// Comparison values are carried as strings; the doc comments below
/// say they are parsed per the target column's type at evaluation time.
///
/// NOTE(review): within this file the predicate is only stored on
/// [`ParquetDecoderConfig`]; `decode_batch` never reads it, so
/// statistics-based row-group skipping is not wired up here — confirm
/// where (if anywhere) this is evaluated before relying on it.
#[derive(Debug, Clone)]
pub enum RowGroupPredicate {
    /// Column equals value (statistics min <= value <= max).
    Eq {
        /// Column name.
        column: String,
        /// Comparison value as string (parsed per column type).
        value: String,
    },
    /// Column greater than value (statistics max > value).
    Gt {
        /// Column name.
        column: String,
        /// Comparison value.
        value: String,
    },
    /// Column less than value (statistics min < value).
    Lt {
        /// Column name.
        column: String,
        /// Comparison value.
        value: String,
    },
    /// Column in range \[low, high\].
    Between {
        /// Column name.
        column: String,
        /// Low bound (inclusive).
        low: String,
        /// High bound (inclusive).
        high: String,
    },
    /// Logical AND of predicates.
    And(Vec<RowGroupPredicate>),
    /// Logical OR of predicates.
    Or(Vec<RowGroupPredicate>),
}
58
/// Configuration for the Parquet decoder.
///
/// Built via the `with_*` builder methods; `Default` yields
/// all-columns / all-row-groups reads with a batch size of 8192.
#[derive(Debug, Clone)]
pub struct ParquetDecoderConfig {
    /// Column indices to project (empty = all columns).
    pub projection_indices: Vec<usize>,

    /// Row-group indices to read (empty = all row groups).
    pub row_group_indices: Vec<usize>,

    /// Maximum rows per `RecordBatch`.
    pub batch_size: usize,

    /// Optional row-group predicate for statistics-based filtering.
    ///
    /// NOTE(review): not consulted by `decode_batch` in this file —
    /// verify where this is actually evaluated.
    pub predicate: Option<RowGroupPredicate>,
}
74
75impl Default for ParquetDecoderConfig {
76    fn default() -> Self {
77        Self {
78            projection_indices: Vec::new(),
79            row_group_indices: Vec::new(),
80            batch_size: 8192,
81            predicate: None,
82        }
83    }
84}
85
86impl ParquetDecoderConfig {
87    /// Sets the projection column indices.
88    #[must_use]
89    pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
90        self.projection_indices = indices;
91        self
92    }
93
94    /// Sets the row-group indices to read.
95    #[must_use]
96    pub fn with_row_groups(mut self, indices: Vec<usize>) -> Self {
97        self.row_group_indices = indices;
98        self
99    }
100
101    /// Sets the batch size.
102    #[must_use]
103    pub fn with_batch_size(mut self, size: usize) -> Self {
104        self.batch_size = size;
105        self
106    }
107
108    /// Sets a row-group predicate.
109    #[must_use]
110    pub fn with_predicate(mut self, predicate: RowGroupPredicate) -> Self {
111        self.predicate = Some(predicate);
112        self
113    }
114}
115
/// Decodes Parquet file bytes into Arrow `RecordBatch`es.
///
/// Each input [`RawRecord`] must hold a complete Parquet file; the
/// batches decoded from all records are concatenated into one output
/// batch against the schema supplied at construction.
///
/// # Ring Placement
///
/// - **Ring 1**: `decode_batch()` — Parquet read + Arrow conversion
/// - **Ring 2**: Construction — one-time schema validation
pub struct ParquetDecoder {
    /// Output schema (frozen at construction).
    schema: SchemaRef,
    /// Decoder configuration.
    config: ParquetDecoderConfig,
}
128
129impl std::fmt::Debug for ParquetDecoder {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("ParquetDecoder")
132            .field("schema", &self.schema)
133            .field("config", &self.config)
134            .finish()
135    }
136}
137
138impl ParquetDecoder {
139    /// Creates a new Parquet decoder for the given Arrow schema.
140    #[must_use]
141    pub fn new(schema: SchemaRef) -> Self {
142        Self::with_config(schema, ParquetDecoderConfig::default())
143    }
144
145    /// Creates a new Parquet decoder with custom configuration.
146    #[must_use]
147    pub fn with_config(schema: SchemaRef, config: ParquetDecoderConfig) -> Self {
148        Self { schema, config }
149    }
150}
151
152impl FormatDecoder for ParquetDecoder {
153    fn output_schema(&self) -> SchemaRef {
154        self.schema.clone()
155    }
156
157    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
158        if records.is_empty() {
159            return Ok(RecordBatch::new_empty(self.schema.clone()));
160        }
161
162        let mut all_batches: Vec<RecordBatch> = Vec::new();
163
164        for record in records {
165            let bytes = Bytes::copy_from_slice(&record.value);
166            let mut builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
167                .map_err(|e| SchemaError::DecodeError(format!("Parquet reader init error: {e}")))?;
168
169            // Apply batch size.
170            builder = builder.with_batch_size(self.config.batch_size);
171
172            // Apply projection if specified.
173            if !self.config.projection_indices.is_empty() {
174                let parquet_schema = builder.parquet_schema().clone();
175                let mask = ProjectionMask::roots(
176                    &parquet_schema,
177                    self.config.projection_indices.iter().copied(),
178                );
179                builder = builder.with_projection(mask);
180            }
181
182            // Apply row-group selection if specified.
183            if !self.config.row_group_indices.is_empty() {
184                builder = builder.with_row_groups(self.config.row_group_indices.clone());
185            }
186
187            let reader = builder.build().map_err(|e| {
188                SchemaError::DecodeError(format!("Parquet reader build error: {e}"))
189            })?;
190
191            for batch_result in reader {
192                let batch = batch_result
193                    .map_err(|e| SchemaError::DecodeError(format!("Parquet read error: {e}")))?;
194                all_batches.push(batch);
195            }
196        }
197
198        if all_batches.is_empty() {
199            return Ok(RecordBatch::new_empty(self.schema.clone()));
200        }
201
202        if all_batches.len() == 1 {
203            return Ok(all_batches.into_iter().next().unwrap());
204        }
205
206        // Concatenate all batches.
207        arrow_select::concat::concat_batches(&self.schema, &all_batches)
208            .map_err(|e| SchemaError::DecodeError(format!("batch concat error: {e}")))
209    }
210
211    fn format_name(&self) -> &'static str {
212        "parquet"
213    }
214}
215
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use arrow_array::cast::AsArray;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;

    /// Serializes `batch` into an in-memory Parquet file.
    fn write_parquet(batch: &RecordBatch) -> Vec<u8> {
        let mut sink = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut sink, batch.schema(), None).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();
        sink
    }

    /// Two-column schema: `id: Int64` (non-null), `name: Utf8` (nullable).
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds a batch from parallel id/name vectors.
    fn batch_of(schema: &SchemaRef, ids: Vec<i64>, names: Vec<&str>) -> RecordBatch {
        let id_col = Arc::new(Int64Array::from(ids));
        let name_col = Arc::new(StringArray::from(names));
        RecordBatch::try_new(schema.clone(), vec![id_col, name_col]).unwrap()
    }

    #[test]
    fn test_decode_empty_batch() {
        let schema = test_schema();
        let decoder = ParquetDecoder::new(schema.clone());
        assert_eq!(decoder.decode_batch(&[]).unwrap().num_rows(), 0);
    }

    #[test]
    fn test_decode_single_record() {
        let schema = test_schema();
        let source = batch_of(&schema, vec![1, 2, 3], vec!["a", "b", "c"]);
        let record = RawRecord::new(write_parquet(&source));

        let decoded = ParquetDecoder::new(schema).decode_batch(&[record]).unwrap();

        assert_eq!(decoded.num_rows(), 3);
        let ids = decoded
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!((ids.value(0), ids.value(1), ids.value(2)), (1, 2, 3));
        assert_eq!(decoded.column(1).as_string::<i32>().value(0), "a");
    }

    #[test]
    fn test_decode_multiple_records() {
        let schema = test_schema();
        let first = RawRecord::new(write_parquet(&batch_of(&schema, vec![1, 2], vec!["x", "y"])));
        let second = RawRecord::new(write_parquet(&batch_of(&schema, vec![3, 4], vec!["z", "w"])));

        let decoded = ParquetDecoder::new(schema)
            .decode_batch(&[first, second])
            .unwrap();
        assert_eq!(decoded.num_rows(), 4);
    }

    #[test]
    fn test_decode_with_batch_size() {
        let schema = test_schema();
        let source = batch_of(&schema, vec![1, 2, 3], vec!["a", "b", "c"]);
        let record = RawRecord::new(write_parquet(&source));

        let decoder = ParquetDecoder::with_config(
            schema,
            ParquetDecoderConfig::default().with_batch_size(1),
        );

        // batch_size only controls internal chunking; the concat step
        // restores the full row count.
        assert_eq!(decoder.decode_batch(&[record]).unwrap().num_rows(), 3);
    }

    #[test]
    fn test_format_name() {
        assert_eq!(ParquetDecoder::new(test_schema()).format_name(), "parquet");
    }

    #[test]
    fn test_decode_one() {
        let schema = test_schema();
        let source = batch_of(&schema, vec![42], vec!["hello"]);
        let record = RawRecord::new(write_parquet(&source));

        let decoded = ParquetDecoder::new(schema).decode_one(&record).unwrap();

        assert_eq!(decoded.num_rows(), 1);
        let ids = decoded
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(ids.value(0), 42);
    }

    #[test]
    fn test_decode_invalid_bytes() {
        let decoder = ParquetDecoder::new(test_schema());
        let bogus = RawRecord::new(b"not parquet".to_vec());
        let err = decoder.decode_batch(&[bogus]).unwrap_err();
        assert!(err.to_string().contains("Parquet"));
    }

    #[test]
    fn test_config_builder() {
        let config = ParquetDecoderConfig::default()
            .with_projection(vec![0, 2])
            .with_row_groups(vec![0])
            .with_batch_size(4096)
            .with_predicate(RowGroupPredicate::Eq {
                column: "id".into(),
                value: "42".into(),
            });

        assert_eq!(config.projection_indices, vec![0, 2]);
        assert_eq!(config.row_group_indices, vec![0]);
        assert_eq!(config.batch_size, 4096);
        assert!(config.predicate.is_some());
    }
}