1use std::sync::Arc;
7
8use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, UInt64Array};
9use arrow_schema::{DataType, Field, Schema};
10
11use super::decoder::{
12 ColumnValue, DeleteMessage, InsertMessage, RowData, UpdateMessage, UpdateRowData,
13};
14use super::schema::TableInfo;
15
/// Kind of row change carried by a [`ChangeEvent`].
///
/// `update_to_events` emits every updated row as an `UpdateBefore` /
/// `UpdateAfter` pair (old image first, then new image).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CdcOperation {
    /// A row was inserted.
    Insert,
    /// The old image of an updated row.
    UpdateBefore,
    /// The new image of an updated row.
    UpdateAfter,
    /// A row was deleted.
    Delete,
}

impl CdcOperation {
    /// Short operation code written to the `_op` column:
    /// `"I"`, `"U-"`, `"U+"` or `"D"`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Insert => "I",
            Self::UpdateBefore => "U-",
            Self::UpdateAfter => "U+",
            Self::Delete => "D",
        }
    }

    /// Signed weight of the change: `-1` for `Delete` / `UpdateBefore`
    /// (an existing row image is retracted) and `+1` for `Insert` /
    /// `UpdateAfter` (a new row image is added).
    #[must_use]
    pub fn weight(&self) -> i8 {
        match *self {
            Self::Delete | Self::UpdateBefore => -1,
            Self::Insert | Self::UpdateAfter => 1,
        }
    }
}
50
51#[derive(Debug, Clone)]
53pub struct ChangeEvent {
54 pub table: String,
56 pub operation: CdcOperation,
58 pub timestamp_ms: i64,
60 pub binlog_file: String,
62 pub binlog_position: u64,
64 pub gtid: Option<String>,
66 pub row: RowData,
68}
69
70impl ChangeEvent {
71 #[must_use]
73 pub fn insert(
74 table: String,
75 timestamp_ms: i64,
76 binlog_file: String,
77 binlog_position: u64,
78 gtid: Option<String>,
79 row: RowData,
80 ) -> Self {
81 Self {
82 table,
83 operation: CdcOperation::Insert,
84 timestamp_ms,
85 binlog_file,
86 binlog_position,
87 gtid,
88 row,
89 }
90 }
91
92 #[must_use]
94 pub fn delete(
95 table: String,
96 timestamp_ms: i64,
97 binlog_file: String,
98 binlog_position: u64,
99 gtid: Option<String>,
100 row: RowData,
101 ) -> Self {
102 Self {
103 table,
104 operation: CdcOperation::Delete,
105 timestamp_ms,
106 binlog_file,
107 binlog_position,
108 gtid,
109 row,
110 }
111 }
112}
113
114#[must_use]
116pub fn insert_to_events(
117 msg: &InsertMessage,
118 binlog_file: &str,
119 gtid: Option<&str>,
120) -> Vec<ChangeEvent> {
121 let table = format!("{}.{}", msg.database, msg.table);
122
123 msg.rows
124 .iter()
125 .map(|row| {
126 ChangeEvent::insert(
127 table.clone(),
128 msg.timestamp_ms,
129 binlog_file.to_string(),
130 msg.binlog_position,
131 gtid.map(String::from),
132 row.clone(),
133 )
134 })
135 .collect()
136}
137
138#[must_use]
144pub fn update_to_events(
145 msg: &UpdateMessage,
146 binlog_file: &str,
147 gtid: Option<&str>,
148) -> Vec<ChangeEvent> {
149 let table = format!("{}.{}", msg.database, msg.table);
150 let mut events = Vec::with_capacity(msg.rows.len() * 2);
151
152 for UpdateRowData { before, after } in &msg.rows {
153 events.push(ChangeEvent {
155 table: table.clone(),
156 operation: CdcOperation::UpdateBefore,
157 timestamp_ms: msg.timestamp_ms,
158 binlog_file: binlog_file.to_string(),
159 binlog_position: msg.binlog_position,
160 gtid: gtid.map(String::from),
161 row: before.clone(),
162 });
163
164 events.push(ChangeEvent {
166 table: table.clone(),
167 operation: CdcOperation::UpdateAfter,
168 timestamp_ms: msg.timestamp_ms,
169 binlog_file: binlog_file.to_string(),
170 binlog_position: msg.binlog_position,
171 gtid: gtid.map(String::from),
172 row: after.clone(),
173 });
174 }
175
176 events
177}
178
179#[must_use]
181pub fn delete_to_events(
182 msg: &DeleteMessage,
183 binlog_file: &str,
184 gtid: Option<&str>,
185) -> Vec<ChangeEvent> {
186 let table = format!("{}.{}", msg.database, msg.table);
187
188 msg.rows
189 .iter()
190 .map(|row| {
191 ChangeEvent::delete(
192 table.clone(),
193 msg.timestamp_ms,
194 binlog_file.to_string(),
195 msg.binlog_position,
196 gtid.map(String::from),
197 row.clone(),
198 )
199 })
200 .collect()
201}
202
203#[must_use]
205pub fn cdc_metadata_schema() -> Schema {
206 Schema::new(vec![
207 Field::new("_table", DataType::Utf8, false),
208 Field::new("_op", DataType::Utf8, false),
209 Field::new("_ts_ms", DataType::Int64, false),
210 Field::new("_binlog_file", DataType::Utf8, true),
211 Field::new("_binlog_pos", DataType::UInt64, true),
212 Field::new("_gtid", DataType::Utf8, true),
213 ])
214}
215
216pub fn events_to_record_batch(
222 events: &[ChangeEvent],
223 table_info: &TableInfo,
224) -> Result<RecordBatch, arrow_schema::ArrowError> {
225 if events.is_empty() {
226 let schema = build_cdc_schema(&table_info.arrow_schema);
228 return Ok(RecordBatch::new_empty(Arc::new(schema)));
229 }
230
231 let _n = events.len();
232
233 let tables: Vec<&str> = events.iter().map(|e| e.table.as_str()).collect();
235 let ops: Vec<&str> = events.iter().map(|e| e.operation.as_str()).collect();
236 let timestamps: Vec<i64> = events.iter().map(|e| e.timestamp_ms).collect();
237 let binlog_files: Vec<Option<&str>> = events
238 .iter()
239 .map(|e| Some(e.binlog_file.as_str()))
240 .collect();
241 let binlog_positions: Vec<Option<u64>> =
242 events.iter().map(|e| Some(e.binlog_position)).collect();
243 let gtids: Vec<Option<&str>> = events.iter().map(|e| e.gtid.as_deref()).collect();
244
245 let mut columns: Vec<ArrayRef> = vec![
246 Arc::new(StringArray::from(tables)),
247 Arc::new(StringArray::from(ops)),
248 Arc::new(Int64Array::from(timestamps)),
249 Arc::new(StringArray::from(binlog_files)),
250 Arc::new(UInt64Array::from(binlog_positions)),
251 Arc::new(StringArray::from(gtids)),
252 ];
253
254 for (col_idx, _col_def) in table_info.columns.iter().enumerate() {
256 let values: Vec<Option<String>> = events
257 .iter()
258 .map(|e| {
259 e.row.columns.get(col_idx).and_then(|v| {
260 if v.is_null() {
261 None
262 } else {
263 Some(v.to_text())
264 }
265 })
266 })
267 .collect();
268
269 let array: ArrayRef = Arc::new(StringArray::from(
272 values.iter().map(|v| v.as_deref()).collect::<Vec<_>>(),
273 ));
274 columns.push(array);
275 }
276
277 let schema = build_cdc_schema(&table_info.arrow_schema);
278 RecordBatch::try_new(Arc::new(schema), columns)
279}
280
281fn build_cdc_schema(row_schema: &Schema) -> Schema {
283 let mut fields = vec![
284 Field::new("_table", DataType::Utf8, false),
285 Field::new("_op", DataType::Utf8, false),
286 Field::new("_ts_ms", DataType::Int64, false),
287 Field::new("_binlog_file", DataType::Utf8, true),
288 Field::new("_binlog_pos", DataType::UInt64, true),
289 Field::new("_gtid", DataType::Utf8, true),
290 ];
291
292 for field in row_schema.fields() {
294 fields.push(Field::new(field.name(), DataType::Utf8, true));
295 }
296
297 Schema::new(fields)
298}
299
300#[must_use]
302pub fn column_value_to_json(value: &ColumnValue) -> serde_json::Value {
303 match value {
304 ColumnValue::Null => serde_json::Value::Null,
305 ColumnValue::SignedInt(v) => serde_json::json!(v),
306 ColumnValue::UnsignedInt(v) => serde_json::json!(v),
307 ColumnValue::Float(v) => serde_json::json!(v),
308 ColumnValue::Double(v) => serde_json::json!(v),
309 ColumnValue::String(s) => serde_json::json!(s),
310 ColumnValue::Bytes(b) => serde_json::json!(base64_encode(b)),
311 ColumnValue::Date(y, m, d) => serde_json::json!(format!("{y:04}-{m:02}-{d:02}")),
312 ColumnValue::Time(h, m, s, us) => {
313 if *us > 0 {
314 serde_json::json!(format!("{h:02}:{m:02}:{s:02}.{us:06}"))
315 } else {
316 serde_json::json!(format!("{h:02}:{m:02}:{s:02}"))
317 }
318 }
319 ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
320 if *us > 0 {
321 serde_json::json!(format!(
322 "{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}.{us:06}"
323 ))
324 } else {
325 serde_json::json!(format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}"))
326 }
327 }
328 ColumnValue::Timestamp(us) => serde_json::json!(us),
329 ColumnValue::Json(s) => serde_json::from_str(s).unwrap_or_else(|e| {
330 tracing::warn!(
331 error = %e,
332 json_len = s.len(),
333 "[LDB-4003] MySQL CDC JSON column parse failed, treating as string literal"
334 );
335 serde_json::json!(s)
336 }),
337 }
338}
339
340#[must_use]
342pub fn row_to_json(row: &RowData, columns: &[super::types::MySqlColumn]) -> serde_json::Value {
343 let mut obj = serde_json::Map::new();
344 for (i, col) in columns.iter().enumerate() {
345 if let Some(value) = row.columns.get(i) {
346 obj.insert(col.name.clone(), column_value_to_json(value));
347 }
348 }
349 serde_json::Value::Object(obj)
350}
351
/// Encodes `data` as standard base64 (RFC 4648 §4 alphabet) with `=` padding.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Every (possibly partial) 3-byte group produces exactly 4 output chars.
    let mut encoded = String::with_capacity(data.len().div_ceil(3) * 4);
    for chunk in data.chunks(3) {
        // `chunks` never yields an empty slice, so `chunk[0]` always exists;
        // missing trailing bytes of the final group are treated as zero.
        let b0 = usize::from(chunk[0]);
        let b1 = chunk.get(1).copied().map_or(0, usize::from);
        let b2 = chunk.get(2).copied().map_or(0, usize::from);

        encoded.push(char::from(ALPHABET[b0 >> 2]));
        encoded.push(char::from(ALPHABET[((b0 & 0x03) << 4) | (b1 >> 4)]));
        encoded.push(if chunk.len() > 1 {
            char::from(ALPHABET[((b1 & 0x0f) << 2) | (b2 >> 6)])
        } else {
            '='
        });
        encoded.push(if chunk.len() > 2 {
            char::from(ALPHABET[b2 & 0x3f])
        } else {
            '='
        });
    }
    encoded
}
380
#[cfg(test)]
mod tests {
    use super::*;

