laminar_connectors/serde/
csv.rs1use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8
9use super::{Format, RecordDeserializer, RecordSerializer};
10use crate::error::SerdeError;
11use crate::schema::csv::{CsvDecoder, CsvDecoderConfig, CsvEncoder, CsvEncoderConfig};
12use crate::schema::traits::{FormatDecoder, FormatEncoder};
13use crate::schema::types::RawRecord;
14
/// Decodes single CSV records into Arrow [`RecordBatch`]es.
///
/// Input is treated as headerless: every byte slice handed to
/// [`RecordDeserializer::deserialize`] is parsed as data against the
/// caller-supplied schema. The field delimiter defaults to `,`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CsvDeserializer {
    /// Byte separating fields within a record.
    delimiter: u8,
}

impl CsvDeserializer {
    /// Creates a deserializer that splits fields on a comma (`b','`).
    #[must_use]
    pub const fn new() -> Self {
        Self::with_delimiter(b',')
    }

    /// Creates a deserializer that splits fields on `delimiter`.
    ///
    /// The byte is forwarded to the underlying decoder unchecked.
    /// NOTE(review): presumably it should not be the quote character or a
    /// line terminator — confirm how the decoder treats those.
    #[must_use]
    pub const fn with_delimiter(delimiter: u8) -> Self {
        Self { delimiter }
    }
}

impl Default for CsvDeserializer {
    /// Equivalent to [`CsvDeserializer::new`], i.e. comma-delimited.
    fn default() -> Self {
        Self::new()
    }
}
40
41impl RecordDeserializer for CsvDeserializer {
42 fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
43 let config = CsvDecoderConfig {
44 delimiter: self.delimiter,
45 has_header: false,
46 ..CsvDecoderConfig::default()
47 };
48 let decoder = CsvDecoder::with_config(schema.clone(), config);
49 let record = RawRecord::new(data.to_vec());
50 decoder
51 .decode_one(&record)
52 .map_err(|e| SerdeError::Csv(e.to_string()))
53 }
54
55 fn format(&self) -> Format {
56 Format::Csv
57 }
58}
59
/// Encodes Arrow [`RecordBatch`]es as CSV.
///
/// No header line is emitted; [`RecordSerializer::serialize`] returns the
/// encoded records as separate byte buffers. The field delimiter defaults
/// to `,`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CsvSerializer {
    /// Byte written between fields within a record.
    delimiter: u8,
}

impl CsvSerializer {
    /// Creates a serializer that separates fields with a comma (`b','`).
    #[must_use]
    pub const fn new() -> Self {
        Self::with_delimiter(b',')
    }

    /// Creates a serializer that separates fields with `delimiter`.
    ///
    /// The byte is forwarded to the underlying encoder unchecked.
    /// NOTE(review): presumably it should not be the quote character or a
    /// line terminator — confirm how the encoder treats those.
    #[must_use]
    pub const fn with_delimiter(delimiter: u8) -> Self {
        Self { delimiter }
    }
}

impl Default for CsvSerializer {
    /// Equivalent to [`CsvSerializer::new`], i.e. comma-delimited.
    fn default() -> Self {
        Self::new()
    }
}
85
86impl RecordSerializer for CsvSerializer {
87 fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError> {
88 let config = CsvEncoderConfig {
89 delimiter: self.delimiter,
90 has_header: false,
91 };
92 let encoder = CsvEncoder::with_config(batch.schema(), config);
93 encoder
94 .encode_batch(batch)
95 .map_err(|e| SerdeError::Csv(e.to_string()))
96 }
97
98 fn format(&self) -> Format {
99 Format::Csv
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Three-column schema shared by most tests: non-null id and name, nullable score.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]))
    }

    #[test]
    fn test_csv_deserialize_basic() {
        let deser = CsvDeserializer::new();
        let schema = test_schema();
        let data = b"1,Alice,95.5";

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 1);
    }

    #[test]
    fn test_csv_serialize_roundtrip() {
        let deser = CsvDeserializer::new();
        let ser = CsvSerializer::new();
        let schema = test_schema();

        let original = deser.deserialize(b"42,Charlie,88.5", &schema).unwrap();

        let serialized = ser.serialize(&original).unwrap();
        assert_eq!(serialized.len(), 1);

        let line = std::str::from_utf8(&serialized[0]).unwrap();
        assert!(line.contains("42"));
        assert!(line.contains("Charlie"));

        // A real roundtrip: decoding the encoded line must reproduce the batch.
        let decoded = deser.deserialize(&serialized[0], &schema).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_csv_custom_delimiter_roundtrip() {
        let deser = CsvDeserializer::with_delimiter(b'|');
        let ser = CsvSerializer::with_delimiter(b'|');
        let schema = test_schema();

        let original = deser.deserialize(b"7|Dana|70.25", &schema).unwrap();
        assert_eq!(original.num_rows(), 1);

        let serialized = ser.serialize(&original).unwrap();
        assert_eq!(serialized.len(), 1);

        let line = std::str::from_utf8(&serialized[0]).unwrap();
        assert!(line.contains('|'));
        assert!(!line.contains(','));

        let decoded = deser.deserialize(&serialized[0], &schema).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_csv_null_handling() {
        let deser = CsvDeserializer::new();
        let schema = test_schema();
        let data = b"1,Bob,";

        let batch = deser.deserialize(data, &schema).unwrap();
        assert!(batch.column(2).is_null(0));
    }

    #[test]
    fn test_csv_quoted_fields() {
        let deser = CsvDeserializer::new();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("desc", DataType::Utf8, false),
        ]));
        let data = br#"1,"hello, world""#;

        let batch = deser.deserialize(data, &schema).unwrap();
        let descs = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(descs.value(0), "hello, world");
    }

    #[test]
    fn test_csv_format_tag() {
        assert!(matches!(CsvDeserializer::new().format(), Format::Csv));
        assert!(matches!(CsvSerializer::new().format(), Format::Csv));
    }
}