Skip to main content

laminar_connectors/schema/json/
encoder.rs

1//! JSON format encoder implementing [`FormatEncoder`].
2//!
3//! Converts Arrow `RecordBatch`es into JSON byte payloads (one JSON
4//! object per row).
5
6use std::sync::Arc;
7
8use arrow_array::cast::AsArray;
9use arrow_array::RecordBatch;
10use arrow_schema::{DataType, SchemaRef, TimeUnit};
11
12use crate::schema::error::{SchemaError, SchemaResult};
13use crate::schema::traits::FormatEncoder;
14
15/// Encodes Arrow `RecordBatch`es into JSON byte records.
16///
17/// Each row becomes one JSON object serialized as UTF-8 bytes.
18#[derive(Debug)]
19pub struct JsonEncoder {
20    schema: SchemaRef,
21}
22
23impl JsonEncoder {
24    /// Creates a new JSON encoder for the given schema.
25    #[must_use]
26    pub fn new(schema: SchemaRef) -> Self {
27        Self { schema }
28    }
29}
30
31impl FormatEncoder for JsonEncoder {
32    fn input_schema(&self) -> SchemaRef {
33        self.schema.clone()
34    }
35
36    fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
37        let num_rows = batch.num_rows();
38        let mut output = Vec::with_capacity(num_rows);
39
40        for row in 0..num_rows {
41            let mut obj = serde_json::Map::with_capacity(batch.num_columns());
42
43            for (col_idx, field) in self.schema.fields().iter().enumerate() {
44                let col = batch.column(col_idx);
45                let value = if col.is_null(row) {
46                    serde_json::Value::Null
47                } else {
48                    column_value_to_json(col, row, field.data_type())?
49                };
50                obj.insert(field.name().clone(), value);
51            }
52
53            let bytes = serde_json::to_vec(&serde_json::Value::Object(obj))
54                .map_err(|e| SchemaError::DecodeError(format!("JSON encode error: {e}")))?;
55            output.push(bytes);
56        }
57
58        Ok(output)
59    }
60
61    fn format_name(&self) -> &'static str {
62        "json"
63    }
64}
65
66fn column_value_to_json(
67    col: &Arc<dyn arrow_array::Array>,
68    row: usize,
69    data_type: &DataType,
70) -> SchemaResult<serde_json::Value> {
71    Ok(match data_type {
72        DataType::Boolean => serde_json::Value::Bool(col.as_boolean().value(row)),
73        DataType::Int32 => serde_json::Value::Number(
74            col.as_primitive::<arrow_array::types::Int32Type>()
75                .value(row)
76                .into(),
77        ),
78        DataType::Int64 => serde_json::Value::Number(
79            col.as_primitive::<arrow_array::types::Int64Type>()
80                .value(row)
81                .into(),
82        ),
83        DataType::Float32 => {
84            let f = f64::from(
85                col.as_primitive::<arrow_array::types::Float32Type>()
86                    .value(row),
87            );
88            serde_json::Number::from_f64(f)
89                .map_or(serde_json::Value::Null, serde_json::Value::Number)
90        }
91        DataType::Float64 => {
92            let f = col
93                .as_primitive::<arrow_array::types::Float64Type>()
94                .value(row);
95            serde_json::Number::from_f64(f)
96                .map_or(serde_json::Value::Null, serde_json::Value::Number)
97        }
98        DataType::Utf8 => serde_json::Value::String(col.as_string::<i32>().value(row).to_string()),
99        DataType::LargeUtf8 => {
100            serde_json::Value::String(col.as_string::<i64>().value(row).to_string())
101        }
102        DataType::LargeBinary => {
103            // Attempt to parse as JSON; fallback to opaque binary string.
104            let bytes = col.as_binary::<i64>().value(row);
105            match serde_json::from_slice::<serde_json::Value>(bytes) {
106                Ok(v) => v,
107                Err(_) => serde_json::Value::String(format!("<binary:{} bytes>", bytes.len())),
108            }
109        }
110        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
111            let nanos = col
112                .as_primitive::<arrow_array::types::TimestampNanosecondType>()
113                .value(row);
114            // Convert nanos to RFC 3339 string.
115            // `rem_euclid` guarantees a non-negative remainder in [0, 1_000_000_000),
116            // which always fits in u32.
117            let secs = nanos.div_euclid(1_000_000_000);
118            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
119            let nsec = nanos.rem_euclid(1_000_000_000) as u32;
120            if let Some(dt) = chrono::DateTime::from_timestamp(secs, nsec) {
121                serde_json::Value::String(dt.to_rfc3339())
122            } else {
123                serde_json::Value::Number(nanos.into())
124            }
125        }
126        // Fallback: use Arrow's string representation.
127        _ => {
128            let arr_str = arrow_cast::display::ArrayFormatter::try_new(
129                col.as_ref(),
130                &arrow_cast::display::FormatOptions::default(),
131            )
132            .map_err(|e| SchemaError::DecodeError(format!("format error: {e}")))?;
133            serde_json::Value::String(arr_str.value(row).to_string())
134        }
135    })
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{
        BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray,
    };
    use arrow_schema::{Field, Schema};

    /// Builds a schema from `(name, data type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Parses one encoded record back into a JSON value for assertions.
    fn parse(record: &[u8]) -> serde_json::Value {
        serde_json::from_slice(record).unwrap()
    }

    #[test]
    fn test_encode_basic() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        assert_eq!(records.len(), 2);

        let v0 = parse(&records[0]);
        assert_eq!(v0["id"], 1);
        assert_eq!(v0["name"], "Alice");

        let v1 = parse(&records[1]);
        assert_eq!(v1["id"], 2);
        assert_eq!(v1["name"], "Bob");
    }

    #[test]
    fn test_encode_nulls() {
        let schema = make_schema(vec![("value", DataType::Int64, true)]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![Some(1), None]))],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        let v0 = parse(&records[0]);
        assert_eq!(v0["value"], 1);

        let v1 = parse(&records[1]);
        assert!(v1["value"].is_null());
    }

    #[test]
    fn test_encode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
        ]);

        // 2.5 is exactly representable and, unlike 3.14, does not trip the
        // deny-by-default `clippy::approx_constant` lint.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true])),
                Arc::new(Int64Array::from(vec![42])),
                Arc::new(Float64Array::from(vec![2.5])),
                Arc::new(StringArray::from(vec!["hello"])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        let v = parse(&records[0]);

        assert_eq!(v["bool_col"], true);
        assert_eq!(v["int_col"], 42);
        assert!((v["float_col"].as_f64().unwrap() - 2.5).abs() < f64::EPSILON);
        assert_eq!(v["str_col"], "hello");
    }

    #[test]
    fn test_encode_non_finite_float_as_null() {
        let schema = make_schema(vec![("x", DataType::Float64, false)]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Float64Array::from(vec![f64::NAN, f64::INFINITY]))],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        // JSON has no representation for NaN or infinities.
        assert!(parse(&records[0])["x"].is_null());
        assert!(parse(&records[1])["x"].is_null());
    }

    #[test]
    fn test_encode_timestamp_nanos_as_rfc3339() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]);

        // One instant at the epoch and one a single nanosecond before it, to
        // exercise the Euclidean split for negative values.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(TimestampNanosecondArray::from(vec![0_i64, -1]))],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        assert_eq!(parse(&records[0])["ts"], "1970-01-01T00:00:00+00:00");
        assert_eq!(
            parse(&records[1])["ts"],
            "1969-12-31T23:59:59.999999999+00:00"
        );
    }

    #[test]
    fn test_encode_empty_batch() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let batch = RecordBatch::new_empty(schema.clone());
        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        assert!(records.is_empty());
    }
}