laminar_connectors/cdc/mysql/
source.rs1use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use arrow_schema::{Schema, SchemaRef};
11use async_trait::async_trait;
12use tokio::sync::Notify;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::health::HealthStatus;
19use crate::metrics::ConnectorMetrics;
20
21use super::changelog::ChangeEvent;
22use super::config::MySqlCdcConfig;
23use super::decoder::{BinlogMessage, BinlogPosition};
24use super::gtid::GtidSet;
25use super::metrics::MySqlCdcMetrics;
26use super::schema::{cdc_envelope_schema, TableCache, TableInfo};
27
/// MySQL binlog change-data-capture source.
///
/// Tracks replication progress (binlog file/position and GTID set), caches
/// table-map metadata, and buffers decoded row changes until `poll_batch`
/// converts them into Arrow record batches. With the `mysql-cdc` feature,
/// binlog events are read by a background task spawned in `open` and handed
/// to this struct through a bounded channel.
pub struct MySqlCdcSource {
    /// Connection, table-filter, checkpoint-mode, and start-position settings.
    config: MySqlCdcConfig,

    /// Set by `open` and cleared by `close`; `poll_batch` returns an error
    /// while this is false.
    connected: bool,

    /// Table metadata from table-map events; row events look up their
    /// `TableInfo` here by table id.
    table_cache: TableCache,

    /// Last known binlog file and offset. Updated on commit and rotate events
    /// and written to checkpoints when GTID mode is off.
    position: Option<BinlogPosition>,

    /// GTID set used as the resume point. Extended as transactions begin and
    /// written to checkpoints when GTID mode is on.
    gtid_set: Option<GtidSet>,

    /// Binlog file name attached to every emitted change event.
    current_binlog_file: String,

    /// GTID of the transaction currently being read, attached to change events.
    current_gtid: Option<String>,

    /// Change events waiting to be converted into a `RecordBatch`.
    event_buffer: Vec<ChangeEvent>,

    /// CDC counters; also backs `health_check` and `metrics`.
    metrics: MySqlCdcMetrics,

    /// Envelope schema of the most recently emitted batch, if any.
    schema: Option<SchemaRef>,

    /// Timestamp used by `health_check` to detect a stalled stream.
    last_activity: Option<Instant>,

    /// Shared notifier that the reader task signals when data is available.
    data_ready: Arc<Notify>,

    // NOTE: the three runtime fields below are declared in this order on
    // purpose; declaration order is also drop order. `close` tears them down
    // explicitly.
    /// Receiving end of the channel fed by the background reader task.
    #[cfg(feature = "mysql-cdc")]
    msg_rx: Option<tokio::sync::mpsc::Receiver<BinlogMessage>>,

