laminar_connectors/cdc/postgres/
mod.rs1pub mod changelog;
4pub mod config;
5pub mod decoder;
6pub mod lsn;
7pub mod metrics;
8pub mod postgres_io;
9pub mod schema;
10pub mod source;
11pub mod types;
12
13pub use config::{PostgresCdcConfig, SnapshotMode, SslMode};
15pub use lsn::Lsn;
16pub use source::PostgresCdcSource;
17
18use std::sync::Arc;
19
20use crate::config::{ConfigKeySpec, ConnectorInfo};
21use crate::registry::ConnectorRegistry;
22
23pub fn register_postgres_cdc_source(registry: &ConnectorRegistry) {
25 let info = ConnectorInfo {
26 name: "postgres-cdc".to_string(),
27 display_name: "PostgreSQL CDC Source".to_string(),
28 version: env!("CARGO_PKG_VERSION").to_string(),
29 is_source: true,
30 is_sink: false,
31 config_keys: postgres_cdc_config_keys(),
32 };
33
34 registry.register_source(
35 "postgres-cdc",
36 info.clone(),
37 Arc::new(|registry: Option<&prometheus::Registry>| {
38 Box::new(PostgresCdcSource::new(
39 PostgresCdcConfig::default(),
40 registry,
41 ))
42 }),
43 );
44
45 registry.register_table_source(
48 "postgres-cdc",
49 info,
50 Arc::new(|config| {
51 let connector = Box::new(PostgresCdcSource::new(PostgresCdcConfig::default(), None));
52 Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
53 connector,
54 config.clone(),
55 4096,
56 )))
57 }),
58 );
59
60 let pg_info = ConnectorInfo {
63 name: "postgres".to_string(),
64 display_name: "PostgreSQL Lookup Source".to_string(),
65 version: env!("CARGO_PKG_VERSION").to_string(),
66 is_source: true,
67 is_sink: false,
68 config_keys: vec![],
69 };
70 registry.register_table_source(
71 "postgres",
72 pg_info,
73 Arc::new(|config| {
74 Ok(Box::new(
75 crate::lookup::postgres_reference::PostgresReferenceTableSource::new(
76 config.clone(),
77 ),
78 ))
79 }),
80 );
81}
82
83fn postgres_cdc_config_keys() -> Vec<ConfigKeySpec> {
84 vec![
85 ConfigKeySpec::required("host", "PostgreSQL host address"),
86 ConfigKeySpec::required("database", "Database name"),
87 ConfigKeySpec::required("slot.name", "Logical replication slot name"),
88 ConfigKeySpec::required("publication", "Publication name"),
89 ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
90 ConfigKeySpec::optional("username", "Connection username", "postgres"),
91 ConfigKeySpec::optional("password", "Connection password", ""),
92 ConfigKeySpec::optional("ssl.mode", "SSL mode (disable/prefer/require)", "prefer"),
93 ConfigKeySpec::optional(
94 "snapshot.mode",
95 "Snapshot mode (initial/never/always)",
96 "initial",
97 ),
98 ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
99 ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
100 ConfigKeySpec::optional(
101 "keepalive.interval.ms",
102 "Keepalive interval in milliseconds",
103 "10000",
104 ),
105 ConfigKeySpec::optional(
106 "table.include",
107 "Comma-separated list of tables to include",
108 "",
109 ),
110 ConfigKeySpec::optional(
111 "table.exclude",
112 "Comma-separated list of tables to exclude",
113 "",
114 ),
115 ConfigKeySpec::optional("ssl.ca.cert.path", "Path to CA certificate file", ""),
117 ConfigKeySpec::optional(
118 "ssl.client.cert.path",
119 "Path to client certificate file",
120 "",
121 ),
122 ConfigKeySpec::optional("ssl.client.key.path", "Path to client private key file", ""),
123 ConfigKeySpec::optional("ssl.sni.hostname", "SNI hostname for SSL connections", ""),
124 ConfigKeySpec::optional("start.lsn", "Starting LSN for replication", ""),
126 ]
127}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132
133 #[test]
134 fn test_register_postgres_cdc_source() {
135 let registry = ConnectorRegistry::new();
136 register_postgres_cdc_source(®istry);
137
138 let info = registry.source_info("postgres-cdc");
139 assert!(info.is_some());
140 let info = info.unwrap();
141 assert_eq!(info.name, "postgres-cdc");
142 assert!(info.is_source);
143 assert!(!info.is_sink);
144 assert!(!info.config_keys.is_empty());
145 }
146
147 #[test]
148 fn test_config_keys() {
149 let keys = postgres_cdc_config_keys();
150 let required: Vec<&str> = keys
151 .iter()
152 .filter(|k| k.required)
153 .map(|k| k.key.as_str())
154 .collect();
155 assert!(required.contains(&"host"));
156 assert!(required.contains(&"database"));
157 assert!(required.contains(&"slot.name"));
158 assert!(required.contains(&"publication"));
159 }
160}