Skip to main content

laminar_connectors/cdc/postgres/
mod.rs

1//! PostgreSQL CDC source connector.
2//!
3//! Streams row-level changes from PostgreSQL using logical replication
4//! (pgoutput plugin). Supports INSERT, UPDATE, DELETE operations with
5//! Z-set changelog integration.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Ring 0 (Hot Path):  SPSC pop only (~5ns, zero CDC code)
11//! Ring 1 (Background): WAL consumption → pgoutput decode → Arrow conversion
12//! Ring 2 (Control):    Slot management, schema discovery, health checks
13//! ```
14//!
15//! # Module Structure
16//!
17//! - `config` - Connection and replication configuration
18//! - `lsn` - Log Sequence Number type
19//! - `types` - PostgreSQL OID to Arrow type mapping
20//! - `decoder` - pgoutput binary protocol parser
21//! - `schema` - Relation (table) schema cache
22//! - `changelog` - Z-set change event conversion
23//! - `metrics` - Lock-free atomic CDC metrics
24//! - `source` - `PostgresCdcSource` implementing `SourceConnector`
25//!
26//! # Usage
27//!
28//! ```rust,ignore
29//! use laminar_connectors::cdc::postgres::{PostgresCdcSource, PostgresCdcConfig};
30//!
31//! let config = PostgresCdcConfig::new("localhost", "mydb", "laminar_slot", "laminar_pub");
32//! let mut source = PostgresCdcSource::new(config);
33//! ```
34
35pub mod changelog;
36pub mod config;
37pub mod decoder;
38pub mod lsn;
39pub mod metrics;
40pub mod postgres_io;
41pub mod schema;
42pub mod source;
43pub mod types;
44
45// Re-export primary types at module level.
46pub use config::{PostgresCdcConfig, SnapshotMode, SslMode};
47pub use lsn::Lsn;
48pub use source::PostgresCdcSource;
49
50use std::sync::Arc;
51
52use crate::config::{ConfigKeySpec, ConnectorInfo};
53use crate::registry::ConnectorRegistry;
54
55/// Registers the `PostgreSQL` CDC source connector with the given registry.
56pub fn register_postgres_cdc(registry: &ConnectorRegistry) {
57    let info = ConnectorInfo {
58        name: "postgres-cdc".to_string(),
59        display_name: "PostgreSQL CDC Source".to_string(),
60        version: env!("CARGO_PKG_VERSION").to_string(),
61        is_source: true,
62        is_sink: false,
63        config_keys: postgres_cdc_config_keys(),
64    };
65
66    registry.register_source(
67        "postgres-cdc",
68        info.clone(),
69        Arc::new(|| Box::new(PostgresCdcSource::new(PostgresCdcConfig::default()))),
70    );
71
72    // Also register as a table source so CREATE LOOKUP TABLE ... WITH
73    // ('connector' = 'postgres-cdc') can use CDC for reference table refresh.
74    registry.register_table_source(
75        "postgres-cdc",
76        info,
77        Arc::new(|config| {
78            let connector = Box::new(PostgresCdcSource::new(PostgresCdcConfig::default()));
79            Ok(Box::new(crate::lookup::cdc_adapter::CdcTableSource::new(
80                connector,
81                config.clone(),
82                4096,
83            )))
84        }),
85    );
86}
87
88fn postgres_cdc_config_keys() -> Vec<ConfigKeySpec> {
89    vec![
90        ConfigKeySpec::required("host", "PostgreSQL host address"),
91        ConfigKeySpec::required("database", "Database name"),
92        ConfigKeySpec::required("slot.name", "Logical replication slot name"),
93        ConfigKeySpec::required("publication", "Publication name"),
94        ConfigKeySpec::optional("port", "PostgreSQL port", "5432"),
95        ConfigKeySpec::optional("username", "Connection username", "postgres"),
96        ConfigKeySpec::optional("password", "Connection password", ""),
97        ConfigKeySpec::optional("ssl.mode", "SSL mode (disable/prefer/require)", "prefer"),
98        ConfigKeySpec::optional(
99            "snapshot.mode",
100            "Snapshot mode (initial/never/always)",
101            "initial",
102        ),
103        ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
104        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
105        ConfigKeySpec::optional(
106            "keepalive.interval.ms",
107            "Keepalive interval in milliseconds",
108            "10000",
109        ),
110        ConfigKeySpec::optional(
111            "table.include",
112            "Comma-separated list of tables to include",
113            "",
114        ),
115        ConfigKeySpec::optional(
116            "table.exclude",
117            "Comma-separated list of tables to exclude",
118            "",
119        ),
120        // SSL certificate paths
121        ConfigKeySpec::optional("ssl.ca.cert.path", "Path to CA certificate file", ""),
122        ConfigKeySpec::optional(
123            "ssl.client.cert.path",
124            "Path to client certificate file",
125            "",
126        ),
127        ConfigKeySpec::optional("ssl.client.key.path", "Path to client private key file", ""),
128        ConfigKeySpec::optional("ssl.sni.hostname", "SNI hostname for SSL connections", ""),
129        // Replication start position
130        ConfigKeySpec::optional("start.lsn", "Starting LSN for replication", ""),
131    ]
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering into a fresh registry exposes source metadata under
    /// `postgres-cdc`, flagged as a source (not a sink) with config keys.
    #[test]
    fn test_register_postgres_cdc() {
        let registry = ConnectorRegistry::new();
        register_postgres_cdc(&registry);

        let info = registry
            .source_info("postgres-cdc")
            .expect("postgres-cdc source info should be registered");

        assert_eq!(info.name, "postgres-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    /// The four connection/replication keys are all marked as required.
    #[test]
    fn test_config_keys() {
        let keys = postgres_cdc_config_keys();

        for expected in ["host", "database", "slot.name", "publication"] {
            let is_required = keys
                .iter()
                .any(|spec| spec.required && spec.key.as_str() == expected);
            assert!(is_required);
        }
    }
}