1use std::time::Duration;
7
8use crate::config::ConnectorConfig;
9use crate::error::ConnectorError;
10
11#[derive(Debug, Clone)]
15pub struct PostgresSinkConfig {
16 pub hostname: String,
18
19 pub port: u16,
21
22 pub database: String,
24
25 pub username: String,
27
28 pub password: String,
30
31 pub schema_name: String,
33
34 pub table_name: String,
36
37 pub write_mode: WriteMode,
39
40 pub primary_key_columns: Vec<String>,
42
43 pub batch_size: usize,
45
46 pub flush_interval: Duration,
48
49 pub pool_size: usize,
51
52 pub connect_timeout: Duration,
54
55 pub ssl_mode: SslMode,
57
58 pub auto_create_table: bool,
60
61 pub changelog_mode: bool,
63
64 pub delivery_guarantee: DeliveryGuarantee,
66
67 pub sink_id: String,
69}
70
71impl Default for PostgresSinkConfig {
72 fn default() -> Self {
73 Self {
74 hostname: "localhost".to_string(),
75 port: 5432,
76 database: String::new(),
77 username: String::new(),
78 password: String::new(),
79 schema_name: "public".to_string(),
80 table_name: String::new(),
81 write_mode: WriteMode::Append,
82 primary_key_columns: Vec::new(),
83 batch_size: 4096,
84 flush_interval: Duration::from_millis(250),
85 pool_size: 4,
86 connect_timeout: Duration::from_secs(10),
87 ssl_mode: SslMode::Prefer,
88 auto_create_table: false,
89 changelog_mode: false,
90 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
91 sink_id: String::new(),
92 }
93 }
94}
95
96impl PostgresSinkConfig {
97 #[must_use]
99 pub fn new(hostname: &str, database: &str, table_name: &str) -> Self {
100 Self {
101 hostname: hostname.to_string(),
102 database: database.to_string(),
103 table_name: table_name.to_string(),
104 ..Default::default()
105 }
106 }
107
108 #[allow(clippy::field_reassign_with_default)]
122 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
123 let mut cfg = Self::default();
124
125 cfg.hostname = config.require("hostname")?.to_string();
126 cfg.database = config.require("database")?.to_string();
127 cfg.username = config.require("username")?.to_string();
128 cfg.table_name = config.require("table.name")?.to_string();
129
130 if let Some(v) = config.get("password") {
131 cfg.password = v.to_string();
132 }
133 if let Some(v) = config.get("port") {
134 cfg.port = crate::config::parse_port(v)?;
135 }
136 if let Some(v) = config.get("schema.name") {
137 cfg.schema_name = v.to_string();
138 }
139 if let Some(v) = config.get("write.mode") {
140 cfg.write_mode = v.parse().map_err(|_| {
141 ConnectorError::ConfigurationError(format!(
142 "invalid write.mode: '{v}' (expected 'append' or 'upsert')"
143 ))
144 })?;
145 }
146 if let Some(v) = config.get("primary.key") {
147 cfg.primary_key_columns = v.split(',').map(|c| c.trim().to_string()).collect();
148 }
149 if let Some(v) = config.get("batch.size") {
150 cfg.batch_size = v.parse().map_err(|_| {
151 ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
152 })?;
153 }
154 if let Some(v) = config.get("flush.interval.ms") {
155 let ms: u64 = v.parse().map_err(|_| {
156 ConnectorError::ConfigurationError(format!("invalid flush.interval.ms: '{v}'"))
157 })?;
158 cfg.flush_interval = Duration::from_millis(ms);
159 }
160 if let Some(v) = config.get("pool.size") {
161 cfg.pool_size = v.parse().map_err(|_| {
162 ConnectorError::ConfigurationError(format!("invalid pool.size: '{v}'"))
163 })?;
164 }
165 if let Some(v) = config.get("connect.timeout.ms") {
166 let ms: u64 = v.parse().map_err(|_| {
167 ConnectorError::ConfigurationError(format!("invalid connect.timeout.ms: '{v}'"))
168 })?;
169 cfg.connect_timeout = Duration::from_millis(ms);
170 }
171 if let Some(v) = config.get("ssl.mode") {
172 cfg.ssl_mode = v.parse().map_err(|_| {
173 ConnectorError::ConfigurationError(format!(
174 "invalid ssl.mode: '{v}' (expected disable/prefer/require/verify-ca/verify-full)"
175 ))
176 })?;
177 }
178 if let Some(v) = config.get("auto.create.table") {
179 cfg.auto_create_table = v.eq_ignore_ascii_case("true");
180 }
181 if let Some(v) = config.get("changelog.mode") {
182 cfg.changelog_mode = v.eq_ignore_ascii_case("true");
183 }
184 if let Some(v) = config.get("delivery.guarantee") {
185 cfg.delivery_guarantee = v.parse().map_err(|_| {
186 ConnectorError::ConfigurationError(format!(
187 "invalid delivery.guarantee: '{v}' \
188 (expected 'at_least_once' or 'exactly_once')"
189 ))
190 })?;
191 }
192 if let Some(v) = config.get("sink.id") {
193 cfg.sink_id = v.to_string();
194 }
195
196 cfg.validate()?;
197 Ok(cfg)
198 }
199
200 pub fn validate(&self) -> Result<(), ConnectorError> {
206 if self.table_name.is_empty() {
207 return Err(ConnectorError::MissingConfig("table.name".into()));
208 }
209 if self.write_mode == WriteMode::Upsert && self.primary_key_columns.is_empty() {
210 return Err(ConnectorError::ConfigurationError(
211 "upsert mode requires 'primary.key' to be set".into(),
212 ));
213 }
214 if self.batch_size == 0 {
215 return Err(ConnectorError::ConfigurationError(
216 "batch.size must be > 0".into(),
217 ));
218 }
219 if self.pool_size == 0 {
220 return Err(ConnectorError::ConfigurationError(
221 "pool.size must be > 0".into(),
222 ));
223 }
224 Ok(())
225 }
226
227 #[must_use]
229 pub fn qualified_table_name(&self) -> String {
230 format!("{}.{}", self.schema_name, self.table_name)
231 }
232
233 #[must_use]
235 pub fn effective_sink_id(&self) -> String {
236 if self.sink_id.is_empty() {
237 format!("laminardb-sink-{}", self.qualified_table_name())
238 } else {
239 self.sink_id.clone()
240 }
241 }
242}
243
/// How the sink writes to the target table.
///
/// `PostgresSinkConfig::default` selects [`WriteMode::Append`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WriteMode {
    /// Append mode; configured with the spellings `append` or `copy`.
    Append,
    /// Upsert mode; configured with the spellings `upsert` or `insert`.
    /// Validation requires primary-key columns in this mode.
    Upsert,
}
254
// String conversions for `WriteMode` via the project's `str_enum!` macro (defined
// outside this section). The tests in this file parse each listed spelling into the
// variant on its left and expect `to_string()` to yield the first spelling.
// "unknown write mode" is presumably the parse-error text — TODO confirm against the
// macro definition.
// NOTE(review): "insert" maps to `Upsert`, not `Append`; `test_write_mode_parse`
// pins this.
str_enum!(WriteMode, lowercase_nodash, String, "unknown write mode",
    Append => "append", "copy";
    Upsert => "upsert", "insert"
);
259
/// Delivery guarantee requested for the sink.
///
/// `PostgresSinkConfig::default` selects [`DeliveryGuarantee::AtLeastOnce`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeliveryGuarantee {
    /// Configured as `at_least_once` (also `at-least-once`, `atleastonce`).
    AtLeastOnce,
    /// Configured as `exactly_once` (also `exactly-once`, `exactlyonce`).
    ExactlyOnce,
}
268
// String conversions for `DeliveryGuarantee` via the project's `str_enum!` macro
// (defined outside this section). The tests in this file parse the underscore and
// dash spellings and expect `to_string()` to yield the first (underscore) spelling.
// "unknown delivery guarantee" is presumably the parse-error text — TODO confirm
// against the macro definition.
str_enum!(DeliveryGuarantee, lowercase_nodash, String, "unknown delivery guarantee",
    AtLeastOnce => "at_least_once", "at-least-once", "atleastonce";
    ExactlyOnce => "exactly_once", "exactly-once", "exactlyonce"
);
273
/// SSL mode for the PostgreSQL connection.
///
/// `PostgresSinkConfig::default` selects [`SslMode::Prefer`].
// NOTE(review): the variant names mirror libpq's `sslmode` values; presumably the
// semantics match too — confirm against the code that opens connections.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SslMode {
    /// Configured as `disable` or `off`.
    Disable,
    /// Configured as `prefer`.
    Prefer,
    /// Configured as `require`.
    Require,
    /// Configured as `verify-ca` (also `verify_ca`, `verifyca`).
    VerifyCa,
    /// Configured as `verify-full` (also `verify_full`, `verifyfull`).
    VerifyFull,
}
288
// String conversions for `SslMode` via the project's `str_enum!` macro (defined
// outside this section). The tests in this file parse the first spelling of each
// variant and expect `to_string()` to yield that same first spelling (dash form for
// the verify modes). "unknown SSL mode" is presumably the parse-error text — TODO
// confirm against the macro definition.
str_enum!(SslMode, lowercase_nodash, String, "unknown SSL mode",
    Disable => "disable", "off";
    Prefer => "prefer";
    Require => "require";
    VerifyCa => "verify-ca", "verify_ca", "verifyca";
    VerifyFull => "verify-full", "verify_full", "verifyfull"
);
296
#[cfg(test)]
mod tests {
    use super::*;

