1use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14
15use arrow_array::builder::{
16 BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
17 TimestampNanosecondBuilder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, SchemaRef, TimeUnit};
21
22use crate::schema::error::{SchemaError, SchemaResult};
23use crate::schema::traits::FormatDecoder;
24use crate::schema::types::RawRecord;
25
/// Policy applied by the CSV decoder when a row's field count differs from
/// the number of fields in the schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldCountMismatchStrategy {
    /// Keep the row: schema columns with no corresponding field are appended
    /// as null, and fields beyond the schema width are ignored.
    Null,
    /// Drop the row and increment the decoder's parse-error counter.
    Skip,
    /// Abort decoding of the whole batch with a decode error.
    Reject,
}
36
/// Tunable parsing options for the CSV decoder.
#[derive(Debug, Clone)]
pub struct CsvDecoderConfig {
    /// Field delimiter byte handed to the CSV reader.
    pub delimiter: u8,

    /// Quote byte handed to the CSV reader when `Some`.
    pub quote: Option<u8>,

    /// Escape byte handed to the CSV reader when `Some`.
    pub escape: Option<u8>,

    /// When `true`, the first row read in each `decode_batch` call is
    /// discarded as a header row.
    pub has_header: bool,

    /// A field whose whitespace-trimmed text equals this string is decoded
    /// as null.
    pub null_string: String,

    /// Comment marker byte handed to the CSV reader when `Some`.
    pub comment: Option<u8>,

    /// Number of rows discarded in each `decode_batch` call, counted after
    /// the header row (if any) has been dropped.
    pub skip_rows: usize,

    /// `chrono` format string tried first when parsing timestamp columns.
    pub timestamp_format: String,

    /// `chrono` format string used when parsing date columns.
    pub date_format: String,

