Skip to main content

laminar_connectors/cdc/mysql/
config.rs

1//! MySQL CDC source connector configuration.
2//!
3//! Provides [`MySqlCdcConfig`] with all settings needed to connect to
4//! a MySQL database and stream binlog changes.
5
6use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11use super::gtid::GtidSet;
12
13/// Configuration for the MySQL CDC source connector.
14#[derive(Debug, Clone)]
15pub struct MySqlCdcConfig {
16    // ── Connection ──
17    /// MySQL host address.
18    pub host: String,
19
20    /// MySQL port.
21    pub port: u16,
22
23    /// Database name (optional for binlog replication).
24    pub database: Option<String>,
25
26    /// Username for authentication.
27    pub username: String,
28
29    /// Password for authentication.
30    pub password: Option<String>,
31
32    /// SSL mode for the connection.
33    pub ssl_mode: SslMode,
34
35    // ── Replication ──
36    /// Server ID for the replica (must be unique in the topology).
37    /// MySQL requires each replica to have a unique server ID.
38    pub server_id: u32,
39
40    /// GTID set to start replication from (None = use binlog position).
41    /// Using GTID is recommended for failover support.
42    pub gtid_set: Option<GtidSet>,
43
44    /// Binlog filename to start from (if not using GTID).
45    pub binlog_filename: Option<String>,
46
47    /// Binlog position to start from (if not using GTID).
48    pub binlog_position: Option<u64>,
49
50    /// Whether to use GTID-based replication (recommended).
51    pub use_gtid: bool,
52
53    // ── Snapshot ──
54    /// How to handle the initial data snapshot.
55    pub snapshot_mode: SnapshotMode,
56
57    // ── Tuning ──
58    /// Timeout for each poll operation.
59    pub poll_timeout: Duration,
60
61    /// Maximum records to return per poll.
62    pub max_poll_records: usize,
63
64    /// Heartbeat interval for the binlog connection.
65    pub heartbeat_interval: Duration,
66
67    /// Connection timeout.
68    pub connect_timeout: Duration,
69
70    /// Read timeout for binlog events.
71    pub read_timeout: Duration,
72
73    // ── Schema ──
74    /// Tables to include (format: "database.table" or just "table").
75    /// Empty = all tables.
76    pub table_include: Vec<String>,
77
78    /// Tables to exclude from replication.
79    pub table_exclude: Vec<String>,
80
81    /// Database filter (if set, only replicate from this database).
82    pub database_filter: Option<String>,
83}
84
85impl Default for MySqlCdcConfig {
86    fn default() -> Self {
87        Self {
88            host: "localhost".to_string(),
89            port: 3306,
90            database: None,
91            username: "root".to_string(),
92            password: None,
93            ssl_mode: SslMode::Preferred,
94            server_id: 1001, // Default replica server ID
95            gtid_set: None,
96            binlog_filename: None,
97            binlog_position: None,
98            use_gtid: true, // GTID is the recommended approach
99            snapshot_mode: SnapshotMode::Initial,
100            poll_timeout: Duration::from_millis(100),
101            max_poll_records: 1000,
102            heartbeat_interval: Duration::from_secs(30),
103            connect_timeout: Duration::from_secs(10),
104            read_timeout: Duration::from_secs(60),
105            table_include: Vec::new(),
106            table_exclude: Vec::new(),
107            database_filter: None,
108        }
109    }
110}
111
112impl MySqlCdcConfig {
113    /// Creates a new config with required fields.
114    #[must_use]
115    pub fn new(host: &str, username: &str) -> Self {
116        Self {
117            host: host.to_string(),
118            username: username.to_string(),
119            ..Self::default()
120        }
121    }
122
123    /// Creates a new config with server ID.
124    #[must_use]
125    pub fn with_server_id(host: &str, username: &str, server_id: u32) -> Self {
126        Self {
127            host: host.to_string(),
128            username: username.to_string(),
129            server_id,
130            ..Self::default()
131        }
132    }
133
134    /// Builds a MySQL connection URL.
135    #[must_use]
136    pub fn connection_url(&self) -> String {
137        let mut url = format!("mysql://{}@{}:{}", self.username, self.host, self.port);
138        if let Some(ref db) = self.database {
139            url.push('/');
140            url.push_str(db);
141        }
142        url
143    }
144
145    /// Parses configuration from a generic [`ConnectorConfig`].
146    ///
147    /// # Errors
148    ///
149    /// Returns `ConnectorError` if required keys are missing or values are
150    /// invalid.
151    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
152        let mut cfg = Self {
153            host: config.require("host")?.to_string(),
154            username: config.require("username")?.to_string(),
155            ..Self::default()
156        };
157
158        if let Some(port) = config.get("port") {
159            cfg.port = crate::config::parse_port(port)?;
160        }
161        cfg.database = config.get("database").map(String::from);
162        cfg.password = config.get("password").map(String::from);
163
164        if let Some(ssl) = config.get_parsed::<SslMode>("ssl.mode")? {
165            cfg.ssl_mode = ssl;
166        }
167
168        if let Some(id) = config.get_parsed::<u32>("server.id")? {
169            cfg.server_id = id;
170        }
171
172        if let Some(gtid) = config.get_parsed::<GtidSet>("gtid.set")? {
173            cfg.gtid_set = Some(gtid);
174        }
175
176        cfg.binlog_filename = config.get("binlog.filename").map(String::from);
177
178        if let Some(pos) = config.get_parsed::<u64>("binlog.position")? {
179            cfg.binlog_position = Some(pos);
180        }
181
182        if let Some(use_gtid) = config.get("use.gtid") {
183            cfg.use_gtid = use_gtid.parse().unwrap_or(true);
184        }
185
186        if let Some(mode) = config.get_parsed::<SnapshotMode>("snapshot.mode")? {
187            cfg.snapshot_mode = mode;
188        }
189
190        if let Some(timeout) = config.get_parsed::<u64>("poll.timeout.ms")? {
191            cfg.poll_timeout = Duration::from_millis(timeout);
192        }
193        if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
194            cfg.max_poll_records = max;
195        }
196        if let Some(interval) = config.get_parsed::<u64>("heartbeat.interval.ms")? {
197            cfg.heartbeat_interval = Duration::from_millis(interval);
198        }
199        if let Some(timeout) = config.get_parsed::<u64>("connect.timeout.ms")? {
200            cfg.connect_timeout = Duration::from_millis(timeout);
201        }
202        if let Some(timeout) = config.get_parsed::<u64>("read.timeout.ms")? {
203            cfg.read_timeout = Duration::from_millis(timeout);
204        }
205
206        if let Some(tables) = config.get("table.include") {
207            cfg.table_include = tables.split(',').map(|s| s.trim().to_string()).collect();
208        }
209        if let Some(tables) = config.get("table.exclude") {
210            cfg.table_exclude = tables.split(',').map(|s| s.trim().to_string()).collect();
211        }
212        cfg.database_filter = config.get("database.filter").map(String::from);
213
214        cfg.validate()?;
215        Ok(cfg)
216    }
217
218    /// Validates the configuration.
219    ///
220    /// # Errors
221    ///
222    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
223    pub fn validate(&self) -> Result<(), ConnectorError> {
224        crate::config::require_non_empty(&self.host, "host")?;
225        crate::config::require_non_empty(&self.username, "username")?;
226        if self.server_id == 0 {
227            return Err(ConnectorError::ConfigurationError(
228                "server.id must be > 0".to_string(),
229            ));
230        }
231        if self.max_poll_records == 0 {
232            return Err(ConnectorError::ConfigurationError(
233                "max.poll.records must be > 0".to_string(),
234            ));
235        }
236
237        // If not using GTID, binlog filename should be specified for explicit positioning
238        if !self.use_gtid && self.binlog_filename.is_none() && self.binlog_position.is_some() {
239            return Err(ConnectorError::ConfigurationError(
240                "binlog.filename required when binlog.position is set".to_string(),
241            ));
242        }
243
244        Ok(())
245    }
246
247    /// Returns whether a table should be included based on include/exclude lists.
248    #[must_use]
249    pub fn should_include_table(&self, database: &str, table: &str) -> bool {
250        let full_name = format!("{database}.{table}");
251
252        // Check exclude list first
253        if self
254            .table_exclude
255            .iter()
256            .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
257        {
258            return false;
259        }
260
261        // Check database filter
262        if let Some(ref db_filter) = self.database_filter {
263            if db_filter != database {
264                return false;
265            }
266        }
267
268        // Check include list
269        if self.table_include.is_empty() {
270            return true;
271        }
272
273        self.table_include
274            .iter()
275            .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
276    }
277
278    /// Returns the replication mode description.
279    #[must_use]
280    pub fn replication_mode(&self) -> &'static str {
281        if self.use_gtid {
282            "GTID"
283        } else {
284            "binlog position"
285        }
286    }
287}
288
289/// SSL connection mode for MySQL.
290#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
291pub enum SslMode {
292    /// No SSL.
293    Disabled,
294    /// Try SSL, fall back to unencrypted.
295    #[default]
296    Preferred,
297    /// Require SSL.
298    Required,
299    /// Require SSL and verify CA certificate.
300    VerifyCa,
301    /// Require SSL and verify server identity.
302    VerifyIdentity,
303}
304
305str_enum!(SslMode, lowercase_nodash, String, "unknown SSL mode",
306    Disabled => "disabled", "disable", "false";
307    Preferred => "preferred", "prefer";
308    Required => "required", "require", "true";
309    VerifyCa => "verify_ca", "verify-ca";
310    VerifyIdentity => "verify_identity", "verify-identity"
311);
312
313/// How to handle the initial snapshot when no prior checkpoint exists.
314#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
315pub enum SnapshotMode {
316    /// Take a full snapshot on first start, then switch to streaming.
317    #[default]
318    Initial,
319    /// Never take a snapshot; only stream from current binlog position.
320    Never,
321    /// Always take a snapshot on startup, even if a checkpoint exists.
322    Always,
323    /// Only take a schema snapshot, no data.
324    SchemaOnly,
325}
326
327str_enum!(SnapshotMode, lowercase_nodash, String, "unknown snapshot mode",
328    Initial => "initial";
329    Never => "never";
330    Always => "always";
331    SchemaOnly => "schema_only", "schema-only"
332);
333
#[cfg(test)]
mod tests {
    use super::*;

