Skip to main content

laminar_connectors/cdc/mysql/
decoder.rs

1//! MySQL binlog event decoder.
2//!
3//! Wraps the `mysql_cdc` crate's binlog events into our internal
4//! [`BinlogMessage`] types for unified CDC processing.
5
6use super::gtid::Gtid;
7use super::types::MySqlColumn;
8
9/// A decoded binlog message from MySQL replication.
10#[derive(Debug, Clone, PartialEq)]
11pub enum BinlogMessage {
12    /// Transaction begin (GTID event).
13    Begin(BeginMessage),
14    /// Transaction commit (XID event).
15    Commit(CommitMessage),
16    /// Table map event (schema for subsequent row events).
17    TableMap(TableMapMessage),
18    /// Row insert event.
19    Insert(InsertMessage),
20    /// Row update event.
21    Update(UpdateMessage),
22    /// Row delete event.
23    Delete(DeleteMessage),
24    /// Query event (DDL statements).
25    Query(QueryMessage),
26    /// Rotate event (binlog file rotation).
27    Rotate(RotateMessage),
28    /// Heartbeat event.
29    Heartbeat,
30}
31
32/// Transaction begin message (from GTID event).
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct BeginMessage {
35    /// GTID of the transaction (if GTID mode is enabled).
36    pub gtid: Option<Gtid>,
37    /// Binlog filename.
38    pub binlog_filename: String,
39    /// Position in the binlog file.
40    pub binlog_position: u64,
41    /// Timestamp in milliseconds since Unix epoch.
42    pub timestamp_ms: i64,
43}
44
/// Payload of a transaction commit, built from an XID event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMessage {
    /// Transaction ID (XID) carried by the commit event.
    pub xid: u64,
    /// Binlog offset immediately after the commit.
    pub binlog_position: u64,
    /// Commit time, in milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
}
55
56/// Table map message (schema definition for row events).
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct TableMapMessage {
59    /// Table ID (internal MySQL identifier).
60    pub table_id: u64,
61    /// Database name.
62    pub database: String,
63    /// Table name.
64    pub table: String,
65    /// Column definitions.
66    pub columns: Vec<MySqlColumn>,
67}
68
69/// Row insert message.
70#[derive(Debug, Clone, PartialEq)]
71pub struct InsertMessage {
72    /// Table ID (references prior TableMapMessage).
73    pub table_id: u64,
74    /// Database name.
75    pub database: String,
76    /// Table name.
77    pub table: String,
78    /// Inserted row data.
79    pub rows: Vec<RowData>,
80    /// Binlog position.
81    pub binlog_position: u64,
82    /// Timestamp in milliseconds.
83    pub timestamp_ms: i64,
84}
85
86/// Row update message.
87#[derive(Debug, Clone, PartialEq)]
88pub struct UpdateMessage {
89    /// Table ID (references prior TableMapMessage).
90    pub table_id: u64,
91    /// Database name.
92    pub database: String,
93    /// Table name.
94    pub table: String,
95    /// Updated rows (before and after images).
96    pub rows: Vec<UpdateRowData>,
97    /// Binlog position.
98    pub binlog_position: u64,
99    /// Timestamp in milliseconds.
100    pub timestamp_ms: i64,
101}
102
103/// Row delete message.
104#[derive(Debug, Clone, PartialEq)]
105pub struct DeleteMessage {
106    /// Table ID (references prior TableMapMessage).
107    pub table_id: u64,
108    /// Database name.
109    pub database: String,
110    /// Table name.
111    pub table: String,
112    /// Deleted row data.
113    pub rows: Vec<RowData>,
114    /// Binlog position.
115    pub binlog_position: u64,
116    /// Timestamp in milliseconds.
117    pub timestamp_ms: i64,
118}
119
/// Payload of a query event, which typically carries a DDL statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryMessage {
    /// Database context of the statement.
    pub database: String,
    /// Text of the SQL statement.
    pub query: String,
    /// Binlog offset of the event.
    pub binlog_position: u64,
    /// Event time, in milliseconds.
    pub timestamp_ms: i64,
}
132
/// Payload of a rotate event, emitted when the binlog switches to another file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RotateMessage {
    /// Name of the binlog file being switched to.
    pub next_binlog: String,
    /// Offset within the new binlog file.
    pub position: u64,
}
141
142/// Row data containing column values.
143#[derive(Debug, Clone, PartialEq)]
144pub struct RowData {
145    /// Column values in ordinal order.
146    pub columns: Vec<ColumnValue>,
147}
148
149/// Update row data with before and after images.
150#[derive(Debug, Clone, PartialEq)]
151pub struct UpdateRowData {
152    /// Before image (old values).
153    pub before: RowData,
154    /// After image (new values).
155    pub after: RowData,
156}
157
/// A single decoded column value.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    /// NULL value.
    Null,
    /// Signed integer.
    SignedInt(i64),
    /// Unsigned integer.
    UnsignedInt(u64),
    /// Single-precision float.
    Float(f32),
    /// Double-precision float.
    Double(f64),
    /// String or text.
    String(String),
    /// Binary data.
    Bytes(Vec<u8>),
    /// Date (year, month, day).
    Date(i32, u32, u32),
    /// Time (hours, minutes, seconds, microseconds).
    ///
    /// The hours component is signed; a negative value denotes a negative
    /// time.
    Time(i32, u32, u32, u32),
    /// Datetime (year, month, day, hour, minute, second, microsecond).
    DateTime(i32, u32, u32, u32, u32, u32, u32),
    /// Timestamp (Unix timestamp in microseconds).
    Timestamp(i64),
    /// JSON value (as string).
    Json(String),
}

