Skip to main content

laminar_connectors/cdc/mysql/
mysql_io.rs

1//! MySQL CDC binlog I/O integration module.
2//!
3//! This module provides the actual I/O operations for MySQL binlog replication
4//! via the `mysql_async` crate. All functions are feature-gated behind `mysql-cdc`.
5//!
6//! # Architecture
7//!
8//! The I/O module is separate from the business logic in `source.rs`
9//! to allow:
10//! - Testing business logic without a running MySQL server
11//! - Clean separation of concerns (connection management vs. event decoding)
12//! - Easy mocking for unit tests
13//!
14//! # Usage
15//!
16//! ```ignore
17//! use laminar_connectors::cdc::mysql::mysql_io;
18//!
19//! let conn = mysql_io::connect(&config).await?;
20//! let mut stream = mysql_io::start_binlog_stream(conn, &config, None, None).await?;
21//! let events = mysql_io::read_events(&mut stream, 100, Duration::from_secs(1)).await?;
22//! for event in events {
23//!     if let Some(msg) = mysql_io::decode_binlog_event(&event, &stream)? {
24//!         // Process BinlogMessage...
25//!     }
26//! }
27//! ```
28
29#[cfg(feature = "mysql-cdc")]
30use std::time::Duration;
31
32#[cfg(feature = "mysql-cdc")]
33use tokio_stream::StreamExt as _;
34
35#[cfg(feature = "mysql-cdc")]
36use mysql_async::binlog::events::{Event as MysqlEvent, EventData, RowsEventData};
37#[cfg(feature = "mysql-cdc")]
38use mysql_async::binlog::row::BinlogRow;
39#[cfg(feature = "mysql-cdc")]
40use mysql_async::binlog::value::BinlogValue;
41#[cfg(feature = "mysql-cdc")]
42use mysql_async::{BinlogStream, BinlogStreamRequest, Conn, GnoInterval, OptsBuilder, Sid};
43
44#[cfg(feature = "mysql-cdc")]
45use tracing::{debug, info};
46
47#[cfg(feature = "mysql-cdc")]
48use crate::error::ConnectorError;
49
50#[cfg(feature = "mysql-cdc")]
51use super::config::{MySqlCdcConfig, SslMode};
52#[cfg(feature = "mysql-cdc")]
53use super::decoder::{
54    BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DeleteMessage,
55    InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage, UpdateMessage,
56    UpdateRowData,
57};
58#[cfg(feature = "mysql-cdc")]
59use super::gtid::{Gtid, GtidSet};
60#[cfg(feature = "mysql-cdc")]
61use super::types::MySqlColumn;
62
/// Opens a connection to the MySQL server described by `config`.
///
/// Host, port, user, optional password, optional default database and the
/// TLS settings implied by `config.ssl_mode` are translated into
/// `mysql_async` connection options; `prefer_socket` is explicitly set to
/// `false`.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the connection cannot be established.
#[cfg(feature = "mysql-cdc")]
pub async fn connect(config: &MySqlCdcConfig) -> Result<Conn, ConnectorError> {
    info!(
        host = %config.host,
        port = config.port,
        user = %config.username,
        ssl_mode = %config.ssl_mode,
        "connecting to MySQL server"
    );

    let builder = OptsBuilder::default()
        .ip_or_hostname(&config.host)
        .tcp_port(config.port)
        .user(Some(&config.username))
        .prefer_socket(Some(false));

    // Credentials and default schema are only set when configured.
    let builder = match &config.password {
        Some(password) => builder.pass(Some(password)),
        None => builder,
    };
    let builder = match &config.database {
        Some(database) => builder.db_name(Some(database)),
        None => builder,
    };

    // TLS settings for the configured mode; `None` leaves the builder untouched.
    // `Preferred` accepts invalid certificates, the stricter modes use the
    // `SslOpts` defaults.
    let tls = match config.ssl_mode {
        SslMode::Disabled => None,
        SslMode::Preferred => {
            Some(mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true))
        }
        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => {
            Some(mysql_async::SslOpts::default())
        }
    };
    let opts = match tls {
        Some(tls) => builder.ssl_opts(Some(tls)),
        None => builder,
    };

    let conn = match Conn::new(opts).await {
        Ok(conn) => conn,
        Err(e) => {
            return Err(ConnectorError::ConnectionFailed(format!(
                "failed to connect to MySQL {}:{}: {}",
                config.host, config.port, e
            )));
        }
    };

    info!(
        host = %config.host,
        port = config.port,
        "connected to MySQL server"
    );

    Ok(conn)
}
120
/// Turns `conn` into a binlog replication stream.
///
/// The connection is consumed by the binlog stream (MySQL protocol requirement).
///
/// The starting point is chosen as follows:
///
/// * `config.use_gtid` set: GTID mode; a non-empty `gtid_set` is sent as the
///   resume point, otherwise replication starts from the beginning.
/// * otherwise: `position` if given, else `config.binlog_filename` (with
///   `config.binlog_position`, defaulting to `4`), else the server's current
///   position.
///
/// # Arguments
///
/// * `conn` - MySQL connection (consumed)
/// * `config` - CDC configuration
/// * `gtid_set` - Optional GTID set for GTID-based replication
/// * `position` - Optional binlog position for file/position-based replication
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the binlog stream cannot be started.
#[cfg(feature = "mysql-cdc")]
pub async fn start_binlog_stream(
    conn: Conn,
    config: &MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&BinlogPosition>,
) -> Result<BinlogStream, ConnectorError> {
    let base = BinlogStreamRequest::new(config.server_id);

    let request = if config.use_gtid {
        let gtid_request = base.with_gtid();

        // Only a GTID set that converts to at least one SID is a resume point.
        let resume = gtid_set
            .map(|set| (set, gtid_set_to_sids(set)))
            .filter(|(_, sids)| !sids.is_empty());

        match resume {
            Some((set, sids)) => {
                let resumed = gtid_request.with_gtid_set(sids);
                info!("starting GTID-based binlog replication from {}", set);
                resumed
            }
            None => {
                info!("starting GTID-based binlog replication from beginning");
                gtid_request
            }
        }
    } else if let Some(pos) = position {
        // Explicit file/position supplied by the caller wins over the config.
        let positioned = base
            .with_filename(pos.filename.as_bytes())
            .with_pos(pos.position);
        info!(
            "starting binlog replication from {}:{}",
            pos.filename, pos.position
        );
        positioned
    } else if let Some(filename) = config.binlog_filename.as_ref() {
        let pos = config.binlog_position.unwrap_or(4);
        let positioned = base.with_filename(filename.as_bytes()).with_pos(pos);
        info!("starting binlog replication from {}:{}", filename, pos);
        positioned
    } else {
        info!("starting binlog replication from current position");
        base
    };

    conn.get_binlog_stream(request).await.map_err(|e| {
        ConnectorError::ConnectionFailed(format!("failed to start binlog stream: {e}"))
    })
}
184
/// Collects a batch of binlog events, bounded by count and by wall-clock time.
///
/// Reading stops as soon as `max_events` events were gathered, the overall
/// `timeout` has elapsed, or the stream reports its end. Whatever was read up
/// to that point is returned, so the result may be empty.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if an I/O error occurs.
#[cfg(feature = "mysql-cdc")]
pub async fn read_events(
    stream: &mut BinlogStream,
    max_events: usize,
    timeout: Duration,
) -> Result<Vec<MysqlEvent>, ConnectorError> {
    let mut batch = Vec::with_capacity(max_events.min(64));
    let cutoff = tokio::time::Instant::now() + timeout;

    while batch.len() < max_events {
        // Time left until the overall deadline; zero means we are done.
        let budget = cutoff.saturating_duration_since(tokio::time::Instant::now());
        if budget.is_zero() {
            break;
        }

        // An elapsed timer is not an error: return whatever we have.
        let Ok(next) = tokio::time::timeout(budget, stream.next()).await else {
            break;
        };

        match next {
            Some(Ok(event)) => batch.push(event),
            Some(Err(e)) => {
                // I/O error reading from binlog stream.
                return Err(ConnectorError::ReadError(format!(
                    "binlog stream error: {e}"
                )));
            }
            None => {
                // Stream ended (server disconnected or NON_BLOCK mode).
                debug!("binlog stream ended");
                break;
            }
        }
    }

    Ok(batch)
}
235
/// Translates a raw `mysql_async` binlog event into a `BinlogMessage`.
///
/// Returns `None` when the event carries no parsed payload or when its type
/// is not handled here (e.g., FORMAT_DESCRIPTION, PREVIOUS_GTIDS).
///
/// The event header supplies the binlog position (`log_pos`) and the
/// timestamp, which is converted from seconds to milliseconds.
///
/// # Arguments
///
/// * `event` - The raw binlog event from mysql_async
/// * `stream` - The binlog stream (provides TABLE_MAP cache via `get_tme()`)
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if event data cannot be parsed, if a
/// rows event references a table id with no cached TABLE_MAP, or if a row
/// fails to decode.
#[cfg(feature = "mysql-cdc")]
pub fn decode_binlog_event(
    event: &MysqlEvent,
    stream: &BinlogStream,
) -> Result<Option<BinlogMessage>, ConnectorError> {
    let header = event.header();
    let binlog_position = u64::from(header.log_pos());
    let timestamp_ms = i64::from(header.timestamp()) * 1000;

    let parsed = event
        .read_data()
        .map_err(|e| ConnectorError::Internal(format!("failed to parse binlog event: {e}")))?;

    let Some(payload) = parsed else {
        return Ok(None);
    };

    let message = match payload {
        EventData::TableMapEvent(tme) => BinlogMessage::TableMap(TableMapMessage {
            table_id: tme.table_id(),
            database: tme.database_name().into_owned(),
            table: tme.table_name().into_owned(),
            columns: extract_columns_from_tme(&tme),
        }),

        EventData::RowsEvent(rows_event) => {
            decode_rows_event(&rows_event, stream, binlog_position, timestamp_ms)?
        }

        EventData::QueryEvent(qe) => BinlogMessage::Query(QueryMessage {
            database: qe.schema().into_owned(),
            query: qe.query().into_owned(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::GtidEvent(ge) => BinlogMessage::Begin(BeginMessage {
            gtid: Some(sid_bytes_to_gtid(ge.sid(), ge.gno())),
            binlog_filename: String::new(), // Filled by caller from stream state.
            binlog_position,
            timestamp_ms,
        }),

        EventData::AnonymousGtidEvent(_) => BinlogMessage::Begin(BeginMessage {
            gtid: None,
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::RotateEvent(re) => BinlogMessage::Rotate(RotateMessage {
            next_binlog: re.name().into_owned(),
            position: re.position(),
        }),

        EventData::XidEvent(xe) => BinlogMessage::Commit(CommitMessage {
            xid: xe.xid,
            binlog_position,
            timestamp_ms,
        }),

        EventData::HeartbeatEvent => BinlogMessage::Heartbeat,

        // All other event types are ignored (FORMAT_DESCRIPTION, PREVIOUS_GTIDS, etc.).
        _ => return Ok(None),
    };

    Ok(Some(message))
}

/// Decodes a rows event (insert / update / delete) into its `BinlogMessage`.
///
/// The matching TABLE_MAP event is looked up in the stream's cache by the
/// table id carried in the rows event; it provides the database and table
/// names and is needed to decode the row images.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_event(
    rows_event: &RowsEventData<'_>,
    stream: &BinlogStream,
    binlog_position: u64,
    timestamp_ms: i64,
) -> Result<BinlogMessage, ConnectorError> {
    let table_id = rows_event.table_id();

    // Get the TABLE_MAP event from the stream's internal cache.
    let Some(tme) = stream.get_tme(table_id) else {
        return Err(ConnectorError::Internal(format!(
            "missing TABLE_MAP for table_id {table_id}"
        )));
    };

    let database = tme.database_name().into_owned();
    let table = tme.table_name().into_owned();

    let message = match rows_event {
        RowsEventData::WriteRowsEvent(_) | RowsEventData::WriteRowsEventV1(_) => {
            BinlogMessage::Insert(InsertMessage {
                table_id,
                database,
                table,
                rows: decode_rows_after(rows_event, tme)?,
                binlog_position,
                timestamp_ms,
            })
        }

        RowsEventData::UpdateRowsEvent(_)
        | RowsEventData::UpdateRowsEventV1(_)
        | RowsEventData::PartialUpdateRowsEvent(_) => BinlogMessage::Update(UpdateMessage {
            table_id,
            database,
            table,
            rows: decode_rows_update(rows_event, tme)?,
            binlog_position,
            timestamp_ms,
        }),

        RowsEventData::DeleteRowsEvent(_) | RowsEventData::DeleteRowsEventV1(_) => {
            BinlogMessage::Delete(DeleteMessage {
                table_id,
                database,
                table,
                rows: decode_rows_before(rows_event, tme)?,
                binlog_position,
                timestamp_ms,
            })
        }
    };

    Ok(message)
}
369
/// Maps a `mysql_async` `BinlogValue` onto our `ColumnValue` enum.
///
/// * Plain values are delegated to `mysql_value_to_column_value`.
/// * JSONB values become `ColumnValue::Json` holding the value's `Debug`
///   rendering.
///   NOTE(review): `Debug` output is presumably not valid JSON text — confirm
///   whether consumers expect real JSON here.
/// * Partial JSON updates become the placeholder document `{}`; the diff
///   contents are discarded.
#[cfg(feature = "mysql-cdc")]
fn binlog_value_to_column_value(val: &BinlogValue<'_>) -> ColumnValue {
    match val {
        BinlogValue::JsonDiff(_) => {
            // Partial JSON updates (MySQL 8.0.20+) — represent as JSON string.
            ColumnValue::Json(String::from("{}"))
        }
        BinlogValue::Jsonb(document) => ColumnValue::Json(format!("{document:?}")),
        BinlogValue::Value(plain) => mysql_value_to_column_value(plain),
    }
}
382
/// Converts a `mysql_common::value::Value` to our `ColumnValue` enum.
///
/// Mapping rules:
///
/// * `NULL`, integers and floating-point values map one-to-one.
/// * `Bytes` becomes `ColumnValue::String` when the payload is valid UTF-8 and
///   `ColumnValue::Bytes` otherwise. The payload is copied exactly once.
/// * `Date` becomes `ColumnValue::Date` when every time component is zero and
///   `ColumnValue::DateTime` otherwise. A date-time value at exactly midnight
///   is therefore reported as a plain date.
/// * `Time` folds the day count into the hour component and carries the sign
///   on the hours. The hour total saturates at `i32::MAX` instead of
///   wrapping or overflowing for out-of-range day counts.
///   NOTE(review): because only the hour component is signed, a negative
///   duration shorter than one hour comes out with hour `0` and loses its
///   sign — fixing that needs a change to `ColumnValue::Time`.
#[cfg(feature = "mysql-cdc")]
fn mysql_value_to_column_value(val: &mysql_async::Value) -> ColumnValue {
    match val {
        mysql_async::Value::NULL => ColumnValue::Null,
        mysql_async::Value::Int(v) => ColumnValue::SignedInt(*v),
        mysql_async::Value::UInt(v) => ColumnValue::UnsignedInt(*v),
        mysql_async::Value::Float(v) => ColumnValue::Float(*v),
        mysql_async::Value::Double(v) => ColumnValue::Double(*v),
        mysql_async::Value::Bytes(b) => {
            // Validate in place first so the payload is copied only once,
            // whichever variant it ends up in.
            match std::str::from_utf8(b) {
                Ok(s) => ColumnValue::String(s.to_owned()),
                Err(_) => ColumnValue::Bytes(b.clone()),
            }
        }
        mysql_async::Value::Date(year, month, day, hour, min, sec, micro) => {
            if *hour == 0 && *min == 0 && *sec == 0 && *micro == 0 {
                ColumnValue::Date(i32::from(*year), u32::from(*month), u32::from(*day))
            } else {
                ColumnValue::DateTime(
                    i32::from(*year),
                    u32::from(*month),
                    u32::from(*day),
                    u32::from(*hour),
                    u32::from(*min),
                    u32::from(*sec),
                    *micro,
                )
            }
        }
        mysql_async::Value::Time(is_negative, days, hours, minutes, seconds, micros) => {
            // Convert days+hours to total hours. `days` is a u32, so the sum is
            // computed in u64 (cannot overflow) and then saturated into i32.
            let total_hours =
                i32::try_from(u64::from(*days) * 24 + u64::from(*hours)).unwrap_or(i32::MAX);
            let h = if *is_negative {
                -total_hours
            } else {
                total_hours
            };
            ColumnValue::Time(h, u32::from(*minutes), u32::from(*seconds), *micros)
        }
    }
}
428
/// Builds one `MySqlColumn` per column described by a `TableMapEvent`.
///
/// For each column index:
///
/// * the name comes from the optional column-name metadata, falling back to
///   `col_<index>` when no name is available;
/// * the type id is the raw column type as `u8`, or `0xFF` when it cannot be
///   read;
/// * nullability comes from the event's null bitmask, defaulting to `true`
///   when the bit is missing.
///
/// The remaining `MySqlColumn::new` arguments are passed as `0` and `false`.
#[cfg(feature = "mysql-cdc")]
fn extract_columns_from_tme(
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Vec<MySqlColumn> {
    #[allow(clippy::cast_possible_truncation)]
    let column_count = tme.columns_count() as usize;
    let nullability = tme.null_bitmask();

    // Try to extract column names from optional metadata.
    let known_names = extract_column_names(tme);

    let mut columns = Vec::with_capacity(column_count);
    for idx in 0..column_count {
        let type_id = match tme.get_raw_column_type(idx) {
            Ok(Some(column_type)) => column_type as u8,
            _ => 0xFF,
        };

        let nullable = nullability.get(idx).as_deref().copied().unwrap_or(true);

        let name = match known_names.get(idx) {
            Some(known) => known.clone(),
            None => format!("col_{idx}"),
        };

        columns.push(MySqlColumn::new(name, type_id, 0, nullable, false));
    }
    columns
}
460
/// Collects the column names found in a TABLE_MAP event's optional metadata.
///
/// Metadata fields and individual names that fail to parse are skipped, so
/// the result may be empty or shorter than the table's column count.
#[cfg(feature = "mysql-cdc")]
fn extract_column_names(tme: &mysql_async::binlog::events::TableMapEvent<'_>) -> Vec<String> {
    use mysql_async::binlog::events::OptionalMetadataField;

    let mut collected = Vec::new();
    for field in tme.iter_optional_meta() {
        // Only successfully parsed column-name fields are of interest.
        let Ok(OptionalMetadataField::ColumnName(column_names)) = field else {
            continue;
        };
        collected.extend(
            column_names
                .iter_names()
                .flatten()
                .map(|entry| entry.name().into_owned()),
        );
    }
    collected
}
475
/// Decodes the after images of a WriteRows event.
///
/// Rows without an after image are skipped. Decoding stops at the first row
/// that fails, and that failure is returned as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_after(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((_, Some(after))) => Some(Ok(binlog_row_to_row_data(&after))),
            Ok((_, None)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
492
/// Decodes the before images of a DeleteRows event.
///
/// Rows without a before image are skipped. Decoding stops at the first row
/// that fails, and that failure is returned as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_before(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((Some(before), _)) => Some(Ok(binlog_row_to_row_data(&before))),
            Ok((None, _)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
509
/// Decodes the before/after image pairs of an UpdateRows event.
///
/// Every decoded row yields one `UpdateRowData`. An image that is absent is
/// represented by a `RowData` with no columns. Decoding stops at the first
/// row that fails, and that failure is returned as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_update(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<UpdateRowData>, ConnectorError> {
    // Converts an optional row image, substituting an empty row when missing.
    let image = |row: Option<BinlogRow>| match row {
        Some(present) => binlog_row_to_row_data(&present),
        None => RowData {
            columns: Vec::new(),
        },
    };

    rows_event
        .rows(tme)
        .map(|decoded| {
            decoded
                .map(|(before, after)| UpdateRowData {
                    before: image(before),
                    after: image(after),
                })
                .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))
        })
        .collect()
}
542
/// Converts a `BinlogRow` into our `RowData`, one `ColumnValue` per column.
///
/// A column index for which the row holds no value is emitted as
/// `ColumnValue::Null`.
#[cfg(feature = "mysql-cdc")]
fn binlog_row_to_row_data(row: &BinlogRow) -> RowData {
    let width = row.len();
    let mut columns = Vec::with_capacity(width);
    for idx in 0..width {
        let cell = match row.as_ref(idx) {
            Some(value) => binlog_value_to_column_value(value),
            None => ColumnValue::Null,
        };
        columns.push(cell);
    }
    RowData { columns }
}
554
/// Builds our `Gtid` from a raw 16-byte source id (SID) and a transaction
/// number (GNO). The SID bytes are interpreted as a UUID.
#[cfg(feature = "mysql-cdc")]
fn sid_bytes_to_gtid(sid: [u8; 16], gno: u64) -> Gtid {
    Gtid::new(uuid::Uuid::from_bytes(sid), gno)
}
561
/// Converts our `GtidSet` into the `Sid` list expected by `BinlogStreamRequest`.
///
/// One `Sid` is produced per source id in the set, carrying one interval per
/// range of that source. An empty set yields an empty vector.
#[cfg(feature = "mysql-cdc")]
fn gtid_set_to_sids(gtid_set: &GtidSet) -> Vec<Sid<'static>> {
    gtid_set
        .iter_sets()
        .map(|(source_id, ranges)| {
            let mut sid = Sid::new(source_id.into_bytes());
            for range in ranges {
                // GnoInterval is [start, end) in mysql_async but our GtidRange is
                // [start, end] inclusive, hence the `+ 1`.
                let exclusive_end = range.end + 1;
                sid = sid.with_interval(GnoInterval::new(range.start, exclusive_end));
            }
            sid
        })
        .collect()
}
582
/// Derives `mysql_async` SSL options from an `SslMode`.
///
/// * `Disabled` yields `None` (no TLS options).
/// * `Preferred` yields options that accept invalid certificates.
/// * `Required`, `VerifyCa` and `VerifyIdentity` all yield the default
///   `SslOpts`.
///
/// This is a helper for testing; the main `connect()` function handles SSL inline.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_ssl_opts(ssl_mode: &SslMode) -> Option<mysql_async::SslOpts> {
    let accept_invalid_certs = match ssl_mode {
        SslMode::Disabled => return None,
        SslMode::Preferred => true,
        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => false,
    };

    let defaults = mysql_async::SslOpts::default();
    Some(if accept_invalid_certs {
        defaults.with_danger_accept_invalid_certs(true)
    } else {
        defaults
    })
}
599
/// Assembles an `OptsBuilder` from our config (for testing).
///
/// Sets host, port and user, sets `prefer_socket` to `false`, and adds the
/// password, default database and SSL options only when they are configured.
/// SSL options come from `build_ssl_opts`.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_opts(config: &MySqlCdcConfig) -> OptsBuilder {
    let builder = OptsBuilder::default()
        .ip_or_hostname(&config.host)
        .tcp_port(config.port)
        .user(Some(&config.username))
        .prefer_socket(Some(false));

    let builder = match &config.password {
        Some(password) => builder.pass(Some(password)),
        None => builder,
    };

    let builder = match &config.database {
        Some(database) => builder.db_name(Some(database)),
        None => builder,
    };

    match build_ssl_opts(&config.ssl_mode) {
        Some(tls) => builder.ssl_opts(Some(tls)),
        None => builder,
    }
}
624
/// Assembles a `BinlogStreamRequest` from our config (for testing).
///
/// * With `config.use_gtid`, the request is put in GTID mode and a non-empty
///   `gtid_set` is attached; `position` and the configured filename are not
///   consulted.
/// * Otherwise `position` is used if given, else `config.binlog_filename`
///   together with `config.binlog_position` (defaulting to `4`), else the
///   request carries no explicit start point.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_binlog_request<'a>(
    config: &'a MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&'a BinlogPosition>,
) -> BinlogStreamRequest<'a> {
    let base = BinlogStreamRequest::new(config.server_id);

    if config.use_gtid {
        let gtid_request = base.with_gtid();
        // A missing set and a set without any SIDs are treated alike.
        let sids = gtid_set.map(gtid_set_to_sids).unwrap_or_default();
        return if sids.is_empty() {
            gtid_request
        } else {
            gtid_request.with_gtid_set(sids)
        };
    }

    match (position, config.binlog_filename.as_ref()) {
        (Some(pos), _) => base
            .with_filename(pos.filename.as_bytes())
            .with_pos(pos.position),
        (None, Some(filename)) => {
            let pos = config.binlog_position.unwrap_or(4);
            base.with_filename(filename.as_bytes()).with_pos(pos)
        }
        (None, None) => base,
    }
}
655
656// ============================================================================
657// Tests (require mysql-cdc feature)
658// ============================================================================
659
#[cfg(all(test, feature = "mysql-cdc"))]
mod tests {
    //! Unit tests for the pure helpers in this module. None of them needs a
    //! running MySQL server.