    /// The defaults target a local server with GTID replication enabled.
    #[test]
    fn test_default_config() {
        let MySqlCdcConfig {
            host,
            port,
            username,
            use_gtid,
            ssl_mode,
            snapshot_mode,
            server_id,
            ..
        } = MySqlCdcConfig::default();

        assert_eq!(host, "localhost");
        assert_eq!(port, 3306);
        assert_eq!(username, "root");
        assert!(use_gtid);
        assert_eq!(ssl_mode, SslMode::Preferred);
        assert_eq!(snapshot_mode, SnapshotMode::Initial);
        assert_eq!(server_id, 1001);
    }

    /// `new` stores the host and username it is given.
    #[test]
    fn test_new_config() {
        let cfg = MySqlCdcConfig::new("db.example.com", "myuser");
        assert_eq!(cfg.host, "db.example.com");
        assert_eq!(cfg.username, "myuser");
    }

    /// `with_server_id` stores the explicit replica server ID.
    #[test]
    fn test_with_server_id() {
        let cfg = MySqlCdcConfig::with_server_id("db.example.com", "myuser", 5000);
        assert_eq!(cfg.server_id, 5000);
    }

    /// The URL gains a `/database` suffix only when a database is set.
    #[test]
    fn test_connection_url() {
        let without_db = MySqlCdcConfig::new("db.example.com", "myuser");
        assert_eq!(
            without_db.connection_url(),
            "mysql://myuser@db.example.com:3306"
        );

        let with_db = MySqlCdcConfig {
            database: Some("mydb".to_string()),
            ..without_db
        };
        assert_eq!(
            with_db.connection_url(),
            "mysql://myuser@db.example.com:3306/mydb"
        );
    }

