1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12
13use arrow_array::builder::{
14 BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15 Int8Builder, LargeBinaryBuilder, LargeStringBuilder, ListBuilder, StringBuilder,
16 TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
17 TimestampSecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, SchemaRef, TimeUnit};
21
22use crate::schema::error::{SchemaError, SchemaResult};
23use crate::schema::json::jsonb::JsonbEncoder;
24use crate::schema::traits::FormatDecoder;
25use crate::schema::types::RawRecord;
26
/// Policy for keys of a row object that do not correspond to any schema field.
///
/// Only applied to object rows decoded without `json.explode`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UnknownFieldStrategy {
    /// Drop unknown keys silently.
    Ignore,
    /// Gather unknown keys into a JSONB-encoded trailing `_extra` column
    /// (null for rows without unknown keys).
    CollectExtra,
    /// Fail decoding on the first unknown key.
    Reject,
}
37
/// How a JSON value that cannot be converted to its column's Arrow type is handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypeMismatchStrategy {
    /// Append a null instead and increment the decoder's mismatch counter.
    Null,
    /// Additionally accept convertible representations (e.g. numeric strings);
    /// a value that still cannot be converted is a decode error.
    Coerce,
    /// No coercion is attempted; any mismatch is a decode error.
    Reject,
}

impl TypeMismatchStrategy {
    /// Parses a `schema.enforcement` property value.
    ///
    /// `coerce` → [`Self::Coerce`], `strict` → [`Self::Reject`],
    /// `permissive` → [`Self::Null`]. Matching ignores ASCII case and
    /// surrounding whitespace (consistent with `EpochUnit::from_str_opt`).
    /// Returns `None` for any other value.
    #[must_use]
    pub fn from_enforcement_str(s: &str) -> Option<Self> {
        let s = s.trim();
        if s.eq_ignore_ascii_case("coerce") {
            Some(Self::Coerce)
        } else if s.eq_ignore_ascii_case("strict") {
            Some(Self::Reject)
        } else if s.eq_ignore_ascii_case("permissive") {
            Some(Self::Null)
        } else {
            None
        }
    }
}
61
/// Unit in which numeric (epoch-based) timestamp values are expressed in the JSON.
///
/// Defaults to milliseconds.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EpochUnit {
    /// Seconds since the Unix epoch.
    Seconds,
    /// Milliseconds since the Unix epoch (the default).
    #[default]
    Millis,
    /// Microseconds since the Unix epoch.
    Micros,
    /// Nanoseconds since the Unix epoch.
    Nanos,
}

impl EpochUnit {
    /// Parses a unit name, case-insensitively and ignoring surrounding whitespace.
    ///
    /// Accepts `seconds`, `millis`, `micros`, `nanos`, each optionally prefixed
    /// with `epoch_`. Returns `None` for anything else.
    #[must_use]
    pub fn from_str_opt(s: &str) -> Option<Self> {
        let normalized = s.trim().to_lowercase();
        let bare = normalized
            .strip_prefix("epoch_")
            .unwrap_or(normalized.as_str());
        match bare {
            "seconds" => Some(Self::Seconds),
            "millis" => Some(Self::Millis),
            "micros" => Some(Self::Micros),
            "nanos" => Some(Self::Nanos),
            _ => None,
        }
    }

