Skip to main content

laminar_connectors/cdc/postgres/
changelog.rs

//! Z-set changelog integration for `PostgreSQL` CDC events.
//!
//! Converts decoded `pgoutput` DML messages into change events compatible
//! with `LaminarDB`'s changelog/retraction model (Z-sets).
//!
//! # Z-Set Semantics
//!
//! - INSERT → weight +1 (new row)
//! - DELETE → weight -1 (retracted row)
//! - UPDATE → weight -1 (old row) + weight +1 (new row)
11
12use std::sync::Arc;
13
14use arrow_array::builder::{Int64Builder, StringBuilder, UInt64Builder};
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17
18use super::decoder::{ColumnValue, TupleData};
19use super::lsn::Lsn;
20use super::schema::{cdc_envelope_schema, RelationInfo};
21
/// The kind of row-level change carried by a CDC event.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CdcOperation {
    /// A new row was inserted (Z-set weight +1).
    Insert,
    /// An existing row was updated (yields a -1/+1 pair).
    Update,
    /// A row was deleted (Z-set weight -1).
    Delete,
}

impl CdcOperation {
    /// Returns the one-letter code for this operation: `"I"` for insert,
    /// `"U"` for update, `"D"` for delete.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Insert => "I",
            Self::Update => "U",
            Self::Delete => "D",
        }
    }
}
44
45/// A single change event from CDC.
46#[derive(Debug, Clone)]
47pub struct ChangeEvent {
48    /// Fully qualified table name.
49    pub table: String,
50    /// The operation type.
51    pub op: CdcOperation,
52    /// WAL position of this change.
53    pub lsn: Lsn,
54    /// Commit timestamp in milliseconds since Unix epoch.
55    pub ts_ms: i64,
56    /// Old row values as JSON (for UPDATE and DELETE).
57    pub before: Option<String>,
58    /// New row values as JSON (for INSERT and UPDATE).
59    pub after: Option<String>,
60}
61
62/// Converts tuple data to a JSON string using column names from the relation.
63///
64/// Produces a flat JSON object like `{"id": "42", "name": "Alice"}`.
65/// All values are serialized as strings (matching `pgoutput` text format).
66///
67/// Writes JSON directly to a `String` buffer instead of building an
68/// intermediate `HashMap`, avoiding per-row map + key/value cloning.
69#[must_use]
70pub fn tuple_to_json(tuple: &TupleData, relation: &RelationInfo) -> String {
71    let mut buf = String::with_capacity(128);
72    buf.push('{');
73    let mut first = true;
74    for (i, col_val) in tuple.columns.iter().enumerate() {
75        if let Some(col_info) = relation.columns.get(i) {
76            let val = match col_val {
77                ColumnValue::Text(s) => Some(s.as_str()),
78                ColumnValue::Null => None,
79                ColumnValue::Unchanged => continue,
80            };
81            if !first {
82                buf.push(',');
83            }
84            first = false;
85            // Write key (escape quotes in column names)
86            buf.push('"');
87            escape_json_str(&col_info.name, &mut buf);
88            buf.push('"');
89            buf.push(':');
90            // Write value
91            match val {
92                Some(s) => {
93                    buf.push('"');
94                    escape_json_str(s, &mut buf);
95                    buf.push('"');
96                }
97                None => buf.push_str("null"),
98            }
99        }
100    }
101    buf.push('}');
102    buf
103}
104
/// Appends `s` to `buf`, escaped for use inside a JSON string literal.
///
/// Double quotes, backslashes, newlines, carriage returns and tabs get their
/// two-character escapes; any other control character is written as a
/// `\uXXXX` sequence. Everything else is copied through unchanged. The
/// surrounding quotes are NOT added here.
fn escape_json_str(s: &str, buf: &mut String) {
    use std::fmt::Write as _;

    for ch in s.chars() {
        // Characters with a dedicated short escape.
        let short = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };

        if let Some(escaped) = short {
            buf.push_str(escaped);
        } else if ch.is_control() {
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = write!(buf, "\\u{:04x}", u32::from(ch));
        } else {
            buf.push(ch);
        }
    }
}
122
123/// Converts a batch of [`ChangeEvent`]s into an Arrow [`RecordBatch`]
124/// using the CDC envelope schema.
125///
126/// # Errors
127///
128/// Returns an error if the Arrow batch construction fails.
129pub fn events_to_record_batch(
130    events: &[ChangeEvent],
131) -> Result<RecordBatch, arrow_schema::ArrowError> {
132    let schema: SchemaRef = cdc_envelope_schema();
133
134    let mut table_builder = StringBuilder::with_capacity(events.len(), events.len() * 32);
135    let mut op_builder = StringBuilder::with_capacity(events.len(), events.len());
136    let mut lsn_builder = UInt64Builder::with_capacity(events.len());
137    let mut ts_builder = Int64Builder::with_capacity(events.len());
138    let mut before_builder = StringBuilder::with_capacity(events.len(), events.len() * 64);
139    let mut after_builder = StringBuilder::with_capacity(events.len(), events.len() * 64);
140
141    for event in events {
142        table_builder.append_value(&event.table);
143        op_builder.append_value(event.op.as_str());
144        lsn_builder.append_value(event.lsn.as_u64());
145        ts_builder.append_value(event.ts_ms);
146
147        match &event.before {
148            Some(json) => before_builder.append_value(json),
149            None => before_builder.append_null(),
150        }
151        match &event.after {
152            Some(json) => after_builder.append_value(json),
153            None => after_builder.append_null(),
154        }
155    }
156
157    RecordBatch::try_new(
158        schema,
159        vec![
160            Arc::new(table_builder.finish()),
161            Arc::new(op_builder.finish()),
162            Arc::new(lsn_builder.finish()),
163            Arc::new(ts_builder.finish()),
164            Arc::new(before_builder.finish()),
165            Arc::new(after_builder.finish()),
166        ],
167    )
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::postgres::schema::RelationInfo;
    use crate::cdc::postgres::types::PgColumn;
    use crate::cdc::postgres::types::{INT8_OID, TEXT_OID};

    /// A two-column `public.users` relation: `id` and `name`.
    fn sample_relation() -> RelationInfo {
        let columns = vec![
            PgColumn::new("id".to_string(), INT8_OID, -1, true),
            PgColumn::new("name".to_string(), TEXT_OID, -1, false),
        ];
        RelationInfo {
            relation_id: 16384,
            namespace: "public".to_string(),
            name: "users".to_string(),
            replica_identity: 'd',
            columns,
        }
    }

    /// Runs `tuple_to_json` against the sample relation and parses the result.
    fn render(columns: Vec<ColumnValue>) -> serde_json::Value {
        let json = tuple_to_json(&TupleData { columns }, &sample_relation());
        serde_json::from_str(&json).unwrap()
    }

    /// Shorthand for a text column value.
    fn text(value: &str) -> ColumnValue {
        ColumnValue::Text(value.to_string())
    }

    /// Builds a `users` change event with the given payload.
    fn change(
        op: CdcOperation,
        lsn: Lsn,
        ts_ms: i64,
        before: Option<&str>,
        after: Option<&str>,
    ) -> ChangeEvent {
        ChangeEvent {
            table: "users".to_string(),
            op,
            lsn,
            ts_ms,
            before: before.map(str::to_string),
            after: after.map(str::to_string),
        }
    }

    #[test]
    fn test_tuple_to_json() {
        let parsed = render(vec![text("42"), text("Alice")]);
        assert_eq!(parsed["id"], "42");
        assert_eq!(parsed["name"], "Alice");
    }

    #[test]
    fn test_tuple_to_json_with_null() {
        let parsed = render(vec![text("42"), ColumnValue::Null]);
        assert_eq!(parsed["id"], "42");
        assert!(parsed["name"].is_null());
    }

    #[test]
    fn test_tuple_to_json_unchanged_omitted() {
        let parsed = render(vec![text("42"), ColumnValue::Unchanged]);
        assert_eq!(parsed["id"], "42");
        // Unchanged columns must not appear in the object at all.
        assert!(parsed.get("name").is_none());
    }

    #[test]
    fn test_events_to_record_batch_insert() {
        let events = vec![change(
            CdcOperation::Insert,
            Lsn::new(0x100),
            1_700_000_000_000,
            None,
            Some(r#"{"id":"1","name":"Alice"}"#),
        )];

        let batch = events_to_record_batch(&events).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 6);
    }

    #[test]
    fn test_events_to_record_batch_mixed() {
        let events = vec![
            change(
                CdcOperation::Insert,
                Lsn::new(0x100),
                1_700_000_000_000,
                None,
                Some(r#"{"id":"1"}"#),
            ),
            change(
                CdcOperation::Update,
                Lsn::new(0x200),
                1_700_000_000_001,
                Some(r#"{"id":"1","name":"Alice"}"#),
                Some(r#"{"id":"1","name":"Bob"}"#),
            ),
            change(
                CdcOperation::Delete,
                Lsn::new(0x300),
                1_700_000_000_002,
                Some(r#"{"id":"1"}"#),
                None,
            ),
        ];

        let batch = events_to_record_batch(&events).unwrap();
        assert_eq!(batch.num_rows(), 3);
    }

    #[test]
    fn test_events_to_record_batch_empty() {
        let batch = events_to_record_batch(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 6);
    }

    #[test]
    fn test_cdc_operation_as_str() {
        let expected = [
            (CdcOperation::Insert, "I"),
            (CdcOperation::Update, "U"),
            (CdcOperation::Delete, "D"),
        ];
        for (op, code) in expected {
            assert_eq!(op.as_str(), code);
        }
    }
}