1use std::sync::Arc;
6
7use arrow_array::builder::{
8 BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
9 Int8Builder, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
10};
11use arrow_array::{ArrayRef, RecordBatch};
12use arrow_schema::{DataType, SchemaRef};
13use serde_json::Value;
14
15use super::{Format, RecordDeserializer, RecordSerializer};
16use crate::error::SerdeError;
17use crate::schema::json::decoder::JsonDecoder;
18use crate::schema::traits::FormatDecoder;
19use crate::schema::types::RawRecord;
20
/// Deserializer for JSON-encoded records.
///
/// The type carries no configuration; it exists so the JSON format can be
/// used through the [`RecordDeserializer`] trait. Construct it with
/// [`JsonDeserializer::new`] or [`Default::default`].
#[derive(Debug, Clone)]
pub struct JsonDeserializer {
    // Private zero-sized field: keeps struct-literal construction inside this
    // module so callers have to go through `new()` / `default()`.
    _private: (),
}

impl JsonDeserializer {
    /// Creates a new JSON deserializer.
    #[must_use]
    pub fn new() -> Self {
        Self { _private: () }
    }
}

impl Default for JsonDeserializer {
    /// Equivalent to [`JsonDeserializer::new`].
    fn default() -> Self {
        JsonDeserializer::new()
    }
}
48
49impl JsonDeserializer {
50 pub fn deserialize_value(
60 &self,
61 value: &Value,
62 schema: &SchemaRef,
63 ) -> Result<RecordBatch, SerdeError> {
64 let obj = value
65 .as_object()
66 .ok_or_else(|| SerdeError::MalformedInput("expected JSON object".into()))?;
67
68 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
69
70 for field in schema.fields() {
71 let json_val = obj.get(field.name());
72
73 let is_null = json_val.is_none() || json_val == Some(&Value::Null);
74
75 if is_null && !field.is_nullable() {
76 return Err(SerdeError::MissingField(field.name().clone()));
77 }
78
79 let array = build_array_from_json(field.data_type(), json_val, field.name())?;
80 columns.push(array);
81 }
82
83 RecordBatch::try_new(schema.clone(), columns)
84 .map_err(|e| SerdeError::MalformedInput(format!("failed to create RecordBatch: {e}")))
85 }
86}
87
88impl RecordDeserializer for JsonDeserializer {
89 fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
90 let decoder = JsonDecoder::new(schema.clone());
91 let record = RawRecord::new(data.to_vec());
92 decoder
93 .decode_one(&record)
94 .map_err(|e| SerdeError::Json(e.to_string()))
95 }
96
97 fn deserialize_batch(
98 &self,
99 records: &[&[u8]],
100 schema: &SchemaRef,
101 ) -> Result<RecordBatch, SerdeError> {
102 if records.is_empty() {
103 return Ok(RecordBatch::new_empty(schema.clone()));
104 }
105 let decoder = JsonDecoder::new(schema.clone());
106 let raw_records: Vec<RawRecord> =
107 records.iter().map(|r| RawRecord::new(r.to_vec())).collect();
108 decoder
109 .decode_batch(&raw_records)
110 .map_err(|e| SerdeError::Json(e.to_string()))
111 }
112
113 fn format(&self) -> Format {
114 Format::Json
115 }
116}
117
/// Serializer that writes record batches as JSON.
///
/// The type carries no configuration; it exists so the JSON format can be
/// used through the [`RecordSerializer`] trait. Construct it with
/// [`JsonSerializer::new`] or [`Default::default`].
#[derive(Debug, Clone)]
pub struct JsonSerializer {
    // Private zero-sized field: keeps struct-literal construction inside this
    // module so callers have to go through `new()` / `default()`.
    _private: (),
}

impl JsonSerializer {
    /// Creates a new JSON serializer.
    #[must_use]
    pub fn new() -> Self {
        Self { _private: () }
    }
}

