Skip to main content

laminar_connectors/cdc/mysql/
changelog.rs

1//! MySQL CDC changelog conversion to Z-set format.
2//!
3//! Converts MySQL binlog row events into CDC change events compatible
4//! with LaminarDB's Z-set changelog format.
5
6use std::sync::Arc;
7
8use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
9use arrow_schema::{DataType, Field, Schema};
10
11use super::decoder::{
12    ColumnValue, DeleteMessage, InsertMessage, RowData, UpdateMessage, UpdateRowData,
13};
14use super::schema::TableInfo;
15
/// Kind of row-level change carried by a change event.
///
/// An UPDATE is represented as two operations: a retraction of the old row
/// image (`UpdateBefore`) followed by the new row image (`UpdateAfter`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A row was inserted.
    Insert,
    /// The old image of an updated row (retracted).
    UpdateBefore,
    /// The new image of an updated row (asserted).
    UpdateAfter,
    /// A row was deleted.
    Delete,
}

impl CdcOperation {
    /// Short operation code: `"I"`, `"U-"`, `"U+"` or `"D"`.
    ///
    /// This is the value written to the `_op` metadata column.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Insert => "I",
            Self::UpdateBefore => "U-",
            Self::UpdateAfter => "U+",
            Self::Delete => "D",
        }
    }

    /// Z-set weight of the operation.
    ///
    /// Rows leaving the collection (delete, update before-image) weigh `-1`.
    /// Rows entering it (insert, update after-image) weigh `+1`.
    #[must_use]
    pub fn weight(&self) -> i8 {
        match *self {
            Self::Delete | Self::UpdateBefore => -1,
            Self::Insert | Self::UpdateAfter => 1,
        }
    }
}
50
51/// A CDC change event from MySQL binlog.
52#[derive(Debug, Clone)]
53pub struct ChangeEvent {
54    /// Source table (database.table).
55    pub table: String,
56    /// Operation type.
57    pub operation: CdcOperation,
58    /// Timestamp in milliseconds since Unix epoch.
59    pub timestamp_ms: i64,
60    /// Binlog filename.
61    pub binlog_file: String,
62    /// Binlog position.
63    pub binlog_position: u64,
64    /// GTID (if available).
65    pub gtid: Option<String>,
66    /// Row data.
67    pub row: RowData,
68}
69
70impl ChangeEvent {
71    /// Creates an insert change event.
72    #[must_use]
73    pub fn insert(
74        table: String,
75        timestamp_ms: i64,
76        binlog_file: String,
77        binlog_position: u64,
78        gtid: Option<String>,
79        row: RowData,
80    ) -> Self {
81        Self {
82            table,
83            operation: CdcOperation::Insert,
84            timestamp_ms,
85            binlog_file,
86            binlog_position,
87            gtid,
88            row,
89        }
90    }
91
92    /// Creates a delete change event.
93    #[must_use]
94    pub fn delete(
95        table: String,
96        timestamp_ms: i64,
97        binlog_file: String,
98        binlog_position: u64,
99        gtid: Option<String>,
100        row: RowData,
101    ) -> Self {
102        Self {
103            table,
104            operation: CdcOperation::Delete,
105            timestamp_ms,
106            binlog_file,
107            binlog_position,
108            gtid,
109            row,
110        }
111    }
112}
113
114/// Converts an INSERT message to change events.
115#[must_use]
116pub fn insert_to_events(
117    msg: &InsertMessage,
118    binlog_file: &str,
119    gtid: Option<&str>,
120) -> Vec<ChangeEvent> {
121    let table = format!("{}.{}", msg.database, msg.table);
122
123    msg.rows
124        .iter()
125        .map(|row| {
126            ChangeEvent::insert(
127                table.clone(),
128                msg.timestamp_ms,
129                binlog_file.to_string(),
130                msg.binlog_position,
131                gtid.map(String::from),
132                row.clone(),
133            )
134        })
135        .collect()
136}
137
138/// Converts an UPDATE message to change events.
139///
140/// Each UPDATE row generates two events:
141/// - U- (before image, weight -1)
142/// - U+ (after image, weight +1)
143#[must_use]
144pub fn update_to_events(
145    msg: &UpdateMessage,
146    binlog_file: &str,
147    gtid: Option<&str>,
148) -> Vec<ChangeEvent> {
149    let table = format!("{}.{}", msg.database, msg.table);
150    let mut events = Vec::with_capacity(msg.rows.len() * 2);
151
152    for UpdateRowData { before, after } in &msg.rows {
153        // Before image (retraction)
154        events.push(ChangeEvent {
155            table: table.clone(),
156            operation: CdcOperation::UpdateBefore,
157            timestamp_ms: msg.timestamp_ms,
158            binlog_file: binlog_file.to_string(),
159            binlog_position: msg.binlog_position,
160            gtid: gtid.map(String::from),
161            row: before.clone(),
162        });
163
164        // After image (insertion)
165        events.push(ChangeEvent {
166            table: table.clone(),
167            operation: CdcOperation::UpdateAfter,
168            timestamp_ms: msg.timestamp_ms,
169            binlog_file: binlog_file.to_string(),
170            binlog_position: msg.binlog_position,
171            gtid: gtid.map(String::from),
172            row: after.clone(),
173        });
174    }
175
176    events
177}
178
179/// Converts a DELETE message to change events.
180#[must_use]
181pub fn delete_to_events(
182    msg: &DeleteMessage,
183    binlog_file: &str,
184    gtid: Option<&str>,
185) -> Vec<ChangeEvent> {
186    let table = format!("{}.{}", msg.database, msg.table);
187
188    msg.rows
189        .iter()
190        .map(|row| {
191            ChangeEvent::delete(
192                table.clone(),
193                msg.timestamp_ms,
194                binlog_file.to_string(),
195                msg.binlog_position,
196                gtid.map(String::from),
197                row.clone(),
198            )
199        })
200        .collect()
201}
202
203/// Schema for CDC metadata columns.
204#[must_use]
205pub fn cdc_metadata_schema() -> Schema {
206    use arrow_schema::TimeUnit;
207    Schema::new(vec![
208        Field::new("_table", DataType::Utf8, false),
209        Field::new("_op", DataType::Utf8, false),
210        Field::new(
211            "_ts_ms",
212            DataType::Timestamp(TimeUnit::Millisecond, None),
213            false,
214        ),
215        Field::new("_binlog_file", DataType::Utf8, true),
216        Field::new("_binlog_pos", DataType::UInt64, true),
217        Field::new("_gtid", DataType::Utf8, true),
218    ])
219}
220
221/// Converts change events to a `RecordBatch` with CDC metadata + row data.
222///
223/// # Errors
224///
225/// Returns error if schema conversion fails.
226pub fn events_to_record_batch(
227    events: &[ChangeEvent],
228    table_info: &TableInfo,
229) -> Result<RecordBatch, arrow_schema::ArrowError> {
230    if events.is_empty() {
231        // Return empty batch with correct schema
232        let schema = build_cdc_schema(&table_info.arrow_schema);
233        return Ok(RecordBatch::new_empty(Arc::new(schema)));
234    }
235
236    let _n = events.len();
237
238    // CDC metadata columns
239    let tables: Vec<&str> = events.iter().map(|e| e.table.as_str()).collect();
240    let ops: Vec<&str> = events.iter().map(|e| e.operation.as_str()).collect();
241    let timestamps: Vec<i64> = events.iter().map(|e| e.timestamp_ms).collect();
242    let binlog_files: Vec<Option<&str>> = events
243        .iter()
244        .map(|e| Some(e.binlog_file.as_str()))
245        .collect();
246    let binlog_positions: Vec<Option<u64>> =
247        events.iter().map(|e| Some(e.binlog_position)).collect();
248    let gtids: Vec<Option<&str>> = events.iter().map(|e| e.gtid.as_deref()).collect();
249
250    let mut columns: Vec<ArrayRef> = vec![
251        Arc::new(StringArray::from(tables)),
252        Arc::new(StringArray::from(ops)),
253        Arc::new(arrow_array::TimestampMillisecondArray::from(timestamps)),
254        Arc::new(StringArray::from(binlog_files)),
255        Arc::new(UInt64Array::from(binlog_positions)),
256        Arc::new(StringArray::from(gtids)),
257    ];
258
259    // Row data columns
260    for (col_idx, _col_def) in table_info.columns.iter().enumerate() {
261        let values: Vec<Option<String>> = events
262            .iter()
263            .map(|e| {
264                e.row.columns.get(col_idx).and_then(|v| {
265                    if v.is_null() {
266                        None
267                    } else {
268                        Some(v.to_text())
269                    }
270                })
271            })
272            .collect();
273
274        // For simplicity, convert all to strings
275        // A production implementation would use proper Arrow types
276        let array: ArrayRef = Arc::new(StringArray::from(
277            values.iter().map(|v| v.as_deref()).collect::<Vec<_>>(),
278        ));
279        columns.push(array);
280    }
281
282    let schema = build_cdc_schema(&table_info.arrow_schema);
283    RecordBatch::try_new(Arc::new(schema), columns)
284}
285
286/// Builds the full CDC schema including metadata and row columns.
287fn build_cdc_schema(row_schema: &Schema) -> Schema {
288    use arrow_schema::TimeUnit;
289    let mut fields = vec![
290        Field::new("_table", DataType::Utf8, false),
291        Field::new("_op", DataType::Utf8, false),
292        Field::new(
293            "_ts_ms",
294            DataType::Timestamp(TimeUnit::Millisecond, None),
295            false,
296        ),
297        Field::new("_binlog_file", DataType::Utf8, true),
298        Field::new("_binlog_pos", DataType::UInt64, true),
299        Field::new("_gtid", DataType::Utf8, true),
300    ];
301
302    // Add row data columns (as strings for now)
303    for field in row_schema.fields() {
304        fields.push(Field::new(field.name(), DataType::Utf8, true));
305    }
306
307    Schema::new(fields)
308}
309
310/// Converts a single column value to JSON for the CDC envelope.
311#[must_use]
312pub fn column_value_to_json(value: &ColumnValue) -> serde_json::Value {
313    match value {
314        ColumnValue::Null => serde_json::Value::Null,
315        ColumnValue::SignedInt(v) => serde_json::json!(v),
316        ColumnValue::UnsignedInt(v) => serde_json::json!(v),
317        ColumnValue::Float(v) => serde_json::json!(v),
318        ColumnValue::Double(v) => serde_json::json!(v),
319        ColumnValue::String(s) => serde_json::json!(s),
320        ColumnValue::Bytes(b) => serde_json::json!(base64_encode(b)),
321        ColumnValue::Date(y, m, d) => serde_json::json!(format!("{y:04}-{m:02}-{d:02}")),
322        ColumnValue::Time(h, m, s, us) => {
323            if *us > 0 {
324                serde_json::json!(format!("{h:02}:{m:02}:{s:02}.{us:06}"))
325            } else {
326                serde_json::json!(format!("{h:02}:{m:02}:{s:02}"))
327            }
328        }
329        ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
330            if *us > 0 {
331                serde_json::json!(format!(
332                    "{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}.{us:06}"
333                ))
334            } else {
335                serde_json::json!(format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}"))
336            }
337        }
338        ColumnValue::Timestamp(us) => serde_json::json!(us),
339        ColumnValue::Json(s) => serde_json::from_str(s).unwrap_or_else(|e| {
340            tracing::warn!(
341                error = %e,
342                json_len = s.len(),
343                "[LDB-4003] MySQL CDC JSON column parse failed, treating as string literal"
344            );
345            serde_json::json!(s)
346        }),
347    }
348}
349
350/// Converts a row to JSON object.
351#[must_use]
352pub fn row_to_json(row: &RowData, columns: &[super::types::MySqlColumn]) -> serde_json::Value {
353    let mut obj = serde_json::Map::new();
354    for (i, col) in columns.iter().enumerate() {
355        if let Some(value) = row.columns.get(i) {
356            obj.insert(col.name.clone(), column_value_to_json(value));
357        }
358    }
359    serde_json::Value::Object(obj)
360}
361
/// Encodes `data` as standard base64 (RFC 4648 alphabet) with `=` padding.
///
/// Every 3 input bytes become 4 output characters. A final 1- or 2-byte
/// chunk is padded with `==` or `=` respectively.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // The output length is known up front: 4 chars per (possibly partial) 3-byte chunk.
    let mut encoded = String::with_capacity(data.len().div_ceil(3) * 4);
    for chunk in data.chunks(3) {
        // `chunks` never yields an empty slice, so index 0 always exists.
        let b0 = usize::from(chunk[0]);
        let b1 = chunk.get(1).map_or(0, |&b| usize::from(b));
        let b2 = chunk.get(2).map_or(0, |&b| usize::from(b));

        encoded.push(char::from(ALPHABET[b0 >> 2]));
        encoded.push(char::from(ALPHABET[((b0 & 0x03) << 4) | (b1 >> 4)]));
        encoded.push(if chunk.len() > 1 {
            char::from(ALPHABET[((b1 & 0x0f) << 2) | (b2 >> 6)])
        } else {
            '='
        });
        encoded.push(if chunk.len() > 2 {
            char::from(ALPHABET[b2 & 0x3f])
        } else {
            '='
        });
    }
    encoded
}
390
#[cfg(test)]
mod tests {
    use super::*;

