Skip to main content

laminar_connectors/schema/json/
decoder.rs

1//! JSON format decoder implementing [`FormatDecoder`].
2//!
3//! Converts raw JSON byte payloads into Arrow `RecordBatch`es.
4//! Constructed once at `CREATE SOURCE` time with a frozen Arrow schema;
//! apart from a diagnostic type-mismatch counter the decoder holds no mutable
//! state after construction, so the Ring 1 hot path has zero schema lookups.
7#![allow(clippy::disallowed_types)] // cold path: schema management
8
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12
13use arrow_array::builder::{
14    BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    Int8Builder, LargeBinaryBuilder, LargeStringBuilder, StringBuilder,
16    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
17    TimestampSecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, SchemaRef, TimeUnit};
21
22use crate::schema::error::{SchemaError, SchemaResult};
23use crate::schema::json::jsonb::JsonbEncoder;
24use crate::schema::traits::FormatDecoder;
25use crate::schema::types::RawRecord;
26
/// What the decoder does with JSON object keys that have no matching column
/// in the Arrow schema.
///
/// Only object mode (no `json.explode`) inspects unknown keys; rows produced
/// by `json.explode` always receive a null `_extra` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnknownFieldStrategy {
    /// Drop unknown keys silently. This is the value `JsonDecoderConfig::default` picks.
    Ignore,
    /// Gather unknown keys into an extra `_extra` column of type `LargeBinary`,
    /// JSONB-encoded.
    CollectExtra,
    /// Fail decoding with an error when an unknown key is encountered.
    Reject,
}
37
/// Strategy for JSON values that don't match the expected Arrow type.
///
/// `JsonDecoderConfig::default` selects [`TypeMismatchStrategy::Coerce`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypeMismatchStrategy {
    /// Insert null and increment the mismatch counter.
    Null,
    /// Attempt coercion (e.g., `"123"` → `123` for Int64 columns).
    /// This is the strategy used by `JsonDecoderConfig::default`.
    Coerce,
    /// Return a decode error on the first mismatch.
    Reject,
}

