laminar_connectors/serde/
mod.rs1pub mod csv;
5pub mod debezium;
6pub mod json;
7pub mod raw;
8
9use arrow_array::RecordBatch;
10use arrow_schema::SchemaRef;
11
12use crate::error::SerdeError;
13
/// Encoding used for connector records.
///
/// The canonical lowercase name of each variant is given by [`Format::as_str`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Format {
    /// JSON-encoded records.
    Json,

    /// Comma-separated values.
    Csv,

    /// Uninterpreted bytes (the name `bytes` is also accepted when parsing).
    Raw,

    /// Debezium records (the name `debezium-json` is also accepted when parsing).
    Debezium,

    /// Avro records (the name `confluent-avro` is also accepted when parsing).
    Avro,
}
32
33impl Format {
34 #[must_use]
36 pub fn as_str(&self) -> &'static str {
37 match self {
38 Format::Json => "json",
39 Format::Csv => "csv",
40 Format::Raw => "raw",
41 Format::Debezium => "debezium",
42 Format::Avro => "avro",
43 }
44 }
45
46 pub fn parse(s: &str) -> Result<Self, SerdeError> {
52 s.parse()
53 }
54}
55
56impl std::str::FromStr for Format {
57 type Err = SerdeError;
58
59 fn from_str(s: &str) -> Result<Self, Self::Err> {
60 match s.to_lowercase().as_str() {
61 "json" => Ok(Format::Json),
62 "csv" => Ok(Format::Csv),
63 "raw" | "bytes" => Ok(Format::Raw),
64 "debezium" | "debezium-json" => Ok(Format::Debezium),
65 "avro" | "confluent-avro" => Ok(Format::Avro),
66 other => Err(SerdeError::UnsupportedFormat(other.to_string())),
67 }
68 }
69}
70
71impl std::fmt::Display for Format {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(f, "{}", self.as_str())
74 }
75}
76
77pub trait RecordDeserializer: Send + Sync {
82 fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError>;
89
90 fn deserialize_batch(
100 &self,
101 records: &[&[u8]],
102 schema: &SchemaRef,
103 ) -> Result<RecordBatch, SerdeError> {
104 if records.is_empty() {
105 return Ok(RecordBatch::new_empty(schema.clone()));
106 }
107
108 let batches: Result<Vec<RecordBatch>, SerdeError> = records
109 .iter()
110 .map(|data| self.deserialize(data, schema))
111 .collect();
112 let batches = batches?;
113
114 arrow_select::concat::concat_batches(schema, &batches)
115 .map_err(|e| SerdeError::MalformedInput(format!("failed to concat batches: {e}")))
116 }
117
118 fn format(&self) -> Format;
120
121 fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
123 None
124 }
125}
126
127pub trait RecordSerializer: Send + Sync {
132 fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError>;
140
141 fn serialize_batch(&self, batch: &RecordBatch) -> Result<Vec<u8>, SerdeError> {
150 let records = self.serialize(batch)?;
151 let total_len: usize = records.iter().map(Vec::len).sum();
152 let mut buf = Vec::with_capacity(total_len);
153 for record in &records {
154 buf.extend_from_slice(record);
155 }
156 Ok(buf)
157 }
158
159 fn format(&self) -> Format;
161}
162
163pub fn create_deserializer(format: Format) -> Result<Box<dyn RecordDeserializer>, SerdeError> {
169 match format {
170 Format::Json => Ok(Box::new(json::JsonDeserializer::new())),
171 Format::Csv => Ok(Box::new(csv::CsvDeserializer::new())),
172 Format::Raw => Ok(Box::new(raw::RawBytesDeserializer::new())),
173 Format::Debezium => Ok(Box::new(debezium::DebeziumDeserializer::new())),
174 Format::Avro => Err(SerdeError::UnsupportedFormat(
175 "Avro deserialization requires the 'kafka' feature".into(),
176 )),
177 }
178}
179
180pub fn create_serializer(format: Format) -> Result<Box<dyn RecordSerializer>, SerdeError> {
186 match format {
187 Format::Json => Ok(Box::new(json::JsonSerializer::new())),
188 Format::Csv => Ok(Box::new(csv::CsvSerializer::new())),
189 Format::Raw => Ok(Box::new(raw::RawBytesSerializer::new())),
190 Format::Debezium => Err(SerdeError::UnsupportedFormat(
191 "Debezium is a deserialization-only format".into(),
192 )),
193 Format::Avro => Err(SerdeError::UnsupportedFormat(
194 "Avro serialization requires the 'kafka' feature".into(),
195 )),
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `Format` variant; extend this when a new variant is added.
    const ALL_FORMATS: [Format; 5] = [
        Format::Json,
        Format::Csv,
        Format::Raw,
        Format::Debezium,
        Format::Avro,
    ];

    #[test]
    fn test_format_from_str() {
        assert_eq!(Format::parse("json").unwrap(), Format::Json);
        assert_eq!(Format::parse("JSON").unwrap(), Format::Json);
        assert_eq!(Format::parse("csv").unwrap(), Format::Csv);
        assert_eq!(Format::parse("raw").unwrap(), Format::Raw);
        assert_eq!(Format::parse("bytes").unwrap(), Format::Raw);
        assert_eq!(Format::parse("debezium").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("debezium-json").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("avro").unwrap(), Format::Avro);
        assert_eq!(Format::parse("confluent-avro").unwrap(), Format::Avro);
    }

    #[test]
    fn test_format_from_str_rejects_unknown() {
        assert!(matches!(
            Format::parse("xml"),
            Err(SerdeError::UnsupportedFormat(_))
        ));
        assert!(Format::parse("").is_err());
        assert!("protobuf".parse::<Format>().is_err());
    }

    #[test]
    fn test_format_display() {
        assert_eq!(Format::Json.to_string(), "json");
        assert_eq!(Format::Csv.to_string(), "csv");
        assert_eq!(Format::Raw.to_string(), "raw");
        assert_eq!(Format::Debezium.to_string(), "debezium");
        assert_eq!(Format::Avro.to_string(), "avro");
    }

    #[test]
    fn test_format_round_trip() {
        // The canonical name must always parse back to the same variant.
        for format in ALL_FORMATS {
            assert_eq!(Format::parse(format.as_str()).unwrap(), format);
            assert_eq!(format.to_string(), format.as_str());
        }
    }

    #[test]
    fn test_create_deserializer() {
        assert!(create_deserializer(Format::Json).is_ok());
        assert!(create_deserializer(Format::Csv).is_ok());
        assert!(create_deserializer(Format::Raw).is_ok());
        assert!(create_deserializer(Format::Debezium).is_ok());
        assert!(matches!(
            create_deserializer(Format::Avro),
            Err(SerdeError::UnsupportedFormat(_))
        ));
    }

    #[test]
    fn test_create_serializer() {
        assert!(create_serializer(Format::Json).is_ok());
        assert!(create_serializer(Format::Csv).is_ok());
        assert!(create_serializer(Format::Raw).is_ok());
        assert!(matches!(
            create_serializer(Format::Debezium),
            Err(SerdeError::UnsupportedFormat(_))
        ));
        assert!(matches!(
            create_serializer(Format::Avro),
            Err(SerdeError::UnsupportedFormat(_))
        ));
    }
}