laminar_connectors/cdc/postgres/
changelog.rs1use std::sync::Arc;
13
14use arrow_array::builder::{Int64Builder, StringBuilder, UInt64Builder};
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17
18use super::decoder::{ColumnValue, TupleData};
19use super::lsn::Lsn;
20use super::schema::{cdc_envelope_schema, RelationInfo};
21
/// Kind of row-level change carried by a [`ChangeEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A row was inserted.
    Insert,
    /// A row was updated.
    Update,
    /// A row was deleted.
    Delete,
}

impl CdcOperation {
    /// Returns the one-letter code for this operation: `"I"`, `"U"` or `"D"`.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "I",
            Self::Update => "U",
            Self::Delete => "D",
        }
    }
}
44
45#[derive(Debug, Clone)]
47pub struct ChangeEvent {
48 pub table: String,
50 pub op: CdcOperation,
52 pub lsn: Lsn,
54 pub ts_ms: i64,
56 pub before: Option<String>,
58 pub after: Option<String>,
60}
61
62#[must_use]
70pub fn tuple_to_json(tuple: &TupleData, relation: &RelationInfo) -> String {
71 let mut buf = String::with_capacity(128);
72 buf.push('{');
73 let mut first = true;
74 for (i, col_val) in tuple.columns.iter().enumerate() {
75 if let Some(col_info) = relation.columns.get(i) {
76 let val = match col_val {
77 ColumnValue::Text(s) => Some(s.as_str()),
78 ColumnValue::Null => None,
79 ColumnValue::Unchanged => continue,
80 };
81 if !first {
82 buf.push(',');
83 }
84 first = false;
85 buf.push('"');
87 escape_json_str(&col_info.name, &mut buf);
88 buf.push('"');
89 buf.push(':');
90 match val {
92 Some(s) => {
93 buf.push('"');
94 escape_json_str(s, &mut buf);
95 buf.push('"');
96 }
97 None => buf.push_str("null"),
98 }
99 }
100 }
101 buf.push('}');
102 buf
103}
104
/// Appends `s` to `buf`, escaped for use inside a JSON string literal.
///
/// `"`, `\`, newline, carriage return and tab get their two-character
/// escapes; any other control character is written as `\u` followed by four
/// lowercase hex digits. Everything else is copied through unchanged. The
/// surrounding quotes are not written.
fn escape_json_str(s: &str, buf: &mut String) {
    use std::fmt::Write;

    // Byte offset where the current run of characters needing no escape begins.
    let mut run_start = 0;

    for (idx, ch) in s.char_indices() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };

        if short_escape.is_none() && !ch.is_control() {
            continue;
        }

        // Flush the pending plain run, then emit the escape for `ch`.
        buf.push_str(&s[run_start..idx]);
        match short_escape {
            Some(escaped) => buf.push_str(escaped),
            None => {
                // Writing into a `String` cannot fail; the result is ignored.
                let _ = write!(buf, "\\u{:04x}", ch as u32);
            }
        }
        run_start = idx + ch.len_utf8();
    }

    buf.push_str(&s[run_start..]);
}
122
123pub fn events_to_record_batch(
130 events: &[ChangeEvent],
131) -> Result<RecordBatch, arrow_schema::ArrowError> {
132 let schema: SchemaRef = cdc_envelope_schema();
133
134 let mut table_builder = StringBuilder::with_capacity(events.len(), events.len() * 32);
135 let mut op_builder = StringBuilder::with_capacity(events.len(), events.len());
136 let mut lsn_builder = UInt64Builder::with_capacity(events.len());
137 let mut ts_builder = Int64Builder::with_capacity(events.len());
138 let mut before_builder = StringBuilder::with_capacity(events.len(), events.len() * 64);
139 let mut after_builder = StringBuilder::with_capacity(events.len(), events.len() * 64);
140
141 for event in events {
142 table_builder.append_value(&event.table);
143 op_builder.append_value(event.op.as_str());
144 lsn_builder.append_value(event.lsn.as_u64());
145 ts_builder.append_value(event.ts_ms);
146
147 match &event.before {
148 Some(json) => before_builder.append_value(json),
149 None => before_builder.append_null(),
150 }
151 match &event.after {
152 Some(json) => after_builder.append_value(json),
153 None => after_builder.append_null(),
154 }
155 }
156
157 RecordBatch::try_new(
158 schema,
159 vec![
160 Arc::new(table_builder.finish()),
161 Arc::new(op_builder.finish()),
162 Arc::new(lsn_builder.finish()),
163 Arc::new(ts_builder.finish()),
164 Arc::new(before_builder.finish()),
165 Arc::new(after_builder.finish()),
166 ],
167 )
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::postgres::schema::RelationInfo;
    use crate::cdc::postgres::types::PgColumn;
    use crate::cdc::postgres::types::{INT8_OID, TEXT_OID};

    /// Two-column `public.users` relation (`id`, `name`) used by the JSON tests.
    fn sample_relation() -> RelationInfo {
        let columns = vec![
            PgColumn::new("id".to_string(), INT8_OID, -1, true),
            PgColumn::new("name".to_string(), TEXT_OID, -1, false),
        ];
        RelationInfo {
            relation_id: 16384,
            namespace: "public".to_string(),
            name: "users".to_string(),
            replica_identity: 'd',
            columns,
        }
    }

    /// Runs `tuple_to_json` over `columns` against the sample relation and
    /// parses the output, panicking if it is not valid JSON.
    fn render(columns: Vec<ColumnValue>) -> serde_json::Value {
        let relation = sample_relation();
        let tuple = TupleData { columns };
        let json = tuple_to_json(&tuple, &relation);
        serde_json::from_str(&json).unwrap()
    }

    /// Builds a `users` event from its varying parts.
    fn users_event(
        op: CdcOperation,
        lsn: Lsn,
        ts_ms: i64,
        before: Option<&str>,
        after: Option<&str>,
    ) -> ChangeEvent {
        ChangeEvent {
            table: "users".to_string(),
            op,
            lsn,
            ts_ms,
            before: before.map(str::to_string),
            after: after.map(str::to_string),
        }
    }

    #[test]
    fn test_tuple_to_json() {
        let parsed = render(vec![
            ColumnValue::Text("42".to_string()),
            ColumnValue::Text("Alice".to_string()),
        ]);
        assert_eq!(parsed["id"], "42");
        assert_eq!(parsed["name"], "Alice");
    }

    #[test]
    fn test_tuple_to_json_with_null() {
        let parsed = render(vec![ColumnValue::Text("42".to_string()), ColumnValue::Null]);
        assert_eq!(parsed["id"], "42");
        assert!(parsed["name"].is_null());
    }

    #[test]
    fn test_tuple_to_json_unchanged_omitted() {
        let parsed = render(vec![
            ColumnValue::Text("42".to_string()),
            ColumnValue::Unchanged,
        ]);
        assert_eq!(parsed["id"], "42");
        // An unchanged column must be absent, not present-with-null.
        assert!(parsed.get("name").is_none());
    }

    #[test]
    fn test_events_to_record_batch_insert() {
        let events = vec![users_event(
            CdcOperation::Insert,
            Lsn::new(0x100),
            1_700_000_000_000,
            None,
            Some(r#"{"id":"1","name":"Alice"}"#),
        )];

        let batch = events_to_record_batch(&events).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 6);
    }

    #[test]
    fn test_events_to_record_batch_mixed() {
        let events = vec![
            users_event(
                CdcOperation::Insert,
                Lsn::new(0x100),
                1_700_000_000_000,
                None,
                Some(r#"{"id":"1"}"#),
            ),
            users_event(
                CdcOperation::Update,
                Lsn::new(0x200),
                1_700_000_000_001,
                Some(r#"{"id":"1","name":"Alice"}"#),
                Some(r#"{"id":"1","name":"Bob"}"#),
            ),
            users_event(
                CdcOperation::Delete,
                Lsn::new(0x300),
                1_700_000_000_002,
                Some(r#"{"id":"1"}"#),
                None,
            ),
        ];

        let batch = events_to_record_batch(&events).unwrap();
        assert_eq!(batch.num_rows(), 3);
    }

    #[test]
    fn test_events_to_record_batch_empty() {
        let batch = events_to_record_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 6);
    }

    #[test]
    fn test_cdc_operation_as_str() {
        let expected = [
            (CdcOperation::Insert, "I"),
            (CdcOperation::Update, "U"),
            (CdcOperation::Delete, "D"),
        ];
        for (op, code) in expected {
            assert_eq!(op.as_str(), code);
        }
    }
}