1use std::sync::Arc;
7
8use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
9use arrow_schema::{DataType, Field, Schema};
10
11use super::decoder::{
12 ColumnValue, DeleteMessage, InsertMessage, RowData, UpdateMessage, UpdateRowData,
13};
14use super::schema::TableInfo;
15
/// Kind of row-level change carried by a [`ChangeEvent`].
///
/// An update is split into two events: one carrying the row image before the
/// change and one carrying the row image after it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A row was inserted.
    Insert,
    /// Row image as it was before an update.
    UpdateBefore,
    /// Row image as it is after an update.
    UpdateAfter,
    /// A row was deleted.
    Delete,
}

impl CdcOperation {
    /// Returns the short textual code of the operation:
    /// `"I"`, `"U-"`, `"U+"` or `"D"`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Insert => "I",
            Self::UpdateBefore => "U-",
            Self::UpdateAfter => "U+",
            Self::Delete => "D",
        }
    }

    /// Returns the signed weight of the operation: `1` for `Insert` and
    /// `UpdateAfter`, `-1` for `Delete` and `UpdateBefore`.
    #[must_use]
    pub fn weight(&self) -> i8 {
        match *self {
            Self::UpdateBefore | Self::Delete => -1,
            Self::Insert | Self::UpdateAfter => 1,
        }
    }
}
50
51#[derive(Debug, Clone)]
53pub struct ChangeEvent {
54 pub table: String,
56 pub operation: CdcOperation,
58 pub timestamp_ms: i64,
60 pub binlog_file: String,
62 pub binlog_position: u64,
64 pub gtid: Option<String>,
66 pub row: RowData,
68}
69
70impl ChangeEvent {
71 #[must_use]
73 pub fn insert(
74 table: String,
75 timestamp_ms: i64,
76 binlog_file: String,
77 binlog_position: u64,
78 gtid: Option<String>,
79 row: RowData,
80 ) -> Self {
81 Self {
82 table,
83 operation: CdcOperation::Insert,
84 timestamp_ms,
85 binlog_file,
86 binlog_position,
87 gtid,
88 row,
89 }
90 }
91
92 #[must_use]
94 pub fn delete(
95 table: String,
96 timestamp_ms: i64,
97 binlog_file: String,
98 binlog_position: u64,
99 gtid: Option<String>,
100 row: RowData,
101 ) -> Self {
102 Self {
103 table,
104 operation: CdcOperation::Delete,
105 timestamp_ms,
106 binlog_file,
107 binlog_position,
108 gtid,
109 row,
110 }
111 }
112}
113
114#[must_use]
116pub fn insert_to_events(
117 msg: &InsertMessage,
118 binlog_file: &str,
119 gtid: Option<&str>,
120) -> Vec<ChangeEvent> {
121 let table = format!("{}.{}", msg.database, msg.table);
122
123 msg.rows
124 .iter()
125 .map(|row| {
126 ChangeEvent::insert(
127 table.clone(),
128 msg.timestamp_ms,
129 binlog_file.to_string(),
130 msg.binlog_position,
131 gtid.map(String::from),
132 row.clone(),
133 )
134 })
135 .collect()
136}
137
138#[must_use]
144pub fn update_to_events(
145 msg: &UpdateMessage,
146 binlog_file: &str,
147 gtid: Option<&str>,
148) -> Vec<ChangeEvent> {
149 let table = format!("{}.{}", msg.database, msg.table);
150 let mut events = Vec::with_capacity(msg.rows.len() * 2);
151
152 for UpdateRowData { before, after } in &msg.rows {
153 events.push(ChangeEvent {
155 table: table.clone(),
156 operation: CdcOperation::UpdateBefore,
157 timestamp_ms: msg.timestamp_ms,
158 binlog_file: binlog_file.to_string(),
159 binlog_position: msg.binlog_position,
160 gtid: gtid.map(String::from),
161 row: before.clone(),
162 });
163
164 events.push(ChangeEvent {
166 table: table.clone(),
167 operation: CdcOperation::UpdateAfter,
168 timestamp_ms: msg.timestamp_ms,
169 binlog_file: binlog_file.to_string(),
170 binlog_position: msg.binlog_position,
171 gtid: gtid.map(String::from),
172 row: after.clone(),
173 });
174 }
175
176 events
177}
178
179#[must_use]
181pub fn delete_to_events(
182 msg: &DeleteMessage,
183 binlog_file: &str,
184 gtid: Option<&str>,
185) -> Vec<ChangeEvent> {
186 let table = format!("{}.{}", msg.database, msg.table);
187
188 msg.rows
189 .iter()
190 .map(|row| {
191 ChangeEvent::delete(
192 table.clone(),
193 msg.timestamp_ms,
194 binlog_file.to_string(),
195 msg.binlog_position,
196 gtid.map(String::from),
197 row.clone(),
198 )
199 })
200 .collect()
201}
202
203#[must_use]
205pub fn cdc_metadata_schema() -> Schema {
206 use arrow_schema::TimeUnit;
207 Schema::new(vec![
208 Field::new("_table", DataType::Utf8, false),
209 Field::new("_op", DataType::Utf8, false),
210 Field::new(
211 "_ts_ms",
212 DataType::Timestamp(TimeUnit::Millisecond, None),
213 false,
214 ),
215 Field::new("_binlog_file", DataType::Utf8, true),
216 Field::new("_binlog_pos", DataType::UInt64, true),
217 Field::new("_gtid", DataType::Utf8, true),
218 ])
219}
220
221pub fn events_to_record_batch(
227 events: &[ChangeEvent],
228 table_info: &TableInfo,
229) -> Result<RecordBatch, arrow_schema::ArrowError> {
230 if events.is_empty() {
231 let schema = build_cdc_schema(&table_info.arrow_schema);
233 return Ok(RecordBatch::new_empty(Arc::new(schema)));
234 }
235
236 let _n = events.len();
237
238 let tables: Vec<&str> = events.iter().map(|e| e.table.as_str()).collect();
240 let ops: Vec<&str> = events.iter().map(|e| e.operation.as_str()).collect();
241 let timestamps: Vec<i64> = events.iter().map(|e| e.timestamp_ms).collect();
242 let binlog_files: Vec<Option<&str>> = events
243 .iter()
244 .map(|e| Some(e.binlog_file.as_str()))
245 .collect();
246 let binlog_positions: Vec<Option<u64>> =
247 events.iter().map(|e| Some(e.binlog_position)).collect();
248 let gtids: Vec<Option<&str>> = events.iter().map(|e| e.gtid.as_deref()).collect();
249
250 let mut columns: Vec<ArrayRef> = vec![
251 Arc::new(StringArray::from(tables)),
252 Arc::new(StringArray::from(ops)),
253 Arc::new(arrow_array::TimestampMillisecondArray::from(timestamps)),
254 Arc::new(StringArray::from(binlog_files)),
255 Arc::new(UInt64Array::from(binlog_positions)),
256 Arc::new(StringArray::from(gtids)),
257 ];
258
259 for (col_idx, _col_def) in table_info.columns.iter().enumerate() {
261 let values: Vec<Option<String>> = events
262 .iter()
263 .map(|e| {
264 e.row.columns.get(col_idx).and_then(|v| {
265 if v.is_null() {
266 None
267 } else {
268 Some(v.to_text())
269 }
270 })
271 })
272 .collect();
273
274 let array: ArrayRef = Arc::new(StringArray::from(
277 values.iter().map(|v| v.as_deref()).collect::<Vec<_>>(),
278 ));
279 columns.push(array);
280 }
281
282 let schema = build_cdc_schema(&table_info.arrow_schema);
283 RecordBatch::try_new(Arc::new(schema), columns)
284}
285
286fn build_cdc_schema(row_schema: &Schema) -> Schema {
288 use arrow_schema::TimeUnit;
289 let mut fields = vec![
290 Field::new("_table", DataType::Utf8, false),
291 Field::new("_op", DataType::Utf8, false),
292 Field::new(
293 "_ts_ms",
294 DataType::Timestamp(TimeUnit::Millisecond, None),
295 false,
296 ),
297 Field::new("_binlog_file", DataType::Utf8, true),
298 Field::new("_binlog_pos", DataType::UInt64, true),
299 Field::new("_gtid", DataType::Utf8, true),
300 ];
301
302 for field in row_schema.fields() {
304 fields.push(Field::new(field.name(), DataType::Utf8, true));
305 }
306
307 Schema::new(fields)
308}
309
310#[must_use]
312pub fn column_value_to_json(value: &ColumnValue) -> serde_json::Value {
313 match value {
314 ColumnValue::Null => serde_json::Value::Null,
315 ColumnValue::SignedInt(v) => serde_json::json!(v),
316 ColumnValue::UnsignedInt(v) => serde_json::json!(v),
317 ColumnValue::Float(v) => serde_json::json!(v),
318 ColumnValue::Double(v) => serde_json::json!(v),
319 ColumnValue::String(s) => serde_json::json!(s),
320 ColumnValue::Bytes(b) => serde_json::json!(base64_encode(b)),
321 ColumnValue::Date(y, m, d) => serde_json::json!(format!("{y:04}-{m:02}-{d:02}")),
322 ColumnValue::Time(h, m, s, us) => {
323 if *us > 0 {
324 serde_json::json!(format!("{h:02}:{m:02}:{s:02}.{us:06}"))
325 } else {
326 serde_json::json!(format!("{h:02}:{m:02}:{s:02}"))
327 }
328 }
329 ColumnValue::DateTime(y, mo, d, h, mi, s, us) => {
330 if *us > 0 {
331 serde_json::json!(format!(
332 "{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}.{us:06}"
333 ))
334 } else {
335 serde_json::json!(format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}"))
336 }
337 }
338 ColumnValue::Timestamp(us) => serde_json::json!(us),
339 ColumnValue::Json(s) => serde_json::from_str(s).unwrap_or_else(|e| {
340 tracing::warn!(
341 error = %e,
342 json_len = s.len(),
343 "[LDB-4003] MySQL CDC JSON column parse failed, treating as string literal"
344 );
345 serde_json::json!(s)
346 }),
347 }
348}
349
350#[must_use]
352pub fn row_to_json(row: &RowData, columns: &[super::types::MySqlColumn]) -> serde_json::Value {
353 let mut obj = serde_json::Map::new();
354 for (i, col) in columns.iter().enumerate() {
355 if let Some(value) = row.columns.get(i) {
356 obj.insert(col.name.clone(), column_value_to_json(value));
357 }
358 }
359 serde_json::Value::Object(obj)
360}
361
/// Encodes `data` as standard base64 (alphabet `A-Z a-z 0-9 + /`) with `=`
/// padding.
///
/// Returns an empty string for empty input. The output length is always a
/// multiple of four.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const PAD: char = '=';

    // Each group of up to three input bytes produces exactly four characters,
    // so the final size is known up front.
    let mut encoded = String::with_capacity(data.len().div_ceil(3) * 4);

    for chunk in data.chunks(3) {
        // `chunks` never yields an empty slice, so index 0 always exists;
        // missing trailing bytes are treated as zero bits.
        let b0 = usize::from(chunk[0]);
        let b1 = chunk.get(1).map_or(0, |&b| usize::from(b));
        let b2 = chunk.get(2).map_or(0, |&b| usize::from(b));

        encoded.push(char::from(ALPHABET[b0 >> 2]));
        encoded.push(char::from(ALPHABET[((b0 & 0x03) << 4) | (b1 >> 4)]));
        encoded.push(if chunk.len() > 1 {
            char::from(ALPHABET[((b1 & 0x0f) << 2) | (b2 >> 6)])
        } else {
            PAD
        });
        encoded.push(if chunk.len() > 2 {
            char::from(ALPHABET[b2 & 0x3f])
        } else {
            PAD
        });
    }

    encoded
}
390
#[cfg(test)]
mod tests {
    use super::*;

