Skip to main content

laminar_connectors/schema/json/
decoder.rs

1//! JSON format decoder implementing [`FormatDecoder`].
2//!
3//! Converts raw JSON byte payloads into Arrow `RecordBatch`es.
4//! Constructed once at `CREATE SOURCE` time with a frozen Arrow schema;
5//! the decoder is stateless after construction so the Ring 1 hot path
6//! has zero schema lookups.
7#![allow(clippy::disallowed_types)] // cold path: schema management
8
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12
13use arrow_array::builder::{
14    BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    Int8Builder, LargeBinaryBuilder, LargeStringBuilder, ListBuilder, StringBuilder,
16    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
17    TimestampSecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, SchemaRef, TimeUnit};
21
22use crate::schema::error::{SchemaError, SchemaResult};
23use crate::schema::json::jsonb::JsonbEncoder;
24use crate::schema::traits::FormatDecoder;
25use crate::schema::types::RawRecord;
26
/// Strategy for JSON fields not in the Arrow schema.
///
/// [`Default`] is [`UnknownFieldStrategy::Ignore`], the same value that
/// `JsonDecoderConfig::default()` selects.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum UnknownFieldStrategy {
    /// Silently ignore unknown fields (default).
    #[default]
    Ignore,
    /// Collect unknown fields into an `_extra` `LargeBinary` (JSONB) column.
    CollectExtra,
    /// Return a decode error if any unknown field is encountered.
    Reject,
}
37
/// Strategy for JSON values that don't match the expected Arrow type.
///
/// Selected by the `schema.enforcement` option
/// (see [`TypeMismatchStrategy::from_enforcement_str`]).
/// `JsonDecoderConfig::default()` uses [`TypeMismatchStrategy::Coerce`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypeMismatchStrategy {
    /// Insert null and increment the mismatch counter
    /// (`schema.enforcement = 'permissive'`).
    Null,
    /// Attempt coercion (e.g., `"123"` → `123` for Int64 columns)
    /// (`schema.enforcement = 'coerce'`; the config default).
    Coerce,
    /// Return a decode error on the first mismatch
    /// (`schema.enforcement = 'strict'`).
    Reject,
}

impl TypeMismatchStrategy {
    /// Parse from a `WITH` option value (`schema.enforcement`).
    ///
    /// Matching is case-insensitive and ignores surrounding whitespace,
    /// like `EpochUnit::from_str_opt`. Returns `None` for unrecognised
    /// values.
    #[must_use]
    pub fn from_enforcement_str(s: &str) -> Option<Self> {
        match s.trim().to_lowercase().as_str() {
            "coerce" => Some(Self::Coerce),
            "strict" => Some(Self::Reject),
            "permissive" => Some(Self::Null),
            _ => None,
        }
    }
}
61
/// Unit used to interpret *numeric* JSON timestamps for a `Timestamp`
/// column. String timestamps are unaffected; they are parsed with
/// `timestamp_formats`. The default, [`EpochUnit::Millis`], keeps the
/// historical behaviour.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EpochUnit {
    /// Epoch seconds.
    Seconds,
    /// Epoch milliseconds (the default).
    #[default]
    Millis,
    /// Epoch microseconds.
    Micros,
    /// Epoch nanoseconds.
    Nanos,
}

impl EpochUnit {
    /// Parses a `json.column.<col>.epoch_unit` value.
    ///
    /// Accepts both the bare unit (`seconds`, `millis`, `micros`, `nanos`)
    /// and the `epoch_`-prefixed spelling used by the WebSocket connector's
    /// `event.time.format`. Case-insensitive; surrounding whitespace is
    /// ignored. Returns `None` for anything else.
    #[must_use]
    pub fn from_str_opt(s: &str) -> Option<Self> {
        let normalized = s.trim().to_lowercase();
        let bare = normalized.strip_prefix("epoch_").unwrap_or(&normalized);
        match bare {
            "seconds" => Some(Self::Seconds),
            "millis" => Some(Self::Millis),
            "micros" => Some(Self::Micros),
            "nanos" => Some(Self::Nanos),
            _ => None,
        }
    }

