Skip to main content

laminar_connectors/serde/
json.rs

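//! JSON serialization and deserialization.
//!
//! Converts between JSON records and Arrow `RecordBatch`. Raw byte input is
//! decoded by the schema-aware `JsonDecoder`; pre-parsed `serde_json::Value`s
//! and all serialization are handled directly with `serde_json`.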
4
5use std::sync::Arc;
6
7use arrow_array::builder::{
8    BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
9    Int8Builder, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
10};
11use arrow_array::{ArrayRef, RecordBatch};
12use arrow_schema::{DataType, SchemaRef};
13use serde_json::Value;
14
15use super::{Format, RecordDeserializer, RecordSerializer};
16use crate::error::SerdeError;
17use crate::schema::json::decoder::JsonDecoder;
18use crate::schema::traits::FormatDecoder;
19use crate::schema::types::RawRecord;
20
/// JSON record deserializer.
///
/// Turns JSON objects into Arrow [`RecordBatch`]es whose columns follow a
/// caller-supplied schema. Byte input (`deserialize`, `deserialize_batch`) is
/// handled by the schema-aware `JsonDecoder`; already-parsed values go through
/// `deserialize_value`, which supports:
/// - signed and unsigned integers (`Int8`..`Int64`, `UInt8`..`UInt64`)
/// - `Float32` and `Float64`
/// - `Boolean`
/// - `Utf8` strings
/// - nullable fields
#[derive(Debug, Clone, Default)]
pub struct JsonDeserializer {
    // Private unit field: the type carries no state and can only be built
    // through `new` / `Default`.
    _private: (),
}

impl JsonDeserializer {
    /// Returns a new (stateless) JSON deserializer.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
48
49impl JsonDeserializer {
50    /// Deserializes a pre-parsed JSON [`Value`] into a [`RecordBatch`].
51    ///
52    /// Avoids the serialize-then-reparse overhead when the caller already
53    /// holds a parsed `Value` (e.g. Debezium envelope extraction).
54    ///
55    /// # Errors
56    ///
57    /// Returns `SerdeError` if the value is not a JSON object, a required
58    /// field is missing, or Arrow array construction fails.
59    pub fn deserialize_value(
60        &self,
61        value: &Value,
62        schema: &SchemaRef,
63    ) -> Result<RecordBatch, SerdeError> {
64        let obj = value
65            .as_object()
66            .ok_or_else(|| SerdeError::MalformedInput("expected JSON object".into()))?;
67
68        let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
69
70        for field in schema.fields() {
71            let json_val = obj.get(field.name());
72
73            let is_null = json_val.is_none() || json_val == Some(&Value::Null);
74
75            if is_null && !field.is_nullable() {
76                return Err(SerdeError::MissingField(field.name().clone()));
77            }
78
79            let array = build_array_from_json(field.data_type(), json_val, field.name())?;
80            columns.push(array);
81        }
82
83        RecordBatch::try_new(schema.clone(), columns)
84            .map_err(|e| SerdeError::MalformedInput(format!("failed to create RecordBatch: {e}")))
85    }
86}
87
88impl RecordDeserializer for JsonDeserializer {
89    fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
90        let decoder = JsonDecoder::new(schema.clone());
91        let record = RawRecord::new(data.to_vec());
92        decoder
93            .decode_one(&record)
94            .map_err(|e| SerdeError::Json(e.to_string()))
95    }
96
97    fn deserialize_batch(
98        &self,
99        records: &[&[u8]],
100        schema: &SchemaRef,
101    ) -> Result<RecordBatch, SerdeError> {
102        if records.is_empty() {
103            return Ok(RecordBatch::new_empty(schema.clone()));
104        }
105        let decoder = JsonDecoder::new(schema.clone());
106        let raw_records: Vec<RawRecord> =
107            records.iter().map(|r| RawRecord::new(r.to_vec())).collect();
108        decoder
109            .decode_batch(&raw_records)
110            .map_err(|e| SerdeError::Json(e.to_string()))
111    }
112
113    fn format(&self) -> Format {
114        Format::Json
115    }
116}
117
/// JSON record serializer.
///
/// Turns the rows of an Arrow `RecordBatch` into JSON objects, either one
/// byte buffer per row or a single newline-delimited buffer.
#[derive(Debug, Clone, Default)]
pub struct JsonSerializer {
    // Private unit field: the type carries no state and can only be built
    // through `new` / `Default`.
    _private: (),
}

impl JsonSerializer {
    /// Returns a new (stateless) JSON serializer.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
139
140impl RecordSerializer for JsonSerializer {
141    fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
142        let mut records = Vec::with_capacity(batch.num_rows());
143        let schema = batch.schema();
144        // Reuse a single Map across all rows (clear instead of re-allocate)
145        let mut obj = serde_json::Map::with_capacity(schema.fields().len());
146
147        for row in 0..batch.num_rows() {
148            obj.clear();
149
150            for (col_idx, field) in schema.fields().iter().enumerate() {
151                let column = batch.column(col_idx);
152
153                if column.is_null(row) {
154                    obj.insert(field.name().clone(), Value::Null);
155                    continue;
156                }
157
158                let value = arrow_column_to_json(column, row, field.data_type())?;
159                obj.insert(field.name().clone(), value);
160            }
161
162            let json_bytes =
163                serde_json::to_vec(&obj).map_err(|e| SerdeError::Json(e.to_string()))?;
164            records.push(json_bytes);
165        }
166
167        Ok(records)
168    }
169
170    fn serialize_batch(&self, batch: &RecordBatch) -> Result<Vec<u8>, SerdeError> {
171        let schema = batch.schema();
172        // Estimate ~256 bytes per row for capacity hint
173        let mut buf = Vec::with_capacity(batch.num_rows() * 256);
174        let mut obj = serde_json::Map::with_capacity(schema.fields().len());
175        // Reusable per-row write buffer
176        let mut row_buf: Vec<u8> = Vec::with_capacity(256);
177
178        for row in 0..batch.num_rows() {
179            obj.clear();
180
181            for (col_idx, field) in schema.fields().iter().enumerate() {
182                let column = batch.column(col_idx);
183
184                if column.is_null(row) {
185                    obj.insert(field.name().clone(), Value::Null);
186                    continue;
187                }
188
189                let value = arrow_column_to_json(column, row, field.data_type())?;
190                obj.insert(field.name().clone(), value);
191            }
192
193            row_buf.clear();
194            serde_json::to_writer(&mut row_buf, &obj)
195                .map_err(|e| SerdeError::Json(e.to_string()))?;
196            buf.extend_from_slice(&row_buf);
197            buf.push(b'\n');
198        }
199
200        Ok(buf)
201    }
202
203    fn format(&self) -> Format {
204        Format::Json
205    }
206}
207
208/// Builds a single-element Arrow array from a JSON value.
209fn build_array_from_json(
210    data_type: &DataType,
211    value: Option<&Value>,
212    field_name: &str,
213) -> Result<ArrayRef, SerdeError> {
214    match data_type {
215        DataType::Boolean => {
216            let mut builder = BooleanBuilder::with_capacity(1);
217            match value {
218                Some(Value::Bool(b)) => builder.append_value(*b),
219                Some(Value::Null) | None => builder.append_null(),
220                _ => {
221                    return Err(SerdeError::TypeConversion {
222                        field: field_name.into(),
223                        expected: "Boolean".into(),
224                        message: format!("got {value:?}"),
225                    })
226                }
227            }
228            Ok(Arc::new(builder.finish()))
229        }
230        DataType::Int8 => build_int_array::<Int8Builder>(value, field_name, "Int8"),
231        DataType::Int16 => build_int_array::<Int16Builder>(value, field_name, "Int16"),
232        DataType::Int32 => build_int_array::<Int32Builder>(value, field_name, "Int32"),
233        DataType::Int64 => build_int_array::<Int64Builder>(value, field_name, "Int64"),
234        DataType::UInt8 => build_uint_array::<UInt8Builder>(value, field_name, "UInt8"),
235        DataType::UInt16 => build_uint_array::<UInt16Builder>(value, field_name, "UInt16"),
236        DataType::UInt32 => build_uint_array::<UInt32Builder>(value, field_name, "UInt32"),
237        DataType::UInt64 => build_uint_array::<UInt64Builder>(value, field_name, "UInt64"),
238        DataType::Float32 => {
239            let mut builder = Float32Builder::with_capacity(1);
240            match value {
241                Some(Value::Number(n)) => {
242                    let v = n.as_f64().ok_or_else(|| SerdeError::TypeConversion {
243                        field: field_name.into(),
244                        expected: "Float32".into(),
245                        message: format!("cannot convert {n}"),
246                    })?;
247                    #[allow(clippy::cast_possible_truncation)]
248                    // f64 → f32: precision loss acceptable for Float32 columns
249                    builder.append_value(v as f32);
250                }
251                Some(Value::Null) | None => builder.append_null(),
252                _ => {
253                    return Err(SerdeError::TypeConversion {
254                        field: field_name.into(),
255                        expected: "Float32".into(),
256                        message: format!("got {value:?}"),
257                    })
258                }
259            }
260            Ok(Arc::new(builder.finish()))
261        }
262        DataType::Float64 => {
263            let mut builder = Float64Builder::with_capacity(1);
264            match value {
265                Some(Value::Number(n)) => {
266                    let v = n.as_f64().ok_or_else(|| SerdeError::TypeConversion {
267                        field: field_name.into(),
268                        expected: "Float64".into(),
269                        message: format!("cannot convert {n}"),
270                    })?;
271                    builder.append_value(v);
272                }
273                Some(Value::Null) | None => builder.append_null(),
274                _ => {
275                    return Err(SerdeError::TypeConversion {
276                        field: field_name.into(),
277                        expected: "Float64".into(),
278                        message: format!("got {value:?}"),
279                    })
280                }
281            }
282            Ok(Arc::new(builder.finish()))
283        }
284        DataType::Utf8 => {
285            let mut builder = StringBuilder::with_capacity(1, 64);
286            match value {
287                Some(Value::String(s)) => builder.append_value(s),
288                Some(Value::Null) | None => builder.append_null(),
289                // Coerce non-string values to string representation
290                Some(other) => builder.append_value(other.to_string()),
291            }
292            Ok(Arc::new(builder.finish()))
293        }
294        other => Err(SerdeError::UnsupportedFormat(format!(
295            "unsupported Arrow type for JSON deserialization: {other}"
296        ))),
297    }
298}
299
300/// Helper trait for building integer arrays from JSON.
301trait IntBuilder: Default {
302    type Native: TryFrom<i64>;
303    fn append_value(&mut self, v: Self::Native);
304    fn append_null(&mut self);
305    fn finish_array(self) -> ArrayRef;
306}
307
308macro_rules! impl_int_builder {
309    ($builder:ty, $native:ty) => {
310        impl IntBuilder for $builder {
311            type Native = $native;
312            fn append_value(&mut self, v: Self::Native) {
313                <$builder>::append_value(self, v);
314            }
315            fn append_null(&mut self) {
316                <$builder>::append_null(self);
317            }
318            fn finish_array(mut self) -> ArrayRef {
319                Arc::new(self.finish())
320            }
321        }
322    };
323}
324
325impl_int_builder!(Int8Builder, i8);
326impl_int_builder!(Int16Builder, i16);
327impl_int_builder!(Int32Builder, i32);
328impl_int_builder!(Int64Builder, i64);
329
330trait UintBuilder: Default {
331    type Native: TryFrom<u64>;
332    fn append_value(&mut self, v: Self::Native);
333    fn append_null(&mut self);
334    fn finish_array(self) -> ArrayRef;
335}
336
337macro_rules! impl_uint_builder {
338    ($builder:ty, $native:ty) => {
339        impl UintBuilder for $builder {
340            type Native = $native;
341            fn append_value(&mut self, v: Self::Native) {
342                <$builder>::append_value(self, v);
343            }
344            fn append_null(&mut self) {
345                <$builder>::append_null(self);
346            }
347            fn finish_array(mut self) -> ArrayRef {
348                Arc::new(self.finish())
349            }
350        }
351    };
352}
353
354impl_uint_builder!(UInt8Builder, u8);
355impl_uint_builder!(UInt16Builder, u16);
356impl_uint_builder!(UInt32Builder, u32);
357impl_uint_builder!(UInt64Builder, u64);
358
359fn build_int_array<B: IntBuilder>(
360    value: Option<&Value>,
361    field_name: &str,
362    type_name: &str,
363) -> Result<ArrayRef, SerdeError>
364where
365    <B::Native as TryFrom<i64>>::Error: std::fmt::Display,
366{
367    let mut builder = B::default();
368    match value {
369        Some(Value::Number(n)) => {
370            let i = n.as_i64().ok_or_else(|| SerdeError::TypeConversion {
371                field: field_name.into(),
372                expected: type_name.into(),
373                message: format!("cannot convert {n} to i64"),
374            })?;
375            let native = B::Native::try_from(i).map_err(|e| SerdeError::TypeConversion {
376                field: field_name.into(),
377                expected: type_name.into(),
378                message: format!("{e}"),
379            })?;
380            builder.append_value(native);
381        }
382        Some(Value::Null) | None => builder.append_null(),
383        _ => {
384            return Err(SerdeError::TypeConversion {
385                field: field_name.into(),
386                expected: type_name.into(),
387                message: format!("got {value:?}"),
388            })
389        }
390    }
391    Ok(builder.finish_array())
392}
393
394fn build_uint_array<B: UintBuilder>(
395    value: Option<&Value>,
396    field_name: &str,
397    type_name: &str,
398) -> Result<ArrayRef, SerdeError>
399where
400    <B::Native as TryFrom<u64>>::Error: std::fmt::Display,
401{
402    let mut builder = B::default();
403    match value {
404        Some(Value::Number(n)) => {
405            let u = n.as_u64().ok_or_else(|| SerdeError::TypeConversion {
406                field: field_name.into(),
407                expected: type_name.into(),
408                message: format!("cannot convert {n} to u64"),
409            })?;
410            let native = B::Native::try_from(u).map_err(|e| SerdeError::TypeConversion {
411                field: field_name.into(),
412                expected: type_name.into(),
413                message: format!("{e}"),
414            })?;
415            builder.append_value(native);
416        }
417        Some(Value::Null) | None => builder.append_null(),
418        _ => {
419            return Err(SerdeError::TypeConversion {
420                field: field_name.into(),
421                expected: type_name.into(),
422                message: format!("got {value:?}"),
423            })
424        }
425    }
426    Ok(builder.finish_array())
427}
428
429/// Converts an Arrow column value at `row` to a JSON value.
430fn arrow_column_to_json(
431    column: &ArrayRef,
432    row: usize,
433    data_type: &DataType,
434) -> Result<Value, SerdeError> {
435    use arrow_array::{
436        BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
437        StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
438    };
439
440    match data_type {
441        DataType::Boolean => {
442            let arr = column.as_any().downcast_ref::<BooleanArray>().unwrap();
443            Ok(Value::Bool(arr.value(row)))
444        }
445        DataType::Int8 => {
446            let arr = column.as_any().downcast_ref::<Int8Array>().unwrap();
447            Ok(Value::Number(i64::from(arr.value(row)).into()))
448        }
449        DataType::Int16 => {
450            let arr = column.as_any().downcast_ref::<Int16Array>().unwrap();
451            Ok(Value::Number(i64::from(arr.value(row)).into()))
452        }
453        DataType::Int32 => {
454            let arr = column.as_any().downcast_ref::<Int32Array>().unwrap();
455            Ok(Value::Number(i64::from(arr.value(row)).into()))
456        }
457        DataType::Int64 => {
458            let arr = column.as_any().downcast_ref::<Int64Array>().unwrap();
459            Ok(Value::Number(arr.value(row).into()))
460        }
461        DataType::UInt8 => {
462            let arr = column.as_any().downcast_ref::<UInt8Array>().unwrap();
463            Ok(Value::Number(u64::from(arr.value(row)).into()))
464        }
465        DataType::UInt16 => {
466            let arr = column.as_any().downcast_ref::<UInt16Array>().unwrap();
467            Ok(Value::Number(u64::from(arr.value(row)).into()))
468        }
469        DataType::UInt32 => {
470            let arr = column.as_any().downcast_ref::<UInt32Array>().unwrap();
471            Ok(Value::Number(u64::from(arr.value(row)).into()))
472        }
473        DataType::UInt64 => {
474            let arr = column.as_any().downcast_ref::<UInt64Array>().unwrap();
475            Ok(Value::Number(arr.value(row).into()))
476        }
477        DataType::Float32 => {
478            let arr = column.as_any().downcast_ref::<Float32Array>().unwrap();
479            let v = f64::from(arr.value(row));
480            Ok(serde_json::Number::from_f64(v).map_or(Value::Null, Value::Number))
481        }
482        DataType::Float64 => {
483            let arr = column.as_any().downcast_ref::<Float64Array>().unwrap();
484            Ok(serde_json::Number::from_f64(arr.value(row)).map_or(Value::Null, Value::Number))
485        }
486        DataType::Utf8 => {
487            let arr = column.as_any().downcast_ref::<StringArray>().unwrap();
488            Ok(Value::String(arr.value(row).to_string()))
489        }
490        other => Err(SerdeError::UnsupportedFormat(format!(
491            "unsupported Arrow type for JSON serialization: {other}"
492        ))),
493    }
494}
495
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]))
    }

    #[test]
    fn test_json_deserialize_basic() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let data = br#"{"id": 1, "name": "Alice", "score": 95.5}"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 1);
    }

    #[test]
    fn test_json_deserialize_null_field() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let data = br#"{"id": 2, "name": "Bob", "score": null}"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert!(batch.column(2).is_null(0));
    }

    #[test]
    fn test_json_deserialize_missing_required_field() {
        let deser = JsonDeserializer::new();
        let schema = test_schema(); // `name` is non-nullable
        let data = br#"{"id": 3, "score": 80.0}"#;

        // A missing non-nullable field must be rejected by the decoder.
        let result = deser.deserialize(data, &schema);
        assert!(result.is_err());
    }

    #[test]
    fn test_json_serialize_roundtrip() {
        let deser = JsonDeserializer::new();
        let ser = JsonSerializer::new();
        let schema = test_schema();

        let data = br#"{"id": 42, "name": "Charlie", "score": 88.5}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let serialized = ser.serialize(&batch).unwrap();
        assert_eq!(serialized.len(), 1);

        let roundtrip: Value = serde_json::from_slice(&serialized[0]).unwrap();
        assert_eq!(roundtrip["id"], 42);
        assert_eq!(roundtrip["name"], "Charlie");
        assert_eq!(roundtrip["score"], 88.5);
    }

    #[test]
    fn test_json_serialize_null_roundtrip() {
        let deser = JsonDeserializer::new();
        let ser = JsonSerializer::new();
        let schema = test_schema();

        let data = br#"{"id": 5, "name": "Eve", "score": null}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let serialized = ser.serialize(&batch).unwrap();
        let roundtrip: Value = serde_json::from_slice(&serialized[0]).unwrap();
        assert!(roundtrip["score"].is_null());
    }

    #[test]
    fn test_json_deserialize_batch() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let r1 = br#"{"id": 1, "name": "A", "score": 10.0}"#;
        let r2 = br#"{"id": 2, "name": "B", "score": 20.0}"#;
        let records: Vec<&[u8]> = vec![r1, r2];

        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_json_deserialize_batch_empty() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let batch = deser.deserialize_batch(&[], &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 3);
    }

    #[test]
    fn test_json_serialize_batch_ndjson() {
        let deser = JsonDeserializer::new();
        let ser = JsonSerializer::new();
        let schema = test_schema();

        let r1 = br#"{"id": 1, "name": "A", "score": 10.0}"#;
        let r2 = br#"{"id": 2, "name": "B", "score": 20.0}"#;
        let records: Vec<&[u8]> = vec![r1, r2];
        let batch = deser.deserialize_batch(&records, &schema).unwrap();

        let ndjson = ser.serialize_batch(&batch).unwrap();
        // Every row, including the last, is newline-terminated.
        assert!(ndjson.ends_with(b"\n"));
        let lines: Vec<&str> = std::str::from_utf8(&ndjson)
            .unwrap()
            .lines()
            .filter(|l| !l.is_empty())
            .collect();
        assert_eq!(lines.len(), 2);
    }

    #[test]
    fn test_json_deserialize_coercion() {
        let deser = JsonDeserializer::new();
        let schema = Arc::new(Schema::new(vec![
            Field::new("qty", DataType::Int64, false),
            Field::new("price", DataType::Float64, false),
        ]));

        // String numbers should coerce to the declared numeric types.
        let data = br#"{"qty": "100", "price": "187.52"}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let qty = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(qty.value(0), 100);

        let price = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow_array::Float64Array>()
            .unwrap();
        assert!((price.value(0) - 187.52).abs() < f64::EPSILON);
    }

    #[test]
    fn test_json_type_coercion() {
        let deser = JsonDeserializer::new();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Utf8,
            false,
        )]));

        // Numbers should be coerced to string
        let data = br#"{"value": 42}"#;
        let batch = deser.deserialize(data, &schema).unwrap();
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(arr.value(0), "42");
    }

    #[test]
    fn test_json_deserialize_value_basic() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let value = serde_json::json!({"id": 7, "name": "Dora", "score": 95.5});

        let batch = deser.deserialize_value(&value, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 7);

        let names = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Dora");

        let scores = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::Float64Array>()
            .unwrap();
        assert!((scores.value(0) - 95.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_json_deserialize_value_null_and_absent_nullable_field() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let explicit_null = serde_json::json!({"id": 1, "name": "x", "score": null});
        let batch = deser.deserialize_value(&explicit_null, &schema).unwrap();
        assert!(batch.column(2).is_null(0));

        let absent = serde_json::json!({"id": 1, "name": "x"});
        let batch = deser.deserialize_value(&absent, &schema).unwrap();
        assert!(batch.column(2).is_null(0));
    }

    #[test]
    fn test_json_deserialize_value_errors() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let not_object = deser.deserialize_value(&serde_json::json!([1, 2, 3]), &schema);
        assert!(matches!(not_object, Err(SerdeError::MalformedInput(_))));

        let missing =
            deser.deserialize_value(&serde_json::json!({"id": 3, "score": 80.0}), &schema);
        assert!(matches!(missing, Err(SerdeError::MissingField(ref name)) if name == "name"));

        let wrong_kind =
            deser.deserialize_value(&serde_json::json!({"id": "seven", "name": "x"}), &schema);
        assert!(matches!(wrong_kind, Err(SerdeError::TypeConversion { .. })));
    }

    #[test]
    fn test_json_deserialize_value_integer_range() {
        let deser = JsonDeserializer::new();

        let signed = Arc::new(Schema::new(vec![Field::new("small", DataType::Int8, false)]));
        let overflow = deser.deserialize_value(&serde_json::json!({"small": 300}), &signed);
        assert!(matches!(overflow, Err(SerdeError::TypeConversion { .. })));

        let unsigned = Arc::new(Schema::new(vec![Field::new("count", DataType::UInt8, false)]));
        let negative = deser.deserialize_value(&serde_json::json!({"count": -1}), &unsigned);
        assert!(matches!(negative, Err(SerdeError::TypeConversion { .. })));
    }

    #[test]
    fn test_json_deserialize_value_scalar_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("flag", DataType::Boolean, false),
            Field::new("ratio", DataType::Float32, false),
            Field::new("count", DataType::UInt16, false),
        ]));
        let value = serde_json::json!({"flag": true, "ratio": 1.5, "count": 65535});

        let batch = JsonDeserializer::new()
            .deserialize_value(&value, &schema)
            .unwrap();

        let flags = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::BooleanArray>()
            .unwrap();
        assert!(flags.value(0));

        let ratios = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow_array::Float32Array>()
            .unwrap();
        assert!((ratios.value(0) - 1.5).abs() < f32::EPSILON);

        let counts = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::UInt16Array>()
            .unwrap();
        assert_eq!(counts.value(0), 65535);
    }

    #[test]
    fn test_json_deserialize_value_utf8_coercion() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Utf8,
            false,
        )]));

        // Non-string JSON is stored as its JSON text.
        let batch = JsonDeserializer::new()
            .deserialize_value(&serde_json::json!({"value": true}), &schema)
            .unwrap();
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(arr.value(0), "true");
    }
}