    /// The four keys `from_config` refuses to run without.
    const REQUIRED: [(&str, &str); 4] = [
        ("hostname", "localhost"),
        ("database", "mydb"),
        ("username", "writer"),
        ("table.name", "events"),
    ];

    /// Builds a `ConnectorConfig` holding exactly `entries`.
    fn config_from(entries: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("postgres-sink");
        for &(key, value) in entries {
            config.set(key, value);
        }
        config
    }

    /// Runs `from_config` over exactly `entries`.
    fn parse(entries: &[(&str, &str)]) -> Result<PostgresSinkConfig, ConnectorError> {
        PostgresSinkConfig::from_config(&config_from(entries))
    }

    /// Runs `from_config` over the required keys followed by `extra`.
    fn parse_with_required(
        extra: &[(&'static str, &'static str)],
    ) -> Result<PostgresSinkConfig, ConnectorError> {
        let mut entries = REQUIRED.to_vec();
        entries.extend_from_slice(extra);
        parse(&entries)
    }

    #[test]
    fn test_parse_required_fields() {
        let cfg = parse(&REQUIRED).unwrap();

        // Values taken from the supplied keys.
        assert_eq!(cfg.hostname, "localhost");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.username, "writer");
        assert_eq!(cfg.table_name, "events");

        // Everything else falls back to the defaults.
        assert_eq!(cfg.port, 5432);
        assert_eq!(cfg.schema_name, "public");
        assert_eq!(cfg.write_mode, WriteMode::Append);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
    }

    #[test]
    fn test_missing_hostname() {
        let outcome = parse(&[("database", "db"), ("username", "u"), ("table.name", "t")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_database() {
        let outcome = parse(&[("hostname", "h"), ("username", "u"), ("table.name", "t")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_username() {
        let outcome = parse(&[("hostname", "h"), ("database", "db"), ("table.name", "t")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_table_name() {
        let outcome = parse(&[("hostname", "h"), ("database", "db"), ("username", "u")]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_parse_all_optional_fields() {
        let cfg = parse_with_required(&[
            ("password", "secret"),
            ("port", "5433"),
            ("schema.name", "analytics"),
            ("write.mode", "upsert"),
            ("primary.key", "id, region"),
            ("batch.size", "8192"),
            ("flush.interval.ms", "500"),
            ("pool.size", "8"),
            ("connect.timeout.ms", "5000"),
            ("ssl.mode", "require"),
            ("auto.create.table", "true"),
            ("changelog.mode", "true"),
            ("delivery.guarantee", "exactly_once"),
            ("sink.id", "my-sink"),
        ])
        .unwrap();

        assert_eq!(cfg.password, "secret");
        assert_eq!(cfg.port, 5433);
        assert_eq!(cfg.schema_name, "analytics");
        assert_eq!(cfg.write_mode, WriteMode::Upsert);
        assert_eq!(cfg.primary_key_columns, vec!["id", "region"]);
        assert_eq!(cfg.batch_size, 8192);
        assert_eq!(cfg.flush_interval, Duration::from_millis(500));
        assert_eq!(cfg.pool_size, 8);
        assert_eq!(cfg.connect_timeout, Duration::from_millis(5000));
        assert_eq!(cfg.ssl_mode, SslMode::Require);
        assert!(cfg.auto_create_table);
        assert!(cfg.changelog_mode);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
        assert_eq!(cfg.sink_id, "my-sink");
    }

    #[test]
    fn test_upsert_requires_primary_key() {
        let err = parse_with_required(&[("write.mode", "upsert")])
            .expect_err("upsert without a primary key must be rejected")
            .to_string();
        assert!(err.contains("primary.key"), "error: {err}");
    }

    #[test]
    fn test_batch_size_zero_rejected() {
        assert!(parse_with_required(&[("batch.size", "0")]).is_err());
    }

    #[test]
    fn test_pool_size_zero_rejected() {
        assert!(parse_with_required(&[("pool.size", "0")]).is_err());
    }

    #[test]
    fn test_qualified_table_name() {
        let in_public = PostgresSinkConfig::new("localhost", "db", "events");
        assert_eq!(in_public.qualified_table_name(), "public.events");

        let in_analytics = PostgresSinkConfig {
            schema_name: "analytics".to_string(),
            ..in_public
        };
        assert_eq!(in_analytics.qualified_table_name(), "analytics.events");
    }

    #[test]
    fn test_effective_sink_id() {
        let derived = PostgresSinkConfig::new("localhost", "db", "events");
        assert_eq!(derived.effective_sink_id(), "laminardb-sink-public.events");

        let explicit = PostgresSinkConfig {
            sink_id: "custom-sink".to_string(),
            ..derived
        };
        assert_eq!(explicit.effective_sink_id(), "custom-sink");
    }

    #[test]
    fn test_defaults() {
        let cfg = PostgresSinkConfig::default();
        assert_eq!(cfg.hostname, "localhost");
        assert_eq!(cfg.port, 5432);
        assert_eq!(cfg.schema_name, "public");
        assert_eq!(cfg.write_mode, WriteMode::Append);
        assert_eq!(cfg.batch_size, 4096);
        assert_eq!(cfg.pool_size, 4);
        assert_eq!(cfg.ssl_mode, SslMode::Prefer);
        assert!(!cfg.auto_create_table);
        assert!(!cfg.changelog_mode);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
    }

    #[test]
    fn test_write_mode_parse() {
        let accepted = [
            ("append", WriteMode::Append),
            ("copy", WriteMode::Append),
            ("upsert", WriteMode::Upsert),
            ("insert", WriteMode::Upsert),
        ];
        for (text, expected) in accepted {
            assert_eq!(text.parse::<WriteMode>().unwrap(), expected);
        }
        assert!("unknown".parse::<WriteMode>().is_err());
    }

    #[test]
    fn test_write_mode_display() {
        for (mode, text) in [(WriteMode::Append, "append"), (WriteMode::Upsert, "upsert")] {
            assert_eq!(mode.to_string(), text);
        }
    }

    #[test]
    fn test_delivery_guarantee_parse() {
        let accepted = [
            ("at_least_once", DeliveryGuarantee::AtLeastOnce),
            ("at-least-once", DeliveryGuarantee::AtLeastOnce),
            ("exactly_once", DeliveryGuarantee::ExactlyOnce),
        ];
        for (text, expected) in accepted {
            assert_eq!(text.parse::<DeliveryGuarantee>().unwrap(), expected);
        }
        assert!("unknown".parse::<DeliveryGuarantee>().is_err());
    }

    #[test]
    fn test_delivery_guarantee_display() {
        let rendered = [
            (DeliveryGuarantee::AtLeastOnce, "at_least_once"),
            (DeliveryGuarantee::ExactlyOnce, "exactly_once"),
        ];
        for (guarantee, text) in rendered {
            assert_eq!(guarantee.to_string(), text);
        }
    }

    #[test]
    fn test_ssl_mode_parse() {
        let accepted = [
            ("disable", SslMode::Disable),
            ("prefer", SslMode::Prefer),
            ("require", SslMode::Require),
            ("verify-ca", SslMode::VerifyCa),
            ("verify-full", SslMode::VerifyFull),
        ];
        for (text, expected) in accepted {
            assert_eq!(text.parse::<SslMode>().unwrap(), expected);
        }
        assert!("unknown".parse::<SslMode>().is_err());
    }

    #[test]
    fn test_ssl_mode_display() {
        let rendered = [
            (SslMode::Disable, "disable"),
            (SslMode::Prefer, "prefer"),
            (SslMode::Require, "require"),
            (SslMode::VerifyCa, "verify-ca"),
            (SslMode::VerifyFull, "verify-full"),
        ];
        for (mode, text) in rendered {
            assert_eq!(mode.to_string(), text);
        }
    }

    #[test]
    fn test_invalid_port() {
        assert!(parse_with_required(&[("port", "not_a_number")]).is_err());
    }

    #[test]
    fn test_invalid_batch_size() {
        assert!(parse_with_required(&[("batch.size", "abc")]).is_err());
    }
}