    fn make_test_row() -> RowData {
        RowData {
            columns: vec![
                ColumnValue::SignedInt(1),
                ColumnValue::String("Alice".to_string()),
                ColumnValue::Null,
            ],
        }
    }

    /// Builds an update row whose before/after images each hold one string column.
    fn update_row(before: &str, after: &str) -> UpdateRowData {
        UpdateRowData {
            before: RowData {
                columns: vec![ColumnValue::String(before.to_string())],
            },
            after: RowData {
                columns: vec![ColumnValue::String(after.to_string())],
            },
        }
    }

    /// Returns the text stored at `idx` if that column is a `ColumnValue::String`.
    fn string_at(row: &RowData, idx: usize) -> Option<&str> {
        match row.columns.get(idx) {
            Some(ColumnValue::String(s)) => Some(s.as_str()),
            _ => None,
        }
    }

    #[test]
    fn test_cdc_operation_as_str() {
        assert_eq!(CdcOperation::Insert.as_str(), "I");
        assert_eq!(CdcOperation::UpdateBefore.as_str(), "U-");
        assert_eq!(CdcOperation::UpdateAfter.as_str(), "U+");
        assert_eq!(CdcOperation::Delete.as_str(), "D");
    }

    #[test]
    fn test_cdc_operation_weight() {
        assert_eq!(CdcOperation::Insert.weight(), 1);
        assert_eq!(CdcOperation::UpdateAfter.weight(), 1);
        assert_eq!(CdcOperation::Delete.weight(), -1);
        assert_eq!(CdcOperation::UpdateBefore.weight(), -1);
    }

