// Source path: laminar_connectors/cdc/mysql/source.rs

1//! MySQL CDC source connector implementation.
2//!
3//! Implements the [`SourceConnector`] trait for MySQL binlog replication.
4//! This module provides the main entry point for MySQL CDC: [`MySqlCdcSource`].
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use arrow_schema::{Schema, SchemaRef};
11use async_trait::async_trait;
12use tokio::sync::Notify;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::health::HealthStatus;
19use crate::metrics::ConnectorMetrics;
20
21use super::changelog::ChangeEvent;
22use super::config::MySqlCdcConfig;
23use super::decoder::{BinlogMessage, BinlogPosition};
24use super::gtid::GtidSet;
25use super::metrics::MySqlCdcMetrics;
26use super::schema::{cdc_envelope_schema, TableCache, TableInfo};
27
/// Single-consumer async receiver for the binlog reader → `poll_batch` queue.
///
/// The sending half is moved into the background reader task spawned by
/// `open()`; `poll_batch` drains this half with non-blocking `try_recv` calls.
/// Only compiled when the `mysql-cdc` feature is enabled.
#[cfg(feature = "mysql-cdc")]
type BinlogMessageRx = crossfire::AsyncRx<crossfire::mpsc::Array<BinlogMessage>>;
31
/// MySQL binlog CDC source connector. Reads change events from the MySQL
/// binary log via replication protocol; supports GTID-based and
/// file/position-based replication.
///
/// Lifecycle, as implemented by the `SourceConnector` impl in this file:
/// with the `mysql-cdc` feature, `open()` connects and spawns a background
/// reader task, `poll_batch()` drains decoded messages into `event_buffer`
/// and emits record batches, and `close()` stops the reader and clears the
/// table cache and event buffer.
pub struct MySqlCdcSource {
    /// Configuration for the MySQL CDC connection.
    /// May be replaced in `open()` when the passed connector config carries properties.
    config: MySqlCdcConfig,

    /// Whether the source is currently connected.
    /// Set at the end of a successful `open()`; cleared by `close()`.
    connected: bool,

    /// Cache of table schemas from TABLE_MAP events.
    /// Updated while polling and emptied by `close()`.
    table_cache: TableCache,

    /// Current binlog position (file/position).
    /// Seeded from the config in `open()` or from a checkpoint, then advanced
    /// by COMMIT and ROTATE messages while polling.
    position: Option<BinlogPosition>,

    /// Current GTID set (for GTID-based replication).
    /// Seeded from the config in `open()` or from a checkpoint; GTIDs carried
    /// by BEGIN messages are added to it only when a set is already present.
    gtid_set: Option<GtidSet>,

    /// Current binlog filename (updated by ROTATE events).
    /// Passed to the changelog conversion helpers for every row event.
    current_binlog_file: String,

    /// Current GTID string (updated by GTID events within a transaction).
    /// Reset to `None` when a BEGIN message carries no GTID.
    current_gtid: Option<String>,

    /// Buffered change events waiting to be emitted.
    /// Filled by `poll_batch()` and emptied by `flush_events()` / `close()`.
    event_buffer: Vec<ChangeEvent>,

    /// Metrics for this source.
    metrics: MySqlCdcMetrics,

    /// Arrow schema for CDC envelope.
    /// `None` until the first batch is produced; `schema()` falls back to an
    /// envelope built over an empty table schema in that case.
    schema: Option<SchemaRef>,

    /// Activity timestamp used by health checks: `health_check()` reports a
    /// degraded status when it is older than three heartbeat intervals.
    /// Set by `open()` and refreshed from `poll_batch()`.
    last_activity: Option<Instant>,

    /// Notification handle signalled when binlog data arrives from the reader task.
    /// Exposed to the runtime through `data_ready_notify()`.
    data_ready: Arc<Notify>,

    /// Channel receiver for decoded binlog messages from the background reader task.
    /// `None` until `open()` succeeds and again after `close()`.
    #[cfg(feature = "mysql-cdc")]
    msg_rx: Option<BinlogMessageRx>,

