laminar_connectors/postgres/
mod.rs1pub mod sink;
43pub mod sink_config;
44pub mod sink_metrics;
45pub mod types;
46
47pub use sink::PostgresSink;
49pub use sink_config::{DeliveryGuarantee, PostgresSinkConfig, SslMode, WriteMode};
50pub use sink_metrics::PostgresSinkMetrics;
51
52use std::sync::Arc;
53
54use arrow_schema::{DataType, Field, Schema};
55
56use crate::config::{ConfigKeySpec, ConnectorInfo};
57use crate::registry::ConnectorRegistry;
58
59pub fn register_postgres_sink(registry: &ConnectorRegistry) {
61 let info = ConnectorInfo {
62 name: "postgres-sink".to_string(),
63 display_name: "PostgreSQL Sink".to_string(),
64 version: env!("CARGO_PKG_VERSION").to_string(),
65 is_source: false,
66 is_sink: true,
67 config_keys: postgres_sink_config_keys(),
68 };
69
70 registry.register_sink(
71 "postgres-sink",
72 info,
73 Arc::new(|| {
74 let schema = Arc::new(Schema::new(vec![
76 Field::new("key", DataType::Utf8, true),
77 Field::new("value", DataType::Utf8, false),
78 ]));
79 Box::new(PostgresSink::new(schema, PostgresSinkConfig::default()))
80 }),
81 );
82}
83
84fn postgres_sink_config_keys() -> Vec<ConfigKeySpec> {
85 vec![
86 ConfigKeySpec::required("hostname", "PostgreSQL server hostname"),
87 ConfigKeySpec::required("database", "Target database name"),
88 ConfigKeySpec::required("username", "Authentication username"),
89 ConfigKeySpec::required("table.name", "Target table name"),
90 ConfigKeySpec::optional("password", "Authentication password", ""),
91 ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
92 ConfigKeySpec::optional("schema.name", "Target schema name", "public"),
93 ConfigKeySpec::optional(
94 "write.mode",
95 "Write mode: 'append' (COPY BINARY) or 'upsert' (ON CONFLICT)",
96 "append",
97 ),
98 ConfigKeySpec::optional(
99 "primary.key",
100 "Comma-separated primary key columns (required for upsert mode)",
101 "",
102 ),
103 ConfigKeySpec::optional("batch.size", "Max records before flush", "4096"),
104 ConfigKeySpec::optional("flush.interval.ms", "Max time before flush (ms)", "1000"),
105 ConfigKeySpec::optional("pool.size", "Connection pool size", "4"),
106 ConfigKeySpec::optional("connect.timeout.ms", "Connection timeout (ms)", "10000"),
107 ConfigKeySpec::optional(
108 "ssl.mode",
109 "SSL mode: disable/prefer/require/verify-ca/verify-full",
110 "prefer",
111 ),
112 ConfigKeySpec::optional(
113 "auto.create.table",
114 "Create target table from Arrow schema if missing",
115 "false",
116 ),
117 ConfigKeySpec::optional(
118 "changelog.mode",
119 "Handle Z-set records (split INSERT/DELETE by _op)",
120 "false",
121 ),
122 ConfigKeySpec::optional(
123 "delivery.guarantee",
124 "at_least_once or exactly_once",
125 "at_least_once",
126 ),
127 ConfigKeySpec::optional(
128 "sink.id",
129 "Unique ID for offset tracking (auto-generated if not set)",
130 "",
131 ),
132 ]
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 #[test]
140 fn test_register_postgres_sink() {
141 let registry = ConnectorRegistry::new();
142 register_postgres_sink(®istry);
143
144 let info = registry.sink_info("postgres-sink");
145 assert!(info.is_some());
146 let info = info.unwrap();
147 assert_eq!(info.name, "postgres-sink");
148 assert!(info.is_sink);
149 assert!(!info.is_source);
150 assert!(!info.config_keys.is_empty());
151 }
152
153 #[test]
154 fn test_config_keys_required() {
155 let keys = postgres_sink_config_keys();
156 let required: Vec<&str> = keys
157 .iter()
158 .filter(|k| k.required)
159 .map(|k| k.key.as_str())
160 .collect();
161 assert!(required.contains(&"hostname"));
162 assert!(required.contains(&"database"));
163 assert!(required.contains(&"username"));
164 assert!(required.contains(&"table.name"));
165 }
166
167 #[test]
168 fn test_config_keys_optional_present() {
169 let keys = postgres_sink_config_keys();
170 let optional: Vec<&str> = keys
171 .iter()
172 .filter(|k| !k.required)
173 .map(|k| k.key.as_str())
174 .collect();
175 assert!(optional.contains(&"port"));
176 assert!(optional.contains(&"write.mode"));
177 assert!(optional.contains(&"primary.key"));
178 assert!(optional.contains(&"batch.size"));
179 assert!(optional.contains(&"delivery.guarantee"));
180 assert!(optional.contains(&"changelog.mode"));
181 assert!(optional.contains(&"ssl.mode"));
182 }
183
184 #[test]
185 fn test_factory_creates_sink() {
186 let registry = ConnectorRegistry::new();
187 register_postgres_sink(®istry);
188
189 let config = crate::config::ConnectorConfig::new("postgres-sink");
190 let sink = registry.create_sink(&config);
191 assert!(sink.is_ok());
192 }
193}