1use prometheus::core::Collector;
5use prometheus::{Error as PromError, IntCounter, IntGauge, Registry};
6use tracing::warn;
7
8use crate::prom::reg_or_local;
9
10fn register_collector<C: Collector + Clone + 'static>(reg: &Registry, name: &str, c: &C) {
11 match reg.register(Box::new(c.clone())) {
12 Ok(()) => {}
13 Err(PromError::AlreadyReg) => {
16 warn!(
17 metric = name,
18 "metric already registered; use separate registries per connector"
19 );
20 }
21 Err(e) => warn!(metric = name, error = ?e, "failed to register metric"),
22 }
23}
24
25#[derive(Debug, Clone)]
27#[allow(missing_docs)]
28pub struct NatsSourceMetrics {
29 pub records_total: IntCounter,
30 pub bytes_total: IntCounter,
31 pub fetch_errors_total: IntCounter,
32 pub acks_total: IntCounter,
33 pub ack_errors_total: IntCounter,
34 pub pending_acks: IntGauge,
35 pub consumer_lag: IntGauge,
37}
38
39impl NatsSourceMetrics {
40 #[must_use]
42 #[allow(clippy::missing_panics_doc)]
43 pub fn new(registry: Option<&Registry>) -> Self {
44 let mut local = None;
45 let handle = reg_or_local(registry, &mut local);
46 let reg = handle.registry();
47
48 macro_rules! reg_c {
49 ($name:expr, $help:expr) => {{
50 let c = IntCounter::new($name, $help).unwrap();
51 register_collector(reg, $name, &c);
52 c
53 }};
54 }
55 let pending_acks =
56 IntGauge::new("nats_source_pending_acks", "Unacked JetStream messages").unwrap();
57 register_collector(reg, "nats_source_pending_acks", &pending_acks);
58 let consumer_lag = IntGauge::new(
59 "nats_source_consumer_lag",
60 "Stream messages not yet delivered to the consumer",
61 )
62 .unwrap();
63 register_collector(reg, "nats_source_consumer_lag", &consumer_lag);
64
65 Self {
66 records_total: reg_c!("nats_source_records_total", "Records delivered"),
67 bytes_total: reg_c!("nats_source_bytes_total", "Payload bytes delivered"),
68 fetch_errors_total: reg_c!("nats_source_fetch_errors_total", "Fetch errors"),
69 acks_total: reg_c!("nats_source_acks_total", "Successful acks"),
70 ack_errors_total: reg_c!("nats_source_ack_errors_total", "Failed acks"),
71 pending_acks,
72 consumer_lag,
73 }
74 }
75
76 #[allow(missing_docs)]
77 pub fn record_poll(&self, records: u64, bytes: u64) {
78 self.records_total.inc_by(records);
79 self.bytes_total.inc_by(bytes);
80 }
81
82 #[allow(missing_docs)]
83 pub fn record_fetch_error(&self) {
84 self.fetch_errors_total.inc();
85 }
86
87 #[allow(missing_docs)]
88 pub fn record_ack(&self) {
89 self.acks_total.inc();
90 }
91
92 #[allow(missing_docs)]
93 pub fn record_ack_error(&self) {
94 self.ack_errors_total.inc();
95 }
96
97 #[allow(missing_docs, clippy::cast_possible_wrap)]
98 pub fn set_pending_acks(&self, n: usize) {
99 self.pending_acks.set(n as i64);
100 }
101
102 #[allow(missing_docs, clippy::cast_possible_wrap)]
103 pub fn set_consumer_lag(&self, n: u64) {
104 self.consumer_lag.set(n as i64);
105 }
106}
107
108#[derive(Debug, Clone)]
110#[allow(missing_docs)]
111pub struct NatsSinkMetrics {
112 pub records_total: IntCounter,
113 pub bytes_total: IntCounter,
114 pub publish_errors_total: IntCounter,
115 pub ack_errors_total: IntCounter,
116 pub dedup_total: IntCounter,
118 pub epochs_rolled_back: IntCounter,
119 pub pending_futures: IntGauge,
120}
121
122impl NatsSinkMetrics {
123 #[must_use]
125 #[allow(clippy::missing_panics_doc)]
126 pub fn new(registry: Option<&Registry>) -> Self {
127 let mut local = None;
128 let handle = reg_or_local(registry, &mut local);
129 let reg = handle.registry();
130
131 macro_rules! reg_c {
132 ($name:expr, $help:expr) => {{
133 let c = IntCounter::new($name, $help).unwrap();
134 register_collector(reg, $name, &c);
135 c
136 }};
137 }
138 let pending_futures =
139 IntGauge::new("nats_sink_pending_futures", "Outstanding PublishAckFutures").unwrap();
140 register_collector(reg, "nats_sink_pending_futures", &pending_futures);
141
142 Self {
143 records_total: reg_c!("nats_sink_records_total", "Records published"),
144 bytes_total: reg_c!("nats_sink_bytes_total", "Payload bytes published"),
145 publish_errors_total: reg_c!("nats_sink_publish_errors_total", "Publish errors"),
146 ack_errors_total: reg_c!("nats_sink_ack_errors_total", "Publish-ack errors"),
147 dedup_total: reg_c!("nats_sink_dedup_total", "Broker-dropped duplicates"),
148 epochs_rolled_back: reg_c!("nats_sink_epochs_rolled_back_total", "Epochs rolled back"),
149 pending_futures,
150 }
151 }
152
153 #[allow(missing_docs)]
154 pub fn record_published_row(&self, bytes: u64) {
155 self.records_total.inc();
156 self.bytes_total.inc_by(bytes);
157 }
158
159 #[allow(missing_docs)]
160 pub fn record_publish_error(&self) {
161 self.publish_errors_total.inc();
162 }
163
164 #[allow(missing_docs)]
165 pub fn record_ack_error(&self) {
166 self.ack_errors_total.inc();
167 }
168
169 #[allow(missing_docs)]
170 pub fn record_dedup(&self) {
171 self.dedup_total.inc();
172 }
173
174 #[allow(missing_docs)]
175 pub fn record_rollback(&self) {
176 self.epochs_rolled_back.inc();
177 }
178
179 #[allow(missing_docs, clippy::cast_possible_wrap)]
180 pub fn set_pending_futures(&self, n: usize) {
181 self.pending_futures.set(n as i64);
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built source metric set reads zero on the sampled collectors.
    #[test]
    fn source_initial_zero() {
        let m = NatsSourceMetrics::new(None);
        assert_eq!(m.records_total.get(), 0);
        assert_eq!(m.bytes_total.get(), 0);
        assert_eq!(m.fetch_errors_total.get(), 0);
        assert_eq!(m.consumer_lag.get(), 0);
    }

    /// Recorder and setter methods on the source update the matching collectors.
    #[test]
    fn source_records_increment() {
        let m = NatsSourceMetrics::new(None);
        m.record_poll(100, 4096);
        m.record_poll(50, 1024);
        m.record_ack();
        m.record_ack();
        m.record_fetch_error();
        m.set_pending_acks(7);

        m.set_consumer_lag(42);

        assert_eq!(m.records_total.get(), 150);
        assert_eq!(m.bytes_total.get(), 5120);
        assert_eq!(m.fetch_errors_total.get(), 1);
        assert_eq!(m.consumer_lag.get(), 42);
        assert_eq!(m.acks_total.get(), 2);
        assert_eq!(m.pending_acks.get(), 7);
    }

    /// Recorder and setter methods on the sink update the matching collectors.
    #[test]
    fn sink_records_and_dedup() {
        let m = NatsSinkMetrics::new(None);
        for _ in 0..10 {
            m.record_published_row(200);
        }
        m.record_dedup();
        m.record_dedup();
        m.record_rollback();
        m.set_pending_futures(3);

        assert_eq!(m.records_total.get(), 10);
        assert_eq!(m.bytes_total.get(), 2000);
        assert_eq!(m.pending_futures.get(), 3);
        assert_eq!(m.dedup_total.get(), 2);
        assert_eq!(m.epochs_rolled_back.get(), 1);
    }

    /// Source and sink metrics can share one caller-supplied registry, and
    /// their families show up in its gathered output.
    #[test]
    fn metrics_register_on_shared_registry() {
        let reg = Registry::new();
        let _s = NatsSourceMetrics::new(Some(&reg));
        let _k = NatsSinkMetrics::new(Some(&reg));
        let names: Vec<String> = reg.gather().iter().map(|f| f.name().to_string()).collect();
        for expected in [
            "nats_source_records_total",
            "nats_sink_records_total",
            "nats_sink_dedup_total",
        ] {
            assert!(
                names.iter().any(|n| n == expected),
                "metric family {expected} missing from shared registry"
            );
        }
    }
}