    /// Background binlog reader task handle.
    /// Taken and awaited (with a timeout) by `close()`.
    #[cfg(feature = "mysql-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Shutdown signal for the background reader task.
    /// `close()` sends `true` on it; the reader loop exits when it observes a change.
    #[cfg(feature = "mysql-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
84
85// Manual Debug impl because BinlogStream doesn't implement Debug.
86impl std::fmt::Debug for MySqlCdcSource {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("MySqlCdcSource")
89            .field("config", &self.config)
90            .field("connected", &self.connected)
91            .field("table_cache", &self.table_cache)
92            .field("position", &self.position)
93            .field("gtid_set", &self.gtid_set)
94            .field("current_binlog_file", &self.current_binlog_file)
95            .field("current_gtid", &self.current_gtid)
96            .field("event_buffer_len", &self.event_buffer.len())
97            .field("metrics", &self.metrics)
98            .field("schema", &self.schema)
99            .field("last_activity", &self.last_activity)
100            .finish_non_exhaustive()
101    }
102}
103
104impl MySqlCdcSource {
105    /// Creates a new MySQL CDC source with the given configuration.
106    #[must_use]
107    pub fn new(config: MySqlCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
108        Self {
109            config,
110            connected: false,
111            table_cache: TableCache::new(),
112            position: None,
113            gtid_set: None,
114            current_binlog_file: String::new(),
115            current_gtid: None,
116            event_buffer: Vec::new(),
117            metrics: MySqlCdcMetrics::new(registry),
118            schema: None,
119            last_activity: None,
120            data_ready: Arc::new(Notify::new()),
121            #[cfg(feature = "mysql-cdc")]
122            msg_rx: None,
123            #[cfg(feature = "mysql-cdc")]
124            reader_handle: None,
125            #[cfg(feature = "mysql-cdc")]
126            reader_shutdown: None,
127        }
128    }
129
130    /// Creates a MySQL CDC source from a generic connector config.
131    ///
132    /// # Errors
133    ///
134    /// Returns error if required configuration keys are missing.
135    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
136        let mysql_config = MySqlCdcConfig::from_config(config)?;
137        Ok(Self::new(mysql_config, None))
138    }
139
140    /// Returns the number of cached table schemas.
141    #[must_use]
142    pub fn cached_table_count(&self) -> usize {
143        self.table_cache.len()
144    }
145
146    /// Returns the current binlog position.
147    #[must_use]
148    pub fn position(&self) -> Option<&BinlogPosition> {
149        self.position.as_ref()
150    }
151
152    /// Returns the current GTID set.
153    #[must_use]
154    pub fn gtid_set(&self) -> Option<&GtidSet> {
155        self.gtid_set.as_ref()
156    }
157
158    /// Returns a reference to the table cache.
159    #[must_use]
160    pub fn table_cache(&self) -> &TableCache {
161        &self.table_cache
162    }
163
164    /// Returns a reference to the metrics.
165    #[must_use]
166    pub fn cdc_metrics(&self) -> &MySqlCdcMetrics {
167        &self.metrics
168    }
169
170    /// Checks if a table should be included based on filters.
171    #[must_use]
172    pub fn should_include_table(&self, database: &str, table: &str) -> bool {
173        self.config.should_include_table(database, table)
174    }
175
176    /// Returns the configuration.
177    #[must_use]
178    pub fn config(&self) -> &MySqlCdcConfig {
179        &self.config
180    }
181
182    /// Returns whether the source is connected.
183    #[must_use]
184    pub fn is_connected(&self) -> bool {
185        self.connected
186    }
187
188    /// Restores the position from a checkpoint.
189    ///
190    /// Parses the checkpoint offset to extract GTID set or file/position.
191    pub fn restore_position(&mut self, checkpoint: &SourceCheckpoint) {
192        // Try GTID from offset key first
193        if let Some(gtid_str) = checkpoint.get_offset("gtid") {
194            if let Ok(gtid_set) = gtid_str.parse::<GtidSet>() {
195                self.gtid_set = Some(gtid_set);
196                return;
197            }
198        }
199
200        // Try binlog file/position
201        if let (Some(filename), Some(pos_str)) = (
202            checkpoint.get_offset("binlog_file"),
203            checkpoint.get_offset("binlog_position"),
204        ) {
205            if let Ok(pos) = pos_str.parse::<u64>() {
206                self.position = Some(BinlogPosition::new(filename.to_string(), pos));
207            }
208        }
209    }
210
211    /// Creates a checkpoint representing the current position.
212    #[must_use]
213    pub fn create_checkpoint(&self) -> SourceCheckpoint {
214        let mut checkpoint = SourceCheckpoint::new(0);
215
216        if self.config.use_gtid {
217            if let Some(ref gtid_set) = self.gtid_set {
218                checkpoint.set_offset("gtid", gtid_set.to_string());
219            }
220        } else if let Some(ref pos) = self.position {
221            checkpoint.set_offset("binlog_file", &pos.filename);
222            checkpoint.set_offset("binlog_position", pos.position.to_string());
223        }
224
225        checkpoint.set_metadata("server_id", self.config.server_id.to_string());
226
227        checkpoint
228    }
229
230    /// Builds the CDC envelope schema based on a table schema.
231    #[allow(clippy::unused_self)] // Will use self for config options
232    fn build_envelope_schema(&self, table_schema: &Schema) -> SchemaRef {
233        Arc::new(cdc_envelope_schema(table_schema))
234    }
235
236    /// Flushes buffered events to a RecordBatch.
237    ///
238    /// # Errors
239    ///
240    /// Returns error if batch conversion fails.
241    pub fn flush_events(
242        &mut self,
243        table_info: &TableInfo,
244    ) -> Result<Option<RecordBatch>, ConnectorError> {
245        if self.event_buffer.is_empty() {
246            return Ok(None);
247        }
248
249        let events: Vec<_> = self.event_buffer.drain(..).collect();
250        let batch = super::changelog::events_to_record_batch(&events, table_info)
251            .map_err(|e| ConnectorError::Internal(e.to_string()))?;
252        Ok(Some(batch))
253    }
254}
255
256#[async_trait]
257#[allow(clippy::too_many_lines)]
258impl SourceConnector for MySqlCdcSource {
259    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
260        // Parse and update config if provided
261        if !config.properties().is_empty() {
262            self.config = MySqlCdcConfig::from_config(config)?;
263        }
264
265        // Validate configuration
266        self.config.validate()?;
267
268        // Initialize GTID set from config
269        self.gtid_set.clone_from(&self.config.gtid_set);
270
271        // Initialize binlog position from config
272        if let Some(ref filename) = self.config.binlog_filename {
273            self.current_binlog_file.clone_from(filename);
274            if let Some(pos) = self.config.binlog_position {
275                self.position = Some(BinlogPosition::new(filename.clone(), pos));
276            }
277        }
278
279        // Without mysql-cdc feature, open() must fail loudly to prevent
280        // silent data loss (poll_batch would return Ok(None) forever).
281        #[cfg(not(feature = "mysql-cdc"))]
282        {
283            return Err(ConnectorError::ConfigurationError(
284                "MySQL CDC source requires the `mysql-cdc` feature flag. \
285                 Rebuild with `--features mysql-cdc` to enable."
286                    .to_string(),
287            ));
288        }
289
290        // When mysql-cdc feature is enabled, establish a real connection
291        // and spawn a background reader task for event-driven wake-up.
292        #[cfg(feature = "mysql-cdc")]
293        {
294            let conn = super::mysql_io::connect(&self.config).await?;
295            let stream = super::mysql_io::start_binlog_stream(
296                conn,
297                &self.config,
298                self.gtid_set.as_ref(),
299                self.position.as_ref(),
300            )
301            .await?;
302
303            let (msg_tx, msg_rx) = crossfire::mpsc::bounded_async::<BinlogMessage>(4096);
304            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
305            let data_ready = Arc::clone(&self.data_ready);
306
307            let reader_handle = tokio::spawn(async move {
308                use tokio_stream::StreamExt as _;
309                let mut stream = stream;
310                loop {
311                    let event = tokio::select! {
312                        biased;
313                        _ = shutdown_rx.changed() => break,
314                        event = stream.next() => event,
315                    };
316                    match event {
317                        Some(Ok(raw_event)) => {
318                            match super::mysql_io::decode_binlog_event(&raw_event, &stream) {
319                                Ok(Some(msg)) => {
320                                    if msg_tx.send(msg).await.is_err() {
321                                        break;
322                                    }
323                                    data_ready.notify_one();
324                                }
325                                Ok(None) => {}
326                                Err(e) => {
327                                    tracing::warn!(error = %e, "binlog decode error");
328                                    break;
329                                }
330                            }
331                        }
332                        Some(Err(e)) => {
333                            tracing::warn!(error = %e, "binlog stream error");
334                            break;
335                        }
336                        None => break,
337                    }
338                }
339                if let Err(e) = stream.close().await {
340                    tracing::warn!(error = %e, "error closing binlog stream");
341                }
342            });
343
344            self.msg_rx = Some(msg_rx);
345            self.reader_handle = Some(reader_handle);
346            self.reader_shutdown = Some(shutdown_tx);
347        }
348
349        self.connected = true;
350        self.last_activity = Some(Instant::now());
351
352        Ok(())
353    }
354
355    async fn poll_batch(
356        &mut self,
357        max_records: usize,
358    ) -> Result<Option<SourceBatch>, ConnectorError> {
359        if !self.connected {
360            return Err(ConnectorError::ConfigurationError(
361                "Source not connected".to_string(),
362            ));
363        }
364
365        // Drain decoded binlog messages from background reader task.
366        //
367        // Backpressure: when the event buffer exceeds the high watermark,
368        // stop draining the reader channel. The bounded mpsc channel (4096)
369        // propagates backpressure to the binlog reader task, which in turn
370        // applies TCP backpressure to the replication connection.
371        #[cfg(feature = "mysql-cdc")]
372        {
373            let high_watermark = self.config.backpressure_high_watermark();
374
375            if self.event_buffer.len() >= high_watermark {
376                tracing::debug!(
377                    buffered = self.event_buffer.len(),
378                    high_watermark,
379                    "CDC backpressure active — pausing binlog reader drain"
380                );
381            } else if let Some(rx) = self.msg_rx.as_mut() {
382                let mut last_table_info: Option<TableInfo> = None;
383
384                while self.event_buffer.len() < max_records
385                    && self.event_buffer.len() < high_watermark
386                {
387                    match rx.try_recv() {
388                        Ok(msg) => {
389                            self.metrics.inc_events_received();
390                            match msg {
391                                BinlogMessage::TableMap(tme) => {
392                                    self.metrics.inc_table_maps();
393                                    self.table_cache.update(&tme);
394                                }
395                                BinlogMessage::Insert(insert_msg) => {
396                                    if !self.config.should_include_table(
397                                        &insert_msg.database,
398                                        &insert_msg.table,
399                                    ) {
400                                        continue;
401                                    }
402                                    let row_count = insert_msg.rows.len() as u64;
403                                    let events = super::changelog::insert_to_events(
404                                        &insert_msg,
405                                        &self.current_binlog_file,
406                                        self.current_gtid.as_deref(),
407                                    );
408                                    self.event_buffer.extend(events);
409                                    self.metrics.inc_inserts(row_count);
410                                    last_table_info =
411                                        self.table_cache.get(insert_msg.table_id).cloned();
412                                }
413                                BinlogMessage::Update(update_msg) => {
414                                    if !self.config.should_include_table(
415                                        &update_msg.database,
416                                        &update_msg.table,
417                                    ) {
418                                        continue;
419                                    }
420                                    let row_count = update_msg.rows.len() as u64;
421                                    let events = super::changelog::update_to_events(
422                                        &update_msg,
423                                        &self.current_binlog_file,
424                                        self.current_gtid.as_deref(),
425                                    );
426                                    self.event_buffer.extend(events);
427                                    self.metrics.inc_updates(row_count);
428                                    last_table_info =
429                                        self.table_cache.get(update_msg.table_id).cloned();
430                                }
431                                BinlogMessage::Delete(delete_msg) => {
432                                    if !self.config.should_include_table(
433                                        &delete_msg.database,
434                                        &delete_msg.table,
435                                    ) {
436                                        continue;
437                                    }
438                                    let row_count = delete_msg.rows.len() as u64;
439                                    let events = super::changelog::delete_to_events(
440                                        &delete_msg,
441                                        &self.current_binlog_file,
442                                        self.current_gtid.as_deref(),
443                                    );
444                                    self.event_buffer.extend(events);
445                                    self.metrics.inc_deletes(row_count);
446                                    last_table_info =
447                                        self.table_cache.get(delete_msg.table_id).cloned();
448                                }
449                                BinlogMessage::Begin(begin_msg) => {
450                                    if let Some(ref gtid) = begin_msg.gtid {
451                                        self.current_gtid = Some(gtid.to_string());
452                                        if let Some(ref mut gtid_set) = self.gtid_set {
453                                            gtid_set.add(gtid);
454                                        }
455                                    } else {
456                                        self.current_gtid = None;
457                                    }
458                                }
459                                BinlogMessage::Commit(commit_msg) => {
460                                    self.metrics.inc_transactions();
461                                    self.metrics.set_binlog_position(commit_msg.binlog_position);
462                                    if let Some(ref mut pos) = self.position {
463                                        pos.position = commit_msg.binlog_position;
464                                    }
465                                }
466                                BinlogMessage::Rotate(rotate_msg) => {
467                                    self.current_binlog_file.clone_from(&rotate_msg.next_binlog);
468                                    if let Some(ref mut pos) = self.position {
469                                        pos.filename = rotate_msg.next_binlog;
470                                        pos.position = rotate_msg.position;
471                                    } else {
472                                        self.position = Some(BinlogPosition::new(
473                                            self.current_binlog_file.clone(),
474                                            rotate_msg.position,
475                                        ));
476                                    }
477                                }
478                                BinlogMessage::Query(query_msg) => {
479                                    self.metrics.inc_ddl_events();
480                                    let _ = query_msg;
481                                }
482                                BinlogMessage::Heartbeat => {
483                                    self.metrics.inc_heartbeats();
484                                }
485                            }
486                        }
487                        Err(_) => break,
488                    }
489                }
490
491                self.last_activity = Some(Instant::now());
492
493                if let Some(table_info) = last_table_info {
494                    if let Some(batch) = self.flush_events(&table_info)? {
495                        let schema = self.build_envelope_schema(&table_info.arrow_schema);
496                        self.schema = Some(schema);
497                        return Ok(Some(SourceBatch::new(batch)));
498                    }
499                }
500
501                return Ok(None);
502            }
503
504            self.last_activity = Some(Instant::now());
505            return Ok(None);
506        }
507
508        // Without mysql-cdc feature: stub returns None.
509        #[cfg(not(feature = "mysql-cdc"))]
510        {
511            let _ = max_records;
512            self.last_activity = Some(Instant::now());
513            Ok(None)
514        }
515    }
516
517    fn schema(&self) -> SchemaRef {
518        // Return cached schema or a default CDC envelope schema
519        self.schema.clone().unwrap_or_else(|| {
520            // Default CDC envelope with no table-specific columns
521            Arc::new(cdc_envelope_schema(&Schema::empty()))
522        })
523    }
524
525    fn checkpoint(&self) -> SourceCheckpoint {
526        self.create_checkpoint()
527    }
528
529    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
530        self.restore_position(checkpoint);
531        Ok(())
532    }
533
534    fn health_check(&self) -> HealthStatus {
535        if !self.connected {
536            return HealthStatus::Unhealthy("Not connected".to_string());
537        }
538
539        // Check for recent activity
540        if let Some(last) = self.last_activity {
541            let idle_duration = self.config.heartbeat_interval * 3;
542            if last.elapsed() > idle_duration {
543                return HealthStatus::Degraded(format!(
544                    "No activity for {}s",
545                    last.elapsed().as_secs()
546                ));
547            }
548        }
549
550        // Check error count
551        let errors = self.metrics.errors.get();
552        if errors > 100 {
553            return HealthStatus::Degraded(format!("{errors} errors encountered"));
554        }
555
556        HealthStatus::Healthy
557    }
558
559    fn metrics(&self) -> ConnectorMetrics {
560        self.metrics.to_connector_metrics()
561    }
562
563    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
564        Some(Arc::clone(&self.data_ready))
565    }
566
567    async fn close(&mut self) -> Result<(), ConnectorError> {
568        // Signal reader task to shut down (it closes the binlog stream internally).
569        #[cfg(feature = "mysql-cdc")]
570        {
571            if let Some(tx) = self.reader_shutdown.take() {
572                let _ = tx.send(true);
573            }
574            if let Some(handle) = self.reader_handle.take() {
575                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
576            }
577            self.msg_rx = None;
578        }
579
580        self.connected = false;
581        self.table_cache.clear();
582        self.event_buffer.clear();
583        Ok(())
584    }
585}
586
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Minimal config pointing at a local server; everything else is defaulted.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: String::from("localhost"),
            port: 3306,
            database: Some(String::from("testdb")),
            username: String::from("root"),
            password: Some(String::from("test")),
            server_id: 12345,
            ..Default::default()
        }
    }

    /// Fresh, disconnected source built from `test_config()`.
    fn test_source() -> MySqlCdcSource {
        MySqlCdcSource::new(test_config(), None)
    }

    #[test]
    fn test_new_source() {
        let source = test_source();

        assert!(!source.is_connected());
        assert_eq!(source.cached_table_count(), 0);
        assert!(source.position().is_none());
        assert!(source.gtid_set().is_none());
    }

    #[test]
    fn test_from_config() {
        let mut config = ConnectorConfig::new("mysql-cdc");
        for (key, value) in [
            ("host", "mysql.example.com"),
            ("port", "3307"),
            ("username", "repl"),
            ("password", "secret"),
            ("server.id", "999"),
        ] {
            config.set(key, value);
        }

        let source = MySqlCdcSource::from_config(&config).unwrap();
        let parsed = source.config();
        assert_eq!(parsed.host, "mysql.example.com");
        assert_eq!(parsed.port, 3307);
        assert_eq!(parsed.server_id, 999);
    }

    #[test]
    fn test_from_config_missing_required() {
        let empty = ConnectorConfig::new("mysql-cdc");

        assert!(MySqlCdcSource::from_config(&empty).is_err());
    }

    #[test]
    fn test_restore_position_gtid() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("gtid", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");

        let mut source = test_source();
        source.restore_position(&checkpoint);

        assert!(source.gtid_set().is_some());
    }

    #[test]
    fn test_restore_position_file() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("binlog_file", "mysql-bin.000003");
        checkpoint.set_offset("binlog_position", "12345");

        let mut source = test_source();
        source.restore_position(&checkpoint);

        let restored = source.position().unwrap();
        assert_eq!(restored.filename, "mysql-bin.000003");
        assert_eq!(restored.position, 12345);
    }

    #[test]
    fn test_create_checkpoint_gtid() {
        let mut source = test_source();
        source.config.use_gtid = true;
        source.gtid_set = Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap());

        let checkpoint = source.create_checkpoint();
        let gtid = checkpoint.get_offset("gtid");
        assert!(gtid.is_some());
        // UUID is stored and displayed as lowercase
        assert!(gtid.unwrap().contains("3e11fa47"));
    }

    #[test]
    fn test_create_checkpoint_file() {
        let mut source = test_source();
        source.config.use_gtid = false;
        source.position = Some(BinlogPosition::new(String::from("mysql-bin.000003"), 9999));

        let checkpoint = source.create_checkpoint();
        assert_eq!(
            checkpoint.get_offset("binlog_file"),
            Some("mysql-bin.000003")
        );
        assert_eq!(checkpoint.get_offset("binlog_position"), Some("9999"));
    }

    #[test]
    fn test_schema() {
        let schema = test_source().schema();
        let has_field = |name: &str| schema.fields().iter().any(|f| f.name() == name);

        // Should have CDC envelope fields
        assert!(has_field("_table"));
        assert!(has_field("_op"));
        assert!(has_field("_ts_ms"));
    }

    #[test]
    fn test_health_check_not_connected() {
        let source = test_source();

        let HealthStatus::Unhealthy(message) = source.health_check() else {
            panic!("Expected unhealthy status")
        };
        assert!(message.contains("Not connected"));
    }

    #[test]
    fn test_health_check_healthy() {
        let mut source = test_source();
        source.connected = true;
        source.last_activity = Some(Instant::now());

        assert!(matches!(source.health_check(), HealthStatus::Healthy));
    }

    #[test]
    fn test_health_check_degraded_no_activity() {
        let mut source = test_source();
        source.connected = true;
        source.config.heartbeat_interval = Duration::from_millis(1);
        // Pretend the last activity happened ten seconds ago.
        source.last_activity = Instant::now().checked_sub(Duration::from_secs(10));

        let HealthStatus::Degraded(message) = source.health_check() else {
            panic!("Expected degraded status")
        };
        assert!(message.contains("No activity"));
    }

    #[test]
    fn test_health_check_degraded_errors() {
        let mut source = test_source();
        source.connected = true;
        source.last_activity = Some(Instant::now());

        // Simulate many errors
        (0..150).for_each(|_| source.metrics.inc_errors());

        let HealthStatus::Degraded(message) = source.health_check() else {
            panic!("Expected degraded status")
        };
        assert!(message.contains("errors"));
    }

    #[test]
    fn test_metrics() {
        let mut source = test_source();
        source.metrics.inc_inserts(100);
        source.metrics.inc_updates(50);
        source.metrics.inc_deletes(25);
        source.metrics.add_bytes_received(10000);
        source.metrics.inc_errors();

        let snapshot = source.metrics();
        assert_eq!(snapshot.records_total, 175);
        assert_eq!(snapshot.bytes_total, 10000);
        assert_eq!(snapshot.errors_total, 1);
    }

    #[test]
    fn test_table_filtering() {
        let config = MySqlCdcConfig {
            table_include: vec![String::from("users"), String::from("orders")],
            ..test_config()
        };
        let source = MySqlCdcSource::new(config, None);

        assert!(source.should_include_table("testdb", "users"));
        assert!(source.should_include_table("testdb", "orders"));
        assert!(!source.should_include_table("testdb", "other"));
    }

    // Without mysql-cdc feature, open() must return an error to prevent silent data loss.
    #[cfg(not(feature = "mysql-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut source = test_source();

        let outcome = source.open(&ConnectorConfig::default()).await;
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("mysql-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    #[tokio::test]
    async fn test_poll_not_connected() {
        let mut source = test_source();

        assert!(source.poll_batch(100).await.is_err());
    }

    // Without mysql-cdc feature, open() returns an error, so poll is unreachable.
    // With mysql-cdc, open() needs a real MySQL server. Covered by integration tests.

    #[tokio::test]
    async fn test_restore_async() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("binlog_file", "mysql-bin.000005");
        checkpoint.set_offset("binlog_position", "54321");

        let mut source = test_source();
        source.restore(&checkpoint).await.unwrap();

        let restored = source.position().unwrap();
        assert_eq!(restored.filename, "mysql-bin.000005");
        assert_eq!(restored.position, 54321);
    }
}
823}