1use super::gtid::Gtid;
7use super::types::MySqlColumn;
8
/// A single decoded binlog message.
///
/// Every data-carrying variant wraps the payload struct of the same name;
/// `Heartbeat` carries no payload.
///
/// `Eq` is not derived: the row-carrying payloads ultimately contain
/// [`ColumnValue`], which can hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub enum BinlogMessage {
    /// Begin message; see [`BeginMessage`].
    Begin(BeginMessage),
    /// Commit message; see [`CommitMessage`].
    Commit(CommitMessage),
    /// Table-map message; see [`TableMapMessage`].
    TableMap(TableMapMessage),
    /// Row insert message; see [`InsertMessage`].
    Insert(InsertMessage),
    /// Row update message; see [`UpdateMessage`].
    Update(UpdateMessage),
    /// Row delete message; see [`DeleteMessage`].
    Delete(DeleteMessage),
    /// Query message; see [`QueryMessage`].
    Query(QueryMessage),
    /// Rotate message; see [`RotateMessage`].
    Rotate(RotateMessage),
    /// Heartbeat; carries no data.
    Heartbeat,
}
31
/// Payload of [`BinlogMessage::Begin`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeginMessage {
    /// GTID attached to this message, if one is present.
    pub gtid: Option<Gtid>,
    /// Name of the binlog file (per the field name).
    pub binlog_filename: String,
    /// Position within the binlog.
    /// NOTE(review): start vs. end offset of the event is not visible here — confirm.
    pub binlog_position: u64,
    /// Timestamp in milliseconds (per the `_ms` suffix); presumably
    /// Unix-epoch based — TODO confirm against the decoder.
    pub timestamp_ms: i64,
}
44
/// Payload of [`BinlogMessage::Commit`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMessage {
    /// Transaction identifier (`xid`), as named by the field; presumably
    /// taken from the commit event — TODO confirm.
    pub xid: u64,
    /// Position within the binlog.
    pub binlog_position: u64,
    /// Timestamp in milliseconds (per the `_ms` suffix); presumably
    /// Unix-epoch based — TODO confirm against the decoder.
    pub timestamp_ms: i64,
}
55
/// Payload of [`BinlogMessage::TableMap`]: a table id together with the
/// database name, table name and column list it maps to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableMapMessage {
    /// Numeric table identifier.
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Column descriptors for the table; presumably in table column
    /// order — TODO confirm.
    pub columns: Vec<MySqlColumn>,
}
68
/// Payload of [`BinlogMessage::Insert`].
///
/// `Eq` is not derived because the rows contain [`ColumnValue`], which can
/// hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct InsertMessage {
    /// Numeric table identifier.
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Row images carried by this message.
    pub rows: Vec<RowData>,
    /// Position within the binlog.
    pub binlog_position: u64,
    /// Timestamp in milliseconds (per the `_ms` suffix); presumably
    /// Unix-epoch based — TODO confirm against the decoder.
    pub timestamp_ms: i64,
}
85
/// Payload of [`BinlogMessage::Update`].
///
/// `Eq` is not derived because the rows contain [`ColumnValue`], which can
/// hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMessage {
    /// Numeric table identifier.
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Before/after row image pairs carried by this message.
    pub rows: Vec<UpdateRowData>,
    /// Position within the binlog.
    pub binlog_position: u64,
    /// Timestamp in milliseconds (per the `_ms` suffix); presumably
    /// Unix-epoch based — TODO confirm against the decoder.
    pub timestamp_ms: i64,
}
102
/// Payload of [`BinlogMessage::Delete`].
///
/// `Eq` is not derived because the rows contain [`ColumnValue`], which can
/// hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMessage {
    /// Numeric table identifier.
    pub table_id: u64,
    /// Database name.
    pub database: String,
    /// Table name.
    pub table: String,
    /// Row images carried by this message.
    pub rows: Vec<RowData>,
    /// Position within the binlog.
    pub binlog_position: u64,
    /// Timestamp in milliseconds (per the `_ms` suffix); presumably
    /// Unix-epoch based — TODO confirm against the decoder.
    pub timestamp_ms: i64,
}
119
/// Payload of [`BinlogMessage::Query`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryMessage {
    /// Database name; presumably the default database the statement ran
    /// against — TODO confirm.
    pub database: String,
    /// Query text.
    pub query: String,
    /// Position within the binlog.
    pub binlog_position: u64,
    /// Timestamp in milliseconds (per the `_ms` suffix); presumably
    /// Unix-epoch based — TODO confirm against the decoder.
    pub timestamp_ms: i64,
}
132
/// Payload of [`BinlogMessage::Rotate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RotateMessage {
    /// Name of the next binlog (per the field name).
    pub next_binlog: String,
    /// Position value carried by the message; presumably the offset at
    /// which reading continues in `next_binlog` — TODO confirm.
    pub position: u64,
}
141
/// The column values of one row.
///
/// `Eq` is not derived because [`ColumnValue`] can hold floating-point
/// values.
#[derive(Debug, Clone, PartialEq)]
pub struct RowData {
    /// One value per column; presumably in table column order — TODO confirm.
    pub columns: Vec<ColumnValue>,
}
148
/// A pair of row images used by [`UpdateMessage`].
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateRowData {
    /// Row image named `before`; presumably the row prior to the update.
    pub before: RowData,
    /// Row image named `after`; presumably the row following the update.
    pub after: RowData,
}
157
/// A single decoded column value.
///
/// `Eq` is intentionally not derived because the `Float` and `Double`
/// variants hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    /// SQL `NULL`.
    Null,
    /// Signed integer value.
    SignedInt(i64),
    /// Unsigned integer value.
    UnsignedInt(u64),
    /// Single-precision floating-point value.
    Float(f32),
    /// Double-precision floating-point value.
    Double(f64),
    /// Text value.
    String(String),
    /// Raw byte string.
    Bytes(Vec<u8>),
    /// Calendar date as `(year, month, day)`.
    Date(i32, u32, u32),
    /// Time as `(hour, minute, second, microsecond)`; the hour is signed.
    Time(i32, u32, u32, u32),
    /// Date and time as
    /// `(year, month, day, hour, minute, second, microsecond)`.
    DateTime(i32, u32, u32, u32, u32, u32, u32),
    /// Timestamp as a signed microsecond count (presumably relative to the
    /// Unix epoch — TODO confirm against the decoder).
    Timestamp(i64),
    /// JSON document held as text.
    Json(String),
}