    // Shared fixture values used by the message-based tests.
    const BINLOG_FILE: &str = "mysql-bin.000003";
    const BINLOG_POSITION: u64 = 12345;
    const TIMESTAMP_MS: i64 = 1_704_067_200_000;

    /// Sample row with an integer, a string and a NULL column.
    fn make_test_row() -> RowData {
        let columns = vec![
            ColumnValue::SignedInt(1),
            ColumnValue::String(String::from("Alice")),
            ColumnValue::Null,
        ];
        RowData { columns }
    }

    /// Single-column row holding one string value.
    fn text_row(value: &str) -> RowData {
        RowData {
            columns: vec![ColumnValue::String(value.to_owned())],
        }
    }

    #[test]
    fn test_cdc_operation_as_str() {
        let expected = [
            (CdcOperation::Insert, "I"),
            (CdcOperation::UpdateBefore, "U-"),
            (CdcOperation::UpdateAfter, "U+"),
            (CdcOperation::Delete, "D"),
        ];
        for (op, code) in &expected {
            assert_eq!(op.as_str(), *code);
        }
    }

    #[test]
    fn test_cdc_operation_weight() {
        let expected = [
            (CdcOperation::Insert, 1),
            (CdcOperation::UpdateAfter, 1),
            (CdcOperation::Delete, -1),
            (CdcOperation::UpdateBefore, -1),
        ];
        for (op, weight) in &expected {
            assert_eq!(op.weight(), *weight);
        }
    }

