Skip to main content

laminar_connectors/cdc/mysql/
mysql_io.rs

1//! MySQL CDC binlog I/O.
2
3#[cfg(feature = "mysql-cdc")]
4use std::time::Duration;
5
6#[cfg(feature = "mysql-cdc")]
7use tokio_stream::StreamExt as _;
8
9#[cfg(feature = "mysql-cdc")]
10use mysql_async::binlog::events::{Event as MysqlEvent, EventData, RowsEventData};
11#[cfg(feature = "mysql-cdc")]
12use mysql_async::binlog::row::BinlogRow;
13#[cfg(feature = "mysql-cdc")]
14use mysql_async::binlog::value::BinlogValue;
15#[cfg(feature = "mysql-cdc")]
16use mysql_async::{BinlogStream, BinlogStreamRequest, Conn, GnoInterval, OptsBuilder, Sid};
17
18#[cfg(feature = "mysql-cdc")]
19use tracing::{debug, info};
20
21#[cfg(feature = "mysql-cdc")]
22use crate::error::ConnectorError;
23
24#[cfg(feature = "mysql-cdc")]
25use super::config::{MySqlCdcConfig, SslMode};
26#[cfg(feature = "mysql-cdc")]
27use super::decoder::{
28    BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DeleteMessage,
29    InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage, UpdateMessage,
30    UpdateRowData,
31};
32#[cfg(feature = "mysql-cdc")]
33use super::gtid::{Gtid, GtidSet};
34#[cfg(feature = "mysql-cdc")]
35use super::types::MySqlColumn;
36
/// Opens a connection to the MySQL server described by `config`.
///
/// Host, port and user always come from the config; password and default
/// database are applied when present. `prefer_socket` is set to `false`.
///
/// TLS options per `ssl_mode`:
/// * `Disabled` – no TLS options are set.
/// * `Preferred` – TLS options that accept invalid certificates.
/// * `Required` / `VerifyCa` / `VerifyIdentity` – default TLS options.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the connection cannot be established.
#[cfg(feature = "mysql-cdc")]
pub async fn connect(config: &MySqlCdcConfig) -> Result<Conn, ConnectorError> {
    info!(
        host = %config.host,
        port = config.port,
        user = %config.username,
        ssl_mode = %config.ssl_mode,
        "connecting to MySQL server"
    );

    // Resolve the TLS settings first so the builder can be one chain.
    let tls = match config.ssl_mode {
        SslMode::Disabled => None,
        SslMode::Preferred => {
            Some(mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true))
        }
        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => {
            Some(mysql_async::SslOpts::default())
        }
    };

    // Passing `None` for password / database / TLS leaves the builder defaults
    // untouched, which matches not calling the setter at all.
    let opts = OptsBuilder::default()
        .ip_or_hostname(&config.host)
        .tcp_port(config.port)
        .user(Some(&config.username))
        .prefer_socket(Some(false))
        .pass(config.password.as_deref())
        .db_name(config.database.as_deref())
        .ssl_opts(tls);

    let conn = match Conn::new(opts).await {
        Ok(conn) => conn,
        Err(e) => {
            return Err(ConnectorError::ConnectionFailed(format!(
                "failed to connect to MySQL {}:{}: {}",
                config.host, config.port, e
            )));
        }
    };

    info!(
        host = %config.host,
        port = config.port,
        "connected to MySQL server"
    );

    Ok(conn)
}
94
/// Requests a binlog replication stream on `conn`.
///
/// The connection is moved into the returned stream.
///
/// Start position selection:
/// * GTID mode (`config.use_gtid`): resumes from `gtid_set` when it yields at
///   least one SID, otherwise no GTID set is sent.
/// * File/position mode: `position` wins; otherwise `config.binlog_filename`
///   with `config.binlog_position` (defaulting to 4); otherwise the request is
///   sent without an explicit file or position.
///
/// # Arguments
///
/// * `conn` - MySQL connection (consumed)
/// * `config` - CDC configuration
/// * `gtid_set` - Optional GTID set for GTID-based replication
/// * `position` - Optional binlog position for file/position-based replication
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the binlog stream cannot be started.
#[cfg(feature = "mysql-cdc")]
pub async fn start_binlog_stream(
    conn: Conn,
    config: &MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&BinlogPosition>,
) -> Result<BinlogStream, ConnectorError> {
    let base = BinlogStreamRequest::new(config.server_id);

    let request = if config.use_gtid {
        let gtid_request = base.with_gtid();

        // Convert the set up front; an absent or empty set both mean that no
        // GTID set is attached to the request.
        match gtid_set.map(|set| (set, gtid_set_to_sids(set))) {
            Some((set, sids)) if !sids.is_empty() => {
                let resumed = gtid_request.with_gtid_set(sids);
                info!("starting GTID-based binlog replication from {}", set);
                resumed
            }
            _ => {
                info!("starting GTID-based binlog replication from beginning");
                gtid_request
            }
        }
    } else {
        // File/position-based replication: explicit position first, then the
        // configured file name, then whatever the server picks.
        match (position, config.binlog_filename.as_deref()) {
            (Some(resume), _) => {
                let positioned = base
                    .with_filename(resume.filename.as_bytes())
                    .with_pos(resume.position);
                info!(
                    "starting binlog replication from {}:{}",
                    resume.filename, resume.position
                );
                positioned
            }
            (None, Some(filename)) => {
                let offset = config.binlog_position.unwrap_or(4);
                let positioned = base.with_filename(filename.as_bytes()).with_pos(offset);
                info!("starting binlog replication from {}:{}", filename, offset);
                positioned
            }
            (None, None) => {
                info!("starting binlog replication from current position");
                base
            }
        }
    };

    match conn.get_binlog_stream(request).await {
        Ok(stream) => Ok(stream),
        Err(e) => Err(ConnectorError::ConnectionFailed(format!(
            "failed to start binlog stream: {e}"
        ))),
    }
}
158
/// Collects binlog events from `stream` until a limit is hit.
///
/// Reading stops as soon as one of these happens:
/// * `max_events` events have been gathered,
/// * `timeout` (measured from the start of the call) has elapsed,
/// * the stream reports its end.
///
/// Whatever has been gathered so far is returned in those cases.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the stream yields an error; events
/// gathered earlier in the same call are not returned in that case.
#[cfg(feature = "mysql-cdc")]
pub async fn read_events(
    stream: &mut BinlogStream,
    max_events: usize,
    timeout: Duration,
) -> Result<Vec<MysqlEvent>, ConnectorError> {
    let deadline = tokio::time::Instant::now() + timeout;
    // Cap the up-front allocation; the vector still grows on demand.
    let mut batch = Vec::with_capacity(max_events.min(64));

    while batch.len() < max_events {
        let budget = deadline.saturating_duration_since(tokio::time::Instant::now());
        if budget.is_zero() {
            break;
        }

        // An elapsed timer is not an error: hand back what we have.
        let Ok(polled) = tokio::time::timeout(budget, stream.next()).await else {
            break;
        };

        match polled {
            Some(Ok(event)) => batch.push(event),
            Some(Err(e)) => {
                // I/O error reading from binlog stream.
                return Err(ConnectorError::ReadError(format!(
                    "binlog stream error: {e}"
                )));
            }
            None => {
                // Stream ended (server disconnected or NON_BLOCK mode).
                debug!("binlog stream ended");
                break;
            }
        }
    }

    Ok(batch)
}
209
/// Translates a raw `mysql_async` binlog event into a `BinlogMessage`.
///
/// Returns `Ok(None)` when the event carries no parsed data or is of a type
/// this connector does not map (e.g., FORMAT_DESCRIPTION, PREVIOUS_GTIDS).
///
/// # Arguments
///
/// * `event` - The raw binlog event from mysql_async
/// * `stream` - The binlog stream; its `get_tme()` lookup supplies the
///   TABLE_MAP event needed to decode row events
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the event data cannot be parsed, if a
/// rows event references a table id with no TABLE_MAP available from `stream`,
/// or if a row image fails to decode.
#[cfg(feature = "mysql-cdc")]
pub fn decode_binlog_event(
    event: &MysqlEvent,
    stream: &BinlogStream,
) -> Result<Option<BinlogMessage>, ConnectorError> {
    let header = event.header();
    let binlog_position = u64::from(header.log_pos());
    // Header timestamps are in seconds; messages carry milliseconds.
    let timestamp_ms = i64::from(header.timestamp()) * 1000;

    let parsed = match event.read_data() {
        Ok(Some(data)) => data,
        Ok(None) => return Ok(None),
        Err(e) => {
            return Err(ConnectorError::Internal(format!(
                "failed to parse binlog event: {e}"
            )));
        }
    };

    let message = match parsed {
        EventData::HeartbeatEvent => BinlogMessage::Heartbeat,

        EventData::RotateEvent(rotate) => BinlogMessage::Rotate(RotateMessage {
            next_binlog: rotate.name().into_owned(),
            position: rotate.position(),
        }),

        EventData::XidEvent(xid_event) => BinlogMessage::Commit(CommitMessage {
            xid: xid_event.xid,
            binlog_position,
            timestamp_ms,
        }),

        EventData::GtidEvent(gtid_event) => BinlogMessage::Begin(BeginMessage {
            gtid: Some(sid_bytes_to_gtid(gtid_event.sid(), gtid_event.gno())),
            // Left empty here; filled by caller from stream state.
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::AnonymousGtidEvent(_) => BinlogMessage::Begin(BeginMessage {
            gtid: None,
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::QueryEvent(query_event) => BinlogMessage::Query(QueryMessage {
            database: query_event.schema().into_owned(),
            query: query_event.query().into_owned(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::TableMapEvent(table_map) => BinlogMessage::TableMap(TableMapMessage {
            table_id: table_map.table_id(),
            database: table_map.database_name().into_owned(),
            table: table_map.table_name().into_owned(),
            columns: extract_columns_from_tme(&table_map),
        }),

        EventData::RowsEvent(rows_event) => {
            let table_id = rows_event.table_id();

            // Row images can only be decoded against the matching TABLE_MAP,
            // which is looked up through the stream.
            let Some(tme) = stream.get_tme(table_id) else {
                return Err(ConnectorError::Internal(format!(
                    "missing TABLE_MAP for table_id {table_id}"
                )));
            };

            let database = tme.database_name().into_owned();
            let table = tme.table_name().into_owned();

            match &rows_event {
                RowsEventData::WriteRowsEvent(_) | RowsEventData::WriteRowsEventV1(_) => {
                    BinlogMessage::Insert(InsertMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_after(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }

                RowsEventData::UpdateRowsEvent(_)
                | RowsEventData::UpdateRowsEventV1(_)
                | RowsEventData::PartialUpdateRowsEvent(_) => {
                    BinlogMessage::Update(UpdateMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_update(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }

                RowsEventData::DeleteRowsEvent(_) | RowsEventData::DeleteRowsEventV1(_) => {
                    BinlogMessage::Delete(DeleteMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_before(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }
            }
        }

        // All other event types are ignored (FORMAT_DESCRIPTION, PREVIOUS_GTIDS, etc.).
        _ => return Ok(None),
    };

    Ok(Some(message))
}
343
/// Converts a `mysql_async` `BinlogValue` to our `ColumnValue` enum.
///
/// * `Value` – delegated to `mysql_value_to_column_value`.
/// * `Jsonb` – the binary JSON is parsed and rendered as JSON text. If the
///   value cannot be parsed, its debug representation is used instead so the
///   column is never silently dropped.
/// * `JsonDiff` – partial JSON updates are reported as the placeholder `"{}"`;
///   the diff contents are discarded.
#[cfg(feature = "mysql-cdc")]
fn binlog_value_to_column_value(val: &BinlogValue<'_>) -> ColumnValue {
    match val {
        BinlogValue::Value(v) => mysql_value_to_column_value(v),
        BinlogValue::Jsonb(jsonb_val) => {
            // `{:?}` yields Rust debug output (variant names, byte slices), not
            // JSON. Render through the parsed DOM, whose `Display` is JSON text.
            // `parse` consumes the value, hence the clone of the borrowed input.
            let text = jsonb_val
                .clone()
                .parse()
                .map_or_else(|_| format!("{jsonb_val:?}"), |dom| dom.to_string());
            ColumnValue::Json(text)
        }
        BinlogValue::JsonDiff(_diffs) => {
            // Partial JSON updates (MySQL 8.0.20+) — represent as JSON string.
            // NOTE(review): the diff payload is dropped; applying it would need
            // the before image, which is not available at this level.
            ColumnValue::Json("{}".to_string())
        }
    }
}
356
/// Converts a `mysql_common::value::Value` to our `ColumnValue` enum.
///
/// Mapping notes:
/// * `Bytes` becomes `String` when the payload is valid UTF-8, otherwise `Bytes`.
/// * `Date` with an all-zero time part becomes `ColumnValue::Date`, anything
///   else `ColumnValue::DateTime`. The column type is not consulted, so a
///   DATETIME value at exactly midnight is reported as `Date`.
/// * `Time` folds days into hours and clamps the hour count to 838. The sign is
///   carried on the hour field only, so a negative value with zero total hours
///   (e.g. `-00:30:00`) comes out with hour `0` and its sign is not
///   representable in `ColumnValue::Time`.
#[cfg(feature = "mysql-cdc")]
fn mysql_value_to_column_value(val: &mysql_async::Value) -> ColumnValue {
    match val {
        mysql_async::Value::NULL => ColumnValue::Null,
        mysql_async::Value::Int(v) => ColumnValue::SignedInt(*v),
        mysql_async::Value::UInt(v) => ColumnValue::UnsignedInt(*v),
        mysql_async::Value::Float(v) => ColumnValue::Float(*v),
        mysql_async::Value::Double(v) => ColumnValue::Double(*v),
        mysql_async::Value::Bytes(b) => {
            // Validate the borrowed bytes first so the payload is copied exactly
            // once on either path (no clone-then-discard on invalid UTF-8).
            match std::str::from_utf8(b) {
                Ok(s) => ColumnValue::String(s.to_owned()),
                Err(_) => ColumnValue::Bytes(b.clone()),
            }
        }
        mysql_async::Value::Date(year, month, day, hour, min, sec, micro) => {
            if *hour == 0 && *min == 0 && *sec == 0 && *micro == 0 {
                ColumnValue::Date(i32::from(*year), u32::from(*month), u32::from(*day))
            } else {
                ColumnValue::DateTime(
                    i32::from(*year),
                    u32::from(*month),
                    u32::from(*day),
                    u32::from(*hour),
                    u32::from(*min),
                    u32::from(*sec),
                    *micro,
                )
            }
        }
        mysql_async::Value::Time(is_negative, days, hours, minutes, seconds, micros) => {
            // MySQL TIME is bounded to ±838:59:59. Compute in i64 and clamp a
            // corrupt/garbage wire value to that domain so it can't overflow,
            // sign-flip, or masquerade as a fabricated multi-million-hour value.
            let total_hours_wide = i64::from(*days) * 24 + i64::from(*hours);
            let total_hours = i32::try_from(total_hours_wide.min(838)).unwrap_or(838);
            let h = if *is_negative {
                -total_hours
            } else {
                total_hours
            };
            ColumnValue::Time(h, u32::from(*minutes), u32::from(*seconds), *micros)
        }
    }
}
403
/// Builds one `MySqlColumn` per column declared in a `TableMapEvent`.
///
/// Per column:
/// * the raw type id, or `0xFF` when the type cannot be read,
/// * nullability from the event's null bitmask (treated as nullable when the
///   bit is missing),
/// * the name from the optional metadata, or `col_<index>` when no name is
///   available for that index.
#[cfg(feature = "mysql-cdc")]
fn extract_columns_from_tme(
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Vec<MySqlColumn> {
    let nullability = tme.null_bitmask();

    // Names arrive in column order, so they are consumed in lock-step with the
    // index loop below; once exhausted, the positional fallback is used.
    let mut names = extract_column_names(tme).into_iter();

    #[allow(clippy::cast_possible_truncation)]
    let column_count = tme.columns_count() as usize;

    let mut columns = Vec::with_capacity(column_count);
    for idx in 0..column_count {
        let type_id = match tme.get_raw_column_type(idx) {
            Ok(Some(column_type)) => column_type as u8,
            _ => 0xFF,
        };

        let nullable = nullability.get(idx).as_deref().copied().unwrap_or(true);

        let name = names.next().unwrap_or_else(|| format!("col_{idx}"));

        columns.push(MySqlColumn::new(name, type_id, 0, nullable, false));
    }
    columns
}
435
/// Gathers column names from the optional metadata of a TABLE_MAP event.
///
/// Metadata fields and individual names that fail to parse are skipped. The
/// result is empty when the event carries no column-name metadata.
#[cfg(feature = "mysql-cdc")]
fn extract_column_names(tme: &mysql_async::binlog::events::TableMapEvent<'_>) -> Vec<String> {
    use mysql_async::binlog::events::OptionalMetadataField;

    let mut collected = Vec::new();
    for field in tme.iter_optional_meta().flatten() {
        // Only the column-name field is of interest here.
        let OptionalMetadataField::ColumnName(column_names) = field else {
            continue;
        };
        collected.extend(
            column_names
                .iter_names()
                .flatten()
                .map(|entry| entry.name().into_owned()),
        );
    }
    collected
}
450
/// Decodes the after images of a rows event (used for WriteRows events).
///
/// Entries without an after image are skipped. Decoding stops at the first
/// row that fails, which is reported as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_after(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((_, Some(after))) => Some(Ok(binlog_row_to_row_data(&after))),
            Ok((_, None)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
467
/// Decodes the before images of a rows event (used for DeleteRows events).
///
/// Entries without a before image are skipped. Decoding stops at the first
/// row that fails, which is reported as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_before(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((Some(before), _)) => Some(Ok(binlog_row_to_row_data(&before))),
            Ok((None, _)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
484
/// Decodes the before/after image pairs of a rows event (used for UpdateRows events).
///
/// Every decoded entry yields one `UpdateRowData`; a missing image is
/// represented by a `RowData` with no columns. Decoding stops at the first row
/// that fails, which is reported as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_update(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<UpdateRowData>, ConnectorError> {
    // Shared conversion for both sides of the pair.
    let image = |row: Option<BinlogRow>| match row {
        Some(row) => binlog_row_to_row_data(&row),
        None => RowData {
            columns: Vec::new(),
        },
    };

    rows_event
        .rows(tme)
        .map(|decoded| {
            decoded
                .map(|(before, after)| UpdateRowData {
                    before: image(before),
                    after: image(after),
                })
                .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))
        })
        .collect()
}
517
/// Converts a `BinlogRow` into `RowData`, one `ColumnValue` per column index.
///
/// An index for which the row has no value is emitted as `ColumnValue::Null`.
#[cfg(feature = "mysql-cdc")]
fn binlog_row_to_row_data(row: &BinlogRow) -> RowData {
    let width = row.len();
    let mut columns = Vec::with_capacity(width);
    for idx in 0..width {
        let value = match row.as_ref(idx) {
            Some(raw) => binlog_value_to_column_value(raw),
            None => ColumnValue::Null,
        };
        columns.push(value);
    }
    RowData { columns }
}
529
/// Builds a `Gtid` from a raw 16-byte source id (interpreted as a UUID) and a GNO.
#[cfg(feature = "mysql-cdc")]
fn sid_bytes_to_gtid(sid: [u8; 16], gno: u64) -> Gtid {
    Gtid::new(uuid::Uuid::from_bytes(sid), gno)
}
536
/// Converts our `GtidSet` to mysql_async `Sid` items for `BinlogStreamRequest`.
///
/// One `Sid` is produced per source id in the set, carrying one interval per
/// range. An empty set yields an empty vector.
#[cfg(feature = "mysql-cdc")]
fn gtid_set_to_sids(gtid_set: &GtidSet) -> Vec<Sid<'static>> {
    let mut sids = Vec::new();

    for (source_id, ranges) in gtid_set.iter_sets() {
        let uuid_bytes = source_id.into_bytes();
        let mut sid = Sid::new(uuid_bytes);

        for range in ranges {
            // GnoInterval is [start, end) in mysql_async but our GtidRange is [start, end] inclusive.
            // Saturate instead of `+ 1`: an end of `u64::MAX` would otherwise
            // panic in debug builds and wrap to 0 (an inverted interval) in release.
            let exclusive_end = range.end.saturating_add(1);
            let interval = GnoInterval::new(range.start, exclusive_end);
            sid = sid.with_interval(interval);
        }

        sids.push(sid);
    }

    sids
}
557
/// Maps an `SslMode` to the `mysql_async` TLS options used for it.
///
/// * `Disabled` – `None`.
/// * `Preferred` – default options with invalid certificates accepted.
/// * `Required` / `VerifyCa` / `VerifyIdentity` – default options.
///
/// This is a helper for testing; the main `connect()` function handles SSL inline.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_ssl_opts(ssl_mode: &SslMode) -> Option<mysql_async::SslOpts> {
    let defaults = mysql_async::SslOpts::default();
    match ssl_mode {
        SslMode::Disabled => None,
        SslMode::Preferred => Some(defaults.with_danger_accept_invalid_certs(true)),
        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => Some(defaults),
    }
}
574
/// Assembles an `OptsBuilder` from our config (for testing).
///
/// Host, port and user are always set and `prefer_socket` is `false`.
/// Password, default database and TLS options are applied when the config
/// provides them.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_opts(config: &MySqlCdcConfig) -> OptsBuilder {
    // Passing `None` to the optional setters leaves the builder defaults in
    // place, which matches not calling them.
    OptsBuilder::default()
        .ip_or_hostname(&config.host)
        .tcp_port(config.port)
        .user(Some(&config.username))
        .prefer_socket(Some(false))
        .pass(config.password.as_deref())
        .db_name(config.database.as_deref())
        .ssl_opts(build_ssl_opts(&config.ssl_mode))
}
599
/// Assembles a `BinlogStreamRequest` from our config (for testing).
///
/// * GTID mode (`config.use_gtid`): GTID is enabled and `gtid_set` is attached
///   when it yields at least one SID.
/// * Otherwise `position` is used when given, else `config.binlog_filename`
///   with `config.binlog_position` (defaulting to 4), else the request carries
///   no explicit file or position.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_binlog_request<'a>(
    config: &'a MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&'a BinlogPosition>,
) -> BinlogStreamRequest<'a> {
    let base = BinlogStreamRequest::new(config.server_id);

    if config.use_gtid {
        let gtid_request = base.with_gtid();
        return match gtid_set.map(gtid_set_to_sids) {
            Some(sids) if !sids.is_empty() => gtid_request.with_gtid_set(sids),
            _ => gtid_request,
        };
    }

    match (position, config.binlog_filename.as_deref()) {
        (Some(resume), _) => base
            .with_filename(resume.filename.as_bytes())
            .with_pos(resume.position),
        (None, Some(filename)) => base
            .with_filename(filename.as_bytes())
            .with_pos(config.binlog_position.unwrap_or(4)),
        (None, None) => base,
    }
}
630
631// ============================================================================
632// Tests (require mysql-cdc feature)
633// ============================================================================
634
#[cfg(all(test, feature = "mysql-cdc"))]
mod tests {
    use super::*;
    use mysql_async::Opts;

    /// Baseline config shared by the option-building tests.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("secret".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    #[test]
    fn test_connect_builds_opts() {
        let config = test_config();
        let opts_builder = build_opts(&config);

        // Convert to Opts to verify values.
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "localhost");
        assert_eq!(opts.tcp_port(), 3306);
        assert_eq!(opts.user(), Some("root"));
        assert_eq!(opts.pass(), Some("secret"));
        assert_eq!(opts.db_name(), Some("testdb"));
    }

    #[test]
    fn test_connect_builds_opts_no_password() {
        let config = MySqlCdcConfig {
            host: "dbhost".to_string(),
            port: 3307,
            database: None,
            username: "repl".to_string(),
            password: None,
            server_id: 999,
            ..Default::default()
        };
        let opts_builder = build_opts(&config);
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "dbhost");
        assert_eq!(opts.tcp_port(), 3307);
        assert_eq!(opts.user(), Some("repl"));
        assert_eq!(opts.pass(), None);
        assert_eq!(opts.db_name(), None);
    }

    #[test]
    fn test_ssl_opts_disabled() {
        let ssl = build_ssl_opts(&SslMode::Disabled);
        assert!(ssl.is_none());
    }

    #[test]
    fn test_ssl_opts_preferred() {
        let ssl = build_ssl_opts(&SslMode::Preferred);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_required() {
        let ssl = build_ssl_opts(&SslMode::Required);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_verify_ca() {
        let ssl = build_ssl_opts(&SslMode::VerifyCa);
        assert!(ssl.is_some());
        // The verifying modes must never relax certificate validation.
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_verify_identity() {
        let ssl = build_ssl_opts(&SslMode::VerifyIdentity);
        assert!(ssl.is_some());
        // The verifying modes must never relax certificate validation.
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_binlog_request_gtid_mode() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: true,
            ..Default::default()
        };

        // Without GTID set — should still work.
        let _request = build_binlog_request(&config, None, None);

        // With GTID set.
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap();
        let _request = build_binlog_request(&config, Some(&gtid_set), None);
    }

    #[test]
    fn test_binlog_request_file_position() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            ..Default::default()
        };

        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
        let _request = build_binlog_request(&config, None, Some(&pos));
    }

    #[test]
    fn test_binlog_request_config_filename() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            binlog_filename: Some("mysql-bin.000001".to_string()),
            binlog_position: Some(154),
            ..Default::default()
        };

        let _request = build_binlog_request(&config, None, None);
    }

    #[test]
    fn test_mysql_value_to_column_value_null() {
        let val = mysql_value_to_column_value(&mysql_async::Value::NULL);
        assert_eq!(val, ColumnValue::Null);
    }

    #[test]
    fn test_mysql_value_to_column_value_int() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Int(-42));
        assert_eq!(val, ColumnValue::SignedInt(-42));

        let val = mysql_value_to_column_value(&mysql_async::Value::UInt(100));
        assert_eq!(val, ColumnValue::UnsignedInt(100));
    }

    #[test]
    fn test_mysql_value_to_column_value_float() {
        // Exactly representable values that do not resemble PI / E, so the
        // deny-by-default `clippy::approx_constant` lint stays quiet.
        let val = mysql_value_to_column_value(&mysql_async::Value::Float(1.5));
        assert_eq!(val, ColumnValue::Float(1.5));

        let val = mysql_value_to_column_value(&mysql_async::Value::Double(2.25));
        assert_eq!(val, ColumnValue::Double(2.25));
    }

    #[test]
    fn test_mysql_value_to_column_value_string() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(b"hello world".to_vec()));
        assert_eq!(val, ColumnValue::String("hello world".to_string()));
    }

    #[test]
    fn test_mysql_value_to_column_value_empty_bytes() {
        // An empty payload is valid UTF-8 and maps to an empty string.
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(Vec::new()));
        assert_eq!(val, ColumnValue::String(String::new()));
    }

    #[test]
    fn test_mysql_value_to_column_value_bytes() {
        // Non-UTF8 bytes should map to Bytes.
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(vec![0xFF, 0xFE, 0xFD]));
        assert_eq!(val, ColumnValue::Bytes(vec![0xFF, 0xFE, 0xFD]));
    }

    #[test]
    fn test_mysql_value_to_column_value_date() {
        // Date only (no time component).
        let val = mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 0, 0, 0, 0));
        assert_eq!(val, ColumnValue::Date(2024, 6, 15));

        // DateTime (has time component).
        let val =
            mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0));
    }

    #[test]
    fn test_mysql_value_to_column_value_time() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 0, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::Time(14, 30, 45, 0));

        // Negative time.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 0, 2, 30, 0, 0));
        assert_eq!(val, ColumnValue::Time(-2, 30, 0, 0));

        // Time with days.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1, 6, 0, 0, 0));
        assert_eq!(val, ColumnValue::Time(30, 0, 0, 0));
    }

    #[test]
    fn test_mysql_value_to_column_value_time_clamped() {
        // Out-of-domain hour counts are clamped to the MySQL TIME bound of 838.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1000, 0, 0, 0, 0));
        assert_eq!(val, ColumnValue::Time(838, 0, 0, 0));

        let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 1000, 0, 0, 0, 0));
        assert_eq!(val, ColumnValue::Time(-838, 0, 0, 0));
    }

    #[test]
    fn test_sid_bytes_to_gtid() {
        let uuid = uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap();
        let gtid = sid_bytes_to_gtid(uuid.into_bytes(), 42);
        assert_eq!(gtid.source_id(), uuid);
        assert_eq!(gtid.transaction_id(), 42);
    }

    #[test]
    fn test_gtid_set_to_sids_empty() {
        let gtid_set = GtidSet::new();
        let sids = gtid_set_to_sids(&gtid_set);
        assert!(sids.is_empty());
    }

    #[test]
    fn test_gtid_set_to_sids_single() {
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10".parse().unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 1);
    }

    #[test]
    fn test_gtid_set_to_sids_multiple() {
        let gtid_set: GtidSet =
            "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10,4E11FA47-71CA-11E1-9E33-C80AA9429563:1-5"
                .parse()
                .unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 2);
    }
}
857}