1#[cfg(feature = "mysql-cdc")]
4use std::time::Duration;
5
6#[cfg(feature = "mysql-cdc")]
7use tokio_stream::StreamExt as _;
8
9#[cfg(feature = "mysql-cdc")]
10use mysql_async::binlog::events::{Event as MysqlEvent, EventData, RowsEventData};
11#[cfg(feature = "mysql-cdc")]
12use mysql_async::binlog::row::BinlogRow;
13#[cfg(feature = "mysql-cdc")]
14use mysql_async::binlog::value::BinlogValue;
15#[cfg(feature = "mysql-cdc")]
16use mysql_async::{BinlogStream, BinlogStreamRequest, Conn, GnoInterval, OptsBuilder, Sid};
17
18#[cfg(feature = "mysql-cdc")]
19use tracing::{debug, info};
20
21#[cfg(feature = "mysql-cdc")]
22use crate::error::ConnectorError;
23
24#[cfg(feature = "mysql-cdc")]
25use super::config::{MySqlCdcConfig, SslMode};
26#[cfg(feature = "mysql-cdc")]
27use super::decoder::{
28 BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DeleteMessage,
29 InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage, UpdateMessage,
30 UpdateRowData,
31};
32#[cfg(feature = "mysql-cdc")]
33use super::gtid::{Gtid, GtidSet};
34#[cfg(feature = "mysql-cdc")]
35use super::types::MySqlColumn;
36
#[cfg(feature = "mysql-cdc")]
/// Opens a connection to the MySQL server described by `config`.
///
/// The connection options (host, port, user, optional password, optional
/// default database and TLS settings) are assembled by [`build_opts`], so the
/// options used for a live connection and the ones produced by the standalone
/// builder cannot drift apart.
///
/// # Errors
///
/// Returns [`ConnectorError::ConnectionFailed`] (including host, port and the
/// underlying driver error in the message) when `Conn::new` fails.
pub async fn connect(config: &MySqlCdcConfig) -> Result<Conn, ConnectorError> {
    info!(
        host = %config.host,
        port = config.port,
        user = %config.username,
        ssl_mode = %config.ssl_mode,
        "connecting to MySQL server"
    );

    // Single source of truth for connection options: reuse the public builder
    // instead of re-implementing the same host/credentials/TLS mapping here.
    let conn = Conn::new(build_opts(config)).await.map_err(|e| {
        ConnectorError::ConnectionFailed(format!(
            "failed to connect to MySQL {}:{}: {}",
            config.host, config.port, e
        ))
    })?;

    info!(
        host = %config.host,
        port = config.port,
        "connected to MySQL server"
    );

    Ok(conn)
}
94
#[cfg(feature = "mysql-cdc")]
/// Consumes `conn` and turns it into a binlog stream.
///
/// The starting point is chosen as follows:
///
/// * `config.use_gtid` set: GTID mode is requested. If `gtid_set` is present
///   and converts to at least one SID, replication resumes from that set;
///   otherwise no set is sent.
/// * `config.use_gtid` unset: `position` wins if present, then
///   `config.binlog_filename` (with `config.binlog_position`, defaulting to
///   `4`), and finally no filename/position is sent at all.
///
/// # Errors
///
/// Returns [`ConnectorError::ConnectionFailed`] if the server rejects the
/// binlog stream request.
pub async fn start_binlog_stream(
    conn: Conn,
    config: &MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&BinlogPosition>,
) -> Result<BinlogStream, ConnectorError> {
    let base = BinlogStreamRequest::new(config.server_id);

    let request = if config.use_gtid {
        let base = base.with_gtid();
        // A missing set and a set that yields no SIDs are treated the same.
        match gtid_set.map(|set| (set, gtid_set_to_sids(set))) {
            Some((set, sids)) if !sids.is_empty() => {
                let resumed = base.with_gtid_set(sids);
                info!("starting GTID-based binlog replication from {}", set);
                resumed
            }
            _ => {
                info!("starting GTID-based binlog replication from beginning");
                base
            }
        }
    } else if let Some(pos) = position {
        let resumed = base
            .with_filename(pos.filename.as_bytes())
            .with_pos(pos.position);
        info!(
            "starting binlog replication from {}:{}",
            pos.filename, pos.position
        );
        resumed
    } else if let Some(filename) = config.binlog_filename.as_ref() {
        let start = config.binlog_position.unwrap_or(4);
        let resumed = base.with_filename(filename.as_bytes()).with_pos(start);
        info!("starting binlog replication from {}:{}", filename, start);
        resumed
    } else {
        info!("starting binlog replication from current position");
        base
    };

    conn.get_binlog_stream(request).await.map_err(|e| {
        ConnectorError::ConnectionFailed(format!("failed to start binlog stream: {e}"))
    })
}
158
#[cfg(feature = "mysql-cdc")]
/// Reads up to `max_events` events from `stream`, spending at most `timeout`
/// in total.
///
/// Reading stops as soon as one of the following happens: the batch is full,
/// the overall deadline passes, or the stream reports its end. In all of those
/// cases whatever has been collected so far (possibly nothing) is returned.
///
/// # Errors
///
/// Returns [`ConnectorError::ReadError`] on the first error yielded by the
/// stream; events collected before the error are not returned.
pub async fn read_events(
    stream: &mut BinlogStream,
    max_events: usize,
    timeout: Duration,
) -> Result<Vec<MysqlEvent>, ConnectorError> {
    // Cap the up-front allocation; the vector still grows on demand.
    let mut batch = Vec::with_capacity(max_events.min(64));
    let deadline = tokio::time::Instant::now() + timeout;

    while batch.len() < max_events {
        let budget = deadline.saturating_duration_since(tokio::time::Instant::now());
        if budget.is_zero() {
            break;
        }

        // Timing out while waiting for the next event just ends the batch.
        let Ok(next) = tokio::time::timeout(budget, stream.next()).await else {
            break;
        };

        match next {
            Some(Ok(event)) => batch.push(event),
            Some(Err(e)) => {
                return Err(ConnectorError::ReadError(format!(
                    "binlog stream error: {e}"
                )));
            }
            None => {
                debug!("binlog stream ended");
                break;
            }
        }
    }

    Ok(batch)
}
209
#[cfg(feature = "mysql-cdc")]
/// Translates one raw binlog event into a connector-level [`BinlogMessage`].
///
/// `stream` is consulted for the TABLE_MAP entry that belongs to a rows
/// event. The header's `log_pos` and `timestamp` (multiplied by 1000) are
/// copied into the messages that carry a position / timestamp.
///
/// Returns `Ok(None)` when the event carries no parsed data or is of a kind
/// this connector does not translate.
///
/// # Errors
///
/// Returns [`ConnectorError::Internal`] when the event payload cannot be
/// parsed, when a rows event references a `table_id` with no known TABLE_MAP,
/// or when a row image fails to decode.
pub fn decode_binlog_event(
    event: &MysqlEvent,
    stream: &BinlogStream,
) -> Result<Option<BinlogMessage>, ConnectorError> {
    let header = event.header();
    let binlog_position = u64::from(header.log_pos());
    let timestamp_ms = i64::from(header.timestamp()) * 1000;

    let parsed = event
        .read_data()
        .map_err(|e| ConnectorError::Internal(format!("failed to parse binlog event: {e}")))?;
    let Some(payload) = parsed else {
        return Ok(None);
    };

    let message = match payload {
        EventData::TableMapEvent(tme) => BinlogMessage::TableMap(TableMapMessage {
            table_id: tme.table_id(),
            database: tme.database_name().into_owned(),
            table: tme.table_name().into_owned(),
            columns: extract_columns_from_tme(&tme),
        }),

        EventData::RowsEvent(rows_event) => {
            let table_id = rows_event.table_id();

            // Row images can only be decoded against their table map.
            let Some(tme) = stream.get_tme(table_id) else {
                return Err(ConnectorError::Internal(format!(
                    "missing TABLE_MAP for table_id {table_id}"
                )));
            };

            let database = tme.database_name().into_owned();
            let table = tme.table_name().into_owned();

            match &rows_event {
                RowsEventData::WriteRowsEvent(_) | RowsEventData::WriteRowsEventV1(_) => {
                    BinlogMessage::Insert(InsertMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_after(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }

                RowsEventData::UpdateRowsEvent(_)
                | RowsEventData::UpdateRowsEventV1(_)
                | RowsEventData::PartialUpdateRowsEvent(_) => {
                    BinlogMessage::Update(UpdateMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_update(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }

                RowsEventData::DeleteRowsEvent(_) | RowsEventData::DeleteRowsEventV1(_) => {
                    BinlogMessage::Delete(DeleteMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_before(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }
            }
        }

        EventData::QueryEvent(qe) => BinlogMessage::Query(QueryMessage {
            database: qe.schema().into_owned(),
            query: qe.query().into_owned(),
            binlog_position,
            timestamp_ms,
        }),

        // The binlog file name is not available from the event itself, so it
        // is left empty in both Begin variants.
        EventData::GtidEvent(ge) => BinlogMessage::Begin(BeginMessage {
            gtid: Some(sid_bytes_to_gtid(ge.sid(), ge.gno())),
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::AnonymousGtidEvent(_) => BinlogMessage::Begin(BeginMessage {
            gtid: None,
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::RotateEvent(re) => BinlogMessage::Rotate(RotateMessage {
            next_binlog: re.name().into_owned(),
            position: re.position(),
        }),

        EventData::XidEvent(xe) => BinlogMessage::Commit(CommitMessage {
            xid: xe.xid,
            binlog_position,
            timestamp_ms,
        }),

        EventData::HeartbeatEvent => BinlogMessage::Heartbeat,

        // Every other event kind is ignored.
        _ => return Ok(None),
    };

    Ok(Some(message))
}
343
#[cfg(feature = "mysql-cdc")]
/// Converts a binlog cell into a [`ColumnValue`].
///
/// * Plain values are delegated to [`mysql_value_to_column_value`].
/// * JSONB values are rendered with their `Debug` representation.
/// * Partial JSON diffs are replaced by the literal string `{}`.
fn binlog_value_to_column_value(val: &BinlogValue<'_>) -> ColumnValue {
    match val {
        BinlogValue::Value(plain) => mysql_value_to_column_value(plain),
        BinlogValue::Jsonb(document) => {
            // NOTE(review): `{:?}` output is presumably not valid JSON text;
            // confirm whether consumers expect real JSON here.
            let rendered = format!("{document:?}");
            ColumnValue::Json(rendered)
        }
        // NOTE(review): the diff content is discarded; only a placeholder is emitted.
        BinlogValue::JsonDiff(_) => ColumnValue::Json(String::from("{}")),
    }
}
356
#[cfg(feature = "mysql-cdc")]
/// Converts a `mysql_async::Value` into a [`ColumnValue`].
///
/// * `Bytes` become `ColumnValue::String` when they are valid UTF-8 and
///   `ColumnValue::Bytes` otherwise.
/// * `Date` becomes `ColumnValue::Date` when hour, minute, second and
///   microsecond are all zero, and `ColumnValue::DateTime` otherwise. A
///   date-time that falls exactly on midnight is therefore reported as a date.
/// * `Time` folds the day count into the hour field (`days * 24 + hours`) and
///   carries the sign on the hours. A negative value whose total hour count is
///   zero cannot carry its sign this way and comes out non-negative.
fn mysql_value_to_column_value(val: &mysql_async::Value) -> ColumnValue {
    match val {
        mysql_async::Value::NULL => ColumnValue::Null,
        mysql_async::Value::Int(v) => ColumnValue::SignedInt(*v),
        mysql_async::Value::UInt(v) => ColumnValue::UnsignedInt(*v),
        mysql_async::Value::Float(v) => ColumnValue::Float(*v),
        mysql_async::Value::Double(v) => ColumnValue::Double(*v),
        // One copy of the buffer is enough: on invalid UTF-8 the error hands
        // the bytes back instead of cloning the input a second time.
        mysql_async::Value::Bytes(b) => String::from_utf8(b.clone()).map_or_else(
            |not_utf8| ColumnValue::Bytes(not_utf8.into_bytes()),
            ColumnValue::String,
        ),
        mysql_async::Value::Date(year, month, day, hour, min, sec, micro) => {
            if *hour == 0 && *min == 0 && *sec == 0 && *micro == 0 {
                ColumnValue::Date(i32::from(*year), u32::from(*month), u32::from(*day))
            } else {
                ColumnValue::DateTime(
                    i32::from(*year),
                    u32::from(*month),
                    u32::from(*day),
                    u32::from(*hour),
                    u32::from(*min),
                    u32::from(*sec),
                    *micro,
                )
            }
        }
        mysql_async::Value::Time(is_negative, days, hours, minutes, seconds, micros) => {
            // NOTE(review): the cast assumes the day count stays far below
            // `i32::MAX`; confirm against the range the server can send.
            #[allow(clippy::cast_possible_wrap)]
            let total_hours = (*days as i32) * 24 + i32::from(*hours);
            let h = if *is_negative {
                -total_hours
            } else {
                total_hours
            };
            ColumnValue::Time(h, u32::from(*minutes), u32::from(*seconds), *micros)
        }
    }
}
402
#[cfg(feature = "mysql-cdc")]
/// Builds the column list for a table from its TABLE_MAP event.
///
/// For every column index the raw type id, the nullability bit and the name
/// are collected. Fallbacks: type id `0xFF` when the raw type is unavailable,
/// nullable `true` when the bitmask has no entry, and `col_{index}` when the
/// event's optional metadata provides no name for that index.
fn extract_columns_from_tme(
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Vec<MySqlColumn> {
    #[allow(clippy::cast_possible_truncation)]
    let column_count = tme.columns_count() as usize;
    let nullability = tme.null_bitmask();

    // Names are consumed in index order, so the i-th `next()` is column i.
    let mut names = extract_column_names(tme).into_iter();

    let mut columns = Vec::with_capacity(column_count);
    for idx in 0..column_count {
        let type_id = match tme.get_raw_column_type(idx) {
            Ok(Some(column_type)) => column_type as u8,
            _ => 0xFF,
        };

        let nullable = nullability.get(idx).as_deref().copied().unwrap_or(true);

        let name = names.next().unwrap_or_else(|| format!("col_{idx}"));

        columns.push(MySqlColumn::new(name, type_id, 0, nullable, false));
    }
    columns
}
434
#[cfg(feature = "mysql-cdc")]
/// Collects the column names found in the optional metadata of a TABLE_MAP
/// event.
///
/// Metadata fields and individual names that fail to parse are skipped, and
/// fields other than `ColumnName` are ignored. The result is empty when the
/// event carries no column-name metadata.
fn extract_column_names(tme: &mysql_async::binlog::events::TableMapEvent<'_>) -> Vec<String> {
    use mysql_async::binlog::events::OptionalMetadataField;

    let mut collected = Vec::new();
    for field in tme.iter_optional_meta().flatten() {
        if let OptionalMetadataField::ColumnName(column_names) = field {
            collected.extend(
                column_names
                    .iter_names()
                    .flatten()
                    .map(|entry| entry.name().into_owned()),
            );
        }
    }
    collected
}
449
#[cfg(feature = "mysql-cdc")]
/// Decodes the after-image of every row in `rows_event`.
///
/// Rows without an after-image are skipped.
///
/// # Errors
///
/// Returns [`ConnectorError::Internal`] for the first row that fails to decode.
fn decode_rows_after(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((_, Some(after))) => Some(Ok(binlog_row_to_row_data(&after))),
            Ok((_, None)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
466
#[cfg(feature = "mysql-cdc")]
/// Decodes the before-image of every row in `rows_event`.
///
/// Rows without a before-image are skipped.
///
/// # Errors
///
/// Returns [`ConnectorError::Internal`] for the first row that fails to decode.
fn decode_rows_before(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((Some(before), _)) => Some(Ok(binlog_row_to_row_data(&before))),
            Ok((None, _)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
483
#[cfg(feature = "mysql-cdc")]
/// Decodes every row of an update event into a before/after pair.
///
/// A missing before- or after-image is represented by a [`RowData`] with no
/// columns, so each input row always yields exactly one [`UpdateRowData`].
///
/// # Errors
///
/// Returns [`ConnectorError::Internal`] for the first row that fails to decode.
fn decode_rows_update(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<UpdateRowData>, ConnectorError> {
    // Shared conversion for both images; an absent image becomes an empty row.
    let image_to_row = |image: Option<BinlogRow>| match image {
        Some(row) => binlog_row_to_row_data(&row),
        None => RowData {
            columns: Vec::new(),
        },
    };

    rows_event
        .rows(tme)
        .map(|decoded| {
            decoded
                .map(|(before, after)| UpdateRowData {
                    before: image_to_row(before),
                    after: image_to_row(after),
                })
                .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))
        })
        .collect()
}
516
#[cfg(feature = "mysql-cdc")]
/// Converts a decoded binlog row into [`RowData`], one [`ColumnValue`] per
/// column position.
///
/// Positions for which the row holds no value are emitted as
/// `ColumnValue::Null`.
fn binlog_row_to_row_data(row: &BinlogRow) -> RowData {
    let width = row.len();
    let mut columns = Vec::with_capacity(width);
    for idx in 0..width {
        let cell = match row.as_ref(idx) {
            Some(value) => binlog_value_to_column_value(value),
            None => ColumnValue::Null,
        };
        columns.push(cell);
    }
    RowData { columns }
}
528
#[cfg(feature = "mysql-cdc")]
/// Builds a [`Gtid`] from the 16 raw SID bytes and the transaction number
/// (`gno`) of a GTID event.
fn sid_bytes_to_gtid(sid: [u8; 16], gno: u64) -> Gtid {
    Gtid::new(uuid::Uuid::from_bytes(sid), gno)
}
535
#[cfg(feature = "mysql-cdc")]
/// Converts a [`GtidSet`] into the `Sid` list expected by a binlog stream
/// request: one `Sid` per source id, carrying one interval per range.
///
/// An empty set produces an empty vector.
fn gtid_set_to_sids(gtid_set: &GtidSet) -> Vec<Sid<'static>> {
    gtid_set
        .iter_sets()
        .map(|(source_id, ranges)| {
            let mut sid = Sid::new(source_id.into_bytes());
            for range in ranges {
                // NOTE(review): `end + 1` presumably turns an inclusive range
                // end into the exclusive bound `GnoInterval` expects; confirm.
                sid = sid.with_interval(GnoInterval::new(range.start, range.end + 1));
            }
            sid
        })
        .collect()
}
556
#[cfg(feature = "mysql-cdc")]
/// Maps an [`SslMode`] to the TLS options handed to the driver.
///
/// * `Disabled` yields `None` (no TLS options).
/// * `Preferred` yields default options that accept invalid certificates.
/// * `Required`, `VerifyCa` and `VerifyIdentity` all yield the unmodified
///   default options.
#[must_use]
pub fn build_ssl_opts(ssl_mode: &SslMode) -> Option<mysql_async::SslOpts> {
    let accept_invalid_certs = match ssl_mode {
        SslMode::Disabled => return None,
        SslMode::Preferred => true,
        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => false,
    };

    let tls = mysql_async::SslOpts::default();
    Some(if accept_invalid_certs {
        tls.with_danger_accept_invalid_certs(true)
    } else {
        tls
    })
}
573
#[cfg(feature = "mysql-cdc")]
/// Assembles the driver connection options for `config`.
///
/// Host, port and user are always set and socket preference is turned off.
/// Password, default database and TLS options are passed through as-is, so an
/// absent value leaves the corresponding option unset. TLS options come from
/// [`build_ssl_opts`].
#[must_use]
pub fn build_opts(config: &MySqlCdcConfig) -> OptsBuilder {
    OptsBuilder::default()
        .ip_or_hostname(&config.host)
        .tcp_port(config.port)
        .user(Some(&config.username))
        .prefer_socket(Some(false))
        .pass(config.password.as_ref())
        .db_name(config.database.as_ref())
        .ssl_opts(build_ssl_opts(&config.ssl_mode))
}
598
#[cfg(feature = "mysql-cdc")]
/// Builds the binlog stream request for `config` without opening a stream.
///
/// * With `config.use_gtid` set, GTID mode is requested and `gtid_set` is
///   attached when it is present and converts to at least one SID.
/// * Otherwise `position` is used when present, then
///   `config.binlog_filename` with `config.binlog_position` (defaulting to
///   `4`); with neither, no filename/position is set.
#[must_use]
pub fn build_binlog_request<'a>(
    config: &'a MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&'a BinlogPosition>,
) -> BinlogStreamRequest<'a> {
    let base = BinlogStreamRequest::new(config.server_id);

    if config.use_gtid {
        let base = base.with_gtid();
        return match gtid_set.map(gtid_set_to_sids) {
            Some(sids) if !sids.is_empty() => base.with_gtid_set(sids),
            _ => base,
        };
    }

    // File/position mode: an explicit position beats the configured file name.
    match (position, config.binlog_filename.as_ref()) {
        (Some(pos), _) => base
            .with_filename(pos.filename.as_bytes())
            .with_pos(pos.position),
        (None, Some(filename)) => base
            .with_filename(filename.as_bytes())
            .with_pos(config.binlog_position.unwrap_or(4)),
        (None, None) => base,
    }
}
629
#[cfg(all(test, feature = "mysql-cdc"))]
/// Unit tests for the option/request builders and the value conversions.
/// None of them needs a running MySQL server.
mod tests {
    use super::*;
    use mysql_async::Opts;

    /// Baseline configuration shared by the option-builder tests.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("secret".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    #[test]
    fn test_connect_builds_opts() {
        let config = test_config();
        let opts_builder = build_opts(&config);

        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "localhost");
        assert_eq!(opts.tcp_port(), 3306);
        assert_eq!(opts.user(), Some("root"));
        assert_eq!(opts.pass(), Some("secret"));
        assert_eq!(opts.db_name(), Some("testdb"));
    }

    #[test]
    fn test_connect_builds_opts_no_password() {
        let config = MySqlCdcConfig {
            host: "dbhost".to_string(),
            port: 3307,
            database: None,
            username: "repl".to_string(),
            password: None,
            server_id: 999,
            ..Default::default()
        };
        let opts_builder = build_opts(&config);
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "dbhost");
        assert_eq!(opts.tcp_port(), 3307);
        assert_eq!(opts.user(), Some("repl"));
        assert_eq!(opts.pass(), None);
        assert_eq!(opts.db_name(), None);
    }

    #[test]
    fn test_ssl_opts_disabled() {
        let ssl = build_ssl_opts(&SslMode::Disabled);
        assert!(ssl.is_none());
    }

    #[test]
    fn test_ssl_opts_preferred() {
        let ssl = build_ssl_opts(&SslMode::Preferred);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_required() {
        let ssl = build_ssl_opts(&SslMode::Required);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_verify_ca() {
        let ssl = build_ssl_opts(&SslMode::VerifyCa);
        assert!(ssl.is_some());
    }

    #[test]
    fn test_ssl_opts_verify_identity() {
        let ssl = build_ssl_opts(&SslMode::VerifyIdentity);
        assert!(ssl.is_some());
    }

    #[test]
    fn test_binlog_request_gtid_mode() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: true,
            ..Default::default()
        };

        // Without a GTID set.
        let _request = build_binlog_request(&config, None, None);

        // With a GTID set to resume from.
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap();
        let _request = build_binlog_request(&config, Some(&gtid_set), None);
    }

    #[test]
    fn test_binlog_request_file_position() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            ..Default::default()
        };

        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
        let _request = build_binlog_request(&config, None, Some(&pos));
    }

    #[test]
    fn test_binlog_request_config_filename() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            binlog_filename: Some("mysql-bin.000001".to_string()),
            binlog_position: Some(154),
            ..Default::default()
        };

        let _request = build_binlog_request(&config, None, None);
    }

    #[test]
    fn test_mysql_value_to_column_value_null() {
        let val = mysql_value_to_column_value(&mysql_async::Value::NULL);
        assert_eq!(val, ColumnValue::Null);
    }

    #[test]
    fn test_mysql_value_to_column_value_int() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Int(-42));
        assert_eq!(val, ColumnValue::SignedInt(-42));

        let val = mysql_value_to_column_value(&mysql_async::Value::UInt(100));
        assert_eq!(val, ColumnValue::UnsignedInt(100));
    }

    #[test]
    fn test_mysql_value_to_column_value_float() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Float(3.14));
        assert_eq!(val, ColumnValue::Float(3.14));

        let val = mysql_value_to_column_value(&mysql_async::Value::Double(2.718));
        assert_eq!(val, ColumnValue::Double(2.718));
    }

    #[test]
    fn test_mysql_value_to_column_value_string() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(b"hello world".to_vec()));
        assert_eq!(val, ColumnValue::String("hello world".to_string()));
    }

    #[test]
    fn test_mysql_value_to_column_value_bytes() {
        // Not valid UTF-8, so the raw bytes must be preserved.
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(vec![0xFF, 0xFE, 0xFD]));
        assert_eq!(val, ColumnValue::Bytes(vec![0xFF, 0xFE, 0xFD]));
    }

    #[test]
    fn test_mysql_value_to_column_value_date() {
        // All time components zero: reported as a date.
        let val = mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 0, 0, 0, 0));
        assert_eq!(val, ColumnValue::Date(2024, 6, 15));

        // Any non-zero time component: reported as a date-time.
        let val =
            mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0));
    }

    #[test]
    fn test_mysql_value_to_column_value_time() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 0, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::Time(14, 30, 45, 0));

        // Negative time: the sign is carried on the hours.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 0, 2, 30, 0, 0));
        assert_eq!(val, ColumnValue::Time(-2, 30, 0, 0));

        // Days are folded into hours: 1 day + 6 hours = 30 hours.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1, 6, 0, 0, 0));
        assert_eq!(val, ColumnValue::Time(30, 0, 0, 0));
    }

    #[test]
    fn test_sid_bytes_to_gtid() {
        let uuid = uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap();
        let gtid = sid_bytes_to_gtid(uuid.into_bytes(), 42);
        assert_eq!(gtid.source_id(), uuid);
        assert_eq!(gtid.transaction_id(), 42);
    }

    #[test]
    fn test_gtid_set_to_sids_empty() {
        let gtid_set = GtidSet::new();
        let sids = gtid_set_to_sids(&gtid_set);
        assert!(sids.is_empty());
    }

    #[test]
    fn test_gtid_set_to_sids_single() {
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10".parse().unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 1);
    }

    #[test]
    fn test_gtid_set_to_sids_multiple() {
        let gtid_set: GtidSet =
            "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10,4E11FA47-71CA-11E1-9E33-C80AA9429563:1-5"
                .parse()
                .unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 2);
    }
}