Skip to main content

laminar_connectors/cdc/postgres/
schema.rs

1//! `PostgreSQL` relation schema cache.
2//!
3//! Caches the schema (column metadata) for each relation received in
4//! `pgoutput` Relation messages. Required because DML messages only
5//! reference relations by OID --- the schema must be looked up from this cache.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11
12use super::types::PgColumn;
13
14/// Cached information about a `PostgreSQL` relation (table).
15#[derive(Debug, Clone)]
16pub struct RelationInfo {
17    /// The relation OID from `pgoutput`.
18    pub relation_id: u32,
19
20    /// Schema (namespace) name.
21    pub namespace: String,
22
23    /// Table name.
24    pub name: String,
25
26    /// Replica identity setting: 'd' (default), 'n' (nothing),
27    /// 'f' (full), 'i' (index).
28    pub replica_identity: char,
29
30    /// Column descriptors in ordinal order.
31    pub columns: Vec<PgColumn>,
32}
33
34impl RelationInfo {
35    /// Returns the fully qualified table name: `namespace.name`.
36    #[must_use]
37    pub fn full_name(&self) -> String {
38        if self.namespace.is_empty() || self.namespace == "public" {
39            self.name.clone()
40        } else {
41            format!("{}.{}", self.namespace, self.name)
42        }
43    }
44
45    /// Generates an Arrow schema from the relation's columns.
46    #[must_use]
47    pub fn arrow_schema(&self) -> SchemaRef {
48        let fields: Vec<Field> = self
49            .columns
50            .iter()
51            .map(|col| {
52                // All CDC columns are nullable since DELETE may have
53                // unchanged TOAST values or partial replica identity.
54                Field::new(&col.name, col.arrow_type(), true)
55            })
56            .collect();
57        Arc::new(Schema::new(fields))
58    }
59
60    /// Returns the key column names based on replica identity.
61    #[must_use]
62    pub fn key_columns(&self) -> Vec<&str> {
63        self.columns
64            .iter()
65            .filter(|c| c.is_key)
66            .map(|c| c.name.as_str())
67            .collect()
68    }
69}
70
71/// Cache of relation schemas received from `pgoutput` Relation messages.
72///
73/// The decoder populates this cache as it encounters Relation messages.
74/// DML decoders look up column metadata by relation ID.
75#[derive(Debug, Clone, Default)]
76pub struct RelationCache {
77    relations: HashMap<u32, RelationInfo>,
78}
79
80impl RelationCache {
81    /// Creates an empty relation cache.
82    #[must_use]
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Adds or replaces a relation in the cache.
88    pub fn insert(&mut self, info: RelationInfo) {
89        self.relations.insert(info.relation_id, info);
90    }
91
92    /// Looks up a relation by its OID.
93    #[must_use]
94    pub fn get(&self, relation_id: u32) -> Option<&RelationInfo> {
95        self.relations.get(&relation_id)
96    }
97
98    /// Returns the number of cached relations.
99    #[must_use]
100    pub fn len(&self) -> usize {
101        self.relations.len()
102    }
103
104    /// Returns `true` if no relations are cached.
105    #[must_use]
106    pub fn is_empty(&self) -> bool {
107        self.relations.is_empty()
108    }
109
110    /// Clears the cache.
111    pub fn clear(&mut self) {
112        self.relations.clear();
113    }
114}
115
116/// Builds the CDC envelope schema used by [`PostgresCdcSource`](super::source::PostgresCdcSource).
117///
118/// This schema wraps change events in a uniform envelope with metadata
119/// columns, making it compatible with any table structure.
120#[must_use]
121pub fn cdc_envelope_schema() -> SchemaRef {
122    Arc::new(Schema::new(vec![
123        Field::new("_table", DataType::Utf8, false),
124        Field::new("_op", DataType::Utf8, false),
125        Field::new("_lsn", DataType::UInt64, false),
126        Field::new("_ts_ms", DataType::Int64, false),
127        Field::new("_before", DataType::Utf8, true),
128        Field::new("_after", DataType::Utf8, true),
129    ]))
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};

    /// Relation OID assigned to the fixture built by `sample_relation`.
    const SAMPLE_OID: u32 = 16384;

    /// Builds a three-column `public.users` relation whose only key column
    /// is `id`.
    fn sample_relation() -> RelationInfo {
        RelationInfo {
            relation_id: SAMPLE_OID,
            namespace: "public".to_string(),
            name: "users".to_string(),
            replica_identity: 'd',
            columns: vec![
                PgColumn::new("id".to_string(), INT8_OID, -1, true),
                PgColumn::new("name".to_string(), TEXT_OID, -1, false),
                PgColumn::new("age".to_string(), INT4_OID, -1, false),
            ],
        }
    }

    #[test]
    fn test_relation_full_name_public() {
        let rel = sample_relation();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_empty_namespace() {
        // An empty namespace takes the same branch as `public`.
        let mut rel = sample_relation();
        rel.namespace = String::new();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_custom_schema() {
        let mut rel = sample_relation();
        rel.namespace = "app".to_string();
        assert_eq!(rel.full_name(), "app.users");
    }

    #[test]
    fn test_arrow_schema_generation() {
        let rel = sample_relation();
        let schema = rel.arrow_schema();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(*schema.field(0).data_type(), DataType::Int64);
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(*schema.field(1).data_type(), DataType::Utf8);
        assert_eq!(schema.field(2).name(), "age");
        // Every generated field is nullable, key columns included.
        for index in 0..schema.fields().len() {
            assert!(schema.field(index).is_nullable());
        }
    }

    #[test]
    fn test_key_columns() {
        let rel = sample_relation();
        let keys = rel.key_columns();
        assert_eq!(keys, vec!["id"]);
    }

    #[test]
    fn test_relation_without_columns() {
        // A relation with no columns yields an empty schema and no keys.
        let mut rel = sample_relation();
        rel.columns.clear();
        assert!(rel.key_columns().is_empty());
        assert_eq!(rel.arrow_schema().fields().len(), 0);
    }

    #[test]
    fn test_relation_cache() {
        let mut cache = RelationCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.insert(sample_relation());
        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);
        assert!(cache.get(99999).is_none());

        // The lookup must hand back the entry that was inserted.
        let cached = cache.get(SAMPLE_OID).expect("relation was just inserted");
        assert_eq!(cached.relation_id, SAMPLE_OID);
        assert_eq!(cached.name, "users");
    }

    #[test]
    fn test_cache_replace() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());

        let mut updated = sample_relation();
        updated
            .columns
            .push(PgColumn::new("email".to_string(), TEXT_OID, -1, false));
        cache.insert(updated);

        // Same OID: the entry is replaced rather than duplicated.
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(SAMPLE_OID).unwrap().columns.len(), 4);
    }

    #[test]
    fn test_cache_clear() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());
        cache.clear();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert!(cache.get(SAMPLE_OID).is_none());
    }

    #[test]
    fn test_cdc_envelope_schema() {
        let schema = cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 6);

        // (name, type, nullable) for each envelope column, in order.
        let expected = vec![
            ("_table", DataType::Utf8, false),
            ("_op", DataType::Utf8, false),
            ("_lsn", DataType::UInt64, false),
            ("_ts_ms", DataType::Int64, false),
            ("_before", DataType::Utf8, true),
            ("_after", DataType::Utf8, true),
        ];
        for (index, (name, data_type, nullable)) in expected.iter().enumerate() {
            let field = schema.field(index);
            assert_eq!(field.name(), name);
            assert_eq!(field.data_type(), data_type);
            assert_eq!(field.is_nullable(), *nullable);
        }
    }
}