Skip to main content

laminar_connectors/schema/csv/
decoder.rs

1//! CSV format decoder implementing [`FormatDecoder`].
2//!
3//! Converts raw CSV byte payloads into Arrow `RecordBatch`es.
//! Constructed once at `CREATE SOURCE` time with a frozen Arrow schema
//! and CSV format configuration. Both are immutable after construction
//! (the only mutable state is an atomic parse-error counter), so the
//! Ring 1 hot path has zero configuration lookups.
7//!
8//! Uses the `csv` crate's `ByteRecord` API for zero-copy field access
9//! where possible. Type coercion (string → int, string → timestamp, etc.)
10//! is performed during the Arrow builder append phase.
11
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14
15use arrow_array::builder::{
16    BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
17    TimestampNanosecondBuilder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, SchemaRef, TimeUnit};
21
22use crate::schema::error::{SchemaError, SchemaResult};
23use crate::schema::traits::FormatDecoder;
24use crate::schema::types::RawRecord;
25
/// Strategy for rows whose field count differs from the schema's column count.
///
/// The default is [`FieldCountMismatchStrategy::Null`], which is also what
/// `CsvDecoderConfig::default()` selects.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FieldCountMismatchStrategy {
    /// Pad missing fields with null, ignore extra fields. Default.
    #[default]
    Null,
    /// Skip the malformed row entirely.
    Skip,
    /// Return a decode error on the first malformed row.
    Reject,
}
36
/// CSV decoder configuration.
///
/// Maps directly to the SQL `FORMAT CSV (...)` options.
/// All fields have sensible defaults matching RFC 4180.
#[derive(Debug, Clone)]
pub struct CsvDecoderConfig {
    /// Field delimiter byte. Default: `b','` (comma).
    /// Common alternatives: `b'\t'` (tab), `b'|'` (pipe), `b';'` (semicolon).
    pub delimiter: u8,

    /// Quote byte for fields containing delimiters or newlines.
    /// Default: `Some(b'"')` (double quote). Set to `None` to disable quoting.
    pub quote: Option<u8>,

    /// Escape byte within quoted fields.
    /// Default: `None` (RFC 4180 uses doubled quote chars for escaping).
    /// Set to `Some(b'\\')` for backslash-escaped CSVs.
    pub escape: Option<u8>,

    /// Whether the first row is a header row with column names.
    /// Default: `true`.
    ///
    /// The header is discarded, not matched against the schema, and it is
    /// discarded on every `decode_batch` call: the first parsed row of each
    /// batch is dropped while this is `true`.
    pub has_header: bool,

    /// String value to interpret as SQL NULL.
    /// Default: `""` (empty string). Common alternatives: `"NA"`, `"null"`, `"\\N"`.
    ///
    /// Compared against each field after surrounding whitespace is trimmed.
    pub null_string: String,

    /// Comment line prefix. Lines starting with this byte are skipped.
    /// Default: `None` (no comment support).
    pub comment: Option<u8>,

    /// Number of rows to skip at the beginning of the data (after header).
    /// Default: `0`.
    ///
    /// The counter restarts on every `decode_batch` call.
    pub skip_rows: usize,

    /// Timestamp format pattern (chrono `strftime` syntax) for parsing
    /// timestamp columns. Default: `"%Y-%m-%d %H:%M:%S%.f"`.
    ///
    /// Values that do not match this pattern are retried with Arrow's
    /// timestamp string parser before being treated as parse errors.
    pub timestamp_format: String,

    /// Date format pattern (chrono `strftime` syntax) for parsing date columns.
    /// Default: `"%Y-%m-%d"`.
    pub date_format: String,

