1use std::sync::Arc;
47
48use arrow_schema::{DataType, SchemaRef, TimeUnit};
49use bumpalo::Bump;
50
/// Size in bytes of the fixed header at the start of every row buffer.
///
/// `MutableEventRow::new_in` stores the field count (`u16`, little-endian) in
/// bytes `0..2` and the start offset of the variable-length region (`u32`,
/// little-endian) in bytes `4..8`; bytes `2..4` are left zeroed. The null
/// bitmap begins immediately after the header.
const HEADER_SIZE: usize = 8;
53
/// Row-level field types that can be stored in an encoded row.
///
/// Each variant corresponds to exactly one Arrow `DataType` (see
/// `FieldType::from_arrow` / `FieldType::to_arrow`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldType {
    /// Boolean, stored inline as one byte.
    Bool,
    /// Signed 8-bit integer.
    Int8,
    /// Signed 16-bit integer.
    Int16,
    /// Signed 32-bit integer.
    Int32,
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 8-bit integer.
    UInt8,
    /// Unsigned 16-bit integer.
    UInt16,
    /// Unsigned 32-bit integer.
    UInt32,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 32-bit floating point number.
    Float32,
    /// 64-bit floating point number.
    Float64,
    /// Timestamp with microsecond resolution, stored inline in 8 bytes and
    /// accessed through the `i64` getter/setter.
    TimestampMicros,
    /// UTF-8 string; the inline slot holds an 8-byte (offset, length)
    /// descriptor pointing at the payload.
    Utf8,
    /// Opaque byte string; the inline slot holds an 8-byte (offset, length)
    /// descriptor pointing at the payload.
    Binary,
}
86
87impl FieldType {
88 #[must_use]
92 pub fn from_arrow(dt: &DataType) -> Option<Self> {
93 match dt {
94 DataType::Boolean => Some(Self::Bool),
95 DataType::Int8 => Some(Self::Int8),
96 DataType::Int16 => Some(Self::Int16),
97 DataType::Int32 => Some(Self::Int32),
98 DataType::Int64 => Some(Self::Int64),
99 DataType::UInt8 => Some(Self::UInt8),
100 DataType::UInt16 => Some(Self::UInt16),
101 DataType::UInt32 => Some(Self::UInt32),
102 DataType::UInt64 => Some(Self::UInt64),
103 DataType::Float32 => Some(Self::Float32),
104 DataType::Float64 => Some(Self::Float64),
105 DataType::Timestamp(TimeUnit::Microsecond, _) => Some(Self::TimestampMicros),
106 DataType::Utf8 => Some(Self::Utf8),
107 DataType::Binary => Some(Self::Binary),
108 _ => None,
109 }
110 }
111
112 #[must_use]
114 pub fn to_arrow(self) -> DataType {
115 match self {
116 Self::Bool => DataType::Boolean,
117 Self::Int8 => DataType::Int8,
118 Self::Int16 => DataType::Int16,
119 Self::Int32 => DataType::Int32,
120 Self::Int64 => DataType::Int64,
121 Self::UInt8 => DataType::UInt8,
122 Self::UInt16 => DataType::UInt16,
123 Self::UInt32 => DataType::UInt32,
124 Self::UInt64 => DataType::UInt64,
125 Self::Float32 => DataType::Float32,
126 Self::Float64 => DataType::Float64,
127 Self::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
128 Self::Utf8 => DataType::Utf8,
129 Self::Binary => DataType::Binary,
130 }
131 }
132
133 #[must_use]
138 pub const fn inline_size(self) -> usize {
139 match self {
140 Self::Bool | Self::Int8 | Self::UInt8 => 1,
141 Self::Int16 | Self::UInt16 => 2,
142 Self::Int32 | Self::UInt32 | Self::Float32 => 4,
143 Self::Int64
144 | Self::UInt64
145 | Self::Float64
146 | Self::TimestampMicros
147 | Self::Utf8
148 | Self::Binary => 8,
149 }
150 }
151
152 #[must_use]
154 pub const fn alignment(self) -> usize {
155 match self {
156 Self::Bool | Self::Int8 | Self::UInt8 => 1,
157 Self::Int16 | Self::UInt16 => 2,
158 Self::Int32 | Self::UInt32 | Self::Float32 | Self::Utf8 | Self::Binary => 4,
160 Self::Int64 | Self::UInt64 | Self::Float64 | Self::TimestampMicros => 8,
161 }
162 }
163
164 #[must_use]
166 pub const fn is_variable(self) -> bool {
167 matches!(self, Self::Utf8 | Self::Binary)
168 }
169}
170
/// Placement of a single field inside an encoded row, as computed by
/// `RowSchema::from_arrow`.
#[derive(Debug, Clone, Copy)]
pub struct FieldLayout {
    /// Row-level type of the field.
    pub field_type: FieldType,
    /// Byte offset of the field's inline slot, measured from the start of the row.
    pub offset: usize,
    /// Size in bytes of the inline slot (`FieldType::inline_size`).
    pub size: usize,
    /// Index of the field's bit in the null bitmap; equals the field's
    /// position in the schema.
    pub null_bit: usize,
    /// Whether the inline slot is a descriptor for a variable-length payload.
    pub is_variable: bool,
}
185
/// Errors produced while deriving a row layout from an Arrow schema.
#[derive(Debug, thiserror::Error)]
pub enum RowError {
    /// The schema contains a field whose Arrow type has no row representation
    /// (i.e. `FieldType::from_arrow` returned `None`).
    #[error("unsupported Arrow data type for field '{name}': {data_type}")]
    UnsupportedType {
        /// Name of the offending Arrow field.
        name: String,
        /// The Arrow data type that could not be mapped.
        data_type: DataType,
    },
}
198
/// Precomputed row layout for an Arrow schema.
///
/// A row consists of the fixed header, the null bitmap, the inline field
/// slots, and finally any variable-length payloads.
#[derive(Debug, Clone)]
pub struct RowSchema {
    /// Per-field layout, in schema order.
    fields: Vec<FieldLayout>,
    /// The Arrow schema this layout was derived from.
    arrow_schema: SchemaRef,
    /// Size of the null bitmap in bytes, rounded up to a multiple of 8.
    null_bitmap_size: usize,
    /// Offset of the first byte after the header and null bitmap.
    fixed_region_offset: usize,
    /// Offset of the first byte after the last inline field slot; a row
    /// without variable-length payloads is exactly this long.
    min_row_size: usize,
}
212
213impl RowSchema {
214 pub fn from_arrow(schema: &SchemaRef) -> Result<Self, RowError> {
220 let field_count = schema.fields().len();
221 let null_bitmap_bytes = field_count.div_ceil(8);
222 let padded_bitmap = (null_bitmap_bytes + 7) & !7;
224 let fixed_region_offset = HEADER_SIZE + padded_bitmap;
225
226 let mut fields = Vec::with_capacity(field_count);
227 let mut current_offset = fixed_region_offset;
228
229 for (i, arrow_field) in schema.fields().iter().enumerate() {
230 let field_type = FieldType::from_arrow(arrow_field.data_type()).ok_or_else(|| {
231 RowError::UnsupportedType {
232 name: arrow_field.name().clone(),
233 data_type: arrow_field.data_type().clone(),
234 }
235 })?;
236
237 let align = field_type.alignment();
238 current_offset = (current_offset + align - 1) & !(align - 1);
239
240 fields.push(FieldLayout {
241 field_type,
242 offset: current_offset,
243 size: field_type.inline_size(),
244 null_bit: i,
245 is_variable: field_type.is_variable(),
246 });
247
248 current_offset += field_type.inline_size();
249 }
250
251 Ok(Self {
252 fields,
253 arrow_schema: Arc::clone(schema),
254 null_bitmap_size: padded_bitmap,
255 fixed_region_offset,
256 min_row_size: current_offset,
257 })
258 }
259
260 #[must_use]
262 pub const fn header_size() -> usize {
263 HEADER_SIZE
264 }
265
266 #[must_use]
268 pub fn null_bitmap_size(&self) -> usize {
269 self.null_bitmap_size
270 }
271
272 #[must_use]
274 pub fn fixed_region_offset(&self) -> usize {
275 self.fixed_region_offset
276 }
277
278 #[must_use]
280 pub fn min_row_size(&self) -> usize {
281 self.min_row_size
282 }
283
284 #[must_use]
286 pub fn field_count(&self) -> usize {
287 self.fields.len()
288 }
289
290 #[must_use]
296 pub fn field(&self, idx: usize) -> &FieldLayout {
297 &self.fields[idx]
298 }
299
300 #[must_use]
302 pub fn fields(&self) -> &[FieldLayout] {
303 &self.fields
304 }
305
306 #[must_use]
308 pub fn arrow_schema(&self) -> &SchemaRef {
309 &self.arrow_schema
310 }
311}
312
/// Read-only view over an encoded row.
///
/// Borrows both the row bytes and the schema describing them; it owns nothing.
#[derive(Debug)]
pub struct EventRow<'a> {
    /// Encoded row bytes: header, null bitmap, inline slots, then any
    /// variable-length payloads.
    data: &'a [u8],
    /// Layout used to locate fields inside `data`.
    schema: &'a RowSchema,
}
322
323#[allow(clippy::missing_panics_doc)]
324impl<'a> EventRow<'a> {
325 #[inline]
331 #[must_use]
332 pub fn new(data: &'a [u8], schema: &'a RowSchema) -> Self {
333 debug_assert!(
334 data.len() >= schema.min_row_size(),
335 "data too short: {} < {}",
336 data.len(),
337 schema.min_row_size()
338 );
339 Self { data, schema }
340 }
341
342 #[inline]
344 #[must_use]
345 pub fn data(&self) -> &'a [u8] {
346 self.data
347 }
348
349 #[inline]
351 #[must_use]
352 pub fn schema(&self) -> &'a RowSchema {
353 self.schema
354 }
355
356 #[inline]
360 #[must_use]
361 pub fn is_null(&self, field_idx: usize) -> bool {
362 let bit = self.schema.fields[field_idx].null_bit;
363 let byte_idx = HEADER_SIZE + bit / 8;
364 let bit_idx = bit % 8;
365 (self.data[byte_idx] & (1 << bit_idx)) != 0
366 }
367
368 #[inline]
370 #[must_use]
371 pub fn get_bool(&self, field_idx: usize) -> bool {
372 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Bool);
373 let offset = self.schema.fields[field_idx].offset;
374 self.data[offset] != 0
375 }
376
377 #[inline]
379 #[must_use]
380 pub fn get_i8(&self, field_idx: usize) -> i8 {
381 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int8);
382 let offset = self.schema.fields[field_idx].offset;
383 i8::from_le_bytes([self.data[offset]])
384 }
385
386 #[inline]
388 #[must_use]
389 pub fn get_i16(&self, field_idx: usize) -> i16 {
390 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int16);
391 let offset = self.schema.fields[field_idx].offset;
392 i16::from_le_bytes([self.data[offset], self.data[offset + 1]])
393 }
394
395 #[inline]
397 #[must_use]
398 pub fn get_i32(&self, field_idx: usize) -> i32 {
399 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int32);
400 let offset = self.schema.fields[field_idx].offset;
401 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().unwrap();
402 i32::from_le_bytes(bytes)
403 }
404
405 #[inline]
407 #[must_use]
408 pub fn get_i64(&self, field_idx: usize) -> i64 {
409 let ft = self.schema.fields[field_idx].field_type;
410 debug_assert!(
411 ft == FieldType::Int64 || ft == FieldType::TimestampMicros,
412 "expected Int64 or TimestampMicros, got {ft:?}"
413 );
414 let offset = self.schema.fields[field_idx].offset;
415 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().unwrap();
416 i64::from_le_bytes(bytes)
417 }
418
419 #[inline]
421 #[must_use]
422 pub fn get_u8(&self, field_idx: usize) -> u8 {
423 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt8);
424 self.data[self.schema.fields[field_idx].offset]
425 }
426
427 #[inline]
429 #[must_use]
430 pub fn get_u16(&self, field_idx: usize) -> u16 {
431 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt16);
432 let offset = self.schema.fields[field_idx].offset;
433 u16::from_le_bytes([self.data[offset], self.data[offset + 1]])
434 }
435
436 #[inline]
438 #[must_use]
439 pub fn get_u32(&self, field_idx: usize) -> u32 {
440 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt32);
441 let offset = self.schema.fields[field_idx].offset;
442 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().unwrap();
443 u32::from_le_bytes(bytes)
444 }
445
446 #[inline]
448 #[must_use]
449 pub fn get_u64(&self, field_idx: usize) -> u64 {
450 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt64);
451 let offset = self.schema.fields[field_idx].offset;
452 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().unwrap();
453 u64::from_le_bytes(bytes)
454 }
455
456 #[inline]
458 #[must_use]
459 pub fn get_f32(&self, field_idx: usize) -> f32 {
460 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float32);
461 let offset = self.schema.fields[field_idx].offset;
462 let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().unwrap();
463 f32::from_le_bytes(bytes)
464 }
465
466 #[inline]
468 #[must_use]
469 pub fn get_f64(&self, field_idx: usize) -> f64 {
470 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float64);
471 let offset = self.schema.fields[field_idx].offset;
472 let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().unwrap();
473 f64::from_le_bytes(bytes)
474 }
475
476 #[inline]
478 #[must_use]
479 pub fn get_str(&self, field_idx: usize) -> &'a str {
480 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Utf8);
481 let offset = self.schema.fields[field_idx].offset;
482 let (var_off, var_len) = read_var_descriptor(self.data, offset);
483 if var_len == 0 {
484 return "";
485 }
486 std::str::from_utf8(&self.data[var_off..var_off + var_len]).unwrap_or("<invalid-utf8>")
489 }
490
491 #[inline]
493 #[must_use]
494 pub fn get_bytes(&self, field_idx: usize) -> &'a [u8] {
495 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Binary);
496 let offset = self.schema.fields[field_idx].offset;
497 let (var_off, var_len) = read_var_descriptor(self.data, offset);
498 if var_len == 0 {
499 return &[];
500 }
501 &self.data[var_off..var_off + var_len]
502 }
503
504 #[inline]
506 #[must_use]
507 pub fn field_ptr(&self, field_idx: usize) -> *const u8 {
508 let offset = self.schema.fields[field_idx].offset;
509 self.data[offset..].as_ptr()
510 }
511
512 #[inline]
514 #[must_use]
515 pub fn timestamp(&self) -> i64 {
516 self.get_i64(0)
517 }
518}
519
/// Decodes the variable-length descriptor stored at `offset` in `data`.
///
/// The descriptor is two consecutive little-endian `u32` values: the payload's
/// byte offset followed by its length. Returns `(offset, length)` widened to
/// `usize`.
///
/// Panics if `data` does not contain the 8 descriptor bytes at `offset`.
#[inline]
fn read_var_descriptor(data: &[u8], offset: usize) -> (usize, usize) {
    // Reads one little-endian u32 starting at `at`.
    let word_at = |at: usize| -> usize {
        let raw: [u8; 4] = data[at..at + 4].try_into().unwrap();
        u32::from_le_bytes(raw) as usize
    };

    let payload_offset = word_at(offset);
    let payload_len = word_at(offset + 4);
    (payload_offset, payload_len)
}
530
/// Writable row under construction.
///
/// Fixed-width values are written in place; variable-length payloads are
/// appended after the fixed region.
#[derive(Debug)]
pub struct MutableEventRow<'a> {
    /// The whole row buffer: fixed region plus the reserved variable-length
    /// capacity.
    data: &'a mut [u8],
    /// Layout used to locate fields inside `data`.
    schema: &'a RowSchema,
    /// Offset at which the next variable-length payload will be written; also
    /// the length of the row exposed by `as_event_row` and `freeze`.
    var_offset: usize,
}
541
542impl<'a> MutableEventRow<'a> {
543 #[allow(clippy::cast_possible_truncation)]
548 pub fn new_in(arena: &'a Bump, schema: &'a RowSchema, var_capacity: usize) -> Self {
549 let total = schema.min_row_size() + var_capacity;
550 let data: &'a mut [u8] = arena.alloc_slice_fill_default(total);
551
552 let field_count = schema.field_count() as u16;
554 data[0..2].copy_from_slice(&field_count.to_le_bytes());
555 let var_start = schema.min_row_size() as u32;
557 data[4..8].copy_from_slice(&var_start.to_le_bytes());
558
559 Self {
560 data,
561 schema,
562 var_offset: schema.min_row_size(),
563 }
564 }
565
566 #[inline]
570 pub fn set_null(&mut self, field_idx: usize, is_null: bool) {
571 let bit = self.schema.fields[field_idx].null_bit;
572 let byte_idx = HEADER_SIZE + bit / 8;
573 let bit_idx = bit % 8;
574 if is_null {
575 self.data[byte_idx] |= 1 << bit_idx;
576 } else {
577 self.data[byte_idx] &= !(1 << bit_idx);
578 }
579 }
580
581 #[inline]
583 pub fn set_bool(&mut self, field_idx: usize, value: bool) {
584 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Bool);
585 let offset = self.schema.fields[field_idx].offset;
586 self.data[offset] = u8::from(value);
587 }
588
589 #[inline]
591 pub fn set_i8(&mut self, field_idx: usize, value: i8) {
592 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int8);
593 let offset = self.schema.fields[field_idx].offset;
594 self.data[offset..=offset].copy_from_slice(&value.to_le_bytes());
595 }
596
597 #[inline]
599 pub fn set_i16(&mut self, field_idx: usize, value: i16) {
600 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int16);
601 let offset = self.schema.fields[field_idx].offset;
602 self.data[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
603 }
604
605 #[inline]
607 pub fn set_i32(&mut self, field_idx: usize, value: i32) {
608 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int32);
609 let offset = self.schema.fields[field_idx].offset;
610 self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
611 }
612
613 #[inline]
615 pub fn set_i64(&mut self, field_idx: usize, value: i64) {
616 let ft = self.schema.fields[field_idx].field_type;
617 debug_assert!(
618 ft == FieldType::Int64 || ft == FieldType::TimestampMicros,
619 "expected Int64 or TimestampMicros, got {ft:?}"
620 );
621 let offset = self.schema.fields[field_idx].offset;
622 self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
623 }
624
625 #[inline]
627 pub fn set_u8(&mut self, field_idx: usize, value: u8) {
628 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt8);
629 self.data[self.schema.fields[field_idx].offset] = value;
630 }
631
632 #[inline]
634 pub fn set_u16(&mut self, field_idx: usize, value: u16) {
635 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt16);
636 let offset = self.schema.fields[field_idx].offset;
637 self.data[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
638 }
639
640 #[inline]
642 pub fn set_u32(&mut self, field_idx: usize, value: u32) {
643 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt32);
644 let offset = self.schema.fields[field_idx].offset;
645 self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
646 }
647
648 #[inline]
650 pub fn set_u64(&mut self, field_idx: usize, value: u64) {
651 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt64);
652 let offset = self.schema.fields[field_idx].offset;
653 self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
654 }
655
656 #[inline]
658 pub fn set_f32(&mut self, field_idx: usize, value: f32) {
659 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float32);
660 let offset = self.schema.fields[field_idx].offset;
661 self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
662 }
663
664 #[inline]
666 pub fn set_f64(&mut self, field_idx: usize, value: f64) {
667 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float64);
668 let offset = self.schema.fields[field_idx].offset;
669 self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
670 }
671
672 #[inline]
681 #[allow(clippy::cast_possible_truncation)]
682 pub fn set_str(&mut self, field_idx: usize, value: &str) {
683 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Utf8);
684 self.write_variable(field_idx, value.as_bytes());
685 }
686
687 #[inline]
696 #[allow(clippy::cast_possible_truncation)]
697 pub fn set_bytes(&mut self, field_idx: usize, value: &[u8]) {
698 debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Binary);
699 self.write_variable(field_idx, value);
700 }
701
702 #[inline]
706 #[must_use]
707 pub fn as_event_row(&self) -> EventRow<'_> {
708 EventRow {
709 data: &self.data[..self.var_offset],
710 schema: self.schema,
711 }
712 }
713
714 #[inline]
716 #[must_use]
717 pub fn freeze(self) -> EventRow<'a> {
718 let len = self.var_offset;
719 let data: &'a [u8] = self.data;
720 EventRow {
721 data: &data[..len],
722 schema: self.schema,
723 }
724 }
725
726 #[inline]
728 #[allow(clippy::cast_possible_truncation)]
729 fn write_variable(&mut self, field_idx: usize, value: &[u8]) {
730 let start = self.var_offset;
731 let len = value.len();
732 self.data[start..start + len].copy_from_slice(value);
733 self.var_offset += len;
734
735 let field_offset = self.schema.fields[field_idx].offset;
736 self.data[field_offset..field_offset + 4].copy_from_slice(&(start as u32).to_le_bytes());
737 self.data[field_offset + 4..field_offset + 8].copy_from_slice(&(len as u32).to_le_bytes());
738 }
739}
740
// Unit tests for schema layout and row encode/decode round trips.
//
// `float_cmp` / `approx_constant`: exact float round trips and the literal
// `2.718` are intentional. The cast lints cover the `i32 -> u8` pattern used
// to build the large binary blob.
#[cfg(test)]
#[allow(
    clippy::float_cmp,
    clippy::approx_constant,
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss
)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use std::sync::Arc;

    /// Builds an Arrow schema from `(name, type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Builds a schema of `count` non-nullable `Int32` columns named `f0..`.
    ///
    /// The names are owned locally, so nothing is leaked to obtain `&str`s.
    fn int32_schema(count: usize) -> SchemaRef {
        let names: Vec<String> = (0..count).map(|i| format!("f{i}")).collect();
        make_schema(
            names
                .iter()
                .map(|name| (name.as_str(), DataType::Int32, false))
                .collect(),
        )
    }

    #[test]
    fn schema_from_arrow_basic() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Float64, true),
            ("c", DataType::Boolean, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        assert_eq!(rs.field_count(), 3);
        assert_eq!(rs.field(0).field_type, FieldType::Int64);
        assert_eq!(rs.field(1).field_type, FieldType::Float64);
        assert_eq!(rs.field(2).field_type, FieldType::Bool);
    }

    #[test]
    fn schema_from_arrow_all_types() {
        let schema = make_schema(vec![
            ("f0", DataType::Boolean, true),
            ("f1", DataType::Int8, false),
            ("f2", DataType::Int16, false),
            ("f3", DataType::Int32, false),
            ("f4", DataType::Int64, false),
            ("f5", DataType::UInt8, false),
            ("f6", DataType::UInt16, false),
            ("f7", DataType::UInt32, false),
            ("f8", DataType::UInt64, false),
            ("f9", DataType::Float32, false),
            ("f10", DataType::Float64, false),
            (
                "f11",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            ("f12", DataType::Utf8, true),
            ("f13", DataType::Binary, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        assert_eq!(rs.field_count(), 14);
        assert_eq!(rs.field(11).field_type, FieldType::TimestampMicros);
        assert!(rs.field(12).is_variable);
        assert!(rs.field(13).is_variable);
    }

    #[test]
    fn schema_from_arrow_empty() {
        let schema = make_schema(vec![]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        assert_eq!(rs.field_count(), 0);
        assert_eq!(rs.min_row_size(), HEADER_SIZE);
        assert_eq!(rs.null_bitmap_size(), 0);
    }

    #[test]
    fn schema_from_arrow_unsupported() {
        let schema = make_schema(vec![(
            "bad",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            false,
        )]);
        let err = RowSchema::from_arrow(&schema).unwrap_err();
        assert!(err.to_string().contains("unsupported"));
    }

    #[test]
    fn schema_null_bitmap_sizes() {
        // A single field still needs a full 8-byte bitmap word.
        let s1 = make_schema(vec![("a", DataType::Int64, false)]);
        assert_eq!(RowSchema::from_arrow(&s1).unwrap().null_bitmap_size(), 8);

        // (field count, expected padded bitmap size in bytes)
        for (count, expected) in [(8, 8), (9, 8), (65, 16)] {
            let rs = RowSchema::from_arrow(&int32_schema(count)).unwrap();
            assert_eq!(
                rs.null_bitmap_size(),
                expected,
                "wrong bitmap size for {count} fields"
            );
        }
    }

    #[test]
    fn schema_field_alignment() {
        let schema = make_schema(vec![
            ("flag", DataType::Boolean, false),
            ("ts", DataType::Int64, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let bool_off = rs.field(0).offset;
        let i64_off = rs.field(1).offset;
        // The first field needs no padding, so it opens the fixed region.
        assert_eq!(bool_off, rs.fixed_region_offset());
        assert_eq!(i64_off % 8, 0);
        assert!(i64_off > bool_off);
    }

    #[test]
    fn schema_mixed_fixed_variable() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, true),
            ("score", DataType::Float64, false),
            ("data", DataType::Binary, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        assert_eq!(rs.field_count(), 4);
        assert!(!rs.field(0).is_variable);
        assert!(rs.field(1).is_variable);
        assert_eq!(rs.field(1).size, 8);
        assert!(!rs.field(2).is_variable);
        assert!(rs.field(3).is_variable);
    }

    #[test]
    fn schema_var_region_offset_header() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        assert!(rs.min_row_size() > rs.fixed_region_offset());
    }

    #[test]
    fn roundtrip_bool() {
        let schema = make_schema(vec![("flag", DataType::Boolean, false)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_bool(0, true);
        let row = row.freeze();
        assert!(row.get_bool(0));
    }

    #[test]
    fn roundtrip_integers() {
        let schema = make_schema(vec![
            ("a", DataType::Int8, false),
            ("b", DataType::Int16, false),
            ("c", DataType::Int32, false),
            ("d", DataType::Int64, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i8(0, -42);
        row.set_i16(1, -1000);
        row.set_i32(2, 100_000);
        row.set_i64(3, i64::MAX);
        let row = row.freeze();
        assert_eq!(row.get_i8(0), -42);
        assert_eq!(row.get_i16(1), -1000);
        assert_eq!(row.get_i32(2), 100_000);
        assert_eq!(row.get_i64(3), i64::MAX);
    }

    #[test]
    fn roundtrip_unsigned() {
        let schema = make_schema(vec![
            ("a", DataType::UInt8, false),
            ("b", DataType::UInt16, false),
            ("c", DataType::UInt32, false),
            ("d", DataType::UInt64, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_u8(0, 255);
        row.set_u16(1, 60_000);
        row.set_u32(2, u32::MAX);
        row.set_u64(3, u64::MAX);
        let row = row.freeze();
        assert_eq!(row.get_u8(0), 255);
        assert_eq!(row.get_u16(1), 60_000);
        assert_eq!(row.get_u32(2), u32::MAX);
        assert_eq!(row.get_u64(3), u64::MAX);
    }

    #[test]
    fn roundtrip_floats() {
        let schema = make_schema(vec![
            ("a", DataType::Float32, false),
            ("b", DataType::Float64, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_f32(0, std::f32::consts::PI);
        row.set_f64(1, std::f64::consts::E);
        let row = row.freeze();
        assert!((row.get_f32(0) - std::f32::consts::PI).abs() < f32::EPSILON);
        assert!((row.get_f64(1) - std::f64::consts::E).abs() < f64::EPSILON);
    }

    #[test]
    fn roundtrip_timestamp() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        )]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        let ts = 1_706_000_000_000_000_i64;
        row.set_i64(0, ts);
        let row = row.freeze();
        assert_eq!(row.get_i64(0), ts);
        assert_eq!(row.timestamp(), ts);
    }

    #[test]
    fn roundtrip_str() {
        let schema = make_schema(vec![("name", DataType::Utf8, true)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 256);
        row.set_str(0, "hello world");
        let row = row.freeze();
        assert_eq!(row.get_str(0), "hello world");
    }

    #[test]
    fn roundtrip_bytes() {
        let schema = make_schema(vec![("data", DataType::Binary, true)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 256);
        row.set_bytes(0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        let row = row.freeze();
        assert_eq!(row.get_bytes(0), &[0xDE, 0xAD, 0xBE, 0xEF]);
    }

    #[test]
    fn roundtrip_all_types() {
        let schema = make_schema(vec![
            (
                "ts",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            ("flag", DataType::Boolean, false),
            ("i8", DataType::Int8, false),
            ("i16", DataType::Int16, false),
            ("i32", DataType::Int32, false),
            ("i64", DataType::Int64, false),
            ("u8", DataType::UInt8, false),
            ("u16", DataType::UInt16, false),
            ("u32", DataType::UInt32, false),
            ("u64", DataType::UInt64, false),
            ("f32", DataType::Float32, false),
            ("f64", DataType::Float64, false),
            ("name", DataType::Utf8, true),
            ("data", DataType::Binary, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 512);

        row.set_i64(0, 999_999);
        row.set_bool(1, true);
        row.set_i8(2, -1);
        row.set_i16(3, 1000);
        row.set_i32(4, -50_000);
        row.set_i64(5, 42);
        row.set_u8(6, 200);
        row.set_u16(7, 50_000);
        row.set_u32(8, 3_000_000);
        row.set_u64(9, 18_000_000_000);
        row.set_f32(10, 1.5);
        row.set_f64(11, 2.718);
        row.set_str(12, "test");
        row.set_bytes(13, &[1, 2, 3]);

        let row = row.freeze();

        assert_eq!(row.timestamp(), 999_999);
        assert!(row.get_bool(1));
        assert_eq!(row.get_i8(2), -1);
        assert_eq!(row.get_i16(3), 1000);
        assert_eq!(row.get_i32(4), -50_000);
        assert_eq!(row.get_i64(5), 42);
        assert_eq!(row.get_u8(6), 200);
        assert_eq!(row.get_u16(7), 50_000);
        assert_eq!(row.get_u32(8), 3_000_000);
        assert_eq!(row.get_u64(9), 18_000_000_000);
        assert!((row.get_f32(10) - 1.5).abs() < f32::EPSILON);
        assert!((row.get_f64(11) - 2.718).abs() < f64::EPSILON);
        assert_eq!(row.get_str(12), "test");
        assert_eq!(row.get_bytes(13), &[1, 2, 3]);
    }

    #[test]
    fn null_bitmap_set_and_check() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Float64, true),
            ("c", DataType::Utf8, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 64);

        // A fresh row has every null bit cleared.
        let view = row.as_event_row();
        assert!(!view.is_null(0));
        assert!(!view.is_null(1));
        assert!(!view.is_null(2));

        row.set_null(1, true);
        let view = row.as_event_row();
        assert!(!view.is_null(0));
        assert!(view.is_null(1));
        assert!(!view.is_null(2));

        // Clearing the bit again must be reflected in a new view.
        row.set_null(1, false);
        let view = row.as_event_row();
        assert!(!view.is_null(1));
    }

    #[test]
    fn all_null_row() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Float64, true),
            ("c", DataType::Utf8, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 64);

        for i in 0..rs.field_count() {
            row.set_null(i, true);
        }
        let row = row.freeze();
        for i in 0..rs.field_count() {
            assert!(row.is_null(i));
        }
    }

    #[test]
    fn freeze_returns_correct_length() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 256);
        row.set_i64(0, 42);
        row.set_str(1, "hello");
        let frozen = row.freeze();
        // Fixed region plus the 5 payload bytes; unused capacity is dropped.
        assert_eq!(frozen.data().len(), rs.min_row_size() + 5);
    }

    #[test]
    fn as_event_row_borrow() {
        let schema = make_schema(vec![("x", DataType::Int32, false)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i32(0, 77);
        assert_eq!(row.as_event_row().get_i32(0), 77);
        row.set_i32(0, 88);
        assert_eq!(row.as_event_row().get_i32(0), 88);
    }

    #[test]
    fn field_ptr_access() {
        let schema = make_schema(vec![("val", DataType::Int64, false)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, 12345);
        let frozen = row.freeze();
        let ptr = frozen.field_ptr(0);
        // SAFETY: `ptr` points at the 8-byte inline slot of an Int64 field
        // inside the arena-backed row, which outlives this slice.
        let bytes = unsafe { std::slice::from_raw_parts(ptr, 8) };
        let val = i64::from_le_bytes(bytes.try_into().unwrap());
        assert_eq!(val, 12345);
    }

    #[test]
    fn timestamp_convenience() {
        let schema = make_schema(vec![
            ("ts", DataType::Int64, false),
            ("val", DataType::Float64, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, 42_000);
        row.set_f64(1, 1.0);
        let row = row.freeze();
        assert_eq!(row.timestamp(), 42_000);
    }

    #[test]
    fn empty_string_roundtrip() {
        let schema = make_schema(vec![("s", DataType::Utf8, true)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 64);
        row.set_str(0, "");
        let row = row.freeze();
        assert_eq!(row.get_str(0), "");
    }

    #[test]
    fn large_binary_roundtrip() {
        let schema = make_schema(vec![("data", DataType::Binary, true)]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let blob: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        let mut row = MutableEventRow::new_in(&arena, &rs, blob.len());
        row.set_bytes(0, &blob);
        let row = row.freeze();
        assert_eq!(row.get_bytes(0), &blob[..]);
    }

    #[test]
    fn multiple_variable_fields() {
        let schema = make_schema(vec![
            ("a", DataType::Utf8, true),
            ("b", DataType::Binary, true),
            ("c", DataType::Utf8, true),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 512);
        row.set_str(0, "first");
        row.set_bytes(1, &[10, 20, 30]);
        row.set_str(2, "third");
        let row = row.freeze();
        assert_eq!(row.get_str(0), "first");
        assert_eq!(row.get_bytes(1), &[10, 20, 30]);
        assert_eq!(row.get_str(2), "third");
    }

    #[test]
    fn roundtrip_max_values() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Float64, false),
            ("c", DataType::UInt64, false),
        ]);
        let rs = RowSchema::from_arrow(&schema).unwrap();
        let arena = Bump::new();
        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
        row.set_i64(0, i64::MIN);
        row.set_f64(1, f64::MAX);
        row.set_u64(2, u64::MAX);
        let row = row.freeze();
        assert_eq!(row.get_i64(0), i64::MIN);
        assert_eq!(row.get_f64(1), f64::MAX);
        assert_eq!(row.get_u64(2), u64::MAX);
    }

    #[test]
    fn field_type_from_arrow_all() {
        let supported = [
            (DataType::Boolean, FieldType::Bool),
            (DataType::Int8, FieldType::Int8),
            (DataType::Int16, FieldType::Int16),
            (DataType::Int32, FieldType::Int32),
            (DataType::Int64, FieldType::Int64),
            (DataType::UInt8, FieldType::UInt8),
            (DataType::UInt16, FieldType::UInt16),
            (DataType::UInt32, FieldType::UInt32),
            (DataType::UInt64, FieldType::UInt64),
            (DataType::Float32, FieldType::Float32),
            (DataType::Float64, FieldType::Float64),
            (
                DataType::Timestamp(TimeUnit::Microsecond, None),
                FieldType::TimestampMicros,
            ),
            // A time zone does not affect the mapping.
            (
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                FieldType::TimestampMicros,
            ),
            (DataType::Utf8, FieldType::Utf8),
            (DataType::Binary, FieldType::Binary),
        ];
        for (arrow, expected) in &supported {
            assert_eq!(
                FieldType::from_arrow(arrow),
                Some(*expected),
                "wrong mapping for {arrow:?}"
            );
        }

        let unsupported = [
            DataType::Float16,
            DataType::LargeUtf8,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
        ];
        for arrow in &unsupported {
            assert_eq!(
                FieldType::from_arrow(arrow),
                None,
                "{arrow:?} should be unsupported"
            );
        }
    }

    #[test]
    fn field_type_to_arrow_roundtrip() {
        let types = [
            FieldType::Bool,
            FieldType::Int8,
            FieldType::Int16,
            FieldType::Int32,
            FieldType::Int64,
            FieldType::UInt8,
            FieldType::UInt16,
            FieldType::UInt32,
            FieldType::UInt64,
            FieldType::Float32,
            FieldType::Float64,
            FieldType::TimestampMicros,
            FieldType::Utf8,
            FieldType::Binary,
        ];
        for ft in types {
            let arrow_dt = ft.to_arrow();
            let back = FieldType::from_arrow(&arrow_dt).unwrap();
            assert_eq!(ft, back, "roundtrip failed for {ft:?}");
        }
    }
}