1use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11#[derive(Debug, Clone)]
15pub struct PostgresSinkConfig {
16 pub hostname: String,
18
19 pub port: u16,
21
22 pub database: String,
24
25 pub username: String,
27
28 pub password: String,
30
31 pub schema_name: String,
33
34 pub table_name: String,
36
37 pub write_mode: WriteMode,
39
40 pub primary_key_columns: Vec<String>,
42
43 pub batch_size: usize,
45
46 pub flush_interval: Duration,
48
49 pub pool_size: usize,
51
52 pub connect_timeout: Duration,
54
55 pub ssl_mode: SslMode,
57
58 pub auto_create_table: bool,
60
61 pub changelog_mode: bool,
63
64 pub delivery_guarantee: DeliveryGuarantee,
66
67 pub statement_timeout: Duration,
69
70 pub sink_id: String,
72}
73
74impl Default for PostgresSinkConfig {
75 fn default() -> Self {
76 Self {
77 hostname: "localhost".to_string(),
78 port: 5432,
79 database: String::new(),
80 username: String::new(),
81 password: String::new(),
82 schema_name: "public".to_string(),
83 table_name: String::new(),
84 write_mode: WriteMode::Append,
85 primary_key_columns: Vec::new(),
86 batch_size: 4096,
87 flush_interval: Duration::from_millis(250),
88 pool_size: 4,
89 connect_timeout: Duration::from_secs(10),
90 ssl_mode: SslMode::Prefer,
91 auto_create_table: false,
92 changelog_mode: false,
93 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
94 statement_timeout: Duration::from_secs(30),
95 sink_id: String::new(),
96 }
97 }
98}
99
100impl PostgresSinkConfig {
101 #[must_use]
103 pub fn new(hostname: &str, database: &str, table_name: &str) -> Self {
104 Self {
105 hostname: hostname.to_string(),
106 database: database.to_string(),
107 table_name: table_name.to_string(),
108 ..Default::default()
109 }
110 }
111
112 #[allow(clippy::field_reassign_with_default)]
126 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
127 let mut cfg = Self::default();
128
129 cfg.hostname = config.require("hostname")?.to_string();
130 cfg.database = config.require("database")?.to_string();
131 cfg.username = config.require("username")?.to_string();
132 cfg.table_name = config.require("table.name")?.to_string();
133
134 if let Some(v) = config.get("password") {
135 cfg.password = v.to_string();
136 }
137 if let Some(v) = config.get("port") {
138 cfg.port = crate::config::parse_port(v)?;
139 }
140 if let Some(v) = config.get("schema.name") {
141 cfg.schema_name = v.to_string();
142 }
143 if let Some(v) = config.get("write.mode") {
144 cfg.write_mode = v.parse().map_err(|_| {
145 ConnectorError::ConfigurationError(format!(
146 "invalid write.mode: '{v}' (expected 'append' or 'upsert')"
147 ))
148 })?;
149 }
150 if let Some(v) = config.get("primary.key") {
151 cfg.primary_key_columns = v.split(',').map(|c| c.trim().to_string()).collect();
152 }
153 if let Some(v) = config.get("batch.size") {
154 cfg.batch_size = v.parse().map_err(|_| {
155 ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
156 })?;
157 }
158 if let Some(v) = config.get("flush.interval.ms") {
159 let ms: u64 = v.parse().map_err(|_| {
160 ConnectorError::ConfigurationError(format!("invalid flush.interval.ms: '{v}'"))
161 })?;
162 cfg.flush_interval = Duration::from_millis(ms);
163 }
164 if let Some(v) = config.get("pool.size") {
165 cfg.pool_size = v.parse().map_err(|_| {
166 ConnectorError::ConfigurationError(format!("invalid pool.size: '{v}'"))
167 })?;
168 }
169 if let Some(v) = config.get("connect.timeout.ms") {
170 let ms: u64 = v.parse().map_err(|_| {
171 ConnectorError::ConfigurationError(format!("invalid connect.timeout.ms: '{v}'"))
172 })?;
173 cfg.connect_timeout = Duration::from_millis(ms);
174 }
175 if let Some(v) = config.get("ssl.mode") {
176 cfg.ssl_mode = v.parse().map_err(|_| {
177 ConnectorError::ConfigurationError(format!(
178 "invalid ssl.mode: '{v}' (expected disable/prefer/require/verify-ca/verify-full)"
179 ))
180 })?;
181 }
182 if let Some(v) = config.get("auto.create.table") {
183 cfg.auto_create_table = v.eq_ignore_ascii_case("true");
184 }
185 if let Some(v) = config.get("changelog.mode") {
186 cfg.changelog_mode = v.eq_ignore_ascii_case("true");
187 }
188 if let Some(v) = config.get("statement.timeout.ms") {
189 let ms: u64 = v.parse().map_err(|_| {
190 ConnectorError::ConfigurationError(format!("invalid statement.timeout.ms: '{v}'"))
191 })?;
192 cfg.statement_timeout = Duration::from_millis(ms);
193 }
194 if let Some(v) = config.get("delivery.guarantee") {
195 cfg.delivery_guarantee = v.parse().map_err(|_| {
196 ConnectorError::ConfigurationError(format!(
197 "invalid delivery.guarantee: '{v}' \
198 (expected 'at_least_once' or 'exactly_once')"
199 ))
200 })?;
201 }
202 if let Some(v) = config.get("sink.id") {
203 cfg.sink_id = v.to_string();
204 }
205
206 cfg.validate()?;
207 Ok(cfg)
208 }
209
210 pub fn validate(&self) -> Result<(), ConnectorError> {
216 if self.table_name.is_empty() {
217 return Err(ConnectorError::missing_config("table.name"));
218 }
219 if self.write_mode == WriteMode::Upsert && self.primary_key_columns.is_empty() {
220 return Err(ConnectorError::ConfigurationError(
221 "upsert mode requires 'primary.key' to be set".into(),
222 ));
223 }
224 if self.batch_size == 0 {
225 return Err(ConnectorError::ConfigurationError(
226 "batch.size must be > 0".into(),
227 ));
228 }
229 if self.pool_size == 0 {
230 return Err(ConnectorError::ConfigurationError(
231 "pool.size must be > 0".into(),
232 ));
233 }
234 if self.statement_timeout < Duration::from_secs(1) {
235 return Err(ConnectorError::ConfigurationError(
236 "statement.timeout.ms must be >= 1000 (1 second)".into(),
237 ));
238 }
239 Ok(())
240 }
241
242 #[must_use]
244 pub fn qualified_table_name(&self) -> String {
245 format!("{}.{}", self.schema_name, self.table_name)
246 }
247
248 #[must_use]
250 pub fn effective_sink_id(&self) -> String {
251 if self.sink_id.is_empty() {
252 format!("laminardb-sink-{}", self.qualified_table_name())
253 } else {
254 self.sink_id.clone()
255 }
256 }
257}
258
259#[derive(Debug, Clone, Copy, PartialEq, Eq)]
261pub enum WriteMode {
262 Append,
265 Upsert,
268}
269
270str_enum!(WriteMode, lowercase_nodash, String, "unknown write mode",
271 Append => "append", "copy";
272 Upsert => "upsert", "insert"
273);
274
275pub use crate::connector::DeliveryGuarantee;
276pub use crate::connector::PostgresSslMode as SslMode;
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
283 let mut config = ConnectorConfig::new("postgres-sink");
284 for (k, v) in pairs {
285 config.set(*k, *v);
286 }
287 config
288 }
289
290 fn required_pairs() -> Vec<(&'static str, &'static str)> {
291 vec![
292 ("hostname", "localhost"),
293 ("database", "mydb"),
294 ("username", "writer"),
295 ("table.name", "events"),
296 ]
297 }
298
299 #[test]
300 fn test_parse_required_fields() {
301 let config = make_config(&required_pairs());
302 let cfg = PostgresSinkConfig::from_config(&config).unwrap();
303 assert_eq!(cfg.hostname, "localhost");
304 assert_eq!(cfg.database, "mydb");
305 assert_eq!(cfg.username, "writer");
306 assert_eq!(cfg.table_name, "events");
307 assert_eq!(cfg.port, 5432);
308 assert_eq!(cfg.schema_name, "public");
309 assert_eq!(cfg.write_mode, WriteMode::Append);
310 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
311 }
312
313 #[test]
314 fn test_missing_hostname() {
315 let config = make_config(&[("database", "db"), ("username", "u"), ("table.name", "t")]);
316 assert!(PostgresSinkConfig::from_config(&config).is_err());
317 }
318
319 #[test]
320 fn test_missing_database() {
321 let config = make_config(&[("hostname", "h"), ("username", "u"), ("table.name", "t")]);
322 assert!(PostgresSinkConfig::from_config(&config).is_err());
323 }
324
325 #[test]
326 fn test_missing_username() {
327 let config = make_config(&[("hostname", "h"), ("database", "db"), ("table.name", "t")]);
328 assert!(PostgresSinkConfig::from_config(&config).is_err());
329 }
330
331 #[test]
332 fn test_missing_table_name() {
333 let config = make_config(&[("hostname", "h"), ("database", "db"), ("username", "u")]);
334 assert!(PostgresSinkConfig::from_config(&config).is_err());
335 }
336
337 #[test]
338 fn test_parse_all_optional_fields() {
339 let mut pairs = required_pairs();
340 pairs.extend_from_slice(&[
341 ("password", "secret"),
342 ("port", "5433"),
343 ("schema.name", "analytics"),
344 ("write.mode", "upsert"),
345 ("primary.key", "id, region"),
346 ("batch.size", "8192"),
347 ("flush.interval.ms", "500"),
348 ("pool.size", "8"),
349 ("connect.timeout.ms", "5000"),
350 ("ssl.mode", "require"),
351 ("auto.create.table", "true"),
352 ("changelog.mode", "true"),
353 ("delivery.guarantee", "exactly_once"),
354 ("sink.id", "my-sink"),
355 ]);
356 let config = make_config(&pairs);
357 let cfg = PostgresSinkConfig::from_config(&config).unwrap();
358
359 assert_eq!(cfg.password, "secret");
360 assert_eq!(cfg.port, 5433);
361 assert_eq!(cfg.schema_name, "analytics");
362 assert_eq!(cfg.write_mode, WriteMode::Upsert);
363 assert_eq!(cfg.primary_key_columns, vec!["id", "region"]);
364 assert_eq!(cfg.batch_size, 8192);
365 assert_eq!(cfg.flush_interval, Duration::from_millis(500));
366 assert_eq!(cfg.pool_size, 8);
367 assert_eq!(cfg.connect_timeout, Duration::from_secs(5));
368 assert_eq!(cfg.ssl_mode, SslMode::Require);
369 assert!(cfg.auto_create_table);
370 assert!(cfg.changelog_mode);
371 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
372 assert_eq!(cfg.sink_id, "my-sink");
373 }
374
375 #[test]
376 fn test_upsert_requires_primary_key() {
377 let mut pairs = required_pairs();
378 pairs.push(("write.mode", "upsert"));
379 let config = make_config(&pairs);
380 let result = PostgresSinkConfig::from_config(&config);
381 assert!(result.is_err());
382 let err = result.unwrap_err().to_string();
383 assert!(err.contains("primary.key"), "error: {err}");
384 }
385
386 #[test]
387 fn test_batch_size_zero_rejected() {
388 let mut pairs = required_pairs();
389 pairs.push(("batch.size", "0"));
390 let config = make_config(&pairs);
391 assert!(PostgresSinkConfig::from_config(&config).is_err());
392 }
393
394 #[test]
395 fn test_pool_size_zero_rejected() {
396 let mut pairs = required_pairs();
397 pairs.push(("pool.size", "0"));
398 let config = make_config(&pairs);
399 assert!(PostgresSinkConfig::from_config(&config).is_err());
400 }
401
402 #[test]
403 fn test_qualified_table_name() {
404 let cfg = PostgresSinkConfig::new("localhost", "db", "events");
405 assert_eq!(cfg.qualified_table_name(), "public.events");
406
407 let mut cfg2 = cfg;
408 cfg2.schema_name = "analytics".to_string();
409 assert_eq!(cfg2.qualified_table_name(), "analytics.events");
410 }
411
412 #[test]
413 fn test_effective_sink_id() {
414 let cfg = PostgresSinkConfig::new("localhost", "db", "events");
415 assert_eq!(cfg.effective_sink_id(), "laminardb-sink-public.events");
416
417 let mut cfg2 = cfg;
418 cfg2.sink_id = "custom-sink".to_string();
419 assert_eq!(cfg2.effective_sink_id(), "custom-sink");
420 }
421
422 #[test]
423 fn test_defaults() {
424 let cfg = PostgresSinkConfig::default();
425 assert_eq!(cfg.hostname, "localhost");
426 assert_eq!(cfg.port, 5432);
427 assert_eq!(cfg.schema_name, "public");
428 assert_eq!(cfg.write_mode, WriteMode::Append);
429 assert_eq!(cfg.batch_size, 4096);
430 assert_eq!(cfg.pool_size, 4);
431 assert_eq!(cfg.ssl_mode, SslMode::Prefer);
432 assert!(!cfg.auto_create_table);
433 assert!(!cfg.changelog_mode);
434 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
435 }
436
437 #[test]
438 fn test_write_mode_parse() {
439 assert_eq!("append".parse::<WriteMode>().unwrap(), WriteMode::Append);
440 assert_eq!("copy".parse::<WriteMode>().unwrap(), WriteMode::Append);
441 assert_eq!("upsert".parse::<WriteMode>().unwrap(), WriteMode::Upsert);
442 assert_eq!("insert".parse::<WriteMode>().unwrap(), WriteMode::Upsert);
443 assert!("unknown".parse::<WriteMode>().is_err());
444 }
445
446 #[test]
447 fn test_write_mode_display() {
448 assert_eq!(WriteMode::Append.to_string(), "append");
449 assert_eq!(WriteMode::Upsert.to_string(), "upsert");
450 }
451
452 #[test]
453 fn test_delivery_guarantee_parse() {
454 assert_eq!(
455 "at_least_once".parse::<DeliveryGuarantee>().unwrap(),
456 DeliveryGuarantee::AtLeastOnce
457 );
458 assert_eq!(
459 "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
460 DeliveryGuarantee::AtLeastOnce
461 );
462 assert_eq!(
463 "exactly_once".parse::<DeliveryGuarantee>().unwrap(),
464 DeliveryGuarantee::ExactlyOnce
465 );
466 assert!("unknown".parse::<DeliveryGuarantee>().is_err());
467 }
468
469 #[test]
470 fn test_delivery_guarantee_display() {
471 assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
472 assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
473 }
474
475 #[test]
476 fn test_ssl_mode_parse() {
477 assert_eq!("disable".parse::<SslMode>().unwrap(), SslMode::Disable);
478 assert_eq!("prefer".parse::<SslMode>().unwrap(), SslMode::Prefer);
479 assert_eq!("require".parse::<SslMode>().unwrap(), SslMode::Require);
480 assert_eq!("verify-ca".parse::<SslMode>().unwrap(), SslMode::VerifyCa);
481 assert_eq!(
482 "verify-full".parse::<SslMode>().unwrap(),
483 SslMode::VerifyFull
484 );
485 assert!("unknown".parse::<SslMode>().is_err());
486 }
487
488 #[test]
489 fn test_ssl_mode_display() {
490 assert_eq!(SslMode::Disable.to_string(), "disable");
491 assert_eq!(SslMode::Prefer.to_string(), "prefer");
492 assert_eq!(SslMode::Require.to_string(), "require");
493 assert_eq!(SslMode::VerifyCa.to_string(), "verify-ca");
494 assert_eq!(SslMode::VerifyFull.to_string(), "verify-full");
495 }
496
497 #[test]
498 fn test_invalid_port() {
499 let mut pairs = required_pairs();
500 pairs.push(("port", "not_a_number"));
501 let config = make_config(&pairs);
502 assert!(PostgresSinkConfig::from_config(&config).is_err());
503 }
504
505 #[test]
506 fn test_invalid_batch_size() {
507 let mut pairs = required_pairs();
508 pairs.push(("batch.size", "abc"));
509 let config = make_config(&pairs);
510 assert!(PostgresSinkConfig::from_config(&config).is_err());
511 }
512}