Skip to main content

laminar_connectors/postgres/
sink_config.rs

//! `PostgreSQL` sink connector configuration.
//!
//! [`PostgresSinkConfig`] encapsulates all settings for writing Arrow
//! `RecordBatch` data to `PostgreSQL`, parsed from SQL `WITH (...)` clauses.
5
6use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11/// Configuration for the `PostgreSQL` sink connector.
12///
13/// Parsed from SQL `WITH (...)` clause options via [`from_config`](Self::from_config).
14#[derive(Debug, Clone)]
15pub struct PostgresSinkConfig {
16    /// `PostgreSQL` hostname.
17    pub hostname: String,
18
19    /// `PostgreSQL` port (default: 5432).
20    pub port: u16,
21
22    /// Database name.
23    pub database: String,
24
25    /// Username for authentication.
26    pub username: String,
27
28    /// Password for authentication.
29    pub password: String,
30
31    /// Target schema name (default: `"public"`).
32    pub schema_name: String,
33
34    /// Target table name.
35    pub table_name: String,
36
37    /// Write mode: append (COPY BINARY) or upsert (ON CONFLICT).
38    pub write_mode: WriteMode,
39
40    /// Primary key columns (required for upsert mode).
41    pub primary_key_columns: Vec<String>,
42
43    /// Maximum records to buffer before flushing.
44    pub batch_size: usize,
45
46    /// Maximum time to buffer before flushing.
47    pub flush_interval: Duration,
48
49    /// Connection pool size.
50    pub pool_size: usize,
51
52    /// Connection timeout.
53    pub connect_timeout: Duration,
54
55    /// SSL mode for connections.
56    pub ssl_mode: SslMode,
57
58    /// Whether to create the target table if it doesn't exist.
59    pub auto_create_table: bool,
60
61    /// Whether to handle changelog/retraction records.
62    pub changelog_mode: bool,
63
64    /// Delivery guarantee level.
65    pub delivery_guarantee: DeliveryGuarantee,
66
67    /// Sink ID for offset tracking (auto-generated if empty).
68    pub sink_id: String,
69}
70
71impl Default for PostgresSinkConfig {
72    fn default() -> Self {
73        Self {
74            hostname: "localhost".to_string(),
75            port: 5432,
76            database: String::new(),
77            username: String::new(),
78            password: String::new(),
79            schema_name: "public".to_string(),
80            table_name: String::new(),
81            write_mode: WriteMode::Append,
82            primary_key_columns: Vec::new(),
83            batch_size: 4096,
84            flush_interval: Duration::from_millis(250),
85            pool_size: 4,
86            connect_timeout: Duration::from_secs(10),
87            ssl_mode: SslMode::Prefer,
88            auto_create_table: false,
89            changelog_mode: false,
90            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
91            sink_id: String::new(),
92        }
93    }
94}
95
96impl PostgresSinkConfig {
97    /// Creates a minimal config for testing.
98    #[must_use]
99    pub fn new(hostname: &str, database: &str, table_name: &str) -> Self {
100        Self {
101            hostname: hostname.to_string(),
102            database: database.to_string(),
103            table_name: table_name.to_string(),
104            ..Default::default()
105        }
106    }
107
108    /// Parses a sink config from a [`ConnectorConfig`] (SQL WITH clause).
109    ///
110    /// # Required keys
111    ///
112    /// - `hostname` - `PostgreSQL` server hostname
113    /// - `database` - Target database name
114    /// - `username` - Authentication username
115    /// - `table.name` - Target table name
116    ///
117    /// # Errors
118    ///
119    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
120    /// or `ConnectorError::ConfigurationError` on invalid values.
121    #[allow(clippy::field_reassign_with_default)]
122    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
123        let mut cfg = Self::default();
124
125        cfg.hostname = config.require("hostname")?.to_string();
126        cfg.database = config.require("database")?.to_string();
127        cfg.username = config.require("username")?.to_string();
128        cfg.table_name = config.require("table.name")?.to_string();
129
130        if let Some(v) = config.get("password") {
131            cfg.password = v.to_string();
132        }
133        if let Some(v) = config.get("port") {
134            cfg.port = crate::config::parse_port(v)?;
135        }
136        if let Some(v) = config.get("schema.name") {
137            cfg.schema_name = v.to_string();
138        }
139        if let Some(v) = config.get("write.mode") {
140            cfg.write_mode = v.parse().map_err(|_| {
141                ConnectorError::ConfigurationError(format!(
142                    "invalid write.mode: '{v}' (expected 'append' or 'upsert')"
143                ))
144            })?;
145        }
146        if let Some(v) = config.get("primary.key") {
147            cfg.primary_key_columns = v.split(',').map(|c| c.trim().to_string()).collect();
148        }
149        if let Some(v) = config.get("batch.size") {
150            cfg.batch_size = v.parse().map_err(|_| {
151                ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
152            })?;
153        }
154        if let Some(v) = config.get("flush.interval.ms") {
155            let ms: u64 = v.parse().map_err(|_| {
156                ConnectorError::ConfigurationError(format!("invalid flush.interval.ms: '{v}'"))
157            })?;
158            cfg.flush_interval = Duration::from_millis(ms);
159        }
160        if let Some(v) = config.get("pool.size") {
161            cfg.pool_size = v.parse().map_err(|_| {
162                ConnectorError::ConfigurationError(format!("invalid pool.size: '{v}'"))
163            })?;
164        }
165        if let Some(v) = config.get("connect.timeout.ms") {
166            let ms: u64 = v.parse().map_err(|_| {
167                ConnectorError::ConfigurationError(format!("invalid connect.timeout.ms: '{v}'"))
168            })?;
169            cfg.connect_timeout = Duration::from_millis(ms);
170        }
171        if let Some(v) = config.get("ssl.mode") {
172            cfg.ssl_mode = v.parse().map_err(|_| {
173                ConnectorError::ConfigurationError(format!(
174                    "invalid ssl.mode: '{v}' (expected disable/prefer/require/verify-ca/verify-full)"
175                ))
176            })?;
177        }
178        if let Some(v) = config.get("auto.create.table") {
179            cfg.auto_create_table = v.eq_ignore_ascii_case("true");
180        }
181        if let Some(v) = config.get("changelog.mode") {
182            cfg.changelog_mode = v.eq_ignore_ascii_case("true");
183        }
184        if let Some(v) = config.get("delivery.guarantee") {
185            cfg.delivery_guarantee = v.parse().map_err(|_| {
186                ConnectorError::ConfigurationError(format!(
187                    "invalid delivery.guarantee: '{v}' \
188                     (expected 'at_least_once' or 'exactly_once')"
189                ))
190            })?;
191        }
192        if let Some(v) = config.get("sink.id") {
193            cfg.sink_id = v.to_string();
194        }
195
196        cfg.validate()?;
197        Ok(cfg)
198    }
199
200    /// Validates the configuration for consistency.
201    ///
202    /// # Errors
203    ///
204    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
205    pub fn validate(&self) -> Result<(), ConnectorError> {
206        if self.table_name.is_empty() {
207            return Err(ConnectorError::MissingConfig("table.name".into()));
208        }
209        if self.write_mode == WriteMode::Upsert && self.primary_key_columns.is_empty() {
210            return Err(ConnectorError::ConfigurationError(
211                "upsert mode requires 'primary.key' to be set".into(),
212            ));
213        }
214        if self.batch_size == 0 {
215            return Err(ConnectorError::ConfigurationError(
216                "batch.size must be > 0".into(),
217            ));
218        }
219        if self.pool_size == 0 {
220            return Err(ConnectorError::ConfigurationError(
221                "pool.size must be > 0".into(),
222            ));
223        }
224        Ok(())
225    }
226
227    /// Returns the fully qualified table name (`schema.table`).
228    #[must_use]
229    pub fn qualified_table_name(&self) -> String {
230        format!("{}.{}", self.schema_name, self.table_name)
231    }
232
233    /// Returns the effective sink ID, auto-generating from table name if empty.
234    #[must_use]
235    pub fn effective_sink_id(&self) -> String {
236        if self.sink_id.is_empty() {
237            format!("laminardb-sink-{}", self.qualified_table_name())
238        } else {
239            self.sink_id.clone()
240        }
241    }
242}
243
244/// Write mode for the `PostgreSQL` sink.
245#[derive(Debug, Clone, Copy, PartialEq, Eq)]
246pub enum WriteMode {
247    /// Append-only: uses COPY BINARY for maximum throughput.
248    /// No deduplication — every record is inserted.
249    Append,
250    /// Upsert: `INSERT ... ON CONFLICT DO UPDATE`.
251    /// Requires primary key columns. Deduplicates on key.
252    Upsert,
253}
254
255str_enum!(WriteMode, lowercase_nodash, String, "unknown write mode",
256    Append => "append", "copy";
257    Upsert => "upsert", "insert"
258);
259
260/// Delivery guarantee for the `PostgreSQL` sink.
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
262pub enum DeliveryGuarantee {
263    /// At-least-once: records may be duplicated on failure recovery.
264    AtLeastOnce,
265    /// Exactly-once: co-transactional offset storage in `PostgreSQL`.
266    ExactlyOnce,
267}
268
269str_enum!(DeliveryGuarantee, lowercase_nodash, String, "unknown delivery guarantee",
270    AtLeastOnce => "at_least_once", "at-least-once", "atleastonce";
271    ExactlyOnce => "exactly_once", "exactly-once", "exactlyonce"
272);
273
274/// SSL mode for `PostgreSQL` connections.
275#[derive(Debug, Clone, Copy, PartialEq, Eq)]
276pub enum SslMode {
277    /// No SSL.
278    Disable,
279    /// Use SSL if available, fall back to unencrypted.
280    Prefer,
281    /// Require SSL.
282    Require,
283    /// Require SSL and verify server certificate.
284    VerifyCa,
285    /// Require SSL, verify certificate and hostname.
286    VerifyFull,
287}
288
289str_enum!(SslMode, lowercase_nodash, String, "unknown SSL mode",
290    Disable => "disable", "off";
291    Prefer => "prefer";
292    Require => "require";
293    VerifyCa => "verify-ca", "verify_ca", "verifyca";
294    VerifyFull => "verify-full", "verify_full", "verifyfull"
295);
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `postgres-sink` connector config holding every pair in `pairs`.
    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut built = ConnectorConfig::new("postgres-sink");
        for &(key, value) in pairs {
            built.set(key, value);
        }
        built
    }

    /// The four options `from_config` insists on.
    fn required_pairs() -> Vec<(&'static str, &'static str)> {
        [
            ("hostname", "localhost"),
            ("database", "mydb"),
            ("username", "writer"),
            ("table.name", "events"),
        ]
        .to_vec()
    }

    /// Parses exactly the given options, nothing added.
    fn parse_only(pairs: &[(&str, &str)]) -> Result<PostgresSinkConfig, ConnectorError> {
        PostgresSinkConfig::from_config(&make_config(pairs))
    }

    /// Parses the required options extended with `extra`.
    fn parse_with(
        extra: &[(&'static str, &'static str)],
    ) -> Result<PostgresSinkConfig, ConnectorError> {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(extra);
        parse_only(&pairs)
    }

    // ---- required keys ----

    #[test]
    fn test_parse_required_fields() {
        let parsed = parse_with(&[]).unwrap();

        // Values taken from the options.
        assert_eq!(parsed.hostname, "localhost");
        assert_eq!(parsed.database, "mydb");
        assert_eq!(parsed.username, "writer");
        assert_eq!(parsed.table_name, "events");

        // Values falling back to the defaults.
        assert_eq!(parsed.port, 5432);
        assert_eq!(parsed.schema_name, "public");
        assert_eq!(parsed.write_mode, WriteMode::Append);
        assert_eq!(parsed.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
    }

    #[test]
    fn test_missing_hostname() {
        let outcome = parse_only(&[("database", "db"), ("username", "u"), ("table.name", "t")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_database() {
        let outcome = parse_only(&[("hostname", "h"), ("username", "u"), ("table.name", "t")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_username() {
        let outcome = parse_only(&[("hostname", "h"), ("database", "db"), ("table.name", "t")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_table_name() {
        let outcome = parse_only(&[("hostname", "h"), ("database", "db"), ("username", "u")]);
        assert!(outcome.is_err());
    }

    // ---- optional keys and validation ----

    #[test]
    fn test_parse_all_optional_fields() {
        let parsed = parse_with(&[
            ("password", "secret"),
            ("port", "5433"),
            ("schema.name", "analytics"),
            ("write.mode", "upsert"),
            ("primary.key", "id, region"),
            ("batch.size", "8192"),
            ("flush.interval.ms", "500"),
            ("pool.size", "8"),
            ("connect.timeout.ms", "5000"),
            ("ssl.mode", "require"),
            ("auto.create.table", "true"),
            ("changelog.mode", "true"),
            ("delivery.guarantee", "exactly_once"),
            ("sink.id", "my-sink"),
        ])
        .unwrap();

        assert_eq!(parsed.password, "secret");
        assert_eq!(parsed.port, 5433);
        assert_eq!(parsed.schema_name, "analytics");
        assert_eq!(parsed.write_mode, WriteMode::Upsert);
        assert_eq!(parsed.primary_key_columns, vec!["id", "region"]);
        assert_eq!(parsed.batch_size, 8192);
        assert_eq!(parsed.flush_interval, Duration::from_millis(500));
        assert_eq!(parsed.pool_size, 8);
        assert_eq!(parsed.connect_timeout, Duration::from_millis(5000));
        assert_eq!(parsed.ssl_mode, SslMode::Require);
        assert!(parsed.auto_create_table);
        assert!(parsed.changelog_mode);
        assert_eq!(parsed.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
        assert_eq!(parsed.sink_id, "my-sink");
    }

    #[test]
    fn test_upsert_requires_primary_key() {
        let err = match parse_with(&[("write.mode", "upsert")]) {
            Ok(_) => panic!("upsert without primary.key must be rejected"),
            Err(e) => e.to_string(),
        };
        assert!(err.contains("primary.key"), "error: {err}");
    }

    #[test]
    fn test_batch_size_zero_rejected() {
        assert!(parse_with(&[("batch.size", "0")]).is_err());
    }

    #[test]
    fn test_pool_size_zero_rejected() {
        assert!(parse_with(&[("pool.size", "0")]).is_err());
    }

    #[test]
    fn test_invalid_port() {
        assert!(parse_with(&[("port", "not_a_number")]).is_err());
    }

    #[test]
    fn test_invalid_batch_size() {
        assert!(parse_with(&[("batch.size", "abc")]).is_err());
    }

    // ---- derived names and defaults ----

    #[test]
    fn test_qualified_table_name() {
        let mut cfg = PostgresSinkConfig::new("localhost", "db", "events");
        assert_eq!(cfg.qualified_table_name(), "public.events");

        cfg.schema_name = "analytics".to_string();
        assert_eq!(cfg.qualified_table_name(), "analytics.events");
    }

    #[test]
    fn test_effective_sink_id() {
        let mut cfg = PostgresSinkConfig::new("localhost", "db", "events");
        assert_eq!(cfg.effective_sink_id(), "laminardb-sink-public.events");

        cfg.sink_id = "custom-sink".to_string();
        assert_eq!(cfg.effective_sink_id(), "custom-sink");
    }

    #[test]
    fn test_defaults() {
        let defaults = PostgresSinkConfig::default();
        assert_eq!(defaults.hostname, "localhost");
        assert_eq!(defaults.port, 5432);
        assert_eq!(defaults.schema_name, "public");
        assert_eq!(defaults.write_mode, WriteMode::Append);
        assert_eq!(defaults.batch_size, 4096);
        assert_eq!(defaults.pool_size, 4);
        assert_eq!(defaults.ssl_mode, SslMode::Prefer);
        assert!(!defaults.auto_create_table);
        assert!(!defaults.changelog_mode);
        assert_eq!(defaults.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
    }

    // ---- enum string conversions ----

    #[test]
    fn test_write_mode_parse() {
        let accepted = [
            ("append", WriteMode::Append),
            ("copy", WriteMode::Append),
            ("upsert", WriteMode::Upsert),
            ("insert", WriteMode::Upsert),
        ];
        for &(text, expected) in &accepted {
            assert_eq!(text.parse::<WriteMode>().unwrap(), expected);
        }
        assert!("unknown".parse::<WriteMode>().is_err());
    }

    #[test]
    fn test_write_mode_display() {
        assert_eq!(WriteMode::Append.to_string(), "append");
        assert_eq!(WriteMode::Upsert.to_string(), "upsert");
    }

    #[test]
    fn test_delivery_guarantee_parse() {
        let accepted = [
            ("at_least_once", DeliveryGuarantee::AtLeastOnce),
            ("at-least-once", DeliveryGuarantee::AtLeastOnce),
            ("exactly_once", DeliveryGuarantee::ExactlyOnce),
        ];
        for &(text, expected) in &accepted {
            assert_eq!(text.parse::<DeliveryGuarantee>().unwrap(), expected);
        }
        assert!("unknown".parse::<DeliveryGuarantee>().is_err());
    }

    #[test]
    fn test_delivery_guarantee_display() {
        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at_least_once");
        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly_once");
    }

    #[test]
    fn test_ssl_mode_parse() {
        let accepted = [
            ("disable", SslMode::Disable),
            ("prefer", SslMode::Prefer),
            ("require", SslMode::Require),
            ("verify-ca", SslMode::VerifyCa),
            ("verify-full", SslMode::VerifyFull),
        ];
        for &(text, expected) in &accepted {
            assert_eq!(text.parse::<SslMode>().unwrap(), expected);
        }
        assert!("unknown".parse::<SslMode>().is_err());
    }

    #[test]
    fn test_ssl_mode_display() {
        let rendered = [
            (SslMode::Disable, "disable"),
            (SslMode::Prefer, "prefer"),
            (SslMode::Require, "require"),
            (SslMode::VerifyCa, "verify-ca"),
            (SslMode::VerifyFull, "verify-full"),
        ];
        for &(mode, expected) in &rendered {
            assert_eq!(mode.to_string(), expected);
        }
    }
}