1pub mod config;
6pub mod metrics;
7pub mod sink;
8pub mod source;
9
10pub use metrics::{NatsSinkMetrics, NatsSourceMetrics};
11pub use sink::NatsSink;
12pub use source::NatsSource;
13
14use std::sync::Arc;
15
16use arrow_schema::Schema;
17
18use crate::config::{ConfigKeySpec, ConnectorInfo};
19use crate::registry::ConnectorRegistry;
20
21pub fn register_nats_source(registry: &ConnectorRegistry) {
23 let info = ConnectorInfo {
24 name: "nats".to_string(),
25 display_name: "NATS Source".to_string(),
26 version: env!("CARGO_PKG_VERSION").to_string(),
27 is_source: true,
28 is_sink: false,
29 config_keys: source_config_keys(),
30 };
31 registry.register_source(
32 "nats",
33 info,
34 Arc::new(|reg| Box::new(NatsSource::new(Arc::new(Schema::empty()), reg))),
35 );
36}
37
38pub fn register_nats_sink(registry: &ConnectorRegistry) {
40 let info = ConnectorInfo {
41 name: "nats".to_string(),
42 display_name: "NATS Sink".to_string(),
43 version: env!("CARGO_PKG_VERSION").to_string(),
44 is_source: false,
45 is_sink: true,
46 config_keys: sink_config_keys(),
47 };
48 registry.register_sink(
49 "nats",
50 info,
51 Arc::new(|reg| Box::new(NatsSink::new(Arc::new(Schema::empty()), reg))),
52 );
53}
54
55fn auth_and_tls_keys() -> Vec<ConfigKeySpec> {
56 use ConfigKeySpec as K;
57 vec![
58 K::optional("auth.mode", "none | user_pass | token | creds_file", "none"),
59 K::optional("user", "Username (auth.mode=user_pass)", ""),
60 K::optional("password", "Password (auth.mode=user_pass)", ""),
61 K::optional("token", "Bearer token (auth.mode=token)", ""),
62 K::optional(
63 "creds.file",
64 "Path to a NATS credentials file (auth.mode=creds_file)",
65 "",
66 ),
67 K::optional("tls.enabled", "Require TLS on the connection", "false"),
68 K::optional(
69 "tls.ca.location",
70 "PEM CA certificate for server verification",
71 "",
72 ),
73 K::optional(
74 "tls.cert.location",
75 "Client certificate for mutual TLS (pairs with tls.key.location)",
76 "",
77 ),
78 K::optional("tls.key.location", "Client private key for mutual TLS", ""),
79 ]
80}
81
82fn source_config_keys() -> Vec<ConfigKeySpec> {
83 use ConfigKeySpec as K;
84 let mut keys = vec![
85 K::required("servers", "NATS server URLs, comma-separated"),
86 K::optional("mode", "core | jetstream", "jetstream"),
87 K::optional(
89 "stream",
90 "JetStream stream name (required in jetstream mode)",
91 "",
92 ),
93 K::optional(
94 "consumer",
95 "Durable consumer name (required in jetstream mode)",
96 "",
97 ),
98 K::optional("subject", "Single subject or wildcard (e.g., orders.>)", ""),
99 K::optional(
100 "subject.filters",
101 "Comma-separated filter subjects (JS 2.10+)",
102 "",
103 ),
104 K::optional(
105 "deliver.policy",
106 "all | new | by_start_sequence | by_start_time",
107 "all",
108 ),
109 K::optional(
110 "start.sequence",
111 "Stream sequence for by_start_sequence",
112 "",
113 ),
114 K::optional("start.time", "RFC3339 timestamp for by_start_time", ""),
115 K::optional("ack.policy", "explicit | none", "explicit"),
116 K::optional(
117 "ack.wait.ms",
118 "Per-message ack wait in milliseconds",
119 "60000",
120 ),
121 K::optional(
122 "max.deliver",
123 "Max delivery attempts before poison action",
124 "5",
125 ),
126 K::optional(
127 "max.ack.pending",
128 "Max unacked messages (server-side flow control)",
129 "10000",
130 ),
131 K::optional("fetch.batch", "Messages per pull fetch", "500"),
132 K::optional("fetch.max.wait.ms", "Max wait per fetch", "500"),
133 K::optional(
134 "fetch.error.threshold",
135 "Consecutive fetch errors before the source reports Unhealthy",
136 "10",
137 ),
138 K::optional(
139 "lag.poll.interval.ms",
140 "Interval between consumer.info() polls for the lag gauge (0 disables)",
141 "10000",
142 ),
143 K::optional(
144 "queue.group",
145 "Queue group for load balancing (core mode only)",
146 "",
147 ),
148 K::optional("format", "json | csv | raw", "json"),
149 ];
150 keys.extend(auth_and_tls_keys());
151 keys
152}
153
154fn sink_config_keys() -> Vec<ConfigKeySpec> {
155 use ConfigKeySpec as K;
156 let mut keys = vec![
157 K::required("servers", "NATS server URLs, comma-separated"),
158 K::optional("mode", "core | jetstream", "jetstream"),
159 K::optional("stream", "Target stream (used for validation only)", ""),
160 K::optional("subject", "Literal subject for every row", ""),
161 K::optional(
162 "subject.column",
163 "Column name whose value is the subject",
164 "",
165 ),
166 K::optional(
167 "expected.stream",
168 "Nats-Expected-Stream header for fail-fast",
169 "",
170 ),
171 K::optional(
172 "delivery.guarantee",
173 "at_least_once | exactly_once",
174 "at_least_once",
175 ),
176 K::optional(
177 "dedup.id.column",
178 "Column used as Nats-Msg-Id (required for exactly-once)",
179 "",
180 ),
181 K::optional(
182 "min.duplicate.window.ms",
183 "Minimum stream duplicate_window accepted under exactly-once",
184 "120000",
185 ),
186 K::optional("max.pending", "Max outstanding PubAck futures", "4096"),
187 K::optional("ack.timeout.ms", "Per-publish ack timeout", "30000"),
188 K::optional("format", "json | csv | raw", "json"),
189 K::optional(
190 "header.columns",
191 "Comma-separated columns projected to NATS headers",
192 "",
193 ),
194 ];
195 keys.extend(auth_and_tls_keys());
196 keys
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// After `register_nats_source`, `"nats"` is listed by `list_sources()`.
    #[test]
    fn register_source_appears_in_registry() {
        let registry = ConnectorRegistry::new();
        // Pass the registry by shared reference, as the function signature requires.
        register_nats_source(&registry);
        assert!(registry.list_sources().contains(&"nats".to_string()));
    }

    /// After `register_nats_sink`, `"nats"` is listed by `list_sinks()`.
    #[test]
    fn register_sink_appears_in_registry() {
        let registry = ConnectorRegistry::new();
        // Pass the registry by shared reference, as the function signature requires.
        register_nats_sink(&registry);
        assert!(registry.list_sinks().contains(&"nats".to_string()));
    }
}