laminar_connectors/cdc/postgres/
schema.rs1use std::collections::HashMap;
8use std::sync::Arc;
9
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11
12use super::types::PgColumn;
13
14#[derive(Debug, Clone)]
16pub struct RelationInfo {
17 pub relation_id: u32,
19
20 pub namespace: String,
22
23 pub name: String,
25
26 pub replica_identity: char,
29
30 pub columns: Vec<PgColumn>,
32}
33
34impl RelationInfo {
35 #[must_use]
37 pub fn full_name(&self) -> String {
38 if self.namespace.is_empty() || self.namespace == "public" {
39 self.name.clone()
40 } else {
41 format!("{}.{}", self.namespace, self.name)
42 }
43 }
44
45 #[must_use]
47 pub fn arrow_schema(&self) -> SchemaRef {
48 let fields: Vec<Field> = self
49 .columns
50 .iter()
51 .map(|col| {
52 Field::new(&col.name, col.arrow_type(), true)
55 })
56 .collect();
57 Arc::new(Schema::new(fields))
58 }
59
60 #[must_use]
62 pub fn key_columns(&self) -> Vec<&str> {
63 self.columns
64 .iter()
65 .filter(|c| c.is_key)
66 .map(|c| c.name.as_str())
67 .collect()
68 }
69}
70
71#[derive(Debug, Clone, Default)]
76pub struct RelationCache {
77 relations: HashMap<u32, RelationInfo>,
78}
79
80impl RelationCache {
81 #[must_use]
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn insert(&mut self, info: RelationInfo) {
89 self.relations.insert(info.relation_id, info);
90 }
91
92 #[must_use]
94 pub fn get(&self, relation_id: u32) -> Option<&RelationInfo> {
95 self.relations.get(&relation_id)
96 }
97
98 #[must_use]
100 pub fn len(&self) -> usize {
101 self.relations.len()
102 }
103
104 #[must_use]
106 pub fn is_empty(&self) -> bool {
107 self.relations.is_empty()
108 }
109
110 pub fn clear(&mut self) {
112 self.relations.clear();
113 }
114}
115
116#[must_use]
121pub fn cdc_envelope_schema() -> SchemaRef {
122 Arc::new(Schema::new(vec![
123 Field::new("_table", DataType::Utf8, false),
124 Field::new("_op", DataType::Utf8, false),
125 Field::new("_lsn", DataType::UInt64, false),
126 Field::new("_ts_ms", DataType::Int64, false),
127 Field::new("_before", DataType::Utf8, true),
128 Field::new("_after", DataType::Utf8, true),
129 ]))
130}
131
#[cfg(test)]
mod tests {
    // Unit tests for relation metadata, the relation cache, and the CDC envelope schema.
    use super::*;
    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};

    /// Builds `public.users` with one key column (`id`) and two value columns.
    fn sample_relation() -> RelationInfo {
        RelationInfo {
            relation_id: 16384,
            namespace: "public".to_string(),
            name: "users".to_string(),
            replica_identity: 'd',
            columns: vec![
                PgColumn::new("id".to_string(), INT8_OID, -1, true),
                PgColumn::new("name".to_string(), TEXT_OID, -1, false),
                PgColumn::new("age".to_string(), INT4_OID, -1, false),
            ],
        }
    }

    #[test]
    fn test_relation_full_name_public() {
        let rel = sample_relation();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_empty_namespace() {
        // An empty namespace is treated like `public`: no qualification.
        let mut rel = sample_relation();
        rel.namespace = String::new();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_custom_schema() {
        let mut rel = sample_relation();
        rel.namespace = "app".to_string();
        assert_eq!(rel.full_name(), "app.users");
    }

    #[test]
    fn test_arrow_schema_generation() {
        let rel = sample_relation();
        let schema = rel.arrow_schema();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(*schema.field(0).data_type(), DataType::Int64);
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(*schema.field(1).data_type(), DataType::Utf8);
        assert_eq!(schema.field(2).name(), "age");
    }

    #[test]
    fn test_arrow_schema_fields_are_nullable() {
        // arrow_schema declares every field nullable, key columns included.
        let schema = sample_relation().arrow_schema();
        assert!(schema.fields().iter().all(|field| field.is_nullable()));
    }

    #[test]
    fn test_key_columns() {
        let rel = sample_relation();
        let keys = rel.key_columns();
        assert_eq!(keys, vec!["id"]);
    }

    #[test]
    fn test_key_columns_none_and_many() {
        let mut no_keys = sample_relation();
        no_keys.columns = vec![PgColumn::new("name".to_string(), TEXT_OID, -1, false)];
        assert!(no_keys.key_columns().is_empty());

        // Multiple key columns are reported in column order.
        let mut composite = sample_relation();
        composite
            .columns
            .push(PgColumn::new("tenant_id".to_string(), INT8_OID, -1, true));
        assert_eq!(composite.key_columns(), vec!["id", "tenant_id"]);
    }

    #[test]
    fn test_relation_cache() {
        let mut cache = RelationCache::new();
        assert!(cache.is_empty());

        cache.insert(sample_relation());
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());
        assert_eq!(
            cache.get(16384).map(|rel| rel.name.as_str()),
            Some("users")
        );
        assert!(cache.get(99999).is_none());
    }

    #[test]
    fn test_cache_replace() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());

        let mut updated = sample_relation();
        updated
            .columns
            .push(PgColumn::new("email".to_string(), TEXT_OID, -1, false));
        cache.insert(updated);

        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(16384).unwrap().columns.len(), 4);
    }

    #[test]
    fn test_cache_clear() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());
        cache.clear();
        assert!(cache.is_empty());
        assert!(cache.get(16384).is_none());
    }

    #[test]
    fn test_cdc_envelope_schema() {
        let schema = cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 6);
        assert_eq!(schema.field(0).name(), "_table");
        assert_eq!(schema.field(1).name(), "_op");
        assert_eq!(schema.field(2).name(), "_lsn");
        assert_eq!(schema.field(3).name(), "_ts_ms");
        assert_eq!(schema.field(4).name(), "_before");
        assert_eq!(schema.field(5).name(), "_after");

        assert_eq!(*schema.field(0).data_type(), DataType::Utf8);
        assert_eq!(*schema.field(1).data_type(), DataType::Utf8);
        assert_eq!(*schema.field(2).data_type(), DataType::UInt64);
        assert_eq!(*schema.field(3).data_type(), DataType::Int64);
        assert_eq!(*schema.field(4).data_type(), DataType::Utf8);
        assert_eq!(*schema.field(5).data_type(), DataType::Utf8);

        // Metadata columns are non-null; only the row images are nullable.
        for idx in 0..4 {
            assert!(!schema.field(idx).is_nullable());
        }
        assert!(schema.field(4).is_nullable());
        assert!(schema.field(5).is_nullable());
    }
}