laminar_connectors/cdc/mysql/
config.rs1use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11use super::gtid::GtidSet;
12
13#[derive(Debug, Clone)]
15pub struct MySqlCdcConfig {
16 pub host: String,
19
20 pub port: u16,
22
23 pub database: Option<String>,
25
26 pub username: String,
28
29 pub password: Option<String>,
31
32 pub ssl_mode: SslMode,
34
35 pub server_id: u32,
39
40 pub gtid_set: Option<GtidSet>,
43
44 pub binlog_filename: Option<String>,
46
47 pub binlog_position: Option<u64>,
49
50 pub use_gtid: bool,
52
53 pub snapshot_mode: SnapshotMode,
56
57 pub poll_timeout: Duration,
60
61 pub max_poll_records: usize,
63
64 pub heartbeat_interval: Duration,
66
67 pub connect_timeout: Duration,
69
70 pub read_timeout: Duration,
72
73 pub table_include: Vec<String>,
77
78 pub table_exclude: Vec<String>,
80
81 pub database_filter: Option<String>,
83}
84
85impl Default for MySqlCdcConfig {
86 fn default() -> Self {
87 Self {
88 host: "localhost".to_string(),
89 port: 3306,
90 database: None,
91 username: "root".to_string(),
92 password: None,
93 ssl_mode: SslMode::Preferred,
94 server_id: 1001, gtid_set: None,
96 binlog_filename: None,
97 binlog_position: None,
98 use_gtid: true, snapshot_mode: SnapshotMode::Initial,
100 poll_timeout: Duration::from_millis(100),
101 max_poll_records: 1000,
102 heartbeat_interval: Duration::from_secs(30),
103 connect_timeout: Duration::from_secs(10),
104 read_timeout: Duration::from_secs(60),
105 table_include: Vec::new(),
106 table_exclude: Vec::new(),
107 database_filter: None,
108 }
109 }
110}
111
112impl MySqlCdcConfig {
113 #[must_use]
115 pub fn new(host: &str, username: &str) -> Self {
116 Self {
117 host: host.to_string(),
118 username: username.to_string(),
119 ..Self::default()
120 }
121 }
122
123 #[must_use]
125 pub fn with_server_id(host: &str, username: &str, server_id: u32) -> Self {
126 Self {
127 host: host.to_string(),
128 username: username.to_string(),
129 server_id,
130 ..Self::default()
131 }
132 }
133
134 #[must_use]
136 pub fn connection_url(&self) -> String {
137 let mut url = format!("mysql://{}@{}:{}", self.username, self.host, self.port);
138 if let Some(ref db) = self.database {
139 url.push('/');
140 url.push_str(db);
141 }
142 url
143 }
144
145 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
152 let mut cfg = Self {
153 host: config.require("host")?.to_string(),
154 username: config.require("username")?.to_string(),
155 ..Self::default()
156 };
157
158 if let Some(port) = config.get("port") {
159 cfg.port = crate::config::parse_port(port)?;
160 }
161 cfg.database = config.get("database").map(String::from);
162 cfg.password = config.get("password").map(String::from);
163
164 if let Some(ssl) = config.get_parsed::<SslMode>("ssl.mode")? {
165 cfg.ssl_mode = ssl;
166 }
167
168 if let Some(id) = config.get_parsed::<u32>("server.id")? {
169 cfg.server_id = id;
170 }
171
172 if let Some(gtid) = config.get_parsed::<GtidSet>("gtid.set")? {
173 cfg.gtid_set = Some(gtid);
174 }
175
176 cfg.binlog_filename = config.get("binlog.filename").map(String::from);
177
178 if let Some(pos) = config.get_parsed::<u64>("binlog.position")? {
179 cfg.binlog_position = Some(pos);
180 }
181
182 if let Some(use_gtid) = config.get("use.gtid") {
183 cfg.use_gtid = use_gtid.parse().unwrap_or(true);
184 }
185
186 if let Some(mode) = config.get_parsed::<SnapshotMode>("snapshot.mode")? {
187 cfg.snapshot_mode = mode;
188 }
189
190 if let Some(timeout) = config.get_parsed::<u64>("poll.timeout.ms")? {
191 cfg.poll_timeout = Duration::from_millis(timeout);
192 }
193 if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
194 cfg.max_poll_records = max;
195 }
196 if let Some(interval) = config.get_parsed::<u64>("heartbeat.interval.ms")? {
197 cfg.heartbeat_interval = Duration::from_millis(interval);
198 }
199 if let Some(timeout) = config.get_parsed::<u64>("connect.timeout.ms")? {
200 cfg.connect_timeout = Duration::from_millis(timeout);
201 }
202 if let Some(timeout) = config.get_parsed::<u64>("read.timeout.ms")? {
203 cfg.read_timeout = Duration::from_millis(timeout);
204 }
205
206 if let Some(tables) = config.get("table.include") {
207 cfg.table_include = tables.split(',').map(|s| s.trim().to_string()).collect();
208 }
209 if let Some(tables) = config.get("table.exclude") {
210 cfg.table_exclude = tables.split(',').map(|s| s.trim().to_string()).collect();
211 }
212 cfg.database_filter = config.get("database.filter").map(String::from);
213
214 cfg.validate()?;
215 Ok(cfg)
216 }
217
218 pub fn validate(&self) -> Result<(), ConnectorError> {
224 crate::config::require_non_empty(&self.host, "host")?;
225 crate::config::require_non_empty(&self.username, "username")?;
226 if self.server_id == 0 {
227 return Err(ConnectorError::ConfigurationError(
228 "server.id must be > 0".to_string(),
229 ));
230 }
231 if self.max_poll_records == 0 {
232 return Err(ConnectorError::ConfigurationError(
233 "max.poll.records must be > 0".to_string(),
234 ));
235 }
236
237 if !self.use_gtid && self.binlog_filename.is_none() && self.binlog_position.is_some() {
239 return Err(ConnectorError::ConfigurationError(
240 "binlog.filename required when binlog.position is set".to_string(),
241 ));
242 }
243
244 Ok(())
245 }
246
247 #[must_use]
249 pub fn should_include_table(&self, database: &str, table: &str) -> bool {
250 let full_name = format!("{database}.{table}");
251
252 if self
254 .table_exclude
255 .iter()
256 .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
257 {
258 return false;
259 }
260
261 if let Some(ref db_filter) = self.database_filter {
263 if db_filter != database {
264 return false;
265 }
266 }
267
268 if self.table_include.is_empty() {
270 return true;
271 }
272
273 self.table_include
274 .iter()
275 .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
276 }
277
278 #[must_use]
280 pub fn replication_mode(&self) -> &'static str {
281 if self.use_gtid {
282 "GTID"
283 } else {
284 "binlog position"
285 }
286 }
287}
288
/// TLS policy for the MySQL connection (configuration key `ssl.mode`).
///
/// The default is [`SslMode::Preferred`]. The accepted configuration
/// spellings are listed in the `str_enum!` table that follows this enum.
// NOTE(review): how each mode is enforced is decided by the connection code,
// which is not part of this file — confirm the semantics there.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SslMode {
    /// Configured as `disabled`, `disable`, or `false`.
    Disabled,

    /// Configured as `preferred` or `prefer`; this is the default.
    #[default]
    Preferred,

    /// Configured as `required`, `require`, or `true`.
    Required,

    /// Configured as `verify_ca` or `verify-ca`.
    VerifyCa,

    /// Configured as `verify_identity` or `verify-identity`.
    VerifyIdentity,
}
304
// Maps configuration strings to `SslMode` variants; each row lists every
// spelling accepted for that variant, and "unknown SSL mode" is the message
// used for anything else.
// NOTE(review): `str_enum!` is defined outside this file. The tests in this
// module parse strings into `SslMode`, so it presumably generates the `FromStr`
// impl (with a `String` error) — confirm there, along with how
// `lowercase_nodash` normalizes the input before matching.
str_enum!(SslMode, lowercase_nodash, String, "unknown SSL mode",
    Disabled => "disabled", "disable", "false";
    Preferred => "preferred", "prefer";
    Required => "required", "require", "true";
    VerifyCa => "verify_ca", "verify-ca";
    VerifyIdentity => "verify_identity", "verify-identity"
);
312
/// Snapshot behaviour of the connector (configuration key `snapshot.mode`).
///
/// The default is [`SnapshotMode::Initial`]. The accepted configuration
/// spellings are listed in the `str_enum!` table that follows this enum.
// NOTE(review): what each mode does at runtime is decided by the snapshot
// code, which is not part of this file — confirm the semantics there.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotMode {
    /// Configured as `initial`; this is the default.
    #[default]
    Initial,

    /// Configured as `never`.
    Never,

    /// Configured as `always`.
    Always,

    /// Configured as `schema_only` or `schema-only`.
    SchemaOnly,
}
326
// Maps configuration strings to `SnapshotMode` variants; each row lists every
// spelling accepted for that variant, and "unknown snapshot mode" is the
// message used for anything else.
// NOTE(review): `str_enum!` is defined outside this file. The tests in this
// module parse strings into `SnapshotMode`, so it presumably generates the
// `FromStr` impl (with a `String` error) — confirm there, along with how
// `lowercase_nodash` normalizes the input before matching.
str_enum!(SnapshotMode, lowercase_nodash, String, "unknown snapshot mode",
    Initial => "initial";
    Never => "never";
    Always => "always";
    SchemaOnly => "schema_only", "schema-only"
);
333
#[cfg(test)]
mod tests {
    use super::*;

