1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12
13use arrow_array::builder::{
14 BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15 Int8Builder, LargeBinaryBuilder, LargeStringBuilder, StringBuilder,
16 TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
17 TimestampSecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, SchemaRef, TimeUnit};
21
22use crate::schema::error::{SchemaError, SchemaResult};
23use crate::schema::json::jsonb::JsonbEncoder;
24use crate::schema::traits::FormatDecoder;
25use crate::schema::types::RawRecord;
26
/// What the JSON decoder does with object keys that do not name a schema column.
///
/// Only object-mode records (no `json.explode`) are checked; exploded rows always carry a
/// NULL `_extra` value and their unknown keys are not inspected.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UnknownFieldStrategy {
    /// Drop unknown keys silently.
    Ignore,
    /// Gather unknown keys into a JSONB-encoded, nullable `_extra` column appended to the batch.
    CollectExtra,
    /// Fail the whole batch on the first unknown key.
    Reject,
}
37
/// How the JSON decoder reacts when a JSON value cannot be converted to its column's type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypeMismatchStrategy {
    /// Append NULL and count the mismatch; no lossy coercion is attempted
    /// (`schema.enforcement = permissive`).
    Null,
    /// Try lossy coercions first (numeric strings, string/number booleans, float → signed
    /// integer); a value that still cannot be converted fails the batch
    /// (`schema.enforcement = coerce`).
    Coerce,
    /// No coercion; any mismatch fails the batch (`schema.enforcement = strict`).
    Reject,
}