impl ColumnValue {
    /// Borrows the inner text of a [`ColumnValue::String`] or
    /// [`ColumnValue::Json`] value.
    ///
    /// Returns `None` for every other variant; no conversion is attempted.
    #[must_use]
    pub fn as_string(&self) -> Option<&str> {
        match self {
            ColumnValue::String(s) | ColumnValue::Json(s) => Some(s),
            _ => None,
        }
    }

    /// Returns the value as an `i64` when it is an integer that fits.
    ///
    /// `SignedInt` is returned as-is. `UnsignedInt` is returned only when it
    /// does not exceed `i64::MAX`. Every other variant yields `None`.
    #[must_use]
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            ColumnValue::SignedInt(v) => Some(*v),
            ColumnValue::UnsignedInt(v) => i64::try_from(*v).ok(),
            _ => None,
        }
    }

    /// Returns `true` if the value is [`ColumnValue::Null`].
    #[must_use]
    pub fn is_null(&self) -> bool {
        matches!(self, ColumnValue::Null)
    }

    /// Converts the value to a string for text-based serialization.
    ///
    /// Formats produced:
    /// - `Null`: the empty string.
    /// - Integers and floats: their `Display` form.
    /// - `String` / `Json`: a copy of the text.
    /// - `Bytes`: lowercase hex, two digits per byte.
    /// - `Date`: `YYYY-MM-DD`.
    /// - `Time`: `[-]HH:MM:SS`, with `.ffffff` appended only when the
    ///   microsecond part is non-zero. Hours are padded to at least two
    ///   digits regardless of sign.
    /// - `DateTime`: `YYYY-MM-DD HH:MM:SS`, with `.ffffff` appended only when
    ///   the microsecond part is non-zero.
    /// - `Timestamp`: `<seconds>.<micros>`, where seconds is floored and the
    ///   six-digit microsecond part is always non-negative.
    #[must_use]
    pub fn to_text(&self) -> String {
        match self {
            ColumnValue::Null => String::new(),
            ColumnValue::SignedInt(v) => v.to_string(),
            ColumnValue::UnsignedInt(v) => v.to_string(),
            ColumnValue::Float(v) => v.to_string(),
            ColumnValue::Double(v) => v.to_string(),
            ColumnValue::String(s) | ColumnValue::Json(s) => s.clone(),
            ColumnValue::Bytes(b) => {
                // Hex-encode binary data
                use std::fmt::Write;
                let mut hex = String::with_capacity(b.len() * 2);
                for byte in b {
                    // Writing into a `String` cannot fail, so the result is ignored.
                    let _ = write!(hex, "{byte:02x}");
                }
                hex
            }
            ColumnValue::Date(y, m, d) => format!("{y:04}-{m:02}-{d:02}"),
            ColumnValue::Time(h, m, s, us) => {
                // Zero padding counts the sign toward the width, so `{h:02}`
                // would render -1 as "-1". Format sign and magnitude
                // separately to keep at least two hour digits.
                let sign = if *h < 0 { "-" } else { "" };
                let hours = h.unsigned_abs();
                if *us > 0 {
                    format!("{sign}{hours:02}:{m:02}:{s:02}.{us:06}")
                } else {
                    format!("{sign}{hours:02}:{m:02}:{s:02}")
                }
            }
            ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
                if *us > 0 {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}.{us:06}")
                } else {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}")
                }
            }
            ColumnValue::Timestamp(us) => {
                // Floor-divide so the sub-second part is always non-negative,
                // even for pre-epoch (negative) microsecond timestamps.
                let secs = us.div_euclid(1_000_000);
                // rem_euclid yields [0, 999_999], so the cast is lossless.
                #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
                let micros = us.rem_euclid(1_000_000) as u32;
                format!("{secs}.{micros:06}")
            }
        }
    }
}
259
260/// Decoder errors.
261#[derive(Debug, Clone, thiserror::Error)]
262pub enum DecoderError {
263    /// Unknown event type.
264    #[error("unknown event type: {0}")]
265    UnknownEventType(u8),
266
267    /// Invalid event data.
268    #[error("invalid event data: {0}")]
269    InvalidData(String),
270
271    /// Missing table map for row event.
272    #[error("missing table map for table_id {0}")]
273    MissingTableMap(u64),
274
275    /// Unsupported column type.
276    #[error("unsupported column type: {0}")]
277    UnsupportedType(u8),
278}
279
/// Converts a MySQL event timestamp to Unix milliseconds.
///
/// The input is interpreted as whole seconds since 1970-01-01 00:00:00 UTC.
/// It is widened to `i64` before scaling, so the multiplication cannot
/// overflow for any `u32` input.
#[must_use]
pub fn mysql_timestamp_to_unix_ms(timestamp: u32) -> i64 {
    const MILLIS_PER_SECOND: i64 = 1_000;

    let seconds = i64::from(timestamp);
    seconds * MILLIS_PER_SECOND
}
287
288/// Position in the MySQL binlog.
289#[derive(Debug, Clone, PartialEq, Eq)]
290pub struct BinlogPosition {
291    /// Binlog filename (e.g., "mysql-bin.000003").
292    pub filename: String,
293    /// Position within the file.
294    pub position: u64,
295    /// GTID (if GTID mode is enabled).
296    pub gtid: Option<Gtid>,
297}
298
299impl BinlogPosition {
300    /// Creates a new binlog position.
301    #[must_use]
302    pub fn new(filename: String, position: u64) -> Self {
303        Self {
304            filename,
305            position,
306            gtid: None,
307        }
308    }
309
310    /// Creates a position with GTID.
311    #[must_use]
312    pub fn with_gtid(filename: String, position: u64, gtid: Gtid) -> Self {
313        Self {
314            filename,
315            position,
316            gtid: Some(gtid),
317        }
318    }
319}
320
321impl std::fmt::Display for BinlogPosition {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        if let Some(ref gtid) = self.gtid {
324            write!(f, "{}:{} (GTID: {})", self.filename, self.position, gtid)
325        } else {
326            write!(f, "{}:{}", self.filename, self.position)
327        }
328    }
329}
330
/// Unit tests for the decoder message types, `ColumnValue` accessors and text
/// rendering, and `BinlogPosition` formatting.
#[cfg(test)]
mod tests {
    use super::*;

