//! laminar_connectors/schema/parquet/decoder.rs
use arrow_array::RecordBatch;
8use arrow_schema::SchemaRef;
9use bytes::Bytes;
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11use parquet::arrow::ProjectionMask;
12
13use crate::schema::error::{SchemaError, SchemaResult};
14use crate::schema::traits::FormatDecoder;
15use crate::schema::types::RawRecord;
16
/// A predicate describing which Parquet row groups are of interest.
///
/// Comparand values are carried as strings; how they are parsed and
/// compared against column data/statistics is left to whatever component
/// evaluates the predicate.
///
/// NOTE(review): `ParquetDecoder::decode_batch` in this file accepts a
/// predicate via `ParquetDecoderConfig` but never evaluates it — confirm
/// whether pushdown happens elsewhere or is still unimplemented.
#[derive(Debug, Clone)]
pub enum RowGroupPredicate {
    /// Column equals the given value.
    Eq {
        /// Name of the column the predicate applies to.
        column: String,
        /// Comparand, encoded as a string.
        value: String,
    },
    /// Column is greater than the given value.
    Gt {
        /// Name of the column the predicate applies to.
        column: String,
        /// Comparand, encoded as a string.
        value: String,
    },
    /// Column is less than the given value.
    Lt {
        /// Name of the column the predicate applies to.
        column: String,
        /// Comparand, encoded as a string.
        value: String,
    },
    /// Column lies between `low` and `high` (bound inclusivity is not
    /// established by this file — confirm with the evaluator).
    Between {
        /// Name of the column the predicate applies to.
        column: String,
        /// Lower bound, encoded as a string.
        low: String,
        /// Upper bound, encoded as a string.
        high: String,
    },
    /// Conjunction: every child predicate must hold.
    And(Vec<RowGroupPredicate>),
    /// Disjunction: at least one child predicate must hold.
    Or(Vec<RowGroupPredicate>),
}
58
/// Configuration options for [`ParquetDecoder`].
#[derive(Debug, Clone)]
pub struct ParquetDecoderConfig {
    // Root-column indices to project; an empty vec means "read all columns".
    pub projection_indices: Vec<usize>,

    // Row-group indices to read; an empty vec means "read all row groups".
    pub row_group_indices: Vec<usize>,

    // Maximum number of rows per Arrow batch produced by the reader
    // (defaults to 8192 via `Default`).
    pub batch_size: usize,

