Skip to main content

laminar_connectors/cdc/mysql/
changelog.rs

1//! MySQL CDC changelog conversion to Z-set format.
2//!
3//! Converts MySQL binlog row events into CDC change events compatible
4//! with LaminarDB's Z-set changelog format.
5
6use std::sync::Arc;
7
8use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, UInt64Array};
9use arrow_schema::{DataType, Field, Schema};
10
11use super::decoder::{
12    ColumnValue, DeleteMessage, InsertMessage, RowData, UpdateMessage, UpdateRowData,
13};
14use super::schema::TableInfo;
15
/// Kind of row change carried by a CDC event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A newly inserted row.
    Insert,
    /// The before image of an updated row.
    UpdateBefore,
    /// The after image of an updated row.
    UpdateAfter,
    /// A deleted row.
    Delete,
}

impl CdcOperation {
    /// Short operation code used in the `_op` column: `I`, `U-`, `U+` or `D`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Delete => "D",
            Self::UpdateAfter => "U+",
            Self::UpdateBefore => "U-",
            Self::Insert => "I",
        }
    }

    /// Z-set weight: `+1` for rows entering the collection (insert, update
    /// after image) and `-1` for rows leaving it (delete, update before image).
    #[must_use]
    pub fn weight(&self) -> i8 {
        match *self {
            Self::UpdateBefore | Self::Delete => -1,
            Self::UpdateAfter | Self::Insert => 1,
        }
    }
}
50
51/// A CDC change event from MySQL binlog.
52#[derive(Debug, Clone)]
53pub struct ChangeEvent {
54    /// Source table (database.table).
55    pub table: String,
56    /// Operation type.
57    pub operation: CdcOperation,
58    /// Timestamp in milliseconds since Unix epoch.
59    pub timestamp_ms: i64,
60    /// Binlog filename.
61    pub binlog_file: String,
62    /// Binlog position.
63    pub binlog_position: u64,
64    /// GTID (if available).
65    pub gtid: Option<String>,
66    /// Row data.
67    pub row: RowData,
68}
69
70impl ChangeEvent {
71    /// Creates an insert change event.
72    #[must_use]
73    pub fn insert(
74        table: String,
75        timestamp_ms: i64,
76        binlog_file: String,
77        binlog_position: u64,
78        gtid: Option<String>,
79        row: RowData,
80    ) -> Self {
81        Self {
82            table,
83            operation: CdcOperation::Insert,
84            timestamp_ms,
85            binlog_file,
86            binlog_position,
87            gtid,
88            row,
89        }
90    }
91
92    /// Creates a delete change event.
93    #[must_use]
94    pub fn delete(
95        table: String,
96        timestamp_ms: i64,
97        binlog_file: String,
98        binlog_position: u64,
99        gtid: Option<String>,
100        row: RowData,
101    ) -> Self {
102        Self {
103            table,
104            operation: CdcOperation::Delete,
105            timestamp_ms,
106            binlog_file,
107            binlog_position,
108            gtid,
109            row,
110        }
111    }
112}
113
114/// Converts an INSERT message to change events.
115#[must_use]
116pub fn insert_to_events(
117    msg: &InsertMessage,
118    binlog_file: &str,
119    gtid: Option<&str>,
120) -> Vec<ChangeEvent> {
121    let table = format!("{}.{}", msg.database, msg.table);
122
123    msg.rows
124        .iter()
125        .map(|row| {
126            ChangeEvent::insert(
127                table.clone(),
128                msg.timestamp_ms,
129                binlog_file.to_string(),
130                msg.binlog_position,
131                gtid.map(String::from),
132                row.clone(),
133            )
134        })
135        .collect()
136}
137
138/// Converts an UPDATE message to change events.
139///
140/// Each UPDATE row generates two events:
141/// - U- (before image, weight -1)
142/// - U+ (after image, weight +1)
143#[must_use]
144pub fn update_to_events(
145    msg: &UpdateMessage,
146    binlog_file: &str,
147    gtid: Option<&str>,
148) -> Vec<ChangeEvent> {
149    let table = format!("{}.{}", msg.database, msg.table);
150    let mut events = Vec::with_capacity(msg.rows.len() * 2);
151
152    for UpdateRowData { before, after } in &msg.rows {
153        // Before image (retraction)
154        events.push(ChangeEvent {
155            table: table.clone(),
156            operation: CdcOperation::UpdateBefore,
157            timestamp_ms: msg.timestamp_ms,
158            binlog_file: binlog_file.to_string(),
159            binlog_position: msg.binlog_position,
160            gtid: gtid.map(String::from),
161            row: before.clone(),
162        });
163
164        // After image (insertion)
165        events.push(ChangeEvent {
166            table: table.clone(),
167            operation: CdcOperation::UpdateAfter,
168            timestamp_ms: msg.timestamp_ms,
169            binlog_file: binlog_file.to_string(),
170            binlog_position: msg.binlog_position,
171            gtid: gtid.map(String::from),
172            row: after.clone(),
173        });
174    }
175
176    events
177}
178
179/// Converts a DELETE message to change events.
180#[must_use]
181pub fn delete_to_events(
182    msg: &DeleteMessage,
183    binlog_file: &str,
184    gtid: Option<&str>,
185) -> Vec<ChangeEvent> {
186    let table = format!("{}.{}", msg.database, msg.table);
187
188    msg.rows
189        .iter()
190        .map(|row| {
191            ChangeEvent::delete(
192                table.clone(),
193                msg.timestamp_ms,
194                binlog_file.to_string(),
195                msg.binlog_position,
196                gtid.map(String::from),
197                row.clone(),
198            )
199        })
200        .collect()
201}
202
203/// Schema for CDC metadata columns.
204#[must_use]
205pub fn cdc_metadata_schema() -> Schema {
206    Schema::new(vec![
207        Field::new("_table", DataType::Utf8, false),
208        Field::new("_op", DataType::Utf8, false),
209        Field::new("_ts_ms", DataType::Int64, false),
210        Field::new("_binlog_file", DataType::Utf8, true),
211        Field::new("_binlog_pos", DataType::UInt64, true),
212        Field::new("_gtid", DataType::Utf8, true),
213    ])
214}
215
216/// Converts change events to a `RecordBatch` with CDC metadata + row data.
217///
218/// # Errors
219///
220/// Returns error if schema conversion fails.
221pub fn events_to_record_batch(
222    events: &[ChangeEvent],
223    table_info: &TableInfo,
224) -> Result<RecordBatch, arrow_schema::ArrowError> {
225    if events.is_empty() {
226        // Return empty batch with correct schema
227        let schema = build_cdc_schema(&table_info.arrow_schema);
228        return Ok(RecordBatch::new_empty(Arc::new(schema)));
229    }
230
231    let _n = events.len();
232
233    // CDC metadata columns
234    let tables: Vec<&str> = events.iter().map(|e| e.table.as_str()).collect();
235    let ops: Vec<&str> = events.iter().map(|e| e.operation.as_str()).collect();
236    let timestamps: Vec<i64> = events.iter().map(|e| e.timestamp_ms).collect();
237    let binlog_files: Vec<Option<&str>> = events
238        .iter()
239        .map(|e| Some(e.binlog_file.as_str()))
240        .collect();
241    let binlog_positions: Vec<Option<u64>> =
242        events.iter().map(|e| Some(e.binlog_position)).collect();
243    let gtids: Vec<Option<&str>> = events.iter().map(|e| e.gtid.as_deref()).collect();
244
245    let mut columns: Vec<ArrayRef> = vec![
246        Arc::new(StringArray::from(tables)),
247        Arc::new(StringArray::from(ops)),
248        Arc::new(Int64Array::from(timestamps)),
249        Arc::new(StringArray::from(binlog_files)),
250        Arc::new(UInt64Array::from(binlog_positions)),
251        Arc::new(StringArray::from(gtids)),
252    ];
253
254    // Row data columns
255    for (col_idx, _col_def) in table_info.columns.iter().enumerate() {
256        let values: Vec<Option<String>> = events
257            .iter()
258            .map(|e| {
259                e.row.columns.get(col_idx).and_then(|v| {
260                    if v.is_null() {
261                        None
262                    } else {
263                        Some(v.to_text())
264                    }
265                })
266            })
267            .collect();
268
269        // For simplicity, convert all to strings
270        // A production implementation would use proper Arrow types
271        let array: ArrayRef = Arc::new(StringArray::from(
272            values.iter().map(|v| v.as_deref()).collect::<Vec<_>>(),
273        ));
274        columns.push(array);
275    }
276
277    let schema = build_cdc_schema(&table_info.arrow_schema);
278    RecordBatch::try_new(Arc::new(schema), columns)
279}
280
281/// Builds the full CDC schema including metadata and row columns.
282fn build_cdc_schema(row_schema: &Schema) -> Schema {
283    let mut fields = vec![
284        Field::new("_table", DataType::Utf8, false),
285        Field::new("_op", DataType::Utf8, false),
286        Field::new("_ts_ms", DataType::Int64, false),
287        Field::new("_binlog_file", DataType::Utf8, true),
288        Field::new("_binlog_pos", DataType::UInt64, true),
289        Field::new("_gtid", DataType::Utf8, true),
290    ];
291
292    // Add row data columns (as strings for now)
293    for field in row_schema.fields() {
294        fields.push(Field::new(field.name(), DataType::Utf8, true));
295    }
296
297    Schema::new(fields)
298}
299
300/// Converts a single column value to JSON for the CDC envelope.
301#[must_use]
302pub fn column_value_to_json(value: &ColumnValue) -> serde_json::Value {
303    match value {
304        ColumnValue::Null => serde_json::Value::Null,
305        ColumnValue::SignedInt(v) => serde_json::json!(v),
306        ColumnValue::UnsignedInt(v) => serde_json::json!(v),
307        ColumnValue::Float(v) => serde_json::json!(v),
308        ColumnValue::Double(v) => serde_json::json!(v),
309        ColumnValue::String(s) => serde_json::json!(s),
310        ColumnValue::Bytes(b) => serde_json::json!(base64_encode(b)),
311        ColumnValue::Date(y, m, d) => serde_json::json!(format!("{y:04}-{m:02}-{d:02}")),
312        ColumnValue::Time(h, m, s, us) => {
313            if *us > 0 {
314                serde_json::json!(format!("{h:02}:{m:02}:{s:02}.{us:06}"))
315            } else {
316                serde_json::json!(format!("{h:02}:{m:02}:{s:02}"))
317            }
318        }
319        ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
320            if *us > 0 {
321                serde_json::json!(format!(
322                    "{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}.{us:06}"
323                ))
324            } else {
325                serde_json::json!(format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}"))
326            }
327        }
328        ColumnValue::Timestamp(us) => serde_json::json!(us),
329        ColumnValue::Json(s) => serde_json::from_str(s).unwrap_or_else(|e| {
330            tracing::warn!(
331                error = %e,
332                json_len = s.len(),
333                "[LDB-4003] MySQL CDC JSON column parse failed, treating as string literal"
334            );
335            serde_json::json!(s)
336        }),
337    }
338}
339
340/// Converts a row to JSON object.
341#[must_use]
342pub fn row_to_json(row: &RowData, columns: &[super::types::MySqlColumn]) -> serde_json::Value {
343    let mut obj = serde_json::Map::new();
344    for (i, col) in columns.iter().enumerate() {
345        if let Some(value) = row.columns.get(i) {
346            obj.insert(col.name.clone(), column_value_to_json(value));
347        }
348    }
349    serde_json::Value::Object(obj)
350}
351
/// Encodes `data` as standard base64 (RFC 4648 alphabet, `=` padding).
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Maps the low six bits of `bits` to its base64 character.
    let sextet = |bits: u32| char::from(ALPHABET[(bits & 0x3f) as usize]);

    let groups = data.chunks(3);
    // Every input group of 1..=3 bytes produces exactly 4 output characters.
    let mut encoded = String::with_capacity(groups.len() * 4);

    for group in groups {
        // Missing trailing bytes of a short final group are treated as zero.
        let byte = |i: usize| u32::from(group.get(i).copied().unwrap_or(0));
        let packed = (byte(0) << 16) | (byte(1) << 8) | byte(2);

        encoded.push(sextet(packed >> 18));
        encoded.push(sextet(packed >> 12));
        encoded.push(if group.len() > 1 { sextet(packed >> 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(packed) } else { '=' });
    }

    encoded
}
380
#[cfg(test)]
mod tests {
    use super::*;

