Skip to main content

laminar_connectors/postgres/
sink_config.rs

//! `PostgreSQL` sink connector configuration.
//!
//! [`PostgresSinkConfig`] encapsulates all settings for writing Arrow
//! `RecordBatch` data to `PostgreSQL`. It is parsed from SQL `WITH (...)`
//! clause options by [`PostgresSinkConfig::from_config`].

6use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11/// Configuration for the `PostgreSQL` sink connector.
12///
13/// Parsed from SQL `WITH (...)` clause options via [`from_config`](Self::from_config).
14#[derive(Debug, Clone)]
15pub struct PostgresSinkConfig {
16    /// `PostgreSQL` hostname.
17    pub hostname: String,
18
19    /// `PostgreSQL` port (default: 5432).
20    pub port: u16,
21
22    /// Database name.
23    pub database: String,
24
25    /// Username for authentication.
26    pub username: String,
27
28    /// Password for authentication.
29    pub password: String,
30
31    /// Target schema name (default: `"public"`).
32    pub schema_name: String,
33
34    /// Target table name.
35    pub table_name: String,
36
37    /// Write mode: append (COPY BINARY) or upsert (ON CONFLICT).
38    pub write_mode: WriteMode,
39
40    /// Primary key columns (required for upsert mode).
41    pub primary_key_columns: Vec<String>,
42
43    /// Maximum records to buffer before flushing.
44    pub batch_size: usize,
45
46    /// Maximum time to buffer before flushing.
47    pub flush_interval: Duration,
48
49    /// Connection pool size.
50    pub pool_size: usize,
51
52    /// Connection timeout.
53    pub connect_timeout: Duration,
54
55    /// SSL mode for connections.
56    pub ssl_mode: SslMode,
57
58    /// Whether to create the target table if it doesn't exist.
59    pub auto_create_table: bool,
60
61    /// Whether to handle changelog/retraction records.
62    pub changelog_mode: bool,
63
64    /// Delivery guarantee level.
65    pub delivery_guarantee: DeliveryGuarantee,
66
67    /// Per-query statement timeout (default: 30s).
68    pub statement_timeout: Duration,
69
70    /// Sink ID for offset tracking (auto-generated if empty).
71    pub sink_id: String,
72}
73
74impl Default for PostgresSinkConfig {
75    fn default() -> Self {
76        Self {
77            hostname: "localhost".to_string(),
78            port: 5432,
79            database: String::new(),
80            username: String::new(),
81            password: String::new(),
82            schema_name: "public".to_string(),
83            table_name: String::new(),
84            write_mode: WriteMode::Append,
85            primary_key_columns: Vec::new(),
86            batch_size: 4096,
87            flush_interval: Duration::from_millis(250),
88            pool_size: 4,
89            connect_timeout: Duration::from_secs(10),
90            ssl_mode: SslMode::Prefer,
91            auto_create_table: false,
92            changelog_mode: false,
93            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
94            statement_timeout: Duration::from_secs(30),
95            sink_id: String::new(),
96        }
97    }
98}
99
100impl PostgresSinkConfig {
101    /// Creates a minimal config for testing.
102    #[must_use]
103    pub fn new(hostname: &str, database: &str, table_name: &str) -> Self {
104        Self {
105            hostname: hostname.to_string(),
106            database: database.to_string(),
107            table_name: table_name.to_string(),
108            ..Default::default()
109        }
110    }
111
112    /// Parses a sink config from a [`ConnectorConfig`] (SQL WITH clause).
113    ///
114    /// # Required keys
115    ///
116    /// - `hostname` - `PostgreSQL` server hostname
117    /// - `database` - Target database name
118    /// - `username` - Authentication username
119    /// - `table.name` - Target table name
120    ///
121    /// # Errors
122    ///
123    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
124    /// or `ConnectorError::ConfigurationError` on invalid values.
125    #[allow(clippy::field_reassign_with_default)]
126    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
127        let mut cfg = Self::default();
128
129        cfg.hostname = config.require("hostname")?.to_string();
130        cfg.database = config.require("database")?.to_string();
131        cfg.username = config.require("username")?.to_string();
132        cfg.table_name = config.require("table.name")?.to_string();
133
134        if let Some(v) = config.get("password") {
135            cfg.password = v.to_string();
136        }
137        if let Some(v) = config.get("port") {
138            cfg.port = crate::config::parse_port(v)?;
139        }
140        if let Some(v) = config.get("schema.name") {
141            cfg.schema_name = v.to_string();
142        }
143        if let Some(v) = config.get("write.mode") {
144            cfg.write_mode = v.parse().map_err(|_| {
145                ConnectorError::ConfigurationError(format!(
146                    "invalid write.mode: '{v}' (expected 'append' or 'upsert')"
147                ))
148            })?;
149        }
150        if let Some(v) = config.get("primary.key") {
151            cfg.primary_key_columns = v.split(',').map(|c| c.trim().to_string()).collect();
152        }
153        if let Some(v) = config.get("batch.size") {
154            cfg.batch_size = v.parse().map_err(|_| {
155                ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
156            })?;
157        }
158        if let Some(v) = config.get("flush.interval.ms") {
159            let ms: u64 = v.parse().map_err(|_| {
160                ConnectorError::ConfigurationError(format!("invalid flush.interval.ms: '{v}'"))
161            })?;
162            cfg.flush_interval = Duration::from_millis(ms);
163        }
164        if let Some(v) = config.get("pool.size") {
165            cfg.pool_size = v.parse().map_err(|_| {
166                ConnectorError::ConfigurationError(format!("invalid pool.size: '{v}'"))
167            })?;
168        }
169        if let Some(v) = config.get("connect.timeout.ms") {
170            let ms: u64 = v.parse().map_err(|_| {
171                ConnectorError::ConfigurationError(format!("invalid connect.timeout.ms: '{v}'"))
172            })?;
173            cfg.connect_timeout = Duration::from_millis(ms);
174        }
175        if let Some(v) = config.get("ssl.mode") {
176            cfg.ssl_mode = v.parse().map_err(|_| {
177                ConnectorError::ConfigurationError(format!(
178                    "invalid ssl.mode: '{v}' (expected disable/prefer/require/verify-ca/verify-full)"
179                ))
180            })?;
181        }
182        if let Some(v) = config.get("auto.create.table") {
183            cfg.auto_create_table = v.eq_ignore_ascii_case("true");
184        }
185        if let Some(v) = config.get("changelog.mode") {
186            cfg.changelog_mode = v.eq_ignore_ascii_case("true");
187        }
188        if let Some(v) = config.get("statement.timeout.ms") {
189            let ms: u64 = v.parse().map_err(|_| {
190                ConnectorError::ConfigurationError(format!("invalid statement.timeout.ms: '{v}'"))
191            })?;
192            cfg.statement_timeout = Duration::from_millis(ms);
193        }
194        if let Some(v) = config.get("delivery.guarantee") {
195            cfg.delivery_guarantee = v.parse().map_err(|_| {
196                ConnectorError::ConfigurationError(format!(
197                    "invalid delivery.guarantee: '{v}' \
198                     (expected 'at_least_once' or 'exactly_once')"
199                ))
200            })?;
201        }
202        if let Some(v) = config.get("sink.id") {
203            cfg.sink_id = v.to_string();
204        }
205
206        cfg.validate()?;
207        Ok(cfg)
208    }
209
210    /// Validates the configuration for consistency.
211    ///
212    /// # Errors
213    ///
214    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
215    pub fn validate(&self) -> Result<(), ConnectorError> {
216        if self.table_name.is_empty() {
217            return Err(ConnectorError::missing_config("table.name"));
218        }
219        if self.write_mode == WriteMode::Upsert && self.primary_key_columns.is_empty() {
220            return Err(ConnectorError::ConfigurationError(
221                "upsert mode requires 'primary.key' to be set".into(),
222            ));
223        }
224        if self.batch_size == 0 {
225            return Err(ConnectorError::ConfigurationError(
226                "batch.size must be > 0".into(),
227            ));
228        }
229        if self.pool_size == 0 {
230            return Err(ConnectorError::ConfigurationError(
231                "pool.size must be > 0".into(),
232            ));
233        }
234        if self.statement_timeout < Duration::from_secs(1) {
235            return Err(ConnectorError::ConfigurationError(
236                "statement.timeout.ms must be >= 1000 (1 second)".into(),
237            ));
238        }
239        Ok(())
240    }
241
242    /// Returns the fully qualified table name (`schema.table`).
243    #[must_use]
244    pub fn qualified_table_name(&self) -> String {
245        format!("{}.{}", self.schema_name, self.table_name)
246    }
247
248    /// Returns the effective sink ID, auto-generating from table name if empty.
249    #[must_use]
250    pub fn effective_sink_id(&self) -> String {
251        if self.sink_id.is_empty() {
252            format!("laminardb-sink-{}", self.qualified_table_name())
253        } else {
254            self.sink_id.clone()
255        }
256    }
257}
258
259/// Write mode for the `PostgreSQL` sink.
260#[derive(Debug, Clone, Copy, PartialEq, Eq)]
261pub enum WriteMode {
262    /// Append-only: uses COPY BINARY for maximum throughput.
263    /// No deduplication — every record is inserted.
264    Append,
265    /// Upsert: `INSERT ... ON CONFLICT DO UPDATE`.
266    /// Requires primary key columns. Deduplicates on key.
267    Upsert,
268}
269
270str_enum!(WriteMode, lowercase_nodash, String, "unknown write mode",
271    Append => "append", "copy";
272    Upsert => "upsert", "insert"
273);
274
275pub use crate::connector::DeliveryGuarantee;
276pub use crate::connector::PostgresSslMode as SslMode;
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281
282    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
283        let mut config = ConnectorConfig::new("postgres-sink");
284        for (k, v) in pairs {
285            config.set(*k, *v);
286        }
287        config
288    }
289
290    fn required_pairs() -> Vec<(&'static str, &'static str)> {
291        vec![
292            ("hostname", "localhost"),
293            ("database", "mydb"),
294            ("username", "writer"),
295            ("table.name", "events"),
296        ]
297    }
298
299    #[test]
300    fn test_parse_required_fields() {
301        let config = make_config(&required_pairs());
302        let cfg = PostgresSinkConfig::from_config(&config).unwrap();
303        assert_eq!(cfg.hostname, "localhost");
304        assert_eq!(cfg.database, "mydb");
305        assert_eq!(cfg.username, "writer");
306        assert_eq!(cfg.table_name, "events");
307        assert_eq!(cfg.port, 5432);
308        assert_eq!(cfg.schema_name, "public");
309        assert_eq!(cfg.write_mode, WriteMode::Append);
310        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
311    }
312
313    #[test]
314    fn test_missing_hostname() {
315        let config = make_config(&[("database", "db"), ("username", "u"), ("table.name", "t")]);
316        assert!(PostgresSinkConfig::from_config(&config).is_err());
317    }
318
319    #[test]
320    fn test_missing_database() {
321        let config = make_config(&[("hostname", "h"), ("username", "u"), ("table.name", "t")]);
322        assert!(PostgresSinkConfig::from_config(&config).is_err());
323    }
324
325    #[test]
326    fn test_missing_username() {
327        let config = make_config(&[("hostname", "h"), ("database", "db"), ("table.name", "t")]);
328        assert!(PostgresSinkConfig::from_config(&config).is_err());
329    }
330
331    #[test]
332    fn test_missing_table_name() {
333        let config = make_config(&[("hostname", "h"), ("database", "db"), ("username", "u")]);
334        assert!(PostgresSinkConfig::from_config(&config).is_err());
335    }
336
337    #[test]
338    fn test_parse_all_optional_fields() {
339        let mut pairs = required_pairs();
340        pairs.extend_from_slice(&[
341            ("password", "secret"),
342            ("port", "5433"),
343            ("schema.name", "analytics"),
344            ("write.mode", "upsert"),
345            ("primary.key", "id, region"),
346            ("batch.size", "8192"),
347            ("flush.interval.ms", "500"),
348            ("pool.size", "8"),
349            ("connect.timeout.ms", "5000"),
350            ("ssl.mode", "require"),
351            ("auto.create.table", "true"),
352            ("changelog.mode", "true"),
353            ("delivery.guarantee", "exactly_once"),
354            ("sink.id", "my-sink"),
355        ]);
356        let config = make_config(&pairs);
357        let cfg = PostgresSinkConfig::from_config(&config).unwrap();
358
359        assert_eq!(cfg.password, "secret");
360        assert_eq!(cfg.port, 5433);
361        assert_eq!(cfg.schema_name, "analytics");
362        assert_eq!(cfg.write_mode, WriteMode::Upsert);
363        assert_eq!(cfg.primary_key_columns, vec!["id", "region"]);
364        assert_eq!(cfg.batch_size, 8192);
365        assert_eq!(cfg.flush_interval, Duration::from_millis(500));
366        assert_eq!(cfg.pool_size, 8);
367        assert_eq!(cfg.connect_timeout, Duration::from_secs(5));
368        assert_eq!(cfg.ssl_mode, SslMode::Require);
369        assert!(cfg.auto_create_table);
370        assert!(cfg.changelog_mode);
371        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
372        assert_eq!(cfg.sink_id, "my-sink");
373    }
374
375    #[test]
376    fn test_upsert_requires_primary_key() {
377        let mut pairs = required_pairs();
378        pairs.push(("write.mode", "upsert"));
379        let config = make_config(&pairs);
380        let result = PostgresSinkConfig::from_config(&config);
381        assert!(result.is_err());
382        let err = result.unwrap_err().to_string();
383        assert!(err.contains("primary.key"), "error: {err}");
384    }
385
386    #[test]
387    fn test_batch_size_zero_rejected() {
388        let mut pairs = required_pairs();
389        pairs.push(("batch.size", "0"));
390        let config = make_config(&pairs);
391        assert!(PostgresSinkConfig::from_config(&config).is_err());
392    }
393
394    #[test]
395    fn test_pool_size_zero_rejected() {
396        let mut pairs = required_pairs();
397        pairs.push(("pool.size", "0"));
398        let config = make_config(&pairs);
399        assert!(PostgresSinkConfig::from_config(&config).is_err());
400    }
401
402    #[test]
403    fn test_qualified_table_name() {
404        let cfg = PostgresSinkConfig::new("localhost", "db", "events");
405        assert_eq!(cfg.qualified_table_name(), "public.events");
406
407        let mut cfg2 = cfg;
408        cfg2.schema_name = "analytics".to_string();
409        assert_eq!(cfg2.qualified_table_name(), "analytics.events");
410    }
411
412    #[test]
413    fn test_effective_sink_id() {
414        let cfg = PostgresSinkConfig::new("localhost", "db", "events");
415        assert_eq!(cfg.effective_sink_id(), "laminardb-sink-public.events");
416
417        let mut cfg2 = cfg;
418        cfg2.sink_id = "custom-sink".to_string();
419        assert_eq!(cfg2.effective_sink_id(), "custom-sink");
420    }
421
422    #[test]
423    fn test_defaults() {
424        let cfg = PostgresSinkConfig::default();
425        assert_eq!(cfg.hostname, "localhost");
426        assert_eq!(cfg.port, 5432);
427        assert_eq!(cfg.schema_name, "public");
428        assert_eq!(cfg.write_mode, WriteMode::Append);
429        assert_eq!(cfg.batch_size, 4096);
430        assert_eq!(cfg.pool_size, 4);
431        assert_eq!(cfg.ssl_mode, SslMode::Prefer);
432        assert!(!cfg.auto_create_table);
433        assert!(!cfg.changelog_mode);
434        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
435    }
436
437    #[test]
438    fn test_write_mode_parse() {
439        assert_eq!("append".parse::<WriteMode>().unwrap(), WriteMode::Append);
440        assert_eq!("copy".parse::<WriteMode>().unwrap(), WriteMode::Append);
441        assert_eq!("upsert".parse::<WriteMode>().unwrap(), WriteMode::Upsert);
442        assert_eq!("insert".parse::<WriteMode>().unwrap(), WriteMode::Upsert);
443        assert!("unknown".parse::<WriteMode>().is_err());
444    }
445
446    #[test]
447    fn test_write_mode_display() {
448        assert_eq!(WriteMode::Append.to_string(), "append");
449        assert_eq!(WriteMode::Upsert.to_string(), "upsert");
450    }
451
452    #[test]
453    fn test_delivery_guarantee_parse() {
454        assert_eq!(
455            "at_least_once".parse::<DeliveryGuarantee>().unwrap(),
456            DeliveryGuarantee::AtLeastOnce
457        );
458        assert_eq!(
459            "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
460            DeliveryGuarantee::AtLeastOnce
461        );
462        assert_eq!(
463            "exactly_once".parse::<DeliveryGuarantee>().unwrap(),
464            DeliveryGuarantee::ExactlyOnce
465        );
466        assert!("unknown".parse::<DeliveryGuarantee>().is_err());
467    }
468
469    #[test]
470    fn test_delivery_guarantee_display() {
471        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
472        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
473    }
474
475    #[test]
476    fn test_ssl_mode_parse() {
477        assert_eq!("disable".parse::<SslMode>().unwrap(), SslMode::Disable);
478        assert_eq!("prefer".parse::<SslMode>().unwrap(), SslMode::Prefer);
479        assert_eq!("require".parse::<SslMode>().unwrap(), SslMode::Require);
480        assert_eq!("verify-ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
481        assert_eq!(
482            "verify-full".parse::<SslMode>().unwrap(),
483            SslMode::VerifyFull
484        );
485        assert!("unknown".parse::<SslMode>().is_err());
486    }
487
488    #[test]
489    fn test_ssl_mode_display() {
490        assert_eq!(SslMode::Disable.to_string(), "disable");
491        assert_eq!(SslMode::Prefer.to_string(), "prefer");
492        assert_eq!(SslMode::Require.to_string(), "require");
493        assert_eq!(SslMode::VerifyCa.to_string(), "verify-ca");
494        assert_eq!(SslMode::VerifyFull.to_string(), "verify-full");
495    }
496
497    #[test]
498    fn test_invalid_port() {
499        let mut pairs = required_pairs();
500        pairs.push(("port", "not_a_number"));
501        let config = make_config(&pairs);
502        assert!(PostgresSinkConfig::from_config(&config).is_err());
503    }
504
505    #[test]
506    fn test_invalid_batch_size() {
507        let mut pairs = required_pairs();
508        pairs.push(("batch.size", "abc"));
509        let config = make_config(&pairs);
510        assert!(PostgresSinkConfig::from_config(&config).is_err());
511    }
512}