Skip to main content

laminar_connectors/cdc/mysql/
source.rs

1//! MySQL CDC source connector implementation.
2//!
3//! Implements the [`SourceConnector`] trait for MySQL binlog replication.
4//! This module provides the main entry point for MySQL CDC: [`MySqlCdcSource`].
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use arrow_schema::{Schema, SchemaRef};
11use async_trait::async_trait;
12use tokio::sync::Notify;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::health::HealthStatus;
19use crate::metrics::ConnectorMetrics;
20
21use super::changelog::ChangeEvent;
22use super::config::MySqlCdcConfig;
23use super::decoder::{BinlogMessage, BinlogPosition};
24use super::gtid::GtidSet;
25use super::metrics::MySqlCdcMetrics;
26use super::schema::{cdc_envelope_schema, TableCache, TableInfo};
27
28/// MySQL binlog CDC source connector.
29///
30/// Reads change events from MySQL binary log using replication protocol.
31/// Supports both GTID-based and file/position-based replication.
32///
33/// # Example
34///
35/// ```ignore
36/// use laminar_connectors::cdc::mysql::{MySqlCdcSource, MySqlCdcConfig};
37///
38/// let config = MySqlCdcConfig {
39///     host: "localhost".to_string(),
40///     port: 3306,
41///     username: "replicator".to_string(),
42///     password: "secret".to_string(),
43///     server_id: 12345,
44///     ..Default::default()
45/// };
46///
47/// let mut source = MySqlCdcSource::new(config);
48/// source.open(&ConnectorConfig::default()).await?;
49///
50/// while let Some(batch) = source.poll_batch(1000).await? {
51///     println!("Received {} rows", batch.num_rows());
52/// }
53/// ```
54pub struct MySqlCdcSource {
55    /// Configuration for the MySQL CDC connection.
56    config: MySqlCdcConfig,
57
58    /// Whether the source is currently connected.
59    connected: bool,
60
61    /// Cache of table schemas from TABLE_MAP events.
62    table_cache: TableCache,
63
64    /// Current binlog position (file/position).
65    position: Option<BinlogPosition>,
66
67    /// Current GTID set (for GTID-based replication).
68    gtid_set: Option<GtidSet>,
69
70    /// Current binlog filename (updated by ROTATE events).
71    current_binlog_file: String,
72
73    /// Current GTID string (updated by GTID events within a transaction).
74    current_gtid: Option<String>,
75
76    /// Buffered change events waiting to be emitted.
77    event_buffer: Vec<ChangeEvent>,
78
79    /// Metrics for this source.
80    metrics: MySqlCdcMetrics,
81
82    /// Arrow schema for CDC envelope.
83    schema: Option<SchemaRef>,
84
85    /// Last time we received data (for health checks).
86    last_activity: Option<Instant>,
87
88    /// Notification handle signalled when binlog data arrives from the reader task.
89    data_ready: Arc<Notify>,
90
91    /// Channel receiver for decoded binlog messages from the background reader task.
92    #[cfg(feature = "mysql-cdc")]
93    msg_rx: Option<tokio::sync::mpsc::Receiver<BinlogMessage>>,
94
95    /// Background binlog reader task handle.
96    #[cfg(feature = "mysql-cdc")]
97    reader_handle: Option<tokio::task::JoinHandle<()>>,
98
99    /// Shutdown signal for the background reader task.
100    #[cfg(feature = "mysql-cdc")]
101    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
102}
103
104// Manual Debug impl because BinlogStream doesn't implement Debug.
105impl std::fmt::Debug for MySqlCdcSource {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("MySqlCdcSource")
108            .field("config", &self.config)
109            .field("connected", &self.connected)
110            .field("table_cache", &self.table_cache)
111            .field("position", &self.position)
112            .field("gtid_set", &self.gtid_set)
113            .field("current_binlog_file", &self.current_binlog_file)
114            .field("current_gtid", &self.current_gtid)
115            .field("event_buffer_len", &self.event_buffer.len())
116            .field("metrics", &self.metrics)
117            .field("schema", &self.schema)
118            .field("last_activity", &self.last_activity)
119            .finish_non_exhaustive()
120    }
121}
122
123impl MySqlCdcSource {
124    /// Creates a new MySQL CDC source with the given configuration.
125    #[must_use]
126    pub fn new(config: MySqlCdcConfig) -> Self {
127        Self {
128            config,
129            connected: false,
130            table_cache: TableCache::new(),
131            position: None,
132            gtid_set: None,
133            current_binlog_file: String::new(),
134            current_gtid: None,
135            event_buffer: Vec::new(),
136            metrics: MySqlCdcMetrics::new(),
137            schema: None,
138            last_activity: None,
139            data_ready: Arc::new(Notify::new()),
140            #[cfg(feature = "mysql-cdc")]
141            msg_rx: None,
142            #[cfg(feature = "mysql-cdc")]
143            reader_handle: None,
144            #[cfg(feature = "mysql-cdc")]
145            reader_shutdown: None,
146        }
147    }
148
149    /// Creates a MySQL CDC source from a generic connector config.
150    ///
151    /// # Errors
152    ///
153    /// Returns error if required configuration keys are missing.
154    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
155        let mysql_config = MySqlCdcConfig::from_config(config)?;
156        Ok(Self::new(mysql_config))
157    }
158
159    /// Returns the number of cached table schemas.
160    #[must_use]
161    pub fn cached_table_count(&self) -> usize {
162        self.table_cache.len()
163    }
164
165    /// Returns the current binlog position.
166    #[must_use]
167    pub fn position(&self) -> Option<&BinlogPosition> {
168        self.position.as_ref()
169    }
170
171    /// Returns the current GTID set.
172    #[must_use]
173    pub fn gtid_set(&self) -> Option<&GtidSet> {
174        self.gtid_set.as_ref()
175    }
176
177    /// Returns a reference to the table cache.
178    #[must_use]
179    pub fn table_cache(&self) -> &TableCache {
180        &self.table_cache
181    }
182
183    /// Returns a reference to the metrics.
184    #[must_use]
185    pub fn cdc_metrics(&self) -> &MySqlCdcMetrics {
186        &self.metrics
187    }
188
189    /// Checks if a table should be included based on filters.
190    #[must_use]
191    pub fn should_include_table(&self, database: &str, table: &str) -> bool {
192        self.config.should_include_table(database, table)
193    }
194
195    /// Returns the configuration.
196    #[must_use]
197    pub fn config(&self) -> &MySqlCdcConfig {
198        &self.config
199    }
200
201    /// Returns whether the source is connected.
202    #[must_use]
203    pub fn is_connected(&self) -> bool {
204        self.connected
205    }
206
207    /// Restores the position from a checkpoint.
208    ///
209    /// Parses the checkpoint offset to extract GTID set or file/position.
210    pub fn restore_position(&mut self, checkpoint: &SourceCheckpoint) {
211        // Try GTID from offset key first
212        if let Some(gtid_str) = checkpoint.get_offset("gtid") {
213            if let Ok(gtid_set) = gtid_str.parse::<GtidSet>() {
214                self.gtid_set = Some(gtid_set);
215                return;
216            }
217        }
218
219        // Try binlog file/position
220        if let (Some(filename), Some(pos_str)) = (
221            checkpoint.get_offset("binlog_file"),
222            checkpoint.get_offset("binlog_position"),
223        ) {
224            if let Ok(pos) = pos_str.parse::<u64>() {
225                self.position = Some(BinlogPosition::new(filename.to_string(), pos));
226            }
227        }
228    }
229
230    /// Creates a checkpoint representing the current position.
231    #[must_use]
232    pub fn create_checkpoint(&self) -> SourceCheckpoint {
233        let mut checkpoint = SourceCheckpoint::new(0);
234
235        if self.config.use_gtid {
236            if let Some(ref gtid_set) = self.gtid_set {
237                checkpoint.set_offset("gtid", gtid_set.to_string());
238            }
239        } else if let Some(ref pos) = self.position {
240            checkpoint.set_offset("binlog_file", &pos.filename);
241            checkpoint.set_offset("binlog_position", pos.position.to_string());
242        }
243
244        checkpoint.set_metadata("server_id", self.config.server_id.to_string());
245
246        checkpoint
247    }
248
249    /// Builds the CDC envelope schema based on a table schema.
250    #[allow(clippy::unused_self)] // Will use self for config options
251    fn build_envelope_schema(&self, table_schema: &Schema) -> SchemaRef {
252        Arc::new(cdc_envelope_schema(table_schema))
253    }
254
255    /// Flushes buffered events to a RecordBatch.
256    ///
257    /// # Errors
258    ///
259    /// Returns error if batch conversion fails.
260    pub fn flush_events(
261        &mut self,
262        table_info: &TableInfo,
263    ) -> Result<Option<RecordBatch>, ConnectorError> {
264        if self.event_buffer.is_empty() {
265            return Ok(None);
266        }
267
268        let events: Vec<_> = self.event_buffer.drain(..).collect();
269        let batch = super::changelog::events_to_record_batch(&events, table_info)
270            .map_err(|e| ConnectorError::Internal(e.to_string()))?;
271        Ok(Some(batch))
272    }
273}
274
275#[async_trait]
276#[allow(clippy::too_many_lines)]
277impl SourceConnector for MySqlCdcSource {
278    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
279        // Parse and update config if provided
280        if !config.properties().is_empty() {
281            self.config = MySqlCdcConfig::from_config(config)?;
282        }
283
284        // Validate configuration
285        self.config.validate()?;
286
287        // Initialize GTID set from config
288        self.gtid_set.clone_from(&self.config.gtid_set);
289
290        // Initialize binlog position from config
291        if let Some(ref filename) = self.config.binlog_filename {
292            self.current_binlog_file.clone_from(filename);
293            if let Some(pos) = self.config.binlog_position {
294                self.position = Some(BinlogPosition::new(filename.clone(), pos));
295            }
296        }
297
298        // Without mysql-cdc feature, open() must fail loudly to prevent
299        // silent data loss (poll_batch would return Ok(None) forever).
300        #[cfg(not(feature = "mysql-cdc"))]
301        {
302            return Err(ConnectorError::ConfigurationError(
303                "MySQL CDC source requires the `mysql-cdc` feature flag. \
304                 Rebuild with `--features mysql-cdc` to enable."
305                    .to_string(),
306            ));
307        }
308
309        // When mysql-cdc feature is enabled, establish a real connection
310        // and spawn a background reader task for event-driven wake-up.
311        #[cfg(feature = "mysql-cdc")]
312        {
313            let conn = super::mysql_io::connect(&self.config).await?;
314            let stream = super::mysql_io::start_binlog_stream(
315                conn,
316                &self.config,
317                self.gtid_set.as_ref(),
318                self.position.as_ref(),
319            )
320            .await?;
321
322            let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(4096);
323            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
324            let data_ready = Arc::clone(&self.data_ready);
325
326            let reader_handle = tokio::spawn(async move {
327                use tokio_stream::StreamExt as _;
328                let mut stream = stream;
329                loop {
330                    let event = tokio::select! {
331                        biased;
332                        _ = shutdown_rx.changed() => break,
333                        event = stream.next() => event,
334                    };
335                    match event {
336                        Some(Ok(raw_event)) => {
337                            match super::mysql_io::decode_binlog_event(&raw_event, &stream) {
338                                Ok(Some(msg)) => {
339                                    if msg_tx.send(msg).await.is_err() {
340                                        break;
341                                    }
342                                    data_ready.notify_one();
343                                }
344                                Ok(None) => {}
345                                Err(e) => {
346                                    tracing::warn!(error = %e, "binlog decode error");
347                                    break;
348                                }
349                            }
350                        }
351                        Some(Err(e)) => {
352                            tracing::warn!(error = %e, "binlog stream error");
353                            break;
354                        }
355                        None => break,
356                    }
357                }
358                if let Err(e) = stream.close().await {
359                    tracing::warn!(error = %e, "error closing binlog stream");
360                }
361            });
362
363            self.msg_rx = Some(msg_rx);
364            self.reader_handle = Some(reader_handle);
365            self.reader_shutdown = Some(shutdown_tx);
366        }
367
368        self.connected = true;
369        self.last_activity = Some(Instant::now());
370
371        Ok(())
372    }
373
374    async fn poll_batch(
375        &mut self,
376        max_records: usize,
377    ) -> Result<Option<SourceBatch>, ConnectorError> {
378        if !self.connected {
379            return Err(ConnectorError::ConfigurationError(
380                "Source not connected".to_string(),
381            ));
382        }
383
384        // Drain decoded binlog messages from background reader task.
385        #[cfg(feature = "mysql-cdc")]
386        {
387            if let Some(rx) = self.msg_rx.as_mut() {
388                let mut last_table_info: Option<TableInfo> = None;
389
390                while self.event_buffer.len() < max_records {
391                    match rx.try_recv() {
392                        Ok(msg) => {
393                            self.metrics.inc_events_received();
394                            match msg {
395                                BinlogMessage::TableMap(tme) => {
396                                    self.metrics.inc_table_maps();
397                                    self.table_cache.update(&tme);
398                                }
399                                BinlogMessage::Insert(insert_msg) => {
400                                    if !self.config.should_include_table(
401                                        &insert_msg.database,
402                                        &insert_msg.table,
403                                    ) {
404                                        continue;
405                                    }
406                                    let row_count = insert_msg.rows.len() as u64;
407                                    let events = super::changelog::insert_to_events(
408                                        &insert_msg,
409                                        &self.current_binlog_file,
410                                        self.current_gtid.as_deref(),
411                                    );
412                                    self.event_buffer.extend(events);
413                                    self.metrics.inc_inserts(row_count);
414                                    last_table_info =
415                                        self.table_cache.get(insert_msg.table_id).cloned();
416                                }
417                                BinlogMessage::Update(update_msg) => {
418                                    if !self.config.should_include_table(
419                                        &update_msg.database,
420                                        &update_msg.table,
421                                    ) {
422                                        continue;
423                                    }
424                                    let row_count = update_msg.rows.len() as u64;
425                                    let events = super::changelog::update_to_events(
426                                        &update_msg,
427                                        &self.current_binlog_file,
428                                        self.current_gtid.as_deref(),
429                                    );
430                                    self.event_buffer.extend(events);
431                                    self.metrics.inc_updates(row_count);
432                                    last_table_info =
433                                        self.table_cache.get(update_msg.table_id).cloned();
434                                }
435                                BinlogMessage::Delete(delete_msg) => {
436                                    if !self.config.should_include_table(
437                                        &delete_msg.database,
438                                        &delete_msg.table,
439                                    ) {
440                                        continue;
441                                    }
442                                    let row_count = delete_msg.rows.len() as u64;
443                                    let events = super::changelog::delete_to_events(
444                                        &delete_msg,
445                                        &self.current_binlog_file,
446                                        self.current_gtid.as_deref(),
447                                    );
448                                    self.event_buffer.extend(events);
449                                    self.metrics.inc_deletes(row_count);
450                                    last_table_info =
451                                        self.table_cache.get(delete_msg.table_id).cloned();
452                                }
453                                BinlogMessage::Begin(begin_msg) => {
454                                    if let Some(ref gtid) = begin_msg.gtid {
455                                        self.current_gtid = Some(gtid.to_string());
456                                        if let Some(ref mut gtid_set) = self.gtid_set {
457                                            gtid_set.add(gtid);
458                                        }
459                                    } else {
460                                        self.current_gtid = None;
461                                    }
462                                }
463                                BinlogMessage::Commit(commit_msg) => {
464                                    self.metrics.inc_transactions();
465                                    self.metrics.set_binlog_position(commit_msg.binlog_position);
466                                    if let Some(ref mut pos) = self.position {
467                                        pos.position = commit_msg.binlog_position;
468                                    }
469                                }
470                                BinlogMessage::Rotate(rotate_msg) => {
471                                    self.current_binlog_file.clone_from(&rotate_msg.next_binlog);
472                                    if let Some(ref mut pos) = self.position {
473                                        pos.filename = rotate_msg.next_binlog;
474                                        pos.position = rotate_msg.position;
475                                    } else {
476                                        self.position = Some(BinlogPosition::new(
477                                            self.current_binlog_file.clone(),
478                                            rotate_msg.position,
479                                        ));
480                                    }
481                                }
482                                BinlogMessage::Query(query_msg) => {
483                                    self.metrics.inc_ddl_events();
484                                    let _ = query_msg;
485                                }
486                                BinlogMessage::Heartbeat => {
487                                    self.metrics.inc_heartbeats();
488                                }
489                            }
490                        }
491                        Err(_) => break,
492                    }
493                }
494
495                self.last_activity = Some(Instant::now());
496
497                if let Some(table_info) = last_table_info {
498                    if let Some(batch) = self.flush_events(&table_info)? {
499                        let schema = self.build_envelope_schema(&table_info.arrow_schema);
500                        self.schema = Some(schema);
501                        return Ok(Some(SourceBatch::new(batch)));
502                    }
503                }
504
505                return Ok(None);
506            }
507
508            self.last_activity = Some(Instant::now());
509            return Ok(None);
510        }
511
512        // Without mysql-cdc feature: stub returns None.
513        #[cfg(not(feature = "mysql-cdc"))]
514        {
515            let _ = max_records;
516            self.last_activity = Some(Instant::now());
517            Ok(None)
518        }
519    }
520
521    fn schema(&self) -> SchemaRef {
522        // Return cached schema or a default CDC envelope schema
523        self.schema.clone().unwrap_or_else(|| {
524            // Default CDC envelope with no table-specific columns
525            Arc::new(cdc_envelope_schema(&Schema::empty()))
526        })
527    }
528
529    fn checkpoint(&self) -> SourceCheckpoint {
530        self.create_checkpoint()
531    }
532
533    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
534        self.restore_position(checkpoint);
535        Ok(())
536    }
537
538    fn health_check(&self) -> HealthStatus {
539        if !self.connected {
540            return HealthStatus::Unhealthy("Not connected".to_string());
541        }
542
543        // Check for recent activity
544        if let Some(last) = self.last_activity {
545            let idle_duration = self.config.heartbeat_interval * 3;
546            if last.elapsed() > idle_duration {
547                return HealthStatus::Degraded(format!(
548                    "No activity for {}s",
549                    last.elapsed().as_secs()
550                ));
551            }
552        }
553
554        // Check error count
555        let errors = self
556            .metrics
557            .errors
558            .load(std::sync::atomic::Ordering::Relaxed);
559        if errors > 100 {
560            return HealthStatus::Degraded(format!("{errors} errors encountered"));
561        }
562
563        HealthStatus::Healthy
564    }
565
566    fn metrics(&self) -> ConnectorMetrics {
567        self.metrics.to_connector_metrics()
568    }
569
570    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
571        Some(Arc::clone(&self.data_ready))
572    }
573
574    async fn close(&mut self) -> Result<(), ConnectorError> {
575        // Signal reader task to shut down (it closes the binlog stream internally).
576        #[cfg(feature = "mysql-cdc")]
577        {
578            if let Some(tx) = self.reader_shutdown.take() {
579                let _ = tx.send(true);
580            }
581            if let Some(handle) = self.reader_handle.take() {
582                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
583            }
584            self.msg_rx = None;
585        }
586
587        self.connected = false;
588        self.table_cache.clear();
589        self.event_buffer.clear();
590        Ok(())
591    }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// GTID set used by the GTID checkpoint tests.
    const SAMPLE_GTID: &str = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5";

    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("test".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    /// Fresh, unconnected source over `test_config()`.
    fn fresh_source() -> MySqlCdcSource {
        MySqlCdcSource::new(test_config())
    }

    /// Source that reports as connected with activity recorded just now.
    fn live_source() -> MySqlCdcSource {
        let mut live = fresh_source();
        live.connected = true;
        live.last_activity = Some(Instant::now());
        live
    }

    /// Epoch-1 checkpoint carrying the given offsets.
    fn checkpoint_with(offsets: &[(&'static str, &'static str)]) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(1);
        for &(key, value) in offsets {
            cp.set_offset(key, value);
        }
        cp
    }

    #[test]
    fn test_new_source() {
        let src = fresh_source();

        assert!(!src.is_connected());
        assert_eq!(src.cached_table_count(), 0);
        assert!(src.position().is_none());
        assert!(src.gtid_set().is_none());
    }

    #[test]
    fn test_from_config() {
        let mut generic = ConnectorConfig::new("mysql-cdc");
        for (key, value) in [
            ("host", "mysql.example.com"),
            ("port", "3307"),
            ("username", "repl"),
            ("password", "secret"),
            ("server.id", "999"),
        ] {
            generic.set(key, value);
        }

        let built = MySqlCdcSource::from_config(&generic).unwrap();
        let cfg = built.config();
        assert_eq!(cfg.host, "mysql.example.com");
        assert_eq!(cfg.port, 3307);
        assert_eq!(cfg.server_id, 999);
    }

    #[test]
    fn test_from_config_missing_required() {
        let empty = ConnectorConfig::new("mysql-cdc");
        assert!(MySqlCdcSource::from_config(&empty).is_err());
    }

    #[test]
    fn test_restore_position_gtid() {
        let mut src = fresh_source();
        src.restore_position(&checkpoint_with(&[("gtid", SAMPLE_GTID)]));
        assert!(src.gtid_set().is_some());
    }

    #[test]
    fn test_restore_position_file() {
        let mut src = fresh_source();
        src.restore_position(&checkpoint_with(&[
            ("binlog_file", "mysql-bin.000003"),
            ("binlog_position", "12345"),
        ]));

        let restored = src.position().unwrap();
        assert_eq!(restored.filename, "mysql-bin.000003");
        assert_eq!(restored.position, 12345);
    }

    #[test]
    fn test_create_checkpoint_gtid() {
        let mut src = fresh_source();
        src.config.use_gtid = true;
        src.gtid_set = Some(SAMPLE_GTID.parse().unwrap());

        let cp = src.create_checkpoint();
        let gtid = cp.get_offset("gtid").expect("GTID offset should be recorded");
        // UUID is stored and displayed as lowercase
        assert!(gtid.contains("3e11fa47"));
    }

    #[test]
    fn test_create_checkpoint_file() {
        let mut src = fresh_source();
        src.config.use_gtid = false;
        src.position = Some(BinlogPosition::new("mysql-bin.000003".to_string(), 9999));

        let cp = src.create_checkpoint();
        assert_eq!(cp.get_offset("binlog_file"), Some("mysql-bin.000003"));
        assert_eq!(cp.get_offset("binlog_position"), Some("9999"));
    }

    #[test]
    fn test_schema() {
        let schema = fresh_source().schema();
        let has_field = |wanted: &str| schema.fields().iter().any(|f| f.name() == wanted);

        // Should have CDC envelope fields
        for envelope_field in ["_table", "_op", "_ts_ms"] {
            assert!(has_field(envelope_field));
        }
    }

    #[test]
    fn test_health_check_not_connected() {
        let status = fresh_source().health_check();
        let HealthStatus::Unhealthy(message) = status else {
            panic!("Expected unhealthy status");
        };
        assert!(message.contains("Not connected"));
    }

    #[test]
    fn test_health_check_healthy() {
        assert!(matches!(live_source().health_check(), HealthStatus::Healthy));
    }

    #[test]
    fn test_health_check_degraded_no_activity() {
        let mut src = fresh_source();
        src.connected = true;
        src.config.heartbeat_interval = Duration::from_millis(1);
        src.last_activity = Instant::now().checked_sub(Duration::from_secs(10));

        let HealthStatus::Degraded(message) = src.health_check() else {
            panic!("Expected degraded status");
        };
        assert!(message.contains("No activity"));
    }

    #[test]
    fn test_health_check_degraded_errors() {
        let mut src = live_source();

        // Simulate many errors
        (0..150).for_each(|_| src.metrics.inc_errors());

        let HealthStatus::Degraded(message) = src.health_check() else {
            panic!("Expected degraded status");
        };
        assert!(message.contains("errors"));
    }

    #[test]
    fn test_metrics() {
        let mut src = fresh_source();
        src.metrics.inc_inserts(100);
        src.metrics.inc_updates(50);
        src.metrics.inc_deletes(25);
        src.metrics.add_bytes_received(10000);
        src.metrics.inc_errors();

        let snapshot = src.metrics();
        assert_eq!(snapshot.records_total, 175);
        assert_eq!(snapshot.bytes_total, 10000);
        assert_eq!(snapshot.errors_total, 1);
    }

    #[test]
    fn test_table_filtering() {
        let mut cfg = test_config();
        cfg.table_include = vec!["users".to_string(), "orders".to_string()];
        let src = MySqlCdcSource::new(cfg);

        for included in ["users", "orders"] {
            assert!(src.should_include_table("testdb", included));
        }
        assert!(!src.should_include_table("testdb", "other"));
    }

    // Without mysql-cdc feature, open() must return an error to prevent silent data loss.
    #[cfg(not(feature = "mysql-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut src = fresh_source();

        let err = src
            .open(&ConnectorConfig::default())
            .await
            .expect_err("open must fail without the mysql-cdc feature")
            .to_string();
        assert!(
            err.contains("mysql-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    #[tokio::test]
    async fn test_poll_not_connected() {
        let mut src = fresh_source();
        assert!(src.poll_batch(100).await.is_err());
    }

    // Without mysql-cdc feature, open() returns an error, so poll is unreachable.
    // With mysql-cdc, open() needs a real MySQL server. Covered by integration tests.

    #[tokio::test]
    async fn test_restore_async() {
        let mut src = fresh_source();
        let cp = checkpoint_with(&[
            ("binlog_file", "mysql-bin.000005"),
            ("binlog_position", "54321"),
        ]);

        src.restore(&cp).await.unwrap();

        let restored = src.position().unwrap();
        assert_eq!(restored.filename, "mysql-bin.000005");
        assert_eq!(restored.position, 54321);
    }
}