Skip to main content

laminar_core/compiler/
row.rs

1//! Fixed-layout row format for compiled Ring 0 event processing.
2//!
3//! [`EventRow`] provides a row-oriented, fixed-layout in-memory format that compiled
4//! expressions operate on via direct pointer arithmetic. Unlike Arrow's columnar
5//! `RecordBatch` (optimal for batch processing in Ring 1), `EventRow` gives compiled
6//! Ring 0 pipelines direct pointer access to fields without column indirection.
7//!
8//! # Memory Layout
9//!
10//! ```text
11//! [header: 8 bytes][null_bitmap: padded to 8B][fixed region][variable data region]
12//! ```
13//!
14//! - **Header** (8 bytes): `[field_count: u16][flags: u16][var_region_offset: u32]`
15//! - **Null bitmap**: `ceil(n_fields / 8)` bytes, padded to 8-byte alignment
16//! - **Fixed region**: Fields in declaration order, naturally aligned
17//! - **Variable data region**: Actual bytes for `Utf8`/`Binary` fields
18//!
19//! # Example
20//!
21//! ```
22//! use std::sync::Arc;
23//! use arrow_schema::{DataType, Field, Schema};
24//! use bumpalo::Bump;
25//! use laminar_core::compiler::row::{RowSchema, MutableEventRow};
26//!
27//! let schema = Arc::new(Schema::new(vec![
28//!     Field::new("ts", DataType::Int64, false),
29//!     Field::new("value", DataType::Float64, true),
30//!     Field::new("name", DataType::Utf8, true),
31//! ]));
32//! let row_schema = RowSchema::from_arrow(&schema).unwrap();
33//!
34//! let arena = Bump::new();
35//! let mut row = MutableEventRow::new_in(&arena, &row_schema, 256);
36//! row.set_i64(0, 1_000_000);
37//! row.set_f64(1, 3.14);
38//! row.set_str(2, "hello");
39//!
40//! let row = row.freeze();
41//! assert_eq!(row.get_i64(0), 1_000_000);
42//! assert!((row.get_f64(1) - 3.14).abs() < f64::EPSILON);
43//! assert_eq!(row.get_str(2), "hello");
44//! ```
45
46use std::sync::Arc;
47
48use arrow_schema::{DataType, SchemaRef, TimeUnit};
49use bumpalo::Bump;
50
/// Byte length of the row header, laid out as
/// `[field_count: u16][flags: u16][var_region_offset: u32]`.
const HEADER_SIZE: usize = 2 * std::mem::size_of::<u16>() + std::mem::size_of::<u32>();
53
/// Scalar types that the compiled row format can store and access.
///
/// Every variant is a plain tag, so the enum is `Copy` and derives `Hash` in
/// addition to equality, which lets it be used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FieldType {
    /// Boolean (1 byte inline).
    Bool,
    /// Signed 8-bit integer.
    Int8,
    /// Signed 16-bit integer.
    Int16,
    /// Signed 32-bit integer.
    Int32,
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 8-bit integer.
    UInt8,
    /// Unsigned 16-bit integer.
    UInt16,
    /// Unsigned 32-bit integer.
    UInt32,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 32-bit floating point.
    Float32,
    /// 64-bit floating point.
    Float64,
    /// Timestamp in microseconds since epoch (stored as `i64`).
    TimestampMicros,
    /// Variable-length UTF-8 string (8-byte inline: `u32` offset + `u32` length).
    Utf8,
    /// Variable-length binary data (8-byte inline: `u32` offset + `u32` length).
    Binary,
}
86
87impl FieldType {
88    /// Converts an Arrow [`DataType`] to a [`FieldType`].
89    ///
90    /// Returns `None` for unsupported types.
91    #[must_use]
92    pub fn from_arrow(dt: &DataType) -> Option<Self> {
93        match dt {
94            DataType::Boolean => Some(Self::Bool),
95            DataType::Int8 => Some(Self::Int8),
96            DataType::Int16 => Some(Self::Int16),
97            DataType::Int32 => Some(Self::Int32),
98            DataType::Int64 => Some(Self::Int64),
99            DataType::UInt8 => Some(Self::UInt8),
100            DataType::UInt16 => Some(Self::UInt16),
101            DataType::UInt32 => Some(Self::UInt32),
102            DataType::UInt64 => Some(Self::UInt64),
103            DataType::Float32 => Some(Self::Float32),
104            DataType::Float64 => Some(Self::Float64),
105            DataType::Timestamp(TimeUnit::Microsecond, _) => Some(Self::TimestampMicros),
106            DataType::Utf8 => Some(Self::Utf8),
107            DataType::Binary => Some(Self::Binary),
108            _ => None,
109        }
110    }
111
112    /// Converts back to the corresponding Arrow [`DataType`].
113    #[must_use]
114    pub fn to_arrow(self) -> DataType {
115        match self {
116            Self::Bool => DataType::Boolean,
117            Self::Int8 => DataType::Int8,
118            Self::Int16 => DataType::Int16,
119            Self::Int32 => DataType::Int32,
120            Self::Int64 => DataType::Int64,
121            Self::UInt8 => DataType::UInt8,
122            Self::UInt16 => DataType::UInt16,
123            Self::UInt32 => DataType::UInt32,
124            Self::UInt64 => DataType::UInt64,
125            Self::Float32 => DataType::Float32,
126            Self::Float64 => DataType::Float64,
127            Self::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
128            Self::Utf8 => DataType::Utf8,
129            Self::Binary => DataType::Binary,
130        }
131    }
132
133    /// Returns the inline byte size for this type.
134    ///
135    /// Fixed types return their native size. Variable-length types ([`Utf8`](Self::Utf8),
136    /// [`Binary`](Self::Binary)) return 8 (a `u32` offset + `u32` length pair).
137    #[must_use]
138    pub const fn inline_size(self) -> usize {
139        match self {
140            Self::Bool | Self::Int8 | Self::UInt8 => 1,
141            Self::Int16 | Self::UInt16 => 2,
142            Self::Int32 | Self::UInt32 | Self::Float32 => 4,
143            Self::Int64
144            | Self::UInt64
145            | Self::Float64
146            | Self::TimestampMicros
147            | Self::Utf8
148            | Self::Binary => 8,
149        }
150    }
151
152    /// Returns the natural alignment requirement in bytes.
153    #[must_use]
154    pub const fn alignment(self) -> usize {
155        match self {
156            Self::Bool | Self::Int8 | Self::UInt8 => 1,
157            Self::Int16 | Self::UInt16 => 2,
158            // Inline slot for Utf8/Binary contains two u32 values, so 4-byte alignment suffices.
159            Self::Int32 | Self::UInt32 | Self::Float32 | Self::Utf8 | Self::Binary => 4,
160            Self::Int64 | Self::UInt64 | Self::Float64 | Self::TimestampMicros => 8,
161        }
162    }
163
164    /// Returns `true` if this type stores data in the variable region.
165    #[must_use]
166    pub const fn is_variable(self) -> bool {
167        matches!(self, Self::Utf8 | Self::Binary)
168    }
169}
170
171/// Per-field layout descriptor: absolute byte offset, size, null bitmap position.
172#[derive(Debug, Clone, Copy)]
173pub struct FieldLayout {
174    /// The scalar type of this field.
175    pub field_type: FieldType,
176    /// Absolute byte offset from the row start to this field's inline data.
177    pub offset: usize,
178    /// Inline size in bytes.
179    pub size: usize,
180    /// Bit index in the null bitmap (0-based).
181    pub null_bit: usize,
182    /// Whether this field stores data in the variable region.
183    pub is_variable: bool,
184}
185
186/// Errors from row schema construction.
187#[derive(Debug, thiserror::Error)]
188pub enum RowError {
189    /// An Arrow field uses a data type not supported by the row format.
190    #[error("unsupported Arrow data type for field '{name}': {data_type}")]
191    UnsupportedType {
192        /// Name of the unsupported field.
193        name: String,
194        /// The unsupported Arrow data type.
195        data_type: DataType,
196    },
197}
198
199/// Pre-computed row layout derived from an Arrow schema.
200///
201/// Converts an Arrow [`SchemaRef`] into a fixed layout suitable for compiled
202/// field access via pointer arithmetic. The layout is computed once and reused
203/// across all rows sharing the same schema.
204#[derive(Debug, Clone)]
205pub struct RowSchema {
206    fields: Vec<FieldLayout>,
207    arrow_schema: SchemaRef,
208    null_bitmap_size: usize,
209    fixed_region_offset: usize,
210    min_row_size: usize,
211}
212
213impl RowSchema {
214    /// Creates a [`RowSchema`] from an Arrow schema.
215    ///
216    /// # Errors
217    ///
218    /// Returns [`RowError::UnsupportedType`] if any field has an unsupported [`DataType`].
219    pub fn from_arrow(schema: &SchemaRef) -> Result<Self, RowError> {
220        let field_count = schema.fields().len();
221        let null_bitmap_bytes = field_count.div_ceil(8);
222        // Pad null bitmap to 8-byte alignment so the fixed region starts aligned.
223        let padded_bitmap = (null_bitmap_bytes + 7) & !7;
224        let fixed_region_offset = HEADER_SIZE + padded_bitmap;
225
226        let mut fields = Vec::with_capacity(field_count);
227        let mut current_offset = fixed_region_offset;
228
229        for (i, arrow_field) in schema.fields().iter().enumerate() {
230            let field_type = FieldType::from_arrow(arrow_field.data_type()).ok_or_else(|| {
231                RowError::UnsupportedType {
232                    name: arrow_field.name().clone(),
233                    data_type: arrow_field.data_type().clone(),
234                }
235            })?;
236
237            let align = field_type.alignment();
238            current_offset = (current_offset + align - 1) & !(align - 1);
239
240            fields.push(FieldLayout {
241                field_type,
242                offset: current_offset,
243                size: field_type.inline_size(),
244                null_bit: i,
245                is_variable: field_type.is_variable(),
246            });
247
248            current_offset += field_type.inline_size();
249        }
250
251        Ok(Self {
252            fields,
253            arrow_schema: Arc::clone(schema),
254            null_bitmap_size: padded_bitmap,
255            fixed_region_offset,
256            min_row_size: current_offset,
257        })
258    }
259
260    /// Returns the header size in bytes (always 8).
261    #[must_use]
262    pub const fn header_size() -> usize {
263        HEADER_SIZE
264    }
265
266    /// Returns the padded null bitmap size in bytes.
267    #[must_use]
268    pub fn null_bitmap_size(&self) -> usize {
269        self.null_bitmap_size
270    }
271
272    /// Returns the byte offset where the fixed data region begins.
273    #[must_use]
274    pub fn fixed_region_offset(&self) -> usize {
275        self.fixed_region_offset
276    }
277
278    /// Returns the minimum row size in bytes (header + bitmap + fixed region, no variable data).
279    #[must_use]
280    pub fn min_row_size(&self) -> usize {
281        self.min_row_size
282    }
283
284    /// Returns the number of fields in the schema.
285    #[must_use]
286    pub fn field_count(&self) -> usize {
287        self.fields.len()
288    }
289
290    /// Returns the layout for the field at the given index.
291    ///
292    /// # Panics
293    ///
294    /// Panics if `idx >= field_count()`.
295    #[must_use]
296    pub fn field(&self, idx: usize) -> &FieldLayout {
297        &self.fields[idx]
298    }
299
300    /// Returns all field layouts.
301    #[must_use]
302    pub fn fields(&self) -> &[FieldLayout] {
303        &self.fields
304    }
305
306    /// Returns the Arrow schema this row layout was derived from.
307    #[must_use]
308    pub fn arrow_schema(&self) -> &SchemaRef {
309        &self.arrow_schema
310    }
311}
312
313/// Zero-copy, read-only accessor for a fixed-layout event row.
314///
315/// All field accessors are `#[inline]` for compiled-code performance.
316/// Type correctness is validated with `debug_assert` (zero overhead in release builds).
317#[derive(Debug)]
318pub struct EventRow<'a> {
319    data: &'a [u8],
320    schema: &'a RowSchema,
321}
322
323#[allow(clippy::missing_panics_doc)]
324impl<'a> EventRow<'a> {
325    /// Creates an [`EventRow`] from a byte slice and schema.
326    ///
327    /// # Panics
328    ///
329    /// Debug-asserts that `data.len() >= schema.min_row_size()`.
330    #[inline]
331    #[must_use]
332    pub fn new(data: &'a [u8], schema: &'a RowSchema) -> Self {
333        debug_assert!(
334            data.len() >= schema.min_row_size(),
335            "data too short: {} < {}",
336            data.len(),
337            schema.min_row_size()
338        );
339        Self { data, schema }
340    }
341
342    /// Returns the underlying byte slice.
343    #[inline]
344    #[must_use]
345    pub fn data(&self) -> &'a [u8] {
346        self.data
347    }
348
349    /// Returns the schema.
350    #[inline]
351    #[must_use]
352    pub fn schema(&self) -> &'a RowSchema {
353        self.schema
354    }
355
356    /// Returns `true` if the field at `field_idx` is null.
357    ///
358    /// Null bitmap convention: bit = 1 means null, bit = 0 means valid.
359    #[inline]
360    #[must_use]
361    pub fn is_null(&self, field_idx: usize) -> bool {
362        let bit = self.schema.fields[field_idx].null_bit;
363        let byte_idx = HEADER_SIZE + bit / 8;
364        let bit_idx = bit % 8;
365        (self.data[byte_idx] & (1 << bit_idx)) != 0
366    }
367
368    /// Reads a `bool` field.
369    #[inline]
370    #[must_use]
371    pub fn get_bool(&self, field_idx: usize) -> bool {
372        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Bool);
373        let offset = self.schema.fields[field_idx].offset;
374        self.data[offset] != 0
375    }
376
377    /// Reads an `i8` field.
378    #[inline]
379    #[must_use]
380    pub fn get_i8(&self, field_idx: usize) -> i8 {
381        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int8);
382        let offset = self.schema.fields[field_idx].offset;
383        i8::from_le_bytes([self.data[offset]])
384    }
385
386    /// Reads an `i16` field.
387    #[inline]
388    #[must_use]
389    pub fn get_i16(&self, field_idx: usize) -> i16 {
390        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int16);
391        let offset = self.schema.fields[field_idx].offset;
392        i16::from_le_bytes([self.data[offset], self.data[offset + 1]])
393    }
394
395    /// Reads an `i32` field.
396    #[inline]
397    #[must_use]
398    pub fn get_i32(&self, field_idx: usize) -> i32 {
399        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int32);
400        let offset = self.schema.fields[field_idx].offset;
401        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().unwrap();
402        i32::from_le_bytes(bytes)
403    }
404
405    /// Reads an `i64` field. Also accepts [`FieldType::TimestampMicros`].
406    #[inline]
407    #[must_use]
408    pub fn get_i64(&self, field_idx: usize) -> i64 {
409        let ft = self.schema.fields[field_idx].field_type;
410        debug_assert!(
411            ft == FieldType::Int64 || ft == FieldType::TimestampMicros,
412            "expected Int64 or TimestampMicros, got {ft:?}"
413        );
414        let offset = self.schema.fields[field_idx].offset;
415        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().unwrap();
416        i64::from_le_bytes(bytes)
417    }
418
419    /// Reads a `u8` field.
420    #[inline]
421    #[must_use]
422    pub fn get_u8(&self, field_idx: usize) -> u8 {
423        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt8);
424        self.data[self.schema.fields[field_idx].offset]
425    }
426
427    /// Reads a `u16` field.
428    #[inline]
429    #[must_use]
430    pub fn get_u16(&self, field_idx: usize) -> u16 {
431        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt16);
432        let offset = self.schema.fields[field_idx].offset;
433        u16::from_le_bytes([self.data[offset], self.data[offset + 1]])
434    }
435
436    /// Reads a `u32` field.
437    #[inline]
438    #[must_use]
439    pub fn get_u32(&self, field_idx: usize) -> u32 {
440        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt32);
441        let offset = self.schema.fields[field_idx].offset;
442        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().unwrap();
443        u32::from_le_bytes(bytes)
444    }
445
446    /// Reads a `u64` field.
447    #[inline]
448    #[must_use]
449    pub fn get_u64(&self, field_idx: usize) -> u64 {
450        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt64);
451        let offset = self.schema.fields[field_idx].offset;
452        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().unwrap();
453        u64::from_le_bytes(bytes)
454    }
455
456    /// Reads an `f32` field.
457    #[inline]
458    #[must_use]
459    pub fn get_f32(&self, field_idx: usize) -> f32 {
460        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float32);
461        let offset = self.schema.fields[field_idx].offset;
462        let bytes: [u8; 4] = self.data[offset..offset + 4].try_into().unwrap();
463        f32::from_le_bytes(bytes)
464    }
465
466    /// Reads an `f64` field.
467    #[inline]
468    #[must_use]
469    pub fn get_f64(&self, field_idx: usize) -> f64 {
470        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float64);
471        let offset = self.schema.fields[field_idx].offset;
472        let bytes: [u8; 8] = self.data[offset..offset + 8].try_into().unwrap();
473        f64::from_le_bytes(bytes)
474    }
475
476    /// Reads a variable-length UTF-8 string field.
477    #[inline]
478    #[must_use]
479    pub fn get_str(&self, field_idx: usize) -> &'a str {
480        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Utf8);
481        let offset = self.schema.fields[field_idx].offset;
482        let (var_off, var_len) = read_var_descriptor(self.data, offset);
483        if var_len == 0 {
484            return "";
485        }
486        // SAFETY: Data should be valid UTF-8 by construction. If it isn't,
487        // return a replacement rather than panicking on the hot path.
488        std::str::from_utf8(&self.data[var_off..var_off + var_len]).unwrap_or("<invalid-utf8>")
489    }
490
491    /// Reads a variable-length binary field.
492    #[inline]
493    #[must_use]
494    pub fn get_bytes(&self, field_idx: usize) -> &'a [u8] {
495        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Binary);
496        let offset = self.schema.fields[field_idx].offset;
497        let (var_off, var_len) = read_var_descriptor(self.data, offset);
498        if var_len == 0 {
499            return &[];
500        }
501        &self.data[var_off..var_off + var_len]
502    }
503
504    /// Returns a raw pointer to the field's inline data for JIT code.
505    #[inline]
506    #[must_use]
507    pub fn field_ptr(&self, field_idx: usize) -> *const u8 {
508        let offset = self.schema.fields[field_idx].offset;
509        self.data[offset..].as_ptr()
510    }
511
512    /// Convenience: reads field 0 as `i64` (convention: field 0 is event time).
513    #[inline]
514    #[must_use]
515    pub fn timestamp(&self) -> i64 {
516        self.get_i64(0)
517    }
518}
519
/// Decodes the variable-region descriptor stored at `offset` in `data`.
///
/// The descriptor is two consecutive little-endian `u32` values; they are
/// returned as `(offset, length)`. Panics if fewer than 8 bytes are available
/// at `offset`.
#[inline]
fn read_var_descriptor(data: &[u8], offset: usize) -> (usize, usize) {
    // Reads one little-endian `u32` word starting at byte `at`.
    let word = |at: usize| -> usize {
        let raw: [u8; 4] = data[at..at + 4].try_into().unwrap();
        u32::from_le_bytes(raw) as usize
    };
    let var_offset = word(offset);
    let var_len = word(offset + 4);
    (var_offset, var_len)
}
530
531/// Arena-allocated, mutable writer for constructing an [`EventRow`].
532///
533/// The backing buffer is allocated from a [`bumpalo::Bump`] arena, ensuring
534/// zero heap allocations on the hot path.
535#[derive(Debug)]
536pub struct MutableEventRow<'a> {
537    data: &'a mut [u8],
538    schema: &'a RowSchema,
539    var_offset: usize,
540}
541
542impl<'a> MutableEventRow<'a> {
543    /// Allocates a new row from the given arena.
544    ///
545    /// `var_capacity` is the number of extra bytes reserved for variable-length data
546    /// (strings, binary). The total allocation is `schema.min_row_size() + var_capacity`.
547    #[allow(clippy::cast_possible_truncation)]
548    pub fn new_in(arena: &'a Bump, schema: &'a RowSchema, var_capacity: usize) -> Self {
549        let total = schema.min_row_size() + var_capacity;
550        let data: &'a mut [u8] = arena.alloc_slice_fill_default(total);
551
552        // Write header: [field_count: u16][flags: u16][var_region_offset: u32]
553        let field_count = schema.field_count() as u16;
554        data[0..2].copy_from_slice(&field_count.to_le_bytes());
555        // flags = 0 (already zeroed)
556        let var_start = schema.min_row_size() as u32;
557        data[4..8].copy_from_slice(&var_start.to_le_bytes());
558
559        Self {
560            data,
561            schema,
562            var_offset: schema.min_row_size(),
563        }
564    }
565
566    /// Sets or clears the null flag for a field.
567    ///
568    /// When `is_null` is `true`, the field is marked as null in the bitmap.
569    #[inline]
570    pub fn set_null(&mut self, field_idx: usize, is_null: bool) {
571        let bit = self.schema.fields[field_idx].null_bit;
572        let byte_idx = HEADER_SIZE + bit / 8;
573        let bit_idx = bit % 8;
574        if is_null {
575            self.data[byte_idx] |= 1 << bit_idx;
576        } else {
577            self.data[byte_idx] &= !(1 << bit_idx);
578        }
579    }
580
581    /// Writes a `bool` field.
582    #[inline]
583    pub fn set_bool(&mut self, field_idx: usize, value: bool) {
584        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Bool);
585        let offset = self.schema.fields[field_idx].offset;
586        self.data[offset] = u8::from(value);
587    }
588
589    /// Writes an `i8` field.
590    #[inline]
591    pub fn set_i8(&mut self, field_idx: usize, value: i8) {
592        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int8);
593        let offset = self.schema.fields[field_idx].offset;
594        self.data[offset..=offset].copy_from_slice(&value.to_le_bytes());
595    }
596
597    /// Writes an `i16` field.
598    #[inline]
599    pub fn set_i16(&mut self, field_idx: usize, value: i16) {
600        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int16);
601        let offset = self.schema.fields[field_idx].offset;
602        self.data[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
603    }
604
605    /// Writes an `i32` field.
606    #[inline]
607    pub fn set_i32(&mut self, field_idx: usize, value: i32) {
608        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Int32);
609        let offset = self.schema.fields[field_idx].offset;
610        self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
611    }
612
613    /// Writes an `i64` field. Also accepts [`FieldType::TimestampMicros`].
614    #[inline]
615    pub fn set_i64(&mut self, field_idx: usize, value: i64) {
616        let ft = self.schema.fields[field_idx].field_type;
617        debug_assert!(
618            ft == FieldType::Int64 || ft == FieldType::TimestampMicros,
619            "expected Int64 or TimestampMicros, got {ft:?}"
620        );
621        let offset = self.schema.fields[field_idx].offset;
622        self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
623    }
624
625    /// Writes a `u8` field.
626    #[inline]
627    pub fn set_u8(&mut self, field_idx: usize, value: u8) {
628        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt8);
629        self.data[self.schema.fields[field_idx].offset] = value;
630    }
631
632    /// Writes a `u16` field.
633    #[inline]
634    pub fn set_u16(&mut self, field_idx: usize, value: u16) {
635        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt16);
636        let offset = self.schema.fields[field_idx].offset;
637        self.data[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
638    }
639
640    /// Writes a `u32` field.
641    #[inline]
642    pub fn set_u32(&mut self, field_idx: usize, value: u32) {
643        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt32);
644        let offset = self.schema.fields[field_idx].offset;
645        self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
646    }
647
648    /// Writes a `u64` field.
649    #[inline]
650    pub fn set_u64(&mut self, field_idx: usize, value: u64) {
651        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::UInt64);
652        let offset = self.schema.fields[field_idx].offset;
653        self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
654    }
655
656    /// Writes an `f32` field.
657    #[inline]
658    pub fn set_f32(&mut self, field_idx: usize, value: f32) {
659        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float32);
660        let offset = self.schema.fields[field_idx].offset;
661        self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
662    }
663
664    /// Writes an `f64` field.
665    #[inline]
666    pub fn set_f64(&mut self, field_idx: usize, value: f64) {
667        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Float64);
668        let offset = self.schema.fields[field_idx].offset;
669        self.data[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
670    }
671
672    /// Writes a variable-length UTF-8 string field.
673    ///
674    /// Appends the string bytes to the variable region and writes the
675    /// `(offset, length)` descriptor pair in the field's inline slot.
676    ///
677    /// # Panics
678    ///
679    /// Panics if the variable region overflows the allocated capacity.
680    #[inline]
681    #[allow(clippy::cast_possible_truncation)]
682    pub fn set_str(&mut self, field_idx: usize, value: &str) {
683        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Utf8);
684        self.write_variable(field_idx, value.as_bytes());
685    }
686
687    /// Writes a variable-length binary field.
688    ///
689    /// Appends the bytes to the variable region and writes the
690    /// `(offset, length)` descriptor pair in the field's inline slot.
691    ///
692    /// # Panics
693    ///
694    /// Panics if the variable region overflows the allocated capacity.
695    #[inline]
696    #[allow(clippy::cast_possible_truncation)]
697    pub fn set_bytes(&mut self, field_idx: usize, value: &[u8]) {
698        debug_assert_eq!(self.schema.fields[field_idx].field_type, FieldType::Binary);
699        self.write_variable(field_idx, value);
700    }
701
702    /// Borrows the current row state as a read-only [`EventRow`].
703    ///
704    /// The returned row's lifetime is tied to the borrow of `self`.
705    #[inline]
706    #[must_use]
707    pub fn as_event_row(&self) -> EventRow<'_> {
708        EventRow {
709            data: &self.data[..self.var_offset],
710            schema: self.schema,
711        }
712    }
713
714    /// Consumes the mutable row, returning a read-only [`EventRow`] with the arena lifetime.
715    #[inline]
716    #[must_use]
717    pub fn freeze(self) -> EventRow<'a> {
718        let len = self.var_offset;
719        let data: &'a [u8] = self.data;
720        EventRow {
721            data: &data[..len],
722            schema: self.schema,
723        }
724    }
725
726    /// Internal: appends variable-length bytes and writes the inline descriptor.
727    #[inline]
728    #[allow(clippy::cast_possible_truncation)]
729    fn write_variable(&mut self, field_idx: usize, value: &[u8]) {
730        let start = self.var_offset;
731        let len = value.len();
732        self.data[start..start + len].copy_from_slice(value);
733        self.var_offset += len;
734
735        let field_offset = self.schema.fields[field_idx].offset;
736        self.data[field_offset..field_offset + 4].copy_from_slice(&(start as u32).to_le_bytes());
737        self.data[field_offset + 4..field_offset + 8].copy_from_slice(&(len as u32).to_le_bytes());
738    }
739}
740
741#[cfg(test)]
742#[allow(
743    clippy::float_cmp,
744    clippy::approx_constant,
745    clippy::modulo_one,
746    clippy::cast_possible_truncation,
747    clippy::cast_sign_loss
748)]
749mod tests {
750    use super::*;
751    use arrow_schema::{DataType, Field, Schema, TimeUnit};
752    use std::sync::Arc;
753
754    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
755        Arc::new(Schema::new(
756            fields
757                .into_iter()
758                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
759                .collect::<Vec<_>>(),
760        ))
761    }
762
763    // ---- RowSchema tests ----
764
765    #[test]
766    fn schema_from_arrow_basic() {
767        let schema = make_schema(vec![
768            ("a", DataType::Int64, false),
769            ("b", DataType::Float64, true),
770            ("c", DataType::Boolean, false),
771        ]);
772        let rs = RowSchema::from_arrow(&schema).unwrap();
773        assert_eq!(rs.field_count(), 3);
774        assert_eq!(rs.field(0).field_type, FieldType::Int64);
775        assert_eq!(rs.field(1).field_type, FieldType::Float64);
776        assert_eq!(rs.field(2).field_type, FieldType::Bool);
777    }
778
779    #[test]
780    fn schema_from_arrow_all_types() {
781        let schema = make_schema(vec![
782            ("f0", DataType::Boolean, true),
783            ("f1", DataType::Int8, false),
784            ("f2", DataType::Int16, false),
785            ("f3", DataType::Int32, false),
786            ("f4", DataType::Int64, false),
787            ("f5", DataType::UInt8, false),
788            ("f6", DataType::UInt16, false),
789            ("f7", DataType::UInt32, false),
790            ("f8", DataType::UInt64, false),
791            ("f9", DataType::Float32, false),
792            ("f10", DataType::Float64, false),
793            (
794                "f11",
795                DataType::Timestamp(TimeUnit::Microsecond, None),
796                false,
797            ),
798            ("f12", DataType::Utf8, true),
799            ("f13", DataType::Binary, true),
800        ]);
801        let rs = RowSchema::from_arrow(&schema).unwrap();
802        assert_eq!(rs.field_count(), 14);
803        assert_eq!(rs.field(11).field_type, FieldType::TimestampMicros);
804        assert!(rs.field(12).is_variable);
805        assert!(rs.field(13).is_variable);
806    }
807
808    #[test]
809    fn schema_from_arrow_empty() {
810        let schema = make_schema(vec![]);
811        let rs = RowSchema::from_arrow(&schema).unwrap();
812        assert_eq!(rs.field_count(), 0);
813        assert_eq!(rs.min_row_size(), HEADER_SIZE);
814        assert_eq!(rs.null_bitmap_size(), 0);
815    }
816
817    #[test]
818    fn schema_from_arrow_unsupported() {
819        let schema = make_schema(vec![(
820            "bad",
821            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
822            false,
823        )]);
824        let err = RowSchema::from_arrow(&schema).unwrap_err();
825        assert!(err.to_string().contains("unsupported"));
826    }
827
828    #[test]
829    fn schema_null_bitmap_sizes() {
830        // 1 field: bitmap = 1 byte, padded to 8
831        let s1 = make_schema(vec![("a", DataType::Int64, false)]);
832        assert_eq!(RowSchema::from_arrow(&s1).unwrap().null_bitmap_size(), 8);
833
834        // 8 fields: bitmap = 1 byte, padded to 8
835        let s8 = make_schema(
836            (0..8)
837                .map(|i| {
838                    (
839                        Box::leak(format!("f{i}").into_boxed_str()) as &str,
840                        DataType::Int32,
841                        false,
842                    )
843                })
844                .collect(),
845        );
846        assert_eq!(RowSchema::from_arrow(&s8).unwrap().null_bitmap_size(), 8);
847
848        // 9 fields: bitmap = 2 bytes, padded to 8
849        let s9 = make_schema(
850            (0..9)
851                .map(|i| {
852                    (
853                        Box::leak(format!("f{i}").into_boxed_str()) as &str,
854                        DataType::Int32,
855                        false,
856                    )
857                })
858                .collect(),
859        );
860        assert_eq!(RowSchema::from_arrow(&s9).unwrap().null_bitmap_size(), 8);
861
862        // 65 fields: bitmap = 9 bytes, padded to 16
863        let s65 = make_schema(
864            (0..65)
865                .map(|i| {
866                    (
867                        Box::leak(format!("f{i}").into_boxed_str()) as &str,
868                        DataType::Int32,
869                        false,
870                    )
871                })
872                .collect(),
873        );
874        assert_eq!(RowSchema::from_arrow(&s65).unwrap().null_bitmap_size(), 16);
875    }
876
877    #[test]
878    fn schema_field_alignment() {
879        // Bool (1B align) then Int64 (8B align) — should be padded.
880        let schema = make_schema(vec![
881            ("flag", DataType::Boolean, false),
882            ("ts", DataType::Int64, false),
883        ]);
884        let rs = RowSchema::from_arrow(&schema).unwrap();
885        // Bool at fixed_region_offset (16), size 1.
886        // Int64 at next 8-byte boundary.
887        let bool_off = rs.field(0).offset;
888        let i64_off = rs.field(1).offset;
889        assert_eq!(bool_off % 1, 0); // trivially aligned
890        assert_eq!(i64_off % 8, 0); // 8-byte aligned
891        assert!(i64_off > bool_off);
892    }
893
894    #[test]
895    fn schema_mixed_fixed_variable() {
896        let schema = make_schema(vec![
897            ("id", DataType::Int64, false),
898            ("name", DataType::Utf8, true),
899            ("score", DataType::Float64, false),
900            ("data", DataType::Binary, true),
901        ]);
902        let rs = RowSchema::from_arrow(&schema).unwrap();
903        assert_eq!(rs.field_count(), 4);
904        assert!(!rs.field(0).is_variable);
905        assert!(rs.field(1).is_variable);
906        assert_eq!(rs.field(1).size, 8); // inline descriptor
907        assert!(!rs.field(2).is_variable);
908        assert!(rs.field(3).is_variable);
909    }
910
911    #[test]
912    fn schema_var_region_offset_header() {
913        let schema = make_schema(vec![
914            ("a", DataType::Int64, false),
915            ("b", DataType::Utf8, true),
916        ]);
917        let rs = RowSchema::from_arrow(&schema).unwrap();
918        // min_row_size is where variable data would begin.
919        assert!(rs.min_row_size() > rs.fixed_region_offset());
920    }
921
922    // ---- EventRow / MutableEventRow roundtrip tests ----
923
924    #[test]
925    fn roundtrip_bool() {
926        let schema = make_schema(vec![("flag", DataType::Boolean, false)]);
927        let rs = RowSchema::from_arrow(&schema).unwrap();
928        let arena = Bump::new();
929        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
930        row.set_bool(0, true);
931        let row = row.freeze();
932        assert!(row.get_bool(0));
933    }
934
935    #[test]
936    fn roundtrip_integers() {
937        let schema = make_schema(vec![
938            ("a", DataType::Int8, false),
939            ("b", DataType::Int16, false),
940            ("c", DataType::Int32, false),
941            ("d", DataType::Int64, false),
942        ]);
943        let rs = RowSchema::from_arrow(&schema).unwrap();
944        let arena = Bump::new();
945        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
946        row.set_i8(0, -42);
947        row.set_i16(1, -1000);
948        row.set_i32(2, 100_000);
949        row.set_i64(3, i64::MAX);
950        let row = row.freeze();
951        assert_eq!(row.get_i8(0), -42);
952        assert_eq!(row.get_i16(1), -1000);
953        assert_eq!(row.get_i32(2), 100_000);
954        assert_eq!(row.get_i64(3), i64::MAX);
955    }
956
957    #[test]
958    fn roundtrip_unsigned() {
959        let schema = make_schema(vec![
960            ("a", DataType::UInt8, false),
961            ("b", DataType::UInt16, false),
962            ("c", DataType::UInt32, false),
963            ("d", DataType::UInt64, false),
964        ]);
965        let rs = RowSchema::from_arrow(&schema).unwrap();
966        let arena = Bump::new();
967        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
968        row.set_u8(0, 255);
969        row.set_u16(1, 60_000);
970        row.set_u32(2, u32::MAX);
971        row.set_u64(3, u64::MAX);
972        let row = row.freeze();
973        assert_eq!(row.get_u8(0), 255);
974        assert_eq!(row.get_u16(1), 60_000);
975        assert_eq!(row.get_u32(2), u32::MAX);
976        assert_eq!(row.get_u64(3), u64::MAX);
977    }
978
979    #[test]
980    fn roundtrip_floats() {
981        let schema = make_schema(vec![
982            ("a", DataType::Float32, false),
983            ("b", DataType::Float64, false),
984        ]);
985        let rs = RowSchema::from_arrow(&schema).unwrap();
986        let arena = Bump::new();
987        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
988        row.set_f32(0, std::f32::consts::PI);
989        row.set_f64(1, std::f64::consts::E);
990        let row = row.freeze();
991        assert!((row.get_f32(0) - std::f32::consts::PI).abs() < f32::EPSILON);
992        assert!((row.get_f64(1) - std::f64::consts::E).abs() < f64::EPSILON);
993    }
994
995    #[test]
996    fn roundtrip_timestamp() {
997        let schema = make_schema(vec![(
998            "ts",
999            DataType::Timestamp(TimeUnit::Microsecond, None),
1000            false,
1001        )]);
1002        let rs = RowSchema::from_arrow(&schema).unwrap();
1003        let arena = Bump::new();
1004        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
1005        let ts = 1_706_000_000_000_000_i64;
1006        row.set_i64(0, ts);
1007        let row = row.freeze();
1008        assert_eq!(row.get_i64(0), ts);
1009        assert_eq!(row.timestamp(), ts);
1010    }
1011
1012    #[test]
1013    fn roundtrip_str() {
1014        let schema = make_schema(vec![("name", DataType::Utf8, true)]);
1015        let rs = RowSchema::from_arrow(&schema).unwrap();
1016        let arena = Bump::new();
1017        let mut row = MutableEventRow::new_in(&arena, &rs, 256);
1018        row.set_str(0, "hello world");
1019        let row = row.freeze();
1020        assert_eq!(row.get_str(0), "hello world");
1021    }
1022
1023    #[test]
1024    fn roundtrip_bytes() {
1025        let schema = make_schema(vec![("data", DataType::Binary, true)]);
1026        let rs = RowSchema::from_arrow(&schema).unwrap();
1027        let arena = Bump::new();
1028        let mut row = MutableEventRow::new_in(&arena, &rs, 256);
1029        row.set_bytes(0, &[0xDE, 0xAD, 0xBE, 0xEF]);
1030        let row = row.freeze();
1031        assert_eq!(row.get_bytes(0), &[0xDE, 0xAD, 0xBE, 0xEF]);
1032    }
1033
1034    #[test]
1035    fn roundtrip_all_types() {
1036        let schema = make_schema(vec![
1037            (
1038                "ts",
1039                DataType::Timestamp(TimeUnit::Microsecond, None),
1040                false,
1041            ),
1042            ("flag", DataType::Boolean, false),
1043            ("i8", DataType::Int8, false),
1044            ("i16", DataType::Int16, false),
1045            ("i32", DataType::Int32, false),
1046            ("i64", DataType::Int64, false),
1047            ("u8", DataType::UInt8, false),
1048            ("u16", DataType::UInt16, false),
1049            ("u32", DataType::UInt32, false),
1050            ("u64", DataType::UInt64, false),
1051            ("f32", DataType::Float32, false),
1052            ("f64", DataType::Float64, false),
1053            ("name", DataType::Utf8, true),
1054            ("data", DataType::Binary, true),
1055        ]);
1056        let rs = RowSchema::from_arrow(&schema).unwrap();
1057        let arena = Bump::new();
1058        let mut row = MutableEventRow::new_in(&arena, &rs, 512);
1059
1060        row.set_i64(0, 999_999);
1061        row.set_bool(1, true);
1062        row.set_i8(2, -1);
1063        row.set_i16(3, 1000);
1064        row.set_i32(4, -50_000);
1065        row.set_i64(5, 42);
1066        row.set_u8(6, 200);
1067        row.set_u16(7, 50_000);
1068        row.set_u32(8, 3_000_000);
1069        row.set_u64(9, 18_000_000_000);
1070        row.set_f32(10, 1.5);
1071        row.set_f64(11, 2.718);
1072        row.set_str(12, "test");
1073        row.set_bytes(13, &[1, 2, 3]);
1074
1075        let row = row.freeze();
1076
1077        assert_eq!(row.timestamp(), 999_999);
1078        assert!(row.get_bool(1));
1079        assert_eq!(row.get_i8(2), -1);
1080        assert_eq!(row.get_i16(3), 1000);
1081        assert_eq!(row.get_i32(4), -50_000);
1082        assert_eq!(row.get_i64(5), 42);
1083        assert_eq!(row.get_u8(6), 200);
1084        assert_eq!(row.get_u16(7), 50_000);
1085        assert_eq!(row.get_u32(8), 3_000_000);
1086        assert_eq!(row.get_u64(9), 18_000_000_000);
1087        assert!((row.get_f32(10) - 1.5).abs() < f32::EPSILON);
1088        assert!((row.get_f64(11) - 2.718).abs() < f64::EPSILON);
1089        assert_eq!(row.get_str(12), "test");
1090        assert_eq!(row.get_bytes(13), &[1, 2, 3]);
1091    }
1092
1093    #[test]
1094    fn null_bitmap_set_and_check() {
1095        let schema = make_schema(vec![
1096            ("a", DataType::Int64, true),
1097            ("b", DataType::Float64, true),
1098            ("c", DataType::Utf8, true),
1099        ]);
1100        let rs = RowSchema::from_arrow(&schema).unwrap();
1101        let arena = Bump::new();
1102        let mut row = MutableEventRow::new_in(&arena, &rs, 64);
1103
1104        // Initially all valid (not null).
1105        let view = row.as_event_row();
1106        assert!(!view.is_null(0));
1107        assert!(!view.is_null(1));
1108        assert!(!view.is_null(2));
1109
1110        row.set_null(1, true);
1111        let view = row.as_event_row();
1112        assert!(!view.is_null(0));
1113        assert!(view.is_null(1));
1114        assert!(!view.is_null(2));
1115
1116        // Clear it again.
1117        row.set_null(1, false);
1118        let view = row.as_event_row();
1119        assert!(!view.is_null(1));
1120    }
1121
1122    #[test]
1123    fn all_null_row() {
1124        let schema = make_schema(vec![
1125            ("a", DataType::Int64, true),
1126            ("b", DataType::Float64, true),
1127            ("c", DataType::Utf8, true),
1128        ]);
1129        let rs = RowSchema::from_arrow(&schema).unwrap();
1130        let arena = Bump::new();
1131        let mut row = MutableEventRow::new_in(&arena, &rs, 64);
1132
1133        for i in 0..rs.field_count() {
1134            row.set_null(i, true);
1135        }
1136        let row = row.freeze();
1137        for i in 0..rs.field_count() {
1138            assert!(row.is_null(i));
1139        }
1140    }
1141
1142    #[test]
1143    fn freeze_returns_correct_length() {
1144        let schema = make_schema(vec![
1145            ("id", DataType::Int64, false),
1146            ("name", DataType::Utf8, true),
1147        ]);
1148        let rs = RowSchema::from_arrow(&schema).unwrap();
1149        let arena = Bump::new();
1150        let mut row = MutableEventRow::new_in(&arena, &rs, 256);
1151        row.set_i64(0, 42);
1152        row.set_str(1, "hello");
1153        let frozen = row.freeze();
1154        // Data length = min_row_size + 5 ("hello")
1155        assert_eq!(frozen.data().len(), rs.min_row_size() + 5);
1156    }
1157
1158    #[test]
1159    fn as_event_row_borrow() {
1160        let schema = make_schema(vec![("x", DataType::Int32, false)]);
1161        let rs = RowSchema::from_arrow(&schema).unwrap();
1162        let arena = Bump::new();
1163        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
1164        row.set_i32(0, 77);
1165        // Borrow as EventRow, then continue mutating.
1166        assert_eq!(row.as_event_row().get_i32(0), 77);
1167        row.set_i32(0, 88);
1168        assert_eq!(row.as_event_row().get_i32(0), 88);
1169    }
1170
1171    #[test]
1172    fn field_ptr_access() {
1173        let schema = make_schema(vec![("val", DataType::Int64, false)]);
1174        let rs = RowSchema::from_arrow(&schema).unwrap();
1175        let arena = Bump::new();
1176        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
1177        row.set_i64(0, 12345);
1178        let frozen = row.freeze();
1179        let ptr = frozen.field_ptr(0);
1180        // Read i64 from raw pointer.
1181        let bytes = unsafe { std::slice::from_raw_parts(ptr, 8) };
1182        let val = i64::from_le_bytes(bytes.try_into().unwrap());
1183        assert_eq!(val, 12345);
1184    }
1185
1186    #[test]
1187    fn timestamp_convenience() {
1188        let schema = make_schema(vec![
1189            ("ts", DataType::Int64, false),
1190            ("val", DataType::Float64, false),
1191        ]);
1192        let rs = RowSchema::from_arrow(&schema).unwrap();
1193        let arena = Bump::new();
1194        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
1195        row.set_i64(0, 42_000);
1196        row.set_f64(1, 1.0);
1197        let row = row.freeze();
1198        assert_eq!(row.timestamp(), 42_000);
1199    }
1200
1201    #[test]
1202    fn empty_string_roundtrip() {
1203        let schema = make_schema(vec![("s", DataType::Utf8, true)]);
1204        let rs = RowSchema::from_arrow(&schema).unwrap();
1205        let arena = Bump::new();
1206        let mut row = MutableEventRow::new_in(&arena, &rs, 64);
1207        row.set_str(0, "");
1208        let row = row.freeze();
1209        assert_eq!(row.get_str(0), "");
1210    }
1211
1212    #[test]
1213    fn large_binary_roundtrip() {
1214        let schema = make_schema(vec![("data", DataType::Binary, true)]);
1215        let rs = RowSchema::from_arrow(&schema).unwrap();
1216        let arena = Bump::new();
1217        let blob: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
1218        let mut row = MutableEventRow::new_in(&arena, &rs, blob.len());
1219        row.set_bytes(0, &blob);
1220        let row = row.freeze();
1221        assert_eq!(row.get_bytes(0), &blob[..]);
1222    }
1223
1224    #[test]
1225    fn multiple_variable_fields() {
1226        let schema = make_schema(vec![
1227            ("a", DataType::Utf8, true),
1228            ("b", DataType::Binary, true),
1229            ("c", DataType::Utf8, true),
1230        ]);
1231        let rs = RowSchema::from_arrow(&schema).unwrap();
1232        let arena = Bump::new();
1233        let mut row = MutableEventRow::new_in(&arena, &rs, 512);
1234        row.set_str(0, "first");
1235        row.set_bytes(1, &[10, 20, 30]);
1236        row.set_str(2, "third");
1237        let row = row.freeze();
1238        assert_eq!(row.get_str(0), "first");
1239        assert_eq!(row.get_bytes(1), &[10, 20, 30]);
1240        assert_eq!(row.get_str(2), "third");
1241    }
1242
1243    #[test]
1244    fn roundtrip_max_values() {
1245        let schema = make_schema(vec![
1246            ("a", DataType::Int64, false),
1247            ("b", DataType::Float64, false),
1248            ("c", DataType::UInt64, false),
1249        ]);
1250        let rs = RowSchema::from_arrow(&schema).unwrap();
1251        let arena = Bump::new();
1252        let mut row = MutableEventRow::new_in(&arena, &rs, 0);
1253        row.set_i64(0, i64::MIN);
1254        row.set_f64(1, f64::MAX);
1255        row.set_u64(2, u64::MAX);
1256        let row = row.freeze();
1257        assert_eq!(row.get_i64(0), i64::MIN);
1258        assert_eq!(row.get_f64(1), f64::MAX);
1259        assert_eq!(row.get_u64(2), u64::MAX);
1260    }
1261
1262    #[test]
1263    fn field_type_from_arrow_all() {
1264        assert_eq!(
1265            FieldType::from_arrow(&DataType::Boolean),
1266            Some(FieldType::Bool)
1267        );
1268        assert_eq!(
1269            FieldType::from_arrow(&DataType::Int8),
1270            Some(FieldType::Int8)
1271        );
1272        assert_eq!(
1273            FieldType::from_arrow(&DataType::Int16),
1274            Some(FieldType::Int16)
1275        );
1276        assert_eq!(
1277            FieldType::from_arrow(&DataType::Int32),
1278            Some(FieldType::Int32)
1279        );
1280        assert_eq!(
1281            FieldType::from_arrow(&DataType::Int64),
1282            Some(FieldType::Int64)
1283        );
1284        assert_eq!(
1285            FieldType::from_arrow(&DataType::UInt8),
1286            Some(FieldType::UInt8)
1287        );
1288        assert_eq!(
1289            FieldType::from_arrow(&DataType::UInt16),
1290            Some(FieldType::UInt16)
1291        );
1292        assert_eq!(
1293            FieldType::from_arrow(&DataType::UInt32),
1294            Some(FieldType::UInt32)
1295        );
1296        assert_eq!(
1297            FieldType::from_arrow(&DataType::UInt64),
1298            Some(FieldType::UInt64)
1299        );
1300        assert_eq!(
1301            FieldType::from_arrow(&DataType::Float32),
1302            Some(FieldType::Float32)
1303        );
1304        assert_eq!(
1305            FieldType::from_arrow(&DataType::Float64),
1306            Some(FieldType::Float64)
1307        );
1308        assert_eq!(
1309            FieldType::from_arrow(&DataType::Timestamp(TimeUnit::Microsecond, None)),
1310            Some(FieldType::TimestampMicros)
1311        );
1312        assert_eq!(
1313            FieldType::from_arrow(&DataType::Timestamp(
1314                TimeUnit::Microsecond,
1315                Some("UTC".into())
1316            )),
1317            Some(FieldType::TimestampMicros)
1318        );
1319        assert_eq!(
1320            FieldType::from_arrow(&DataType::Utf8),
1321            Some(FieldType::Utf8)
1322        );
1323        assert_eq!(
1324            FieldType::from_arrow(&DataType::Binary),
1325            Some(FieldType::Binary)
1326        );
1327        // Unsupported types
1328        assert_eq!(FieldType::from_arrow(&DataType::Float16), None);
1329        assert_eq!(FieldType::from_arrow(&DataType::LargeUtf8), None);
1330        assert_eq!(
1331            FieldType::from_arrow(&DataType::Timestamp(TimeUnit::Nanosecond, None)),
1332            None
1333        );
1334    }
1335
1336    #[test]
1337    fn field_type_to_arrow_roundtrip() {
1338        let types = [
1339            FieldType::Bool,
1340            FieldType::Int8,
1341            FieldType::Int16,
1342            FieldType::Int32,
1343            FieldType::Int64,
1344            FieldType::UInt8,
1345            FieldType::UInt16,
1346            FieldType::UInt32,
1347            FieldType::UInt64,
1348            FieldType::Float32,
1349            FieldType::Float64,
1350            FieldType::TimestampMicros,
1351            FieldType::Utf8,
1352            FieldType::Binary,
1353        ];
1354        for ft in types {
1355            let arrow_dt = ft.to_arrow();
1356            let back = FieldType::from_arrow(&arrow_dt).unwrap();
1357            assert_eq!(ft, back, "roundtrip failed for {ft:?}");
1358        }
1359    }
1360}