laminar_connectors/cdc/postgres/
mod.rs1pub mod changelog;
4pub mod config;
5pub mod decoder;
6pub mod lsn;
7pub mod metrics;
8pub mod postgres_io;
9pub mod schema;
10pub mod source;
11pub mod types;
12
13pub use config::{PostgresCdcConfig, SnapshotMode, SslMode};
15pub use lsn::Lsn;
16pub use source::PostgresCdcSource;
17
18use std::sync::Arc;
19
20use crate::config::{ConfigKeySpec, ConnectorInfo};
21use crate::registry::ConnectorRegistry;
22
23pub fn register_postgres_cdc_source(registry: &ConnectorRegistry) {
25 let info = ConnectorInfo {
26 name: "postgres-cdc".to_string(),
27 display_name: "PostgreSQL CDC Source".to_string(),
28 version: env!("CARGO_PKG_VERSION").to_string(),
29 is_source: true,
30 is_sink: false,
31 config_keys: postgres_cdc_config_keys(),
32 };
33
34 registry.register_source(
35 "postgres-cdc",
36 info.clone(),
37 Arc::new(|registry: Option<&prometheus::Registry>| {
38 Box::new(PostgresCdcSource::new(
39 PostgresCdcConfig::default(),
40 registry,
41 ))
42 }),
43 );
44
45 registry.register_table_source(
48 "postgres-cdc",
49 info,
50 Arc::new(|config| {
51 let connector = Box::new(PostgresCdcSource::new(PostgresCdcConfig::default(), None));
52 Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
53 connector,
54 config.clone(),
55 4096,
56 )))
57 }),
58 );
59
60 let pg_info = ConnectorInfo {
63 name: "postgres".to_string(),
64 display_name: "PostgreSQL Lookup Source".to_string(),
65 version: env!("CARGO_PKG_VERSION").to_string(),
66 is_source: true,
67 is_sink: false,
68 config_keys: vec![],
69 };
70 registry.register_table_source(
71 "postgres",
72 pg_info,
73 Arc::new(|config| {
74 Ok(Box::new(
75 crate::lookup::postgres_reference::PostgresReferenceTableSource::new(
76 config.clone(),
77 ),
78 ))
79 }),
80 );
81
82 registry.register_lookup_source("postgres", Arc::new(PostgresLookupFactory));
84}
85
86struct PostgresLookupFactory;
87
88#[async_trait::async_trait]
89impl crate::registry::LookupSourceFactory for PostgresLookupFactory {
90 async fn build(
91 &self,
92 config: crate::config::ConnectorConfig,
93 _declared_schema: Option<arrow_schema::SchemaRef>,
94 ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
95 {
96 use crate::lookup::postgres_lookup::{PostgresLookupSource, PostgresLookupSourceConfig};
97
98 let pk_columns: Vec<String> = config
99 .get("_primary_key_columns")
100 .unwrap_or("")
101 .split(',')
102 .map(|s| s.trim().to_string())
103 .filter(|s| !s.is_empty())
104 .collect();
105 if pk_columns.is_empty() {
106 return Err(crate::error::ConnectorError::ConfigurationError(
107 "postgres lookup source requires primary key columns".into(),
108 ));
109 }
110
111 let table = config
112 .get("table")
113 .ok_or_else(|| {
114 crate::error::ConnectorError::ConfigurationError(
115 "postgres lookup source requires a 'table' property".into(),
116 )
117 })?
118 .to_string();
119
120 let pool_size = if let Some(s) = config.get("pool_size") {
121 s.parse::<usize>().map_err(|e| {
122 crate::error::ConnectorError::ConfigurationError(format!(
123 "invalid 'pool_size' value '{s}': {e}"
124 ))
125 })?
126 } else {
127 4
128 };
129
130 let lookup_config = PostgresLookupSourceConfig {
131 properties: config.properties().clone(),
132 table,
133 primary_key_columns: pk_columns,
134 pool_size,
135 };
136
137 let source = PostgresLookupSource::open(lookup_config).await?;
138 Ok(Arc::new(source) as Arc<dyn laminar_core::lookup::source::LookupSourceDyn>)
139 }
140}
141
142fn postgres_cdc_config_keys() -> Vec<ConfigKeySpec> {
143 vec![
144 ConfigKeySpec::required("host", "PostgreSQL host address"),
145 ConfigKeySpec::required("database", "Database name"),
146 ConfigKeySpec::required("slot.name", "Logical replication slot name"),
147 ConfigKeySpec::required("publication", "Publication name"),
148 ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
149 ConfigKeySpec::optional("username", "Connection username", "postgres"),
150 ConfigKeySpec::optional("password", "Connection password", ""),
151 ConfigKeySpec::optional("ssl.mode", "SSL mode (disable/prefer/require)", "prefer"),
152 ConfigKeySpec::optional(
153 "snapshot.mode",
154 "Snapshot mode (initial/never/always)",
155 "initial",
156 ),
157 ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
158 ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
159 ConfigKeySpec::optional(
160 "keepalive.interval.ms",
161 "Keepalive interval in milliseconds",
162 "10000",
163 ),
164 ConfigKeySpec::optional(
165 "table.include",
166 "Comma-separated list of tables to include",
167 "",
168 ),
169 ConfigKeySpec::optional(
170 "table.exclude",
171 "Comma-separated list of tables to exclude",
172 "",
173 ),
174 ConfigKeySpec::optional("ssl.ca.cert.path", "Path to CA certificate file", ""),
176 ConfigKeySpec::optional(
177 "ssl.client.cert.path",
178 "Path to client certificate file",
179 "",
180 ),
181 ConfigKeySpec::optional("ssl.client.key.path", "Path to client private key file", ""),
182 ConfigKeySpec::optional("ssl.sni.hostname", "SNI hostname for SSL connections", ""),
183 ConfigKeySpec::optional("start.lsn", "Starting LSN for replication", ""),
185 ]
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191
192 #[test]
193 fn test_register_postgres_cdc_source() {
194 let registry = ConnectorRegistry::new();
195 register_postgres_cdc_source(®istry);
196
197 let info = registry.source_info("postgres-cdc");
198 assert!(info.is_some());
199 let info = info.unwrap();
200 assert_eq!(info.name, "postgres-cdc");
201 assert!(info.is_source);
202 assert!(!info.is_sink);
203 assert!(!info.config_keys.is_empty());
204 }
205
206 #[test]
207 fn test_config_keys() {
208 let keys = postgres_cdc_config_keys();
209 let required: Vec<&str> = keys
210 .iter()
211 .filter(|k| k.required)
212 .map(|k| k.key.as_str())
213 .collect();
214 assert!(required.contains(&"host"));
215 assert!(required.contains(&"database"));
216 assert!(required.contains(&"slot.name"));
217 assert!(required.contains(&"publication"));
218 }
219}