Skip to main content

laminar_connectors/schema/
bridge.rs

//! Bridge adapters between the legacy serde traits and the new schema traits.
//!
//! - [`DeserializerDecoder`] wraps a [`RecordDeserializer`] into a [`FormatDecoder`]
//! - [`SerializerEncoder`] wraps a [`RecordSerializer`] into a [`FormatEncoder`]
//!
//! This enables gradual migration: existing connectors that already have
//! `RecordDeserializer`/`RecordSerializer` implementations can be used
//! through the new schema framework without being rewritten.
9
10use arrow_array::RecordBatch;
11use arrow_schema::SchemaRef;
12
13use super::error::{SchemaError, SchemaResult};
14use super::traits::{FormatDecoder, FormatEncoder};
15use super::types::RawRecord;
16use crate::serde::{RecordDeserializer, RecordSerializer};
17
18/// Adapts a [`RecordDeserializer`] to the [`FormatDecoder`] interface.
19///
20/// Wraps an existing `RecordDeserializer` and a schema so it can be used
21/// wherever a `FormatDecoder` is expected.
22///
23/// # Example
24///
25/// ```rust,ignore
26/// use laminar_connectors::serde::json::JsonDeserializer;
27/// use laminar_connectors::schema::bridge::DeserializerDecoder;
28///
29/// let decoder = DeserializerDecoder::new(
30///     Box::new(JsonDeserializer::new()),
31///     my_schema,
32///     "json",
33/// );
34/// let batch = decoder.decode_batch(&records)?;
35/// ```
36pub struct DeserializerDecoder {
37    inner: Box<dyn RecordDeserializer>,
38    schema: SchemaRef,
39    format: String,
40}
41
42impl DeserializerDecoder {
43    /// Creates a new bridge decoder.
44    pub fn new(
45        deserializer: Box<dyn RecordDeserializer>,
46        schema: SchemaRef,
47        format: impl Into<String>,
48    ) -> Self {
49        Self {
50            inner: deserializer,
51            schema,
52            format: format.into(),
53        }
54    }
55}
56
57impl std::fmt::Debug for DeserializerDecoder {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("DeserializerDecoder")
60            .field("format", &self.format)
61            .field("schema", &self.schema)
62            .finish_non_exhaustive()
63    }
64}
65
66impl FormatDecoder for DeserializerDecoder {
67    fn output_schema(&self) -> SchemaRef {
68        self.schema.clone()
69    }
70
71    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
72        if records.is_empty() {
73            return Ok(RecordBatch::new_empty(self.schema.clone()));
74        }
75
76        let slices: Vec<&[u8]> = records.iter().map(|r| r.value.as_slice()).collect();
77        self.inner
78            .deserialize_batch(&slices, &self.schema)
79            .map_err(|e| SchemaError::DecodeError(e.to_string()))
80    }
81
82    fn decode_one(&self, record: &RawRecord) -> SchemaResult<RecordBatch> {
83        self.inner
84            .deserialize(&record.value, &self.schema)
85            .map_err(|e| SchemaError::DecodeError(e.to_string()))
86    }
87
88    fn format_name(&self) -> &str {
89        &self.format
90    }
91}
92
93/// Adapts a [`RecordSerializer`] to the [`FormatEncoder`] interface.
94///
95/// Wraps an existing `RecordSerializer` and a schema so it can be used
96/// wherever a `FormatEncoder` is expected.
97pub struct SerializerEncoder {
98    inner: Box<dyn RecordSerializer>,
99    schema: SchemaRef,
100    format: String,
101}
102
103impl SerializerEncoder {
104    /// Creates a new bridge encoder.
105    pub fn new(
106        serializer: Box<dyn RecordSerializer>,
107        schema: SchemaRef,
108        format: impl Into<String>,
109    ) -> Self {
110        Self {
111            inner: serializer,
112            schema,
113            format: format.into(),
114        }
115    }
116}
117
118impl std::fmt::Debug for SerializerEncoder {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.debug_struct("SerializerEncoder")
121            .field("format", &self.format)
122            .field("schema", &self.schema)
123            .finish_non_exhaustive()
124    }
125}
126
127impl FormatEncoder for SerializerEncoder {
128    fn input_schema(&self) -> SchemaRef {
129        self.schema.clone()
130    }
131
132    fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
133        self.inner
134            .serialize(batch)
135            .map_err(|e| SchemaError::DecodeError(e.to_string()))
136    }
137
138    fn format_name(&self) -> &str {
139        &self.format
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::json::{JsonDeserializer, JsonSerializer};
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Two-column schema (`id: Int64` non-null, `name: Utf8` nullable) used
    /// by most decoder tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Single non-null `Int64` column schema with the given column name.
    fn int64_schema(column: &str) -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(column, DataType::Int64, false)]))
    }

    /// JSON-backed decoder bridge labelled `"json"`.
    fn json_decoder(schema: SchemaRef) -> DeserializerDecoder {
        DeserializerDecoder::new(Box::new(JsonDeserializer::new()), schema, "json")
    }

    /// JSON-backed encoder bridge labelled `"json"`.
    fn json_encoder(schema: SchemaRef) -> SerializerEncoder {
        SerializerEncoder::new(Box::new(JsonSerializer::new()), schema, "json")
    }

    // ── DeserializerDecoder tests ──────────────────────────────

    #[test]
    fn test_decoder_output_schema() {
        let schema = test_schema();
        let decoder = json_decoder(schema.clone());
        assert_eq!(decoder.output_schema(), schema);
        assert_eq!(decoder.format_name(), "json");
    }

    #[test]
    fn test_decoder_debug() {
        let decoder = json_decoder(test_schema());
        let dbg = format!("{decoder:?}");
        assert!(dbg.contains("DeserializerDecoder"));
        assert!(dbg.contains("json"));
    }

    #[test]
    fn test_decoder_decode_one() {
        let decoder = json_decoder(test_schema());

        let record = RawRecord::new(br#"{"id": 42, "name": "Alice"}"#.to_vec());
        let batch = decoder.decode_one(&record).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_decoder_decode_batch() {
        let decoder = json_decoder(test_schema());

        let records = vec![
            RawRecord::new(br#"{"id": 1, "name": "A"}"#.to_vec()),
            RawRecord::new(br#"{"id": 2, "name": "B"}"#.to_vec()),
        ];

        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_decoder_decode_empty() {
        let schema = test_schema();
        let decoder = json_decoder(schema.clone());

        // Empty input must yield an empty batch that still carries the schema.
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    #[test]
    fn test_decoder_decode_error() {
        let decoder = json_decoder(test_schema());

        let record = RawRecord::new(b"not json".to_vec());
        let result = decoder.decode_one(&record);

        // The bridge maps every deserializer failure to `DecodeError`, so pin
        // the variant rather than just "some error".
        assert!(matches!(result, Err(SchemaError::DecodeError(_))));
    }

    // ── SerializerEncoder tests ────────────────────────────────

    #[test]
    fn test_encoder_input_schema() {
        let schema = test_schema();
        let encoder = json_encoder(schema.clone());
        assert_eq!(encoder.input_schema(), schema);
        assert_eq!(encoder.format_name(), "json");
    }

    #[test]
    fn test_encoder_debug() {
        let encoder = json_encoder(test_schema());
        let dbg = format!("{encoder:?}");
        assert!(dbg.contains("SerializerEncoder"));
        // Mirrors the decoder test: the format label must appear in the output.
        assert!(dbg.contains("json"));
    }

    #[test]
    fn test_encoder_encode_batch() {
        let schema = int64_schema("id");

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let encoder = json_encoder(schema);

        let output = encoder.encode_batch(&batch).unwrap();
        assert_eq!(output.len(), 3);

        // Each encoded record should be valid JSON.
        for bytes in &output {
            let _: serde_json::Value = serde_json::from_slice(bytes).unwrap();
        }
    }

    // ── Round-trip test ────────────────────────────────────────

    #[test]
    fn test_roundtrip_through_bridge() {
        let schema = int64_schema("value");

        // Encode.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![10, 20]))],
        )
        .unwrap();

        let encoder = json_encoder(schema.clone());
        let output = encoder.encode_batch(&batch).unwrap();

        // Decode.
        let decoder = json_decoder(schema);
        let records: Vec<RawRecord> = output.into_iter().map(RawRecord::new).collect();
        let result_batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(result_batch.num_rows(), 2);
        let col = result_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(col.value(0), 10);
        assert_eq!(col.value(1), 20);
    }
}