    const BINLOG_FILE: &str = "mysql-bin.000003";
    const TIMESTAMP_MS: i64 = 1_704_067_200_000;

    /// Three-column row: an integer, a string and a NULL.
    fn make_test_row() -> RowData {
        RowData {
            columns: vec![
                ColumnValue::SignedInt(1),
                ColumnValue::String("Alice".to_string()),
                ColumnValue::Null,
            ],
        }
    }

    #[test]
    fn test_cdc_operation_as_str() {
        assert_eq!(CdcOperation::Insert.as_str(), "I");
        assert_eq!(CdcOperation::UpdateBefore.as_str(), "U-");
        assert_eq!(CdcOperation::UpdateAfter.as_str(), "U+");
        assert_eq!(CdcOperation::Delete.as_str(), "D");
    }

    #[test]
    fn test_cdc_operation_weight() {
        assert_eq!(CdcOperation::Insert.weight(), 1);
        assert_eq!(CdcOperation::UpdateAfter.weight(), 1);
        assert_eq!(CdcOperation::Delete.weight(), -1);
        assert_eq!(CdcOperation::UpdateBefore.weight(), -1);
    }

    #[test]
    fn test_change_event_insert() {
        let event = ChangeEvent::insert(
            "db.users".to_string(),
            TIMESTAMP_MS,
            BINLOG_FILE.to_string(),
            12345,
            Some("gtid-123".to_string()),
            make_test_row(),
        );

        assert_eq!(event.table, "db.users");
        assert_eq!(event.operation, CdcOperation::Insert);
        assert_eq!(event.timestamp_ms, TIMESTAMP_MS);
        assert_eq!(event.binlog_file, BINLOG_FILE);
        assert_eq!(event.binlog_position, 12345);
        assert_eq!(event.gtid.as_deref(), Some("gtid-123"));
        assert_eq!(event.row.columns.len(), 3);
    }

