Skip to main content

laminar_connectors/cdc/postgres/
mod.rs

1//! `PostgreSQL` CDC source connector.
2
3pub mod changelog;
4pub mod config;
5pub mod decoder;
6pub mod lsn;
7pub mod metrics;
8pub mod postgres_io;
9pub mod schema;
10pub mod source;
11pub mod types;
12
13// Re-export primary types at module level.
14pub use config::{PostgresCdcConfig, SnapshotMode, SslMode};
15pub use lsn::Lsn;
16pub use source::PostgresCdcSource;
17
18use std::sync::Arc;
19
20use crate::config::{ConfigKeySpec, ConnectorInfo};
21use crate::registry::ConnectorRegistry;
22
23/// Registers the `PostgreSQL` CDC source connector with the given registry.
24pub fn register_postgres_cdc_source(registry: &ConnectorRegistry) {
25    let info = ConnectorInfo {
26        name: "postgres-cdc".to_string(),
27        display_name: "PostgreSQL CDC Source".to_string(),
28        version: env!("CARGO_PKG_VERSION").to_string(),
29        is_source: true,
30        is_sink: false,
31        config_keys: postgres_cdc_config_keys(),
32    };
33
34    registry.register_source(
35        "postgres-cdc",
36        info.clone(),
37        Arc::new(|registry: Option<&prometheus::Registry>| {
38            Box::new(PostgresCdcSource::new(
39                PostgresCdcConfig::default(),
40                registry,
41            ))
42        }),
43    );
44
45    // Also register as a table source so CREATE LOOKUP TABLE ... WITH
46    // ('connector' = 'postgres-cdc') can use CDC for reference table refresh.
47    registry.register_table_source(
48        "postgres-cdc",
49        info,
50        Arc::new(|config| {
51            let connector = Box::new(PostgresCdcSource::new(PostgresCdcConfig::default(), None));
52            Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
53                connector,
54                config.clone(),
55                4096,
56            )))
57        }),
58    );
59
60    // Register standalone "postgres" table source for poll-based snapshot
61    // lookups (no replication slot / CDC required).
62    let pg_info = ConnectorInfo {
63        name: "postgres".to_string(),
64        display_name: "PostgreSQL Lookup Source".to_string(),
65        version: env!("CARGO_PKG_VERSION").to_string(),
66        is_source: true,
67        is_sink: false,
68        config_keys: vec![],
69    };
70    registry.register_table_source(
71        "postgres",
72        pg_info,
73        Arc::new(|config| {
74            Ok(Box::new(
75                crate::lookup::postgres_reference::PostgresReferenceTableSource::new(
76                    config.clone(),
77                ),
78            ))
79        }),
80    );
81
82    // On-demand (partial cache mode) lookup source: pooled + WHERE pk = ANY($1).
83    registry.register_lookup_source("postgres", Arc::new(PostgresLookupFactory));
84}
85
86struct PostgresLookupFactory;
87
88#[async_trait::async_trait]
89impl crate::registry::LookupSourceFactory for PostgresLookupFactory {
90    async fn build(
91        &self,
92        config: crate::config::ConnectorConfig,
93        _declared_schema: Option<arrow_schema::SchemaRef>,
94    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
95    {
96        use crate::lookup::postgres_lookup::{PostgresLookupSource, PostgresLookupSourceConfig};
97
98        let pk_columns: Vec<String> = config
99            .get("_primary_key_columns")
100            .unwrap_or("")
101            .split(',')
102            .map(|s| s.trim().to_string())
103            .filter(|s| !s.is_empty())
104            .collect();
105        if pk_columns.is_empty() {
106            return Err(crate::error::ConnectorError::ConfigurationError(
107                "postgres lookup source requires primary key columns".into(),
108            ));
109        }
110
111        let table = config
112            .get("table")
113            .ok_or_else(|| {
114                crate::error::ConnectorError::ConfigurationError(
115                    "postgres lookup source requires a 'table' property".into(),
116                )
117            })?
118            .to_string();
119
120        let pool_size = if let Some(s) = config.get("pool_size") {
121            s.parse::<usize>().map_err(|e| {
122                crate::error::ConnectorError::ConfigurationError(format!(
123                    "invalid 'pool_size' value '{s}': {e}"
124                ))
125            })?
126        } else {
127            4
128        };
129
130        let lookup_config = PostgresLookupSourceConfig {
131            properties: config.properties().clone(),
132            table,
133            primary_key_columns: pk_columns,
134            pool_size,
135        };
136
137        let source = PostgresLookupSource::open(lookup_config).await?;
138        Ok(Arc::new(source) as Arc<dyn laminar_core::lookup::source::LookupSourceDyn>)
139    }
140}
141
142fn postgres_cdc_config_keys() -> Vec<ConfigKeySpec> {
143    vec![
144        ConfigKeySpec::required("host", "PostgreSQL host address"),
145        ConfigKeySpec::required("database", "Database name"),
146        ConfigKeySpec::required("slot.name", "Logical replication slot name"),
147        ConfigKeySpec::required("publication", "Publication name"),
148        ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
149        ConfigKeySpec::optional("username", "Connection username", "postgres"),
150        ConfigKeySpec::optional("password", "Connection password", ""),
151        ConfigKeySpec::optional("ssl.mode", "SSL mode (disable/prefer/require)", "prefer"),
152        ConfigKeySpec::optional(
153            "snapshot.mode",
154            "Snapshot mode (initial/never/always)",
155            "initial",
156        ),
157        ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
158        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
159        ConfigKeySpec::optional(
160            "keepalive.interval.ms",
161            "Keepalive interval in milliseconds",
162            "10000",
163        ),
164        ConfigKeySpec::optional(
165            "table.include",
166            "Comma-separated list of tables to include",
167            "",
168        ),
169        ConfigKeySpec::optional(
170            "table.exclude",
171            "Comma-separated list of tables to exclude",
172            "",
173        ),
174        // SSL certificate paths
175        ConfigKeySpec::optional("ssl.ca.cert.path", "Path to CA certificate file", ""),
176        ConfigKeySpec::optional(
177            "ssl.client.cert.path",
178            "Path to client certificate file",
179            "",
180        ),
181        ConfigKeySpec::optional("ssl.client.key.path", "Path to client private key file", ""),
182        ConfigKeySpec::optional("ssl.sni.hostname", "SNI hostname for SSL connections", ""),
183        // Replication start position
184        ConfigKeySpec::optional("start.lsn", "Starting LSN for replication", ""),
185    ]
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// After registration, the registry reports `postgres-cdc` as a
    /// source-only connector that advertises configuration keys.
    #[test]
    fn test_register_postgres_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_postgres_cdc_source(&registry);

        let info = registry
            .source_info("postgres-cdc")
            .expect("postgres-cdc source should be registered");
        assert_eq!(info.name, "postgres-cdc");
        assert!(info.is_source && !info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    /// `host`, `database`, `slot.name` and `publication` are all flagged as
    /// required.
    #[test]
    fn test_config_keys() {
        let keys = postgres_cdc_config_keys();
        for expected in ["host", "database", "slot.name", "publication"] {
            assert!(
                keys.iter()
                    .any(|spec| spec.required && spec.key.as_str() == expected),
                "expected '{expected}' to be a required key"
            );
        }
    }
}