laminar_connectors/cdc/mysql/
mod.rs1#![allow(dead_code)]
6#![allow(clippy::doc_markdown)]
9
10mod changelog;
73mod config;
74mod decoder;
75mod gtid;
76mod metrics;
77#[cfg(feature = "mysql-cdc")]
78pub mod mysql_io;
79mod schema;
80mod source;
81mod types;
82
83pub use changelog::{
85 column_value_to_json, delete_to_events, events_to_record_batch, insert_to_events, row_to_json,
86 update_to_events, CdcOperation, ChangeEvent,
87};
88pub use config::{MySqlCdcConfig, SnapshotMode, SslMode};
89pub use decoder::{
90 BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DecoderError,
91 DeleteMessage, InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage,
92 UpdateMessage, UpdateRowData,
93};
94pub use gtid::{Gtid, GtidRange, GtidSet};
95pub use metrics::{MetricsSnapshot, MySqlCdcMetrics};
96pub use schema::{cdc_envelope_schema, TableCache, TableInfo};
97pub use source::MySqlCdcSource;
98pub use types::{mysql_type, mysql_type_name, mysql_type_to_arrow, mysql_type_to_sql, MySqlColumn};
99
100use std::sync::Arc;
101
102use crate::config::{ConfigKeySpec, ConnectorInfo};
103use crate::connector::SourceConnector;
104use crate::registry::ConnectorRegistry;
105
106pub fn register_mysql_cdc_source(registry: &ConnectorRegistry) {
113 let info = ConnectorInfo {
114 name: "mysql-cdc".to_string(),
115 display_name: "MySQL CDC Source".to_string(),
116 version: env!("CARGO_PKG_VERSION").to_string(),
117 is_source: true,
118 is_sink: false,
119 config_keys: config_key_specs(),
120 };
121
122 registry.register_source(
123 "mysql-cdc",
124 info.clone(),
125 Arc::new(|| {
126 Box::new(MySqlCdcSource::new(MySqlCdcConfig::default())) as Box<dyn SourceConnector>
127 }),
128 );
129
130 registry.register_table_source(
131 "mysql-cdc",
132 info,
133 Arc::new(|config| {
134 let connector = Box::new(MySqlCdcSource::new(MySqlCdcConfig::default()));
135 Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
136 connector,
137 config.clone(),
138 4096,
139 )))
140 }),
141 );
142}
143
144#[must_use]
148#[allow(clippy::too_many_lines)]
149pub fn config_key_specs() -> Vec<ConfigKeySpec> {
150 vec![
151 ConfigKeySpec::optional("host", "MySQL server hostname", "localhost"),
153 ConfigKeySpec::optional("port", "MySQL server port", "3306"),
154 ConfigKeySpec::optional("database", "Database name to replicate", ""),
155 ConfigKeySpec::required("username", "MySQL replication user"),
156 ConfigKeySpec::required("password", "MySQL replication password"),
157 ConfigKeySpec::optional(
159 "ssl.mode",
160 "SSL connection mode (disabled/preferred/required/verify_ca/verify_identity)",
161 "preferred",
162 ),
163 ConfigKeySpec::required("server.id", "Unique server ID for this replication client"),
165 ConfigKeySpec::optional(
166 "use.gtid",
167 "Use GTID-based replication (recommended)",
168 "true",
169 ),
170 ConfigKeySpec::optional("gtid.set", "Starting GTID set for replication", ""),
171 ConfigKeySpec::optional(
172 "binlog.filename",
173 "Starting binlog filename (if not using GTID)",
174 "",
175 ),
176 ConfigKeySpec::optional(
177 "binlog.position",
178 "Starting binlog position (if not using GTID)",
179 "4",
180 ),
181 ConfigKeySpec::optional(
183 "snapshot.mode",
184 "Snapshot mode (initial/never/always/schema_only)",
185 "initial",
186 ),
187 ConfigKeySpec::optional(
189 "table.include",
190 "Comma-separated list of tables to include",
191 "",
192 ),
193 ConfigKeySpec::optional(
194 "table.exclude",
195 "Comma-separated list of tables to exclude",
196 "",
197 ),
198 ConfigKeySpec::optional("database.filter", "Database name filter pattern", ""),
199 ConfigKeySpec::optional(
201 "poll.timeout.ms",
202 "Timeout for polling binlog events (milliseconds)",
203 "1000",
204 ),
205 ConfigKeySpec::optional("max.poll.records", "Maximum records per poll batch", "1000"),
206 ConfigKeySpec::optional(
207 "heartbeat.interval.ms",
208 "Heartbeat interval (milliseconds)",
209 "30000",
210 ),
211 ConfigKeySpec::optional(
212 "connect.timeout.ms",
213 "Connection timeout (milliseconds)",
214 "10000",
215 ),
216 ConfigKeySpec::optional("read.timeout.ms", "Read timeout (milliseconds)", "60000"),
217 ]
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering the connector makes `mysql-cdc` show up in the source list.
    #[test]
    fn test_register_mysql_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mysql_cdc_source(&registry);

        let source_list = registry.list_sources();
        assert!(source_list.contains(&"mysql-cdc".to_string()));
    }

    /// Checks the size of the spec table, the required keys, and a few defaults.
    #[test]
    fn test_config_key_specs() {
        let specs = config_key_specs();

        assert!(specs.len() >= 15);

        // Keys that must be supplied by the user.
        let required: Vec<_> = specs.iter().filter(|s| s.required).collect();
        assert!(required.iter().any(|s| s.key == "username"));
        assert!(required.iter().any(|s| s.key == "password"));
        assert!(required.iter().any(|s| s.key == "server.id"));

        // Optional keys carry their documented defaults.
        let host_spec = specs
            .iter()
            .find(|s| s.key == "host")
            .expect("`host` spec is present");
        assert!(!host_spec.required);
        assert_eq!(host_spec.default, Some("localhost".to_string()));

        let port_spec = specs
            .iter()
            .find(|s| s.key == "port")
            .expect("`port` spec is present");
        assert!(!port_spec.required);
        assert_eq!(port_spec.default, Some("3306".to_string()));

        let ssl_spec = specs
            .iter()
            .find(|s| s.key == "ssl.mode")
            .expect("`ssl.mode` spec is present");
        assert!(!ssl_spec.required);
        assert_eq!(ssl_spec.default, Some("preferred".to_string()));
    }

    /// Compile-time smoke test: the re-exported types are reachable from this module.
    #[test]
    fn test_public_exports() {
        let _config = MySqlCdcConfig::default();
        let _ssl = SslMode::Preferred;
        let _snapshot = SnapshotMode::Initial;
        let _metrics = MySqlCdcMetrics::new();
        let _cache = TableCache::new();
        let _op = CdcOperation::Insert;
    }
}