    /// Connector properties holding only the required keys for a local server.
    fn local_root_props() -> ConnectorConfig {
        let mut props = ConnectorConfig::new("mysql-cdc");
        props.set("host", "localhost");
        props.set("username", "root");
        props
    }

    /// The `Default` implementation yields the documented baseline values.
    #[test]
    fn test_default_config() {
        let defaults = MySqlCdcConfig::default();
        assert_eq!(defaults.host, "localhost");
        assert_eq!(defaults.port, 3306);
        assert_eq!(defaults.username, "root");
        assert!(defaults.use_gtid);
        assert_eq!(defaults.ssl_mode, SslMode::Preferred);
        assert_eq!(defaults.snapshot_mode, SnapshotMode::Initial);
        assert_eq!(defaults.server_id, 1001);
    }

    /// `new` stores the host and user name it is given.
    #[test]
    fn test_new_config() {
        let built = MySqlCdcConfig::new("db.example.com", "myuser");
        assert_eq!(built.host, "db.example.com");
        assert_eq!(built.username, "myuser");
    }

    /// `with_server_id` stores the explicit server id.
    #[test]
    fn test_with_server_id() {
        let built = MySqlCdcConfig::with_server_id("db.example.com", "myuser", 5000);
        assert_eq!(built.server_id, 5000);
    }

