1use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11use super::lsn::Lsn;
12
/// Connection and streaming settings for the PostgreSQL CDC connector.
///
/// Instances are built via [`Default`], [`PostgresCdcConfig::new`], or
/// [`PostgresCdcConfig::from_config`], and checked with
/// [`PostgresCdcConfig::validate`].
#[derive(Debug, Clone)]
pub struct PostgresCdcConfig {
    /// Server host, emitted as the `host` keyword of the connection string.
    /// Validation rejects an empty value. Defaults to `"localhost"`.
    pub host: String,

    /// Server port, emitted as the `port` keyword. Defaults to `5432`.
    pub port: u16,

    /// Database name, emitted as the `dbname` keyword. Validation rejects an
    /// empty value. Defaults to `"postgres"`.
    pub database: String,

    /// User name, emitted as the `user` keyword. Defaults to `"postgres"`.
    pub username: String,

    /// Optional password; when set it is appended to the connection string as
    /// a quoted `password` value.
    pub password: Option<String>,

    /// SSL mode, emitted as the `sslmode` keyword. `VerifyCa` and `VerifyFull`
    /// additionally require `ca_cert_path` to pass validation.
    pub ssl_mode: SslMode,

    /// CA certificate path, read from the `ssl.ca.cert.path` key.
    pub ca_cert_path: Option<String>,

    /// Client certificate path (`ssl.client.cert.path`). Validation requires
    /// it to be set together with `client_key_path`.
    pub client_cert_path: Option<String>,

    /// Client private key path (`ssl.client.key.path`). Validation requires
    /// it to be set together with `client_cert_path`.
    pub client_key_path: Option<String>,

    /// Optional SNI host name, read from the `ssl.sni.hostname` key.
    pub sni_hostname: Option<String>,

    /// Replication slot name (required `slot.name` key). Validation rejects an
    /// empty value. Defaults to `"laminar_slot"`.
    pub slot_name: String,

    /// Publication name (required `publication` key). Validation rejects an
    /// empty value. Defaults to `"laminar_pub"`.
    pub publication: String,

    /// Optional starting LSN, parsed from the `start.lsn` key; `None` by default.
    pub start_lsn: Option<Lsn>,

    /// Output plugin name. Defaults to `"pgoutput"`; `from_config` does not
    /// read a key for it.
    pub output_plugin: String,

    /// Snapshot behavior, parsed from the `snapshot.mode` key.
    pub snapshot_mode: SnapshotMode,

    /// Poll timeout, read from `poll.timeout.ms` in milliseconds.
    /// Defaults to 100 ms.
    pub poll_timeout: Duration,

    /// Maximum records per poll (`max.poll.records`). Validation requires a
    /// value greater than zero. Defaults to `1000`.
    pub max_poll_records: usize,

    /// Keepalive interval, read from `keepalive.interval.ms` in milliseconds.
    /// Defaults to 10 s.
    pub keepalive_interval: Duration,

    /// WAL sender timeout. Defaults to 60 s; `from_config` does not read a
    /// key for it.
    pub wal_sender_timeout: Duration,

    /// Tables to include, parsed from the comma-separated `table.include`
    /// key. An empty list lets every non-excluded table through.
    pub table_include: Vec<String>,

    /// Tables to exclude, parsed from the comma-separated `table.exclude`
    /// key. Exclusion is checked before inclusion.
    pub table_exclude: Vec<String>,

    /// Maximum number of buffered events (`max.buffered.events`). Validation
    /// requires a value greater than zero. Defaults to `100_000`.
    pub max_buffered_events: usize,