impl ColumnValue {
    /// Borrows the text of a `String` or `Json` value.
    ///
    /// Returns `None` for every other variant.
    #[must_use]
    pub fn as_string(&self) -> Option<&str> {
        match self {
            ColumnValue::String(s) | ColumnValue::Json(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns the value as an `i64` when it is an integer that fits.
    ///
    /// `SignedInt` is returned as is; `UnsignedInt` is returned only when it
    /// does not exceed `i64::MAX`. Every other variant yields `None`.
    #[must_use]
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            ColumnValue::SignedInt(v) => Some(*v),
            ColumnValue::UnsignedInt(v) => i64::try_from(*v).ok(),
            _ => None,
        }
    }

    /// Returns `true` only for [`ColumnValue::Null`].
    #[must_use]
    pub fn is_null(&self) -> bool {
        matches!(self, ColumnValue::Null)
    }

    /// Renders the value as text.
    ///
    /// * `Null` becomes the empty string.
    /// * Numbers use their `Display` form.
    /// * `String` and `Json` are cloned unchanged.
    /// * `Bytes` become lowercase hexadecimal, two digits per byte.
    /// * `Date` is `YYYY-MM-DD`.
    /// * `Time` is `[-]HH:MM:SS`, followed by `.ffffff` only when the
    ///   microsecond part is non-zero. A negative hour keeps its two-digit
    ///   zero padding (`-05:30:00`).
    /// * `DateTime` is `YYYY-MM-DD HH:MM:SS`, with the same optional
    ///   fractional suffix.
    /// * `Timestamp` is `<seconds>.<microseconds, 6 digits>`. The split uses
    ///   Euclidean division, so the fractional part is never negative
    ///   (an input of `-1` renders as `-1.999999`).
    #[must_use]
    pub fn to_text(&self) -> String {
        match self {
            ColumnValue::Null => String::new(),
            ColumnValue::SignedInt(v) => v.to_string(),
            ColumnValue::UnsignedInt(v) => v.to_string(),
            ColumnValue::Float(v) => v.to_string(),
            ColumnValue::Double(v) => v.to_string(),
            ColumnValue::String(s) | ColumnValue::Json(s) => s.clone(),
            ColumnValue::Bytes(b) => {
                use std::fmt::Write;

                let mut hex = String::with_capacity(b.len() * 2);
                for byte in b {
                    // Writing into a `String` cannot fail, so the result is discarded.
                    let _ = write!(hex, "{byte:02x}");
                }
                hex
            }
            ColumnValue::Date(y, m, d) => format!("{y:04}-{m:02}-{d:02}"),
            ColumnValue::Time(h, m, s, us) => {
                // The `{:02}` width counts the minus sign, so formatting a
                // negative hour directly yields `-5` rather than `-05`.
                // Emit the sign and the magnitude separately instead.
                let sign = if *h < 0 { "-" } else { "" };
                let hours = h.unsigned_abs();
                if *us > 0 {
                    format!("{sign}{hours:02}:{m:02}:{s:02}.{us:06}")
                } else {
                    format!("{sign}{hours:02}:{m:02}:{s:02}")
                }
            }
            ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
                if *us > 0 {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}.{us:06}")
                } else {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}")
                }
            }
            ColumnValue::Timestamp(us) => {
                // `rem_euclid` always lands in 0..1_000_000, so the remainder
                // can be zero-padded directly without a narrowing cast.
                let secs = us.div_euclid(1_000_000);
                let micros = us.rem_euclid(1_000_000);
                format!("{secs}.{micros:06}")
            }
        }
    }
}
259
/// Errors reported while decoding binlog data.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DecoderError {
    /// An event type code that is not recognised; carries the code.
    #[error("unknown event type: {0}")]
    UnknownEventType(u8),

    /// Event data that could not be interpreted; carries a description.
    #[error("invalid event data: {0}")]
    InvalidData(String),

    /// No table map is available for the given `table_id`.
    #[error("missing table map for table_id {0}")]
    MissingTableMap(u64),

    /// A column type code that is not supported; carries the code.
    #[error("unsupported column type: {0}")]
    UnsupportedType(u8),
}
279
/// Converts a second-resolution timestamp into milliseconds.
///
/// The input is widened to `i64` before multiplying, so the result cannot
/// overflow for any `u32` value.
#[must_use]
pub fn mysql_timestamp_to_unix_ms(timestamp: u32) -> i64 {
    const MILLIS_PER_SECOND: i64 = 1_000;

    let seconds = i64::from(timestamp);
    seconds * MILLIS_PER_SECOND
}
287
288#[derive(Debug, Clone, PartialEq, Eq)]
290pub struct BinlogPosition {
291 pub filename: String,
293 pub position: u64,
295 pub gtid: Option<Gtid>,
297}
298
299impl BinlogPosition {
300 #[must_use]
302 pub fn new(filename: String, position: u64) -> Self {
303 Self {
304 filename,
305 position,
306 gtid: None,
307 }
308 }
309
310 #[must_use]
312 pub fn with_gtid(filename: String, position: u64, gtid: Gtid) -> Self {
313 Self {
314 filename,
315 position,
316 gtid: Some(gtid),
317 }
318 }
319}
320
321impl std::fmt::Display for BinlogPosition {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 if let Some(ref gtid) = self.gtid {
324 write!(f, "{}:{} (GTID: {})", self.filename, self.position, gtid)
325 } else {
326 write!(f, "{}:{}", self.filename, self.position)
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    // Unit tests for the value and message types defined in this file.
    use super::*;

    #[test]
    fn test_column_value_null() {
        let val = ColumnValue::Null;
        assert!(val.is_null());
        assert_eq!(val.to_text(), "");
        assert_eq!(val.as_string(), None);
        assert_eq!(val.as_i64(), None);
    }

    #[test]
    fn test_column_value_int() {
        let val = ColumnValue::SignedInt(-42);
        assert!(!val.is_null());
        assert_eq!(val.as_i64(), Some(-42));
        assert_eq!(val.to_text(), "-42");

        let val = ColumnValue::UnsignedInt(100);
        assert_eq!(val.as_i64(), Some(100));
        assert_eq!(val.to_text(), "100");
    }

    #[test]
    fn test_column_value_unsigned_out_of_i64_range() {
        // Values above `i64::MAX` cannot be represented, so `as_i64` gives `None`
        // while the textual form is still exact.
        let val = ColumnValue::UnsignedInt(u64::MAX);
        assert_eq!(val.as_i64(), None);
        assert_eq!(val.to_text(), u64::MAX.to_string());
    }

    #[test]
    fn test_column_value_float() {
        // The literals are exactly representable in binary and deliberately
        // avoid look-alikes of PI and E, which trip Clippy's deny-by-default
        // `approx_constant` lint.
        let val = ColumnValue::Float(1.5);
        assert_eq!(val.to_text(), "1.5");

        let val = ColumnValue::Double(2.25);
        assert_eq!(val.to_text(), "2.25");
    }

    #[test]
    fn test_column_value_string() {
        let val = ColumnValue::String("hello".to_string());
        assert_eq!(val.as_string(), Some("hello"));
        assert_eq!(val.to_text(), "hello");
        assert_eq!(val.as_i64(), None);
    }

    #[test]
    fn test_column_value_bytes() {
        let val = ColumnValue::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]);
        assert_eq!(val.to_text(), "deadbeef");
    }

    #[test]
    fn test_column_value_date() {
        let val = ColumnValue::Date(2024, 6, 15);
        assert_eq!(val.to_text(), "2024-06-15");
    }

    #[test]
    fn test_column_value_time() {
        let val = ColumnValue::Time(14, 30, 45, 0);
        assert_eq!(val.to_text(), "14:30:45");

        let val = ColumnValue::Time(14, 30, 45, 123456);
        assert_eq!(val.to_text(), "14:30:45.123456");
    }

    #[test]
    fn test_column_value_datetime() {
        let val = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0);
        assert_eq!(val.to_text(), "2024-06-15 14:30:45");

        let val = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 500000);
        assert_eq!(val.to_text(), "2024-06-15 14:30:45.500000");
    }

    #[test]
    fn test_column_value_timestamp() {
        // Whole seconds still print a six-digit fractional part.
        let val = ColumnValue::Timestamp(1_704_067_200_000_000);
        assert_eq!(val.to_text(), "1704067200.000000");

        let val = ColumnValue::Timestamp(1_704_067_200_123_456);
        assert_eq!(val.to_text(), "1704067200.123456");
    }

    #[test]
    fn test_column_value_json() {
        let val = ColumnValue::Json(r#"{"key": "value"}"#.to_string());
        assert_eq!(val.as_string(), Some(r#"{"key": "value"}"#));
        assert_eq!(val.to_text(), r#"{"key": "value"}"#);
    }

    #[test]
    fn test_binlog_position() {
        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
        assert_eq!(pos.to_string(), "mysql-bin.000003:12345");
        assert!(pos.gtid.is_none());
    }

    #[test]
    fn test_binlog_position_with_gtid() {
        let gtid: Gtid = "3E11FA47-71CA-11E1-9E33-C80AA9429562:5".parse().unwrap();
        let pos = BinlogPosition::with_gtid("mysql-bin.000003".to_string(), 12345, gtid);
        assert!(pos.gtid.is_some());
        let s = pos.to_string();
        assert!(s.contains("GTID"));
    }

    #[test]
    fn test_mysql_timestamp_to_unix_ms() {
        let unix_ms = mysql_timestamp_to_unix_ms(1704067200);
        assert_eq!(unix_ms, 1704067200000);
    }

    #[test]
    fn test_row_data() {
        let row = RowData {
            columns: vec![
                ColumnValue::SignedInt(1),
                ColumnValue::String("Alice".to_string()),
                ColumnValue::Null,
            ],
        };
        assert_eq!(row.columns.len(), 3);
        assert_eq!(row.columns[0].as_i64(), Some(1));
        assert!(row.columns[2].is_null());
    }

    #[test]
    fn test_update_row_data() {
        let update = UpdateRowData {
            before: RowData {
                columns: vec![ColumnValue::String("old".to_string())],
            },
            after: RowData {
                columns: vec![ColumnValue::String("new".to_string())],
            },
        };
        assert_eq!(update.before.columns[0].as_string(), Some("old"));
        assert_eq!(update.after.columns[0].as_string(), Some("new"));
    }

    #[test]
    fn test_begin_message() {
        let msg = BeginMessage {
            gtid: Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:5".parse().unwrap()),
            binlog_filename: "mysql-bin.000003".to_string(),
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };
        assert!(msg.gtid.is_some());
    }

    #[test]
    fn test_insert_message() {
        let msg = InsertMessage {
            table_id: 100,
            database: "mydb".to_string(),
            table: "users".to_string(),
            rows: vec![RowData {
                columns: vec![ColumnValue::SignedInt(1)],
            }],
            binlog_position: 12345,
            timestamp_ms: 1704067200000,
        };
        assert_eq!(msg.rows.len(), 1);
    }
}