impl Default for JsonSerializer {
    /// Equivalent to [`JsonSerializer::new`].
    fn default() -> Self {
        JsonSerializer::new()
    }
}
139
140impl RecordSerializer for JsonSerializer {
141 fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
142 let mut records = Vec::with_capacity(batch.num_rows());
143 let schema = batch.schema();
144 let mut obj = serde_json::Map::with_capacity(schema.fields().len());
146
147 for row in 0..batch.num_rows() {
148 obj.clear();
149
150 for (col_idx, field) in schema.fields().iter().enumerate() {
151 let column = batch.column(col_idx);
152
153 if column.is_null(row) {
154 obj.insert(field.name().clone(), Value::Null);
155 continue;
156 }
157
158 let value = arrow_column_to_json(column, row, field.data_type())?;
159 obj.insert(field.name().clone(), value);
160 }
161
162 let json_bytes =
163 serde_json::to_vec(&obj).map_err(|e| SerdeError::Json(e.to_string()))?;
164 records.push(json_bytes);
165 }
166
167 Ok(records)
168 }
169
170 fn serialize_batch(&self, batch: &RecordBatch) -> Result<Vec<u8>, SerdeError> {
171 let schema = batch.schema();
172 let mut buf = Vec::with_capacity(batch.num_rows() * 256);
174 let mut obj = serde_json::Map::with_capacity(schema.fields().len());
175 let mut row_buf: Vec<u8> = Vec::with_capacity(256);
177
178 for row in 0..batch.num_rows() {
179 obj.clear();
180
181 for (col_idx, field) in schema.fields().iter().enumerate() {
182 let column = batch.column(col_idx);
183
184 if column.is_null(row) {
185 obj.insert(field.name().clone(), Value::Null);
186 continue;
187 }
188
189 let value = arrow_column_to_json(column, row, field.data_type())?;
190 obj.insert(field.name().clone(), value);
191 }
192
193 row_buf.clear();
194 serde_json::to_writer(&mut row_buf, &obj)
195 .map_err(|e| SerdeError::Json(e.to_string()))?;
196 buf.extend_from_slice(&row_buf);
197 buf.push(b'\n');
198 }
199
200 Ok(buf)
201 }
202
203 fn format(&self) -> Format {
204 Format::Json
205 }
206}
207
208fn build_array_from_json(
210 data_type: &DataType,
211 value: Option<&Value>,
212 field_name: &str,
213) -> Result<ArrayRef, SerdeError> {
214 match data_type {
215 DataType::Boolean => {
216 let mut builder = BooleanBuilder::with_capacity(1);
217 match value {
218 Some(Value::Bool(b)) => builder.append_value(*b),
219 Some(Value::Null) | None => builder.append_null(),
220 _ => {
221 return Err(SerdeError::TypeConversion {
222 field: field_name.into(),
223 expected: "Boolean".into(),
224 message: format!("got {value:?}"),
225 })
226 }
227 }
228 Ok(Arc::new(builder.finish()))
229 }
230 DataType::Int8 => build_int_array::<Int8Builder>(value, field_name, "Int8"),
231 DataType::Int16 => build_int_array::<Int16Builder>(value, field_name, "Int16"),
232 DataType::Int32 => build_int_array::<Int32Builder>(value, field_name, "Int32"),
233 DataType::Int64 => build_int_array::<Int64Builder>(value, field_name, "Int64"),
234 DataType::UInt8 => build_uint_array::<UInt8Builder>(value, field_name, "UInt8"),
235 DataType::UInt16 => build_uint_array::<UInt16Builder>(value, field_name, "UInt16"),
236 DataType::UInt32 => build_uint_array::<UInt32Builder>(value, field_name, "UInt32"),
237 DataType::UInt64 => build_uint_array::<UInt64Builder>(value, field_name, "UInt64"),
238 DataType::Float32 => {
239 let mut builder = Float32Builder::with_capacity(1);
240 match value {
241 Some(Value::Number(n)) => {
242 let v = n.as_f64().ok_or_else(|| SerdeError::TypeConversion {
243 field: field_name.into(),
244 expected: "Float32".into(),
245 message: format!("cannot convert {n}"),
246 })?;
247 #[allow(clippy::cast_possible_truncation)]
248 builder.append_value(v as f32);
250 }
251 Some(Value::Null) | None => builder.append_null(),
252 _ => {
253 return Err(SerdeError::TypeConversion {
254 field: field_name.into(),
255 expected: "Float32".into(),
256 message: format!("got {value:?}"),
257 })
258 }
259 }
260 Ok(Arc::new(builder.finish()))
261 }
262 DataType::Float64 => {
263 let mut builder = Float64Builder::with_capacity(1);
264 match value {
265 Some(Value::Number(n)) => {
266 let v = n.as_f64().ok_or_else(|| SerdeError::TypeConversion {
267 field: field_name.into(),
268 expected: "Float64".into(),
269 message: format!("cannot convert {n}"),
270 })?;
271 builder.append_value(v);
272 }
273 Some(Value::Null) | None => builder.append_null(),
274 _ => {
275 return Err(SerdeError::TypeConversion {
276 field: field_name.into(),
277 expected: "Float64".into(),
278 message: format!("got {value:?}"),
279 })
280 }
281 }
282 Ok(Arc::new(builder.finish()))
283 }
284 DataType::Utf8 => {
285 let mut builder = StringBuilder::with_capacity(1, 64);
286 match value {
287 Some(Value::String(s)) => builder.append_value(s),
288 Some(Value::Null) | None => builder.append_null(),
289 Some(other) => builder.append_value(other.to_string()),
291 }
292 Ok(Arc::new(builder.finish()))
293 }
294 other => Err(SerdeError::UnsupportedFormat(format!(
295 "unsupported Arrow type for JSON deserialization: {other}"
296 ))),
297 }
298}
299
300trait IntBuilder: Default {
302 type Native: TryFrom<i64>;
303 fn append_value(&mut self, v: Self::Native);
304 fn append_null(&mut self);
305 fn finish_array(self) -> ArrayRef;
306}
307
308macro_rules! impl_int_builder {
309 ($builder:ty, $native:ty) => {
310 impl IntBuilder for $builder {
311 type Native = $native;
312 fn append_value(&mut self, v: Self::Native) {
313 <$builder>::append_value(self, v);
314 }
315 fn append_null(&mut self) {
316 <$builder>::append_null(self);
317 }
318 fn finish_array(mut self) -> ArrayRef {
319 Arc::new(self.finish())
320 }
321 }
322 };
323}
324
325impl_int_builder!(Int8Builder, i8);
326impl_int_builder!(Int16Builder, i16);
327impl_int_builder!(Int32Builder, i32);
328impl_int_builder!(Int64Builder, i64);
329
330trait UintBuilder: Default {
331 type Native: TryFrom<u64>;
332 fn append_value(&mut self, v: Self::Native);
333 fn append_null(&mut self);
334 fn finish_array(self) -> ArrayRef;
335}
336
337macro_rules! impl_uint_builder {
338 ($builder:ty, $native:ty) => {
339 impl UintBuilder for $builder {
340 type Native = $native;
341 fn append_value(&mut self, v: Self::Native) {
342 <$builder>::append_value(self, v);
343 }
344 fn append_null(&mut self) {
345 <$builder>::append_null(self);
346 }
347 fn finish_array(mut self) -> ArrayRef {
348 Arc::new(self.finish())
349 }
350 }
351 };
352}
353
354impl_uint_builder!(UInt8Builder, u8);
355impl_uint_builder!(UInt16Builder, u16);
356impl_uint_builder!(UInt32Builder, u32);
357impl_uint_builder!(UInt64Builder, u64);
358
359fn build_int_array<B: IntBuilder>(
360 value: Option<&Value>,
361 field_name: &str,
362 type_name: &str,
363) -> Result<ArrayRef, SerdeError>
364where
365 <B::Native as TryFrom<i64>>::Error: std::fmt::Display,
366{
367 let mut builder = B::default();
368 match value {
369 Some(Value::Number(n)) => {
370 let i = n.as_i64().ok_or_else(|| SerdeError::TypeConversion {
371 field: field_name.into(),
372 expected: type_name.into(),
373 message: format!("cannot convert {n} to i64"),
374 })?;
375 let native = B::Native::try_from(i).map_err(|e| SerdeError::TypeConversion {
376 field: field_name.into(),
377 expected: type_name.into(),
378 message: format!("{e}"),
379 })?;
380 builder.append_value(native);
381 }
382 Some(Value::Null) | None => builder.append_null(),
383 _ => {
384 return Err(SerdeError::TypeConversion {
385 field: field_name.into(),
386 expected: type_name.into(),
387 message: format!("got {value:?}"),
388 })
389 }
390 }
391 Ok(builder.finish_array())
392}
393
394fn build_uint_array<B: UintBuilder>(
395 value: Option<&Value>,
396 field_name: &str,
397 type_name: &str,
398) -> Result<ArrayRef, SerdeError>
399where
400 <B::Native as TryFrom<u64>>::Error: std::fmt::Display,
401{
402 let mut builder = B::default();
403 match value {
404 Some(Value::Number(n)) => {
405 let u = n.as_u64().ok_or_else(|| SerdeError::TypeConversion {
406 field: field_name.into(),
407 expected: type_name.into(),
408 message: format!("cannot convert {n} to u64"),
409 })?;
410 let native = B::Native::try_from(u).map_err(|e| SerdeError::TypeConversion {
411 field: field_name.into(),
412 expected: type_name.into(),
413 message: format!("{e}"),
414 })?;
415 builder.append_value(native);
416 }
417 Some(Value::Null) | None => builder.append_null(),
418 _ => {
419 return Err(SerdeError::TypeConversion {
420 field: field_name.into(),
421 expected: type_name.into(),
422 message: format!("got {value:?}"),
423 })
424 }
425 }
426 Ok(builder.finish_array())
427}
428
429fn arrow_column_to_json(
431 column: &ArrayRef,
432 row: usize,
433 data_type: &DataType,
434) -> Result<Value, SerdeError> {
435 use arrow_array::{
436 BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
437 StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
438 };
439
440 match data_type {
441 DataType::Boolean => {
442 let arr = column.as_any().downcast_ref::<BooleanArray>().unwrap();
443 Ok(Value::Bool(arr.value(row)))
444 }
445 DataType::Int8 => {
446 let arr = column.as_any().downcast_ref::<Int8Array>().unwrap();
447 Ok(Value::Number(i64::from(arr.value(row)).into()))
448 }
449 DataType::Int16 => {
450 let arr = column.as_any().downcast_ref::<Int16Array>().unwrap();
451 Ok(Value::Number(i64::from(arr.value(row)).into()))
452 }
453 DataType::Int32 => {
454 let arr = column.as_any().downcast_ref::<Int32Array>().unwrap();
455 Ok(Value::Number(i64::from(arr.value(row)).into()))
456 }
457 DataType::Int64 => {
458 let arr = column.as_any().downcast_ref::<Int64Array>().unwrap();
459 Ok(Value::Number(arr.value(row).into()))
460 }
461 DataType::UInt8 => {
462 let arr = column.as_any().downcast_ref::<UInt8Array>().unwrap();
463 Ok(Value::Number(u64::from(arr.value(row)).into()))
464 }
465 DataType::UInt16 => {
466 let arr = column.as_any().downcast_ref::<UInt16Array>().unwrap();
467 Ok(Value::Number(u64::from(arr.value(row)).into()))
468 }
469 DataType::UInt32 => {
470 let arr = column.as_any().downcast_ref::<UInt32Array>().unwrap();
471 Ok(Value::Number(u64::from(arr.value(row)).into()))
472 }
473 DataType::UInt64 => {
474 let arr = column.as_any().downcast_ref::<UInt64Array>().unwrap();
475 Ok(Value::Number(arr.value(row).into()))
476 }
477 DataType::Float32 => {
478 let arr = column.as_any().downcast_ref::<Float32Array>().unwrap();
479 let v = f64::from(arr.value(row));
480 Ok(serde_json::Number::from_f64(v).map_or(Value::Null, Value::Number))
481 }
482 DataType::Float64 => {
483 let arr = column.as_any().downcast_ref::<Float64Array>().unwrap();
484 Ok(serde_json::Number::from_f64(arr.value(row)).map_or(Value::Null, Value::Number))
485 }
486 DataType::Utf8 => {
487 let arr = column.as_any().downcast_ref::<StringArray>().unwrap();
488 Ok(Value::String(arr.value(row).to_string()))
489 }
490 other => Err(SerdeError::UnsupportedFormat(format!(
491 "unsupported Arrow type for JSON serialization: {other}"
492 ))),
493 }
494}
495
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{Field, Schema};

    /// Schema shared by most tests: required `id` and `name`, nullable `score`.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Returns column `index` of `batch` downcast to `T`, panicking on a type mismatch.
    fn typed_column<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch.column(index).as_any().downcast_ref::<T>().unwrap()
    }

    /// Decodes two fixed records (ids 1 and 2) into one batch.
    fn two_row_batch(schema: &SchemaRef) -> RecordBatch {
        let first = br#"{"id": 1, "name": "A", "score": 10.0}"#;
        let second = br#"{"id": 2, "name": "B", "score": 20.0}"#;
        let records: Vec<&[u8]> = vec![first, second];
        JsonDeserializer::new()
            .deserialize_batch(&records, schema)
            .unwrap()
    }

    #[test]
    fn test_json_deserialize_basic() {
        let schema = test_schema();
        let payload = br#"{"id": 1, "name": "Alice", "score": 95.5}"#;

        let batch = JsonDeserializer::new()
            .deserialize(payload, &schema)
            .unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(typed_column::<arrow_array::Int64Array>(&batch, 0).value(0), 1);
    }

    #[test]
    fn test_json_deserialize_null_field() {
        let schema = test_schema();
        let payload = br#"{"id": 2, "name": "Bob", "score": null}"#;

        let batch = JsonDeserializer::new()
            .deserialize(payload, &schema)
            .unwrap();

        assert_eq!(batch.num_rows(), 1);
        // `score` is the nullable third column.
        assert!(batch.column(2).is_null(0));
    }

    #[test]
    fn test_json_deserialize_missing_required_field() {
        let schema = test_schema();
        // The non-nullable `name` field is deliberately left out.
        let payload = br#"{"id": 3, "score": 80.0}"#;

        let outcome = JsonDeserializer::new().deserialize(payload, &schema);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_json_serialize_roundtrip() {
        let schema = test_schema();
        let payload = br#"{"id": 42, "name": "Charlie", "score": 88.5}"#;
        let batch = JsonDeserializer::new()
            .deserialize(payload, &schema)
            .unwrap();

        let rows = JsonSerializer::new().serialize(&batch).unwrap();
        assert_eq!(rows.len(), 1);

        let decoded: Value = serde_json::from_slice(&rows[0]).unwrap();
        assert_eq!(decoded["id"], 42);
        assert_eq!(decoded["name"], "Charlie");
    }

    #[test]
    fn test_json_deserialize_batch() {
        let schema = test_schema();

        let batch = two_row_batch(&schema);

        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_json_serialize_batch_ndjson() {
        let schema = test_schema();
        let batch = two_row_batch(&schema);

        let ndjson = JsonSerializer::new().serialize_batch(&batch).unwrap();

        // One non-empty line is expected per input row.
        let text = std::str::from_utf8(&ndjson).unwrap();
        let line_count = text.lines().filter(|line| !line.is_empty()).count();
        assert_eq!(line_count, 2);
    }

    #[test]
    fn test_json_deserialize_coercion() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("qty", DataType::Int64, false),
            Field::new("price", DataType::Float64, false),
        ]));
        // Numeric columns are fed JSON strings; the decoder is expected to parse them.
        let payload = br#"{"qty": "100", "price": "187.52"}"#;

        let batch = JsonDeserializer::new()
            .deserialize(payload, &schema)
            .unwrap();

        let qty = typed_column::<arrow_array::Int64Array>(&batch, 0);
        assert_eq!(qty.value(0), 100);

        let price = typed_column::<arrow_array::Float64Array>(&batch, 1);
        assert!((price.value(0) - 187.52).abs() < f64::EPSILON);
    }

    #[test]
    fn test_json_type_coercion() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Utf8,
            false,
        )]));
        // A JSON number fed to a string column is expected to come back as its text form.
        let payload = br#"{"value": 42}"#;

        let batch = JsonDeserializer::new()
            .deserialize(payload, &schema)
            .unwrap();

        let text = typed_column::<arrow_array::StringArray>(&batch, 0);
        assert_eq!(text.value(0), "42");
    }
}