Skip to main content

laminar_connectors/cdc/mysql/
decoder.rs

1//! MySQL binlog event decoder.
2//!
3//! Wraps the `mysql_cdc` crate's binlog events into our internal
4//! [`BinlogMessage`] types for unified CDC processing.
5
6use super::gtid::Gtid;
7use super::types::MySqlColumn;
8
/// A decoded binlog message from MySQL replication.
///
/// Every variant except [`BinlogMessage::Heartbeat`] wraps a dedicated payload
/// struct defined in this module. The enum derives `PartialEq` but not `Eq`
/// because the row payloads can contain floating-point column values.
#[derive(Debug, Clone, PartialEq)]
pub enum BinlogMessage {
    /// Transaction begin (GTID event); payload is a [`BeginMessage`].
    Begin(BeginMessage),
    /// Transaction commit (XID event); payload is a [`CommitMessage`].
    Commit(CommitMessage),
    /// Table map event (schema for subsequent row events); payload is a
    /// [`TableMapMessage`].
    TableMap(TableMapMessage),
    /// Row insert event; payload is an [`InsertMessage`].
    Insert(InsertMessage),
    /// Row update event; payload is an [`UpdateMessage`].
    Update(UpdateMessage),
    /// Row delete event; payload is a [`DeleteMessage`].
    Delete(DeleteMessage),
    /// Query event (DDL statements); payload is a [`QueryMessage`].
    Query(QueryMessage),
    /// Rotate event (binlog file rotation); payload is a [`RotateMessage`].
    Rotate(RotateMessage),
    /// Heartbeat event. Carries no payload.
    Heartbeat,
}
31
/// Transaction begin message (from GTID event).
///
/// Payload of [`BinlogMessage::Begin`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeginMessage {
    /// GTID of the transaction (if GTID mode is enabled); `None` otherwise.
    pub gtid: Option<Gtid>,
    /// Binlog filename.
    pub binlog_filename: String,
    /// Position in the binlog file.
    pub binlog_position: u64,
    /// Timestamp in milliseconds since Unix epoch.
    pub timestamp_ms: i64,
}
44
/// Transaction commit message (from XID event).
///
/// Payload of [`BinlogMessage::Commit`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMessage {
    /// XID (transaction ID).
    pub xid: u64,
    /// Binlog position after commit.
    pub binlog_position: u64,
    /// Commit timestamp in milliseconds since Unix epoch.
    pub timestamp_ms: i64,
}
55
/// Table map message (schema definition for row events).
///
/// Payload of [`BinlogMessage::TableMap`]. The row messages
/// ([`InsertMessage`], [`UpdateMessage`], [`DeleteMessage`]) carry a
/// `table_id` field that refers back to a table map like this one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableMapMessage {
    /// Table ID (internal MySQL identifier).
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Column definitions.
    pub columns: Vec<MySqlColumn>,
}
68
/// Row insert message.
///
/// Payload of [`BinlogMessage::Insert`]. Derives `PartialEq` only (no `Eq`)
/// because [`RowData`] can hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct InsertMessage {
    /// Table ID (references prior [`TableMapMessage`]).
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Inserted row data.
    pub rows: Vec<RowData>,
    /// Binlog position.
    pub binlog_position: u64,
    /// Timestamp in milliseconds.
    // NOTE(review): presumably since the Unix epoch, like
    // `BeginMessage::timestamp_ms` — confirm against the producer.
    pub timestamp_ms: i64,
}
85
/// Row update message.
///
/// Payload of [`BinlogMessage::Update`]. Derives `PartialEq` only (no `Eq`)
/// because [`UpdateRowData`] can hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMessage {
    /// Table ID (references prior [`TableMapMessage`]).
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Updated rows (before and after images).
    pub rows: Vec<UpdateRowData>,
    /// Binlog position.
    pub binlog_position: u64,
    /// Timestamp in milliseconds.
    // NOTE(review): presumably since the Unix epoch, like
    // `BeginMessage::timestamp_ms` — confirm against the producer.
    pub timestamp_ms: i64,
}
102
/// Row delete message.
///
/// Payload of [`BinlogMessage::Delete`]. Derives `PartialEq` only (no `Eq`)
/// because [`RowData`] can hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMessage {
    /// Table ID (references prior [`TableMapMessage`]).
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Deleted row data.
    pub rows: Vec<RowData>,
    /// Binlog position.
    pub binlog_position: u64,
    /// Timestamp in milliseconds.
    // NOTE(review): presumably since the Unix epoch, like
    // `BeginMessage::timestamp_ms` — confirm against the producer.
    pub timestamp_ms: i64,
}
119
/// Query event (typically DDL).
///
/// Payload of [`BinlogMessage::Query`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryMessage {
    /// Database context.
    pub database: String,
    /// SQL query text.
    pub query: String,
    /// Binlog position.
    pub binlog_position: u64,
    /// Timestamp in milliseconds.
    // NOTE(review): presumably since the Unix epoch, like
    // `BeginMessage::timestamp_ms` — confirm against the producer.
    pub timestamp_ms: i64,
}
132
/// Rotate event (binlog file change).
///
/// Payload of [`BinlogMessage::Rotate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RotateMessage {
    /// New binlog filename.
    pub next_binlog: String,
    /// Position in the new binlog.
    pub position: u64,
}
141
/// Row data containing column values.
///
/// Used directly by [`InsertMessage`] and [`DeleteMessage`], and as the
/// before/after images inside [`UpdateRowData`].
#[derive(Debug, Clone, PartialEq)]
pub struct RowData {
    /// Column values in ordinal order.
    pub columns: Vec<ColumnValue>,
}
148
/// Update row data with before and after images.
///
/// One element of [`UpdateMessage::rows`].
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateRowData {
    /// Before image (old values).
    pub before: RowData,
    /// After image (new values).
    pub after: RowData,
}
157
/// A single column value.
///
/// Only `PartialEq` is derived (not `Eq`) because the `Float` and `Double`
/// variants hold floating-point numbers.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    /// NULL value.
    Null,
    /// Signed integer.
    SignedInt(i64),
    /// Unsigned integer.
    UnsignedInt(u64),
    /// Float.
    Float(f32),
    /// Double.
    Double(f64),
    /// String or text.
    String(String),
    /// Binary data.
    Bytes(Vec<u8>),
    /// Date (year, month, day).
    Date(i32, u32, u32),
    /// Time (hours, minutes, seconds, microseconds).
    ///
    /// The hours component is signed; a negative value denotes a negative time.
    Time(i32, u32, u32, u32),
    /// Datetime (year, month, day, hour, minute, second, microsecond).
    DateTime(i32, u32, u32, u32, u32, u32, u32),
    /// Timestamp (Unix timestamp in microseconds).
    Timestamp(i64),
    /// JSON value (as string).
    Json(String),
}

