1use super::gtid::Gtid;
7use super::types::MySqlColumn;
8
/// A decoded binlog event, with one variant per event kind modelled in this file.
///
/// Every variant except `Heartbeat` wraps a dedicated payload struct.
/// `Eq` is not derived: row payloads can contain `ColumnValue::Float` /
/// `ColumnValue::Double`, and floating-point values are only `PartialEq`.
#[derive(Debug, Clone, PartialEq)]
pub enum BinlogMessage {
    /// Carries a [`BeginMessage`]. NOTE(review): presumably a transaction start — confirm.
    Begin(BeginMessage),
    /// Carries a [`CommitMessage`]. NOTE(review): presumably a transaction commit — confirm.
    Commit(CommitMessage),
    /// Carries a [`TableMapMessage`] (table id, names and column descriptors).
    TableMap(TableMapMessage),
    /// Carries an [`InsertMessage`] with the inserted rows.
    Insert(InsertMessage),
    /// Carries an [`UpdateMessage`] with before/after row pairs.
    Update(UpdateMessage),
    /// Carries a [`DeleteMessage`] with the deleted rows.
    Delete(DeleteMessage),
    /// Carries a [`QueryMessage`] holding a query string.
    Query(QueryMessage),
    /// Carries a [`RotateMessage`] naming the next binlog and a position.
    Rotate(RotateMessage),
    /// Heartbeat event; carries no payload.
    Heartbeat,
}
31
/// Payload of [`BinlogMessage::Begin`].
///
/// NOTE(review): presumably emitted at the start of a transaction — confirm
/// against the decoder that constructs it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeginMessage {
    /// GTID attached to this event, if one is available.
    pub gtid: Option<Gtid>,
    /// Name of the binlog file associated with this event.
    pub binlog_filename: String,
    /// Position within the binlog associated with this event.
    /// NOTE(review): assumed to be a byte offset — confirm whether it is the
    /// start or the end of the event.
    pub binlog_position: u64,
    /// Event time. The `_ms` suffix suggests milliseconds; presumably Unix
    /// epoch milliseconds as produced by `mysql_timestamp_to_unix_ms` — confirm.
    pub timestamp_ms: i64,
}
44
/// Payload of [`BinlogMessage::Commit`].
///
/// NOTE(review): presumably emitted when a transaction commits — confirm
/// against the decoder that constructs it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMessage {
    /// Transaction identifier. NOTE(review): presumably the XID carried by
    /// MySQL's XID event — confirm.
    pub xid: u64,
    /// Position within the binlog associated with this event.
    pub binlog_position: u64,
    /// Event time. The `_ms` suffix suggests milliseconds; presumably Unix
    /// epoch milliseconds — confirm.
    pub timestamp_ms: i64,
}
55
/// Payload of [`BinlogMessage::TableMap`]: identifies a table and lists its columns.
///
/// NOTE(review): presumably the mapping that later row events refer to via
/// `table_id` (cf. `DecoderError::MissingTableMap`) — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableMapMessage {
    /// Numeric identifier of the table.
    pub table_id: u64,
    /// Name of the database (schema) containing the table.
    pub database: String,
    /// Name of the table.
    pub table: String,
    /// Column descriptors, using `MySqlColumn` from `super::types`.
    pub columns: Vec<MySqlColumn>,
}
68
/// Payload of [`BinlogMessage::Insert`]: rows inserted into one table.
///
/// `Eq` is not derived because [`RowData`] can contain floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct InsertMessage {
    /// Numeric identifier of the affected table.
    pub table_id: u64,
    /// Name of the database (schema) containing the table.
    pub database: String,
    /// Name of the affected table.
    pub table: String,
    /// One [`RowData`] per row in this event.
    pub rows: Vec<RowData>,
    /// Position within the binlog associated with this event.
    pub binlog_position: u64,
    /// Event time. The `_ms` suffix suggests milliseconds; presumably Unix
    /// epoch milliseconds — confirm.
    pub timestamp_ms: i64,
}
85
/// Payload of [`BinlogMessage::Update`]: rows updated in one table.
///
/// `Eq` is not derived because [`UpdateRowData`] can contain floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMessage {
    /// Numeric identifier of the affected table.
    pub table_id: u64,
    /// Name of the database (schema) containing the table.
    pub database: String,
    /// Name of the affected table.
    pub table: String,
    /// One before/after pair ([`UpdateRowData`]) per row in this event.
    pub rows: Vec<UpdateRowData>,
    /// Position within the binlog associated with this event.
    pub binlog_position: u64,
    /// Event time. The `_ms` suffix suggests milliseconds; presumably Unix
    /// epoch milliseconds — confirm.
    pub timestamp_ms: i64,
}
102
/// Payload of [`BinlogMessage::Delete`]: rows deleted from one table.
///
/// `Eq` is not derived because [`RowData`] can contain floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMessage {
    /// Numeric identifier of the affected table.
    pub table_id: u64,
    /// Name of the database (schema) containing the table.
    pub database: String,
    /// Name of the affected table.
    pub table: String,
    /// One [`RowData`] per row in this event.
    pub rows: Vec<RowData>,
    /// Position within the binlog associated with this event.
    pub binlog_position: u64,
    /// Event time. The `_ms` suffix suggests milliseconds; presumably Unix
    /// epoch milliseconds — confirm.
    pub timestamp_ms: i64,
}
119
/// Payload of [`BinlogMessage::Query`]: a query string and the database it is tied to.
///
/// NOTE(review): presumably used for statements such as DDL — confirm
/// against the decoder that constructs it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryMessage {
    /// Name of the database (schema) associated with the query.
    pub database: String,
    /// The query text.
    pub query: String,
    /// Position within the binlog associated with this event.
    pub binlog_position: u64,
    /// Event time. The `_ms` suffix suggests milliseconds; presumably Unix
    /// epoch milliseconds — confirm.
    pub timestamp_ms: i64,
}
132
/// Payload of [`BinlogMessage::Rotate`]: names the next binlog and a position in it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RotateMessage {
    /// Name of the next binlog file.
    pub next_binlog: String,
    /// Position carried by the event. NOTE(review): presumably the offset at
    /// which to continue reading `next_binlog` — confirm.
    pub position: u64,
}
141
/// The column values of a single row.
///
/// `Eq` is not derived because [`ColumnValue`] can hold floating-point values.
#[derive(Debug, Clone, PartialEq)]
pub struct RowData {
    /// Values of the row's columns.
    /// NOTE(review): presumably in table column order — confirm against the decoder.
    pub columns: Vec<ColumnValue>,
}
148
/// A pair of row images describing one updated row.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateRowData {
    /// Row contents before the update.
    pub before: RowData,
    /// Row contents after the update.
    pub after: RowData,
}
157
/// A single decoded column value.
///
/// `Eq` is not derived because `Float` and `Double` carry floating-point
/// payloads, which are only `PartialEq`.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    /// No value; rendered as the empty string by [`ColumnValue::to_text`].
    Null,
    /// Signed integer.
    SignedInt(i64),
    /// Unsigned integer.
    UnsignedInt(u64),
    /// Single-precision floating-point number.
    Float(f32),
    /// Double-precision floating-point number.
    Double(f64),
    /// Text value.
    String(String),
    /// Raw bytes; rendered as lowercase hex by [`ColumnValue::to_text`].
    Bytes(Vec<u8>),
    /// Calendar date as `(year, month, day)`.
    Date(i32, u32, u32),
    /// Time as `(hour, minute, second, microsecond)`; the hour may be negative.
    Time(i32, u32, u32, u32),
    /// Date and time as `(year, month, day, hour, minute, second, microsecond)`.
    DateTime(i32, u32, u32, u32, u32, u32, u32),
    /// Timestamp in microseconds ([`ColumnValue::to_text`] splits it into
    /// seconds and a six-digit microsecond fraction).
    Timestamp(i64),
    /// JSON document held as text.
    Json(String),
}