    /// Join handle of the background reader task.
    #[cfg(feature = "mysql-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Sending `true` (or dropping this) tells the reader task to stop.
    #[cfg(feature = "mysql-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
103
104impl std::fmt::Debug for MySqlCdcSource {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("MySqlCdcSource")
108 .field("config", &self.config)
109 .field("connected", &self.connected)
110 .field("table_cache", &self.table_cache)
111 .field("position", &self.position)
112 .field("gtid_set", &self.gtid_set)
113 .field("current_binlog_file", &self.current_binlog_file)
114 .field("current_gtid", &self.current_gtid)
115 .field("event_buffer_len", &self.event_buffer.len())
116 .field("metrics", &self.metrics)
117 .field("schema", &self.schema)
118 .field("last_activity", &self.last_activity)
119 .finish_non_exhaustive()
120 }
121}
122
123impl MySqlCdcSource {
124 #[must_use]
126 pub fn new(config: MySqlCdcConfig) -> Self {
127 Self {
128 config,
129 connected: false,
130 table_cache: TableCache::new(),
131 position: None,
132 gtid_set: None,
133 current_binlog_file: String::new(),
134 current_gtid: None,
135 event_buffer: Vec::new(),
136 metrics: MySqlCdcMetrics::new(),
137 schema: None,
138 last_activity: None,
139 data_ready: Arc::new(Notify::new()),
140 #[cfg(feature = "mysql-cdc")]
141 msg_rx: None,
142 #[cfg(feature = "mysql-cdc")]
143 reader_handle: None,
144 #[cfg(feature = "mysql-cdc")]
145 reader_shutdown: None,
146 }
147 }
148
149 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
155 let mysql_config = MySqlCdcConfig::from_config(config)?;
156 Ok(Self::new(mysql_config))
157 }
158
159 #[must_use]
161 pub fn cached_table_count(&self) -> usize {
162 self.table_cache.len()
163 }
164
165 #[must_use]
167 pub fn position(&self) -> Option<&BinlogPosition> {
168 self.position.as_ref()
169 }
170
171 #[must_use]
173 pub fn gtid_set(&self) -> Option<&GtidSet> {
174 self.gtid_set.as_ref()
175 }
176
177 #[must_use]
179 pub fn table_cache(&self) -> &TableCache {
180 &self.table_cache
181 }
182
183 #[must_use]
185 pub fn cdc_metrics(&self) -> &MySqlCdcMetrics {
186 &self.metrics
187 }
188
189 #[must_use]
191 pub fn should_include_table(&self, database: &str, table: &str) -> bool {
192 self.config.should_include_table(database, table)
193 }
194
195 #[must_use]
197 pub fn config(&self) -> &MySqlCdcConfig {
198 &self.config
199 }
200
201 #[must_use]
203 pub fn is_connected(&self) -> bool {
204 self.connected
205 }
206
207 pub fn restore_position(&mut self, checkpoint: &SourceCheckpoint) {
211 if let Some(gtid_str) = checkpoint.get_offset("gtid") {
213 if let Ok(gtid_set) = gtid_str.parse::<GtidSet>() {
214 self.gtid_set = Some(gtid_set);
215 return;
216 }
217 }
218
219 if let (Some(filename), Some(pos_str)) = (
221 checkpoint.get_offset("binlog_file"),
222 checkpoint.get_offset("binlog_position"),
223 ) {
224 if let Ok(pos) = pos_str.parse::<u64>() {
225 self.position = Some(BinlogPosition::new(filename.to_string(), pos));
226 }
227 }
228 }
229
230 #[must_use]
232 pub fn create_checkpoint(&self) -> SourceCheckpoint {
233 let mut checkpoint = SourceCheckpoint::new(0);
234
235 if self.config.use_gtid {
236 if let Some(ref gtid_set) = self.gtid_set {
237 checkpoint.set_offset("gtid", gtid_set.to_string());
238 }
239 } else if let Some(ref pos) = self.position {
240 checkpoint.set_offset("binlog_file", &pos.filename);
241 checkpoint.set_offset("binlog_position", pos.position.to_string());
242 }
243
244 checkpoint.set_metadata("server_id", self.config.server_id.to_string());
245
246 checkpoint
247 }
248
249 #[allow(clippy::unused_self)] fn build_envelope_schema(&self, table_schema: &Schema) -> SchemaRef {
252 Arc::new(cdc_envelope_schema(table_schema))
253 }
254
255 pub fn flush_events(
261 &mut self,
262 table_info: &TableInfo,
263 ) -> Result<Option<RecordBatch>, ConnectorError> {
264 if self.event_buffer.is_empty() {
265 return Ok(None);
266 }
267
268 let events: Vec<_> = self.event_buffer.drain(..).collect();
269 let batch = super::changelog::events_to_record_batch(&events, table_info)
270 .map_err(|e| ConnectorError::Internal(e.to_string()))?;
271 Ok(Some(batch))
272 }
273}
274
275#[async_trait]
276#[allow(clippy::too_many_lines)]
277impl SourceConnector for MySqlCdcSource {
278 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
279 if !config.properties().is_empty() {
281 self.config = MySqlCdcConfig::from_config(config)?;
282 }
283
284 self.config.validate()?;
286
287 self.gtid_set.clone_from(&self.config.gtid_set);
289
290 if let Some(ref filename) = self.config.binlog_filename {
292 self.current_binlog_file.clone_from(filename);
293 if let Some(pos) = self.config.binlog_position {
294 self.position = Some(BinlogPosition::new(filename.clone(), pos));
295 }
296 }
297
298 #[cfg(not(feature = "mysql-cdc"))]
301 {
302 return Err(ConnectorError::ConfigurationError(
303 "MySQL CDC source requires the `mysql-cdc` feature flag. \
304 Rebuild with `--features mysql-cdc` to enable."
305 .to_string(),
306 ));
307 }
308
309 #[cfg(feature = "mysql-cdc")]
312 {
313 let conn = super::mysql_io::connect(&self.config).await?;
314 let stream = super::mysql_io::start_binlog_stream(
315 conn,
316 &self.config,
317 self.gtid_set.as_ref(),
318 self.position.as_ref(),
319 )
320 .await?;
321
322 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(4096);
323 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
324 let data_ready = Arc::clone(&self.data_ready);
325
326 let reader_handle = tokio::spawn(async move {
327 use tokio_stream::StreamExt as _;
328 let mut stream = stream;
329 loop {
330 let event = tokio::select! {
331 biased;
332 _ = shutdown_rx.changed() => break,
333 event = stream.next() => event,
334 };
335 match event {
336 Some(Ok(raw_event)) => {
337 match super::mysql_io::decode_binlog_event(&raw_event, &stream) {
338 Ok(Some(msg)) => {
339 if msg_tx.send(msg).await.is_err() {
340 break;
341 }
342 data_ready.notify_one();
343 }
344 Ok(None) => {}
345 Err(e) => {
346 tracing::warn!(error = %e, "binlog decode error");
347 break;
348 }
349 }
350 }
351 Some(Err(e)) => {
352 tracing::warn!(error = %e, "binlog stream error");
353 break;
354 }
355 None => break,
356 }
357 }
358 if let Err(e) = stream.close().await {
359 tracing::warn!(error = %e, "error closing binlog stream");
360 }
361 });
362
363 self.msg_rx = Some(msg_rx);
364 self.reader_handle = Some(reader_handle);
365 self.reader_shutdown = Some(shutdown_tx);
366 }
367
368 self.connected = true;
369 self.last_activity = Some(Instant::now());
370
371 Ok(())
372 }
373
374 async fn poll_batch(
375 &mut self,
376 max_records: usize,
377 ) -> Result<Option<SourceBatch>, ConnectorError> {
378 if !self.connected {
379 return Err(ConnectorError::ConfigurationError(
380 "Source not connected".to_string(),
381 ));
382 }
383
384 #[cfg(feature = "mysql-cdc")]
386 {
387 if let Some(rx) = self.msg_rx.as_mut() {
388 let mut last_table_info: Option<TableInfo> = None;
389
390 while self.event_buffer.len() < max_records {
391 match rx.try_recv() {
392 Ok(msg) => {
393 self.metrics.inc_events_received();
394 match msg {
395 BinlogMessage::TableMap(tme) => {
396 self.metrics.inc_table_maps();
397 self.table_cache.update(&tme);
398 }
399 BinlogMessage::Insert(insert_msg) => {
400 if !self.config.should_include_table(
401 &insert_msg.database,
402 &insert_msg.table,
403 ) {
404 continue;
405 }
406 let row_count = insert_msg.rows.len() as u64;
407 let events = super::changelog::insert_to_events(
408 &insert_msg,
409 &self.current_binlog_file,
410 self.current_gtid.as_deref(),
411 );
412 self.event_buffer.extend(events);
413 self.metrics.inc_inserts(row_count);
414 last_table_info =
415 self.table_cache.get(insert_msg.table_id).cloned();
416 }
417 BinlogMessage::Update(update_msg) => {
418 if !self.config.should_include_table(
419 &update_msg.database,
420 &update_msg.table,
421 ) {
422 continue;
423 }
424 let row_count = update_msg.rows.len() as u64;
425 let events = super::changelog::update_to_events(
426 &update_msg,
427 &self.current_binlog_file,
428 self.current_gtid.as_deref(),
429 );
430 self.event_buffer.extend(events);
431 self.metrics.inc_updates(row_count);
432 last_table_info =
433 self.table_cache.get(update_msg.table_id).cloned();
434 }
435 BinlogMessage::Delete(delete_msg) => {
436 if !self.config.should_include_table(
437 &delete_msg.database,
438 &delete_msg.table,
439 ) {
440 continue;
441 }
442 let row_count = delete_msg.rows.len() as u64;
443 let events = super::changelog::delete_to_events(
444 &delete_msg,
445 &self.current_binlog_file,
446 self.current_gtid.as_deref(),
447 );
448 self.event_buffer.extend(events);
449 self.metrics.inc_deletes(row_count);
450 last_table_info =
451 self.table_cache.get(delete_msg.table_id).cloned();
452 }
453 BinlogMessage::Begin(begin_msg) => {
454 if let Some(ref gtid) = begin_msg.gtid {
455 self.current_gtid = Some(gtid.to_string());
456 if let Some(ref mut gtid_set) = self.gtid_set {
457 gtid_set.add(gtid);
458 }
459 } else {
460 self.current_gtid = None;
461 }
462 }
463 BinlogMessage::Commit(commit_msg) => {
464 self.metrics.inc_transactions();
465 self.metrics.set_binlog_position(commit_msg.binlog_position);
466 if let Some(ref mut pos) = self.position {
467 pos.position = commit_msg.binlog_position;
468 }
469 }
470 BinlogMessage::Rotate(rotate_msg) => {
471 self.current_binlog_file.clone_from(&rotate_msg.next_binlog);
472 if let Some(ref mut pos) = self.position {
473 pos.filename = rotate_msg.next_binlog;
474 pos.position = rotate_msg.position;
475 } else {
476 self.position = Some(BinlogPosition::new(
477 self.current_binlog_file.clone(),
478 rotate_msg.position,
479 ));
480 }
481 }
482 BinlogMessage::Query(query_msg) => {
483 self.metrics.inc_ddl_events();
484 let _ = query_msg;
485 }
486 BinlogMessage::Heartbeat => {
487 self.metrics.inc_heartbeats();
488 }
489 }
490 }
491 Err(_) => break,
492 }
493 }
494
495 self.last_activity = Some(Instant::now());
496
497 if let Some(table_info) = last_table_info {
498 if let Some(batch) = self.flush_events(&table_info)? {
499 let schema = self.build_envelope_schema(&table_info.arrow_schema);
500 self.schema = Some(schema);
501 return Ok(Some(SourceBatch::new(batch)));
502 }
503 }
504
505 return Ok(None);
506 }
507
508 self.last_activity = Some(Instant::now());
509 return Ok(None);
510 }
511
512 #[cfg(not(feature = "mysql-cdc"))]
514 {
515 let _ = max_records;
516 self.last_activity = Some(Instant::now());
517 Ok(None)
518 }
519 }
520
521 fn schema(&self) -> SchemaRef {
522 self.schema.clone().unwrap_or_else(|| {
524 Arc::new(cdc_envelope_schema(&Schema::empty()))
526 })
527 }
528
529 fn checkpoint(&self) -> SourceCheckpoint {
530 self.create_checkpoint()
531 }
532
533 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
534 self.restore_position(checkpoint);
535 Ok(())
536 }
537
538 fn health_check(&self) -> HealthStatus {
539 if !self.connected {
540 return HealthStatus::Unhealthy("Not connected".to_string());
541 }
542
543 if let Some(last) = self.last_activity {
545 let idle_duration = self.config.heartbeat_interval * 3;
546 if last.elapsed() > idle_duration {
547 return HealthStatus::Degraded(format!(
548 "No activity for {}s",
549 last.elapsed().as_secs()
550 ));
551 }
552 }
553
554 let errors = self
556 .metrics
557 .errors
558 .load(std::sync::atomic::Ordering::Relaxed);
559 if errors > 100 {
560 return HealthStatus::Degraded(format!("{errors} errors encountered"));
561 }
562
563 HealthStatus::Healthy
564 }
565
566 fn metrics(&self) -> ConnectorMetrics {
567 self.metrics.to_connector_metrics()
568 }
569
570 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
571 Some(Arc::clone(&self.data_ready))
572 }
573
574 async fn close(&mut self) -> Result<(), ConnectorError> {
575 #[cfg(feature = "mysql-cdc")]
577 {
578 if let Some(tx) = self.reader_shutdown.take() {
579 let _ = tx.send(true);
580 }
581 if let Some(handle) = self.reader_handle.take() {
582 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
583 }
584 self.msg_rx = None;
585 }
586
587 self.connected = false;
588 self.table_cache.clear();
589 self.event_buffer.clear();
590 Ok(())
591 }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Minimal valid configuration pointing at a local server.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("test".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    /// A brand-new, disconnected source built from `test_config`.
    fn fresh_source() -> MySqlCdcSource {
        MySqlCdcSource::new(test_config())
    }

    #[test]
    fn test_new_source() {
        let src = fresh_source();

        assert!(!src.is_connected());
        assert_eq!(src.cached_table_count(), 0);
        assert!(src.position().is_none());
        assert!(src.gtid_set().is_none());
    }

    #[test]
    fn test_from_config() {
        let mut props = ConnectorConfig::new("mysql-cdc");
        for (key, value) in [
            ("host", "mysql.example.com"),
            ("port", "3307"),
            ("username", "repl"),
            ("password", "secret"),
            ("server.id", "999"),
        ] {
            props.set(key, value);
        }

        let src = MySqlCdcSource::from_config(&props).unwrap();
        let cfg = src.config();
        assert_eq!(cfg.host, "mysql.example.com");
        assert_eq!(cfg.port, 3307);
        assert_eq!(cfg.server_id, 999);
    }

    #[test]
    fn test_from_config_missing_required() {
        let empty = ConnectorConfig::new("mysql-cdc");
        assert!(MySqlCdcSource::from_config(&empty).is_err());
    }

    #[test]
    fn test_restore_position_gtid() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("gtid", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");

        let mut src = fresh_source();
        src.restore_position(&checkpoint);

        assert!(src.gtid_set().is_some());
    }

    #[test]
    fn test_restore_position_file() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("binlog_file", "mysql-bin.000003");
        checkpoint.set_offset("binlog_position", "12345");

        let mut src = fresh_source();
        src.restore_position(&checkpoint);

        let restored = src.position().expect("binlog position should be restored");
        assert_eq!(restored.filename, "mysql-bin.000003");
        assert_eq!(restored.position, 12345);
    }

    #[test]
    fn test_create_checkpoint_gtid() {
        let mut src = fresh_source();
        src.config.use_gtid = true;
        src.gtid_set = Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap());

        let checkpoint = src.create_checkpoint();
        let gtid = checkpoint
            .get_offset("gtid")
            .expect("gtid offset should be present");
        assert!(gtid.contains("3e11fa47"));
    }

    #[test]
    fn test_create_checkpoint_file() {
        let mut src = fresh_source();
        src.config.use_gtid = false;
        src.position = Some(BinlogPosition::new("mysql-bin.000003".to_string(), 9999));

        let checkpoint = src.create_checkpoint();
        assert_eq!(checkpoint.get_offset("binlog_file"), Some("mysql-bin.000003"));
        assert_eq!(checkpoint.get_offset("binlog_position"), Some("9999"));
    }

    #[test]
    fn test_schema() {
        let schema = fresh_source().schema();
        let has_field = |wanted: &str| {
            schema
                .fields()
                .iter()
                .any(|field| field.name().as_str() == wanted)
        };

        assert!(has_field("_table"));
        assert!(has_field("_op"));
        assert!(has_field("_ts_ms"));
    }

    #[test]
    fn test_health_check_not_connected() {
        let HealthStatus::Unhealthy(message) = fresh_source().health_check() else {
            panic!("Expected unhealthy status");
        };
        assert!(message.contains("Not connected"));
    }

    #[test]
    fn test_health_check_healthy() {
        let mut src = fresh_source();
        src.connected = true;
        src.last_activity = Some(Instant::now());

        assert!(matches!(src.health_check(), HealthStatus::Healthy));
    }

    #[test]
    fn test_health_check_degraded_no_activity() {
        let mut src = fresh_source();
        src.connected = true;
        src.config.heartbeat_interval = Duration::from_millis(1);
        src.last_activity = Instant::now().checked_sub(Duration::from_secs(10));

        let HealthStatus::Degraded(message) = src.health_check() else {
            panic!("Expected degraded status");
        };
        assert!(message.contains("No activity"));
    }

    #[test]
    fn test_health_check_degraded_errors() {
        let mut src = fresh_source();
        src.connected = true;
        src.last_activity = Some(Instant::now());

        // Push the error counter past the degradation threshold of 100.
        (0..150).for_each(|_| src.metrics.inc_errors());

        let HealthStatus::Degraded(message) = src.health_check() else {
            panic!("Expected degraded status");
        };
        assert!(message.contains("errors"));
    }

    #[test]
    fn test_metrics() {
        let mut src = fresh_source();
        src.metrics.inc_inserts(100);
        src.metrics.inc_updates(50);
        src.metrics.inc_deletes(25);
        src.metrics.add_bytes_received(10000);
        src.metrics.inc_errors();

        let snapshot = src.metrics();
        assert_eq!(snapshot.records_total, 175);
        assert_eq!(snapshot.bytes_total, 10000);
        assert_eq!(snapshot.errors_total, 1);
    }

    #[test]
    fn test_table_filtering() {
        let src = MySqlCdcSource::new(MySqlCdcConfig {
            table_include: vec!["users".to_string(), "orders".to_string()],
            ..test_config()
        });

        assert!(src.should_include_table("testdb", "users"));
        assert!(src.should_include_table("testdb", "orders"));
        assert!(!src.should_include_table("testdb", "other"));
    }

    #[cfg(not(feature = "mysql-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut src = fresh_source();

        let err = src
            .open(&ConnectorConfig::default())
            .await
            .expect_err("open must fail without the mysql-cdc feature")
            .to_string();
        assert!(
            err.contains("mysql-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    #[tokio::test]
    async fn test_poll_not_connected() {
        let mut src = fresh_source();
        assert!(src.poll_batch(100).await.is_err());
    }

    #[tokio::test]
    async fn test_restore_async() {
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset("binlog_file", "mysql-bin.000005");
        checkpoint.set_offset("binlog_position", "54321");

        let mut src = fresh_source();
        src.restore(&checkpoint).await.unwrap();

        let restored = src.position().expect("binlog position should be restored");
        assert_eq!(restored.filename, "mysql-bin.000005");
        assert_eq!(restored.position, 54321);
    }
}