laminar_connectors/schema/json/
encoder.rs1use std::sync::Arc;
7
8use arrow_array::cast::AsArray;
9use arrow_array::RecordBatch;
10use arrow_schema::{DataType, SchemaRef, TimeUnit};
11
12use crate::schema::error::{SchemaError, SchemaResult};
13use crate::schema::traits::FormatEncoder;
14
15#[derive(Debug)]
19pub struct JsonEncoder {
20 schema: SchemaRef,
21}
22
23impl JsonEncoder {
24 #[must_use]
26 pub fn new(schema: SchemaRef) -> Self {
27 Self { schema }
28 }
29}
30
31impl FormatEncoder for JsonEncoder {
32 fn input_schema(&self) -> SchemaRef {
33 self.schema.clone()
34 }
35
36 fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
37 let num_rows = batch.num_rows();
38 let mut output = Vec::with_capacity(num_rows);
39
40 for row in 0..num_rows {
41 let mut obj = serde_json::Map::with_capacity(batch.num_columns());
42
43 for (col_idx, field) in self.schema.fields().iter().enumerate() {
44 let col = batch.column(col_idx);
45 let value = if col.is_null(row) {
46 serde_json::Value::Null
47 } else {
48 column_value_to_json(col, row, field.data_type())?
49 };
50 obj.insert(field.name().clone(), value);
51 }
52
53 let bytes = serde_json::to_vec(&serde_json::Value::Object(obj))
54 .map_err(|e| SchemaError::DecodeError(format!("JSON encode error: {e}")))?;
55 output.push(bytes);
56 }
57
58 Ok(output)
59 }
60
61 fn format_name(&self) -> &'static str {
62 "json"
63 }
64}
65
66fn column_value_to_json(
67 col: &Arc<dyn arrow_array::Array>,
68 row: usize,
69 data_type: &DataType,
70) -> SchemaResult<serde_json::Value> {
71 Ok(match data_type {
72 DataType::Boolean => serde_json::Value::Bool(col.as_boolean().value(row)),
73 DataType::Int32 => serde_json::Value::Number(
74 col.as_primitive::<arrow_array::types::Int32Type>()
75 .value(row)
76 .into(),
77 ),
78 DataType::Int64 => serde_json::Value::Number(
79 col.as_primitive::<arrow_array::types::Int64Type>()
80 .value(row)
81 .into(),
82 ),
83 DataType::Float32 => {
84 let f = f64::from(
85 col.as_primitive::<arrow_array::types::Float32Type>()
86 .value(row),
87 );
88 serde_json::Number::from_f64(f)
89 .map_or(serde_json::Value::Null, serde_json::Value::Number)
90 }
91 DataType::Float64 => {
92 let f = col
93 .as_primitive::<arrow_array::types::Float64Type>()
94 .value(row);
95 serde_json::Number::from_f64(f)
96 .map_or(serde_json::Value::Null, serde_json::Value::Number)
97 }
98 DataType::Utf8 => serde_json::Value::String(col.as_string::<i32>().value(row).to_string()),
99 DataType::LargeUtf8 => {
100 serde_json::Value::String(col.as_string::<i64>().value(row).to_string())
101 }
102 DataType::LargeBinary => {
103 let bytes = col.as_binary::<i64>().value(row);
105 match serde_json::from_slice::<serde_json::Value>(bytes) {
106 Ok(v) => v,
107 Err(_) => serde_json::Value::String(format!("<binary:{} bytes>", bytes.len())),
108 }
109 }
110 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
111 let nanos = col
112 .as_primitive::<arrow_array::types::TimestampNanosecondType>()
113 .value(row);
114 let secs = nanos.div_euclid(1_000_000_000);
118 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
119 let nsec = nanos.rem_euclid(1_000_000_000) as u32;
120 if let Some(dt) = chrono::DateTime::from_timestamp(secs, nsec) {
121 serde_json::Value::String(dt.to_rfc3339())
122 } else {
123 serde_json::Value::Number(nanos.into())
124 }
125 }
126 _ => {
128 let arr_str = arrow_cast::display::ArrayFormatter::try_new(
129 col.as_ref(),
130 &arrow_cast::display::FormatOptions::default(),
131 )
132 .map_err(|e| SchemaError::DecodeError(format!("format error: {e}")))?;
133 serde_json::Value::String(arr_str.value(row).to_string())
134 }
135 })
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::*;
    use arrow_schema::{Field, Schema};

    /// Builds a schema from `(name, type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Encodes `batch` with a fresh encoder for `schema` and parses every
    /// emitted record back into a JSON value.
    fn encode_to_values(schema: SchemaRef, batch: &RecordBatch) -> Vec<serde_json::Value> {
        JsonEncoder::new(schema)
            .encode_batch(batch)
            .unwrap()
            .iter()
            .map(|bytes| serde_json::from_slice::<serde_json::Value>(bytes).unwrap())
            .collect()
    }

    #[test]
    fn test_encode_basic() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            ],
        )
        .unwrap();

        let values = encode_to_values(schema, &batch);
        assert_eq!(values.len(), 2);

        assert_eq!(values[0]["id"], 1);
        assert_eq!(values[0]["name"], "Alice");

        assert_eq!(values[1]["id"], 2);
        assert_eq!(values[1]["name"], "Bob");
    }

    #[test]
    fn test_encode_nulls() {
        let schema = make_schema(vec![("value", DataType::Int64, true)]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![Some(1), None]))],
        )
        .unwrap();

        let values = encode_to_values(schema, &batch);
        assert_eq!(values[0]["value"], 1);
        assert!(values[1]["value"].is_null());
    }

    #[test]
    fn test_encode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
            ("large_str_col", DataType::LargeUtf8, false),
        ]);

        // 2.5 is exactly representable, so it survives the JSON round-trip
        // bit-for-bit (and, unlike 3.14, does not trip clippy's
        // `approx_constant` lint).
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true])),
                Arc::new(Int64Array::from(vec![42])),
                Arc::new(Float64Array::from(vec![2.5])),
                Arc::new(StringArray::from(vec!["hello"])),
                Arc::new(LargeStringArray::from(vec!["world"])),
            ],
        )
        .unwrap();

        let values = encode_to_values(schema, &batch);
        let v = &values[0];

        assert_eq!(v["bool_col"], true);
        assert_eq!(v["int_col"], 42);
        assert_eq!(v["float_col"], 2.5);
        assert_eq!(v["str_col"], "hello");
        assert_eq!(v["large_str_col"], "world");
    }

    #[test]
    fn test_encode_non_finite_floats_as_null() {
        let schema = make_schema(vec![
            ("f64", DataType::Float64, false),
            ("f32", DataType::Float32, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Float64Array::from(vec![f64::NAN, f64::INFINITY, 1.5])),
                Arc::new(Float32Array::from(vec![f32::NEG_INFINITY, f32::NAN, 0.25])),
            ],
        )
        .unwrap();

        let values = encode_to_values(schema, &batch);
        // JSON has no NaN/Infinity, so such values are emitted as null.
        assert!(values[0]["f64"].is_null());
        assert!(values[0]["f32"].is_null());
        assert!(values[1]["f64"].is_null());
        assert!(values[1]["f32"].is_null());
        assert_eq!(values[2]["f64"], 1.5);
        assert_eq!(values[2]["f32"], 0.25);
    }

    #[test]
    fn test_encode_timestamp_nanos_as_rfc3339() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(TimestampNanosecondArray::from(vec![
                1_700_000_000_123_456_789_i64,
                -1,
            ]))],
        )
        .unwrap();

        let values = encode_to_values(schema, &batch);
        assert_eq!(values[0]["ts"], "2023-11-14T22:13:20.123456789+00:00");
        // One nanosecond before the epoch floors to the previous second.
        assert_eq!(values[1]["ts"], "1969-12-31T23:59:59.999999999+00:00");
    }

    #[test]
    fn test_encode_large_binary() {
        let schema = make_schema(vec![("payload", DataType::LargeBinary, false)]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(LargeBinaryArray::from(vec![
                br#"{"a":1}"#.as_slice(),
                b"\xff\x00".as_slice(),
            ]))],
        )
        .unwrap();

        let values = encode_to_values(schema, &batch);
        // Bytes that parse as JSON are embedded as structured JSON ...
        assert_eq!(values[0]["payload"]["a"], 1);
        // ... anything else is replaced by a size placeholder.
        assert_eq!(values[1]["payload"], "<binary:2 bytes>");
    }

    #[test]
    fn test_encode_empty_batch() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let batch = RecordBatch::new_empty(schema.clone());
        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        assert!(records.is_empty());
    }
}