    use super::*;
    use mysql_async::Opts;

    /// Config with every connection field populated.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("secret".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    #[test]
    fn test_connect_builds_opts() {
        let config = test_config();
        let opts_builder = build_opts(&config);

        // Convert to Opts to verify values.
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "localhost");
        assert_eq!(opts.tcp_port(), 3306);
        assert_eq!(opts.user(), Some("root"));
        assert_eq!(opts.pass(), Some("secret"));
        assert_eq!(opts.db_name(), Some("testdb"));
    }

    #[test]
    fn test_connect_builds_opts_no_password() {
        let config = MySqlCdcConfig {
            host: "dbhost".to_string(),
            port: 3307,
            database: None,
            username: "repl".to_string(),
            password: None,
            server_id: 999,
            ..Default::default()
        };
        let opts_builder = build_opts(&config);
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "dbhost");
        assert_eq!(opts.tcp_port(), 3307);
        assert_eq!(opts.user(), Some("repl"));
        assert_eq!(opts.pass(), None);
        assert_eq!(opts.db_name(), None);
    }

    #[test]
    fn test_ssl_opts_disabled() {
        let ssl = build_ssl_opts(&SslMode::Disabled);
        assert!(ssl.is_none());
    }

    #[test]
    fn test_ssl_opts_preferred() {
        let ssl = build_ssl_opts(&SslMode::Preferred);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_required() {
        let ssl = build_ssl_opts(&SslMode::Required);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_verify_ca() {
        let ssl = build_ssl_opts(&SslMode::VerifyCa);
        assert!(ssl.is_some());
    }

    #[test]
    fn test_ssl_opts_verify_identity() {
        let ssl = build_ssl_opts(&SslMode::VerifyIdentity);
        assert!(ssl.is_some());
    }

    // The request-building tests below only check that construction does not
    // panic; they make no assertions about the resulting request.

    #[test]
    fn test_binlog_request_gtid_mode() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: true,
            ..Default::default()
        };

        // Without GTID set — should still work.
        let _request = build_binlog_request(&config, None, None);

        // With GTID set.
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap();
        let _request = build_binlog_request(&config, Some(&gtid_set), None);
    }

    #[test]
    fn test_binlog_request_file_position() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            ..Default::default()
        };

        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
        let _request = build_binlog_request(&config, None, Some(&pos));
    }

    #[test]
    fn test_binlog_request_config_filename() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            binlog_filename: Some("mysql-bin.000001".to_string()),
            binlog_position: Some(154),
            ..Default::default()
        };

        let _request = build_binlog_request(&config, None, None);
    }

    #[test]
    fn test_mysql_value_to_column_value_null() {
        let val = mysql_value_to_column_value(&mysql_async::Value::NULL);
        assert_eq!(val, ColumnValue::Null);
    }

    #[test]
    fn test_mysql_value_to_column_value_int() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Int(-42));
        assert_eq!(val, ColumnValue::SignedInt(-42));

        let val = mysql_value_to_column_value(&mysql_async::Value::UInt(100));
        assert_eq!(val, ColumnValue::UnsignedInt(100));
    }

    #[test]
    fn test_mysql_value_to_column_value_float() {
        // The literals are exactly representable in binary floating point, so
        // the equality checks are exact. They also avoid looking like PI or E,
        // which clippy's deny-by-default `approx_constant` lint rejects.
        let val = mysql_value_to_column_value(&mysql_async::Value::Float(1.5));
        assert_eq!(val, ColumnValue::Float(1.5));

        let val = mysql_value_to_column_value(&mysql_async::Value::Double(2.25));
        assert_eq!(val, ColumnValue::Double(2.25));
    }

    #[test]
    fn test_mysql_value_to_column_value_string() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(b"hello world".to_vec()));
        assert_eq!(val, ColumnValue::String("hello world".to_string()));
    }

    #[test]
    fn test_mysql_value_to_column_value_bytes() {
        // Non-UTF8 bytes should map to Bytes.
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(vec![0xFF, 0xFE, 0xFD]));
        assert_eq!(val, ColumnValue::Bytes(vec![0xFF, 0xFE, 0xFD]));
    }

    #[test]
    fn test_mysql_value_to_column_value_date() {
        // Date only (no time component).
        let val = mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 0, 0, 0, 0));
        assert_eq!(val, ColumnValue::Date(2024, 6, 15));

        // DateTime (has time component).
        let val =
            mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0));
    }

    #[test]
    fn test_mysql_value_to_column_value_time() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 0, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::Time(14, 30, 45, 0));

        // Negative time.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 0, 2, 30, 0, 0));
        assert_eq!(val, ColumnValue::Time(-2, 30, 0, 0));

        // Time with days.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1, 6, 0, 0, 0));
        assert_eq!(val, ColumnValue::Time(30, 0, 0, 0));
    }

    #[test]
    fn test_sid_bytes_to_gtid() {
        let uuid = uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap();
        let gtid = sid_bytes_to_gtid(uuid.into_bytes(), 42);
        assert_eq!(gtid.source_id(), uuid);
        assert_eq!(gtid.transaction_id(), 42);
    }

    #[test]
    fn test_gtid_set_to_sids_empty() {
        let gtid_set = GtidSet::new();
        let sids = gtid_set_to_sids(&gtid_set);
        assert!(sids.is_empty());
    }

    #[test]
    fn test_gtid_set_to_sids_single() {
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10".parse().unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 1);
    }

    #[test]
    fn test_gtid_set_to_sids_multiple() {
        let gtid_set: GtidSet =
            "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10,4E11FA47-71CA-11E1-9E33-C80AA9429563:1-5"
                .parse()
                .unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 2);
    }
}