Skip to main content

laminar_connectors/postgres/
mod.rs

1//! `PostgreSQL` sink connector.
2
3pub mod sink;
4pub mod sink_config;
5pub mod sink_metrics;
6pub mod types;
7
8// Re-export primary types at module level.
9pub use crate::connector::DeliveryGuarantee;
10pub use sink::PostgresSink;
11pub use sink_config::{PostgresSinkConfig, SslMode, WriteMode};
12pub use sink_metrics::PostgresSinkMetrics;
13
14use std::sync::Arc;
15
16use arrow_schema::{DataType, Field, Schema};
17
18use crate::config::{ConfigKeySpec, ConnectorInfo};
19use crate::registry::ConnectorRegistry;
20
21/// Registers the `PostgreSQL` sink connector with the given registry.
22pub fn register_postgres_sink(registry: &ConnectorRegistry) {
23    let info = ConnectorInfo {
24        name: "postgres-sink".to_string(),
25        display_name: "PostgreSQL Sink".to_string(),
26        version: env!("CARGO_PKG_VERSION").to_string(),
27        is_source: false,
28        is_sink: true,
29        config_keys: postgres_sink_config_keys(),
30    };
31
32    registry.register_sink(
33        "postgres-sink",
34        info,
35        Arc::new(|registry: Option<&prometheus::Registry>| {
36            // Default schema (overridden during open).
37            let schema = Arc::new(Schema::new(vec![
38                Field::new("key", DataType::Utf8, true),
39                Field::new("value", DataType::Utf8, false),
40            ]));
41            Box::new(PostgresSink::new(
42                schema,
43                PostgresSinkConfig::default(),
44                registry,
45            ))
46        }),
47    );
48}
49
50fn postgres_sink_config_keys() -> Vec<ConfigKeySpec> {
51    vec![
52        ConfigKeySpec::required("hostname", "PostgreSQL server hostname"),
53        ConfigKeySpec::required("database", "Target database name"),
54        ConfigKeySpec::required("username", "Authentication username"),
55        ConfigKeySpec::required("table.name", "Target table name"),
56        ConfigKeySpec::optional("password", "Authentication password", ""),
57        ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
58        ConfigKeySpec::optional("schema.name", "Target schema name", "public"),
59        ConfigKeySpec::optional(
60            "write.mode",
61            "Write mode: 'append' (COPY BINARY) or 'upsert' (ON CONFLICT)",
62            "append",
63        ),
64        ConfigKeySpec::optional(
65            "primary.key",
66            "Comma-separated primary key columns (required for upsert mode)",
67            "",
68        ),
69        ConfigKeySpec::optional("batch.size", "Max records before flush", "4096"),
70        ConfigKeySpec::optional("flush.interval.ms", "Max time before flush (ms)", "1000"),
71        ConfigKeySpec::optional("pool.size", "Connection pool size", "4"),
72        ConfigKeySpec::optional("connect.timeout.ms", "Connection timeout (ms)", "10000"),
73        ConfigKeySpec::optional(
74            "ssl.mode",
75            "SSL mode: disable/prefer/require/verify-ca/verify-full",
76            "prefer",
77        ),
78        ConfigKeySpec::optional(
79            "auto.create.table",
80            "Create target table from Arrow schema if missing",
81            "false",
82        ),
83        ConfigKeySpec::optional(
84            "changelog.mode",
85            "Handle Z-set records (split INSERT/DELETE by _op)",
86            "false",
87        ),
88        ConfigKeySpec::optional(
89            "delivery.guarantee",
90            "at_least_once or exactly_once",
91            "at_least_once",
92        ),
93        ConfigKeySpec::optional(
94            "sink.id",
95            "Unique ID for offset tracking (auto-generated if not set)",
96            "",
97        ),
98    ]
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104
105    #[test]
106    fn test_register_postgres_sink() {
107        let registry = ConnectorRegistry::new();
108        register_postgres_sink(&registry);
109
110        let info = registry.sink_info("postgres-sink");
111        assert!(info.is_some());
112        let info = info.unwrap();
113        assert_eq!(info.name, "postgres-sink");
114        assert!(info.is_sink);
115        assert!(!info.is_source);
116        assert!(!info.config_keys.is_empty());
117    }
118
119    #[test]
120    fn test_config_keys_required() {
121        let keys = postgres_sink_config_keys();
122        let required: Vec<&str> = keys
123            .iter()
124            .filter(|k| k.required)
125            .map(|k| k.key.as_str())
126            .collect();
127        assert!(required.contains(&"hostname"));
128        assert!(required.contains(&"database"));
129        assert!(required.contains(&"username"));
130        assert!(required.contains(&"table.name"));
131    }
132
133    #[test]
134    fn test_config_keys_optional_present() {
135        let keys = postgres_sink_config_keys();
136        let optional: Vec<&str> = keys
137            .iter()
138            .filter(|k| !k.required)
139            .map(|k| k.key.as_str())
140            .collect();
141        assert!(optional.contains(&"port"));
142        assert!(optional.contains(&"write.mode"));
143        assert!(optional.contains(&"primary.key"));
144        assert!(optional.contains(&"batch.size"));
145        assert!(optional.contains(&"delivery.guarantee"));
146        assert!(optional.contains(&"changelog.mode"));
147        assert!(optional.contains(&"ssl.mode"));
148    }
149
150    #[test]
151    fn test_factory_creates_sink() {
152        let registry = ConnectorRegistry::new();
153        register_postgres_sink(&registry);
154
155        let config = crate::config::ConnectorConfig::new("postgres-sink");
156        let sink = registry.create_sink(&config, None);
157        assert!(sink.is_ok());
158    }
159}