impl ColumnValue {
    /// Borrows the inner text of a `String` or `Json` value.
    ///
    /// Returns `None` for every other variant.
    #[must_use]
    pub fn as_string(&self) -> Option<&str> {
        match self {
            ColumnValue::String(s) | ColumnValue::Json(s) => Some(s),
            _ => None,
        }
    }

    /// Returns the value as an `i64` when it is an integer that fits.
    ///
    /// `SignedInt` is returned as-is; `UnsignedInt` is converted with a range
    /// check and yields `None` above `i64::MAX`. All other variants yield `None`.
    #[must_use]
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            ColumnValue::SignedInt(v) => Some(*v),
            ColumnValue::UnsignedInt(v) => i64::try_from(*v).ok(),
            _ => None,
        }
    }

    /// Returns `true` only for [`ColumnValue::Null`].
    #[must_use]
    pub fn is_null(&self) -> bool {
        matches!(self, ColumnValue::Null)
    }

    /// Renders the value as text.
    ///
    /// * `Null` becomes the empty string.
    /// * Numbers use their `Display` form.
    /// * `String` / `Json` are cloned unchanged.
    /// * `Bytes` become lowercase hex, two digits per byte.
    /// * `Date` is `YYYY-MM-DD`; `Time` is `[-]HH:MM:SS[.ffffff]`;
    ///   `DateTime` is `YYYY-MM-DD HH:MM:SS[.ffffff]`. The fractional part is
    ///   emitted only when the microsecond component is non-zero.
    /// * `Timestamp` is `[-]seconds.ffffff`, always with six fractional digits.
    #[must_use]
    pub fn to_text(&self) -> String {
        match self {
            ColumnValue::Null => String::new(),
            ColumnValue::SignedInt(v) => v.to_string(),
            ColumnValue::UnsignedInt(v) => v.to_string(),
            ColumnValue::Float(v) => v.to_string(),
            ColumnValue::Double(v) => v.to_string(),
            ColumnValue::String(s) | ColumnValue::Json(s) => s.clone(),
            ColumnValue::Bytes(b) => {
                use std::fmt::Write;

                let mut hex = String::with_capacity(b.len() * 2);
                for byte in b {
                    // Writing into a `String` cannot fail, so the result is ignored.
                    let _ = write!(hex, "{byte:02x}");
                }
                hex
            }
            ColumnValue::Date(y, m, d) => format!("{y:04}-{m:02}-{d:02}"),
            ColumnValue::Time(h, m, s, us) => {
                // Emit the sign separately: `{h:02}` counts the '-' towards the
                // width, which would render -5 hours as "-5" instead of "-05".
                let sign = if *h < 0 { "-" } else { "" };
                let hours = h.unsigned_abs();
                if *us > 0 {
                    format!("{sign}{hours:02}:{m:02}:{s:02}.{us:06}")
                } else {
                    format!("{sign}{hours:02}:{m:02}:{s:02}")
                }
            }
            ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
                if *us > 0 {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}.{us:06}")
                } else {
                    format!("{y:04}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02}")
                }
            }
            ColumnValue::Timestamp(us) => {
                // Split sign and magnitude: `%` keeps the sign of the dividend,
                // so casting a negative remainder to an unsigned type would wrap
                // and print garbage (e.g. -1 µs used to become "0.4294967295").
                // `unsigned_abs` is also safe for `i64::MIN`.
                let sign = if *us < 0 { "-" } else { "" };
                let magnitude = us.unsigned_abs();
                let secs = magnitude / 1_000_000;
                let micros = magnitude % 1_000_000;
                format!("{sign}{secs}.{micros:06}")
            }
        }
    }
}
258
/// Errors describing why a binlog event could not be decoded.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DecoderError {
    /// The event type code (the `u8` payload) is not recognised.
    #[error("unknown event type: {0}")]
    UnknownEventType(u8),

    /// The event data is invalid; the payload is a human-readable description.
    #[error("invalid event data: {0}")]
    InvalidData(String),

    /// No table map is available for the given `table_id`.
    #[error("missing table map for table_id {0}")]
    MissingTableMap(u64),

    /// The column type code (the `u8` payload) is not supported.
    #[error("unsupported column type: {0}")]
    UnsupportedType(u8),
}
278
/// Converts a timestamp given in whole seconds into milliseconds.
///
/// The input is widened to `i64` before multiplying, so the result cannot
/// overflow: `u32::MAX * 1000` is far below `i64::MAX`.
#[must_use]
pub fn mysql_timestamp_to_unix_ms(timestamp: u32) -> i64 {
    const MILLIS_PER_SECOND: i64 = 1_000;

    let seconds = i64::from(timestamp);
    seconds * MILLIS_PER_SECOND
}
286
/// A location in the binlog: file name, position, and optionally a GTID.
///
/// The `Display` implementation in this file renders it as
/// `filename:position`, followed by ` (GTID: …)` when `gtid` is set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BinlogPosition {
    /// Name of the binlog file.
    pub filename: String,
    /// Position within `filename`. NOTE(review): assumed to be a byte offset — confirm.
    pub position: u64,
    /// GTID associated with this position, if known.
    pub gtid: Option<Gtid>,
}
297
298impl BinlogPosition {
299 #[must_use]
301 pub fn new(filename: String, position: u64) -> Self {
302 Self {
303 filename,
304 position,
305 gtid: None,
306 }
307 }
308
309 #[must_use]
311 pub fn with_gtid(filename: String, position: u64, gtid: Gtid) -> Self {
312 Self {
313 filename,
314 position,
315 gtid: Some(gtid),
316 }
317 }
318}
319
320impl std::fmt::Display for BinlogPosition {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 if let Some(ref gtid) = self.gtid {
323 write!(f, "{}:{} (GTID: {})", self.filename, self.position, gtid)
324 } else {
325 write!(f, "{}:{}", self.filename, self.position)
326 }
327 }
328}
329
#[cfg(test)]
mod tests {
    //! Unit tests for the message types, `ColumnValue` helpers and `BinlogPosition`.

