// laminar_connectors/serde/json.rs

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use serde_json::Value;

use super::{Format, RecordDeserializer, RecordSerializer};
use crate::error::SerdeError;
use crate::schema::json::decoder::JsonDecoder;
use crate::schema::json::encoder::JsonEncoder;
use crate::schema::traits::{FormatDecoder, FormatEncoder};
use crate::schema::types::RawRecord;

/// Turns JSON documents into Arrow [`RecordBatch`]es for a caller-supplied schema.
///
/// The type carries no state. Because its only field is private, code outside this module
/// cannot build it with a struct literal and must go through [`JsonDeserializer::new`] or
/// [`Default`].
#[derive(Clone, Debug)]
pub struct JsonDeserializer {
    _private: (),
}

23impl JsonDeserializer {
24 #[must_use]
26 pub fn new() -> Self {
27 Self { _private: () }
28 }
29
30 pub fn deserialize_value(
39 &self,
40 value: &Value,
41 schema: &SchemaRef,
42 ) -> Result<RecordBatch, SerdeError> {
43 let bytes = serde_json::to_vec(value).map_err(|e| SerdeError::Json(e.to_string()))?;
44 self.deserialize(&bytes, schema)
45 }
46}
47
48impl Default for JsonDeserializer {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl RecordDeserializer for JsonDeserializer {
55 fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
56 let decoder = JsonDecoder::new(schema.clone());
57 let record = RawRecord::new(data.to_vec());
58 decoder
59 .decode_one(&record)
60 .map_err(|e| SerdeError::Json(e.to_string()))
61 }
62
63 fn deserialize_batch(
64 &self,
65 records: &[&[u8]],
66 schema: &SchemaRef,
67 ) -> Result<RecordBatch, SerdeError> {
68 if records.is_empty() {
69 return Ok(RecordBatch::new_empty(schema.clone()));
70 }
71 let decoder = JsonDecoder::new(schema.clone());
72 let raw_records: Vec<RawRecord> =
73 records.iter().map(|r| RawRecord::new(r.to_vec())).collect();
74 decoder
75 .decode_batch(&raw_records)
76 .map_err(|e| SerdeError::Json(e.to_string()))
77 }
78
79 fn format(&self) -> Format {
80 Format::Json
81 }
82}
83
/// Turns Arrow [`RecordBatch`]es into JSON byte buffers.
///
/// The type carries no state. Because its only field is private, code outside this module
/// cannot build it with a struct literal and must go through [`JsonSerializer::new`] or
/// [`Default`].
#[derive(Clone, Debug)]
pub struct JsonSerializer {
    _private: (),
}

90impl JsonSerializer {
91 #[must_use]
93 pub fn new() -> Self {
94 Self { _private: () }
95 }
96}
97
98impl Default for JsonSerializer {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl RecordSerializer for JsonSerializer {
105 fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
106 let encoder = JsonEncoder::new(batch.schema());
107 encoder
108 .encode_batch(batch)
109 .map_err(|e| SerdeError::Json(e.to_string()))
110 }
111
112 fn format(&self) -> Format {
113 Format::Json
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Shared three-column schema: required `id`/`name`, nullable `score`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]))
    }

    #[test]
    fn test_json_deserialize_basic() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let data = br#"{"id": 1, "name": "Alice", "score": 95.5}"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
    }

    #[test]
    fn test_json_deserialize_value() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();
        let value = serde_json::json!({"id": 7, "name": "Dana", "score": 1.25});

        let batch = deser.deserialize_value(&value, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 7);
    }

    #[test]
    fn test_json_deserialize_malformed_is_json_error() {
        let deser = JsonDeserializer::new();
        let result = deser.deserialize(b"{not json", &test_schema());
        assert!(matches!(result, Err(SerdeError::Json(_))));
    }

    #[test]
    fn test_json_serialize_roundtrip() {
        let deser = JsonDeserializer::new();
        let ser = JsonSerializer::new();
        let schema = test_schema();

        let data = br#"{"id": 42, "name": "Charlie", "score": 88.5}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let serialized = ser.serialize(&batch).unwrap();
        assert_eq!(serialized.len(), 1);

        let roundtrip: Value = serde_json::from_slice(&serialized[0]).unwrap();
        assert_eq!(roundtrip["id"], 42);
        assert_eq!(roundtrip["name"], "Charlie");
        assert_eq!(roundtrip["score"], 88.5);
    }

    #[test]
    fn test_json_deserialize_batch() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        let r1 = br#"{"id": 1, "name": "A", "score": 10.0}"#;
        let r2 = br#"{"id": 2, "name": "B", "score": 20.0}"#;
        let records: Vec<&[u8]> = vec![r1, r2];

        let batch = deser.deserialize_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
    }

    #[test]
    fn test_json_deserialize_batch_empty() {
        let deser = JsonDeserializer::new();
        let schema = test_schema();

        // The empty-input branch must still produce a batch carrying the full schema.
        let batch = deser.deserialize_batch(&[], &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.schema(), schema);
    }

    #[test]
    fn test_json_deserialize_coercion() {
        let deser = JsonDeserializer::new();
        let schema = Arc::new(Schema::new(vec![
            Field::new("qty", DataType::Int64, false),
            Field::new("price", DataType::Float64, false),
        ]));

        let data = br#"{"qty": "100", "price": "187.52"}"#;
        let batch = deser.deserialize(data, &schema).unwrap();

        let qty = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(qty.value(0), 100);

        let price = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow_array::Float64Array>()
            .unwrap();
        assert!((price.value(0) - 187.52).abs() < 1e-9);
    }
}