    /// The URL gains a path component only when a database is configured.
    #[test]
    fn test_connection_url() {
        let without_db = MySqlCdcConfig::new("db.example.com", "myuser");
        assert_eq!(
            without_db.connection_url(),
            "mysql://myuser@db.example.com:3306"
        );

        let with_db = MySqlCdcConfig {
            database: Some("mydb".to_string()),
            ..without_db
        };
        assert_eq!(
            with_db.connection_url(),
            "mysql://myuser@db.example.com:3306/mydb"
        );
    }

    /// Every supplied key ends up in the matching field.
    #[test]
    fn test_from_connector_config() {
        let mut props = ConnectorConfig::new("mysql-cdc");
        props.set("host", "mysql.local");
        props.set("username", "repl_user");
        props.set("password", "secret");
        props.set("port", "3307");
        props.set("server.id", "2000");
        props.set("ssl.mode", "required");
        props.set("snapshot.mode", "never");
        props.set("use.gtid", "true");
        props.set("max.poll.records", "500");

        let parsed = MySqlCdcConfig::from_config(&props).unwrap();
        assert_eq!(parsed.host, "mysql.local");
        assert_eq!(parsed.username, "repl_user");
        assert_eq!(parsed.password, Some("secret".to_string()));
        assert_eq!(parsed.port, 3307);
        assert_eq!(parsed.server_id, 2000);
        assert_eq!(parsed.ssl_mode, SslMode::Required);
        assert_eq!(parsed.snapshot_mode, SnapshotMode::Never);
        assert!(parsed.use_gtid);
        assert_eq!(parsed.max_poll_records, 500);
    }

    /// Missing `host` / `username` is an error.
    #[test]
    fn test_from_config_missing_required() {
        let empty = ConnectorConfig::new("mysql-cdc");
        assert!(MySqlCdcConfig::from_config(&empty).is_err());
    }

