Skip to main content

laminar_connectors/serde/
csv.rs

1//! CSV serialization and deserialization.
2//!
3//! Implements [`RecordDeserializer`] / [`RecordSerializer`] by delegating
4//! to [`CsvDecoder`] and [`CsvEncoder`].
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8
9use super::{Format, RecordDeserializer, RecordSerializer};
10use crate::error::SerdeError;
11use crate::schema::csv::{CsvDecoder, CsvDecoderConfig, CsvEncoder, CsvEncoderConfig};
12use crate::schema::traits::{FormatDecoder, FormatEncoder};
13use crate::schema::types::RawRecord;
14
/// CSV record deserializer. Delegates to [`CsvDecoder`].
///
/// The only state held is the field delimiter byte, which is copied into the
/// decoder configuration each time a record is deserialized.
#[derive(Debug, Clone)]
pub struct CsvDeserializer {
    /// Byte that separates fields within a record.
    delimiter: u8,
}

impl CsvDeserializer {
    /// Delimiter used by [`CsvDeserializer::new`] (and therefore by `Default`).
    const DEFAULT_DELIMITER: u8 = b',';

    /// Creates a new CSV deserializer with comma delimiter.
    ///
    /// This is a `const fn`, so it can be used to initialise `const` and
    /// `static` items.
    #[must_use]
    pub const fn new() -> Self {
        Self::with_delimiter(Self::DEFAULT_DELIMITER)
    }

    /// Creates a CSV deserializer with a custom delimiter.
    ///
    /// The byte is stored as given; no validation is performed here.
    #[must_use]
    pub const fn with_delimiter(delimiter: u8) -> Self {
        Self { delimiter }
    }
}

impl Default for CsvDeserializer {
    /// Equivalent to [`CsvDeserializer::new`].
    fn default() -> Self {
        Self::new()
    }
}
40
41impl RecordDeserializer for CsvDeserializer {
42    fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
43        let config = CsvDecoderConfig {
44            delimiter: self.delimiter,
45            has_header: false,
46            ..CsvDecoderConfig::default()
47        };
48        let decoder = CsvDecoder::with_config(schema.clone(), config);
49        let record = RawRecord::new(data.to_vec());
50        decoder
51            .decode_one(&record)
52            .map_err(|e| SerdeError::Csv(e.to_string()))
53    }
54
55    fn format(&self) -> Format {
56        Format::Csv
57    }
58}
59
/// CSV record serializer. Delegates to [`CsvEncoder`].
///
/// The only state held is the field delimiter byte, which is copied into the
/// encoder configuration each time a batch is serialized.
#[derive(Debug, Clone)]
pub struct CsvSerializer {
    /// Byte written between fields of a record.
    delimiter: u8,
}

impl CsvSerializer {
    /// Delimiter used by [`CsvSerializer::new`] (and therefore by `Default`).
    const DEFAULT_DELIMITER: u8 = b',';

    /// Creates a new CSV serializer with comma delimiter.
    ///
    /// This is a `const fn`, so it can be used to initialise `const` and
    /// `static` items.
    #[must_use]
    pub const fn new() -> Self {
        Self::with_delimiter(Self::DEFAULT_DELIMITER)
    }

    /// Creates a CSV serializer with a custom delimiter.
    ///
    /// The byte is stored as given; no validation is performed here.
    #[must_use]
    pub const fn with_delimiter(delimiter: u8) -> Self {
        Self { delimiter }
    }
}

impl Default for CsvSerializer {
    /// Equivalent to [`CsvSerializer::new`].
    fn default() -> Self {
        Self::new()
    }
}
85
86impl RecordSerializer for CsvSerializer {
87    fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
88        let config = CsvEncoderConfig {
89            delimiter: self.delimiter,
90            has_header: false,
91        };
92        let encoder = CsvEncoder::with_config(batch.schema(), config);
93        encoder
94            .encode_batch(batch)
95            .map_err(|e| SerdeError::Csv(e.to_string()))
96    }
97
98    fn format(&self) -> Format {
99        Format::Csv
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema shared by most tests: non-null `id` (Int64), non-null `name`
    /// (Utf8) and nullable `score` (Float64).
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// A plain comma-separated line decodes to one row with the expected id.
    #[test]
    fn test_csv_deserialize_basic() {
        let schema = test_schema();
        let batch = CsvDeserializer::new()
            .deserialize(b"1,Alice,95.5", &schema)
            .unwrap();
        assert_eq!(batch.num_rows(), 1);

        let id_column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(id_column.value(0), 1);
    }

    /// Decoding then encoding a line yields one buffer that still contains
    /// the original id and name.
    #[test]
    fn test_csv_serialize_roundtrip() {
        let schema = test_schema();
        let batch = CsvDeserializer::new()
            .deserialize(b"42,Charlie,88.5", &schema)
            .unwrap();

        let rows = CsvSerializer::new().serialize(&batch).unwrap();
        assert_eq!(rows.len(), 1);

        let text = std::str::from_utf8(&rows[0]).unwrap();
        assert!(text.contains("42"));
        assert!(text.contains("Charlie"));
    }

    /// An empty trailing field in a nullable column decodes as null.
    #[test]
    fn test_csv_null_handling() {
        let schema = test_schema();
        let batch = CsvDeserializer::new()
            .deserialize(b"1,Bob,", &schema)
            .unwrap();

        let score_column = batch.column(2);
        assert!(score_column.is_null(0));
    }

    /// A quoted field keeps an embedded delimiter as part of its value.
    #[test]
    fn test_csv_quoted_fields() {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("desc", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));

        let batch = CsvDeserializer::new()
            .deserialize(br#"1,"hello, world""#, &schema)
            .unwrap();

        let desc_column = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(desc_column.value(0), "hello, world");
    }
}