    /// GTID literal shared by the tests that need a parsed `Gtid`.
    const SAMPLE_GTID: &str = "3E11FA47-71CA-11E1-9E33-C80AA9429562:5";

    #[test]
    fn test_column_value_null() {
        let val = ColumnValue::Null;
        assert!(val.is_null());
        assert_eq!(val.to_text(), "");
        assert_eq!(val.as_i64(), None);
        assert_eq!(val.as_string(), None);
    }

    #[test]
    fn test_column_value_int() {
        let val = ColumnValue::SignedInt(-42);
        assert!(!val.is_null());
        assert_eq!(val.as_i64(), Some(-42));
        assert_eq!(val.to_text(), "-42");

        let val = ColumnValue::UnsignedInt(100);
        assert_eq!(val.as_i64(), Some(100));
        assert_eq!(val.to_text(), "100");
    }

    #[test]
    fn test_column_value_unsigned_overflow() {
        // Values above i64::MAX cannot be represented as i64.
        let val = ColumnValue::UnsignedInt(u64::MAX);
        assert_eq!(val.as_i64(), None);
        assert_eq!(val.to_text(), u64::MAX.to_string());
    }

    #[test]
    fn test_column_value_float() {
        // Deliberately not 3.14 / 2.718…: literals approximating PI or E trip
        // the deny-by-default `clippy::approx_constant` lint.
        let val = ColumnValue::Float(1.5);
        assert_eq!(val.to_text(), "1.5");

        let val = ColumnValue::Double(-0.125);
        assert_eq!(val.to_text(), "-0.125");
    }