impl TypeMismatchStrategy {
    /// Parse from a `WITH` option value (`schema.enforcement`).
    ///
    /// Matching is ASCII case-insensitive:
    /// - `"coerce"` → [`Self::Coerce`]
    /// - `"strict"` → [`Self::Reject`]
    /// - `"permissive"` → [`Self::Null`]
    ///
    /// Any other value yields `None`.
    #[must_use]
    pub fn from_enforcement_str(s: &str) -> Option<Self> {
        // Compare in place rather than allocating a lowercased copy of `s`.
        let table = [
            ("coerce", Self::Coerce),
            ("strict", Self::Reject),
            ("permissive", Self::Null),
        ];
        table
            .iter()
            .find(|(name, _)| name.eq_ignore_ascii_case(s))
            .map(|&(_, strategy)| strategy)
    }
}
61
/// Where a schema column reads its value from, resolved once at Ring 2.
#[derive(Debug, Clone)]
enum ColumnExtraction {
    /// Look the column name up in the default target (after `json.path`, or the root).
    DefaultPath,
    /// Follow these segments from the document root; `json.path` is not applied.
    CustomPath { segments: Vec<String> },
}
70
71/// JSON decoder configuration.
72#[derive(Debug, Clone)]
73pub struct JsonDecoderConfig {
74    /// How to handle fields present in JSON but absent from the schema.
75    pub unknown_fields: UnknownFieldStrategy,
76
77    /// How to handle type mismatches.
78    pub type_mismatch: TypeMismatchStrategy,
79
80    /// Timestamp format patterns to try when parsing string values
81    /// into Timestamp columns. Tried in order; first match wins.
82    /// Use `"iso8601"` for RFC 3339 / ISO 8601 auto-detection.
83    pub timestamp_formats: Vec<String>,
84
85    /// Whether to encode nested objects as JSONB binary format
86    /// instead of JSON-serialized Utf8. When true, nested objects
87    /// become `LargeBinary` columns with JSONB encoding.
88    pub nested_as_jsonb: bool,
89
90    /// Dot-separated path to navigate before field extraction.
91    /// e.g. `"data"` → navigate into `{"data": {...}}` before extraction.
92    /// e.g. `"data.trade"` → navigate into `{"data":{"trade":{...}}}`.
93    pub json_path: Option<Vec<String>>,
94
95    /// Per-column absolute path overrides: `column_name` → path segments.
96    /// Parsed from `json.column.<name> = 'path.to.field'` options.
97    /// These paths are absolute from the root, ignoring `json_path`.
98    pub json_column_paths: HashMap<String, Vec<String>>,
99
100    /// Column names for array-to-rows expansion.
101    /// When set, the target (after `json_path`) must be an array.
102    /// Each element becomes a row; positional names map array elements to columns.
103    pub json_explode: Option<Vec<String>>,
104}
105
106impl Default for JsonDecoderConfig {
107    fn default() -> Self {
108        Self {
109            unknown_fields: UnknownFieldStrategy::Ignore,
110            type_mismatch: TypeMismatchStrategy::Coerce,
111            timestamp_formats: vec![
112                "iso8601".into(),
113                "%Y-%m-%dT%H:%M:%S%.fZ".into(),
114                "%Y-%m-%dT%H:%M:%S%.f%:z".into(),
115                "%Y-%m-%d %H:%M:%S%.f".into(),
116                "%Y-%m-%d %H:%M:%S".into(),
117            ],
118            nested_as_jsonb: false,
119            json_path: None,
120            json_column_paths: HashMap::new(),
121            json_explode: None,
122        }
123    }
124}
125
126impl JsonDecoderConfig {
127    /// Build config from a [`ConnectorConfig`](crate::config::ConnectorConfig).
128    ///
129    /// Parses `json.path`, `json.column.*`, `json.explode`,
130    /// `schema.enforcement`, and `nested.as.jsonb` properties.
131    /// Called once at Ring 2 (`CREATE SOURCE` time).
132    #[must_use]
133    pub fn from_connector_config(config: &crate::config::ConnectorConfig) -> Self {
134        let mut cfg = Self::default();
135
136        if let Some(path) = config.get("json.path") {
137            cfg.json_path = Some(path.split('.').map(ToString::to_string).collect());
138        }
139
140        let col_props = config.properties_with_prefix("json.column.");
141        for (col_name, path_str) in col_props {
142            cfg.json_column_paths.insert(
143                col_name,
144                path_str.split('.').map(ToString::to_string).collect(),
145            );
146        }
147
148        if let Some(explode) = config.get("json.explode") {
149            cfg.json_explode = Some(explode.split(',').map(|s| s.trim().to_string()).collect());
150        }
151
152        if let Some(enforcement) = config.get("schema.enforcement") {
153            if let Some(strategy) = TypeMismatchStrategy::from_enforcement_str(enforcement) {
154                cfg.type_mismatch = strategy;
155            }
156        }
157        if let Some(v) = config.get("nested.as.jsonb") {
158            cfg.nested_as_jsonb = v.eq_ignore_ascii_case("true");
159        }
160        cfg
161    }
162}
163
164/// Decodes JSON byte payloads into Arrow `RecordBatch`es.
165///
166/// # Ring Placement
167///
168/// - **Ring 1**: `decode_batch()` — parse JSON, build columnar Arrow output
169/// - **Ring 2**: Construction (`new` / `with_config`) — one-time setup
170pub struct JsonDecoder {
171    /// Frozen output schema.
172    schema: SchemaRef,
173    /// Decoder configuration.
174    config: JsonDecoderConfig,
175    /// Pre-computed field index map: field name → column index.
176    field_indices: Vec<(String, usize)>,
177    /// Cumulative type mismatch count (diagnostics).
178    mismatch_count: AtomicU64,
179    /// Ring 2 pre-computed: extraction strategy per schema column.
180    column_extractions: Vec<ColumnExtraction>,
181    /// Ring 2 pre-computed: explode position → schema column index.
182    /// `Some` when `json.explode` is configured; each entry maps an
183    /// explode position to the schema column index (or `None` if
184    /// the explode name doesn't match any schema column).
185    explode_col_indices: Option<Vec<Option<usize>>>,
186}
187
188#[allow(clippy::missing_fields_in_debug)]
189impl std::fmt::Debug for JsonDecoder {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        f.debug_struct("JsonDecoder")
192            .field("schema", &self.schema)
193            .field("config", &self.config)
194            .field(
195                "mismatch_count",
196                &self.mismatch_count.load(Ordering::Relaxed),
197            )
198            .finish()
199    }
200}
201
202impl JsonDecoder {
203    /// Creates a new JSON decoder for the given Arrow schema.
204    #[must_use]
205    pub fn new(schema: SchemaRef) -> Self {
206        Self::with_config(schema, JsonDecoderConfig::default())
207    }
208
209    /// Creates a new JSON decoder with custom configuration.
210    #[must_use]
211    pub fn with_config(schema: SchemaRef, config: JsonDecoderConfig) -> Self {
212        let field_indices: Vec<(String, usize)> = schema
213            .fields()
214            .iter()
215            .enumerate()
216            .map(|(i, f)| (f.name().clone(), i))
217            .collect();
218
219        let column_extractions: Vec<ColumnExtraction> = schema
220            .fields()
221            .iter()
222            .map(|f| {
223                let col_name = f.name();
224                if let Some(path_segments) = config.json_column_paths.get(col_name.as_str()) {
225                    ColumnExtraction::CustomPath {
226                        segments: path_segments.clone(),
227                    }
228                } else {
229                    ColumnExtraction::DefaultPath
230                }
231            })
232            .collect();
233
234        let explode_col_indices = config.json_explode.as_ref().map(|names| {
235            names
236                .iter()
237                .map(|name| {
238                    field_indices
239                        .iter()
240                        .find(|(n, _)| n == name)
241                        .map(|(_, idx)| *idx)
242                })
243                .collect()
244        });
245
246        Self {
247            schema,
248            config,
249            field_indices,
250            mismatch_count: AtomicU64::new(0),
251            column_extractions,
252            explode_col_indices,
253        }
254    }
255
256    /// Returns the cumulative type mismatch count.
257    pub fn mismatch_count(&self) -> u64 {
258        self.mismatch_count.load(Ordering::Relaxed)
259    }
260}
261
262impl FormatDecoder for JsonDecoder {
263    fn output_schema(&self) -> SchemaRef {
264        self.schema.clone()
265    }
266
267    #[allow(clippy::too_many_lines)]
268    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
269        if records.is_empty() {
270            return Ok(RecordBatch::new_empty(self.schema.clone()));
271        }
272
273        let num_fields = self.schema.fields().len();
274        let capacity = records.len();
275
276        // Initialise one builder per schema column.
277        let mut builders = create_builders(&self.schema, capacity);
278
279        // Optional _extra JSONB column for CollectExtra strategy.
280        let collect_extra = matches!(
281            self.config.unknown_fields,
282            UnknownFieldStrategy::CollectExtra
283        );
284        let mut extra_builder = if collect_extra {
285            Some(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
286        } else {
287            None
288        };
289
290        let mut jsonb_encoder = if self.config.nested_as_jsonb {
291            Some(JsonbEncoder::new())
292        } else {
293            None
294        };
295
296        // Reusable populated-tracking buffer — avoids heap alloc per record.
297        let mut populated = vec![false; num_fields];
298
299        for record in records {
300            let value: serde_json::Value = serde_json::from_slice(&record.value)
301                .map_err(|e| SchemaError::DecodeError(format!("JSON parse error: {e}")))?;
302
303            // Navigate json.path → default target (borrowed, ZERO alloc).
304            let default_target: &serde_json::Value = if let Some(ref path) = self.config.json_path {
305                let mut current = &value;
306                for segment in path {
307                    current = current.get(segment.as_str()).ok_or_else(|| {
308                        SchemaError::DecodeError(format!("json.path segment '{segment}' not found"))
309                    })?;
310                }
311                current
312            } else {
313                &value
314            };
315
316            if let Some(ref col_indices) = self.explode_col_indices {
317                // === EXPLODE MODE ===
318                let arr = default_target.as_array().ok_or_else(|| {
319                    SchemaError::DecodeError("json.explode target must be an array".into())
320                })?;
321                for element in arr {
322                    populated.fill(false);
323                    match element {
324                        serde_json::Value::Array(items) => {
325                            for (pos, col_idx_opt) in col_indices.iter().enumerate() {
326                                if let Some(col_idx) = col_idx_opt {
327                                    let val = items.get(pos).unwrap_or(&serde_json::Value::Null);
328                                    let field = &self.schema.fields()[*col_idx];
329                                    append_value(
330                                        &mut builders[*col_idx],
331                                        field.data_type(),
332                                        val,
333                                        &self.config,
334                                        &self.mismatch_count,
335                                        jsonb_encoder.as_mut(),
336                                    )?;
337                                    populated[*col_idx] = true;
338                                }
339                            }
340                        }
341                        serde_json::Value::Object(obj) => {
342                            for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
343                                if let Some(val) = obj.get(col_name.as_str()) {
344                                    let field = &self.schema.fields()[col_idx];
345                                    append_value(
346                                        &mut builders[col_idx],
347                                        field.data_type(),
348                                        val,
349                                        &self.config,
350                                        &self.mismatch_count,
351                                        jsonb_encoder.as_mut(),
352                                    )?;
353                                    populated[col_idx] = true;
354                                }
355                            }
356                        }
357                        _ => {
358                            return Err(SchemaError::DecodeError(
359                                "json.explode array elements must be arrays or objects".into(),
360                            ));
361                        }
362                    }
363                    // Append nulls for missing fields.
364                    for (col_idx, was_populated) in populated.iter().enumerate() {
365                        if !was_populated {
366                            append_null(&mut builders[col_idx]);
367                        }
368                    }
369                    // _extra column: append null for each exploded row.
370                    if let Some(ref mut eb) = extra_builder {
371                        eb.append_null();
372                    }
373                }
374            } else {
375                // === NORMAL OBJECT MODE (with per-column path support) ===
376                let default_obj = default_target.as_object().ok_or_else(|| {
377                    SchemaError::DecodeError("JSON value must be an object".into())
378                })?;
379
380                populated.fill(false);
381
382                // Collect unknown fields for CollectExtra.
383                let mut extra_fields: Option<serde_json::Map<String, serde_json::Value>> =
384                    if collect_extra {
385                        Some(serde_json::Map::new())
386                    } else {
387                        None
388                    };
389
390                // Per-column extraction using pre-computed ColumnExtraction.
391                for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
392                    match &self.column_extractions[col_idx] {
393                        ColumnExtraction::DefaultPath => {
394                            if let Some(val) = default_obj.get(col_name.as_str()) {
395                                populated[col_idx] = true;
396                                let field = &self.schema.fields()[col_idx];
397                                append_value(
398                                    &mut builders[col_idx],
399                                    field.data_type(),
400                                    val,
401                                    &self.config,
402                                    &self.mismatch_count,
403                                    jsonb_encoder.as_mut(),
404                                )?;
405                            }
406                        }
407                        ColumnExtraction::CustomPath { segments } => {
408                            // Navigate custom path from root (ZERO alloc).
409                            let extracted = navigate_path(&value, segments);
410                            if let Some(val) = extracted {
411                                populated[col_idx] = true;
412                                let field = &self.schema.fields()[col_idx];
413                                append_value(
414                                    &mut builders[col_idx],
415                                    field.data_type(),
416                                    val,
417                                    &self.config,
418                                    &self.mismatch_count,
419                                    jsonb_encoder.as_mut(),
420                                )?;
421                            }
422                        }
423                    }
424                }
425
426                // Collect unknown fields (fields in default_obj not in schema).
427                if collect_extra {
428                    for (key, val) in default_obj {
429                        if self.field_index(key).is_none() {
430                            match self.config.unknown_fields {
431                                UnknownFieldStrategy::CollectExtra => {
432                                    if let Some(ref mut extra) = extra_fields {
433                                        extra.insert(key.clone(), val.clone());
434                                    }
435                                }
436                                UnknownFieldStrategy::Reject => {
437                                    return Err(SchemaError::DecodeError(format!(
438                                        "unknown field '{key}' not in schema"
439                                    )));
440                                }
441                                UnknownFieldStrategy::Ignore => {}
442                            }
443                        }
444                    }
445                } else if matches!(self.config.unknown_fields, UnknownFieldStrategy::Reject) {
446                    for key in default_obj.keys() {
447                        if self.field_index(key).is_none() {
448                            return Err(SchemaError::DecodeError(format!(
449                                "unknown field '{key}' not in schema"
450                            )));
451                        }
452                    }
453                }
454
455                // Append nulls for missing fields.
456                for (col_idx, was_populated) in populated.iter().enumerate() {
457                    if !was_populated {
458                        append_null(&mut builders[col_idx]);
459                    }
460                }
461
462                // Append _extra column.
463                if let Some(ref mut eb) = extra_builder {
464                    if let Some(ref extra) = extra_fields {
465                        if extra.is_empty() {
466                            eb.append_null();
467                        } else {
468                            let mut enc = jsonb_encoder
469                                .as_mut()
470                                .map_or_else(JsonbEncoder::new, |_| JsonbEncoder::new());
471                            let bytes = enc.encode(&serde_json::Value::Object(extra.clone()));
472                            eb.append_value(&bytes);
473                        }
474                    } else {
475                        eb.append_null();
476                    }
477                }
478            }
479        }
480
481        // Finish all builders into arrays.
482        let mut columns: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
483
484        // Append _extra column if present.
485        let final_schema = if let Some(mut eb) = extra_builder {
486            columns.push(Arc::new(eb.finish()));
487            let mut fields = self.schema.fields().to_vec();
488            fields.push(Arc::new(arrow_schema::Field::new(
489                "_extra",
490                DataType::LargeBinary,
491                true,
492            )));
493            Arc::new(arrow_schema::Schema::new(fields))
494        } else {
495            self.schema.clone()
496        };
497
498        RecordBatch::try_new(final_schema, columns)
499            .map_err(|e| SchemaError::DecodeError(format!("RecordBatch construction: {e}")))
500    }
501
502    #[allow(clippy::unnecessary_literal_bound)]
503    fn format_name(&self) -> &str {
504        "json"
505    }
506}
507
508impl JsonDecoder {
509    /// O(n) field lookup. For schemas with many fields, consider switching
510    /// to a `HashMap`; for typical schemas (<50 fields) linear scan is faster.
511    fn field_index(&self, name: &str) -> Option<usize> {
512        self.field_indices
513            .iter()
514            .find(|(n, _)| n == name)
515            .map(|(_, idx)| *idx)
516    }
517}
518
519/// Navigate a dot-path through a JSON value tree. Returns `None` if any
520/// segment is missing. Zero allocations — pure pointer-chasing.
521fn navigate_path<'a>(
522    root: &'a serde_json::Value,
523    segments: &[String],
524) -> Option<&'a serde_json::Value> {
525    let mut current = root;
526    for segment in segments {
527        current = current.get(segment.as_str())?;
528    }
529    Some(current)
530}
531
532// ── Builder helpers ────────────────────────────────────────────────
533
534/// Trait-object wrapper so we can store heterogeneous builders in a `Vec`.
535trait ColumnBuilder: Send {
536    fn finish(&mut self) -> ArrayRef;
537    fn append_null_value(&mut self);
538    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
539}
540
541macro_rules! impl_column_builder {
542    ($builder:ty, $array:ty) => {
543        impl ColumnBuilder for $builder {
544            fn finish(&mut self) -> ArrayRef {
545                Arc::new(<$builder>::finish(self))
546            }
547            fn append_null_value(&mut self) {
548                self.append_null();
549            }
550            fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
551                self
552            }
553        }
554    };
555}
556
557impl_column_builder!(BooleanBuilder, arrow_array::BooleanArray);
558impl_column_builder!(Int8Builder, arrow_array::Int8Array);
559impl_column_builder!(Int16Builder, arrow_array::Int16Array);
560impl_column_builder!(Int32Builder, arrow_array::Int32Array);
561impl_column_builder!(Int64Builder, arrow_array::Int64Array);
562impl_column_builder!(UInt8Builder, arrow_array::UInt8Array);
563impl_column_builder!(UInt16Builder, arrow_array::UInt16Array);
564impl_column_builder!(UInt32Builder, arrow_array::UInt32Array);
565impl_column_builder!(UInt64Builder, arrow_array::UInt64Array);
566impl_column_builder!(Float32Builder, arrow_array::Float32Array);
567impl_column_builder!(Float64Builder, arrow_array::Float64Array);
568impl_column_builder!(StringBuilder, arrow_array::StringArray);
569impl_column_builder!(LargeStringBuilder, arrow_array::LargeStringArray);
570impl_column_builder!(LargeBinaryBuilder, arrow_array::LargeBinaryArray);
571impl_column_builder!(TimestampSecondBuilder, arrow_array::TimestampSecondArray);
572impl_column_builder!(
573    TimestampMillisecondBuilder,
574    arrow_array::TimestampMillisecondArray
575);
576impl_column_builder!(
577    TimestampMicrosecondBuilder,
578    arrow_array::TimestampMicrosecondArray
579);
580impl_column_builder!(
581    TimestampNanosecondBuilder,
582    arrow_array::TimestampNanosecondArray
583);
584
585fn create_builders(schema: &SchemaRef, capacity: usize) -> Vec<Box<dyn ColumnBuilder>> {
586    schema
587        .fields()
588        .iter()
589        .map(|f| create_builder(f.data_type(), capacity))
590        .collect()
591}
592
593fn create_builder(data_type: &DataType, capacity: usize) -> Box<dyn ColumnBuilder> {
594    match data_type {
595        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
596        DataType::Int8 => Box::new(Int8Builder::with_capacity(capacity)),
597        DataType::Int16 => Box::new(Int16Builder::with_capacity(capacity)),
598        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
599        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
600        DataType::UInt8 => Box::new(UInt8Builder::with_capacity(capacity)),
601        DataType::UInt16 => Box::new(UInt16Builder::with_capacity(capacity)),
602        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
603        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
604        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
605        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
606        DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, capacity * 32)),
607        DataType::LargeBinary => {
608            Box::new(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
609        }
610        DataType::Timestamp(TimeUnit::Second, tz) => {
611            let builder =
612                TimestampSecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
613            Box::new(builder)
614        }
615        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
616            let builder =
617                TimestampMillisecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
618            Box::new(builder)
619        }
620        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
621            let builder =
622                TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
623            Box::new(builder)
624        }
625        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
626            let builder =
627                TimestampNanosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
628            Box::new(builder)
629        }
630        // Fallback: serialize as JSON string.
631        _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)),
632    }
633}
634
635fn append_null(builder: &mut Box<dyn ColumnBuilder>) {
636    builder.append_null_value();
637}
638
639/// Append a JSON value to the appropriate builder column.
640#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
641fn append_value(
642    builder: &mut Box<dyn ColumnBuilder>,
643    target_type: &DataType,
644    value: &serde_json::Value,
645    config: &JsonDecoderConfig,
646    mismatch_count: &AtomicU64,
647    jsonb_encoder: Option<&mut JsonbEncoder>,
648) -> SchemaResult<()> {
649    if value.is_null() {
650        builder.append_null_value();
651        return Ok(());
652    }
653
654    match target_type {
655        DataType::Boolean => {
656            let b = builder
657                .as_any_mut()
658                .downcast_mut::<BooleanBuilder>()
659                .unwrap();
660            match extract_bool(value, config) {
661                Ok(v) => b.append_value(v),
662                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
663            }
664        }
665        DataType::Int8 => {
666            let b = builder.as_any_mut().downcast_mut::<Int8Builder>().unwrap();
667            match extract_i8(value, config) {
668                Ok(v) => b.append_value(v),
669                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
670            }
671        }
672        DataType::Int16 => {
673            let b = builder.as_any_mut().downcast_mut::<Int16Builder>().unwrap();
674            match extract_i16(value, config) {
675                Ok(v) => b.append_value(v),
676                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
677            }
678        }
679        DataType::Int32 => {
680            let b = builder.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
681            match extract_i32(value, config) {
682                Ok(v) => b.append_value(v),
683                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
684            }
685        }
686        DataType::Int64 => {
687            let b = builder.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
688            match extract_i64(value, config) {
689                Ok(v) => b.append_value(v),
690                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
691            }
692        }
693        DataType::UInt8 => {
694            let b = builder.as_any_mut().downcast_mut::<UInt8Builder>().unwrap();
695            match extract_u8(value, config) {
696                Ok(v) => b.append_value(v),
697                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
698            }
699        }
700        DataType::UInt16 => {
701            let b = builder
702                .as_any_mut()
703                .downcast_mut::<UInt16Builder>()
704                .unwrap();
705            match extract_u16(value, config) {
706                Ok(v) => b.append_value(v),
707                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
708            }
709        }
710        DataType::UInt32 => {
711            let b = builder
712                .as_any_mut()
713                .downcast_mut::<UInt32Builder>()
714                .unwrap();
715            match extract_u32(value, config) {
716                Ok(v) => b.append_value(v),
717                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
718            }
719        }
720        DataType::UInt64 => {
721            let b = builder
722                .as_any_mut()
723                .downcast_mut::<UInt64Builder>()
724                .unwrap();
725            match extract_u64(value, config) {
726                Ok(v) => b.append_value(v),
727                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
728            }
729        }
730        DataType::Float32 => {
731            let b = builder
732                .as_any_mut()
733                .downcast_mut::<Float32Builder>()
734                .unwrap();
735            match extract_f32(value, config) {
736                Ok(v) => b.append_value(v),
737                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
738            }
739        }
740        DataType::Float64 => {
741            let b = builder
742                .as_any_mut()
743                .downcast_mut::<Float64Builder>()
744                .unwrap();
745            match extract_f64(value, config) {
746                Ok(v) => b.append_value(v),
747                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
748            }
749        }
750        DataType::LargeUtf8 => {
751            let b = builder
752                .as_any_mut()
753                .downcast_mut::<LargeStringBuilder>()
754                .unwrap();
755            let s = value_to_string(value);
756            b.append_value(&s);
757        }
758        DataType::LargeBinary => {
759            let b = builder
760                .as_any_mut()
761                .downcast_mut::<LargeBinaryBuilder>()
762                .unwrap();
763            if let Some(enc) = jsonb_encoder {
764                let bytes = enc.encode(value);
765                b.append_value(&bytes);
766            } else {
767                // Fallback: serialize as JSON bytes.
768                let bytes = serde_json::to_vec(value).unwrap_or_default();
769                b.append_value(&bytes);
770            }
771        }
772        DataType::Timestamp(unit, _) => match extract_timestamp(value, config, *unit) {
773            Ok(ts) => append_timestamp(builder, *unit, ts),
774            Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
775        },
776        // Unsupported types: serialize as JSON string.
777        _ => {
778            let b = builder
779                .as_any_mut()
780                .downcast_mut::<StringBuilder>()
781                .unwrap();
782            let s = value_to_string(value);
783            b.append_value(&s);
784        }
785    }
786
787    Ok(())
788}
789
790fn handle_mismatch(
791    builder: &mut Box<dyn ColumnBuilder>,
792    config: &JsonDecoderConfig,
793    mismatch_count: &AtomicU64,
794    error_msg: &str,
795) -> SchemaResult<()> {
796    match config.type_mismatch {
797        TypeMismatchStrategy::Null => {
798            mismatch_count.fetch_add(1, Ordering::Relaxed);
799            builder.append_null_value();
800            Ok(())
801        }
802        TypeMismatchStrategy::Coerce => {
803            // Coercion already failed in the extractor — this is a real error.
804            Err(SchemaError::DecodeError(format!(
805                "type coercion failed: {error_msg}"
806            )))
807        }
808        TypeMismatchStrategy::Reject => Err(SchemaError::DecodeError(format!(
809            "type mismatch: {error_msg}"
810        ))),
811    }
812}
813
814// ── Value extractors ───────────────────────────────────────────────
815
816fn extract_bool(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<bool, String> {
817    if let Some(b) = value.as_bool() {
818        return Ok(b);
819    }
820    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
821        if let Some(s) = value.as_str() {
822            match s.to_ascii_lowercase().as_str() {
823                "true" | "1" | "yes" => return Ok(true),
824                "false" | "0" | "no" => return Ok(false),
825                _ => {}
826            }
827        }
828        if let Some(n) = value.as_i64() {
829            return Ok(n != 0);
830        }
831    }
832    Err(format!("expected boolean, got {}", json_type_name(value)))
833}
834
835fn extract_i8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i8, String> {
836    if let Some(n) = value.as_i64() {
837        if let Ok(v) = i8::try_from(n) {
838            return Ok(v);
839        }
840        return Err(format!("integer {n} out of i8 range"));
841    }
842    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
843        if let Some(s) = value.as_str() {
844            if let Ok(v) = s.parse::<i8>() {
845                return Ok(v);
846            }
847        }
848        if let Some(f) = value.as_f64() {
849            #[allow(clippy::cast_possible_truncation)]
850            let v = f as i8;
851            return Ok(v);
852        }
853    }
854    Err(format!("expected i8, got {}", json_type_name(value)))
855}
856
857fn extract_i16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i16, String> {
858    if let Some(n) = value.as_i64() {
859        if let Ok(v) = i16::try_from(n) {
860            return Ok(v);
861        }
862        return Err(format!("integer {n} out of i16 range"));
863    }
864    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
865        if let Some(s) = value.as_str() {
866            if let Ok(v) = s.parse::<i16>() {
867                return Ok(v);
868            }
869        }
870        if let Some(f) = value.as_f64() {
871            #[allow(clippy::cast_possible_truncation)]
872            let v = f as i16;
873            return Ok(v);
874        }
875    }
876    Err(format!("expected i16, got {}", json_type_name(value)))
877}
878
879fn extract_i32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i32, String> {
880    if let Some(n) = value.as_i64() {
881        if let Ok(v) = i32::try_from(n) {
882            return Ok(v);
883        }
884        return Err(format!("integer {n} out of i32 range"));
885    }
886    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
887        if let Some(s) = value.as_str() {
888            if let Ok(v) = s.parse::<i32>() {
889                return Ok(v);
890            }
891        }
892        if let Some(f) = value.as_f64() {
893            #[allow(clippy::cast_possible_truncation)]
894            return Ok(f as i32);
895        }
896    }
897    Err(format!("expected i32, got {}", json_type_name(value)))
898}
899
900fn extract_i64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i64, String> {
901    if let Some(n) = value.as_i64() {
902        return Ok(n);
903    }
904    if let Some(n) = value.as_u64() {
905        if let Ok(v) = i64::try_from(n) {
906            return Ok(v);
907        }
908        return Err(format!("u64 {n} out of i64 range"));
909    }
910    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
911        if let Some(s) = value.as_str() {
912            if let Ok(v) = s.parse::<i64>() {
913                return Ok(v);
914            }
915        }
916        if let Some(f) = value.as_f64() {
917            #[allow(clippy::cast_possible_truncation)]
918            return Ok(f as i64);
919        }
920    }
921    Err(format!("expected i64, got {}", json_type_name(value)))
922}
923
924fn extract_f32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f32, String> {
925    if let Some(f) = value.as_f64() {
926        #[allow(clippy::cast_possible_truncation)]
927        return Ok(f as f32);
928    }
929    if let Some(n) = value.as_i64() {
930        #[allow(clippy::cast_precision_loss)]
931        return Ok(n as f32);
932    }
933    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
934        if let Some(s) = value.as_str() {
935            if let Ok(v) = s.parse::<f32>() {
936                return Ok(v);
937            }
938        }
939    }
940    Err(format!("expected f32, got {}", json_type_name(value)))
941}
942
943fn extract_f64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f64, String> {
944    if let Some(f) = value.as_f64() {
945        return Ok(f);
946    }
947    if let Some(n) = value.as_i64() {
948        #[allow(clippy::cast_precision_loss)]
949        return Ok(n as f64);
950    }
951    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
952        if let Some(s) = value.as_str() {
953            if let Ok(v) = s.parse::<f64>() {
954                return Ok(v);
955            }
956        }
957    }
958    Err(format!("expected f64, got {}", json_type_name(value)))
959}
960
961fn extract_u8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u8, String> {
962    if let Some(n) = value.as_u64() {
963        if let Ok(v) = u8::try_from(n) {
964            return Ok(v);
965        }
966        return Err(format!("integer {n} out of u8 range"));
967    }
968    if let Some(n) = value.as_i64() {
969        if let Ok(v) = u8::try_from(n) {
970            return Ok(v);
971        }
972        return Err(format!("integer {n} out of u8 range"));
973    }
974    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
975        if let Some(s) = value.as_str() {
976            if let Ok(v) = s.parse::<u8>() {
977                return Ok(v);
978            }
979        }
980    }
981    Err(format!("expected u8, got {}", json_type_name(value)))
982}
983
984fn extract_u16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u16, String> {
985    if let Some(n) = value.as_u64() {
986        if let Ok(v) = u16::try_from(n) {
987            return Ok(v);
988        }
989        return Err(format!("integer {n} out of u16 range"));
990    }
991    if let Some(n) = value.as_i64() {
992        if let Ok(v) = u16::try_from(n) {
993            return Ok(v);
994        }
995        return Err(format!("integer {n} out of u16 range"));
996    }
997    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
998        if let Some(s) = value.as_str() {
999            if let Ok(v) = s.parse::<u16>() {
1000                return Ok(v);
1001            }
1002        }
1003    }
1004    Err(format!("expected u16, got {}", json_type_name(value)))
1005}
1006
1007fn extract_u32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u32, String> {
1008    if let Some(n) = value.as_u64() {
1009        if let Ok(v) = u32::try_from(n) {
1010            return Ok(v);
1011        }
1012        return Err(format!("integer {n} out of u32 range"));
1013    }
1014    if let Some(n) = value.as_i64() {
1015        if let Ok(v) = u32::try_from(n) {
1016            return Ok(v);
1017        }
1018        return Err(format!("integer {n} out of u32 range"));
1019    }
1020    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1021        if let Some(s) = value.as_str() {
1022            if let Ok(v) = s.parse::<u32>() {
1023                return Ok(v);
1024            }
1025        }
1026    }
1027    Err(format!("expected u32, got {}", json_type_name(value)))
1028}
1029
1030fn extract_u64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u64, String> {
1031    if let Some(n) = value.as_u64() {
1032        return Ok(n);
1033    }
1034    if let Some(n) = value.as_i64() {
1035        if let Ok(v) = u64::try_from(n) {
1036            return Ok(v);
1037        }
1038        return Err(format!("integer {n} out of u64 range"));
1039    }
1040    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1041        if let Some(s) = value.as_str() {
1042            if let Ok(v) = s.parse::<u64>() {
1043                return Ok(v);
1044            }
1045        }
1046    }
1047    Err(format!("expected u64, got {}", json_type_name(value)))
1048}
1049
1050/// Extracts a timestamp value as an i64 in the specified [`TimeUnit`].
1051///
1052/// For numeric JSON values, treats them as epoch milliseconds and converts.
1053/// For string values, tries the configured timestamp format patterns.
1054fn extract_timestamp(
1055    value: &serde_json::Value,
1056    config: &JsonDecoderConfig,
1057    unit: TimeUnit,
1058) -> Result<i64, String> {
1059    // Numeric values: treat as epoch milliseconds.
1060    if let Some(n) = value.as_i64() {
1061        return Ok(millis_to_unit(n, unit));
1062    }
1063    if let Some(f) = value.as_f64() {
1064        #[allow(clippy::cast_possible_truncation)]
1065        let ms = f as i64;
1066        return Ok(millis_to_unit(ms, unit));
1067    }
1068
1069    // String values: try configured timestamp formats.
1070    if let Some(s) = value.as_str() {
1071        for fmt in &config.timestamp_formats {
1072            if fmt == "iso8601" {
1073                if let Ok(nanos) = arrow_cast::parse::string_to_timestamp_nanos(s) {
1074                    return Ok(nanos_to_unit(nanos, unit));
1075                }
1076                continue;
1077            }
1078            if let Ok(ndt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
1079                let nanos = ndt.and_utc().timestamp_nanos_opt().unwrap_or(0);
1080                return Ok(nanos_to_unit(nanos, unit));
1081            }
1082        }
1083        return Err(format!("cannot parse timestamp from string: {s}"));
1084    }
1085
1086    Err(format!("expected timestamp, got {}", json_type_name(value)))
1087}
1088
1089/// Converts epoch milliseconds to the target time unit.
1090fn millis_to_unit(ms: i64, unit: TimeUnit) -> i64 {
1091    match unit {
1092        TimeUnit::Second => ms / 1_000,
1093        TimeUnit::Millisecond => ms,
1094        TimeUnit::Microsecond => ms * 1_000,
1095        TimeUnit::Nanosecond => ms * 1_000_000,
1096    }
1097}
1098
1099/// Converts nanoseconds to the target time unit.
1100fn nanos_to_unit(nanos: i64, unit: TimeUnit) -> i64 {
1101    match unit {
1102        TimeUnit::Second => nanos / 1_000_000_000,
1103        TimeUnit::Millisecond => nanos / 1_000_000,
1104        TimeUnit::Microsecond => nanos / 1_000,
1105        TimeUnit::Nanosecond => nanos,
1106    }
1107}
1108
1109/// Appends a timestamp value to the appropriate builder based on [`TimeUnit`].
1110fn append_timestamp(builder: &mut Box<dyn ColumnBuilder>, unit: TimeUnit, value: i64) {
1111    match unit {
1112        TimeUnit::Second => {
1113            builder
1114                .as_any_mut()
1115                .downcast_mut::<TimestampSecondBuilder>()
1116                .unwrap()
1117                .append_value(value);
1118        }
1119        TimeUnit::Millisecond => {
1120            builder
1121                .as_any_mut()
1122                .downcast_mut::<TimestampMillisecondBuilder>()
1123                .unwrap()
1124                .append_value(value);
1125        }
1126        TimeUnit::Microsecond => {
1127            builder
1128                .as_any_mut()
1129                .downcast_mut::<TimestampMicrosecondBuilder>()
1130                .unwrap()
1131                .append_value(value);
1132        }
1133        TimeUnit::Nanosecond => {
1134            builder
1135                .as_any_mut()
1136                .downcast_mut::<TimestampNanosecondBuilder>()
1137                .unwrap()
1138                .append_value(value);
1139        }
1140    }
1141}
1142
1143fn value_to_string(value: &serde_json::Value) -> String {
1144    match value {
1145        serde_json::Value::String(s) => s.clone(),
1146        other => other.to_string(),
1147    }
1148}
1149
1150fn json_type_name(value: &serde_json::Value) -> &'static str {
1151    match value {
1152        serde_json::Value::Null => "null",
1153        serde_json::Value::Bool(_) => "boolean",
1154        serde_json::Value::Number(_) => "number",
1155        serde_json::Value::String(_) => "string",
1156        serde_json::Value::Array(_) => "array",
1157        serde_json::Value::Object(_) => "object",
1158    }
1159}
1160
1161#[cfg(test)]
1162mod tests {
1163    use super::*;
1164    use arrow_array::cast::AsArray;
1165    use arrow_schema::{Field, Schema};
1166
1167    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
1168        Arc::new(Schema::new(
1169            fields
1170                .into_iter()
1171                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
1172                .collect::<Vec<_>>(),
1173        ))
1174    }
1175
1176    fn json_record(json: &str) -> RawRecord {
1177        RawRecord::new(json.as_bytes().to_vec())
1178    }
1179
1180    // ── Basic decode tests ────────────────────────────────────
1181
1182    #[test]
1183    fn test_decode_empty_batch() {
1184        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1185        let decoder = JsonDecoder::new(schema.clone());
1186        let batch = decoder.decode_batch(&[]).unwrap();
1187        assert_eq!(batch.num_rows(), 0);
1188        assert_eq!(batch.schema(), schema);
1189    }
1190
1191    #[test]
1192    fn test_decode_single_record() {
1193        let schema = make_schema(vec![
1194            ("id", DataType::Int64, false),
1195            ("name", DataType::Utf8, true),
1196        ]);
1197        let decoder = JsonDecoder::new(schema);
1198        let records = vec![json_record(r#"{"id": 42, "name": "Alice"}"#)];
1199        let batch = decoder.decode_batch(&records).unwrap();
1200
1201        assert_eq!(batch.num_rows(), 1);
1202        assert_eq!(
1203            batch
1204                .column(0)
1205                .as_primitive::<arrow_array::types::Int64Type>()
1206                .value(0),
1207            42
1208        );
1209        assert_eq!(batch.column(1).as_string::<i32>().value(0), "Alice");
1210    }
1211
1212    #[test]
1213    fn test_decode_multiple_records() {
1214        let schema = make_schema(vec![
1215            ("x", DataType::Int64, false),
1216            ("y", DataType::Float64, false),
1217        ]);
1218        let decoder = JsonDecoder::new(schema);
1219        let records = vec![
1220            json_record(r#"{"x": 1, "y": 1.5}"#),
1221            json_record(r#"{"x": 2, "y": 2.5}"#),
1222            json_record(r#"{"x": 3, "y": 3.5}"#),
1223        ];
1224        let batch = decoder.decode_batch(&records).unwrap();
1225
1226        assert_eq!(batch.num_rows(), 3);
1227        let x_col = batch
1228            .column(0)
1229            .as_primitive::<arrow_array::types::Int64Type>();
1230        assert_eq!(x_col.value(0), 1);
1231        assert_eq!(x_col.value(1), 2);
1232        assert_eq!(x_col.value(2), 3);
1233    }
1234
1235    #[test]
1236    fn test_decode_all_types() {
1237        let schema = make_schema(vec![
1238            ("bool_col", DataType::Boolean, false),
1239            ("int_col", DataType::Int64, false),
1240            ("float_col", DataType::Float64, false),
1241            ("str_col", DataType::Utf8, false),
1242        ]);
1243        let decoder = JsonDecoder::new(schema);
1244        let records = vec![json_record(
1245            r#"{"bool_col": true, "int_col": 42, "float_col": 3.14, "str_col": "hello"}"#,
1246        )];
1247        let batch = decoder.decode_batch(&records).unwrap();
1248
1249        assert_eq!(batch.num_rows(), 1);
1250        assert!(batch.column(0).as_boolean().value(0));
1251        assert_eq!(
1252            batch
1253                .column(1)
1254                .as_primitive::<arrow_array::types::Int64Type>()
1255                .value(0),
1256            42
1257        );
1258        let f = batch
1259            .column(2)
1260            .as_primitive::<arrow_array::types::Float64Type>()
1261            .value(0);
1262        assert!((f - 3.14).abs() < f64::EPSILON);
1263        assert_eq!(batch.column(3).as_string::<i32>().value(0), "hello");
1264    }
1265
1266    // ── Null handling ─────────────────────────────────────────
1267
1268    #[test]
1269    fn test_decode_null_values() {
1270        let schema = make_schema(vec![
1271            ("a", DataType::Int64, true),
1272            ("b", DataType::Utf8, true),
1273        ]);
1274        let decoder = JsonDecoder::new(schema);
1275        let records = vec![json_record(r#"{"a": null, "b": null}"#)];
1276        let batch = decoder.decode_batch(&records).unwrap();
1277
1278        assert!(batch.column(0).is_null(0));
1279        assert!(batch.column(1).is_null(0));
1280    }
1281
1282    #[test]
1283    fn test_decode_missing_field_becomes_null() {
1284        let schema = make_schema(vec![
1285            ("a", DataType::Int64, true),
1286            ("b", DataType::Utf8, true),
1287        ]);
1288        let decoder = JsonDecoder::new(schema);
1289        let records = vec![json_record(r#"{"a": 1}"#)]; // "b" missing
1290        let batch = decoder.decode_batch(&records).unwrap();
1291
1292        assert_eq!(
1293            batch
1294                .column(0)
1295                .as_primitive::<arrow_array::types::Int64Type>()
1296                .value(0),
1297            1
1298        );
1299        assert!(batch.column(1).is_null(0));
1300    }
1301
1302    // ── Type mismatch strategies ──────────────────────────────
1303
1304    #[test]
1305    fn test_mismatch_null_strategy() {
1306        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1307        let config = JsonDecoderConfig {
1308            type_mismatch: TypeMismatchStrategy::Null,
1309            ..Default::default()
1310        };
1311        let decoder = JsonDecoder::with_config(schema, config);
1312        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
1313        let batch = decoder.decode_batch(&records).unwrap();
1314
1315        assert!(batch.column(0).is_null(0));
1316        assert_eq!(decoder.mismatch_count(), 1);
1317    }
1318
1319    #[test]
1320    fn test_mismatch_coerce_strategy() {
1321        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1322        let config = JsonDecoderConfig {
1323            type_mismatch: TypeMismatchStrategy::Coerce,
1324            ..Default::default()
1325        };
1326        let decoder = JsonDecoder::with_config(schema, config);
1327        let records = vec![json_record(r#"{"x": "123"}"#)];
1328        let batch = decoder.decode_batch(&records).unwrap();
1329
1330        assert_eq!(
1331            batch
1332                .column(0)
1333                .as_primitive::<arrow_array::types::Int64Type>()
1334                .value(0),
1335            123
1336        );
1337    }
1338
1339    #[test]
1340    fn test_mismatch_reject_strategy() {
1341        let schema = make_schema(vec![("x", DataType::Int64, false)]);
1342        let config = JsonDecoderConfig {
1343            type_mismatch: TypeMismatchStrategy::Reject,
1344            ..Default::default()
1345        };
1346        let decoder = JsonDecoder::with_config(schema, config);
1347        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
1348        let result = decoder.decode_batch(&records);
1349
1350        assert!(result.is_err());
1351        assert!(result.unwrap_err().to_string().contains("type mismatch"));
1352    }
1353
1354    // ── Unknown field strategies ──────────────────────────────
1355
1356    #[test]
1357    fn test_unknown_fields_ignore() {
1358        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1359        let decoder = JsonDecoder::new(schema);
1360        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
1361        let batch = decoder.decode_batch(&records).unwrap();
1362
1363        assert_eq!(batch.num_columns(), 1);
1364        assert_eq!(batch.num_rows(), 1);
1365    }
1366
1367    #[test]
1368    fn test_unknown_fields_reject() {
1369        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1370        let config = JsonDecoderConfig {
1371            unknown_fields: UnknownFieldStrategy::Reject,
1372            ..Default::default()
1373        };
1374        let decoder = JsonDecoder::with_config(schema, config);
1375        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
1376        let result = decoder.decode_batch(&records);
1377
1378        assert!(result.is_err());
1379        assert!(result.unwrap_err().to_string().contains("unknown field"));
1380    }
1381
1382    #[test]
1383    fn test_unknown_fields_collect_extra() {
1384        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1385        let config = JsonDecoderConfig {
1386            unknown_fields: UnknownFieldStrategy::CollectExtra,
1387            ..Default::default()
1388        };
1389        let decoder = JsonDecoder::with_config(schema, config);
1390        let records = vec![json_record(r#"{"a": 1, "extra1": "v1", "extra2": 42}"#)];
1391        let batch = decoder.decode_batch(&records).unwrap();
1392
1393        // Schema should have an extra `_extra` column.
1394        assert_eq!(batch.num_columns(), 2);
1395        assert_eq!(batch.schema().field(1).name(), "_extra");
1396        assert!(!batch.column(1).is_null(0));
1397    }
1398
1399    // ── Timestamp parsing ─────────────────────────────────────
1400
1401    #[test]
1402    fn test_decode_timestamp_iso8601() {
1403        let schema = make_schema(vec![(
1404            "ts",
1405            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1406            false,
1407        )]);
1408        let decoder = JsonDecoder::new(schema);
1409        let records = vec![json_record(r#"{"ts": "2025-01-15T10:30:00Z"}"#)];
1410        let batch = decoder.decode_batch(&records).unwrap();
1411
1412        assert!(!batch.column(0).is_null(0));
1413    }
1414
1415    #[test]
1416    fn test_decode_timestamp_epoch_millis() {
1417        let schema = make_schema(vec![(
1418            "ts",
1419            DataType::Timestamp(TimeUnit::Nanosecond, None),
1420            false,
1421        )]);
1422        let decoder = JsonDecoder::new(schema);
1423        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
1424        let batch = decoder.decode_batch(&records).unwrap();
1425
1426        let ts_col = batch
1427            .column(0)
1428            .as_primitive::<arrow_array::types::TimestampNanosecondType>();
1429        // 1705312200000 ms * 1_000_000 = nanos
1430        assert_eq!(ts_col.value(0), 1_705_312_200_000_000_000);
1431    }
1432
1433    // ── Nested objects as LargeBinary ─────────────────────────
1434
1435    #[test]
1436    fn test_decode_nested_object_as_json_string() {
1437        let schema = make_schema(vec![("data", DataType::LargeBinary, true)]);
1438        let decoder = JsonDecoder::new(schema);
1439        let records = vec![json_record(r#"{"data": {"nested": true}}"#)];
1440        let batch = decoder.decode_batch(&records).unwrap();
1441
1442        assert!(!batch.column(0).is_null(0));
1443    }
1444
1445    // ── Error cases ───────────────────────────────────────────
1446
1447    #[test]
1448    fn test_decode_invalid_json() {
1449        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1450        let decoder = JsonDecoder::new(schema);
1451        let records = vec![RawRecord::new(b"not json".to_vec())];
1452        let result = decoder.decode_batch(&records);
1453
1454        assert!(result.is_err());
1455        assert!(result.unwrap_err().to_string().contains("JSON parse error"));
1456    }
1457
1458    #[test]
1459    fn test_decode_non_object_json() {
1460        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1461        let decoder = JsonDecoder::new(schema);
1462        let records = vec![json_record("[1, 2, 3]")];
1463        let result = decoder.decode_batch(&records);
1464
1465        assert!(result.is_err());
1466        assert!(result
1467            .unwrap_err()
1468            .to_string()
1469            .contains("must be an object"));
1470    }
1471
1472    // ── FormatDecoder trait ───────────────────────────────────
1473
1474    #[test]
1475    fn test_format_name() {
1476        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1477        let decoder = JsonDecoder::new(schema);
1478        assert_eq!(decoder.format_name(), "json");
1479    }
1480
1481    #[test]
1482    fn test_output_schema() {
1483        let schema = make_schema(vec![
1484            ("a", DataType::Int64, false),
1485            ("b", DataType::Utf8, true),
1486        ]);
1487        let decoder = JsonDecoder::new(schema.clone());
1488        assert_eq!(decoder.output_schema(), schema);
1489    }
1490
1491    #[test]
1492    fn test_decode_one() {
1493        let schema = make_schema(vec![("x", DataType::Int64, false)]);
1494        let decoder = JsonDecoder::new(schema);
1495        let record = json_record(r#"{"x": 99}"#);
1496        let batch = decoder.decode_one(&record).unwrap();
1497        assert_eq!(batch.num_rows(), 1);
1498        assert_eq!(
1499            batch
1500                .column(0)
1501                .as_primitive::<arrow_array::types::Int64Type>()
1502                .value(0),
1503            99
1504        );
1505    }
1506
1507    // ── Int/Float numeric coercion ────────────────────────────
1508
1509    #[test]
1510    fn test_decode_int_from_float_json() {
1511        // JSON number 42.0 is parsed as f64 by serde_json. With the default
1512        // Coerce strategy, it is coerced to Int64 = 42.
1513        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1514        let decoder = JsonDecoder::new(schema);
1515        let records = vec![json_record(r#"{"x": 42.0}"#)];
1516        let batch = decoder.decode_batch(&records).unwrap();
1517        assert_eq!(
1518            batch
1519                .column(0)
1520                .as_primitive::<arrow_array::types::Int64Type>()
1521                .value(0),
1522            42
1523        );
1524    }
1525
1526    #[test]
1527    fn test_decode_float_from_int_json() {
1528        // JSON integer 42 should decode as Float64 = 42.0.
1529        let schema = make_schema(vec![("x", DataType::Float64, false)]);
1530        let decoder = JsonDecoder::new(schema);
1531        let records = vec![json_record(r#"{"x": 42}"#)];
1532        let batch = decoder.decode_batch(&records).unwrap();
1533        let val = batch
1534            .column(0)
1535            .as_primitive::<arrow_array::types::Float64Type>()
1536            .value(0);
1537        assert!((val - 42.0).abs() < f64::EPSILON);
1538    }
1539
1540    // ── Coercion tests (string→numeric, int→float, etc.) ────
1541
1542    #[test]
1543    fn test_decode_string_number_to_float64() {
1544        let schema = make_schema(vec![("price", DataType::Float64, false)]);
1545        let decoder = JsonDecoder::new(schema);
1546        let records = vec![json_record(r#"{"price": "187.52"}"#)];
1547        let batch = decoder.decode_batch(&records).unwrap();
1548        let val = batch
1549            .column(0)
1550            .as_primitive::<arrow_array::types::Float64Type>()
1551            .value(0);
1552        assert!((val - 187.52).abs() < f64::EPSILON);
1553    }
1554
1555    #[test]
1556    fn test_decode_string_to_int() {
1557        let schema = make_schema(vec![("qty", DataType::Int32, false)]);
1558        let decoder = JsonDecoder::new(schema);
1559        let records = vec![json_record(r#"{"qty": "100"}"#)];
1560        let batch = decoder.decode_batch(&records).unwrap();
1561        assert_eq!(
1562            batch
1563                .column(0)
1564                .as_primitive::<arrow_array::types::Int32Type>()
1565                .value(0),
1566            100
1567        );
1568    }
1569
1570    #[test]
1571    fn test_decode_epoch_millis_to_timestamp_millis() {
1572        let schema = make_schema(vec![(
1573            "ts",
1574            DataType::Timestamp(TimeUnit::Millisecond, None),
1575            false,
1576        )]);
1577        let decoder = JsonDecoder::new(schema);
1578        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
1579        let batch = decoder.decode_batch(&records).unwrap();
1580        let ts_col = batch
1581            .column(0)
1582            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
1583        assert_eq!(ts_col.value(0), 1_705_312_200_000);
1584    }
1585
1586    #[test]
1587    fn test_decode_int_to_float_promotion() {
1588        let schema = make_schema(vec![("val", DataType::Float64, false)]);
1589        let decoder = JsonDecoder::new(schema);
1590        let records = vec![json_record(r#"{"val": 100}"#)];
1591        let batch = decoder.decode_batch(&records).unwrap();
1592        let val = batch
1593            .column(0)
1594            .as_primitive::<arrow_array::types::Float64Type>()
1595            .value(0);
1596        assert!((val - 100.0).abs() < f64::EPSILON);
1597    }
1598
1599    #[test]
1600    fn test_decode_string_boolean() {
1601        let schema = make_schema(vec![("active", DataType::Boolean, false)]);
1602        let decoder = JsonDecoder::new(schema);
1603        let records = vec![json_record(r#"{"active": "true"}"#)];
1604        let batch = decoder.decode_batch(&records).unwrap();
1605        assert!(batch.column(0).as_boolean().value(0));
1606    }
1607
1608    #[test]
1609    fn test_coerce_fails_on_unconvertible() {
1610        // With default Coerce, a string that can't be parsed as Int64 should error.
1611        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1612        let decoder = JsonDecoder::new(schema);
1613        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
1614        let result = decoder.decode_batch(&records);
1615        assert!(result.is_err());
1616        assert!(result
1617            .unwrap_err()
1618            .to_string()
1619            .contains("type coercion failed"));
1620    }
1621
1622    #[test]
1623    fn test_enforcement_str_parsing() {
1624        assert_eq!(
1625            TypeMismatchStrategy::from_enforcement_str("coerce"),
1626            Some(TypeMismatchStrategy::Coerce)
1627        );
1628        assert_eq!(
1629            TypeMismatchStrategy::from_enforcement_str("STRICT"),
1630            Some(TypeMismatchStrategy::Reject)
1631        );
1632        assert_eq!(
1633            TypeMismatchStrategy::from_enforcement_str("Permissive"),
1634            Some(TypeMismatchStrategy::Null)
1635        );
1636        assert_eq!(TypeMismatchStrategy::from_enforcement_str("unknown"), None);
1637    }
1638
1639    // ── Small integer types ──────────────────────────────────
1640
1641    #[test]
1642    fn test_decode_i8_and_u8() {
1643        let schema = make_schema(vec![
1644            ("signed", DataType::Int8, false),
1645            ("unsigned", DataType::UInt8, false),
1646        ]);
1647        let decoder = JsonDecoder::new(schema);
1648        let records = vec![json_record(r#"{"signed": -5, "unsigned": 200}"#)];
1649        let batch = decoder.decode_batch(&records).unwrap();
1650        assert_eq!(
1651            batch
1652                .column(0)
1653                .as_primitive::<arrow_array::types::Int8Type>()
1654                .value(0),
1655            -5
1656        );
1657        assert_eq!(
1658            batch
1659                .column(1)
1660                .as_primitive::<arrow_array::types::UInt8Type>()
1661                .value(0),
1662            200
1663        );
1664    }
1665
1666    // ── json.path tests ─────────────────────────────────────────
1667
1668    #[test]
1669    fn test_json_path_single() {
1670        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1671        let config = JsonDecoderConfig {
1672            json_path: Some(vec!["data".into()]),
1673            ..Default::default()
1674        };
1675        let decoder = JsonDecoder::with_config(schema, config);
1676        let records = vec![json_record(r#"{"data":{"id":1}}"#)];
1677        let batch = decoder.decode_batch(&records).unwrap();
1678        assert_eq!(batch.num_rows(), 1);
1679        assert_eq!(
1680            batch
1681                .column(0)
1682                .as_primitive::<arrow_array::types::Int64Type>()
1683                .value(0),
1684            1
1685        );
1686    }
1687
1688    #[test]
1689    fn test_json_path_multi() {
1690        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1691        let config = JsonDecoderConfig {
1692            json_path: Some(vec!["a".into(), "b".into()]),
1693            ..Default::default()
1694        };
1695        let decoder = JsonDecoder::with_config(schema, config);
1696        let records = vec![json_record(r#"{"a":{"b":{"id":1}}}"#)];
1697        let batch = decoder.decode_batch(&records).unwrap();
1698        assert_eq!(
1699            batch
1700                .column(0)
1701                .as_primitive::<arrow_array::types::Int64Type>()
1702                .value(0),
1703            1
1704        );
1705    }
1706
1707    #[test]
1708    fn test_json_path_missing_errors() {
1709        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1710        let config = JsonDecoderConfig {
1711            json_path: Some(vec!["nonexistent".into()]),
1712            ..Default::default()
1713        };
1714        let decoder = JsonDecoder::with_config(schema, config);
1715        let records = vec![json_record(r#"{"data":{"id":1}}"#)];
1716        let result = decoder.decode_batch(&records);
1717        assert!(result.is_err());
1718        assert!(result.unwrap_err().to_string().contains("not found"));
1719    }
1720
1721    // ── json.column.* tests ─────────────────────────────────────
1722
1723    #[test]
1724    fn test_json_column_custom_path() {
1725        let schema = make_schema(vec![
1726            ("p", DataType::Float64, false),
1727            ("stream_name", DataType::Utf8, true),
1728        ]);
1729        let mut col_paths = HashMap::new();
1730        col_paths.insert("stream_name".into(), vec!["stream".into()]);
1731        let config = JsonDecoderConfig {
1732            json_path: Some(vec!["data".into()]),
1733            json_column_paths: col_paths,
1734            ..Default::default()
1735        };
1736        let decoder = JsonDecoder::with_config(schema, config);
1737        let records = vec![json_record(
1738            r#"{"stream":"btcusdt@trade","data":{"p":67523.0}}"#,
1739        )];
1740        let batch = decoder.decode_batch(&records).unwrap();
1741        assert_eq!(batch.num_rows(), 1);
1742        let price = batch
1743            .column(0)
1744            .as_primitive::<arrow_array::types::Float64Type>()
1745            .value(0);
1746        assert!((price - 67523.0).abs() < f64::EPSILON);
1747        assert_eq!(batch.column(1).as_string::<i32>().value(0), "btcusdt@trade");
1748    }
1749
1750    #[test]
1751    fn test_json_column_deep_path() {
1752        let schema = make_schema(vec![
1753            ("p", DataType::Float64, false),
1754            ("ts", DataType::Int64, true),
1755        ]);
1756        let mut col_paths = HashMap::new();
1757        col_paths.insert("ts".into(), vec!["meta".into(), "timestamp".into()]);
1758        let config = JsonDecoderConfig {
1759            json_path: Some(vec!["data".into()]),
1760            json_column_paths: col_paths,
1761            ..Default::default()
1762        };
1763        let decoder = JsonDecoder::with_config(schema, config);
1764        let records = vec![json_record(
1765            r#"{"meta":{"timestamp":123456},"data":{"p":99.5}}"#,
1766        )];
1767        let batch = decoder.decode_batch(&records).unwrap();
1768        assert_eq!(batch.num_rows(), 1);
1769        assert_eq!(
1770            batch
1771                .column(1)
1772                .as_primitive::<arrow_array::types::Int64Type>()
1773                .value(0),
1774            123456
1775        );
1776    }
1777
1778    #[test]
1779    fn test_json_column_missing_returns_null() {
1780        let schema = make_schema(vec![
1781            ("id", DataType::Int64, false),
1782            ("missing_col", DataType::Utf8, true),
1783        ]);
1784        let mut col_paths = HashMap::new();
1785        col_paths.insert("missing_col".into(), vec!["nowhere".into(), "gone".into()]);
1786        let config = JsonDecoderConfig {
1787            json_column_paths: col_paths,
1788            ..Default::default()
1789        };
1790        let decoder = JsonDecoder::with_config(schema, config);
1791        let records = vec![json_record(r#"{"id":42}"#)];
1792        let batch = decoder.decode_batch(&records).unwrap();
1793        assert_eq!(
1794            batch
1795                .column(0)
1796                .as_primitive::<arrow_array::types::Int64Type>()
1797                .value(0),
1798            42
1799        );
1800        assert!(batch.column(1).is_null(0));
1801    }
1802
1803    // ── json.explode tests ──────────────────────────────────────
1804
1805    #[test]
1806    fn test_json_explode_arrays() {
1807        let schema = make_schema(vec![
1808            ("price", DataType::Utf8, true),
1809            ("qty", DataType::Utf8, true),
1810        ]);
1811        let config = JsonDecoderConfig {
1812            json_explode: Some(vec!["price".into(), "qty".into()]),
1813            ..Default::default()
1814        };
1815        let decoder = JsonDecoder::with_config(schema, config);
1816        let records = vec![json_record(r#"[["67523","1.5"],["67522","0.8"]]"#)];
1817        let batch = decoder.decode_batch(&records).unwrap();
1818        assert_eq!(batch.num_rows(), 2);
1819        assert_eq!(batch.column(0).as_string::<i32>().value(0), "67523");
1820        assert_eq!(batch.column(0).as_string::<i32>().value(1), "67522");
1821        assert_eq!(batch.column(1).as_string::<i32>().value(0), "1.5");
1822        assert_eq!(batch.column(1).as_string::<i32>().value(1), "0.8");
1823    }
1824
1825    #[test]
1826    fn test_json_explode_objects() {
1827        let schema = make_schema(vec![
1828            ("id", DataType::Int64, true),
1829            ("name", DataType::Utf8, true),
1830        ]);
1831        let config = JsonDecoderConfig {
1832            json_explode: Some(vec!["id".into(), "name".into()]),
1833            ..Default::default()
1834        };
1835        let decoder = JsonDecoder::with_config(schema, config);
1836        let records = vec![json_record(
1837            r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#,
1838        )];
1839        let batch = decoder.decode_batch(&records).unwrap();
1840        assert_eq!(batch.num_rows(), 2);
1841        assert_eq!(
1842            batch
1843                .column(0)
1844                .as_primitive::<arrow_array::types::Int64Type>()
1845                .value(0),
1846            1
1847        );
1848        assert_eq!(batch.column(1).as_string::<i32>().value(1), "Bob");
1849    }
1850
1851    // ── Combined json.path + json.explode ───────────────────────
1852
1853    #[test]
1854    fn test_json_path_plus_explode() {
1855        let schema = make_schema(vec![
1856            ("price", DataType::Utf8, true),
1857            ("qty", DataType::Utf8, true),
1858        ]);
1859        let config = JsonDecoderConfig {
1860            json_path: Some(vec!["bids".into()]),
1861            json_explode: Some(vec!["price".into(), "qty".into()]),
1862            ..Default::default()
1863        };
1864        let decoder = JsonDecoder::with_config(schema, config);
1865        let records = vec![json_record(r#"{"bids":[["67523","1.5"],["67522","0.8"]]}"#)];
1866        let batch = decoder.decode_batch(&records).unwrap();
1867        assert_eq!(batch.num_rows(), 2);
1868        assert_eq!(batch.column(0).as_string::<i32>().value(0), "67523");
1869        assert_eq!(batch.column(1).as_string::<i32>().value(1), "0.8");
1870    }
1871
1872    // ── from_connector_config tests ─────────────────────────────
1873
1874    #[test]
1875    fn test_from_connector_config() {
1876        let mut config = crate::config::ConnectorConfig::new("websocket");
1877        config.set("json.path", "data.trade");
1878        config.set("json.column.stream_name", "stream");
1879        config.set("json.column.ts", "meta.timestamp");
1880        config.set("json.explode", "price, qty");
1881        config.set("schema.enforcement", "strict");
1882        config.set("nested.as.jsonb", "true");
1883
1884        let cfg = JsonDecoderConfig::from_connector_config(&config);
1885        assert_eq!(cfg.json_path, Some(vec!["data".into(), "trade".into()]));
1886        assert_eq!(
1887            cfg.json_column_paths.get("stream_name"),
1888            Some(&vec!["stream".into()])
1889        );
1890        assert_eq!(
1891            cfg.json_column_paths.get("ts"),
1892            Some(&vec!["meta".into(), "timestamp".into()])
1893        );
1894        assert_eq!(cfg.json_explode, Some(vec!["price".into(), "qty".into()]));
1895        assert_eq!(cfg.type_mismatch, TypeMismatchStrategy::Reject);
1896        assert!(cfg.nested_as_jsonb);
1897    }
1898
1899    #[test]
1900    fn test_default_config_unchanged() {
1901        let schema = make_schema(vec![
1902            ("a", DataType::Int64, false),
1903            ("b", DataType::Utf8, true),
1904        ]);
1905        let decoder = JsonDecoder::new(schema);
1906        let records = vec![
1907            json_record(r#"{"a": 1, "b": "hello"}"#),
1908            json_record(r#"{"a": 2, "b": "world"}"#),
1909        ];
1910        let batch = decoder.decode_batch(&records).unwrap();
1911        assert_eq!(batch.num_rows(), 2);
1912        assert_eq!(
1913            batch
1914                .column(0)
1915                .as_primitive::<arrow_array::types::Int64Type>()
1916                .value(0),
1917            1
1918        );
1919        assert_eq!(batch.column(1).as_string::<i32>().value(1), "world");
1920    }
1921
1922    #[test]
1923    fn test_unknown_fields_with_path() {
1924        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1925        let config = JsonDecoderConfig {
1926            json_path: Some(vec!["data".into()]),
1927            unknown_fields: UnknownFieldStrategy::CollectExtra,
1928            ..Default::default()
1929        };
1930        let decoder = JsonDecoder::with_config(schema, config);
1931        let records = vec![json_record(r#"{"data":{"a":1,"extra":"value"}}"#)];
1932        let batch = decoder.decode_batch(&records).unwrap();
1933        assert_eq!(batch.num_columns(), 2); // a + _extra
1934        assert_eq!(batch.schema().field(1).name(), "_extra");
1935        assert!(!batch.column(1).is_null(0));
1936    }
1937}