    /// Fraction multiplied by `max_buffered_events` by the method of the same
    /// name to obtain an absolute event count. Defaults to `0.8`.
    pub backpressure_high_watermark: f64,
}
92
93impl Default for PostgresCdcConfig {
94 fn default() -> Self {
95 Self {
96 host: "localhost".to_string(),
97 port: 5432,
98 database: "postgres".to_string(),
99 username: "postgres".to_string(),
100 password: None,
101 ssl_mode: SslMode::Prefer,
102 ca_cert_path: None,
103 client_cert_path: None,
104 client_key_path: None,
105 sni_hostname: None,
106 slot_name: "laminar_slot".to_string(),
107 publication: "laminar_pub".to_string(),
108 start_lsn: None,
109 output_plugin: "pgoutput".to_string(),
110 snapshot_mode: SnapshotMode::Initial,
111 poll_timeout: Duration::from_millis(100),
112 max_poll_records: 1000,
113 keepalive_interval: Duration::from_secs(10),
114 wal_sender_timeout: Duration::from_secs(60),
115 table_include: Vec::new(),
116 table_exclude: Vec::new(),
117 max_buffered_events: 100_000,
118 backpressure_high_watermark: 0.8,
119 }
120 }
121}
122
123impl PostgresCdcConfig {
124 #[must_use]
126 #[allow(
127 clippy::cast_precision_loss,
128 clippy::cast_possible_truncation,
129 clippy::cast_sign_loss
130 )]
131 pub fn backpressure_high_watermark(&self) -> usize {
132 (self.max_buffered_events as f64 * self.backpressure_high_watermark) as usize
133 }
134
135 #[must_use]
137 pub fn new(host: &str, database: &str, slot_name: &str, publication: &str) -> Self {
138 Self {
139 host: host.to_string(),
140 database: database.to_string(),
141 slot_name: slot_name.to_string(),
142 publication: publication.to_string(),
143 ..Self::default()
144 }
145 }
146
147 #[must_use]
149 pub fn connection_string(&self) -> String {
150 use std::fmt::Write;
151 let mut s = format!(
152 "host={} port={} dbname={} user={}",
153 self.host, self.port, self.database, self.username
154 );
155 if let Some(ref pw) = self.password {
156 let escaped = pw.replace('\\', "\\\\").replace('\'', "\\'");
158 let _ = write!(s, " password='{escaped}'");
159 }
160 let _ = write!(s, " sslmode={}", self.ssl_mode);
161 s
162 }
163
164 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
171 let mut cfg = Self {
172 host: config.require("host")?.to_string(),
173 database: config.require("database")?.to_string(),
174 slot_name: config.require("slot.name")?.to_string(),
175 publication: config.require("publication")?.to_string(),
176 ..Self::default()
177 };
178
179 if let Some(port) = config.get("port") {
180 cfg.port = crate::config::parse_port(port)?;
181 }
182 if let Some(user) = config.get("username") {
183 cfg.username = user.to_string();
184 }
185 cfg.password = config.get("password").map(String::from);
186
187 if let Some(ssl) = config.get_parsed::<SslMode>("ssl.mode")? {
188 cfg.ssl_mode = ssl;
189 }
190 cfg.ca_cert_path = config.get("ssl.ca.cert.path").map(String::from);
191 cfg.client_cert_path = config.get("ssl.client.cert.path").map(String::from);
192 cfg.client_key_path = config.get("ssl.client.key.path").map(String::from);
193 cfg.sni_hostname = config.get("ssl.sni.hostname").map(String::from);
194
195 if let Some(lsn) = config.get_parsed::<Lsn>("start.lsn")? {
196 cfg.start_lsn = Some(lsn);
197 }
198 if let Some(mode) = config.get_parsed::<SnapshotMode>("snapshot.mode")? {
199 cfg.snapshot_mode = mode;
200 }
201 if let Some(timeout) = config.get_parsed::<u64>("poll.timeout.ms")? {
202 cfg.poll_timeout = Duration::from_millis(timeout);
203 }
204 if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
205 cfg.max_poll_records = max;
206 }
207 if let Some(interval) = config.get_parsed::<u64>("keepalive.interval.ms")? {
208 cfg.keepalive_interval = Duration::from_millis(interval);
209 }
210 if let Some(tables) = config.get("table.include") {
211 cfg.table_include = tables.split(',').map(|s| s.trim().to_string()).collect();
212 }
213 if let Some(tables) = config.get("table.exclude") {
214 cfg.table_exclude = tables.split(',').map(|s| s.trim().to_string()).collect();
215 }
216 if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
217 cfg.max_buffered_events = max;
218 }
219 if let Some(hw) = config.get_parsed::<f64>("backpressure.high.watermark")? {
220 cfg.backpressure_high_watermark = hw;
221 }
222
223 cfg.validate()?;
224 Ok(cfg)
225 }
226
227 pub fn validate(&self) -> Result<(), ConnectorError> {
233 crate::config::require_non_empty(&self.host, "host")?;
234 crate::config::require_non_empty(&self.database, "database")?;
235 crate::config::require_non_empty(&self.slot_name, "slot.name")?;
236 crate::config::require_non_empty(&self.publication, "publication")?;
237 if self.max_poll_records == 0 {
238 return Err(ConnectorError::ConfigurationError(
239 "max.poll.records must be > 0".to_string(),
240 ));
241 }
242 if self.max_buffered_events == 0 {
243 return Err(ConnectorError::ConfigurationError(
244 "max.buffered.events must be > 0".to_string(),
245 ));
246 }
247 if matches!(self.ssl_mode, SslMode::VerifyCa | SslMode::VerifyFull)
249 && self.ca_cert_path.is_none()
250 {
251 return Err(ConnectorError::ConfigurationError(format!(
252 "ssl.mode={} requires ssl.ca.cert.path",
253 self.ssl_mode
254 )));
255 }
256 if self.client_cert_path.is_some() != self.client_key_path.is_some() {
258 return Err(ConnectorError::ConfigurationError(
259 "ssl.client.cert.path and ssl.client.key.path must both be set for mTLS"
260 .to_string(),
261 ));
262 }
263 Ok(())
264 }
265
266 #[must_use]
268 pub fn should_include_table(&self, table: &str) -> bool {
269 if self.table_exclude.iter().any(|t| t == table) {
270 return false;
271 }
272 if self.table_include.is_empty() {
273 return true;
274 }
275 self.table_include.iter().any(|t| t == table)
276 }
277}
278
279pub use crate::connector::PostgresSslMode as SslMode;
280
/// Snapshot behavior selected through the `snapshot.mode` configuration key.
///
/// NOTE(review): how each variant is acted upon is not visible in this file;
/// the variant descriptions below are inferred from the names — confirm
/// against the source implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SnapshotMode {
    /// Default variant; parsed from `"initial"`. Presumably snapshots once
    /// before streaming.
    #[default]
    Initial,
    /// Parsed from `"never"`. Presumably skips the snapshot.
    Never,
    /// Parsed from `"always"`. Presumably snapshots on every start.
    Always,
}
292
// String conversions for `SnapshotMode`, produced by the project-local
// `str_enum!` macro (defined outside this file). The unit tests in this file
// rely on `"initial"`, `"never"` and `"always"` parsing via `str::parse` and on
// other input being rejected. NOTE(review): the exact set of generated impls
// and the meaning of `lowercase_nodash` should be confirmed against the macro.
str_enum!(SnapshotMode, lowercase_nodash, String, "unknown snapshot mode",
    Initial => "initial";
    Never => "never";
    Always => "always"
);
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a `ConnectorConfig` holding only the four keys that
    /// `PostgresCdcConfig::from_config` requires.
    fn minimal_config() -> ConnectorConfig {
        let mut config = ConnectorConfig::new("postgres-cdc");
        config.set("host", "localhost");
        config.set("database", "db");
        config.set("slot.name", "s");
        config.set("publication", "p");
        config
    }

    /// Renders the connection string of a minimal config carrying `password`.
    fn conn_string_with_password(password: &str) -> String {
        let cfg = PostgresCdcConfig {
            password: Some(password.to_string()),
            ..PostgresCdcConfig::new("h", "d", "s", "p")
        };
        cfg.connection_string()
    }

    /// Returns the message of the validation error `cfg` is expected to raise.
    fn validation_error(cfg: &PostgresCdcConfig) -> String {
        cfg.validate().unwrap_err().to_string()
    }

    #[test]
    fn test_default_config() {
        let cfg = PostgresCdcConfig::default();
        assert_eq!(cfg.host, "localhost");
        assert_eq!(cfg.port, 5432);
        assert_eq!(cfg.database, "postgres");
        assert_eq!(cfg.slot_name, "laminar_slot");
        assert_eq!(cfg.publication, "laminar_pub");
        assert_eq!(cfg.output_plugin, "pgoutput");
        assert_eq!(cfg.ssl_mode, SslMode::Prefer);
        assert_eq!(cfg.snapshot_mode, SnapshotMode::Initial);
        assert_eq!(cfg.max_poll_records, 1000);
    }

    #[test]
    fn test_new_config() {
        let cfg = PostgresCdcConfig::new("db.example.com", "mydb", "my_slot", "my_pub");
        assert_eq!(cfg.host, "db.example.com");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.slot_name, "my_slot");
        assert_eq!(cfg.publication, "my_pub");
    }

    #[test]
    fn test_connection_string() {
        let cfg = PostgresCdcConfig {
            password: Some("secret".to_string()),
            ..PostgresCdcConfig::new("db.example.com", "mydb", "s", "p")
        };
        let conn = cfg.connection_string();
        assert!(conn.contains("host=db.example.com"));
        assert!(conn.contains("dbname=mydb"));
        assert!(conn.contains("password='secret'"));
        assert!(conn.contains("sslmode=prefer"));
    }

    #[test]
    fn test_connection_string_password_with_spaces() {
        let conn = conn_string_with_password("my secret pass");
        assert!(conn.contains("password='my secret pass'"));
    }

    #[test]
    fn test_connection_string_password_with_quotes() {
        let conn = conn_string_with_password("it's a p@ss'word");
        assert!(conn.contains(r"password='it\'s a p@ss\'word'"));
    }

    #[test]
    fn test_connection_string_password_with_backslash() {
        let conn = conn_string_with_password(r"pass\word");
        assert!(conn.contains(r"password='pass\\word'"));
    }

    #[test]
    fn test_from_connector_config() {
        let mut config = ConnectorConfig::new("postgres-cdc");
        config.set("host", "pg.local");
        config.set("database", "testdb");
        config.set("slot.name", "test_slot");
        config.set("publication", "test_pub");
        config.set("port", "5433");
        config.set("ssl.mode", "require");
        config.set("snapshot.mode", "never");
        config.set("max.poll.records", "500");

        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
        assert_eq!(cfg.host, "pg.local");
        assert_eq!(cfg.port, 5433);
        assert_eq!(cfg.database, "testdb");
        assert_eq!(cfg.ssl_mode, SslMode::Require);
        assert_eq!(cfg.snapshot_mode, SnapshotMode::Never);
        assert_eq!(cfg.max_poll_records, 500);
    }

    #[test]
    fn test_from_config_missing_required() {
        let config = ConnectorConfig::new("postgres-cdc");
        assert!(PostgresCdcConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_from_config_invalid_port() {
        let mut config = minimal_config();
        config.set("port", "not_a_number");
        assert!(PostgresCdcConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_validate_empty_host() {
        let cfg = PostgresCdcConfig {
            host: String::new(),
            ..PostgresCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_zero_max_poll() {
        let cfg = PostgresCdcConfig {
            max_poll_records: 0,
            ..PostgresCdcConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_ssl_mode_fromstr() {
        assert_eq!("disable".parse::<SslMode>().unwrap(), SslMode::Disable);
        assert_eq!("prefer".parse::<SslMode>().unwrap(), SslMode::Prefer);
        assert_eq!("require".parse::<SslMode>().unwrap(), SslMode::Require);
        assert_eq!("verify-ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
        assert_eq!(
            "verify-full".parse::<SslMode>().unwrap(),
            SslMode::VerifyFull
        );
        assert!("invalid".parse::<SslMode>().is_err());
    }

    #[test]
    fn test_snapshot_mode_fromstr() {
        assert_eq!(
            "initial".parse::<SnapshotMode>().unwrap(),
            SnapshotMode::Initial
        );
        assert_eq!(
            "never".parse::<SnapshotMode>().unwrap(),
            SnapshotMode::Never
        );
        assert_eq!(
            "always".parse::<SnapshotMode>().unwrap(),
            SnapshotMode::Always
        );
        assert!("bad".parse::<SnapshotMode>().is_err());
    }

    #[test]
    fn test_ssl_mode_display() {
        assert_eq!(SslMode::Disable.to_string(), "disable");
        assert_eq!(SslMode::VerifyFull.to_string(), "verify-full");
    }

    #[test]
    fn test_table_filtering() {
        let mut cfg = PostgresCdcConfig::default();
        // No filters: everything is included.
        assert!(cfg.should_include_table("public.users"));

        // Include list: only listed tables pass.
        cfg.table_include = vec!["public.users".to_string(), "public.orders".to_string()];
        assert!(cfg.should_include_table("public.users"));
        assert!(!cfg.should_include_table("public.logs"));

        // Exclusion wins over inclusion.
        cfg.table_exclude = vec!["public.users".to_string()];
        assert!(!cfg.should_include_table("public.users"));
    }

    #[test]
    fn test_from_config_with_start_lsn() {
        let mut config = minimal_config();
        config.set("start.lsn", "0/1234ABCD");

        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
        assert_eq!(cfg.start_lsn.map(|lsn| lsn.as_u64()), Some(0x1234_ABCD));
    }

    #[test]
    fn test_from_config_table_include() {
        let mut config = minimal_config();
        config.set("table.include", "public.users, public.orders");

        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
        assert_eq!(cfg.table_include, vec!["public.users", "public.orders"]);
    }

    #[test]
    fn test_default_tls_fields_are_none() {
        let cfg = PostgresCdcConfig::default();
        assert!(cfg.ca_cert_path.is_none());
        assert!(cfg.client_cert_path.is_none());
        assert!(cfg.client_key_path.is_none());
        assert!(cfg.sni_hostname.is_none());
    }

    #[test]
    fn test_from_config_tls_cert_paths() {
        let mut config = minimal_config();
        config.set("ssl.mode", "verify-full");
        config.set("ssl.ca.cert.path", "/certs/ca.pem");
        config.set("ssl.client.cert.path", "/certs/client.pem");
        config.set("ssl.client.key.path", "/certs/client-key.pem");
        config.set("ssl.sni.hostname", "db.example.com");

        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
        assert_eq!(cfg.ssl_mode, SslMode::VerifyFull);
        assert_eq!(cfg.ca_cert_path.as_deref(), Some("/certs/ca.pem"));
        assert_eq!(cfg.client_cert_path.as_deref(), Some("/certs/client.pem"));
        assert_eq!(
            cfg.client_key_path.as_deref(),
            Some("/certs/client-key.pem")
        );
        assert_eq!(cfg.sni_hostname.as_deref(), Some("db.example.com"));
    }

    #[test]
    fn test_validate_verify_ca_requires_ca_path() {
        let cfg = PostgresCdcConfig {
            ssl_mode: SslMode::VerifyCa,
            ..PostgresCdcConfig::default()
        };
        assert!(validation_error(&cfg).contains("ssl.ca.cert.path"));
    }

    #[test]
    fn test_validate_verify_full_requires_ca_path() {
        let cfg = PostgresCdcConfig {
            ssl_mode: SslMode::VerifyFull,
            ..PostgresCdcConfig::default()
        };
        assert!(validation_error(&cfg).contains("ssl.ca.cert.path"));
    }

    #[test]
    fn test_validate_verify_ca_with_ca_path_ok() {
        let cfg = PostgresCdcConfig {
            ssl_mode: SslMode::VerifyCa,
            ca_cert_path: Some("/certs/ca.pem".to_string()),
            ..PostgresCdcConfig::default()
        };
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn test_validate_client_cert_without_key() {
        let cfg = PostgresCdcConfig {
            client_cert_path: Some("/certs/client.pem".to_string()),
            ..PostgresCdcConfig::default()
        };
        assert!(validation_error(&cfg).contains("mTLS"));
    }

    #[test]
    fn test_validate_client_key_without_cert() {
        let cfg = PostgresCdcConfig {
            client_key_path: Some("/certs/client-key.pem".to_string()),
            ..PostgresCdcConfig::default()
        };
        assert!(validation_error(&cfg).contains("mTLS"));
    }

    #[test]
    fn test_validate_client_cert_and_key_ok() {
        let cfg = PostgresCdcConfig {
            client_cert_path: Some("/certs/client.pem".to_string()),
            client_key_path: Some("/certs/client-key.pem".to_string()),
            ..PostgresCdcConfig::default()
        };
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn test_require_mode_no_ca_path_ok() {
        let cfg = PostgresCdcConfig {
            ssl_mode: SslMode::Require,
            ..PostgresCdcConfig::default()
        };
        assert!(cfg.validate().is_ok());
    }
}