laminar_core/serialization/
mod.rs1pub mod jsonb_tags;
8
9use std::sync::Arc;
10
11use arrow::buffer::Buffer;
12use arrow_array::RecordBatch;
13use arrow_ipc::reader::{StreamDecoder, StreamReader};
14use arrow_ipc::writer::StreamWriter;
15use arrow_schema::{ArrowError, Schema, SchemaRef};
16
17pub fn serialize_batch_stream(batch: &RecordBatch) -> Result<Vec<u8>, arrow_schema::ArrowError> {
23 let mut buf = Vec::new();
24 {
25 let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
26 writer.write(batch)?;
27 writer.finish()?;
28 }
29 Ok(buf)
30}
31
32pub fn deserialize_batch_stream(bytes: &[u8]) -> Result<RecordBatch, arrow_schema::ArrowError> {
38 let cursor = std::io::Cursor::new(bytes);
39 let mut reader = StreamReader::try_new(cursor, None)?;
40 reader.next().ok_or_else(|| {
41 arrow_schema::ArrowError::IpcError("no record batch in IPC stream".to_string())
42 })?
43}
44
45pub struct BatchStreamEncoder {
48 writer: StreamWriter<Vec<u8>>,
49 schema: SchemaRef,
50}
51
52impl BatchStreamEncoder {
53 pub fn new(schema: &Schema) -> Result<Self, ArrowError> {
58 Ok(Self {
59 writer: StreamWriter::try_new(Vec::new(), schema)?,
60 schema: Arc::new(schema.clone()),
61 })
62 }
63
64 #[must_use]
66 pub fn schema(&self) -> &SchemaRef {
67 &self.schema
68 }
69
70 pub fn encode(&mut self, batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
76 self.writer.write(batch)?;
77 Ok(std::mem::take(self.writer.get_mut()))
78 }
79
80 pub fn finish(&mut self) -> Result<Vec<u8>, ArrowError> {
87 self.writer.finish()?;
88 Ok(std::mem::take(self.writer.get_mut()))
89 }
90}
91
92#[derive(Debug, Default)]
95pub struct BatchStreamDecoder {
96 decoder: StreamDecoder,
97}
98
99impl BatchStreamDecoder {
100 #[must_use]
102 pub fn new() -> Self {
103 Self::default()
104 }
105
106 pub fn decode_chunk(&mut self, bytes: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
113 let mut buffer = Buffer::from_vec(bytes);
114 let mut batches = Vec::new();
115 while !buffer.is_empty() {
117 if let Some(batch) = self.decoder.decode(&mut buffer)? {
118 batches.push(batch);
119 }
120 }
121 Ok(batches)
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// Single non-nullable `Int32` column named `n`, shared by every test.
    fn int_schema() -> Arc<Schema> {
        let field = Field::new("n", DataType::Int32, false);
        Arc::new(Schema::new(vec![field]))
    }

    /// Builds a batch over [`int_schema`] holding `values`.
    fn batch(values: &[i32]) -> RecordBatch {
        RecordBatch::try_new(
            int_schema(),
            vec![Arc::new(Int32Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    /// After the first chunk, the encoder must not repeat the schema message.
    #[test]
    fn stream_encoder_emits_schema_once() {
        let schema = int_schema();
        let mut encoder = BatchStreamEncoder::new(&schema).unwrap();

        let with_schema = encoder.encode(&batch(&[1, 2, 3])).unwrap();
        let without_schema = encoder.encode(&batch(&[4, 5, 6])).unwrap();
        // Both batches have the same row count, so the size gap comes from the schema message.
        assert!(with_schema.len() > without_schema.len());

        // A one-shot stream of the same batch also carries schema + end-of-stream marker.
        let one_shot = serialize_batch_stream(&batch(&[4, 5, 6])).unwrap();
        assert!(without_schema.len() < one_shot.len());
    }

    /// Encoder chunks, fed to the decoder one at a time, reproduce the original batches,
    /// including an empty one.
    #[test]
    fn stream_encode_decode_roundtrip() {
        let schema = int_schema();
        let mut encoder = BatchStreamEncoder::new(&schema).unwrap();
        let inputs = [batch(&[1, 2]), batch(&[3, 4, 5]), batch(&[])];

        let mut chunks = Vec::with_capacity(inputs.len() + 1);
        for input in &inputs {
            chunks.push(encoder.encode(input).unwrap());
        }
        chunks.push(encoder.finish().unwrap());

        let mut decoder = BatchStreamDecoder::new();
        let decoded: Vec<RecordBatch> = chunks
            .into_iter()
            .flat_map(|chunk| decoder.decode_chunk(chunk).unwrap())
            .collect();

        assert_eq!(decoded, inputs);
    }
}
178}