Skip to main content

laminar_connectors/serde/
json.rs

1//! JSON serialization and deserialization.
2//!
3//! Implements [`RecordDeserializer`] / [`RecordSerializer`] by delegating
4//! to [`JsonDecoder`] and [`JsonEncoder`].
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use serde_json::Value;
9
10use super::{Format, RecordDeserializer, RecordSerializer};
11use crate::error::SerdeError;
12use crate::schema::json::decoder::JsonDecoder;
13use crate::schema::json::encoder::JsonEncoder;
14use crate::schema::traits::{FormatDecoder, FormatEncoder};
15use crate::schema::types::RawRecord;
16
/// Stateless JSON record deserializer.
///
/// All decoding work is forwarded to [`JsonDecoder`]; this type only adapts
/// it to the [`RecordDeserializer`] interface.
#[derive(Clone, Debug)]
pub struct JsonDeserializer {
    // Private unit field: prevents struct-literal construction outside this
    // module, so callers must go through `new()` / `Default`.
    _private: (),
}
22
23impl JsonDeserializer {
24    /// Creates a new JSON deserializer.
25    #[must_use]
26    pub fn new() -> Self {
27        Self { _private: () }
28    }
29
30    /// Deserializes a pre-parsed JSON [`Value`] into a [`RecordBatch`].
31    ///
32    /// Used by [`DebeziumDeserializer`](super::debezium::DebeziumDeserializer)
33    /// to avoid double-parsing the envelope.
34    ///
35    /// # Errors
36    ///
37    /// Returns `SerdeError` if the value cannot be decoded.
38    pub fn deserialize_value(
39        &self,
40        value: &Value,
41        schema: &SchemaRef,
42    ) -> Result<RecordBatch, SerdeError> {
43        let bytes = serde_json::to_vec(value).map_err(|e| SerdeError::Json(e.to_string()))?;
44        self.deserialize(&bytes, schema)
45    }
46}
47
48impl Default for JsonDeserializer {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl RecordDeserializer for JsonDeserializer {
55    fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
56        let decoder = JsonDecoder::new(schema.clone());
57        let record = RawRecord::new(data.to_vec());
58        decoder
59            .decode_one(&record)
60            .map_err(|e| SerdeError::Json(e.to_string()))
61    }
62
63    fn deserialize_batch(
64        &self,
65        records: &[&[u8]],
66        schema: &SchemaRef,
67    ) -> Result<RecordBatch, SerdeError> {
68        if records.is_empty() {
69            return Ok(RecordBatch::new_empty(schema.clone()));
70        }
71        let decoder = JsonDecoder::new(schema.clone());
72        let raw_records: Vec<RawRecord> =
73            records.iter().map(|r| RawRecord::new(r.to_vec())).collect();
74        decoder
75            .decode_batch(&raw_records)
76            .map_err(|e| SerdeError::Json(e.to_string()))
77    }
78
79    fn format(&self) -> Format {
80        Format::Json
81    }
82}
83
/// Stateless JSON record serializer.
///
/// All encoding work is forwarded to [`JsonEncoder`]; this type only adapts
/// it to the [`RecordSerializer`] interface.
#[derive(Clone, Debug)]
pub struct JsonSerializer {
    // Private unit field: prevents struct-literal construction outside this
    // module, so callers must go through `new()` / `Default`.
    _private: (),
}

impl JsonSerializer {
    /// Returns a fresh JSON serializer.
    #[must_use]
    pub fn new() -> Self {
        JsonSerializer { _private: () }
    }
}

impl Default for JsonSerializer {
    /// Equivalent to [`JsonSerializer::new`].
    fn default() -> Self {
        JsonSerializer::new()
    }
}
103
104impl RecordSerializer for JsonSerializer {
105    fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
106        let encoder = JsonEncoder::new(batch.schema());
107        encoder
108            .encode_batch(batch)
109            .map_err(|e| SerdeError::Json(e.to_string()))
110    }
111
112    fn format(&self) -> Format {
113        Format::Json
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Three-column schema shared by most tests: non-null `id`/`name`,
    /// nullable `score`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]))
    }

    #[test]
    fn test_json_deserialize_basic() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let data = br#"{"id": 1, "name": "Alice", "score": 95.5}"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
    }

    #[test]
    fn test_json_serialize_roundtrip() {
        let deser = JsonDeserializer::new();
        let ser = JsonSerializer::new();
        let schema = test_schema();

        let data = br#"{"id": 42, "name": "Charlie", "score": 88.5}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let serialized = ser.serialize(&batch).unwrap();
        assert_eq!(serialized.len(), 1);

        let roundtrip: Value = serde_json::from_slice(&serialized[0]).unwrap();
        assert_eq!(roundtrip["id"], 42);
        assert_eq!(roundtrip["name"], "Charlie");
    }

    #[test]
    fn test_json_deserialize_batch() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let r1 = br#"{"id": 1, "name": "A", "score": 10.0}"#;
        let r2 = br#"{"id": 2, "name": "B", "score": 20.0}"#;
        let records: Vec<&[u8]> = vec![r1, r2];

        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
    }

    /// The empty-input early return must yield a zero-row batch that still
    /// carries the requested schema.
    #[test]
    fn test_json_deserialize_batch_empty() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let batch = deser.deserialize_batch(&[], &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    /// `deserialize_value` must produce the same data as decoding the
    /// equivalent raw bytes.
    #[test]
    fn test_json_deserialize_value() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let value = serde_json::json!({"id": 7, "name": "Dana", "score": 1.5});

        let batch = deser.deserialize_value(&value, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 7);
    }

    #[test]
    fn test_json_format() {
        assert!(matches!(JsonDeserializer::new().format(), Format::Json));
        assert!(matches!(JsonSerializer::new().format(), Format::Json));
    }

    #[test]
    fn test_json_deserialize_coercion() {
        let deser = JsonDeserializer::new();
        let schema = Arc::new(Schema::new(vec![
            Field::new("qty", DataType::Int64, false),
            Field::new("price", DataType::Float64, false),
        ]));

        let data = br#"{"qty": "100", "price": "187.52"}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let qty = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(qty.value(0), 100);
    }
}