    /// Binlog timestamp shared by the fixtures (2024-01-01T00:00:00Z).
    const TS_MS: i64 = 1_704_067_200_000;

    /// Row fixture: an integer id, a name, and a trailing SQL NULL.
    fn make_test_row() -> RowData {
        let columns = vec![
            ColumnValue::SignedInt(1),
            ColumnValue::String(String::from("Alice")),
            ColumnValue::Null,
        ];
        RowData { columns }
    }

    #[test]
    fn test_cdc_operation_as_str() {
        let expected = [
            (CdcOperation::Insert, "I"),
            (CdcOperation::UpdateBefore, "U-"),
            (CdcOperation::UpdateAfter, "U+"),
            (CdcOperation::Delete, "D"),
        ];
        for &(op, code) in &expected {
            assert_eq!(op.as_str(), code);
        }
    }

    #[test]
    fn test_cdc_operation_weight() {
        let expected: [(CdcOperation, i8); 4] = [
            (CdcOperation::Insert, 1),
            (CdcOperation::UpdateAfter, 1),
            (CdcOperation::Delete, -1),
            (CdcOperation::UpdateBefore, -1),
        ];
        for &(op, weight) in &expected {
            assert_eq!(op.weight(), weight);
        }
    }

    #[test]
    fn test_change_event_insert() {
        let event = ChangeEvent::insert(
            "db.users".into(),
            TS_MS,
            "mysql-bin.000003".into(),
            12345,
            Some("gtid-123".into()),
            make_test_row(),
        );

        assert_eq!(event.table, "db.users");
        assert_eq!(event.operation, CdcOperation::Insert);
        assert_eq!(event.timestamp_ms, TS_MS);
    }

