Skip to main content

laminar_connectors/cdc/mysql/
schema.rs

//! MySQL table schema cache.
//!
//! Caches `TABLE_MAP_EVENT` information so that row events can be resolved
//! to the schema of the table they belong to.
5
6use std::collections::HashMap;
7
8use arrow_schema::{DataType, Field, Schema};
9
10use super::decoder::TableMapMessage;
11use super::types::MySqlColumn;
12
13/// Table schema information from TABLE_MAP_EVENT.
14#[derive(Debug, Clone)]
15pub struct TableInfo {
16    /// Table ID (internal MySQL identifier).
17    pub table_id: u64,
18    /// Database name.
19    pub database: String,
20    /// Table name.
21    pub table: String,
22    /// Column definitions.
23    pub columns: Vec<MySqlColumn>,
24    /// Arrow schema derived from columns.
25    pub arrow_schema: Schema,
26}
27
28impl TableInfo {
29    /// Creates a new table info from a TABLE_MAP message.
30    #[must_use]
31    pub fn from_table_map(msg: &TableMapMessage) -> Self {
32        let fields: Vec<Field> = msg
33            .columns
34            .iter()
35            .map(|col| Field::new(&col.name, col.to_arrow_type(), col.nullable))
36            .collect();
37
38        Self {
39            table_id: msg.table_id,
40            database: msg.database.clone(),
41            table: msg.table.clone(),
42            columns: msg.columns.clone(),
43            arrow_schema: Schema::new(fields),
44        }
45    }
46
47    /// Returns the fully-qualified table name.
48    #[must_use]
49    pub fn full_name(&self) -> String {
50        format!("{}.{}", self.database, self.table)
51    }
52
53    /// Returns the number of columns.
54    #[must_use]
55    pub fn column_count(&self) -> usize {
56        self.columns.len()
57    }
58
59    /// Finds a column by name.
60    #[must_use]
61    pub fn find_column(&self, name: &str) -> Option<(usize, &MySqlColumn)> {
62        self.columns
63            .iter()
64            .enumerate()
65            .find(|(_, c)| c.name == name)
66    }
67}
68
69/// Cache of table schemas indexed by table ID.
70///
71/// MySQL sends TABLE_MAP_EVENT before row events to describe the schema.
72/// This cache stores the mappings for efficient lookup.
73#[derive(Debug, Default)]
74pub struct TableCache {
75    /// Table ID → TableInfo mapping.
76    by_id: HashMap<u64, TableInfo>,
77    /// (database, table) → table_id mapping.
78    by_name: HashMap<(String, String), u64>,
79}
80
81impl TableCache {
82    /// Creates a new empty cache.
83    #[must_use]
84    pub fn new() -> Self {
85        Self::default()
86    }
87
88    /// Updates the cache with a TABLE_MAP message.
89    pub fn update(&mut self, msg: &TableMapMessage) {
90        let info = TableInfo::from_table_map(msg);
91        let key = (msg.database.clone(), msg.table.clone());
92
93        self.by_name.insert(key, msg.table_id);
94        self.by_id.insert(msg.table_id, info);
95    }
96
97    /// Gets table info by table ID.
98    #[must_use]
99    pub fn get(&self, table_id: u64) -> Option<&TableInfo> {
100        self.by_id.get(&table_id)
101    }
102
103    /// Gets table info by database and table name.
104    #[must_use]
105    pub fn get_by_name(&self, database: &str, table: &str) -> Option<&TableInfo> {
106        let key = (database.to_string(), table.to_string());
107        self.by_name.get(&key).and_then(|id| self.by_id.get(id))
108    }
109
110    /// Returns the number of cached tables.
111    #[must_use]
112    pub fn len(&self) -> usize {
113        self.by_id.len()
114    }
115
116    /// Returns true if the cache is empty.
117    #[must_use]
118    pub fn is_empty(&self) -> bool {
119        self.by_id.is_empty()
120    }
121
122    /// Clears the cache.
123    pub fn clear(&mut self) {
124        self.by_id.clear();
125        self.by_name.clear();
126    }
127
128    /// Returns all cached table names.
129    #[must_use]
130    pub fn table_names(&self) -> Vec<String> {
131        self.by_id.values().map(TableInfo::full_name).collect()
132    }
133}
134
135/// Builds the CDC envelope schema for MySQL CDC records.
136///
137/// The envelope wraps the actual row data with CDC metadata:
138/// - `_table`: Source table name
139/// - `_op`: Operation type (I/U/D)
140/// - `_ts_ms`: Event timestamp
141/// - `_binlog_file`: Binlog filename
142/// - `_binlog_pos`: Position in binlog
143/// - `_gtid`: GTID (if available)
144/// - `_before`: Before image (for updates/deletes)
145/// - `_after`: After image (for inserts/updates)
146#[must_use]
147pub fn cdc_envelope_schema(table_schema: &Schema) -> Schema {
148    use arrow_schema::TimeUnit;
149
150    let before_fields = table_schema
151        .fields()
152        .iter()
153        .map(|f| Field::new(f.name(), f.data_type().clone(), true))
154        .collect::<Vec<_>>();
155
156    let after_fields = table_schema
157        .fields()
158        .iter()
159        .map(|f| Field::new(f.name(), f.data_type().clone(), true))
160        .collect::<Vec<_>>();
161
162    Schema::new(vec![
163        Field::new("_table", DataType::Utf8, false),
164        Field::new("_op", DataType::Utf8, false),
165        Field::new(
166            "_ts_ms",
167            DataType::Timestamp(TimeUnit::Millisecond, None),
168            false,
169        ),
170        Field::new("_binlog_file", DataType::Utf8, true),
171        Field::new("_binlog_pos", DataType::UInt64, true),
172        Field::new("_gtid", DataType::Utf8, true),
173        Field::new("_before", DataType::Struct(before_fields.into()), true),
174        Field::new("_after", DataType::Struct(after_fields.into()), true),
175    ])
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::mysql::types::mysql_type;

    /// Column fixture shared by the tests: `id`, `name` and `created_at`.
    fn make_test_columns() -> Vec<MySqlColumn> {
        let id = MySqlColumn::new(String::from("id"), mysql_type::LONGLONG, 0, false, false);
        let name = MySqlColumn::new(String::from("name"), mysql_type::VARCHAR, 255, true, false);
        let created_at = MySqlColumn::new(
            String::from("created_at"),
            mysql_type::TIMESTAMP,
            0,
            true,
            false,
        );

        vec![id, name, created_at]
    }

    /// TABLE_MAP fixture for `testdb.users` with table ID 100.
    fn make_test_table_map() -> TableMapMessage {
        TableMapMessage {
            table_id: 100,
            database: String::from("testdb"),
            table: String::from("users"),
            columns: make_test_columns(),
        }
    }

    #[test]
    fn test_table_info_from_table_map() {
        let table_info = TableInfo::from_table_map(&make_test_table_map());

        assert_eq!(table_info.table_id, 100);
        assert_eq!(table_info.database, "testdb");
        assert_eq!(table_info.table, "users");
        assert_eq!(table_info.column_count(), 3);
        assert_eq!(table_info.full_name(), "testdb.users");
    }

    #[test]
    fn test_table_info_arrow_schema() {
        let table_info = TableInfo::from_table_map(&make_test_table_map());
        let schema = &table_info.arrow_schema;

        assert_eq!(schema.fields().len(), 3);

        let id_field = schema.field(0);
        assert_eq!(id_field.name(), "id");
        assert_eq!(id_field.data_type(), &DataType::Int64);
        assert!(!id_field.is_nullable());

        let name_field = schema.field(1);
        assert_eq!(name_field.name(), "name");
        assert_eq!(name_field.data_type(), &DataType::Utf8);
        assert!(name_field.is_nullable());
    }

    #[test]
    fn test_table_info_find_column() {
        let table_info = TableInfo::from_table_map(&make_test_table_map());

        let (position, column) = table_info.find_column("name").unwrap();
        assert_eq!(position, 1);
        assert_eq!(column.name, "name");

        assert!(table_info.find_column("nonexistent").is_none());
    }

    #[test]
    fn test_table_cache_empty() {
        let empty = TableCache::new();

        assert_eq!(empty.len(), 0);
        assert!(empty.is_empty());
    }

    #[test]
    fn test_table_cache_update_and_get() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());

        let cached = cache.get(100).unwrap();
        assert_eq!(cached.table, "users");
    }

    #[test]
    fn test_table_cache_get_by_name() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        let cached = cache.get_by_name("testdb", "users").unwrap();
        assert_eq!(cached.table_id, 100);

        assert!(cache.get_by_name("testdb", "nonexistent").is_none());
    }

    #[test]
    fn test_table_cache_clear() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert!(cache.is_empty());
    }

    #[test]
    fn test_table_cache_table_names() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert_eq!(cache.table_names(), ["testdb.users"]);
    }

    #[test]
    fn test_table_cache_multiple_tables() {
        let mut cache = TableCache::new();

        // Two tables of the same database, each with its own table ID.
        for (table_id, table) in [(100_u64, "users"), (101_u64, "orders")].iter() {
            cache.update(&TableMapMessage {
                table_id: *table_id,
                database: String::from("db1"),
                table: String::from(*table),
                columns: make_test_columns(),
            });
        }

        assert_eq!(cache.len(), 2);
        assert!(cache.get(100).is_some());
        assert!(cache.get(101).is_some());
    }

    #[test]
    fn test_cdc_envelope_schema() {
        let table_info = TableInfo::from_table_map(&make_test_table_map());
        let envelope = cdc_envelope_schema(&table_info.arrow_schema);

        let expected_names = [
            "_table",
            "_op",
            "_ts_ms",
            "_binlog_file",
            "_binlog_pos",
            "_gtid",
            "_before",
            "_after",
        ];

        assert_eq!(envelope.fields().len(), 8);
        for (index, expected) in expected_names.iter().enumerate() {
            assert_eq!(envelope.field(index).name(), *expected);
        }

        // Verify struct fields
        match envelope.field(6).data_type() {
            DataType::Struct(image_fields) => assert_eq!(image_fields.len(), 3),
            _ => panic!("_before should be a struct"),
        }
    }
}