Skip to main content

laminar_connectors/cdc/postgres/
config.rs

1//! `PostgreSQL` CDC source connector configuration.
2//!
3//! Provides [`PostgresCdcConfig`] with all settings needed to connect to
4//! a `PostgreSQL` database and stream logical replication changes.
5
6use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11use super::lsn::Lsn;
12
13/// Configuration for the `PostgreSQL` CDC source connector.
14#[derive(Debug, Clone)]
15pub struct PostgresCdcConfig {
16    // ── Connection ──
17    /// `PostgreSQL` host address.
18    pub host: String,
19
20    /// `PostgreSQL` port.
21    pub port: u16,
22
23    /// Database name.
24    pub database: String,
25
26    /// Username for authentication.
27    pub username: String,
28
29    /// Password for authentication.
30    pub password: Option<String>,
31
32    /// SSL mode for the connection.
33    pub ssl_mode: SslMode,
34
35    /// Path to CA certificate PEM file (for `VerifyCa` / `VerifyFull`).
36    pub ca_cert_path: Option<String>,
37
38    /// Path to client certificate PEM file (for mTLS).
39    pub client_cert_path: Option<String>,
40
41    /// Path to client private key PEM file (for mTLS).
42    pub client_key_path: Option<String>,
43
44    /// SNI hostname override (for proxy/load-balancer scenarios).
45    pub sni_hostname: Option<String>,
46
47    // ── Replication ──
48    /// Name of the logical replication slot.
49    pub slot_name: String,
50
51    /// Name of the publication to subscribe to.
52    pub publication: String,
53
54    /// LSN to start replication from (None = slot's `confirmed_flush_lsn`).
55    pub start_lsn: Option<Lsn>,
56
57    /// Output plugin name (always `pgoutput` for logical replication).
58    pub output_plugin: String,
59
60    // ── Snapshot ──
61    /// How to handle the initial data snapshot.
62    pub snapshot_mode: SnapshotMode,
63
64    // ── Tuning ──
65    /// Timeout for each poll operation.
66    pub poll_timeout: Duration,
67
68    /// Maximum records to return per poll.
69    pub max_poll_records: usize,
70
71    /// Interval for sending keepalive/status updates to `PostgreSQL`.
72    pub keepalive_interval: Duration,
73
74    /// Maximum WAL sender timeout before the server drops the connection.
75    pub wal_sender_timeout: Duration,
76
77    // ── Schema ──
78    /// Tables to include (empty = all tables in publication).
79    pub table_include: Vec<String>,
80
81    /// Tables to exclude from replication.
82    pub table_exclude: Vec<String>,
83
84    /// Maximum events to buffer (default: 100,000).
85    pub max_buffered_events: usize,
86
87    /// High watermark ratio (0.0–1.0) of `max_buffered_events`. When the
88    /// buffer reaches this level, stop draining the WAL reader channel to
89    /// apply backpressure (default: 0.8).
90    pub backpressure_high_watermark: f64,
91}
92
93impl Default for PostgresCdcConfig {
94    fn default() -> Self {
95        Self {
96            host: "localhost".to_string(),
97            port: 5432,
98            database: "postgres".to_string(),
99            username: "postgres".to_string(),
100            password: None,
101            ssl_mode: SslMode::Prefer,
102            ca_cert_path: None,
103            client_cert_path: None,
104            client_key_path: None,
105            sni_hostname: None,
106            slot_name: "laminar_slot".to_string(),
107            publication: "laminar_pub".to_string(),
108            start_lsn: None,
109            output_plugin: "pgoutput".to_string(),
110            snapshot_mode: SnapshotMode::Initial,
111            poll_timeout: Duration::from_millis(100),
112            max_poll_records: 1000,
113            keepalive_interval: Duration::from_secs(10),
114            wal_sender_timeout: Duration::from_secs(60),
115            table_include: Vec::new(),
116            table_exclude: Vec::new(),
117            max_buffered_events: 100_000,
118            backpressure_high_watermark: 0.8,
119        }
120    }
121}
122
123impl PostgresCdcConfig {
124    /// Returns the high watermark as an absolute event count.
125    #[must_use]
126    #[allow(
127        clippy::cast_precision_loss,
128        clippy::cast_possible_truncation,
129        clippy::cast_sign_loss
130    )]
131    pub fn backpressure_high_watermark(&self) -> usize {
132        (self.max_buffered_events as f64 * self.backpressure_high_watermark) as usize
133    }
134
135    /// Creates a new config with required fields.
136    #[must_use]
137    pub fn new(host: &str, database: &str, slot_name: &str, publication: &str) -> Self {
138        Self {
139            host: host.to_string(),
140            database: database.to_string(),
141            slot_name: slot_name.to_string(),
142            publication: publication.to_string(),
143            ..Self::default()
144        }
145    }
146
147    /// Builds a `PostgreSQL` connection string.
148    #[must_use]
149    pub fn connection_string(&self) -> String {
150        use std::fmt::Write;
151        let mut s = format!(
152            "host={} port={} dbname={} user={}",
153            self.host, self.port, self.database, self.username
154        );
155        if let Some(ref pw) = self.password {
156            // Escape for libpq: wrap in single quotes, escape \ and '
157            let escaped = pw.replace('\\', "\\\\").replace('\'', "\\'");
158            let _ = write!(s, " password='{escaped}'");
159        }
160        let _ = write!(s, " sslmode={}", self.ssl_mode);
161        s
162    }
163
164    /// Parses configuration from a generic [`ConnectorConfig`].
165    ///
166    /// # Errors
167    ///
168    /// Returns `ConnectorError` if required keys are missing or values are
169    /// invalid.
170    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
171        let mut cfg = Self {
172            host: config.require("host")?.to_string(),
173            database: config.require("database")?.to_string(),
174            slot_name: config.require("slot.name")?.to_string(),
175            publication: config.require("publication")?.to_string(),
176            ..Self::default()
177        };
178
179        if let Some(port) = config.get("port") {
180            cfg.port = crate::config::parse_port(port)?;
181        }
182        if let Some(user) = config.get("username") {
183            cfg.username = user.to_string();
184        }
185        cfg.password = config.get("password").map(String::from);
186
187        if let Some(ssl) = config.get_parsed::<SslMode>("ssl.mode")? {
188            cfg.ssl_mode = ssl;
189        }
190        cfg.ca_cert_path = config.get("ssl.ca.cert.path").map(String::from);
191        cfg.client_cert_path = config.get("ssl.client.cert.path").map(String::from);
192        cfg.client_key_path = config.get("ssl.client.key.path").map(String::from);
193        cfg.sni_hostname = config.get("ssl.sni.hostname").map(String::from);
194
195        if let Some(lsn) = config.get_parsed::<Lsn>("start.lsn")? {
196            cfg.start_lsn = Some(lsn);
197        }
198        if let Some(mode) = config.get_parsed::<SnapshotMode>("snapshot.mode")? {
199            cfg.snapshot_mode = mode;
200        }
201        if let Some(timeout) = config.get_parsed::<u64>("poll.timeout.ms")? {
202            cfg.poll_timeout = Duration::from_millis(timeout);
203        }
204        if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
205            cfg.max_poll_records = max;
206        }
207        if let Some(interval) = config.get_parsed::<u64>("keepalive.interval.ms")? {
208            cfg.keepalive_interval = Duration::from_millis(interval);
209        }
210        if let Some(tables) = config.get("table.include") {
211            cfg.table_include = tables.split(',').map(|s| s.trim().to_string()).collect();
212        }
213        if let Some(tables) = config.get("table.exclude") {
214            cfg.table_exclude = tables.split(',').map(|s| s.trim().to_string()).collect();
215        }
216        if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
217            cfg.max_buffered_events = max;
218        }
219        if let Some(hw) = config.get_parsed::<f64>("backpressure.high.watermark")? {
220            cfg.backpressure_high_watermark = hw;
221        }
222
223        cfg.validate()?;
224        Ok(cfg)
225    }
226
227    /// Validates the configuration.
228    ///
229    /// # Errors
230    ///
231    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
232    pub fn validate(&self) -> Result<(), ConnectorError> {
233        crate::config::require_non_empty(&self.host, "host")?;
234        crate::config::require_non_empty(&self.database, "database")?;
235        crate::config::require_non_empty(&self.slot_name, "slot.name")?;
236        crate::config::require_non_empty(&self.publication, "publication")?;
237        if self.max_poll_records == 0 {
238            return Err(ConnectorError::ConfigurationError(
239                "max.poll.records must be > 0".to_string(),
240            ));
241        }
242        if self.max_buffered_events == 0 {
243            return Err(ConnectorError::ConfigurationError(
244                "max.buffered.events must be > 0".to_string(),
245            ));
246        }
247        // VerifyCa/VerifyFull require a CA certificate path
248        if matches!(self.ssl_mode, SslMode::VerifyCa | SslMode::VerifyFull)
249            && self.ca_cert_path.is_none()
250        {
251            return Err(ConnectorError::ConfigurationError(format!(
252                "ssl.mode={} requires ssl.ca.cert.path",
253                self.ssl_mode
254            )));
255        }
256        // Client cert without key (or vice versa) is invalid
257        if self.client_cert_path.is_some() != self.client_key_path.is_some() {
258            return Err(ConnectorError::ConfigurationError(
259                "ssl.client.cert.path and ssl.client.key.path must both be set for mTLS"
260                    .to_string(),
261            ));
262        }
263        Ok(())
264    }
265
266    /// Returns whether a table should be included based on include/exclude lists.
267    #[must_use]
268    pub fn should_include_table(&self, table: &str) -> bool {
269        if self.table_exclude.iter().any(|t| t == table) {
270            return false;
271        }
272        if self.table_include.is_empty() {
273            return true;
274        }
275        self.table_include.iter().any(|t| t == table)
276    }
277}
278
279pub use crate::connector::PostgresSslMode as SslMode;
280
281/// How to handle the initial snapshot when no prior checkpoint exists.
282#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
283pub enum SnapshotMode {
284    /// Take a full snapshot on first start, then switch to streaming.
285    #[default]
286    Initial,
287    /// Never take a snapshot; only stream from the replication slot's position.
288    Never,
289    /// Always take a snapshot on startup, even if a checkpoint exists.
290    Always,
291}
292
293str_enum!(SnapshotMode, lowercase_nodash, String, "unknown snapshot mode",
294    Initial => "initial";
295    Never => "never";
296    Always => "always"
297);
298
299#[cfg(test)]
300mod tests {
301    use super::*;
302
303    #[test]
304    fn test_default_config() {
305        let cfg = PostgresCdcConfig::default();
306        assert_eq!(cfg.host, "localhost");
307        assert_eq!(cfg.port, 5432);
308        assert_eq!(cfg.database, "postgres");
309        assert_eq!(cfg.slot_name, "laminar_slot");
310        assert_eq!(cfg.publication, "laminar_pub");
311        assert_eq!(cfg.output_plugin, "pgoutput");
312        assert_eq!(cfg.ssl_mode, SslMode::Prefer);
313        assert_eq!(cfg.snapshot_mode, SnapshotMode::Initial);
314        assert_eq!(cfg.max_poll_records, 1000);
315    }
316
317    #[test]
318    fn test_new_config() {
319        let cfg = PostgresCdcConfig::new("db.example.com", "mydb", "my_slot", "my_pub");
320        assert_eq!(cfg.host, "db.example.com");
321        assert_eq!(cfg.database, "mydb");
322        assert_eq!(cfg.slot_name, "my_slot");
323        assert_eq!(cfg.publication, "my_pub");
324    }
325
326    #[test]
327    fn test_connection_string() {
328        let mut cfg = PostgresCdcConfig::new("db.example.com", "mydb", "s", "p");
329        cfg.password = Some("secret".to_string());
330        let conn = cfg.connection_string();
331        assert!(conn.contains("host=db.example.com"));
332        assert!(conn.contains("dbname=mydb"));
333        assert!(conn.contains("password='secret'"));
334        assert!(conn.contains("sslmode=prefer"));
335    }
336
337    #[test]
338    fn test_connection_string_password_with_spaces() {
339        let mut cfg = PostgresCdcConfig::new("h", "d", "s", "p");
340        cfg.password = Some("my secret pass".to_string());
341        let conn = cfg.connection_string();
342        assert!(conn.contains("password='my secret pass'"));
343    }
344
345    #[test]
346    fn test_connection_string_password_with_quotes() {
347        let mut cfg = PostgresCdcConfig::new("h", "d", "s", "p");
348        cfg.password = Some("it's a p@ss'word".to_string());
349        let conn = cfg.connection_string();
350        assert!(conn.contains(r"password='it\'s a p@ss\'word'"));
351    }
352
353    #[test]
354    fn test_connection_string_password_with_backslash() {
355        let mut cfg = PostgresCdcConfig::new("h", "d", "s", "p");
356        cfg.password = Some(r"pass\word".to_string());
357        let conn = cfg.connection_string();
358        assert!(conn.contains(r"password='pass\\word'"));
359    }
360
361    #[test]
362    fn test_from_connector_config() {
363        let mut config = ConnectorConfig::new("postgres-cdc");
364        config.set("host", "pg.local");
365        config.set("database", "testdb");
366        config.set("slot.name", "test_slot");
367        config.set("publication", "test_pub");
368        config.set("port", "5433");
369        config.set("ssl.mode", "require");
370        config.set("snapshot.mode", "never");
371        config.set("max.poll.records", "500");
372
373        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
374        assert_eq!(cfg.host, "pg.local");
375        assert_eq!(cfg.port, 5433);
376        assert_eq!(cfg.database, "testdb");
377        assert_eq!(cfg.ssl_mode, SslMode::Require);
378        assert_eq!(cfg.snapshot_mode, SnapshotMode::Never);
379        assert_eq!(cfg.max_poll_records, 500);
380    }
381
382    #[test]
383    fn test_from_config_missing_required() {
384        let config = ConnectorConfig::new("postgres-cdc");
385        assert!(PostgresCdcConfig::from_config(&config).is_err());
386    }
387
388    #[test]
389    fn test_from_config_invalid_port() {
390        let mut config = ConnectorConfig::new("postgres-cdc");
391        config.set("host", "localhost");
392        config.set("database", "db");
393        config.set("slot.name", "s");
394        config.set("publication", "p");
395        config.set("port", "not_a_number");
396        assert!(PostgresCdcConfig::from_config(&config).is_err());
397    }
398
399    #[test]
400    fn test_validate_empty_host() {
401        let mut cfg = PostgresCdcConfig::default();
402        cfg.host = String::new();
403        assert!(cfg.validate().is_err());
404    }
405
406    #[test]
407    fn test_validate_zero_max_poll() {
408        let mut cfg = PostgresCdcConfig::default();
409        cfg.max_poll_records = 0;
410        assert!(cfg.validate().is_err());
411    }
412
413    #[test]
414    fn test_ssl_mode_fromstr() {
415        assert_eq!("disable".parse::<SslMode>().unwrap(), SslMode::Disable);
416        assert_eq!("prefer".parse::<SslMode>().unwrap(), SslMode::Prefer);
417        assert_eq!("require".parse::<SslMode>().unwrap(), SslMode::Require);
418        assert_eq!("verify-ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
419        assert_eq!(
420            "verify-full".parse::<SslMode>().unwrap(),
421            SslMode::VerifyFull
422        );
423        assert!("invalid".parse::<SslMode>().is_err());
424    }
425
426    #[test]
427    fn test_snapshot_mode_fromstr() {
428        assert_eq!(
429            "initial".parse::<SnapshotMode>().unwrap(),
430            SnapshotMode::Initial
431        );
432        assert_eq!(
433            "never".parse::<SnapshotMode>().unwrap(),
434            SnapshotMode::Never
435        );
436        assert_eq!(
437            "always".parse::<SnapshotMode>().unwrap(),
438            SnapshotMode::Always
439        );
440        assert!("bad".parse::<SnapshotMode>().is_err());
441    }
442
443    #[test]
444    fn test_ssl_mode_display() {
445        assert_eq!(SslMode::Disable.to_string(), "disable");
446        assert_eq!(SslMode::VerifyFull.to_string(), "verify-full");
447    }
448
449    #[test]
450    fn test_table_filtering() {
451        let mut cfg = PostgresCdcConfig::default();
452        // No filters → include all
453        assert!(cfg.should_include_table("public.users"));
454
455        // Include list
456        cfg.table_include = vec!["public.users".to_string(), "public.orders".to_string()];
457        assert!(cfg.should_include_table("public.users"));
458        assert!(!cfg.should_include_table("public.logs"));
459
460        // Exclude overrides include
461        cfg.table_exclude = vec!["public.users".to_string()];
462        assert!(!cfg.should_include_table("public.users"));
463    }
464
465    #[test]
466    fn test_from_config_with_start_lsn() {
467        let mut config = ConnectorConfig::new("postgres-cdc");
468        config.set("host", "localhost");
469        config.set("database", "db");
470        config.set("slot.name", "s");
471        config.set("publication", "p");
472        config.set("start.lsn", "0/1234ABCD");
473
474        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
475        assert!(cfg.start_lsn.is_some());
476        assert_eq!(cfg.start_lsn.unwrap().as_u64(), 0x1234_ABCD);
477    }
478
479    #[test]
480    fn test_from_config_table_include() {
481        let mut config = ConnectorConfig::new("postgres-cdc");
482        config.set("host", "localhost");
483        config.set("database", "db");
484        config.set("slot.name", "s");
485        config.set("publication", "p");
486        config.set("table.include", "public.users, public.orders");
487
488        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
489        assert_eq!(cfg.table_include, vec!["public.users", "public.orders"]);
490    }
491
492    // ── TLS cert path fields ──
493
494    #[test]
495    fn test_default_tls_fields_are_none() {
496        let cfg = PostgresCdcConfig::default();
497        assert!(cfg.ca_cert_path.is_none());
498        assert!(cfg.client_cert_path.is_none());
499        assert!(cfg.client_key_path.is_none());
500        assert!(cfg.sni_hostname.is_none());
501    }
502
503    #[test]
504    fn test_from_config_tls_cert_paths() {
505        let mut config = ConnectorConfig::new("postgres-cdc");
506        config.set("host", "localhost");
507        config.set("database", "db");
508        config.set("slot.name", "s");
509        config.set("publication", "p");
510        config.set("ssl.mode", "verify-full");
511        config.set("ssl.ca.cert.path", "/certs/ca.pem");
512        config.set("ssl.client.cert.path", "/certs/client.pem");
513        config.set("ssl.client.key.path", "/certs/client-key.pem");
514        config.set("ssl.sni.hostname", "db.example.com");
515
516        let cfg = PostgresCdcConfig::from_config(&config).unwrap();
517        assert_eq!(cfg.ssl_mode, SslMode::VerifyFull);
518        assert_eq!(cfg.ca_cert_path.as_deref(), Some("/certs/ca.pem"));
519        assert_eq!(cfg.client_cert_path.as_deref(), Some("/certs/client.pem"));
520        assert_eq!(
521            cfg.client_key_path.as_deref(),
522            Some("/certs/client-key.pem")
523        );
524        assert_eq!(cfg.sni_hostname.as_deref(), Some("db.example.com"));
525    }
526
527    #[test]
528    fn test_validate_verify_ca_requires_ca_path() {
529        let mut cfg = PostgresCdcConfig::default();
530        cfg.ssl_mode = SslMode::VerifyCa;
531        let err = cfg.validate().unwrap_err();
532        assert!(err.to_string().contains("ssl.ca.cert.path"));
533    }
534
535    #[test]
536    fn test_validate_verify_full_requires_ca_path() {
537        let mut cfg = PostgresCdcConfig::default();
538        cfg.ssl_mode = SslMode::VerifyFull;
539        let err = cfg.validate().unwrap_err();
540        assert!(err.to_string().contains("ssl.ca.cert.path"));
541    }
542
543    #[test]
544    fn test_validate_verify_ca_with_ca_path_ok() {
545        let mut cfg = PostgresCdcConfig::default();
546        cfg.ssl_mode = SslMode::VerifyCa;
547        cfg.ca_cert_path = Some("/certs/ca.pem".to_string());
548        assert!(cfg.validate().is_ok());
549    }
550
551    #[test]
552    fn test_validate_client_cert_without_key() {
553        let mut cfg = PostgresCdcConfig::default();
554        cfg.client_cert_path = Some("/certs/client.pem".to_string());
555        let err = cfg.validate().unwrap_err();
556        assert!(err.to_string().contains("mTLS"));
557    }
558
559    #[test]
560    fn test_validate_client_key_without_cert() {
561        let mut cfg = PostgresCdcConfig::default();
562        cfg.client_key_path = Some("/certs/client-key.pem".to_string());
563        let err = cfg.validate().unwrap_err();
564        assert!(err.to_string().contains("mTLS"));
565    }
566
567    #[test]
568    fn test_validate_client_cert_and_key_ok() {
569        let mut cfg = PostgresCdcConfig::default();
570        cfg.client_cert_path = Some("/certs/client.pem".to_string());
571        cfg.client_key_path = Some("/certs/client-key.pem".to_string());
572        assert!(cfg.validate().is_ok());
573    }
574
575    #[test]
576    fn test_require_mode_no_ca_path_ok() {
577        let mut cfg = PostgresCdcConfig::default();
578        cfg.ssl_mode = SslMode::Require;
579        assert!(cfg.validate().is_ok());
580    }
581}