Skip to main content

laminar_connectors/cdc/mysql/
mod.rs

1//! MySQL binlog replication CDC source connector. Reads row-level changes from
2//! a MySQL server's binlog (GTID or file/position based), decodes them via the
3//! `decoder` submodule, resolves column types against `schema::TableCache`, and
4//! emits Z-set `changelog::ChangeEvent`s that `MySqlCdcSource` converts into
5//! Arrow `RecordBatch`es on `poll_batch`.
6#![allow(dead_code)] // reader path is feature-gated; helpers are used conditionally
7#![allow(clippy::doc_markdown)] // MySQL terminology dominates — backtick-warning noise
8
9pub mod changelog;
10pub mod config;
11pub mod decoder;
12pub mod gtid;
13pub mod metrics;
14#[cfg(feature = "mysql-cdc")]
15pub mod mysql_io;
16pub mod schema;
17pub mod source;
18pub mod types;
19
20// Primary types
21pub use changelog::{
22    column_value_to_json, delete_to_events, events_to_record_batch, insert_to_events, row_to_json,
23    update_to_events, CdcOperation, ChangeEvent,
24};
25pub use config::{MySqlCdcConfig, SnapshotMode, SslMode};
26pub use decoder::{
27    BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DecoderError,
28    DeleteMessage, InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage,
29    UpdateMessage, UpdateRowData,
30};
31pub use gtid::{Gtid, GtidRange, GtidSet};
32pub use metrics::{MetricsSnapshot, MySqlCdcMetrics};
33pub use schema::{cdc_envelope_schema, TableCache, TableInfo};
34pub use source::MySqlCdcSource;
35pub use types::{mysql_type, mysql_type_name, mysql_type_to_arrow, mysql_type_to_sql, MySqlColumn};
36
37use std::sync::Arc;
38
39use crate::config::{ConfigKeySpec, ConnectorInfo};
40use crate::connector::SourceConnector;
41use crate::registry::ConnectorRegistry;
42
43/// Registers the MySQL CDC source connector factory on the given registry.
44pub fn register_mysql_cdc_source(registry: &ConnectorRegistry) {
45    let info = ConnectorInfo {
46        name: "mysql-cdc".to_string(),
47        display_name: "MySQL CDC Source".to_string(),
48        version: env!("CARGO_PKG_VERSION").to_string(),
49        is_source: true,
50        is_sink: false,
51        config_keys: mysql_cdc_config_keys(),
52    };
53
54    registry.register_source(
55        "mysql-cdc",
56        info.clone(),
57        Arc::new(|registry: Option<&prometheus::Registry>| {
58            Box::new(MySqlCdcSource::new(MySqlCdcConfig::default(), registry))
59                as Box<dyn SourceConnector>
60        }),
61    );
62
63    registry.register_table_source(
64        "mysql-cdc",
65        info,
66        Arc::new(|config| {
67            let connector = Box::new(MySqlCdcSource::new(MySqlCdcConfig::default(), None));
68            Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
69                connector,
70                config.clone(),
71                4096,
72            )))
73        }),
74    );
75}
76
77/// Returns the configuration key specifications for `MySQL` CDC source.
78///
79/// This is used for configuration discovery and validation.
80#[allow(clippy::too_many_lines)]
81fn mysql_cdc_config_keys() -> Vec<ConfigKeySpec> {
82    vec![
83        // Connection settings
84        ConfigKeySpec::optional("host", "MySQL server hostname", "localhost"),
85        ConfigKeySpec::optional("port", "MySQL server port", "3306"),
86        ConfigKeySpec::optional("database", "Database name to replicate", ""),
87        ConfigKeySpec::required("username", "MySQL replication user"),
88        ConfigKeySpec::required("password", "MySQL replication password"),
89        // SSL settings
90        ConfigKeySpec::optional(
91            "ssl.mode",
92            "SSL connection mode (disabled/preferred/required/verify_ca/verify_identity)",
93            "preferred",
94        ),
95        // Replication settings
96        ConfigKeySpec::required("server.id", "Unique server ID for this replication client"),
97        ConfigKeySpec::optional(
98            "use.gtid",
99            "Use GTID-based replication (recommended)",
100            "true",
101        ),
102        ConfigKeySpec::optional("gtid.set", "Starting GTID set for replication", ""),
103        ConfigKeySpec::optional(
104            "binlog.filename",
105            "Starting binlog filename (if not using GTID)",
106            "",
107        ),
108        ConfigKeySpec::optional(
109            "binlog.position",
110            "Starting binlog position (if not using GTID)",
111            "4",
112        ),
113        // Snapshot settings
114        ConfigKeySpec::optional(
115            "snapshot.mode",
116            "Snapshot mode (initial/never/always/schema_only)",
117            "initial",
118        ),
119        // Table filtering
120        ConfigKeySpec::optional(
121            "table.include",
122            "Comma-separated list of tables to include",
123            "",
124        ),
125        ConfigKeySpec::optional(
126            "table.exclude",
127            "Comma-separated list of tables to exclude",
128            "",
129        ),
130        ConfigKeySpec::optional("database.filter", "Database name filter pattern", ""),
131        // Tuning settings
132        ConfigKeySpec::optional(
133            "poll.timeout.ms",
134            "Timeout for polling binlog events (milliseconds)",
135            "1000",
136        ),
137        ConfigKeySpec::optional("max.poll.records", "Maximum records per poll batch", "1000"),
138        ConfigKeySpec::optional(
139            "heartbeat.interval.ms",
140            "Heartbeat interval (milliseconds)",
141            "30000",
142        ),
143        ConfigKeySpec::optional(
144            "connect.timeout.ms",
145            "Connection timeout (milliseconds)",
146            "10000",
147        ),
148        ConfigKeySpec::optional("read.timeout.ms", "Read timeout (milliseconds)", "60000"),
149    ]
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// After registration, `mysql-cdc` must appear in the registry's source list.
    #[test]
    fn test_register_mysql_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mysql_cdc_source(&registry);

        let expected_name = "mysql-cdc".to_string();
        assert!(registry.list_sources().contains(&expected_name));
    }

    /// Checks the key count, the required keys, and a few optional defaults.
    #[test]
    fn test_mysql_cdc_config_keys() {
        let keys = mysql_cdc_config_keys();

        // Lower bound only, so adding new keys does not break this test.
        assert!(keys.len() >= 15);

        // Each of these keys must be present and flagged as required.
        for name in ["username", "password", "server.id"] {
            assert!(keys.iter().any(|spec| spec.required && spec.key == name));
        }

        // Each of these keys must be optional and carry the listed default.
        for (name, expected_default) in [
            ("host", "localhost"),
            ("port", "3306"),
            ("ssl.mode", "preferred"),
        ] {
            let spec = keys.iter().find(|spec| spec.key == name).unwrap();
            assert!(!spec.required);
            assert_eq!(spec.default, Some(expected_default.to_string()));
        }
    }

    /// Compile-time smoke test: the re-exported types are reachable from this module.
    #[test]
    fn test_public_exports() {
        let _default_config = MySqlCdcConfig::default();
        let _ssl_mode = SslMode::Preferred;
        let _snapshot_mode = SnapshotMode::Initial;
        let _cdc_metrics = MySqlCdcMetrics::new(None);
        let _table_cache = TableCache::new();
        let _operation = CdcOperation::Insert;
    }
}