laminar_connectors/cdc/mysql/
mod.rs1#![allow(dead_code)] #![allow(clippy::doc_markdown)] pub mod changelog;
10pub mod config;
11pub mod decoder;
12pub mod gtid;
13pub mod metrics;
14#[cfg(feature = "mysql-cdc")]
15pub mod mysql_io;
16pub mod schema;
17pub mod source;
18pub mod types;
19
20pub use changelog::{
22 column_value_to_json, delete_to_events, events_to_record_batch, insert_to_events, row_to_json,
23 update_to_events, CdcOperation, ChangeEvent,
24};
25pub use config::{MySqlCdcConfig, SnapshotMode, SslMode};
26pub use decoder::{
27 BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DecoderError,
28 DeleteMessage, InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage,
29 UpdateMessage, UpdateRowData,
30};
31pub use gtid::{Gtid, GtidRange, GtidSet};
32pub use metrics::{MetricsSnapshot, MySqlCdcMetrics};
33pub use schema::{cdc_envelope_schema, TableCache, TableInfo};
34pub use source::MySqlCdcSource;
35pub use types::{mysql_type, mysql_type_name, mysql_type_to_arrow, mysql_type_to_sql, MySqlColumn};
36
37use std::sync::Arc;
38
39use crate::config::{ConfigKeySpec, ConnectorInfo};
40use crate::connector::SourceConnector;
41use crate::registry::ConnectorRegistry;
42
43pub fn register_mysql_cdc_source(registry: &ConnectorRegistry) {
45 let info = ConnectorInfo {
46 name: "mysql-cdc".to_string(),
47 display_name: "MySQL CDC Source".to_string(),
48 version: env!("CARGO_PKG_VERSION").to_string(),
49 is_source: true,
50 is_sink: false,
51 config_keys: mysql_cdc_config_keys(),
52 };
53
54 registry.register_source(
55 "mysql-cdc",
56 info.clone(),
57 Arc::new(|registry: Option<&prometheus::Registry>| {
58 Box::new(MySqlCdcSource::new(MySqlCdcConfig::default(), registry))
59 as Box<dyn SourceConnector>
60 }),
61 );
62
63 registry.register_table_source(
64 "mysql-cdc",
65 info,
66 Arc::new(|config| {
67 let connector = Box::new(MySqlCdcSource::new(MySqlCdcConfig::default(), None));
68 Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
69 connector,
70 config.clone(),
71 4096,
72 )))
73 }),
74 );
75}
76
77#[allow(clippy::too_many_lines)]
81fn mysql_cdc_config_keys() -> Vec<ConfigKeySpec> {
82 vec![
83 ConfigKeySpec::optional("host", "MySQL server hostname", "localhost"),
85 ConfigKeySpec::optional("port", "MySQL server port", "3306"),
86 ConfigKeySpec::optional("database", "Database name to replicate", ""),
87 ConfigKeySpec::required("username", "MySQL replication user"),
88 ConfigKeySpec::required("password", "MySQL replication password"),
89 ConfigKeySpec::optional(
91 "ssl.mode",
92 "SSL connection mode (disabled/preferred/required/verify_ca/verify_identity)",
93 "preferred",
94 ),
95 ConfigKeySpec::required("server.id", "Unique server ID for this replication client"),
97 ConfigKeySpec::optional(
98 "use.gtid",
99 "Use GTID-based replication (recommended)",
100 "true",
101 ),
102 ConfigKeySpec::optional("gtid.set", "Starting GTID set for replication", ""),
103 ConfigKeySpec::optional(
104 "binlog.filename",
105 "Starting binlog filename (if not using GTID)",
106 "",
107 ),
108 ConfigKeySpec::optional(
109 "binlog.position",
110 "Starting binlog position (if not using GTID)",
111 "4",
112 ),
113 ConfigKeySpec::optional(
115 "snapshot.mode",
116 "Snapshot mode (initial/never/always/schema_only)",
117 "initial",
118 ),
119 ConfigKeySpec::optional(
121 "table.include",
122 "Comma-separated list of tables to include",
123 "",
124 ),
125 ConfigKeySpec::optional(
126 "table.exclude",
127 "Comma-separated list of tables to exclude",
128 "",
129 ),
130 ConfigKeySpec::optional("database.filter", "Database name filter pattern", ""),
131 ConfigKeySpec::optional(
133 "poll.timeout.ms",
134 "Timeout for polling binlog events (milliseconds)",
135 "1000",
136 ),
137 ConfigKeySpec::optional("max.poll.records", "Maximum records per poll batch", "1000"),
138 ConfigKeySpec::optional(
139 "heartbeat.interval.ms",
140 "Heartbeat interval (milliseconds)",
141 "30000",
142 ),
143 ConfigKeySpec::optional(
144 "connect.timeout.ms",
145 "Connection timeout (milliseconds)",
146 "10000",
147 ),
148 ConfigKeySpec::optional("read.timeout.ms", "Read timeout (milliseconds)", "60000"),
149 ]
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// After registration, "mysql-cdc" must appear in the registry's source list.
    #[test]
    fn test_register_mysql_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mysql_cdc_source(&registry);

        let source_list = registry.list_sources();
        assert!(source_list.contains(&"mysql-cdc".to_string()));
    }

    /// Checks the advertised config keys: overall count, the required keys,
    /// and the defaults of a few optional keys.
    #[test]
    fn test_mysql_cdc_config_keys() {
        let specs = mysql_cdc_config_keys();

        // Lower bound only, so adding new keys does not break this test.
        assert!(specs.len() >= 15);

        // Required keys.
        let required: Vec<_> = specs.iter().filter(|s| s.required).collect();
        assert!(required.iter().any(|s| s.key == "username"));
        assert!(required.iter().any(|s| s.key == "password"));
        assert!(required.iter().any(|s| s.key == "server.id"));

        // Optional keys and their defaults.
        let host_spec = specs.iter().find(|s| s.key == "host").unwrap();
        assert!(!host_spec.required);
        assert_eq!(host_spec.default, Some("localhost".to_string()));

        let port_spec = specs.iter().find(|s| s.key == "port").unwrap();
        assert!(!port_spec.required);
        assert_eq!(port_spec.default, Some("3306".to_string()));

        let ssl_spec = specs.iter().find(|s| s.key == "ssl.mode").unwrap();
        assert!(!ssl_spec.required);
        assert_eq!(ssl_spec.default, Some("preferred".to_string()));
    }

    /// Compile-time smoke test: the main re-exported types are reachable
    /// through this module.
    #[test]
    fn test_public_exports() {
        let _config = MySqlCdcConfig::default();
        let _ssl = SslMode::Preferred;
        let _snapshot = SnapshotMode::Initial;
        let _metrics = MySqlCdcMetrics::new(None);
        let _cache = TableCache::new();
        let _op = CdcOperation::Insert;
    }
}