laminar_connectors/cdc/postgres/
types.rs1use arrow_schema::DataType;
8
// ---------------------------------------------------------------------------
// OIDs of the built-in PostgreSQL scalar types recognised by this module.
// ---------------------------------------------------------------------------

// Boolean.

/// Type OID of PostgreSQL `bool`.
pub const BOOL_OID: u32 = 16;

// Integers and object identifiers.

/// Type OID of PostgreSQL `int2`.
pub const INT2_OID: u32 = 21;
/// Type OID of PostgreSQL `int4`.
pub const INT4_OID: u32 = 23;
/// Type OID of PostgreSQL `int8`.
pub const INT8_OID: u32 = 20;
/// Type OID of PostgreSQL `oid`.
pub const OID_OID: u32 = 26;

// Floating point and arbitrary-precision numbers.

/// Type OID of PostgreSQL `float4`.
pub const FLOAT4_OID: u32 = 700;
/// Type OID of PostgreSQL `float8`.
pub const FLOAT8_OID: u32 = 701;
/// Type OID of PostgreSQL `numeric`.
pub const NUMERIC_OID: u32 = 1700;

// Character data.

/// Type OID of PostgreSQL `char`.
pub const CHAR_OID: u32 = 18;
/// Type OID of PostgreSQL `name`.
pub const NAME_OID: u32 = 19;
/// Type OID of PostgreSQL `text`.
pub const TEXT_OID: u32 = 25;
/// Type OID of PostgreSQL `bpchar`.
pub const BPCHAR_OID: u32 = 1042;
/// Type OID of PostgreSQL `varchar`.
pub const VARCHAR_OID: u32 = 1043;

// Raw bytes.

/// Type OID of PostgreSQL `bytea`.
pub const BYTEA_OID: u32 = 17;

// Dates, times and intervals.

/// Type OID of PostgreSQL `date`.
pub const DATE_OID: u32 = 1082;
/// Type OID of PostgreSQL `time`.
pub const TIME_OID: u32 = 1083;
/// Type OID of PostgreSQL `timestamp`.
pub const TIMESTAMP_OID: u32 = 1114;
/// Type OID of PostgreSQL `timestamptz`.
pub const TIMESTAMPTZ_OID: u32 = 1184;
/// Type OID of PostgreSQL `interval`.
pub const INTERVAL_OID: u32 = 1186;

// Identifiers and structured documents.

/// Type OID of PostgreSQL `uuid`.
pub const UUID_OID: u32 = 2950;
/// Type OID of PostgreSQL `json`.
pub const JSON_OID: u32 = 114;
/// Type OID of PostgreSQL `jsonb`.
pub const JSONB_OID: u32 = 3802;
/// Type OID of PostgreSQL `xml`.
pub const XML_OID: u32 = 142;

// Network addresses.

/// Type OID of PostgreSQL `inet`.
pub const INET_OID: u32 = 869;
/// Type OID of PostgreSQL `cidr`.
pub const CIDR_OID: u32 = 650;
/// Type OID of PostgreSQL `macaddr`.
pub const MACADDR_OID: u32 = 829;
63
// ---------------------------------------------------------------------------
// OIDs of the built-in PostgreSQL array types recognised by this module,
// listed in ascending OID order.
// ---------------------------------------------------------------------------

