Skip to main content

laminar_core/serialization/
mod.rs

1//! Shared serialization helpers.
2//!
3//! - Arrow IPC: `RecordBatch` ↔ bytes conversion using the Arrow IPC stream format.
4//! - `jsonb_tags`: Canonical JSONB binary format type tag constants.
5
6/// Canonical JSONB binary format type tag constants.
7pub mod jsonb_tags;
8
9use arrow_array::RecordBatch;
10use arrow_ipc::reader::StreamReader;
11use arrow_ipc::writer::StreamWriter;
12
13/// Serializes a single [`RecordBatch`] to Arrow IPC stream bytes.
14///
15/// # Errors
16///
17/// Returns [`arrow_schema::ArrowError`] if IPC encoding fails.
18pub fn serialize_batch_stream(batch: &RecordBatch) -> Result<Vec<u8>, arrow_schema::ArrowError> {
19    let mut buf = Vec::new();
20    {
21        let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
22        writer.write(batch)?;
23        writer.finish()?;
24    }
25    Ok(buf)
26}
27
28/// Deserializes a single [`RecordBatch`] from Arrow IPC stream bytes.
29///
30/// # Errors
31///
32/// Returns [`arrow_schema::ArrowError`] if the bytes are invalid or contain no batches.
33pub fn deserialize_batch_stream(bytes: &[u8]) -> Result<RecordBatch, arrow_schema::ArrowError> {
34    let cursor = std::io::Cursor::new(bytes);
35    let mut reader = StreamReader::try_new(cursor, None)?;
36    reader.next().ok_or_else(|| {
37        arrow_schema::ArrowError::IpcError("no record batch in IPC stream".to_string())
38    })?
39}