1use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11use super::gtid::GtidSet;
12
13#[derive(Debug, Clone)]
15pub struct MySqlCdcConfig {
16 pub host: String,
19
20 pub port: u16,
22
23 pub database: Option<String>,
25
26 pub username: String,
28
29 pub password: Option<String>,
31
32 pub ssl_mode: SslMode,
34
35 pub server_id: u32,
39
40 pub gtid_set: Option<GtidSet>,
43
44 pub binlog_filename: Option<String>,
46
47 pub binlog_position: Option<u64>,
49
50 pub use_gtid: bool,
52
53 pub snapshot_mode: SnapshotMode,
56
57 pub poll_timeout: Duration,
60
61 pub max_poll_records: usize,
63
64 pub heartbeat_interval: Duration,
66
67 pub connect_timeout: Duration,
69
70 pub read_timeout: Duration,
72
73 pub table_include: Vec<String>,
77
78 pub table_exclude: Vec<String>,
80
81 pub database_filter: Option<String>,
83
84 pub max_buffered_events: usize,
86
87 pub backpressure_high_watermark: f64,
91}
92
93impl Default for MySqlCdcConfig {
94 fn default() -> Self {
95 Self {
96 host: "localhost".to_string(),
97 port: 3306,
98 database: None,
99 username: "root".to_string(),
100 password: None,
101 ssl_mode: SslMode::Preferred,
102 server_id: 1001, gtid_set: None,
104 binlog_filename: None,
105 binlog_position: None,
106 use_gtid: true, snapshot_mode: SnapshotMode::Initial,
108 poll_timeout: Duration::from_millis(100),
109 max_poll_records: 1000,
110 heartbeat_interval: Duration::from_secs(30),
111 connect_timeout: Duration::from_secs(10),
112 read_timeout: Duration::from_secs(60),
113 table_include: Vec::new(),
114 table_exclude: Vec::new(),
115 database_filter: None,
116 max_buffered_events: 100_000,
117 backpressure_high_watermark: 0.8,
118 }
119 }
120}
121
122impl MySqlCdcConfig {
123 #[must_use]
125 #[allow(
126 clippy::cast_precision_loss,
127 clippy::cast_possible_truncation,
128 clippy::cast_sign_loss
129 )]
130 pub fn backpressure_high_watermark(&self) -> usize {
131 (self.max_buffered_events as f64 * self.backpressure_high_watermark) as usize
132 }
133
134 #[must_use]
136 pub fn new(host: &str, username: &str) -> Self {
137 Self {
138 host: host.to_string(),
139 username: username.to_string(),
140 ..Self::default()
141 }
142 }
143
144 #[must_use]
146 pub fn with_server_id(host: &str, username: &str, server_id: u32) -> Self {
147 Self {
148 host: host.to_string(),
149 username: username.to_string(),
150 server_id,
151 ..Self::default()
152 }
153 }
154
155 #[must_use]
157 pub fn connection_url(&self) -> String {
158 let mut url = format!("mysql://{}@{}:{}", self.username, self.host, self.port);
159 if let Some(ref db) = self.database {
160 url.push('/');
161 url.push_str(db);
162 }
163 url
164 }
165
166 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
173 let mut cfg = Self {
174 host: config.require("host")?.to_string(),
175 username: config.require("username")?.to_string(),
176 ..Self::default()
177 };
178
179 if let Some(port) = config.get("port") {
180 cfg.port = crate::config::parse_port(port)?;
181 }
182 cfg.database = config.get("database").map(String::from);
183 cfg.password = config.get("password").map(String::from);
184
185 if let Some(ssl) = config.get_parsed::<SslMode>("ssl.mode")? {
186 cfg.ssl_mode = ssl;
187 }
188
189 if let Some(id) = config.get_parsed::<u32>("server.id")? {
190 cfg.server_id = id;
191 }
192
193 if let Some(gtid) = config.get_parsed::<GtidSet>("gtid.set")? {
194 cfg.gtid_set = Some(gtid);
195 }
196
197 cfg.binlog_filename = config.get("binlog.filename").map(String::from);
198
199 if let Some(pos) = config.get_parsed::<u64>("binlog.position")? {
200 cfg.binlog_position = Some(pos);
201 }
202
203 if let Some(use_gtid) = config.get("use.gtid") {
204 cfg.use_gtid = use_gtid.parse().unwrap_or(true);
205 }
206
207 if let Some(mode) = config.get_parsed::<SnapshotMode>("snapshot.mode")? {
208 cfg.snapshot_mode = mode;
209 }
210
211 if let Some(timeout) = config.get_parsed::<u64>("poll.timeout.ms")? {
212 cfg.poll_timeout = Duration::from_millis(timeout);
213 }
214 if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
215 cfg.max_poll_records = max;
216 }
217 if let Some(interval) = config.get_parsed::<u64>("heartbeat.interval.ms")? {
218 cfg.heartbeat_interval = Duration::from_millis(interval);
219 }
220 if let Some(timeout) = config.get_parsed::<u64>("connect.timeout.ms")? {
221 cfg.connect_timeout = Duration::from_millis(timeout);
222 }
223 if let Some(timeout) = config.get_parsed::<u64>("read.timeout.ms")? {
224 cfg.read_timeout = Duration::from_millis(timeout);
225 }
226
227 if let Some(tables) = config.get("table.include") {
228 cfg.table_include = tables.split(',').map(|s| s.trim().to_string()).collect();
229 }
230 if let Some(tables) = config.get("table.exclude") {
231 cfg.table_exclude = tables.split(',').map(|s| s.trim().to_string()).collect();
232 }
233 cfg.database_filter = config.get("database.filter").map(String::from);
234
235 if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
236 cfg.max_buffered_events = max;
237 }
238 if let Some(hw) = config.get_parsed::<f64>("backpressure.high.watermark")? {
239 cfg.backpressure_high_watermark = hw;
240 }
241
242 cfg.validate()?;
243 Ok(cfg)
244 }
245
246 pub fn validate(&self) -> Result<(), ConnectorError> {
252 crate::config::require_non_empty(&self.host, "host")?;
253 crate::config::require_non_empty(&self.username, "username")?;
254 if self.server_id == 0 {
255 return Err(ConnectorError::ConfigurationError(
256 "server.id must be > 0".to_string(),
257 ));
258 }
259 if self.max_poll_records == 0 {
260 return Err(ConnectorError::ConfigurationError(
261 "max.poll.records must be > 0".to_string(),
262 ));
263 }
264
265 if !self.use_gtid && self.binlog_filename.is_none() && self.binlog_position.is_some() {
267 return Err(ConnectorError::ConfigurationError(
268 "binlog.filename required when binlog.position is set".to_string(),
269 ));
270 }
271
272 Ok(())
273 }
274
275 #[must_use]
277 pub fn should_include_table(&self, database: &str, table: &str) -> bool {
278 let full_name = format!("{database}.{table}");
279
280 if self
282 .table_exclude
283 .iter()
284 .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
285 {
286 return false;
287 }
288
289 if let Some(ref db_filter) = self.database_filter {
291 if db_filter != database {
292 return false;
293 }
294 }
295
296 if self.table_include.is_empty() {
298 return true;
299 }
300
301 self.table_include
302 .iter()
303 .any(|t| t == table || t == &full_name || t.ends_with(&format!(".{table}")))
304 }
305
306 #[must_use]
308 pub fn replication_mode(&self) -> &'static str {
309 if self.use_gtid {
310 "GTID"
311 } else {
312 "binlog position"
313 }
314 }
315}
316
317#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
319pub enum SslMode {
320 Disabled,
322 #[default]
324 Preferred,
325 Required,
327 VerifyCa,
329 VerifyIdentity,
331}
332
333str_enum!(SslMode, lowercase_nodash, String, "unknown SSL mode",
334 Disabled => "disabled", "disable", "false";
335 Preferred => "preferred", "prefer";
336 Required => "required", "require", "true";
337 VerifyCa => "verify_ca", "verify-ca";
338 VerifyIdentity => "verify_identity", "verify-identity"
339);
340
341#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
343pub enum SnapshotMode {
344 #[default]
346 Initial,
347 Never,
349 Always,
351 SchemaOnly,
353}
354
355str_enum!(SnapshotMode, lowercase_nodash, String, "unknown snapshot mode",
356 Initial => "initial";
357 Never => "never";
358 Always => "always";
359 SchemaOnly => "schema_only", "schema-only"
360);
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365
366 #[test]
367 fn test_default_config() {
368 let cfg = MySqlCdcConfig::default();
369 assert_eq!(cfg.host, "localhost");
370 assert_eq!(cfg.port, 3306);
371 assert_eq!(cfg.username, "root");
372 assert!(cfg.use_gtid);
373 assert_eq!(cfg.ssl_mode, SslMode::Preferred);
374 assert_eq!(cfg.snapshot_mode, SnapshotMode::Initial);
375 assert_eq!(cfg.server_id, 1001);
376 }
377
378 #[test]
379 fn test_new_config() {
380 let cfg = MySqlCdcConfig::new("db.example.com", "myuser");
381 assert_eq!(cfg.host, "db.example.com");
382 assert_eq!(cfg.username, "myuser");
383 }
384
385 #[test]
386 fn test_with_server_id() {
387 let cfg = MySqlCdcConfig::with_server_id("db.example.com", "myuser", 5000);
388 assert_eq!(cfg.server_id, 5000);
389 }
390
391 #[test]
392 fn test_connection_url() {
393 let mut cfg = MySqlCdcConfig::new("db.example.com", "myuser");
394 assert_eq!(cfg.connection_url(), "mysql://myuser@db.example.com:3306");
395
396 cfg.database = Some("mydb".to_string());
397 assert_eq!(
398 cfg.connection_url(),
399 "mysql://myuser@db.example.com:3306/mydb"
400 );
401 }
402
403 #[test]
404 fn test_from_connector_config() {
405 let mut config = ConnectorConfig::new("mysql-cdc");
406 config.set("host", "mysql.local");
407 config.set("username", "repl_user");
408 config.set("password", "secret");
409 config.set("port", "3307");
410 config.set("server.id", "2000");
411 config.set("ssl.mode", "required");
412 config.set("snapshot.mode", "never");
413 config.set("use.gtid", "true");
414 config.set("max.poll.records", "500");
415
416 let cfg = MySqlCdcConfig::from_config(&config).unwrap();
417 assert_eq!(cfg.host, "mysql.local");
418 assert_eq!(cfg.username, "repl_user");
419 assert_eq!(cfg.password, Some("secret".to_string()));
420 assert_eq!(cfg.port, 3307);
421 assert_eq!(cfg.server_id, 2000);
422 assert_eq!(cfg.ssl_mode, SslMode::Required);
423 assert_eq!(cfg.snapshot_mode, SnapshotMode::Never);
424 assert!(cfg.use_gtid);
425 assert_eq!(cfg.max_poll_records, 500);
426 }
427
428 #[test]
429 fn test_from_config_missing_required() {
430 let config = ConnectorConfig::new("mysql-cdc");
431 assert!(MySqlCdcConfig::from_config(&config).is_err());
432 }
433
434 #[test]
435 fn test_validate_empty_host() {
436 let mut cfg = MySqlCdcConfig::default();
437 cfg.host = String::new();
438 assert!(cfg.validate().is_err());
439 }
440
441 #[test]
442 fn test_validate_zero_server_id() {
443 let mut cfg = MySqlCdcConfig::default();
444 cfg.server_id = 0;
445 assert!(cfg.validate().is_err());
446 }
447
448 #[test]
449 fn test_validate_binlog_position_without_filename() {
450 let mut cfg = MySqlCdcConfig::default();
451 cfg.use_gtid = false;
452 cfg.binlog_position = Some(12345);
453 cfg.binlog_filename = None;
454 assert!(cfg.validate().is_err());
455 }
456
457 #[test]
458 fn test_ssl_mode_fromstr() {
459 assert_eq!("disabled".parse::<SslMode>().unwrap(), SslMode::Disabled);
460 assert_eq!("preferred".parse::<SslMode>().unwrap(), SslMode::Preferred);
461 assert_eq!("required".parse::<SslMode>().unwrap(), SslMode::Required);
462 assert_eq!("verify_ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
463 assert_eq!(
464 "verify_identity".parse::<SslMode>().unwrap(),
465 SslMode::VerifyIdentity
466 );
467 assert!("invalid".parse::<SslMode>().is_err());
468 }
469
470 #[test]
471 fn test_snapshot_mode_fromstr() {
472 assert_eq!(
473 "initial".parse::<SnapshotMode>().unwrap(),
474 SnapshotMode::Initial
475 );
476 assert_eq!(
477 "never".parse::<SnapshotMode>().unwrap(),
478 SnapshotMode::Never
479 );
480 assert_eq!(
481 "always".parse::<SnapshotMode>().unwrap(),
482 SnapshotMode::Always
483 );
484 assert_eq!(
485 "schema_only".parse::<SnapshotMode>().unwrap(),
486 SnapshotMode::SchemaOnly
487 );
488 assert!("bad".parse::<SnapshotMode>().is_err());
489 }
490
491 #[test]
492 fn test_table_filtering_simple() {
493 let mut cfg = MySqlCdcConfig::default();
494 assert!(cfg.should_include_table("mydb", "users"));
496
497 cfg.table_include = vec!["mydb.users".to_string()];
499 assert!(cfg.should_include_table("mydb", "users"));
500 assert!(!cfg.should_include_table("mydb", "orders"));
501 }
502
503 #[test]
504 fn test_table_filtering_exclude() {
505 let mut cfg = MySqlCdcConfig::default();
506 cfg.table_exclude = vec!["mydb.logs".to_string()];
507 assert!(cfg.should_include_table("mydb", "users"));
508 assert!(!cfg.should_include_table("mydb", "logs"));
509 }
510
511 #[test]
512 fn test_table_filtering_database() {
513 let mut cfg = MySqlCdcConfig::default();
514 cfg.database_filter = Some("production".to_string());
515 assert!(cfg.should_include_table("production", "users"));
516 assert!(!cfg.should_include_table("staging", "users"));
517 }
518
519 #[test]
520 fn test_table_filtering_table_name_only() {
521 let mut cfg = MySqlCdcConfig::default();
522 cfg.table_include = vec!["users".to_string()];
523 assert!(cfg.should_include_table("any_db", "users"));
524 assert!(!cfg.should_include_table("any_db", "orders"));
525 }
526
527 #[test]
528 fn test_replication_mode() {
529 let mut cfg = MySqlCdcConfig::default();
530 cfg.use_gtid = true;
531 assert_eq!(cfg.replication_mode(), "GTID");
532
533 cfg.use_gtid = false;
534 assert_eq!(cfg.replication_mode(), "binlog position");
535 }
536
537 #[test]
538 fn test_from_config_with_gtid_set() {
539 let mut config = ConnectorConfig::new("mysql-cdc");
540 config.set("host", "localhost");
541 config.set("username", "root");
542 config.set("gtid.set", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10");
543
544 let cfg = MySqlCdcConfig::from_config(&config).unwrap();
545 assert!(cfg.gtid_set.is_some());
546 }
547
548 #[test]
549 fn test_from_config_with_binlog_position() {
550 let mut config = ConnectorConfig::new("mysql-cdc");
551 config.set("host", "localhost");
552 config.set("username", "root");
553 config.set("use.gtid", "false");
554 config.set("binlog.filename", "mysql-bin.000003");
555 config.set("binlog.position", "12345");
556
557 let cfg = MySqlCdcConfig::from_config(&config).unwrap();
558 assert!(!cfg.use_gtid);
559 assert_eq!(cfg.binlog_filename, Some("mysql-bin.000003".to_string()));
560 assert_eq!(cfg.binlog_position, Some(12345));
561 }
562}