    /// What to do with rows whose field count differs from the schema's.
    pub field_count_mismatch: FieldCountMismatchStrategy,
}
84
85impl Default for CsvDecoderConfig {
86 fn default() -> Self {
87 Self {
88 delimiter: b',',
89 quote: Some(b'"'),
90 escape: None,
91 has_header: true,
92 null_string: String::new(),
93 comment: None,
94 skip_rows: 0,
95 timestamp_format: "%Y-%m-%d %H:%M:%S%.f".into(),
96 date_format: "%Y-%m-%d".into(),
97 field_count_mismatch: FieldCountMismatchStrategy::Null,
98 }
99 }
100}
101
/// How the text of a CSV field is converted before being appended to its
/// column builder. One value is precomputed per schema field.
#[derive(Debug, Clone)]
enum CsvCoercion {
    /// Parse as a boolean literal.
    Boolean,
    /// Parse as a signed 64-bit integer.
    Int64,
    /// Parse as a 64-bit float.
    Float64,
    /// Parse as a timestamp; the payload is the `chrono` format string tried
    /// first.
    Timestamp(String),
    /// Parse as a calendar date; the payload is the `chrono` format string.
    Date(String),
    /// Store the text as-is.
    Utf8,
}
118
/// Decodes raw CSV bytes into Arrow record batches that follow a fixed schema.
pub struct CsvDecoder {
    /// Schema attached to every batch this decoder returns.
    schema: SchemaRef,
    /// Parsing options.
    config: CsvDecoderConfig,
    /// Per-column coercion, in schema field order; computed once at
    /// construction.
    coercions: Vec<CsvCoercion>,
    /// Running count of parse errors; it is only ever incremented, so it
    /// accumulates across `decode_batch` calls.
    parse_error_count: AtomicU64,
}
136
137#[allow(clippy::missing_fields_in_debug)]
138impl std::fmt::Debug for CsvDecoder {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 f.debug_struct("CsvDecoder")
141 .field("schema", &self.schema)
142 .field("config", &self.config)
143 .field(
144 "parse_error_count",
145 &self.parse_error_count.load(Ordering::Relaxed),
146 )
147 .finish()
148 }
149}
150
151impl CsvDecoder {
152 #[must_use]
154 pub fn new(schema: SchemaRef) -> Self {
155 Self::with_config(schema, CsvDecoderConfig::default())
156 }
157
158 #[must_use]
160 pub fn with_config(schema: SchemaRef, config: CsvDecoderConfig) -> Self {
161 let coercions: Vec<CsvCoercion> = schema
162 .fields()
163 .iter()
164 .map(|field| Self::coercion_for_type(field.data_type(), &config))
165 .collect();
166
167 Self {
168 schema,
169 config,
170 coercions,
171 parse_error_count: AtomicU64::new(0),
172 }
173 }
174
175 pub fn parse_error_count(&self) -> u64 {
177 self.parse_error_count.load(Ordering::Relaxed)
178 }
179
180 fn coercion_for_type(data_type: &DataType, config: &CsvDecoderConfig) -> CsvCoercion {
182 match data_type {
183 DataType::Boolean => CsvCoercion::Boolean,
184 DataType::Int8
185 | DataType::Int16
186 | DataType::Int32
187 | DataType::Int64
188 | DataType::UInt8
189 | DataType::UInt16
190 | DataType::UInt32
191 | DataType::UInt64 => CsvCoercion::Int64,
192 DataType::Float16 | DataType::Float32 | DataType::Float64 => CsvCoercion::Float64,
193 DataType::Timestamp(_, _) => CsvCoercion::Timestamp(config.timestamp_format.clone()),
194 DataType::Date32 | DataType::Date64 => CsvCoercion::Date(config.date_format.clone()),
195 _ => CsvCoercion::Utf8,
196 }
197 }
198
199 fn make_reader_builder(&self) -> csv::ReaderBuilder {
201 let mut rb = csv::ReaderBuilder::new();
202 rb.delimiter(self.config.delimiter)
203 .has_headers(false) .flexible(true); if let Some(q) = self.config.quote {
207 rb.quote(q);
208 }
209 if let Some(e) = self.config.escape {
210 rb.escape(Some(e));
211 }
212 if let Some(c) = self.config.comment {
213 rb.comment(Some(c));
214 }
215
216 rb
217 }
218}
219
220impl FormatDecoder for CsvDecoder {
221 fn output_schema(&self) -> SchemaRef {
222 self.schema.clone()
223 }
224
225 fn decode_batch(&self, records: &[RawRecord]) -> SchemaResult<RecordBatch> {
242 if records.is_empty() {
243 return Ok(RecordBatch::new_empty(self.schema.clone()));
244 }
245
246 let num_fields = self.schema.fields().len();
247 let capacity = records.len();
248
249 let mut builders = create_builders(&self.schema, capacity);
251
252 let mut combined = Vec::with_capacity(records.iter().map(|r| r.value.len() + 1).sum());
254 for record in records {
255 combined.extend_from_slice(&record.value);
256 if !record.value.ends_with(b"\n") {
257 combined.push(b'\n');
258 }
259 }
260
261 let rb = self.make_reader_builder();
262 let mut reader = rb.from_reader(combined.as_slice());
263
264 let mut rows_skipped = 0usize;
265 let mut header_skipped = false;
266 let mut row_count = 0usize;
267
268 let mut byte_record = csv::ByteRecord::new();
269 while reader
270 .read_byte_record(&mut byte_record)
271 .map_err(|e| SchemaError::DecodeError(format!("CSV parse error: {e}")))?
272 {
273 if self.config.has_header && !header_skipped {
275 header_skipped = true;
276 continue;
277 }
278
279 if rows_skipped < self.config.skip_rows {
281 rows_skipped += 1;
282 continue;
283 }
284
285 let field_count = byte_record.len();
286
287 if field_count != num_fields {
289 match self.config.field_count_mismatch {
290 FieldCountMismatchStrategy::Reject => {
291 return Err(SchemaError::DecodeError(format!(
292 "field count mismatch: expected {num_fields}, got {field_count}"
293 )));
294 }
295 FieldCountMismatchStrategy::Skip => {
296 self.parse_error_count.fetch_add(1, Ordering::Relaxed);
297 continue;
298 }
299 FieldCountMismatchStrategy::Null => {
300 }
302 }
303 }
304
305 for col_idx in 0..num_fields {
307 if col_idx >= field_count {
308 append_null(&mut builders[col_idx]);
310 continue;
311 }
312
313 let raw_field = &byte_record[col_idx];
314 let field_str = std::str::from_utf8(raw_field).unwrap_or("");
315 let trimmed = field_str.trim();
316
317 if trimmed == self.config.null_string {
319 append_null(&mut builders[col_idx]);
320 continue;
321 }
322
323 let ok = append_coerced(&mut builders[col_idx], &self.coercions[col_idx], trimmed);
325
326 if !ok {
327 self.parse_error_count.fetch_add(1, Ordering::Relaxed);
328 append_null(&mut builders[col_idx]);
329 }
330 }
331
332 row_count += 1;
333 }
334
335 if row_count == 0 {
337 return Ok(RecordBatch::new_empty(self.schema.clone()));
338 }
339
340 let columns: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
342
343 RecordBatch::try_new(self.schema.clone(), columns)
344 .map_err(|e| SchemaError::DecodeError(format!("RecordBatch construction: {e}")))
345 }
346
347 #[allow(clippy::unnecessary_literal_bound)]
348 fn format_name(&self) -> &str {
349 "csv"
350 }
351}
352
/// Type-erased view of an Arrow array builder, so builders of different
/// concrete types can live in one `Vec`.
trait ColumnBuilder: Send {
    /// Finishes the builder and returns the built array.
    fn finish(&mut self) -> ArrayRef;
    /// Appends a null slot.
    fn append_null_value(&mut self);
    /// Exposes the concrete builder for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
}
361
/// Implements `ColumnBuilder` for a concrete Arrow builder type by forwarding
/// to that builder's own `finish` and `append_null`.
macro_rules! impl_column_builder {
    ($builder:ty) => {
        impl ColumnBuilder for $builder {
            fn finish(&mut self) -> ArrayRef {
                // Type-qualified call: invokes the builder's own `finish` and
                // wraps the concrete array in an `Arc`.
                Arc::new(<$builder>::finish(self))
            }
            fn append_null_value(&mut self) {
                self.append_null();
            }
            fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
                self
            }
        }
    };
}

