Skip to main content

laminar_connectors/cdc/mysql/
mod.rs

1//! MySQL binlog replication CDC source connector.
2//!
// Note: many functions are not yet used because actual I/O is not implemented.
// These `allow` attributes will be removed once the binlog reader is added.
5#![allow(dead_code)]
6// MySQL CDC docs reference many MySQL-specific terms that clippy wants backticks for.
7// This is a domain-specific module where MySQL terminology is ubiquitous.
8#![allow(clippy::doc_markdown)]
9
10//!
//! This module implements a MySQL CDC source that reads change events from
//! the MySQL binary log (binlog) replication stream. It supports:
13//!
14//! - GTID-based and file/position-based replication
15//! - Row-based replication format (INSERT/UPDATE/DELETE events)
16//! - Table filtering with include/exclude patterns
17//! - SSL/TLS connections
18//! - Automatic schema discovery via TABLE_MAP events
19//! - Z-set changelog format
20//!
21//! # Architecture
22//!
23//! ```text
24//! MySQL Server
25//!      │
26//!      │ Binlog Replication Protocol
27//!      ▼
28//! ┌─────────────────────────────────────────┐
29//! │           MySqlCdcSource                │
30//! │  ┌─────────────┐  ┌─────────────────┐  │
31//! │  │   Decoder   │  │   TableCache    │  │
32//! │  │ (binlog →   │  │ (TABLE_MAP →    │  │
33//! │  │  messages)  │  │  Arrow schema)  │  │
34//! │  └─────────────┘  └─────────────────┘  │
35//! │          │                │            │
36//! │          ▼                ▼            │
37//! │  ┌─────────────────────────────────┐   │
38//! │  │        ChangeEvent              │   │
39//! │  │  (Z-set with before/after)      │   │
40//! │  └─────────────────────────────────┘   │
41//! └─────────────────────────────────────────┘
42//!      │
43//!      ▼
44//!  RecordBatch (Arrow)
45//! ```
46//!
47//! # Example
48//!
49//! ```ignore
50//! use laminar_connectors::cdc::mysql::{MySqlCdcSource, MySqlCdcConfig};
51//!
52//! let config = MySqlCdcConfig {
53//!     host: "localhost".to_string(),
54//!     port: 3306,
55//!     username: "replicator".to_string(),
56//!     password: "secret".to_string(),
57//!     database: Some("mydb".to_string()),
58//!     server_id: 12345,
59//!     use_gtid: true,
60//!     ..Default::default()
61//! };
62//!
63//! let mut source = MySqlCdcSource::new(config);
64//! source.open(&Default::default()).await?;
65//!
66//! while let Some(batch) = source.poll_batch(1000).await? {
67//!     // Process CDC events as Arrow RecordBatch
68//!     println!("Received {} rows", batch.num_rows());
69//! }
70//! ```
71
72mod changelog;
73mod config;
74mod decoder;
75mod gtid;
76mod metrics;
77#[cfg(feature = "mysql-cdc")]
78pub mod mysql_io;
79mod schema;
80mod source;
81mod types;
82
83// Primary types
84pub use changelog::{
85    column_value_to_json, delete_to_events, events_to_record_batch, insert_to_events, row_to_json,
86    update_to_events, CdcOperation, ChangeEvent,
87};
88pub use config::{MySqlCdcConfig, SnapshotMode, SslMode};
89pub use decoder::{
90    BeginMessage, BinlogMessage, BinlogPosition, ColumnValue, CommitMessage, DecoderError,
91    DeleteMessage, InsertMessage, QueryMessage, RotateMessage, RowData, TableMapMessage,
92    UpdateMessage, UpdateRowData,
93};
94pub use gtid::{Gtid, GtidRange, GtidSet};
95pub use metrics::{MetricsSnapshot, MySqlCdcMetrics};
96pub use schema::{cdc_envelope_schema, TableCache, TableInfo};
97pub use source::MySqlCdcSource;
98pub use types::{mysql_type, mysql_type_name, mysql_type_to_arrow, mysql_type_to_sql, MySqlColumn};
99
100use std::sync::Arc;
101
102use crate::config::{ConfigKeySpec, ConnectorInfo};
103use crate::connector::SourceConnector;
104use crate::registry::ConnectorRegistry;
105
106/// Registers the MySQL CDC source connector factory.
107///
108/// After calling this function, the connector can be created via:
109/// ```ignore
110/// let source = registry.create_source(&config)?;
111/// ```
112pub fn register_mysql_cdc_source(registry: &ConnectorRegistry) {
113    let info = ConnectorInfo {
114        name: "mysql-cdc".to_string(),
115        display_name: "MySQL CDC Source".to_string(),
116        version: env!("CARGO_PKG_VERSION").to_string(),
117        is_source: true,
118        is_sink: false,
119        config_keys: config_key_specs(),
120    };
121
122    registry.register_source(
123        "mysql-cdc",
124        info.clone(),
125        Arc::new(|| {
126            Box::new(MySqlCdcSource::new(MySqlCdcConfig::default())) as Box<dyn SourceConnector>
127        }),
128    );
129
130    registry.register_table_source(
131        "mysql-cdc",
132        info,
133        Arc::new(|config| {
134            let connector = Box::new(MySqlCdcSource::new(MySqlCdcConfig::default()));
135            Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
136                connector,
137                config.clone(),
138                4096,
139            )))
140        }),
141    );
142}
143
144/// Returns the configuration key specifications for `MySQL` CDC source.
145///
146/// This is used for configuration discovery and validation.
147#[must_use]
148#[allow(clippy::too_many_lines)]
149pub fn config_key_specs() -> Vec<ConfigKeySpec> {
150    vec![
151        // Connection settings
152        ConfigKeySpec::optional("host", "MySQL server hostname", "localhost"),
153        ConfigKeySpec::optional("port", "MySQL server port", "3306"),
154        ConfigKeySpec::optional("database", "Database name to replicate", ""),
155        ConfigKeySpec::required("username", "MySQL replication user"),
156        ConfigKeySpec::required("password", "MySQL replication password"),
157        // SSL settings
158        ConfigKeySpec::optional(
159            "ssl.mode",
160            "SSL connection mode (disabled/preferred/required/verify_ca/verify_identity)",
161            "preferred",
162        ),
163        // Replication settings
164        ConfigKeySpec::required("server.id", "Unique server ID for this replication client"),
165        ConfigKeySpec::optional(
166            "use.gtid",
167            "Use GTID-based replication (recommended)",
168            "true",
169        ),
170        ConfigKeySpec::optional("gtid.set", "Starting GTID set for replication", ""),
171        ConfigKeySpec::optional(
172            "binlog.filename",
173            "Starting binlog filename (if not using GTID)",
174            "",
175        ),
176        ConfigKeySpec::optional(
177            "binlog.position",
178            "Starting binlog position (if not using GTID)",
179            "4",
180        ),
181        // Snapshot settings
182        ConfigKeySpec::optional(
183            "snapshot.mode",
184            "Snapshot mode (initial/never/always/schema_only)",
185            "initial",
186        ),
187        // Table filtering
188        ConfigKeySpec::optional(
189            "table.include",
190            "Comma-separated list of tables to include",
191            "",
192        ),
193        ConfigKeySpec::optional(
194            "table.exclude",
195            "Comma-separated list of tables to exclude",
196            "",
197        ),
198        ConfigKeySpec::optional("database.filter", "Database name filter pattern", ""),
199        // Tuning settings
200        ConfigKeySpec::optional(
201            "poll.timeout.ms",
202            "Timeout for polling binlog events (milliseconds)",
203            "1000",
204        ),
205        ConfigKeySpec::optional("max.poll.records", "Maximum records per poll batch", "1000"),
206        ConfigKeySpec::optional(
207            "heartbeat.interval.ms",
208            "Heartbeat interval (milliseconds)",
209            "30000",
210        ),
211        ConfigKeySpec::optional(
212            "connect.timeout.ms",
213            "Connection timeout (milliseconds)",
214            "10000",
215        ),
216        ConfigKeySpec::optional("read.timeout.ms", "Read timeout (milliseconds)", "60000"),
217    ]
218}
219
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    #[test]
    fn test_register_mysql_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mysql_cdc_source(&registry);

        // Verify the source type is registered
        let source_list = registry.list_sources();
        assert!(source_list.contains(&"mysql-cdc".to_string()));
    }

    #[test]
    fn test_config_key_specs() {
        let specs = config_key_specs();

        // Should have all expected keys
        assert!(specs.len() >= 15);

        // Every key must appear exactly once: a duplicate would make
        // configuration discovery and validation ambiguous.
        let unique_keys: HashSet<_> = specs.iter().map(|s| &s.key).collect();
        assert_eq!(
            unique_keys.len(),
            specs.len(),
            "config_key_specs() contains duplicate keys"
        );

        // Required keys: exactly these three. Making any other key required
        // would break existing configurations, so pin the count as well.
        let required: Vec<_> = specs.iter().filter(|s| s.required).collect();
        assert!(required.iter().any(|s| s.key == "username"));
        assert!(required.iter().any(|s| s.key == "password"));
        assert!(required.iter().any(|s| s.key == "server.id"));
        assert_eq!(required.len(), 3, "unexpected set of required keys");

        // Optional with defaults
        let host_spec = specs.iter().find(|s| s.key == "host").unwrap();
        assert!(!host_spec.required);
        assert_eq!(host_spec.default, Some("localhost".to_string()));

        let port_spec = specs.iter().find(|s| s.key == "port").unwrap();
        assert!(!port_spec.required);
        assert_eq!(port_spec.default, Some("3306".to_string()));

        let ssl_spec = specs.iter().find(|s| s.key == "ssl.mode").unwrap();
        assert!(!ssl_spec.required);
        assert_eq!(ssl_spec.default, Some("preferred".to_string()));
    }

    #[test]
    fn test_public_exports() {
        // Verify all expected types are exported
        let _config = MySqlCdcConfig::default();
        let _ssl = SslMode::Preferred;
        let _snapshot = SnapshotMode::Initial;
        let _metrics = MySqlCdcMetrics::new();
        let _cache = TableCache::new();
        let _op = CdcOperation::Insert;
    }
}