laminar_connectors/schema/
bridge.rs1use arrow_array::RecordBatch;
11use arrow_schema::SchemaRef;
12
13use super::error::{SchemaError, SchemaResult};
14use super::traits::{FormatDecoder, FormatEncoder};
15use super::types::RawRecord;
16use crate::serde::{RecordDeserializer, RecordSerializer};
17
18pub struct DeserializerDecoder {
37 inner: Box<dyn RecordDeserializer>,
38 schema: SchemaRef,
39 format: String,
40}
41
42impl DeserializerDecoder {
43 pub fn new(
45 deserializer: Box<dyn RecordDeserializer>,
46 schema: SchemaRef,
47 format: impl Into<String>,
48 ) -> Self {
49 Self {
50 inner: deserializer,
51 schema,
52 format: format.into(),
53 }
54 }
55}
56
57impl std::fmt::Debug for DeserializerDecoder {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("DeserializerDecoder")
60 .field("format", &self.format)
61 .field("schema", &self.schema)
62 .finish_non_exhaustive()
63 }
64}
65
66impl FormatDecoder for DeserializerDecoder {
67 fn output_schema(&self) -> SchemaRef {
68 self.schema.clone()
69 }
70
71 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
72 if records.is_empty() {
73 return Ok(RecordBatch::new_empty(self.schema.clone()));
74 }
75
76 let slices: Vec<&[u8]> = records.iter().map(|r| r.value.as_slice()).collect();
77 self.inner
78 .deserialize_batch(&slices, &self.schema)
79 .map_err(|e| SchemaError::DecodeError(e.to_string()))
80 }
81
82 fn decode_one(&self, record: &RawRecord) -> SchemaResult<RecordBatch> {
83 self.inner
84 .deserialize(&record.value, &self.schema)
85 .map_err(|e| SchemaError::DecodeError(e.to_string()))
86 }
87
88 fn format_name(&self) -> &str {
89 &self.format
90 }
91}
92
93pub struct SerializerEncoder {
98 inner: Box<dyn RecordSerializer>,
99 schema: SchemaRef,
100 format: String,
101}
102
103impl SerializerEncoder {
104 pub fn new(
106 serializer: Box<dyn RecordSerializer>,
107 schema: SchemaRef,
108 format: impl Into<String>,
109 ) -> Self {
110 Self {
111 inner: serializer,
112 schema,
113 format: format.into(),
114 }
115 }
116}
117
118impl std::fmt::Debug for SerializerEncoder {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.debug_struct("SerializerEncoder")
121 .field("format", &self.format)
122 .field("schema", &self.schema)
123 .finish_non_exhaustive()
124 }
125}
126
127impl FormatEncoder for SerializerEncoder {
128 fn input_schema(&self) -> SchemaRef {
129 self.schema.clone()
130 }
131
132 fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
133 self.inner
134 .serialize(batch)
135 .map_err(|e| SchemaError::DecodeError(e.to_string()))
136 }
137
138 fn format_name(&self) -> &str {
139 &self.format
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::json::{JsonDeserializer, JsonSerializer};
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Shared two-column schema: a required Int64 `id` and a nullable Utf8 `name`.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Single-column, non-nullable Int64 schema whose column is called `column`.
    fn int64_schema(column: &str) -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(column, DataType::Int64, false)]))
    }

    /// JSON-backed decoder labelled `"json"` over `schema`.
    fn json_decoder(schema: SchemaRef) -> DeserializerDecoder {
        DeserializerDecoder::new(Box::new(JsonDeserializer::new()), schema, "json")
    }

    /// JSON-backed encoder labelled `"json"` over `schema`.
    fn json_encoder(schema: SchemaRef) -> SerializerEncoder {
        SerializerEncoder::new(Box::new(JsonSerializer::new()), schema, "json")
    }

    /// Copies `payload` into an owned `RawRecord`.
    fn raw(payload: &[u8]) -> RawRecord {
        RawRecord::new(payload.to_vec())
    }

    // ---- decoder side ----

    #[test]
    fn test_decoder_output_schema() {
        let schema = test_schema();
        let decoder = json_decoder(Arc::clone(&schema));
        assert_eq!(decoder.output_schema(), schema);
        assert_eq!(decoder.format_name(), "json");
    }

    #[test]
    fn test_decoder_debug() {
        let rendered = format!("{:?}", json_decoder(test_schema()));
        assert!(rendered.contains("DeserializerDecoder"));
        assert!(rendered.contains("json"));
    }

    #[test]
    fn test_decoder_decode_one() {
        let decoder = json_decoder(test_schema());
        let batch = decoder
            .decode_one(&raw(br#"{"id": 42, "name": "Alice"}"#))
            .unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_decoder_decode_batch() {
        let decoder = json_decoder(test_schema());
        let records = [
            raw(br#"{"id": 1, "name": "A"}"#),
            raw(br#"{"id": 2, "name": "B"}"#),
        ];

        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_decoder_decode_empty() {
        let schema = test_schema();
        let decoder = json_decoder(Arc::clone(&schema));

        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    #[test]
    fn test_decoder_decode_error() {
        let decoder = json_decoder(test_schema());
        let outcome = decoder.decode_one(&raw(b"not json"));
        assert!(outcome.is_err());
    }

    // ---- encoder side ----

    #[test]
    fn test_encoder_input_schema() {
        let schema = test_schema();
        let encoder = json_encoder(Arc::clone(&schema));
        assert_eq!(encoder.input_schema(), schema);
        assert_eq!(encoder.format_name(), "json");
    }

    #[test]
    fn test_encoder_debug() {
        let rendered = format!("{:?}", json_encoder(test_schema()));
        assert!(rendered.contains("SerializerEncoder"));
    }

    #[test]
    fn test_encoder_encode_batch() {
        let schema = int64_schema("id");
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let encoded = json_encoder(schema).encode_batch(&batch).unwrap();
        assert_eq!(encoded.len(), 3);

        // Each emitted buffer must be a standalone, parseable JSON document.
        for (row, bytes) in encoded.iter().enumerate() {
            serde_json::from_slice::<serde_json::Value>(bytes)
                .unwrap_or_else(|err| panic!("row {row} is not valid JSON: {err}"));
        }
    }

    // ---- full round trip ----

    #[test]
    fn test_roundtrip_through_bridge() {
        let schema = int64_schema("value");
        let original = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![10, 20]))],
        )
        .unwrap();

        let encoded = json_encoder(Arc::clone(&schema))
            .encode_batch(&original)
            .unwrap();
        let records: Vec<RawRecord> = encoded.into_iter().map(RawRecord::new).collect();
        let decoded = json_decoder(schema).decode_batch(&records).unwrap();

        assert_eq!(decoded.num_rows(), 2);
        let values = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("column 0 should decode as Int64");
        assert_eq!(values.value(0), 10);
        assert_eq!(values.value(1), 20);
    }
}