    /// Number of nanoseconds in one unit, for exact integer rescaling.
    const fn nanos_per(self) -> i64 {
        let decimal_exponent = match self {
            Self::Seconds => 9,
            Self::Millis => 6,
            Self::Micros => 3,
            Self::Nanos => 0,
        };
        10_i64.pow(decimal_exponent)
    }
}
103
/// How one schema column locates its value inside a record.
///
/// Resolved once at Ring 2 (decoder construction), so the decode loop
/// never consults the configuration maps by name.
#[derive(Clone, Debug)]
enum ColumnExtraction {
    /// Look the column name up in the default target: the `json.path`
    /// target, or the document root when no path is configured.
    DefaultPath,
    /// Follow these segments from the document root, ignoring `json.path`.
    CustomPath { segments: Vec<String> },
}
112
113/// JSON decoder configuration.
114#[derive(Debug, Clone)]
115pub struct JsonDecoderConfig {
116    /// How to handle fields present in JSON but absent from the schema.
117    pub unknown_fields: UnknownFieldStrategy,
118
119    /// How to handle type mismatches.
120    pub type_mismatch: TypeMismatchStrategy,
121
122    /// Timestamp format patterns to try when parsing string values
123    /// into Timestamp columns. Tried in order; first match wins.
124    /// Use `"iso8601"` for RFC 3339 / ISO 8601 auto-detection.
125    pub timestamp_formats: Vec<String>,
126
127    /// Whether to encode nested objects as JSONB binary format
128    /// instead of JSON-serialized Utf8. When true, nested objects
129    /// become `LargeBinary` columns with JSONB encoding.
130    pub nested_as_jsonb: bool,
131
132    /// Dot-separated path to navigate before field extraction.
133    /// e.g. `"data"` → navigate into `{"data": {...}}` before extraction.
134    /// e.g. `"data.trade"` → navigate into `{"data":{"trade":{...}}}`.
135    pub json_path: Option<Vec<String>>,
136
137    /// Per-column absolute path overrides: `column_name` → path segments.
138    /// Parsed from `json.column.<name> = 'path.to.field'` options.
139    /// These paths are absolute from the root, ignoring `json_path`.
140    pub json_column_paths: HashMap<String, Vec<String>>,
141
142    /// Per-column numeric-timestamp epoch unit, from
143    /// `json.column.<name>.epoch_unit`. Absent ⇒ [`EpochUnit::Millis`].
144    pub numeric_timestamp_units: HashMap<String, EpochUnit>,
145
146    /// Column names for array-to-rows expansion.
147    /// When set, the target (after `json_path`) must be an array.
148    /// Each element becomes a row; positional names map array elements to columns.
149    pub json_explode: Option<Vec<String>>,
150}
151
152impl Default for JsonDecoderConfig {
153    fn default() -> Self {
154        Self {
155            unknown_fields: UnknownFieldStrategy::Ignore,
156            type_mismatch: TypeMismatchStrategy::Coerce,
157            timestamp_formats: vec![
158                "iso8601".into(),
159                "%Y-%m-%dT%H:%M:%S%.fZ".into(),
160                "%Y-%m-%dT%H:%M:%S%.f%:z".into(),
161                "%Y-%m-%d %H:%M:%S%.f".into(),
162                "%Y-%m-%d %H:%M:%S".into(),
163            ],
164            nested_as_jsonb: false,
165            json_path: None,
166            json_column_paths: HashMap::new(),
167            numeric_timestamp_units: HashMap::new(),
168            json_explode: None,
169        }
170    }
171}
172
173impl JsonDecoderConfig {
174    /// Build config from a [`ConnectorConfig`](crate::config::ConnectorConfig).
175    ///
176    /// Parses `json.path`, `json.column.*`, `json.explode`,
177    /// `schema.enforcement`, and `nested.as.jsonb` properties.
178    /// Called once at Ring 2 (`CREATE SOURCE` time).
179    #[must_use]
180    pub fn from_connector_config(config: &crate::config::ConnectorConfig) -> Self {
181        let mut cfg = Self::default();
182
183        if let Some(path) = config.get("json.path") {
184            cfg.json_path = Some(path.split('.').map(ToString::to_string).collect());
185        }
186
187        let col_props = config.properties_with_prefix("json.column.");
188        for (col_name, val) in col_props {
189            // `json.column.<col>.epoch_unit = '<unit>'` configures numeric
190            // timestamp scaling for <col>; everything else is a path.
191            // (Column names are SQL identifiers — no dots — so the suffix
192            // split is unambiguous against dotted path *values*.)
193            if let Some(col) = col_name.strip_suffix(".epoch_unit") {
194                if let Some(unit) = EpochUnit::from_str_opt(&val) {
195                    cfg.numeric_timestamp_units.insert(col.to_string(), unit);
196                } else {
197                    tracing::warn!(
198                        column = col,
199                        value = %val,
200                        "invalid json.column.<col>.epoch_unit (expected \
201                         seconds|millis|micros|nanos); using millis"
202                    );
203                }
204                continue;
205            }
206            cfg.json_column_paths
207                .insert(col_name, val.split('.').map(ToString::to_string).collect());
208        }
209
210        if let Some(explode) = config.get("json.explode") {
211            cfg.json_explode = Some(explode.split(',').map(|s| s.trim().to_string()).collect());
212        }
213
214        if let Some(enforcement) = config.get("schema.enforcement") {
215            if let Some(strategy) = TypeMismatchStrategy::from_enforcement_str(enforcement) {
216                cfg.type_mismatch = strategy;
217            }
218        }
219        if let Some(v) = config.get("nested.as.jsonb") {
220            cfg.nested_as_jsonb = v.eq_ignore_ascii_case("true");
221        }
222        cfg
223    }
224}
225
226/// Decodes JSON byte payloads into Arrow `RecordBatch`es.
227///
228/// # Ring Placement
229///
230/// - **Ring 1**: `decode_batch()` — parse JSON, build columnar Arrow output
231/// - **Ring 2**: Construction (`new` / `with_config`) — one-time setup
232pub struct JsonDecoder {
233    /// Frozen output schema.
234    schema: SchemaRef,
235    /// Decoder configuration.
236    config: JsonDecoderConfig,
237    /// Pre-computed field index map: field name → column index.
238    field_indices: Vec<(String, usize)>,
239    /// Cumulative type mismatch count (diagnostics).
240    mismatch_count: AtomicU64,
241    /// Ring 2 pre-computed: extraction strategy per schema column.
242    column_extractions: Vec<ColumnExtraction>,
243    /// Ring 2 pre-computed: numeric epoch unit per schema column
244    /// (aligned to `schema.fields()`; default [`EpochUnit::Millis`]).
245    column_epoch_units: Vec<EpochUnit>,
246    /// Ring 2 pre-computed: explode position → schema column index.
247    /// `Some` when `json.explode` is configured; each entry maps an
248    /// explode position to the schema column index (or `None` if
249    /// the explode name doesn't match any schema column).
250    explode_col_indices: Option<Vec<Option<usize>>>,
251}
252
253#[allow(clippy::missing_fields_in_debug)]
254impl std::fmt::Debug for JsonDecoder {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        f.debug_struct("JsonDecoder")
257            .field("schema", &self.schema)
258            .field("config", &self.config)
259            .field(
260                "mismatch_count",
261                &self.mismatch_count.load(Ordering::Relaxed),
262            )
263            .finish()
264    }
265}
266
267impl JsonDecoder {
268    /// Creates a new JSON decoder for the given Arrow schema.
269    #[must_use]
270    pub fn new(schema: SchemaRef) -> Self {
271        Self::with_config(schema, JsonDecoderConfig::default())
272    }
273
274    /// Creates a new JSON decoder with custom configuration.
275    #[must_use]
276    pub fn with_config(schema: SchemaRef, config: JsonDecoderConfig) -> Self {
277        let field_indices: Vec<(String, usize)> = schema
278            .fields()
279            .iter()
280            .enumerate()
281            .map(|(i, f)| (f.name().clone(), i))
282            .collect();
283
284        let column_extractions: Vec<ColumnExtraction> = schema
285            .fields()
286            .iter()
287            .map(|f| {
288                let col_name = f.name();
289                if let Some(path_segments) = config.json_column_paths.get(col_name.as_str()) {
290                    ColumnExtraction::CustomPath {
291                        segments: path_segments.clone(),
292                    }
293                } else {
294                    ColumnExtraction::DefaultPath
295                }
296            })
297            .collect();
298
299        let column_epoch_units: Vec<EpochUnit> = schema
300            .fields()
301            .iter()
302            .map(|f| {
303                config
304                    .numeric_timestamp_units
305                    .get(f.name().as_str())
306                    .copied()
307                    .unwrap_or_default()
308            })
309            .collect();
310
311        let explode_col_indices = config.json_explode.as_ref().map(|names| {
312            names
313                .iter()
314                .map(|name| {
315                    field_indices
316                        .iter()
317                        .find(|(n, _)| n == name)
318                        .map(|(_, idx)| *idx)
319                })
320                .collect()
321        });
322
323        Self {
324            schema,
325            config,
326            field_indices,
327            mismatch_count: AtomicU64::new(0),
328            column_extractions,
329            column_epoch_units,
330            explode_col_indices,
331        }
332    }
333
334    /// Returns the cumulative type mismatch count.
335    pub fn mismatch_count(&self) -> u64 {
336        self.mismatch_count.load(Ordering::Relaxed)
337    }
338}
339
340impl FormatDecoder for JsonDecoder {
341    fn output_schema(&self) -> SchemaRef {
342        self.schema.clone()
343    }
344
345    #[allow(clippy::too_many_lines)]
346    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
347        if records.is_empty() {
348            return Ok(RecordBatch::new_empty(self.schema.clone()));
349        }
350
351        let num_fields = self.schema.fields().len();
352        let capacity = records.len();
353
354        // Initialise one builder per schema column.
355        let mut builders = create_builders(&self.schema, capacity);
356
357        // Optional _extra JSONB column for CollectExtra strategy.
358        let collect_extra = matches!(
359            self.config.unknown_fields,
360            UnknownFieldStrategy::CollectExtra
361        );
362        let mut extra_builder = if collect_extra {
363            Some(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
364        } else {
365            None
366        };
367
368        let mut jsonb_encoder = if self.config.nested_as_jsonb {
369            Some(JsonbEncoder::new())
370        } else {
371            None
372        };
373
374        // Reusable populated-tracking buffer — avoids heap alloc per record.
375        let mut populated = vec![false; num_fields];
376
377        for record in records {
378            let value: serde_json::Value = serde_json::from_slice(&record.value)
379                .map_err(|e| SchemaError::DecodeError(format!("JSON parse error: {e}")))?;
380
381            // Navigate json.path → default target (borrowed, ZERO alloc).
382            // Records lacking the configured path (subscribe acks, server
383            // status frames) are skipped silently. Erroring the batch would
384            // poison the source buffer since failures don't clear consumed
385            // messages.
386            let default_target: &serde_json::Value =
387                match navigate_path_opt(&value, self.config.json_path.as_deref()) {
388                    Some(v) => v,
389                    None => continue,
390                };
391
392            if let Some(ref col_indices) = self.explode_col_indices {
393                // === EXPLODE MODE ===
394                let arr = default_target.as_array().ok_or_else(|| {
395                    SchemaError::DecodeError("json.explode target must be an array".into())
396                })?;
397                for element in arr {
398                    populated.fill(false);
399                    match element {
400                        serde_json::Value::Array(items) => {
401                            for (pos, col_idx_opt) in col_indices.iter().enumerate() {
402                                if let Some(col_idx) = col_idx_opt {
403                                    let val = items.get(pos).unwrap_or(&serde_json::Value::Null);
404                                    let field = &self.schema.fields()[*col_idx];
405                                    append_value(
406                                        &mut builders[*col_idx],
407                                        field.data_type(),
408                                        val,
409                                        &self.config,
410                                        &self.mismatch_count,
411                                        jsonb_encoder.as_mut(),
412                                        self.column_epoch_units[*col_idx],
413                                    )?;
414                                    populated[*col_idx] = true;
415                                }
416                            }
417                        }
418                        serde_json::Value::Object(obj) => {
419                            for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
420                                if let Some(val) = obj.get(col_name.as_str()) {
421                                    let field = &self.schema.fields()[col_idx];
422                                    append_value(
423                                        &mut builders[col_idx],
424                                        field.data_type(),
425                                        val,
426                                        &self.config,
427                                        &self.mismatch_count,
428                                        jsonb_encoder.as_mut(),
429                                        self.column_epoch_units[col_idx],
430                                    )?;
431                                    populated[col_idx] = true;
432                                }
433                            }
434                        }
435                        _ => {
436                            return Err(SchemaError::DecodeError(
437                                "json.explode array elements must be arrays or objects".into(),
438                            ));
439                        }
440                    }
441                    // Append nulls for missing fields.
442                    for (col_idx, was_populated) in populated.iter().enumerate() {
443                        if !was_populated {
444                            append_null(&mut builders[col_idx]);
445                        }
446                    }
447                    // _extra column: append null for each exploded row.
448                    if let Some(ref mut eb) = extra_builder {
449                        eb.append_null();
450                    }
451                }
452            } else {
453                // === NORMAL OBJECT MODE (with per-column path support) ===
454                let default_obj = default_target.as_object().ok_or_else(|| {
455                    SchemaError::DecodeError("JSON value must be an object".into())
456                })?;
457
458                populated.fill(false);
459
460                // Collect unknown fields for CollectExtra.
461                let mut extra_fields: Option<serde_json::Map<String, serde_json::Value>> =
462                    if collect_extra {
463                        Some(serde_json::Map::new())
464                    } else {
465                        None
466                    };
467
468                // Per-column extraction using pre-computed ColumnExtraction.
469                for (col_idx, (col_name, _)) in self.field_indices.iter().enumerate() {
470                    match &self.column_extractions[col_idx] {
471                        ColumnExtraction::DefaultPath => {
472                            if let Some(val) = default_obj.get(col_name.as_str()) {
473                                populated[col_idx] = true;
474                                let field = &self.schema.fields()[col_idx];
475                                append_value(
476                                    &mut builders[col_idx],
477                                    field.data_type(),
478                                    val,
479                                    &self.config,
480                                    &self.mismatch_count,
481                                    jsonb_encoder.as_mut(),
482                                    self.column_epoch_units[col_idx],
483                                )?;
484                            }
485                        }
486                        ColumnExtraction::CustomPath { segments } => {
487                            // Navigate custom path from root (ZERO alloc).
488                            let extracted = navigate_path(&value, segments);
489                            if let Some(val) = extracted {
490                                populated[col_idx] = true;
491                                let field = &self.schema.fields()[col_idx];
492                                append_value(
493                                    &mut builders[col_idx],
494                                    field.data_type(),
495                                    val,
496                                    &self.config,
497                                    &self.mismatch_count,
498                                    jsonb_encoder.as_mut(),
499                                    self.column_epoch_units[col_idx],
500                                )?;
501                            }
502                        }
503                    }
504                }
505
506                // Collect unknown fields (fields in default_obj not in schema).
507                if collect_extra {
508                    for (key, val) in default_obj {
509                        if self.field_index(key).is_none() {
510                            match self.config.unknown_fields {
511                                UnknownFieldStrategy::CollectExtra => {
512                                    if let Some(ref mut extra) = extra_fields {
513                                        extra.insert(key.clone(), val.clone());
514                                    }
515                                }
516                                UnknownFieldStrategy::Reject => {
517                                    return Err(SchemaError::DecodeError(format!(
518                                        "unknown field '{key}' not in schema"
519                                    )));
520                                }
521                                UnknownFieldStrategy::Ignore => {}
522                            }
523                        }
524                    }
525                } else if matches!(self.config.unknown_fields, UnknownFieldStrategy::Reject) {
526                    for key in default_obj.keys() {
527                        if self.field_index(key).is_none() {
528                            return Err(SchemaError::DecodeError(format!(
529                                "unknown field '{key}' not in schema"
530                            )));
531                        }
532                    }
533                }
534
535                // Append nulls for missing fields.
536                for (col_idx, was_populated) in populated.iter().enumerate() {
537                    if !was_populated {
538                        append_null(&mut builders[col_idx]);
539                    }
540                }
541
542                // Append _extra column.
543                if let Some(ref mut eb) = extra_builder {
544                    if let Some(ref extra) = extra_fields {
545                        if extra.is_empty() {
546                            eb.append_null();
547                        } else {
548                            let mut enc = jsonb_encoder
549                                .as_mut()
550                                .map_or_else(JsonbEncoder::new, |_| JsonbEncoder::new());
551                            let bytes = enc.encode(&serde_json::Value::Object(extra.clone()));
552                            eb.append_value(&bytes);
553                        }
554                    } else {
555                        eb.append_null();
556                    }
557                }
558            }
559        }
560
561        // Finish all builders into arrays.
562        let mut columns: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
563
564        // Append _extra column if present.
565        let final_schema = if let Some(mut eb) = extra_builder {
566            columns.push(Arc::new(eb.finish()));
567            let mut fields = self.schema.fields().to_vec();
568            fields.push(Arc::new(arrow_schema::Field::new(
569                "_extra",
570                DataType::LargeBinary,
571                true,
572            )));
573            Arc::new(arrow_schema::Schema::new(fields))
574        } else {
575            self.schema.clone()
576        };
577
578        RecordBatch::try_new(final_schema, columns)
579            .map_err(|e| SchemaError::DecodeError(format!("RecordBatch construction: {e}")))
580    }
581
582    #[allow(clippy::unnecessary_literal_bound)]
583    fn format_name(&self) -> &str {
584        "json"
585    }
586}
587
588impl JsonDecoder {
589    /// O(n) field lookup. For schemas with many fields, consider switching
590    /// to a `HashMap`; for typical schemas (<50 fields) linear scan is faster.
591    fn field_index(&self, name: &str) -> Option<usize> {
592        self.field_indices
593            .iter()
594            .find(|(n, _)| n == name)
595            .map(|(_, idx)| *idx)
596    }
597}
598
599/// Navigate a dot-path through a JSON value tree. Returns `None` if any
600/// segment is missing. Zero allocations — pure pointer-chasing.
601fn navigate_path<'a>(
602    root: &'a serde_json::Value,
603    segments: &[String],
604) -> Option<&'a serde_json::Value> {
605    let mut current = root;
606    for segment in segments {
607        current = current.get(segment.as_str())?;
608    }
609    Some(current)
610}
611
612/// Like `navigate_path`, but `None` segments mean "no path configured —
613/// return the root unchanged."
614fn navigate_path_opt<'a>(
615    root: &'a serde_json::Value,
616    segments: Option<&[String]>,
617) -> Option<&'a serde_json::Value> {
618    match segments {
619        Some(s) => navigate_path(root, s),
620        None => Some(root),
621    }
622}
623
624// ── Builder helpers ────────────────────────────────────────────────
625
626/// Trait-object wrapper so we can store heterogeneous builders in a `Vec`.
627trait ColumnBuilder: Send {
628    fn finish(&mut self) -> ArrayRef;
629    fn append_null_value(&mut self);
630    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
631}
632
633macro_rules! impl_column_builder {
634    ($builder:ty, $array:ty) => {
635        impl ColumnBuilder for $builder {
636            fn finish(&mut self) -> ArrayRef {
637                Arc::new(<$builder>::finish(self))
638            }
639            fn append_null_value(&mut self) {
640                self.append_null();
641            }
642            fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
643                self
644            }
645        }
646    };
647}
648
649impl_column_builder!(BooleanBuilder, arrow_array::BooleanArray);
650impl_column_builder!(Int8Builder, arrow_array::Int8Array);
651impl_column_builder!(Int16Builder, arrow_array::Int16Array);
652impl_column_builder!(Int32Builder, arrow_array::Int32Array);
653impl_column_builder!(Int64Builder, arrow_array::Int64Array);
654impl_column_builder!(UInt8Builder, arrow_array::UInt8Array);
655impl_column_builder!(UInt16Builder, arrow_array::UInt16Array);
656impl_column_builder!(UInt32Builder, arrow_array::UInt32Array);
657impl_column_builder!(UInt64Builder, arrow_array::UInt64Array);
658impl_column_builder!(Float32Builder, arrow_array::Float32Array);
659impl_column_builder!(Float64Builder, arrow_array::Float64Array);
660impl_column_builder!(StringBuilder, arrow_array::StringArray);
661impl_column_builder!(ListBuilder<StringBuilder>, arrow_array::ListArray);
662impl_column_builder!(LargeStringBuilder, arrow_array::LargeStringArray);
663impl_column_builder!(LargeBinaryBuilder, arrow_array::LargeBinaryArray);
664impl_column_builder!(TimestampSecondBuilder, arrow_array::TimestampSecondArray);
665impl_column_builder!(
666    TimestampMillisecondBuilder,
667    arrow_array::TimestampMillisecondArray
668);
669impl_column_builder!(
670    TimestampMicrosecondBuilder,
671    arrow_array::TimestampMicrosecondArray
672);
673impl_column_builder!(
674    TimestampNanosecondBuilder,
675    arrow_array::TimestampNanosecondArray
676);
677
678fn create_builders(schema: &SchemaRef, capacity: usize) -> Vec<Box<dyn ColumnBuilder>> {
679    schema
680        .fields()
681        .iter()
682        .map(|f| create_builder(f.data_type(), capacity))
683        .collect()
684}
685
686fn create_builder(data_type: &DataType, capacity: usize) -> Box<dyn ColumnBuilder> {
687    match data_type {
688        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
689        DataType::Int8 => Box::new(Int8Builder::with_capacity(capacity)),
690        DataType::Int16 => Box::new(Int16Builder::with_capacity(capacity)),
691        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
692        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
693        DataType::UInt8 => Box::new(UInt8Builder::with_capacity(capacity)),
694        DataType::UInt16 => Box::new(UInt16Builder::with_capacity(capacity)),
695        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
696        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
697        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
698        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
699        DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, capacity * 32)),
700        DataType::LargeBinary => {
701            Box::new(LargeBinaryBuilder::with_capacity(capacity, capacity * 64))
702        }
703        DataType::Timestamp(TimeUnit::Second, tz) => {
704            let builder =
705                TimestampSecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
706            Box::new(builder)
707        }
708        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
709            let builder =
710                TimestampMillisecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
711            Box::new(builder)
712        }
713        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
714            let builder =
715                TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
716            Box::new(builder)
717        }
718        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
719            let builder =
720                TimestampNanosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
721            Box::new(builder)
722        }
723        DataType::List(field) if matches!(field.data_type(), DataType::Utf8) => {
724            Box::new(ListBuilder::new(StringBuilder::new()))
725        }
726        // Fallback: serialize as JSON string.
727        _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)),
728    }
729}
730
731fn append_null(builder: &mut Box<dyn ColumnBuilder>) {
732    builder.append_null_value();
733}
734
735/// Append a JSON value to the appropriate builder column.
736#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
737fn append_value(
738    builder: &mut Box<dyn ColumnBuilder>,
739    target_type: &DataType,
740    value: &serde_json::Value,
741    config: &JsonDecoderConfig,
742    mismatch_count: &AtomicU64,
743    jsonb_encoder: Option<&mut JsonbEncoder>,
744    numeric_ts_unit: EpochUnit,
745) -> SchemaResult<()> {
746    if value.is_null() {
747        builder.append_null_value();
748        return Ok(());
749    }
750
751    match target_type {
752        DataType::Boolean => {
753            let b = builder
754                .as_any_mut()
755                .downcast_mut::<BooleanBuilder>()
756                .unwrap();
757            match extract_bool(value, config) {
758                Ok(v) => b.append_value(v),
759                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
760            }
761        }
762        DataType::Int8 => {
763            let b = builder.as_any_mut().downcast_mut::<Int8Builder>().unwrap();
764            match extract_i8(value, config) {
765                Ok(v) => b.append_value(v),
766                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
767            }
768        }
769        DataType::Int16 => {
770            let b = builder.as_any_mut().downcast_mut::<Int16Builder>().unwrap();
771            match extract_i16(value, config) {
772                Ok(v) => b.append_value(v),
773                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
774            }
775        }
776        DataType::Int32 => {
777            let b = builder.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
778            match extract_i32(value, config) {
779                Ok(v) => b.append_value(v),
780                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
781            }
782        }
783        DataType::Int64 => {
784            let b = builder.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
785            match extract_i64(value, config) {
786                Ok(v) => b.append_value(v),
787                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
788            }
789        }
790        DataType::UInt8 => {
791            let b = builder.as_any_mut().downcast_mut::<UInt8Builder>().unwrap();
792            match extract_u8(value, config) {
793                Ok(v) => b.append_value(v),
794                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
795            }
796        }
797        DataType::UInt16 => {
798            let b = builder
799                .as_any_mut()
800                .downcast_mut::<UInt16Builder>()
801                .unwrap();
802            match extract_u16(value, config) {
803                Ok(v) => b.append_value(v),
804                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
805            }
806        }
807        DataType::UInt32 => {
808            let b = builder
809                .as_any_mut()
810                .downcast_mut::<UInt32Builder>()
811                .unwrap();
812            match extract_u32(value, config) {
813                Ok(v) => b.append_value(v),
814                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
815            }
816        }
817        DataType::UInt64 => {
818            let b = builder
819                .as_any_mut()
820                .downcast_mut::<UInt64Builder>()
821                .unwrap();
822            match extract_u64(value, config) {
823                Ok(v) => b.append_value(v),
824                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
825            }
826        }
827        DataType::Float32 => {
828            let b = builder
829                .as_any_mut()
830                .downcast_mut::<Float32Builder>()
831                .unwrap();
832            match extract_f32(value, config) {
833                Ok(v) => b.append_value(v),
834                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
835            }
836        }
837        DataType::Float64 => {
838            let b = builder
839                .as_any_mut()
840                .downcast_mut::<Float64Builder>()
841                .unwrap();
842            match extract_f64(value, config) {
843                Ok(v) => b.append_value(v),
844                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
845            }
846        }
847        DataType::LargeUtf8 => {
848            let b = builder
849                .as_any_mut()
850                .downcast_mut::<LargeStringBuilder>()
851                .unwrap();
852            let s = value_to_string(value);
853            b.append_value(&s);
854        }
855        DataType::LargeBinary => {
856            let b = builder
857                .as_any_mut()
858                .downcast_mut::<LargeBinaryBuilder>()
859                .unwrap();
860            if let Some(enc) = jsonb_encoder {
861                let bytes = enc.encode(value);
862                b.append_value(&bytes);
863            } else {
864                // Fallback: serialize as JSON bytes.
865                let bytes = serde_json::to_vec(value).unwrap_or_default();
866                b.append_value(&bytes);
867            }
868        }
869        DataType::Timestamp(unit, _) => {
870            match extract_timestamp(value, config, *unit, numeric_ts_unit) {
871                Ok(ts) => append_timestamp(builder, *unit, ts),
872                Err(e) => handle_mismatch(builder, config, mismatch_count, &e)?,
873            }
874        }
875        DataType::List(field) if matches!(field.data_type(), DataType::Utf8) => {
876            if let Some(items) = value.as_array() {
877                let b = builder
878                    .as_any_mut()
879                    .downcast_mut::<ListBuilder<StringBuilder>>()
880                    .unwrap();
881                for item in items {
882                    if let Some(s) = item.as_str() {
883                        b.values().append_value(s);
884                    } else if item.is_null() {
885                        b.values().append_null();
886                    } else {
887                        b.values().append_value(value_to_string(item));
888                    }
889                }
890                b.append(true);
891            } else {
892                // A non-array value for a list column is a type mismatch; honor
893                // the Null/Coerce/Reject policy like the scalar arms above.
894                handle_mismatch(
895                    builder,
896                    config,
897                    mismatch_count,
898                    &format!("expected array, got {}", json_type_name(value)),
899                )?;
900            }
901        }
902        // Unsupported types: serialize as JSON string.
903        _ => {
904            let b = builder
905                .as_any_mut()
906                .downcast_mut::<StringBuilder>()
907                .unwrap();
908            let s = value_to_string(value);
909            b.append_value(&s);
910        }
911    }
912
913    Ok(())
914}
915
916fn handle_mismatch(
917    builder: &mut Box<dyn ColumnBuilder>,
918    config: &JsonDecoderConfig,
919    mismatch_count: &AtomicU64,
920    error_msg: &str,
921) -> SchemaResult<()> {
922    match config.type_mismatch {
923        TypeMismatchStrategy::Null => {
924            mismatch_count.fetch_add(1, Ordering::Relaxed);
925            builder.append_null_value();
926            Ok(())
927        }
928        TypeMismatchStrategy::Coerce => {
929            // Coercion already failed in the extractor — this is a real error.
930            Err(SchemaError::DecodeError(format!(
931                "type coercion failed: {error_msg}"
932            )))
933        }
934        TypeMismatchStrategy::Reject => Err(SchemaError::DecodeError(format!(
935            "type mismatch: {error_msg}"
936        ))),
937    }
938}
939
940// ── Value extractors ───────────────────────────────────────────────
941
942fn extract_bool(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<bool, String> {
943    if let Some(b) = value.as_bool() {
944        return Ok(b);
945    }
946    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
947        if let Some(s) = value.as_str() {
948            match s.to_ascii_lowercase().as_str() {
949                "true" | "1" | "yes" => return Ok(true),
950                "false" | "0" | "no" => return Ok(false),
951                _ => {}
952            }
953        }
954        if let Some(n) = value.as_i64() {
955            return Ok(n != 0);
956        }
957    }
958    Err(format!("expected boolean, got {}", json_type_name(value)))
959}
960
961fn extract_i8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i8, String> {
962    if let Some(n) = value.as_i64() {
963        if let Ok(v) = i8::try_from(n) {
964            return Ok(v);
965        }
966        return Err(format!("integer {n} out of i8 range"));
967    }
968    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
969        if let Some(s) = value.as_str() {
970            if let Ok(v) = s.parse::<i8>() {
971                return Ok(v);
972            }
973        }
974        if let Some(f) = value.as_f64() {
975            #[allow(clippy::cast_possible_truncation)]
976            let v = f as i8;
977            return Ok(v);
978        }
979    }
980    Err(format!("expected i8, got {}", json_type_name(value)))
981}
982
983fn extract_i16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i16, String> {
984    if let Some(n) = value.as_i64() {
985        if let Ok(v) = i16::try_from(n) {
986            return Ok(v);
987        }
988        return Err(format!("integer {n} out of i16 range"));
989    }
990    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
991        if let Some(s) = value.as_str() {
992            if let Ok(v) = s.parse::<i16>() {
993                return Ok(v);
994            }
995        }
996        if let Some(f) = value.as_f64() {
997            #[allow(clippy::cast_possible_truncation)]
998            let v = f as i16;
999            return Ok(v);
1000        }
1001    }
1002    Err(format!("expected i16, got {}", json_type_name(value)))
1003}
1004
1005fn extract_i32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i32, String> {
1006    if let Some(n) = value.as_i64() {
1007        if let Ok(v) = i32::try_from(n) {
1008            return Ok(v);
1009        }
1010        return Err(format!("integer {n} out of i32 range"));
1011    }
1012    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1013        if let Some(s) = value.as_str() {
1014            if let Ok(v) = s.parse::<i32>() {
1015                return Ok(v);
1016            }
1017        }
1018        if let Some(f) = value.as_f64() {
1019            #[allow(clippy::cast_possible_truncation)]
1020            return Ok(f as i32);
1021        }
1022    }
1023    Err(format!("expected i32, got {}", json_type_name(value)))
1024}
1025
1026fn extract_i64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<i64, String> {
1027    if let Some(n) = value.as_i64() {
1028        return Ok(n);
1029    }
1030    if let Some(n) = value.as_u64() {
1031        if let Ok(v) = i64::try_from(n) {
1032            return Ok(v);
1033        }
1034        return Err(format!("u64 {n} out of i64 range"));
1035    }
1036    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1037        if let Some(s) = value.as_str() {
1038            if let Ok(v) = s.parse::<i64>() {
1039                return Ok(v);
1040            }
1041        }
1042        if let Some(f) = value.as_f64() {
1043            #[allow(clippy::cast_possible_truncation)]
1044            return Ok(f as i64);
1045        }
1046    }
1047    Err(format!("expected i64, got {}", json_type_name(value)))
1048}
1049
1050fn extract_f32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f32, String> {
1051    if let Some(f) = value.as_f64() {
1052        #[allow(clippy::cast_possible_truncation)]
1053        return Ok(f as f32);
1054    }
1055    if let Some(n) = value.as_i64() {
1056        #[allow(clippy::cast_precision_loss)]
1057        return Ok(n as f32);
1058    }
1059    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1060        if let Some(s) = value.as_str() {
1061            if let Ok(v) = s.parse::<f32>() {
1062                return Ok(v);
1063            }
1064        }
1065    }
1066    Err(format!("expected f32, got {}", json_type_name(value)))
1067}
1068
1069fn extract_f64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<f64, String> {
1070    if let Some(f) = value.as_f64() {
1071        return Ok(f);
1072    }
1073    if let Some(n) = value.as_i64() {
1074        #[allow(clippy::cast_precision_loss)]
1075        return Ok(n as f64);
1076    }
1077    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1078        if let Some(s) = value.as_str() {
1079            if let Ok(v) = s.parse::<f64>() {
1080                return Ok(v);
1081            }
1082        }
1083    }
1084    Err(format!("expected f64, got {}", json_type_name(value)))
1085}
1086
1087fn extract_u8(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u8, String> {
1088    if let Some(n) = value.as_u64() {
1089        if let Ok(v) = u8::try_from(n) {
1090            return Ok(v);
1091        }
1092        return Err(format!("integer {n} out of u8 range"));
1093    }
1094    if let Some(n) = value.as_i64() {
1095        if let Ok(v) = u8::try_from(n) {
1096            return Ok(v);
1097        }
1098        return Err(format!("integer {n} out of u8 range"));
1099    }
1100    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1101        if let Some(s) = value.as_str() {
1102            if let Ok(v) = s.parse::<u8>() {
1103                return Ok(v);
1104            }
1105        }
1106    }
1107    Err(format!("expected u8, got {}", json_type_name(value)))
1108}
1109
1110fn extract_u16(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u16, String> {
1111    if let Some(n) = value.as_u64() {
1112        if let Ok(v) = u16::try_from(n) {
1113            return Ok(v);
1114        }
1115        return Err(format!("integer {n} out of u16 range"));
1116    }
1117    if let Some(n) = value.as_i64() {
1118        if let Ok(v) = u16::try_from(n) {
1119            return Ok(v);
1120        }
1121        return Err(format!("integer {n} out of u16 range"));
1122    }
1123    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1124        if let Some(s) = value.as_str() {
1125            if let Ok(v) = s.parse::<u16>() {
1126                return Ok(v);
1127            }
1128        }
1129    }
1130    Err(format!("expected u16, got {}", json_type_name(value)))
1131}
1132
1133fn extract_u32(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u32, String> {
1134    if let Some(n) = value.as_u64() {
1135        if let Ok(v) = u32::try_from(n) {
1136            return Ok(v);
1137        }
1138        return Err(format!("integer {n} out of u32 range"));
1139    }
1140    if let Some(n) = value.as_i64() {
1141        if let Ok(v) = u32::try_from(n) {
1142            return Ok(v);
1143        }
1144        return Err(format!("integer {n} out of u32 range"));
1145    }
1146    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1147        if let Some(s) = value.as_str() {
1148            if let Ok(v) = s.parse::<u32>() {
1149                return Ok(v);
1150            }
1151        }
1152    }
1153    Err(format!("expected u32, got {}", json_type_name(value)))
1154}
1155
1156fn extract_u64(value: &serde_json::Value, config: &JsonDecoderConfig) -> Result<u64, String> {
1157    if let Some(n) = value.as_u64() {
1158        return Ok(n);
1159    }
1160    if let Some(n) = value.as_i64() {
1161        if let Ok(v) = u64::try_from(n) {
1162            return Ok(v);
1163        }
1164        return Err(format!("integer {n} out of u64 range"));
1165    }
1166    if matches!(config.type_mismatch, TypeMismatchStrategy::Coerce) {
1167        if let Some(s) = value.as_str() {
1168            if let Ok(v) = s.parse::<u64>() {
1169                return Ok(v);
1170            }
1171        }
1172    }
1173    Err(format!("expected u64, got {}", json_type_name(value)))
1174}
1175
1176/// Extracts a timestamp value as an i64 in the specified [`TimeUnit`].
1177///
1178/// For numeric JSON values, scales from `from` (the column's configured
1179/// [`EpochUnit`], default millis) to the target Arrow `TimeUnit`. For
1180/// string values, tries the configured timestamp format patterns (the
1181/// `from` unit does not apply — strings carry their own resolution).
1182fn extract_timestamp(
1183    value: &serde_json::Value,
1184    config: &JsonDecoderConfig,
1185    unit: TimeUnit,
1186    from: EpochUnit,
1187) -> Result<i64, String> {
1188    // Numeric values: scale from the configured epoch unit.
1189    if let Some(n) = value.as_i64() {
1190        return checked_epoch_to_unit(n, from, unit);
1191    }
1192    if let Some(f) = value.as_f64() {
1193        // 2^63 == i64::MAX + 1; exclusive upper bound for a lossless f64->i64.
1194        const POW2_63: f64 = 9_223_372_036_854_775_808.0;
1195        // `f as i64` saturates (NaN -> 0, +-huge -> i64::MIN/MAX), silently
1196        // turning a garbage timestamp into a valid-looking one that poisons
1197        // event-time/watermarks. Round to the nearest integer in `from`
1198        // units and reject out-of-range/non-finite so the configured
1199        // type-mismatch strategy applies, like any other bad value.
1200        let rounded = f.round();
1201        if !(-POW2_63..POW2_63).contains(&rounded) {
1202            return Err(format!("timestamp {f} out of i64 range"));
1203        }
1204        #[allow(clippy::cast_possible_truncation)] // range-checked; already integral
1205        let v = rounded as i64;
1206        return checked_epoch_to_unit(v, from, unit);
1207    }
1208
1209    // String values: try configured timestamp formats.
1210    if let Some(s) = value.as_str() {
1211        for fmt in &config.timestamp_formats {
1212            if fmt == "iso8601" {
1213                if let Ok(nanos) = arrow_cast::parse::string_to_timestamp_nanos(s) {
1214                    return Ok(nanos_to_unit(nanos, unit));
1215                }
1216                continue;
1217            }
1218            if let Ok(ndt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
1219                let nanos = ndt.and_utc().timestamp_nanos_opt().unwrap_or(0);
1220                return Ok(nanos_to_unit(nanos, unit));
1221            }
1222        }
1223        return Err(format!("cannot parse timestamp from string: {s}"));
1224    }
1225
1226    Err(format!("expected timestamp, got {}", json_type_name(value)))
1227}
1228
1229/// Scales an integer epoch `value` from `from` units to the target Arrow
1230/// `TimeUnit`. Up-scaling (e.g. seconds→nanos) is checked and errors
1231/// rather than wrapping on i64 overflow; down-scaling (e.g. nanos→millis)
1232/// truncates toward zero, the conventional and expected precision loss.
1233///
1234/// `from == EpochUnit::Millis` reproduces the historical
1235/// `checked_millis_to_unit` behaviour bit-for-bit.
1236fn checked_epoch_to_unit(value: i64, from: EpochUnit, to: TimeUnit) -> Result<i64, String> {
1237    let from_ns = from.nanos_per();
1238    let to_ns = match to {
1239        TimeUnit::Second => 1_000_000_000,
1240        TimeUnit::Millisecond => 1_000_000,
1241        TimeUnit::Microsecond => 1_000,
1242        TimeUnit::Nanosecond => 1,
1243    };
1244    if from_ns >= to_ns {
1245        // Exact integer factor (both are powers-of-ten multiples of 1ns).
1246        let factor = from_ns / to_ns;
1247        value
1248            .checked_mul(factor)
1249            .ok_or_else(|| format!("timestamp {value} ({from:?}) out of i64 {to:?} range"))
1250    } else {
1251        let factor = to_ns / from_ns;
1252        Ok(value / factor)
1253    }
1254}
1255
1256/// Converts nanoseconds to the target time unit.
1257fn nanos_to_unit(nanos: i64, unit: TimeUnit) -> i64 {
1258    match unit {
1259        TimeUnit::Second => nanos / 1_000_000_000,
1260        TimeUnit::Millisecond => nanos / 1_000_000,
1261        TimeUnit::Microsecond => nanos / 1_000,
1262        TimeUnit::Nanosecond => nanos,
1263    }
1264}
1265
1266/// Appends a timestamp value to the appropriate builder based on [`TimeUnit`].
1267fn append_timestamp(builder: &mut Box<dyn ColumnBuilder>, unit: TimeUnit, value: i64) {
1268    match unit {
1269        TimeUnit::Second => {
1270            builder
1271                .as_any_mut()
1272                .downcast_mut::<TimestampSecondBuilder>()
1273                .unwrap()
1274                .append_value(value);
1275        }
1276        TimeUnit::Millisecond => {
1277            builder
1278                .as_any_mut()
1279                .downcast_mut::<TimestampMillisecondBuilder>()
1280                .unwrap()
1281                .append_value(value);
1282        }
1283        TimeUnit::Microsecond => {
1284            builder
1285                .as_any_mut()
1286                .downcast_mut::<TimestampMicrosecondBuilder>()
1287                .unwrap()
1288                .append_value(value);
1289        }
1290        TimeUnit::Nanosecond => {
1291            builder
1292                .as_any_mut()
1293                .downcast_mut::<TimestampNanosecondBuilder>()
1294                .unwrap()
1295                .append_value(value);
1296        }
1297    }
1298}
1299
1300fn value_to_string(value: &serde_json::Value) -> String {
1301    match value {
1302        serde_json::Value::String(s) => s.clone(),
1303        other => other.to_string(),
1304    }
1305}
1306
1307fn json_type_name(value: &serde_json::Value) -> &'static str {
1308    match value {
1309        serde_json::Value::Null => "null",
1310        serde_json::Value::Bool(_) => "boolean",
1311        serde_json::Value::Number(_) => "number",
1312        serde_json::Value::String(_) => "string",
1313        serde_json::Value::Array(_) => "array",
1314        serde_json::Value::Object(_) => "object",
1315    }
1316}
1317
1318#[cfg(test)]
1319mod tests {
1320    use super::*;
1321    use arrow_array::cast::AsArray;
1322    use arrow_schema::{Field, Schema};
1323
1324    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
1325        Arc::new(Schema::new(
1326            fields
1327                .into_iter()
1328                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
1329                .collect::<Vec<_>>(),
1330        ))
1331    }
1332
1333    fn json_record(json: &str) -> RawRecord {
1334        RawRecord::new(json.as_bytes().to_vec())
1335    }
1336
1337    // ── Basic decode tests ────────────────────────────────────
1338
1339    #[test]
1340    fn test_decode_empty_batch() {
1341        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1342        let decoder = JsonDecoder::new(schema.clone());
1343        let batch = decoder.decode_batch(&[]).unwrap();
1344        assert_eq!(batch.num_rows(), 0);
1345        assert_eq!(batch.schema(), schema);
1346    }
1347
1348    #[test]
1349    fn test_decode_single_record() {
1350        let schema = make_schema(vec![
1351            ("id", DataType::Int64, false),
1352            ("name", DataType::Utf8, true),
1353        ]);
1354        let decoder = JsonDecoder::new(schema);
1355        let records = vec![json_record(r#"{"id": 42, "name": "Alice"}"#)];
1356        let batch = decoder.decode_batch(&records).unwrap();
1357
1358        assert_eq!(batch.num_rows(), 1);
1359        assert_eq!(
1360            batch
1361                .column(0)
1362                .as_primitive::<arrow_array::types::Int64Type>()
1363                .value(0),
1364            42
1365        );
1366        assert_eq!(batch.column(1).as_string::<i32>().value(0), "Alice");
1367    }
1368
1369    #[test]
1370    fn test_decode_multiple_records() {
1371        let schema = make_schema(vec![
1372            ("x", DataType::Int64, false),
1373            ("y", DataType::Float64, false),
1374        ]);
1375        let decoder = JsonDecoder::new(schema);
1376        let records = vec![
1377            json_record(r#"{"x": 1, "y": 1.5}"#),
1378            json_record(r#"{"x": 2, "y": 2.5}"#),
1379            json_record(r#"{"x": 3, "y": 3.5}"#),
1380        ];
1381        let batch = decoder.decode_batch(&records).unwrap();
1382
1383        assert_eq!(batch.num_rows(), 3);
1384        let x_col = batch
1385            .column(0)
1386            .as_primitive::<arrow_array::types::Int64Type>();
1387        assert_eq!(x_col.value(0), 1);
1388        assert_eq!(x_col.value(1), 2);
1389        assert_eq!(x_col.value(2), 3);
1390    }
1391
1392    #[test]
1393    fn test_decode_all_types() {
1394        let schema = make_schema(vec![
1395            ("bool_col", DataType::Boolean, false),
1396            ("int_col", DataType::Int64, false),
1397            ("float_col", DataType::Float64, false),
1398            ("str_col", DataType::Utf8, false),
1399        ]);
1400        let decoder = JsonDecoder::new(schema);
1401        let records = vec![json_record(
1402            r#"{"bool_col": true, "int_col": 42, "float_col": 3.14, "str_col": "hello"}"#,
1403        )];
1404        let batch = decoder.decode_batch(&records).unwrap();
1405
1406        assert_eq!(batch.num_rows(), 1);
1407        assert!(batch.column(0).as_boolean().value(0));
1408        assert_eq!(
1409            batch
1410                .column(1)
1411                .as_primitive::<arrow_array::types::Int64Type>()
1412                .value(0),
1413            42
1414        );
1415        let f = batch
1416            .column(2)
1417            .as_primitive::<arrow_array::types::Float64Type>()
1418            .value(0);
1419        assert!((f - 3.14).abs() < f64::EPSILON);
1420        assert_eq!(batch.column(3).as_string::<i32>().value(0), "hello");
1421    }
1422
1423    // ── Null handling ─────────────────────────────────────────
1424
1425    #[test]
1426    fn test_decode_null_values() {
1427        let schema = make_schema(vec![
1428            ("a", DataType::Int64, true),
1429            ("b", DataType::Utf8, true),
1430        ]);
1431        let decoder = JsonDecoder::new(schema);
1432        let records = vec![json_record(r#"{"a": null, "b": null}"#)];
1433        let batch = decoder.decode_batch(&records).unwrap();
1434
1435        assert!(batch.column(0).is_null(0));
1436        assert!(batch.column(1).is_null(0));
1437    }
1438
1439    #[test]
1440    fn test_decode_missing_field_becomes_null() {
1441        let schema = make_schema(vec![
1442            ("a", DataType::Int64, true),
1443            ("b", DataType::Utf8, true),
1444        ]);
1445        let decoder = JsonDecoder::new(schema);
1446        let records = vec![json_record(r#"{"a": 1}"#)]; // "b" missing
1447        let batch = decoder.decode_batch(&records).unwrap();
1448
1449        assert_eq!(
1450            batch
1451                .column(0)
1452                .as_primitive::<arrow_array::types::Int64Type>()
1453                .value(0),
1454            1
1455        );
1456        assert!(batch.column(1).is_null(0));
1457    }
1458
1459    // ── Type mismatch strategies ──────────────────────────────
1460
1461    #[test]
1462    fn test_mismatch_null_strategy() {
1463        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1464        let config = JsonDecoderConfig {
1465            type_mismatch: TypeMismatchStrategy::Null,
1466            ..Default::default()
1467        };
1468        let decoder = JsonDecoder::with_config(schema, config);
1469        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
1470        let batch = decoder.decode_batch(&records).unwrap();
1471
1472        assert!(batch.column(0).is_null(0));
1473        assert_eq!(decoder.mismatch_count(), 1);
1474    }
1475
1476    #[test]
1477    fn test_mismatch_coerce_strategy() {
1478        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1479        let config = JsonDecoderConfig {
1480            type_mismatch: TypeMismatchStrategy::Coerce,
1481            ..Default::default()
1482        };
1483        let decoder = JsonDecoder::with_config(schema, config);
1484        let records = vec![json_record(r#"{"x": "123"}"#)];
1485        let batch = decoder.decode_batch(&records).unwrap();
1486
1487        assert_eq!(
1488            batch
1489                .column(0)
1490                .as_primitive::<arrow_array::types::Int64Type>()
1491                .value(0),
1492            123
1493        );
1494    }
1495
1496    #[test]
1497    fn test_mismatch_reject_strategy() {
1498        let schema = make_schema(vec![("x", DataType::Int64, false)]);
1499        let config = JsonDecoderConfig {
1500            type_mismatch: TypeMismatchStrategy::Reject,
1501            ..Default::default()
1502        };
1503        let decoder = JsonDecoder::with_config(schema, config);
1504        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
1505        let result = decoder.decode_batch(&records);
1506
1507        assert!(result.is_err());
1508        assert!(result.unwrap_err().to_string().contains("type mismatch"));
1509    }
1510
1511    // ── Unknown field strategies ──────────────────────────────
1512
1513    #[test]
1514    fn test_unknown_fields_ignore() {
1515        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1516        let decoder = JsonDecoder::new(schema);
1517        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
1518        let batch = decoder.decode_batch(&records).unwrap();
1519
1520        assert_eq!(batch.num_columns(), 1);
1521        assert_eq!(batch.num_rows(), 1);
1522    }
1523
1524    #[test]
1525    fn test_unknown_fields_reject() {
1526        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1527        let config = JsonDecoderConfig {
1528            unknown_fields: UnknownFieldStrategy::Reject,
1529            ..Default::default()
1530        };
1531        let decoder = JsonDecoder::with_config(schema, config);
1532        let records = vec![json_record(r#"{"a": 1, "unknown": "value"}"#)];
1533        let result = decoder.decode_batch(&records);
1534
1535        assert!(result.is_err());
1536        assert!(result.unwrap_err().to_string().contains("unknown field"));
1537    }
1538
1539    #[test]
1540    fn test_unknown_fields_collect_extra() {
1541        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1542        let config = JsonDecoderConfig {
1543            unknown_fields: UnknownFieldStrategy::CollectExtra,
1544            ..Default::default()
1545        };
1546        let decoder = JsonDecoder::with_config(schema, config);
1547        let records = vec![json_record(r#"{"a": 1, "extra1": "v1", "extra2": 42}"#)];
1548        let batch = decoder.decode_batch(&records).unwrap();
1549
1550        // Schema should have an extra `_extra` column.
1551        assert_eq!(batch.num_columns(), 2);
1552        assert_eq!(batch.schema().field(1).name(), "_extra");
1553        assert!(!batch.column(1).is_null(0));
1554    }
1555
1556    // ── Timestamp parsing ─────────────────────────────────────
1557
1558    #[test]
1559    fn test_decode_timestamp_iso8601() {
1560        let schema = make_schema(vec![(
1561            "ts",
1562            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1563            false,
1564        )]);
1565        let decoder = JsonDecoder::new(schema);
1566        let records = vec![json_record(r#"{"ts": "2025-01-15T10:30:00Z"}"#)];
1567        let batch = decoder.decode_batch(&records).unwrap();
1568
1569        assert!(!batch.column(0).is_null(0));
1570    }
1571
1572    #[test]
1573    fn test_decode_timestamp_epoch_millis() {
1574        let schema = make_schema(vec![(
1575            "ts",
1576            DataType::Timestamp(TimeUnit::Nanosecond, None),
1577            false,
1578        )]);
1579        let decoder = JsonDecoder::new(schema);
1580        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
1581        let batch = decoder.decode_batch(&records).unwrap();
1582
1583        let ts_col = batch
1584            .column(0)
1585            .as_primitive::<arrow_array::types::TimestampNanosecondType>();
1586        // 1705312200000 ms * 1_000_000 = nanos
1587        assert_eq!(ts_col.value(0), 1_705_312_200_000_000_000);
1588    }
1589
1590    #[test]
1591    fn test_decode_out_of_range_float_timestamp_is_rejected() {
1592        // A garbage float epoch-ms must not silently saturate to i64::MAX
1593        // and poison event-time/watermarks — it routes through the
1594        // configured type-mismatch path instead.
1595        let schema = make_schema(vec![(
1596            "ts",
1597            DataType::Timestamp(TimeUnit::Millisecond, None),
1598            false,
1599        )]);
1600        let decoder = JsonDecoder::new(schema);
1601        let records = vec![json_record(r#"{"ts": 1e30}"#)];
1602        let result = decoder.decode_batch(&records);
1603
1604        assert!(result.is_err());
1605        assert!(result.unwrap_err().to_string().contains("out of i64 range"));
1606    }
1607
1608    #[test]
1609    fn test_decode_timestamp_overflow_on_nanosecond_scaling_is_rejected() {
1610        // A ms value that fits i64 but overflows when scaled to nanoseconds
1611        // must error, not wrap into a bogus (watermark-poisoning) timestamp.
1612        let schema = make_schema(vec![(
1613            "ts",
1614            DataType::Timestamp(TimeUnit::Nanosecond, None),
1615            false,
1616        )]);
1617        let decoder = JsonDecoder::new(schema);
1618        let records = vec![json_record(r#"{"ts": 9999999999999999}"#)];
1619        let result = decoder.decode_batch(&records);
1620
1621        assert!(result.is_err());
1622        assert!(result
1623            .unwrap_err()
1624            .to_string()
1625            .contains("out of i64 Nanosecond range"));
1626    }
1627
1628    // ── Nested objects as LargeBinary ─────────────────────────
1629
1630    #[test]
1631    fn test_decode_nested_object_as_json_string() {
1632        let schema = make_schema(vec![("data", DataType::LargeBinary, true)]);
1633        let decoder = JsonDecoder::new(schema);
1634        let records = vec![json_record(r#"{"data": {"nested": true}}"#)];
1635        let batch = decoder.decode_batch(&records).unwrap();
1636
1637        assert!(!batch.column(0).is_null(0));
1638    }
1639
1640    // ── Error cases ───────────────────────────────────────────
1641
1642    #[test]
1643    fn test_decode_invalid_json() {
1644        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1645        let decoder = JsonDecoder::new(schema);
1646        let records = vec![RawRecord::new(b"not json".to_vec())];
1647        let result = decoder.decode_batch(&records);
1648
1649        assert!(result.is_err());
1650        assert!(result.unwrap_err().to_string().contains("JSON parse error"));
1651    }
1652
1653    #[test]
1654    fn test_decode_non_object_json() {
1655        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1656        let decoder = JsonDecoder::new(schema);
1657        let records = vec![json_record("[1, 2, 3]")];
1658        let result = decoder.decode_batch(&records);
1659
1660        assert!(result.is_err());
1661        assert!(result
1662            .unwrap_err()
1663            .to_string()
1664            .contains("must be an object"));
1665    }
1666
1667    // ── FormatDecoder trait ───────────────────────────────────
1668
1669    #[test]
1670    fn test_format_name() {
1671        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1672        let decoder = JsonDecoder::new(schema);
1673        assert_eq!(decoder.format_name(), "json");
1674    }
1675
1676    #[test]
1677    fn test_output_schema() {
1678        let schema = make_schema(vec![
1679            ("a", DataType::Int64, false),
1680            ("b", DataType::Utf8, true),
1681        ]);
1682        let decoder = JsonDecoder::new(schema.clone());
1683        assert_eq!(decoder.output_schema(), schema);
1684    }
1685
1686    #[test]
1687    fn test_decode_one() {
1688        let schema = make_schema(vec![("x", DataType::Int64, false)]);
1689        let decoder = JsonDecoder::new(schema);
1690        let record = json_record(r#"{"x": 99}"#);
1691        let batch = decoder.decode_one(&record).unwrap();
1692        assert_eq!(batch.num_rows(), 1);
1693        assert_eq!(
1694            batch
1695                .column(0)
1696                .as_primitive::<arrow_array::types::Int64Type>()
1697                .value(0),
1698            99
1699        );
1700    }
1701
1702    // ── Int/Float numeric coercion ────────────────────────────
1703
1704    #[test]
1705    fn test_decode_int_from_float_json() {
1706        // JSON number 42.0 is parsed as f64 by serde_json. With the default
1707        // Coerce strategy, it is coerced to Int64 = 42.
1708        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1709        let decoder = JsonDecoder::new(schema);
1710        let records = vec![json_record(r#"{"x": 42.0}"#)];
1711        let batch = decoder.decode_batch(&records).unwrap();
1712        assert_eq!(
1713            batch
1714                .column(0)
1715                .as_primitive::<arrow_array::types::Int64Type>()
1716                .value(0),
1717            42
1718        );
1719    }
1720
1721    #[test]
1722    fn test_decode_float_from_int_json() {
1723        // JSON integer 42 should decode as Float64 = 42.0.
1724        let schema = make_schema(vec![("x", DataType::Float64, false)]);
1725        let decoder = JsonDecoder::new(schema);
1726        let records = vec![json_record(r#"{"x": 42}"#)];
1727        let batch = decoder.decode_batch(&records).unwrap();
1728        let val = batch
1729            .column(0)
1730            .as_primitive::<arrow_array::types::Float64Type>()
1731            .value(0);
1732        assert!((val - 42.0).abs() < f64::EPSILON);
1733    }
1734
1735    // ── Coercion tests (string→numeric, int→float, etc.) ────
1736
1737    #[test]
1738    fn test_decode_string_number_to_float64() {
1739        let schema = make_schema(vec![("price", DataType::Float64, false)]);
1740        let decoder = JsonDecoder::new(schema);
1741        let records = vec![json_record(r#"{"price": "187.52"}"#)];
1742        let batch = decoder.decode_batch(&records).unwrap();
1743        let val = batch
1744            .column(0)
1745            .as_primitive::<arrow_array::types::Float64Type>()
1746            .value(0);
1747        assert!((val - 187.52).abs() < f64::EPSILON);
1748    }
1749
1750    #[test]
1751    fn test_decode_string_to_int() {
1752        let schema = make_schema(vec![("qty", DataType::Int32, false)]);
1753        let decoder = JsonDecoder::new(schema);
1754        let records = vec![json_record(r#"{"qty": "100"}"#)];
1755        let batch = decoder.decode_batch(&records).unwrap();
1756        assert_eq!(
1757            batch
1758                .column(0)
1759                .as_primitive::<arrow_array::types::Int32Type>()
1760                .value(0),
1761            100
1762        );
1763    }
1764
1765    #[test]
1766    fn test_decode_epoch_millis_to_timestamp_millis() {
1767        let schema = make_schema(vec![(
1768            "ts",
1769            DataType::Timestamp(TimeUnit::Millisecond, None),
1770            false,
1771        )]);
1772        let decoder = JsonDecoder::new(schema);
1773        let records = vec![json_record(r#"{"ts": 1705312200000}"#)];
1774        let batch = decoder.decode_batch(&records).unwrap();
1775        let ts_col = batch
1776            .column(0)
1777            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
1778        assert_eq!(ts_col.value(0), 1_705_312_200_000);
1779    }
1780
1781    #[test]
1782    fn test_decode_int_to_float_promotion() {
1783        let schema = make_schema(vec![("val", DataType::Float64, false)]);
1784        let decoder = JsonDecoder::new(schema);
1785        let records = vec![json_record(r#"{"val": 100}"#)];
1786        let batch = decoder.decode_batch(&records).unwrap();
1787        let val = batch
1788            .column(0)
1789            .as_primitive::<arrow_array::types::Float64Type>()
1790            .value(0);
1791        assert!((val - 100.0).abs() < f64::EPSILON);
1792    }
1793
1794    #[test]
1795    fn test_decode_string_boolean() {
1796        let schema = make_schema(vec![("active", DataType::Boolean, false)]);
1797        let decoder = JsonDecoder::new(schema);
1798        let records = vec![json_record(r#"{"active": "true"}"#)];
1799        let batch = decoder.decode_batch(&records).unwrap();
1800        assert!(batch.column(0).as_boolean().value(0));
1801    }
1802
1803    #[test]
1804    fn test_coerce_fails_on_unconvertible() {
1805        // With default Coerce, a string that can't be parsed as Int64 should error.
1806        let schema = make_schema(vec![("x", DataType::Int64, true)]);
1807        let decoder = JsonDecoder::new(schema);
1808        let records = vec![json_record(r#"{"x": "not_a_number"}"#)];
1809        let result = decoder.decode_batch(&records);
1810        assert!(result.is_err());
1811        assert!(result
1812            .unwrap_err()
1813            .to_string()
1814            .contains("type coercion failed"));
1815    }
1816
1817    #[test]
1818    fn test_enforcement_str_parsing() {
1819        assert_eq!(
1820            TypeMismatchStrategy::from_enforcement_str("coerce"),
1821            Some(TypeMismatchStrategy::Coerce)
1822        );
1823        assert_eq!(
1824            TypeMismatchStrategy::from_enforcement_str("STRICT"),
1825            Some(TypeMismatchStrategy::Reject)
1826        );
1827        assert_eq!(
1828            TypeMismatchStrategy::from_enforcement_str("Permissive"),
1829            Some(TypeMismatchStrategy::Null)
1830        );
1831        assert_eq!(TypeMismatchStrategy::from_enforcement_str("unknown"), None);
1832    }
1833
1834    // ── Small integer types ──────────────────────────────────
1835
1836    #[test]
1837    fn test_decode_i8_and_u8() {
1838        let schema = make_schema(vec![
1839            ("signed", DataType::Int8, false),
1840            ("unsigned", DataType::UInt8, false),
1841        ]);
1842        let decoder = JsonDecoder::new(schema);
1843        let records = vec![json_record(r#"{"signed": -5, "unsigned": 200}"#)];
1844        let batch = decoder.decode_batch(&records).unwrap();
1845        assert_eq!(
1846            batch
1847                .column(0)
1848                .as_primitive::<arrow_array::types::Int8Type>()
1849                .value(0),
1850            -5
1851        );
1852        assert_eq!(
1853            batch
1854                .column(1)
1855                .as_primitive::<arrow_array::types::UInt8Type>()
1856                .value(0),
1857            200
1858        );
1859    }
1860
1861    // ── json.path tests ─────────────────────────────────────────
1862
1863    #[test]
1864    fn test_json_path_single() {
1865        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1866        let config = JsonDecoderConfig {
1867            json_path: Some(vec!["data".into()]),
1868            ..Default::default()
1869        };
1870        let decoder = JsonDecoder::with_config(schema, config);
1871        let records = vec![json_record(r#"{"data":{"id":1}}"#)];
1872        let batch = decoder.decode_batch(&records).unwrap();
1873        assert_eq!(batch.num_rows(), 1);
1874        assert_eq!(
1875            batch
1876                .column(0)
1877                .as_primitive::<arrow_array::types::Int64Type>()
1878                .value(0),
1879            1
1880        );
1881    }
1882
1883    #[test]
1884    fn test_json_path_multi() {
1885        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1886        let config = JsonDecoderConfig {
1887            json_path: Some(vec!["a".into(), "b".into()]),
1888            ..Default::default()
1889        };
1890        let decoder = JsonDecoder::with_config(schema, config);
1891        let records = vec![json_record(r#"{"a":{"b":{"id":1}}}"#)];
1892        let batch = decoder.decode_batch(&records).unwrap();
1893        assert_eq!(
1894            batch
1895                .column(0)
1896                .as_primitive::<arrow_array::types::Int64Type>()
1897                .value(0),
1898            1
1899        );
1900    }
1901
1902    /// Records that don't carry the configured json.path are skipped, not
1903    /// erroring the batch. Real-world: WebSocket sources receive subscribe
1904    /// acks and control frames mixed with data frames; failing the batch
1905    /// would poison the source's input buffer.
1906    #[test]
1907    fn test_json_path_missing_skips_record() {
1908        let schema = make_schema(vec![("id", DataType::Int64, false)]);
1909        let config = JsonDecoderConfig {
1910            json_path: Some(vec!["data".into()]),
1911            ..Default::default()
1912        };
1913        let decoder = JsonDecoder::with_config(schema, config);
1914        let records = vec![
1915            json_record(r#"{"success":true,"op":"subscribe"}"#), // ack — no `data`
1916            json_record(r#"{"data":{"id":42}}"#),
1917        ];
1918        let batch = decoder.decode_batch(&records).unwrap();
1919        assert_eq!(batch.num_rows(), 1);
1920        assert_eq!(
1921            batch
1922                .column(0)
1923                .as_primitive::<arrow_array::types::Int64Type>()
1924                .value(0),
1925            42
1926        );
1927    }
1928
1929    // ── json.column.* tests ─────────────────────────────────────
1930
1931    #[test]
1932    fn test_json_column_custom_path() {
1933        let schema = make_schema(vec![
1934            ("p", DataType::Float64, false),
1935            ("stream_name", DataType::Utf8, true),
1936        ]);
1937        let mut col_paths = HashMap::new();
1938        col_paths.insert("stream_name".into(), vec!["stream".into()]);
1939        let config = JsonDecoderConfig {
1940            json_path: Some(vec!["data".into()]),
1941            json_column_paths: col_paths,
1942            ..Default::default()
1943        };
1944        let decoder = JsonDecoder::with_config(schema, config);
1945        let records = vec![json_record(
1946            r#"{"stream":"btcusdt@trade","data":{"p":67523.0}}"#,
1947        )];
1948        let batch = decoder.decode_batch(&records).unwrap();
1949        assert_eq!(batch.num_rows(), 1);
1950        let price = batch
1951            .column(0)
1952            .as_primitive::<arrow_array::types::Float64Type>()
1953            .value(0);
1954        assert!((price - 67523.0).abs() < f64::EPSILON);
1955        assert_eq!(batch.column(1).as_string::<i32>().value(0), "btcusdt@trade");
1956    }
1957
1958    #[test]
1959    fn test_json_column_deep_path() {
1960        let schema = make_schema(vec![
1961            ("p", DataType::Float64, false),
1962            ("ts", DataType::Int64, true),
1963        ]);
1964        let mut col_paths = HashMap::new();
1965        col_paths.insert("ts".into(), vec!["meta".into(), "timestamp".into()]);
1966        let config = JsonDecoderConfig {
1967            json_path: Some(vec!["data".into()]),
1968            json_column_paths: col_paths,
1969            ..Default::default()
1970        };
1971        let decoder = JsonDecoder::with_config(schema, config);
1972        let records = vec![json_record(
1973            r#"{"meta":{"timestamp":123456},"data":{"p":99.5}}"#,
1974        )];
1975        let batch = decoder.decode_batch(&records).unwrap();
1976        assert_eq!(batch.num_rows(), 1);
1977        assert_eq!(
1978            batch
1979                .column(1)
1980                .as_primitive::<arrow_array::types::Int64Type>()
1981                .value(0),
1982            123456
1983        );
1984    }
1985
1986    #[test]
1987    fn test_json_column_missing_returns_null() {
1988        let schema = make_schema(vec![
1989            ("id", DataType::Int64, false),
1990            ("missing_col", DataType::Utf8, true),
1991        ]);
1992        let mut col_paths = HashMap::new();
1993        col_paths.insert("missing_col".into(), vec!["nowhere".into(), "gone".into()]);
1994        let config = JsonDecoderConfig {
1995            json_column_paths: col_paths,
1996            ..Default::default()
1997        };
1998        let decoder = JsonDecoder::with_config(schema, config);
1999        let records = vec![json_record(r#"{"id":42}"#)];
2000        let batch = decoder.decode_batch(&records).unwrap();
2001        assert_eq!(
2002            batch
2003                .column(0)
2004                .as_primitive::<arrow_array::types::Int64Type>()
2005                .value(0),
2006            42
2007        );
2008        assert!(batch.column(1).is_null(0));
2009    }
2010
2011    #[test]
2012    fn test_json_column_path_to_nested_string_array() {
2013        use arrow_array::Array;
2014        use arrow_schema::Field;
2015        let schema = make_schema(vec![(
2016            "tags",
2017            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2018            true,
2019        )]);
2020        let mut col_paths = HashMap::new();
2021        col_paths.insert("tags".into(), vec!["record".into(), "tags".into()]);
2022        let config = JsonDecoderConfig {
2023            json_column_paths: col_paths,
2024            ..Default::default()
2025        };
2026        let decoder = JsonDecoder::with_config(schema, config);
2027        let records = vec![json_record(r#"{"record":{"tags":["en","es"]}}"#)];
2028        let batch = decoder.decode_batch(&records).unwrap();
2029
2030        let list = batch
2031            .column(0)
2032            .as_any()
2033            .downcast_ref::<arrow_array::ListArray>()
2034            .expect("List column");
2035        let vals = list.value(0);
2036        let strs = vals.as_string::<i32>();
2037        assert_eq!(strs.len(), 2);
2038        assert_eq!(strs.value(0), "en");
2039        assert_eq!(strs.value(1), "es");
2040    }
2041
2042    #[test]
2043    fn test_list_column_non_array_honors_reject_strategy() {
2044        use arrow_schema::Field;
2045        let schema = make_schema(vec![(
2046            "tags",
2047            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2048            true,
2049        )]);
2050        let config = JsonDecoderConfig {
2051            type_mismatch: TypeMismatchStrategy::Reject,
2052            ..Default::default()
2053        };
2054        let decoder = JsonDecoder::with_config(schema, config);
2055        // A scalar where the list column expects an array must be rejected, not
2056        // silently coerced to NULL.
2057        let records = vec![json_record(r#"{"tags": "en"}"#)];
2058        let result = decoder.decode_batch(&records);
2059
2060        assert!(result.is_err());
2061        assert!(result.unwrap_err().to_string().contains("type mismatch"));
2062    }
2063
2064    // ── json.explode tests ──────────────────────────────────────
2065
2066    #[test]
2067    fn test_json_explode_arrays() {
2068        let schema = make_schema(vec![
2069            ("price", DataType::Utf8, true),
2070            ("qty", DataType::Utf8, true),
2071        ]);
2072        let config = JsonDecoderConfig {
2073            json_explode: Some(vec!["price".into(), "qty".into()]),
2074            ..Default::default()
2075        };
2076        let decoder = JsonDecoder::with_config(schema, config);
2077        let records = vec![json_record(r#"[["67523","1.5"],["67522","0.8"]]"#)];
2078        let batch = decoder.decode_batch(&records).unwrap();
2079        assert_eq!(batch.num_rows(), 2);
2080        assert_eq!(batch.column(0).as_string::<i32>().value(0), "67523");
2081        assert_eq!(batch.column(0).as_string::<i32>().value(1), "67522");
2082        assert_eq!(batch.column(1).as_string::<i32>().value(0), "1.5");
2083        assert_eq!(batch.column(1).as_string::<i32>().value(1), "0.8");
2084    }
2085
2086    #[test]
2087    fn test_json_explode_objects() {
2088        let schema = make_schema(vec![
2089            ("id", DataType::Int64, true),
2090            ("name", DataType::Utf8, true),
2091        ]);
2092        let config = JsonDecoderConfig {
2093            json_explode: Some(vec!["id".into(), "name".into()]),
2094            ..Default::default()
2095        };
2096        let decoder = JsonDecoder::with_config(schema, config);
2097        let records = vec![json_record(
2098            r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#,
2099        )];
2100        let batch = decoder.decode_batch(&records).unwrap();
2101        assert_eq!(batch.num_rows(), 2);
2102        assert_eq!(
2103            batch
2104                .column(0)
2105                .as_primitive::<arrow_array::types::Int64Type>()
2106                .value(0),
2107            1
2108        );
2109        assert_eq!(batch.column(1).as_string::<i32>().value(1), "Bob");
2110    }
2111
2112    // ── Combined json.path + json.explode ───────────────────────
2113
2114    #[test]
2115    fn test_json_path_plus_explode() {
2116        let schema = make_schema(vec![
2117            ("price", DataType::Utf8, true),
2118            ("qty", DataType::Utf8, true),
2119        ]);
2120        let config = JsonDecoderConfig {
2121            json_path: Some(vec!["bids".into()]),
2122            json_explode: Some(vec!["price".into(), "qty".into()]),
2123            ..Default::default()
2124        };
2125        let decoder = JsonDecoder::with_config(schema, config);
2126        let records = vec![json_record(r#"{"bids":[["67523","1.5"],["67522","0.8"]]}"#)];
2127        let batch = decoder.decode_batch(&records).unwrap();
2128        assert_eq!(batch.num_rows(), 2);
2129        assert_eq!(batch.column(0).as_string::<i32>().value(0), "67523");
2130        assert_eq!(batch.column(1).as_string::<i32>().value(1), "0.8");
2131    }
2132
2133    // ── from_connector_config tests ─────────────────────────────
2134
2135    #[test]
2136    fn test_from_connector_config() {
2137        let mut config = crate::config::ConnectorConfig::new("websocket");
2138        config.set("json.path", "data.trade");
2139        config.set("json.column.stream_name", "stream");
2140        config.set("json.column.ts", "meta.timestamp");
2141        config.set("json.explode", "price, qty");
2142        config.set("schema.enforcement", "strict");
2143        config.set("nested.as.jsonb", "true");
2144
2145        let cfg = JsonDecoderConfig::from_connector_config(&config);
2146        assert_eq!(cfg.json_path, Some(vec!["data".into(), "trade".into()]));
2147        assert_eq!(
2148            cfg.json_column_paths.get("stream_name"),
2149            Some(&vec!["stream".into()])
2150        );
2151        assert_eq!(
2152            cfg.json_column_paths.get("ts"),
2153            Some(&vec!["meta".into(), "timestamp".into()])
2154        );
2155        assert_eq!(cfg.json_explode, Some(vec!["price".into(), "qty".into()]));
2156        assert_eq!(cfg.type_mismatch, TypeMismatchStrategy::Reject);
2157        assert!(cfg.nested_as_jsonb);
2158    }
2159
2160    #[test]
2161    fn test_default_config_unchanged() {
2162        let schema = make_schema(vec![
2163            ("a", DataType::Int64, false),
2164            ("b", DataType::Utf8, true),
2165        ]);
2166        let decoder = JsonDecoder::new(schema);
2167        let records = vec![
2168            json_record(r#"{"a": 1, "b": "hello"}"#),
2169            json_record(r#"{"a": 2, "b": "world"}"#),
2170        ];
2171        let batch = decoder.decode_batch(&records).unwrap();
2172        assert_eq!(batch.num_rows(), 2);
2173        assert_eq!(
2174            batch
2175                .column(0)
2176                .as_primitive::<arrow_array::types::Int64Type>()
2177                .value(0),
2178            1
2179        );
2180        assert_eq!(batch.column(1).as_string::<i32>().value(1), "world");
2181    }
2182
2183    #[test]
2184    fn test_unknown_fields_with_path() {
2185        let schema = make_schema(vec![("a", DataType::Int64, false)]);
2186        let config = JsonDecoderConfig {
2187            json_path: Some(vec!["data".into()]),
2188            unknown_fields: UnknownFieldStrategy::CollectExtra,
2189            ..Default::default()
2190        };
2191        let decoder = JsonDecoder::with_config(schema, config);
2192        let records = vec![json_record(r#"{"data":{"a":1,"extra":"value"}}"#)];
2193        let batch = decoder.decode_batch(&records).unwrap();
2194        assert_eq!(batch.num_columns(), 2); // a + _extra
2195        assert_eq!(batch.schema().field(1).name(), "_extra");
2196        assert!(!batch.column(1).is_null(0));
2197    }
2198
2199    // ── Per-column numeric epoch unit ─────────────────────────
2200
2201    #[test]
2202    fn from_connector_config_parses_epoch_unit_without_polluting_paths() {
2203        use crate::config::ConnectorConfig;
2204        let mut props = std::collections::HashMap::new();
2205        props.insert("json.column.evt".to_string(), "time_us".to_string());
2206        props.insert(
2207            "json.column.evt.epoch_unit".to_string(),
2208            "micros".to_string(),
2209        );
2210        let cc = ConnectorConfig::with_properties("websocket", props);
2211        let cfg = JsonDecoderConfig::from_connector_config(&cc);
2212        assert_eq!(
2213            cfg.numeric_timestamp_units.get("evt"),
2214            Some(&EpochUnit::Micros)
2215        );
2216        // The `.epoch_unit` key must NOT become a phantom path column.
2217        assert!(cfg.json_column_paths.contains_key("evt"));
2218        assert!(!cfg.json_column_paths.contains_key("evt.epoch_unit"));
2219    }
2220
2221    #[test]
2222    fn numeric_micros_decodes_into_millis_timestamp_column() {
2223        // End-to-end: a numeric microsecond field (Jetstream time_us shape)
2224        // mapped to a Timestamp(ms) column lands at a wall-clock-plausible
2225        // millisecond value instead of being misread as epoch-millis.
2226        let schema = make_schema(vec![(
2227            "ts",
2228            DataType::Timestamp(TimeUnit::Millisecond, None),
2229            false,
2230        )]);
2231        let mut units = HashMap::new();
2232        units.insert("ts".to_string(), EpochUnit::Micros);
2233        let config = JsonDecoderConfig {
2234            numeric_timestamp_units: units,
2235            ..Default::default()
2236        };
2237        let decoder = JsonDecoder::with_config(schema, config);
2238        // 1_779_019_200_123_456 µs == 1_779_019_200_123 ms (~2026-05-17).
2239        let records = vec![json_record(r#"{"ts": 1779019200123456}"#)];
2240        let batch = decoder.decode_batch(&records).unwrap();
2241        let col = batch
2242            .column(0)
2243            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
2244        assert_eq!(col.value(0), 1_779_019_200_123);
2245
2246        // Without the override the same number is (legacy) read as millis.
2247        let schema2 = make_schema(vec![(
2248            "ts",
2249            DataType::Timestamp(TimeUnit::Millisecond, None),
2250            false,
2251        )]);
2252        let d2 = JsonDecoder::new(schema2);
2253        let b2 = d2.decode_batch(&records).unwrap();
2254        let c2 = b2
2255            .column(0)
2256            .as_primitive::<arrow_array::types::TimestampMillisecondType>();
2257        assert_eq!(c2.value(0), 1_779_019_200_123_456);
2258    }
2259}