impl ColumnValue {
    /// Returns the value as a string if applicable.
    ///
    /// Yields the borrowed text of the `String` and `Json` variants and
    /// `None` for every other variant (including `Bytes`).
    #[must_use]
    pub fn as_string(&self) -> Option<&str> {
        match self {
            ColumnValue::String(s) | ColumnValue::Json(s) => Some(s),
            _ => None,
        }
    }

    /// Returns the value as an i64 if applicable.
    ///
    /// `SignedInt` is returned as-is. `UnsignedInt` is converted with a
    /// checked conversion, so values above `i64::MAX` yield `None`. All
    /// other variants yield `None`.
    #[must_use]
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            ColumnValue::SignedInt(v) => Some(*v),
            ColumnValue::UnsignedInt(v) => i64::try_from(*v).ok(),
            _ => None,
        }
    }

    /// Returns true if the value is NULL.
    #[must_use]
    pub fn is_null(&self) -> bool {
        matches!(self, ColumnValue::Null)
    }

    /// Converts to a string representation for text-based serialization.
    ///
    /// Rendering per variant:
    /// - `Null` becomes the empty string (indistinguishable from an empty
    ///   `String` value in the output).
    /// - Integers, `Float` and `Double` use their `Display` form.
    /// - `String` and `Json` are cloned verbatim.
    /// - `Bytes` is lowercase hex, two digits per byte.
    /// - `Date` is `YYYY-MM-DD`.
    /// - `Time` is `HH:MM:SS`, with `.ffffff` appended only when the
    ///   microsecond part is non-zero; negative hours render as `-HH`.
    /// - `DateTime` is `YYYY-MM-DD HH:MM:SS`, with the same optional
    ///   microsecond suffix.
    /// - `Timestamp` is decimal seconds with exactly six fractional digits
    ///   (`<secs>.<micros>`), prefixed with `-` for negative values.
    #[must_use]
    pub fn to_text(&self) -> String {
        match self {
            ColumnValue::Null => String::new(),
            ColumnValue::SignedInt(v) => v.to_string(),
            ColumnValue::UnsignedInt(v) => v.to_string(),
            ColumnValue::Float(v) => v.to_string(),
            ColumnValue::Double(v) => v.to_string(),
            ColumnValue::String(s) | ColumnValue::Json(s) => s.clone(),
            ColumnValue::Bytes(b) => {
                // Hex-encode binary data
                use std::fmt::Write;
                let mut hex = String::with_capacity(b.len() * 2);
                for byte in b {
                    // Writing into a `String` cannot fail.
                    let _ = write!(hex, "{byte:02x}");
                }
                hex
            }
            ColumnValue::Date(y, m, d) => format!("{y:04}-{m:02}-{d:02}"),
            ColumnValue::Time(h, m, s, us) => {
                // Pad the magnitude rather than the signed value: `{h:02}`
                // counts the minus sign toward the width and would render
                // -5 hours as "-5" instead of "-05".
                let sign = if *h < 0 { "-" } else { "" };
                let hours = h.unsigned_abs();
                if *us > 0 {
                    format!("{sign}{hours:02}:{m:02}:{s:02}.{us:06}")
                } else {
                    format!("{sign}{hours:02}:{m:02}:{s:02}")
                }
            }
            ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
                if *us > 0 {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}.{us:06}")
                } else {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}")
                }
            }
            ColumnValue::Timestamp(us) => {
                // Split the magnitude, not the signed value: `%` keeps the
                // sign of the dividend, so a negative timestamp would give a
                // negative remainder. `unsigned_abs` also covers `i64::MIN`.
                let sign = if *us < 0 { "-" } else { "" };
                let magnitude = us.unsigned_abs();
                let secs = magnitude / 1_000_000;
                let micros = magnitude % 1_000_000;
                format!("{sign}{secs}.{micros:06}")
            }
        }
    }
}
258
/// Decoder errors.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute (generated by `thiserror`).
#[derive(Debug, Clone, thiserror::Error)]
pub enum DecoderError {
    /// Unknown event type. Carries the unrecognized event type code.
    #[error("unknown event type: {0}")]
    UnknownEventType(u8),

