Skip to main content

laminar_connectors/serde/
raw.rs

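//! Raw bytes pass-through serialization.
//!
//! Treats each record as an opaque payload that must be valid UTF-8. Records
//! are deserialized into a single non-nullable `Utf8` column named `value`,
//! and batches are serialized from their first column, which must be `Utf8`.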
5
6use std::sync::Arc;
7
8use arrow_array::builder::StringBuilder;
9use arrow_array::{Array, RecordBatch, StringArray};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11
12use super::{Format, RecordDeserializer, RecordSerializer};
13use crate::error::SerdeError;
14
15/// The default schema for raw bytes: a single `Utf8` column named "value".
16fn raw_schema() -> SchemaRef {
17    Arc::new(Schema::new(vec![Field::new(
18        "value",
19        DataType::Utf8,
20        false,
21    )]))
22}
23
/// Raw bytes deserializer.
///
/// Every input record becomes one row of a `RecordBatch` whose only column
/// is the non-nullable `Utf8` column `"value"`. The schema passed by the
/// caller is not consulted; the fixed raw schema is always used. Records
/// must be valid UTF-8, otherwise deserialization returns an error.
#[derive(Debug, Clone, Default)]
pub struct RawBytesDeserializer {
    // Private unit field: prevents construction by struct literal outside
    // this module, leaving `new`/`default` as the only constructors.
    _private: (),
}

impl RawBytesDeserializer {
    /// Creates a new raw bytes deserializer.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
47
48impl RecordDeserializer for RawBytesDeserializer {
49    fn deserialize(&self, data: &[u8], _schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
50        let s = std::str::from_utf8(data)
51            .map_err(|e| SerdeError::MalformedInput(format!("invalid UTF-8: {e}")))?;
52        let mut builder = StringBuilder::with_capacity(1, data.len());
53        builder.append_value(s);
54
55        let schema = raw_schema();
56        RecordBatch::try_new(schema, vec![Arc::new(builder.finish())])
57            .map_err(|e| SerdeError::MalformedInput(format!("failed to create batch: {e}")))
58    }
59
60    fn deserialize_batch(
61        &self,
62        records: &[&[u8]],
63        _schema: &SchemaRef,
64    ) -> Result<RecordBatch, SerdeError> {
65        let total_bytes: usize = records.iter().map(|r| r.len()).sum();
66        let mut builder = StringBuilder::with_capacity(records.len(), total_bytes);
67
68        for data in records {
69            let s = std::str::from_utf8(data)
70                .map_err(|e| SerdeError::MalformedInput(format!("invalid UTF-8: {e}")))?;
71            builder.append_value(s);
72        }
73
74        let schema = raw_schema();
75        RecordBatch::try_new(schema, vec![Arc::new(builder.finish())])
76            .map_err(|e| SerdeError::MalformedInput(format!("failed to create batch: {e}")))
77    }
78
79    fn format(&self) -> Format {
80        Format::Raw
81    }
82}
83
/// Raw bytes serializer.
///
/// Produces one output payload per row, taken from the batch's first column
/// as UTF-8 bytes. That column has to be `Utf8`; null entries are written as
/// empty payloads.
#[derive(Debug, Clone, Default)]
pub struct RawBytesSerializer {
    // Private unit field: prevents construction by struct literal outside
    // this module, leaving `new`/`default` as the only constructors.
    _private: (),
}

impl RawBytesSerializer {
    /// Creates a new raw bytes serializer.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
105
106impl RecordSerializer for RawBytesSerializer {
107    fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
108        if batch.num_columns() == 0 {
109            return Ok(Vec::new());
110        }
111
112        let column = batch.column(0);
113        let string_arr = column
114            .as_any()
115            .downcast_ref::<StringArray>()
116            .ok_or_else(|| {
117                SerdeError::UnsupportedFormat("raw serializer expects Utf8 column".into())
118            })?;
119
120        let mut records = Vec::with_capacity(batch.num_rows());
121        for i in 0..batch.num_rows() {
122            if string_arr.is_null(i) {
123                records.push(Vec::new());
124            } else {
125                records.push(string_arr.value(i).as_bytes().to_vec());
126            }
127        }
128        Ok(records)
129    }
130
131    fn format(&self) -> Format {
132        Format::Raw
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int32Array;

    /// Builds a nullable single-column `Utf8` batch from optional strings.
    fn utf8_batch(values: Vec<Option<&str>>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]));
        RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(values))]).unwrap()
    }

    #[test]
    fn test_raw_deserialize() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();
        let data = b"hello world";

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);

        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(arr.value(0), "hello world");
    }

    #[test]
    fn test_raw_deserialize_ignores_provided_schema() {
        let deser = RawBytesDeserializer::new();
        let other = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));

        let batch = deser.deserialize(b"abc", &other).unwrap();
        assert_eq!(batch.schema(), raw_schema());
    }

    #[test]
    fn test_raw_deserialize_invalid_utf8() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();
        let bad = [0xff_u8, 0xfe];

        assert!(matches!(
            deser.deserialize(&bad, &schema),
            Err(SerdeError::MalformedInput(_))
        ));

        let records = [b"ok".as_slice(), bad.as_slice()];
        assert!(matches!(
            deser.deserialize_batch(&records, &schema),
            Err(SerdeError::MalformedInput(_))
        ));
    }

    #[test]
    fn test_raw_roundtrip() {
        let deser = RawBytesDeserializer::new();
        let ser = RawBytesSerializer::new();
        let schema = raw_schema();

        let data = b"test data";
        let batch = deser.deserialize(data, &schema).unwrap();
        let serialized = ser.serialize(&batch).unwrap();

        assert_eq!(serialized.len(), 1);
        assert_eq!(serialized[0], b"test data");
    }

    #[test]
    fn test_raw_batch() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();

        let records: Vec<&[u8]> = vec![b"line1", b"line2", b"line3"];
        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);
    }

    #[test]
    fn test_raw_batch_roundtrip() {
        let deser = RawBytesDeserializer::new();
        let ser = RawBytesSerializer::new();
        let schema = raw_schema();

        let records = [b"".as_slice(), "h\u{e9}llo".as_bytes(), b"line3".as_slice()];
        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        let serialized = ser.serialize(&batch).unwrap();

        let expected: Vec<Vec<u8>> = records.iter().map(|r| r.to_vec()).collect();
        assert_eq!(serialized, expected);
    }

    #[test]
    fn test_raw_batch_empty() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();

        let records: Vec<&[u8]> = Vec::new();
        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 1);
    }

    #[test]
    fn test_raw_serialize_nulls_become_empty() {
        let ser = RawBytesSerializer::new();
        let batch = utf8_batch(vec![Some("a"), None, Some("b")]);

        let serialized = ser.serialize(&batch).unwrap();
        assert_eq!(serialized, vec![b"a".to_vec(), Vec::new(), b"b".to_vec()]);
    }

    #[test]
    fn test_raw_serialize_rejects_non_utf8_column() {
        let ser = RawBytesSerializer::new();
        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
                .unwrap();

        assert!(matches!(
            ser.serialize(&batch),
            Err(SerdeError::UnsupportedFormat(_))
        ));
    }

    #[test]
    fn test_raw_serialize_no_columns() {
        let ser = RawBytesSerializer::new();
        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));

        assert!(ser.serialize(&batch).unwrap().is_empty());
    }

    #[test]
    fn test_raw_format() {
        assert!(matches!(RawBytesDeserializer::new().format(), Format::Raw));
        assert!(matches!(RawBytesSerializer::new().format(), Format::Raw));
    }
}