Skip to main content

laminar_connectors/serde/
debezium.rs

//! Debezium CDC envelope format deserialization.
//!
//! Parses Debezium JSON change events into Arrow `RecordBatch`.
//!
//! ## Debezium Envelope Format
//!
//! ```json
//! {
//!   "before": { ... },       // null for inserts
//!   "after":  { ... },       // null for deletes
//!   "source": { ... },       // source metadata
//!   "op": "c|u|d|r",         // operation: create, update, delete, read (snapshot)
//!   "ts_ms": 1234567890      // event timestamp in milliseconds
//! }
//! ```
//!
//! The deserializer extracts the `after` payload for inserts, updates,
//! and snapshot reads, and the `before` payload for deletes, adding
//! `__op` and `__ts_ms` metadata columns. A missing or non-integer
//! `ts_ms` is recorded as `0`.
20
21use std::sync::Arc;
22
23use arrow_array::builder::{Int64Builder, StringBuilder};
24use arrow_array::{ArrayRef, RecordBatch};
25use arrow_schema::{DataType, Field, Schema, SchemaRef};
26use serde_json::Value;
27
28use super::json::JsonDeserializer;
29use super::{Format, RecordDeserializer};
30use crate::error::SerdeError;
31
/// The kind of change carried by a Debezium event, as named by its `op` field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DebeziumOp {
    /// A row was created (`"c"`).
    Create,
    /// An existing row was updated (`"u"`).
    Update,
    /// A row was deleted (`"d"`).
    Delete,
    /// A row was read while taking a snapshot (`"r"`).
    Read,
}
44
45impl DebeziumOp {
46    /// Parses an operation from the Debezium `op` field.
47    fn from_str(s: &str) -> Result<Self, SerdeError> {
48        match s {
49            "c" => Ok(DebeziumOp::Create),
50            "u" => Ok(DebeziumOp::Update),
51            "d" => Ok(DebeziumOp::Delete),
52            "r" => Ok(DebeziumOp::Read),
53            other => Err(SerdeError::MalformedInput(format!(
54                "unknown Debezium op: {other}"
55            ))),
56        }
57    }
58
59    /// Returns the operation as a string.
60    #[must_use]
61    pub fn as_str(&self) -> &'static str {
62        match self {
63            DebeziumOp::Create => "c",
64            DebeziumOp::Update => "u",
65            DebeziumOp::Delete => "d",
66            DebeziumOp::Read => "r",
67        }
68    }
69}
70
71/// Debezium CDC envelope deserializer.
72///
73/// Extracts the data payload from a Debezium change event and converts
74/// it to an Arrow `RecordBatch`. Two metadata columns are appended:
75/// - `__op`: The operation type (c/u/d/r)
76/// - `__ts_ms`: The event timestamp in milliseconds
77///
78/// The provided schema should describe the data columns only (without
79/// `__op` and `__ts_ms`); these are added automatically.
80#[derive(Debug, Clone)]
81pub struct DebeziumDeserializer {
82    json_deser: JsonDeserializer,
83}
84
85impl DebeziumDeserializer {
86    /// Creates a new Debezium deserializer.
87    #[must_use]
88    pub fn new() -> Self {
89        Self {
90            json_deser: JsonDeserializer::new(),
91        }
92    }
93}
94
95impl Default for DebeziumDeserializer {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl RecordDeserializer for DebeziumDeserializer {
102    fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
103        let envelope: Value = serde_json::from_slice(data)?;
104        let obj = envelope
105            .as_object()
106            .ok_or_else(|| SerdeError::MalformedInput("expected JSON object".into()))?;
107
108        // Extract operation
109        let op_str = obj
110            .get("op")
111            .and_then(Value::as_str)
112            .ok_or_else(|| SerdeError::MissingField("op".into()))?;
113        let op = DebeziumOp::from_str(op_str)?;
114
115        // Extract timestamp
116        let ts_ms = obj.get("ts_ms").and_then(Value::as_i64).unwrap_or(0);
117
118        // Extract payload: `after` for create/update/read, `before` for delete
119        let payload = match op {
120            DebeziumOp::Create | DebeziumOp::Update | DebeziumOp::Read => obj
121                .get("after")
122                .ok_or_else(|| SerdeError::MissingField("after".into()))?,
123            DebeziumOp::Delete => obj
124                .get("before")
125                .ok_or_else(|| SerdeError::MissingField("before".into()))?,
126        };
127
128        if payload.is_null() {
129            return Err(SerdeError::MalformedInput(format!(
130                "payload is null for op={op_str}"
131            )));
132        }
133
134        // Deserialize the payload directly from the parsed Value (avoids double parse)
135        let data_batch = self.json_deser.deserialize_value(payload, schema)?;
136
137        // Append __op and __ts_ms columns
138        let mut columns: Vec<ArrayRef> = data_batch.columns().to_vec();
139
140        let mut op_builder = StringBuilder::with_capacity(1, 1);
141        op_builder.append_value(op.as_str());
142        columns.push(Arc::new(op_builder.finish()));
143
144        let mut ts_builder = Int64Builder::with_capacity(1);
145        ts_builder.append_value(ts_ms);
146        columns.push(Arc::new(ts_builder.finish()));
147
148        // Build extended schema with metadata columns
149        let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
150        fields.push(Arc::new(Field::new("__op", DataType::Utf8, false)));
151        fields.push(Arc::new(Field::new("__ts_ms", DataType::Int64, false)));
152        let extended_schema = Arc::new(Schema::new(fields));
153
154        RecordBatch::try_new(extended_schema, columns)
155            .map_err(|e| SerdeError::MalformedInput(format!("failed to create RecordBatch: {e}")))
156    }
157
158    fn format(&self) -> Format {
159        Format::Debezium
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::Field;

    /// Data schema shared by every test: `id: Int64`, `name: Utf8`, both non-nullable.
    fn user_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Borrows column `index` as an `Int64Array`; panics if the column has another type.
    fn int64_column(batch: &RecordBatch, index: usize) -> &Int64Array {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
    }

    /// Borrows column `index` as a `StringArray`; panics if the column has another type.
    fn utf8_column(batch: &RecordBatch, index: usize) -> &StringArray {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    }

    #[test]
    fn test_debezium_insert() {
        let schema = user_schema();
        let deser = DebeziumDeserializer::new();

        let data = br#"{
            "before": null,
            "after": {"id": 1, "name": "Alice"},
            "op": "c",
            "ts_ms": 1700000000000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        // data columns + __op + __ts_ms
        assert_eq!(batch.num_columns(), 4);

        assert_eq!(int64_column(&batch, 0).value(0), 1);
        assert_eq!(utf8_column(&batch, 1).value(0), "Alice");
        assert_eq!(utf8_column(&batch, 2).value(0), "c");
        assert_eq!(int64_column(&batch, 3).value(0), 1_700_000_000_000);
    }

    #[test]
    fn test_debezium_update() {
        let schema = user_schema();
        let deser = DebeziumDeserializer::new();

        let data = br#"{
            "before": {"id": 1, "name": "Alice"},
            "after": {"id": 1, "name": "Alicia"},
            "op": "u",
            "ts_ms": 1700000001000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();

        // Updates must surface the `after` image, not `before`.
        assert_eq!(utf8_column(&batch, 1).value(0), "Alicia");
        assert_eq!(utf8_column(&batch, 2).value(0), "u");
    }

    #[test]
    fn test_debezium_delete() {
        let schema = user_schema();
        let deser = DebeziumDeserializer::new();

        let data = br#"{
            "before": {"id": 1, "name": "Alice"},
            "after": null,
            "op": "d",
            "ts_ms": 1700000002000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();

        // Deletes must surface the `before` image because `after` is null.
        assert_eq!(int64_column(&batch, 0).value(0), 1);
        assert_eq!(utf8_column(&batch, 2).value(0), "d");
    }

    #[test]
    fn test_debezium_invalid_op() {
        let schema = user_schema();
        let deser = DebeziumDeserializer::new();

        let data = br#"{
            "before": null,
            "after": {"id": 1, "name": "Alice"},
            "op": "x",
            "ts_ms": 1700000000000
        }"#;

        let outcome = deser.deserialize(data, &schema);
        assert!(outcome.is_err());
    }
}