// laminar_connectors/serde/
mod.rs1pub mod csv;
18pub mod debezium;
19pub mod json;
20pub mod raw;
21
22use arrow_array::RecordBatch;
23use arrow_schema::SchemaRef;
24
25use crate::error::SerdeError;
26
/// Record encodings supported by the connector serde layer.
///
/// Each variant has one canonical lowercase name (see [`Format::as_str`]).
/// Parsing is case-insensitive and also accepts the aliases listed on each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Format {
    /// JSON (`json`).
    Json,

    /// Comma-separated values (`csv`).
    Csv,

    /// Raw byte payloads (`raw`, alias `bytes`).
    Raw,

    /// Debezium envelope (`debezium`, alias `debezium-json`).
    /// Deserialization only: `create_serializer` rejects it.
    Debezium,

    /// Avro (`avro`, alias `confluent-avro`).
    /// Neither factory in this module constructs it; both return `UnsupportedFormat`.
    Avro,
}
45
46impl Format {
47 #[must_use]
49 pub fn as_str(&self) -> &'static str {
50 match self {
51 Format::Json => "json",
52 Format::Csv => "csv",
53 Format::Raw => "raw",
54 Format::Debezium => "debezium",
55 Format::Avro => "avro",
56 }
57 }
58
59 pub fn parse(s: &str) -> Result<Self, SerdeError> {
65 s.parse()
66 }
67}
68
69impl std::str::FromStr for Format {
70 type Err = SerdeError;
71
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 match s.to_lowercase().as_str() {
74 "json" => Ok(Format::Json),
75 "csv" => Ok(Format::Csv),
76 "raw" | "bytes" => Ok(Format::Raw),
77 "debezium" | "debezium-json" => Ok(Format::Debezium),
78 "avro" | "confluent-avro" => Ok(Format::Avro),
79 other => Err(SerdeError::UnsupportedFormat(other.to_string())),
80 }
81 }
82}
83
84impl std::fmt::Display for Format {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{}", self.as_str())
87 }
88}
89
90pub trait RecordDeserializer: Send + Sync {
95 fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError>;
102
103 fn deserialize_batch(
113 &self,
114 records: &[&[u8]],
115 schema: &SchemaRef,
116 ) -> Result<RecordBatch, SerdeError> {
117 if records.is_empty() {
118 return Ok(RecordBatch::new_empty(schema.clone()));
119 }
120
121 let batches: Result<Vec<RecordBatch>, SerdeError> = records
122 .iter()
123 .map(|data| self.deserialize(data, schema))
124 .collect();
125 let batches = batches?;
126
127 arrow_select::concat::concat_batches(schema, &batches)
128 .map_err(|e| SerdeError::MalformedInput(format!("failed to concat batches: {e}")))
129 }
130
131 fn format(&self) -> Format;
133
134 fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
136 None
137 }
138}
139
140pub trait RecordSerializer: Send + Sync {
145 fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError>;
153
154 fn serialize_batch(&self, batch: &RecordBatch) -> Result<Vec<u8>, SerdeError> {
163 let records = self.serialize(batch)?;
164 let total_len: usize = records.iter().map(Vec::len).sum();
165 let mut buf = Vec::with_capacity(total_len);
166 for record in &records {
167 buf.extend_from_slice(record);
168 }
169 Ok(buf)
170 }
171
172 fn format(&self) -> Format;
174}
175
176pub fn create_deserializer(format: Format) -> Result<Box<dyn RecordDeserializer>, SerdeError> {
182 match format {
183 Format::Json => Ok(Box::new(json::JsonDeserializer::new())),
184 Format::Csv => Ok(Box::new(csv::CsvDeserializer::new())),
185 Format::Raw => Ok(Box::new(raw::RawBytesDeserializer::new())),
186 Format::Debezium => Ok(Box::new(debezium::DebeziumDeserializer::new())),
187 Format::Avro => Err(SerdeError::UnsupportedFormat(
188 "Avro deserialization requires the 'kafka' feature".into(),
189 )),
190 }
191}
192
193pub fn create_serializer(format: Format) -> Result<Box<dyn RecordSerializer>, SerdeError> {
199 match format {
200 Format::Json => Ok(Box::new(json::JsonSerializer::new())),
201 Format::Csv => Ok(Box::new(csv::CsvSerializer::new())),
202 Format::Raw => Ok(Box::new(raw::RawBytesSerializer::new())),
203 Format::Debezium => Err(SerdeError::UnsupportedFormat(
204 "Debezium is a deserialization-only format".into(),
205 )),
206 Format::Avro => Err(SerdeError::UnsupportedFormat(
207 "Avro serialization requires the 'kafka' feature".into(),
208 )),
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `Format` variant, for table-driven checks.
    const ALL_FORMATS: [Format; 5] = [
        Format::Json,
        Format::Csv,
        Format::Raw,
        Format::Debezium,
        Format::Avro,
    ];

    #[test]
    fn test_format_from_str() {
        assert_eq!(Format::parse("json").unwrap(), Format::Json);
        assert_eq!(Format::parse("JSON").unwrap(), Format::Json);
        assert_eq!(Format::parse("csv").unwrap(), Format::Csv);
        assert_eq!(Format::parse("raw").unwrap(), Format::Raw);
        assert_eq!(Format::parse("bytes").unwrap(), Format::Raw);
        assert_eq!(Format::parse("debezium").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("debezium-json").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("avro").unwrap(), Format::Avro);
        assert_eq!(Format::parse("confluent-avro").unwrap(), Format::Avro);
    }

    #[test]
    fn test_format_parse_rejects_unknown_names() {
        assert!(matches!(
            Format::parse("xml"),
            Err(SerdeError::UnsupportedFormat(_))
        ));
        assert!(Format::parse("").is_err());
        assert!(Format::parse("json-lines").is_err());
    }

    #[test]
    fn test_format_round_trips_through_names() {
        // The canonical name and the Display output must both parse back to the same variant.
        for variant in ALL_FORMATS {
            assert_eq!(Format::parse(variant.as_str()).unwrap(), variant);
            assert_eq!(Format::parse(&variant.to_string()).unwrap(), variant);
        }
    }

    #[test]
    fn test_format_display() {
        assert_eq!(Format::Json.to_string(), "json");
        assert_eq!(Format::Csv.to_string(), "csv");
        assert_eq!(Format::Raw.to_string(), "raw");
        assert_eq!(Format::Debezium.to_string(), "debezium");
        assert_eq!(Format::Avro.to_string(), "avro");
    }

    #[test]
    fn test_create_deserializer() {
        assert!(create_deserializer(Format::Json).is_ok());
        assert!(create_deserializer(Format::Csv).is_ok());
        assert!(create_deserializer(Format::Raw).is_ok());
        assert!(create_deserializer(Format::Debezium).is_ok());
        // Avro is not constructed by this factory.
        assert!(create_deserializer(Format::Avro).is_err());
    }

    #[test]
    fn test_create_serializer() {
        assert!(create_serializer(Format::Json).is_ok());
        assert!(create_serializer(Format::Csv).is_ok());
        assert!(create_serializer(Format::Raw).is_ok());
        // Debezium is deserialization-only; Avro is not constructed by this factory.
        assert!(create_serializer(Format::Debezium).is_err());
        assert!(create_serializer(Format::Avro).is_err());
    }
}