Skip to main content

laminar_connectors/cdc/postgres/
types.rs

//! `PostgreSQL` type OID to Arrow `DataType` mapping.
//!
//! Maps `PostgreSQL`'s internal type OIDs to Apache Arrow data types
//! for CDC event conversion, and to human-readable type names for
//! diagnostics. Parsing of `pgoutput` text-format values is not
//! implemented in this module.
6
7use arrow_schema::DataType;
8
9// ── Well-known PostgreSQL type OIDs ──
10
// Listed in ascending OID order so gaps and duplicates are easy to spot.

/// `bool` — true/false value.
pub const BOOL_OID: u32 = 16;
/// `bytea` — variable-length byte string.
pub const BYTEA_OID: u32 = 17;
/// `char` — single-character internal type (distinct from `char(n)`).
pub const CHAR_OID: u32 = 18;
/// `name` — internal identifier type used for catalog object names.
pub const NAME_OID: u32 = 19;
/// `int8` / `bigint` — signed 8-byte integer.
pub const INT8_OID: u32 = 20;
/// `int2` / `smallint` — signed 2-byte integer.
pub const INT2_OID: u32 = 21;
/// `int4` / `integer` — signed 4-byte integer.
pub const INT4_OID: u32 = 23;
/// `text` — variable-length character string.
pub const TEXT_OID: u32 = 25;
/// `oid` — object identifier, an unsigned 4-byte value.
pub const OID_OID: u32 = 26;
/// `json` — JSON stored as text.
pub const JSON_OID: u32 = 114;
/// `xml` — XML data.
pub const XML_OID: u32 = 142;
/// `cidr` — IPv4/IPv6 network address.
pub const CIDR_OID: u32 = 650;
/// `float4` / `real` — single-precision floating point.
pub const FLOAT4_OID: u32 = 700;
/// `float8` / `double precision` — double-precision floating point.
pub const FLOAT8_OID: u32 = 701;
/// `macaddr` — MAC address.
pub const MACADDR_OID: u32 = 829;
/// `inet` — IPv4/IPv6 host address.
pub const INET_OID: u32 = 869;
/// `bpchar` — blank-padded fixed-length string, i.e. `char(n)`.
pub const BPCHAR_OID: u32 = 1042;
/// `varchar` — variable-length character string, i.e. `varchar(n)`.
pub const VARCHAR_OID: u32 = 1043;
/// `date` — calendar date.
pub const DATE_OID: u32 = 1082;
/// `time` — time of day, no time zone.
pub const TIME_OID: u32 = 1083;
/// `timestamp` — date plus time of day, no time zone.
pub const TIMESTAMP_OID: u32 = 1114;
/// `timestamptz` — date plus time of day, with time zone.
pub const TIMESTAMPTZ_OID: u32 = 1184;
/// `interval` — span of time.
pub const INTERVAL_OID: u32 = 1186;
/// `numeric` — exact decimal with arbitrary precision.
pub const NUMERIC_OID: u32 = 1700;
/// `uuid` — universally unique identifier.
pub const UUID_OID: u32 = 2950;
/// `jsonb` — JSON stored in a decomposed binary form.
pub const JSONB_OID: u32 = 3802;
63
64// ── Array type OIDs ──
65
// Array OIDs, listed in ascending OID order.

