laminar_connectors/cdc/mysql/
source.rs1use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use arrow_schema::{Schema, SchemaRef};
11use async_trait::async_trait;
12use tokio::sync::Notify;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18
19use super::changelog::ChangeEvent;
20use super::config::MySqlCdcConfig;
21use super::decoder::{BinlogMessage, BinlogPosition};
22use super::gtid::GtidSet;
23use super::metrics::MySqlCdcMetrics;
24use super::schema::{cdc_envelope_schema, TableCache, TableInfo};
25
/// Receiving half of the bounded channel through which the background binlog
/// reader task hands decoded [`BinlogMessage`]s to `poll_batch`.
#[cfg(feature = "mysql-cdc")]
type BinlogMessageRx = crossfire::AsyncRx<crossfire::mpsc::Array<BinlogMessage>>;
29
/// MySQL binlog change-data-capture source connector.
///
/// Holds the connector configuration, the replication progress (GTID set
/// and/or binlog file position), a buffer of decoded change events and, when
/// the `mysql-cdc` feature is enabled, the handles of the background binlog
/// reader task.
pub struct MySqlCdcSource {
    /// Connector settings; replaced in `open` when the supplied
    /// `ConnectorConfig` carries any properties.
    config: MySqlCdcConfig,

    /// `true` between a successful `open` and the next `close`.
    connected: bool,

    /// Table metadata learned from `TableMap` messages; cleared on `close`.
    table_cache: TableCache,

    /// Binlog file/offset position, seeded from the configuration or a
    /// checkpoint and advanced by `Commit` and `Rotate` messages.
    position: Option<BinlogPosition>,

    /// GTID set, seeded from the configuration or a checkpoint and extended
    /// by `Begin` messages that carry a GTID.
    gtid_set: Option<GtidSet>,

    /// Name of the binlog file currently being read; passed to the
    /// message-to-event conversion functions.
    current_binlog_file: String,

    /// GTID announced by the most recent `Begin` message, if it had one.
    current_gtid: Option<String>,

    /// Change events converted from row messages and not yet flushed into a
    /// `RecordBatch`.
    event_buffer: Vec<ChangeEvent>,

    /// Counters and gauges updated while polling.
    metrics: MySqlCdcMetrics,

    /// Envelope schema of the most recently emitted batch; `None` until the
    /// first batch has been produced.
    schema: Option<SchemaRef>,

    /// Instant of the last `open` or `poll_batch` call.
    last_activity: Option<Instant>,

    /// Signalled by the reader task after each message it queues; exposed
    /// through `data_ready_notify`.
    data_ready: Arc<Notify>,

    /// Receiver for messages produced by the reader task.
    #[cfg(feature = "mysql-cdc")]
    msg_rx: Option<BinlogMessageRx>,

