// laminar_connectors/cdc/mysql/config.rs

//! MySQL CDC source connector configuration.
//!
//! Provides [`MySqlCdcConfig`] with all settings needed to connect to
//! a MySQL database and stream binlog changes.

use std::fmt;
use std::time::Duration;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;

use super::gtid::GtidSet;

13/// Configuration for the MySQL CDC source connector.
14#[derive(Debug, Clone)]
15pub struct MySqlCdcConfig {
16    // ── Connection ──
17    /// MySQL host address.
18    pub host: String,
19
20    /// MySQL port.
21    pub port: u16,
22
23    /// Database name (optional for binlog replication).
24    pub database: Option<String>,
25
26    /// Username for authentication.
27    pub username: String,
28
29    /// Password for authentication.
30    pub password: Option<String>,
31
32    /// SSL mode for the connection.
33    pub ssl_mode: SslMode,
34
35    // ── Replication ──
36    /// Server ID for the replica (must be unique in the topology).
37    /// MySQL requires each replica to have a unique server ID.
38    pub server_id: u32,
39
40    /// GTID set to start replication from (None = use binlog position).
41    /// Using GTID is recommended for failover support.
42    pub gtid_set: Option<GtidSet>,
43
44    /// Binlog filename to start from (if not using GTID).
45    pub binlog_filename: Option<String>,
46
47    /// Binlog position to start from (if not using GTID).
48    pub binlog_position: Option<u64>,
49
50    /// Whether to use GTID-based replication (recommended).
51    pub use_gtid: bool,
52
53    // ── Snapshot ──
54    /// How to handle the initial data snapshot.
55    pub snapshot_mode: SnapshotMode,
56
57    // ── Tuning ──
58    /// Timeout for each poll operation.
59    pub poll_timeout: Duration,
60
61    /// Maximum records to return per poll.
62    pub max_poll_records: usize,
63
64    /// Heartbeat interval for the binlog connection.
65    pub heartbeat_interval: Duration,
66
67    /// Connection timeout.
68    pub connect_timeout: Duration,
69
70    /// Read timeout for binlog events.
71    pub read_timeout: Duration,
72
73    // ── Schema ──
74    /// Tables to include (format: "database.table" or just "table").
75    /// Empty = all tables.
76    pub table_include: Vec<String>,
77
78    /// Tables to exclude from replication.
79    pub table_exclude: Vec<String>,
80
81    /// Database filter (if set, only replicate from this database).
82    pub database_filter: Option<String>,
83
84    /// Maximum events to buffer (default: 100,000).
85    pub max_buffered_events: usize,
86
87    /// High watermark ratio (0.0–1.0) of `max_buffered_events`. When the
88    /// buffer reaches this level, stop draining the binlog reader channel
89    /// to apply backpressure (default: 0.8).
90    pub backpressure_high_watermark: f64,
91}
92
93impl Default for MySqlCdcConfig {
94    fn default() -> Self {
95        Self {
96            host: "localhost".to_string(),
97            port: 3306,
98            database: None,
99            username: "root".to_string(),
100            password: None,
101            ssl_mode: SslMode::Preferred,
102            server_id: 1001, // Default replica server ID
103            gtid_set: None,
104            binlog_filename: None,
105            binlog_position: None,
106            use_gtid: true, // GTID is the recommended approach
107            snapshot_mode: SnapshotMode::Initial,
108            poll_timeout: Duration::from_millis(100),
109            max_poll_records: 1000,
110            heartbeat_interval: Duration::from_secs(30),
111            connect_timeout: Duration::from_secs(10),
112            read_timeout: Duration::from_secs(60),
113            table_include: Vec::new(),
114            table_exclude: Vec::new(),
115            database_filter: None,
116            max_buffered_events: 100_000,
117            backpressure_high_watermark: 0.8,
118        }
119    }
120}
121
122impl MySqlCdcConfig {
123    /// Returns the high watermark as an absolute event count.
124    #[must_use]
125    #[allow(
126        clippy::cast_precision_loss,
127        clippy::cast_possible_truncation,
128        clippy::cast_sign_loss
129    )]
130    pub fn backpressure_high_watermark(&self) -> usize {
131        (self.max_buffered_events as f64 * self.backpressure_high_watermark) as usize
132    }
133
134    /// Creates a new config with required fields.
135    #[must_use]
136    pub fn new(host: &str, username: &str) -> Self {
137        Self {
138            host: host.to_string(),
139            username: username.to_string(),
140            ..Self::default()
141        }
142    }
143
144    /// Creates a new config with server ID.
145    #[must_use]
146    pub fn with_server_id(host: &str, username: &str, server_id: u32) -> Self {
147        Self {
148            host: host.to_string(),
149            username: username.to_string(),
150            server_id,
151            ..Self::default()
152        }
153    }
154
155    /// Builds a MySQL connection URL.
156    #[must_use]
157    pub fn connection_url(&self) -> String {
158        let mut url = format!("mysql://{}@{}:{}", self.username, self.host, self.port);
159        if let Some(ref db) = self.database {
160            url.push('/');
161            url.push_str(db);
162        }
163        url
164    }
165
166    /// Parses configuration from a generic [`ConnectorConfig`].
167    ///
168    /// # Errors
169    ///
170    /// Returns `ConnectorError` if required keys are missing or values are
171    /// invalid.
172    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
173        let mut cfg = Self {
174            host: config.require("host")?.to_string(),
175            username: config.require("username")?.to_string(),
176            ..Self::default()
177        };
178
179        if let Some(port) = config.get("port") {
180            cfg.port = crate::config::parse_port(port)?;
181        }
182        cfg.database = config.get("database").map(String::from);
183        cfg.password = config.get("password").map(String::from);
184
185        if let Some(ssl) = config.get_parsed::<SslMode>("ssl.mode")? {
186            cfg.ssl_mode = ssl;
187        }
188
189        if let Some(id) = config.get_parsed::<u32>("server.id")? {
190            cfg.server_id = id;
191        }
192
193        if let Some(gtid) = config.get_parsed::<GtidSet>("gtid.set")? {
194            cfg.gtid_set = Some(gtid);
195        }
196
197        cfg.binlog_filename = config.get("binlog.filename").map(String::from);
198
199        if let Some(pos) = config.get_parsed::<u64>("binlog.position")? {
200            cfg.binlog_position = Some(pos);
201        }
202
203        if let Some(use_gtid) = config.get("use.gtid") {
204            cfg.use_gtid = use_gtid.parse().unwrap_or(true);
205        }
206
207        if let Some(mode) = config.get_parsed::<SnapshotMode>("snapshot.mode")? {
208            cfg.snapshot_mode = mode;
209        }
210
211        if let Some(timeout) = config.get_parsed::<u64>("poll.timeout.ms")? {
212            cfg.poll_timeout = Duration::from_millis(timeout);
213        }
214        if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
215            cfg.max_poll_records = max;
216        }
217        if let Some(interval) = config.get_parsed::<u64>("heartbeat.interval.ms")? {
218            cfg.heartbeat_interval = Duration::from_millis(interval);
219        }
220        if let Some(timeout) = config.get_parsed::<u64>("connect.timeout.ms")? {
221            cfg.connect_timeout = Duration::from_millis(timeout);
222        }
223        if let Some(timeout) = config.get_parsed::<u64>("read.timeout.ms")? {
224            cfg.read_timeout = Duration::from_millis(timeout);
225        }
226
227        if let Some(tables) = config.get("table.include") {
228            cfg.table_include = tables.split(',').map(|s| s.trim().to_string()).collect();
229        }
230        if let Some(tables) = config.get("table.exclude") {
231            cfg.table_exclude = tables.split(',').map(|s| s.trim().to_string()).collect();
232        }
233        cfg.database_filter = config.get("database.filter").map(String::from);
234
235        if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
236            cfg.max_buffered_events = max;
237        }
238        if let Some(hw) = config.get_parsed::<f64>("backpressure.high.watermark")? {
239            cfg.backpressure_high_watermark = hw;
240        }
241
242        cfg.validate()?;
243        Ok(cfg)
244    }
245
246    /// Validates the configuration.
247    ///
248    /// # Errors
249    ///
250    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
251    pub fn validate(&self) -> Result<(), ConnectorError> {
252        crate::config::require_non_empty(&self.host, "host")?;
253        crate::config::require_non_empty(&self.username, "username")?;
254        if self.server_id == 0 {
255            return Err(ConnectorError::ConfigurationError(
256                "server.id must be > 0".to_string(),
257            ));
258        }
259        if self.max_poll_records == 0 {
260            return Err(ConnectorError::ConfigurationError(
261                "max.poll.records must be > 0".to_string(),
262            ));
263        }
264
265        // If not using GTID, binlog filename should be specified for explicit positioning
266        if !self.use_gtid && self.binlog_filename.is_none() && self.binlog_position.is_some() {
267            return Err(ConnectorError::ConfigurationError(
268                "binlog.filename required when binlog.position is set".to_string(),
269            ));
270        }
271
272        Ok(())
273    }
274
275    /// Returns whether a table should be included based on include/exclude lists.
276    #[must_use]
277    pub fn should_include_table(&self, database: &str, table: &str) -> bool {
278        let full_name = format!("{database}.{table}");
279
280        // Check exclude list first
281        if self
282            .table_exclude
283            .iter()
284            .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
285        {
286            return false;
287        }
288
289        // Check database filter
290        if let Some(ref db_filter) = self.database_filter {
291            if db_filter != database {
292                return false;
293            }
294        }
295
296        // Check include list
297        if self.table_include.is_empty() {
298            return true;
299        }
300
301        self.table_include
302            .iter()
303            .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
304    }
305
306    /// Returns the replication mode description.
307    #[must_use]
308    pub fn replication_mode(&self) -> &'static str {
309        if self.use_gtid {
310            "GTID"
311        } else {
312            "binlog position"
313        }
314    }
315}
316
/// TLS policy applied to the MySQL connection.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SslMode {
    /// Never use SSL; the connection is plain text.
    Disabled,
    /// Attempt SSL, but fall back to an unencrypted connection (the default).
    #[default]
    Preferred,
    /// SSL is mandatory.
    Required,
    /// SSL is mandatory and the CA certificate is verified.
    VerifyCa,
    /// SSL is mandatory and the server identity is verified.
    VerifyIdentity,
}

