Skip to main content

laminar_connectors/cdc/mysql/
source.rs

1//! MySQL CDC source connector implementation.
2//!
3//! Implements the [`SourceConnector`] trait for MySQL binlog replication.
4//! This module provides the main entry point for MySQL CDC: [`MySqlCdcSource`].
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use arrow_array::RecordBatch;
10use arrow_schema::{Schema, SchemaRef};
11use async_trait::async_trait;
12use tokio::sync::Notify;
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18
19use super::changelog::ChangeEvent;
20use super::config::MySqlCdcConfig;
21use super::decoder::{BinlogMessage, BinlogPosition};
22use super::gtid::GtidSet;
23use super::metrics::MySqlCdcMetrics;
24use super::schema::{cdc_envelope_schema, TableCache, TableInfo};
25
26/// Single-consumer async receiver for the binlog reader → `poll_batch` queue.
27#[cfg(feature = "mysql-cdc")]
28type BinlogMessageRx = crossfire::AsyncRx<crossfire::mpsc::Array<BinlogMessage>>;
29
30/// MySQL binlog CDC source connector. Reads change events from the MySQL
31/// binary log via replication protocol; supports GTID-based and
32/// file/position-based replication.
33pub struct MySqlCdcSource {
34    /// Configuration for the MySQL CDC connection.
35    config: MySqlCdcConfig,
36
37    /// Whether the source is currently connected.
38    connected: bool,
39
40    /// Cache of table schemas from TABLE_MAP events.
41    table_cache: TableCache,
42
43    /// Current binlog position (file/position).
44    position: Option<BinlogPosition>,
45
46    /// Current GTID set (for GTID-based replication).
47    gtid_set: Option<GtidSet>,
48
49    /// Current binlog filename (updated by ROTATE events).
50    current_binlog_file: String,
51
52    /// Current GTID string (updated by GTID events within a transaction).
53    current_gtid: Option<String>,
54
55    /// Buffered change events waiting to be emitted.
56    event_buffer: Vec<ChangeEvent>,
57
58    /// Metrics for this source.
59    metrics: MySqlCdcMetrics,
60
61    /// Arrow schema for CDC envelope.
62    schema: Option<SchemaRef>,
63
64    /// Last time we received data (for health checks).
65    last_activity: Option<Instant>,
66
67    /// Notification handle signalled when binlog data arrives from the reader task.
68    data_ready: Arc<Notify>,
69
70    /// Channel receiver for decoded binlog messages from the background reader task.
71    #[cfg(feature = "mysql-cdc")]
72    msg_rx: Option<BinlogMessageRx>,
73
74    /// Background binlog reader task handle.
75    #[cfg(feature = "mysql-cdc")]
76    reader_handle: Option<tokio::task::JoinHandle<()>>,
77
78    /// Shutdown signal for the background reader task.
79    #[cfg(feature = "mysql-cdc")]
80    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
81}
82
83// Manual Debug impl because BinlogStream doesn't implement Debug.
84impl std::fmt::Debug for MySqlCdcSource {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("MySqlCdcSource")
87            .field("config", &self.config)
88            .field("connected", &self.connected)
89            .field("table_cache", &self.table_cache)
90            .field("position", &self.position)
91            .field("gtid_set", &self.gtid_set)
92            .field("current_binlog_file", &self.current_binlog_file)
93            .field("current_gtid", &self.current_gtid)
94            .field("event_buffer_len", &self.event_buffer.len())
95            .field("metrics", &self.metrics)
96            .field("schema", &self.schema)
97            .field("last_activity", &self.last_activity)
98            .finish_non_exhaustive()
99    }
100}
101
102impl MySqlCdcSource {
103    /// Creates a new MySQL CDC source with the given configuration.
104    #[must_use]
105    pub fn new(config: MySqlCdcConfig, registry: Option<&prometheus::Registry>) -> Self {
106        Self {
107            config,
108            connected: false,
109            table_cache: TableCache::new(),
110            position: None,
111            gtid_set: None,
112            current_binlog_file: String::new(),
113            current_gtid: None,
114            event_buffer: Vec::new(),
115            metrics: MySqlCdcMetrics::new(registry),
116            schema: None,
117            last_activity: None,
118            data_ready: Arc::new(Notify::new()),
119            #[cfg(feature = "mysql-cdc")]
120            msg_rx: None,
121            #[cfg(feature = "mysql-cdc")]
122            reader_handle: None,
123            #[cfg(feature = "mysql-cdc")]
124            reader_shutdown: None,
125        }
126    }
127
128    /// Creates a MySQL CDC source from a generic connector config.
129    ///
130    /// # Errors
131    ///
132    /// Returns error if required configuration keys are missing.
133    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
134        let mysql_config = MySqlCdcConfig::from_config(config)?;
135        Ok(Self::new(mysql_config, None))
136    }
137
138    /// Returns the number of cached table schemas.
139    #[must_use]
140    pub fn cached_table_count(&self) -> usize {
141        self.table_cache.len()
142    }
143
144    /// Returns the current binlog position.
145    #[must_use]
146    pub fn position(&self) -> Option<&BinlogPosition> {
147        self.position.as_ref()
148    }
149
150    /// Returns the current GTID set.
151    #[must_use]
152    pub fn gtid_set(&self) -> Option<&GtidSet> {
153        self.gtid_set.as_ref()
154    }
155
156    /// Returns a reference to the table cache.
157    #[must_use]
158    pub fn table_cache(&self) -> &TableCache {
159        &self.table_cache
160    }
161
162    /// Returns a reference to the metrics.
163    #[must_use]
164    pub fn cdc_metrics(&self) -> &MySqlCdcMetrics {
165        &self.metrics
166    }
167
168    /// Checks if a table should be included based on filters.
169    #[must_use]
170    pub fn should_include_table(&self, database: &str, table: &str) -> bool {
171        self.config.should_include_table(database, table)
172    }
173
174    /// Returns the configuration.
175    #[must_use]
176    pub fn config(&self) -> &MySqlCdcConfig {
177        &self.config
178    }
179
180    /// Returns whether the source is connected.
181    #[must_use]
182    pub fn is_connected(&self) -> bool {
183        self.connected
184    }
185
186    /// Restores the position from a checkpoint.
187    ///
188    /// Parses the checkpoint offset to extract GTID set or file/position.
189    pub fn restore_position(&mut self, checkpoint: &SourceCheckpoint) {
190        // Try GTID from offset key first
191        if let Some(gtid_str) = checkpoint.get_offset("gtid") {
192            if let Ok(gtid_set) = gtid_str.parse::<GtidSet>() {
193                self.gtid_set = Some(gtid_set);
194                return;
195            }
196        }
197
198        // Try binlog file/position
199        if let (Some(filename), Some(pos_str)) = (
200            checkpoint.get_offset("binlog_file"),
201            checkpoint.get_offset("binlog_position"),
202        ) {
203            if let Ok(pos) = pos_str.parse::<u64>() {
204                self.position = Some(BinlogPosition::new(filename.to_string(), pos));
205            }
206        }
207    }
208
209    /// Creates a checkpoint representing the current position.
210    #[must_use]
211    pub fn create_checkpoint(&self) -> SourceCheckpoint {
212        let mut checkpoint = SourceCheckpoint::new(0);
213
214        if self.config.use_gtid {
215            if let Some(ref gtid_set) = self.gtid_set {
216                checkpoint.set_offset("gtid", gtid_set.to_string());
217            }
218        } else if let Some(ref pos) = self.position {
219            checkpoint.set_offset("binlog_file", &pos.filename);
220            checkpoint.set_offset("binlog_position", pos.position.to_string());
221        }
222
223        checkpoint.set_metadata("server_id", self.config.server_id.to_string());
224
225        checkpoint
226    }
227
228    /// Builds the CDC envelope schema based on a table schema.
229    #[allow(clippy::unused_self)] // Will use self for config options
230    fn build_envelope_schema(&self, table_schema: &Schema) -> SchemaRef {
231        Arc::new(cdc_envelope_schema(table_schema))
232    }
233
234    /// Flushes buffered events to a RecordBatch.
235    ///
236    /// # Errors
237    ///
238    /// Returns error if batch conversion fails.
239    pub fn flush_events(
240        &mut self,
241        table_info: &TableInfo,
242    ) -> Result<Option<RecordBatch>, ConnectorError> {
243        if self.event_buffer.is_empty() {
244            return Ok(None);
245        }
246
247        let events: Vec<_> = self.event_buffer.drain(..).collect();
248        let batch = super::changelog::events_to_record_batch(&events, table_info)
249            .map_err(|e| ConnectorError::Internal(e.to_string()))?;
250        Ok(Some(batch))
251    }
252}
253
254#[async_trait]
255#[allow(clippy::too_many_lines)]
256impl SourceConnector for MySqlCdcSource {
257    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
258        // Parse and update config if provided
259        if !config.properties().is_empty() {
260            self.config = MySqlCdcConfig::from_config(config)?;
261        }
262
263        // Validate configuration
264        self.config.validate()?;
265
266        // Initialize GTID set from config
267        self.gtid_set.clone_from(&self.config.gtid_set);
268
269        // Initialize binlog position from config
270        if let Some(ref filename) = self.config.binlog_filename {
271            self.current_binlog_file.clone_from(filename);
272            if let Some(pos) = self.config.binlog_position {
273                self.position = Some(BinlogPosition::new(filename.clone(), pos));
274            }
275        }
276
277        // Without mysql-cdc feature, open() must fail loudly to prevent
278        // silent data loss (poll_batch would return Ok(None) forever).
279        #[cfg(not(feature = "mysql-cdc"))]
280        {
281            return Err(ConnectorError::ConfigurationError(
282                "MySQL CDC source requires the `mysql-cdc` feature flag. \
283                 Rebuild with `--features mysql-cdc` to enable."
284                    .to_string(),
285            ));
286        }
287
288        // When mysql-cdc feature is enabled, establish a real connection
289        // and spawn a background reader task for event-driven wake-up.
290        #[cfg(feature = "mysql-cdc")]
291        {
292            let conn = super::mysql_io::connect(&self.config).await?;
293            let stream = super::mysql_io::start_binlog_stream(
294                conn,
295                &self.config,
296                self.gtid_set.as_ref(),
297                self.position.as_ref(),
298            )
299            .await?;
300
301            let (msg_tx, msg_rx) = crossfire::mpsc::bounded_async::<BinlogMessage>(4096);
302            let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
303            let data_ready = Arc::clone(&self.data_ready);
304
305            let reader_handle = tokio::spawn(async move {
306                use tokio_stream::StreamExt as _;
307                let mut stream = stream;
308                loop {
309                    let event = tokio::select! {
310                        biased;
311                        _ = shutdown_rx.changed() => break,
312                        event = stream.next() => event,
313                    };
314                    match event {
315                        Some(Ok(raw_event)) => {
316                            match super::mysql_io::decode_binlog_event(&raw_event, &stream) {
317                                Ok(Some(msg)) => {
318                                    if msg_tx.send(msg).await.is_err() {
319                                        break;
320                                    }
321                                    data_ready.notify_one();
322                                }
323                                Ok(None) => {}
324                                Err(e) => {
325                                    tracing::warn!(error = %e, "binlog decode error");
326                                    break;
327                                }
328                            }
329                        }
330                        Some(Err(e)) => {
331                            tracing::warn!(error = %e, "binlog stream error");
332                            break;
333                        }
334                        None => break,
335                    }
336                }
337                if let Err(e) = stream.close().await {
338                    tracing::warn!(error = %e, "error closing binlog stream");
339                }
340            });
341
342            self.msg_rx = Some(msg_rx);
343            self.reader_handle = Some(reader_handle);
344            self.reader_shutdown = Some(shutdown_tx);
345        }
346
347        self.connected = true;
348        self.last_activity = Some(Instant::now());
349
350        Ok(())
351    }
352
353    async fn poll_batch(
354        &mut self,
355        max_records: usize,
356    ) -> Result<Option<SourceBatch>, ConnectorError> {
357        if !self.connected {
358            return Err(ConnectorError::ConfigurationError(
359                "Source not connected".to_string(),
360            ));
361        }
362
363        // Drain decoded binlog messages from background reader task.
364        //
365        // Backpressure: when the event buffer exceeds the high watermark,
366        // stop draining the reader channel. The bounded mpsc channel (4096)
367        // propagates backpressure to the binlog reader task, which in turn
368        // applies TCP backpressure to the replication connection.
369        #[cfg(feature = "mysql-cdc")]
370        {
371            let high_watermark = self.config.backpressure_high_watermark();
372
373            if self.event_buffer.len() >= high_watermark {
374                tracing::debug!(
375                    buffered = self.event_buffer.len(),
376                    high_watermark,
377                    "CDC backpressure active — pausing binlog reader drain"
378                );
379            } else if let Some(rx) = self.msg_rx.as_mut() {
380                let mut last_table_info: Option<TableInfo> = None;
381
382                while self.event_buffer.len() < max_records
383                    && self.event_buffer.len() < high_watermark
384                {
385                    match rx.try_recv() {
386                        Ok(msg) => {
387                            self.metrics.inc_events_received();
388                            match msg {
389                                BinlogMessage::TableMap(tme) => {
390                                    self.metrics.inc_table_maps();
391                                    self.table_cache.update(&tme);
392                                }
393                                BinlogMessage::Insert(insert_msg) => {
394                                    if !self.config.should_include_table(
395                                        &insert_msg.database,
396                                        &insert_msg.table,
397                                    ) {
398                                        continue;
399                                    }
400                                    let row_count = insert_msg.rows.len() as u64;
401                                    let events = super::changelog::insert_to_events(
402                                        &insert_msg,
403                                        &self.current_binlog_file,
404                                        self.current_gtid.as_deref(),
405                                    );
406                                    self.event_buffer.extend(events);
407                                    self.metrics.inc_inserts(row_count);
408                                    last_table_info =
409                                        self.table_cache.get(insert_msg.table_id).cloned();
410                                }
411                                BinlogMessage::Update(update_msg) => {
412                                    if !self.config.should_include_table(
413                                        &update_msg.database,
414                                        &update_msg.table,
415                                    ) {
416                                        continue;
417                                    }
418                                    let row_count = update_msg.rows.len() as u64;
419                                    let events = super::changelog::update_to_events(
420                                        &update_msg,
421                                        &self.current_binlog_file,
422                                        self.current_gtid.as_deref(),
423                                    );
424                                    self.event_buffer.extend(events);
425                                    self.metrics.inc_updates(row_count);
426                                    last_table_info =
427                                        self.table_cache.get(update_msg.table_id).cloned();
428                                }
429                                BinlogMessage::Delete(delete_msg) => {
430                                    if !self.config.should_include_table(
431                                        &delete_msg.database,
432                                        &delete_msg.table,
433                                    ) {
434                                        continue;
435                                    }
436                                    let row_count = delete_msg.rows.len() as u64;
437                                    let events = super::changelog::delete_to_events(
438                                        &delete_msg,
439                                        &self.current_binlog_file,
440                                        self.current_gtid.as_deref(),
441                                    );
442                                    self.event_buffer.extend(events);
443                                    self.metrics.inc_deletes(row_count);
444                                    last_table_info =
445                                        self.table_cache.get(delete_msg.table_id).cloned();
446                                }
447                                BinlogMessage::Begin(begin_msg) => {
448                                    if let Some(ref gtid) = begin_msg.gtid {
449                                        self.current_gtid = Some(gtid.to_string());
450                                        if let Some(ref mut gtid_set) = self.gtid_set {
451                                            gtid_set.add(gtid);
452                                        }
453                                    } else {
454                                        self.current_gtid = None;
455                                    }
456                                }
457                                BinlogMessage::Commit(commit_msg) => {
458                                    self.metrics.inc_transactions();
459                                    self.metrics.set_binlog_position(commit_msg.binlog_position);
460                                    if let Some(ref mut pos) = self.position {
461                                        pos.position = commit_msg.binlog_position;
462                                    }
463                                }
464                                BinlogMessage::Rotate(rotate_msg) => {
465                                    self.current_binlog_file.clone_from(&rotate_msg.next_binlog);
466                                    if let Some(ref mut pos) = self.position {
467                                        pos.filename = rotate_msg.next_binlog;
468                                        pos.position = rotate_msg.position;
469                                    } else {
470                                        self.position = Some(BinlogPosition::new(
471                                            self.current_binlog_file.clone(),
472                                            rotate_msg.position,
473                                        ));
474                                    }
475                                }
476                                BinlogMessage::Query(query_msg) => {
477                                    self.metrics.inc_ddl_events();
478                                    let _ = query_msg;
479                                }
480                                BinlogMessage::Heartbeat => {
481                                    self.metrics.inc_heartbeats();
482                                }
483                            }
484                        }
485                        Err(_) => break,
486                    }
487                }
488
489                self.last_activity = Some(Instant::now());
490
491                if let Some(table_info) = last_table_info {
492                    if let Some(batch) = self.flush_events(&table_info)? {
493                        let schema = self.build_envelope_schema(&table_info.arrow_schema);
494                        self.schema = Some(schema);
495                        return Ok(Some(SourceBatch::new(batch)));
496                    }
497                }
498
499                return Ok(None);
500            }
501
502            self.last_activity = Some(Instant::now());
503            return Ok(None);
504        }
505
506        // Without mysql-cdc feature: stub returns None.
507        #[cfg(not(feature = "mysql-cdc"))]
508        {
509            let _ = max_records;
510            self.last_activity = Some(Instant::now());
511            Ok(None)
512        }
513    }
514
515    fn schema(&self) -> SchemaRef {
516        // Return cached schema or a default CDC envelope schema
517        self.schema.clone().unwrap_or_else(|| {
518            // Default CDC envelope with no table-specific columns
519            Arc::new(cdc_envelope_schema(&Schema::empty()))
520        })
521    }
522
523    fn checkpoint(&self) -> SourceCheckpoint {
524        self.create_checkpoint()
525    }
526
527    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
528        self.restore_position(checkpoint);
529        Ok(())
530    }
531
532    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
533        Some(Arc::clone(&self.data_ready))
534    }
535
536    async fn close(&mut self) -> Result<(), ConnectorError> {
537        // Signal reader task to shut down (it closes the binlog stream internally).
538        #[cfg(feature = "mysql-cdc")]
539        {
540            if let Some(tx) = self.reader_shutdown.take() {
541                let _ = tx.send(true);
542            }
543            if let Some(handle) = self.reader_handle.take() {
544                let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
545            }
546            self.msg_rx = None;
547        }
548
549        self.connected = false;
550        self.table_cache.clear();
551        self.event_buffer.clear();
552        Ok(())
553    }
554}
555
556#[cfg(test)]
557mod tests {
558    use super::*;
559
560    fn test_config() -> MySqlCdcConfig {
561        MySqlCdcConfig {
562            host: "localhost".to_string(),
563            port: 3306,
564            database: Some("testdb".to_string()),
565            username: "root".to_string(),
566            password: Some("test".to_string()),
567            server_id: 12345,
568            ..Default::default()
569        }
570    }
571
572    #[test]
573    fn test_new_source() {
574        let config = test_config();
575        let source = MySqlCdcSource::new(config, None);
576
577        assert!(!source.is_connected());
578        assert_eq!(source.cached_table_count(), 0);
579        assert!(source.position().is_none());
580        assert!(source.gtid_set().is_none());
581    }
582
583    #[test]
584    fn test_from_config() {
585        let mut config = ConnectorConfig::new("mysql-cdc");
586        config.set("host", "mysql.example.com");
587        config.set("port", "3307");
588        config.set("username", "repl");
589        config.set("password", "secret");
590        config.set("server.id", "999");
591
592        let source = MySqlCdcSource::from_config(&config).unwrap();
593        assert_eq!(source.config().host, "mysql.example.com");
594        assert_eq!(source.config().port, 3307);
595        assert_eq!(source.config().server_id, 999);
596    }
597
598    #[test]
599    fn test_from_config_missing_required() {
600        let config = ConnectorConfig::new("mysql-cdc");
601
602        let result = MySqlCdcSource::from_config(&config);
603        assert!(result.is_err());
604    }
605
606    #[test]
607    fn test_restore_position_gtid() {
608        let mut source = MySqlCdcSource::new(test_config(), None);
609
610        let mut checkpoint = SourceCheckpoint::new(1);
611        checkpoint.set_offset("gtid", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");
612
613        source.restore_position(&checkpoint);
614        assert!(source.gtid_set().is_some());
615    }
616
617    #[test]
618    fn test_restore_position_file() {
619        let mut source = MySqlCdcSource::new(test_config(), None);
620
621        let mut checkpoint = SourceCheckpoint::new(1);
622        checkpoint.set_offset("binlog_file", "mysql-bin.000003");
623        checkpoint.set_offset("binlog_position", "12345");
624
625        source.restore_position(&checkpoint);
626        let pos = source.position().unwrap();
627        assert_eq!(pos.filename, "mysql-bin.000003");
628        assert_eq!(pos.position, 12345);
629    }
630
631    #[test]
632    fn test_create_checkpoint_gtid() {
633        let mut source = MySqlCdcSource::new(test_config(), None);
634        source.config.use_gtid = true;
635        source.gtid_set = Some("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5".parse().unwrap());
636
637        let checkpoint = source.create_checkpoint();
638        assert!(checkpoint.get_offset("gtid").is_some());
639        // UUID is stored and displayed as lowercase
640        assert!(checkpoint.get_offset("gtid").unwrap().contains("3e11fa47"));
641    }
642
643    #[test]
644    fn test_create_checkpoint_file() {
645        let mut source = MySqlCdcSource::new(test_config(), None);
646        source.config.use_gtid = false;
647        source.position = Some(BinlogPosition::new("mysql-bin.000003".to_string(), 9999));
648
649        let checkpoint = source.create_checkpoint();
650        assert_eq!(
651            checkpoint.get_offset("binlog_file"),
652            Some("mysql-bin.000003")
653        );
654        assert_eq!(checkpoint.get_offset("binlog_position"), Some("9999"));
655    }
656
657    #[test]
658    fn test_schema() {
659        let source = MySqlCdcSource::new(test_config(), None);
660        let schema = source.schema();
661
662        // Should have CDC envelope fields
663        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name()).collect();
664        assert!(field_names.contains(&&"_table".to_string()));
665        assert!(field_names.contains(&&"_op".to_string()));
666        assert!(field_names.contains(&&"_ts_ms".to_string()));
667    }
668
669    #[test]
670    fn test_table_filtering() {
671        let mut config = test_config();
672        config.table_include = vec!["users".to_string(), "orders".to_string()];
673
674        let source = MySqlCdcSource::new(config, None);
675
676        assert!(source.should_include_table("testdb", "users"));
677        assert!(source.should_include_table("testdb", "orders"));
678        assert!(!source.should_include_table("testdb", "other"));
679    }
680
681    // Without mysql-cdc feature, open() must return an error to prevent silent data loss.
682    #[cfg(not(feature = "mysql-cdc"))]
683    #[tokio::test]
684    async fn test_open_fails_without_feature() {
685        let mut source = MySqlCdcSource::new(test_config(), None);
686
687        let result = source.open(&ConnectorConfig::default()).await;
688        assert!(result.is_err());
689        let err = result.unwrap_err().to_string();
690        assert!(
691            err.contains("mysql-cdc"),
692            "error should mention feature flag: {err}"
693        );
694    }
695
696    #[tokio::test]
697    async fn test_poll_not_connected() {
698        let mut source = MySqlCdcSource::new(test_config(), None);
699
700        let result = source.poll_batch(100).await;
701        assert!(result.is_err());
702    }
703
704    // Without mysql-cdc feature, open() returns an error, so poll is unreachable.
705    // With mysql-cdc, open() needs a real MySQL server. Covered by integration tests.
706
707    #[tokio::test]
708    async fn test_restore_async() {
709        let mut source = MySqlCdcSource::new(test_config(), None);
710
711        let mut checkpoint = SourceCheckpoint::new(1);
712        checkpoint.set_offset("binlog_file", "mysql-bin.000005");
713        checkpoint.set_offset("binlog_position", "54321");
714
715        source.restore(&checkpoint).await.unwrap();
716
717        let pos = source.position().unwrap();
718        assert_eq!(pos.filename, "mysql-bin.000005");
719        assert_eq!(pos.position, 54321);
720    }
721}