// One implementation per builder type that `create_builder` can return.
impl_column_builder!(BooleanBuilder);
impl_column_builder!(Int64Builder);
impl_column_builder!(Float64Builder);
impl_column_builder!(StringBuilder);
impl_column_builder!(TimestampNanosecondBuilder);
impl_column_builder!(Date32Builder);
384
385fn create_builders(schema: &SchemaRef, capacity: usize) -> Vec<Box<dyn ColumnBuilder>> {
386 schema
387 .fields()
388 .iter()
389 .map(|f| create_builder(f.data_type(), capacity))
390 .collect()
391}
392
393fn create_builder(data_type: &DataType, capacity: usize) -> Box<dyn ColumnBuilder> {
394 match data_type {
395 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
396 DataType::Int8
397 | DataType::Int16
398 | DataType::Int32
399 | DataType::Int64
400 | DataType::UInt8
401 | DataType::UInt16
402 | DataType::UInt32
403 | DataType::UInt64 => Box::new(Int64Builder::with_capacity(capacity)),
404 DataType::Float16 | DataType::Float32 | DataType::Float64 => {
405 Box::new(Float64Builder::with_capacity(capacity))
406 }
407 DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
408 let builder =
409 TimestampNanosecondBuilder::with_capacity(capacity).with_timezone_opt(tz.clone());
410 Box::new(builder)
411 }
412 DataType::Date32 | DataType::Date64 => Box::new(Date32Builder::with_capacity(capacity)),
413 _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)),
415 }
416}
417
418fn append_null(builder: &mut Box<dyn ColumnBuilder>) {
419 builder.append_null_value();
420}
421
422fn append_coerced(
425 builder: &mut Box<dyn ColumnBuilder>,
426 coercion: &CsvCoercion,
427 value: &str,
428) -> bool {
429 match coercion {
430 CsvCoercion::Boolean => {
431 let b = builder
432 .as_any_mut()
433 .downcast_mut::<BooleanBuilder>()
434 .unwrap();
435 match value.to_ascii_lowercase().as_str() {
436 "true" | "1" | "yes" | "t" | "y" => {
437 b.append_value(true);
438 true
439 }
440 "false" | "0" | "no" | "f" | "n" => {
441 b.append_value(false);
442 true
443 }
444 _ => false,
445 }
446 }
447 CsvCoercion::Int64 => {
448 let b = builder.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
449 match value.parse::<i64>() {
450 Ok(v) => {
451 b.append_value(v);
452 true
453 }
454 Err(_) => false,
455 }
456 }
457 CsvCoercion::Float64 => {
458 let b = builder
459 .as_any_mut()
460 .downcast_mut::<Float64Builder>()
461 .unwrap();
462 match value.parse::<f64>() {
463 Ok(v) => {
464 b.append_value(v);
465 true
466 }
467 Err(_) => false,
468 }
469 }
470 CsvCoercion::Timestamp(fmt) => {
471 let b = builder
472 .as_any_mut()
473 .downcast_mut::<TimestampNanosecondBuilder>()
474 .unwrap();
475 if let Ok(ndt) = chrono::NaiveDateTime::parse_from_str(value, fmt) {
477 let nanos = ndt.and_utc().timestamp_nanos_opt().unwrap_or(0);
478 b.append_value(nanos);
479 return true;
480 }
481 if let Ok(nanos) = arrow_cast::parse::string_to_timestamp_nanos(value) {
483 b.append_value(nanos);
484 return true;
485 }
486 false
487 }
488 CsvCoercion::Date(fmt) => {
489 let b = builder
490 .as_any_mut()
491 .downcast_mut::<Date32Builder>()
492 .unwrap();
493 if let Ok(date) = chrono::NaiveDate::parse_from_str(value, fmt) {
494 let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
496 let days = (date - epoch).num_days();
497 #[allow(clippy::cast_possible_truncation)]
498 {
499 b.append_value(days as i32);
500 }
501 return true;
502 }
503 false
504 }
505 CsvCoercion::Utf8 => {
506 let b = builder
507 .as_any_mut()
508 .downcast_mut::<StringBuilder>()
509 .unwrap();
510 b.append_value(value);
511 true
512 }
513 }
514}
515
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::cast::AsArray;
    use arrow_schema::{Field, Schema};

    /// Builds a schema from `(name, data type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> SchemaRef {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
                .collect::<Vec<_>>(),
        ))
    }

    /// Wraps a single CSV line in a `RawRecord`.
    fn csv_record(line: &str) -> RawRecord {
        RawRecord::new(line.as_bytes().to_vec())
    }

    /// Wraps a multi-line CSV block in a single `RawRecord`.
    fn csv_block(lines: &str) -> RawRecord {
        RawRecord::new(lines.as_bytes().to_vec())
    }

    // ── Basic decoding ──

    /// An empty record slice yields an empty batch carrying the schema.
    #[test]
    fn test_decode_empty_batch() {
        let schema = make_schema(vec![("id", DataType::Int64, false)]);
        let decoder = CsvDecoder::new(schema.clone());
        let batch = decoder.decode_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema(), schema);
    }

    /// The header row is dropped and the single data row is decoded.
    #[test]
    fn test_decode_single_row_with_header() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("name", DataType::Utf8, true),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("id,name\n42,Alice")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "Alice");
    }

    /// Several data rows in one block decode in order.
    #[test]
    fn test_decode_multiple_rows() {
        let schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y", DataType::Float64, false),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("x,y\n1,1.5\n2,2.5\n3,3.5")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 3);
        let x_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_col.value(0), 1);
        assert_eq!(x_col.value(1), 2);
        assert_eq!(x_col.value(2), 3);
    }

    /// Boolean, integer, float and string columns decode in a single row.
    #[test]
    fn test_decode_all_types() {
        let schema = make_schema(vec![
            ("bool_col", DataType::Boolean, false),
            ("int_col", DataType::Int64, false),
            ("float_col", DataType::Float64, false),
            ("str_col", DataType::Utf8, false),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block(
            "bool_col,int_col,float_col,str_col\ntrue,42,3.14,hello",
        )];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert!(batch.column(0).as_boolean().value(0));
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        let f = batch
            .column(2)
            .as_primitive::<arrow_array::types::Float64Type>()
            .value(0);
        assert!((f - 3.14).abs() < f64::EPSILON);
        assert_eq!(batch.column(3).as_string::<i32>().value(0), "hello");
    }

    // ── Null handling ──

    /// With the default (empty) null string, empty fields decode as null.
    #[test]
    fn test_decode_null_string_default() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("a,b\n,")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert!(batch.column(0).is_null(0));
        assert!(batch.column(1).is_null(0));
    }

    /// A custom null string ("NA") decodes as null; other values still parse.
    #[test]
    fn test_decode_null_string_custom() {
        let schema = make_schema(vec![("val", DataType::Int64, true)]);
        let config = CsvDecoderConfig {
            null_string: "NA".into(),
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("val\nNA\n42")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 2);
        assert!(batch.column(0).is_null(0));
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(1),
            42
        );
    }

    // ── Field-count mismatch strategies ──

    /// `Null` (the default) pads the missing trailing field with null.
    #[test]
    fn test_mismatch_null_strategy() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, true),
            ("b", DataType::Utf8, true),
            ("c", DataType::Int64, true),
        ]);
        let decoder = CsvDecoder::new(schema);
        // Data row has two fields for a three-column schema.
        let records = vec![csv_block("a,b,c\n1,hello")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "hello");
        // The missing third field is null.
        assert!(batch.column(2).is_null(0));
    }

    /// `Skip` drops the short row and keeps the well-formed one.
    #[test]
    fn test_mismatch_skip_strategy() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Int64, false),
        ]);
        let config = CsvDecoderConfig {
            field_count_mismatch: FieldCountMismatchStrategy::Skip,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        // Second data row has only one field.
        let records = vec![csv_block("a,b\n1,2\n3")];
        let batch = decoder.decode_batch(&records).unwrap();

        // Only the first data row survives.
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
    }

    /// `Reject` turns a short row into a decode error.
    #[test]
    fn test_mismatch_reject_strategy() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Int64, false),
        ]);
        let config = CsvDecoderConfig {
            field_count_mismatch: FieldCountMismatchStrategy::Reject,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        // Data row has one field for a two-column schema.
        let records = vec![csv_block("a,b\n1")];
        let result = decoder.decode_batch(&records);

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("field count mismatch"));
    }

    // ── Delimiters ──

    /// Pipe-delimited input.
    #[test]
    fn test_pipe_delimiter() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, false),
        ]);
        let config = CsvDecoderConfig {
            delimiter: b'|',
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("a|b\n42|hello")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "hello");
    }

    /// Tab-delimited input.
    #[test]
    fn test_tab_delimiter() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, false),
        ]);
        let config = CsvDecoderConfig {
            delimiter: b'\t',
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("a\tb\n42\thello")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "hello");
    }

    /// Semicolon-delimited input.
    #[test]
    fn test_semicolon_delimiter() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, false),
        ]);
        let config = CsvDecoderConfig {
            delimiter: b';',
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("a;b\n99;world")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            99
        );
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "world");
    }

    // ── Comments, skipped rows, headers ──

    /// Lines starting with the configured comment byte produce no rows.
    #[test]
    fn test_comment_lines() {
        let schema = make_schema(vec![("val", DataType::Int64, false)]);
        let config = CsvDecoderConfig {
            comment: Some(b'#'),
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("val\n# this is a comment\n42\n# another\n99")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 2);
        let col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(col.value(0), 42);
        assert_eq!(col.value(1), 99);
    }

    /// `skip_rows` drops that many rows after the header.
    #[test]
    fn test_skip_rows() {
        let schema = make_schema(vec![("val", DataType::Int64, false)]);
        let config = CsvDecoderConfig {
            skip_rows: 2,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("val\nskip1\nskip2\n42\n99")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 2);
    }

    /// With `has_header: false` the first row is treated as data.
    #[test]
    fn test_no_header() {
        let schema = make_schema(vec![
            ("col0", DataType::Int64, false),
            ("col1", DataType::Utf8, false),
        ]);
        let config = CsvDecoderConfig {
            has_header: false,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("1,alpha\n2,beta")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 2);
        let col0 = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(col0.value(0), 1);
        assert_eq!(col0.value(1), 2);
    }

    // ── Multiple raw records ──

    /// Separate single-line records are combined into one batch.
    #[test]
    fn test_multiple_raw_records() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("val", DataType::Float64, false),
        ]);
        let config = CsvDecoderConfig {
            has_header: false,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![
            csv_record("1,1.5"),
            csv_record("2,2.5"),
            csv_record("3,3.5"),
        ];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 3);
        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int64Type>();
        let val_col = batch
            .column(1)
            .as_primitive::<arrow_array::types::Float64Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(2), 3);
        assert!((val_col.value(1) - 2.5).abs() < f64::EPSILON);
    }

    // ── Quoting and escaping ──

    /// A delimiter inside a quoted field does not split the field.
    #[test]
    fn test_quoted_fields_with_delimiter() {
        let schema = make_schema(vec![
            ("name", DataType::Utf8, false),
            ("desc", DataType::Utf8, false),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("name,desc\n\"Smith, John\",\"A, B\"")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).as_string::<i32>().value(0), "Smith, John");
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "A, B");
    }

    /// A newline inside a quoted field stays part of the field.
    #[test]
    fn test_quoted_fields_with_newline() {
        let schema = make_schema(vec![
            ("id", DataType::Int64, false),
            ("text", DataType::Utf8, false),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("id,text\n1,\"line1\nline2\"")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(1).as_string::<i32>().value(0), "line1\nline2");
    }

    /// Doubled quotes inside a quoted field decode to a single quote.
    #[test]
    fn test_escaped_quotes_rfc4180() {
        let schema = make_schema(vec![("val", DataType::Utf8, false)]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("val\n\"She said \"\"hello\"\"\"")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch.column(0).as_string::<i32>().value(0),
            "She said \"hello\""
        );
    }

    // ── Timestamps and dates ──

    /// A value in the default timestamp format decodes to a non-null timestamp.
    #[test]
    fn test_decode_timestamp() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
            false,
        )]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("ts\n2025-01-15 10:30:00.000")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert!(!batch.column(0).is_null(0));
    }

    /// A value that does not match the configured format is still accepted
    /// through the fallback timestamp parser.
    #[test]
    fn test_decode_timestamp_iso8601_fallback() {
        let schema = make_schema(vec![(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("ts\n2025-01-15T10:30:00Z")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert!(!batch.column(0).is_null(0));
    }

    /// A date decodes to the number of days since 1970-01-01.
    #[test]
    fn test_decode_date() {
        let schema = make_schema(vec![("d", DataType::Date32, false)]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("d\n2025-06-15")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert!(!batch.column(0).is_null(0));
        let days = batch
            .column(0)
            .as_primitive::<arrow_array::types::Date32Type>()
            .value(0);
        let expected = chrono::NaiveDate::from_ymd_opt(2025, 6, 15)
            .unwrap()
            .signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
            .num_days();
        #[allow(clippy::cast_possible_truncation)]
        {
            assert_eq!(days, expected as i32);
        }
    }

    // ── Booleans ──

    /// Every accepted boolean spelling decodes to the expected value.
    #[test]
    fn test_decode_boolean_variants() {
        let schema = make_schema(vec![("b", DataType::Boolean, false)]);
        let config = CsvDecoderConfig {
            has_header: false,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("true\nfalse\n1\n0\nyes\nno\nt\nf\ny\nn")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 10);
        let col = batch.column(0).as_boolean();
        assert!(col.value(0)); // true
        assert!(!col.value(1)); // false
        assert!(col.value(2)); // 1
        assert!(!col.value(3)); // 0
        assert!(col.value(4)); // yes
        assert!(!col.value(5)); // no
        assert!(col.value(6)); // t
        assert!(!col.value(7)); // f
        assert!(col.value(8)); // y
        assert!(!col.value(9)); // n
    }

    // ── Parse errors ──

    /// Unparseable values become null and are counted as parse errors.
    #[test]
    fn test_parse_error_count() {
        let schema = make_schema(vec![("val", DataType::Int64, true)]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("val\nnot_a_number\n42\nalso_bad")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 3);
        assert!(batch.column(0).is_null(0));
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(1),
            42
        );
        assert!(batch.column(0).is_null(2));
        assert_eq!(decoder.parse_error_count(), 2);
    }

    // ── Extra fields ──

    /// Under the default strategy, fields beyond the schema width are ignored.
    #[test]
    fn test_extra_fields_truncated() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = CsvDecoder::new(schema);
        // Three fields for a one-column schema.
        let records = vec![csv_block("a\n42,extra1,extra2")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            42
        );
    }

    // ── Trait surface ──

    /// The decoder reports its format name as "csv".
    #[test]
    fn test_format_name() {
        let schema = make_schema(vec![("a", DataType::Int64, false)]);
        let decoder = CsvDecoder::new(schema);
        assert_eq!(decoder.format_name(), "csv");
    }

    /// `output_schema` returns the schema the decoder was built with.
    #[test]
    fn test_output_schema() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Utf8, true),
        ]);
        let decoder = CsvDecoder::new(schema.clone());
        assert_eq!(decoder.output_schema(), schema);
    }

    /// `decode_one` decodes a single record into a one-row batch.
    // NOTE(review): `decode_one` is not defined in this file; presumably it is
    // a provided method of `FormatDecoder` — confirm against the trait.
    #[test]
    fn test_decode_one() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let config = CsvDecoderConfig {
            has_header: false,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let record = csv_record("99");
        let batch = decoder.decode_one(&record).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            99
        );
    }

    // ── Edge cases ──

    /// Input mixing `\r\n` and `\n` terminators yields one row per line.
    #[test]
    fn test_mixed_line_endings() {
        let schema = make_schema(vec![("val", DataType::Int64, false)]);
        let config = CsvDecoderConfig {
            has_header: false,
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("1\r\n2\n3\r\n")];
        let batch = decoder.decode_batch(&records).unwrap();
        assert_eq!(batch.num_rows(), 3);
    }

    /// Non-ASCII UTF-8 text round-trips unchanged.
    #[test]
    fn test_unicode_values() {
        let schema = make_schema(vec![("name", DataType::Utf8, false)]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("name\nこんにちは\nüber\nnaïve")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.column(0).as_string::<i32>().value(0), "こんにちは");
        assert_eq!(batch.column(0).as_string::<i32>().value(1), "über");
        assert_eq!(batch.column(0).as_string::<i32>().value(2), "naïve");
    }

    /// A trailing delimiter produces an empty final field, which decodes as
    /// null under the default null string.
    #[test]
    fn test_trailing_comma() {
        let schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Int64, true),
        ]);
        let decoder = CsvDecoder::new(schema);
        let records = vec![csv_block("a,b\n1,")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<arrow_array::types::Int64Type>()
                .value(0),
            1
        );
        assert!(batch.column(1).is_null(0));
    }

    /// With a backslash escape byte configured, `\"` inside a quoted field
    /// decodes to a literal quote.
    #[test]
    fn test_backslash_escape() {
        let schema = make_schema(vec![("val", DataType::Utf8, false)]);
        let config = CsvDecoderConfig {
            escape: Some(b'\\'),
            ..Default::default()
        };
        let decoder = CsvDecoder::with_config(schema, config);
        let records = vec![csv_block("val\n\"hello \\\"world\\\"\"")];
        let batch = decoder.decode_batch(&records).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(
            batch.column(0).as_string::<i32>().value(0),
            "hello \"world\""
        );
    }
}