// String parsing for `SslMode` via the crate's `str_enum!` macro (defined
// elsewhere; its expansion is not visible here). `from_config` reads the
// `ssl.mode` key through `get_parsed::<SslMode>`, so the spellings listed
// per variant below are what that key accepts. "false" and "true" are
// shorthands for `Disabled` and `Required`.
// NOTE(review): `lowercase_nodash`, `String` and the message presumably select
// the case handling, error type and error text — confirm against the macro.
str_enum!(SslMode, lowercase_nodash, String, "unknown SSL mode",
    Disabled => "disabled", "disable", "false";
    Preferred => "preferred", "prefer";
    Required => "required", "require", "true";
    VerifyCa => "verify_ca", "verify-ca";
    VerifyIdentity => "verify_identity", "verify-identity"
);

/// Controls whether (and how much of) the source is snapshotted before
/// binlog streaming begins.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SnapshotMode {
    /// Snapshot everything on first start, then switch to streaming (the default).
    #[default]
    Initial,
    /// Skip the snapshot and only stream from the current binlog position.
    Never,
    /// Snapshot on every startup, even when a checkpoint already exists.
    Always,
    /// Capture the schema only; no row data is snapshotted.
    SchemaOnly,
}

// String parsing for `SnapshotMode` via the crate's `str_enum!` macro
// (defined elsewhere). `from_config` reads the `snapshot.mode` key through
// `get_parsed::<SnapshotMode>`, so these spellings are what that key accepts;
// `SchemaOnly` takes both the underscore and dash forms.
// NOTE(review): see the macro definition for case handling and error text.
str_enum!(SnapshotMode, lowercase_nodash, String, "unknown snapshot mode",
    Initial => "initial";
    Never => "never";
    Always => "always";
    SchemaOnly => "schema_only", "schema-only"
);

