Skip to main content

laminar_connectors/cdc/mysql/
schema.rs

1//! MySQL table schema cache.
2//!
3//! Caches TABLE_MAP_EVENT information for resolving row events
4//! to their corresponding table schemas.
5
6use std::collections::HashMap;
7
8use arrow_schema::{DataType, Field, Schema};
9
10use super::decoder::TableMapMessage;
11use super::types::MySqlColumn;
12
13/// Table schema information from TABLE_MAP_EVENT.
14#[derive(Debug, Clone)]
15pub struct TableInfo {
16    /// Table ID (internal MySQL identifier).
17    pub table_id: u64,
18    /// Database name.
19    pub database: String,
20    /// Table name.
21    pub table: String,
22    /// Column definitions.
23    pub columns: Vec<MySqlColumn>,
24    /// Arrow schema derived from columns.
25    pub arrow_schema: Schema,
26}
27
28impl TableInfo {
29    /// Creates a new table info from a TABLE_MAP message.
30    #[must_use]
31    pub fn from_table_map(msg: &TableMapMessage) -> Self {
32        let fields: Vec<Field> = msg
33            .columns
34            .iter()
35            .map(|col| Field::new(&col.name, col.to_arrow_type(), col.nullable))
36            .collect();
37
38        Self {
39            table_id: msg.table_id,
40            database: msg.database.clone(),
41            table: msg.table.clone(),
42            columns: msg.columns.clone(),
43            arrow_schema: Schema::new(fields),
44        }
45    }
46
47    /// Returns the fully-qualified table name.
48    #[must_use]
49    pub fn full_name(&self) -> String {
50        format!("{}.{}", self.database, self.table)
51    }
52
53    /// Returns the number of columns.
54    #[must_use]
55    pub fn column_count(&self) -> usize {
56        self.columns.len()
57    }
58
59    /// Finds a column by name.
60    #[must_use]
61    pub fn find_column(&self, name: &str) -> Option<(usize, &MySqlColumn)> {
62        self.columns
63            .iter()
64            .enumerate()
65            .find(|(_, c)| c.name == name)
66    }
67}
68
69/// Cache of table schemas indexed by table ID.
70///
71/// MySQL sends TABLE_MAP_EVENT before row events to describe the schema.
72/// This cache stores the mappings for efficient lookup.
73#[derive(Debug, Default)]
74pub struct TableCache {
75    /// Table ID → TableInfo mapping.
76    by_id: HashMap<u64, TableInfo>,
77    /// (database, table) → table_id mapping.
78    by_name: HashMap<(String, String), u64>,
79}
80
81impl TableCache {
82    /// Creates a new empty cache.
83    #[must_use]
84    pub fn new() -> Self {
85        Self::default()
86    }
87
88    /// Updates the cache with a TABLE_MAP message.
89    pub fn update(&mut self, msg: &TableMapMessage) {
90        let info = TableInfo::from_table_map(msg);
91        let key = (msg.database.clone(), msg.table.clone());
92
93        self.by_name.insert(key, msg.table_id);
94        self.by_id.insert(msg.table_id, info);
95    }
96
97    /// Gets table info by table ID.
98    #[must_use]
99    pub fn get(&self, table_id: u64) -> Option<&TableInfo> {
100        self.by_id.get(&table_id)
101    }
102
103    /// Gets table info by database and table name.
104    #[must_use]
105    pub fn get_by_name(&self, database: &str, table: &str) -> Option<&TableInfo> {
106        let key = (database.to_string(), table.to_string());
107        self.by_name.get(&key).and_then(|id| self.by_id.get(id))
108    }
109
110    /// Returns the number of cached tables.
111    #[must_use]
112    pub fn len(&self) -> usize {
113        self.by_id.len()
114    }
115
116    /// Returns true if the cache is empty.
117    #[must_use]
118    pub fn is_empty(&self) -> bool {
119        self.by_id.is_empty()
120    }
121
122    /// Clears the cache.
123    pub fn clear(&mut self) {
124        self.by_id.clear();
125        self.by_name.clear();
126    }
127
128    /// Returns all cached table names.
129    #[must_use]
130    pub fn table_names(&self) -> Vec<String> {
131        self.by_id.values().map(TableInfo::full_name).collect()
132    }
133}
134
135/// Builds the CDC envelope schema for MySQL CDC records.
136///
137/// The envelope wraps the actual row data with CDC metadata:
138/// - `_table`: Source table name
139/// - `_op`: Operation type (I/U/D)
140/// - `_ts_ms`: Event timestamp
141/// - `_binlog_file`: Binlog filename
142/// - `_binlog_pos`: Position in binlog
143/// - `_gtid`: GTID (if available)
144/// - `_before`: Before image (for updates/deletes)
145/// - `_after`: After image (for inserts/updates)
146#[must_use]
147pub fn cdc_envelope_schema(table_schema: &Schema) -> Schema {
148    let before_fields = table_schema
149        .fields()
150        .iter()
151        .map(|f| Field::new(f.name(), f.data_type().clone(), true))
152        .collect::<Vec<_>>();
153
154    let after_fields = table_schema
155        .fields()
156        .iter()
157        .map(|f| Field::new(f.name(), f.data_type().clone(), true))
158        .collect::<Vec<_>>();
159
160    Schema::new(vec![
161        Field::new("_table", DataType::Utf8, false),
162        Field::new("_op", DataType::Utf8, false),
163        Field::new("_ts_ms", DataType::Int64, false),
164        Field::new("_binlog_file", DataType::Utf8, true),
165        Field::new("_binlog_pos", DataType::UInt64, true),
166        Field::new("_gtid", DataType::Utf8, true),
167        Field::new("_before", DataType::Struct(before_fields.into()), true),
168        Field::new("_after", DataType::Struct(after_fields.into()), true),
169    ])
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::mysql::types::mysql_type;

    /// Fixture columns: `id`, `name` and `created_at`.
    fn make_test_columns() -> Vec<MySqlColumn> {
        let id = MySqlColumn::new("id".to_string(), mysql_type::LONGLONG, 0, false, false);
        let name = MySqlColumn::new("name".to_string(), mysql_type::VARCHAR, 255, true, false);
        let created_at = MySqlColumn::new(
            "created_at".to_string(),
            mysql_type::TIMESTAMP,
            0,
            true,
            false,
        );
        vec![id, name, created_at]
    }

    /// Builds a TABLE_MAP message over the fixture columns.
    fn table_map_for(table_id: u64, database: &str, table: &str) -> TableMapMessage {
        TableMapMessage {
            table_id,
            database: database.to_string(),
            table: table.to_string(),
            columns: make_test_columns(),
        }
    }

    /// Default fixture message: table `testdb.users` with ID 100.
    fn make_test_table_map() -> TableMapMessage {
        table_map_for(100, "testdb", "users")
    }

    #[test]
    fn test_table_info_from_table_map() {
        let info = TableInfo::from_table_map(&make_test_table_map());

        assert_eq!(info.table_id, 100);
        assert_eq!(info.database, "testdb");
        assert_eq!(info.table, "users");
        assert_eq!(info.column_count(), 3);
        assert_eq!(info.full_name(), "testdb.users");
    }

    #[test]
    fn test_table_info_arrow_schema() {
        let info = TableInfo::from_table_map(&make_test_table_map());
        let schema = &info.arrow_schema;

        assert_eq!(schema.fields().len(), 3);

        let id_field = schema.field(0);
        assert_eq!(id_field.name(), "id");
        assert_eq!(id_field.data_type(), &DataType::Int64);
        assert!(!id_field.is_nullable());

        let name_field = schema.field(1);
        assert_eq!(name_field.name(), "name");
        assert_eq!(name_field.data_type(), &DataType::Utf8);
        assert!(name_field.is_nullable());
    }

    #[test]
    fn test_table_info_find_column() {
        let info = TableInfo::from_table_map(&make_test_table_map());

        let (position, column) = info.find_column("name").unwrap();
        assert_eq!(position, 1);
        assert_eq!(column.name, "name");

        assert!(info.find_column("nonexistent").is_none());
    }

    #[test]
    fn test_table_cache_empty() {
        let cache = TableCache::new();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
    }

    #[test]
    fn test_table_cache_update_and_get() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);

        let cached = cache.get(100).unwrap();
        assert_eq!(cached.table, "users");
    }

    #[test]
    fn test_table_cache_get_by_name() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        let cached = cache.get_by_name("testdb", "users").unwrap();
        assert_eq!(cached.table_id, 100);

        assert!(cache.get_by_name("testdb", "nonexistent").is_none());
    }

    #[test]
    fn test_table_cache_clear() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert!(cache.is_empty());
    }

    #[test]
    fn test_table_cache_table_names() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert_eq!(cache.table_names(), vec!["testdb.users"]);
    }

    #[test]
    fn test_table_cache_multiple_tables() {
        let mut cache = TableCache::new();
        cache.update(&table_map_for(100, "db1", "users"));
        cache.update(&table_map_for(101, "db1", "orders"));

        assert_eq!(cache.len(), 2);
        assert!(cache.get(100).is_some());
        assert!(cache.get(101).is_some());
    }

    #[test]
    fn test_cdc_envelope_schema() {
        let info = TableInfo::from_table_map(&make_test_table_map());
        let envelope = cdc_envelope_schema(&info.arrow_schema);

        // Comparing the full name list checks both the count (8) and the order.
        let names: Vec<&str> = envelope
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(
            names,
            [
                "_table",
                "_op",
                "_ts_ms",
                "_binlog_file",
                "_binlog_pos",
                "_gtid",
                "_before",
                "_after",
            ]
        );

        // Verify struct fields
        match envelope.field(6).data_type() {
            DataType::Struct(fields) => assert_eq!(fields.len(), 3),
            _ => panic!("_before should be a struct"),
        }
    }
}