    /// Every key set on the generic config ends up in the typed config.
    #[test]
    fn test_from_connector_config() {
        let mut config = ConnectorConfig::new("mysql-cdc");
        config.set("host", "mysql.local");
        config.set("username", "repl_user");
        config.set("password", "secret");
        config.set("port", "3307");
        config.set("server.id", "2000");
        config.set("ssl.mode", "required");
        config.set("snapshot.mode", "never");
        config.set("use.gtid", "true");
        config.set("max.poll.records", "500");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();

        assert_eq!(cfg.host, "mysql.local");
        assert_eq!(cfg.username, "repl_user");
        assert_eq!(cfg.password.as_deref(), Some("secret"));
        assert_eq!(cfg.port, 3307);
        assert_eq!(cfg.server_id, 2000);
        assert_eq!(cfg.ssl_mode, SslMode::Required);
        assert_eq!(cfg.snapshot_mode, SnapshotMode::Never);
        assert!(cfg.use_gtid);
        assert_eq!(cfg.max_poll_records, 500);
    }

    /// A config without `host` / `username` is rejected.
    #[test]
    fn test_from_config_missing_required() {
        let empty = ConnectorConfig::new("mysql-cdc");
        let parsed = MySqlCdcConfig::from_config(&empty);
        assert!(parsed.is_err());
    }