/// Type OID of PostgreSQL `bool[]`.
pub const BOOL_ARRAY_OID: u32 = 1000;
/// Type OID of PostgreSQL `int2[]`.
pub const INT2_ARRAY_OID: u32 = 1005;
/// Type OID of PostgreSQL `int4[]`.
pub const INT4_ARRAY_OID: u32 = 1007;
/// Type OID of PostgreSQL `text[]`.
pub const TEXT_ARRAY_OID: u32 = 1009;
/// Type OID of PostgreSQL `varchar[]`.
pub const VARCHAR_ARRAY_OID: u32 = 1015;
/// Type OID of PostgreSQL `int8[]`.
pub const INT8_ARRAY_OID: u32 = 1016;
/// Type OID of PostgreSQL `float4[]`.
pub const FLOAT4_ARRAY_OID: u32 = 1021;
/// Type OID of PostgreSQL `float8[]`.
pub const FLOAT8_ARRAY_OID: u32 = 1022;
82
/// Metadata describing one column of a PostgreSQL relation.
///
/// Two columns compare equal only when all four fields are equal.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PgColumn {
    /// Name of the column.
    pub name: String,

    /// OID of the column's PostgreSQL data type (see the `*_OID` constants).
    pub type_oid: u32,

    /// Type modifier accompanying `type_oid`.
    ///
    /// NOTE(review): presumably PostgreSQL's `atttypmod`, where `-1` means
    /// "no modifier" — confirm against the producer of this value.
    pub type_modifier: i32,

    /// Whether the column is flagged as a key column.
    ///
    /// NOTE(review): presumably "part of the replica identity" — confirm
    /// against the code that populates this flag.
    pub is_key: bool,
}
99
100impl PgColumn {
101 #[must_use]
103 pub fn new(name: String, type_oid: u32, type_modifier: i32, is_key: bool) -> Self {
104 Self {
105 name,
106 type_oid,
107 type_modifier,
108 is_key,
109 }
110 }
111
112 #[must_use]
114 pub fn arrow_type(&self) -> DataType {
115 pg_type_to_arrow(self.type_oid)
116 }
117}
118
119#[must_use]
124#[allow(clippy::match_same_arms)] pub fn pg_type_to_arrow(oid: u32) -> DataType {
126 match oid {
127 BOOL_OID => DataType::Boolean,
129
130 INT2_OID => DataType::Int16,
132 INT4_OID | OID_OID => DataType::Int32,
133 INT8_OID => DataType::Int64,
134
135 FLOAT4_OID => DataType::Float32,
137 FLOAT8_OID => DataType::Float64,
138
139 NUMERIC_OID => DataType::Utf8,
141
142 CHAR_OID | BPCHAR_OID | VARCHAR_OID | TEXT_OID | NAME_OID => DataType::Utf8,
144
145 BYTEA_OID => DataType::Binary,
147
148 DATE_OID => DataType::Date32,
150 TIME_OID => DataType::Time64(arrow_schema::TimeUnit::Microsecond),
151 TIMESTAMP_OID => DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
152 TIMESTAMPTZ_OID => {
153 DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
154 }
155 INTERVAL_OID => DataType::Utf8,
156
157 UUID_OID => DataType::Utf8,
159
160 JSON_OID | JSONB_OID => DataType::Utf8,
162
163 INET_OID | CIDR_OID | MACADDR_OID => DataType::Utf8,
165
166 XML_OID => DataType::Utf8,
168
169 BOOL_ARRAY_OID | INT2_ARRAY_OID | INT4_ARRAY_OID | INT8_ARRAY_OID | FLOAT4_ARRAY_OID
171 | FLOAT8_ARRAY_OID | TEXT_ARRAY_OID | VARCHAR_ARRAY_OID => DataType::Utf8,
172
173 _ => DataType::Utf8,
175 }
176}
177
/// Returns the PostgreSQL type name for a type OID.
///
/// Scalar types are reported under their catalog names (`"int4"`, `"bpchar"`,
/// `"timestamptz"`, ...). The array types declared in this module are reported
/// under PostgreSQL's underscore-prefixed catalog names (`"_int4"`, `"_text"`,
/// ...), so every OID that has a constant in this module resolves to a real
/// name.
///
/// Any other OID yields `"unknown"`.
#[must_use]
pub fn pg_type_name(oid: u32) -> &'static str {
    match oid {
        // Scalar types.
        BOOL_OID => "bool",
        BYTEA_OID => "bytea",
        CHAR_OID => "char",
        INT2_OID => "int2",
        INT4_OID => "int4",
        INT8_OID => "int8",
        TEXT_OID => "text",
        OID_OID => "oid",
        FLOAT4_OID => "float4",
        FLOAT8_OID => "float8",
        VARCHAR_OID => "varchar",
        BPCHAR_OID => "bpchar",
        NAME_OID => "name",
        DATE_OID => "date",
        TIME_OID => "time",
        TIMESTAMP_OID => "timestamp",
        TIMESTAMPTZ_OID => "timestamptz",
        INTERVAL_OID => "interval",
        NUMERIC_OID => "numeric",
        UUID_OID => "uuid",
        JSON_OID => "json",
        JSONB_OID => "jsonb",
        XML_OID => "xml",
        INET_OID => "inet",
        CIDR_OID => "cidr",
        MACADDR_OID => "macaddr",

        // Array types: `pg_type.typname` is the element type name prefixed
        // with an underscore.
        BOOL_ARRAY_OID => "_bool",
        INT2_ARRAY_OID => "_int2",
        INT4_ARRAY_OID => "_int4",
        INT8_ARRAY_OID => "_int8",
        FLOAT4_ARRAY_OID => "_float4",
        FLOAT8_ARRAY_OID => "_float8",
        TEXT_ARRAY_OID => "_text",
        VARCHAR_ARRAY_OID => "_varchar",

        _ => "unknown",
    }
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::TimeUnit;

    /// Signed integer OIDs map to the Arrow integer of the same width.
    #[test]
    fn test_integer_type_mapping() {
        let cases = [
            (INT2_OID, DataType::Int16),
            (INT4_OID, DataType::Int32),
            (INT8_OID, DataType::Int64),
        ];
        for (oid, expected) in cases {
            assert_eq!(pg_type_to_arrow(oid), expected);
        }
    }

    /// Floating-point OIDs map to the Arrow float of the same width.
    #[test]
    fn test_float_type_mapping() {
        let cases = [
            (FLOAT4_OID, DataType::Float32),
            (FLOAT8_OID, DataType::Float64),
        ];
        for (oid, expected) in cases {
            assert_eq!(pg_type_to_arrow(oid), expected);
        }
    }

    /// Every character-like type is carried as UTF-8.
    #[test]
    fn test_text_type_mapping() {
        for oid in [TEXT_OID, VARCHAR_OID, BPCHAR_OID, NAME_OID] {
            assert_eq!(pg_type_to_arrow(oid), DataType::Utf8);
        }
    }

    /// `bool` maps to Arrow `Boolean`.
    #[test]
    fn test_bool_type_mapping() {
        let mapped = pg_type_to_arrow(BOOL_OID);
        assert_eq!(mapped, DataType::Boolean);
    }

    /// Both timestamp flavours use microseconds; only `timestamptz` has a zone.
    #[test]
    fn test_timestamp_type_mapping() {
        assert_eq!(
            pg_type_to_arrow(TIMESTAMP_OID),
            DataType::Timestamp(TimeUnit::Microsecond, None)
        );

        // Only the presence of a time zone is checked, not its value.
        match pg_type_to_arrow(TIMESTAMPTZ_OID) {
            DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => {}
            other => panic!("unexpected mapping for timestamptz: {other:?}"),
        }
    }

    /// `date` maps to `Date32` and `time` to microsecond `Time64`.
    #[test]
    fn test_date_time_mapping() {
        assert_eq!(pg_type_to_arrow(DATE_OID), DataType::Date32);
        assert_eq!(
            pg_type_to_arrow(TIME_OID),
            DataType::Time64(TimeUnit::Microsecond)
        );
    }

    /// `bytea` maps to Arrow `Binary`.
    #[test]
    fn test_binary_type_mapping() {
        let mapped = pg_type_to_arrow(BYTEA_OID);
        assert_eq!(mapped, DataType::Binary);
    }

    /// JSON documents are carried as UTF-8 text.
    #[test]
    fn test_json_type_mapping() {
        for oid in [JSON_OID, JSONB_OID] {
            assert_eq!(pg_type_to_arrow(oid), DataType::Utf8);
        }
    }

    /// An OID with no explicit mapping falls back to UTF-8.
    #[test]
    fn test_unknown_type_fallback() {
        let mapped = pg_type_to_arrow(99999);
        assert_eq!(mapped, DataType::Utf8);
    }

    /// The constructor stores its arguments and `arrow_type` follows the OID.
    #[test]
    fn test_pg_column() {
        let column = PgColumn::new("id".to_string(), INT8_OID, -1, true);
        assert_eq!(column.name, "id");
        assert_eq!(column.type_oid, INT8_OID);
        assert!(column.is_key);
        assert_eq!(column.arrow_type(), DataType::Int64);
    }

    /// Known OIDs resolve to their names; others resolve to "unknown".
    #[test]
    fn test_pg_type_name() {
        let cases = [
            (INT4_OID, "int4"),
            (TEXT_OID, "text"),
            (BOOL_OID, "bool"),
            (99999, "unknown"),
        ];
        for (oid, expected) in cases {
            assert_eq!(pg_type_name(oid), expected);
        }
    }

    /// `numeric` is carried as UTF-8 text.
    #[test]
    fn test_numeric_maps_to_utf8() {
        let mapped = pg_type_to_arrow(NUMERIC_OID);
        assert_eq!(mapped, DataType::Utf8);
    }

    /// Array types are carried as UTF-8 text.
    #[test]
    fn test_array_types_map_to_utf8() {
        for oid in [INT4_ARRAY_OID, TEXT_ARRAY_OID] {
            assert_eq!(pg_type_to_arrow(oid), DataType::Utf8);
        }
    }
}