Skip to main content

laminar_connectors/cdc/postgres/
schema.rs

//! `PostgreSQL` relation schema cache.
//!
//! Caches the schema (column metadata) for each relation received in
//! `pgoutput` Relation messages. This is required because DML messages only
//! reference relations by OID, so the schema must be looked up from this cache.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11
12use super::types::PgColumn;
13
14/// Cached information about a `PostgreSQL` relation (table).
15#[derive(Debug, Clone)]
16pub struct RelationInfo {
17    /// The relation OID from `pgoutput`.
18    pub relation_id: u32,
19
20    /// Schema (namespace) name.
21    pub namespace: String,
22
23    /// Table name.
24    pub name: String,
25
26    /// Replica identity setting: 'd' (default), 'n' (nothing),
27    /// 'f' (full), 'i' (index).
28    pub replica_identity: char,
29
30    /// Column descriptors in ordinal order.
31    pub columns: Vec<PgColumn>,
32}
33
34impl RelationInfo {
35    /// Returns the fully qualified table name: `namespace.name`.
36    #[must_use]
37    pub fn full_name(&self) -> String {
38        if self.namespace.is_empty() || self.namespace == "public" {
39            self.name.clone()
40        } else {
41            format!("{}.{}", self.namespace, self.name)
42        }
43    }
44
45    /// Generates an Arrow schema from the relation's columns.
46    #[must_use]
47    pub fn arrow_schema(&self) -> SchemaRef {
48        let fields: Vec<Field> = self
49            .columns
50            .iter()
51            .map(|col| {
52                // All CDC columns are nullable since DELETE may have
53                // unchanged TOAST values or partial replica identity.
54                Field::new(&col.name, col.arrow_type(), true)
55            })
56            .collect();
57        Arc::new(Schema::new(fields))
58    }
59
60    /// Returns the key column names based on replica identity.
61    #[must_use]
62    pub fn key_columns(&self) -> Vec<&str> {
63        self.columns
64            .iter()
65            .filter(|c| c.is_key)
66            .map(|c| c.name.as_str())
67            .collect()
68    }
69}
70
71/// Cache of relation schemas received from `pgoutput` Relation messages.
72///
73/// The decoder populates this cache as it encounters Relation messages.
74/// DML decoders look up column metadata by relation ID.
75#[derive(Debug, Clone, Default)]
76pub struct RelationCache {
77    relations: HashMap<u32, RelationInfo>,
78}
79
80impl RelationCache {
81    /// Creates an empty relation cache.
82    #[must_use]
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Adds or replaces a relation in the cache.
88    pub fn insert(&mut self, info: RelationInfo) {
89        self.relations.insert(info.relation_id, info);
90    }
91
92    /// Looks up a relation by its OID.
93    #[must_use]
94    pub fn get(&self, relation_id: u32) -> Option<&RelationInfo> {
95        self.relations.get(&relation_id)
96    }
97
98    /// Returns the number of cached relations.
99    #[must_use]
100    pub fn len(&self) -> usize {
101        self.relations.len()
102    }
103
104    /// Returns `true` if no relations are cached.
105    #[must_use]
106    pub fn is_empty(&self) -> bool {
107        self.relations.is_empty()
108    }
109
110    /// Clears the cache.
111    pub fn clear(&mut self) {
112        self.relations.clear();
113    }
114}
115
116/// Builds the CDC envelope schema used by [`PostgresCdcSource`](super::source::PostgresCdcSource).
117///
118/// This schema wraps change events in a uniform envelope with metadata
119/// columns, making it compatible with any table structure.
120#[must_use]
121pub fn cdc_envelope_schema() -> SchemaRef {
122    use arrow_schema::TimeUnit;
123    Arc::new(Schema::new(vec![
124        Field::new("_table", DataType::Utf8, false),
125        Field::new("_op", DataType::Utf8, false),
126        Field::new("_lsn", DataType::UInt64, false),
127        Field::new(
128            "_ts_ms",
129            DataType::Timestamp(TimeUnit::Millisecond, None),
130            false,
131        ),
132        Field::new("_before", DataType::Utf8, true),
133        Field::new("_after", DataType::Utf8, true),
134    ]))
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};

    /// `public.users` (OID 16384) with a key column `id` followed by the
    /// non-key columns `name` and `age`.
    fn sample_relation() -> RelationInfo {
        RelationInfo {
            relation_id: 16384,
            namespace: "public".to_string(),
            name: "users".to_string(),
            replica_identity: 'd',
            columns: vec![
                PgColumn::new("id".to_string(), INT8_OID, -1, true),
                PgColumn::new("name".to_string(), TEXT_OID, -1, false),
                PgColumn::new("age".to_string(), INT4_OID, -1, false),
            ],
        }
    }

    #[test]
    fn test_relation_full_name_public() {
        let rel = sample_relation();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_empty_namespace() {
        // An empty namespace is treated like `public`: no qualifier.
        let mut rel = sample_relation();
        rel.namespace = String::new();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_custom_schema() {
        let mut rel = sample_relation();
        rel.namespace = "app".to_string();
        assert_eq!(rel.full_name(), "app.users");
    }

    #[test]
    fn test_arrow_schema_generation() {
        let rel = sample_relation();
        let schema = rel.arrow_schema();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(*schema.field(0).data_type(), DataType::Int64);
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(*schema.field(1).data_type(), DataType::Utf8);
        assert_eq!(schema.field(2).name(), "age");
    }

    #[test]
    fn test_arrow_schema_all_nullable() {
        // CDC columns are always nullable, even key columns.
        let schema = sample_relation().arrow_schema();
        assert!(schema.fields().iter().all(|f| f.is_nullable()));
    }

    #[test]
    fn test_key_columns() {
        let rel = sample_relation();
        let keys = rel.key_columns();
        assert_eq!(keys, vec!["id"]);
    }

    #[test]
    fn test_key_columns_none_flagged() {
        let mut rel = sample_relation();
        rel.columns = vec![PgColumn::new("name".to_string(), TEXT_OID, -1, false)];
        assert!(rel.key_columns().is_empty());
    }

    #[test]
    fn test_relation_cache() {
        let mut cache = RelationCache::new();
        assert!(cache.is_empty());

        cache.insert(sample_relation());
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(16384).map(|r| r.name.as_str()), Some("users"));
        assert!(cache.get(99999).is_none());
    }

    #[test]
    fn test_cache_replace() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());

        let mut updated = sample_relation();
        updated
            .columns
            .push(PgColumn::new("email".to_string(), TEXT_OID, -1, false));
        cache.insert(updated);

        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(16384).unwrap().columns.len(), 4);
    }

    #[test]
    fn test_cache_clear() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());
        cache.clear();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert!(cache.get(16384).is_none());
    }

    #[test]
    fn test_cdc_envelope_schema() {
        let schema = cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 6);
        assert_eq!(schema.field(0).name(), "_table");
        assert_eq!(schema.field(1).name(), "_op");
        assert_eq!(schema.field(2).name(), "_lsn");
        assert_eq!(schema.field(3).name(), "_ts_ms");
        assert_eq!(schema.field(4).name(), "_before");
        assert_eq!(schema.field(5).name(), "_after");

        // Metadata columns are always present.
        for i in 0..4 {
            assert!(
                !schema.field(i).is_nullable(),
                "field {i} should be non-nullable"
            );
        }
        // _before and _after are nullable.
        assert!(schema.field(4).is_nullable());
        assert!(schema.field(5).is_nullable());

        assert_eq!(*schema.field(2).data_type(), DataType::UInt64);
        assert_eq!(
            *schema.field(3).data_type(),
            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
        );
    }
}