laminar_connectors/cdc/postgres/
mod.rs1pub mod changelog;
36pub mod config;
37pub mod decoder;
38pub mod lsn;
39pub mod metrics;
40pub mod postgres_io;
41pub mod schema;
42pub mod source;
43pub mod types;
44
45pub use config::{PostgresCdcConfig, SnapshotMode, SslMode};
47pub use lsn::Lsn;
48pub use source::PostgresCdcSource;
49
50use std::sync::Arc;
51
52use crate::config::{ConfigKeySpec, ConnectorInfo};
53use crate::registry::ConnectorRegistry;
54
55pub fn register_postgres_cdc(registry: &ConnectorRegistry) {
57 let info = ConnectorInfo {
58 name: "postgres-cdc".to_string(),
59 display_name: "PostgreSQL CDC Source".to_string(),
60 version: env!("CARGO_PKG_VERSION").to_string(),
61 is_source: true,
62 is_sink: false,
63 config_keys: postgres_cdc_config_keys(),
64 };
65
66 registry.register_source(
67 "postgres-cdc",
68 info.clone(),
69 Arc::new(|| Box::new(PostgresCdcSource::new(PostgresCdcConfig::default()))),
70 );
71
72 registry.register_table_source(
75 "postgres-cdc",
76 info,
77 Arc::new(|config| {
78 let connector = Box::new(PostgresCdcSource::new(PostgresCdcConfig::default()));
79 Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
80 connector,
81 config.clone(),
82 4096,
83 )))
84 }),
85 );
86}
87
88fn postgres_cdc_config_keys() -> Vec<ConfigKeySpec> {
89 vec![
90 ConfigKeySpec::required("host", "PostgreSQL host address"),
91 ConfigKeySpec::required("database", "Database name"),
92 ConfigKeySpec::required("slot.name", "Logical replication slot name"),
93 ConfigKeySpec::required("publication", "Publication name"),
94 ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
95 ConfigKeySpec::optional("username", "Connection username", "postgres"),
96 ConfigKeySpec::optional("password", "Connection password", ""),
97 ConfigKeySpec::optional("ssl.mode", "SSL mode (disable/prefer/require)", "prefer"),
98 ConfigKeySpec::optional(
99 "snapshot.mode",
100 "Snapshot mode (initial/never/always)",
101 "initial",
102 ),
103 ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
104 ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
105 ConfigKeySpec::optional(
106 "keepalive.interval.ms",
107 "Keepalive interval in milliseconds",
108 "10000",
109 ),
110 ConfigKeySpec::optional(
111 "table.include",
112 "Comma-separated list of tables to include",
113 "",
114 ),
115 ConfigKeySpec::optional(
116 "table.exclude",
117 "Comma-separated list of tables to exclude",
118 "",
119 ),
120 ConfigKeySpec::optional("ssl.ca.cert.path", "Path to CA certificate file", ""),
122 ConfigKeySpec::optional(
123 "ssl.client.cert.path",
124 "Path to client certificate file",
125 "",
126 ),
127 ConfigKeySpec::optional("ssl.client.key.path", "Path to client private key file", ""),
128 ConfigKeySpec::optional("ssl.sni.hostname", "SNI hostname for SSL connections", ""),
129 ConfigKeySpec::optional("start.lsn", "Starting LSN for replication", ""),
131 ]
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering the connector must make its metadata discoverable under
    /// the `"postgres-cdc"` name, flagged as a source and not a sink.
    #[test]
    fn test_register_postgres_cdc() {
        let registry = ConnectorRegistry::new();
        // Pass the registry by shared reference (the `&registry` borrow had
        // been mangled into the `®` character by HTML-entity decoding).
        register_postgres_cdc(&registry);

        let info = registry
            .source_info("postgres-cdc")
            .expect("postgres-cdc should be registered as a source");
        assert_eq!(info.name, "postgres-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    /// The four connection-critical keys must be marked as required.
    #[test]
    fn test_config_keys() {
        let keys = postgres_cdc_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"host"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"slot.name"));
        assert!(required.contains(&"publication"));
    }
}