// laminar_connectors/schema/json/encoder.rs
1//! JSON format encoder implementing [`FormatEncoder`].
2
3use std::io::Write as _;
4use std::sync::Arc;
5
6use arrow_array::cast::AsArray;
7use arrow_array::{Array, RecordBatch};
8use arrow_json::writer::{
9    Encoder, EncoderFactory, EncoderOptions, LineDelimited, NullableEncoder, WriterBuilder,
10};
11use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
12
13use crate::schema::error::{SchemaError, SchemaResult};
14use crate::schema::traits::FormatEncoder;
15
16/// Encodes Arrow `RecordBatch`es into one JSON object per row via
17/// `arrow_json::writer`. `LargeBinary` columns are inlined as JSON
18/// when valid (JSONB passthrough), matching `CollectExtra` semantics.
19#[derive(Debug)]
20pub struct JsonEncoder {
21    schema: SchemaRef,
22}
23
24impl JsonEncoder {
25    /// Creates a new JSON encoder for the given schema.
26    #[must_use]
27    pub fn new(schema: SchemaRef) -> Self {
28        Self { schema }
29    }
30}
31
32impl FormatEncoder for JsonEncoder {
33    fn input_schema(&self) -> SchemaRef {
34        self.schema.clone()
35    }
36
37    fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
38        if batch.num_rows() == 0 {
39            return Ok(Vec::new());
40        }
41
42        let mut buf = Vec::new();
43        {
44            let mut writer = WriterBuilder::new()
45                .with_explicit_nulls(true)
46                .with_encoder_factory(Arc::new(JsonbPassthroughFactory))
47                .build::<_, LineDelimited>(&mut buf);
48            writer
49                .write(batch)
50                .map_err(|e| SchemaError::DecodeError(format!("JSON encode error: {e}")))?;
51            writer
52                .finish()
53                .map_err(|e| SchemaError::DecodeError(format!("JSON finish error: {e}")))?;
54        }
55
56        let output: Vec<Vec<u8>> = buf
57            .split(|&b| b == b'\n')
58            .filter(|line| !line.is_empty())
59            .map(<[u8]>::to_vec)
60            .collect();
61
62        Ok(output)
63    }
64
65    fn format_name(&self) -> &'static str {
66        "json"
67    }
68}
69
70/// Inlines `LargeBinary` as raw JSON when valid; falls through otherwise.
71#[derive(Debug)]
72struct JsonbPassthroughFactory;
73
74impl EncoderFactory for JsonbPassthroughFactory {
75    fn make_default_encoder<'a>(
76        &self,
77        _field: &'a FieldRef,
78        array: &'a dyn Array,
79        _options: &'a EncoderOptions,
80    ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
81        if *array.data_type() != DataType::LargeBinary {
82            return Ok(None);
83        }
84        let binary_array = array.as_binary::<i64>();
85        let nulls = binary_array.nulls().cloned();
86        let encoder = LargeBinaryJsonbEncoder {
87            array: binary_array,
88        };
89        Ok(Some(NullableEncoder::new(Box::new(encoder), nulls)))
90    }
91}
92
93struct LargeBinaryJsonbEncoder<'a> {
94    array: &'a arrow_array::LargeBinaryArray,
95}
96
97impl Encoder for LargeBinaryJsonbEncoder<'_> {
98    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
99        let bytes = self.array.value(idx);
100        if serde_json::from_slice::<serde_json::Value>(bytes).is_ok() {
101            out.extend_from_slice(bytes);
102        } else {
103            write!(out, "\"<binary:{} bytes>\"", bytes.len()).unwrap();
104        }
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::*;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Builds an Arrow schema from `(name, data_type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    // One output record per input row; values keyed by column name.
    #[test]
    fn test_encode_basic() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        assert_eq!(records.len(), 2);

        let v0: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v0["id"], 1);
        assert_eq!(v0["name"], "Alice");

        let v1: serde_json::Value = serde_json::from_slice(&records[1]).unwrap();
        assert_eq!(v1["id"], 2);
        assert_eq!(v1["name"], "Bob");
    }

    // with_explicit_nulls(true): null columns must still appear as keys.
    #[test]
    fn test_encode_nulls_explicit() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![Some(1), None])),
                Arc::new(StringArray::from(vec![None, Some("x")])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        // Row 0: a=1, b=null — null key must be present.
        let v0: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v0["a"], 1);
        assert!(
            v0.get("b").unwrap().is_null(),
            "null key 'b' must be present"
        );

        // Row 1: a=null, b="x" — null key must be present.
        let v1: serde_json::Value = serde_json::from_slice(&records[1]).unwrap();
        assert!(
            v1.get("a").unwrap().is_null(),
            "null key 'a' must be present"
        );
        assert_eq!(v1["b"], "x");
    }

    // LargeBinary holding valid JSON is spliced in as a JSON object
    // (JSONB passthrough); a null slot becomes JSON null.
    #[test]
    fn test_encode_large_binary_jsonb() {
        let schema = make_schema(vec![
            ("name", DataType::Utf8, false),
            ("extra", DataType::LargeBinary, true),
        ]);

        let json_bytes = br#"{"nested":"value","count":42}"#;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
                Arc::new(LargeBinaryArray::from(vec![
                    Some(json_bytes.as_ref()),
                    None,
                ])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        // Row 0: extra must be inlined as a JSON object, not hex-encoded.
        let v0: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v0["name"], "Alice");
        assert!(v0["extra"].is_object(), "JSONB must be inlined as object");
        assert_eq!(v0["extra"]["nested"], "value");
        assert_eq!(v0["extra"]["count"], 42);

        // Row 1: null extra.
        let v1: serde_json::Value = serde_json::from_slice(&records[1]).unwrap();
        assert!(v1["extra"].is_null());
    }

    // Non-JSON binary falls back to the "<binary:N bytes>" placeholder.
    #[test]
    fn test_encode_large_binary_non_json() {
        let schema = make_schema(vec![("data", DataType::LargeBinary, false)]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(LargeBinaryArray::from(vec![
                b"\x00\x01\x02".as_ref()
            ]))],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        let v: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v["data"], "<binary:3 bytes>");
    }

    // Scalar types map to the expected JSON primitives.
    #[test]
    fn test_encode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true])),
                Arc::new(Int64Array::from(vec![42])),
                Arc::new(Float64Array::from(vec![3.14])),
                Arc::new(StringArray::from(vec!["hello"])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        let v: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();

        assert_eq!(v["bool_col"], true);
        assert_eq!(v["int_col"], 42);
        assert!((v["float_col"].as_f64().unwrap() - 3.14).abs() < f64::EPSILON);
        assert_eq!(v["str_col"], "hello");
    }

    // Zero rows short-circuits to an empty output vector.
    #[test]
    fn test_encode_empty_batch() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let batch = RecordBatch::new_empty(schema.clone());
        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        assert!(records.is_empty());
    }

    // Timezone-aware timestamps serialize as JSON strings.
    #[test]
    fn test_encode_timestamp() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        )]);

        let nanos = 1_736_936_600_000_000_000_i64;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(
                TimestampNanosecondArray::from(vec![nanos]).with_timezone("+00:00"),
            )],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        assert_eq!(records.len(), 1);

        let v: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert!(v["ts"].is_string());
    }
}