laminar_connectors/schema/json/
encoder.rs1use std::io::Write as _;
4use std::sync::Arc;
5
6use arrow_array::cast::AsArray;
7use arrow_array::{Array, RecordBatch};
8use arrow_json::writer::{
9 Encoder, EncoderFactory, EncoderOptions, LineDelimited, NullableEncoder, WriterBuilder,
10};
11use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
12
13use crate::schema::error::{SchemaError, SchemaResult};
14use crate::schema::traits::FormatEncoder;
15
/// Encodes Arrow `RecordBatch`es as line-delimited JSON, one record per row.
///
/// `LargeBinary` columns get special treatment via `JsonbPassthroughFactory`:
/// values that parse as JSON are inlined, everything else becomes a
/// placeholder string.
#[derive(Debug)]
pub struct JsonEncoder {
    // Schema of the batches this encoder accepts; echoed by `input_schema()`.
    schema: SchemaRef,
}
23
impl JsonEncoder {
    /// Creates an encoder for batches conforming to `schema`.
    #[must_use]
    pub fn new(schema: SchemaRef) -> Self {
        Self { schema }
    }
}
31
32impl FormatEncoder for JsonEncoder {
33 fn input_schema(&self) -> SchemaRef {
34 self.schema.clone()
35 }
36
37 fn encode_batch(&self, batch: &RecordBatch) -> SchemaResult<Vec<Vec<u8>>> {
38 if batch.num_rows() == 0 {
39 return Ok(Vec::new());
40 }
41
42 let mut buf = Vec::new();
43 {
44 let mut writer = WriterBuilder::new()
45 .with_explicit_nulls(true)
46 .with_encoder_factory(Arc::new(JsonbPassthroughFactory))
47 .build::<_, LineDelimited>(&mut buf);
48 writer
49 .write(batch)
50 .map_err(|e| SchemaError::DecodeError(format!("JSON encode error: {e}")))?;
51 writer
52 .finish()
53 .map_err(|e| SchemaError::DecodeError(format!("JSON finish error: {e}")))?;
54 }
55
56 let output: Vec<Vec<u8>> = buf
57 .split(|&b| b == b'\n')
58 .filter(|line| !line.is_empty())
59 .map(<[u8]>::to_vec)
60 .collect();
61
62 Ok(output)
63 }
64
65 fn format_name(&self) -> &'static str {
66 "json"
67 }
68}
69
/// `EncoderFactory` that intercepts `LargeBinary` columns so JSONB payloads
/// can be written as raw JSON instead of arrow-json's default binary encoding.
#[derive(Debug)]
struct JsonbPassthroughFactory;
73
74impl EncoderFactory for JsonbPassthroughFactory {
75 fn make_default_encoder<'a>(
76 &self,
77 _field: &'a FieldRef,
78 array: &'a dyn Array,
79 _options: &'a EncoderOptions,
80 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
81 if *array.data_type() != DataType::LargeBinary {
82 return Ok(None);
83 }
84 let binary_array = array.as_binary::<i64>();
85 let nulls = binary_array.nulls().cloned();
86 let encoder = LargeBinaryJsonbEncoder {
87 array: binary_array,
88 };
89 Ok(Some(NullableEncoder::new(Box::new(encoder), nulls)))
90 }
91}
92
/// Per-row encoder for a `LargeBinary` column: values that parse as JSON are
/// written as JSON, anything else becomes a `"<binary:N bytes>"` placeholder.
struct LargeBinaryJsonbEncoder<'a> {
    array: &'a arrow_array::LargeBinaryArray,
}
96
97impl Encoder for LargeBinaryJsonbEncoder<'_> {
98 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
99 let bytes = self.array.value(idx);
100 if serde_json::from_slice::<serde_json::Value>(bytes).is_ok() {
101 out.extend_from_slice(bytes);
102 } else {
103 write!(out, "\"<binary:{} bytes>\"", bytes.len()).unwrap();
104 }
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::*;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Builds an Arrow schema from `(name, data_type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Each row becomes one standalone JSON record with all fields present.
    #[test]
    fn test_encode_basic() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        assert_eq!(records.len(), 2);

        let v0: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v0["id"], 1);
        assert_eq!(v0["name"], "Alice");

        let v1: serde_json::Value = serde_json::from_slice(&records[1]).unwrap();
        assert_eq!(v1["id"], 2);
        assert_eq!(v1["name"], "Bob");
    }

    /// `with_explicit_nulls(true)` must emit `"key": null` rather than
    /// dropping the key entirely.
    #[test]
    fn test_encode_nulls_explicit() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![Some(1), None])),
                Arc::new(StringArray::from(vec![None, Some("x")])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        let v0: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v0["a"], 1);
        assert!(
            v0.get("b").unwrap().is_null(),
            "null key 'b' must be present"
        );

        let v1: serde_json::Value = serde_json::from_slice(&records[1]).unwrap();
        assert!(
            v1.get("a").unwrap().is_null(),
            "null key 'a' must be present"
        );
        assert_eq!(v1["b"], "x");
    }

    /// Valid JSON stored in a `LargeBinary` (JSONB) column is inlined as a
    /// nested object, and null JSONB values stay null.
    #[test]
    fn test_encode_large_binary_jsonb() {
        let schema = make_schema(vec![
            ("name", DataType::Utf8, false),
            ("extra", DataType::LargeBinary, true),
        ]);

        let json_bytes = br#"{"nested":"value","count":42}"#;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
                Arc::new(LargeBinaryArray::from(vec![
                    Some(json_bytes.as_ref()),
                    None,
                ])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        let v0: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v0["name"], "Alice");
        assert!(v0["extra"].is_object(), "JSONB must be inlined as object");
        assert_eq!(v0["extra"]["nested"], "value");
        assert_eq!(v0["extra"]["count"], 42);

        let v1: serde_json::Value = serde_json::from_slice(&records[1]).unwrap();
        assert!(v1["extra"].is_null());
    }

    /// Non-JSON binary data falls back to the `"<binary:N bytes>"` placeholder.
    #[test]
    fn test_encode_large_binary_non_json() {
        let schema = make_schema(vec![("data", DataType::LargeBinary, false)]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(LargeBinaryArray::from(vec![
                b"\x00\x01\x02".as_ref()
            ]))],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();

        let v: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert_eq!(v["data"], "<binary:3 bytes>");
    }

    /// Scalar column types map to their natural JSON representations.
    #[test]
    fn test_encode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
        ]);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true])),
                Arc::new(Int64Array::from(vec![42])),
                Arc::new(Float64Array::from(vec![3.14])),
                Arc::new(StringArray::from(vec!["hello"])),
            ],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        let v: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();

        assert_eq!(v["bool_col"], true);
        assert_eq!(v["int_col"], 42);
        assert!((v["float_col"].as_f64().unwrap() - 3.14).abs() < f64::EPSILON);
        assert_eq!(v["str_col"], "hello");
    }

    /// A batch with zero rows takes the early-return path and yields nothing.
    #[test]
    fn test_encode_empty_batch() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let batch = RecordBatch::new_empty(schema.clone());
        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        assert!(records.is_empty());
    }

    /// Timezone-aware nanosecond timestamps are serialized as JSON strings.
    #[test]
    fn test_encode_timestamp() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        )]);

        let nanos = 1_736_936_600_000_000_000_i64;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(
                TimestampNanosecondArray::from(vec![nanos]).with_timezone("+00:00"),
            )],
        )
        .unwrap();

        let encoder = JsonEncoder::new(schema);
        let records = encoder.encode_batch(&batch).unwrap();
        assert_eq!(records.len(), 1);

        let v: serde_json::Value = serde_json::from_slice(&records[0]).unwrap();
        assert!(v["ts"].is_string());
    }
}