laminar_connectors/serde/
debezium.rs1use std::sync::Arc;
22
23use arrow_array::builder::{Int64Builder, StringBuilder};
24use arrow_array::{ArrayRef, RecordBatch};
25use arrow_schema::{DataType, Field, Schema, SchemaRef};
26use serde_json::Value;
27
28use super::json::JsonDeserializer;
29use super::{Format, RecordDeserializer};
30use crate::error::SerdeError;
31
/// Kind of row change carried by a Debezium change-event envelope (its `op` field).
///
/// Each variant corresponds to one single-letter Debezium op code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DebeziumOp {
    /// A row was inserted (op code `"c"`).
    Create,
    /// A row was updated (op code `"u"`).
    Update,
    /// A row was deleted (op code `"d"`).
    Delete,
    /// A row was read rather than changed (op code `"r"`); Debezium emits this
    /// for snapshot records.
    Read,
}
44
45impl DebeziumOp {
46 fn from_str(s: &str) -> Result<Self, SerdeError> {
48 match s {
49 "c" => Ok(DebeziumOp::Create),
50 "u" => Ok(DebeziumOp::Update),
51 "d" => Ok(DebeziumOp::Delete),
52 "r" => Ok(DebeziumOp::Read),
53 other => Err(SerdeError::MalformedInput(format!(
54 "unknown Debezium op: {other}"
55 ))),
56 }
57 }
58
59 #[must_use]
61 pub fn as_str(&self) -> &'static str {
62 match self {
63 DebeziumOp::Create => "c",
64 DebeziumOp::Update => "u",
65 DebeziumOp::Delete => "d",
66 DebeziumOp::Read => "r",
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
81pub struct DebeziumDeserializer {
82 json_deser: JsonDeserializer,
83}
84
85impl DebeziumDeserializer {
86 #[must_use]
88 pub fn new() -> Self {
89 Self {
90 json_deser: JsonDeserializer::new(),
91 }
92 }
93}
94
95impl Default for DebeziumDeserializer {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl RecordDeserializer for DebeziumDeserializer {
102 fn deserialize(&self, data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, SerdeError> {
103 let envelope: Value = serde_json::from_slice(data)?;
104 let obj = envelope
105 .as_object()
106 .ok_or_else(|| SerdeError::MalformedInput("expected JSON object".into()))?;
107
108 let op_str = obj
110 .get("op")
111 .and_then(Value::as_str)
112 .ok_or_else(|| SerdeError::MissingField("op".into()))?;
113 let op = DebeziumOp::from_str(op_str)?;
114
115 let ts_ms = obj.get("ts_ms").and_then(Value::as_i64).unwrap_or(0);
117
118 let payload = match op {
120 DebeziumOp::Create | DebeziumOp::Update | DebeziumOp::Read => obj
121 .get("after")
122 .ok_or_else(|| SerdeError::MissingField("after".into()))?,
123 DebeziumOp::Delete => obj
124 .get("before")
125 .ok_or_else(|| SerdeError::MissingField("before".into()))?,
126 };
127
128 if payload.is_null() {
129 return Err(SerdeError::MalformedInput(format!(
130 "payload is null for op={op_str}"
131 )));
132 }
133
134 let data_batch = self.json_deser.deserialize_value(payload, schema)?;
136
137 let mut columns: Vec<ArrayRef> = data_batch.columns().to_vec();
139
140 let mut op_builder = StringBuilder::with_capacity(1, 1);
141 op_builder.append_value(op.as_str());
142 columns.push(Arc::new(op_builder.finish()));
143
144 let mut ts_builder = Int64Builder::with_capacity(1);
145 ts_builder.append_value(ts_ms);
146 columns.push(Arc::new(ts_builder.finish()));
147
148 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
150 fields.push(Arc::new(Field::new("__op", DataType::Utf8, false)));
151 fields.push(Arc::new(Field::new("__ts_ms", DataType::Int64, false)));
152 let extended_schema = Arc::new(Schema::new(fields));
153
154 RecordBatch::try_new(extended_schema, columns)
155 .map_err(|e| SerdeError::MalformedInput(format!("failed to create RecordBatch: {e}")))
156 }
157
158 fn format(&self) -> Format {
159 Format::Debezium
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};

    /// Schema of the row images used throughout these tests.
    fn user_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]))
    }

    /// Downcasts column `index` of `batch` to an `Int64Array`.
    fn int64_column(batch: &RecordBatch, index: usize) -> &Int64Array {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("column should be Int64")
    }

    /// Downcasts column `index` of `batch` to a `StringArray`.
    fn string_column(batch: &RecordBatch, index: usize) -> &StringArray {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("column should be Utf8")
    }

    #[test]
    fn test_debezium_insert() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{
            "before": null,
            "after": {"id": 1, "name": "Alice"},
            "op": "c",
            "ts_ms": 1700000000000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 4);
        assert_eq!(int64_column(&batch, 0).value(0), 1);
        assert_eq!(string_column(&batch, 1).value(0), "Alice");
        assert_eq!(string_column(&batch, 2).value(0), "c");
        assert_eq!(int64_column(&batch, 3).value(0), 1_700_000_000_000);
    }

    #[test]
    fn test_debezium_metadata_fields() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{"after": {"id": 1, "name": "Alice"}, "op": "c", "ts_ms": 5}"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        let out_schema = batch.schema();

        let op_field = out_schema.field(2);
        assert_eq!(op_field.name(), "__op");
        assert_eq!(op_field.data_type(), &DataType::Utf8);
        assert!(!op_field.is_nullable());

        let ts_field = out_schema.field(3);
        assert_eq!(ts_field.name(), "__ts_ms");
        assert_eq!(ts_field.data_type(), &DataType::Int64);
        assert!(!ts_field.is_nullable());
    }

    #[test]
    fn test_debezium_update() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{
            "before": {"id": 1, "name": "Alice"},
            "after": {"id": 1, "name": "Alicia"},
            "op": "u",
            "ts_ms": 1700000001000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(string_column(&batch, 1).value(0), "Alicia");
        assert_eq!(string_column(&batch, 2).value(0), "u");
    }

    #[test]
    fn test_debezium_delete() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{
            "before": {"id": 1, "name": "Alice"},
            "after": null,
            "op": "d",
            "ts_ms": 1700000002000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(int64_column(&batch, 0).value(0), 1);
        assert_eq!(string_column(&batch, 2).value(0), "d");
    }

    #[test]
    fn test_debezium_snapshot_read() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{
            "before": null,
            "after": {"id": 7, "name": "Bob"},
            "op": "r",
            "ts_ms": 1700000003000
        }"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(int64_column(&batch, 0).value(0), 7);
        assert_eq!(string_column(&batch, 2).value(0), "r");
    }

    #[test]
    fn test_debezium_missing_ts_defaults_to_zero() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{"after": {"id": 1, "name": "Alice"}, "op": "c"}"#;

        let batch = deser.deserialize(data, &schema).unwrap();
        assert_eq!(int64_column(&batch, 3).value(0), 0);
    }

    #[test]
    fn test_debezium_invalid_op() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{
            "before": null,
            "after": {"id": 1, "name": "Alice"},
            "op": "x",
            "ts_ms": 1700000000000
        }"#;

        let result = deser.deserialize(data, &schema);
        assert!(matches!(result, Err(SerdeError::MalformedInput(_))));
    }

    #[test]
    fn test_debezium_missing_op() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{"after": {"id": 1, "name": "Alice"}, "ts_ms": 1}"#;

        let result = deser.deserialize(data, &schema);
        assert!(matches!(result, Err(SerdeError::MissingField(_))));
    }

    #[test]
    fn test_debezium_null_after_on_create() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{"before": null, "after": null, "op": "c", "ts_ms": 1}"#;

        let result = deser.deserialize(data, &schema);
        assert!(matches!(result, Err(SerdeError::MalformedInput(_))));
    }

    #[test]
    fn test_debezium_delete_without_before() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let data = br#"{"after": null, "op": "d", "ts_ms": 1}"#;

        let result = deser.deserialize(data, &schema);
        assert!(matches!(result, Err(SerdeError::MissingField(_))));
    }

    #[test]
    fn test_debezium_non_object_envelope() {
        let deser = DebeziumDeserializer::new();
        let schema = user_schema();

        let result = deser.deserialize(b"[1, 2, 3]", &schema);
        assert!(matches!(result, Err(SerdeError::MalformedInput(_))));
    }
}