    /// Number of nanoseconds in one unit.
    const fn nanos_per(self) -> i64 {
        match self {
            Self::Nanos => 1,
            Self::Micros => 1_000,
            Self::Millis => 1_000_000,
            Self::Seconds => 1_000_000_000,
        }
    }
}
103
/// Where a column's value is read from within each decoded record.
#[derive(Clone, Debug)]
enum ColumnExtraction {
    /// Look the column name up as a key of the `json_path` target object.
    DefaultPath,
    /// Follow these key segments starting at the document root.
    CustomPath { segments: Vec<String> },
}
112
113#[derive(Debug, Clone)]
115pub struct JsonDecoderConfig {
116 pub unknown_fields: UnknownFieldStrategy,
118
119 pub type_mismatch: TypeMismatchStrategy,
121
122 pub timestamp_formats: Vec<String>,
126
127 pub nested_as_jsonb: bool,
131
132 pub json_path: Option<Vec<String>>,
136
137 pub json_column_paths: HashMap<String, Vec<String>>,
141
142 pub numeric_timestamp_units: HashMap<String, EpochUnit>,
145
146 pub json_explode: Option<Vec<String>>,
150}
151
152impl Default for JsonDecoderConfig {
153 fn default() -> Self {
154 Self {
155 unknown_fields: UnknownFieldStrategy::Ignore,
156 type_mismatch: TypeMismatchStrategy::Coerce,
157 timestamp_formats: vec![
158 "iso8601".into(),
159 "%Y-%m-%dT%H:%M:%S%.fZ".into(),
160 "%Y-%m-%dT%H:%M:%S%.f%:z".into(),
161 "%Y-%m-%d %H:%M:%S%.f".into(),
162 "%Y-%m-%d %H:%M:%S".into(),
163 ],
164 nested_as_jsonb: false,
165 json_path: None,
166 json_column_paths: HashMap::new(),
167 numeric_timestamp_units: HashMap::new(),
168 json_explode: None,
169 }
170 }
171}
172
173impl JsonDecoderConfig {
174 #[must_use]
180 pub fn from_connector_config(config: &crate::config::ConnectorConfig) -> Self {
181 let mut cfg = Self::default();
182
183 if let Some(path) = config.get("json.path") {
184 cfg.json_path = Some(path.split('.').map(ToString::to_string).collect());
185 }
186
187 let col_props = config.properties_with_prefix("json.column.");
188 for (col_name, val) in col_props {
189 if let Some(col) = col_name.strip_suffix(".epoch_unit") {
194 if let Some(unit) = EpochUnit::from_str_opt(&val) {
195 cfg.numeric_timestamp_units.insert(col.to_string(), unit);
196 } else {
197 tracing::warn!(
198 column = col,
199 value = %val,
200 "invalid json.column.<col>.epoch_unit (expected \
201 seconds|millis|micros|nanos); using millis"
202 );
203 }
204 continue;
205 }
206 cfg.json_column_paths
207 .insert(col_name, val.split('.').map(ToString::to_string).collect());
208 }
209
210 if let Some(explode) = config.get("json.explode") {
211 cfg.json_explode = Some(explode.split(',').map(|s| s.trim().to_string()).collect());
212 }
213
214 if let Some(enforcement) = config.get("schema.enforcement") {
215 if let Some(strategy) = TypeMismatchStrategy::from_enforcement_str(enforcement) {
216 cfg.type_mismatch = strategy;
217 }
218 }
219 if let Some(v) = config.get("nested.as.jsonb") {
220 cfg.nested_as_jsonb = v.eq_ignore_ascii_case("true");
221 }
222 cfg
223 }
224}
225
226pub struct JsonDecoder {
233 schema: SchemaRef,
235 config: JsonDecoderConfig,
237 field_indices: Vec<(String, usize)>,
239 mismatch_count: AtomicU64,
241 column_extractions: Vec<ColumnExtraction>,
243 column_epoch_units: Vec<EpochUnit>,
246 explode_col_indices: Option<Vec<Option<usize>>>,
251}
252
253#[allow(clippy::missing_fields_in_debug)]
254impl std::fmt::Debug for JsonDecoder {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 f.debug_struct("JsonDecoder")
257 .field("schema", &self.schema)
258 .field("config", &self.config)
259 .field(
260 "mismatch_count",
261 &self.mismatch_count.load(Ordering::Relaxed),
262 )
263 .finish()
264 }
265}
266
267impl JsonDecoder {
268 #[must_use]
270 pub fn new(schema: SchemaRef) -> Self {
271 Self::with_config(schema, JsonDecoderConfig::default())
272 }
273
274 #[must_use]
276 pub fn with_config(schema: SchemaRef, config: JsonDecoderConfig) -> Self {
277 let field_indices: Vec<(String, usize)> = schema
278 .fields()
279 .iter()
280 .enumerate()
281 .map(|(i, f)| (f.name().clone(), i))
282 .collect();
283
284 let column_extractions: Vec<ColumnExtraction> = schema
285 .fields()
286 .iter()
287 .map(|f| {
288 let col_name = f.name();
289 if let Some(path_segments) = config.json_column_paths.get(col_name.as_str()) {
290 ColumnExtraction::CustomPath {
291 segments: path_segments.clone(),
292 }
293 } else {
294 ColumnExtraction::DefaultPath
295 }
296 })
297 .collect();
298
299 let column_epoch_units: Vec<EpochUnit> = schema
300 .fields()
301 .iter()
302 .map(|f| {
303 config
304 .numeric_timestamp_units
305 .get(f.name().as_str())
306 .copied()
307 .unwrap_or_default()
308 })
309 .collect();
310
311 let explode_col_indices = config.json_explode.as_ref().map(|names| {
312 names
313 .iter()
314 .map(|name| {
315 field_indices
316 .iter()
317 .find(|(n, _)| n == name)
318 .map(|(_, idx)| *idx)
319 })
320 .collect()
321 });
322
323 Self {
324 schema,
325 config,
326 field_indices,
327 mismatch_count: AtomicU64::new(0),
328 column_extractions,
329 column_epoch_units,
330 explode_col_indices,
331 }
332 }
333
334 pub fn mismatch_count(&self) -> u64 {
336 self.mismatch_count.load(Ordering::Relaxed)
337 }
338}
339
340impl FormatDecoder for JsonDecoder {
341 fn output_schema(&self) -> SchemaRef {
342 self.schema.clone()
343 }
344
345 #[allow(clippy::too_many_lines)]
346 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
347 if records.is_empty() {
348 return Ok(RecordBatch::new_empty(self.schema.clone()));
349 }
350
351 let num_fields = self.schema.fields().len();
352 let capacity = records.len();
353
354 let mut builders = create_builders(&self.schema, capacity);
356
357 let collect_extra = matches!(
359 self.config.unknown_fields,
360 UnknownFieldStrategy::CollectExtra
361 );
362 let mut extra_builder = if collect_extra {
363 Some(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
364 } else {
365 None
366 };
367
368 let mut jsonb_encoder = if self.config.nested_as_jsonb {
369 Some(JsonbEncoder::new())
370 } else {
371 None
372 };
373
374 let mut populated = vec![false; num_fields];
376
377 for record in records {
378 let value: serde_json::Value = serde_json::from_slice(&record.value)
379 .map_err(|e| SchemaError::DecodeError(format!("JSON parse error: {e}")))?;
380
381 let default_target: &serde_json::Value =
387 match navigate_path_opt(&value, self.config.json_path.as_deref()) {
388 Some(v) => v,
389 None => continue,
390 };
391
392 if let Some(ref col_indices) = self.explode_col_indices {
393 let arr = default_target.as_array().ok_or_else(|| {
395 SchemaError::DecodeError("json.explode target must be an array".into())
396 })?;
397 for element in arr {
398 populated.fill(false);
399 match element {
400 serde_json::Value::Array(items) => {
401 for (pos, col_idx_opt) in col_indices.iter().enumerate() {
402 if let Some(col_idx) = col_idx_opt {
403 let val = items.get(pos).unwrap_or(&serde_json::Value::Null);
404 let field = &self.schema.fields()[*col_idx];
405 append_value(
406 &mut builders[*col_idx],
407 field.data_type(),
408 val,
409 &self.config,
410 &self.mismatch_count,
411 jsonb_encoder.as_mut(),
412 self.column_epoch_units[*col_idx],
413 )?;
414 populated[*col_idx] = true;
415 }
416 }
417 }
418 serde_json::Value::Object(obj) => {
419 for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
420 if let Some(val) = obj.get(col_name.as_str()) {
421 let field = &self.schema.fields()[col_idx];
422 append_value(
423 &mut builders[col_idx],
424 field.data_type(),
425 val,
426 &self.config,
427 &self.mismatch_count,
428 jsonb_encoder.as_mut(),
429 self.column_epoch_units[col_idx],
430 )?;
431 populated[col_idx] = true;
432 }
433 }
434 }
435 _ => {
436 return Err(SchemaError::DecodeError(
437 "json.explode array elements must be arrays or objects".into(),
438 ));
439 }
440 }
441 for (col_idx, was_populated) in populated.iter().enumerate() {
443 if !was_populated {
444 append_null(&mut builders[col_idx]);
445 }
446 }
447 if let Some(ref mut eb) = extra_builder {
449 eb.append_null();
450 }
451 }
452 } else {
453 let default_obj = default_target.as_object().ok_or_else(|| {
455 SchemaError::DecodeError("JSON value must be an object".into())
456 })?;
457
458 populated.fill(false);
459
460 let mut extra_fields: Option<serde_json::Map<String, serde_json::Value>> =
462 if collect_extra {
463 Some(serde_json::Map::new())
464 } else {
465 None
466 };
467
468 for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
470 match &self.column_extractions[col_idx] {
471 ColumnExtraction::DefaultPath => {
472 if let Some(val) = default_obj.get(col_name.as_str()) {
473 populated[col_idx] = true;
474 let field = &self.schema.fields()[col_idx];
475 append_value(
476 &mut builders[col_idx],
477 field.data_type(),
478 val,
479 &self.config,
480 &self.mismatch_count,
481 jsonb_encoder.as_mut(),
482 self.column_epoch_units[col_idx],
483 )?;
484 }
485 }
486 ColumnExtraction::CustomPath { segments } => {
487 let extracted = navigate_path(&value, segments);
489 if let Some(val) = extracted {
490 populated[col_idx] = true;
491 let field = &self.schema.fields()[col_idx];
492 append_value(
493 &mut builders[col_idx],
494 field.data_type(),
495 val,
496 &self.config,
497 &self.mismatch_count,
498 jsonb_encoder.as_mut(),
499 self.column_epoch_units[col_idx],
500 )?;
501 }
502 }
503 }
504 }
505
506 if collect_extra {
508 for (key, val) in default_obj {
509 if self.field_index(key).is_none() {
510 match self.config.unknown_fields {
511 UnknownFieldStrategy::CollectExtra => {
512 if let Some(ref mut extra) = extra_fields {
513 extra.insert(key.clone(), val.clone());
514 }
515 }
516 UnknownFieldStrategy::Reject => {
517 return Err(SchemaError::DecodeError(format!(
518 "unknown field '{key}' not in schema"
519 )));
520 }
521 UnknownFieldStrategy::Ignore => {}
522 }
523 }
524 }
525 } else if matches!(self.config.unknown_fields, UnknownFieldStrategy::Reject) {
526 for key in default_obj.keys() {
527 if self.field_index(key).is_none() {
528 return Err(SchemaError::DecodeError(format!(
529 "unknown field '{key}' not in schema"
530 )));
531 }
532 }
533 }
534
535 for (col_idx, was_populated) in populated.iter().enumerate() {
537 if !was_populated {
538 append_null(&mut builders[col_idx]);
539 }
540 }
541
542 if let Some(ref mut eb) = extra_builder {
544 if let Some(ref extra) = extra_fields {
545 if extra.is_empty() {
546 eb.append_null();
547 } else {
548 let mut enc = jsonb_encoder
549 .as_mut()
550 .map_or_else(JsonbEncoder::new, |_| JsonbEncoder::new());
551 let bytes = enc.encode(&serde_json::Value::Object(extra.clone()));
552 eb.append_value(&bytes);
553 }
554 } else {
555 eb.append_null();
556 }
557 }
558 }
559 }
560
561 let mut columns: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
563
564 let final_schema = if let Some(mut eb) = extra_builder {
566 columns.push(Arc::new(eb.finish()));
567 let mut fields = self.schema.fields().to_vec();
568 fields.push(Arc::new(arrow_schema::Field::new(
569 "_extra",
570 DataType::LargeBinary,
571 true,
572 )));
573 Arc::new(arrow_schema::Schema::new(fields))
574 } else {
575 self.schema.clone()
576 };
577
578 RecordBatch::try_new(final_schema, columns)
579 .map_err(|e| SchemaError::DecodeError(format!("RecordBatch construction: {e}")))
580 }
581
582 #[allow(clippy::unnecessary_literal_bound)]
583 fn format_name(&self) -> &str {
584 "json"
585 }
586}
587
588impl JsonDecoder {
589 fn field_index(&self, name: &str) -> Option<usize> {
592 self.field_indices
593 .iter()
594 .find(|(n, _)| n == name)
595 .map(|(_, idx)| *idx)
596 }
597}
598
599fn navigate_path<'a>(
602 root: &'a serde_json::Value,
603 segments: &[String],
604) -> Option<&'a serde_json::Value> {
605 let mut current = root;
606 for segment in segments {
607 current = current.get(segment.as_str())?;
608 }
609 Some(current)
610}
611
612fn navigate_path_opt<'a>(
615 root: &'a serde_json::Value,
616 segments: Option<&[String]>,
617) -> Option<&'a serde_json::Value> {
618 match segments {
619 Some(s) => navigate_path(root, s),
620 None => Some(root),
621 }
622}
623
624trait ColumnBuilder: Send {
628 fn finish(&mut self) -> ArrayRef;
629 fn append_null_value(&mut self);
630 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
631}
632
633macro_rules! impl_column_builder {
634 ($builder:ty, $array:ty) => {
635 impl ColumnBuilder for $builder {
636 fn finish(&mut self) -> ArrayRef {
637 Arc::new(<$builder>::finish(self))
638 }
639 fn append_null_value(&mut self) {
640 self.append_null();
641 }
642 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
643 self
644 }
645 }
646 };
647}
648
649impl_column_builder!(BooleanBuilder, arrow_array::BooleanArray);
650impl_column_builder!(Int8Builder, arrow_array::Int8Array);
651impl_column_builder!(Int16Builder, arrow_array::Int16Array);
652impl_column_builder!(Int32Builder, arrow_array::Int32Array);
653impl_column_builder!(Int64Builder, arrow_array::Int64Array);
654impl_column_builder!(UInt8Builder, arrow_array::UInt8Array);
655impl_column_builder!(UInt16Builder, arrow_array::UInt16Array);
656impl_column_builder!(UInt32Builder, arrow_array::UInt32Array);
657impl_column_builder!(UInt64Builder, arrow_array::UInt64Array);
658impl_column_builder!(Float32Builder, arrow_array::Float32Array);
659impl_column_builder!(Float64Builder, arrow_array::Float64Array);
660impl_column_builder!(StringBuilder, arrow_array::StringArray);
661impl_column_builder!(ListBuilder<StringBuilder>, arrow_array::ListArray);
662impl_column_builder!(LargeStringBuilder, arrow_array::LargeStringArray);
663impl_column_builder!(LargeBinaryBuilder, arrow_array::LargeBinaryArray);
664impl_column_builder!(TimestampSecondBuilder, arrow_array::TimestampSecondArray);
665impl_column_builder!(
666 TimestampMillisecondBuilder,
667 arrow_array::TimestampMillisecondArray
668);
669impl_column_builder!(
670 TimestampMicrosecondBuilder,
671 arrow_array::TimestampMicrosecondArray
672);
673impl_column_builder!(
674 TimestampNanosecondBuilder,
675 arrow_array::TimestampNanosecondArray
676);
677
678fn create_builders(schema: &SchemaRef, capacity: usize) -> Vec<Box<dyn ColumnBuilder>> {
679 schema
680 .fields()
681 .iter()
682 .map(|f| create_builder(f.data_type(), capacity))
683 .collect()
684}
685
686fn create_builder(data_type: &DataType, capacity: usize) -> Box<dyn ColumnBuilder> {
687 match data_type {
688 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
689 DataType::Int8 => Box::new(Int8Builder::with_capacity(capacity)),
690 DataType::Int16 => Box::new(Int16Builder::with_capacity(capacity)),
691 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
692 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
693 DataType::UInt8 => Box::new(UInt8Builder::with_capacity(capacity)),
694 DataType::UInt16 => Box::new(UInt16Builder::with_capacity(capacity)),
695 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
696 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
697 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
698 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
699 DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, capacity * 32)),
700 DataType::LargeBinary => {
701 Box::new(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
702 }
703 DataType::Timestamp(TimeUnit::Second, tz) => {
704 let builder =
705 TimestampSecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
706 Box::new(builder)
707 }
708 DataType::Timestamp(TimeUnit::Millisecond, tz) => {
709 let builder =
710 TimestampMillisecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
711 Box::new(builder)
712 }
713 DataType::Timestamp(TimeUnit::Microsecond, tz) => {
714 let builder =
715 TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
716 Box::new(builder)
717 }
718 DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
719 let builder =
720 TimestampNanosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
721 Box::new(builder)
722 }
723 DataType::List(field) if matches!(field.data_type(), DataType::Utf8) => {
724 Box::new(ListBuilder::new(StringBuilder::new()))
725 }
726 _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)),
728 }
729}
730
731fn append_null(builder: &mut Box<dyn ColumnBuilder>) {
732 builder.append_null_value();
733}
734
735#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
737fn append_value(
738 builder: &mut Box<dyn ColumnBuilder>,
739 target_type: &DataType,
740 value: &serde_json::Value,
741 config: &JsonDecoderConfig,
742 mismatch_count: &AtomicU64,
743 jsonb_encoder: Option<&mut JsonbEncoder>,
744 numeric_ts_unit: EpochUnit,
745) -> SchemaResult<()> {
746 if value.is_null() {
747 builder.append_null_value();
748 return Ok(());
749 }
750
751 match target_type {
752 DataType::Boolean => {
753 let b = builder
754 .as_any_mut()
755 .downcast_mut::<BooleanBuilder>()
756 .unwrap();
757 match extract_bool(value, config) {
758 Ok(v) => b.append_value(v),
759 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
760 }
761 }
762 DataType::Int8 => {
763 let b = builder.as_any_mut().downcast_mut::<Int8Builder>().unwrap();
764 match extract_i8(value, config) {
765 Ok(v) => b.append_value(v),
766 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
767 }
768 }
769 DataType::Int16 => {
770 let b = builder.as_any_mut().downcast_mut::<Int16Builder>().unwrap();
771 match extract_i16(value, config) {
772 Ok(v) => b.append_value(v),
773 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
774 }
775 }
776 DataType::Int32 => {
777 let b = builder.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
778 match extract_i32(value, config) {
779 Ok(v) => b.append_value(v),
780 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
781 }
782 }
783 DataType::Int64 => {
784 let b = builder.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
785 match extract_i64(value, config) {
786 Ok(v) => b.append_value(v),
787 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
788 }
789 }
790 DataType::UInt8 => {
791 let b = builder.as_any_mut().downcast_mut::<UInt8Builder>().unwrap();
792 match extract_u8(value, config) {
793 Ok(v) => b.append_value(v),
794 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
795 }
796 }
797 DataType::UInt16 => {
798 let b = builder
799 .as_any_mut()
800 .downcast_mut::<UInt16Builder>()
801 .unwrap();
802 match extract_u16(value, config) {
803 Ok(v) => b.append_value(v),
804 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
805 }
806 }
807 DataType::UInt32 => {
808 let b = builder
809 .as_any_mut()
810 .downcast_mut::<UInt32Builder>()
811 .unwrap();
812 match extract_u32(value, config) {
813 Ok(v) => b.append_value(v),
814 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
815 }
816 }
817 DataType::UInt64 => {
818 let b = builder
819 .as_any_mut()
820 .downcast_mut::<UInt64Builder>()
821 .unwrap();
822 match extract_u64(value, config) {
823 Ok(v) => b.append_value(v),
824 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
825 }
826 }
827 DataType::Float32 => {
828 let b = builder
829 .as_any_mut()
830 .downcast_mut::<Float32Builder>()
831 .unwrap();
832 match extract_f32(value, config) {
833 Ok(v) => b.append_value(v),
834 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
835 }
836 }
837 DataType::Float64 => {
838 let b = builder
839 .as_any_mut()
840 .downcast_mut::<Float64Builder>()
841 .unwrap();
842 match extract_f64(value, config) {
843 Ok(v) => b.append_value(v),
844 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
845 }
846 }
847 DataType::LargeUtf8 => {
848 let b = builder
849 .as_any_mut()
850 .downcast_mut::<LargeStringBuilder>()
851 .unwrap();
852 let s = value_to_string(value);
853 b.append_value(&s);
854 }
855 DataType::LargeBinary => {
856 let b = builder
857 .as_any_mut()
858 .downcast_mut::<LargeBinaryBuilder>()
859 .unwrap();
860 if let Some(enc) = jsonb_encoder {
861 let bytes = enc.encode(value);
862 b.append_value(&bytes);
863 } else {
864 let bytes = serde_json::to_vec(value).unwrap_or_default();
866 b.append_value(&bytes);
867 }
868 }
869 DataType::Timestamp(unit, _) => {
870 match extract_timestamp(value, config, *unit, numeric_ts_unit) {
871 Ok(ts) => append_timestamp(builder, *unit, ts),
872 Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
873 }
874 }
875 DataType::List(field) if matches!(field.data_type(), DataType::Utf8) => {
876 if let Some(items) = value.as_array() {
877 let b = builder
878 .as_any_mut()
879 .downcast_mut::<ListBuilder<StringBuilder>>()
880 .unwrap();
881 for item in items {
882 if let Some(s) = item.as_str() {
883 b.values().append_value(s);
884 } else if item.is_null() {
885 b.values().append_null();
886 } else {
887 b.values().append_value(value_to_string(item));
888 }
889 }
890 b.append(true);
891 } else {
892 handle_mismatch(
895 builder,
896 config,
897 mismatch_count,
898 &format!("expected array, got {}", json_type_name(value)),
899 )?;
900 }
901 }
902 _ => {
904 let b = builder
905 .as_any_mut()
906 .downcast_mut::<StringBuilder>()
907 .unwrap();
908 let s = value_to_string(value);
909 b.append_value(&s);
910 }
911 }
912
913 Ok(())
914}
915
916fn handle_mismatch(
917 builder: &mut Box<dyn ColumnBuilder>,
918 config: &JsonDecoderConfig,
919 mismatch_count: &AtomicU64,
920 error_msg: &str,
921) -> SchemaResult<()> {
922 match config.type_mismatch {
923 TypeMismatchStrategy::Null => {
924 mismatch_count.fetch_add(1, Ordering::Relaxed);
925 builder.append_null_value();
926 Ok(())
927 }
928 TypeMismatchStrategy::Coerce => {
929 Err(SchemaError::DecodeError(format!(
931 "type coercion failed: {error_msg}"
932 )))
933 }
934 TypeMismatchStrategy::Reject => Err(SchemaError::DecodeError(format!(
935 "type mismatch: {error_msg}"
936 ))),
937 }
938}
939
940fn extract_bool(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<bool, String> {
943 if let Some(b) = value.as_bool() {
944 return Ok(b);
945 }
946 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
947 if let Some(s) = value.as_str() {
948 match s.to_ascii_lowercase().as_str() {
949 "true" | "1" | "yes" => return Ok(true),
950 "false" | "0" | "no" => return Ok(false),
951 _ => {}
952 }
953 }
954 if let Some(n) = value.as_i64() {
955 return Ok(n != 0);
956 }
957 }
958 Err(format!("expected boolean, got {}", json_type_name(value)))
959}
960
961fn extract_i8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i8, String> {
962 if let Some(n) = value.as_i64() {
963 if let Ok(v) = i8::try_from(n) {
964 return Ok(v);
965 }
966 return Err(format!("integer {n} out of i8 range"));
967 }
968 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
969 if let Some(s) = value.as_str() {
970 if let Ok(v) = s.parse::<i8>() {
971 return Ok(v);
972 }
973 }
974 if let Some(f) = value.as_f64() {
975 #[allow(clippy::cast_possible_truncation)]
976 let v = f as i8;
977 return Ok(v);
978 }
979 }
980 Err(format!("expected i8, got {}", json_type_name(value)))
981}
982
983fn extract_i16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i16, String> {
984 if let Some(n) = value.as_i64() {
985 if let Ok(v) = i16::try_from(n) {
986 return Ok(v);
987 }
988 return Err(format!("integer {n} out of i16 range"));
989 }
990 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
991 if let Some(s) = value.as_str() {
992 if let Ok(v) = s.parse::<i16>() {
993 return Ok(v);
994 }
995 }
996 if let Some(f) = value.as_f64() {
997 #[allow(clippy::cast_possible_truncation)]
998 let v = f as i16;
999 return Ok(v);
1000 }
1001 }
1002 Err(format!("expected i16, got {}", json_type_name(value)))
1003}
1004
1005fn extract_i32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i32, String> {
1006 if let Some(n) = value.as_i64() {
1007 if let Ok(v) = i32::try_from(n) {
1008 return Ok(v);
1009 }
1010 return Err(format!("integer {n} out of i32 range"));
1011 }
1012 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1013 if let Some(s) = value.as_str() {
1014 if let Ok(v) = s.parse::<i32>() {
1015 return Ok(v);
1016 }
1017 }
1018 if let Some(f) = value.as_f64() {
1019 #[allow(clippy::cast_possible_truncation)]
1020 return Ok(f as i32);
1021 }
1022 }
1023 Err(format!("expected i32, got {}", json_type_name(value)))
1024}
1025
1026fn extract_i64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i64, String> {
1027 if let Some(n) = value.as_i64() {
1028 return Ok(n);
1029 }
1030 if let Some(n) = value.as_u64() {
1031 if let Ok(v) = i64::try_from(n) {
1032 return Ok(v);
1033 }
1034 return Err(format!("u64 {n} out of i64 range"));
1035 }
1036 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1037 if let Some(s) = value.as_str() {
1038 if let Ok(v) = s.parse::<i64>() {
1039 return Ok(v);
1040 }
1041 }
1042 if let Some(f) = value.as_f64() {
1043 #[allow(clippy::cast_possible_truncation)]
1044 return Ok(f as i64);
1045 }
1046 }
1047 Err(format!("expected i64, got {}", json_type_name(value)))
1048}
1049
1050fn extract_f32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f32, String> {
1051 if let Some(f) = value.as_f64() {
1052 #[allow(clippy::cast_possible_truncation)]
1053 return Ok(f as f32);
1054 }
1055 if let Some(n) = value.as_i64() {
1056 #[allow(clippy::cast_precision_loss)]
1057 return Ok(n as f32);
1058 }
1059 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1060 if let Some(s) = value.as_str() {
1061 if let Ok(v) = s.parse::<f32>() {
1062 return Ok(v);
1063 }
1064 }
1065 }
1066 Err(format!("expected f32, got {}", json_type_name(value)))
1067}
1068
1069fn extract_f64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f64, String> {
1070 if let Some(f) = value.as_f64() {
1071 return Ok(f);
1072 }
1073 if let Some(n) = value.as_i64() {
1074 #[allow(clippy::cast_precision_loss)]
1075 return Ok(n as f64);
1076 }
1077 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1078 if let Some(s) = value.as_str() {
1079 if let Ok(v) = s.parse::<f64>() {
1080 return Ok(v);
1081 }
1082 }
1083 }
1084 Err(format!("expected f64, got {}", json_type_name(value)))
1085}
1086
1087fn extract_u8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u8, String> {
1088 if let Some(n) = value.as_u64() {
1089 if let Ok(v) = u8::try_from(n) {
1090 return Ok(v);
1091 }
1092 return Err(format!("integer {n} out of u8 range"));
1093 }
1094 if let Some(n) = value.as_i64() {
1095 if let Ok(v) = u8::try_from(n) {
1096 return Ok(v);
1097 }
1098 return Err(format!("integer {n} out of u8 range"));
1099 }
1100 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1101 if let Some(s) = value.as_str() {
1102 if let Ok(v) = s.parse::<u8>() {
1103 return Ok(v);
1104 }
1105 }
1106 }
1107 Err(format!("expected u8, got {}", json_type_name(value)))
1108}
1109
1110fn extract_u16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u16, String> {
1111 if let Some(n) = value.as_u64() {
1112 if let Ok(v) = u16::try_from(n) {
1113 return Ok(v);
1114 }
1115 return Err(format!("integer {n} out of u16 range"));
1116 }
1117 if let Some(n) = value.as_i64() {
1118 if let Ok(v) = u16::try_from(n) {
1119 return Ok(v);
1120 }
1121 return Err(format!("integer {n} out of u16 range"));
1122 }
1123 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1124 if let Some(s) = value.as_str() {
1125 if let Ok(v) = s.parse::<u16>() {
1126 return Ok(v);
1127 }
1128 }
1129 }
1130 Err(format!("expected u16, got {}", json_type_name(value)))
1131}
1132
1133fn extract_u32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u32, String> {
1134 if let Some(n) = value.as_u64() {
1135 if let Ok(v) = u32::try_from(n) {
1136 return Ok(v);
1137 }
1138 return Err(format!("integer {n} out of u32 range"));
1139 }
1140 if let Some(n) = value.as_i64() {
1141 if let Ok(v) = u32::try_from(n) {
1142 return Ok(v);
1143 }
1144 return Err(format!("integer {n} out of u32 range"));
1145 }
1146 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1147 if let Some(s) = value.as_str() {
1148 if let Ok(v) = s.parse::<u32>() {
1149 return Ok(v);
1150 }
1151 }
1152 }
1153 Err(format!("expected u32, got {}", json_type_name(value)))
1154}
1155
1156fn extract_u64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u64, String> {
1157 if let Some(n) = value.as_u64() {
1158 return Ok(n);
1159 }
1160 if let Some(n) = value.as_i64() {
1161 if let Ok(v) = u64::try_from(n) {
1162 return Ok(v);
1163 }
1164 return Err(format!("integer {n} out of u64 range"));
1165 }
1166 if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1167 if let Some(s) = value.as_str() {
1168 if let Ok(v) = s.parse::<u64>() {
1169 return Ok(v);
1170 }
1171 }
1172 }
1173 Err(format!("expected u64, got {}", json_type_name(value)))
1174}
1175
1176fn extract_timestamp(
1183 value: &serde_json::Value,
1184 config: &JsonDecoderConfig,
1185 unit: TimeUnit,
1186 from: EpochUnit,
1187) -> Result<i64, String> {
1188 if let Some(n) = value.as_i64() {
1190 return checked_epoch_to_unit(n, from, unit);
1191 }
1192 if let Some(f) = value.as_f64() {
1193 const POW2_63: f64 = 9_223_372_036_854_775_808.0;
1195 let rounded = f.round();
1201 if !(-POW2_63..POW2_63).contains(&rounded) {
1202 return Err(format!("timestamp {f} out of i64 range"));
1203 }
1204 #[allow(clippy::cast_possible_truncation)] let v = rounded as i64;
1206 return checked_epoch_to_unit(v, from, unit);
1207 }
1208
1209 if let Some(s) = value.as_str() {
1211 for fmt in &config.timestamp_formats {
1212 if fmt == "iso8601" {
1213 if let Ok(nanos) = arrow_cast::parse::string_to_timestamp_nanos(s) {
1214 return Ok(nanos_to_unit(nanos, unit));
1215 }
1216 continue;
1217 }
1218 if let Ok(ndt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
1219 let nanos = ndt.and_utc().timestamp_nanos_opt().unwrap_or(0);
1220 return Ok(nanos_to_unit(nanos, unit));
1221 }
1222 }
1223 return Err(format!("cannot parse timestamp from string: {s}"));
1224 }
1225
1226 Err(format!("expected timestamp, got {}", json_type_name(value)))
1227}
1228
1229fn checked_epoch_to_unit(value: i64, from: EpochUnit, to: TimeUnit) -> Result<i64, String> {
1237 let from_ns = from.nanos_per();
1238 let to_ns = match to {
1239 TimeUnit::Second => 1_000_000_000,
1240 TimeUnit::Millisecond => 1_000_000,
1241 TimeUnit::Microsecond => 1_000,
1242 TimeUnit::Nanosecond => 1,
1243 };
1244 if from_ns >= to_ns {
1245 let factor = from_ns / to_ns;
1247 value
1248 .checked_mul(factor)
1249 .ok_or_else(|| format!("timestamp {value} ({from:?}) out of i64 {to:?} range"))
1250 } else {
1251 let factor = to_ns / from_ns;
1252 Ok(value / factor)
1253 }
1254}
1255
1256fn nanos_to_unit(nanos: i64, unit: TimeUnit) -> i64 {
1258 match unit {
1259 TimeUnit::Second => nanos / 1_000_000_000,
1260 TimeUnit::Millisecond => nanos / 1_000_000,
1261 TimeUnit::Microsecond => nanos / 1_000,
1262 TimeUnit::Nanosecond => nanos,
1263 }
1264}
1265
1266fn append_timestamp(builder: &mut Box<dyn ColumnBuilder>, unit: TimeUnit, value: i64) {
1268 match unit {
1269 TimeUnit::Second => {
1270 builder
1271 .as_any_mut()
1272 .downcast_mut::<TimestampSecondBuilder>()
1273 .unwrap()
1274 .append_value(value);
1275 }
1276 TimeUnit::Millisecond => {
1277 builder
1278 .as_any_mut()
1279 .downcast_mut::<TimestampMillisecondBuilder>()
1280 .unwrap()
1281 .append_value(value);
1282 }
1283 TimeUnit::Microsecond => {
1284 builder
1285 .as_any_mut()
1286 .downcast_mut::<TimestampMicrosecondBuilder>()
1287 .unwrap()
1288 .append_value(value);
1289 }
1290 TimeUnit::Nanosecond => {
1291 builder
1292 .as_any_mut()
1293 .downcast_mut::<TimestampNanosecondBuilder>()
1294 .unwrap()
1295 .append_value(value);
1296 }
1297 }
1298}
1299
1300fn value_to_string(value: &serde_json::Value) -> String {
1301 match value {
1302 serde_json::Value::String(s) => s.clone(),
1303 other => other.to_string(),
1304 }
1305}
1306
1307fn json_type_name(value: &serde_json::Value) -> &'static str {
1308 match value {
1309 serde_json::Value::Null => "null",
1310 serde_json::Value::Bool(_) => "boolean",
1311 serde_json::Value::Number(_) => "number",
1312 serde_json::Value::String(_) => "string",
1313 serde_json::Value::Array(_) => "array",
1314 serde_json::Value::Object(_) => "object",
1315 }
1316}
1317
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::cast::AsArray;
    use arrow_schema::{Field, Schema};

    /// Builds a schema from `(name, type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Wraps a JSON text as a raw record.
    fn json_record(json: &str) -> RawRecord {
        RawRecord::new(json.as_bytes().to_vec())
    }

    // ---- basic decoding -------------------------------------------------

    #[test]
    fn test_decode_empty_batch() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema.clone());
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    #[test]
    fn test_decode_single_record() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"id": 42, "name": "Alice"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "Alice");
    }

    #[test]
    fn test_decode_multiple_records() {
        let schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y", DataType::Float64, false),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![
            json_record(r#"{"x": 1, "y": 1.5}"#),
            json_record(r#"{"x": 2, "y": 2.5}"#),
            json_record(r#"{"x": 3, "y": 3.5}"#),
        ];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 3);
        let x_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_col.value(0), 1);
        assert_eq!(x_col.value(1), 2);
        assert_eq!(x_col.value(2), 3);
    }

    #[test]
    fn test_decode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
        ]);
        let decoder = JsonDecoder::new(schema);
        // 2.5 (not 3.14) keeps clippy's deny-by-default `approx_constant` lint quiet
        // and is exactly representable, so the epsilon comparison is meaningful.
        let records = vec![json_record(
            r#"{"bool_col": true, "int_col": 42, "float_col": 2.5, "str_col": "hello"}"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert!(batch.column(0).as_boolean().value(0));
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        let f = batch
            .column(2)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(0);
        assert!((f - 2.5).abs() < f64::EPSILON);
        assert_eq!(batch.column(3).as_string::<i32>().value(0), "hello");
    }

    // ---- nulls and missing fields ---------------------------------------

    #[test]
    fn test_decode_null_values() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"a": null, "b": null}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(batch.column(0).is_null(0));
        assert!(batch.column(1).is_null(0));
    }

    #[test]
    fn test_decode_missing_field_becomes_null() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"a": 1}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
        assert!(batch.column(1).is_null(0));
    }

    // ---- type-mismatch strategies ---------------------------------------

    #[test]
    fn test_mismatch_null_strategy() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Null,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(batch.column(0).is_null(0));
        assert_eq!(decoder.mismatch_count(), 1);
    }

    #[test]
    fn test_mismatch_coerce_strategy() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Coerce,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"x": "123"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            123
        );
    }

    #[test]
    fn test_mismatch_reject_strategy() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Reject,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("type mismatch"));
    }

    // ---- unknown-field strategies ---------------------------------------

    #[test]
    fn test_unknown_fields_ignore() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_unknown_fields_reject() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            unknown_fields: UnknownFieldStrategy::Reject,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("unknown field"));
    }

    #[test]
    fn test_unknown_fields_collect_extra() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            unknown_fields: UnknownFieldStrategy::CollectExtra,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"a": 1, "extra1": "v1", "extra2": 42}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(1).name(), "_extra");
        assert!(!batch.column(1).is_null(0));
    }

    // ---- timestamps -----------------------------------------------------

    #[test]
    fn test_decode_timestamp_iso8601() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": "2025-01-15T10:30:00Z"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(!batch.column(0).is_null(0));
        // 2025-01-15T10:30:00Z == 1_736_937_000 s since the Unix epoch.
        let ts_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::TimestampNanosecondType>();
        assert_eq!(ts_col.value(0), 1_736_937_000_000_000_000);
    }

    #[test]
    fn test_decode_timestamp_epoch_millis() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        let ts_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::TimestampNanosecondType>();
        assert_eq!(ts_col.value(0), 1_705_312_200_000_000_000);
    }

    #[test]
    fn test_decode_out_of_range_float_timestamp_is_rejected() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": 1e30}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("out of i64 range"));
    }

    #[test]
    fn test_decode_timestamp_overflow_on_nanosecond_scaling_is_rejected() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": 9999999999999999}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("out of i64 Nanosecond range"));
    }

    // ---- nested values and malformed input ------------------------------

    #[test]
    fn test_decode_nested_object_as_json_string() {
        let schema = make_schema(vec![("data", DataType::LargeBinary, true)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"data": {"nested": true}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(!batch.column(0).is_null(0));
    }

    #[test]
    fn test_decode_invalid_json() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![RawRecord::new(b"not json".to_vec())];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("JSON parse error"));
    }

    #[test]
    fn test_decode_non_object_json() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record("[1, 2, 3]")];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("must be an object"));
    }

    // ---- decoder trait surface ------------------------------------------

    #[test]
    fn test_format_name() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        assert_eq!(decoder.format_name(), "json");
    }

    #[test]
    fn test_output_schema() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema.clone());
        assert_eq!(decoder.output_schema(), schema);
    }

    #[test]
    fn test_decode_one() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let decoder = JsonDecoder::new(schema);
        let record = json_record(r#"{"x": 99}"#);
        let batch = decoder.decode_one(&record).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            99
        );
    }

    // ---- default coercions ----------------------------------------------

    #[test]
    fn test_decode_int_from_float_json() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"x": 42.0}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
    }

    #[test]
    fn test_decode_float_from_int_json() {
        let schema = make_schema(vec![("x", DataType::Float64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"x": 42}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        let val = batch
            .column(0)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(0);
        assert!((val - 42.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_string_number_to_float64() {
        let schema = make_schema(vec![("price", DataType::Float64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"price": "187.52"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        let val = batch
            .column(0)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(0);
        assert!((val - 187.52).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_string_to_int() {
        let schema = make_schema(vec![("qty", DataType::Int32, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"qty": "100"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>()
                .value(0),
            100
        );
    }

    #[test]
    fn test_decode_epoch_millis_to_timestamp_millis() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        let ts_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
        assert_eq!(ts_col.value(0), 1_705_312_200_000);
    }

    #[test]
    fn test_decode_int_to_float_promotion() {
        let schema = make_schema(vec![("val", DataType::Float64, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"val": 100}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        let val = batch
            .column(0)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(0);
        assert!((val - 100.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_decode_string_boolean() {
        let schema = make_schema(vec![("active", DataType::Boolean, false)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"active": "true"}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert!(batch.column(0).as_boolean().value(0));
    }

    #[test]
    fn test_coerce_fails_on_unconvertible() {
        let schema = make_schema(vec![("x", DataType::Int64, true)]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
        let result = decoder.decode_batch(&records);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("type coercion failed"));
    }

    #[test]
    fn test_enforcement_str_parsing() {
        assert_eq!(
            TypeMismatchStrategy::from_enforcement_str("coerce"),
            Some(TypeMismatchStrategy::Coerce)
        );
        assert_eq!(
            TypeMismatchStrategy::from_enforcement_str("STRICT"),
            Some(TypeMismatchStrategy::Reject)
        );
        assert_eq!(
            TypeMismatchStrategy::from_enforcement_str("Permissive"),
            Some(TypeMismatchStrategy::Null)
        );
        assert_eq!(TypeMismatchStrategy::from_enforcement_str("unknown"), None);
    }

    #[test]
    fn test_decode_i8_and_u8() {
        let schema = make_schema(vec![
            ("signed", DataType::Int8, false),
            ("unsigned", DataType::UInt8, false),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![json_record(r#"{"signed": -5, "unsigned": 200}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int8Type>()
                .value(0),
            -5
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<arrow_array::types::UInt8Type>()
                .value(0),
            200
        );
    }

    // ---- json.path / json.column paths ----------------------------------

    #[test]
    fn test_json_path_single() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"data":{"id":1}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
    }

    #[test]
    fn test_json_path_multi() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["a".into(), "b".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"a":{"b":{"id":1}}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
    }

    #[test]
    fn test_json_path_missing_skips_record() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![
            // No `data` key: this control message must be skipped, not decoded.
            json_record(r#"{"success":true,"op":"subscribe"}"#),
            json_record(r#"{"data":{"id":42}}"#),
        ];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
    }

    #[test]
    fn test_json_column_custom_path() {
        let schema = make_schema(vec![
            ("p", DataType::Float64, false),
            ("stream_name", DataType::Utf8, true),
        ]);
        let mut col_paths = HashMap::new();
        col_paths.insert("stream_name".into(), vec!["stream".into()]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(
            r#"{"stream":"btcusdt@trade","data":{"p":67523.0}}"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        let price = batch
            .column(0)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(0);
        assert!((price - 67523.0).abs() < f64::EPSILON);
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "btcusdt@trade");
    }

    #[test]
    fn test_json_column_deep_path() {
        let schema = make_schema(vec![
            ("p", DataType::Float64, false),
            ("ts", DataType::Int64, true),
        ]);
        let mut col_paths = HashMap::new();
        col_paths.insert("ts".into(), vec!["meta".into(), "timestamp".into()]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(
            r#"{"meta":{"timestamp":123456},"data":{"p":99.5}}"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            123456
        );
    }

    #[test]
    fn test_json_column_missing_returns_null() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("missing_col", DataType::Utf8, true),
        ]);
        let mut col_paths = HashMap::new();
        col_paths.insert("missing_col".into(), vec!["nowhere".into(), "gone".into()]);
        let config = JsonDecoderConfig {
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"id":42}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        assert!(batch.column(1).is_null(0));
    }

    // ---- list columns ---------------------------------------------------

    #[test]
    fn test_json_column_path_to_nested_string_array() {
        // `Array` is needed for `len()` on the concrete string array.
        use arrow_array::Array;
        let schema = make_schema(vec![(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        )]);
        let mut col_paths = HashMap::new();
        col_paths.insert("tags".into(), vec!["record".into(), "tags".into()]);
        let config = JsonDecoderConfig {
            json_column_paths: col_paths,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"record":{"tags":["en","es"]}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();

        let list = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::ListArray>()
            .expect("List column");
        let vals = list.value(0);
        let strs = vals.as_string::<i32>();
        assert_eq!(strs.len(), 2);
        assert_eq!(strs.value(0), "en");
        assert_eq!(strs.value(1), "es");
    }

    #[test]
    fn test_list_column_non_array_honors_reject_strategy() {
        let schema = make_schema(vec![(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        )]);
        let config = JsonDecoderConfig {
            type_mismatch: TypeMismatchStrategy::Reject,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"tags": "en"}"#)];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("type mismatch"));
    }

    // ---- json.explode ---------------------------------------------------

    #[test]
    fn test_json_explode_arrays() {
        let schema = make_schema(vec![
            ("price", DataType::Utf8, true),
            ("qty", DataType::Utf8, true),
        ]);
        let config = JsonDecoderConfig {
            json_explode: Some(vec!["price".into(), "qty".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"[["67523","1.5"],["67522","0.8"]]"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.column(0).as_string::<i32>().value(0), "67523");
        assert_eq!(batch.column(0).as_string::<i32>().value(1), "67522");
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "1.5");
        assert_eq!(batch.column(1).as_string::<i32>().value(1), "0.8");
    }

    #[test]
    fn test_json_explode_objects() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, true),
            ("name", DataType::Utf8, true),
        ]);
        let config = JsonDecoderConfig {
            json_explode: Some(vec!["id".into(), "name".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(
            r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#,
        )];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(1), "Bob");
    }

    #[test]
    fn test_json_path_plus_explode() {
        let schema = make_schema(vec![
            ("price", DataType::Utf8, true),
            ("qty", DataType::Utf8, true),
        ]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["bids".into()]),
            json_explode: Some(vec!["price".into(), "qty".into()]),
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"bids":[["67523","1.5"],["67522","0.8"]]}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.column(0).as_string::<i32>().value(0), "67523");
        assert_eq!(batch.column(1).as_string::<i32>().value(1), "0.8");
    }

    // ---- configuration parsing ------------------------------------------

    #[test]
    fn test_from_connector_config() {
        let mut config = crate::config::ConnectorConfig::new("websocket");
        config.set("json.path", "data.trade");
        config.set("json.column.stream_name", "stream");
        config.set("json.column.ts", "meta.timestamp");
        config.set("json.explode", "price, qty");
        config.set("schema.enforcement", "strict");
        config.set("nested.as.jsonb", "true");

        let cfg = JsonDecoderConfig::from_connector_config(&config);
        assert_eq!(cfg.json_path, Some(vec!["data".into(), "trade".into()]));
        assert_eq!(
            cfg.json_column_paths.get("stream_name"),
            Some(&vec!["stream".into()])
        );
        assert_eq!(
            cfg.json_column_paths.get("ts"),
            Some(&vec!["meta".into(), "timestamp".into()])
        );
        assert_eq!(cfg.json_explode, Some(vec!["price".into(), "qty".into()]));
        assert_eq!(cfg.type_mismatch, TypeMismatchStrategy::Reject);
        assert!(cfg.nested_as_jsonb);
    }

    #[test]
    fn test_default_config_unchanged() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = JsonDecoder::new(schema);
        let records = vec![
            json_record(r#"{"a": 1, "b": "hello"}"#),
            json_record(r#"{"a": 2, "b": "world"}"#),
        ];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(1), "world");
    }

    #[test]
    fn test_unknown_fields_with_path() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let config = JsonDecoderConfig {
            json_path: Some(vec!["data".into()]),
            unknown_fields: UnknownFieldStrategy::CollectExtra,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"data":{"a":1,"extra":"value"}}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(1).name(), "_extra");
        assert!(!batch.column(1).is_null(0));
    }

    #[test]
    fn from_connector_config_parses_epoch_unit_without_polluting_paths() {
        use crate::config::ConnectorConfig;
        let mut props = std::collections::HashMap::new();
        props.insert("json.column.evt".to_string(), "time_us".to_string());
        props.insert(
            "json.column.evt.epoch_unit".to_string(),
            "micros".to_string(),
        );
        let cc = ConnectorConfig::with_properties("websocket", props);
        let cfg = JsonDecoderConfig::from_connector_config(&cc);
        assert_eq!(
            cfg.numeric_timestamp_units.get("evt"),
            Some(&EpochUnit::Micros)
        );
        assert!(cfg.json_column_paths.contains_key("evt"));
        // The `.epoch_unit` suffix key must not be mistaken for a column path.
        assert!(!cfg.json_column_paths.contains_key("evt.epoch_unit"));
    }

    #[test]
    fn numeric_micros_decodes_into_millis_timestamp_column() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let mut units = HashMap::new();
        units.insert("ts".to_string(), EpochUnit::Micros);
        let config = JsonDecoderConfig {
            numeric_timestamp_units: units,
            ..Default::default()
        };
        let decoder = JsonDecoder::with_config(schema, config);
        let records = vec![json_record(r#"{"ts": 1779019200123456}"#)];
        let batch = decoder.decode_batch(&records).unwrap();
        let col = batch
            .column(0)
            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
        assert_eq!(col.value(0), 1_779_019_200_123);

        // Without an explicit unit the default (millis) is assumed, so the raw
        // micros value passes through unscaled.
        let schema2 = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )]);
        let d2 = JsonDecoder::new(schema2);
        let b2 = d2.decode_batch(&records).unwrap();
        let c2 = b2
            .column(0)
            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
        assert_eq!(c2.value(0), 1_779_019_200_123_456);
    }
}