1use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use arrow_schema::{Schema, SchemaRef};
11use async_trait::async_trait;
12use tokio::sync::Notify;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::health::HealthStatus;
19use crate::metrics::ConnectorMetrics;
20
21use super::changelog::ChangeEvent;
22use super::config::MySqlCdcConfig;
23use super::decoder::{BinlogMessage, BinlogPosition};
24use super::gtid::GtidSet;
25use super::metrics::MySqlCdcMetrics;
26use super::schema::{cdc_envelope_schema, TableCache, TableInfo};
27
/// Receiving half of the bounded async channel through which the background
/// binlog reader task (spawned in `open`) delivers decoded [`BinlogMessage`]s.
#[cfg(feature = "mysql-cdc")]
type BinlogMessageRx = crossfire::AsyncRx<crossfire::mpsc::Array<BinlogMessage>>;
31
/// Change-data-capture source that reads MySQL binlog row events and emits
/// them as Arrow record batches in a CDC envelope schema.
pub struct MySqlCdcSource {
    /// Connection, filtering and replication settings.
    config: MySqlCdcConfig,

    /// Set to `true` by `open` and back to `false` by `close`.
    connected: bool,

    /// Table metadata from TABLE_MAP events, keyed by binlog table id;
    /// updated on every `TableMap` message and cleared on `close`.
    table_cache: TableCache,

    /// Current binlog file/offset; advanced on `Rotate` and `Commit` messages
    /// and written to checkpoints in file-position mode.
    position: Option<BinlogPosition>,

    /// Executed GTID set; extended with each transaction's GTID on `Begin`
    /// and written to checkpoints in GTID mode.
    gtid_set: Option<GtidSet>,

    /// Binlog file name attached to emitted change events.
    current_binlog_file: String,

    /// GTID of the transaction currently being read, attached to change events.
    current_gtid: Option<String>,

    /// Change events waiting to be converted into a `RecordBatch`.
    event_buffer: Vec<ChangeEvent>,

    /// CDC-specific counters (events, rows, transactions, errors, ...).
    metrics: MySqlCdcMetrics,

    /// Envelope schema of the most recently emitted batch; `None` until the
    /// first batch is produced.
    schema: Option<SchemaRef>,

    /// Time of the last `open`/`poll_batch`; inspected by `health_check`.
    last_activity: Option<Instant>,

    /// Notified by the reader task after each message it queues.
    data_ready: Arc<Notify>,

    /// Receiver for messages produced by the background reader task.
    #[cfg(feature = "mysql-cdc")]
    msg_rx: Option<BinlogMessageRx>,

