1#[cfg(feature = "mysql-cdc")]
30use std::time::Duration;
31
32#[cfg(feature = "mysql-cdc")]
33use tokio_stream::StreamExt as _;
34
35#[cfg(feature = "mysql-cdc")]
36use mysql_async::binlog::events::{Event as MysqlEvent, EventData, RowsEventData};
37#[cfg(feature = "mysql-cdc")]
38use mysql_async::binlog::row::BinlogRow;
39#[cfg(feature = "mysql-cdc")]
40use mysql_async::binlog::value::BinlogValue;
41#[cfg(feature = "mysql-cdc")]
42use mysql_async::{BinlogStream, BinlogStreamRequest, Conn, GnoInterval, OptsBuilder, Sid};
43
44#[cfg(feature = "mysql-cdc")]
45use tracing::{debug, info};
46
47#[cfg(feature = "mysql-cdc")]
48use crate::error::ConnectorError;
49
50#[cfg(feature = "mysql-cdc")]
51use super::config::{MySqlCdcConfig, SslMode};
52#[cfg(feature = "mysql-cdc")]
53use super::decoder::{
54 BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DeleteMessage,
55 InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage, UpdateMessage,
56 UpdateRowData,
57};
58#[cfg(feature = "mysql-cdc")]
59use super::gtid::{Gtid, GtidSet};
60#[cfg(feature = "mysql-cdc")]
61use super::types::MySqlColumn;
62
63#[cfg(feature = "mysql-cdc")]
71pub async fn connect(config: &MySqlCdcConfig) -> Result<Conn, ConnectorError> {
72 info!(
73 host = %config.host,
74 port = config.port,
75 user = %config.username,
76 ssl_mode = %config.ssl_mode,
77 "connecting to MySQL server"
78 );
79
80 let mut opts = OptsBuilder::default()
81 .ip_or_hostname(&config.host)
82 .tcp_port(config.port)
83 .user(Some(&config.username))
84 .prefer_socket(Some(false));
85
86 if let Some(ref password) = config.password {
87 opts = opts.pass(Some(password));
88 }
89
90 if let Some(ref database) = config.database {
91 opts = opts.db_name(Some(database));
92 }
93
94 opts = match config.ssl_mode {
96 SslMode::Disabled => opts,
97 SslMode::Preferred => opts.ssl_opts(Some(
98 mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true),
99 )),
100 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => {
101 opts.ssl_opts(Some(mysql_async::SslOpts::default()))
102 }
103 };
104
105 let conn = Conn::new(opts).await.map_err(|e| {
106 ConnectorError::ConnectionFailed(format!(
107 "failed to connect to MySQL {}:{}: {}",
108 config.host, config.port, e
109 ))
110 })?;
111
112 info!(
113 host = %config.host,
114 port = config.port,
115 "connected to MySQL server"
116 );
117
118 Ok(conn)
119}
120
121#[cfg(feature = "mysql-cdc")]
136pub async fn start_binlog_stream(
137 conn: Conn,
138 config: &MySqlCdcConfig,
139 gtid_set: Option<&GtidSet>,
140 position: Option<&BinlogPosition>,
141) -> Result<BinlogStream, ConnectorError> {
142 let mut request = BinlogStreamRequest::new(config.server_id);
143
144 if config.use_gtid {
145 request = request.with_gtid();
146
147 if let Some(gtid_set) = gtid_set {
149 let sids = gtid_set_to_sids(gtid_set);
150 if sids.is_empty() {
151 info!("starting GTID-based binlog replication from beginning");
152 } else {
153 request = request.with_gtid_set(sids);
154 info!("starting GTID-based binlog replication from {}", gtid_set);
155 }
156 } else {
157 info!("starting GTID-based binlog replication from beginning");
158 }
159 } else {
160 if let Some(pos) = position {
162 request = request
163 .with_filename(pos.filename.as_bytes())
164 .with_pos(pos.position);
165 info!(
166 "starting binlog replication from {}:{}",
167 pos.filename, pos.position
168 );
169 } else if let Some(ref filename) = config.binlog_filename {
170 let pos = config.binlog_position.unwrap_or(4);
171 request = request.with_filename(filename.as_bytes()).with_pos(pos);
172 info!("starting binlog replication from {}:{}", filename, pos);
173 } else {
174 info!("starting binlog replication from current position");
175 }
176 }
177
178 let stream = conn.get_binlog_stream(request).await.map_err(|e| {
179 ConnectorError::ConnectionFailed(format!("failed to start binlog stream: {e}"))
180 })?;
181
182 Ok(stream)
183}
184
185#[cfg(feature = "mysql-cdc")]
193pub async fn read_events(
194 stream: &mut BinlogStream,
195 max_events: usize,
196 timeout: Duration,
197) -> Result<Vec<MysqlEvent>, ConnectorError> {
198 let mut events = Vec::with_capacity(max_events.min(64));
199 let deadline = tokio::time::Instant::now() + timeout;
200
201 loop {
202 if events.len() >= max_events {
203 break;
204 }
205
206 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
207 if remaining.is_zero() {
208 break;
209 }
210
211 match tokio::time::timeout(remaining, stream.next()).await {
212 Ok(Some(Ok(event))) => {
213 events.push(event);
214 }
215 Ok(Some(Err(e))) => {
216 return Err(ConnectorError::ReadError(format!(
218 "binlog stream error: {e}"
219 )));
220 }
221 Ok(None) => {
222 debug!("binlog stream ended");
224 break;
225 }
226 Err(_) => {
227 break;
229 }
230 }
231 }
232
233 Ok(events)
234}
235
236#[cfg(feature = "mysql-cdc")]
249pub fn decode_binlog_event(
250 event: &MysqlEvent,
251 stream: &BinlogStream,
252) -> Result<Option<BinlogMessage>, ConnectorError> {
253 let header = event.header();
254 let timestamp_ms = i64::from(header.timestamp()) * 1000;
255 let log_pos = u64::from(header.log_pos());
256
257 let event_data = event
258 .read_data()
259 .map_err(|e| ConnectorError::Internal(format!("failed to parse binlog event: {e}")))?;
260
261 let Some(event_data) = event_data else {
262 return Ok(None);
263 };
264
265 match event_data {
266 EventData::TableMapEvent(tme) => {
267 let columns = extract_columns_from_tme(&tme);
268 Ok(Some(BinlogMessage::TableMap(TableMapMessage {
269 table_id: tme.table_id(),
270 database: tme.database_name().into_owned(),
271 table: tme.table_name().into_owned(),
272 columns,
273 })))
274 }
275
276 EventData::RowsEvent(rows_event) => {
277 let table_id = rows_event.table_id();
278
279 let tme = stream.get_tme(table_id).ok_or_else(|| {
281 ConnectorError::Internal(format!("missing TABLE_MAP for table_id {table_id}"))
282 })?;
283
284 let database = tme.database_name().into_owned();
285 let table = tme.table_name().into_owned();
286
287 match &rows_event {
288 RowsEventData::WriteRowsEvent(_) | RowsEventData::WriteRowsEventV1(_) => {
289 let rows = decode_rows_after(&rows_event, tme)?;
290 Ok(Some(BinlogMessage::Insert(InsertMessage {
291 table_id,
292 database,
293 table,
294 rows,
295 binlog_position: log_pos,
296 timestamp_ms,
297 })))
298 }
299
300 RowsEventData::UpdateRowsEvent(_)
301 | RowsEventData::UpdateRowsEventV1(_)
302 | RowsEventData::PartialUpdateRowsEvent(_) => {
303 let rows = decode_rows_update(&rows_event, tme)?;
304 Ok(Some(BinlogMessage::Update(UpdateMessage {
305 table_id,
306 database,
307 table,
308 rows,
309 binlog_position: log_pos,
310 timestamp_ms,
311 })))
312 }
313
314 RowsEventData::DeleteRowsEvent(_) | RowsEventData::DeleteRowsEventV1(_) => {
315 let rows = decode_rows_before(&rows_event, tme)?;
316 Ok(Some(BinlogMessage::Delete(DeleteMessage {
317 table_id,
318 database,
319 table,
320 rows,
321 binlog_position: log_pos,
322 timestamp_ms,
323 })))
324 }
325 }
326 }
327
328 EventData::QueryEvent(qe) => Ok(Some(BinlogMessage::Query(QueryMessage {
329 database: qe.schema().into_owned(),
330 query: qe.query().into_owned(),
331 binlog_position: log_pos,
332 timestamp_ms,
333 }))),
334
335 EventData::GtidEvent(ge) => {
336 let gtid = sid_bytes_to_gtid(ge.sid(), ge.gno());
337 Ok(Some(BinlogMessage::Begin(BeginMessage {
338 gtid: Some(gtid),
339 binlog_filename: String::new(), binlog_position: log_pos,
341 timestamp_ms,
342 })))
343 }
344
345 EventData::AnonymousGtidEvent(_) => Ok(Some(BinlogMessage::Begin(BeginMessage {
346 gtid: None,
347 binlog_filename: String::new(),
348 binlog_position: log_pos,
349 timestamp_ms,
350 }))),
351
352 EventData::RotateEvent(re) => Ok(Some(BinlogMessage::Rotate(RotateMessage {
353 next_binlog: re.name().into_owned(),
354 position: re.position(),
355 }))),
356
357 EventData::XidEvent(xe) => Ok(Some(BinlogMessage::Commit(CommitMessage {
358 xid: xe.xid,
359 binlog_position: log_pos,
360 timestamp_ms,
361 }))),
362
363 EventData::HeartbeatEvent => Ok(Some(BinlogMessage::Heartbeat)),
364
365 _ => Ok(None),
367 }
368}
369
370#[cfg(feature = "mysql-cdc")]
372fn binlog_value_to_column_value(val: &BinlogValue<'_>) -> ColumnValue {
373 match val {
374 BinlogValue::Value(v) => mysql_value_to_column_value(v),
375 BinlogValue::Jsonb(jsonb_val) => ColumnValue::Json(format!("{jsonb_val:?}")),
376 BinlogValue::JsonDiff(_diffs) => {
377 ColumnValue::Json("{}".to_string())
379 }
380 }
381}
382
383#[cfg(feature = "mysql-cdc")]
385fn mysql_value_to_column_value(val: &mysql_async::Value) -> ColumnValue {
386 match val {
387 mysql_async::Value::NULL => ColumnValue::Null,
388 mysql_async::Value::Int(v) => ColumnValue::SignedInt(*v),
389 mysql_async::Value::UInt(v) => ColumnValue::UnsignedInt(*v),
390 mysql_async::Value::Float(v) => ColumnValue::Float(*v),
391 mysql_async::Value::Double(v) => ColumnValue::Double(*v),
392 mysql_async::Value::Bytes(b) => {
393 match String::from_utf8(b.clone()) {
395 Ok(s) => ColumnValue::String(s),
396 Err(_) => ColumnValue::Bytes(b.clone()),
397 }
398 }
399 mysql_async::Value::Date(year, month, day, hour, min, sec, micro) => {
400 if *hour == 0 && *min == 0 && *sec == 0 && *micro == 0 {
401 ColumnValue::Date(i32::from(*year), u32::from(*month), u32::from(*day))
402 } else {
403 ColumnValue::DateTime(
404 i32::from(*year),
405 u32::from(*month),
406 u32::from(*day),
407 u32::from(*hour),
408 u32::from(*min),
409 u32::from(*sec),
410 *micro,
411 )
412 }
413 }
414 mysql_async::Value::Time(is_negative, days, hours, minutes, seconds, micros) => {
415 #[allow(clippy::cast_possible_wrap)]
418 let total_hours = (*days as i32) * 24 + i32::from(*hours);
419 let h = if *is_negative {
420 -total_hours
421 } else {
422 total_hours
423 };
424 ColumnValue::Time(h, u32::from(*minutes), u32::from(*seconds), *micros)
425 }
426 }
427}
428
429#[cfg(feature = "mysql-cdc")]
431fn extract_columns_from_tme(
432 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
433) -> Vec<MySqlColumn> {
434 #[allow(clippy::cast_possible_truncation)]
435 let count = tme.columns_count() as usize;
436 let null_bitmap = tme.null_bitmask();
437
438 let col_names = extract_column_names(tme);
440
441 (0..count)
442 .map(|i| {
443 let type_id = tme
444 .get_raw_column_type(i)
445 .ok()
446 .flatten()
447 .map_or(0xFF, |ct| ct as u8);
448
449 let nullable = null_bitmap.get(i).as_deref().copied().unwrap_or(true);
450
451 let name = col_names
452 .get(i)
453 .cloned()
454 .unwrap_or_else(|| format!("col_{i}"));
455
456 MySqlColumn::new(name, type_id, 0, nullable, false)
457 })
458 .collect()
459}
460
461#[cfg(feature = "mysql-cdc")]
463fn extract_column_names(tme: &mysql_async::binlog::events::TableMapEvent<'_>) -> Vec<String> {
464 let mut names = Vec::new();
465 for meta in tme.iter_optional_meta().flatten() {
466 use mysql_async::binlog::events::OptionalMetadataField;
467 if let OptionalMetadataField::ColumnName(col_names) = meta {
468 for name in col_names.iter_names().flatten() {
469 names.push(name.name().into_owned());
470 }
471 }
472 }
473 names
474}
475
476#[cfg(feature = "mysql-cdc")]
478fn decode_rows_after(
479 rows_event: &RowsEventData<'_>,
480 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
481) -> Result<Vec<RowData>, ConnectorError> {
482 let mut rows = Vec::new();
483 for row_result in rows_event.rows(tme) {
484 let (_before, after) = row_result
485 .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))?;
486 if let Some(row) = after {
487 rows.push(binlog_row_to_row_data(&row));
488 }
489 }
490 Ok(rows)
491}
492
493#[cfg(feature = "mysql-cdc")]
495fn decode_rows_before(
496 rows_event: &RowsEventData<'_>,
497 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
498) -> Result<Vec<RowData>, ConnectorError> {
499 let mut rows = Vec::new();
500 for row_result in rows_event.rows(tme) {
501 let (before, _after) = row_result
502 .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))?;
503 if let Some(row) = before {
504 rows.push(binlog_row_to_row_data(&row));
505 }
506 }
507 Ok(rows)
508}
509
510#[cfg(feature = "mysql-cdc")]
512fn decode_rows_update(
513 rows_event: &RowsEventData<'_>,
514 tme: &mysql_async::binlog::events::TableMapEvent<'_>,
515) -> Result<Vec<UpdateRowData>, ConnectorError> {
516 let mut rows = Vec::new();
517 for row_result in rows_event.rows(tme) {
518 let (before, after) = row_result
519 .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))?;
520
521 let before_data = before.map_or_else(
522 || RowData {
523 columns: Vec::new(),
524 },
525 |r| binlog_row_to_row_data(&r),
526 );
527
528 let after_data = after.map_or_else(
529 || RowData {
530 columns: Vec::new(),
531 },
532 |r| binlog_row_to_row_data(&r),
533 );
534
535 rows.push(UpdateRowData {
536 before: before_data,
537 after: after_data,
538 });
539 }
540 Ok(rows)
541}
542
543#[cfg(feature = "mysql-cdc")]
545fn binlog_row_to_row_data(row: &BinlogRow) -> RowData {
546 let columns = (0..row.len())
547 .map(|i| {
548 row.as_ref(i)
549 .map_or(ColumnValue::Null, binlog_value_to_column_value)
550 })
551 .collect();
552 RowData { columns }
553}
554
555#[cfg(feature = "mysql-cdc")]
557fn sid_bytes_to_gtid(sid: [u8; 16], gno: u64) -> Gtid {
558 let uuid = uuid::Uuid::from_bytes(sid);
559 Gtid::new(uuid, gno)
560}
561
562#[cfg(feature = "mysql-cdc")]
564fn gtid_set_to_sids(gtid_set: &GtidSet) -> Vec<Sid<'static>> {
565 let mut sids = Vec::new();
566
567 for (source_id, ranges) in gtid_set.iter_sets() {
568 let uuid_bytes = source_id.into_bytes();
569 let mut sid = Sid::new(uuid_bytes);
570
571 for range in ranges {
572 let interval = GnoInterval::new(range.start, range.end + 1);
574 sid = sid.with_interval(interval);
575 }
576
577 sids.push(sid);
578 }
579
580 sids
581}
582
583#[cfg(feature = "mysql-cdc")]
587#[must_use]
588pub fn build_ssl_opts(ssl_mode: &SslMode) -> Option<mysql_async::SslOpts> {
589 match ssl_mode {
590 SslMode::Disabled => None,
591 SslMode::Preferred => {
592 Some(mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true))
593 }
594 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => {
595 Some(mysql_async::SslOpts::default())
596 }
597 }
598}
599
600#[cfg(feature = "mysql-cdc")]
602#[must_use]
603pub fn build_opts(config: &MySqlCdcConfig) -> OptsBuilder {
604 let mut opts = OptsBuilder::default()
605 .ip_or_hostname(&config.host)
606 .tcp_port(config.port)
607 .user(Some(&config.username))
608 .prefer_socket(Some(false));
609
610 if let Some(ref password) = config.password {
611 opts = opts.pass(Some(password));
612 }
613
614 if let Some(ref database) = config.database {
615 opts = opts.db_name(Some(database));
616 }
617
618 if let Some(ssl_opts) = build_ssl_opts(&config.ssl_mode) {
619 opts = opts.ssl_opts(Some(ssl_opts));
620 }
621
622 opts
623}
624
625#[cfg(feature = "mysql-cdc")]
627#[must_use]
628pub fn build_binlog_request<'a>(
629 config: &'a MySqlCdcConfig,
630 gtid_set: Option<&GtidSet>,
631 position: Option<&'a BinlogPosition>,
632) -> BinlogStreamRequest<'a> {
633 let mut request = BinlogStreamRequest::new(config.server_id);
634
635 if config.use_gtid {
636 request = request.with_gtid();
637
638 if let Some(gtid_set) = gtid_set {
639 let sids = gtid_set_to_sids(gtid_set);
640 if !sids.is_empty() {
641 request = request.with_gtid_set(sids);
642 }
643 }
644 } else if let Some(pos) = position {
645 request = request
646 .with_filename(pos.filename.as_bytes())
647 .with_pos(pos.position);
648 } else if let Some(ref filename) = config.binlog_filename {
649 let pos = config.binlog_position.unwrap_or(4);
650 request = request.with_filename(filename.as_bytes()).with_pos(pos);
651 }
652
653 request
654}
655
656#[cfg(all(test, feature = "mysql-cdc"))]
661mod tests {
662 use super::*;
663 use mysql_async::Opts;
664
665 fn test_config() -> MySqlCdcConfig {
666 MySqlCdcConfig {
667 host: "localhost".to_string(),
668 port: 3306,
669 database: Some("testdb".to_string()),
670 username: "root".to_string(),
671 password: Some("secret".to_string()),
672 server_id: 12345,
673 ..Default::default()
674 }
675 }
676
677 #[test]
678 fn test_connect_builds_opts() {
679 let config = test_config();
680 let opts_builder = build_opts(&config);
681
682 let opts: Opts = opts_builder.into();
684
685 assert_eq!(opts.ip_or_hostname(), "localhost");
686 assert_eq!(opts.tcp_port(), 3306);
687 assert_eq!(opts.user(), Some("root"));
688 assert_eq!(opts.pass(), Some("secret"));
689 assert_eq!(opts.db_name(), Some("testdb"));
690 }
691
692 #[test]
693 fn test_connect_builds_opts_no_password() {
694 let config = MySqlCdcConfig {
695 host: "dbhost".to_string(),
696 port: 3307,
697 database: None,
698 username: "repl".to_string(),
699 password: None,
700 server_id: 999,
701 ..Default::default()
702 };
703 let opts_builder = build_opts(&config);
704 let opts: Opts = opts_builder.into();
705
706 assert_eq!(opts.ip_or_hostname(), "dbhost");
707 assert_eq!(opts.tcp_port(), 3307);
708 assert_eq!(opts.user(), Some("repl"));
709 assert_eq!(opts.pass(), None);
710 assert_eq!(opts.db_name(), None);
711 }
712
713 #[test]
714 fn test_ssl_opts_disabled() {
715 let ssl = build_ssl_opts(&SslMode::Disabled);
716 assert!(ssl.is_none());
717 }
718
719 #[test]
720 fn test_ssl_opts_preferred() {
721 let ssl = build_ssl_opts(&SslMode::Preferred);
722 assert!(ssl.is_some());
723 let ssl = ssl.unwrap();
724 assert!(ssl.accept_invalid_certs());
725 }
726
727 #[test]
728 fn test_ssl_opts_required() {
729 let ssl = build_ssl_opts(&SslMode::Required);
730 assert!(ssl.is_some());
731 let ssl = ssl.unwrap();
732 assert!(!ssl.accept_invalid_certs());
733 }
734
735 #[test]
736 fn test_ssl_opts_verify_ca() {
737 let ssl = build_ssl_opts(&SslMode::VerifyCa);
738 assert!(ssl.is_some());
739 }
740
741 #[test]
742 fn test_ssl_opts_verify_identity() {
743 let ssl = build_ssl_opts(&SslMode::VerifyIdentity);
744 assert!(ssl.is_some());
745 }
746
747 #[test]
748 fn test_binlog_request_gtid_mode() {
749 let config = MySqlCdcConfig {
750 server_id: 999,
751 use_gtid: true,
752 ..Default::default()
753 };
754
755 let _request = build_binlog_request(&config, None, None);
757
758 let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap();
760 let _request = build_binlog_request(&config, Some(>id_set), None);
761 }
762
763 #[test]
764 fn test_binlog_request_file_position() {
765 let config = MySqlCdcConfig {
766 server_id: 999,
767 use_gtid: false,
768 ..Default::default()
769 };
770
771 let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
772 let _request = build_binlog_request(&config, None, Some(&pos));
773 }
774
775 #[test]
776 fn test_binlog_request_config_filename() {
777 let config = MySqlCdcConfig {
778 server_id: 999,
779 use_gtid: false,
780 binlog_filename: Some("mysql-bin.000001".to_string()),
781 binlog_position: Some(154),
782 ..Default::default()
783 };
784
785 let _request = build_binlog_request(&config, None, None);
786 }
787
788 #[test]
789 fn test_mysql_value_to_column_value_null() {
790 let val = mysql_value_to_column_value(&mysql_async::Value::NULL);
791 assert_eq!(val, ColumnValue::Null);
792 }
793
794 #[test]
795 fn test_mysql_value_to_column_value_int() {
796 let val = mysql_value_to_column_value(&mysql_async::Value::Int(-42));
797 assert_eq!(val, ColumnValue::SignedInt(-42));
798
799 let val = mysql_value_to_column_value(&mysql_async::Value::UInt(100));
800 assert_eq!(val, ColumnValue::UnsignedInt(100));
801 }
802
803 #[test]
804 fn test_mysql_value_to_column_value_float() {
805 let val = mysql_value_to_column_value(&mysql_async::Value::Float(3.14));
806 assert_eq!(val, ColumnValue::Float(3.14));
807
808 let val = mysql_value_to_column_value(&mysql_async::Value::Double(2.718));
809 assert_eq!(val, ColumnValue::Double(2.718));
810 }
811
812 #[test]
813 fn test_mysql_value_to_column_value_string() {
814 let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(b"hello world".to_vec()));
815 assert_eq!(val, ColumnValue::String("hello world".to_string()));
816 }
817
818 #[test]
819 fn test_mysql_value_to_column_value_bytes() {
820 let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(vec![0xFF, 0xFE, 0xFD]));
822 assert_eq!(val, ColumnValue::Bytes(vec![0xFF, 0xFE, 0xFD]));
823 }
824
825 #[test]
826 fn test_mysql_value_to_column_value_date() {
827 let val = mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 0, 0, 0, 0));
829 assert_eq!(val, ColumnValue::Date(2024, 6, 15));
830
831 let val =
833 mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 14, 30, 45, 0));
834 assert_eq!(val, ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0));
835 }
836
837 #[test]
838 fn test_mysql_value_to_column_value_time() {
839 let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 0, 14, 30, 45, 0));
840 assert_eq!(val, ColumnValue::Time(14, 30, 45, 0));
841
842 let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 0, 2, 30, 0, 0));
844 assert_eq!(val, ColumnValue::Time(-2, 30, 0, 0));
845
846 let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1, 6, 0, 0, 0));
848 assert_eq!(val, ColumnValue::Time(30, 0, 0, 0));
849 }
850
851 #[test]
852 fn test_sid_bytes_to_gtid() {
853 let uuid = uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap();
854 let gtid = sid_bytes_to_gtid(uuid.into_bytes(), 42);
855 assert_eq!(gtid.source_id(), uuid);
856 assert_eq!(gtid.transaction_id(), 42);
857 }
858
859 #[test]
860 fn test_gtid_set_to_sids_empty() {
861 let gtid_set = GtidSet::new();
862 let sids = gtid_set_to_sids(>id_set);
863 assert!(sids.is_empty());
864 }
865
866 #[test]
867 fn test_gtid_set_to_sids_single() {
868 let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10".parse().unwrap();
869 let sids = gtid_set_to_sids(>id_set);
870 assert_eq!(sids.len(), 1);
871 }
872
873 #[test]
874 fn test_gtid_set_to_sids_multiple() {
875 let gtid_set: GtidSet =
876 "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10,4E11FA47-71CA-11E1-9E33-C80AA9429563:1-5"
877 .parse()
878 .unwrap();
879 let sids = gtid_set_to_sids(>id_set);
880 assert_eq!(sids.len(), 2);
881 }
882}