    #[test]
    fn test_change_event_delete() {
        let event = ChangeEvent::delete(
            "db.users".to_string(),
            TIMESTAMP_MS,
            BINLOG_FILE.to_string(),
            678,
            None,
            make_test_row(),
        );

        assert_eq!(event.table, "db.users");
        assert_eq!(event.operation, CdcOperation::Delete);
        assert_eq!(event.binlog_position, 678);
        assert!(event.gtid.is_none());
    }

    #[test]
    fn test_insert_to_events() {
        let msg = InsertMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![make_test_row(), make_test_row()],
            binlog_position: 12345,
            timestamp_ms: TIMESTAMP_MS,
        };

        let events = insert_to_events(&msg, BINLOG_FILE, Some("gtid-1"));
        assert_eq!(events.len(), 2);
        for event in &events {
            assert_eq!(event.operation, CdcOperation::Insert);
            assert_eq!(event.table, "testdb.users");
            assert_eq!(event.timestamp_ms, TIMESTAMP_MS);
            assert_eq!(event.binlog_file, BINLOG_FILE);
            assert_eq!(event.binlog_position, 12345);
            assert_eq!(event.gtid.as_deref(), Some("gtid-1"));
        }
    }

    #[test]
    fn test_insert_to_events_empty_message() {
        let msg = InsertMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![],
            binlog_position: 12345,
            timestamp_ms: TIMESTAMP_MS,
        };

        assert!(insert_to_events(&msg, BINLOG_FILE, None).is_empty());
    }

    #[test]
    fn test_update_to_events() {
        let msg = UpdateMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![UpdateRowData {
                before: RowData {
                    columns: vec![ColumnValue::String("old".to_string())],
                },
                after: RowData {
                    columns: vec![ColumnValue::String("new".to_string())],
                },
            }],
            binlog_position: 12345,
            timestamp_ms: TIMESTAMP_MS,
        };

        let events = update_to_events(&msg, BINLOG_FILE, None);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].operation, CdcOperation::UpdateBefore);
        assert_eq!(events[1].operation, CdcOperation::UpdateAfter);
        for event in &events {
            assert_eq!(event.table, "testdb.users");
            assert_eq!(event.timestamp_ms, TIMESTAMP_MS);
            assert_eq!(event.binlog_file, BINLOG_FILE);
            assert_eq!(event.binlog_position, 12345);
            assert!(event.gtid.is_none());
        }
    }

    #[test]
    fn test_delete_to_events() {
        let msg = DeleteMessage {
            table_id: 100,
            database: "testdb".to_string(),
            table: "users".to_string(),
            rows: vec![make_test_row()],
            binlog_position: 12345,
            timestamp_ms: TIMESTAMP_MS,
        };

        let events = delete_to_events(&msg, BINLOG_FILE, Some("gtid-1"));
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].operation, CdcOperation::Delete);
        assert_eq!(events[0].table, "testdb.users");
        assert_eq!(events[0].binlog_file, BINLOG_FILE);
        assert_eq!(events[0].gtid.as_deref(), Some("gtid-1"));
    }

    #[test]
    fn test_column_value_to_json() {
        assert_eq!(
            column_value_to_json(&ColumnValue::Null),
            serde_json::Value::Null
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::SignedInt(42)),
            serde_json::json!(42)
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::String("hello".to_string())),
            serde_json::json!("hello")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::Date(2024, 6, 15)),
            serde_json::json!("2024-06-15")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::Time(1, 2, 3, 0)),
            serde_json::json!("01:02:03")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::Time(12, 34, 56, 789)),
            serde_json::json!("12:34:56.000789")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::DateTime(2024, 1, 2, 3, 4, 5, 0)),
            serde_json::json!("2024-01-02T03:04:05")
        );
        assert_eq!(
            column_value_to_json(&ColumnValue::DateTime(2024, 1, 2, 3, 4, 5, 6)),
            serde_json::json!("2024-01-02T03:04:05.000006")
        );
    }

    #[test]
    fn test_cdc_metadata_schema() {
        let schema = cdc_metadata_schema();
        let expected = [
            ("_table", false),
            ("_op", false),
            ("_ts_ms", false),
            ("_binlog_file", true),
            ("_binlog_pos", true),
            ("_gtid", true),
        ];

        assert_eq!(schema.fields().len(), expected.len());
        for (idx, (name, nullable)) in expected.iter().enumerate() {
            let field = schema.field(idx);
            assert_eq!(field.name(), name);
            assert_eq!(field.is_nullable(), *nullable, "nullability of {name}");
        }
    }

    #[test]
    fn test_base64_encode() {
        // RFC 4648 §10 test vectors.
        assert_eq!(base64_encode(&[]), "");
        assert_eq!(base64_encode(b"f"), "Zg==");
        assert_eq!(base64_encode(b"fo"), "Zm8=");
        assert_eq!(base64_encode(b"foo"), "Zm9v");
        assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
        assert_eq!(base64_encode(b"fooba"), "Zm9vYmE=");
        assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
        // High bits exercise the `+` / `/` end of the alphabet.
        assert_eq!(base64_encode(&[0xff, 0xfe, 0xfd]), "//79");
    }
}