    #[test]
    fn test_column_value_string() {
        let val = ColumnValue::String("hello".to_string());
        assert_eq!(val.as_string(), Some("hello"));
        assert_eq!(val.to_text(), "hello");
        assert_eq!(val.as_i64(), None);
    }

    #[test]
    fn test_column_value_bytes() {
        let val = ColumnValue::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]);
        assert_eq!(val.to_text(), "deadbeef");

        let val = ColumnValue::Bytes(Vec::new());
        assert_eq!(val.to_text(), "");
    }

    #[test]
    fn test_column_value_date() {
        let val = ColumnValue::Date(2024, 6, 15);
        assert_eq!(val.to_text(), "2024-06-15");
    }

    #[test]
    fn test_column_value_time() {
        let val = ColumnValue::Time(14, 30, 45, 0);
        assert_eq!(val.to_text(), "14:30:45");

        let val = ColumnValue::Time(14, 30, 45, 123_456);
        assert_eq!(val.to_text(), "14:30:45.123456");
    }

    #[test]
    fn test_column_value_datetime() {
        let val = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0);
        assert_eq!(val.to_text(), "2024-06-15 14:30:45");

        let val = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 500_000);
        assert_eq!(val.to_text(), "2024-06-15 14:30:45.500000");
    }

    #[test]
    fn test_column_value_timestamp() {
        // Microseconds are split into whole seconds and a six-digit fraction.
        let val = ColumnValue::Timestamp(1_704_067_200_123_456);
        assert_eq!(val.to_text(), "1704067200.123456");

        let val = ColumnValue::Timestamp(0);
        assert_eq!(val.to_text(), "0.000000");
    }

    #[test]
    fn test_column_value_json() {
        let val = ColumnValue::Json(r#"{"key": "value"}"#.to_string());
        assert_eq!(val.as_string(), Some(r#"{"key": "value"}"#));
        assert_eq!(val.to_text(), r#"{"key": "value"}"#);
    }

    #[test]
    fn test_binlog_position() {
        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12_345);
        assert_eq!(pos.to_string(), "mysql-bin.000003:12345");
        assert!(pos.gtid.is_none());
    }

    #[test]
    fn test_binlog_position_with_gtid() {
        let gtid: Gtid = SAMPLE_GTID.parse().unwrap();
        let pos = BinlogPosition::with_gtid("mysql-bin.000003".to_string(), 12_345, gtid);
        assert!(pos.gtid.is_some());
        let s = pos.to_string();
        assert!(s.starts_with("mysql-bin.000003:12345"));
        assert!(s.contains("GTID"));
    }

    #[test]
    fn test_mysql_timestamp_to_unix_ms() {
        // 2024-01-01 00:00:00 UTC = 1704067200 seconds
        let unix_ms = mysql_timestamp_to_unix_ms(1_704_067_200);
        assert_eq!(unix_ms, 1_704_067_200_000);
    }

    #[test]
    fn test_row_data() {
        let row = RowData {
            columns: vec![
                ColumnValue::SignedInt(1),
                ColumnValue::String("Alice".to_string()),
                ColumnValue::Null,
            ],
        };
        assert_eq!(row.columns.len(), 3);
        assert_eq!(row.columns[0].as_i64(), Some(1));
        assert!(row.columns[2].is_null());
    }

    #[test]
    fn test_update_row_data() {
        let update = UpdateRowData {
            before: RowData {
                columns: vec![ColumnValue::String("old".to_string())],
            },
            after: RowData {
                columns: vec![ColumnValue::String("new".to_string())],
            },
        };
        assert_eq!(update.before.columns[0].as_string(), Some("old"));
        assert_eq!(update.after.columns[0].as_string(), Some("new"));
    }

    #[test]
    fn test_begin_message() {
        let msg = BeginMessage {
            gtid: Some(SAMPLE_GTID.parse().unwrap()),
            binlog_filename: "mysql-bin.000003".to_string(),
            binlog_position: 12_345,
            timestamp_ms: 1_704_067_200_000,
        };
        assert!(msg.gtid.is_some());
    }

    #[test]
    fn test_insert_message() {
        let msg = InsertMessage {
            table_id: 100,
            database: "mydb".to_string(),
            table: "users".to_string(),
            rows: vec![RowData {
                columns: vec![ColumnValue::SignedInt(1)],
            }],
            binlog_position: 12_345,
            timestamp_ms: 1_704_067_200_000,
        };
        assert_eq!(msg.rows.len(), 1);
    }
}