Skip to main content

laminar_connectors/postgres/
mod.rs

//! `PostgreSQL` sink connector.
//!
//! Writes Arrow `RecordBatch`es to `PostgreSQL` tables using two strategies:
//! - **Append mode**: COPY BINARY for maximum throughput (>500K rows/sec)
//! - **Upsert mode**: `INSERT ... ON CONFLICT DO UPDATE` with UNNEST arrays
//!
//! Exactly-once semantics use co-transactional offset storage — data and
//! epoch markers are committed in the same `PostgreSQL` transaction.
//!
//! # Architecture
//!
//! ```text
//! Ring 0 (Hot Path):  SPSC push only (~5ns, zero sink code)
//! Ring 1 (Background): Batch buffering → COPY/INSERT → transaction mgmt
//! Ring 2 (Control):    Connection pool, table creation, epoch recovery
//! ```
//!
//! # Module Structure
//!
//! - `sink_config` - Configuration and enums
//! - `sink` - `PostgresSink` implementing `SinkConnector`
//! - `sink_metrics` - Lock-free atomic metrics
//! - `types` - Arrow → `PostgreSQL` type mapping
//!
//! # Usage
//!
//! ```rust,ignore
//! use laminar_connectors::postgres::{PostgresSink, PostgresSinkConfig, WriteMode};
//!
//! let config = PostgresSinkConfig {
//!     hostname: "localhost".to_string(),
//!     database: "mydb".to_string(),
//!     table_name: "events".to_string(),
//!     write_mode: WriteMode::Upsert,
//!     primary_key_columns: vec!["id".to_string()],
//!     ..Default::default()
//! };
//!
//! let sink = PostgresSink::new(schema, config);
//! ```
41
42pub mod sink;
43pub mod sink_config;
44pub mod sink_metrics;
45pub mod types;
46
47// Re-export primary types at module level.
48pub use sink::PostgresSink;
49pub use sink_config::{DeliveryGuarantee, PostgresSinkConfig, SslMode, WriteMode};
50pub use sink_metrics::PostgresSinkMetrics;
51
52use std::sync::Arc;
53
54use arrow_schema::{DataType, Field, Schema};
55
56use crate::config::{ConfigKeySpec, ConnectorInfo};
57use crate::registry::ConnectorRegistry;
58
59/// Registers the `PostgreSQL` sink connector with the given registry.
60pub fn register_postgres_sink(registry: &ConnectorRegistry) {
61    let info = ConnectorInfo {
62        name: "postgres-sink".to_string(),
63        display_name: "PostgreSQL Sink".to_string(),
64        version: env!("CARGO_PKG_VERSION").to_string(),
65        is_source: false,
66        is_sink: true,
67        config_keys: postgres_sink_config_keys(),
68    };
69
70    registry.register_sink(
71        "postgres-sink",
72        info,
73        Arc::new(|| {
74            // Default schema (overridden during open).
75            let schema = Arc::new(Schema::new(vec![
76                Field::new("key", DataType::Utf8, true),
77                Field::new("value", DataType::Utf8, false),
78            ]));
79            Box::new(PostgresSink::new(schema, PostgresSinkConfig::default()))
80        }),
81    );
82}
83
84fn postgres_sink_config_keys() -> Vec<ConfigKeySpec> {
85    vec![
86        ConfigKeySpec::required("hostname", "PostgreSQL server hostname"),
87        ConfigKeySpec::required("database", "Target database name"),
88        ConfigKeySpec::required("username", "Authentication username"),
89        ConfigKeySpec::required("table.name", "Target table name"),
90        ConfigKeySpec::optional("password", "Authentication password", ""),
91        ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
92        ConfigKeySpec::optional("schema.name", "Target schema name", "public"),
93        ConfigKeySpec::optional(
94            "write.mode",
95            "Write mode: 'append' (COPY BINARY) or 'upsert' (ON CONFLICT)",
96            "append",
97        ),
98        ConfigKeySpec::optional(
99            "primary.key",
100            "Comma-separated primary key columns (required for upsert mode)",
101            "",
102        ),
103        ConfigKeySpec::optional("batch.size", "Max records before flush", "4096"),
104        ConfigKeySpec::optional("flush.interval.ms", "Max time before flush (ms)", "1000"),
105        ConfigKeySpec::optional("pool.size", "Connection pool size", "4"),
106        ConfigKeySpec::optional("connect.timeout.ms", "Connection timeout (ms)", "10000"),
107        ConfigKeySpec::optional(
108            "ssl.mode",
109            "SSL mode: disable/prefer/require/verify-ca/verify-full",
110            "prefer",
111        ),
112        ConfigKeySpec::optional(
113            "auto.create.table",
114            "Create target table from Arrow schema if missing",
115            "false",
116        ),
117        ConfigKeySpec::optional(
118            "changelog.mode",
119            "Handle Z-set records (split INSERT/DELETE by _op)",
120            "false",
121        ),
122        ConfigKeySpec::optional(
123            "delivery.guarantee",
124            "at_least_once or exactly_once",
125            "at_least_once",
126        ),
127        ConfigKeySpec::optional(
128            "sink.id",
129            "Unique ID for offset tracking (auto-generated if not set)",
130            "",
131        ),
132    ]
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the key names of every spec whose `required` flag equals
    /// `want_required`, preserving the order of `specs`.
    fn names_with_required_flag(specs: &[ConfigKeySpec], want_required: bool) -> Vec<&str> {
        specs
            .iter()
            .filter(|spec| spec.required == want_required)
            .map(|spec| spec.key.as_str())
            .collect()
    }

    /// Registering the connector exposes sink metadata under `"postgres-sink"`.
    #[test]
    fn test_register_postgres_sink() {
        let registry = ConnectorRegistry::new();
        register_postgres_sink(&registry);

        let maybe_info = registry.sink_info("postgres-sink");
        assert!(maybe_info.is_some());

        let info = maybe_info.unwrap();
        assert_eq!(info.name, "postgres-sink");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    /// The connection and target-table keys are all flagged as required.
    #[test]
    fn test_config_keys_required() {
        let specs = postgres_sink_config_keys();
        let required = names_with_required_flag(&specs, true);

        for expected in ["hostname", "database", "username", "table.name"] {
            assert!(required.contains(&expected));
        }
    }

    /// A representative set of tuning keys is present and flagged optional.
    #[test]
    fn test_config_keys_optional_present() {
        let specs = postgres_sink_config_keys();
        let optional = names_with_required_flag(&specs, false);

        for expected in [
            "port",
            "write.mode",
            "primary.key",
            "batch.size",
            "delivery.guarantee",
            "changelog.mode",
            "ssl.mode",
        ] {
            assert!(optional.contains(&expected));
        }
    }

    /// The registered factory can build a sink from a bare connector config.
    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_postgres_sink(&registry);

        let config = crate::config::ConnectorConfig::new("postgres-sink");
        assert!(registry.create_sink(&config).is_ok());
    }
}