    /// An empty host fails validation.
    #[test]
    fn test_validate_empty_host() {
        let cfg = MySqlCdcConfig {
            host: String::new(),
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    /// A zero server id fails validation.
    #[test]
    fn test_validate_zero_server_id() {
        let cfg = MySqlCdcConfig {
            server_id: 0,
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    /// Without GTID, a binlog position needs a binlog file name.
    #[test]
    fn test_validate_binlog_position_without_filename() {
        let cfg = MySqlCdcConfig {
            use_gtid: false,
            binlog_position: Some(12345),
            binlog_filename: None,
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    /// Canonical SSL mode strings parse; unknown ones are rejected.
    #[test]
    fn test_ssl_mode_fromstr() {
        let accepted = [
            ("disabled", SslMode::Disabled),
            ("preferred", SslMode::Preferred),
            ("required", SslMode::Required),
            ("verify_ca", SslMode::VerifyCa),
            ("verify_identity", SslMode::VerifyIdentity),
        ];
        for &(text, expected) in &accepted {
            assert_eq!(text.parse::<SslMode>().unwrap(), expected);
        }
        assert!("invalid".parse::<SslMode>().is_err());
    }

    /// Canonical snapshot mode strings parse; unknown ones are rejected.
    #[test]
    fn test_snapshot_mode_fromstr() {
        let accepted = [
            ("initial", SnapshotMode::Initial),
            ("never", SnapshotMode::Never),
            ("always", SnapshotMode::Always),
            ("schema_only", SnapshotMode::SchemaOnly),
        ];
        for &(text, expected) in &accepted {
            assert_eq!(text.parse::<SnapshotMode>().unwrap(), expected);
        }
        assert!("bad".parse::<SnapshotMode>().is_err());
    }

    /// No filters admits everything; an include list restricts to its entries.
    #[test]
    fn test_table_filtering_simple() {
        let unfiltered = MySqlCdcConfig::default();
        assert!(unfiltered.should_include_table("mydb", "users"));

        let only_users = MySqlCdcConfig {
            table_include: vec!["mydb.users".to_string()],
            ..unfiltered
        };
        assert!(only_users.should_include_table("mydb", "users"));
        assert!(!only_users.should_include_table("mydb", "orders"));
    }

    /// Excluded tables are rejected, others pass.
    #[test]
    fn test_table_filtering_exclude() {
        let cfg = MySqlCdcConfig {
            table_exclude: vec!["mydb.logs".to_string()],
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.should_include_table("mydb", "users"));
        assert!(!cfg.should_include_table("mydb", "logs"));
    }

    /// The database filter admits only the named database.
    #[test]
    fn test_table_filtering_database() {
        let cfg = MySqlCdcConfig {
            database_filter: Some("production".to_string()),
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.should_include_table("production", "users"));
        assert!(!cfg.should_include_table("staging", "users"));
    }

    /// An unqualified include entry matches the table in any database.
    #[test]
    fn test_table_filtering_table_name_only() {
        let cfg = MySqlCdcConfig {
            table_include: vec!["users".to_string()],
            ..MySqlCdcConfig::default()
        };
        assert!(cfg.should_include_table("any_db", "users"));
        assert!(!cfg.should_include_table("any_db", "orders"));
    }

    /// The mode label follows `use_gtid`.
    #[test]
    fn test_replication_mode() {
        let gtid = MySqlCdcConfig {
            use_gtid: true,
            ..MySqlCdcConfig::default()
        };
        assert_eq!(gtid.replication_mode(), "GTID");

        let positional = MySqlCdcConfig {
            use_gtid: false,
            ..gtid
        };
        assert_eq!(positional.replication_mode(), "binlog position");
    }

    /// A `gtid.set` value is parsed and stored.
    #[test]
    fn test_from_config_with_gtid_set() {
        let mut props = local_root_props();
        props.set("gtid.set", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10");

        let parsed = MySqlCdcConfig::from_config(&props).unwrap();
        assert!(parsed.gtid_set.is_some());
    }

    /// File name and offset are stored when GTID mode is switched off.
    #[test]
    fn test_from_config_with_binlog_position() {
        let mut props = local_root_props();
        props.set("use.gtid", "false");
        props.set("binlog.filename", "mysql-bin.000003");
        props.set("binlog.position", "12345");

        let parsed = MySqlCdcConfig::from_config(&props).unwrap();
        assert!(!parsed.use_gtid);
        assert_eq!(parsed.binlog_filename, Some("mysql-bin.000003".to_string()));
        assert_eq!(parsed.binlog_position, Some(12345));
    }
}