impl TypeMismatchStrategy {
    /// Parses a `schema.enforcement` setting: `coerce`, `strict` or `permissive`.
    ///
    /// Matching ignores ASCII case and surrounding whitespace, so `" Strict "` is accepted.
    /// Returns `None` for any other value.
    #[must_use]
    pub fn from_enforcement_str(s: &str) -> Option<Self> {
        // Config values often carry stray whitespace; previously they were silently ignored,
        // leaving the default strategy in place.
        let wanted = s.trim();
        [
            ("coerce", Self::Coerce),
            ("strict", Self::Reject),
            ("permissive", Self::Null),
        ]
        .iter()
        .find(|(keyword, _)| keyword.eq_ignore_ascii_case(wanted))
        .map(|&(_, strategy)| strategy)
    }
}
61
/// Where a column's value is read from within each decoded record.
#[derive(Clone, Debug)]
enum ColumnExtraction {
    /// Look the column name up as a key of the default target (the record or its `json.path`).
    DefaultPath,
    /// Follow these object keys from the record root (`json.column.<name>`).
    CustomPath { segments: Vec<String> },
}
70
71#[derive(Debug, Clone)]
73pub struct JsonDecoderConfig {
74 pub unknown_fields: UnknownFieldStrategy,
76
77 pub type_mismatch: TypeMismatchStrategy,
79
80 pub timestamp_formats: Vec<String>,
84
85 pub nested_as_jsonb: bool,
89
90 pub json_path: Option<Vec<String>>,
94
95 pub json_column_paths: HashMap<String, Vec<String>>,
99
100 pub json_explode: Option<Vec<String>>,
104}
105
106impl Default for JsonDecoderConfig {
107 fn default() -> Self {
108 Self {
109 unknown_fields: UnknownFieldStrategy::Ignore,
110 type_mismatch: TypeMismatchStrategy::Coerce,
111 timestamp_formats: vec![
112 "iso8601".into(),
113 "%Y-%m-%dT%H:%M:%S%.fZ".into(),
114 "%Y-%m-%dT%H:%M:%S%.f%:z".into(),
115 "%Y-%m-%d %H:%M:%S%.f".into(),
116 "%Y-%m-%d %H:%M:%S".into(),
117 ],
118 nested_as_jsonb: false,
119 json_path: None,
120 json_column_paths: HashMap::new(),
121 json_explode: None,
122 }
123 }
124}
125
126impl JsonDecoderConfig {
127 #[must_use]
133 pub fn from_connector_config(config: &crate::config::ConnectorConfig) -> Self {
134 let mut cfg = Self::default();
135
136 if let Some(path) = config.get("json.path") {
137 cfg.json_path = Some(path.split('.').map(ToString::to_string).collect());
138 }
139
140 let col_props = config.properties_with_prefix("json.column.");
141 for (col_name, path_str) in col_props {
142 cfg.json_column_paths.insert(
143 col_name,
144 path_str.split('.').map(ToString::to_string).collect(),
145 );
146 }
147
148 if let Some(explode) = config.get("json.explode") {
149 cfg.json_explode = Some(explode.split(',').map(|s| s.trim().to_string()).collect());
150 }
151
152 if let Some(enforcement) = config.get("schema.enforcement") {
153 if let Some(strategy) = TypeMismatchStrategy::from_enforcement_str(enforcement) {
154 cfg.type_mismatch = strategy;
155 }
156 }
157 if let Some(v) = config.get("nested.as.jsonb") {
158 cfg.nested_as_jsonb = v.eq_ignore_ascii_case("true");
159 }
160 cfg
161 }
162}
163
164pub struct JsonDecoder {
171 schema: SchemaRef,
173 config: JsonDecoderConfig,
175 field_indices: Vec<(String, usize)>,
177 mismatch_count: AtomicU64,
179 column_extractions: Vec<ColumnExtraction>,
181 explode_col_indices: Option<Vec<Option<usize>>>,
186}
187
188#[allow(clippy::missing_fields_in_debug)]
189impl std::fmt::Debug for JsonDecoder {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 f.debug_struct("JsonDecoder")
192 .field("schema", &self.schema)
193 .field("config", &self.config)
194 .field(
195 "mismatch_count",
196 &self.mismatch_count.load(Ordering::Relaxed),
197 )
198 .finish()
199 }
200}
201
202impl JsonDecoder {
203 #[must_use]
205 pub fn new(schema: SchemaRef) -> Self {
206 Self::with_config(schema, JsonDecoderConfig::default())
207 }
208
209 #[must_use]
211 pub fn with_config(schema: SchemaRef, config: JsonDecoderConfig) -> Self {
212 let field_indices: Vec<(String, usize)> = schema
213 .fields()
214 .iter()
215 .enumerate()
216 .map(|(i, f)| (f.name().clone(), i))
217 .collect();
218
219 let column_extractions: Vec<ColumnExtraction> = schema
220 .fields()
221 .iter()
222 .map(|f| {
223 let col_name = f.name();
224 if let Some(path_segments) = config.json_column_paths.get(col_name.as_str()) {
225 ColumnExtraction::CustomPath {
226 segments: path_segments.clone(),
227 }
228 } else {
229 ColumnExtraction::DefaultPath
230 }
231 })
232 .collect();
233
234 let explode_col_indices = config.json_explode.as_ref().map(|names| {
235 names
236 .iter()
237 .map(|name| {
238 field_indices
239 .iter()
240 .find(|(n, _)| n == name)
241 .map(|(_, idx)| *idx)
242 })
243 .collect()
244 });
245
246 Self {
247 schema,
248 config,
249 field_indices,
250 mismatch_count: AtomicU64::new(0),
251 column_extractions,
252 explode_col_indices,
253 }
254 }
255
256 pub fn mismatch_count(&self) -> u64 {
258 self.mismatch_count.load(Ordering::Relaxed)
259 }
260}
261
262impl FormatDecoder for JsonDecoder {
263 fn output_schema(&self) -> SchemaRef {
264 self.schema.clone()
265 }
266
267 #[allow(clippy::too_many_lines)]
268 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
269 if records.is_empty() {
270 return Ok(RecordBatch::new_empty(self.schema.clone()));
271 }
272
273 let num_fields = self.schema.fields().len();
274 let capacity = records.len();
275
276 let mut builders = create_builders(&self.schema, capacity);
278
279 let collect_extra = matches!(
281 self.config.unknown_fields,
282 UnknownFieldStrategy::CollectExtra
283 );
284 let mut extra_builder = if collect_extra {
285 Some(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
286 } else {
287 None
288 };
289
290 let mut jsonb_encoder = if self.config.nested_as_jsonb {
291 Some(JsonbEncoder::new())
292 } else {
293 None
294 };
295
296 let mut populated = vec![false; num_fields];
298
299 for record in records {
300 let value: serde_json::Value = serde_json::from_slice(&record.value)
301 .map_err(|e| SchemaError::DecodeError(format!("JSON parse error: {e}")))?;
302
303 let default_target: &serde_json::Value = if let Some(ref path) = self.config.json_path {
305 let mut current = &value;
306 for segment in path {
307 current = current.get(segment.as_str()).ok_or_else(|| {
308 SchemaError::DecodeError(format!("json.path segment '{segment}' not found"))
309 })?;
310 }
311 current
312 } else {
313 &value
314 };
315
316 if let Some(ref col_indices) = self.explode_col_indices {
317 let arr = default_target.as_array().ok_or_else(|| {
319 SchemaError::DecodeError("json.explode target must be an array".into())
320 })?;
321 for element in arr {
322 populated.fill(false);
323 match element {
324 serde_json::Value::Array(items) => {
325 for (pos, col_idx_opt) in col_indices.iter().enumerate() {
326 if let Some(col_idx) = col_idx_opt {
327 let val = items.get(pos).unwrap_or(&serde_json::Value::Null);
328 let field = &self.schema.fields()[*col_idx];
329 append_value(
330 &mut builders[*col_idx],
331 field.data_type(),
332 val,
333 &self.config,
334 &self.mismatch_count,
335 jsonb_encoder.as_mut(),
336 )?;
337 populated[*col_idx] = true;
338 }
339 }
340 }
341 serde_json::Value::Object(obj) => {
342 for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
343 if let Some(val) = obj.get(col_name.as_str()) {
344 let field = &self.schema.fields()[col_idx];
345 append_value(
346 &mut builders[col_idx],
347 field.data_type(),
348 val,
349 &self.config,
350 &self.mismatch_count,
351 jsonb_encoder.as_mut(),
352 )?;
353 populated[col_idx] = true;
354 }
355 }
356 }
357 _ => {
358 return Err(SchemaError::DecodeError(
359 "json.explode array elements must be arrays or objects".into(),
360 ));
361 }
362 }
363 for (col_idx, was_populated) in populated.iter().enumerate() {
365 if !was_populated {
366 append_null(&mut builders[col_idx]);
367 }
368 }
369 if let Some(ref mut eb) = extra_builder {
371 eb.append_null();
372 }
373 }
374 } else {
375 let default_obj = default_target.as_object().ok_or_else(|| {
377 SchemaError::DecodeError("JSON value must be an object".into())
378 })?;
379
380 populated.fill(false);
381
382 let mut extra_fields: Option<serde_json::Map<String, serde_json::Value>> =
384 if collect_extra {
385 Some(serde_json::Map::new())
386 } else {
387 None
388 };
389
390 for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
392 match &self.column_extractions[col_idx] {
393 ColumnExtraction::DefaultPath => {
394 if let Some(val) = default_obj.get(col_name.as_str()) {
395 populated[col_idx] = true;
396 let field = &self.schema.fields()[col_idx];
397 append_value(
398 &mut builders[col_idx],
399 field.data_type(),
400 val,
401 &self.config,
402 &self.mismatch_count,
403 jsonb_encoder.as_mut(),
404 )?;
405 }
406 }
407 ColumnExtraction::CustomPath { segments } => {
408 let extracted = navigate_path(&value, segments);
410 if let Some(val) = extracted {
411 populated[col_idx] = true;
412 let field = &self.schema.fields()[col_idx];
413 append_value(
414 &mut builders[col_idx],
415 field.data_type(),
416 val,
417 &self.config,
418 &self.mismatch_count,
419 jsonb_encoder.as_mut(),
420 )?;
421 }
422 }
423 }
424 }
425
426 if collect_extra {
428 for (key, val) in default_obj {
429 if self.field_index(key).is_none() {
430 match self.config.unknown_fields {
431 UnknownFieldStrategy::CollectExtra => {
432 if let Some(ref mut extra) = extra_fields {
433 extra.insert(key.clone(), val.clone());
434 }
435 }
436 UnknownFieldStrategy::Reject => {
437 return Err(SchemaError::DecodeError(format!(
438 "unknown field '{key}' not in schema"
439 )));
440 }
441 UnknownFieldStrategy::Ignore => {}
442 }
443 }
444 }
445 } else if matches!(self.config.unknown_fields, UnknownFieldStrategy::Reject) {
446 for key in default_obj.keys() {
447 if self.field_index(key).is_none() {
448 return Err(SchemaError::DecodeError(format!(
449 "unknown field '{key}' not in schema"
450 )));
451 }
452 }
453 }
454
455 for (col_idx, was_populated) in populated.iter().enumerate() {
457 if !was_populated {
458 append_null(&mut builders[col_idx]);
459 }
460 }
461
462 if let Some(ref mut eb) = extra_builder {
464 if let Some(ref extra) = extra_fields {
465 if extra.is_empty() {
466 eb.append_null();
467 } else {
468 let mut enc = jsonb_encoder
469 .as_mut()
470 .map_or_else(JsonbEncoder::new, |_| JsonbEncoder::new());
471 let bytes = enc.encode(&serde_json::Value::Object(extra.clone()));
472 eb.append_value(&bytes);
473 }
474 } else {
475 eb.append_null();
476 }
477 }
478 }
479 }
480
481 let mut columns: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
483
484 let final_schema = if let Some(mut eb) = extra_builder {
486 columns.push(Arc::new(eb.finish()));
487 let mut fields = self.schema.fields().to_vec();
488 fields.push(Arc::new(arrow_schema::Field::new(
489 "_extra",
490 DataType::LargeBinary,
491 true,
492 )));
493 Arc::new(arrow_schema::Schema::new(fields))
494 } else {
495 self.schema.clone()
496 };
497
498 RecordBatch::try_new(final_schema, columns)
499 .map_err(|e| SchemaError::DecodeError(format!("RecordBatch construction: {e}")))
500 }
501
502 #[allow(clippy::unnecessary_literal_bound)]
503 fn format_name(&self) -> &str {
504 "json"
505 }
506}
507
508impl JsonDecoder {
509 fn field_index(&self, name: &str) -> Option<usize> {
512 self.field_indices
513 .iter()
514 .find(|(n, _)| n == name)
515 .map(|(_, idx)| *idx)
516 }
517}
518
519fn navigate_path<'a>(
522 root: &'a serde_json::Value,
523 segments: &[String],
524) -> Option<&'a serde_json::Value> {
525 let mut current = root;
526 for segment in segments {
527 current = current.get(segment.as_str())?;
528 }
529 Some(current)
530}
531
532trait ColumnBuilder: Send {
536 fn finish(&mut self) -> ArrayRef;
537 fn append_null_value(&mut self);
538 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
539}
540
/// Implements [`ColumnBuilder`] for a concrete Arrow builder by forwarding to its inherent
/// `finish` and `append_null`. The second argument names the array type the builder produces;
/// it documents the pairing and is not used in the expansion.
macro_rules! impl_column_builder {
    ($builder_ty:ty, $array_ty:ty) => {
        impl ColumnBuilder for $builder_ty {
            fn finish(&mut self) -> ArrayRef {
                let array = <$builder_ty>::finish(self);
                Arc::new(array)
            }

            fn append_null_value(&mut self) {
                <$builder_ty>::append_null(self);
            }

            fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
                self
            }
        }
    };
}
556
557impl_column_builder!(BooleanBuilder, arrow_array::BooleanArray);
558impl_column_builder!(Int8Builder, arrow_array::Int8Array);
559impl_column_builder!(Int16Builder, arrow_array::Int16Array);
560impl_column_builder!(Int32Builder, arrow_array::Int32Array);
561impl_column_builder!(Int64Builder, arrow_array::Int64Array);
562impl_column_builder!(UInt8Builder, arrow_array::UInt8Array);
563impl_column_builder!(UInt16Builder, arrow_array::UInt16Array);
564impl_column_builder!(UInt32Builder, arrow_array::UInt32Array);
565impl_column_builder!(UInt64Builder, arrow_array::UInt64Array);
566impl_column_builder!(Float32Builder, arrow_array::Float32Array);
567impl_column_builder!(Float64Builder, arrow_array::Float64Array);
568impl_column_builder!(StringBuilder, arrow_array::StringArray);
569impl_column_builder!(LargeStringBuilder, arrow_array::LargeStringArray);
570impl_column_builder!(LargeBinaryBuilder, arrow_array::LargeBinaryArray);
571impl_column_builder!(TimestampSecondBuilder, arrow_array::TimestampSecondArray);
572impl_column_builder!(
573 TimestampMillisecondBuilder,
574 arrow_array::TimestampMillisecondArray
575);
576impl_column_builder!(
577 TimestampMicrosecondBuilder,
578 arrow_array::TimestampMicrosecondArray
579);
580impl_column_builder!(
581 TimestampNanosecondBuilder,
582 arrow_array::TimestampNanosecondArray
583);
584
585fn create_builders(schema: &SchemaRef, capacity: usize) -> Vec<Box<dyn ColumnBuilder>> {
586 schema
587 .fields()
588 .iter()
589 .map(|f| create_builder(f.data_type(), capacity))
590 .collect()
591}
592
593fn create_builder(data_type: &DataType, capacity: usize) -> Box<dyn ColumnBuilder> {
594 match data_type {
595 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
596 DataType::Int8 => Box::new(Int8Builder::with_capacity(capacity)),
597 DataType::Int16 => Box::new(Int16Builder::with_capacity(capacity)),
598 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
599 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
600 DataType::UInt8 => Box::new(UInt8Builder::with_capacity(capacity)),
601 DataType::UInt16 => Box::new(UInt16Builder::with_capacity(capacity)),
602 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
603 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
604 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
605 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
606 DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, capacity * 32)),
607 DataType::LargeBinary => {
608 Box::new(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
609 }
610 DataType::Timestamp(TimeUnit::Second, tz) => {
611 let builder =
612 TimestampSecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
613 Box::new(builder)
614 }
615 DataType::Timestamp(TimeUnit::Millisecond, tz) => {
616 let builder =
617 TimestampMillisecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
618 Box::new(builder)
619 }
620 DataType::Timestamp(TimeUnit::Microsecond, tz) => {
621 let builder =
622 TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
623 Box::new(builder)
624 }
625 DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
626 let builder =
627 TimestampNanosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
628 Box::new(builder)
629 }
630 _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)),
632 }
633}
634
635fn append_null(builder: &mut Box<dyn ColumnBuilder>) {
636 builder.append_null_value();
637}
638
639#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
641fn append_value(
642 builder: &mut Box<dyn ColumnBuilder>,
643 target_type: &DataType,
644 value: &serde_json::Value,
645 config: &JsonDecoderConfig,
646 mismatch_count: &AtomicU64,
647 jsonb_encoder: Option<&mut JsonbEncoder>,
648) -> SchemaResult<()> {
649 if value.is_null() {
650 builder.append_null_value();
651 return Ok(());
652 }
653
654 match target_type {
655 DataType::Boolean => {
656 let b = builder
657 .as_any_mut()
658 .downcast_mut::<BooleanBuilder>()
659 .unwrap();
660 match extract_bool(value, config) {
661 Ok(v) => b.append_value(v),
662 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
663 }
664 }
665 DataType::Int8 => {
666 let b = builder.as_any_mut().downcast_mut::<Int8Builder>().unwrap();
667 match extract_i8(value, config) {
668 Ok(v) => b.append_value(v),
669 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
670 }
671 }
672 DataType::Int16 => {
673 let b = builder.as_any_mut().downcast_mut::<Int16Builder>().unwrap();
674 match extract_i16(value, config) {
675 Ok(v) => b.append_value(v),
676 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
677 }
678 }
679 DataType::Int32 => {
680 let b = builder.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
681 match extract_i32(value, config) {
682 Ok(v) => b.append_value(v),
683 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
684 }
685 }
686 DataType::Int64 => {
687 let b = builder.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
688 match extract_i64(value, config) {
689 Ok(v) => b.append_value(v),
690 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
691 }
692 }
693 DataType::UInt8 => {
694 let b = builder.as_any_mut().downcast_mut::<UInt8Builder>().unwrap();
695 match extract_u8(value, config) {
696 Ok(v) => b.append_value(v),
697 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
698 }
699 }
700 DataType::UInt16 => {
701 let b = builder
702 .as_any_mut()
703 .downcast_mut::<UInt16Builder>()
704 .unwrap();
705 match extract_u16(value, config) {
706 Ok(v) => b.append_value(v),
707 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
708 }
709 }
710 DataType::UInt32 => {
711 let b = builder
712 .as_any_mut()
713 .downcast_mut::<UInt32Builder>()
714 .unwrap();
715 match extract_u32(value, config) {
716 Ok(v) => b.append_value(v),
717 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
718 }
719 }
720 DataType::UInt64 => {
721 let b = builder
722 .as_any_mut()
723 .downcast_mut::<UInt64Builder>()
724 .unwrap();
725 match extract_u64(value, config) {
726 Ok(v) => b.append_value(v),
727 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
728 }
729 }
730 DataType::Float32 => {
731 let b = builder
732 .as_any_mut()
733 .downcast_mut::<Float32Builder>()
734 .unwrap();
735 match extract_f32(value, config) {
736 Ok(v) => b.append_value(v),
737 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
738 }
739 }
740 DataType::Float64 => {
741 let b = builder
742 .as_any_mut()
743 .downcast_mut::<Float64Builder>()
744 .unwrap();
745 match extract_f64(value, config) {
746 Ok(v) => b.append_value(v),
747 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
748 }
749 }
750 DataType::LargeUtf8 => {
751 let b = builder
752 .as_any_mut()
753 .downcast_mut::<LargeStringBuilder>()
754 .unwrap();
755 let s = value_to_string(value);
756 b.append_value(&s);
757 }
758 DataType::LargeBinary => {
759 let b = builder
760 .as_any_mut()
761 .downcast_mut::<LargeBinaryBuilder>()
762 .unwrap();
763 if let Some(enc) = jsonb_encoder {
764 let bytes = enc.encode(value);
765 b.append_value(&bytes);
766 } else {
767 let bytes = serde_json::to_vec(value).unwrap_or_default();
769 b.append_value(&bytes);
770 }
771 }
772 DataType::Timestamp(unit, _) => match extract_timestamp(value, config, *unit) {
773 Ok(ts) => append_timestamp(builder, *unit, ts),
774 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
775 },
776 _ => {
778 let b = builder
779 .as_any_mut()
780 .downcast_mut::<StringBuilder>()
781 .unwrap();
782 let s = value_to_string(value);
783 b.append_value(&s);
784 }
785 }
786
787 Ok(())
788}
789
790fn handle_mismatch(
791 builder: &mut Box<dyn ColumnBuilder>,
792 config: &JsonDecoderConfig,
793 mismatch_count: &AtomicU64,
794 error_msg: &str,
795) -> SchemaResult<()> {
796 match config.type_mismatch {
797 TypeMismatchStrategy::Null => {
798 mismatch_count.fetch_add(1, Ordering::Relaxed);
799 builder.append_null_value();
800 Ok(())
801 }
802 TypeMismatchStrategy::Coerce => {
803 Err(SchemaError::DecodeError(format!(
805 "type coercion failed: {error_msg}"
806 )))
807 }
808 TypeMismatchStrategy::Reject => Err(SchemaError::DecodeError(format!(
809 "type mismatch: {error_msg}"
810 ))),
811 }
812}
813
814fn extract_bool(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<bool, String> {
817 if let Some(b) = value.as_bool() {
818 return Ok(b);
819 }
820 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
821 if let Some(s) = value.as_str() {
822 match s.to_ascii_lowercase().as_str() {
823 "true" | "1" | "yes" => return Ok(true),
824 "false" | "0" | "no" => return Ok(false),
825 _ => {}
826 }
827 }
828 if let Some(n) = value.as_i64() {
829 return Ok(n != 0);
830 }
831 }
832 Err(format!("expected boolean, got {}", json_type_name(value)))
833}
834
835fn extract_i8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i8, String> {
836 if let Some(n) = value.as_i64() {
837 if let Ok(v) = i8::try_from(n) {
838 return Ok(v);
839 }
840 return Err(format!("integer {n} out of i8 range"));
841 }
842 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
843 if let Some(s) = value.as_str() {
844 if let Ok(v) = s.parse::<i8>() {
845 return Ok(v);
846 }
847 }
848 if let Some(f) = value.as_f64() {
849 #[allow(clippy::cast_possible_truncation)]
850 let v = f as i8;
851 return Ok(v);
852 }
853 }
854 Err(format!("expected i8, got {}", json_type_name(value)))
855}
856
857fn extract_i16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i16, String> {
858 if let Some(n) = value.as_i64() {
859 if let Ok(v) = i16::try_from(n) {
860 return Ok(v);
861 }
862 return Err(format!("integer {n} out of i16 range"));
863 }
864 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
865 if let Some(s) = value.as_str() {
866 if let Ok(v) = s.parse::<i16>() {
867 return Ok(v);
868 }
869 }
870 if let Some(f) = value.as_f64() {
871 #[allow(clippy::cast_possible_truncation)]
872 let v = f as i16;
873 return Ok(v);
874 }
875 }
876 Err(format!("expected i16, got {}", json_type_name(value)))
877}
878
879fn extract_i32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i32, String> {
880 if let Some(n) = value.as_i64() {
881 if let Ok(v) = i32::try_from(n) {
882 return Ok(v);
883 }
884 return Err(format!("integer {n} out of i32 range"));
885 }
886 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
887 if let Some(s) = value.as_str() {
888 if let Ok(v) = s.parse::<i32>() {
889 return Ok(v);
890 }
891 }
892 if let Some(f) = value.as_f64() {
893 #[allow(clippy::cast_possible_truncation)]
894 return Ok(f as i32);
895 }
896 }
897 Err(format!("expected i32, got {}", json_type_name(value)))
898}
899
900fn extract_i64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i64, String> {
901 if let Some(n) = value.as_i64() {
902 return Ok(n);
903 }
904 if let Some(n) = value.as_u64() {
905 if let Ok(v) = i64::try_from(n) {
906 return Ok(v);
907 }
908 return Err(format!("u64 {n} out of i64 range"));
909 }
910 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
911 if let Some(s) = value.as_str() {
912 if let Ok(v) = s.parse::<i64>() {
913 return Ok(v);
914 }
915 }
916 if let Some(f) = value.as_f64() {
917 #[allow(clippy::cast_possible_truncation)]
918 return Ok(f as i64);
919 }
920 }
921 Err(format!("expected i64, got {}", json_type_name(value)))
922}
923
924fn extract_f32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f32, String> {
925 if let Some(f) = value.as_f64() {
926 #[allow(clippy::cast_possible_truncation)]
927 return Ok(f as f32);
928 }
929 if let Some(n) = value.as_i64() {
930 #[allow(clippy::cast_precision_loss)]
931 return Ok(n as f32);
932 }
933 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
934 if let Some(s) = value.as_str() {
935 if let Ok(v) = s.parse::<f32>() {
936 return Ok(v);
937 }
938 }
939 }
940 Err(format!("expected f32, got {}", json_type_name(value)))
941}
942
943fn extract_f64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f64, String> {
944 if let Some(f) = value.as_f64() {
945 return Ok(f);
946 }
947 if let Some(n) = value.as_i64() {
948 #[allow(clippy::cast_precision_loss)]
949 return Ok(n as f64);
950 }
951 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
952 if let Some(s) = value.as_str() {
953 if let Ok(v) = s.parse::<f64>() {
954 return Ok(v);
955 }
956 }
957 }
958 Err(format!("expected f64, got {}", json_type_name(value)))
959}
960
961fn extract_u8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u8, String> {
962 if let Some(n) = value.as_u64() {
963 if let Ok(v) = u8::try_from(n) {
964 return Ok(v);
965 }
966 return Err(format!("integer {n} out of u8 range"));
967 }
968 if let Some(n) = value.as_i64() {
969 if let Ok(v) = u8::try_from(n) {
970 return Ok(v);
971 }
972 return Err(format!("integer {n} out of u8 range"));
973 }
974 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
975 if let Some(s) = value.as_str() {
976 if let Ok(v) = s.parse::<u8>() {
977 return Ok(v);
978 }
979 }
980 }
981 Err(format!("expected u8, got {}", json_type_name(value)))
982}
983
984fn extract_u16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u16, String> {
985 if let Some(n) = value.as_u64() {
986 if let Ok(v) = u16::try_from(n) {
987 return Ok(v);
988 }
989 return Err(format!("integer {n} out of u16 range"));
990 }
991 if let Some(n) = value.as_i64() {
992 if let Ok(v) = u16::try_from(n) {
993 return Ok(v);
994 }
995 return Err(format!("integer {n} out of u16 range"));
996 }
997 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
998 if let Some(s) = value.as_str() {
999 if let Ok(v) = s.parse::<u16>() {
1000 return Ok(v);
1001 }
1002 }
1003 }
1004 Err(format!("expected u16, got {}", json_type_name(value)))
1005}
1006
1007fn extract_u32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u32, String> {
1008 if let Some(n) = value.as_u64() {
1009 if let Ok(v) = u32::try_from(n) {
1010 return Ok(v);
1011 }
1012 return Err(format!("integer {n} out of u32 range"));
1013 }
1014 if let Some(n) = value.as_i64() {
1015 if let Ok(v) = u32::try_from(n) {
1016 return Ok(v);
1017 }
1018 return Err(format!("integer {n} out of u32 range"));
1019 }
1020 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1021 if let Some(s) = value.as_str() {
1022 if let Ok(v) = s.parse::<u32>() {
1023 return Ok(v);
1024 }
1025 }
1026 }
1027 Err(format!("expected u32, got {}", json_type_name(value)))
1028}
1029
1030fn extract_u64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u64, String> {
1031 if let Some(n) = value.as_u64() {
1032 return Ok(n);
1033 }
1034 if let Some(n) = value.as_i64() {
1035 if let Ok(v) = u64::try_from(n) {
1036 return Ok(v);
1037 }
1038 return Err(format!("integer {n} out of u64 range"));
1039 }
1040 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1041 if let Some(s) = value.as_str() {
1042 if let Ok(v) = s.parse::<u64>() {
1043 return Ok(v);
1044 }
1045 }
1046 }
1047 Err(format!("expected u64, got {}", json_type_name(value)))
1048}
1049
1050fn extract_timestamp(
1055 value: &serde_json::Value,
1056 config: &JsonDecoderConfig,
1057 unit: TimeUnit,
1058) -> Result<i64, String> {
1059 if let Some(n) = value.as_i64() {
1061 return Ok(millis_to_unit(n, unit));
1062 }
1063 if let Some(f) = value.as_f64() {
1064 #[allow(clippy::cast_possible_truncation)]
1065 let ms = f as i64;
1066 return Ok(millis_to_unit(ms, unit));
1067 }
1068
1069 if let Some(s) = value.as_str() {
1071 for fmt in &config.timestamp_formats {
1072 if fmt == "iso8601" {
1073 if let Ok(nanos) = arrow_cast::parse::string_to_timestamp_nanos(s) {
1074 return Ok(nanos_to_unit(nanos, unit));
1075 }
1076 continue;
1077 }
1078 if let Ok(ndt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
1079 let nanos = ndt.and_utc().timestamp_nanos_opt().unwrap_or(0);
1080 return Ok(nanos_to_unit(nanos, unit));
1081 }
1082 }
1083 return Err(format!("cannot parse timestamp from string: {s}"));
1084 }
1085
1086 Err(format!("expected timestamp, got {}", json_type_name(value)))
1087}
1088
1089fn millis_to_unit(ms: i64, unit: TimeUnit) -> i64 {
1091 match unit {
1092 TimeUnit::Second => ms / 1_000,
1093 TimeUnit::Millisecond => ms,
1094 TimeUnit::Microsecond => ms * 1_000,
1095 TimeUnit::Nanosecond => ms * 1_000_000,
1096 }
1097}
1098
1099fn nanos_to_unit(nanos: i64, unit: TimeUnit) -> i64 {
1101 match unit {
1102 TimeUnit::Second => nanos / 1_000_000_000,
1103 TimeUnit::Millisecond => nanos / 1_000_000,
1104 TimeUnit::Microsecond => nanos / 1_000,
1105 TimeUnit::Nanosecond => nanos,
1106 }
1107}
1108
1109fn append_timestamp(builder: &mut Box<dyn ColumnBuilder>, unit: TimeUnit, value: i64) {
1111 match unit {
1112 TimeUnit::Second => {
1113 builder
1114 .as_any_mut()
1115 .downcast_mut::<TimestampSecondBuilder>()
1116 .unwrap()
1117 .append_value(value);
1118 }
1119 TimeUnit::Millisecond => {
1120 builder
1121 .as_any_mut()
1122 .downcast_mut::<TimestampMillisecondBuilder>()
1123 .unwrap()
1124 .append_value(value);
1125 }
1126 TimeUnit::Microsecond => {
1127 builder
1128 .as_any_mut()
1129 .downcast_mut::<TimestampMicrosecondBuilder>()
1130 .unwrap()
1131 .append_value(value);
1132 }
1133 TimeUnit::Nanosecond => {
1134 builder
1135 .as_any_mut()
1136 .downcast_mut::<TimestampNanosecondBuilder>()
1137 .unwrap()
1138 .append_value(value);
1139 }
1140 }
1141}
1142
1143fn value_to_string(value: &serde_json::Value) -> String {
1144 match value {
1145 serde_json::Value::String(s) => s.clone(),
1146 other => other.to_string(),
1147 }
1148}
1149
1150fn json_type_name(value: &serde_json::Value) -> &'static str {
1151 match value {
1152 serde_json::Value::Null => "null",
1153 serde_json::Value::Bool(_) => "boolean",
1154 serde_json::Value::Number(_) => "number",
1155 serde_json::Value::String(_) => "string",
1156 serde_json::Value::Array(_) => "array",
1157 serde_json::Value::Object(_) => "object",
1158 }
1159}
1160
#[cfg(test)]
mod tests {
    //! Unit tests for `JsonDecoder`.
    //!
    //! Coverage: basic type mapping, null/missing handling, type-mismatch and
    //! unknown-field strategies, `json.path` / `json.column.*` extraction,
    //! array explosion, and `JsonDecoderConfig::from_connector_config` parsing.

