Skip to main content

laminar_connectors/cdc/mysql/
types.rs

1//! MySQL type to Arrow type mapping.
2//!
3//! Maps MySQL column types to Arrow `DataType` for CDC record conversion.
4
5// Using wildcard import for local mysql_type constants is cleaner for this domain-specific code.
6#![allow(clippy::wildcard_imports)]
7// Multiple match arms returning the same type (e.g., TIMESTAMP | TIMESTAMP2 -> same) is intentional.
8#![allow(clippy::match_same_arms)]
9
10use arrow_schema::DataType;
11
/// MySQL column type codes.
///
/// The numeric values are the field-type codes used by MySQL's protocol
/// (`enum_field_types`); see
/// <https://dev.mysql.com/doc/dev/mysql-server/latest/field__types_8h.html>.
pub mod mysql_type {
    // ---- Contiguous low range (0x00 ..= 0x13) ----

    /// DECIMAL, NUMERIC
    pub const DECIMAL: u8 = 0;
    /// TINYINT
    pub const TINY: u8 = 1;
    /// SMALLINT
    pub const SHORT: u8 = 2;
    /// INT
    pub const LONG: u8 = 3;
    /// FLOAT
    pub const FLOAT: u8 = 4;
    /// DOUBLE
    pub const DOUBLE: u8 = 5;
    /// NULL
    pub const NULL: u8 = 6;
    /// TIMESTAMP
    pub const TIMESTAMP: u8 = 7;
    /// BIGINT
    pub const LONGLONG: u8 = 8;
    /// MEDIUMINT
    pub const INT24: u8 = 9;
    /// DATE
    pub const DATE: u8 = 10;
    /// TIME
    pub const TIME: u8 = 11;
    /// DATETIME
    pub const DATETIME: u8 = 12;
    /// YEAR
    pub const YEAR: u8 = 13;
    /// NEWDATE (internal date representation)
    pub const NEWDATE: u8 = 14;
    /// VARCHAR
    pub const VARCHAR: u8 = 15;
    /// BIT
    pub const BIT: u8 = 16;
    /// TIMESTAMP2 — TIMESTAMP with fractional seconds
    pub const TIMESTAMP2: u8 = 17;
    /// DATETIME2 — DATETIME with fractional seconds
    pub const DATETIME2: u8 = 18;
    /// TIME2 — TIME with fractional seconds
    pub const TIME2: u8 = 19;

    // ---- High range (0xF5 ..= 0xFF) ----