    // Optional row-group predicate.
    // NOTE(review): currently stored but not applied by `decode_batch`.
    pub predicate: Option<RowGroupPredicate>,
}
74
75impl Default for ParquetDecoderConfig {
76 fn default() -> Self {
77 Self {
78 projection_indices: Vec::new(),
79 row_group_indices: Vec::new(),
80 batch_size: 8192,
81 predicate: None,
82 }
83 }
84}
85
86impl ParquetDecoderConfig {
87 #[must_use]
89 pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
90 self.projection_indices = indices;
91 self
92 }
93
94 #[must_use]
96 pub fn with_row_groups(mut self, indices: Vec<usize>) -> Self {
97 self.row_group_indices = indices;
98 self
99 }
100
101 #[must_use]
103 pub fn with_batch_size(mut self, size: usize) -> Self {
104 self.batch_size = size;
105 self
106 }
107
108 #[must_use]
110 pub fn with_predicate(mut self, predicate: RowGroupPredicate) -> Self {
111 self.predicate = Some(predicate);
112 self
113 }
114}
115
/// Decodes raw Parquet payloads into Arrow [`RecordBatch`]es.
pub struct ParquetDecoder {
    // Expected Arrow output schema; returned by `output_schema` and used
    // when emitting empty batches and concatenating multiple batches.
    schema: SchemaRef,
    // Reader options: projection, row-group selection, batch size,
    // and (currently unapplied) predicate.
    config: ParquetDecoderConfig,
}
128
129impl std::fmt::Debug for ParquetDecoder {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("ParquetDecoder")
132 .field("schema", &self.schema)
133 .field("config", &self.config)
134 .finish()
135 }
136}
137
138impl ParquetDecoder {
139 #[must_use]
141 pub fn new(schema: SchemaRef) -> Self {
142 Self::with_config(schema, ParquetDecoderConfig::default())
143 }
144
145 #[must_use]
147 pub fn with_config(schema: SchemaRef, config: ParquetDecoderConfig) -> Self {
148 Self { schema, config }
149 }
150}
151
152impl FormatDecoder for ParquetDecoder {
153 fn output_schema(&self) -> SchemaRef {
154 self.schema.clone()
155 }
156
157 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
158 if records.is_empty() {
159 return Ok(RecordBatch::new_empty(self.schema.clone()));
160 }
161
162 let mut all_batches: Vec<RecordBatch> = Vec::new();
163
164 for record in records {
165 let bytes = Bytes::copy_from_slice(&record.value);
166 let mut builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
167 .map_err(|e| SchemaError::DecodeError(format!("Parquet reader init error: {e}")))?;
168
169 builder = builder.with_batch_size(self.config.batch_size);
171
172 if !self.config.projection_indices.is_empty() {
174 let parquet_schema = builder.parquet_schema().clone();
175 let mask = ProjectionMask::roots(
176 &parquet_schema,
177 self.config.projection_indices.iter().copied(),
178 );
179 builder = builder.with_projection(mask);
180 }
181
182 if !self.config.row_group_indices.is_empty() {
184 builder = builder.with_row_groups(self.config.row_group_indices.clone());
185 }
186
187 let reader = builder.build().map_err(|e| {
188 SchemaError::DecodeError(format!("Parquet reader build error: {e}"))
189 })?;
190
191 for batch_result in reader {
192 let batch = batch_result
193 .map_err(|e| SchemaError::DecodeError(format!("Parquet read error: {e}")))?;
194 all_batches.push(batch);
195 }
196 }
197
198 if all_batches.is_empty() {
199 return Ok(RecordBatch::new_empty(self.schema.clone()));
200 }
201
202 if all_batches.len() == 1 {
203 return Ok(all_batches.into_iter().next().unwrap());
204 }
205
206 arrow_select::concat::concat_batches(&self.schema, &all_batches)
208 .map_err(|e| SchemaError::DecodeError(format!("batch concat error: {e}")))
209 }
210
211 fn format_name(&self) -> &'static str {
212 "parquet"
213 }
214}
215
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use arrow_array::cast::AsArray;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;

    /// Serializes a batch into a complete in-memory Parquet file.
    fn to_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();
        buf
    }

    /// Two-column test schema: non-null `id: Int64`, nullable `name: Utf8`.
    fn make_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a RecordBatch from parallel id/name vectors.
    fn make_batch(schema: &SchemaRef, ids: Vec<i64>, names: Vec<&str>) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
            ],
        )
        .unwrap()
    }

    // An empty record slice decodes to an empty batch (no reader is built).
    #[test]
    fn test_decode_empty_batch() {
        let schema = make_schema();
        let decoder = ParquetDecoder::new(schema.clone());
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
    }

    // One Parquet payload round-trips: row count and column values survive.
    #[test]
    fn test_decode_single_record() {
        let schema = make_schema();
        let input = make_batch(&schema, vec![1, 2, 3], vec!["a", "b", "c"]);
        let parquet_bytes = to_parquet_bytes(&input);

        let decoder = ParquetDecoder::new(schema);
        let record = RawRecord::new(parquet_bytes);
        let output = decoder.decode_batch(&[record]).unwrap();

        assert_eq!(output.num_rows(), 3);
        let ids = output
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        assert_eq!(ids.value(2), 3);
        let names = output.column(1).as_string::<i32>();
        assert_eq!(names.value(0), "a");
    }

    // Multiple Parquet payloads are concatenated into one output batch.
    #[test]
    fn test_decode_multiple_records() {
        let schema = make_schema();
        let b1 = make_batch(&schema, vec![1, 2], vec!["x", "y"]);
        let b2 = make_batch(&schema, vec![3, 4], vec!["z", "w"]);

        let r1 = RawRecord::new(to_parquet_bytes(&b1));
        let r2 = RawRecord::new(to_parquet_bytes(&b2));

        let decoder = ParquetDecoder::new(schema);
        let output = decoder.decode_batch(&[r1, r2]).unwrap();
        assert_eq!(output.num_rows(), 4);
    }

    // batch_size=1 splits reading into per-row batches, which are then
    // concatenated back — total row count is unchanged.
    #[test]
    fn test_decode_with_batch_size() {
        let schema = make_schema();
        let input = make_batch(&schema, vec![1, 2, 3], vec!["a", "b", "c"]);
        let parquet_bytes = to_parquet_bytes(&input);

        let config = ParquetDecoderConfig::default().with_batch_size(1);
        let decoder = ParquetDecoder::with_config(schema, config);
        let record = RawRecord::new(parquet_bytes);
        let output = decoder.decode_batch(&[record]).unwrap();

        assert_eq!(output.num_rows(), 3);
    }

    #[test]
    fn test_format_name() {
        let schema = make_schema();
        let decoder = ParquetDecoder::new(schema);
        assert_eq!(decoder.format_name(), "parquet");
    }

    // decode_one comes from the FormatDecoder trait (not visible in this
    // file); presumably it wraps decode_batch for a single record.
    #[test]
    fn test_decode_one() {
        let schema = make_schema();
        let input = make_batch(&schema, vec![42], vec!["hello"]);
        let parquet_bytes = to_parquet_bytes(&input);

        let decoder = ParquetDecoder::new(schema);
        let record = RawRecord::new(parquet_bytes);
        let output = decoder.decode_one(&record).unwrap();
        assert_eq!(output.num_rows(), 1);
        assert_eq!(
            output
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
    }

    // Non-Parquet bytes surface as a DecodeError mentioning "Parquet".
    #[test]
    fn test_decode_invalid_bytes() {
        let schema = make_schema();
        let decoder = ParquetDecoder::new(schema);
        let record = RawRecord::new(b"not parquet".to_vec());
        let result = decoder.decode_batch(&[record]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Parquet"));
    }

    // The builder methods each set their field and chain fluently.
    #[test]
    fn test_config_builder() {
        let config = ParquetDecoderConfig::default()
            .with_projection(vec![0, 2])
            .with_row_groups(vec![0])
            .with_batch_size(4096)
            .with_predicate(RowGroupPredicate::Eq {
                column: "id".into(),
                value: "42".into(),
            });

        assert_eq!(config.projection_indices, vec![0, 2]);
        assert_eq!(config.row_group_indices, vec![0]);
        assert_eq!(config.batch_size, 4096);
        assert!(config.predicate.is_some());
    }
}