    use super::*;
    use arrow_array::cast::AsArray;
    use arrow_schema::{Field, Schema};

    /// Builds a schema from `(name, type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Wraps a JSON text as a raw record payload.
    fn json_record(json: &str) -> RawRecord {
        RawRecord::new(json.as_bytes().to_vec())
    }

    /// Reads the `Int64` value at (`col`, `row`); panics if the column is not `Int64`.
    fn i64_at(batch: &RecordBatch, col: usize, row: usize) -> i64 {
        batch
            .column(col)
            .as_primitive::<arrow_array::types::Int64Type>()
            .value(row)
    }

    /// Reads the `Float64` value at (`col`, `row`); panics if the column is not `Float64`.
    fn f64_at(batch: &RecordBatch, col: usize, row: usize) -> f64 {
        batch
            .column(col)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(row)
    }

    /// Reads the `Utf8` value at (`col`, `row`); panics if the column is not `Utf8`.
    fn str_at(batch: &RecordBatch, col: usize, row: usize) -> &str {
        batch.column(col).as_string::<i32>().value(row)
    }

    // ---- basic decoding -------------------------------------------------

    #[test]
    fn test_decode_empty_batch() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema.clone());
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    #[test]
    fn test_decode_single_record() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"id": 42, "name": "Alice"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(i64_at(&batch, 0, 0), 42);
        assert_eq!(str_at(&batch, 1, 0), "Alice");
    }

    #[test]
    fn test_decode_multiple_records() {
        let schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y", DataType::Float64, false),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![
            json_record(r#"{"x": 1, "y": 1.5}"#),
            json_record(r#"{"x": 2, "y": 2.5}"#),
            json_record(r#"{"x": 3, "y": 3.5}"#),
        ];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 3);
        assert_eq!(i64_at(&batch, 0, 0), 1);
        assert_eq!(i64_at(&batch, 0, 1), 2);
        assert_eq!(i64_at(&batch, 0, 2), 3);
    }

    #[test]
    fn test_decode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
        ]);
        let decoder = JsonDecoder::new(schema);
        // 2.5 is exactly representable in binary; a literal like 3.14 trips the
        // deny-by-default `clippy::approx_constant` lint (≈ PI).
        let records = vec![json_record(
            r#"{"bool_col": true, "int_col": 42, "float_col": 2.5, "str_col": "hello"}"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert!(batch.column(0).as_boolean().value(0));
        assert_eq!(i64_at(&batch, 1, 0), 42);
        assert!((f64_at(&batch, 2, 0) - 2.5).abs() < f64::EPSILON);
        assert_eq!(str_at(&batch, 3, 0), "hello");
    }

    // ---- nulls and missing fields ---------------------------------------

    #[test]
    fn test_decode_null_values() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"a": null, "b": null}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(batch.column(0).is_null(0));
        assert!(batch.column(1).is_null(0));
    }

    #[test]
    fn test_decode_missing_field_becomes_null() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"a": 1}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(i64_at(&batch, 0, 0), 1);
        assert!(batch.column(1).is_null(0));
    }

    // ---- type-mismatch strategies ---------------------------------------

    #[test]
    fn test_mismatch_null_strategy() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Null,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(batch.column(0).is_null(0));
        assert_eq!(decoder.mismatch_count(), 1);
    }

    #[test]
    fn test_mismatch_coerce_strategy() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Coerce,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"x": "123"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(i64_at(&batch, 0, 0), 123);
    }

    #[test]
    fn test_mismatch_reject_strategy() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Reject,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("type mismatch"));
    }

    // ---- unknown-field strategies ---------------------------------------

    #[test]
    fn test_unknown_fields_ignore() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_unknown_fields_reject() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            unknown_fields: UnknownFieldStrategy::Reject,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("unknown field"));
    }

    #[test]
    fn test_unknown_fields_collect_extra() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            unknown_fields: UnknownFieldStrategy::CollectExtra,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"a": 1, "extra1": "v1", "extra2": 42}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        // CollectExtra appends one extra column named `_extra`.
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(1).name(), "_extra");
        assert!(!batch.column(1).is_null(0));
    }

    // ---- timestamps and nested values -----------------------------------

    #[test]
    fn test_decode_timestamp_iso8601() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": "2025-01-15T10:30:00Z"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(!batch.column(0).is_null(0));
    }

    #[test]
    fn test_decode_timestamp_epoch_millis() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        // An integer epoch in milliseconds is scaled up to the column's nanosecond unit.
        let ts_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::TimestampNanosecondType>();
        assert_eq!(ts_col.value(0), 1_705_312_200_000_000_000);
    }

    #[test]
    fn test_decode_nested_object_as_json_string() {
        let schema = make_schema(vec![("data", DataType::LargeBinary, true)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"data": {"nested": true}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(!batch.column(0).is_null(0));
    }

    // ---- malformed input ------------------------------------------------

    #[test]
    fn test_decode_invalid_json() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![RawRecord::new(b"not json".to_vec())];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("JSON parse error"));
    }

    #[test]
    fn test_decode_non_object_json() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record("[1, 2, 3]")];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("must be an object"));
    }

    // ---- decoder metadata -----------------------------------------------

    #[test]
    fn test_format_name() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        assert_eq!(decoder.format_name(), "json");
    }

    #[test]
    fn test_output_schema() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema.clone());
        assert_eq!(decoder.output_schema(), schema);
    }

    #[test]
    fn test_decode_one() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let record = json_record(r#"{"x": 99}"#);
        let batch = decoder.decode_one(&record).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(i64_at(&batch, 0, 0), 99);
    }

    // ---- coercion under the default config ------------------------------
    // `JsonDecoderConfig::default()` uses `TypeMismatchStrategy::Coerce`, so these
    // tests exercise coercion with a plain `JsonDecoder::new`.

    #[test]
    fn test_decode_int_from_float_json() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"x": 42.0}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(i64_at(&batch, 0, 0), 42);
    }

    #[test]
    fn test_decode_float_from_int_json() {
        let schema = make_schema(vec![("x", DataType::Float64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"x": 42}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert!((f64_at(&batch, 0, 0) - 42.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_string_number_to_float64() {
        let schema = make_schema(vec![("price", DataType::Float64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"price": "187.52"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert!((f64_at(&batch, 0, 0) - 187.52).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_string_to_int() {
        let schema = make_schema(vec![("qty", DataType::Int32, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"qty": "100"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>()
                .value(0),
            100
        );
    }

    #[test]
    fn test_decode_epoch_millis_to_timestamp_millis() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        let ts_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
        assert_eq!(ts_col.value(0), 1_705_312_200_000);
    }

    #[test]
    fn test_decode_int_to_float_promotion() {
        let schema = make_schema(vec![("val", DataType::Float64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"val": 100}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert!((f64_at(&batch, 0, 0) - 100.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_string_boolean() {
        let schema = make_schema(vec![("active", DataType::Boolean, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"active": "true"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert!(batch.column(0).as_boolean().value(0));
    }

    #[test]
    fn test_coerce_fails_on_unconvertible() {
        // Under Coerce, an unparseable string is an error rather than a null.
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
        let result = decoder.decode_batch(&records);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("type coercion failed"));
    }

    #[test]
    fn test_enforcement_str_parsing() {
        assert_eq!(
            TypeMismatchStrategy::from_enforcement_str("coerce"),
            Some(TypeMismatchStrategy::Coerce)
        );
        assert_eq!(
            TypeMismatchStrategy::from_enforcement_str("STRICT"),
            Some(TypeMismatchStrategy::Reject)
        );
        assert_eq!(
            TypeMismatchStrategy::from_enforcement_str("Permissive"),
            Some(TypeMismatchStrategy::Null)
        );
        assert_eq!(TypeMismatchStrategy::from_enforcement_str("unknown"), None);
    }

    #[test]
    fn test_decode_i8_and_u8() {
        let schema = make_schema(vec![
            ("signed", DataType::Int8, false),
            ("unsigned", DataType::UInt8, false),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"signed": -5, "unsigned": 200}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int8Type>()
                .value(0),
            -5
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<arrow_array::types::UInt8Type>()
                .value(0),
            200
        );
    }

    // ---- json.path / json.column.* extraction ---------------------------

    #[test]
    fn test_json_path_single() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"data":{"id":1}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(i64_at(&batch, 0, 0), 1);
    }

    #[test]
    fn test_json_path_multi() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["a".into(), "b".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"a":{"b":{"id":1}}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(i64_at(&batch, 0, 0), 1);
    }

    #[test]
    fn test_json_path_missing_errors() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["nonexistent".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"data":{"id":1}}"#)];
        let result = decoder.decode_batch(&records);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_json_column_custom_path() {
        let schema = make_schema(vec![
            ("p", DataType::Float64, false),
            ("stream_name", DataType::Utf8, true),
        ]);
        let mut col_paths = HashMap::new();
        col_paths.insert("stream_name".into(), vec!["stream".into()]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        // `p` comes from the `data` sub-object; `stream_name` comes from the
        // record root through its own column path.
        let records = vec![json_record(
            r#"{"stream":"btcusdt@trade","data":{"p":67523.0}}"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert!((f64_at(&batch, 0, 0) - 67523.0).abs() < f64::EPSILON);
        assert_eq!(str_at(&batch, 1, 0), "btcusdt@trade");
    }

    #[test]
    fn test_json_column_deep_path() {
        let schema = make_schema(vec![
            ("p", DataType::Float64, false),
            ("ts", DataType::Int64, true),
        ]);
        let mut col_paths = HashMap::new();
        col_paths.insert("ts".into(), vec!["meta".into(), "timestamp".into()]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(
            r#"{"meta":{"timestamp":123456},"data":{"p":99.5}}"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(i64_at(&batch, 1, 0), 123456);
    }

    #[test]
    fn test_json_column_missing_returns_null() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("missing_col", DataType::Utf8, true),
        ]);
        let mut col_paths = HashMap::new();
        col_paths.insert("missing_col".into(), vec!["nowhere".into(), "gone".into()]);
        let config = JsonDecoderConfig {
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"id":42}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(i64_at(&batch, 0, 0), 42);
        assert!(batch.column(1).is_null(0));
    }

    // ---- json.explode ---------------------------------------------------

    #[test]
    fn test_json_explode_arrays() {
        let schema = make_schema(vec![
            ("price", DataType::Utf8, true),
            ("qty", DataType::Utf8, true),
        ]);
        let config = JsonDecoderConfig {
            json_explode: Some(vec!["price".into(), "qty".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        // Each inner array becomes one row; elements map positionally to columns.
        let records = vec![json_record(r#"[["67523","1.5"],["67522","0.8"]]"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(str_at(&batch, 0, 0), "67523");
        assert_eq!(str_at(&batch, 0, 1), "67522");
        assert_eq!(str_at(&batch, 1, 0), "1.5");
        assert_eq!(str_at(&batch, 1, 1), "0.8");
    }

    #[test]
    fn test_json_explode_objects() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, true),
            ("name", DataType::Utf8, true),
        ]);
        let config = JsonDecoderConfig {
            json_explode: Some(vec!["id".into(), "name".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(
            r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(i64_at(&batch, 0, 0), 1);
        assert_eq!(str_at(&batch, 1, 1), "Bob");
    }

    #[test]
    fn test_json_path_plus_explode() {
        let schema = make_schema(vec![
            ("price", DataType::Utf8, true),
            ("qty", DataType::Utf8, true),
        ]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["bids".into()]),
            json_explode: Some(vec!["price".into(), "qty".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"bids":[["67523","1.5"],["67522","0.8"]]}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(str_at(&batch, 0, 0), "67523");
        assert_eq!(str_at(&batch, 1, 1), "0.8");
    }

    // ---- connector-config parsing and defaults --------------------------

    #[test]
    fn test_from_connector_config() {
        let mut config = crate::config::ConnectorConfig::new("websocket");
        config.set("json.path", "data.trade");
        config.set("json.column.stream_name", "stream");
        config.set("json.column.ts", "meta.timestamp");
        config.set("json.explode", "price, qty");
        config.set("schema.enforcement", "strict");
        config.set("nested.as.jsonb", "true");

        let cfg = JsonDecoderConfig::from_connector_config(&config);
        assert_eq!(cfg.json_path, Some(vec!["data".into(), "trade".into()]));
        assert_eq!(
            cfg.json_column_paths.get("stream_name"),
            Some(&vec!["stream".into()])
        );
        assert_eq!(
            cfg.json_column_paths.get("ts"),
            Some(&vec!["meta".into(), "timestamp".into()])
        );
        assert_eq!(cfg.json_explode, Some(vec!["price".into(), "qty".into()]));
        assert_eq!(cfg.type_mismatch, TypeMismatchStrategy::Reject);
        assert!(cfg.nested_as_jsonb);
    }

    #[test]
    fn test_default_config_unchanged() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![
            json_record(r#"{"a": 1, "b": "hello"}"#),
            json_record(r#"{"a": 2, "b": "world"}"#),
        ];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(i64_at(&batch, 0, 0), 1);
        assert_eq!(str_at(&batch, 1, 1), "world");
    }

    #[test]
    fn test_unknown_fields_with_path() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            unknown_fields: UnknownFieldStrategy::CollectExtra,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"data":{"a":1,"extra":"value"}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(1).name(), "_extra");
        assert!(!batch.column(1).is_null(0));
    }
}