    #[test]
    fn test_insert_to_events() {
        let msg = InsertMessage {
            table_id: 100,
            database: "testdb".into(),
            table: "users".into(),
            rows: vec![make_test_row(), make_test_row()],
            binlog_position: 12345,
            timestamp_ms: TS_MS,
        };

        let events = insert_to_events(&msg, "mysql-bin.000003", Some("gtid-1"));
        assert_eq!(events.len(), 2);
        assert!(events
            .iter()
            .all(|e| matches!(e.operation, CdcOperation::Insert)));
    }

    #[test]
    fn test_update_to_events() {
        let changed = UpdateRowData {
            before: RowData {
                columns: vec![ColumnValue::String("old".into())],
            },
            after: RowData {
                columns: vec![ColumnValue::String("new".into())],
            },
        };
        let msg = UpdateMessage {
            table_id: 100,
            database: "testdb".into(),
            table: "users".into(),
            rows: vec![changed],
            binlog_position: 12345,
            timestamp_ms: TS_MS,
        };

        let ops: Vec<CdcOperation> = update_to_events(&msg, "mysql-bin.000003", None)
            .iter()
            .map(|e| e.operation)
            .collect();
        assert_eq!(ops, [CdcOperation::UpdateBefore, CdcOperation::UpdateAfter]);
    }