    /// Join handle of the background binlog reader task.
    #[cfg(feature = "mysql-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Sending `true` on this channel asks the reader task to stop.
    #[cfg(feature = "mysql-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
84
85impl std::fmt::Debug for MySqlCdcSource {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("MySqlCdcSource")
89 .field("config", &self.config)
90 .field("connected", &self.connected)
91 .field("table_cache", &self.table_cache)
92 .field("position", &self.position)
93 .field("gtid_set", &self.gtid_set)
94 .field("current_binlog_file", &self.current_binlog_file)
95 .field("current_gtid", &self.current_gtid)
96 .field("event_buffer_len", &self.event_buffer.len())
97 .field("metrics", &self.metrics)
98 .field("schema", &self.schema)
99 .field("last_activity", &self.last_activity)
100 .finish_non_exhaustive()
101 }
102}
103
104impl MySqlCdcSource {
105 #[must_use]
107 pub fn new(config: MySqlCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
108 Self {
109 config,
110 connected: false,
111 table_cache: TableCache::new(),
112 position: None,
113 gtid_set: None,
114 current_binlog_file: String::new(),
115 current_gtid: None,
116 event_buffer: Vec::new(),
117 metrics: MySqlCdcMetrics::new(registry),
118 schema: None,
119 last_activity: None,
120 data_ready: Arc::new(Notify::new()),
121 #[cfg(feature = "mysql-cdc")]
122 msg_rx: None,
123 #[cfg(feature = "mysql-cdc")]
124 reader_handle: None,
125 #[cfg(feature = "mysql-cdc")]
126 reader_shutdown: None,
127 }
128 }
129
130 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
136 let mysql_config = MySqlCdcConfig::from_config(config)?;
137 Ok(Self::new(mysql_config, None))
138 }
139
140 #[must_use]
142 pub fn cached_table_count(&self) -> usize {
143 self.table_cache.len()
144 }
145
146 #[must_use]
148 pub fn position(&self) -> Option<&BinlogPosition> {
149 self.position.as_ref()
150 }
151
152 #[must_use]
154 pub fn gtid_set(&self) -> Option<&GtidSet> {
155 self.gtid_set.as_ref()
156 }
157
158 #[must_use]
160 pub fn table_cache(&self) -> &TableCache {
161 &self.table_cache
162 }
163
164 #[must_use]
166 pub fn cdc_metrics(&self) -> &MySqlCdcMetrics {
167 &self.metrics
168 }
169
170 #[must_use]
172 pub fn should_include_table(&self, database: &str, table: &str) -> bool {
173 self.config.should_include_table(database, table)
174 }
175
176 #[must_use]
178 pub fn config(&self) -> &MySqlCdcConfig {
179 &self.config
180 }
181
182 #[must_use]
184 pub fn is_connected(&self) -> bool {
185 self.connected
186 }
187
188 pub fn restore_position(&mut self, checkpoint: &SourceCheckpoint) {
192 if let Some(gtid_str) = checkpoint.get_offset("gtid") {
194 if let Ok(gtid_set) = gtid_str.parse::<GtidSet>() {
195 self.gtid_set = Some(gtid_set);
196 return;
197 }
198 }
199
200 if let (Some(filename), Some(pos_str)) = (
202 checkpoint.get_offset("binlog_file"),
203 checkpoint.get_offset("binlog_position"),
204 ) {
205 if let Ok(pos) = pos_str.parse::<u64>() {
206 self.position = Some(BinlogPosition::new(filename.to_string(), pos));
207 }
208 }
209 }
210
211 #[must_use]
213 pub fn create_checkpoint(&self) -> SourceCheckpoint {
214 let mut checkpoint = SourceCheckpoint::new(0);
215
216 if self.config.use_gtid {
217 if let Some(ref gtid_set) = self.gtid_set {
218 checkpoint.set_offset("gtid", gtid_set.to_string());
219 }
220 } else if let Some(ref pos) = self.position {
221 checkpoint.set_offset("binlog_file", &pos.filename);
222 checkpoint.set_offset("binlog_position", pos.position.to_string());
223 }
224
225 checkpoint.set_metadata("server_id", self.config.server_id.to_string());
226
227 checkpoint
228 }
229
230 #[allow(clippy::unused_self)] fn build_envelope_schema(&self, table_schema: &Schema) -> SchemaRef {
233 Arc::new(cdc_envelope_schema(table_schema))
234 }
235
236 pub fn flush_events(
242 &mut self,
243 table_info: &TableInfo,
244 ) -> Result<Option<RecordBatch>, ConnectorError> {
245 if self.event_buffer.is_empty() {
246 return Ok(None);
247 }
248
249 let events: Vec<_> = self.event_buffer.drain(..).collect();
250 let batch = super::changelog::events_to_record_batch(&events, table_info)
251 .map_err(|e| ConnectorError::Internal(e.to_string()))?;
252 Ok(Some(batch))
253 }
254}
255
256#[async_trait]
257#[allow(clippy::too_many_lines)]
258impl SourceConnector for MySqlCdcSource {
259 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
260 if !config.properties().is_empty() {
262 self.config = MySqlCdcConfig::from_config(config)?;
263 }
264
265 self.config.validate()?;
267
268 self.gtid_set.clone_from(&self.config.gtid_set);
270
271 if let Some(ref filename) = self.config.binlog_filename {
273 self.current_binlog_file.clone_from(filename);
274 if let Some(pos) = self.config.binlog_position {
275 self.position = Some(BinlogPosition::new(filename.clone(), pos));
276 }
277 }
278
279 #[cfg(not(feature = "mysql-cdc"))]
282 {
283 return Err(ConnectorError::ConfigurationError(
284 "MySQL CDC source requires the `mysql-cdc` feature flag. \
285 Rebuild with `--features mysql-cdc` to enable."
286 .to_string(),
287 ));
288 }
289
290 #[cfg(feature = "mysql-cdc")]
293 {
294 let conn = super::mysql_io::connect(&self.config).await?;
295 let stream = super::mysql_io::start_binlog_stream(
296 conn,
297 &self.config,
298 self.gtid_set.as_ref(),
299 self.position.as_ref(),
300 )
301 .await?;
302
303 let (msg_tx, msg_rx) = crossfire::mpsc::bounded_async::<BinlogMessage>(4096);
304 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
305 let data_ready = Arc::clone(&self.data_ready);
306
307 let reader_handle = tokio::spawn(async move {
308 use tokio_stream::StreamExt as _;
309 let mut stream = stream;
310 loop {
311 let event = tokio::select! {
312 biased;
313 _ = shutdown_rx.changed() => break,
314 event = stream.next() => event,
315 };
316 match event {
317 Some(Ok(raw_event)) => {
318 match super::mysql_io::decode_binlog_event(&raw_event, &stream) {
319 Ok(Some(msg)) => {
320 if msg_tx.send(msg).await.is_err() {
321 break;
322 }
323 data_ready.notify_one();
324 }
325 Ok(None) => {}
326 Err(e) => {
327 tracing::warn!(error = %e, "binlog decode error");
328 break;
329 }
330 }
331 }
332 Some(Err(e)) => {
333 tracing::warn!(error = %e, "binlog stream error");
334 break;
335 }
336 None => break,
337 }
338 }
339 if let Err(e) = stream.close().await {
340 tracing::warn!(error = %e, "error closing binlog stream");
341 }
342 });
343
344 self.msg_rx = Some(msg_rx);
345 self.reader_handle = Some(reader_handle);
346 self.reader_shutdown = Some(shutdown_tx);
347 }
348
349 self.connected = true;
350 self.last_activity = Some(Instant::now());
351
352 Ok(())
353 }
354
355 async fn poll_batch(
356 &mut self,
357 max_records: usize,
358 ) -> Result<Option<SourceBatch>, ConnectorError> {
359 if !self.connected {
360 return Err(ConnectorError::ConfigurationError(
361 "Source not connected".to_string(),
362 ));
363 }
364
365 #[cfg(feature = "mysql-cdc")]
372 {
373 let high_watermark = self.config.backpressure_high_watermark();
374
375 if self.event_buffer.len() >= high_watermark {
376 tracing::debug!(
377 buffered = self.event_buffer.len(),
378 high_watermark,
379 "CDC backpressure active — pausing binlog reader drain"
380 );
381 } else if let Some(rx) = self.msg_rx.as_mut() {
382 let mut last_table_info: Option<TableInfo> = None;
383
384 while self.event_buffer.len() < max_records
385 && self.event_buffer.len() < high_watermark
386 {
387 match rx.try_recv() {
388 Ok(msg) => {
389 self.metrics.inc_events_received();
390 match msg {
391 BinlogMessage::TableMap(tme) => {
392 self.metrics.inc_table_maps();
393 self.table_cache.update(&tme);
394 }
395 BinlogMessage::Insert(insert_msg) => {
396 if !self.config.should_include_table(
397 &insert_msg.database,
398 &insert_msg.table,
399 ) {
400 continue;
401 }
402 let row_count = insert_msg.rows.len() as u64;
403 let events = super::changelog::insert_to_events(
404 &insert_msg,
405 &self.current_binlog_file,
406 self.current_gtid.as_deref(),
407 );
408 self.event_buffer.extend(events);
409 self.metrics.inc_inserts(row_count);
410 last_table_info =
411 self.table_cache.get(insert_msg.table_id).cloned();
412 }
413 BinlogMessage::Update(update_msg) => {
414 if !self.config.should_include_table(
415 &update_msg.database,
416 &update_msg.table,
417 ) {
418 continue;
419 }
420 let row_count = update_msg.rows.len() as u64;
421 let events = super::changelog::update_to_events(
422 &update_msg,
423 &self.current_binlog_file,
424 self.current_gtid.as_deref(),
425 );
426 self.event_buffer.extend(events);
427 self.metrics.inc_updates(row_count);
428 last_table_info =
429 self.table_cache.get(update_msg.table_id).cloned();
430 }
431 BinlogMessage::Delete(delete_msg) => {
432 if !self.config.should_include_table(
433 &delete_msg.database,
434 &delete_msg.table,
435 ) {
436 continue;
437 }
438 let row_count = delete_msg.rows.len() as u64;
439 let events = super::changelog::delete_to_events(
440 &delete_msg,
441 &self.current_binlog_file,
442 self.current_gtid.as_deref(),
443 );
444 self.event_buffer.extend(events);
445 self.metrics.inc_deletes(row_count);
446 last_table_info =
447 self.table_cache.get(delete_msg.table_id).cloned();
448 }
449 BinlogMessage::Begin(begin_msg) => {
450 if let Some(ref gtid) = begin_msg.gtid {
451 self.current_gtid = Some(gtid.to_string());
452 if let Some(ref mut gtid_set) = self.gtid_set {
453 gtid_set.add(gtid);
454 }
455 } else {
456 self.current_gtid = None;
457 }
458 }
459 BinlogMessage::Commit(commit_msg) => {
460 self.metrics.inc_transactions();
461 self.metrics.set_binlog_position(commit_msg.binlog_position);
462 if let Some(ref mut pos) = self.position {
463 pos.position = commit_msg.binlog_position;
464 }
465 }
466 BinlogMessage::Rotate(rotate_msg) => {
467 self.current_binlog_file.clone_from(&rotate_msg.next_binlog);
468 if let Some(ref mut pos) = self.position {
469 pos.filename = rotate_msg.next_binlog;
470 pos.position = rotate_msg.position;
471 } else {
472 self.position = Some(BinlogPosition::new(
473 self.current_binlog_file.clone(),
474 rotate_msg.position,
475 ));
476 }
477 }
478 BinlogMessage::Query(query_msg) => {
479 self.metrics.inc_ddl_events();
480 let _ = query_msg;
481 }
482 BinlogMessage::Heartbeat => {
483 self.metrics.inc_heartbeats();
484 }
485 }
486 }
487 Err(_) => break,
488 }
489 }
490
491 self.last_activity = Some(Instant::now());
492
493 if let Some(table_info) = last_table_info {
494 if let Some(batch) = self.flush_events(&table_info)? {
495 let schema = self.build_envelope_schema(&table_info.arrow_schema);
496 self.schema = Some(schema);
497 return Ok(Some(SourceBatch::new(batch)));
498 }
499 }
500
501 return Ok(None);
502 }
503
504 self.last_activity = Some(Instant::now());
505 return Ok(None);
506 }
507
508 #[cfg(not(feature = "mysql-cdc"))]
510 {
511 let _ = max_records;
512 self.last_activity = Some(Instant::now());
513 Ok(None)
514 }
515 }
516
517 fn schema(&self) -> SchemaRef {
518 self.schema.clone().unwrap_or_else(|| {
520 Arc::new(cdc_envelope_schema(&Schema::empty()))
522 })
523 }
524
525 fn checkpoint(&self) -> SourceCheckpoint {
526 self.create_checkpoint()
527 }
528
529 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
530 self.restore_position(checkpoint);
531 Ok(())
532 }
533
534 fn health_check(&self) -> HealthStatus {
535 if !self.connected {
536 return HealthStatus::Unhealthy("Not connected".to_string());
537 }
538
539 if let Some(last) = self.last_activity {
541 let idle_duration = self.config.heartbeat_interval * 3;
542 if last.elapsed() > idle_duration {
543 return HealthStatus::Degraded(format!(
544 "No activity for {}s",
545 last.elapsed().as_secs()
546 ));
547 }
548 }
549
550 let errors = self.metrics.errors.get();
552 if errors > 100 {
553 return HealthStatus::Degraded(format!("{errors} errors encountered"));
554 }
555
556 HealthStatus::Healthy
557 }
558
559 fn metrics(&self) -> ConnectorMetrics {
560 self.metrics.to_connector_metrics()
561 }
562
563 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
564 Some(Arc::clone(&self.data_ready))
565 }
566
567 async fn close(&mut self) -> Result<(), ConnectorError> {
568 #[cfg(feature = "mysql-cdc")]
570 {
571 if let Some(tx) = self.reader_shutdown.take() {
572 let _ = tx.send(true);
573 }
574 if let Some(handle) = self.reader_handle.take() {
575 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
576 }
577 self.msg_rx = None;
578 }
579
580 self.connected = false;
581 self.table_cache.clear();
582 self.event_buffer.clear();
583 Ok(())
584 }
585}
586
/// Unit tests that need no MySQL server: construction, configuration parsing,
/// checkpoint round-trips, health reporting, metrics, and table filtering.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Minimal local configuration shared by the tests; never actually connected.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("test".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    // A fresh source is disconnected with no cached tables or position.
    #[test]
    fn test_new_source() {
        let config = test_config();
        let source = MySqlCdcSource::new(config, None);

        assert!(!source.is_connected());
        assert_eq!(source.cached_table_count(), 0);
        assert!(source.position().is_none());
        assert!(source.gtid_set().is_none());
    }

    // Generic properties (note the `server.id` key) map onto the typed config.
    #[test]
    fn test_from_config() {
        let mut config = ConnectorConfig::new("mysql-cdc");
        config.set("host", "mysql.example.com");
        config.set("port", "3307");
        config.set("username", "repl");
        config.set("password", "secret");
        config.set("server.id", "999");

        let source = MySqlCdcSource::from_config(&config).unwrap();
        assert_eq!(source.config().host, "mysql.example.com");
        assert_eq!(source.config().port, 3307);
        assert_eq!(source.config().server_id, 999);
    }

    // An empty property set lacks required keys and must be rejected.
    #[test]
    fn test_from_config_missing_required() {
        let config = ConnectorConfig::new("mysql-cdc");

        let result = MySqlCdcSource::from_config(&config);
        assert!(result.is_err());
    }

    // A `gtid` offset restores the GTID set.
    #[test]
    fn test_restore_position_gtid() {
        let mut source = MySqlCdcSource::new(test_config(), None);

        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("gtid", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");

        source.restore_position(&checkpoint);
        assert!(source.gtid_set().is_some());
    }

    // A file/position pair restores the binlog position.
    #[test]
    fn test_restore_position_file() {
        let mut source = MySqlCdcSource::new(test_config(), None);

        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("binlog_file", "mysql-bin.000003");
        checkpoint.set_offset("binlog_position", "12345");

        source.restore_position(&checkpoint);
        let pos = source.position().unwrap();
        assert_eq!(pos.filename, "mysql-bin.000003");
        assert_eq!(pos.position, 12345);
    }

    // GTID mode writes the GTID set; the lowercase check shows the set is
    // rendered with a lowercased server UUID.
    #[test]
    fn test_create_checkpoint_gtid() {
        let mut source = MySqlCdcSource::new(test_config(), None);
        source.config.use_gtid = true;
        source.gtid_set = Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap());

        let checkpoint = source.create_checkpoint();
        assert!(checkpoint.get_offset("gtid").is_some());
        assert!(checkpoint.get_offset("gtid").unwrap().contains("3e11fa47"));
    }

    // File mode writes the binlog file and position.
    #[test]
    fn test_create_checkpoint_file() {
        let mut source = MySqlCdcSource::new(test_config(), None);
        source.config.use_gtid = false;
        source.position = Some(BinlogPosition::new("mysql-bin.000003".to_string(), 9999));

        let checkpoint = source.create_checkpoint();
        assert_eq!(
            checkpoint.get_offset("binlog_file"),
            Some("mysql-bin.000003")
        );
        assert_eq!(checkpoint.get_offset("binlog_position"), Some("9999"));
    }

    // Before any batch, the schema is the envelope around an empty table schema.
    #[test]
    fn test_schema() {
        let source = MySqlCdcSource::new(test_config(), None);
        let schema = source.schema();

        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name()).collect();
        assert!(field_names.contains(&&"_table".to_string()));
        assert!(field_names.contains(&&"_op".to_string()));
        assert!(field_names.contains(&&"_ts_ms".to_string()));
    }

    #[test]
    fn test_health_check_not_connected() {
        let source = MySqlCdcSource::new(test_config(), None);

        match source.health_check() {
            HealthStatus::Unhealthy(message) => {
                assert!(message.contains("Not connected"));
            }
            _ => panic!("Expected unhealthy status"),
        }
    }

    #[test]
    fn test_health_check_healthy() {
        let mut source = MySqlCdcSource::new(test_config(), None);
        source.connected = true;
        source.last_activity = Some(Instant::now());

        assert!(matches!(source.health_check(), HealthStatus::Healthy));
    }

    // A 1 ms heartbeat with activity 10 s ago exceeds the 3x idle threshold.
    #[test]
    fn test_health_check_degraded_no_activity() {
        let mut source = MySqlCdcSource::new(test_config(), None);
        source.connected = true;
        source.config.heartbeat_interval = Duration::from_millis(1);
        source.last_activity = Instant::now().checked_sub(Duration::from_secs(10));

        match source.health_check() {
            HealthStatus::Degraded(message) => {
                assert!(message.contains("No activity"));
            }
            _ => panic!("Expected degraded status"),
        }
    }

    // More than 100 recorded errors degrades health.
    #[test]
    fn test_health_check_degraded_errors() {
        let mut source = MySqlCdcSource::new(test_config(), None);
        source.connected = true;
        source.last_activity = Some(Instant::now());

        for _ in 0..150 {
            source.metrics.inc_errors();
        }

        match source.health_check() {
            HealthStatus::Degraded(message) => {
                assert!(message.contains("errors"));
            }
            _ => panic!("Expected degraded status"),
        }
    }

    // Inserts + updates + deletes roll up into the generic records total.
    #[test]
    fn test_metrics() {
        let mut source = MySqlCdcSource::new(test_config(), None);
        source.metrics.inc_inserts(100);
        source.metrics.inc_updates(50);
        source.metrics.inc_deletes(25);
        source.metrics.add_bytes_received(10000);
        source.metrics.inc_errors();

        let metrics = source.metrics();
        assert_eq!(metrics.records_total, 175);
        assert_eq!(metrics.bytes_total, 10000);
        assert_eq!(metrics.errors_total, 1);
    }

    #[test]
    fn test_table_filtering() {
        let mut config = test_config();
        config.table_include = vec!["users".to_string(), "orders".to_string()];

        let source = MySqlCdcSource::new(config, None);

        assert!(source.should_include_table("testdb", "users"));
        assert!(source.should_include_table("testdb", "orders"));
        assert!(!source.should_include_table("testdb", "other"));
    }

    // Without the feature flag, `open` must fail with a message naming it.
    #[cfg(not(feature = "mysql-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut source = MySqlCdcSource::new(test_config(), None);

        let result = source.open(&ConnectorConfig::default()).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("mysql-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    #[tokio::test]
    async fn test_poll_not_connected() {
        let mut source = MySqlCdcSource::new(test_config(), None);

        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    // The async trait `restore` delegates to `restore_position`.
    #[tokio::test]
    async fn test_restore_async() {
        let mut source = MySqlCdcSource::new(test_config(), None);

        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("binlog_file", "mysql-bin.000005");
        checkpoint.set_offset("binlog_position", "54321");

        source.restore(&checkpoint).await.unwrap();

        let pos = source.position().unwrap();
        assert_eq!(pos.filename, "mysql-bin.000005");
        assert_eq!(pos.position, 54321);
    }
}