Skip to main content

laminar_connectors/nats/
mod.rs

1//! NATS source and sink — `core` (non-durable, at-most-once) or
2//! `jetstream` (default; replayable; at-least-once, or exactly-once
3//! with `Nats-Msg-Id` dedup).
4
5pub mod config;
6pub mod metrics;
7pub mod sink;
8pub mod source;
9
10pub use metrics::{NatsSinkMetrics, NatsSourceMetrics};
11pub use sink::NatsSink;
12pub use source::NatsSource;
13
14use std::sync::Arc;
15
16use arrow_schema::Schema;
17
18use crate::config::{ConfigKeySpec, ConnectorInfo};
19use crate::registry::ConnectorRegistry;
20
21/// Registers the NATS source connector.
22pub fn register_nats_source(registry: &ConnectorRegistry) {
23    let info = ConnectorInfo {
24        name: "nats".to_string(),
25        display_name: "NATS Source".to_string(),
26        version: env!("CARGO_PKG_VERSION").to_string(),
27        is_source: true,
28        is_sink: false,
29        config_keys: source_config_keys(),
30    };
31    registry.register_source(
32        "nats",
33        info,
34        Arc::new(|reg| Box::new(NatsSource::new(Arc::new(Schema::empty()), reg))),
35    );
36}
37
38/// Registers the NATS sink connector.
39pub fn register_nats_sink(registry: &ConnectorRegistry) {
40    let info = ConnectorInfo {
41        name: "nats".to_string(),
42        display_name: "NATS Sink".to_string(),
43        version: env!("CARGO_PKG_VERSION").to_string(),
44        is_source: false,
45        is_sink: true,
46        config_keys: sink_config_keys(),
47    };
48    registry.register_sink(
49        "nats",
50        info,
51        Arc::new(|reg| Box::new(NatsSink::new(Arc::new(Schema::empty()), reg))),
52    );
53}
54
55fn auth_and_tls_keys() -> Vec<ConfigKeySpec> {
56    use ConfigKeySpec as K;
57    vec![
58        K::optional("auth.mode", "none | user_pass | token | creds_file", "none"),
59        K::optional("user", "Username (auth.mode=user_pass)", ""),
60        K::optional("password", "Password (auth.mode=user_pass)", ""),
61        K::optional("token", "Bearer token (auth.mode=token)", ""),
62        K::optional(
63            "creds.file",
64            "Path to a NATS credentials file (auth.mode=creds_file)",
65            "",
66        ),
67        K::optional("tls.enabled", "Require TLS on the connection", "false"),
68        K::optional(
69            "tls.ca.location",
70            "PEM CA certificate for server verification",
71            "",
72        ),
73        K::optional(
74            "tls.cert.location",
75            "Client certificate for mutual TLS (pairs with tls.key.location)",
76            "",
77        ),
78        K::optional("tls.key.location", "Client private key for mutual TLS", ""),
79    ]
80}
81
82fn source_config_keys() -> Vec<ConfigKeySpec> {
83    use ConfigKeySpec as K;
84    let mut keys = vec![
85        K::required("servers", "NATS server URLs, comma-separated"),
86        K::optional("mode", "core | jetstream", "jetstream"),
87        // JetStream
88        K::optional(
89            "stream",
90            "JetStream stream name (required in jetstream mode)",
91            "",
92        ),
93        K::optional(
94            "consumer",
95            "Durable consumer name (required in jetstream mode)",
96            "",
97        ),
98        K::optional("subject", "Single subject or wildcard (e.g., orders.>)", ""),
99        K::optional(
100            "subject.filters",
101            "Comma-separated filter subjects (JS 2.10+)",
102            "",
103        ),
104        K::optional(
105            "deliver.policy",
106            "all | new | by_start_sequence | by_start_time",
107            "all",
108        ),
109        K::optional(
110            "start.sequence",
111            "Stream sequence for by_start_sequence",
112            "",
113        ),
114        K::optional("start.time", "RFC3339 timestamp for by_start_time", ""),
115        K::optional("ack.policy", "explicit | none", "explicit"),
116        K::optional(
117            "ack.wait.ms",
118            "Per-message ack wait in milliseconds",
119            "60000",
120        ),
121        K::optional(
122            "max.deliver",
123            "Max delivery attempts before poison action",
124            "5",
125        ),
126        K::optional(
127            "max.ack.pending",
128            "Max unacked messages (server-side flow control)",
129            "10000",
130        ),
131        K::optional("fetch.batch", "Messages per pull fetch", "500"),
132        K::optional("fetch.max.wait.ms", "Max wait per fetch", "500"),
133        K::optional(
134            "fetch.error.threshold",
135            "Consecutive fetch errors before the source reports Unhealthy",
136            "10",
137        ),
138        K::optional(
139            "lag.poll.interval.ms",
140            "Interval between consumer.info() polls for the lag gauge (0 disables)",
141            "10000",
142        ),
143        K::optional(
144            "queue.group",
145            "Queue group for load balancing (core mode only)",
146            "",
147        ),
148        K::optional("format", "json | csv | raw", "json"),
149    ];
150    keys.extend(auth_and_tls_keys());
151    keys
152}
153
154fn sink_config_keys() -> Vec<ConfigKeySpec> {
155    use ConfigKeySpec as K;
156    let mut keys = vec![
157        K::required("servers", "NATS server URLs, comma-separated"),
158        K::optional("mode", "core | jetstream", "jetstream"),
159        K::optional("stream", "Target stream (used for validation only)", ""),
160        K::optional("subject", "Literal subject for every row", ""),
161        K::optional(
162            "subject.column",
163            "Column name whose value is the subject",
164            "",
165        ),
166        K::optional(
167            "expected.stream",
168            "Nats-Expected-Stream header for fail-fast",
169            "",
170        ),
171        K::optional(
172            "delivery.guarantee",
173            "at_least_once | exactly_once",
174            "at_least_once",
175        ),
176        K::optional(
177            "dedup.id.column",
178            "Column used as Nats-Msg-Id (required for exactly-once)",
179            "",
180        ),
181        K::optional(
182            "min.duplicate.window.ms",
183            "Minimum stream duplicate_window accepted under exactly-once",
184            "120000",
185        ),
186        K::optional("max.pending", "Max outstanding PubAck futures", "4096"),
187        K::optional("ack.timeout.ms", "Per-publish ack timeout", "30000"),
188        K::optional("format", "json | csv | raw", "json"),
189        K::optional(
190            "header.columns",
191            "Comma-separated columns projected to NATS headers",
192            "",
193        ),
194    ];
195    keys.extend(auth_and_tls_keys());
196    keys
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202
203    #[test]
204    fn register_source_appears_in_registry() {
205        let registry = ConnectorRegistry::new();
206        register_nats_source(&registry);
207        assert!(registry.list_sources().contains(&"nats".to_string()));
208    }
209
210    #[test]
211    fn register_sink_appears_in_registry() {
212        let registry = ConnectorRegistry::new();
213        register_nats_sink(&registry);
214        assert!(registry.list_sinks().contains(&"nats".to_string()));
215    }
216}