Skip to main content

laminar_connectors/cdc/postgres/
mod.rs

1//! `PostgreSQL` CDC source connector.
2
3pub mod changelog;
4pub mod config;
5pub mod decoder;
6pub mod lsn;
7pub mod metrics;
8pub mod postgres_io;
9pub mod schema;
10pub mod source;
11pub mod types;
12
13// Re-export primary types at module level.
14pub use config::{PostgresCdcConfig, SnapshotMode, SslMode};
15pub use lsn::Lsn;
16pub use source::PostgresCdcSource;
17
18use std::sync::Arc;
19
20use crate::config::{ConfigKeySpec, ConnectorInfo};
21use crate::registry::ConnectorRegistry;
22
23/// Registers the `PostgreSQL` CDC source connector with the given registry.
24pub fn register_postgres_cdc_source(registry: &ConnectorRegistry) {
25    let info = ConnectorInfo {
26        name: "postgres-cdc".to_string(),
27        display_name: "PostgreSQL CDC Source".to_string(),
28        version: env!("CARGO_PKG_VERSION").to_string(),
29        is_source: true,
30        is_sink: false,
31        config_keys: postgres_cdc_config_keys(),
32    };
33
34    registry.register_source(
35        "postgres-cdc",
36        info.clone(),
37        Arc::new(|registry: Option<&prometheus::Registry>| {
38            Box::new(PostgresCdcSource::new(
39                PostgresCdcConfig::default(),
40                registry,
41            ))
42        }),
43    );
44
45    // Also register as a table source so CREATE LOOKUP TABLE ... WITH
46    // ('connector' = 'postgres-cdc') can use CDC for reference table refresh.
47    registry.register_table_source(
48        "postgres-cdc",
49        info,
50        Arc::new(|config| {
51            let connector = Box::new(PostgresCdcSource::new(PostgresCdcConfig::default(), None));
52            Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
53                connector,
54                config.clone(),
55                4096,
56            )))
57        }),
58    );
59
60    // Register standalone "postgres" table source for poll-based snapshot
61    // lookups (no replication slot / CDC required).
62    let pg_info = ConnectorInfo {
63        name: "postgres".to_string(),
64        display_name: "PostgreSQL Lookup Source".to_string(),
65        version: env!("CARGO_PKG_VERSION").to_string(),
66        is_source: true,
67        is_sink: false,
68        config_keys: vec![],
69    };
70    registry.register_table_source(
71        "postgres",
72        pg_info,
73        Arc::new(|config| {
74            Ok(Box::new(
75                crate::lookup::postgres_reference::PostgresReferenceTableSource::new(
76                    config.clone(),
77                ),
78            ))
79        }),
80    );
81}
82
83fn postgres_cdc_config_keys() -> Vec<ConfigKeySpec> {
84    vec![
85        ConfigKeySpec::required("host", "PostgreSQL host address"),
86        ConfigKeySpec::required("database", "Database name"),
87        ConfigKeySpec::required("slot.name", "Logical replication slot name"),
88        ConfigKeySpec::required("publication", "Publication name"),
89        ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
90        ConfigKeySpec::optional("username", "Connection username", "postgres"),
91        ConfigKeySpec::optional("password", "Connection password", ""),
92        ConfigKeySpec::optional("ssl.mode", "SSL mode (disable/prefer/require)", "prefer"),
93        ConfigKeySpec::optional(
94            "snapshot.mode",
95            "Snapshot mode (initial/never/always)",
96            "initial",
97        ),
98        ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
99        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
100        ConfigKeySpec::optional(
101            "keepalive.interval.ms",
102            "Keepalive interval in milliseconds",
103            "10000",
104        ),
105        ConfigKeySpec::optional(
106            "table.include",
107            "Comma-separated list of tables to include",
108            "",
109        ),
110        ConfigKeySpec::optional(
111            "table.exclude",
112            "Comma-separated list of tables to exclude",
113            "",
114        ),
115        // SSL certificate paths
116        ConfigKeySpec::optional("ssl.ca.cert.path", "Path to CA certificate file", ""),
117        ConfigKeySpec::optional(
118            "ssl.client.cert.path",
119            "Path to client certificate file",
120            "",
121        ),
122        ConfigKeySpec::optional("ssl.client.key.path", "Path to client private key file", ""),
123        ConfigKeySpec::optional("ssl.sni.hostname", "SNI hostname for SSL connections", ""),
124        // Replication start position
125        ConfigKeySpec::optional("start.lsn", "Starting LSN for replication", ""),
126    ]
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// After registration, the `postgres-cdc` source must be discoverable and
    /// described as a source-only connector with a non-empty key list.
    #[test]
    fn test_register_postgres_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_postgres_cdc_source(&registry);

        let info = registry
            .source_info("postgres-cdc")
            .expect("postgres-cdc source should be registered");

        assert_eq!(info.name, "postgres-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    /// The connection and replication essentials must all be marked required.
    #[test]
    fn test_config_keys() {
        let keys = postgres_cdc_config_keys();

        for expected in ["host", "database", "slot.name", "publication"] {
            let is_required = keys
                .iter()
                .any(|spec| spec.required && spec.key.as_str() == expected);
            assert!(is_required, "expected `{expected}` to be a required key");
        }
    }
}