    /// How to handle rows with wrong number of fields.
    /// Default: `Null` (pad missing fields with null, truncate extra).
    pub field_count_mismatch: FieldCountMismatchStrategy,
}
84
85impl Default for CsvDecoderConfig {
86    fn default() -> Self {
87        Self {
88            delimiter: b',',
89            quote: Some(b'"'),
90            escape: None,
91            has_header: true,
92            null_string: String::new(),
93            comment: None,
94            skip_rows: 0,
95            timestamp_format: "%Y-%m-%d %H:%M:%S%.f".into(),
96            date_format: "%Y-%m-%d".into(),
97            field_count_mismatch: FieldCountMismatchStrategy::Null,
98        }
99    }
100}
101
/// Pre-computed coercion strategy for a single CSV column.
///
/// Chosen once per schema column at decoder construction, so the per-field
/// hot path only has to match on this enum.
#[derive(Debug, Clone)]
enum CsvCoercion {
    /// Parse as boolean. Accepted case-insensitively: `"true"`/`"false"`,
    /// `"1"`/`"0"`, `"yes"`/`"no"`, `"t"`/`"f"`, `"y"`/`"n"`.
    Boolean,
    /// Parse as i64. Used for every signed and unsigned integer schema type.
    Int64,
    /// Parse as f64. Used for every floating-point schema type.
    Float64,
    /// Parse as nanoseconds since the Unix epoch using the carried format
    /// string; the parsed naive date-time is interpreted as UTC.
    Timestamp(String),
    /// Parse as `Date32` (days since 1970-01-01) using the carried format string.
    Date(String),
    /// No coercion needed — keep as UTF-8 string.
    Utf8,
}
118
/// Decodes CSV byte payloads into Arrow `RecordBatch`es.
///
/// # Ring Placement
///
/// - **Ring 1**: `decode_batch()` — parse CSV, build columnar Arrow output
/// - **Ring 2**: Construction (`new` / `with_config`) — one-time setup
pub struct CsvDecoder {
    /// Frozen output schema.
    schema: SchemaRef,
    /// CSV format configuration.
    config: CsvDecoderConfig,
    /// Per-column coercion strategies, indexed by column position.
    /// Pre-computed at construction time so decoding never re-inspects the
    /// schema's data types per record.
    coercions: Vec<CsvCoercion>,
    /// Cumulative count of parse errors (for diagnostics).
    /// Updated through `&self` with relaxed atomics; increments include each
    /// field that fails coercion and each row dropped by the `Skip` strategy.
    parse_error_count: AtomicU64,
}
136
137#[allow(clippy::missing_fields_in_debug)]
138impl std::fmt::Debug for CsvDecoder {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        f.debug_struct("CsvDecoder")
141            .field("schema", &self.schema)
142            .field("config", &self.config)
143            .field(
144                "parse_error_count",
145                &self.parse_error_count.load(Ordering::Relaxed),
146            )
147            .finish()
148    }
149}
150
151impl CsvDecoder {
152    /// Creates a new CSV decoder for the given Arrow schema with default config.
153    #[must_use]
154    pub fn new(schema: SchemaRef) -> Self {
155        Self::with_config(schema, CsvDecoderConfig::default())
156    }
157
158    /// Creates a new CSV decoder with custom configuration.
159    #[must_use]
160    pub fn with_config(schema: SchemaRef, config: CsvDecoderConfig) -> Self {
161        let coercions: Vec<CsvCoercion> = schema
162            .fields()
163            .iter()
164            .map(|field| Self::coercion_for_type(field.data_type(), &config))
165            .collect();
166
167        Self {
168            schema,
169            config,
170            coercions,
171            parse_error_count: AtomicU64::new(0),
172        }
173    }
174
175    /// Returns the cumulative parse error count.
176    pub fn parse_error_count(&self) -> u64 {
177        self.parse_error_count.load(Ordering::Relaxed)
178    }
179
180    /// Determines the coercion strategy for an Arrow data type.
181    fn coercion_for_type(data_type: &DataType, config: &CsvDecoderConfig) -> CsvCoercion {
182        match data_type {
183            DataType::Boolean => CsvCoercion::Boolean,
184            DataType::Int8
185            | DataType::Int16
186            | DataType::Int32
187            | DataType::Int64
188            | DataType::UInt8
189            | DataType::UInt16
190            | DataType::UInt32
191            | DataType::UInt64 => CsvCoercion::Int64,
192            DataType::Float16 | DataType::Float32 | DataType::Float64 => CsvCoercion::Float64,
193            DataType::Timestamp(_, _) => CsvCoercion::Timestamp(config.timestamp_format.clone()),
194            DataType::Date32 | DataType::Date64 => CsvCoercion::Date(config.date_format.clone()),
195            _ => CsvCoercion::Utf8,
196        }
197    }
198
199    /// Builds a `csv::ReaderBuilder` from the decoder config.
200    fn make_reader_builder(&self) -> csv::ReaderBuilder {
201        let mut rb = csv::ReaderBuilder::new();
202        rb.delimiter(self.config.delimiter)
203            .has_headers(false) // We handle headers ourselves
204            .flexible(true); // Allow variable field counts
205
206        if let Some(q) = self.config.quote {
207            rb.quote(q);
208        }
209        if let Some(e) = self.config.escape {
210            rb.escape(Some(e));
211        }
212        if let Some(c) = self.config.comment {
213            rb.comment(Some(c));
214        }
215
216        rb
217    }
218}
219
220impl FormatDecoder for CsvDecoder {
221    fn output_schema(&self) -> SchemaRef {
222        self.schema.clone()
223    }
224
225    /// Decodes a batch of raw CSV records into an Arrow `RecordBatch`.
226    ///
227    /// Each `RawRecord.value` contains one or more CSV lines (typically one
228    /// line per record for streaming sources; may contain multiple lines
229    /// for file-based sources).
230    ///
231    /// # Algorithm
232    ///
233    /// 1. Initialize one Arrow `ArrayBuilder` per schema column.
234    /// 2. Concatenate all raw record bytes into a single buffer.
235    /// 3. Create a `csv::Reader` with pre-configured settings.
236    /// 4. For each CSV row:
237    ///    - Skip rows per `skip_rows` config.
238    ///    - For each field: apply the pre-computed `CsvCoercion`.
239    ///    - Handle field count mismatches per config.
240    /// 5. Finish all builders and assemble into `RecordBatch`.
241    fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
242        if records.is_empty() {
243            return Ok(RecordBatch::new_empty(self.schema.clone()));
244        }
245
246        let num_fields = self.schema.fields().len();
247        let capacity = records.len();
248
249        // Initialize one builder per schema column.
250        let mut builders = create_builders(&self.schema, capacity);
251
252        // Concatenate all raw record bytes, ensuring newline separation.
253        let mut combined = Vec::with_capacity(records.iter().map(|r| r.value.len() + 1).sum());
254        for record in records {
255            combined.extend_from_slice(&record.value);
256            if !record.value.ends_with(b"\n") {
257                combined.push(b'\n');
258            }
259        }
260
261        let rb = self.make_reader_builder();
262        let mut reader = rb.from_reader(combined.as_slice());
263
264        let mut rows_skipped = 0usize;
265        let mut header_skipped = false;
266        let mut row_count = 0usize;
267
268        let mut byte_record = csv::ByteRecord::new();
269        while reader
270            .read_byte_record(&mut byte_record)
271            .map_err(|e| SchemaError::DecodeError(format!("CSV parse error: {e}")))?
272        {
273            // Skip header row if configured.
274            if self.config.has_header && !header_skipped {
275                header_skipped = true;
276                continue;
277            }
278
279            // Skip initial data rows per config.
280            if rows_skipped < self.config.skip_rows {
281                rows_skipped += 1;
282                continue;
283            }
284
285            let field_count = byte_record.len();
286
287            // Handle field count mismatch.
288            if field_count != num_fields {
289                match self.config.field_count_mismatch {
290                    FieldCountMismatchStrategy::Reject => {
291                        return Err(SchemaError::DecodeError(format!(
292                            "field count mismatch: expected {num_fields}, got {field_count}"
293                        )));
294                    }
295                    FieldCountMismatchStrategy::Skip => {
296                        self.parse_error_count.fetch_add(1, Ordering::Relaxed);
297                        continue;
298                    }
299                    FieldCountMismatchStrategy::Null => {
300                        // Will pad/truncate below.
301                    }
302                }
303            }
304
305            // Process each column.
306            for col_idx in 0..num_fields {
307                if col_idx >= field_count {
308                    // Missing field — append null.
309                    append_null(&mut builders[col_idx]);
310                    continue;
311                }
312
313                let raw_field = &byte_record[col_idx];
314                let field_str = std::str::from_utf8(raw_field).unwrap_or("");
315                let trimmed = field_str.trim();
316
317                // Check for null string.
318                if trimmed == self.config.null_string {
319                    append_null(&mut builders[col_idx]);
320                    continue;
321                }
322
323                // Apply coercion.
324                let ok = append_coerced(&mut builders[col_idx], &self.coercions[col_idx], trimmed);
325
326                if !ok {
327                    self.parse_error_count.fetch_add(1, Ordering::Relaxed);
328                    append_null(&mut builders[col_idx]);
329                }
330            }
331
332            row_count += 1;
333        }
334
335        // If no data rows were processed, return empty batch.
336        if row_count == 0 {
337            return Ok(RecordBatch::new_empty(self.schema.clone()));
338        }
339
340        // Finish all builders into arrays.
341        let columns: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
342
343        RecordBatch::try_new(self.schema.clone(), columns)
344            .map_err(|e| SchemaError::DecodeError(format!("RecordBatch construction: {e}")))
345    }
346
347    #[allow(clippy::unnecessary_literal_bound)]
348    fn format_name(&self) -> &str {
349        "csv"
350    }
351}
352
353// ── Builder helpers ────────────────────────────────────────────────
354
/// Trait-object wrapper so we can store heterogeneous builders in a `Vec`.
///
/// Only the operations the decoder needs uniformly are exposed; typed
/// appends go through [`ColumnBuilder::as_any_mut`] and a downcast.
trait ColumnBuilder: Send {
    /// Finishes the builder and returns the accumulated values as an array.
    fn finish(&mut self) -> ArrayRef;
    /// Appends a single null slot.
    fn append_null_value(&mut self);
    /// Exposes the concrete builder as `Any` so callers can downcast to it.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
}
361
362macro_rules! impl_column_builder {
363    ($builder:ty) => {
364        impl ColumnBuilder for $builder {
365            fn finish(&mut self) -> ArrayRef {
366                Arc::new(<$builder>::finish(self))
367            }
368            fn append_null_value(&mut self) {
369                self.append_null();
370            }
371            fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
372                self
373            }
374        }
375    };
376}
377
378impl_column_builder!(BooleanBuilder);
379impl_column_builder!(Int64Builder);
380impl_column_builder!(Float64Builder);
381impl_column_builder!(StringBuilder);
382impl_column_builder!(TimestampNanosecondBuilder);
383impl_column_builder!(Date32Builder);
384
385fn create_builders(schema: &SchemaRef, capacity: usize) -> Vec<Box<dyn ColumnBuilder>> {
386    schema
387        .fields()
388        .iter()
389        .map(|f| create_builder(f.data_type(), capacity))
390        .collect()
391}
392
393fn create_builder(data_type: &DataType, capacity: usize) -> Box<dyn ColumnBuilder> {
394    match data_type {
395        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
396        DataType::Int8
397        | DataType::Int16
398        | DataType::Int32
399        | DataType::Int64
400        | DataType::UInt8
401        | DataType::UInt16
402        | DataType::UInt32
403        | DataType::UInt64 => Box::new(Int64Builder::with_capacity(capacity)),
404        DataType::Float16 | DataType::Float32 | DataType::Float64 => {
405            Box::new(Float64Builder::with_capacity(capacity))
406        }
407        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
408            let builder =
409                TimestampNanosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
410            Box::new(builder)
411        }
412        DataType::Date32 | DataType::Date64 => Box::new(Date32Builder::with_capacity(capacity)),
413        // Fallback: store as UTF-8 string.
414        _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)),
415    }
416}
417
/// Appends a single null slot to `builder`, whatever its concrete type.
fn append_null(builder: &mut Box<dyn ColumnBuilder>) {
    builder.append_null_value();
}
421
422/// Appends a coerced value to the appropriate builder. Returns `true` on
423/// success, `false` if the value could not be parsed.
424fn append_coerced(
425    builder: &mut Box<dyn ColumnBuilder>,
426    coercion: &CsvCoercion,
427    value: &str,
428) -> bool {
429    match coercion {
430        CsvCoercion::Boolean => {
431            let b = builder
432                .as_any_mut()
433                .downcast_mut::<BooleanBuilder>()
434                .unwrap();
435            match value.to_ascii_lowercase().as_str() {
436                "true" | "1" | "yes" | "t" | "y" => {
437                    b.append_value(true);
438                    true
439                }
440                "false" | "0" | "no" | "f" | "n" => {
441                    b.append_value(false);
442                    true
443                }
444                _ => false,
445            }
446        }
447        CsvCoercion::Int64 => {
448            let b = builder.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
449            match value.parse::<i64>() {
450                Ok(v) => {
451                    b.append_value(v);
452                    true
453                }
454                Err(_) => false,
455            }
456        }
457        CsvCoercion::Float64 => {
458            let b = builder
459                .as_any_mut()
460                .downcast_mut::<Float64Builder>()
461                .unwrap();
462            match value.parse::<f64>() {
463                Ok(v) => {
464                    b.append_value(v);
465                    true
466                }
467                Err(_) => false,
468            }
469        }
470        CsvCoercion::Timestamp(fmt) => {
471            let b = builder
472                .as_any_mut()
473                .downcast_mut::<TimestampNanosecondBuilder>()
474                .unwrap();
475            // Try the configured format first.
476            if let Ok(ndt) = chrono::NaiveDateTime::parse_from_str(value, fmt) {
477                let nanos = ndt.and_utc().timestamp_nanos_opt().unwrap_or(0);
478                b.append_value(nanos);
479                return true;
480            }
481            // Try ISO 8601 fallback.
482            if let Ok(nanos) = arrow_cast::parse::string_to_timestamp_nanos(value) {
483                b.append_value(nanos);
484                return true;
485            }
486            false
487        }
488        CsvCoercion::Date(fmt) => {
489            let b = builder
490                .as_any_mut()
491                .downcast_mut::<Date32Builder>()
492                .unwrap();
493            if let Ok(date) = chrono::NaiveDate::parse_from_str(value, fmt) {
494                // Date32 stores days since epoch (1970-01-01).
495                let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
496                let days = (date - epoch).num_days();
497                #[allow(clippy::cast_possible_truncation)]
498                {
499                    b.append_value(days as i32);
500                }
501                return true;
502            }
503            false
504        }
505        CsvCoercion::Utf8 => {
506            let b = builder
507                .as_any_mut()
508                .downcast_mut::<StringBuilder>()
509                .unwrap();
510            b.append_value(value);
511            true
512        }
513    }
514}
515
516#[cfg(test)]
517mod tests {
518    use super::*;
519    use arrow_array::cast::AsArray;
520    use arrow_schema::{Field, Schema};
521
522    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
523        Arc::new(Schema::new(
524            fields
525                .into_iter()
526                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
527                .collect::<Vec<_>>(),
528        ))
529    }
530
531    fn csv_record(line: &str) -> RawRecord {
532        RawRecord::new(line.as_bytes().to_vec())
533    }
534
535    fn csv_block(lines: &str) -> RawRecord {
536        RawRecord::new(lines.as_bytes().to_vec())
537    }
538
539    // ── Basic decode tests ────────────────────────────────────
540
541    #[test]
542    fn test_decode_empty_batch() {
543        let schema = make_schema(vec![("id", DataType::Int64, false)]);
544        let decoder = CsvDecoder::new(schema.clone());
545        let batch = decoder.decode_batch(&[]).unwrap();
546        assert_eq!(batch.num_rows(), 0);
547        assert_eq!(batch.schema(), schema);
548    }
549
550    #[test]
551    fn test_decode_single_row_with_header() {
552        let schema = make_schema(vec![
553            ("id", DataType::Int64, false),
554            ("name", DataType::Utf8, true),
555        ]);
556        let decoder = CsvDecoder::new(schema);
557        let records = vec![csv_block("id,name\n42,Alice")];
558        let batch = decoder.decode_batch(&records).unwrap();
559
560        assert_eq!(batch.num_rows(), 1);
561        assert_eq!(
562            batch
563                .column(0)
564                .as_primitive::<arrow_array::types::Int64Type>()
565                .value(0),
566            42
567        );
568        assert_eq!(batch.column(1).as_string::<i32>().value(0), "Alice");
569    }
570
571    #[test]
572    fn test_decode_multiple_rows() {
573        let schema = make_schema(vec![
574            ("x", DataType::Int64, false),
575            ("y", DataType::Float64, false),
576        ]);
577        let decoder = CsvDecoder::new(schema);
578        let records = vec![csv_block("x,y\n1,1.5\n2,2.5\n3,3.5")];
579        let batch = decoder.decode_batch(&records).unwrap();
580
581        assert_eq!(batch.num_rows(), 3);
582        let x_col = batch
583            .column(0)
584            .as_primitive::<arrow_array::types::Int64Type>();
585        assert_eq!(x_col.value(0), 1);
586        assert_eq!(x_col.value(1), 2);
587        assert_eq!(x_col.value(2), 3);
588    }
589
590    #[test]
591    fn test_decode_all_types() {
592        let schema = make_schema(vec![
593            ("bool_col", DataType::Boolean, false),
594            ("int_col", DataType::Int64, false),
595            ("float_col", DataType::Float64, false),
596            ("str_col", DataType::Utf8, false),
597        ]);
598        let decoder = CsvDecoder::new(schema);
599        let records = vec![csv_block(
600            "bool_col,int_col,float_col,str_col\ntrue,42,3.14,hello",
601        )];
602        let batch = decoder.decode_batch(&records).unwrap();
603
604        assert_eq!(batch.num_rows(), 1);
605        assert!(batch.column(0).as_boolean().value(0));
606        assert_eq!(
607            batch
608                .column(1)
609                .as_primitive::<arrow_array::types::Int64Type>()
610                .value(0),
611            42
612        );
613        let f = batch
614            .column(2)
615            .as_primitive::<arrow_array::types::Float64Type>()
616            .value(0);
617        assert!((f - 3.14).abs() < f64::EPSILON);
618        assert_eq!(batch.column(3).as_string::<i32>().value(0), "hello");
619    }
620
621    // ── Null handling ─────────────────────────────────────────
622
623    #[test]
624    fn test_decode_null_string_default() {
625        // Default null_string is empty string.
626        let schema = make_schema(vec![
627            ("a", DataType::Int64, true),
628            ("b", DataType::Utf8, true),
629        ]);
630        let decoder = CsvDecoder::new(schema);
631        let records = vec![csv_block("a,b\n,")];
632        let batch = decoder.decode_batch(&records).unwrap();
633
634        assert!(batch.column(0).is_null(0));
635        assert!(batch.column(1).is_null(0));
636    }
637
638    #[test]
639    fn test_decode_null_string_custom() {
640        let schema = make_schema(vec![("val", DataType::Int64, true)]);
641        let config = CsvDecoderConfig {
642            null_string: "NA".into(),
643            ..Default::default()
644        };
645        let decoder = CsvDecoder::with_config(schema, config);
646        let records = vec![csv_block("val\nNA\n42")];
647        let batch = decoder.decode_batch(&records).unwrap();
648
649        assert_eq!(batch.num_rows(), 2);
650        assert!(batch.column(0).is_null(0));
651        assert_eq!(
652            batch
653                .column(0)
654                .as_primitive::<arrow_array::types::Int64Type>()
655                .value(1),
656            42
657        );
658    }
659
660    // ── Field count mismatch strategies ───────────────────────
661
662    #[test]
663    fn test_mismatch_null_strategy() {
664        let schema = make_schema(vec![
665            ("a", DataType::Int64, true),
666            ("b", DataType::Utf8, true),
667            ("c", DataType::Int64, true),
668        ]);
669        let decoder = CsvDecoder::new(schema);
670        // Row only has 2 fields, schema expects 3.
671        let records = vec![csv_block("a,b,c\n1,hello")];
672        let batch = decoder.decode_batch(&records).unwrap();
673
674        assert_eq!(batch.num_rows(), 1);
675        assert_eq!(
676            batch
677                .column(0)
678                .as_primitive::<arrow_array::types::Int64Type>()
679                .value(0),
680            1
681        );
682        assert_eq!(batch.column(1).as_string::<i32>().value(0), "hello");
683        assert!(batch.column(2).is_null(0)); // padded with null
684    }
685
686    #[test]
687    fn test_mismatch_skip_strategy() {
688        let schema = make_schema(vec![
689            ("a", DataType::Int64, false),
690            ("b", DataType::Int64, false),
691        ]);
692        let config = CsvDecoderConfig {
693            field_count_mismatch: FieldCountMismatchStrategy::Skip,
694            ..Default::default()
695        };
696        let decoder = CsvDecoder::with_config(schema, config);
697        // One good row, one bad row (too few fields).
698        let records = vec![csv_block("a,b\n1,2\n3")];
699        let batch = decoder.decode_batch(&records).unwrap();
700
701        assert_eq!(batch.num_rows(), 1); // bad row skipped
702        assert_eq!(
703            batch
704                .column(0)
705                .as_primitive::<arrow_array::types::Int64Type>()
706                .value(0),
707            1
708        );
709    }
710
711    #[test]
712    fn test_mismatch_reject_strategy() {
713        let schema = make_schema(vec![
714            ("a", DataType::Int64, false),
715            ("b", DataType::Int64, false),
716        ]);
717        let config = CsvDecoderConfig {
718            field_count_mismatch: FieldCountMismatchStrategy::Reject,
719            ..Default::default()
720        };
721        let decoder = CsvDecoder::with_config(schema, config);
722        let records = vec![csv_block("a,b\n1")]; // too few fields
723        let result = decoder.decode_batch(&records);
724
725        assert!(result.is_err());
726        assert!(result
727            .unwrap_err()
728            .to_string()
729            .contains("field count mismatch"));
730    }
731
732    // ── Delimiter options ─────────────────────────────────────
733
734    #[test]
735    fn test_pipe_delimiter() {
736        let schema = make_schema(vec![
737            ("a", DataType::Int64, false),
738            ("b", DataType::Utf8, false),
739        ]);
740        let config = CsvDecoderConfig {
741            delimiter: b'|',
742            ..Default::default()
743        };
744        let decoder = CsvDecoder::with_config(schema, config);
745        let records = vec![csv_block("a|b\n42|hello")];
746        let batch = decoder.decode_batch(&records).unwrap();
747
748        assert_eq!(
749            batch
750                .column(0)
751                .as_primitive::<arrow_array::types::Int64Type>()
752                .value(0),
753            42
754        );
755        assert_eq!(batch.column(1).as_string::<i32>().value(0), "hello");
756    }
757
758    #[test]
759    fn test_tab_delimiter() {
760        let schema = make_schema(vec![
761            ("a", DataType::Int64, false),
762            ("b", DataType::Utf8, false),
763        ]);
764        let config = CsvDecoderConfig {
765            delimiter: b'\t',
766            ..Default::default()
767        };
768        let decoder = CsvDecoder::with_config(schema, config);
769        let records = vec![csv_block("a\tb\n42\thello")];
770        let batch = decoder.decode_batch(&records).unwrap();
771
772        assert_eq!(
773            batch
774                .column(0)
775                .as_primitive::<arrow_array::types::Int64Type>()
776                .value(0),
777            42
778        );
779        assert_eq!(batch.column(1).as_string::<i32>().value(0), "hello");
780    }
781
782    #[test]
783    fn test_semicolon_delimiter() {
784        let schema = make_schema(vec![
785            ("a", DataType::Int64, false),
786            ("b", DataType::Utf8, false),
787        ]);
788        let config = CsvDecoderConfig {
789            delimiter: b';',
790            ..Default::default()
791        };
792        let decoder = CsvDecoder::with_config(schema, config);
793        let records = vec![csv_block("a;b\n99;world")];
794        let batch = decoder.decode_batch(&records).unwrap();
795
796        assert_eq!(
797            batch
798                .column(0)
799                .as_primitive::<arrow_array::types::Int64Type>()
800                .value(0),
801            99
802        );
803        assert_eq!(batch.column(1).as_string::<i32>().value(0), "world");
804    }
805
806    // ── Comment lines ─────────────────────────────────────────
807
808    #[test]
809    fn test_comment_lines() {
810        let schema = make_schema(vec![("val", DataType::Int64, false)]);
811        let config = CsvDecoderConfig {
812            comment: Some(b'#'),
813            ..Default::default()
814        };
815        let decoder = CsvDecoder::with_config(schema, config);
816        let records = vec![csv_block("val\n# this is a comment\n42\n# another\n99")];
817        let batch = decoder.decode_batch(&records).unwrap();
818
819        assert_eq!(batch.num_rows(), 2);
820        let col = batch
821            .column(0)
822            .as_primitive::<arrow_array::types::Int64Type>();
823        assert_eq!(col.value(0), 42);
824        assert_eq!(col.value(1), 99);
825    }
826
827    // ── Skip rows ─────────────────────────────────────────────
828
829    #[test]
830    fn test_skip_rows() {
831        let schema = make_schema(vec![("val", DataType::Int64, false)]);
832        let config = CsvDecoderConfig {
833            skip_rows: 2,
834            ..Default::default()
835        };
836        let decoder = CsvDecoder::with_config(schema, config);
837        let records = vec![csv_block("val\nskip1\nskip2\n42\n99")];
838        let batch = decoder.decode_batch(&records).unwrap();
839
840        // "skip1" and "skip2" are skipped (parse errors counted), then 42 and 99.
841        // Actually skip_rows skips first N data rows; skip1/skip2 aren't valid i64
842        // so they'd be parse errors. But the skip_rows logic skips before type
843        // coercion, so they won't generate errors.
844        assert_eq!(batch.num_rows(), 2);
845    }
846
847    // ── No header mode ────────────────────────────────────────
848
849    #[test]
850    fn test_no_header() {
851        let schema = make_schema(vec![
852            ("col0", DataType::Int64, false),
853            ("col1", DataType::Utf8, false),
854        ]);
855        let config = CsvDecoderConfig {
856            has_header: false,
857            ..Default::default()
858        };
859        let decoder = CsvDecoder::with_config(schema, config);
860        let records = vec![csv_block("1,alpha\n2,beta")];
861        let batch = decoder.decode_batch(&records).unwrap();
862
863        assert_eq!(batch.num_rows(), 2);
864        let col0 = batch
865            .column(0)
866            .as_primitive::<arrow_array::types::Int64Type>();
867        assert_eq!(col0.value(0), 1);
868        assert_eq!(col0.value(1), 2);
869    }
870
871    // ── Multiple records (streaming) ──────────────────────────
872
873    #[test]
874    fn test_multiple_raw_records() {
875        // Simulate streaming: each RawRecord is one CSV line (no header).
876        let schema = make_schema(vec![
877            ("id", DataType::Int64, false),
878            ("val", DataType::Float64, false),
879        ]);
880        let config = CsvDecoderConfig {
881            has_header: false,
882            ..Default::default()
883        };
884        let decoder = CsvDecoder::with_config(schema, config);
885        let records = vec![
886            csv_record("1,1.5"),
887            csv_record("2,2.5"),
888            csv_record("3,3.5"),
889        ];
890        let batch = decoder.decode_batch(&records).unwrap();
891
892        assert_eq!(batch.num_rows(), 3);
893        let id_col = batch
894            .column(0)
895            .as_primitive::<arrow_array::types::Int64Type>();
896        let val_col = batch
897            .column(1)
898            .as_primitive::<arrow_array::types::Float64Type>();
899        assert_eq!(id_col.value(0), 1);
900        assert_eq!(id_col.value(2), 3);
901        assert!((val_col.value(1) - 2.5).abs() < f64::EPSILON);
902    }
903
904    // ── Quoted fields ─────────────────────────────────────────
905
906    #[test]
907    fn test_quoted_fields_with_delimiter() {
908        let schema = make_schema(vec![
909            ("name", DataType::Utf8, false),
910            ("desc", DataType::Utf8, false),
911        ]);
912        let decoder = CsvDecoder::new(schema);
913        let records = vec![csv_block("name,desc\n\"Smith, John\",\"A, B\"")];
914        let batch = decoder.decode_batch(&records).unwrap();
915
916        assert_eq!(batch.num_rows(), 1);
917        assert_eq!(batch.column(0).as_string::<i32>().value(0), "Smith, John");
918        assert_eq!(batch.column(1).as_string::<i32>().value(0), "A, B");
919    }
920
921    #[test]
922    fn test_quoted_fields_with_newline() {
923        let schema = make_schema(vec![
924            ("id", DataType::Int64, false),
925            ("text", DataType::Utf8, false),
926        ]);
927        let decoder = CsvDecoder::new(schema);
928        let records = vec![csv_block("id,text\n1,\"line1\nline2\"")];
929        let batch = decoder.decode_batch(&records).unwrap();
930
931        assert_eq!(batch.num_rows(), 1);
932        assert_eq!(batch.column(1).as_string::<i32>().value(0), "line1\nline2");
933    }
934
935    #[test]
936    fn test_escaped_quotes_rfc4180() {
937        // RFC 4180: doubled quotes within quoted field.
938        let schema = make_schema(vec![("val", DataType::Utf8, false)]);
939        let decoder = CsvDecoder::new(schema);
940        let records = vec![csv_block("val\n\"She said \"\"hello\"\"\"")];
941        let batch = decoder.decode_batch(&records).unwrap();
942
943        assert_eq!(batch.num_rows(), 1);
944        assert_eq!(
945            batch.column(0).as_string::<i32>().value(0),
946            "She said \"hello\""
947        );
948    }
949
950    // ── Timestamp parsing ─────────────────────────────────────
951
952    #[test]
953    fn test_decode_timestamp() {
954        let schema = make_schema(vec![(
955            "ts",
956            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
957            false,
958        )]);
959        let decoder = CsvDecoder::new(schema);
960        let records = vec![csv_block("ts\n2025-01-15 10:30:00.000")];
961        let batch = decoder.decode_batch(&records).unwrap();
962
963        assert_eq!(batch.num_rows(), 1);
964        assert!(!batch.column(0).is_null(0));
965    }
966
967    #[test]
968    fn test_decode_timestamp_iso8601_fallback() {
969        let schema = make_schema(vec![(
970            "ts",
971            DataType::Timestamp(TimeUnit::Nanosecond, None),
972            false,
973        )]);
974        let decoder = CsvDecoder::new(schema);
975        let records = vec![csv_block("ts\n2025-01-15T10:30:00Z")];
976        let batch = decoder.decode_batch(&records).unwrap();
977
978        assert_eq!(batch.num_rows(), 1);
979        assert!(!batch.column(0).is_null(0));
980    }
981
982    // ── Date parsing ──────────────────────────────────────────
983
984    #[test]
985    fn test_decode_date() {
986        let schema = make_schema(vec![("d", DataType::Date32, false)]);
987        let decoder = CsvDecoder::new(schema);
988        let records = vec![csv_block("d\n2025-06-15")];
989        let batch = decoder.decode_batch(&records).unwrap();
990
991        assert_eq!(batch.num_rows(), 1);
992        assert!(!batch.column(0).is_null(0));
993        // 2025-06-15 is day 20254 since epoch.
994        let days = batch
995            .column(0)
996            .as_primitive::<arrow_array::types::Date32Type>()
997            .value(0);
998        let expected = chrono::NaiveDate::from_ymd_opt(2025, 6, 15)
999            .unwrap()
1000            .signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
1001            .num_days();
1002        #[allow(clippy::cast_possible_truncation)]
1003        {
1004            assert_eq!(days, expected as i32);
1005        }
1006    }
1007
1008    // ── Boolean parsing ───────────────────────────────────────
1009
1010    #[test]
1011    fn test_decode_boolean_variants() {
1012        let schema = make_schema(vec![("b", DataType::Boolean, false)]);
1013        let config = CsvDecoderConfig {
1014            has_header: false,
1015            ..Default::default()
1016        };
1017        let decoder = CsvDecoder::with_config(schema, config);
1018        let records = vec![csv_block("true\nfalse\n1\n0\nyes\nno\nt\nf\ny\nn")];
1019        let batch = decoder.decode_batch(&records).unwrap();
1020
1021        assert_eq!(batch.num_rows(), 10);
1022        let col = batch.column(0).as_boolean();
1023        assert!(col.value(0)); // true
1024        assert!(!col.value(1)); // false
1025        assert!(col.value(2)); // 1
1026        assert!(!col.value(3)); // 0
1027        assert!(col.value(4)); // yes
1028        assert!(!col.value(5)); // no
1029        assert!(col.value(6)); // t
1030        assert!(!col.value(7)); // f
1031        assert!(col.value(8)); // y
1032        assert!(!col.value(9)); // n
1033    }
1034
1035    // ── Parse error counting ──────────────────────────────────
1036
1037    #[test]
1038    fn test_parse_error_count() {
1039        let schema = make_schema(vec![("val", DataType::Int64, true)]);
1040        let decoder = CsvDecoder::new(schema);
1041        let records = vec![csv_block("val\nnot_a_number\n42\nalso_bad")];
1042        let batch = decoder.decode_batch(&records).unwrap();
1043
1044        assert_eq!(batch.num_rows(), 3);
1045        assert!(batch.column(0).is_null(0));
1046        assert_eq!(
1047            batch
1048                .column(0)
1049                .as_primitive::<arrow_array::types::Int64Type>()
1050                .value(1),
1051            42
1052        );
1053        assert!(batch.column(0).is_null(2));
1054        assert_eq!(decoder.parse_error_count(), 2);
1055    }
1056
1057    // ── Extra fields ignored ──────────────────────────────────
1058
1059    #[test]
1060    fn test_extra_fields_truncated() {
1061        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1062        let decoder = CsvDecoder::new(schema);
1063        // Row has 3 fields but schema only has 1.
1064        let records = vec![csv_block("a\n42,extra1,extra2")];
1065        let batch = decoder.decode_batch(&records).unwrap();
1066
1067        // Extra fields silently ignored (flexible mode).
1068        // field_count (3) != num_fields (1), but Null strategy just pads/truncates.
1069        assert_eq!(batch.num_rows(), 1);
1070        assert_eq!(
1071            batch
1072                .column(0)
1073                .as_primitive::<arrow_array::types::Int64Type>()
1074                .value(0),
1075            42
1076        );
1077    }
1078
1079    // ── FormatDecoder trait ───────────────────────────────────
1080
1081    #[test]
1082    fn test_format_name() {
1083        let schema = make_schema(vec![("a", DataType::Int64, false)]);
1084        let decoder = CsvDecoder::new(schema);
1085        assert_eq!(decoder.format_name(), "csv");
1086    }
1087
1088    #[test]
1089    fn test_output_schema() {
1090        let schema = make_schema(vec![
1091            ("a", DataType::Int64, false),
1092            ("b", DataType::Utf8, true),
1093        ]);
1094        let decoder = CsvDecoder::new(schema.clone());
1095        assert_eq!(decoder.output_schema(), schema);
1096    }
1097
1098    #[test]
1099    fn test_decode_one() {
1100        let schema = make_schema(vec![("x", DataType::Int64, false)]);
1101        let config = CsvDecoderConfig {
1102            has_header: false,
1103            ..Default::default()
1104        };
1105        let decoder = CsvDecoder::with_config(schema, config);
1106        let record = csv_record("99");
1107        let batch = decoder.decode_one(&record).unwrap();
1108        assert_eq!(batch.num_rows(), 1);
1109        assert_eq!(
1110            batch
1111                .column(0)
1112                .as_primitive::<arrow_array::types::Int64Type>()
1113                .value(0),
1114            99
1115        );
1116    }
1117
1118    // ── Edge cases ────────────────────────────────────────────
1119
1120    #[test]
1121    fn test_mixed_line_endings() {
1122        let schema = make_schema(vec![("val", DataType::Int64, false)]);
1123        let config = CsvDecoderConfig {
1124            has_header: false,
1125            ..Default::default()
1126        };
1127        let decoder = CsvDecoder::with_config(schema, config);
1128        let records = vec![csv_block("1\r\n2\n3\r\n")];
1129        let batch = decoder.decode_batch(&records).unwrap();
1130        assert_eq!(batch.num_rows(), 3);
1131    }
1132
1133    #[test]
1134    fn test_unicode_values() {
1135        let schema = make_schema(vec![("name", DataType::Utf8, false)]);
1136        let decoder = CsvDecoder::new(schema);
1137        let records = vec![csv_block("name\nこんにちは\nüber\nnaïve")];
1138        let batch = decoder.decode_batch(&records).unwrap();
1139
1140        assert_eq!(batch.num_rows(), 3);
1141        assert_eq!(batch.column(0).as_string::<i32>().value(0), "こんにちは");
1142        assert_eq!(batch.column(0).as_string::<i32>().value(1), "über");
1143        assert_eq!(batch.column(0).as_string::<i32>().value(2), "naïve");
1144    }
1145
1146    #[test]
1147    fn test_trailing_comma() {
1148        // Trailing comma creates an extra empty field.
1149        let schema = make_schema(vec![
1150            ("a", DataType::Int64, false),
1151            ("b", DataType::Int64, true),
1152        ]);
1153        let decoder = CsvDecoder::new(schema);
1154        let records = vec![csv_block("a,b\n1,")];
1155        let batch = decoder.decode_batch(&records).unwrap();
1156
1157        assert_eq!(batch.num_rows(), 1);
1158        assert_eq!(
1159            batch
1160                .column(0)
1161                .as_primitive::<arrow_array::types::Int64Type>()
1162                .value(0),
1163            1
1164        );
1165        // Empty string matches default null_string → null.
1166        assert!(batch.column(1).is_null(0));
1167    }
1168
1169    #[test]
1170    fn test_backslash_escape() {
1171        let schema = make_schema(vec![("val", DataType::Utf8, false)]);
1172        let config = CsvDecoderConfig {
1173            escape: Some(b'\\'),
1174            ..Default::default()
1175        };
1176        let decoder = CsvDecoder::with_config(schema, config);
1177        let records = vec![csv_block("val\n\"hello \\\"world\\\"\"")];
1178        let batch = decoder.decode_batch(&records).unwrap();
1179
1180        assert_eq!(batch.num_rows(), 1);
1181        assert_eq!(
1182            batch.column(0).as_string::<i32>().value(0),
1183            "hello \"world\""
1184        );
1185    }
1186}