1use std::time::Duration;
4
5use crate::config::ConnectorConfig;
6use crate::connector::DeliveryGuarantee;
7use crate::error::ConnectorError;
8use crate::serde::Format;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
12pub enum Mode {
13 Core,
15 #[default]
17 JetStream,
18}
19
20str_enum!(fromstr Mode, lowercase, ConnectorError, "invalid nats mode",
21 Core => "core";
22 JetStream => "jetstream"
23);
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27pub enum AckPolicy {
28 #[default]
30 Explicit,
31 None,
33}
34
35str_enum!(fromstr AckPolicy, lowercase, ConnectorError, "invalid ack.policy",
36 Explicit => "explicit";
37 None => "none"
38);
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42pub enum DeliverPolicy {
43 #[default]
45 All,
46 New,
48 ByStartSequence,
50 ByStartTime,
52}
53
54str_enum!(fromstr DeliverPolicy, lowercase, ConnectorError, "invalid deliver.policy",
55 All => "all";
56 New => "new";
57 ByStartSequence => "by_start_sequence";
58 ByStartTime => "by_start_time"
59);
60
61#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum SubjectSpec {
64 Literal(String),
66 Column(String),
68}
69
70#[derive(Clone, Default, PartialEq, Eq)]
73#[allow(missing_docs)]
74pub enum AuthMode {
75 #[default]
76 None,
77 UserPass {
78 user: String,
79 password: String,
80 },
81 Token(String),
82 CredsFile(String),
83}
84
85impl std::fmt::Debug for AuthMode {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 match self {
88 Self::None => f.write_str("None"),
89 Self::UserPass { user, .. } => f
90 .debug_struct("UserPass")
91 .field("user", user)
92 .field("password", &"<redacted>")
93 .finish(),
94 Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
95 Self::CredsFile(path) => f.debug_tuple("CredsFile").field(path).finish(),
96 }
97 }
98}
99
100#[derive(Debug, Clone, Default)]
102#[allow(missing_docs)]
103pub struct TlsConfig {
104 pub enabled: bool,
105 pub ca_location: Option<String>,
106 pub cert_location: Option<String>,
107 pub key_location: Option<String>,
108}
109
110#[derive(Debug, Clone)]
112#[allow(missing_docs)] pub struct NatsSourceConfig {
114 pub servers: Vec<String>,
115 pub mode: Mode,
116 pub auth: AuthMode,
117 pub tls: TlsConfig,
118
119 pub stream: Option<String>,
121 pub subject: Option<String>,
122 pub subject_filters: Vec<String>,
123 pub consumer: Option<String>,
124 pub deliver_policy: DeliverPolicy,
125 pub start_sequence: Option<u64>,
126 pub start_time: Option<String>, pub ack_policy: AckPolicy,
128 pub ack_wait: Duration,
129 pub max_deliver: i64,
130 pub max_ack_pending: i64,
131 pub fetch_batch: usize,
132 pub fetch_max_wait: Duration,
133 pub fetch_error_threshold: u32,
136 pub lag_poll_interval: Duration,
139
140 pub queue_group: Option<String>,
142
143 pub format: Format,
144}
145
146impl NatsSourceConfig {
147 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
151 let servers = parse_servers(config)?;
152 let mode = parse_or_default::<Mode>(config, "mode")?;
153 let auth = parse_auth(config)?;
154 let tls = parse_tls(config)?;
155
156 let stream = config.get("stream").map(str::to_string);
157 let subject = config.get("subject").map(str::to_string);
158 let subject_filters = config
159 .get("subject.filters")
160 .map(split_csv)
161 .unwrap_or_default();
162 let consumer = config.get("consumer").map(str::to_string);
163 let deliver_policy = parse_or_default::<DeliverPolicy>(config, "deliver.policy")?;
164 let start_sequence = parse_opt_u64(config, "start.sequence")?;
165 let start_time = config.get("start.time").map(str::to_string);
166 let ack_policy = parse_or_default::<AckPolicy>(config, "ack.policy")?;
167 let ack_wait = require_positive_duration(config, "ack.wait.ms", Duration::from_secs(60))?;
168 let max_deliver = require_positive_or_unlimited_i64(config, "max.deliver", 5)?;
169 let max_ack_pending = require_positive_or_unlimited_i64(config, "max.ack.pending", 10_000)?;
170 let fetch_batch = require_positive_usize(config, "fetch.batch", 500)?;
171 let fetch_max_wait =
172 require_positive_duration(config, "fetch.max.wait.ms", Duration::from_millis(500))?;
173 let fetch_error_threshold = parse_u32(config, "fetch.error.threshold", 10)?;
174 let lag_poll_interval =
175 parse_duration_ms(config, "lag.poll.interval.ms", Duration::from_secs(10))?;
176 let queue_group = config.get("queue.group").map(str::to_string);
177 let format = parse_format(config)?;
178
179 let cfg = Self {
180 servers,
181 mode,
182 auth,
183 tls,
184 stream,
185 subject,
186 subject_filters,
187 consumer,
188 deliver_policy,
189 start_sequence,
190 start_time,
191 ack_policy,
192 ack_wait,
193 max_deliver,
194 max_ack_pending,
195 fetch_batch,
196 fetch_max_wait,
197 fetch_error_threshold,
198 lag_poll_interval,
199 queue_group,
200 format,
201 };
202 cfg.validate()?;
203 Ok(cfg)
204 }
205
206 fn validate(&self) -> Result<(), ConnectorError> {
207 match self.mode {
208 Mode::JetStream => {
209 if self.stream.is_none() {
210 return Err(cfg_err(
211 "[LDB-5040] jetstream mode requires 'stream' to be set",
212 ));
213 }
214 if self.consumer.is_none() {
215 return Err(cfg_err(
216 "[LDB-5041] jetstream mode requires 'consumer' (durable name) — \
217 ephemeral consumers are not supported",
218 ));
219 }
220 if self.subject.is_none() && self.subject_filters.is_empty() {
221 return Err(cfg_err(
222 "[LDB-5042] jetstream mode requires 'subject' or 'subject.filters' \
223 — the consumer must have at least one filter",
224 ));
225 }
226 if matches!(self.deliver_policy, DeliverPolicy::ByStartSequence)
227 && self.start_sequence.is_none()
228 {
229 return Err(cfg_err(
230 "[LDB-5043] deliver.policy=by_start_sequence requires 'start.sequence'",
231 ));
232 }
233 if matches!(self.deliver_policy, DeliverPolicy::ByStartTime)
234 && self.start_time.is_none()
235 {
236 return Err(cfg_err(
237 "[LDB-5044] deliver.policy=by_start_time requires 'start.time'",
238 ));
239 }
240 }
241 Mode::Core => {
242 if self.subject.is_none() {
243 return Err(cfg_err(
244 "[LDB-5045] core mode requires 'subject' (JetStream stream config \
245 does not apply)",
246 ));
247 }
248 }
249 }
250 Ok(())
251 }
252}
253
254#[derive(Debug, Clone)]
256#[allow(missing_docs)]
257pub struct NatsSinkConfig {
258 pub servers: Vec<String>,
259 pub mode: Mode,
260 pub auth: AuthMode,
261 pub tls: TlsConfig,
262 pub stream: Option<String>,
263 pub subject: SubjectSpec,
264 pub expected_stream: Option<String>,
265 pub delivery_guarantee: DeliveryGuarantee,
266 pub dedup_id_column: Option<String>,
267 pub min_duplicate_window: Duration,
271 pub max_pending: usize,
272 pub ack_timeout: Duration,
273 pub format: Format,
274 pub header_columns: Vec<String>,
275}
276
277impl NatsSinkConfig {
278 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
282 let servers = parse_servers(config)?;
283 let mode = parse_or_default::<Mode>(config, "mode")?;
284 let auth = parse_auth(config)?;
285 let tls = parse_tls(config)?;
286
287 let subject = match (config.get("subject"), config.get("subject.column")) {
288 (Some(s), None) => SubjectSpec::Literal(s.to_string()),
289 (None, Some(col)) => SubjectSpec::Column(col.to_string()),
290 (Some(_), Some(_)) => {
291 return Err(cfg_err(
292 "[LDB-5050] set 'subject' OR 'subject.column', not both",
293 ));
294 }
295 (None, None) => {
296 return Err(cfg_err(
297 "[LDB-5051] 'subject' or 'subject.column' is required",
298 ));
299 }
300 };
301
302 let delivery_guarantee =
303 parse_or_default::<DeliveryGuarantee>(config, "delivery.guarantee")
304 .map_err(|_| cfg_err("[LDB-5052] invalid delivery.guarantee"))?;
305 let dedup_id_column = config.get("dedup.id.column").map(str::to_string);
306
307 let cfg = Self {
308 servers,
309 mode,
310 auth,
311 tls,
312 stream: config.get("stream").map(str::to_string),
313 subject,
314 expected_stream: config.get("expected.stream").map(str::to_string),
315 delivery_guarantee,
316 dedup_id_column,
317 min_duplicate_window: require_positive_duration(
318 config,
319 "min.duplicate.window.ms",
320 Duration::from_secs(120),
321 )?,
322 max_pending: require_positive_usize(config, "max.pending", 4096)?,
323 ack_timeout: require_positive_duration(
324 config,
325 "ack.timeout.ms",
326 Duration::from_secs(30),
327 )?,
328 format: parse_format(config)?,
329 header_columns: config
330 .get("header.columns")
331 .map(split_csv)
332 .unwrap_or_default(),
333 };
334 cfg.validate()?;
335 Ok(cfg)
336 }
337
338 fn validate(&self) -> Result<(), ConnectorError> {
339 if self.mode == Mode::Core && self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
340 return Err(cfg_err(
341 "[LDB-5053] delivery.guarantee=exactly_once is not supported in mode=core \
342 (no server-side dedup); use mode=jetstream",
343 ));
344 }
345 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
346 && self.dedup_id_column.is_none()
347 {
348 return Err(cfg_err(
349 "[LDB-5054] delivery.guarantee=exactly_once requires 'dedup.id.column' — \
350 msg-id dedup with epoch-row hashing is not supported (deterministic replay \
351 is too fragile; name a unique-per-row column)",
352 ));
353 }
354 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce && self.stream.is_none() {
355 return Err(cfg_err(
356 "[LDB-5055] delivery.guarantee=exactly_once requires 'stream' so the sink \
357 can validate its duplicate_window at startup",
358 ));
359 }
360 for h in &self.header_columns {
361 if is_reserved_header(h) {
362 return Err(cfg_err(&format!(
363 "[LDB-5065] header.columns entry '{h}' collides with a reserved NATS \
364 header (Nats-Msg-Id / Nats-Expected-Stream); rename the column",
365 )));
366 }
367 }
368 Ok(())
369 }
370}
371
372fn is_reserved_header(name: &str) -> bool {
375 const RESERVED: &[&str] = &["Nats-Msg-Id", "Nats-Expected-Stream"];
376 RESERVED.iter().any(|r| r.eq_ignore_ascii_case(name))
377}
378
379fn cfg_err(msg: &str) -> ConnectorError {
382 ConnectorError::ConfigurationError(msg.to_string())
383}
384
385fn parse_servers(config: &ConnectorConfig) -> Result<Vec<String>, ConnectorError> {
386 let raw = config.require("servers")?;
387 let servers: Vec<String> = split_csv(raw);
388 if servers.is_empty() {
389 return Err(cfg_err("[LDB-5030] 'servers' must not be empty"));
390 }
391 Ok(servers)
392}
393
394fn split_csv(raw: &str) -> Vec<String> {
395 raw.split(',')
396 .map(str::trim)
397 .filter(|s| !s.is_empty())
398 .map(str::to_string)
399 .collect()
400}
401
402fn parse_or_default<T: std::str::FromStr + Default>(
403 config: &ConnectorConfig,
404 key: &str,
405) -> Result<T, ConnectorError>
406where
407 T::Err: std::fmt::Display,
408{
409 match config.get(key) {
410 Some(s) => s
411 .parse()
412 .map_err(|e: T::Err| cfg_err(&format!("invalid {key}: {e}"))),
413 None => Ok(T::default()),
414 }
415}
416
417fn parse_format(config: &ConnectorConfig) -> Result<Format, ConnectorError> {
418 match config.get("format") {
419 Some(s) => Format::parse(s).map_err(|e| cfg_err(&e.to_string())),
420 None => Ok(Format::Json),
421 }
422}
423
424fn parse_bool(config: &ConnectorConfig, key: &str, default: bool) -> Result<bool, ConnectorError> {
425 match config.get(key) {
426 Some(s) => s
427 .parse::<bool>()
428 .map_err(|_| cfg_err(&format!("{key} must be 'true' or 'false', got '{s}'"))),
429 None => Ok(default),
430 }
431}
432
433fn parse_opt_u64(config: &ConnectorConfig, key: &str) -> Result<Option<u64>, ConnectorError> {
434 config.get(key).map(|s| parse_int(key, s)).transpose()
435}
436
437fn parse_i64(config: &ConnectorConfig, key: &str, default: i64) -> Result<i64, ConnectorError> {
438 config.get(key).map_or(Ok(default), |s| parse_int(key, s))
439}
440
441fn parse_usize(
442 config: &ConnectorConfig,
443 key: &str,
444 default: usize,
445) -> Result<usize, ConnectorError> {
446 config.get(key).map_or(Ok(default), |s| parse_int(key, s))
447}
448
449fn parse_u32(config: &ConnectorConfig, key: &str, default: u32) -> Result<u32, ConnectorError> {
450 config.get(key).map_or(Ok(default), |s| parse_int(key, s))
451}
452
453fn require_positive_duration(
454 config: &ConnectorConfig,
455 key: &str,
456 default: Duration,
457) -> Result<Duration, ConnectorError> {
458 let v = parse_duration_ms(config, key, default)?;
459 if v.is_zero() {
460 return Err(cfg_err(&format!("{key} must be > 0")));
461 }
462 Ok(v)
463}
464
465fn require_positive_usize(
466 config: &ConnectorConfig,
467 key: &str,
468 default: usize,
469) -> Result<usize, ConnectorError> {
470 let v = parse_usize(config, key, default)?;
471 if v == 0 {
472 return Err(cfg_err(&format!("{key} must be > 0")));
473 }
474 Ok(v)
475}
476
477fn require_positive_or_unlimited_i64(
480 config: &ConnectorConfig,
481 key: &str,
482 default: i64,
483) -> Result<i64, ConnectorError> {
484 let v = parse_i64(config, key, default)?;
485 if v == 0 || v < -1 {
486 return Err(cfg_err(&format!("{key} must be > 0, or -1 for unlimited")));
487 }
488 Ok(v)
489}
490
491fn parse_duration_ms(
492 config: &ConnectorConfig,
493 key: &str,
494 default: Duration,
495) -> Result<Duration, ConnectorError> {
496 config.get(key).map_or(Ok(default), |s| {
497 parse_int::<u64>(key, s).map(Duration::from_millis)
498 })
499}
500
501fn parse_int<T: std::str::FromStr>(key: &str, raw: &str) -> Result<T, ConnectorError> {
502 raw.parse::<T>()
503 .map_err(|_| cfg_err(&format!("{key} must be an integer, got '{raw}'")))
504}
505
506fn parse_auth(config: &ConnectorConfig) -> Result<AuthMode, ConnectorError> {
507 let mode = config.get("auth.mode").unwrap_or("none");
508 match mode {
509 "none" | "" => {
510 if config.get("user").is_some()
511 || config.get("password").is_some()
512 || config.get("token").is_some()
513 || config.get("creds.file").is_some()
514 {
515 return Err(cfg_err(
516 "[LDB-5063] credentials set but auth.mode=none; \
517 set auth.mode explicitly or remove the credentials",
518 ));
519 }
520 Ok(AuthMode::None)
521 }
522 "user_pass" => {
523 let user = config
524 .get("user")
525 .ok_or_else(|| cfg_err("[LDB-5060] auth.mode=user_pass requires 'user'"))?
526 .to_string();
527 let password = config
528 .get("password")
529 .ok_or_else(|| cfg_err("[LDB-5060] auth.mode=user_pass requires 'password'"))?
530 .to_string();
531 Ok(AuthMode::UserPass { user, password })
532 }
533 "token" => {
534 let token = config
535 .get("token")
536 .ok_or_else(|| cfg_err("[LDB-5061] auth.mode=token requires 'token'"))?
537 .to_string();
538 Ok(AuthMode::Token(token))
539 }
540 "creds_file" => {
541 let path = config
542 .get("creds.file")
543 .ok_or_else(|| cfg_err("[LDB-5064] auth.mode=creds_file requires 'creds.file'"))?
544 .to_string();
545 Ok(AuthMode::CredsFile(path))
546 }
547 other => Err(cfg_err(&format!(
548 "invalid auth.mode '{other}'; expected none | user_pass | token | creds_file"
549 ))),
550 }
551}
552
553fn parse_tls(config: &ConnectorConfig) -> Result<TlsConfig, ConnectorError> {
554 let cert_location = config.get("tls.cert.location").map(str::to_string);
555 let key_location = config.get("tls.key.location").map(str::to_string);
556 if cert_location.is_some() != key_location.is_some() {
557 return Err(cfg_err(
558 "[LDB-5062] tls.cert.location and tls.key.location must both be set \
559 (mutual-TLS client cert) or both be unset",
560 ));
561 }
562 Ok(TlsConfig {
563 enabled: parse_bool(config, "tls.enabled", false)?,
564 ca_location: config.get("tls.ca.location").map(str::to_string),
565 cert_location,
566 key_location,
567 })
568}
569
570pub(super) fn build_connect_options(
577 auth: &AuthMode,
578 tls: &TlsConfig,
579) -> Result<async_nats::ConnectOptions, ConnectorError> {
580 let mut opts = async_nats::ConnectOptions::new();
581 match auth {
582 AuthMode::None => {}
583 AuthMode::UserPass { user, password } => {
584 opts = opts.user_and_password(user.clone(), password.clone());
585 }
586 AuthMode::Token(token) => {
587 opts = opts.token(token.clone());
588 }
589 AuthMode::CredsFile(path) => {
590 let contents = std::fs::read_to_string(path)
591 .map_err(|e| cfg_err(&format!("creds.file '{path}': {e}")))?;
592 opts = async_nats::ConnectOptions::with_credentials(&contents)
593 .map_err(|e| cfg_err(&format!("creds.file '{path}' invalid: {e}")))?;
594 }
595 }
596 let tls_touched = tls.enabled || tls.ca_location.is_some() || tls.cert_location.is_some();
597 if tls_touched {
598 opts = opts.require_tls(true);
599 }
600 if let Some(ca) = &tls.ca_location {
601 opts = opts.add_root_certificates(ca.into());
602 }
603 if let (Some(cert), Some(key)) = (&tls.cert_location, &tls.key_location) {
604 opts = opts.add_client_certificate(cert.into(), key.into());
605 }
606 Ok(opts)
607}
608
609#[cfg(test)]
610#[allow(clippy::disallowed_types)]
611mod tests {
612 use super::*;
613 use std::collections::HashMap;
614
615 fn cfg(pairs: &[(&str, &str)]) -> ConnectorConfig {
616 let mut props = HashMap::new();
617 for (k, v) in pairs {
618 props.insert((*k).to_string(), (*v).to_string());
619 }
620 ConnectorConfig::with_properties("nats", props)
621 }
622
623 #[test]
626 fn source_jetstream_requires_stream() {
627 let err = NatsSourceConfig::from_config(&cfg(&[
628 ("servers", "nats://a:4222"),
629 ("consumer", "c"),
630 ("subject", "x.>"),
631 ]))
632 .unwrap_err()
633 .to_string();
634 assert!(err.contains("LDB-5040"), "got: {err}");
635 }
636
637 #[test]
638 fn source_jetstream_requires_consumer() {
639 let err = NatsSourceConfig::from_config(&cfg(&[
640 ("servers", "nats://a:4222"),
641 ("stream", "S"),
642 ("subject", "x.>"),
643 ]))
644 .unwrap_err()
645 .to_string();
646 assert!(err.contains("LDB-5041"), "got: {err}");
647 }
648
649 #[test]
650 fn source_jetstream_requires_subject_or_filters() {
651 let err = NatsSourceConfig::from_config(&cfg(&[
652 ("servers", "nats://a:4222"),
653 ("stream", "S"),
654 ("consumer", "c"),
655 ]))
656 .unwrap_err()
657 .to_string();
658 assert!(err.contains("LDB-5042"), "got: {err}");
659 }
660
661 #[test]
662 fn source_core_requires_subject() {
663 let err =
664 NatsSourceConfig::from_config(&cfg(&[("servers", "nats://a:4222"), ("mode", "core")]))
665 .unwrap_err()
666 .to_string();
667 assert!(err.contains("LDB-5045"), "got: {err}");
668 }
669
670 #[test]
671 fn source_by_start_sequence_requires_start_sequence() {
672 let err = NatsSourceConfig::from_config(&cfg(&[
673 ("servers", "nats://a:4222"),
674 ("stream", "S"),
675 ("consumer", "c"),
676 ("subject", "x.>"),
677 ("deliver.policy", "by_start_sequence"),
678 ]))
679 .unwrap_err()
680 .to_string();
681 assert!(err.contains("LDB-5043"), "got: {err}");
682 }
683
684 #[test]
685 fn source_happy_path_jetstream() {
686 let parsed = NatsSourceConfig::from_config(&cfg(&[
687 ("servers", "nats://a:4222,nats://b:4222"),
688 ("stream", "ORDERS"),
689 ("consumer", "laminar-orders"),
690 ("subject.filters", "orders.us.*,orders.eu.*"),
691 ("ack.wait.ms", "45000"),
692 ("max.deliver", "3"),
693 ]))
694 .unwrap();
695 assert_eq!(parsed.servers.len(), 2);
696 assert_eq!(parsed.mode, Mode::JetStream);
697 assert_eq!(parsed.subject_filters, vec!["orders.us.*", "orders.eu.*"]);
698 assert_eq!(parsed.ack_wait, Duration::from_secs(45));
699 assert_eq!(parsed.max_deliver, 3);
700 }
701
702 #[test]
703 fn source_happy_path_core() {
704 let parsed = NatsSourceConfig::from_config(&cfg(&[
705 ("servers", "nats://a:4222"),
706 ("mode", "core"),
707 ("subject", "events"),
708 ("queue.group", "workers"),
709 ]))
710 .unwrap();
711 assert_eq!(parsed.mode, Mode::Core);
712 assert_eq!(parsed.subject.as_deref(), Some("events"));
713 assert_eq!(parsed.queue_group.as_deref(), Some("workers"));
714 }
715
716 #[test]
719 fn sink_rejects_subject_and_subject_column() {
720 let err = NatsSinkConfig::from_config(&cfg(&[
721 ("servers", "nats://a:4222"),
722 ("subject", "x"),
723 ("subject.column", "c"),
724 ]))
725 .unwrap_err()
726 .to_string();
727 assert!(err.contains("LDB-5050"), "got: {err}");
728 }
729
730 #[test]
731 fn sink_rejects_neither_subject_nor_column() {
732 let err = NatsSinkConfig::from_config(&cfg(&[("servers", "nats://a:4222")]))
733 .unwrap_err()
734 .to_string();
735 assert!(err.contains("LDB-5051"), "got: {err}");
736 }
737
738 #[test]
739 fn sink_rejects_header_column_colliding_with_reserved() {
740 let err = NatsSinkConfig::from_config(&cfg(&[
741 ("servers", "nats://a:4222"),
742 ("subject", "x"),
743 ("header.columns", "trace_id,nats-msg-id"),
744 ]))
745 .unwrap_err()
746 .to_string();
747 assert!(err.contains("LDB-5065"), "got: {err}");
748 }
749
750 #[test]
751 fn sink_rejects_core_with_exactly_once() {
752 let err = NatsSinkConfig::from_config(&cfg(&[
753 ("servers", "nats://a:4222"),
754 ("mode", "core"),
755 ("subject", "x"),
756 ("delivery.guarantee", "exactly_once"),
757 ]))
758 .unwrap_err()
759 .to_string();
760 assert!(err.contains("LDB-5053"), "got: {err}");
761 }
762
763 #[test]
764 fn sink_rejects_exactly_once_without_stream() {
765 let err = NatsSinkConfig::from_config(&cfg(&[
766 ("servers", "nats://a:4222"),
767 ("subject", "x"),
768 ("delivery.guarantee", "exactly_once"),
769 ("dedup.id.column", "event_id"),
770 ]))
771 .unwrap_err()
772 .to_string();
773 assert!(err.contains("LDB-5055"), "got: {err}");
774 }
775
776 #[test]
777 fn sink_rejects_exactly_once_without_dedup_column() {
778 let err = NatsSinkConfig::from_config(&cfg(&[
779 ("servers", "nats://a:4222"),
780 ("stream", "OUT"),
781 ("subject", "x.processed"),
782 ("delivery.guarantee", "exactly_once"),
783 ]))
784 .unwrap_err()
785 .to_string();
786 assert!(err.contains("LDB-5054"), "got: {err}");
787 }
788
789 #[test]
790 fn sink_happy_path_exactly_once() {
791 let parsed = NatsSinkConfig::from_config(&cfg(&[
792 ("servers", "nats://a:4222"),
793 ("stream", "OUT"),
794 ("subject.column", "out_subject"),
795 ("delivery.guarantee", "exactly_once"),
796 ("dedup.id.column", "event_id"),
797 ("header.columns", "trace_id,tenant"),
798 ]))
799 .unwrap();
800 assert_eq!(parsed.subject, SubjectSpec::Column("out_subject".into()));
801 assert_eq!(parsed.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
802 assert_eq!(parsed.dedup_id_column.as_deref(), Some("event_id"));
803 assert_eq!(parsed.header_columns, vec!["trace_id", "tenant"]);
804 }
805
806 #[test]
809 fn servers_required() {
810 let err = NatsSourceConfig::from_config(&cfg(&[]))
811 .unwrap_err()
812 .to_string();
813 assert!(err.contains("servers"), "got: {err}");
814 }
815
816 #[test]
817 fn zero_fetch_batch_rejected() {
818 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("fetch.batch", "0")]))
819 .unwrap_err()
820 .to_string();
821 assert!(err.contains("fetch.batch must be > 0"), "got: {err}");
822 }
823
824 #[test]
825 fn zero_ack_wait_rejected() {
826 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("ack.wait.ms", "0")]))
827 .unwrap_err()
828 .to_string();
829 assert!(err.contains("ack.wait.ms must be > 0"), "got: {err}");
830 }
831
832 #[test]
833 fn max_deliver_unlimited_accepted() {
834 let parsed =
835 NatsSourceConfig::from_config(&jetstream_happy(&[("max.deliver", "-1")])).unwrap();
836 assert_eq!(parsed.max_deliver, -1);
837 }
838
839 #[test]
840 fn max_deliver_negative_other_than_minus_one_rejected() {
841 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("max.deliver", "-5")]))
842 .unwrap_err()
843 .to_string();
844 assert!(err.contains("max.deliver"), "got: {err}");
845 }
846
847 #[test]
848 fn sink_zero_max_pending_rejected() {
849 let err = NatsSinkConfig::from_config(&cfg(&[
850 ("servers", "nats://a:4222"),
851 ("subject", "x"),
852 ("max.pending", "0"),
853 ]))
854 .unwrap_err()
855 .to_string();
856 assert!(err.contains("max.pending must be > 0"), "got: {err}");
857 }
858
859 #[test]
860 fn servers_empty_csv_rejected() {
861 let err = NatsSourceConfig::from_config(&cfg(&[("servers", ",,")]))
862 .unwrap_err()
863 .to_string();
864 assert!(err.contains("LDB-5030"), "got: {err}");
865 }
866
867 fn jetstream_happy(pairs: &[(&str, &str)]) -> ConnectorConfig {
870 let mut base = vec![
871 ("servers", "nats://a:4222"),
872 ("stream", "S"),
873 ("consumer", "c"),
874 ("subject", "x.>"),
875 ];
876 base.extend_from_slice(pairs);
877 cfg(&base)
878 }
879
880 #[test]
881 fn auth_user_pass_requires_user() {
882 let err = NatsSourceConfig::from_config(&jetstream_happy(&[
883 ("auth.mode", "user_pass"),
884 ("password", "secret"),
885 ]))
886 .unwrap_err()
887 .to_string();
888 assert!(err.contains("LDB-5060"), "got: {err}");
889 }
890
891 #[test]
892 fn auth_user_pass_requires_password() {
893 let err = NatsSourceConfig::from_config(&jetstream_happy(&[
894 ("auth.mode", "user_pass"),
895 ("user", "alice"),
896 ]))
897 .unwrap_err()
898 .to_string();
899 assert!(err.contains("LDB-5060"), "got: {err}");
900 }
901
902 #[test]
903 fn auth_user_pass_happy_path() {
904 let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
905 ("auth.mode", "user_pass"),
906 ("user", "alice"),
907 ("password", "wonderland"),
908 ]))
909 .unwrap();
910 assert_eq!(
911 parsed.auth,
912 AuthMode::UserPass {
913 user: "alice".into(),
914 password: "wonderland".into(),
915 }
916 );
917 }
918
919 #[test]
920 fn auth_token_requires_token() {
921 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("auth.mode", "token")]))
922 .unwrap_err()
923 .to_string();
924 assert!(err.contains("LDB-5061"), "got: {err}");
925 }
926
927 #[test]
928 fn auth_token_happy_path() {
929 let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
930 ("auth.mode", "token"),
931 ("token", "abc123"),
932 ]))
933 .unwrap();
934 assert_eq!(parsed.auth, AuthMode::Token("abc123".into()));
935 }
936
937 #[test]
938 fn auth_none_with_stray_credentials_rejected() {
939 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("user", "alice")]))
940 .unwrap_err()
941 .to_string();
942 assert!(err.contains("LDB-5063"), "got: {err}");
943 }
944
945 #[test]
946 fn auth_creds_file_requires_path() {
947 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("auth.mode", "creds_file")]))
948 .unwrap_err()
949 .to_string();
950 assert!(err.contains("LDB-5064"), "got: {err}");
951 }
952
953 #[test]
954 fn auth_creds_file_happy_path() {
955 let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
956 ("auth.mode", "creds_file"),
957 ("creds.file", "/secrets/user.creds"),
958 ]))
959 .unwrap();
960 assert_eq!(
961 parsed.auth,
962 AuthMode::CredsFile("/secrets/user.creds".into())
963 );
964 }
965
966 #[test]
967 fn auth_unknown_mode_rejected() {
968 let err = NatsSourceConfig::from_config(&jetstream_happy(&[("auth.mode", "banana")]))
969 .unwrap_err()
970 .to_string();
971 assert!(err.contains("invalid auth.mode"), "got: {err}");
972 }
973
974 #[test]
977 fn tls_cert_without_key_rejected() {
978 let err = NatsSourceConfig::from_config(&jetstream_happy(&[(
979 "tls.cert.location",
980 "/certs/client.pem",
981 )]))
982 .unwrap_err()
983 .to_string();
984 assert!(err.contains("LDB-5062"), "got: {err}");
985 }
986
987 #[test]
988 fn tls_key_without_cert_rejected() {
989 let err = NatsSourceConfig::from_config(&jetstream_happy(&[(
990 "tls.key.location",
991 "/certs/client.key",
992 )]))
993 .unwrap_err()
994 .to_string();
995 assert!(err.contains("LDB-5062"), "got: {err}");
996 }
997
998 #[test]
999 fn tls_happy_path() {
1000 let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
1001 ("tls.enabled", "true"),
1002 ("tls.ca.location", "/certs/ca.pem"),
1003 ("tls.cert.location", "/certs/client.pem"),
1004 ("tls.key.location", "/certs/client.key"),
1005 ]))
1006 .unwrap();
1007 assert!(parsed.tls.enabled);
1008 assert_eq!(parsed.tls.ca_location.as_deref(), Some("/certs/ca.pem"));
1009 assert_eq!(
1010 parsed.tls.cert_location.as_deref(),
1011 Some("/certs/client.pem")
1012 );
1013 }
1014
1015 #[test]
1016 fn auth_and_tls_on_sink() {
1017 let parsed = NatsSinkConfig::from_config(&cfg(&[
1018 ("servers", "nats://a:4222"),
1019 ("subject", "x"),
1020 ("auth.mode", "user_pass"),
1021 ("user", "alice"),
1022 ("password", "wonderland"),
1023 ("tls.enabled", "true"),
1024 ]))
1025 .unwrap();
1026 assert!(matches!(parsed.auth, AuthMode::UserPass { .. }));
1027 assert!(parsed.tls.enabled);
1028 }
1029}