    #[test]
    fn test_change_event_insert() {
        let event = ChangeEvent::insert(
            "db.users".to_string(),
            1704067200000,
            "mysql-bin.000003".to_string(),
            12345,
            Some("gtid-123".to_string()),
            make_test_row(),
        );

        assert_eq!(event.table, "db.users");
        assert_eq!(event.operation, CdcOperation::Insert);
        assert_eq!(event.timestamp_ms, 1704067200000);
        assert_eq!(event.binlog_file, "mysql-bin.000003");
        assert_eq!(event.binlog_position, 12345);
        assert_eq!(event.gtid.as_deref(), Some("gtid-123"));
        assert_eq!(event.row.columns.len(), 3);
        assert_eq!(string_at(&event.row, 1), Some("Alice"));
    }

    #[test]
    fn test_change_event_delete() {
        let event = ChangeEvent::delete(
            "db.users".to_string(),
            1704067200000,
            "mysql-bin.000003".to_string(),
            12345,
            None,
            make_test_row(),
        );

        assert_eq!(event.operation, CdcOperation::Delete);
        assert_eq!(event.operation.weight(), -1);
        assert_eq!(event.binlog_position, 12345);
        assert!(event.gtid.is_none());
    }

    #[test]
    fn test_insert_to_events() {
        let msg = InsertMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![make_test_row(), make_test_row()],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };

        let events = insert_to_events(&msg, "mysql-bin.000003", Some("gtid-1"));
        assert_eq!(events.len(), 2);
        for event in &events {
            assert_eq!(event.operation, CdcOperation::Insert);
            assert_eq!(event.table, "testdb.users");
            assert_eq!(event.timestamp_ms, 1704067200000);
            assert_eq!(event.binlog_file, "mysql-bin.000003");
            assert_eq!(event.binlog_position, 12345);
            assert_eq!(event.gtid.as_deref(), Some("gtid-1"));
        }
    }

    #[test]
    fn test_insert_to_events_empty() {
        let msg = InsertMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };

        assert!(insert_to_events(&msg, "mysql-bin.000003", None).is_empty());
    }

    #[test]
    fn test_update_to_events() {
        let msg = UpdateMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![update_row("old", "new")],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };

        let events = update_to_events(&msg, "mysql-bin.000003", None);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].operation, CdcOperation::UpdateBefore);
        assert_eq!(events[1].operation, CdcOperation::UpdateAfter);
        assert_eq!(string_at(&events[0].row, 0), Some("old"));
        assert_eq!(string_at(&events[1].row, 0), Some("new"));
        assert!(events
            .iter()
            .all(|e| e.table == "testdb.users" && e.gtid.is_none()));
    }

    #[test]
    fn test_update_to_events_multiple_rows_keep_pair_order() {
        let msg = UpdateMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![update_row("a0", "a1"), update_row("b0", "b1")],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };

        let events = update_to_events(&msg, "mysql-bin.000003", Some("gtid-9"));
        let ops: Vec<CdcOperation> = events.iter().map(|e| e.operation).collect();
        assert_eq!(
            ops,
            [
                CdcOperation::UpdateBefore,
                CdcOperation::UpdateAfter,
                CdcOperation::UpdateBefore,
                CdcOperation::UpdateAfter,
            ]
        );
        let texts: Vec<Option<&str>> = events.iter().map(|e| string_at(&e.row, 0)).collect();
        assert_eq!(texts, [Some("a0"), Some("a1"), Some("b0"), Some("b1")]);

        // An update retracts exactly what it asserts, so the net weight is zero.
        let net: i32 = events.iter().map(|e| i32::from(e.operation.weight())).sum();
        assert_eq!(net, 0);
        assert!(events.iter().all(|e| e.gtid.as_deref() == Some("gtid-9")));
    }

    #[test]
    fn test_delete_to_events() {
        let msg = DeleteMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![make_test_row()],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };

        let events = delete_to_events(&msg, "mysql-bin.000003", Some("gtid-1"));
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].operation, CdcOperation::Delete);
        assert_eq!(events[0].table, "testdb.users");
        assert_eq!(events[0].binlog_file, "mysql-bin.000003");
        assert_eq!(events[0].gtid.as_deref(), Some("gtid-1"));
        assert_eq!(string_at(&events[0].row, 1), Some("Alice"));
    }

    #[test]
    fn test_column_value_to_json() {
        assert_eq!(
            column_value_to_json(&ColumnValue::Null),
            serde_json::Value::Null
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::SignedInt(42)),
            serde_json::json!(42)
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::UnsignedInt(7)),
            serde_json::json!(7)
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::String("hello".to_string())),
            serde_json::json!("hello")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::Date(2024, 6, 15)),
            serde_json::json!("2024-06-15")
        );
    }

    #[test]
    fn test_column_value_to_json_time_fraction() {
        assert_eq!(
            column_value_to_json(&ColumnValue::Time(1, 2, 3, 0)),
            serde_json::json!("01:02:03")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::Time(1, 2, 3, 500)),
            serde_json::json!("01:02:03.000500")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::DateTime(2024, 1, 2, 3, 4, 5, 0)),
            serde_json::json!("2024-01-02T03:04:05")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::DateTime(2024, 1, 2, 3, 4, 5, 6)),
            serde_json::json!("2024-01-02T03:04:05.000006")
        );
    }

    #[test]
    fn test_cdc_metadata_schema() {
        use arrow_schema::TimeUnit;

        let schema = cdc_metadata_schema();
        assert_eq!(schema.fields().len(), 6);
        assert_eq!(schema.field(0).name(), "_table");
        assert_eq!(schema.field(1).name(), "_op");
        assert_eq!(schema.field(2).name(), "_ts_ms");
        assert_eq!(
            schema.field(2).data_type(),
            &DataType::Timestamp(TimeUnit::Millisecond, None)
        );
        assert!(!schema.field(0).is_nullable());
        assert!(!schema.field(2).is_nullable());
        assert!(schema.field(5).is_nullable());
        assert_eq!(schema.field(5).name(), "_gtid");
    }

    #[test]
    fn test_base64_encode() {
        assert_eq!(base64_encode(&[]), "");
        assert_eq!(base64_encode(b"f"), "Zg==");
        assert_eq!(base64_encode(b"fo"), "Zm8=");
        assert_eq!(base64_encode(b"foo"), "Zm9v");
        assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
        assert_eq!(base64_encode(b"fooba"), "Zm9vYmE=");
        assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
        assert_eq!(base64_encode(&[0xff, 0xff, 0xff]), "////");
        assert_eq!(base64_encode(&[0xfb]), "+w==");
    }
}