Skip to main content

laminar_connectors/serde/
mod.rs

1//! Record serialization and deserialization framework.
2//!
3//! Provides traits and implementations for converting between external
4//! data formats and Arrow `RecordBatch`:
5//!
6//! - `RecordDeserializer`: Converts raw bytes to `RecordBatch`
7//! - `RecordSerializer`: Converts `RecordBatch` to raw bytes
8//! - `Format`: Enum of supported serialization formats
9//!
10//! ## Implementations
11//!
12//! - `json`: JSON format using `serde_json`
13//! - `csv`: CSV format
14//! - `raw`: Raw bytes pass-through
15//! - `debezium`: Debezium CDC envelope format
16
17pub mod csv;
18pub mod debezium;
19pub mod json;
20pub mod raw;
21
22use arrow_array::RecordBatch;
23use arrow_schema::SchemaRef;
24
25use crate::error::SerdeError;
26
/// Serialization formats understood by the connector serde layer.
///
/// The enum is a plain, field-less tag: it is `Copy`, comparable, and
/// hashable, so it can be passed by value and used as a map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Format {
    /// JSON.
    Json,

    /// Comma-separated values.
    Csv,

    /// Raw bytes, passed through with no deserialization.
    Raw,

    /// Debezium change-data-capture (CDC) envelope.
    Debezium,

    /// Apache Avro, optionally backed by a Confluent Schema Registry.
    Avro,
}
45
46impl Format {
47    /// Returns the format name as a string.
48    #[must_use]
49    pub fn as_str(&self) -> &'static str {
50        match self {
51            Format::Json => "json",
52            Format::Csv => "csv",
53            Format::Raw => "raw",
54            Format::Debezium => "debezium",
55            Format::Avro => "avro",
56        }
57    }
58
59    /// Parses a format from a string.
60    ///
61    /// # Errors
62    ///
63    /// Returns `SerdeError::UnsupportedFormat` if the format is not recognized.
64    pub fn parse(s: &str) -> Result<Self, SerdeError> {
65        s.parse()
66    }
67}
68
69impl std::str::FromStr for Format {
70    type Err = SerdeError;
71
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        match s.to_lowercase().as_str() {
74            "json" => Ok(Format::Json),
75            "csv" => Ok(Format::Csv),
76            "raw" | "bytes" => Ok(Format::Raw),
77            "debezium" | "debezium-json" => Ok(Format::Debezium),
78            "avro" | "confluent-avro" => Ok(Format::Avro),
79            other => Err(SerdeError::UnsupportedFormat(other.to_string())),
80        }
81    }
82}
83
84impl std::fmt::Display for Format {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{}", self.as_str())
87    }
88}
89
90/// Trait for deserializing raw bytes into Arrow `RecordBatch`.
91///
92/// Implementations convert from external formats (JSON, CSV, Avro, etc.)
93/// into Arrow columnar format for processing.
94pub trait RecordDeserializer: Send + Sync {
95    /// Deserializes a single record from raw bytes.
96    ///
97    /// # Errors
98    ///
99    /// Returns `SerdeError` if the input cannot be parsed or doesn't
100    /// match the expected schema.
101    fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError>;
102
103    /// Deserializes a batch of records from raw bytes.
104    ///
105    /// The default implementation calls `deserialize` for each record
106    /// and concatenates the results. Implementations should override
107    /// this for better performance with batch-oriented formats.
108    ///
109    /// # Errors
110    ///
111    /// Returns `SerdeError` if any record cannot be parsed.
112    fn deserialize_batch(
113        &self,
114        records: &[&[u8]],
115        schema: &SchemaRef,
116    ) -> Result<RecordBatch, SerdeError> {
117        if records.is_empty() {
118            return Ok(RecordBatch::new_empty(schema.clone()));
119        }
120
121        let batches: Result<Vec<RecordBatch>, SerdeError> = records
122            .iter()
123            .map(|data| self.deserialize(data, schema))
124            .collect();
125        let batches = batches?;
126
127        arrow_select::concat::concat_batches(schema, &batches)
128            .map_err(|e| SerdeError::MalformedInput(format!("failed to concat batches: {e}")))
129    }
130
131    /// Returns the format this deserializer handles.
132    fn format(&self) -> Format;
133
134    /// Downcasts to concrete type (e.g., for Avro schema resolution).
135    fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
136        None
137    }
138}
139
140/// Trait for serializing Arrow `RecordBatch` into raw bytes.
141///
142/// Implementations convert from Arrow columnar format to external
143/// formats for writing to sinks.
144pub trait RecordSerializer: Send + Sync {
145    /// Serializes a `RecordBatch` into a vector of byte records.
146    ///
147    /// Each element in the returned vector represents one serialized record.
148    ///
149    /// # Errors
150    ///
151    /// Returns `SerdeError` if serialization fails.
152    fn serialize(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>, SerdeError>;
153
154    /// Serializes a `RecordBatch` into a single byte buffer.
155    ///
156    /// For formats that support batch encoding (e.g., JSON array, CSV),
157    /// this may be more efficient than serializing individual records.
158    ///
159    /// # Errors
160    ///
161    /// Returns `SerdeError` if serialization fails.
162    fn serialize_batch(&self, batch: &RecordBatch) -> Result<Vec<u8>, SerdeError> {
163        let records = self.serialize(batch)?;
164        let total_len: usize = records.iter().map(Vec::len).sum();
165        let mut buf = Vec::with_capacity(total_len);
166        for record in &records {
167            buf.extend_from_slice(record);
168        }
169        Ok(buf)
170    }
171
172    /// Returns the format this serializer produces.
173    fn format(&self) -> Format;
174}
175
176/// Creates a deserializer for the given format.
177///
178/// # Errors
179///
180/// Returns `SerdeError::UnsupportedFormat` if the format is not supported.
181pub fn create_deserializer(format: Format) -> Result<Box<dyn RecordDeserializer>, SerdeError> {
182    match format {
183        Format::Json => Ok(Box::new(json::JsonDeserializer::new())),
184        Format::Csv => Ok(Box::new(csv::CsvDeserializer::new())),
185        Format::Raw => Ok(Box::new(raw::RawBytesDeserializer::new())),
186        Format::Debezium => Ok(Box::new(debezium::DebeziumDeserializer::new())),
187        Format::Avro => Err(SerdeError::UnsupportedFormat(
188            "Avro deserialization requires the 'kafka' feature".into(),
189        )),
190    }
191}
192
193/// Creates a serializer for the given format.
194///
195/// # Errors
196///
197/// Returns `SerdeError::UnsupportedFormat` if the format is not supported.
198pub fn create_serializer(format: Format) -> Result<Box<dyn RecordSerializer>, SerdeError> {
199    match format {
200        Format::Json => Ok(Box::new(json::JsonSerializer::new())),
201        Format::Csv => Ok(Box::new(csv::CsvSerializer::new())),
202        Format::Raw => Ok(Box::new(raw::RawBytesSerializer::new())),
203        Format::Debezium => Err(SerdeError::UnsupportedFormat(
204            "Debezium is a deserialization-only format".into(),
205        )),
206        Format::Avro => Err(SerdeError::UnsupportedFormat(
207            "Avro serialization requires the 'kafka' feature".into(),
208        )),
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `Format` variant, used to drive the exhaustive checks below.
    const ALL_FORMATS: [Format; 5] = [
        Format::Json,
        Format::Csv,
        Format::Raw,
        Format::Debezium,
        Format::Avro,
    ];

    /// Canonical names and aliases resolve to the expected variant,
    /// regardless of letter case.
    #[test]
    fn test_format_from_str() {
        assert_eq!(Format::parse("json").unwrap(), Format::Json);
        assert_eq!(Format::parse("JSON").unwrap(), Format::Json);
        assert_eq!(Format::parse("csv").unwrap(), Format::Csv);
        assert_eq!(Format::parse("raw").unwrap(), Format::Raw);
        assert_eq!(Format::parse("bytes").unwrap(), Format::Raw);
        assert_eq!(Format::parse("debezium").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("debezium-json").unwrap(), Format::Debezium);
        assert_eq!(Format::parse("avro").unwrap(), Format::Avro);
        assert_eq!(Format::parse("confluent-avro").unwrap(), Format::Avro);

        // Aliases are case-insensitive too, and `str::parse` goes through
        // the same `FromStr` implementation as `Format::parse`.
        assert_eq!(Format::parse("Debezium-JSON").unwrap(), Format::Debezium);
        assert_eq!("Confluent-Avro".parse::<Format>().unwrap(), Format::Avro);
    }

    /// Names that match no format or alias are rejected.
    #[test]
    fn test_format_from_str_rejects_unknown() {
        assert!(Format::parse("xml").is_err());
        assert!(Format::parse("").is_err());
        assert!("protobuf".parse::<Format>().is_err());
    }

    #[test]
    fn test_format_display() {
        assert_eq!(Format::Json.to_string(), "json");
        assert_eq!(Format::Csv.to_string(), "csv");
        assert_eq!(Format::Raw.to_string(), "raw");
        assert_eq!(Format::Debezium.to_string(), "debezium");
        assert_eq!(Format::Avro.to_string(), "avro");
    }

    /// `as_str`, `Display`, and `parse` must agree for every variant, so a
    /// format name written out can always be read back.
    #[test]
    fn test_format_round_trip() {
        for format in ALL_FORMATS {
            assert_eq!(format.to_string(), format.as_str());
            assert_eq!(Format::parse(format.as_str()).unwrap(), format);
        }
    }

    #[test]
    fn test_create_deserializer() {
        assert!(create_deserializer(Format::Json).is_ok());
        assert!(create_deserializer(Format::Csv).is_ok());
        assert!(create_deserializer(Format::Raw).is_ok());
        assert!(create_deserializer(Format::Debezium).is_ok());
        assert!(create_deserializer(Format::Avro).is_err()); // not built here
    }

    #[test]
    fn test_create_serializer() {
        assert!(create_serializer(Format::Json).is_ok());
        assert!(create_serializer(Format::Csv).is_ok());
        assert!(create_serializer(Format::Raw).is_ok());
        assert!(create_serializer(Format::Debezium).is_err()); // deser-only
        assert!(create_serializer(Format::Avro).is_err()); // not built here
    }
}