    #[test]
    fn test_delete_to_events() {
        let msg = DeleteMessage {
            table_id: 100,
            database: "testdb".into(),
            table: "users".into(),
            rows: vec![make_test_row()],
            binlog_position: 12345,
            timestamp_ms: TS_MS,
        };

        let ops: Vec<CdcOperation> = delete_to_events(&msg, "mysql-bin.000003", Some("gtid-1"))
            .iter()
            .map(|e| e.operation)
            .collect();
        assert_eq!(ops, [CdcOperation::Delete]);
    }

    #[test]
    fn test_column_value_to_json() {
        let cases = [
            (ColumnValue::Null, serde_json::Value::Null),
            (ColumnValue::SignedInt(42), serde_json::json!(42)),
            (
                ColumnValue::String("hello".to_string()),
                serde_json::json!("hello"),
            ),
            (ColumnValue::Date(2024, 6, 15), serde_json::json!("2024-06-15")),
        ];
        for (input, expected) in &cases {
            assert_eq!(&column_value_to_json(input), expected);
        }
    }

    #[test]
    fn test_cdc_metadata_schema() {
        let schema = cdc_metadata_schema();
        assert_eq!(schema.fields().len(), 6);
        assert_eq!(schema.field(0).name().as_str(), "_table");
        assert_eq!(schema.field(1).name().as_str(), "_op");
    }

    #[test]
    fn test_base64_encode() {
        let cases: [(&[u8], &str); 5] = [
            (b"", ""),
            (b"f", "Zg=="),
            (b"fo", "Zm8="),
            (b"foo", "Zm9v"),
            (b"foob", "Zm9vYg=="),
        ];
        for &(input, expected) in &cases {
            assert_eq!(base64_encode(input), expected);
        }
    }
}