1use arrow_array::cast::AsArray;
44use arrow_array::types::TimestampMicrosecondType;
45use arrow_array::{
46 Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
47 Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array, UInt32Array, UInt64Array,
48 UInt8Array,
49};
50
51use bumpalo::Bump;
52
53use super::row::{EventRow, FieldType, MutableEventRow, RowSchema};
54
55enum ColumnAccessor<'a> {
57 Bool(&'a BooleanArray),
58 Int8(&'a Int8Array),
59 Int16(&'a Int16Array),
60 Int32(&'a Int32Array),
61 Int64(&'a Int64Array),
62 UInt8(&'a UInt8Array),
63 UInt16(&'a UInt16Array),
64 UInt32(&'a UInt32Array),
65 UInt64(&'a UInt64Array),
66 Float32(&'a Float32Array),
67 Float64(&'a Float64Array),
68 TimestampMicros(&'a arrow_array::TimestampMicrosecondArray),
69 Utf8(&'a StringArray),
70 Binary(&'a BinaryArray),
71}
72
73pub struct BatchRowReader<'a> {
79 batch: &'a RecordBatch,
80 schema: &'a RowSchema,
81 columns: Vec<ColumnAccessor<'a>>,
82 var_capacity: usize,
84}
85
86impl<'a> BatchRowReader<'a> {
87 #[must_use]
94 pub fn new(batch: &'a RecordBatch, schema: &'a RowSchema) -> Self {
95 assert_eq!(
96 batch.num_columns(),
97 schema.field_count(),
98 "BatchRowReader: column count mismatch: batch has {}, schema has {}",
99 batch.num_columns(),
100 schema.field_count()
101 );
102
103 let columns: Vec<ColumnAccessor<'a>> = (0..schema.field_count())
104 .map(|i| downcast_column(batch, schema, i))
105 .collect();
106
107 let var_capacity = schema.fields().iter().filter(|f| f.is_variable).count() * 64;
108
109 Self {
110 batch,
111 schema,
112 columns,
113 var_capacity,
114 }
115 }
116
117 #[must_use]
119 pub fn row_count(&self) -> usize {
120 self.batch.num_rows()
121 }
122
123 #[must_use]
129 pub fn read_row<'b>(&self, row_idx: usize, arena: &'b Bump) -> EventRow<'b>
130 where
131 'a: 'b,
132 {
133 debug_assert!(
134 row_idx < self.batch.num_rows(),
135 "row index {row_idx} out of bounds (batch has {} rows)",
136 self.batch.num_rows()
137 );
138
139 let mut row = MutableEventRow::new_in(arena, self.schema, self.var_capacity);
140
141 for (field_idx, col) in self.columns.iter().enumerate() {
142 read_field(&mut row, col, field_idx, row_idx);
143 }
144
145 row.freeze()
146 }
147}
148
149fn downcast_column<'a>(
151 batch: &'a RecordBatch,
152 schema: &RowSchema,
153 col_idx: usize,
154) -> ColumnAccessor<'a> {
155 let field_type = schema.field(col_idx).field_type;
156 let col = batch.column(col_idx);
157
158 match field_type {
159 FieldType::Bool => ColumnAccessor::Bool(col.as_boolean()),
160 FieldType::Int8 => ColumnAccessor::Int8(col.as_primitive::<arrow_array::types::Int8Type>()),
161 FieldType::Int16 => {
162 ColumnAccessor::Int16(col.as_primitive::<arrow_array::types::Int16Type>())
163 }
164 FieldType::Int32 => {
165 ColumnAccessor::Int32(col.as_primitive::<arrow_array::types::Int32Type>())
166 }
167 FieldType::Int64 => {
168 ColumnAccessor::Int64(col.as_primitive::<arrow_array::types::Int64Type>())
169 }
170 FieldType::UInt8 => {
171 ColumnAccessor::UInt8(col.as_primitive::<arrow_array::types::UInt8Type>())
172 }
173 FieldType::UInt16 => {
174 ColumnAccessor::UInt16(col.as_primitive::<arrow_array::types::UInt16Type>())
175 }
176 FieldType::UInt32 => {
177 ColumnAccessor::UInt32(col.as_primitive::<arrow_array::types::UInt32Type>())
178 }
179 FieldType::UInt64 => {
180 ColumnAccessor::UInt64(col.as_primitive::<arrow_array::types::UInt64Type>())
181 }
182 FieldType::Float32 => {
183 ColumnAccessor::Float32(col.as_primitive::<arrow_array::types::Float32Type>())
184 }
185 FieldType::Float64 => {
186 ColumnAccessor::Float64(col.as_primitive::<arrow_array::types::Float64Type>())
187 }
188 FieldType::TimestampMicros => {
189 ColumnAccessor::TimestampMicros(col.as_primitive::<TimestampMicrosecondType>())
190 }
191 FieldType::Utf8 => ColumnAccessor::Utf8(col.as_string::<i32>()),
192 FieldType::Binary => ColumnAccessor::Binary(col.as_binary::<i32>()),
193 }
194}
195
196#[allow(clippy::too_many_lines)]
198fn read_field(
199 row: &mut MutableEventRow<'_>,
200 col: &ColumnAccessor<'_>,
201 field_idx: usize,
202 row_idx: usize,
203) {
204 match col {
205 ColumnAccessor::Bool(arr) => {
206 if arr.is_null(row_idx) {
207 row.set_null(field_idx, true);
208 } else {
209 row.set_bool(field_idx, arr.value(row_idx));
210 }
211 }
212 ColumnAccessor::Int8(arr) => {
213 if arr.is_null(row_idx) {
214 row.set_null(field_idx, true);
215 } else {
216 row.set_i8(field_idx, arr.value(row_idx));
217 }
218 }
219 ColumnAccessor::Int16(arr) => {
220 if arr.is_null(row_idx) {
221 row.set_null(field_idx, true);
222 } else {
223 row.set_i16(field_idx, arr.value(row_idx));
224 }
225 }
226 ColumnAccessor::Int32(arr) => {
227 if arr.is_null(row_idx) {
228 row.set_null(field_idx, true);
229 } else {
230 row.set_i32(field_idx, arr.value(row_idx));
231 }
232 }
233 ColumnAccessor::Int64(arr) => {
234 if arr.is_null(row_idx) {
235 row.set_null(field_idx, true);
236 } else {
237 row.set_i64(field_idx, arr.value(row_idx));
238 }
239 }
240 ColumnAccessor::UInt8(arr) => {
241 if arr.is_null(row_idx) {
242 row.set_null(field_idx, true);
243 } else {
244 row.set_u8(field_idx, arr.value(row_idx));
245 }
246 }
247 ColumnAccessor::UInt16(arr) => {
248 if arr.is_null(row_idx) {
249 row.set_null(field_idx, true);
250 } else {
251 row.set_u16(field_idx, arr.value(row_idx));
252 }
253 }
254 ColumnAccessor::UInt32(arr) => {
255 if arr.is_null(row_idx) {
256 row.set_null(field_idx, true);
257 } else {
258 row.set_u32(field_idx, arr.value(row_idx));
259 }
260 }
261 ColumnAccessor::UInt64(arr) => {
262 if arr.is_null(row_idx) {
263 row.set_null(field_idx, true);
264 } else {
265 row.set_u64(field_idx, arr.value(row_idx));
266 }
267 }
268 ColumnAccessor::Float32(arr) => {
269 if arr.is_null(row_idx) {
270 row.set_null(field_idx, true);
271 } else {
272 row.set_f32(field_idx, arr.value(row_idx));
273 }
274 }
275 ColumnAccessor::Float64(arr) => {
276 if arr.is_null(row_idx) {
277 row.set_null(field_idx, true);
278 } else {
279 row.set_f64(field_idx, arr.value(row_idx));
280 }
281 }
282 ColumnAccessor::TimestampMicros(arr) => {
283 if arr.is_null(row_idx) {
284 row.set_null(field_idx, true);
285 } else {
286 row.set_i64(field_idx, arr.value(row_idx));
287 }
288 }
289 ColumnAccessor::Utf8(arr) => {
290 if arr.is_null(row_idx) {
291 row.set_null(field_idx, true);
292 } else {
293 row.set_str(field_idx, arr.value(row_idx));
294 }
295 }
296 ColumnAccessor::Binary(arr) => {
297 if arr.is_null(row_idx) {
298 row.set_null(field_idx, true);
299 } else {
300 row.set_bytes(field_idx, arr.value(row_idx));
301 }
302 }
303 }
304}
305
306#[cfg(test)]
307#[allow(clippy::approx_constant)]
308mod tests {
309 use super::*;
310 use crate::compiler::bridge::RowBatchBridge;
311 use arrow_array::builder::TimestampMicrosecondBuilder;
312 use arrow_array::{
313 Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
314 Int64Array, Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
315 };
316 use arrow_schema::{DataType, Field, Schema, TimeUnit};
317 use std::sync::Arc;
318
319 fn make_schema(fields: Vec<(&str, DataType, bool)>) -> Arc<Schema> {
320 Arc::new(Schema::new(
321 fields
322 .into_iter()
323 .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
324 .collect::<Vec<_>>(),
325 ))
326 }
327
328 #[test]
331 fn new_valid_schema() {
332 let schema = make_schema(vec![
333 ("id", DataType::Int64, false),
334 ("val", DataType::Float64, true),
335 ]);
336 let batch = RecordBatch::try_new(
337 Arc::clone(&schema),
338 vec![
339 Arc::new(Int64Array::from(vec![1, 2, 3])),
340 Arc::new(Float64Array::from(vec![Some(1.0), None, Some(3.0)])),
341 ],
342 )
343 .unwrap();
344 let row_schema = RowSchema::from_arrow(&schema).unwrap();
345 let reader = BatchRowReader::new(&batch, &row_schema);
346 assert_eq!(reader.row_count(), 3);
347 }
348
349 #[test]
350 #[should_panic(expected = "column count mismatch")]
351 fn new_column_count_mismatch() {
352 let batch_schema = make_schema(vec![("a", DataType::Int64, false)]);
353 let row_schema_arrow = make_schema(vec![
354 ("a", DataType::Int64, false),
355 ("b", DataType::Int64, false),
356 ]);
357 let batch =
358 RecordBatch::try_new(batch_schema, vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
359 let row_schema = RowSchema::from_arrow(&row_schema_arrow).unwrap();
360 let _ = BatchRowReader::new(&batch, &row_schema);
361 }
362
363 #[test]
364 fn row_count_empty_batch() {
365 let schema = make_schema(vec![("x", DataType::Int64, false)]);
366 let batch = RecordBatch::try_new(
367 Arc::clone(&schema),
368 vec![Arc::new(Int64Array::from(Vec::<i64>::new()))],
369 )
370 .unwrap();
371 let row_schema = RowSchema::from_arrow(&schema).unwrap();
372 let reader = BatchRowReader::new(&batch, &row_schema);
373 assert_eq!(reader.row_count(), 0);
374 }
375
376 #[test]
379 fn roundtrip_int64_float64() {
380 let schema = make_schema(vec![
381 ("ts", DataType::Int64, false),
382 ("val", DataType::Float64, false),
383 ]);
384 let batch = RecordBatch::try_new(
385 Arc::clone(&schema),
386 vec![
387 Arc::new(Int64Array::from(vec![1000, 2000])),
388 Arc::new(Float64Array::from(vec![3.14, 2.718])),
389 ],
390 )
391 .unwrap();
392 let row_schema = RowSchema::from_arrow(&schema).unwrap();
393 let reader = BatchRowReader::new(&batch, &row_schema);
394 let arena = Bump::new();
395
396 let row0 = reader.read_row(0, &arena);
397 assert_eq!(row0.get_i64(0), 1000);
398 assert!((row0.get_f64(1) - 3.14).abs() < f64::EPSILON);
399
400 let row1 = reader.read_row(1, &arena);
401 assert_eq!(row1.get_i64(0), 2000);
402 assert!((row1.get_f64(1) - 2.718).abs() < f64::EPSILON);
403 }
404
405 #[test]
406 fn roundtrip_all_integer_types() {
407 let schema = make_schema(vec![
408 ("a", DataType::Int8, false),
409 ("b", DataType::Int16, false),
410 ("c", DataType::Int32, false),
411 ("d", DataType::Int64, false),
412 ("e", DataType::UInt8, false),
413 ("f", DataType::UInt16, false),
414 ("g", DataType::UInt32, false),
415 ("h", DataType::UInt64, false),
416 ]);
417 let batch = RecordBatch::try_new(
418 Arc::clone(&schema),
419 vec![
420 Arc::new(Int8Array::from(vec![-42])),
421 Arc::new(Int16Array::from(vec![-1000])),
422 Arc::new(Int32Array::from(vec![100_000])),
423 Arc::new(Int64Array::from(vec![i64::MAX])),
424 Arc::new(UInt8Array::from(vec![255])),
425 Arc::new(UInt16Array::from(vec![60_000])),
426 Arc::new(UInt32Array::from(vec![u32::MAX])),
427 Arc::new(UInt64Array::from(vec![u64::MAX])),
428 ],
429 )
430 .unwrap();
431 let row_schema = RowSchema::from_arrow(&schema).unwrap();
432 let reader = BatchRowReader::new(&batch, &row_schema);
433 let arena = Bump::new();
434
435 let row = reader.read_row(0, &arena);
436 assert_eq!(row.get_i8(0), -42);
437 assert_eq!(row.get_i16(1), -1000);
438 assert_eq!(row.get_i32(2), 100_000);
439 assert_eq!(row.get_i64(3), i64::MAX);
440 assert_eq!(row.get_u8(4), 255);
441 assert_eq!(row.get_u16(5), 60_000);
442 assert_eq!(row.get_u32(6), u32::MAX);
443 assert_eq!(row.get_u64(7), u64::MAX);
444 }
445
446 #[test]
447 fn roundtrip_bool() {
448 let schema = make_schema(vec![("flag", DataType::Boolean, false)]);
449 let batch = RecordBatch::try_new(
450 Arc::clone(&schema),
451 vec![Arc::new(BooleanArray::from(vec![true, false, true]))],
452 )
453 .unwrap();
454 let row_schema = RowSchema::from_arrow(&schema).unwrap();
455 let reader = BatchRowReader::new(&batch, &row_schema);
456 let arena = Bump::new();
457
458 assert!(reader.read_row(0, &arena).get_bool(0));
459 assert!(!reader.read_row(1, &arena).get_bool(0));
460 assert!(reader.read_row(2, &arena).get_bool(0));
461 }
462
463 #[test]
464 fn roundtrip_float32() {
465 let schema = make_schema(vec![("f", DataType::Float32, false)]);
466 let batch = RecordBatch::try_new(
467 Arc::clone(&schema),
468 vec![Arc::new(Float32Array::from(vec![std::f32::consts::PI]))],
469 )
470 .unwrap();
471 let row_schema = RowSchema::from_arrow(&schema).unwrap();
472 let reader = BatchRowReader::new(&batch, &row_schema);
473 let arena = Bump::new();
474
475 let row = reader.read_row(0, &arena);
476 assert!((row.get_f32(0) - std::f32::consts::PI).abs() < f32::EPSILON);
477 }
478
479 #[test]
480 fn roundtrip_timestamp_micros() {
481 let schema = make_schema(vec![(
482 "ts",
483 DataType::Timestamp(TimeUnit::Microsecond, None),
484 false,
485 )]);
486 let ts_val = 1_706_000_000_000_000_i64;
487 let mut builder = TimestampMicrosecondBuilder::with_capacity(1);
488 builder.append_value(ts_val);
489 let batch =
490 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(builder.finish())]).unwrap();
491 let row_schema = RowSchema::from_arrow(&schema).unwrap();
492 let reader = BatchRowReader::new(&batch, &row_schema);
493 let arena = Bump::new();
494
495 let row = reader.read_row(0, &arena);
496 assert_eq!(row.get_i64(0), ts_val);
497 }
498
499 #[test]
502 fn roundtrip_utf8() {
503 let schema = make_schema(vec![("name", DataType::Utf8, false)]);
504 let batch = RecordBatch::try_new(
505 Arc::clone(&schema),
506 vec![Arc::new(StringArray::from(vec!["hello", "world"]))],
507 )
508 .unwrap();
509 let row_schema = RowSchema::from_arrow(&schema).unwrap();
510 let reader = BatchRowReader::new(&batch, &row_schema);
511 let arena = Bump::new();
512
513 assert_eq!(reader.read_row(0, &arena).get_str(0), "hello");
514 assert_eq!(reader.read_row(1, &arena).get_str(0), "world");
515 }
516
517 #[test]
518 fn roundtrip_binary() {
519 let schema = make_schema(vec![("data", DataType::Binary, false)]);
520 let batch = RecordBatch::try_new(
521 Arc::clone(&schema),
522 vec![Arc::new(BinaryArray::from_vec(vec![
523 &[0xDE, 0xAD],
524 &[0xBE, 0xEF],
525 ]))],
526 )
527 .unwrap();
528 let row_schema = RowSchema::from_arrow(&schema).unwrap();
529 let reader = BatchRowReader::new(&batch, &row_schema);
530 let arena = Bump::new();
531
532 assert_eq!(reader.read_row(0, &arena).get_bytes(0), &[0xDE, 0xAD]);
533 assert_eq!(reader.read_row(1, &arena).get_bytes(0), &[0xBE, 0xEF]);
534 }
535
536 #[test]
539 fn null_fields_preserved() {
540 let schema = make_schema(vec![
541 ("a", DataType::Int64, true),
542 ("b", DataType::Float64, true),
543 ]);
544 let batch = RecordBatch::try_new(
545 Arc::clone(&schema),
546 vec![
547 Arc::new(Int64Array::from(vec![Some(42), None])),
548 Arc::new(Float64Array::from(vec![None, Some(3.14)])),
549 ],
550 )
551 .unwrap();
552 let row_schema = RowSchema::from_arrow(&schema).unwrap();
553 let reader = BatchRowReader::new(&batch, &row_schema);
554 let arena = Bump::new();
555
556 let row0 = reader.read_row(0, &arena);
557 assert!(!row0.is_null(0));
558 assert_eq!(row0.get_i64(0), 42);
559 assert!(row0.is_null(1));
560
561 let row1 = reader.read_row(1, &arena);
562 assert!(row1.is_null(0));
563 assert!(!row1.is_null(1));
564 assert!((row1.get_f64(1) - 3.14).abs() < f64::EPSILON);
565 }
566
567 #[test]
568 fn mixed_null_non_null() {
569 let schema = make_schema(vec![
570 ("id", DataType::Int64, false),
571 ("name", DataType::Utf8, true),
572 ("score", DataType::Float64, true),
573 ]);
574 let batch = RecordBatch::try_new(
575 Arc::clone(&schema),
576 vec![
577 Arc::new(Int64Array::from(vec![1, 2])),
578 Arc::new(StringArray::from(vec![Some("alice"), None])),
579 Arc::new(Float64Array::from(vec![None, Some(95.5)])),
580 ],
581 )
582 .unwrap();
583 let row_schema = RowSchema::from_arrow(&schema).unwrap();
584 let reader = BatchRowReader::new(&batch, &row_schema);
585 let arena = Bump::new();
586
587 let row0 = reader.read_row(0, &arena);
588 assert_eq!(row0.get_i64(0), 1);
589 assert_eq!(row0.get_str(1), "alice");
590 assert!(row0.is_null(2));
591
592 let row1 = reader.read_row(1, &arena);
593 assert_eq!(row1.get_i64(0), 2);
594 assert!(row1.is_null(1));
595 assert!((row1.get_f64(2) - 95.5).abs() < f64::EPSILON);
596 }
597
598 #[test]
601 fn full_roundtrip_fixed_types() {
602 let schema = make_schema(vec![
603 ("a", DataType::Int64, true),
604 ("b", DataType::Float64, true),
605 ]);
606 let original = RecordBatch::try_new(
607 Arc::clone(&schema),
608 vec![
609 Arc::new(Int64Array::from(vec![Some(10), None, Some(30)])),
610 Arc::new(Float64Array::from(vec![Some(1.1), Some(2.2), None])),
611 ],
612 )
613 .unwrap();
614
615 let row_schema = RowSchema::from_arrow(&schema).unwrap();
616 let reader = BatchRowReader::new(&original, &row_schema);
617 let mut bridge = RowBatchBridge::new(schema, 16).unwrap();
618
619 let arena = Bump::new();
620 for i in 0..reader.row_count() {
621 let row = reader.read_row(i, &arena);
622 bridge.append_row(&row).unwrap();
623 }
624 let restored = bridge.flush();
625
626 assert_eq!(restored.num_rows(), 3);
628 let col_a = restored
629 .column(0)
630 .as_any()
631 .downcast_ref::<Int64Array>()
632 .unwrap();
633 let col_b = restored
634 .column(1)
635 .as_any()
636 .downcast_ref::<Float64Array>()
637 .unwrap();
638
639 assert!(!col_a.is_null(0));
640 assert_eq!(col_a.value(0), 10);
641 assert!(col_a.is_null(1));
642 assert!(!col_a.is_null(2));
643 assert_eq!(col_a.value(2), 30);
644
645 assert!(!col_b.is_null(0));
646 assert!((col_b.value(0) - 1.1).abs() < f64::EPSILON);
647 assert!(!col_b.is_null(1));
648 assert!((col_b.value(1) - 2.2).abs() < f64::EPSILON);
649 assert!(col_b.is_null(2));
650 }
651
652 #[test]
653 fn full_roundtrip_variable_types() {
654 let schema = make_schema(vec![
655 ("id", DataType::Int64, false),
656 ("name", DataType::Utf8, true),
657 ("data", DataType::Binary, true),
658 ]);
659 let original = RecordBatch::try_new(
660 Arc::clone(&schema),
661 vec![
662 Arc::new(Int64Array::from(vec![1, 2])),
663 Arc::new(StringArray::from(vec![Some("hello"), None])),
664 Arc::new(BinaryArray::from_opt_vec(vec![None, Some(&[0xCA, 0xFE])])),
665 ],
666 )
667 .unwrap();
668
669 let row_schema = RowSchema::from_arrow(&schema).unwrap();
670 let reader = BatchRowReader::new(&original, &row_schema);
671 let mut bridge = RowBatchBridge::new(Arc::clone(original.schema_ref()), 16).unwrap();
672
673 let arena = Bump::new();
674 for i in 0..reader.row_count() {
675 let row = reader.read_row(i, &arena);
676 bridge.append_row(&row).unwrap();
677 }
678 let restored = bridge.flush();
679
680 assert_eq!(restored.num_rows(), 2);
681 let col_name = restored
682 .column(1)
683 .as_any()
684 .downcast_ref::<StringArray>()
685 .unwrap();
686 assert_eq!(col_name.value(0), "hello");
687 assert!(col_name.is_null(1));
688
689 let col_data = restored
690 .column(2)
691 .as_any()
692 .downcast_ref::<BinaryArray>()
693 .unwrap();
694 assert!(col_data.is_null(0));
695 assert_eq!(col_data.value(1), &[0xCA, 0xFE]);
696 }
697
698 #[test]
701 fn empty_batch_no_rows() {
702 let schema = make_schema(vec![("x", DataType::Int64, false)]);
703 let batch = RecordBatch::try_new(
704 Arc::clone(&schema),
705 vec![Arc::new(Int64Array::from(Vec::<i64>::new()))],
706 )
707 .unwrap();
708 let row_schema = RowSchema::from_arrow(&schema).unwrap();
709 let reader = BatchRowReader::new(&batch, &row_schema);
710 assert_eq!(reader.row_count(), 0);
711 }
712
713 #[test]
714 fn single_row_batch() {
715 let schema = make_schema(vec![("x", DataType::Int64, false)]);
716 let batch = RecordBatch::try_new(
717 Arc::clone(&schema),
718 vec![Arc::new(Int64Array::from(vec![42]))],
719 )
720 .unwrap();
721 let row_schema = RowSchema::from_arrow(&schema).unwrap();
722 let reader = BatchRowReader::new(&batch, &row_schema);
723 assert_eq!(reader.row_count(), 1);
724
725 let arena = Bump::new();
726 let row = reader.read_row(0, &arena);
727 assert_eq!(row.get_i64(0), 42);
728 }
729
730 #[test]
731 fn empty_string_in_batch() {
732 let schema = make_schema(vec![("s", DataType::Utf8, false)]);
733 let batch = RecordBatch::try_new(
734 Arc::clone(&schema),
735 vec![Arc::new(StringArray::from(vec!["", "nonempty"]))],
736 )
737 .unwrap();
738 let row_schema = RowSchema::from_arrow(&schema).unwrap();
739 let reader = BatchRowReader::new(&batch, &row_schema);
740 let arena = Bump::new();
741
742 assert_eq!(reader.read_row(0, &arena).get_str(0), "");
743 assert_eq!(reader.read_row(1, &arena).get_str(0), "nonempty");
744 }
745}