#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal valid connector config shared by the `from_config` tests.
    fn base_config() -> ConnectorConfig {
        let mut config = ConnectorConfig::new("mysql-cdc");
        config.set("host", "localhost");
        config.set("username", "root");
        config
    }

    #[test]
    fn test_default_config() {
        let cfg = MySqlCdcConfig::default();
        assert_eq!(cfg.host, "localhost");
        assert_eq!(cfg.port, 3306);
        assert_eq!(cfg.username, "root");
        assert!(cfg.use_gtid);
        assert_eq!(cfg.ssl_mode, SslMode::Preferred);
        assert_eq!(cfg.snapshot_mode, SnapshotMode::Initial);
        assert_eq!(cfg.server_id, 1001);
    }

    #[test]
    fn test_default_config_is_valid() {
        assert!(MySqlCdcConfig::default().validate().is_ok());
    }

    #[test]
    fn test_new_config() {
        let cfg = MySqlCdcConfig::new("db.example.com", "myuser");
        assert_eq!(cfg.host, "db.example.com");
        assert_eq!(cfg.username, "myuser");
        assert_eq!(cfg.port, 3306);
        assert_eq!(cfg.server_id, 1001);
    }

    #[test]
    fn test_with_server_id() {
        let cfg = MySqlCdcConfig::with_server_id("db.example.com", "myuser", 5000);
        assert_eq!(cfg.server_id, 5000);
    }

    #[test]
    fn test_connection_url() {
        let mut cfg = MySqlCdcConfig::new("db.example.com", "myuser");
        assert_eq!(cfg.connection_url(), "mysql://myuser@db.example.com:3306");

        cfg.database = Some("mydb".to_string());
        assert_eq!(
            cfg.connection_url(),
            "mysql://myuser@db.example.com:3306/mydb"
        );
    }

    #[test]
    fn test_connection_url_custom_port_and_no_password() {
        let mut cfg = MySqlCdcConfig::new("10.0.0.5", "repl");
        cfg.port = 3307;
        cfg.password = Some("secret".to_string());
        let url = cfg.connection_url();
        assert_eq!(url, "mysql://repl@10.0.0.5:3307");
        assert!(!url.contains("secret"));
    }

    #[test]
    fn test_backpressure_high_watermark_default() {
        let cfg = MySqlCdcConfig::default();
        assert_eq!(cfg.backpressure_high_watermark(), 80_000);
    }

    #[test]
    fn test_backpressure_high_watermark_custom() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.max_buffered_events = 1_000;
        cfg.backpressure_high_watermark = 0.5;
        assert_eq!(cfg.backpressure_high_watermark(), 500);
    }

    #[test]
    fn test_from_connector_config() {
        let mut config = ConnectorConfig::new("mysql-cdc");
        config.set("host", "mysql.local");
        config.set("username", "repl_user");
        config.set("password", "secret");
        config.set("port", "3307");
        config.set("server.id", "2000");
        config.set("ssl.mode", "required");
        config.set("snapshot.mode", "never");
        config.set("use.gtid", "true");
        config.set("max.poll.records", "500");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();
        assert_eq!(cfg.host, "mysql.local");
        assert_eq!(cfg.username, "repl_user");
        assert_eq!(cfg.password, Some("secret".to_string()));
        assert_eq!(cfg.port, 3307);
        assert_eq!(cfg.server_id, 2000);
        assert_eq!(cfg.ssl_mode, SslMode::Required);
        assert_eq!(cfg.snapshot_mode, SnapshotMode::Never);
        assert!(cfg.use_gtid);
        assert_eq!(cfg.max_poll_records, 500);
    }

    #[test]
    fn test_from_config_missing_required() {
        let config = ConnectorConfig::new("mysql-cdc");
        assert!(MySqlCdcConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_from_config_rejects_zero_server_id() {
        let mut config = base_config();
        config.set("server.id", "0");
        assert!(MySqlCdcConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_from_config_rejects_unparsable_number() {
        let mut config = base_config();
        config.set("max.poll.records", "lots");
        assert!(MySqlCdcConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_from_config_table_lists_are_trimmed() {
        let mut config = base_config();
        config.set("table.include", "shop.orders , customers");
        config.set("table.exclude", " shop.audit_log ");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();
        assert_eq!(
            cfg.table_include,
            vec!["shop.orders".to_string(), "customers".to_string()]
        );
        assert_eq!(cfg.table_exclude, vec!["shop.audit_log".to_string()]);
    }

    #[test]
    fn test_validate_empty_host() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.host = String::new();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_empty_username() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.username = String::new();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_zero_server_id() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.server_id = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_zero_max_poll_records() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.max_poll_records = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_binlog_position_without_filename() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.use_gtid = false;
        cfg.binlog_position = Some(12345);
        cfg.binlog_filename = None;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_ssl_mode_fromstr() {
        assert_eq!("disabled".parse::<SslMode>().unwrap(), SslMode::Disabled);
        assert_eq!("preferred".parse::<SslMode>().unwrap(), SslMode::Preferred);
        assert_eq!("required".parse::<SslMode>().unwrap(), SslMode::Required);
        assert_eq!("verify_ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
        assert_eq!(
            "verify_identity".parse::<SslMode>().unwrap(),
            SslMode::VerifyIdentity
        );
        assert!("invalid".parse::<SslMode>().is_err());
    }

    #[test]
    fn test_ssl_mode_aliases() {
        assert_eq!("disable".parse::<SslMode>().unwrap(), SslMode::Disabled);
        assert_eq!("false".parse::<SslMode>().unwrap(), SslMode::Disabled);
        assert_eq!("prefer".parse::<SslMode>().unwrap(), SslMode::Preferred);
        assert_eq!("require".parse::<SslMode>().unwrap(), SslMode::Required);
        assert_eq!("true".parse::<SslMode>().unwrap(), SslMode::Required);
        assert_eq!("verify-ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
        assert_eq!(
            "verify-identity".parse::<SslMode>().unwrap(),
            SslMode::VerifyIdentity
        );
    }

    #[test]
    fn test_snapshot_mode_fromstr() {
        assert_eq!("initial".parse::<SnapshotMode>().unwrap(), SnapshotMode::Initial);
        assert_eq!("never".parse::<SnapshotMode>().unwrap(), SnapshotMode::Never);
        assert_eq!("always".parse::<SnapshotMode>().unwrap(), SnapshotMode::Always);
        assert_eq!(
            "schema_only".parse::<SnapshotMode>().unwrap(),
            SnapshotMode::SchemaOnly
        );
        assert_eq!(
            "schema-only".parse::<SnapshotMode>().unwrap(),
            SnapshotMode::SchemaOnly
        );
        assert!("bad".parse::<SnapshotMode>().is_err());
    }

    #[test]
    fn test_table_filtering_simple() {
        let mut cfg = MySqlCdcConfig::default();
        // No filters → include all
        assert!(cfg.should_include_table("mydb", "users"));

        // Include list - exact match
        cfg.table_include = vec!["mydb.users".to_string()];
        assert!(cfg.should_include_table("mydb", "users"));
        assert!(!cfg.should_include_table("mydb", "orders"));
    }

    #[test]
    fn test_table_filtering_exclude() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.table_exclude = vec!["mydb.logs".to_string()];
        assert!(cfg.should_include_table("mydb", "users"));
        assert!(!cfg.should_include_table("mydb", "logs"));
    }

    #[test]
    fn test_exclude_takes_precedence_over_include() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.table_include = vec!["users".to_string()];
        cfg.table_exclude = vec!["mydb.users".to_string()];
        assert!(!cfg.should_include_table("mydb", "users"));
    }

    #[test]
    fn test_table_filtering_database() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.database_filter = Some("production".to_string());
        assert!(cfg.should_include_table("production", "users"));
        assert!(!cfg.should_include_table("staging", "users"));
    }

    #[test]
    fn test_table_filtering_table_name_only() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.table_include = vec!["users".to_string()];
        assert!(cfg.should_include_table("any_db", "users"));
        assert!(!cfg.should_include_table("any_db", "orders"));
    }

    #[test]
    fn test_replication_mode() {
        let mut cfg = MySqlCdcConfig::default();
        cfg.use_gtid = true;
        assert_eq!(cfg.replication_mode(), "GTID");

        cfg.use_gtid = false;
        assert_eq!(cfg.replication_mode(), "binlog position");
    }

    #[test]
    fn test_from_config_with_gtid_set() {
        let mut config = base_config();
        config.set("gtid.set", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();
        assert!(cfg.gtid_set.is_some());
    }

    #[test]
    fn test_from_config_with_binlog_position() {
        let mut config = base_config();
        config.set("use.gtid", "false");
        config.set("binlog.filename", "mysql-bin.000003");
        config.set("binlog.position", "12345");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();
        assert!(!cfg.use_gtid);
        assert_eq!(cfg.binlog_filename, Some("mysql-bin.000003".to_string()));
        assert_eq!(cfg.binlog_position, Some(12345));
    }
}