laminar_connectors/serde/
raw.rs1use std::sync::Arc;
7
8use arrow_array::builder::StringBuilder;
9use arrow_array::{Array, RecordBatch, StringArray};
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11
12use super::{Format, RecordDeserializer, RecordSerializer};
13use crate::error::SerdeError;
14
15fn raw_schema() -> SchemaRef {
17 Arc::new(Schema::new(vec![Field::new(
18 "value",
19 DataType::Utf8,
20 false,
21 )]))
22}
23
/// Deserializer for the raw format.
///
/// Every input payload becomes one row of a batch with a single non-nullable
/// `Utf8` column named `value`. Payloads that are not valid UTF-8 are rejected.
///
/// The type is stateless. Its private unit field stops code outside this
/// module from building it with a struct literal, so
/// [`RawBytesDeserializer::new`] and [`Default`] are the constructors.
#[derive(Debug, Clone, Default)]
pub struct RawBytesDeserializer {
    _private: (),
}

impl RawBytesDeserializer {
    /// Creates a raw deserializer.
    ///
    /// This is a `const fn`, so it can initialize `const` and `static` items.
    #[must_use]
    pub const fn new() -> Self {
        Self { _private: () }
    }
}

48impl RecordDeserializer for RawBytesDeserializer {
49 fn deserialize(&self, data: &[u8], _schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
50 let s = std::str::from_utf8(data)
51 .map_err(|e| SerdeError::MalformedInput(format!("invalid UTF-8: {e}")))?;
52 let mut builder = StringBuilder::with_capacity(1, data.len());
53 builder.append_value(s);
54
55 let schema = raw_schema();
56 RecordBatch::try_new(schema, vec![Arc::new(builder.finish())])
57 .map_err(|e| SerdeError::MalformedInput(format!("failed to create batch: {e}")))
58 }
59
60 fn deserialize_batch(
61 &self,
62 records: &[&[u8]],
63 _schema: &SchemaRef,
64 ) -> Result<RecordBatch, SerdeError> {
65 let total_bytes: usize = records.iter().map(|r| r.len()).sum();
66 let mut builder = StringBuilder::with_capacity(records.len(), total_bytes);
67
68 for data in records {
69 let s = std::str::from_utf8(data)
70 .map_err(|e| SerdeError::MalformedInput(format!("invalid UTF-8: {e}")))?;
71 builder.append_value(s);
72 }
73
74 let schema = raw_schema();
75 RecordBatch::try_new(schema, vec![Arc::new(builder.finish())])
76 .map_err(|e| SerdeError::MalformedInput(format!("failed to create batch: {e}")))
77 }
78
79 fn format(&self) -> Format {
80 Format::Raw
81 }
82}
83
/// Serializer for the raw format.
///
/// Emits one record per row, holding the UTF-8 bytes of that row in the
/// batch's first column. That column must be a `Utf8` (`StringArray`) column.
///
/// The type is stateless. Its private unit field stops code outside this
/// module from building it with a struct literal, so
/// [`RawBytesSerializer::new`] and [`Default`] are the constructors.
#[derive(Debug, Clone, Default)]
pub struct RawBytesSerializer {
    _private: (),
}

impl RawBytesSerializer {
    /// Creates a raw serializer.
    ///
    /// This is a `const fn`, so it can initialize `const` and `static` items.
    #[must_use]
    pub const fn new() -> Self {
        Self { _private: () }
    }
}

106impl RecordSerializer for RawBytesSerializer {
107 fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
108 if batch.num_columns() == 0 {
109 return Ok(Vec::new());
110 }
111
112 let column = batch.column(0);
113 let string_arr = column
114 .as_any()
115 .downcast_ref::<StringArray>()
116 .ok_or_else(|| {
117 SerdeError::UnsupportedFormat("raw serializer expects Utf8 column".into())
118 })?;
119
120 let mut records = Vec::with_capacity(batch.num_rows());
121 for i in 0..batch.num_rows() {
122 if string_arr.is_null(i) {
123 records.push(Vec::new());
124 } else {
125 records.push(string_arr.value(i).as_bytes().to_vec());
126 }
127 }
128 Ok(records)
129 }
130
131 fn format(&self) -> Format {
132 Format::Raw
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// Downcasts the first column of a raw batch to its `StringArray`.
    fn utf8_column(batch: &RecordBatch) -> &StringArray {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("raw batches always carry a Utf8 first column")
    }

    #[test]
    fn test_raw_deserialize() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();
        let data = b"hello world";

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(utf8_column(&batch).value(0), "hello world");
    }

    #[test]
    fn test_raw_roundtrip() {
        let deser = RawBytesDeserializer::new();
        let ser = RawBytesSerializer::new();
        let schema = raw_schema();

        let data = b"test data";
        let batch = deser.deserialize(data, &schema).unwrap();
        let serialized = ser.serialize(&batch).unwrap();

        assert_eq!(serialized.len(), 1);
        assert_eq!(serialized[0], b"test data");
    }

    #[test]
    fn test_raw_batch() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();

        let records: Vec<&[u8]> = vec![b"line1", b"line2", b"line3"];
        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 3);

        let arr = utf8_column(&batch);
        assert_eq!(arr.value(0), "line1");
        assert_eq!(arr.value(1), "line2");
        assert_eq!(arr.value(2), "line3");
    }

    #[test]
    fn test_raw_batch_empty() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();

        let batch = deser.deserialize_batch(&[], &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 1);
    }

    #[test]
    fn test_raw_rejects_invalid_utf8() {
        let deser = RawBytesDeserializer::new();
        let schema = raw_schema();
        let bad: &[u8] = &[0xff, 0xfe];

        assert!(matches!(
            deser.deserialize(bad, &schema),
            Err(SerdeError::MalformedInput(_))
        ));

        let records: [&[u8]; 2] = [b"ok", bad];
        assert!(matches!(
            deser.deserialize_batch(&records, &schema),
            Err(SerdeError::MalformedInput(_))
        ));
    }

    #[test]
    fn test_raw_serialize_null_as_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]));
        let values = StringArray::from(vec![Some("a"), None, Some("")]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

        let out = RawBytesSerializer::new().serialize(&batch).unwrap();
        assert_eq!(out, vec![b"a".to_vec(), Vec::new(), Vec::new()]);
    }

    #[test]
    fn test_raw_serialize_rejects_non_utf8_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
        let ints = arrow_array::Int32Array::from(vec![1, 2]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(ints)]).unwrap();

        assert!(matches!(
            RawBytesSerializer::new().serialize(&batch),
            Err(SerdeError::UnsupportedFormat(_))
        ));
    }

    #[test]
    fn test_raw_serialize_no_columns() {
        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let out = RawBytesSerializer::new().serialize(&batch).unwrap();
        assert!(out.is_empty());
    }
}