/// `bool[]` — array of `bool`.
pub const BOOL_ARRAY_OID: u32 = 1000;
/// `int2[]` — array of `int2`.
pub const INT2_ARRAY_OID: u32 = 1005;
/// `int4[]` — array of `int4`.
pub const INT4_ARRAY_OID: u32 = 1007;
/// `text[]` — array of `text`.
pub const TEXT_ARRAY_OID: u32 = 1009;
/// `varchar[]` — array of `varchar`.
pub const VARCHAR_ARRAY_OID: u32 = 1015;
/// `int8[]` — array of `int8`.
pub const INT8_ARRAY_OID: u32 = 1016;
/// `float4[]` — array of `float4`.
pub const FLOAT4_ARRAY_OID: u32 = 1021;
/// `float8[]` — array of `float8`.
pub const FLOAT8_ARRAY_OID: u32 = 1022;
82
83/// A column descriptor from a `PostgreSQL` relation.
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct PgColumn {
86    /// Column name.
87    pub name: String,
88
89    /// `PostgreSQL` type OID.
90    pub type_oid: u32,
91
92    /// Type modifier (e.g., precision for numeric, length for varchar).
93    /// -1 means no modifier.
94    pub type_modifier: i32,
95
96    /// Whether this column is part of the replica identity key.
97    pub is_key: bool,
98}
99
100impl PgColumn {
101    /// Creates a new column descriptor.
102    #[must_use]
103    pub fn new(name: String, type_oid: u32, type_modifier: i32, is_key: bool) -> Self {
104        Self {
105            name,
106            type_oid,
107            type_modifier,
108            is_key,
109        }
110    }
111
112    /// Returns the Arrow `DataType` for this column.
113    #[must_use]
114    pub fn arrow_type(&self) -> DataType {
115        pg_type_to_arrow(self.type_oid)
116    }
117}
118
119/// Maps a `PostgreSQL` type OID to an Arrow `DataType`.
120///
121/// Covers the most common `PostgreSQL` types. Unknown OIDs map to
122/// `DataType::Utf8` as a safe fallback (text representation).
123#[must_use]
124#[allow(clippy::match_same_arms)] // Arms kept separate to document type categories
125pub fn pg_type_to_arrow(oid: u32) -> DataType {
126    match oid {
127        // Boolean
128        BOOL_OID => DataType::Boolean,
129
130        // Integer types
131        INT2_OID => DataType::Int16,
132        INT4_OID | OID_OID => DataType::Int32,
133        INT8_OID => DataType::Int64,
134
135        // Floating point
136        FLOAT4_OID => DataType::Float32,
137        FLOAT8_OID => DataType::Float64,
138
139        // Numeric (arbitrary precision → string to preserve precision)
140        NUMERIC_OID => DataType::Utf8,
141
142        // Character types
143        CHAR_OID | BPCHAR_OID | VARCHAR_OID | TEXT_OID | NAME_OID => DataType::Utf8,
144
145        // Binary
146        BYTEA_OID => DataType::Binary,
147
148        // Date/Time
149        DATE_OID => DataType::Date32,
150        TIME_OID => DataType::Time64(arrow_schema::TimeUnit::Microsecond),
151        TIMESTAMP_OID => DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
152        TIMESTAMPTZ_OID => {
153            DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
154        }
155        INTERVAL_OID => DataType::Utf8,
156
157        // UUID
158        UUID_OID => DataType::Utf8,
159
160        // JSON
161        JSON_OID | JSONB_OID => DataType::Utf8,
162
163        // Network types
164        INET_OID | CIDR_OID | MACADDR_OID => DataType::Utf8,
165
166        // XML
167        XML_OID => DataType::Utf8,
168
169        // Array types (represented as JSON strings)
170        BOOL_ARRAY_OID | INT2_ARRAY_OID | INT4_ARRAY_OID | INT8_ARRAY_OID | FLOAT4_ARRAY_OID
171        | FLOAT8_ARRAY_OID | TEXT_ARRAY_OID | VARCHAR_ARRAY_OID => DataType::Utf8,
172
173        // Unknown types → text fallback
174        _ => DataType::Utf8,
175    }
176}
177
/// Returns a human-readable name for a `PostgreSQL` type OID.
///
/// Names use `PostgreSQL`'s internal `pg_type.typname` spelling (e.g. `int4`
/// rather than `integer`, `bpchar` rather than `char(n)`). Array types follow
/// the same convention, so `int4[]` is reported as `_int4`. OIDs not listed
/// here yield `"unknown"`.
#[must_use]
pub fn pg_type_name(oid: u32) -> &'static str {
    match oid {
        // Scalar types
        BOOL_OID => "bool",
        BYTEA_OID => "bytea",
        CHAR_OID => "char",
        INT2_OID => "int2",
        INT4_OID => "int4",
        INT8_OID => "int8",
        TEXT_OID => "text",
        OID_OID => "oid",
        FLOAT4_OID => "float4",
        FLOAT8_OID => "float8",
        VARCHAR_OID => "varchar",
        BPCHAR_OID => "bpchar",
        NAME_OID => "name",
        DATE_OID => "date",
        TIME_OID => "time",
        TIMESTAMP_OID => "timestamp",
        TIMESTAMPTZ_OID => "timestamptz",
        INTERVAL_OID => "interval",
        NUMERIC_OID => "numeric",
        UUID_OID => "uuid",
        JSON_OID => "json",
        JSONB_OID => "jsonb",
        XML_OID => "xml",
        INET_OID => "inet",
        CIDR_OID => "cidr",
        MACADDR_OID => "macaddr",

        // Array types: these OIDs are defined and mapped by this module, so
        // report them by name instead of as "unknown".
        BOOL_ARRAY_OID => "_bool",
        INT2_ARRAY_OID => "_int2",
        INT4_ARRAY_OID => "_int4",
        INT8_ARRAY_OID => "_int8",
        FLOAT4_ARRAY_OID => "_float4",
        FLOAT8_ARRAY_OID => "_float8",
        TEXT_ARRAY_OID => "_text",
        VARCHAR_ARRAY_OID => "_varchar",

        _ => "unknown",
    }
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::TimeUnit;

    #[test]
    fn test_integer_type_mapping() {
        assert_eq!(pg_type_to_arrow(INT2_OID), DataType::Int16);
        assert_eq!(pg_type_to_arrow(INT4_OID), DataType::Int32);
        assert_eq!(pg_type_to_arrow(INT8_OID), DataType::Int64);
    }

    #[test]
    fn test_float_type_mapping() {
        assert_eq!(pg_type_to_arrow(FLOAT4_OID), DataType::Float32);
        assert_eq!(pg_type_to_arrow(FLOAT8_OID), DataType::Float64);
    }

    #[test]
    fn test_text_type_mapping() {
        for oid in [TEXT_OID, VARCHAR_OID, BPCHAR_OID, NAME_OID, CHAR_OID] {
            assert_eq!(pg_type_to_arrow(oid), DataType::Utf8, "oid {oid}");
        }
    }

    #[test]
    fn test_bool_type_mapping() {
        assert_eq!(pg_type_to_arrow(BOOL_OID), DataType::Boolean);
    }

    #[test]
    fn test_timestamp_type_mapping() {
        assert_eq!(
            pg_type_to_arrow(TIMESTAMP_OID),
            DataType::Timestamp(TimeUnit::Microsecond, None)
        );
        // Pin the exact zone string, not merely that some zone is present.
        assert_eq!(
            pg_type_to_arrow(TIMESTAMPTZ_OID),
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
        );
    }

    #[test]
    fn test_date_time_mapping() {
        assert_eq!(pg_type_to_arrow(DATE_OID), DataType::Date32);
        assert_eq!(
            pg_type_to_arrow(TIME_OID),
            DataType::Time64(TimeUnit::Microsecond)
        );
    }

    #[test]
    fn test_binary_type_mapping() {
        assert_eq!(pg_type_to_arrow(BYTEA_OID), DataType::Binary);
    }

    #[test]
    fn test_json_type_mapping() {
        assert_eq!(pg_type_to_arrow(JSON_OID), DataType::Utf8);
        assert_eq!(pg_type_to_arrow(JSONB_OID), DataType::Utf8);
    }

    #[test]
    fn test_textual_types_map_to_utf8() {
        for oid in [
            UUID_OID,
            INTERVAL_OID,
            XML_OID,
            INET_OID,
            CIDR_OID,
            MACADDR_OID,
        ] {
            assert_eq!(pg_type_to_arrow(oid), DataType::Utf8, "oid {oid}");
        }
    }

    #[test]
    fn test_unknown_type_fallback() {
        assert_eq!(pg_type_to_arrow(99999), DataType::Utf8);
    }

    #[test]
    fn test_pg_column() {
        let col = PgColumn::new("id".to_string(), INT8_OID, -1, true);
        assert_eq!(col.name, "id");
        assert_eq!(col.type_oid, INT8_OID);
        assert_eq!(col.type_modifier, -1);
        assert!(col.is_key);
        assert_eq!(col.arrow_type(), DataType::Int64);

        let payload = PgColumn::new("payload".to_string(), TEXT_OID, -1, false);
        assert!(!payload.is_key);
        assert_eq!(payload.arrow_type(), DataType::Utf8);
        assert_ne!(payload, col);
    }

    #[test]
    fn test_pg_type_name() {
        assert_eq!(pg_type_name(INT4_OID), "int4");
        assert_eq!(pg_type_name(TEXT_OID), "text");
        assert_eq!(pg_type_name(BOOL_OID), "bool");
        assert_eq!(pg_type_name(BYTEA_OID), "bytea");
        assert_eq!(pg_type_name(NAME_OID), "name");
        assert_eq!(pg_type_name(BPCHAR_OID), "bpchar");
        assert_eq!(pg_type_name(TIMESTAMPTZ_OID), "timestamptz");
        assert_eq!(pg_type_name(NUMERIC_OID), "numeric");
        assert_eq!(pg_type_name(JSONB_OID), "jsonb");
        assert_eq!(pg_type_name(UUID_OID), "uuid");
        assert_eq!(pg_type_name(99999), "unknown");
    }

    #[test]
    fn test_numeric_maps_to_utf8() {
        // Numeric must be Utf8 to preserve arbitrary precision
        assert_eq!(pg_type_to_arrow(NUMERIC_OID), DataType::Utf8);
    }

    #[test]
    fn test_array_types_map_to_utf8() {
        for oid in [
            BOOL_ARRAY_OID,
            INT2_ARRAY_OID,
            INT4_ARRAY_OID,
            INT8_ARRAY_OID,
            FLOAT4_ARRAY_OID,
            FLOAT8_ARRAY_OID,
            TEXT_ARRAY_OID,
            VARCHAR_ARRAY_OID,
        ] {
            assert_eq!(pg_type_to_arrow(oid), DataType::Utf8, "oid {oid}");
        }
    }
}