    /// Join handle of the spawned reader task.
    #[cfg(feature = "mysql-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Watch-channel sender used by `close` to ask the reader task to stop.
    #[cfg(feature = "mysql-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
82
83impl std::fmt::Debug for MySqlCdcSource {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("MySqlCdcSource")
87 .field("config", &self.config)
88 .field("connected", &self.connected)
89 .field("table_cache", &self.table_cache)
90 .field("position", &self.position)
91 .field("gtid_set", &self.gtid_set)
92 .field("current_binlog_file", &self.current_binlog_file)
93 .field("current_gtid", &self.current_gtid)
94 .field("event_buffer_len", &self.event_buffer.len())
95 .field("metrics", &self.metrics)
96 .field("schema", &self.schema)
97 .field("last_activity", &self.last_activity)
98 .finish_non_exhaustive()
99 }
100}
101
102impl MySqlCdcSource {
103 #[must_use]
105 pub fn new(config: MySqlCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
106 Self {
107 config,
108 connected: false,
109 table_cache: TableCache::new(),
110 position: None,
111 gtid_set: None,
112 current_binlog_file: String::new(),
113 current_gtid: None,
114 event_buffer: Vec::new(),
115 metrics: MySqlCdcMetrics::new(registry),
116 schema: None,
117 last_activity: None,
118 data_ready: Arc::new(Notify::new()),
119 #[cfg(feature = "mysql-cdc")]
120 msg_rx: None,
121 #[cfg(feature = "mysql-cdc")]
122 reader_handle: None,
123 #[cfg(feature = "mysql-cdc")]
124 reader_shutdown: None,
125 }
126 }
127
128 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
134 let mysql_config = MySqlCdcConfig::from_config(config)?;
135 Ok(Self::new(mysql_config, None))
136 }
137
138 #[must_use]
140 pub fn cached_table_count(&self) -> usize {
141 self.table_cache.len()
142 }
143
144 #[must_use]
146 pub fn position(&self) -> Option<&BinlogPosition> {
147 self.position.as_ref()
148 }
149
150 #[must_use]
152 pub fn gtid_set(&self) -> Option<&GtidSet> {
153 self.gtid_set.as_ref()
154 }
155
156 #[must_use]
158 pub fn table_cache(&self) -> &TableCache {
159 &self.table_cache
160 }
161
162 #[must_use]
164 pub fn cdc_metrics(&self) -> &MySqlCdcMetrics {
165 &self.metrics
166 }
167
168 #[must_use]
170 pub fn should_include_table(&self, database: &str, table: &str) -> bool {
171 self.config.should_include_table(database, table)
172 }
173
174 #[must_use]
176 pub fn config(&self) -> &MySqlCdcConfig {
177 &self.config
178 }
179
180 #[must_use]
182 pub fn is_connected(&self) -> bool {
183 self.connected
184 }
185
186 pub fn restore_position(&mut self, checkpoint: &SourceCheckpoint) {
190 if let Some(gtid_str) = checkpoint.get_offset("gtid") {
192 if let Ok(gtid_set) = gtid_str.parse::<GtidSet>() {
193 self.gtid_set = Some(gtid_set);
194 return;
195 }
196 }
197
198 if let (Some(filename), Some(pos_str)) = (
200 checkpoint.get_offset("binlog_file"),
201 checkpoint.get_offset("binlog_position"),
202 ) {
203 if let Ok(pos) = pos_str.parse::<u64>() {
204 self.position = Some(BinlogPosition::new(filename.to_string(), pos));
205 }
206 }
207 }
208
209 #[must_use]
211 pub fn create_checkpoint(&self) -> SourceCheckpoint {
212 let mut checkpoint = SourceCheckpoint::new(0);
213
214 if self.config.use_gtid {
215 if let Some(ref gtid_set) = self.gtid_set {
216 checkpoint.set_offset("gtid", gtid_set.to_string());
217 }
218 } else if let Some(ref pos) = self.position {
219 checkpoint.set_offset("binlog_file", &pos.filename);
220 checkpoint.set_offset("binlog_position", pos.position.to_string());
221 }
222
223 checkpoint.set_metadata("server_id", self.config.server_id.to_string());
224
225 checkpoint
226 }
227
228 #[allow(clippy::unused_self)] fn build_envelope_schema(&self, table_schema: &Schema) -> SchemaRef {
231 Arc::new(cdc_envelope_schema(table_schema))
232 }
233
234 pub fn flush_events(
240 &mut self,
241 table_info: &TableInfo,
242 ) -> Result<Option<RecordBatch>, ConnectorError> {
243 if self.event_buffer.is_empty() {
244 return Ok(None);
245 }
246
247 let events: Vec<_> = self.event_buffer.drain(..).collect();
248 let batch = super::changelog::events_to_record_batch(&events, table_info)
249 .map_err(|e| ConnectorError::Internal(e.to_string()))?;
250 Ok(Some(batch))
251 }
252}
253
254#[async_trait]
255#[allow(clippy::too_many_lines)]
256impl SourceConnector for MySqlCdcSource {
257 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
258 if !config.properties().is_empty() {
260 self.config = MySqlCdcConfig::from_config(config)?;
261 }
262
263 self.config.validate()?;
265
266 self.gtid_set.clone_from(&self.config.gtid_set);
268
269 if let Some(ref filename) = self.config.binlog_filename {
271 self.current_binlog_file.clone_from(filename);
272 if let Some(pos) = self.config.binlog_position {
273 self.position = Some(BinlogPosition::new(filename.clone(), pos));
274 }
275 }
276
277 #[cfg(not(feature = "mysql-cdc"))]
280 {
281 return Err(ConnectorError::ConfigurationError(
282 "MySQL CDC source requires the `mysql-cdc` feature flag. \
283 Rebuild with `--features mysql-cdc` to enable."
284 .to_string(),
285 ));
286 }
287
288 #[cfg(feature = "mysql-cdc")]
291 {
292 let conn = super::mysql_io::connect(&self.config).await?;
293 let stream = super::mysql_io::start_binlog_stream(
294 conn,
295 &self.config,
296 self.gtid_set.as_ref(),
297 self.position.as_ref(),
298 )
299 .await?;
300
301 let (msg_tx, msg_rx) = crossfire::mpsc::bounded_async::<BinlogMessage>(4096);
302 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
303 let data_ready = Arc::clone(&self.data_ready);
304
305 let reader_handle = tokio::spawn(async move {
306 use tokio_stream::StreamExt as _;
307 let mut stream = stream;
308 loop {
309 let event = tokio::select! {
310 biased;
311 _ = shutdown_rx.changed() => break,
312 event = stream.next() => event,
313 };
314 match event {
315 Some(Ok(raw_event)) => {
316 match super::mysql_io::decode_binlog_event(&raw_event, &stream) {
317 Ok(Some(msg)) => {
318 if msg_tx.send(msg).await.is_err() {
319 break;
320 }
321 data_ready.notify_one();
322 }
323 Ok(None) => {}
324 Err(e) => {
325 tracing::warn!(error = %e, "binlog decode error");
326 break;
327 }
328 }
329 }
330 Some(Err(e)) => {
331 tracing::warn!(error = %e, "binlog stream error");
332 break;
333 }
334 None => break,
335 }
336 }
337 if let Err(e) = stream.close().await {
338 tracing::warn!(error = %e, "error closing binlog stream");
339 }
340 });
341
342 self.msg_rx = Some(msg_rx);
343 self.reader_handle = Some(reader_handle);
344 self.reader_shutdown = Some(shutdown_tx);
345 }
346
347 self.connected = true;
348 self.last_activity = Some(Instant::now());
349
350 Ok(())
351 }
352
353 async fn poll_batch(
354 &mut self,
355 max_records: usize,
356 ) -> Result<Option<SourceBatch>, ConnectorError> {
357 if !self.connected {
358 return Err(ConnectorError::ConfigurationError(
359 "Source not connected".to_string(),
360 ));
361 }
362
363 #[cfg(feature = "mysql-cdc")]
370 {
371 let high_watermark = self.config.backpressure_high_watermark();
372
373 if self.event_buffer.len() >= high_watermark {
374 tracing::debug!(
375 buffered = self.event_buffer.len(),
376 high_watermark,
377 "CDC backpressure active — pausing binlog reader drain"
378 );
379 } else if let Some(rx) = self.msg_rx.as_mut() {
380 let mut last_table_info: Option<TableInfo> = None;
381
382 while self.event_buffer.len() < max_records
383 && self.event_buffer.len() < high_watermark
384 {
385 match rx.try_recv() {
386 Ok(msg) => {
387 self.metrics.inc_events_received();
388 match msg {
389 BinlogMessage::TableMap(tme) => {
390 self.metrics.inc_table_maps();
391 self.table_cache.update(&tme);
392 }
393 BinlogMessage::Insert(insert_msg) => {
394 if !self.config.should_include_table(
395 &insert_msg.database,
396 &insert_msg.table,
397 ) {
398 continue;
399 }
400 let row_count = insert_msg.rows.len() as u64;
401 let events = super::changelog::insert_to_events(
402 &insert_msg,
403 &self.current_binlog_file,
404 self.current_gtid.as_deref(),
405 );
406 self.event_buffer.extend(events);
407 self.metrics.inc_inserts(row_count);
408 last_table_info =
409 self.table_cache.get(insert_msg.table_id).cloned();
410 }
411 BinlogMessage::Update(update_msg) => {
412 if !self.config.should_include_table(
413 &update_msg.database,
414 &update_msg.table,
415 ) {
416 continue;
417 }
418 let row_count = update_msg.rows.len() as u64;
419 let events = super::changelog::update_to_events(
420 &update_msg,
421 &self.current_binlog_file,
422 self.current_gtid.as_deref(),
423 );
424 self.event_buffer.extend(events);
425 self.metrics.inc_updates(row_count);
426 last_table_info =
427 self.table_cache.get(update_msg.table_id).cloned();
428 }
429 BinlogMessage::Delete(delete_msg) => {
430 if !self.config.should_include_table(
431 &delete_msg.database,
432 &delete_msg.table,
433 ) {
434 continue;
435 }
436 let row_count = delete_msg.rows.len() as u64;
437 let events = super::changelog::delete_to_events(
438 &delete_msg,
439 &self.current_binlog_file,
440 self.current_gtid.as_deref(),
441 );
442 self.event_buffer.extend(events);
443 self.metrics.inc_deletes(row_count);
444 last_table_info =
445 self.table_cache.get(delete_msg.table_id).cloned();
446 }
447 BinlogMessage::Begin(begin_msg) => {
448 if let Some(ref gtid) = begin_msg.gtid {
449 self.current_gtid = Some(gtid.to_string());
450 if let Some(ref mut gtid_set) = self.gtid_set {
451 gtid_set.add(gtid);
452 }
453 } else {
454 self.current_gtid = None;
455 }
456 }
457 BinlogMessage::Commit(commit_msg) => {
458 self.metrics.inc_transactions();
459 self.metrics.set_binlog_position(commit_msg.binlog_position);
460 if let Some(ref mut pos) = self.position {
461 pos.position = commit_msg.binlog_position;
462 }
463 }
464 BinlogMessage::Rotate(rotate_msg) => {
465 self.current_binlog_file.clone_from(&rotate_msg.next_binlog);
466 if let Some(ref mut pos) = self.position {
467 pos.filename = rotate_msg.next_binlog;
468 pos.position = rotate_msg.position;
469 } else {
470 self.position = Some(BinlogPosition::new(
471 self.current_binlog_file.clone(),
472 rotate_msg.position,
473 ));
474 }
475 }
476 BinlogMessage::Query(query_msg) => {
477 self.metrics.inc_ddl_events();
478 let _ = query_msg;
479 }
480 BinlogMessage::Heartbeat => {
481 self.metrics.inc_heartbeats();
482 }
483 }
484 }
485 Err(_) => break,
486 }
487 }
488
489 self.last_activity = Some(Instant::now());
490
491 if let Some(table_info) = last_table_info {
492 if let Some(batch) = self.flush_events(&table_info)? {
493 let schema = self.build_envelope_schema(&table_info.arrow_schema);
494 self.schema = Some(schema);
495 return Ok(Some(SourceBatch::new(batch)));
496 }
497 }
498
499 return Ok(None);
500 }
501
502 self.last_activity = Some(Instant::now());
503 return Ok(None);
504 }
505
506 #[cfg(not(feature = "mysql-cdc"))]
508 {
509 let _ = max_records;
510 self.last_activity = Some(Instant::now());
511 Ok(None)
512 }
513 }
514
515 fn schema(&self) -> SchemaRef {
516 self.schema.clone().unwrap_or_else(|| {
518 Arc::new(cdc_envelope_schema(&Schema::empty()))
520 })
521 }
522
523 fn checkpoint(&self) -> SourceCheckpoint {
524 self.create_checkpoint()
525 }
526
527 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
528 self.restore_position(checkpoint);
529 Ok(())
530 }
531
532 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
533 Some(Arc::clone(&self.data_ready))
534 }
535
536 async fn close(&mut self) -> Result<(), ConnectorError> {
537 #[cfg(feature = "mysql-cdc")]
539 {
540 if let Some(tx) = self.reader_shutdown.take() {
541 let _ = tx.send(true);
542 }
543 if let Some(handle) = self.reader_handle.take() {
544 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
545 }
546 self.msg_rx = None;
547 }
548
549 self.connected = false;
550 self.table_cache.clear();
551 self.event_buffer.clear();
552 Ok(())
553 }
554}
555
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration pointing at a local server.
    fn test_config() -> MySqlCdcConfig {
        MySqlCdcConfig {
            host: "localhost".to_string(),
            port: 3306,
            database: Some("testdb".to_string()),
            username: "root".to_string(),
            password: Some("test".to_string()),
            server_id: 12345,
            ..Default::default()
        }
    }

    /// A freshly built source is disconnected and has no state.
    #[test]
    fn test_new_source() {
        let fresh = MySqlCdcSource::new(test_config(), None);

        assert!(!fresh.is_connected());
        assert_eq!(fresh.cached_table_count(), 0);
        assert!(fresh.position().is_none());
        assert!(fresh.gtid_set().is_none());
    }

    /// Connector properties are parsed into the MySQL configuration.
    #[test]
    fn test_from_config() {
        let mut props = ConnectorConfig::new("mysql-cdc");
        props.set("host", "mysql.example.com");
        props.set("port", "3307");
        props.set("username", "repl");
        props.set("password", "secret");
        props.set("server.id", "999");

        let built = MySqlCdcSource::from_config(&props).unwrap();
        let parsed = built.config();
        assert_eq!(parsed.host, "mysql.example.com");
        assert_eq!(parsed.port, 3307);
        assert_eq!(parsed.server_id, 999);
    }

    /// A configuration without the required properties is rejected.
    #[test]
    fn test_from_config_missing_required() {
        let empty = ConnectorConfig::new("mysql-cdc");

        assert!(MySqlCdcSource::from_config(&empty).is_err());
    }

    /// A `gtid` offset restores the GTID set.
    #[test]
    fn test_restore_position_gtid() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("gtid", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");

        let mut src = MySqlCdcSource::new(test_config(), None);
        src.restore_position(&cp);

        assert!(src.gtid_set().is_some());
    }

    /// File and position offsets restore the binlog position.
    #[test]
    fn test_restore_position_file() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("binlog_file", "mysql-bin.000003");
        cp.set_offset("binlog_position", "12345");

        let mut src = MySqlCdcSource::new(test_config(), None);
        src.restore_position(&cp);

        let restored = src.position().unwrap();
        assert_eq!(restored.filename, "mysql-bin.000003");
        assert_eq!(restored.position, 12345);
    }

    /// In GTID mode the checkpoint carries the GTID set.
    #[test]
    fn test_create_checkpoint_gtid() {
        let mut src = MySqlCdcSource::new(test_config(), None);
        src.config.use_gtid = true;
        src.gtid_set = Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap());

        let cp = src.create_checkpoint();
        let gtid = cp.get_offset("gtid").expect("gtid offset should be set");
        assert!(gtid.contains("3e11fa47"));
    }

    /// In file mode the checkpoint carries file name and position.
    #[test]
    fn test_create_checkpoint_file() {
        let mut src = MySqlCdcSource::new(test_config(), None);
        src.config.use_gtid = false;
        src.position = Some(BinlogPosition::new("mysql-bin.000003".to_string(), 9999));

        let cp = src.create_checkpoint();
        assert_eq!(cp.get_offset("binlog_file"), Some("mysql-bin.000003"));
        assert_eq!(cp.get_offset("binlog_position"), Some("9999"));
    }

    /// The default schema exposes the CDC envelope columns.
    #[test]
    fn test_schema() {
        let schema = MySqlCdcSource::new(test_config(), None).schema();

        for expected in &["_table", "_op", "_ts_ms"] {
            let present = schema
                .fields()
                .iter()
                .any(|field| field.name().as_str() == *expected);
            assert!(present);
        }
    }

    /// Only tables on the include list pass the filter.
    #[test]
    fn test_table_filtering() {
        let filtered = MySqlCdcConfig {
            table_include: vec!["users".to_string(), "orders".to_string()],
            ..test_config()
        };
        let src = MySqlCdcSource::new(filtered, None);

        assert!(src.should_include_table("testdb", "users"));
        assert!(src.should_include_table("testdb", "orders"));
        assert!(!src.should_include_table("testdb", "other"));
    }

    /// Without the feature flag `open` fails and names the flag.
    #[cfg(not(feature = "mysql-cdc"))]
    #[tokio::test]
    async fn test_open_fails_without_feature() {
        let mut src = MySqlCdcSource::new(test_config(), None);

        let outcome = src.open(&ConnectorConfig::default()).await;
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("mysql-cdc"),
            "error should mention feature flag: {err}"
        );
    }

    /// Polling before `open` is an error.
    #[tokio::test]
    async fn test_poll_not_connected() {
        let mut src = MySqlCdcSource::new(test_config(), None);

        assert!(src.poll_batch(100).await.is_err());
    }

    /// The trait-level `restore` applies the checkpoint position.
    #[tokio::test]
    async fn test_restore_async() {
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset("binlog_file", "mysql-bin.000005");
        cp.set_offset("binlog_position", "54321");

        let mut src = MySqlCdcSource::new(test_config(), None);
        src.restore(&cp).await.unwrap();

        let restored = src.position().unwrap();
        assert_eq!(restored.filename, "mysql-bin.000005");
        assert_eq!(restored.position, 54321);
    }
}