    /// JSON (available since MySQL 5.7)
    pub const JSON: u8 = 245;
    /// DECIMAL (new encoding)
    pub const NEWDECIMAL: u8 = 246;
    /// ENUM
    pub const ENUM: u8 = 247;
    /// SET
    pub const SET: u8 = 248;
    /// TINYBLOB, TINYTEXT
    pub const TINY_BLOB: u8 = 249;
    /// MEDIUMBLOB, MEDIUMTEXT
    pub const MEDIUM_BLOB: u8 = 250;
    /// LONGBLOB, LONGTEXT
    pub const LONG_BLOB: u8 = 251;
    /// BLOB, TEXT
    pub const BLOB: u8 = 252;
    /// VARCHAR, VARBINARY
    pub const VAR_STRING: u8 = 253;
    /// CHAR, BINARY
    pub const STRING: u8 = 254;
    /// GEOMETRY
    pub const GEOMETRY: u8 = 255;
}
80
/// Description of a single column as carried by a `TABLE_MAP_EVENT`.
///
/// `metadata` is stored as a raw 16-bit value whose meaning depends on
/// `type_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MySqlColumn {
    /// Name of the column.
    pub name: String,
    /// Raw MySQL type code (one of the [`mysql_type`] constants).
    pub type_id: u8,
    /// Type-dependent column metadata.
    pub metadata: u16,
    /// `true` when the column accepts NULL.
    pub nullable: bool,
    /// `true` for UNSIGNED columns (meaningful for integer types).
    pub unsigned: bool,
}
95
96impl MySqlColumn {
97    /// Creates a new MySQL column descriptor.
98    #[must_use]
99    pub fn new(name: String, type_id: u8, metadata: u16, nullable: bool, unsigned: bool) -> Self {
100        Self {
101            name,
102            type_id,
103            metadata,
104            nullable,
105            unsigned,
106        }
107    }
108
109    /// Converts MySQL type to Arrow `DataType`.
110    #[must_use]
111    pub fn to_arrow_type(&self) -> DataType {
112        mysql_type_to_arrow(self.type_id, self.unsigned, self.metadata)
113    }
114
115    /// Returns the display name of the MySQL type.
116    #[must_use]
117    pub fn type_name(&self) -> &'static str {
118        mysql_type_name(self.type_id)
119    }
120}
121
122/// Converts a MySQL type ID to an Arrow `DataType`.
123#[must_use]
124pub fn mysql_type_to_arrow(type_id: u8, unsigned: bool, _metadata: u16) -> DataType {
125    use mysql_type::*;
126
127    match type_id {
128        // Integers
129        TINY => {
130            if unsigned {
131                DataType::UInt8
132            } else {
133                DataType::Int8
134            }
135        }
136        SHORT | YEAR => {
137            if unsigned {
138                DataType::UInt16
139            } else {
140                DataType::Int16
141            }
142        }
143        LONG | INT24 => {
144            if unsigned {
145                DataType::UInt32
146            } else {
147                DataType::Int32
148            }
149        }
150        LONGLONG => {
151            if unsigned {
152                DataType::UInt64
153            } else {
154                DataType::Int64
155            }
156        }
157
158        // Floating point
159        FLOAT => DataType::Float32,
160        DOUBLE => DataType::Float64,
161
162        // Decimal - map to string for precision preservation
163        DECIMAL | NEWDECIMAL => DataType::Utf8,
164
165        // Date/Time types
166        DATE | NEWDATE => DataType::Date32,
167        TIME | TIME2 => DataType::Time64(arrow_schema::TimeUnit::Microsecond),
168        TIMESTAMP | TIMESTAMP2 => {
169            DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
170        }
171        DATETIME | DATETIME2 => DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
172
173        // String types
174        VARCHAR | VAR_STRING | STRING | ENUM | SET => DataType::Utf8,
175
176        // Binary types
177        BIT => DataType::Binary,
178        TINY_BLOB | MEDIUM_BLOB | LONG_BLOB | BLOB => DataType::LargeBinary,
179
180        // JSON
181        JSON => DataType::Utf8, // JSON stored as string
182
183        // Geometry - binary
184        GEOMETRY => DataType::LargeBinary,
185
186        // NULL
187        NULL => DataType::Null,
188
189        // Unknown - fall back to binary
190        _ => DataType::Binary,
191    }
192}
193
194/// Returns the human-readable name of a MySQL type.
195#[must_use]
196pub fn mysql_type_name(type_id: u8) -> &'static str {
197    use mysql_type::*;
198
199    match type_id {
200        DECIMAL => "DECIMAL",
201        TINY => "TINYINT",
202        SHORT => "SMALLINT",
203        LONG => "INT",
204        FLOAT => "FLOAT",
205        DOUBLE => "DOUBLE",
206        NULL => "NULL",
207        TIMESTAMP => "TIMESTAMP",
208        LONGLONG => "BIGINT",
209        INT24 => "MEDIUMINT",
210        DATE => "DATE",
211        TIME => "TIME",
212        DATETIME => "DATETIME",
213        YEAR => "YEAR",
214        NEWDATE => "DATE",
215        VARCHAR => "VARCHAR",
216        BIT => "BIT",
217        TIMESTAMP2 => "TIMESTAMP",
218        DATETIME2 => "DATETIME",
219        TIME2 => "TIME",
220        JSON => "JSON",
221        NEWDECIMAL => "DECIMAL",
222        ENUM => "ENUM",
223        SET => "SET",
224        TINY_BLOB => "TINYBLOB",
225        MEDIUM_BLOB => "MEDIUMBLOB",
226        LONG_BLOB => "LONGBLOB",
227        BLOB => "BLOB",
228        VAR_STRING => "VARCHAR",
229        STRING => "CHAR",
230        GEOMETRY => "GEOMETRY",
231        _ => "UNKNOWN",
232    }
233}
234
235/// SQL type string for DDL generation.
236#[must_use]
237pub fn mysql_type_to_sql(type_id: u8, unsigned: bool, metadata: u16) -> String {
238    use mysql_type::*;
239
240    let base = match type_id {
241        TINY => "TINYINT",
242        SHORT => "SMALLINT",
243        LONG => "INT",
244        LONGLONG => "BIGINT",
245        INT24 => "MEDIUMINT",
246        FLOAT => "FLOAT",
247        DOUBLE => "DOUBLE",
248        DECIMAL | NEWDECIMAL => {
249            // metadata contains precision and scale
250            let precision = (metadata >> 8) as u8;
251            let scale = (metadata & 0xFF) as u8;
252            return format!("DECIMAL({precision},{scale})");
253        }
254        DATE | NEWDATE => "DATE",
255        TIME | TIME2 => "TIME",
256        TIMESTAMP | TIMESTAMP2 => "TIMESTAMP",
257        DATETIME | DATETIME2 => "DATETIME",
258        YEAR => "YEAR",
259        VARCHAR | VAR_STRING => {
260            return format!("VARCHAR({metadata})");
261        }
262        STRING => {
263            return format!("CHAR({metadata})");
264        }
265        BIT => {
266            return format!("BIT({metadata})");
267        }
268        TINY_BLOB => "TINYBLOB",
269        MEDIUM_BLOB => "MEDIUMBLOB",
270        LONG_BLOB => "LONGBLOB",
271        BLOB => "BLOB",
272        JSON => "JSON",
273        ENUM => "ENUM",
274        SET => "SET",
275        GEOMETRY => "GEOMETRY",
276        NULL => "NULL",
277        _ => "UNKNOWN",
278    };
279
280    if unsigned && matches!(type_id, TINY | SHORT | LONG | LONGLONG | INT24) {
281        format!("{base} UNSIGNED")
282    } else {
283        base.to_string()
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::TimeUnit;

    #[test]
    fn test_integer_types() {
        assert_eq!(
            mysql_type_to_arrow(mysql_type::TINY, false, 0),
            DataType::Int8
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::TINY, true, 0),
            DataType::UInt8
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::SHORT, false, 0),
            DataType::Int16
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::LONG, false, 0),
            DataType::Int32
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::LONGLONG, false, 0),
            DataType::Int64
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::LONGLONG, true, 0),
            DataType::UInt64
        );
    }

    /// Every integer width (including YEAR and MEDIUMINT) honours the
    /// `unsigned` flag.
    #[test]
    fn test_integer_signedness_for_all_widths() {
        let cases = [
            (mysql_type::TINY, DataType::Int8, DataType::UInt8),
            (mysql_type::SHORT, DataType::Int16, DataType::UInt16),
            (mysql_type::YEAR, DataType::Int16, DataType::UInt16),
            (mysql_type::INT24, DataType::Int32, DataType::UInt32),
            (mysql_type::LONG, DataType::Int32, DataType::UInt32),
            (mysql_type::LONGLONG, DataType::Int64, DataType::UInt64),
        ];
        for (type_id, signed, unsigned) in cases {
            assert_eq!(
                mysql_type_to_arrow(type_id, false, 0),
                signed,
                "type {type_id:#04x}"
            );
            assert_eq!(
                mysql_type_to_arrow(type_id, true, 0),
                unsigned,
                "type {type_id:#04x}"
            );
        }
    }

    #[test]
    fn test_float_types() {
        assert_eq!(
            mysql_type_to_arrow(mysql_type::FLOAT, false, 0),
            DataType::Float32
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::DOUBLE, false, 0),
            DataType::Float64
        );
    }

    #[test]
    fn test_string_types() {
        assert_eq!(
            mysql_type_to_arrow(mysql_type::VARCHAR, false, 255),
            DataType::Utf8
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::STRING, false, 50),
            DataType::Utf8
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::JSON, false, 0),
            DataType::Utf8
        );
    }

    /// Decimals, ENUM/SET and the alternate VARCHAR code are all carried as text.
    #[test]
    fn test_text_carried_types() {
        for type_id in [
            mysql_type::DECIMAL,
            mysql_type::NEWDECIMAL,
            mysql_type::VAR_STRING,
            mysql_type::ENUM,
            mysql_type::SET,
        ] {
            assert_eq!(
                mysql_type_to_arrow(type_id, false, 0),
                DataType::Utf8,
                "type {type_id:#04x}"
            );
        }
    }

    #[test]
    fn test_datetime_types() {
        assert_eq!(
            mysql_type_to_arrow(mysql_type::DATE, false, 0),
            DataType::Date32
        );
        assert!(matches!(
            mysql_type_to_arrow(mysql_type::TIMESTAMP, false, 0),
            DataType::Timestamp(_, Some(_))
        ));
        assert!(matches!(
            mysql_type_to_arrow(mysql_type::DATETIME, false, 0),
            DataType::Timestamp(_, None)
        ));
    }

    /// The fractional-second (`*2`) and internal variants map exactly like
    /// their base types.
    #[test]
    fn test_temporal_variants() {
        let utc = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
        let naive = DataType::Timestamp(TimeUnit::Microsecond, None);
        let time = DataType::Time64(TimeUnit::Microsecond);

        assert_eq!(mysql_type_to_arrow(mysql_type::TIMESTAMP, false, 0), utc);
        assert_eq!(mysql_type_to_arrow(mysql_type::TIMESTAMP2, false, 3), utc);
        assert_eq!(mysql_type_to_arrow(mysql_type::DATETIME, false, 0), naive);
        assert_eq!(mysql_type_to_arrow(mysql_type::DATETIME2, false, 6), naive);
        assert_eq!(mysql_type_to_arrow(mysql_type::TIME, false, 0), time);
        assert_eq!(mysql_type_to_arrow(mysql_type::TIME2, false, 0), time);
        assert_eq!(
            mysql_type_to_arrow(mysql_type::NEWDATE, false, 0),
            DataType::Date32
        );
    }

    #[test]
    fn test_binary_types() {
        assert_eq!(
            mysql_type_to_arrow(mysql_type::BLOB, false, 0),
            DataType::LargeBinary
        );
        assert_eq!(
            mysql_type_to_arrow(mysql_type::BIT, false, 8),
            DataType::Binary
        );
    }

    /// Remaining large-object codes, GEOMETRY, NULL and the unknown-code fallback.
    #[test]
    fn test_large_objects_null_and_unknown() {
        for type_id in [
            mysql_type::TINY_BLOB,
            mysql_type::MEDIUM_BLOB,
            mysql_type::LONG_BLOB,
            mysql_type::GEOMETRY,
        ] {
            assert_eq!(
                mysql_type_to_arrow(type_id, false, 0),
                DataType::LargeBinary,
                "type {type_id:#04x}"
            );
        }
        assert_eq!(
            mysql_type_to_arrow(mysql_type::NULL, false, 0),
            DataType::Null
        );
        // 0x42 is not one of the `mysql_type` constants.
        assert_eq!(mysql_type_to_arrow(0x42, false, 0), DataType::Binary);
    }

    #[test]
    fn test_type_names() {
        assert_eq!(mysql_type_name(mysql_type::TINY), "TINYINT");
        assert_eq!(mysql_type_name(mysql_type::VARCHAR), "VARCHAR");
        assert_eq!(mysql_type_name(mysql_type::JSON), "JSON");
        assert_eq!(mysql_type_name(mysql_type::BLOB), "BLOB");
    }

    /// Internal/fractional variants report their user-facing names.
    #[test]
    fn test_alias_and_unknown_type_names() {
        assert_eq!(mysql_type_name(mysql_type::NEWDATE), "DATE");
        assert_eq!(mysql_type_name(mysql_type::TIMESTAMP2), "TIMESTAMP");
        assert_eq!(mysql_type_name(mysql_type::DATETIME2), "DATETIME");
        assert_eq!(mysql_type_name(mysql_type::TIME2), "TIME");
        assert_eq!(mysql_type_name(mysql_type::NEWDECIMAL), "DECIMAL");
        assert_eq!(mysql_type_name(mysql_type::VAR_STRING), "VARCHAR");
        assert_eq!(mysql_type_name(mysql_type::STRING), "CHAR");
        assert_eq!(mysql_type_name(0x42), "UNKNOWN");
    }

    #[test]
    fn test_mysql_column() {
        let col = MySqlColumn::new("id".to_string(), mysql_type::LONGLONG, 0, false, false);
        assert_eq!(col.name, "id");
        assert_eq!(col.to_arrow_type(), DataType::Int64);
        assert_eq!(col.type_name(), "BIGINT");
    }

    #[test]
    fn test_mysql_column_unsigned() {
        let col = MySqlColumn::new("flags".to_string(), mysql_type::TINY, 0, true, true);
        assert_eq!(col.to_arrow_type(), DataType::UInt8);
        assert_eq!(col.type_name(), "TINYINT");
        assert_eq!(col.clone(), col);
    }

    #[test]
    fn test_type_to_sql() {
        assert_eq!(mysql_type_to_sql(mysql_type::TINY, false, 0), "TINYINT");
        assert_eq!(
            mysql_type_to_sql(mysql_type::TINY, true, 0),
            "TINYINT UNSIGNED"
        );
        assert_eq!(
            mysql_type_to_sql(mysql_type::VARCHAR, false, 255),
            "VARCHAR(255)"
        );
        // DECIMAL with precision 10, scale 2
        let metadata = (10u16 << 8) | 2u16;
        assert_eq!(
            mysql_type_to_sql(mysql_type::NEWDECIMAL, false, metadata),
            "DECIMAL(10,2)"
        );
    }

    /// UNSIGNED applies only to integer types, and parameterised types embed
    /// `metadata`.
    #[test]
    fn test_type_to_sql_edge_cases() {
        assert_eq!(
            mysql_type_to_sql(mysql_type::LONGLONG, true, 0),
            "BIGINT UNSIGNED"
        );
        assert_eq!(
            mysql_type_to_sql(mysql_type::INT24, true, 0),
            "MEDIUMINT UNSIGNED"
        );
        assert_eq!(mysql_type_to_sql(mysql_type::YEAR, true, 0), "YEAR");
        assert_eq!(mysql_type_to_sql(mysql_type::DOUBLE, true, 0), "DOUBLE");
        assert_eq!(mysql_type_to_sql(mysql_type::STRING, false, 10), "CHAR(10)");
        assert_eq!(mysql_type_to_sql(mysql_type::BIT, false, 8), "BIT(8)");
        assert_eq!(
            mysql_type_to_sql(mysql_type::DECIMAL, true, (12u16 << 8) | 4),
            "DECIMAL(12,4)"
        );
        assert_eq!(
            mysql_type_to_sql(mysql_type::TIMESTAMP2, false, 3),
            "TIMESTAMP"
        );
        assert_eq!(mysql_type_to_sql(0x42, true, 0), "UNKNOWN");
    }
}