    /// Invalid event data. Carries a free-form description of the problem.
    #[error("invalid event data: {0}")]
    InvalidData(String),

    /// Missing table map for row event. Carries the `table_id` that had no
    /// table map.
    #[error("missing table map for table_id {0}")]
    MissingTableMap(u64),

    /// Unsupported column type. Carries the column type code.
    #[error("unsupported column type: {0}")]
    UnsupportedType(u8),
}
278
/// Converts a MySQL event timestamp into Unix milliseconds.
///
/// MySQL timestamps are whole seconds since 1970-01-01 00:00:00 UTC, so the
/// conversion is a plain scale by 1000. The value is widened to `i64` before
/// multiplying, so no `u32` input can overflow.
#[must_use]
pub fn mysql_timestamp_to_unix_ms(timestamp: u32) -> i64 {
    const MILLIS_PER_SECOND: i64 = 1_000;

    let seconds = i64::from(timestamp);
    seconds * MILLIS_PER_SECOND
}
286
/// Position in the MySQL binlog.
///
/// Its `Display` implementation renders as `filename:position`, with a
/// ` (GTID: …)` suffix when a GTID is present.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BinlogPosition {
    /// Binlog filename (e.g., "mysql-bin.000003").
    pub filename: String,
    /// Position within the file.
    pub position: u64,
    /// GTID (if GTID mode is enabled); `None` otherwise.
    pub gtid: Option<Gtid>,
}
297
298impl BinlogPosition {
299    /// Creates a new binlog position.
300    #[must_use]
301    pub fn new(filename: String, position: u64) -> Self {
302        Self {
303            filename,
304            position,
305            gtid: None,
306        }
307    }
308
309    /// Creates a position with GTID.
310    #[must_use]
311    pub fn with_gtid(filename: String, position: u64, gtid: Gtid) -> Self {
312        Self {
313            filename,
314            position,
315            gtid: Some(gtid),
316        }
317    }
318}
319
320impl std::fmt::Display for BinlogPosition {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        if let Some(ref gtid) = self.gtid {
323            write!(f, "{}:{} (GTID: {})", self.filename, self.position, gtid)
324        } else {
325            write!(f, "{}:{}", self.filename, self.position)
326        }
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// GTID literal shared by the GTID-related tests.
    const SAMPLE_GTID: &str = "3E11FA47-71CA-11E1-9E33-C80AA9429562:5";

    /// File name shared by the binlog-position tests.
    const SAMPLE_BINLOG: &str = "mysql-bin.000003";

    /// NULL reports itself as null and serializes to the empty string.
    #[test]
    fn test_column_value_null() {
        let null = ColumnValue::Null;
        assert!(null.is_null());
        assert_eq!(null.to_text(), "");
    }

    /// Signed and unsigned integers expose `as_i64` and a decimal text form.
    #[test]
    fn test_column_value_int() {
        let signed = ColumnValue::SignedInt(-42);
        assert!(!signed.is_null());
        assert_eq!(signed.as_i64(), Some(-42));
        assert_eq!(signed.to_text(), "-42");

        let unsigned = ColumnValue::UnsignedInt(100);
        assert_eq!(unsigned.as_i64(), Some(100));
        assert_eq!(unsigned.to_text(), "100");
    }

    /// Floating-point values serialize through their `Display` form.
    #[test]
    fn test_column_value_float() {
        let single = ColumnValue::Float(3.14);
        assert_eq!(single.to_text(), "3.14");

        let double = ColumnValue::Double(2.718281828);
        let rendered = double.to_text();
        assert!(rendered.starts_with("2.718"));
    }

    /// String values are available both borrowed and as text.
    #[test]
    fn test_column_value_string() {
        let text = ColumnValue::String("hello".to_string());
        assert_eq!(text.as_string(), Some("hello"));
        assert_eq!(text.to_text(), "hello");
    }

    /// Binary values serialize as lowercase hex.
    #[test]
    fn test_column_value_bytes() {
        let raw = vec![0xDE, 0xAD, 0xBE, 0xEF];
        assert_eq!(ColumnValue::Bytes(raw).to_text(), "deadbeef");
    }

    /// Dates serialize as zero-padded `YYYY-MM-DD`.
    #[test]
    fn test_column_value_date() {
        let date = ColumnValue::Date(2024, 6, 15);
        assert_eq!(date.to_text(), "2024-06-15");
    }

    /// Times include the fractional part only when it is non-zero.
    #[test]
    fn test_column_value_time() {
        let whole_seconds = ColumnValue::Time(14, 30, 45, 0);
        assert_eq!(whole_seconds.to_text(), "14:30:45");

        let with_micros = ColumnValue::Time(14, 30, 45, 123456);
        assert_eq!(with_micros.to_text(), "14:30:45.123456");
    }

    /// Datetimes include the fractional part only when it is non-zero.
    #[test]
    fn test_column_value_datetime() {
        let whole_seconds = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0);
        assert_eq!(whole_seconds.to_text(), "2024-06-15 14:30:45");

        let with_micros = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 500000);
        assert_eq!(with_micros.to_text(), "2024-06-15 14:30:45.500000");
    }

    /// JSON values are exposed through `as_string`.
    #[test]
    fn test_column_value_json() {
        let document = r#"{"key": "value"}"#;
        let json = ColumnValue::Json(document.to_string());
        assert_eq!(json.as_string(), Some(r#"{"key": "value"}"#));
    }

    /// A GTID-less position displays as `file:offset`.
    #[test]
    fn test_binlog_position() {
        let position = BinlogPosition::new(SAMPLE_BINLOG.to_string(), 12345);
        assert_eq!(position.to_string(), "mysql-bin.000003:12345");
        assert!(position.gtid.is_none());
    }

    /// A position built with a GTID keeps it and mentions it when displayed.
    #[test]
    fn test_binlog_position_with_gtid() {
        let gtid: Gtid = SAMPLE_GTID.parse().unwrap();
        let position = BinlogPosition::with_gtid(SAMPLE_BINLOG.to_string(), 12345, gtid);
        assert!(position.gtid.is_some());
        let rendered = position.to_string();
        assert!(rendered.contains("GTID"));
    }

    /// Seconds are scaled to milliseconds.
    #[test]
    fn test_mysql_timestamp_to_unix_ms() {
        // 2024-01-01 00:00:00 UTC = 1704067200 seconds
        assert_eq!(mysql_timestamp_to_unix_ms(1704067200), 1704067200000);
    }

    /// A row keeps its column values in order.
    #[test]
    fn test_row_data() {
        let columns = vec![
            ColumnValue::SignedInt(1),
            ColumnValue::String("Alice".to_string()),
            ColumnValue::Null,
        ];
        let row = RowData { columns };
        let cols = &row.columns;
        assert_eq!(cols.len(), 3);
        assert_eq!(cols[0].as_i64(), Some(1));
        assert!(cols[2].is_null());
    }

    /// An update row carries independent before and after images.
    #[test]
    fn test_update_row_data() {
        let before = RowData {
            columns: vec![ColumnValue::String("old".to_string())],
        };
        let after = RowData {
            columns: vec![ColumnValue::String("new".to_string())],
        };
        let update = UpdateRowData { before, after };
        assert_eq!(update.before.columns[0].as_string(), Some("old"));
        assert_eq!(update.after.columns[0].as_string(), Some("new"));
    }

    /// A begin message can carry a parsed GTID.
    #[test]
    fn test_begin_message() {
        let begin = BeginMessage {
            gtid: Some(SAMPLE_GTID.parse().unwrap()),
            binlog_filename: SAMPLE_BINLOG.to_string(),
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };
        assert!(begin.gtid.is_some());
    }

    /// An insert message holds the rows it was built with.
    #[test]
    fn test_insert_message() {
        let only_row = RowData {
            columns: vec![ColumnValue::SignedInt(1)],
        };
        let insert = InsertMessage {
            table_id: 100,
            database: "mydb".to_string(),
            table: "users".to_string(),
            rows: vec![only_row],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };
        assert_eq!(insert.rows.len(), 1);
    }
}