laminar_connectors/cdc/mysql/
schema.rs1use std::collections::HashMap;
7
8use arrow_schema::{DataType, Field, Schema};
9
10use super::decoder::TableMapMessage;
11use super::types::MySqlColumn;
12
13#[derive(Debug, Clone)]
15pub struct TableInfo {
16 pub table_id: u64,
18 pub database: String,
20 pub table: String,
22 pub columns: Vec<MySqlColumn>,
24 pub arrow_schema: Schema,
26}
27
28impl TableInfo {
29 #[must_use]
31 pub fn from_table_map(msg: &TableMapMessage) -> Self {
32 let fields: Vec<Field> = msg
33 .columns
34 .iter()
35 .map(|col| Field::new(&col.name, col.to_arrow_type(), col.nullable))
36 .collect();
37
38 Self {
39 table_id: msg.table_id,
40 database: msg.database.clone(),
41 table: msg.table.clone(),
42 columns: msg.columns.clone(),
43 arrow_schema: Schema::new(fields),
44 }
45 }
46
47 #[must_use]
49 pub fn full_name(&self) -> String {
50 format!("{}.{}", self.database, self.table)
51 }
52
53 #[must_use]
55 pub fn column_count(&self) -> usize {
56 self.columns.len()
57 }
58
59 #[must_use]
61 pub fn find_column(&self, name: &str) -> Option<(usize, &MySqlColumn)> {
62 self.columns
63 .iter()
64 .enumerate()
65 .find(|(_, c)| c.name == name)
66 }
67}
68
69#[derive(Debug, Default)]
74pub struct TableCache {
75 by_id: HashMap<u64, TableInfo>,
77 by_name: HashMap<(String, String), u64>,
79}
80
81impl TableCache {
82 #[must_use]
84 pub fn new() -> Self {
85 Self::default()
86 }
87
88 pub fn update(&mut self, msg: &TableMapMessage) {
90 let info = TableInfo::from_table_map(msg);
91 let key = (msg.database.clone(), msg.table.clone());
92
93 self.by_name.insert(key, msg.table_id);
94 self.by_id.insert(msg.table_id, info);
95 }
96
97 #[must_use]
99 pub fn get(&self, table_id: u64) -> Option<&TableInfo> {
100 self.by_id.get(&table_id)
101 }
102
103 #[must_use]
105 pub fn get_by_name(&self, database: &str, table: &str) -> Option<&TableInfo> {
106 let key = (database.to_string(), table.to_string());
107 self.by_name.get(&key).and_then(|id| self.by_id.get(id))
108 }
109
110 #[must_use]
112 pub fn len(&self) -> usize {
113 self.by_id.len()
114 }
115
116 #[must_use]
118 pub fn is_empty(&self) -> bool {
119 self.by_id.is_empty()
120 }
121
122 pub fn clear(&mut self) {
124 self.by_id.clear();
125 self.by_name.clear();
126 }
127
128 #[must_use]
130 pub fn table_names(&self) -> Vec<String> {
131 self.by_id.values().map(TableInfo::full_name).collect()
132 }
133}
134
135#[must_use]
147pub fn cdc_envelope_schema(table_schema: &Schema) -> Schema {
148 use arrow_schema::TimeUnit;
149
150 let before_fields = table_schema
151 .fields()
152 .iter()
153 .map(|f| Field::new(f.name(), f.data_type().clone(), true))
154 .collect::<Vec<_>>();
155
156 let after_fields = table_schema
157 .fields()
158 .iter()
159 .map(|f| Field::new(f.name(), f.data_type().clone(), true))
160 .collect::<Vec<_>>();
161
162 Schema::new(vec![
163 Field::new("_table", DataType::Utf8, false),
164 Field::new("_op", DataType::Utf8, false),
165 Field::new(
166 "_ts_ms",
167 DataType::Timestamp(TimeUnit::Millisecond, None),
168 false,
169 ),
170 Field::new("_binlog_file", DataType::Utf8, true),
171 Field::new("_binlog_pos", DataType::UInt64, true),
172 Field::new("_gtid", DataType::Utf8, true),
173 Field::new("_before", DataType::Struct(before_fields.into()), true),
174 Field::new("_after", DataType::Struct(after_fields.into()), true),
175 ])
176}
177
// Unit tests for `TableInfo`, `TableCache` and `cdc_envelope_schema`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::mysql::types::mysql_type;

    // Three columns shared by most tests: `id` (LONGLONG), `name`
    // (VARCHAR, 255) and `created_at` (TIMESTAMP).
    // NOTE(review): the fourth `MySqlColumn::new` argument appears to be
    // nullability (see `test_table_info_arrow_schema`); confirm the meaning of
    // the remaining arguments in `types`.
    fn make_test_columns() -> Vec<MySqlColumn> {
        vec![
            MySqlColumn::new("id".to_string(), mysql_type::LONGLONG, 0, false, false),
            MySqlColumn::new("name".to_string(), mysql_type::VARCHAR, 255, true, false),
            MySqlColumn::new(
                "created_at".to_string(),
                mysql_type::TIMESTAMP,
                0,
                true,
                false,
            ),
        ]
    }

    // A table-map message for `testdb.users` with table id 100.
    fn make_test_table_map() -> TableMapMessage {
        TableMapMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            columns: make_test_columns(),
        }
    }

    // Identity fields and helpers are copied/derived from the message.
    #[test]
    fn test_table_info_from_table_map() {
        let msg = make_test_table_map();
        let info = TableInfo::from_table_map(&msg);

        assert_eq!(info.table_id, 100);
        assert_eq!(info.database, "testdb");
        assert_eq!(info.table, "users");
        assert_eq!(info.column_count(), 3);
        assert_eq!(info.full_name(), "testdb.users");
    }

    // The Arrow schema mirrors column names, mapped types and nullability.
    #[test]
    fn test_table_info_arrow_schema() {
        let msg = make_test_table_map();
        let info = TableInfo::from_table_map(&msg);

        assert_eq!(info.arrow_schema.fields().len(), 3);
        assert_eq!(info.arrow_schema.field(0).name(), "id");
        assert_eq!(info.arrow_schema.field(0).data_type(), &DataType::Int64);
        assert!(!info.arrow_schema.field(0).is_nullable());

        assert_eq!(info.arrow_schema.field(1).name(), "name");
        assert_eq!(info.arrow_schema.field(1).data_type(), &DataType::Utf8);
        assert!(info.arrow_schema.field(1).is_nullable());
    }

    // Lookup returns the column's position; unknown names yield `None`.
    #[test]
    fn test_table_info_find_column() {
        let msg = make_test_table_map();
        let info = TableInfo::from_table_map(&msg);

        let (idx, col) = info.find_column("name").unwrap();
        assert_eq!(idx, 1);
        assert_eq!(col.name, "name");

        assert!(info.find_column("nonexistent").is_none());
    }

    // A freshly constructed cache holds nothing.
    #[test]
    fn test_table_cache_empty() {
        let cache = TableCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    // `update` makes the table retrievable by id.
    #[test]
    fn test_table_cache_update_and_get() {
        let mut cache = TableCache::new();
        let msg = make_test_table_map();

        cache.update(&msg);

        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);

        let info = cache.get(100).unwrap();
        assert_eq!(info.table, "users");
    }

    // `update` also makes the table retrievable by (database, table).
    #[test]
    fn test_table_cache_get_by_name() {
        let mut cache = TableCache::new();
        let msg = make_test_table_map();
        cache.update(&msg);

        let info = cache.get_by_name("testdb", "users").unwrap();
        assert_eq!(info.table_id, 100);

        assert!(cache.get_by_name("testdb", "nonexistent").is_none());
    }

    // `clear` empties the cache.
    #[test]
    fn test_table_cache_clear() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert!(cache.is_empty());
    }

    // Names are reported in qualified `db.table` form.
    #[test]
    fn test_table_cache_table_names() {
        let mut cache = TableCache::new();
        cache.update(&make_test_table_map());

        let names = cache.table_names();
        assert_eq!(names, vec!["testdb.users"]);
    }

    // Distinct ids for distinct tables are cached side by side.
    #[test]
    fn test_table_cache_multiple_tables() {
        let mut cache = TableCache::new();

        cache.update(&TableMapMessage {
            table_id: 100,
            database: "db1".to_string(),
            table: "users".to_string(),
            columns: make_test_columns(),
        });

        cache.update(&TableMapMessage {
            table_id: 101,
            database: "db1".to_string(),
            table: "orders".to_string(),
            columns: make_test_columns(),
        });

        assert_eq!(cache.len(), 2);
        assert!(cache.get(100).is_some());
        assert!(cache.get(101).is_some());
    }

    // The envelope has six metadata columns followed by `_before`/`_after`
    // structs that each carry one field per table column.
    #[test]
    fn test_cdc_envelope_schema() {
        let msg = make_test_table_map();
        let info = TableInfo::from_table_map(&msg);
        let envelope = cdc_envelope_schema(&info.arrow_schema);

        assert_eq!(envelope.fields().len(), 8);
        assert_eq!(envelope.field(0).name(), "_table");
        assert_eq!(envelope.field(1).name(), "_op");
        assert_eq!(envelope.field(2).name(), "_ts_ms");
        assert_eq!(envelope.field(3).name(), "_binlog_file");
        assert_eq!(envelope.field(4).name(), "_binlog_pos");
        assert_eq!(envelope.field(5).name(), "_gtid");
        assert_eq!(envelope.field(6).name(), "_before");
        assert_eq!(envelope.field(7).name(), "_after");

        if let DataType::Struct(fields) = envelope.field(6).data_type() {
            assert_eq!(fields.len(), 3);
        } else {
            panic!("_before should be a struct");
        }
    }
}