laminar_connectors/cdc/postgres/
schema.rs1use std::collections::HashMap;
8use std::sync::Arc;
9
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11
12use super::types::PgColumn;
13
14#[derive(Debug, Clone)]
16pub struct RelationInfo {
17 pub relation_id: u32,
19
20 pub namespace: String,
22
23 pub name: String,
25
26 pub replica_identity: char,
29
30 pub columns: Vec<PgColumn>,
32}
33
34impl RelationInfo {
35 #[must_use]
37 pub fn full_name(&self) -> String {
38 if self.namespace.is_empty() || self.namespace == "public" {
39 self.name.clone()
40 } else {
41 format!("{}.{}", self.namespace, self.name)
42 }
43 }
44
45 #[must_use]
47 pub fn arrow_schema(&self) -> SchemaRef {
48 let fields: Vec<Field> = self
49 .columns
50 .iter()
51 .map(|col| {
52 Field::new(&col.name, col.arrow_type(), true)
55 })
56 .collect();
57 Arc::new(Schema::new(fields))
58 }
59
60 #[must_use]
62 pub fn key_columns(&self) -> Vec<&str> {
63 self.columns
64 .iter()
65 .filter(|c| c.is_key)
66 .map(|c| c.name.as_str())
67 .collect()
68 }
69}
70
71#[derive(Debug, Clone, Default)]
76pub struct RelationCache {
77 relations: HashMap<u32, RelationInfo>,
78}
79
80impl RelationCache {
81 #[must_use]
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn insert(&mut self, info: RelationInfo) {
89 self.relations.insert(info.relation_id, info);
90 }
91
92 #[must_use]
94 pub fn get(&self, relation_id: u32) -> Option<&RelationInfo> {
95 self.relations.get(&relation_id)
96 }
97
98 #[must_use]
100 pub fn len(&self) -> usize {
101 self.relations.len()
102 }
103
104 #[must_use]
106 pub fn is_empty(&self) -> bool {
107 self.relations.is_empty()
108 }
109
110 pub fn clear(&mut self) {
112 self.relations.clear();
113 }
114}
115
116#[must_use]
121pub fn cdc_envelope_schema() -> SchemaRef {
122 use arrow_schema::TimeUnit;
123 Arc::new(Schema::new(vec![
124 Field::new("_table", DataType::Utf8, false),
125 Field::new("_op", DataType::Utf8, false),
126 Field::new("_lsn", DataType::UInt64, false),
127 Field::new(
128 "_ts_ms",
129 DataType::Timestamp(TimeUnit::Millisecond, None),
130 false,
131 ),
132 Field::new("_before", DataType::Utf8, true),
133 Field::new("_after", DataType::Utf8, true),
134 ]))
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cdc::postgres::types::{INT4_OID, INT8_OID, TEXT_OID};

    /// Builds a `public.users` relation (id 16384) with one key column
    /// (`id`) and two non-key columns (`name`, `age`).
    fn sample_relation() -> RelationInfo {
        RelationInfo {
            relation_id: 16384,
            namespace: "public".to_string(),
            name: "users".to_string(),
            replica_identity: 'd',
            columns: vec![
                PgColumn::new("id".to_string(), INT8_OID, -1, true),
                PgColumn::new("name".to_string(), TEXT_OID, -1, false),
                PgColumn::new("age".to_string(), INT4_OID, -1, false),
            ],
        }
    }

    #[test]
    fn test_relation_full_name_public() {
        let rel = sample_relation();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_relation_full_name_custom_schema() {
        let mut rel = sample_relation();
        rel.namespace = "app".to_string();
        assert_eq!(rel.full_name(), "app.users");
    }

    // Covers the `namespace.is_empty()` branch of `full_name`, which no other
    // test reaches.
    #[test]
    fn test_relation_full_name_empty_namespace() {
        let mut rel = sample_relation();
        rel.namespace = String::new();
        assert_eq!(rel.full_name(), "users");
    }

    #[test]
    fn test_arrow_schema_generation() {
        let rel = sample_relation();
        let schema = rel.arrow_schema();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(*schema.field(0).data_type(), DataType::Int64);
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(*schema.field(1).data_type(), DataType::Utf8);
        assert_eq!(schema.field(2).name(), "age");
        // `arrow_schema` declares every column nullable, key columns included.
        assert!(schema.fields().iter().all(|f| f.is_nullable()));
    }

    #[test]
    fn test_key_columns() {
        let rel = sample_relation();
        let keys = rel.key_columns();
        assert_eq!(keys, vec!["id"]);
    }

    // A relation without columns must yield an empty schema and no keys.
    #[test]
    fn test_relation_without_columns() {
        let mut rel = sample_relation();
        rel.columns.clear();
        assert!(rel.key_columns().is_empty());
        assert_eq!(rel.arrow_schema().fields().len(), 0);
    }

    #[test]
    fn test_relation_cache() {
        let mut cache = RelationCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.insert(sample_relation());
        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);
        assert!(cache.get(16384).is_some());
        assert!(cache.get(99999).is_none());
    }

    #[test]
    fn test_cache_replace() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());

        let mut updated = sample_relation();
        updated
            .columns
            .push(PgColumn::new("email".to_string(), TEXT_OID, -1, false));
        cache.insert(updated);

        // Same relation id: the entry is replaced, not duplicated.
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(16384).unwrap().columns.len(), 4);
    }

    #[test]
    fn test_cache_clear() {
        let mut cache = RelationCache::new();
        cache.insert(sample_relation());
        cache.clear();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert!(cache.get(16384).is_none());
    }

    #[test]
    fn test_cdc_envelope_schema() {
        use arrow_schema::TimeUnit;

        let schema = cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 6);
        assert_eq!(schema.field(0).name(), "_table");
        assert_eq!(schema.field(1).name(), "_op");
        assert_eq!(schema.field(2).name(), "_lsn");
        assert_eq!(schema.field(3).name(), "_ts_ms");
        assert_eq!(schema.field(4).name(), "_before");
        assert_eq!(schema.field(5).name(), "_after");

        // Metadata fields are required.
        for idx in 0..4 {
            assert!(!schema.field(idx).is_nullable());
        }
        // Payload fields are optional.
        assert!(schema.field(4).is_nullable());
        assert!(schema.field(5).is_nullable());

        assert_eq!(*schema.field(2).data_type(), DataType::UInt64);
        assert_eq!(
            *schema.field(3).data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, None)
        );
    }
}