    #[test]
    fn test_change_event_insert() {
        let event = ChangeEvent::insert(
            String::from("db.users"),
            TIMESTAMP_MS,
            BINLOG_FILE.to_owned(),
            BINLOG_POSITION,
            Some(String::from("gtid-123")),
            make_test_row(),
        );

        assert_eq!(event.table, "db.users");
        assert_eq!(event.operation, CdcOperation::Insert);
        assert_eq!(event.timestamp_ms, TIMESTAMP_MS);
    }

    #[test]
    fn test_insert_to_events() {
        let msg = InsertMessage {
            table_id: 100,
            database: String::from("testdb"),
            table: String::from("users"),
            rows: vec![make_test_row(), make_test_row()],
            binlog_position: BINLOG_POSITION,
            timestamp_ms: TIMESTAMP_MS,
        };

        let events = insert_to_events(&msg, BINLOG_FILE, Some("gtid-1"));

        assert_eq!(events.len(), 2);
        assert!(events.iter().all(|e| e.operation == CdcOperation::Insert));
    }

    #[test]
    fn test_update_to_events() {
        let msg = UpdateMessage {
            table_id: 100,
            database: String::from("testdb"),
            table: String::from("users"),
            rows: vec![UpdateRowData {
                before: text_row("old"),
                after: text_row("new"),
            }],
            binlog_position: BINLOG_POSITION,
            timestamp_ms: TIMESTAMP_MS,
        };

        let events = update_to_events(&msg, BINLOG_FILE, None);

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].operation, CdcOperation::UpdateBefore);
        assert_eq!(events[1].operation, CdcOperation::UpdateAfter);
    }

    #[test]
    fn test_delete_to_events() {
        let msg = DeleteMessage {
            table_id: 100,
            database: String::from("testdb"),
            table: String::from("users"),
            rows: vec![make_test_row()],
            binlog_position: BINLOG_POSITION,
            timestamp_ms: TIMESTAMP_MS,
        };

        let events = delete_to_events(&msg, BINLOG_FILE, Some("gtid-1"));

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].operation, CdcOperation::Delete);
    }

    #[test]
    fn test_column_value_to_json() {
        let cases = [
            (ColumnValue::Null, serde_json::Value::Null),
            (ColumnValue::SignedInt(42), serde_json::json!(42)),
            (
                ColumnValue::String(String::from("hello")),
                serde_json::json!("hello"),
            ),
            (ColumnValue::Date(2024, 6, 15), serde_json::json!("2024-06-15")),
        ];
        for (input, expected) in &cases {
            assert_eq!(&column_value_to_json(input), expected);
        }
    }

    #[test]
    fn test_cdc_metadata_schema() {
        let schema = cdc_metadata_schema();

        assert_eq!(schema.fields().len(), 6);
        assert_eq!(schema.field(0).name(), "_table");
        assert_eq!(schema.field(1).name(), "_op");
    }

    #[test]
    fn test_base64_encode() {
        let vectors: [(&[u8], &str); 5] = [
            (b"", ""),
            (b"f", "Zg=="),
            (b"fo", "Zm8="),
            (b"foo", "Zm9v"),
            (b"foob", "Zm9vYg=="),
        ];
        for (input, expected) in &vectors {
            assert_eq!(base64_encode(input), *expected);
        }
    }
}