Skip to main content

laminar_core/
serialization.rs

//! Shared Arrow IPC serialization helpers.
//!
//! Centralizes `RecordBatch` ↔ bytes conversion using the Arrow IPC stream format,
//! eliminating copy-pasted implementations across join operators and backends.
5
6use arrow_array::RecordBatch;
7use arrow_ipc::reader::StreamReader;
8use arrow_ipc::writer::StreamWriter;
9
10/// Serializes a single [`RecordBatch`] to Arrow IPC stream bytes.
11///
12/// # Errors
13///
14/// Returns [`arrow_schema::ArrowError`] if IPC encoding fails.
15pub fn serialize_batch_stream(batch: &RecordBatch) -> Result<Vec<u8>, arrow_schema::ArrowError> {
16    let mut buf = Vec::new();
17    {
18        let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
19        writer.write(batch)?;
20        writer.finish()?;
21    }
22    Ok(buf)
23}
24
25/// Deserializes a single [`RecordBatch`] from Arrow IPC stream bytes.
26///
27/// # Errors
28///
29/// Returns [`arrow_schema::ArrowError`] if the bytes are invalid or contain no batches.
30pub fn deserialize_batch_stream(bytes: &[u8]) -> Result<RecordBatch, arrow_schema::ArrowError> {
31    let cursor = std::io::Cursor::new(bytes);
32    let mut reader = StreamReader::try_new(cursor, None)?;
33    reader.next().ok_or_else(|| {
34        arrow_schema::ArrowError::IpcError("no record batch in IPC stream".to_string())
35    })?
36}