laminar_connectors/cdc/mysql/
schema.rs1use std::collections::HashMap;
7
8use arrow_schema::{DataType, Field, Schema};
9
10use super::decoder::TableMapMessage;
11use super::types::MySqlColumn;
12
13#[derive(Debug, Clone)]
15pub struct TableInfo {
16 pub table_id: u64,
18 pub database: String,
20 pub table: String,
22 pub columns: Vec<MySqlColumn>,
24 pub arrow_schema: Schema,
26}
27
28impl TableInfo {
29 #[must_use]
31 pub fn from_table_map(msg: &TableMapMessage) -> Self {
32 let fields: Vec<Field> = msg
33 .columns
34 .iter()
35 .map(|col| Field::new(&col.name, col.to_arrow_type(), col.nullable))
36 .collect();
37
38 Self {
39 table_id: msg.table_id,
40 database: msg.database.clone(),
41 table: msg.table.clone(),
42 columns: msg.columns.clone(),
43 arrow_schema: Schema::new(fields),
44 }
45 }
46
47 #[must_use]
49 pub fn full_name(&self) -> String {
50 format!("{}.{}", self.database, self.table)
51 }
52
53 #[must_use]
55 pub fn column_count(&self) -> usize {
56 self.columns.len()
57 }
58
59 #[must_use]
61 pub fn find_column(&self, name: &str) -> Option<(usize, &MySqlColumn)> {
62 self.columns
63 .iter()
64 .enumerate()
65 .find(|(_, c)| c.name == name)
66 }
67}
68
69#[derive(Debug, Default)]
74pub struct TableCache {
75 by_id: HashMap<u64, TableInfo>,
77 by_name: HashMap<(String, String), u64>,
79}
80
81impl TableCache {
82 #[must_use]
84 pub fn new() -> Self {
85 Self::default()
86 }
87
88 pub fn update(&mut self, msg: &TableMapMessage) {
90 let info = TableInfo::from_table_map(msg);
91 let key = (msg.database.clone(), msg.table.clone());
92
93 self.by_name.insert(key, msg.table_id);
94 self.by_id.insert(msg.table_id, info);
95 }
96
97 #[must_use]
99 pub fn get(&self, table_id: u64) -> Option<&TableInfo> {
100 self.by_id.get(&table_id)
101 }
102
103 #[must_use]
105 pub fn get_by_name(&self, database: &str, table: &str) -> Option<&TableInfo> {
106 let key = (database.to_string(), table.to_string());
107 self.by_name.get(&key).and_then(|id| self.by_id.get(id))
108 }
109
110 #[must_use]
112 pub fn len(&self) -> usize {
113 self.by_id.len()
114 }
115
116 #[must_use]
118 pub fn is_empty(&self) -> bool {
119 self.by_id.is_empty()
120 }
121
122 pub fn clear(&mut self) {
124 self.by_id.clear();
125 self.by_name.clear();
126 }
127
128 #[must_use]
130 pub fn table_names(&self) -> Vec<String> {
131 self.by_id.values().map(TableInfo::full_name).collect()
132 }
133}
134
135#[must_use]
147pub fn cdc_envelope_schema(table_schema: &Schema) -> Schema {
148 let before_fields = table_schema
149 .fields()
150 .iter()
151 .map(|f| Field::new(f.name(), f.data_type().clone(), true))
152 .collect::<Vec<_>>();
153
154 let after_fields = table_schema
155 .fields()
156 .iter()
157 .map(|f| Field::new(f.name(), f.data_type().clone(), true))
158 .collect::<Vec<_>>();
159
160 Schema::new(vec![
161 Field::new("_table", DataType::Utf8, false),
162 Field::new("_op", DataType::Utf8, false),
163 Field::new("_ts_ms", DataType::Int64, false),
164 Field::new("_binlog_file", DataType::Utf8, true),
165 Field::new("_binlog_pos", DataType::UInt64, true),
166 Field::new("_gtid", DataType::Utf8, true),
167 Field::new("_before", DataType::Struct(before_fields.into()), true),
168 Field::new("_after", DataType::Struct(after_fields.into()), true),
169 ])
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::mysql::types::mysql_type;

    /// Three-column fixture: non-nullable `id`, nullable `name` and `created_at`.
    fn make_test_columns() -> Vec<MySqlColumn> {
        let column = |name: &str, kind, meta, nullable| {
            MySqlColumn::new(name.to_string(), kind, meta, nullable, false)
        };
        vec![
            column("id", mysql_type::LONGLONG, 0, false),
            column("name", mysql_type::VARCHAR, 255, true),
            column("created_at", mysql_type::TIMESTAMP, 0, true),
        ]
    }

    /// Builds a table-map message for the given id/name using the fixture columns.
    fn table_map(table_id: u64, database: &str, table: &str) -> TableMapMessage {
        TableMapMessage {
            table_id,
            database: database.to_string(),
            table: table.to_string(),
            columns: make_test_columns(),
        }
    }

    fn make_test_table_map() -> TableMapMessage {
        table_map(100, "testdb", "users")
    }

    #[test]
    fn test_table_info_from_table_map() {
        let info = TableInfo::from_table_map(&make_test_table_map());

        assert_eq!(
            (info.table_id, info.database.as_str(), info.table.as_str()),
            (100, "testdb", "users")
        );
        assert_eq!(info.column_count(), 3);
        assert_eq!(info.full_name(), "testdb.users");
    }

    #[test]
    fn test_table_info_arrow_schema() {
        let info = TableInfo::from_table_map(&make_test_table_map());
        let schema = &info.arrow_schema;
        assert_eq!(schema.fields().len(), 3);

        let expected = [("id", DataType::Int64, false), ("name", DataType::Utf8, true)];
        for (index, (name, data_type, nullable)) in expected.iter().enumerate() {
            let field = schema.field(index);
            assert_eq!(field.name(), name);
            assert_eq!(field.data_type(), data_type);
            assert_eq!(field.is_nullable(), *nullable);
        }
    }

    #[test]
    fn test_table_info_find_column() {
        let info = TableInfo::from_table_map(&make_test_table_map());

        let (idx, col) = info
            .find_column("name")
            .expect("column `name` should be present");
        assert_eq!(idx, 1);
        assert_eq!(col.name, "name");

        assert!(info.find_column("nonexistent").is_none());
    }

    #[test]
    fn test_table_cache_empty() {
        let cache = TableCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_table_cache_update_and_get() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(100).map(|info| info.table.as_str()), Some("users"));
    }

    #[test]
    fn test_table_cache_get_by_name() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert_eq!(
            cache.get_by_name("testdb", "users").map(|info| info.table_id),
            Some(100)
        );
        assert!(cache.get_by_name("testdb", "nonexistent").is_none());
    }

    #[test]
    fn test_table_cache_clear() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert!(cache.is_empty());
    }

    #[test]
    fn test_table_cache_table_names() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        assert_eq!(cache.table_names(), vec!["testdb.users"]);
    }

    #[test]
    fn test_table_cache_multiple_tables() {
        let mut cache = TableCache::new();
        cache.update(&table_map(100, "db1", "users"));
        cache.update(&table_map(101, "db1", "orders"));

        assert_eq!(cache.len(), 2);
        for id in [100, 101] {
            assert!(cache.get(id).is_some());
        }
    }

    #[test]
    fn test_cdc_envelope_schema() {
        let info = TableInfo::from_table_map(&make_test_table_map());
        let envelope = cdc_envelope_schema(&info.arrow_schema);

        let names: Vec<&str> = envelope
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(
            names,
            [
                "_table",
                "_op",
                "_ts_ms",
                "_binlog_file",
                "_binlog_pos",
                "_gtid",
                "_before",
                "_after",
            ]
        );

        match envelope.field(6).data_type() {
            DataType::Struct(fields) => assert_eq!(fields.len(), 3),
            _ => panic!("_before should be a struct"),
        }
    }
}