Skip to main content

laminar_connectors/serde/
mod.rs

1//! Record (de)serialization: bytes ↔ Arrow `RecordBatch`. Format
2//! implementations in `json`, `csv`, `raw`, `debezium`.
3
4pub mod csv;
5pub mod debezium;
6pub mod json;
7pub mod raw;
8
9use arrow_array::RecordBatch;
10use arrow_schema::SchemaRef;
11
12use crate::error::SerdeError;
13
/// Wire formats understood by the serde layer.
///
/// Each variant has a canonical lowercase name, available through
/// `Format::as_str` and the `Display` impl.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Format {
    /// JSON-encoded records.
    Json,

    /// Comma-separated values.
    Csv,

    /// Raw bytes, passed through without deserialization.
    Raw,

    /// Debezium change-data-capture (CDC) envelope.
    Debezium,

    /// Apache Avro, optionally backed by a Confluent Schema Registry.
    Avro,
}
32
33impl Format {
34    /// Returns the format name as a string.
35    #[must_use]
36    pub fn as_str(&self) -> &'static str {
37        match self {
38            Format::Json => "json",
39            Format::Csv => "csv",
40            Format::Raw => "raw",
41            Format::Debezium => "debezium",
42            Format::Avro => "avro",
43        }
44    }
45
46    /// Parses a format from a string.
47    ///
48    /// # Errors
49    ///
50    /// Returns `SerdeError::UnsupportedFormat` if the format is not recognized.
51    pub fn parse(s: &str) -> Result<Self, SerdeError> {
52        s.parse()
53    }
54}
55
56impl std::str::FromStr for Format {
57    type Err = SerdeError;
58
59    fn from_str(s: &str) -> Result<Self, Self::Err> {
60        match s.to_lowercase().as_str() {
61            "json" => Ok(Format::Json),
62            "csv" => Ok(Format::Csv),
63            "raw" | "bytes" => Ok(Format::Raw),
64            "debezium" | "debezium-json" => Ok(Format::Debezium),
65            "avro" | "confluent-avro" => Ok(Format::Avro),
66            other => Err(SerdeError::UnsupportedFormat(other.to_string())),
67        }
68    }
69}
70
71impl std::fmt::Display for Format {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "{}", self.as_str())
74    }
75}
76
77/// Trait for deserializing raw bytes into Arrow `RecordBatch`.
78///
79/// Implementations convert from external formats (JSON, CSV, Avro, etc.)
80/// into Arrow columnar format for processing.
81pub trait RecordDeserializer: Send + Sync {
82    /// Deserializes a single record from raw bytes.
83    ///
84    /// # Errors
85    ///
86    /// Returns `SerdeError` if the input cannot be parsed or doesn't
87    /// match the expected schema.
88    fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError>;
89
90    /// Deserializes a batch of records from raw bytes.
91    ///
92    /// The default implementation calls `deserialize` for each record
93    /// and concatenates the results. Implementations should override
94    /// this for better performance with batch-oriented formats.
95    ///
96    /// # Errors
97    ///
98    /// Returns `SerdeError` if any record cannot be parsed.
99    fn deserialize_batch(
100        &self,
101        records: &[&[u8]],
102        schema: &SchemaRef,
103    ) -> Result<RecordBatch, SerdeError> {
104        if records.is_empty() {
105            return Ok(RecordBatch::new_empty(schema.clone()));
106        }
107
108        let batches: Result<Vec<RecordBatch>, SerdeError> = records
109            .iter()
110            .map(|data| self.deserialize(data, schema))
111            .collect();
112        let batches = batches?;
113
114        arrow_select::concat::concat_batches(schema, &batches)
115            .map_err(|e| SerdeError::MalformedInput(format!("failed to concat batches: {e}")))
116    }
117
118    /// Returns the format this deserializer handles.
119    fn format(&self) -> Format;
120
121    /// Downcasts to concrete type (e.g., for Avro schema resolution).
122    fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
123        None
124    }
125}
126
127/// Trait for serializing Arrow `RecordBatch` into raw bytes.
128///
129/// Implementations convert from Arrow columnar format to external
130/// formats for writing to sinks.
131pub trait RecordSerializer: Send + Sync {
132    /// Serializes a `RecordBatch` into a vector of byte records.
133    ///
134    /// Each element in the returned vector represents one serialized record.
135    ///
136    /// # Errors
137    ///
138    /// Returns `SerdeError` if serialization fails.
139    fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError>;
140
141    /// Serializes a `RecordBatch` into a single byte buffer.
142    ///
143    /// For formats that support batch encoding (e.g., JSON array, CSV),
144    /// this may be more efficient than serializing individual records.
145    ///
146    /// # Errors
147    ///
148    /// Returns `SerdeError` if serialization fails.
149    fn serialize_batch(&self, batch: &RecordBatch) -> Result<Vec<u8>, SerdeError> {
150        let records = self.serialize(batch)?;
151        let total_len: usize = records.iter().map(Vec::len).sum();
152        let mut buf = Vec::with_capacity(total_len);
153        for record in &records {
154            buf.extend_from_slice(record);
155        }
156        Ok(buf)
157    }
158
159    /// Returns the format this serializer produces.
160    fn format(&self) -> Format;
161}
162
163/// Creates a deserializer for the given format.
164///
165/// # Errors
166///
167/// Returns `SerdeError::UnsupportedFormat` if the format is not supported.
168pub fn create_deserializer(format: Format) -> Result<Box<dyn RecordDeserializer>, SerdeError> {
169    match format {
170        Format::Json => Ok(Box::new(json::JsonDeserializer::new())),
171        Format::Csv => Ok(Box::new(csv::CsvDeserializer::new())),
172        Format::Raw => Ok(Box::new(raw::RawBytesDeserializer::new())),
173        Format::Debezium => Ok(Box::new(debezium::DebeziumDeserializer::new())),
174        Format::Avro => Err(SerdeError::UnsupportedFormat(
175            "Avro deserialization requires the 'kafka' feature".into(),
176        )),
177    }
178}
179
180/// Creates a serializer for the given format.
181///
182/// # Errors
183///
184/// Returns `SerdeError::UnsupportedFormat` if the format is not supported.
185pub fn create_serializer(format: Format) -> Result<Box<dyn RecordSerializer>, SerdeError> {
186    match format {
187        Format::Json => Ok(Box::new(json::JsonSerializer::new())),
188        Format::Csv => Ok(Box::new(csv::CsvSerializer::new())),
189        Format::Raw => Ok(Box::new(raw::RawBytesSerializer::new())),
190        Format::Debezium => Err(SerdeError::UnsupportedFormat(
191            "Debezium is a deserialization-only format".into(),
192        )),
193        Format::Avro => Err(SerdeError::UnsupportedFormat(
194            "Avro serialization requires the 'kafka' feature".into(),
195        )),
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `Format` variant, for checks that must cover the whole enum.
    const ALL_FORMATS: [Format; 5] = [
        Format::Json,
        Format::Csv,
        Format::Raw,
        Format::Debezium,
        Format::Avro,
    ];

    /// Canonical names and aliases resolve to the expected variant.
    #[test]
    fn test_format_from_str() {
        assert_eq!(Format::parse("json").unwrap(), Format::Json);
        assert_eq!(Format::parse("JSON").unwrap(), Format::Json);
        assert_eq!(Format::parse("csv").unwrap(), Format::Csv);
        assert_eq!(Format::parse("raw").unwrap(), Format::Raw);
        assert_eq!(Format::parse("bytes").unwrap(), Format::Raw);
        assert_eq!(Format::parse("debezium").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("debezium-json").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("avro").unwrap(), Format::Avro);
        assert_eq!(Format::parse("confluent-avro").unwrap(), Format::Avro);
    }

    /// Aliases are matched case-insensitively, and `str::parse` agrees
    /// with `Format::parse`.
    #[test]
    fn test_format_from_str_mixed_case_aliases() {
        assert_eq!(Format::parse("Debezium-JSON").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("Confluent-Avro").unwrap(), Format::Avro);
        assert_eq!(Format::parse("BYTES").unwrap(), Format::Raw);
        assert_eq!("Csv".parse::<Format>().unwrap(), Format::Csv);
    }

    /// Unknown names are rejected with `UnsupportedFormat`.
    #[test]
    fn test_format_from_str_rejects_unknown() {
        for name in ["", "xml", "protobuf"] {
            assert!(
                matches!(Format::parse(name), Err(SerdeError::UnsupportedFormat(_))),
                "expected {name:?} to be rejected"
            );
        }
    }

    #[test]
    fn test_format_display() {
        assert_eq!(Format::Json.to_string(), "json");
        assert_eq!(Format::Csv.to_string(), "csv");
        assert_eq!(Format::Raw.to_string(), "raw");
        assert_eq!(Format::Debezium.to_string(), "debezium");
        assert_eq!(Format::Avro.to_string(), "avro");
    }

    /// The canonical name of every variant parses back to that variant.
    #[test]
    fn test_format_name_round_trip() {
        for format in ALL_FORMATS {
            assert_eq!(Format::parse(format.as_str()).unwrap(), format);
            assert_eq!(Format::parse(&format.to_string()).unwrap(), format);
        }
    }

    #[test]
    fn test_create_deserializer() {
        assert!(create_deserializer(Format::Json).is_ok());
        assert!(create_deserializer(Format::Csv).is_ok());
        assert!(create_deserializer(Format::Raw).is_ok());
        assert!(create_deserializer(Format::Debezium).is_ok());
        // Avro is not constructible through this factory.
        assert!(matches!(
            create_deserializer(Format::Avro),
            Err(SerdeError::UnsupportedFormat(_))
        ));
    }

    #[test]
    fn test_create_serializer() {
        assert!(create_serializer(Format::Json).is_ok());
        assert!(create_serializer(Format::Csv).is_ok());
        assert!(create_serializer(Format::Raw).is_ok());
        // Debezium is deserialization-only.
        assert!(matches!(
            create_serializer(Format::Debezium),
            Err(SerdeError::UnsupportedFormat(_))
        ));
        // Avro is not constructible through this factory.
        assert!(matches!(
            create_serializer(Format::Avro),
            Err(SerdeError::UnsupportedFormat(_))
        ));
    }
}