1use prometheus::core::Collector;
5use prometheus::{Error as PromError, IntCounter, IntGauge, Registry};
6use tracing::warn;
7
8use crate::metrics::ConnectorMetrics;
9
10fn register_collector<C: Collector + Clone + 'static>(reg: &Registry, name: &str, c: &C) {
11 match reg.register(Box::new(c.clone())) {
12 Ok(()) => {}
13 Err(PromError::AlreadyReg) => {
16 warn!(
17 metric = name,
18 "metric already registered; use separate registries per connector"
19 );
20 }
21 Err(e) => warn!(metric = name, error = ?e, "failed to register metric"),
22 }
23}
24
25#[derive(Debug, Clone)]
27#[allow(missing_docs)]
28pub struct NatsSourceMetrics {
29 pub records_total: IntCounter,
30 pub bytes_total: IntCounter,
31 pub fetch_errors_total: IntCounter,
32 pub acks_total: IntCounter,
33 pub ack_errors_total: IntCounter,
34 pub pending_acks: IntGauge,
35 pub consumer_lag: IntGauge,
37}
38
39impl NatsSourceMetrics {
40 #[must_use]
42 #[allow(clippy::missing_panics_doc)]
43 pub fn new(registry: Option<&Registry>) -> Self {
44 let local;
45 let reg = if let Some(r) = registry {
46 r
47 } else {
48 local = Registry::new();
49 &local
50 };
51
52 macro_rules! reg_c {
53 ($name:expr, $help:expr) => {{
54 let c = IntCounter::new($name, $help).unwrap();
55 register_collector(reg, $name, &c);
56 c
57 }};
58 }
59 let pending_acks =
60 IntGauge::new("nats_source_pending_acks", "Unacked JetStream messages").unwrap();
61 register_collector(reg, "nats_source_pending_acks", &pending_acks);
62 let consumer_lag = IntGauge::new(
63 "nats_source_consumer_lag",
64 "Stream messages not yet delivered to the consumer",
65 )
66 .unwrap();
67 register_collector(reg, "nats_source_consumer_lag", &consumer_lag);
68
69 Self {
70 records_total: reg_c!("nats_source_records_total", "Records delivered"),
71 bytes_total: reg_c!("nats_source_bytes_total", "Payload bytes delivered"),
72 fetch_errors_total: reg_c!("nats_source_fetch_errors_total", "Fetch errors"),
73 acks_total: reg_c!("nats_source_acks_total", "Successful acks"),
74 ack_errors_total: reg_c!("nats_source_ack_errors_total", "Failed acks"),
75 pending_acks,
76 consumer_lag,
77 }
78 }
79
80 #[allow(missing_docs)]
81 pub fn record_poll(&self, records: u64, bytes: u64) {
82 self.records_total.inc_by(records);
83 self.bytes_total.inc_by(bytes);
84 }
85
86 #[allow(missing_docs)]
87 pub fn record_fetch_error(&self) {
88 self.fetch_errors_total.inc();
89 }
90
91 #[allow(missing_docs)]
92 pub fn record_ack(&self) {
93 self.acks_total.inc();
94 }
95
96 #[allow(missing_docs)]
97 pub fn record_ack_error(&self) {
98 self.ack_errors_total.inc();
99 }
100
101 #[allow(missing_docs, clippy::cast_possible_wrap)]
102 pub fn set_pending_acks(&self, n: usize) {
103 self.pending_acks.set(n as i64);
104 }
105
106 #[allow(missing_docs, clippy::cast_possible_wrap)]
107 pub fn set_consumer_lag(&self, n: u64) {
108 self.consumer_lag.set(n as i64);
109 }
110
111 #[must_use]
113 #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
114 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
115 let mut m = ConnectorMetrics {
116 records_total: self.records_total.get(),
117 bytes_total: self.bytes_total.get(),
118 errors_total: self.fetch_errors_total.get() + self.ack_errors_total.get(),
119 lag: self.consumer_lag.get().max(0) as u64,
120 custom: Vec::new(),
121 };
122 m.add_custom("nats.acks", self.acks_total.get() as f64);
123 m.add_custom("nats.ack_errors", self.ack_errors_total.get() as f64);
124 m.add_custom("nats.pending_acks", self.pending_acks.get() as f64);
125 m
126 }
127}
128
129#[derive(Debug, Clone)]
131#[allow(missing_docs)]
132pub struct NatsSinkMetrics {
133 pub records_total: IntCounter,
134 pub bytes_total: IntCounter,
135 pub publish_errors_total: IntCounter,
136 pub ack_errors_total: IntCounter,
137 pub dedup_total: IntCounter,
139 pub epochs_rolled_back: IntCounter,
140 pub pending_futures: IntGauge,
141}
142
143impl NatsSinkMetrics {
144 #[must_use]
146 #[allow(clippy::missing_panics_doc)]
147 pub fn new(registry: Option<&Registry>) -> Self {
148 let local;
149 let reg = if let Some(r) = registry {
150 r
151 } else {
152 local = Registry::new();
153 &local
154 };
155
156 macro_rules! reg_c {
157 ($name:expr, $help:expr) => {{
158 let c = IntCounter::new($name, $help).unwrap();
159 register_collector(reg, $name, &c);
160 c
161 }};
162 }
163 let pending_futures =
164 IntGauge::new("nats_sink_pending_futures", "Outstanding PublishAckFutures").unwrap();
165 register_collector(reg, "nats_sink_pending_futures", &pending_futures);
166
167 Self {
168 records_total: reg_c!("nats_sink_records_total", "Records published"),
169 bytes_total: reg_c!("nats_sink_bytes_total", "Payload bytes published"),
170 publish_errors_total: reg_c!("nats_sink_publish_errors_total", "Publish errors"),
171 ack_errors_total: reg_c!("nats_sink_ack_errors_total", "Publish-ack errors"),
172 dedup_total: reg_c!("nats_sink_dedup_total", "Broker-dropped duplicates"),
173 epochs_rolled_back: reg_c!("nats_sink_epochs_rolled_back_total", "Epochs rolled back"),
174 pending_futures,
175 }
176 }
177
178 #[allow(missing_docs)]
179 pub fn record_published_row(&self, bytes: u64) {
180 self.records_total.inc();
181 self.bytes_total.inc_by(bytes);
182 }
183
184 #[allow(missing_docs)]
185 pub fn record_publish_error(&self) {
186 self.publish_errors_total.inc();
187 }
188
189 #[allow(missing_docs)]
190 pub fn record_ack_error(&self) {
191 self.ack_errors_total.inc();
192 }
193
194 #[allow(missing_docs)]
195 pub fn record_dedup(&self) {
196 self.dedup_total.inc();
197 }
198
199 #[allow(missing_docs)]
200 pub fn record_rollback(&self) {
201 self.epochs_rolled_back.inc();
202 }
203
204 #[allow(missing_docs, clippy::cast_possible_wrap)]
205 pub fn set_pending_futures(&self, n: usize) {
206 self.pending_futures.set(n as i64);
207 }
208
209 #[must_use]
210 #[allow(missing_docs, clippy::cast_precision_loss, clippy::cast_sign_loss)]
211 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
212 let mut m = ConnectorMetrics {
213 records_total: self.records_total.get(),
214 bytes_total: self.bytes_total.get(),
215 errors_total: self.publish_errors_total.get() + self.ack_errors_total.get(),
216 lag: self.pending_futures.get() as u64,
217 custom: Vec::new(),
218 };
219 m.add_custom("nats.dedup", self.dedup_total.get() as f64);
220 m.add_custom(
221 "nats.epochs_rolled_back",
222 self.epochs_rolled_back.get() as f64,
223 );
224 m
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231
232 #[test]
233 fn source_initial_zero() {
234 let m = NatsSourceMetrics::new(None);
235 let cm = m.to_connector_metrics();
236 assert_eq!(cm.records_total, 0);
237 assert_eq!(cm.bytes_total, 0);
238 assert_eq!(cm.errors_total, 0);
239 assert_eq!(cm.lag, 0);
240 }
241
242 #[test]
243 fn source_records_increment() {
244 let m = NatsSourceMetrics::new(None);
245 m.record_poll(100, 4096);
246 m.record_poll(50, 1024);
247 m.record_ack();
248 m.record_ack();
249 m.record_fetch_error();
250 m.set_pending_acks(7);
251
252 m.set_consumer_lag(42);
253
254 let cm = m.to_connector_metrics();
255 assert_eq!(cm.records_total, 150);
256 assert_eq!(cm.bytes_total, 5120);
257 assert_eq!(cm.errors_total, 1); assert_eq!(cm.lag, 42, "lag reflects the latest consumer.info() poll");
259 assert!(cm.custom.iter().any(|(k, v)| k == "nats.acks" && *v == 2.0));
260 assert!(cm
261 .custom
262 .iter()
263 .any(|(k, v)| k == "nats.pending_acks" && *v == 7.0));
264 }
265
266 #[test]
267 fn sink_records_and_dedup() {
268 let m = NatsSinkMetrics::new(None);
269 for _ in 0..10 {
270 m.record_published_row(200);
271 }
272 m.record_dedup();
273 m.record_dedup();
274 m.record_rollback();
275 m.set_pending_futures(3);
276
277 let cm = m.to_connector_metrics();
278 assert_eq!(cm.records_total, 10);
279 assert_eq!(cm.bytes_total, 2000);
280 assert_eq!(cm.lag, 3);
281 assert!(cm
282 .custom
283 .iter()
284 .any(|(k, v)| k == "nats.dedup" && *v == 2.0));
285 assert!(cm
286 .custom
287 .iter()
288 .any(|(k, v)| k == "nats.epochs_rolled_back" && *v == 1.0));
289 }
290
291 #[test]
292 fn metrics_register_on_shared_registry() {
293 let reg = Registry::new();
294 let _s = NatsSourceMetrics::new(Some(®));
295 let _k = NatsSinkMetrics::new(Some(®));
296 let names: Vec<String> = reg.gather().iter().map(|f| f.name().to_string()).collect();
297 assert!(names.contains(&"nats_source_records_total".to_string()));
298 assert!(names.contains(&"nats_sink_records_total".to_string()));
299 assert!(names.contains(&"nats_sink_dedup_total".to_string()));
300 }
301}