Skip to main content

laminar_connectors/nats/
config.rs

1//! NATS source and sink configuration.
2
3use std::time::Duration;
4
5use crate::config::ConnectorConfig;
6use crate::connector::DeliveryGuarantee;
7use crate::error::ConnectorError;
8use crate::serde::Format;
9
10/// NATS connector mode.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
12pub enum Mode {
13    /// Plain NATS pub/sub. No durability, no replay, at-most-once.
14    Core,
15    /// `JetStream` with durable pull consumers.
16    #[default]
17    JetStream,
18}
19
20str_enum!(fromstr Mode, lowercase, ConnectorError, "invalid nats mode",
21    Core => "core";
22    JetStream => "jetstream"
23);
24
25/// `JetStream` consumer ack policy.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27pub enum AckPolicy {
28    /// Each message acked individually.
29    #[default]
30    Explicit,
31    /// No ack required.
32    None,
33}
34
35str_enum!(fromstr AckPolicy, lowercase, ConnectorError, "invalid ack.policy",
36    Explicit => "explicit";
37    None => "none"
38);
39
40/// `JetStream` consumer delivery policy — where to start.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42pub enum DeliverPolicy {
43    /// Every message retained by the stream.
44    #[default]
45    All,
46    /// Only messages published after consumer creation.
47    New,
48    /// Start from a user-supplied stream sequence.
49    ByStartSequence,
50    /// Start from a user-supplied timestamp.
51    ByStartTime,
52}
53
54str_enum!(fromstr DeliverPolicy, lowercase, ConnectorError, "invalid deliver.policy",
55    All => "all";
56    New => "new";
57    ByStartSequence => "by_start_sequence";
58    ByStartTime => "by_start_time"
59);
60
61/// Where the sink gets the subject to publish to.
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum SubjectSpec {
64    /// A single literal subject used for every row.
65    Literal(String),
66    /// Per-row: read the named column and use its string value.
67    Column(String),
68}
69
70/// NATS user-authentication mode. Custom `Debug` redacts secrets
71/// because the parent config structs derive `Debug`.
72#[derive(Clone, Default, PartialEq, Eq)]
73#[allow(missing_docs)]
74pub enum AuthMode {
75    #[default]
76    None,
77    UserPass {
78        user: String,
79        password: String,
80    },
81    Token(String),
82    CredsFile(String),
83}
84
85impl std::fmt::Debug for AuthMode {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        match self {
88            Self::None => f.write_str("None"),
89            Self::UserPass { user, .. } => f
90                .debug_struct("UserPass")
91                .field("user", user)
92                .field("password", &"<redacted>")
93                .finish(),
94            Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
95            Self::CredsFile(path) => f.debug_tuple("CredsFile").field(path).finish(),
96        }
97    }
98}
99
100/// TLS transport configuration.
101#[derive(Debug, Clone, Default)]
102#[allow(missing_docs)]
103pub struct TlsConfig {
104    pub enabled: bool,
105    pub ca_location: Option<String>,
106    pub cert_location: Option<String>,
107    pub key_location: Option<String>,
108}
109
110/// NATS source configuration.
111#[derive(Debug, Clone)]
112#[allow(missing_docs)] // one-line `///` per field noises up the config struct
113pub struct NatsSourceConfig {
114    pub servers: Vec<String>,
115    pub mode: Mode,
116    pub auth: AuthMode,
117    pub tls: TlsConfig,
118
119    // JetStream
120    pub stream: Option<String>,
121    pub subject: Option<String>,
122    pub subject_filters: Vec<String>,
123    pub consumer: Option<String>,
124    pub deliver_policy: DeliverPolicy,
125    pub start_sequence: Option<u64>,
126    pub start_time: Option<String>, // RFC3339; parsed against the NATS client's time type in open()
127    pub ack_policy: AckPolicy,
128    pub ack_wait: Duration,
129    pub max_deliver: i64,
130    pub max_ack_pending: i64,
131    pub fetch_batch: usize,
132    pub fetch_max_wait: Duration,
133    /// Consecutive fetch errors before `health_check` reports
134    /// `Unhealthy`. Zero disables the flip.
135    pub fetch_error_threshold: u32,
136    /// Interval between `consumer.info()` polls that feed the
137    /// `nats_source_consumer_lag` gauge. Zero disables the poll.
138    pub lag_poll_interval: Duration,
139
140    // Core
141    pub queue_group: Option<String>,
142
143    pub format: Format,
144}
145
146impl NatsSourceConfig {
147    /// # Errors
148    ///
149    /// `ConfigurationError` on any parse or validation failure.
150    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
151        let servers = parse_servers(config)?;
152        let mode = parse_or_default::<Mode>(config, "mode")?;
153        let auth = parse_auth(config)?;
154        let tls = parse_tls(config)?;
155
156        let stream = config.get("stream").map(str::to_string);
157        let subject = config.get("subject").map(str::to_string);
158        let subject_filters = config
159            .get("subject.filters")
160            .map(split_csv)
161            .unwrap_or_default();
162        let consumer = config.get("consumer").map(str::to_string);
163        let deliver_policy = parse_or_default::<DeliverPolicy>(config, "deliver.policy")?;
164        let start_sequence = parse_opt_u64(config, "start.sequence")?;
165        let start_time = config.get("start.time").map(str::to_string);
166        let ack_policy = parse_or_default::<AckPolicy>(config, "ack.policy")?;
167        let ack_wait = require_positive_duration(config, "ack.wait.ms", Duration::from_secs(60))?;
168        let max_deliver = require_positive_or_unlimited_i64(config, "max.deliver", 5)?;
169        let max_ack_pending = require_positive_or_unlimited_i64(config, "max.ack.pending", 10_000)?;
170        let fetch_batch = require_positive_usize(config, "fetch.batch", 500)?;
171        let fetch_max_wait =
172            require_positive_duration(config, "fetch.max.wait.ms", Duration::from_millis(500))?;
173        let fetch_error_threshold = parse_u32(config, "fetch.error.threshold", 10)?;
174        let lag_poll_interval =
175            parse_duration_ms(config, "lag.poll.interval.ms", Duration::from_secs(10))?;
176        let queue_group = config.get("queue.group").map(str::to_string);
177        let format = parse_format(config)?;
178
179        let cfg = Self {
180            servers,
181            mode,
182            auth,
183            tls,
184            stream,
185            subject,
186            subject_filters,
187            consumer,
188            deliver_policy,
189            start_sequence,
190            start_time,
191            ack_policy,
192            ack_wait,
193            max_deliver,
194            max_ack_pending,
195            fetch_batch,
196            fetch_max_wait,
197            fetch_error_threshold,
198            lag_poll_interval,
199            queue_group,
200            format,
201        };
202        cfg.validate()?;
203        Ok(cfg)
204    }
205
206    fn validate(&self) -> Result<(), ConnectorError> {
207        match self.mode {
208            Mode::JetStream => {
209                if self.stream.is_none() {
210                    return Err(cfg_err(
211                        "[LDB-5040] jetstream mode requires 'stream' to be set",
212                    ));
213                }
214                if self.consumer.is_none() {
215                    return Err(cfg_err(
216                        "[LDB-5041] jetstream mode requires 'consumer' (durable name) — \
217                         ephemeral consumers are not supported",
218                    ));
219                }
220                if self.subject.is_none() && self.subject_filters.is_empty() {
221                    return Err(cfg_err(
222                        "[LDB-5042] jetstream mode requires 'subject' or 'subject.filters' \
223                         — the consumer must have at least one filter",
224                    ));
225                }
226                if matches!(self.deliver_policy, DeliverPolicy::ByStartSequence)
227                    && self.start_sequence.is_none()
228                {
229                    return Err(cfg_err(
230                        "[LDB-5043] deliver.policy=by_start_sequence requires 'start.sequence'",
231                    ));
232                }
233                if matches!(self.deliver_policy, DeliverPolicy::ByStartTime)
234                    && self.start_time.is_none()
235                {
236                    return Err(cfg_err(
237                        "[LDB-5044] deliver.policy=by_start_time requires 'start.time'",
238                    ));
239                }
240            }
241            Mode::Core => {
242                if self.subject.is_none() {
243                    return Err(cfg_err(
244                        "[LDB-5045] core mode requires 'subject' (JetStream stream config \
245                         does not apply)",
246                    ));
247                }
248            }
249        }
250        Ok(())
251    }
252}
253
254/// NATS sink configuration.
255#[derive(Debug, Clone)]
256#[allow(missing_docs)]
257pub struct NatsSinkConfig {
258    pub servers: Vec<String>,
259    pub mode: Mode,
260    pub auth: AuthMode,
261    pub tls: TlsConfig,
262    pub stream: Option<String>,
263    pub subject: SubjectSpec,
264    pub expected_stream: Option<String>,
265    pub delivery_guarantee: DeliveryGuarantee,
266    pub dedup_id_column: Option<String>,
267    /// Stream's `duplicate_window` must be at least this long under
268    /// exactly-once — else rollback redelivery can land outside the
269    /// dedup horizon.
270    pub min_duplicate_window: Duration,
271    pub max_pending: usize,
272    pub ack_timeout: Duration,
273    pub format: Format,
274    pub header_columns: Vec<String>,
275}
276
277impl NatsSinkConfig {
278    /// # Errors
279    ///
280    /// `ConfigurationError` on any parse or validation failure.
281    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
282        let servers = parse_servers(config)?;
283        let mode = parse_or_default::<Mode>(config, "mode")?;
284        let auth = parse_auth(config)?;
285        let tls = parse_tls(config)?;
286
287        let subject = match (config.get("subject"), config.get("subject.column")) {
288            (Some(s), None) => SubjectSpec::Literal(s.to_string()),
289            (None, Some(col)) => SubjectSpec::Column(col.to_string()),
290            (Some(_), Some(_)) => {
291                return Err(cfg_err(
292                    "[LDB-5050] set 'subject' OR 'subject.column', not both",
293                ));
294            }
295            (None, None) => {
296                return Err(cfg_err(
297                    "[LDB-5051] 'subject' or 'subject.column' is required",
298                ));
299            }
300        };
301
302        let delivery_guarantee =
303            parse_or_default::<DeliveryGuarantee>(config, "delivery.guarantee")
304                .map_err(|_| cfg_err("[LDB-5052] invalid delivery.guarantee"))?;
305        let dedup_id_column = config.get("dedup.id.column").map(str::to_string);
306
307        let cfg = Self {
308            servers,
309            mode,
310            auth,
311            tls,
312            stream: config.get("stream").map(str::to_string),
313            subject,
314            expected_stream: config.get("expected.stream").map(str::to_string),
315            delivery_guarantee,
316            dedup_id_column,
317            min_duplicate_window: require_positive_duration(
318                config,
319                "min.duplicate.window.ms",
320                Duration::from_secs(120),
321            )?,
322            max_pending: require_positive_usize(config, "max.pending", 4096)?,
323            ack_timeout: require_positive_duration(
324                config,
325                "ack.timeout.ms",
326                Duration::from_secs(30),
327            )?,
328            format: parse_format(config)?,
329            header_columns: config
330                .get("header.columns")
331                .map(split_csv)
332                .unwrap_or_default(),
333        };
334        cfg.validate()?;
335        Ok(cfg)
336    }
337
338    fn validate(&self) -> Result<(), ConnectorError> {
339        if self.mode == Mode::Core && self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
340            return Err(cfg_err(
341                "[LDB-5053] delivery.guarantee=exactly_once is not supported in mode=core \
342                 (no server-side dedup); use mode=jetstream",
343            ));
344        }
345        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
346            && self.dedup_id_column.is_none()
347        {
348            return Err(cfg_err(
349                "[LDB-5054] delivery.guarantee=exactly_once requires 'dedup.id.column' — \
350                 msg-id dedup with epoch-row hashing is not supported (deterministic replay \
351                 is too fragile; name a unique-per-row column)",
352            ));
353        }
354        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce && self.stream.is_none() {
355            return Err(cfg_err(
356                "[LDB-5055] delivery.guarantee=exactly_once requires 'stream' so the sink \
357                 can validate its duplicate_window at startup",
358            ));
359        }
360        for h in &self.header_columns {
361            if is_reserved_header(h) {
362                return Err(cfg_err(&format!(
363                    "[LDB-5065] header.columns entry '{h}' collides with a reserved NATS \
364                     header (Nats-Msg-Id / Nats-Expected-Stream); rename the column",
365                )));
366            }
367        }
368        Ok(())
369    }
370}
371
372/// Header names the sink manages itself; a user header with the same
373/// name would otherwise silently clobber exactly-once semantics.
374fn is_reserved_header(name: &str) -> bool {
375    const RESERVED: &[&str] = &["Nats-Msg-Id", "Nats-Expected-Stream"];
376    RESERVED.iter().any(|r| r.eq_ignore_ascii_case(name))
377}
378
379// ── helpers ──
380
381fn cfg_err(msg: &str) -> ConnectorError {
382    ConnectorError::ConfigurationError(msg.to_string())
383}
384
385fn parse_servers(config: &ConnectorConfig) -> Result<Vec<String>, ConnectorError> {
386    let raw = config.require("servers")?;
387    let servers: Vec<String> = split_csv(raw);
388    if servers.is_empty() {
389        return Err(cfg_err("[LDB-5030] 'servers' must not be empty"));
390    }
391    Ok(servers)
392}
393
394fn split_csv(raw: &str) -> Vec<String> {
395    raw.split(',')
396        .map(str::trim)
397        .filter(|s| !s.is_empty())
398        .map(str::to_string)
399        .collect()
400}
401
402fn parse_or_default<T: std::str::FromStr + Default>(
403    config: &ConnectorConfig,
404    key: &str,
405) -> Result<T, ConnectorError>
406where
407    T::Err: std::fmt::Display,
408{
409    match config.get(key) {
410        Some(s) => s
411            .parse()
412            .map_err(|e: T::Err| cfg_err(&format!("invalid {key}: {e}"))),
413        None => Ok(T::default()),
414    }
415}
416
417fn parse_format(config: &ConnectorConfig) -> Result<Format, ConnectorError> {
418    match config.get("format") {
419        Some(s) => Format::parse(s).map_err(|e| cfg_err(&e.to_string())),
420        None => Ok(Format::Json),
421    }
422}
423
424fn parse_bool(config: &ConnectorConfig, key: &str, default: bool) -> Result<bool, ConnectorError> {
425    match config.get(key) {
426        Some(s) => s
427            .parse::<bool>()
428            .map_err(|_| cfg_err(&format!("{key} must be 'true' or 'false', got '{s}'"))),
429        None => Ok(default),
430    }
431}
432
433fn parse_opt_u64(config: &ConnectorConfig, key: &str) -> Result<Option<u64>, ConnectorError> {
434    config.get(key).map(|s| parse_int(key, s)).transpose()
435}
436
437fn parse_i64(config: &ConnectorConfig, key: &str, default: i64) -> Result<i64, ConnectorError> {
438    config.get(key).map_or(Ok(default), |s| parse_int(key, s))
439}
440
441fn parse_usize(
442    config: &ConnectorConfig,
443    key: &str,
444    default: usize,
445) -> Result<usize, ConnectorError> {
446    config.get(key).map_or(Ok(default), |s| parse_int(key, s))
447}
448
449fn parse_u32(config: &ConnectorConfig, key: &str, default: u32) -> Result<u32, ConnectorError> {
450    config.get(key).map_or(Ok(default), |s| parse_int(key, s))
451}
452
453fn require_positive_duration(
454    config: &ConnectorConfig,
455    key: &str,
456    default: Duration,
457) -> Result<Duration, ConnectorError> {
458    let v = parse_duration_ms(config, key, default)?;
459    if v.is_zero() {
460        return Err(cfg_err(&format!("{key} must be > 0")));
461    }
462    Ok(v)
463}
464
465fn require_positive_usize(
466    config: &ConnectorConfig,
467    key: &str,
468    default: usize,
469) -> Result<usize, ConnectorError> {
470    let v = parse_usize(config, key, default)?;
471    if v == 0 {
472        return Err(cfg_err(&format!("{key} must be > 0")));
473    }
474    Ok(v)
475}
476
477/// NATS allows `-1` for "unlimited" in `max_deliver` / `max_ack_pending`.
478/// Reject 0 (undefined) and negative values other than -1.
479fn require_positive_or_unlimited_i64(
480    config: &ConnectorConfig,
481    key: &str,
482    default: i64,
483) -> Result<i64, ConnectorError> {
484    let v = parse_i64(config, key, default)?;
485    if v == 0 || v < -1 {
486        return Err(cfg_err(&format!("{key} must be > 0, or -1 for unlimited")));
487    }
488    Ok(v)
489}
490
491fn parse_duration_ms(
492    config: &ConnectorConfig,
493    key: &str,
494    default: Duration,
495) -> Result<Duration, ConnectorError> {
496    config.get(key).map_or(Ok(default), |s| {
497        parse_int::<u64>(key, s).map(Duration::from_millis)
498    })
499}
500
501fn parse_int<T: std::str::FromStr>(key: &str, raw: &str) -> Result<T, ConnectorError> {
502    raw.parse::<T>()
503        .map_err(|_| cfg_err(&format!("{key} must be an integer, got '{raw}'")))
504}
505
506fn parse_auth(config: &ConnectorConfig) -> Result<AuthMode, ConnectorError> {
507    let mode = config.get("auth.mode").unwrap_or("none");
508    match mode {
509        "none" | "" => {
510            if config.get("user").is_some()
511                || config.get("password").is_some()
512                || config.get("token").is_some()
513                || config.get("creds.file").is_some()
514            {
515                return Err(cfg_err(
516                    "[LDB-5063] credentials set but auth.mode=none; \
517                     set auth.mode explicitly or remove the credentials",
518                ));
519            }
520            Ok(AuthMode::None)
521        }
522        "user_pass" => {
523            let user = config
524                .get("user")
525                .ok_or_else(|| cfg_err("[LDB-5060] auth.mode=user_pass requires 'user'"))?
526                .to_string();
527            let password = config
528                .get("password")
529                .ok_or_else(|| cfg_err("[LDB-5060] auth.mode=user_pass requires 'password'"))?
530                .to_string();
531            Ok(AuthMode::UserPass { user, password })
532        }
533        "token" => {
534            let token = config
535                .get("token")
536                .ok_or_else(|| cfg_err("[LDB-5061] auth.mode=token requires 'token'"))?
537                .to_string();
538            Ok(AuthMode::Token(token))
539        }
540        "creds_file" => {
541            let path = config
542                .get("creds.file")
543                .ok_or_else(|| cfg_err("[LDB-5064] auth.mode=creds_file requires 'creds.file'"))?
544                .to_string();
545            Ok(AuthMode::CredsFile(path))
546        }
547        other => Err(cfg_err(&format!(
548            "invalid auth.mode '{other}'; expected none | user_pass | token | creds_file"
549        ))),
550    }
551}
552
553fn parse_tls(config: &ConnectorConfig) -> Result<TlsConfig, ConnectorError> {
554    let cert_location = config.get("tls.cert.location").map(str::to_string);
555    let key_location = config.get("tls.key.location").map(str::to_string);
556    if cert_location.is_some() != key_location.is_some() {
557        return Err(cfg_err(
558            "[LDB-5062] tls.cert.location and tls.key.location must both be set \
559             (mutual-TLS client cert) or both be unset",
560        ));
561    }
562    Ok(TlsConfig {
563        enabled: parse_bool(config, "tls.enabled", false)?,
564        ca_location: config.get("tls.ca.location").map(str::to_string),
565        cert_location,
566        key_location,
567    })
568}
569
570/// Build `ConnectOptions` from parsed auth + TLS. Shared between
571/// source and sink.
572///
573/// # Errors
574///
575/// Returns `ConnectorError` if the creds file can't be read or parsed.
576pub(super) fn build_connect_options(
577    auth: &AuthMode,
578    tls: &TlsConfig,
579) -> Result<async_nats::ConnectOptions, ConnectorError> {
580    let mut opts = async_nats::ConnectOptions::new();
581    match auth {
582        AuthMode::None => {}
583        AuthMode::UserPass { user, password } => {
584            opts = opts.user_and_password(user.clone(), password.clone());
585        }
586        AuthMode::Token(token) => {
587            opts = opts.token(token.clone());
588        }
589        AuthMode::CredsFile(path) => {
590            let contents = std::fs::read_to_string(path)
591                .map_err(|e| cfg_err(&format!("creds.file '{path}': {e}")))?;
592            opts = async_nats::ConnectOptions::with_credentials(&contents)
593                .map_err(|e| cfg_err(&format!("creds.file '{path}' invalid: {e}")))?;
594        }
595    }
596    let tls_touched = tls.enabled || tls.ca_location.is_some() || tls.cert_location.is_some();
597    if tls_touched {
598        opts = opts.require_tls(true);
599    }
600    if let Some(ca) = &tls.ca_location {
601        opts = opts.add_root_certificates(ca.into());
602    }
603    if let (Some(cert), Some(key)) = (&tls.cert_location, &tls.key_location) {
604        opts = opts.add_client_certificate(cert.into(), key.into());
605    }
606    Ok(opts)
607}
608
609#[cfg(test)]
610#[allow(clippy::disallowed_types)]
611mod tests {
612    use super::*;
613    use std::collections::HashMap;
614
615    fn cfg(pairs: &[(&str, &str)]) -> ConnectorConfig {
616        let mut props = HashMap::new();
617        for (k, v) in pairs {
618            props.insert((*k).to_string(), (*v).to_string());
619        }
620        ConnectorConfig::with_properties("nats", props)
621    }
622
623    // ── source ──
624
625    #[test]
626    fn source_jetstream_requires_stream() {
627        let err = NatsSourceConfig::from_config(&cfg(&[
628            ("servers", "nats://a:4222"),
629            ("consumer", "c"),
630            ("subject", "x.>"),
631        ]))
632        .unwrap_err()
633        .to_string();
634        assert!(err.contains("LDB-5040"), "got: {err}");
635    }
636
637    #[test]
638    fn source_jetstream_requires_consumer() {
639        let err = NatsSourceConfig::from_config(&cfg(&[
640            ("servers", "nats://a:4222"),
641            ("stream", "S"),
642            ("subject", "x.>"),
643        ]))
644        .unwrap_err()
645        .to_string();
646        assert!(err.contains("LDB-5041"), "got: {err}");
647    }
648
649    #[test]
650    fn source_jetstream_requires_subject_or_filters() {
651        let err = NatsSourceConfig::from_config(&cfg(&[
652            ("servers", "nats://a:4222"),
653            ("stream", "S"),
654            ("consumer", "c"),
655        ]))
656        .unwrap_err()
657        .to_string();
658        assert!(err.contains("LDB-5042"), "got: {err}");
659    }
660
661    #[test]
662    fn source_core_requires_subject() {
663        let err =
664            NatsSourceConfig::from_config(&cfg(&[("servers", "nats://a:4222"), ("mode", "core")]))
665                .unwrap_err()
666                .to_string();
667        assert!(err.contains("LDB-5045"), "got: {err}");
668    }
669
670    #[test]
671    fn source_by_start_sequence_requires_start_sequence() {
672        let err = NatsSourceConfig::from_config(&cfg(&[
673            ("servers", "nats://a:4222"),
674            ("stream", "S"),
675            ("consumer", "c"),
676            ("subject", "x.>"),
677            ("deliver.policy", "by_start_sequence"),
678        ]))
679        .unwrap_err()
680        .to_string();
681        assert!(err.contains("LDB-5043"), "got: {err}");
682    }
683
684    #[test]
685    fn source_happy_path_jetstream() {
686        let parsed = NatsSourceConfig::from_config(&cfg(&[
687            ("servers", "nats://a:4222,nats://b:4222"),
688            ("stream", "ORDERS"),
689            ("consumer", "laminar-orders"),
690            ("subject.filters", "orders.us.*,orders.eu.*"),
691            ("ack.wait.ms", "45000"),
692            ("max.deliver", "3"),
693        ]))
694        .unwrap();
695        assert_eq!(parsed.servers.len(), 2);
696        assert_eq!(parsed.mode, Mode::JetStream);
697        assert_eq!(parsed.subject_filters, vec!["orders.us.*", "orders.eu.*"]);
698        assert_eq!(parsed.ack_wait, Duration::from_secs(45));
699        assert_eq!(parsed.max_deliver, 3);
700    }
701
702    #[test]
703    fn source_happy_path_core() {
704        let parsed = NatsSourceConfig::from_config(&cfg(&[
705            ("servers", "nats://a:4222"),
706            ("mode", "core"),
707            ("subject", "events"),
708            ("queue.group", "workers"),
709        ]))
710        .unwrap();
711        assert_eq!(parsed.mode, Mode::Core);
712        assert_eq!(parsed.subject.as_deref(), Some("events"));
713        assert_eq!(parsed.queue_group.as_deref(), Some("workers"));
714    }
715
716    // ── sink ──
717
718    #[test]
719    fn sink_rejects_subject_and_subject_column() {
720        let err = NatsSinkConfig::from_config(&cfg(&[
721            ("servers", "nats://a:4222"),
722            ("subject", "x"),
723            ("subject.column", "c"),
724        ]))
725        .unwrap_err()
726        .to_string();
727        assert!(err.contains("LDB-5050"), "got: {err}");
728    }
729
730    #[test]
731    fn sink_rejects_neither_subject_nor_column() {
732        let err = NatsSinkConfig::from_config(&cfg(&[("servers", "nats://a:4222")]))
733            .unwrap_err()
734            .to_string();
735        assert!(err.contains("LDB-5051"), "got: {err}");
736    }
737
738    #[test]
739    fn sink_rejects_header_column_colliding_with_reserved() {
740        let err = NatsSinkConfig::from_config(&cfg(&[
741            ("servers", "nats://a:4222"),
742            ("subject", "x"),
743            ("header.columns", "trace_id,nats-msg-id"),
744        ]))
745        .unwrap_err()
746        .to_string();
747        assert!(err.contains("LDB-5065"), "got: {err}");
748    }
749
750    #[test]
751    fn sink_rejects_core_with_exactly_once() {
752        let err = NatsSinkConfig::from_config(&cfg(&[
753            ("servers", "nats://a:4222"),
754            ("mode", "core"),
755            ("subject", "x"),
756            ("delivery.guarantee", "exactly_once"),
757        ]))
758        .unwrap_err()
759        .to_string();
760        assert!(err.contains("LDB-5053"), "got: {err}");
761    }
762
763    #[test]
764    fn sink_rejects_exactly_once_without_stream() {
765        let err = NatsSinkConfig::from_config(&cfg(&[
766            ("servers", "nats://a:4222"),
767            ("subject", "x"),
768            ("delivery.guarantee", "exactly_once"),
769            ("dedup.id.column", "event_id"),
770        ]))
771        .unwrap_err()
772        .to_string();
773        assert!(err.contains("LDB-5055"), "got: {err}");
774    }
775
776    #[test]
777    fn sink_rejects_exactly_once_without_dedup_column() {
778        let err = NatsSinkConfig::from_config(&cfg(&[
779            ("servers", "nats://a:4222"),
780            ("stream", "OUT"),
781            ("subject", "x.processed"),
782            ("delivery.guarantee", "exactly_once"),
783        ]))
784        .unwrap_err()
785        .to_string();
786        assert!(err.contains("LDB-5054"), "got: {err}");
787    }
788
789    #[test]
790    fn sink_happy_path_exactly_once() {
791        let parsed = NatsSinkConfig::from_config(&cfg(&[
792            ("servers", "nats://a:4222"),
793            ("stream", "OUT"),
794            ("subject.column", "out_subject"),
795            ("delivery.guarantee", "exactly_once"),
796            ("dedup.id.column", "event_id"),
797            ("header.columns", "trace_id,tenant"),
798        ]))
799        .unwrap();
800        assert_eq!(parsed.subject, SubjectSpec::Column("out_subject".into()));
801        assert_eq!(parsed.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
802        assert_eq!(parsed.dedup_id_column.as_deref(), Some("event_id"));
803        assert_eq!(parsed.header_columns, vec!["trace_id", "tenant"]);
804    }
805
806    // ── servers ──
807
808    #[test]
809    fn servers_required() {
810        let err = NatsSourceConfig::from_config(&cfg(&[]))
811            .unwrap_err()
812            .to_string();
813        assert!(err.contains("servers"), "got: {err}");
814    }
815
816    #[test]
817    fn zero_fetch_batch_rejected() {
818        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("fetch.batch", "0")]))
819            .unwrap_err()
820            .to_string();
821        assert!(err.contains("fetch.batch must be > 0"), "got: {err}");
822    }
823
824    #[test]
825    fn zero_ack_wait_rejected() {
826        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("ack.wait.ms", "0")]))
827            .unwrap_err()
828            .to_string();
829        assert!(err.contains("ack.wait.ms must be > 0"), "got: {err}");
830    }
831
832    #[test]
833    fn max_deliver_unlimited_accepted() {
834        let parsed =
835            NatsSourceConfig::from_config(&jetstream_happy(&[("max.deliver", "-1")])).unwrap();
836        assert_eq!(parsed.max_deliver, -1);
837    }
838
839    #[test]
840    fn max_deliver_negative_other_than_minus_one_rejected() {
841        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("max.deliver", "-5")]))
842            .unwrap_err()
843            .to_string();
844        assert!(err.contains("max.deliver"), "got: {err}");
845    }
846
847    #[test]
848    fn sink_zero_max_pending_rejected() {
849        let err = NatsSinkConfig::from_config(&cfg(&[
850            ("servers", "nats://a:4222"),
851            ("subject", "x"),
852            ("max.pending", "0"),
853        ]))
854        .unwrap_err()
855        .to_string();
856        assert!(err.contains("max.pending must be > 0"), "got: {err}");
857    }
858
859    #[test]
860    fn servers_empty_csv_rejected() {
861        let err = NatsSourceConfig::from_config(&cfg(&[("servers", ",,")]))
862            .unwrap_err()
863            .to_string();
864        assert!(err.contains("LDB-5030"), "got: {err}");
865    }
866
867    // ── auth ──
868
869    fn jetstream_happy(pairs: &[(&str, &str)]) -> ConnectorConfig {
870        let mut base = vec![
871            ("servers", "nats://a:4222"),
872            ("stream", "S"),
873            ("consumer", "c"),
874            ("subject", "x.>"),
875        ];
876        base.extend_from_slice(pairs);
877        cfg(&base)
878    }
879
880    #[test]
881    fn auth_user_pass_requires_user() {
882        let err = NatsSourceConfig::from_config(&jetstream_happy(&[
883            ("auth.mode", "user_pass"),
884            ("password", "secret"),
885        ]))
886        .unwrap_err()
887        .to_string();
888        assert!(err.contains("LDB-5060"), "got: {err}");
889    }
890
891    #[test]
892    fn auth_user_pass_requires_password() {
893        let err = NatsSourceConfig::from_config(&jetstream_happy(&[
894            ("auth.mode", "user_pass"),
895            ("user", "alice"),
896        ]))
897        .unwrap_err()
898        .to_string();
899        assert!(err.contains("LDB-5060"), "got: {err}");
900    }
901
902    #[test]
903    fn auth_user_pass_happy_path() {
904        let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
905            ("auth.mode", "user_pass"),
906            ("user", "alice"),
907            ("password", "wonderland"),
908        ]))
909        .unwrap();
910        assert_eq!(
911            parsed.auth,
912            AuthMode::UserPass {
913                user: "alice".into(),
914                password: "wonderland".into(),
915            }
916        );
917    }
918
919    #[test]
920    fn auth_token_requires_token() {
921        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("auth.mode", "token")]))
922            .unwrap_err()
923            .to_string();
924        assert!(err.contains("LDB-5061"), "got: {err}");
925    }
926
927    #[test]
928    fn auth_token_happy_path() {
929        let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
930            ("auth.mode", "token"),
931            ("token", "abc123"),
932        ]))
933        .unwrap();
934        assert_eq!(parsed.auth, AuthMode::Token("abc123".into()));
935    }
936
937    #[test]
938    fn auth_none_with_stray_credentials_rejected() {
939        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("user", "alice")]))
940            .unwrap_err()
941            .to_string();
942        assert!(err.contains("LDB-5063"), "got: {err}");
943    }
944
945    #[test]
946    fn auth_creds_file_requires_path() {
947        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("auth.mode", "creds_file")]))
948            .unwrap_err()
949            .to_string();
950        assert!(err.contains("LDB-5064"), "got: {err}");
951    }
952
953    #[test]
954    fn auth_creds_file_happy_path() {
955        let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
956            ("auth.mode", "creds_file"),
957            ("creds.file", "/secrets/user.creds"),
958        ]))
959        .unwrap();
960        assert_eq!(
961            parsed.auth,
962            AuthMode::CredsFile("/secrets/user.creds".into())
963        );
964    }
965
966    #[test]
967    fn auth_unknown_mode_rejected() {
968        let err = NatsSourceConfig::from_config(&jetstream_happy(&[("auth.mode", "banana")]))
969            .unwrap_err()
970            .to_string();
971        assert!(err.contains("invalid auth.mode"), "got: {err}");
972    }
973
974    // ── tls ──
975
976    #[test]
977    fn tls_cert_without_key_rejected() {
978        let err = NatsSourceConfig::from_config(&jetstream_happy(&[(
979            "tls.cert.location",
980            "/certs/client.pem",
981        )]))
982        .unwrap_err()
983        .to_string();
984        assert!(err.contains("LDB-5062"), "got: {err}");
985    }
986
987    #[test]
988    fn tls_key_without_cert_rejected() {
989        let err = NatsSourceConfig::from_config(&jetstream_happy(&[(
990            "tls.key.location",
991            "/certs/client.key",
992        )]))
993        .unwrap_err()
994        .to_string();
995        assert!(err.contains("LDB-5062"), "got: {err}");
996    }
997
998    #[test]
999    fn tls_happy_path() {
1000        let parsed = NatsSourceConfig::from_config(&jetstream_happy(&[
1001            ("tls.enabled", "true"),
1002            ("tls.ca.location", "/certs/ca.pem"),
1003            ("tls.cert.location", "/certs/client.pem"),
1004            ("tls.key.location", "/certs/client.key"),
1005        ]))
1006        .unwrap();
1007        assert!(parsed.tls.enabled);
1008        assert_eq!(parsed.tls.ca_location.as_deref(), Some("/certs/ca.pem"));
1009        assert_eq!(
1010            parsed.tls.cert_location.as_deref(),
1011            Some("/certs/client.pem")
1012        );
1013    }
1014
1015    #[test]
1016    fn auth_and_tls_on_sink() {
1017        let parsed = NatsSinkConfig::from_config(&cfg(&[
1018            ("servers", "nats://a:4222"),
1019            ("subject", "x"),
1020            ("auth.mode", "user_pass"),
1021            ("user", "alice"),
1022            ("password", "wonderland"),
1023            ("tls.enabled", "true"),
1024        ]))
1025        .unwrap();
1026        assert!(matches!(parsed.auth, AuthMode::UserPass { .. }));
1027        assert!(parsed.tls.enabled);
1028    }
1029}