    use super::*;

    /// GTID literal shared by the tests that need a parsed `Gtid`.
    const SAMPLE_GTID: &str = "3E11FA47-71CA-11E1-9E33-C80AA9429562:5";

    #[test]
    fn test_column_value_null() {
        let val = ColumnValue::Null;
        assert!(val.is_null());
        assert_eq!(val.to_text(), "");
    }

    #[test]
    fn test_column_value_int() {
        let val = ColumnValue::SignedInt(-42);
        assert!(!val.is_null());
        assert_eq!(val.as_i64(), Some(-42));
        assert_eq!(val.to_text(), "-42");

        let val = ColumnValue::UnsignedInt(100);
        assert_eq!(val.as_i64(), Some(100));
        assert_eq!(val.to_text(), "100");
    }

    #[test]
    fn test_column_value_as_i64_out_of_range() {
        // Unsigned values above i64::MAX cannot be represented and must yield None.
        assert_eq!(ColumnValue::UnsignedInt(u64::MAX).as_i64(), None);
        // Non-integer variants never convert.
        assert_eq!(ColumnValue::String("1".to_string()).as_i64(), None);
        assert_eq!(ColumnValue::Null.as_i64(), None);
    }

    #[test]
    fn test_column_value_float() {
        // Exactly representable values that do not approximate a std constant,
        // so the assertions are exact and `clippy::approx_constant` stays quiet.
        let val = ColumnValue::Float(1.5);
        assert_eq!(val.to_text(), "1.5");

        let val = ColumnValue::Double(2.25);
        assert_eq!(val.to_text(), "2.25");
    }

