Skip to main content

laminar_core/serialization/
mod.rs

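//! Shared serialization helpers.
//!
//! - Arrow IPC: `RecordBatch` ↔ bytes conversion using the Arrow IPC stream format, either one
//!   self-contained batch at a time or incrementally via [`BatchStreamEncoder`] /
//!   [`BatchStreamDecoder`], which send the schema only once per stream.
//! - `jsonb_tags`: Canonical JSONB binary format type tag constants.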
5
6/// Canonical JSONB binary format type tag constants.
7pub mod jsonb_tags;
8
9use std::sync::Arc;
10
11use arrow::buffer::Buffer;
12use arrow_array::RecordBatch;
13use arrow_ipc::reader::{StreamDecoder, StreamReader};
14use arrow_ipc::writer::StreamWriter;
15use arrow_schema::{ArrowError, Schema, SchemaRef};
16
17/// Serializes a single [`RecordBatch`] to Arrow IPC stream bytes.
18///
19/// # Errors
20///
21/// Returns [`arrow_schema::ArrowError`] if IPC encoding fails.
22pub fn serialize_batch_stream(batch: &RecordBatch) -> Result<Vec<u8>, arrow_schema::ArrowError> {
23    let mut buf = Vec::new();
24    {
25        let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
26        writer.write(batch)?;
27        writer.finish()?;
28    }
29    Ok(buf)
30}
31
32/// Deserializes a single [`RecordBatch`] from Arrow IPC stream bytes.
33///
34/// # Errors
35///
36/// Returns [`arrow_schema::ArrowError`] if the bytes are invalid or contain no batches.
37pub fn deserialize_batch_stream(bytes: &[u8]) -> Result<RecordBatch, arrow_schema::ArrowError> {
38    let cursor = std::io::Cursor::new(bytes);
39    let mut reader = StreamReader::try_new(cursor, None)?;
40    reader.next().ok_or_else(|| {
41        arrow_schema::ArrowError::IpcError("no record batch in IPC stream".to_string())
42    })?
43}
44
45/// Incremental Arrow IPC encoder that writes the schema once: concatenating the
46/// per-call blobs in order yields one IPC stream for [`BatchStreamDecoder`].
47pub struct BatchStreamEncoder {
48    writer: StreamWriter<Vec<u8>>,
49    schema: SchemaRef,
50}
51
52impl BatchStreamEncoder {
53    /// Encoder for `schema`; the schema message flushes out with the first batch.
54    ///
55    /// # Errors
56    /// [`ArrowError`] if the schema header can't be IPC-encoded.
57    pub fn new(schema: &Schema) -> Result<Self, ArrowError> {
58        Ok(Self {
59            writer: StreamWriter::try_new(Vec::new(), schema)?,
60            schema: Arc::new(schema.clone()),
61        })
62    }
63
64    /// Schema this encoder was created with; every encoded batch must match it.
65    #[must_use]
66    pub fn schema(&self) -> &SchemaRef {
67        &self.schema
68    }
69
70    /// Encode one batch, returning the bytes written since the last call (the
71    /// first call also carries the schema).
72    ///
73    /// # Errors
74    /// [`ArrowError`] if IPC encoding fails.
75    pub fn encode(&mut self, batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
76        self.writer.write(batch)?;
77        Ok(std::mem::take(self.writer.get_mut()))
78    }
79
80    /// Finish the stream, returning the end-of-stream marker to append after the
81    /// last [`encode`](Self::encode). Also lets the decoder flush a trailing
82    /// zero-row batch; no batches may be encoded after this.
83    ///
84    /// # Errors
85    /// [`ArrowError`] if writing the marker fails.
86    pub fn finish(&mut self) -> Result<Vec<u8>, ArrowError> {
87        self.writer.finish()?;
88        Ok(std::mem::take(self.writer.get_mut()))
89    }
90}
91
92/// Decoder for a stream produced by [`BatchStreamEncoder`]: feed each chunk in
93/// order; the first chunk's schema decodes all later schema-less chunks.
94#[derive(Debug, Default)]
95pub struct BatchStreamDecoder {
96    decoder: StreamDecoder,
97}
98
99impl BatchStreamDecoder {
100    /// Creates an empty decoder that has not yet seen a schema.
101    #[must_use]
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    /// Decode every complete batch in one chunk; a batch straddling a chunk
107    /// boundary is buffered until the rest arrives, preserving order.
108    ///
109    /// # Errors
110    /// [`ArrowError`] if the bytes aren't a valid continuation (e.g. a batch
111    /// before any schema).
112    pub fn decode_chunk(&mut self, bytes: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
113        let mut buffer = Buffer::from_vec(bytes);
114        let mut batches = Vec::new();
115        // Drain the chunk: `decode` yields a batch each time one completes.
116        while !buffer.is_empty() {
117            if let Some(batch) = self.decoder.decode(&mut buffer)? {
118                batches.push(batch);
119            }
120        }
121        Ok(batches)
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// The one-column schema (`n: Int32`, non-nullable) every test batch uses.
    fn int_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]))
    }

    /// A batch over [`int_schema`] whose single column holds `values`.
    fn batch(values: &[i32]) -> RecordBatch {
        RecordBatch::try_new(
            int_schema(),
            vec![Arc::new(Int32Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    // Only the first chunk carries the schema. A same-width follow-up chunk is
    // therefore smaller than the first chunk, and smaller than a standalone
    // schema-carrying serialization of the same batch.
    #[test]
    fn stream_encoder_emits_schema_once() {
        let mut encoder = BatchStreamEncoder::new(&int_schema()).unwrap();

        let with_schema = encoder.encode(&batch(&[1, 2, 3])).unwrap();
        let without_schema = encoder.encode(&batch(&[4, 5, 6])).unwrap();
        assert!(without_schema.len() < with_schema.len());

        let standalone = serialize_batch_stream(&batch(&[4, 5, 6])).unwrap();
        assert!(without_schema.len() < standalone.len());
    }

    // All chunks of one encoded sequence, fed to a single decoder, reproduce
    // every input batch in order, including a trailing zero-row batch.
    #[test]
    fn stream_encode_decode_roundtrip() {
        let mut encoder = BatchStreamEncoder::new(&int_schema()).unwrap();
        // The last input has zero rows. It survives only because the
        // end-of-stream marker from `finish` lets the push decoder flush it.
        let inputs = [batch(&[1, 2]), batch(&[3, 4, 5]), batch(&[])];

        let mut chunks = Vec::with_capacity(inputs.len() + 1);
        for input in &inputs {
            chunks.push(encoder.encode(input).unwrap());
        }
        chunks.push(encoder.finish().unwrap());

        let mut decoder = BatchStreamDecoder::new();
        let decoded: Vec<RecordBatch> = chunks
            .into_iter()
            .flat_map(|chunk| decoder.decode_chunk(chunk).unwrap())
            .collect();

        assert_eq!(decoded, inputs);
    }
}