1#[cfg(feature = "mysql-cdc")]
4use std::time::Duration;
5
6#[cfg(feature = "mysql-cdc")]
7use tokio_stream::StreamExt as _;
8
9#[cfg(feature = "mysql-cdc")]
10use mysql_async::binlog::events::{Event as MysqlEvent, EventData, RowsEventData};
11#[cfg(feature = "mysql-cdc")]
12use mysql_async::binlog::row::BinlogRow;
13#[cfg(feature = "mysql-cdc")]
14use mysql_async::binlog::value::BinlogValue;
15#[cfg(feature = "mysql-cdc")]
16use mysql_async::{BinlogStream, BinlogStreamRequest, Conn, GnoInterval, OptsBuilder, Sid};
17
18#[cfg(feature = "mysql-cdc")]
19use tracing::{debug, info};
20
21#[cfg(feature = "mysql-cdc")]
22use crate::error::ConnectorError;
23
24#[cfg(feature = "mysql-cdc")]
25use super::config::{MySqlCdcConfig, SslMode};
26#[cfg(feature = "mysql-cdc")]
27use super::decoder::{
28 BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DeleteMessage,
29 InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage, UpdateMessage,
30 UpdateRowData,
31};
32#[cfg(feature = "mysql-cdc")]
33use super::gtid::{Gtid, GtidSet};
34#[cfg(feature = "mysql-cdc")]
35use super::types::MySqlColumn;
36
37#[cfg(feature = "mysql-cdc")]
45pub async fn connect(config: &MySqlCdcConfig) -> Result<Conn, ConnectorError> {
46 info!(
47 host = %config.host,
48 port = config.port,
49 user = %config.username,
50 ssl_mode = %config.ssl_mode,
51 "connecting to MySQL server"
52 );
53
54 let mut opts = OptsBuilder::default()
55 .ip_or_hostname(&config.host)
56 .tcp_port(config.port)
57 .user(Some(&config.username))
58 .prefer_socket(Some(false));
59
60 if let Some(ref password) = config.password {
61 opts = opts.pass(Some(password));
62 }
63
64 if let Some(ref database) = config.database {
65 opts = opts.db_name(Some(database));
66 }
67
68 opts = match config.ssl_mode {
70 SslMode::Disabled => opts,
71 SslMode::Preferred => opts.ssl_opts(Some(
72 mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true),
73 )),
74 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => {
75 opts.ssl_opts(Some(mysql_async::SslOpts::default()))
76 }
77 };
78
79 let conn = Conn::new(opts).await.map_err(|e| {
80 ConnectorError::ConnectionFailed(format!(
81 "failed to connect to MySQL {}:{}: {}",
82 config.host, config.port, e
83 ))
84 })?;
85
86 info!(
87 host = %config.host,
88 port = config.port,
89 "connected to MySQL server"
90 );
91
92 Ok(conn)
93}
94
95#[cfg(feature = "mysql-cdc")]
110pub async fn start_binlog_stream(
111 conn: Conn,
112 config: &MySqlCdcConfig,
113 gtid_set: Option<&GtidSet>,
114 position: Option<&BinlogPosition>,
115) -> Result<BinlogStream, ConnectorError> {
116 let mut request = BinlogStreamRequest::new(config.server_id);
117
118 if config.use_gtid {
119 request = request.with_gtid();
120
121 if let Some(gtid_set) = gtid_set {
123 let sids = gtid_set_to_sids(gtid_set);
124 if sids.is_empty() {
125 info!("starting GTID-based binlog replication from beginning");
126 } else {
127 request = request.with_gtid_set(sids);
128 info!("starting GTID-based binlog replication from {}", gtid_set);
129 }
130 } else {
131 info!("starting GTID-based binlog replication from beginning");
132 }
133 } else {
134 if let Some(pos) = position {
136 request = request
137 .with_filename(pos.filename.as_bytes())
138 .with_pos(pos.position);
139 info!(
140 "starting binlog replication from {}:{}",
141 pos.filename, pos.position
142 );
143 } else if let Some(ref filename) = config.binlog_filename {
144 let pos = config.binlog_position.unwrap_or(4);
145 request = request.with_filename(filename.as_bytes()).with_pos(pos);
146 info!("starting binlog replication from {}:{}", filename, pos);
147 } else {
148 info!("starting binlog replication from current position");
149 }
150 }
151
152 let stream = conn.get_binlog_stream(request).await.map_err(|e| {
153 ConnectorError::ConnectionFailed(format!("failed to start binlog stream: {e}"))
154 })?;
155
156 Ok(stream)
157}
158
159#[cfg(feature = "mysql-cdc")]
167pub async fn read_events(
168 stream: &mut BinlogStream,
169 max_events: usize,
170 timeout: Duration,
171) -> Result<Vec<MysqlEvent>, ConnectorError> {
172 let mut events = Vec::with_capacity(max_events.min(64));
173 let deadline = tokio::time::Instant::now() + timeout;
174
175 loop {
176 if events.len() >= max_events {
177 break;
178 }
179
180 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
181 if remaining.is_zero() {
182 break;
183 }
184
185 match tokio::time::timeout(remaining, stream.next()).await {
186 Ok(Some(Ok(event))) => {
187 events.push(event);
188 }
189 Ok(Some(Err(e))) => {
190 return Err(ConnectorError::ReadError(format!(
192 "binlog stream error: {e}"
193 )));
194 }
195 Ok(None) => {
196 debug!("binlog stream ended");
198 break;
199 }
200 Err(_) => {
201 break;
203 }
204 }
205 }
206
207 Ok(events)
208}
209
210#[cfg(feature = "mysql-cdc")]
223pub fn decode_binlog_event(
224 event: &MysqlEvent,
225 stream: &BinlogStream,
226) -> Result<Option<BinlogMessage>, ConnectorError> {
227 let header = event.header();
228 let timestamp_ms = i64::from(header.timestamp()) * 1000;
229 let log_pos = u64::from(header.log_pos());
230
231 let event_data = event
232 .read_data()
233 .map_err(|e| ConnectorError::Internal(format!("failed to parse binlog event: {e}")))?;
234
235 let Some(event_data) = event_data else {
236 return Ok(None);
237 };
238
239 match event_data {
240 EventData::TableMapEvent(tme) => {
241 let columns = extract_columns_from_tme(&tme);
242 Ok(Some(BinlogMessage::TableMap(TableMapMessage {
243 table_id: tme.table_id(),
244 database: tme.database_name().into_owned(),
245 table: tme.table_name().into_owned(),
246 columns,
247 })))
248 }
249
250 EventData::RowsEvent(rows_event) => {
251 let table_id = rows_event.table_id();
252
253 let tme = stream.get_tme(table_id).ok_or_else(|| {
255 ConnectorError::Internal(format!("missing TABLE_MAP for table_id {table_id}"))
256 })?;
257
258 let database = tme.database_name().into_owned();
259 let table = tme.table_name().into_owned();
260
261 match &rows_event {
262 RowsEventData::WriteRowsEvent(_) | RowsEventData::WriteRowsEventV1(_) => {
263 let rows = decode_rows_after(&rows_event, tme)?;
264 Ok(Some(BinlogMessage::Insert(InsertMessage {
265 table_id,
266 database,
267 table,
268 rows,
269 binlog_position: log_pos,
270 timestamp_ms,
271 })))
272 }
273
274 RowsEventData::UpdateRowsEvent(_)
275 | RowsEventData::UpdateRowsEventV1(_)
276 | RowsEventData::PartialUpdateRowsEvent(_) => {
277 let rows = decode_rows_update(&rows_event, tme)?;
278 Ok(Some(BinlogMessage::Update(UpdateMessage {
279 table_id,
280 database,
281 table,
282 rows,
283 binlog_position: log_pos,
284 timestamp_ms,
285 })))
286 }
287
288 RowsEventData::DeleteRowsEvent(_) | RowsEventData::DeleteRowsEventV1(_) => {
289 let rows = decode_rows_before(&rows_event, tme)?;
290 Ok(Some(BinlogMessage::Delete(DeleteMessage {
291 table_id,
292 database,
293 table,
294 rows,
295 binlog_position: log_pos,
296 timestamp_ms,
297 })))
298 }
299 }
300 }
301
302 EventData::QueryEvent(qe) => Ok(Some(BinlogMessage::Query(QueryMessage {
303 database: qe.schema().into_owned(),
304 query: qe.query().into_owned(),
305 binlog_position: log_pos,
306 timestamp_ms,
307 }))),
308
309 EventData::GtidEvent(ge) => {
310 let gtid = sid_bytes_to_gtid(ge.sid(), ge.gno());
311 Ok(Some(BinlogMessage::Begin(BeginMessage {
312 gtid: Some(gtid),
313 binlog_filename: String::new(), binlog_position: log_pos,
315 timestamp_ms,
316 })))
317 }
318
319 EventData::AnonymousGtidEvent(_) => Ok(Some(BinlogMessage::Begin(BeginMessage {
320 gtid: None,
321 binlog_filename: String::new(),
322 binlog_position: log_pos,
323 timestamp_ms,
324 }))),
325
326 EventData::RotateEvent(re) => Ok(Some(BinlogMessage::Rotate(RotateMessage {
327 next_binlog: re.name().into_owned(),
328 position: re.position(),
329 }))),
330
331 EventData::XidEvent(xe) => Ok(Some(BinlogMessage::Commit(CommitMessage {
332 xid: xe.xid,
333 binlog_position: log_pos,
334 timestamp_ms,
335 }))),
336
337 EventData::HeartbeatEvent => Ok(Some(BinlogMessage::Heartbeat)),
338
339 _ => Ok(None),
341 }
342}
343
344#[cfg(feature = "mysql-cdc")]
346fn binlog_value_to_column_value(val: &BinlogValue<'_>) -> ColumnValue {
347 match val {
348 BinlogValue::Value(v) => mysql_value_to_column_value(v),
349 BinlogValue::Jsonb(jsonb_val) => ColumnValue::Json(format!("{jsonb_val:?}")),
350 BinlogValue::JsonDiff(_diffs) => {
351 ColumnValue::Json("{}".to_string())
353 }
354 }
355}
356
357#[cfg(feature = "mysql-cdc")]
359fn mysql_value_to_column_value(val: &mysql_async::Value) -> ColumnValue {
360 match val {
361 mysql_async::Value::NULL => ColumnValue::Null,
362 mysql_async::Value::Int(v) => ColumnValue::SignedInt(*v),
363 mysql_async::Value::UInt(v) => ColumnValue::UnsignedInt(*v),
364 mysql_async::Value::Float(v) => ColumnValue::Float(*v),
365 mysql_async::Value::Double(v) => ColumnValue::Double(*v),
366 mysql_async::Value::Bytes(b) => {
367 match String::from_utf8(b.clone()) {
369 Ok(s) => ColumnValue::String(s),
370 Err(_) => ColumnValue::Bytes(b.clone()),
371 }
372 }
373 mysql_async::Value::Date(year, month, day, hour, min, sec, micro) => {
374 if *hour == 0 && *min == 0 && *sec == 0 && *micro == 0 {
375 ColumnValue::Date(i32::from(*year), u32::from(*month), u32::from(*day))
376 } else {
377 ColumnValue::DateTime(
378 i32::from(*year),
379 u32::from(*month),
380 u32::from(*day),
381 u32::from(*hour),
382 u32::from(*min),
383 u32::from(*sec),
384 *micro,
385 )
386 }
387 }
388 mysql_async::Value::Time(is_negative, days, hours, minutes, seconds, micros) => {
389 let total_hours_wide = i64::from(*days) * 24 + i64::from(*hours);
393 let total_hours = i32::try_from(total_hours_wide.min(838)).unwrap_or(838);
394 let h = if *is_negative {
395 -total_hours
396 } else {
397 total_hours
398 };
399 ColumnValue::Time(h, u32::from(*minutes), u32::from(*seconds), *micros)
400 }
401 }
402}
403
404#[cfg(feature = "mysql-cdc")]
406fn extract_columns_from_tme(
407 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
408) -> Vec<MySqlColumn> {
409 #[allow(clippy::cast_possible_truncation)]
410 let count = tme.columns_count() as usize;
411 let null_bitmap = tme.null_bitmask();
412
413 let col_names = extract_column_names(tme);
415
416 (0..count)
417 .map(|i| {
418 let type_id = tme
419 .get_raw_column_type(i)
420 .ok()
421 .flatten()
422 .map_or(0xFF, |ct| ct as u8);
423
424 let nullable = null_bitmap.get(i).as_deref().copied().unwrap_or(true);
425
426 let name = col_names
427 .get(i)
428 .cloned()
429 .unwrap_or_else(|| format!("col_{i}"));
430
431 MySqlColumn::new(name, type_id, 0, nullable, false)
432 })
433 .collect()
434}
435
436#[cfg(feature = "mysql-cdc")]
438fn extract_column_names(tme: &mysql_async::binlog::events::TableMapEvent<'_>) -> Vec<String> {
439 let mut names = Vec::new();
440 for meta in tme.iter_optional_meta().flatten() {
441 use mysql_async::binlog::events::OptionalMetadataField;
442 if let OptionalMetadataField::ColumnName(col_names) = meta {
443 for name in col_names.iter_names().flatten() {
444 names.push(name.name().into_owned());
445 }
446 }
447 }
448 names
449}
450
451#[cfg(feature = "mysql-cdc")]
453fn decode_rows_after(
454 rows_event: &RowsEventData<'_>,
455 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
456) -> Result<Vec<RowData>, ConnectorError> {
457 let mut rows = Vec::new();
458 for row_result in rows_event.rows(tme) {
459 let (_before, after) = row_result
460 .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))?;
461 if let Some(row) = after {
462 rows.push(binlog_row_to_row_data(&row));
463 }
464 }
465 Ok(rows)
466}
467
468#[cfg(feature = "mysql-cdc")]
470fn decode_rows_before(
471 rows_event: &RowsEventData<'_>,
472 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
473) -> Result<Vec<RowData>, ConnectorError> {
474 let mut rows = Vec::new();
475 for row_result in rows_event.rows(tme) {
476 let (before, _after) = row_result
477 .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))?;
478 if let Some(row) = before {
479 rows.push(binlog_row_to_row_data(&row));
480 }
481 }
482 Ok(rows)
483}
484
485#[cfg(feature = "mysql-cdc")]
487fn decode_rows_update(
488 rows_event: &RowsEventData<'_>,
489 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
490) -> Result<Vec<UpdateRowData>, ConnectorError> {
491 let mut rows = Vec::new();
492 for row_result in rows_event.rows(tme) {
493 let (before, after) = row_result
494 .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))?;
495
496 let before_data = before.map_or_else(
497 || RowData {
498 columns: Vec::new(),
499 },
500 |r| binlog_row_to_row_data(&r),
501 );
502
503 let after_data = after.map_or_else(
504 || RowData {
505 columns: Vec::new(),
506 },
507 |r| binlog_row_to_row_data(&r),
508 );
509
510 rows.push(UpdateRowData {
511 before: before_data,
512 after: after_data,
513 });
514 }
515 Ok(rows)
516}
517
518#[cfg(feature = "mysql-cdc")]
520fn binlog_row_to_row_data(row: &BinlogRow) -> RowData {
521 let columns = (0..row.len())
522 .map(|i| {
523 row.as_ref(i)
524 .map_or(ColumnValue::Null, binlog_value_to_column_value)
525 })
526 .collect();
527 RowData { columns }
528}
529
530#[cfg(feature = "mysql-cdc")]
532fn sid_bytes_to_gtid(sid: [u8; 16], gno: u64) -> Gtid {
533 let uuid = uuid::Uuid::from_bytes(sid);
534 Gtid::new(uuid, gno)
535}
536
537#[cfg(feature = "mysql-cdc")]
539fn gtid_set_to_sids(gtid_set: &GtidSet) -> Vec<Sid<'static>> {
540 let mut sids = Vec::new();
541
542 for (source_id, ranges) in gtid_set.iter_sets() {
543 let uuid_bytes = source_id.into_bytes();
544 let mut sid = Sid::new(uuid_bytes);
545
546 for range in ranges {
547 let interval = GnoInterval::new(range.start, range.end + 1);
549 sid = sid.with_interval(interval);
550 }
551
552 sids.push(sid);
553 }
554
555 sids
556}
557
558#[cfg(feature = "mysql-cdc")]
562#[must_use]
563pub fn build_ssl_opts(ssl_mode: &SslMode) -> Option<mysql_async::SslOpts> {
564 match ssl_mode {
565 SslMode::Disabled => None,
566 SslMode::Preferred => {
567 Some(mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true))
568 }
569 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => {
570 Some(mysql_async::SslOpts::default())
571 }
572 }
573}
574
575#[cfg(feature = "mysql-cdc")]
577#[must_use]
578pub fn build_opts(config: &MySqlCdcConfig) -> OptsBuilder {
579 let mut opts = OptsBuilder::default()
580 .ip_or_hostname(&config.host)
581 .tcp_port(config.port)
582 .user(Some(&config.username))
583 .prefer_socket(Some(false));
584
585 if let Some(ref password) = config.password {
586 opts = opts.pass(Some(password));
587 }
588
589 if let Some(ref database) = config.database {
590 opts = opts.db_name(Some(database));
591 }
592
593 if let Some(ssl_opts) = build_ssl_opts(&config.ssl_mode) {
594 opts = opts.ssl_opts(Some(ssl_opts));
595 }
596
597 opts
598}
599
600#[cfg(feature = "mysql-cdc")]
602#[must_use]
603pub fn build_binlog_request<'a>(
604 config: &'a MySqlCdcConfig,
605 gtid_set: Option<&GtidSet>,
606 position: Option<&'a BinlogPosition>,
607) -> BinlogStreamRequest<'a> {
608 let mut request = BinlogStreamRequest::new(config.server_id);
609
610 if config.use_gtid {
611 request = request.with_gtid();
612
613 if let Some(gtid_set) = gtid_set {
614 let sids = gtid_set_to_sids(gtid_set);
615 if !sids.is_empty() {
616 request = request.with_gtid_set(sids);
617 }
618 }
619 } else if let Some(pos) = position {
620 request = request
621 .with_filename(pos.filename.as_bytes())
622 .with_pos(pos.position);
623 } else if let Some(ref filename) = config.binlog_filename {
624 let pos = config.binlog_position.unwrap_or(4);
625 request = request.with_filename(filename.as_bytes()).with_pos(pos);
626 }
627
628 request
629}
630
631#[cfg(all(test, feature = "mysql-cdc"))]
636mod tests {
637 use super::*;
638 use mysql_async::Opts;
639
640 fn test_config() -> MySqlCdcConfig {
641 MySqlCdcConfig {
642 host: "localhost".to_string(),
643 port: 3306,
644 database: Some("testdb".to_string()),
645 username: "root".to_string(),
646 password: Some("secret".to_string()),
647 server_id: 12345,
648 ..Default::default()
649 }
650 }
651
652 #[test]
653 fn test_connect_builds_opts() {
654 let config = test_config();
655 let opts_builder = build_opts(&config);
656
657 let opts: Opts = opts_builder.into();
659
660 assert_eq!(opts.ip_or_hostname(), "localhost");
661 assert_eq!(opts.tcp_port(), 3306);
662 assert_eq!(opts.user(), Some("root"));
663 assert_eq!(opts.pass(), Some("secret"));
664 assert_eq!(opts.db_name(), Some("testdb"));
665 }
666
667 #[test]
668 fn test_connect_builds_opts_no_password() {
669 let config = MySqlCdcConfig {
670 host: "dbhost".to_string(),
671 port: 3307,
672 database: None,
673 username: "repl".to_string(),
674 password: None,
675 server_id: 999,
676 ..Default::default()
677 };
678 let opts_builder = build_opts(&config);
679 let opts: Opts = opts_builder.into();
680
681 assert_eq!(opts.ip_or_hostname(), "dbhost");
682 assert_eq!(opts.tcp_port(), 3307);
683 assert_eq!(opts.user(), Some("repl"));
684 assert_eq!(opts.pass(), None);
685 assert_eq!(opts.db_name(), None);
686 }
687
688 #[test]
689 fn test_ssl_opts_disabled() {
690 let ssl = build_ssl_opts(&SslMode::Disabled);
691 assert!(ssl.is_none());
692 }
693
694 #[test]
695 fn test_ssl_opts_preferred() {
696 let ssl = build_ssl_opts(&SslMode::Preferred);
697 assert!(ssl.is_some());
698 let ssl = ssl.unwrap();
699 assert!(ssl.accept_invalid_certs());
700 }
701
702 #[test]
703 fn test_ssl_opts_required() {
704 let ssl = build_ssl_opts(&SslMode::Required);
705 assert!(ssl.is_some());
706 let ssl = ssl.unwrap();
707 assert!(!ssl.accept_invalid_certs());
708 }
709
710 #[test]
711 fn test_ssl_opts_verify_ca() {
712 let ssl = build_ssl_opts(&SslMode::VerifyCa);
713 assert!(ssl.is_some());
714 }
715
716 #[test]
717 fn test_ssl_opts_verify_identity() {
718 let ssl = build_ssl_opts(&SslMode::VerifyIdentity);
719 assert!(ssl.is_some());
720 }
721
722 #[test]
723 fn test_binlog_request_gtid_mode() {
724 let config = MySqlCdcConfig {
725 server_id: 999,
726 use_gtid: true,
727 ..Default::default()
728 };
729
730 let _request = build_binlog_request(&config, None, None);
732
733 let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap();
735 let _request = build_binlog_request(&config, Some(>id_set), None);
736 }
737
738 #[test]
739 fn test_binlog_request_file_position() {
740 let config = MySqlCdcConfig {
741 server_id: 999,
742 use_gtid: false,
743 ..Default::default()
744 };
745
746 let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
747 let _request = build_binlog_request(&config, None, Some(&pos));
748 }
749
750 #[test]
751 fn test_binlog_request_config_filename() {
752 let config = MySqlCdcConfig {
753 server_id: 999,
754 use_gtid: false,
755 binlog_filename: Some("mysql-bin.000001".to_string()),
756 binlog_position: Some(154),
757 ..Default::default()
758 };
759
760 let _request = build_binlog_request(&config, None, None);
761 }
762
763 #[test]
764 fn test_mysql_value_to_column_value_null() {
765 let val = mysql_value_to_column_value(&mysql_async::Value::NULL);
766 assert_eq!(val, ColumnValue::Null);
767 }
768
769 #[test]
770 fn test_mysql_value_to_column_value_int() {
771 let val = mysql_value_to_column_value(&mysql_async::Value::Int(-42));
772 assert_eq!(val, ColumnValue::SignedInt(-42));
773
774 let val = mysql_value_to_column_value(&mysql_async::Value::UInt(100));
775 assert_eq!(val, ColumnValue::UnsignedInt(100));
776 }
777
778 #[test]
779 fn test_mysql_value_to_column_value_float() {
780 let val = mysql_value_to_column_value(&mysql_async::Value::Float(3.14));
781 assert_eq!(val, ColumnValue::Float(3.14));
782
783 let val = mysql_value_to_column_value(&mysql_async::Value::Double(2.718));
784 assert_eq!(val, ColumnValue::Double(2.718));
785 }
786
787 #[test]
788 fn test_mysql_value_to_column_value_string() {
789 let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(b"hello world".to_vec()));
790 assert_eq!(val, ColumnValue::String("hello world".to_string()));
791 }
792
793 #[test]
794 fn test_mysql_value_to_column_value_bytes() {
795 let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(vec![0xFF, 0xFE, 0xFD]));
797 assert_eq!(val, ColumnValue::Bytes(vec![0xFF, 0xFE, 0xFD]));
798 }
799
800 #[test]
801 fn test_mysql_value_to_column_value_date() {
802 let val = mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 0, 0, 0, 0));
804 assert_eq!(val, ColumnValue::Date(2024, 6, 15));
805
806 let val =
808 mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 14, 30, 45, 0));
809 assert_eq!(val, ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0));
810 }
811
812 #[test]
813 fn test_mysql_value_to_column_value_time() {
814 let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 0, 14, 30, 45, 0));
815 assert_eq!(val, ColumnValue::Time(14, 30, 45, 0));
816
817 let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 0, 2, 30, 0, 0));
819 assert_eq!(val, ColumnValue::Time(-2, 30, 0, 0));
820
821 let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1, 6, 0, 0, 0));
823 assert_eq!(val, ColumnValue::Time(30, 0, 0, 0));
824 }
825
826 #[test]
827 fn test_sid_bytes_to_gtid() {
828 let uuid = uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap();
829 let gtid = sid_bytes_to_gtid(uuid.into_bytes(), 42);
830 assert_eq!(gtid.source_id(), uuid);
831 assert_eq!(gtid.transaction_id(), 42);
832 }
833
834 #[test]
835 fn test_gtid_set_to_sids_empty() {
836 let gtid_set = GtidSet::new();
837 let sids = gtid_set_to_sids(>id_set);
838 assert!(sids.is_empty());
839 }
840
841 #[test]
842 fn test_gtid_set_to_sids_single() {
843 let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10".parse().unwrap();
844 let sids = gtid_set_to_sids(>id_set);
845 assert_eq!(sids.len(), 1);
846 }
847
848 #[test]
849 fn test_gtid_set_to_sids_multiple() {
850 let gtid_set: GtidSet =
851 "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10,4E11FA47-71CA-11E1-9E33-C80AA9429563:1-5"
852 .parse()
853 .unwrap();
854 let sids = gtid_set_to_sids(>id_set);
855 assert_eq!(sids.len(), 2);
856 }
857}