Skip to main content

laminar_connectors/cdc/mysql/
mysql_io.rs

//! MySQL CDC binlog I/O.
2
3#[cfg(feature = "mysql-cdc")]
4use std::time::Duration;
5
6#[cfg(feature = "mysql-cdc")]
7use tokio_stream::StreamExt as _;
8
9#[cfg(feature = "mysql-cdc")]
10use mysql_async::binlog::events::{Event as MysqlEvent, EventData, RowsEventData};
11#[cfg(feature = "mysql-cdc")]
12use mysql_async::binlog::row::BinlogRow;
13#[cfg(feature = "mysql-cdc")]
14use mysql_async::binlog::value::BinlogValue;
15#[cfg(feature = "mysql-cdc")]
16use mysql_async::{BinlogStream, BinlogStreamRequest, Conn, GnoInterval, OptsBuilder, Sid};
17
18#[cfg(feature = "mysql-cdc")]
19use tracing::{debug, info};
20
21#[cfg(feature = "mysql-cdc")]
22use crate::error::ConnectorError;
23
24#[cfg(feature = "mysql-cdc")]
25use super::config::{MySqlCdcConfig, SslMode};
26#[cfg(feature = "mysql-cdc")]
27use super::decoder::{
28    BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DeleteMessage,
29    InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage, UpdateMessage,
30    UpdateRowData,
31};
32#[cfg(feature = "mysql-cdc")]
33use super::gtid::{Gtid, GtidSet};
34#[cfg(feature = "mysql-cdc")]
35use super::types::MySqlColumn;
36
/// Connects to a MySQL server using the provided configuration.
///
/// Connection options (host, port, credentials, default database and TLS settings)
/// are assembled by [`build_opts`], so the options covered by the unit tests are
/// exactly the ones used for real connections.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the connection cannot be established.
#[cfg(feature = "mysql-cdc")]
pub async fn connect(config: &MySqlCdcConfig) -> Result<Conn, ConnectorError> {
    info!(
        host = %config.host,
        port = config.port,
        user = %config.username,
        ssl_mode = %config.ssl_mode,
        "connecting to MySQL server"
    );

    // Single source of truth for connection options: previously this function
    // duplicated `build_opts` line by line, which let the two drift apart silently.
    let opts = build_opts(config);

    let conn = Conn::new(opts).await.map_err(|e| {
        ConnectorError::ConnectionFailed(format!(
            "failed to connect to MySQL {}:{}: {}",
            config.host, config.port, e
        ))
    })?;

    info!(
        host = %config.host,
        port = config.port,
        "connected to MySQL server"
    );

    Ok(conn)
}
94
/// Opens a binlog replication stream on the given connection.
///
/// The connection is taken by value and handed to `get_binlog_stream`.
///
/// The starting point is chosen as follows:
///
/// * `config.use_gtid` set: GTID mode. A non-empty `gtid_set` is sent as the resume
///   point; otherwise replication starts from the beginning.
/// * otherwise, file/position mode: `position` wins if present, then
///   `config.binlog_filename` (with `config.binlog_position`, defaulting to 4),
///   and finally the request is sent without an explicit file/position.
///
/// # Arguments
///
/// * `conn` - MySQL connection (consumed)
/// * `config` - CDC configuration
/// * `gtid_set` - Optional GTID set for GTID-based replication
/// * `position` - Optional binlog position for file/position-based replication
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the binlog stream cannot be started.
#[cfg(feature = "mysql-cdc")]
pub async fn start_binlog_stream(
    conn: Conn,
    config: &MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&BinlogPosition>,
) -> Result<BinlogStream, ConnectorError> {
    let base = BinlogStreamRequest::new(config.server_id);

    let request = if config.use_gtid {
        let gtid_request = base.with_gtid();

        // Only a non-empty set is worth sending; an absent or empty one means
        // "start from the beginning".
        match gtid_set.map(|set| (set, gtid_set_to_sids(set))) {
            Some((set, sids)) if !sids.is_empty() => {
                let resumed = gtid_request.with_gtid_set(sids);
                info!("starting GTID-based binlog replication from {}", set);
                resumed
            }
            _ => {
                info!("starting GTID-based binlog replication from beginning");
                gtid_request
            }
        }
    } else if let Some(pos) = position {
        // Explicit position supplied by the caller takes precedence over config.
        let positioned = base
            .with_filename(pos.filename.as_bytes())
            .with_pos(pos.position);
        info!(
            "starting binlog replication from {}:{}",
            pos.filename, pos.position
        );
        positioned
    } else if let Some(filename) = config.binlog_filename.as_ref() {
        let pos = config.binlog_position.unwrap_or(4);
        let positioned = base.with_filename(filename.as_bytes()).with_pos(pos);
        info!("starting binlog replication from {}:{}", filename, pos);
        positioned
    } else {
        info!("starting binlog replication from current position");
        base
    };

    conn.get_binlog_stream(request).await.map_err(|e| {
        ConnectorError::ConnectionFailed(format!("failed to start binlog stream: {e}"))
    })
}
158
/// Collects a batch of binlog events, bounded by both a count and a time budget.
///
/// Reading stops as soon as `max_events` events have been gathered, the overall
/// `timeout` has elapsed, or the stream reports that it has ended. Whatever was
/// read up to that point is returned, so the batch may be empty.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the stream yields an error. Events read
/// earlier in the same call are not returned in that case.
#[cfg(feature = "mysql-cdc")]
pub async fn read_events(
    stream: &mut BinlogStream,
    max_events: usize,
    timeout: Duration,
) -> Result<Vec<MysqlEvent>, ConnectorError> {
    let cutoff = tokio::time::Instant::now() + timeout;
    // Cap the up-front allocation; the vector still grows on demand.
    let mut batch = Vec::with_capacity(max_events.min(64));

    while batch.len() < max_events {
        let budget = cutoff.saturating_duration_since(tokio::time::Instant::now());
        if budget.is_zero() {
            break;
        }

        let Ok(next) = tokio::time::timeout(budget, stream.next()).await else {
            // Time budget exhausted while waiting - hand back what we have.
            break;
        };

        match next {
            Some(Ok(event)) => batch.push(event),
            Some(Err(e)) => {
                // The stream itself failed while reading.
                return Err(ConnectorError::ReadError(format!(
                    "binlog stream error: {e}"
                )));
            }
            None => {
                // Stream ended (server disconnected or NON_BLOCK mode).
                debug!("binlog stream ended");
                break;
            }
        }
    }

    Ok(batch)
}
209
/// Translates a raw `mysql_async` binlog event into a `BinlogMessage`.
///
/// Yields `Ok(None)` when the event carries no parsed payload or is of a type this
/// connector does not consume (e.g., FORMAT_DESCRIPTION, PREVIOUS_GTIDS).
///
/// # Arguments
///
/// * `event` - The raw binlog event from mysql_async
/// * `stream` - The binlog stream; row events look up their TABLE_MAP via `get_tme()`
///
/// # Errors
///
/// Returns `ConnectorError::Internal` if the event payload cannot be parsed, if a row
/// event refers to a table id with no TABLE_MAP available, or if a row fails to decode.
#[cfg(feature = "mysql-cdc")]
pub fn decode_binlog_event(
    event: &MysqlEvent,
    stream: &BinlogStream,
) -> Result<Option<BinlogMessage>, ConnectorError> {
    let header = event.header();
    let binlog_position = u64::from(header.log_pos());
    // Header timestamp is multiplied by 1000 to populate the `timestamp_ms` fields.
    let timestamp_ms = i64::from(header.timestamp()) * 1000;

    let parsed = event
        .read_data()
        .map_err(|e| ConnectorError::Internal(format!("failed to parse binlog event: {e}")))?;
    let Some(payload) = parsed else {
        return Ok(None);
    };

    let message = match payload {
        EventData::TableMapEvent(tme) => BinlogMessage::TableMap(TableMapMessage {
            table_id: tme.table_id(),
            database: tme.database_name().into_owned(),
            table: tme.table_name().into_owned(),
            columns: extract_columns_from_tme(&tme),
        }),

        EventData::RowsEvent(rows_event) => {
            let table_id = rows_event.table_id();

            // Row images can only be decoded against the matching TABLE_MAP,
            // which is looked up on the stream.
            let Some(tme) = stream.get_tme(table_id) else {
                return Err(ConnectorError::Internal(format!(
                    "missing TABLE_MAP for table_id {table_id}"
                )));
            };

            let database = tme.database_name().into_owned();
            let table = tme.table_name().into_owned();

            match &rows_event {
                RowsEventData::WriteRowsEvent(_) | RowsEventData::WriteRowsEventV1(_) => {
                    BinlogMessage::Insert(InsertMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_after(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }

                RowsEventData::UpdateRowsEvent(_)
                | RowsEventData::UpdateRowsEventV1(_)
                | RowsEventData::PartialUpdateRowsEvent(_) => {
                    BinlogMessage::Update(UpdateMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_update(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }

                RowsEventData::DeleteRowsEvent(_) | RowsEventData::DeleteRowsEventV1(_) => {
                    BinlogMessage::Delete(DeleteMessage {
                        table_id,
                        database,
                        table,
                        rows: decode_rows_before(&rows_event, tme)?,
                        binlog_position,
                        timestamp_ms,
                    })
                }
            }
        }

        EventData::QueryEvent(qe) => BinlogMessage::Query(QueryMessage {
            database: qe.schema().into_owned(),
            query: qe.query().into_owned(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::GtidEvent(ge) => BinlogMessage::Begin(BeginMessage {
            gtid: Some(sid_bytes_to_gtid(ge.sid(), ge.gno())),
            // Left empty here; filled by caller from stream state.
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::AnonymousGtidEvent(_) => BinlogMessage::Begin(BeginMessage {
            gtid: None,
            binlog_filename: String::new(),
            binlog_position,
            timestamp_ms,
        }),

        EventData::RotateEvent(re) => BinlogMessage::Rotate(RotateMessage {
            next_binlog: re.name().into_owned(),
            position: re.position(),
        }),

        EventData::XidEvent(xe) => BinlogMessage::Commit(CommitMessage {
            xid: xe.xid,
            binlog_position,
            timestamp_ms,
        }),

        EventData::HeartbeatEvent => BinlogMessage::Heartbeat,

        // Everything else (FORMAT_DESCRIPTION, PREVIOUS_GTIDS, etc.) is skipped.
        _ => return Ok(None),
    };

    Ok(Some(message))
}
343
/// Maps a `mysql_async` `BinlogValue` onto our `ColumnValue` enum.
///
/// Plain values are delegated to `mysql_value_to_column_value`; both JSON variants
/// become `ColumnValue::Json`.
#[cfg(feature = "mysql-cdc")]
fn binlog_value_to_column_value(val: &BinlogValue<'_>) -> ColumnValue {
    match val {
        BinlogValue::Value(plain) => mysql_value_to_column_value(plain),
        BinlogValue::Jsonb(doc) => {
            // NOTE(review): this stores the `Debug` rendering of the JSONB value, which
            // is presumably not valid JSON text - confirm what consumers expect.
            let rendered = format!("{doc:?}");
            ColumnValue::Json(rendered)
        }
        // Partial JSON updates (MySQL 8.0.20+) are emitted as an empty JSON object;
        // the diff contents themselves are not carried over.
        BinlogValue::JsonDiff(_) => ColumnValue::Json("{}".to_string()),
    }
}
356
/// Converts a `mysql_common::value::Value` to our `ColumnValue` enum.
///
/// Mapping rules:
///
/// * `Bytes` becomes `ColumnValue::String` when the payload is valid UTF-8 and
///   `ColumnValue::Bytes` otherwise.
/// * `Date` with an all-zero time part becomes `ColumnValue::Date`; any non-zero
///   hour/minute/second/microsecond yields `ColumnValue::DateTime`.
/// * `Time` folds days into hours and carries the sign on the hour component.
#[cfg(feature = "mysql-cdc")]
fn mysql_value_to_column_value(val: &mysql_async::Value) -> ColumnValue {
    match val {
        mysql_async::Value::NULL => ColumnValue::Null,
        mysql_async::Value::Int(v) => ColumnValue::SignedInt(*v),
        mysql_async::Value::UInt(v) => ColumnValue::UnsignedInt(*v),
        mysql_async::Value::Float(v) => ColumnValue::Float(*v),
        mysql_async::Value::Double(v) => ColumnValue::Double(*v),
        mysql_async::Value::Bytes(b) => {
            // Try to interpret as UTF-8 string first. On failure the buffer is
            // recovered from the error, so the payload is cloned exactly once.
            match String::from_utf8(b.clone()) {
                Ok(s) => ColumnValue::String(s),
                Err(e) => ColumnValue::Bytes(e.into_bytes()),
            }
        }
        mysql_async::Value::Date(year, month, day, hour, min, sec, micro) => {
            // NOTE(review): a value whose time part is exactly midnight is reported as
            // `Date` regardless of the column's declared type - confirm this is intended.
            if *hour == 0 && *min == 0 && *sec == 0 && *micro == 0 {
                ColumnValue::Date(i32::from(*year), u32::from(*month), u32::from(*day))
            } else {
                ColumnValue::DateTime(
                    i32::from(*year),
                    u32::from(*month),
                    u32::from(*day),
                    u32::from(*hour),
                    u32::from(*min),
                    u32::from(*sec),
                    *micro,
                )
            }
        }
        mysql_async::Value::Time(is_negative, days, hours, minutes, seconds, micros) => {
            // Convert days+hours to total hours. `days` is a full u32, so the
            // arithmetic saturates instead of wrapping (release) or panicking (debug)
            // on out-of-range input.
            let total_hours = i32::try_from(*days)
                .unwrap_or(i32::MAX)
                .saturating_mul(24)
                .saturating_add(i32::from(*hours));
            // `total_hours` is never negative here, so negating it cannot overflow.
            // NOTE(review): a negative time shorter than one hour has `total_hours == 0`,
            // so its sign cannot be represented in the hour component.
            let h = if *is_negative {
                -total_hours
            } else {
                total_hours
            };
            ColumnValue::Time(h, u32::from(*minutes), u32::from(*seconds), *micros)
        }
    }
}
402
/// Builds one `MySqlColumn` per column described by a `TableMapEvent`.
///
/// Column names come from the event's optional metadata when present; columns
/// without a name there fall back to `col_{index}`. A column whose raw type cannot
/// be read gets the type id `0xFF`, and a column missing from the null bitmask is
/// treated as nullable.
#[cfg(feature = "mysql-cdc")]
fn extract_columns_from_tme(
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Vec<MySqlColumn> {
    let nullability = tme.null_bitmask();

    // Names are consumed in column order, so a plain iterator lines up with the index.
    let mut metadata_names = extract_column_names(tme).into_iter();

    #[allow(clippy::cast_possible_truncation)]
    let column_total = tme.columns_count() as usize;

    (0..column_total)
        .map(|idx| {
            let name = metadata_names
                .next()
                .unwrap_or_else(|| format!("col_{idx}"));

            let nullable = nullability.get(idx).as_deref().copied().unwrap_or(true);

            let type_id = tme
                .get_raw_column_type(idx)
                .ok()
                .flatten()
                .map_or(0xFF, |ct| ct as u8);

            MySqlColumn::new(name, type_id, 0, nullable, false)
        })
        .collect()
}
434
/// Gathers column names from the optional metadata of a TABLE_MAP event.
///
/// Metadata fields and names that fail to parse are skipped. The result is empty
/// when the event carries no column-name metadata.
#[cfg(feature = "mysql-cdc")]
fn extract_column_names(tme: &mysql_async::binlog::events::TableMapEvent<'_>) -> Vec<String> {
    use mysql_async::binlog::events::OptionalMetadataField;

    let mut collected = Vec::new();
    for field in tme.iter_optional_meta().flatten() {
        // Only the column-name field is of interest here.
        let OptionalMetadataField::ColumnName(column_names) = field else {
            continue;
        };
        collected.extend(
            column_names
                .iter_names()
                .flatten()
                .map(|entry| entry.name().into_owned()),
        );
    }
    collected
}
449
/// Decodes the after images of a rows event (used for WriteRows events).
///
/// Rows without an after image are skipped. Decoding stops at the first row that
/// fails, which is reported as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_after(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((_, Some(after))) => Some(Ok(binlog_row_to_row_data(&after))),
            Ok((_, None)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
466
/// Decodes the before images of a rows event (used for DeleteRows events).
///
/// Rows without a before image are skipped. Decoding stops at the first row that
/// fails, which is reported as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_before(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<RowData>, ConnectorError> {
    rows_event
        .rows(tme)
        .filter_map(|decoded| match decoded {
            Ok((Some(before), _)) => Some(Ok(binlog_row_to_row_data(&before))),
            Ok((None, _)) => None,
            Err(e) => Some(Err(ConnectorError::Internal(format!(
                "failed to decode row: {e}"
            )))),
        })
        .collect()
}
483
/// Decodes the before/after image pairs of an UpdateRows event.
///
/// Every decoded row produces one `UpdateRowData`; an image that is absent is
/// represented by a `RowData` with no columns. Decoding stops at the first row that
/// fails, which is reported as `ConnectorError::Internal`.
#[cfg(feature = "mysql-cdc")]
fn decode_rows_update(
    rows_event: &RowsEventData<'_>,
    tme: &mysql_async::binlog::events::TableMapEvent<'_>,
) -> Result<Vec<UpdateRowData>, ConnectorError> {
    // Shared conversion for either side of the pair.
    let to_image = |image: Option<BinlogRow>| match image {
        Some(row) => binlog_row_to_row_data(&row),
        None => RowData {
            columns: Vec::new(),
        },
    };

    rows_event
        .rows(tme)
        .map(|decoded| {
            decoded
                .map(|(before, after)| UpdateRowData {
                    before: to_image(before),
                    after: to_image(after),
                })
                .map_err(|e| ConnectorError::Internal(format!("failed to decode row: {e}")))
        })
        .collect()
}
516
/// Turns a `BinlogRow` into our `RowData`, one `ColumnValue` per column.
///
/// A column for which the row has no value is emitted as `ColumnValue::Null`.
#[cfg(feature = "mysql-cdc")]
fn binlog_row_to_row_data(row: &BinlogRow) -> RowData {
    let width = row.len();
    let mut columns = Vec::with_capacity(width);
    for idx in 0..width {
        let value = match row.as_ref(idx) {
            Some(raw) => binlog_value_to_column_value(raw),
            None => ColumnValue::Null,
        };
        columns.push(value);
    }
    RowData { columns }
}
528
/// Builds our `Gtid` from a raw 16-byte SID and a GNO.
///
/// The SID bytes are interpreted as a UUID via `Uuid::from_bytes`.
#[cfg(feature = "mysql-cdc")]
fn sid_bytes_to_gtid(sid: [u8; 16], gno: u64) -> Gtid {
    Gtid::new(uuid::Uuid::from_bytes(sid), gno)
}
535
/// Converts our `GtidSet` to mysql_async `Sid` items for `BinlogStreamRequest`.
///
/// One `Sid` is produced per source UUID in the set, carrying one interval per
/// range. An empty set yields an empty vector.
#[cfg(feature = "mysql-cdc")]
fn gtid_set_to_sids(gtid_set: &GtidSet) -> Vec<Sid<'static>> {
    let mut sids = Vec::new();

    for (source_id, ranges) in gtid_set.iter_sets() {
        let uuid_bytes = source_id.into_bytes();
        let mut sid = Sid::new(uuid_bytes);

        for range in ranges {
            // GnoInterval is [start, end) in mysql_async but our GtidRange is [start, end]
            // inclusive. Saturate so a range ending at u64::MAX (e.g. from a malformed
            // checkpoint) cannot overflow and panic.
            let exclusive_end = range.end.saturating_add(1);
            sid = sid.with_interval(GnoInterval::new(range.start, exclusive_end));
        }

        sids.push(sid);
    }

    sids
}
556
/// Maps an `SslMode` onto `mysql_async` SSL options.
///
/// * `Disabled` - no SSL options (`None`).
/// * `Preferred` - SSL options that accept invalid certificates.
/// * `Required`, `VerifyCa`, `VerifyIdentity` - default SSL options.
///
/// NOTE(review): the three strictest modes currently produce identical options -
/// confirm whether `VerifyCa` and `VerifyIdentity` are meant to differ.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_ssl_opts(ssl_mode: &SslMode) -> Option<mysql_async::SslOpts> {
    let accept_invalid_certs = match ssl_mode {
        SslMode::Disabled => return None,
        SslMode::Preferred => true,
        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyIdentity => false,
    };

    Some(mysql_async::SslOpts::default().with_danger_accept_invalid_certs(accept_invalid_certs))
}
573
/// Assembles an `OptsBuilder` from our config.
///
/// Sets host, port and user, disables the socket preference, and adds the
/// password, default database and SSL options only when they are configured.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_opts(config: &MySqlCdcConfig) -> OptsBuilder {
    let base = OptsBuilder::default()
        .ip_or_hostname(&config.host)
        .tcp_port(config.port)
        .user(Some(&config.username))
        .prefer_socket(Some(false));

    let with_password = match config.password.as_ref() {
        Some(password) => base.pass(Some(password)),
        None => base,
    };

    let with_database = match config.database.as_ref() {
        Some(database) => with_password.db_name(Some(database)),
        None => with_password,
    };

    match build_ssl_opts(&config.ssl_mode) {
        Some(ssl_opts) => with_database.ssl_opts(Some(ssl_opts)),
        None => with_database,
    }
}
598
/// Assembles a `BinlogStreamRequest` from our config.
///
/// In GTID mode a non-empty `gtid_set` is attached to the request. Otherwise the
/// explicit `position` is used if given, then `config.binlog_filename` (with
/// `config.binlog_position`, defaulting to 4); with neither, the request carries
/// no file/position.
#[cfg(feature = "mysql-cdc")]
#[must_use]
pub fn build_binlog_request<'a>(
    config: &'a MySqlCdcConfig,
    gtid_set: Option<&GtidSet>,
    position: Option<&'a BinlogPosition>,
) -> BinlogStreamRequest<'a> {
    let base = BinlogStreamRequest::new(config.server_id);

    if config.use_gtid {
        let gtid_request = base.with_gtid();
        let sids = gtid_set.map(gtid_set_to_sids).unwrap_or_default();
        return if sids.is_empty() {
            gtid_request
        } else {
            gtid_request.with_gtid_set(sids)
        };
    }

    match (position, config.binlog_filename.as_ref()) {
        // An explicit position always wins over the configured file name.
        (Some(pos), _) => base
            .with_filename(pos.filename.as_bytes())
            .with_pos(pos.position),
        (None, Some(filename)) => {
            let pos = config.binlog_position.unwrap_or(4);
            base.with_filename(filename.as_bytes()).with_pos(pos)
        }
        (None, None) => base,
    }
}
629
630// ============================================================================
631// Tests (require mysql-cdc feature)
632// ============================================================================
633
#[cfg(all(test, feature = "mysql-cdc"))]
mod tests {
    use super::*;
    use mysql_async::Opts;

    /// Baseline configuration shared by the option-building tests.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("secret".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    #[test]
    fn test_connect_builds_opts() {
        let config = test_config();
        let opts_builder = build_opts(&config);

        // Convert to Opts to verify values.
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "localhost");
        assert_eq!(opts.tcp_port(), 3306);
        assert_eq!(opts.user(), Some("root"));
        assert_eq!(opts.pass(), Some("secret"));
        assert_eq!(opts.db_name(), Some("testdb"));
    }

    #[test]
    fn test_connect_builds_opts_no_password() {
        let config = MySqlCdcConfig {
            host: "dbhost".to_string(),
            port: 3307,
            database: None,
            username: "repl".to_string(),
            password: None,
            server_id: 999,
            ..Default::default()
        };
        let opts_builder = build_opts(&config);
        let opts: Opts = opts_builder.into();

        assert_eq!(opts.ip_or_hostname(), "dbhost");
        assert_eq!(opts.tcp_port(), 3307);
        assert_eq!(opts.user(), Some("repl"));
        assert_eq!(opts.pass(), None);
        assert_eq!(opts.db_name(), None);
    }

    #[test]
    fn test_ssl_opts_disabled() {
        let ssl = build_ssl_opts(&SslMode::Disabled);
        assert!(ssl.is_none());
    }

    #[test]
    fn test_ssl_opts_preferred() {
        let ssl = build_ssl_opts(&SslMode::Preferred);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_required() {
        let ssl = build_ssl_opts(&SslMode::Required);
        assert!(ssl.is_some());
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_verify_ca() {
        let ssl = build_ssl_opts(&SslMode::VerifyCa);
        assert!(ssl.is_some());
        // Verifying modes must never silently accept invalid certificates.
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    #[test]
    fn test_ssl_opts_verify_identity() {
        let ssl = build_ssl_opts(&SslMode::VerifyIdentity);
        assert!(ssl.is_some());
        // Verifying modes must never silently accept invalid certificates.
        let ssl = ssl.unwrap();
        assert!(!ssl.accept_invalid_certs());
    }

    // The three request tests below are smoke tests: they check that building a
    // request does not panic for each configuration shape.

    #[test]
    fn test_binlog_request_gtid_mode() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: true,
            ..Default::default()
        };

        // Without GTID set — should still work.
        let _request = build_binlog_request(&config, None, None);

        // With GTID set.
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap();
        let _request = build_binlog_request(&config, Some(&gtid_set), None);
    }

    #[test]
    fn test_binlog_request_file_position() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            ..Default::default()
        };

        let pos = BinlogPosition::new("mysql-bin.000003".to_string(), 12345);
        let _request = build_binlog_request(&config, None, Some(&pos));
    }

    #[test]
    fn test_binlog_request_config_filename() {
        let config = MySqlCdcConfig {
            server_id: 999,
            use_gtid: false,
            binlog_filename: Some("mysql-bin.000001".to_string()),
            binlog_position: Some(154),
            ..Default::default()
        };

        let _request = build_binlog_request(&config, None, None);
    }

    #[test]
    fn test_mysql_value_to_column_value_null() {
        let val = mysql_value_to_column_value(&mysql_async::Value::NULL);
        assert_eq!(val, ColumnValue::Null);
    }

    #[test]
    fn test_mysql_value_to_column_value_int() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Int(-42));
        assert_eq!(val, ColumnValue::SignedInt(-42));

        let val = mysql_value_to_column_value(&mysql_async::Value::UInt(100));
        assert_eq!(val, ColumnValue::UnsignedInt(100));
    }

    #[test]
    fn test_mysql_value_to_column_value_float() {
        // Exactly representable values: avoids `clippy::approx_constant` (which fires
        // on literals that look like PI or E) and makes the equality checks exact.
        let val = mysql_value_to_column_value(&mysql_async::Value::Float(1.5));
        assert_eq!(val, ColumnValue::Float(1.5));

        let val = mysql_value_to_column_value(&mysql_async::Value::Double(2.25));
        assert_eq!(val, ColumnValue::Double(2.25));
    }

    #[test]
    fn test_mysql_value_to_column_value_string() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(b"hello world".to_vec()));
        assert_eq!(val, ColumnValue::String("hello world".to_string()));
    }

    #[test]
    fn test_mysql_value_to_column_value_bytes() {
        // Non-UTF8 bytes should map to Bytes.
        let val = mysql_value_to_column_value(&mysql_async::Value::Bytes(vec![0xFF, 0xFE, 0xFD]));
        assert_eq!(val, ColumnValue::Bytes(vec![0xFF, 0xFE, 0xFD]));
    }

    #[test]
    fn test_mysql_value_to_column_value_date() {
        // Date only (no time component).
        let val = mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 0, 0, 0, 0));
        assert_eq!(val, ColumnValue::Date(2024, 6, 15));

        // DateTime (has time component).
        let val =
            mysql_value_to_column_value(&mysql_async::Value::Date(2024, 6, 15, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::DateTime(2024, 6, 15, 14, 30, 45, 0));
    }

    #[test]
    fn test_mysql_value_to_column_value_time() {
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 0, 14, 30, 45, 0));
        assert_eq!(val, ColumnValue::Time(14, 30, 45, 0));

        // Negative time.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(true, 0, 2, 30, 0, 0));
        assert_eq!(val, ColumnValue::Time(-2, 30, 0, 0));

        // Time with days.
        let val = mysql_value_to_column_value(&mysql_async::Value::Time(false, 1, 6, 0, 0, 0));
        assert_eq!(val, ColumnValue::Time(30, 0, 0, 0));
    }

    #[test]
    fn test_sid_bytes_to_gtid() {
        let uuid = uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap();
        let gtid = sid_bytes_to_gtid(uuid.into_bytes(), 42);
        assert_eq!(gtid.source_id(), uuid);
        assert_eq!(gtid.transaction_id(), 42);
    }

    #[test]
    fn test_gtid_set_to_sids_empty() {
        let gtid_set = GtidSet::new();
        let sids = gtid_set_to_sids(&gtid_set);
        assert!(sids.is_empty());
    }

    #[test]
    fn test_gtid_set_to_sids_single() {
        let gtid_set: GtidSet = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10".parse().unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 1);
    }

    #[test]
    fn test_gtid_set_to_sids_multiple() {
        let gtid_set: GtidSet =
            "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10,4E11FA47-71CA-11E1-9E33-C80AA9429563:1-5"
                .parse()
                .unwrap();
        let sids = gtid_set_to_sids(&gtid_set);
        assert_eq!(sids.len(), 2);
    }
}
856}