laminar_connectors/postgres/
mod.rs1pub mod sink;
4pub mod sink_config;
5pub mod sink_metrics;
6pub mod types;
7
8pub use crate::connector::DeliveryGuarantee;
10pub use sink::PostgresSink;
11pub use sink_config::{PostgresSinkConfig, SslMode, WriteMode};
12pub use sink_metrics::PostgresSinkMetrics;
13
14use std::sync::Arc;
15
16use arrow_schema::{DataType, Field, Schema};
17
18use crate::config::{ConfigKeySpec, ConnectorInfo};
19use crate::registry::ConnectorRegistry;
20
21pub fn register_postgres_sink(registry: &ConnectorRegistry) {
23 let info = ConnectorInfo {
24 name: "postgres-sink".to_string(),
25 display_name: "PostgreSQL Sink".to_string(),
26 version: env!("CARGO_PKG_VERSION").to_string(),
27 is_source: false,
28 is_sink: true,
29 config_keys: postgres_sink_config_keys(),
30 };
31
32 registry.register_sink(
33 "postgres-sink",
34 info,
35 Arc::new(|registry: Option<&prometheus::Registry>| {
36 let schema = Arc::new(Schema::new(vec![
38 Field::new("key", DataType::Utf8, true),
39 Field::new("value", DataType::Utf8, false),
40 ]));
41 Box::new(PostgresSink::new(
42 schema,
43 PostgresSinkConfig::default(),
44 registry,
45 ))
46 }),
47 );
48}
49
50fn postgres_sink_config_keys() -> Vec<ConfigKeySpec> {
51 vec![
52 ConfigKeySpec::required("hostname", "PostgreSQL server hostname"),
53 ConfigKeySpec::required("database", "Target database name"),
54 ConfigKeySpec::required("username", "Authentication username"),
55 ConfigKeySpec::required("table.name", "Target table name"),
56 ConfigKeySpec::optional("password", "Authentication password", ""),
57 ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
58 ConfigKeySpec::optional("schema.name", "Target schema name", "public"),
59 ConfigKeySpec::optional(
60 "write.mode",
61 "Write mode: 'append' (COPY BINARY) or 'upsert' (ON CONFLICT)",
62 "append",
63 ),
64 ConfigKeySpec::optional(
65 "primary.key",
66 "Comma-separated primary key columns (required for upsert mode)",
67 "",
68 ),
69 ConfigKeySpec::optional("batch.size", "Max records before flush", "4096"),
70 ConfigKeySpec::optional("flush.interval.ms", "Max time before flush (ms)", "1000"),
71 ConfigKeySpec::optional("pool.size", "Connection pool size", "4"),
72 ConfigKeySpec::optional("connect.timeout.ms", "Connection timeout (ms)", "10000"),
73 ConfigKeySpec::optional(
74 "ssl.mode",
75 "SSL mode: disable/prefer/require/verify-ca/verify-full",
76 "prefer",
77 ),
78 ConfigKeySpec::optional(
79 "auto.create.table",
80 "Create target table from Arrow schema if missing",
81 "false",
82 ),
83 ConfigKeySpec::optional(
84 "changelog.mode",
85 "Handle Z-set records (split INSERT/DELETE by _op)",
86 "false",
87 ),
88 ConfigKeySpec::optional(
89 "delivery.guarantee",
90 "at_least_once or exactly_once",
91 "at_least_once",
92 ),
93 ConfigKeySpec::optional(
94 "sink.id",
95 "Unique ID for offset tracking (auto-generated if not set)",
96 "",
97 ),
98 ]
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering the sink makes its metadata available under
    /// `"postgres-sink"`, flagged as a sink (not a source) with a non-empty
    /// list of config keys.
    #[test]
    fn test_register_postgres_sink() {
        let registry = ConnectorRegistry::new();
        register_postgres_sink(&registry);

        let info = registry
            .sink_info("postgres-sink")
            .expect("postgres-sink should be registered");
        assert_eq!(info.name, "postgres-sink");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    /// The connection and target-table keys are marked as required.
    #[test]
    fn test_config_keys_required() {
        let keys = postgres_sink_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"hostname"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"username"));
        assert!(required.contains(&"table.name"));
    }

    /// A representative set of tuning keys is present and marked optional.
    #[test]
    fn test_config_keys_optional_present() {
        let keys = postgres_sink_config_keys();
        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"port"));
        assert!(optional.contains(&"write.mode"));
        assert!(optional.contains(&"primary.key"));
        assert!(optional.contains(&"batch.size"));
        assert!(optional.contains(&"delivery.guarantee"));
        assert!(optional.contains(&"changelog.mode"));
        assert!(optional.contains(&"ssl.mode"));
    }

    /// The registered factory can build a sink from a bare connector config
    /// without a Prometheus registry.
    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_postgres_sink(&registry);

        let config = crate::config::ConnectorConfig::new("postgres-sink");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}