    /// An empty host fails validation.
    #[test]
    fn test_validate_empty_host() {
        let cfg = MySqlCdcConfig {
            host: String::new(),
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    /// A zero server ID fails validation.
    #[test]
    fn test_validate_zero_server_id() {
        let cfg = MySqlCdcConfig {
            server_id: 0,
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    /// A binlog position without a filename fails validation in non-GTID mode.
    #[test]
    fn test_validate_binlog_position_without_filename() {
        let cfg = MySqlCdcConfig {
            use_gtid: false,
            binlog_position: Some(12345),
            binlog_filename: None,
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    /// Each canonical SSL mode string parses; unknown strings are rejected.
    #[test]
    fn test_ssl_mode_fromstr() {
        let cases = [
            ("disabled", SslMode::Disabled),
            ("preferred", SslMode::Preferred),
            ("required", SslMode::Required),
            ("verify_ca", SslMode::VerifyCa),
            ("verify_identity", SslMode::VerifyIdentity),
        ];
        for (text, expected) in cases {
            assert_eq!(text.parse::<SslMode>().unwrap(), expected, "input: {text}");
        }
        assert!("invalid".parse::<SslMode>().is_err());
    }

    /// Each canonical snapshot mode string parses; unknown strings are rejected.
    #[test]
    fn test_snapshot_mode_fromstr() {
        let cases = [
            ("initial", SnapshotMode::Initial),
            ("never", SnapshotMode::Never),
            ("always", SnapshotMode::Always),
            ("schema_only", SnapshotMode::SchemaOnly),
        ];
        for (text, expected) in cases {
            assert_eq!(
                text.parse::<SnapshotMode>().unwrap(),
                expected,
                "input: {text}"
            );
        }
        assert!("bad".parse::<SnapshotMode>().is_err());
    }

    /// No filters include everything; a qualified include entry narrows it.
    #[test]
    fn test_table_filtering_simple() {
        // No filters → include all
        let unfiltered = MySqlCdcConfig::default();
        assert!(unfiltered.should_include_table("mydb", "users"));

        // Include list - exact match
        let filtered = MySqlCdcConfig {
            table_include: vec!["mydb.users".to_string()],
            ..MySqlCdcConfig::default()
        };
        assert!(filtered.should_include_table("mydb", "users"));
        assert!(!filtered.should_include_table("mydb", "orders"));
    }

    /// An excluded table is dropped while others still pass.
    #[test]
    fn test_table_filtering_exclude() {
        let cfg = MySqlCdcConfig {
            table_exclude: vec!["mydb.logs".to_string()],
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.should_include_table("mydb", "users"));
        assert!(!cfg.should_include_table("mydb", "logs"));
    }

    /// The database filter only lets tables from that database through.
    #[test]
    fn test_table_filtering_database() {
        let cfg = MySqlCdcConfig {
            database_filter: Some("production".to_string()),
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.should_include_table("production", "users"));
        assert!(!cfg.should_include_table("staging", "users"));
    }

    /// A bare table name in the include list matches in any database.
    #[test]
    fn test_table_filtering_table_name_only() {
        let cfg = MySqlCdcConfig {
            table_include: vec!["users".to_string()],
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.should_include_table("any_db", "users"));
        assert!(!cfg.should_include_table("any_db", "orders"));
    }

    /// The mode label follows the `use_gtid` flag.
    #[test]
    fn test_replication_mode() {
        let gtid = MySqlCdcConfig {
            use_gtid: true,
            ..MySqlCdcConfig::default()
        };
        assert_eq!(gtid.replication_mode(), "GTID");

        let positional = MySqlCdcConfig {
            use_gtid: false,
            ..gtid
        };
        assert_eq!(positional.replication_mode(), "binlog position");
    }

    /// A `gtid.set` value is parsed and stored.
    #[test]
    fn test_from_config_with_gtid_set() {
        let mut config = ConnectorConfig::new("mysql-cdc");
        config.set("host", "localhost");
        config.set("username", "root");
        config.set("gtid.set", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();
        assert!(cfg.gtid_set.is_some());
    }

    /// Explicit binlog file/position settings are honoured in non-GTID mode.
    #[test]
    fn test_from_config_with_binlog_position() {
        let mut config = ConnectorConfig::new("mysql-cdc");
        config.set("host", "localhost");
        config.set("username", "root");
        config.set("use.gtid", "false");
        config.set("binlog.filename", "mysql-bin.000003");
        config.set("binlog.position", "12345");

        let cfg = MySqlCdcConfig::from_config(&config).unwrap();
        assert!(!cfg.use_gtid);
        assert_eq!(cfg.binlog_filename.as_deref(), Some("mysql-bin.000003"));
        assert_eq!(cfg.binlog_position, Some(12345));
    }
}