// laminar_core/compiler/batch_reader.rs

//! Arrow `RecordBatch` → [`EventRow`] bridge for Ring 0 input.
//!
//! [`BatchRowReader`] decomposes columnar Arrow data into row-oriented
//! [`EventRow`]s allocated in a bump arena. This is the reverse of
//! [`RowBatchBridge`](super::bridge::RowBatchBridge), which converts
//! rows *back* to Arrow `RecordBatch`.
//!
//! # Usage
//!
//! ```
//! use std::sync::Arc;
//! use arrow_array::{Int64Array, Float64Array, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use bumpalo::Bump;
//! use laminar_core::compiler::row::RowSchema;
//! use laminar_core::compiler::batch_reader::BatchRowReader;
//!
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("ts", DataType::Int64, false),
//!     Field::new("val", DataType::Float64, true),
//! ]));
//! let batch = RecordBatch::try_new(
//!     Arc::clone(&schema),
//!     vec![
//!         Arc::new(Int64Array::from(vec![1000, 2000])),
//!         Arc::new(Float64Array::from(vec![Some(1.5), None])),
//!     ],
//! ).unwrap();
//!
//! let row_schema = RowSchema::from_arrow(&schema).unwrap();
//! let reader = BatchRowReader::new(&batch, &row_schema);
//! assert_eq!(reader.row_count(), 2);
//!
//! let arena = Bump::new();
//! let row0 = reader.read_row(0, &arena);
//! assert_eq!(row0.get_i64(0), 1000);
//! assert!(!row0.is_null(1));
//!
//! let row1 = reader.read_row(1, &arena);
//! assert!(row1.is_null(1));
//! ```
42
43use arrow_array::cast::AsArray;
44use arrow_array::types::TimestampMicrosecondType;
45use arrow_array::{
46    Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
47    Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array, UInt32Array, UInt64Array,
48    UInt8Array,
49};
50
51use bumpalo::Bump;
52
53use super::row::{EventRow, FieldType, MutableEventRow, RowSchema};
54
55/// Pre-downcast column accessor to avoid per-row `downcast_ref` overhead.
56enum ColumnAccessor<'a> {
57    Bool(&'a BooleanArray),
58    Int8(&'a Int8Array),
59    Int16(&'a Int16Array),
60    Int32(&'a Int32Array),
61    Int64(&'a Int64Array),
62    UInt8(&'a UInt8Array),
63    UInt16(&'a UInt16Array),
64    UInt32(&'a UInt32Array),
65    UInt64(&'a UInt64Array),
66    Float32(&'a Float32Array),
67    Float64(&'a Float64Array),
68    TimestampMicros(&'a arrow_array::TimestampMicrosecondArray),
69    Utf8(&'a StringArray),
70    Binary(&'a BinaryArray),
71}
72
73/// Reads rows from an Arrow [`RecordBatch`] into [`EventRow`] format.
74///
75/// Each call to [`read_row`](Self::read_row) decomposes a single row from the
76/// columnar batch into a row-oriented [`EventRow`] allocated in a bump arena.
77/// Column downcasts are performed once at construction for amortized overhead.
78pub struct BatchRowReader<'a> {
79    batch: &'a RecordBatch,
80    schema: &'a RowSchema,
81    columns: Vec<ColumnAccessor<'a>>,
82    /// Pre-computed variable-length capacity estimate (64 bytes per var-len field).
83    var_capacity: usize,
84}
85
86impl<'a> BatchRowReader<'a> {
87    /// Creates a reader for the given batch.
88    ///
89    /// # Panics
90    ///
91    /// Panics if the batch schema doesn't match the [`RowSchema`] field types
92    /// (field count mismatch or incompatible data types).
93    #[must_use]
94    pub fn new(batch: &'a RecordBatch, schema: &'a RowSchema) -> Self {
95        assert_eq!(
96            batch.num_columns(),
97            schema.field_count(),
98            "BatchRowReader: column count mismatch: batch has {}, schema has {}",
99            batch.num_columns(),
100            schema.field_count()
101        );
102
103        let columns: Vec<ColumnAccessor<'a>> = (0..schema.field_count())
104            .map(|i| downcast_column(batch, schema, i))
105            .collect();
106
107        let var_capacity = schema.fields().iter().filter(|f| f.is_variable).count() * 64;
108
109        Self {
110            batch,
111            schema,
112            columns,
113            var_capacity,
114        }
115    }
116
117    /// Returns the number of rows in the batch.
118    #[must_use]
119    pub fn row_count(&self) -> usize {
120        self.batch.num_rows()
121    }
122
123    /// Reads row `row_idx` into an [`EventRow`] allocated in the given arena.
124    ///
125    /// # Panics
126    ///
127    /// Panics if `row_idx >= row_count()`.
128    #[must_use]
129    pub fn read_row<'b>(&self, row_idx: usize, arena: &'b Bump) -> EventRow<'b>
130    where
131        'a: 'b,
132    {
133        debug_assert!(
134            row_idx < self.batch.num_rows(),
135            "row index {row_idx} out of bounds (batch has {} rows)",
136            self.batch.num_rows()
137        );
138
139        let mut row = MutableEventRow::new_in(arena, self.schema, self.var_capacity);
140
141        for (field_idx, col) in self.columns.iter().enumerate() {
142            read_field(&mut row, col, field_idx, row_idx);
143        }
144
145        row.freeze()
146    }
147}
148
149/// Downcasts a batch column to the typed `ColumnAccessor` matching the schema.
150fn downcast_column<'a>(
151    batch: &'a RecordBatch,
152    schema: &RowSchema,
153    col_idx: usize,
154) -> ColumnAccessor<'a> {
155    let field_type = schema.field(col_idx).field_type;
156    let col = batch.column(col_idx);
157
158    match field_type {
159        FieldType::Bool => ColumnAccessor::Bool(col.as_boolean()),
160        FieldType::Int8 => ColumnAccessor::Int8(col.as_primitive::<arrow_array::types::Int8Type>()),
161        FieldType::Int16 => {
162            ColumnAccessor::Int16(col.as_primitive::<arrow_array::types::Int16Type>())
163        }
164        FieldType::Int32 => {
165            ColumnAccessor::Int32(col.as_primitive::<arrow_array::types::Int32Type>())
166        }
167        FieldType::Int64 => {
168            ColumnAccessor::Int64(col.as_primitive::<arrow_array::types::Int64Type>())
169        }
170        FieldType::UInt8 => {
171            ColumnAccessor::UInt8(col.as_primitive::<arrow_array::types::UInt8Type>())
172        }
173        FieldType::UInt16 => {
174            ColumnAccessor::UInt16(col.as_primitive::<arrow_array::types::UInt16Type>())
175        }
176        FieldType::UInt32 => {
177            ColumnAccessor::UInt32(col.as_primitive::<arrow_array::types::UInt32Type>())
178        }
179        FieldType::UInt64 => {
180            ColumnAccessor::UInt64(col.as_primitive::<arrow_array::types::UInt64Type>())
181        }
182        FieldType::Float32 => {
183            ColumnAccessor::Float32(col.as_primitive::<arrow_array::types::Float32Type>())
184        }
185        FieldType::Float64 => {
186            ColumnAccessor::Float64(col.as_primitive::<arrow_array::types::Float64Type>())
187        }
188        FieldType::TimestampMicros => {
189            ColumnAccessor::TimestampMicros(col.as_primitive::<TimestampMicrosecondType>())
190        }
191        FieldType::Utf8 => ColumnAccessor::Utf8(col.as_string::<i32>()),
192        FieldType::Binary => ColumnAccessor::Binary(col.as_binary::<i32>()),
193    }
194}
195
196/// Reads one field from a column array into a `MutableEventRow`.
197#[allow(clippy::too_many_lines)]
198fn read_field(
199    row: &mut MutableEventRow<'_>,
200    col: &ColumnAccessor<'_>,
201    field_idx: usize,
202    row_idx: usize,
203) {
204    match col {
205        ColumnAccessor::Bool(arr) => {
206            if arr.is_null(row_idx) {
207                row.set_null(field_idx, true);
208            } else {
209                row.set_bool(field_idx, arr.value(row_idx));
210            }
211        }
212        ColumnAccessor::Int8(arr) => {
213            if arr.is_null(row_idx) {
214                row.set_null(field_idx, true);
215            } else {
216                row.set_i8(field_idx, arr.value(row_idx));
217            }
218        }
219        ColumnAccessor::Int16(arr) => {
220            if arr.is_null(row_idx) {
221                row.set_null(field_idx, true);
222            } else {
223                row.set_i16(field_idx, arr.value(row_idx));
224            }
225        }
226        ColumnAccessor::Int32(arr) => {
227            if arr.is_null(row_idx) {
228                row.set_null(field_idx, true);
229            } else {
230                row.set_i32(field_idx, arr.value(row_idx));
231            }
232        }
233        ColumnAccessor::Int64(arr) => {
234            if arr.is_null(row_idx) {
235                row.set_null(field_idx, true);
236            } else {
237                row.set_i64(field_idx, arr.value(row_idx));
238            }
239        }
240        ColumnAccessor::UInt8(arr) => {
241            if arr.is_null(row_idx) {
242                row.set_null(field_idx, true);
243            } else {
244                row.set_u8(field_idx, arr.value(row_idx));
245            }
246        }
247        ColumnAccessor::UInt16(arr) => {
248            if arr.is_null(row_idx) {
249                row.set_null(field_idx, true);
250            } else {
251                row.set_u16(field_idx, arr.value(row_idx));
252            }
253        }
254        ColumnAccessor::UInt32(arr) => {
255            if arr.is_null(row_idx) {
256                row.set_null(field_idx, true);
257            } else {
258                row.set_u32(field_idx, arr.value(row_idx));
259            }
260        }
261        ColumnAccessor::UInt64(arr) => {
262            if arr.is_null(row_idx) {
263                row.set_null(field_idx, true);
264            } else {
265                row.set_u64(field_idx, arr.value(row_idx));
266            }
267        }
268        ColumnAccessor::Float32(arr) => {
269            if arr.is_null(row_idx) {
270                row.set_null(field_idx, true);
271            } else {
272                row.set_f32(field_idx, arr.value(row_idx));
273            }
274        }
275        ColumnAccessor::Float64(arr) => {
276            if arr.is_null(row_idx) {
277                row.set_null(field_idx, true);
278            } else {
279                row.set_f64(field_idx, arr.value(row_idx));
280            }
281        }
282        ColumnAccessor::TimestampMicros(arr) => {
283            if arr.is_null(row_idx) {
284                row.set_null(field_idx, true);
285            } else {
286                row.set_i64(field_idx, arr.value(row_idx));
287            }
288        }
289        ColumnAccessor::Utf8(arr) => {
290            if arr.is_null(row_idx) {
291                row.set_null(field_idx, true);
292            } else {
293                row.set_str(field_idx, arr.value(row_idx));
294            }
295        }
296        ColumnAccessor::Binary(arr) => {
297            if arr.is_null(row_idx) {
298                row.set_null(field_idx, true);
299            } else {
300                row.set_bytes(field_idx, arr.value(row_idx));
301            }
302        }
303    }
304}
305
#[cfg(test)]
#[allow(clippy::approx_constant)]
mod tests {
    use super::*;
    use crate::compiler::bridge::RowBatchBridge;
    use arrow_array::builder::TimestampMicrosecondBuilder;
    use arrow_array::{
        Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array,
        Int32Array, Int64Array, Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array,
        UInt8Array,
    };
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use std::sync::Arc;

    /// Builds an Arrow schema from `(name, data type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> Arc<Schema> {
        let fields: Vec<Field> = fields
            .into_iter()
            .map(|(name, data_type, nullable)| Field::new(name, data_type, nullable))
            .collect();
        Arc::new(Schema::new(fields))
    }

    /// Builds a schema from `fields` and a batch holding `columns` under it.
    fn make_batch(
        fields: Vec<(&str, DataType, bool)>,
        columns: Vec<ArrayRef>,
    ) -> (Arc<Schema>, RecordBatch) {
        let schema = make_schema(fields);
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        (schema, batch)
    }

    /// Tolerance check shared by the `f64` assertions below.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    // ── Construction tests ──────────────────────────────────────────

    #[test]
    fn new_valid_schema() {
        let (schema, batch) = make_batch(
            vec![
                ("id", DataType::Int64, false),
                ("val", DataType::Float64, true),
            ],
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(Float64Array::from(vec![Some(1.0), None, Some(3.0)])),
            ],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        assert_eq!(reader.row_count(), 3);
    }

    #[test]
    #[should_panic(expected = "column count mismatch")]
    fn new_column_count_mismatch() {
        // One-column batch paired with a two-field row schema.
        let (_, batch) = make_batch(
            vec![("a", DataType::Int64, false)],
            vec![Arc::new(Int64Array::from(vec![1]))],
        );
        let wider_schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Int64, false),
        ]);
        let row_schema = RowSchema::from_arrow(&wider_schema).unwrap();
        let _ = BatchRowReader::new(&batch, &row_schema);
    }

    #[test]
    fn row_count_empty_batch() {
        let (schema, batch) = make_batch(
            vec![("x", DataType::Int64, false)],
            vec![Arc::new(Int64Array::from(Vec::<i64>::new()))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        assert_eq!(reader.row_count(), 0);
    }

    // ── Fixed-type round-trip tests ─────────────────────────────────

    #[test]
    fn roundtrip_int64_float64() {
        let (schema, batch) = make_batch(
            vec![
                ("ts", DataType::Int64, false),
                ("val", DataType::Float64, false),
            ],
            vec![
                Arc::new(Int64Array::from(vec![1000, 2000])),
                Arc::new(Float64Array::from(vec![3.14, 2.718])),
            ],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        let first = reader.read_row(0, &arena);
        assert_eq!(first.get_i64(0), 1000);
        assert!(approx_eq(first.get_f64(1), 3.14));

        let second = reader.read_row(1, &arena);
        assert_eq!(second.get_i64(0), 2000);
        assert!(approx_eq(second.get_f64(1), 2.718));
    }

    #[test]
    fn roundtrip_all_integer_types() {
        let (schema, batch) = make_batch(
            vec![
                ("a", DataType::Int8, false),
                ("b", DataType::Int16, false),
                ("c", DataType::Int32, false),
                ("d", DataType::Int64, false),
                ("e", DataType::UInt8, false),
                ("f", DataType::UInt16, false),
                ("g", DataType::UInt32, false),
                ("h", DataType::UInt64, false),
            ],
            vec![
                Arc::new(Int8Array::from(vec![-42])),
                Arc::new(Int16Array::from(vec![-1000])),
                Arc::new(Int32Array::from(vec![100_000])),
                Arc::new(Int64Array::from(vec![i64::MAX])),
                Arc::new(UInt8Array::from(vec![255])),
                Arc::new(UInt16Array::from(vec![60_000])),
                Arc::new(UInt32Array::from(vec![u32::MAX])),
                Arc::new(UInt64Array::from(vec![u64::MAX])),
            ],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        let row = reader.read_row(0, &arena);
        assert_eq!(row.get_i8(0), -42);
        assert_eq!(row.get_i16(1), -1000);
        assert_eq!(row.get_i32(2), 100_000);
        assert_eq!(row.get_i64(3), i64::MAX);
        assert_eq!(row.get_u8(4), 255);
        assert_eq!(row.get_u16(5), 60_000);
        assert_eq!(row.get_u32(6), u32::MAX);
        assert_eq!(row.get_u64(7), u64::MAX);
    }

    #[test]
    fn roundtrip_bool() {
        let (schema, batch) = make_batch(
            vec![("flag", DataType::Boolean, false)],
            vec![Arc::new(BooleanArray::from(vec![true, false, true]))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        let flags: Vec<bool> = (0..reader.row_count())
            .map(|idx| reader.read_row(idx, &arena).get_bool(0))
            .collect();
        assert_eq!(flags, [true, false, true]);
    }

    #[test]
    fn roundtrip_float32() {
        let (schema, batch) = make_batch(
            vec![("f", DataType::Float32, false)],
            vec![Arc::new(Float32Array::from(vec![std::f32::consts::PI]))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        let value = reader.read_row(0, &arena).get_f32(0);
        assert!((value - std::f32::consts::PI).abs() < f32::EPSILON);
    }

    #[test]
    fn roundtrip_timestamp_micros() {
        let ts_val = 1_706_000_000_000_000_i64;
        let mut builder = TimestampMicrosecondBuilder::with_capacity(1);
        builder.append_value(ts_val);

        let (schema, batch) = make_batch(
            vec![(
                "ts",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            )],
            vec![Arc::new(builder.finish())],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        // Timestamps are read back through the i64 getter.
        let row = reader.read_row(0, &arena);
        assert_eq!(row.get_i64(0), ts_val);
    }

    // ── Variable-length type tests ──────────────────────────────────

    #[test]
    fn roundtrip_utf8() {
        let (schema, batch) = make_batch(
            vec![("name", DataType::Utf8, false)],
            vec![Arc::new(StringArray::from(vec!["hello", "world"]))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        for (idx, expected) in ["hello", "world"].iter().enumerate() {
            assert_eq!(reader.read_row(idx, &arena).get_str(0), *expected);
        }
    }

    #[test]
    fn roundtrip_binary() {
        let (schema, batch) = make_batch(
            vec![("data", DataType::Binary, false)],
            vec![Arc::new(BinaryArray::from_vec(vec![
                &[0xDE, 0xAD],
                &[0xBE, 0xEF],
            ]))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        assert_eq!(reader.read_row(0, &arena).get_bytes(0), &[0xDE, 0xAD]);
        assert_eq!(reader.read_row(1, &arena).get_bytes(0), &[0xBE, 0xEF]);
    }

    // ── Null handling tests ─────────────────────────────────────────

    #[test]
    fn null_fields_preserved() {
        let (schema, batch) = make_batch(
            vec![
                ("a", DataType::Int64, true),
                ("b", DataType::Float64, true),
            ],
            vec![
                Arc::new(Int64Array::from(vec![Some(42), None])),
                Arc::new(Float64Array::from(vec![None, Some(3.14)])),
            ],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        let first = reader.read_row(0, &arena);
        assert!(!first.is_null(0));
        assert_eq!(first.get_i64(0), 42);
        assert!(first.is_null(1));

        let second = reader.read_row(1, &arena);
        assert!(second.is_null(0));
        assert!(!second.is_null(1));
        assert!(approx_eq(second.get_f64(1), 3.14));
    }

    #[test]
    fn mixed_null_non_null() {
        let (schema, batch) = make_batch(
            vec![
                ("id", DataType::Int64, false),
                ("name", DataType::Utf8, true),
                ("score", DataType::Float64, true),
            ],
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec![Some("alice"), None])),
                Arc::new(Float64Array::from(vec![None, Some(95.5)])),
            ],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        let first = reader.read_row(0, &arena);
        assert_eq!(first.get_i64(0), 1);
        assert_eq!(first.get_str(1), "alice");
        assert!(first.is_null(2));

        let second = reader.read_row(1, &arena);
        assert_eq!(second.get_i64(0), 2);
        assert!(second.is_null(1));
        assert!(approx_eq(second.get_f64(2), 95.5));
    }

    // ── Round-trip: batch → row → bridge → batch ────────────────────

    #[test]
    fn full_roundtrip_fixed_types() {
        let (schema, original) = make_batch(
            vec![
                ("a", DataType::Int64, true),
                ("b", DataType::Float64, true),
            ],
            vec![
                Arc::new(Int64Array::from(vec![Some(10), None, Some(30)])),
                Arc::new(Float64Array::from(vec![Some(1.1), Some(2.2), None])),
            ],
        );

        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&original, &row_schema);
        let mut bridge = RowBatchBridge::new(schema, 16).unwrap();

        let arena = Bump::new();
        for idx in 0..reader.row_count() {
            let row = reader.read_row(idx, &arena);
            bridge.append_row(&row).unwrap();
        }
        let restored = bridge.flush();

        assert_eq!(restored.num_rows(), 3);

        // Integer column: values and null positions must both survive.
        let col_a = restored
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let ints: Vec<Option<i64>> = col_a.iter().collect();
        assert_eq!(ints, vec![Some(10), None, Some(30)]);

        // Float column: compared with a tolerance, nulls checked explicitly.
        let col_b = restored
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!(!col_b.is_null(0));
        assert!(approx_eq(col_b.value(0), 1.1));
        assert!(!col_b.is_null(1));
        assert!(approx_eq(col_b.value(1), 2.2));
        assert!(col_b.is_null(2));
    }

    #[test]
    fn full_roundtrip_variable_types() {
        let (schema, original) = make_batch(
            vec![
                ("id", DataType::Int64, false),
                ("name", DataType::Utf8, true),
                ("data", DataType::Binary, true),
            ],
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec![Some("hello"), None])),
                Arc::new(BinaryArray::from_opt_vec(vec![None, Some(&[0xCA, 0xFE])])),
            ],
        );

        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&original, &row_schema);
        // `schema` is the same schema the batch was built with.
        let mut bridge = RowBatchBridge::new(schema, 16).unwrap();

        let arena = Bump::new();
        for idx in 0..reader.row_count() {
            let row = reader.read_row(idx, &arena);
            bridge.append_row(&row).unwrap();
        }
        let restored = bridge.flush();

        assert_eq!(restored.num_rows(), 2);

        let col_name = restored
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(col_name.value(0), "hello");
        assert!(col_name.is_null(1));

        let col_data = restored
            .column(2)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert!(col_data.is_null(0));
        assert_eq!(col_data.value(1), &[0xCA, 0xFE]);
    }

    // ── Edge cases ──────────────────────────────────────────────────

    #[test]
    fn empty_batch_no_rows() {
        // Same scenario as `row_count_empty_batch`, listed with the edge cases.
        let (schema, batch) = make_batch(
            vec![("x", DataType::Int64, false)],
            vec![Arc::new(Int64Array::from(Vec::<i64>::new()))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        assert_eq!(reader.row_count(), 0);
    }

    #[test]
    fn single_row_batch() {
        let (schema, batch) = make_batch(
            vec![("x", DataType::Int64, false)],
            vec![Arc::new(Int64Array::from(vec![42]))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        assert_eq!(reader.row_count(), 1);

        let arena = Bump::new();
        let row = reader.read_row(0, &arena);
        assert_eq!(row.get_i64(0), 42);
    }

    #[test]
    fn empty_string_in_batch() {
        let (schema, batch) = make_batch(
            vec![("s", DataType::Utf8, false)],
            vec![Arc::new(StringArray::from(vec!["", "nonempty"]))],
        );
        let row_schema = RowSchema::from_arrow(&schema).unwrap();
        let reader = BatchRowReader::new(&batch, &row_schema);
        let arena = Bump::new();

        // An empty string is a value, not a null.
        assert_eq!(reader.read_row(0, &arena).get_str(0), "");
        assert_eq!(reader.read_row(1, &arena).get_str(0), "nonempty");
    }
}