    #[test]
    fn test_column_value_string() {
        let val = ColumnValue::String("hello".to_string());
        assert_eq!(val.as_string(), Some("hello"));
        assert_eq!(val.to_text(), "hello");
    }

    #[test]
    fn test_column_value_as_string_non_text() {
        assert_eq!(ColumnValue::SignedInt(1).as_string(), None);
        assert_eq!(ColumnValue::Bytes(vec![0x61]).as_string(), None);
        assert_eq!(ColumnValue::Null.as_string(), None);
    }

    #[test]
    fn test_column_value_bytes() {
        let val = ColumnValue::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]);
        assert_eq!(val.to_text(), "deadbeef");

        let val = ColumnValue::Bytes(Vec::new());
        assert_eq!(val.to_text(), "");
    }

    #[test]
    fn test_column_value_date() {
        let val = ColumnValue::Date(2024, 6, 15);
        assert_eq!(val.to_text(), "2024-06-15");
    }

    #[test]
    fn test_column_value_time() {
        let val = ColumnValue::Time(14, 30, 45, 0);
        assert_eq!(val.to_text(), "14:30:45");

        let val = ColumnValue::Time(14, 30, 45, 123_456);
        assert_eq!(val.to_text(), "14:30:45.123456");
    }

    #[test]
    fn test_column_value_datetime() {
        let val = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0);
        assert_eq!(val.to_text(), "2024-06-15 14:30:45");

        let val = ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 500_000);
        assert_eq!(val.to_text(), "2024-06-15 14:30:45.500000");
    }

    #[test]
    fn test_column_value_timestamp() {
        let val = ColumnValue::Timestamp(0);
        assert_eq!(val.to_text(), "0.000000");

        let val = ColumnValue::Timestamp(1_704_067_200_123_456);
        assert_eq!(val.to_text(), "1704067200.123456");
    }

    #[test]
    fn test_column_value_json() {
        let val = ColumnValue::Json(r#"{"key": "value"}"#.to_string());
        assert_eq!(val.as_string(), Some(r#"{"key": "value"}"#));
        assert_eq!(val.to_text(), r#"{"key": "value"}"#);
    }

    #[test]
    fn test_binlog_position() {
        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
        assert_eq!(pos.to_string(), "mysql-bin.000003:12345");
        assert!(pos.gtid.is_none());
    }

    #[test]
    fn test_binlog_position_with_gtid() {
        let gtid: Gtid = SAMPLE_GTID.parse().unwrap();
        let pos = BinlogPosition::with_gtid("mysql-bin.000003".to_string(), 12345, gtid);
        assert!(pos.gtid.is_some());
        let s = pos.to_string();
        assert!(s.starts_with("mysql-bin.000003:12345"));
        assert!(s.contains("GTID"));
    }

    #[test]
    fn test_mysql_timestamp_to_unix_ms() {
        let unix_ms = mysql_timestamp_to_unix_ms(1_704_067_200);
        assert_eq!(unix_ms, 1_704_067_200_000);

        assert_eq!(mysql_timestamp_to_unix_ms(0), 0);
    }

    #[test]
    fn test_row_data() {
        let row = RowData {
            columns: vec![
                ColumnValue::SignedInt(1),
                ColumnValue::String("Alice".to_string()),
                ColumnValue::Null,
            ],
        };
        assert_eq!(row.columns.len(), 3);
        assert_eq!(row.columns[0].as_i64(), Some(1));
        assert!(row.columns[2].is_null());
    }

    #[test]
    fn test_update_row_data() {
        let update = UpdateRowData {
            before: RowData {
                columns: vec![ColumnValue::String("old".to_string())],
            },
            after: RowData {
                columns: vec![ColumnValue::String("new".to_string())],
            },
        };
        assert_eq!(update.before.columns[0].as_string(), Some("old"));
        assert_eq!(update.after.columns[0].as_string(), Some("new"));
    }

    #[test]
    fn test_begin_message() {
        let msg = BeginMessage {
            gtid: Some(SAMPLE_GTID.parse().unwrap()),
            binlog_filename: "mysql-bin.000003".to_string(),
            binlog_position: 12345,
            timestamp_ms: 1_704_067_200_000,
        };
        assert!(msg.gtid.is_some());
    }

    #[test]
    fn test_insert_message() {
        let msg = InsertMessage {
            table_id: 100,
            database: "mydb".to_string(),
            table: "users".to_string(),
            rows: vec![RowData {
                columns: vec![ColumnValue::SignedInt(1)],
            }],
            binlog_position: 12345,
            timestamp_ms: 1_704_067_200_000,
        };
        assert_eq!(msg.rows.len(), 1);
    }
}