Skip to main content

laminar_connectors/nats/
metrics.rs

//! Prometheus metrics for the NATS source and sink connectors.
//!
//! Metrics carry no per-subject labels: subjects can be addressed with
//! wildcards, so their cardinality is unbounded.
3
4use prometheus::core::Collector;
5use prometheus::{Error as PromError, IntCounter, IntGauge, Registry};
6use tracing::warn;
7
8use crate::prom::reg_or_local;
9
10fn register_collector<C: Collector + Clone + 'static>(reg: &Registry, name: &str, c: &C) {
11    match reg.register(Box::new(c.clone())) {
12        Ok(()) => {}
13        // Multiple connectors on a shared registry can collide; the
14        // second registration silently drops so its counts won't scrape.
15        Err(PromError::AlreadyReg) => {
16            warn!(
17                metric = name,
18                "metric already registered; use separate registries per connector"
19            );
20        }
21        Err(e) => warn!(metric = name, error = ?e, "failed to register metric"),
22    }
23}
24
25/// Prometheus counters for the NATS source.
26#[derive(Debug, Clone)]
27#[allow(missing_docs)]
28pub struct NatsSourceMetrics {
29    pub records_total: IntCounter,
30    pub bytes_total: IntCounter,
31    pub fetch_errors_total: IntCounter,
32    pub acks_total: IntCounter,
33    pub ack_errors_total: IntCounter,
34    pub pending_acks: IntGauge,
35    /// Stream messages not yet delivered to the consumer.
36    pub consumer_lag: IntGauge,
37}
38
39impl NatsSourceMetrics {
40    /// Registers on `registry` if provided; otherwise on a local one.
41    #[must_use]
42    #[allow(clippy::missing_panics_doc)]
43    pub fn new(registry: Option<&Registry>) -> Self {
44        let mut local = None;
45        let handle = reg_or_local(registry, &mut local);
46        let reg = handle.registry();
47
48        macro_rules! reg_c {
49            ($name:expr, $help:expr) => {{
50                let c = IntCounter::new($name, $help).unwrap();
51                register_collector(reg, $name, &c);
52                c
53            }};
54        }
55        let pending_acks =
56            IntGauge::new("nats_source_pending_acks", "Unacked JetStream messages").unwrap();
57        register_collector(reg, "nats_source_pending_acks", &pending_acks);
58        let consumer_lag = IntGauge::new(
59            "nats_source_consumer_lag",
60            "Stream messages not yet delivered to the consumer",
61        )
62        .unwrap();
63        register_collector(reg, "nats_source_consumer_lag", &consumer_lag);
64
65        Self {
66            records_total: reg_c!("nats_source_records_total", "Records delivered"),
67            bytes_total: reg_c!("nats_source_bytes_total", "Payload bytes delivered"),
68            fetch_errors_total: reg_c!("nats_source_fetch_errors_total", "Fetch errors"),
69            acks_total: reg_c!("nats_source_acks_total", "Successful acks"),
70            ack_errors_total: reg_c!("nats_source_ack_errors_total", "Failed acks"),
71            pending_acks,
72            consumer_lag,
73        }
74    }
75
76    #[allow(missing_docs)]
77    pub fn record_poll(&self, records: u64, bytes: u64) {
78        self.records_total.inc_by(records);
79        self.bytes_total.inc_by(bytes);
80    }
81
82    #[allow(missing_docs)]
83    pub fn record_fetch_error(&self) {
84        self.fetch_errors_total.inc();
85    }
86
87    #[allow(missing_docs)]
88    pub fn record_ack(&self) {
89        self.acks_total.inc();
90    }
91
92    #[allow(missing_docs)]
93    pub fn record_ack_error(&self) {
94        self.ack_errors_total.inc();
95    }
96
97    #[allow(missing_docs, clippy::cast_possible_wrap)]
98    pub fn set_pending_acks(&self, n: usize) {
99        self.pending_acks.set(n as i64);
100    }
101
102    #[allow(missing_docs, clippy::cast_possible_wrap)]
103    pub fn set_consumer_lag(&self, n: u64) {
104        self.consumer_lag.set(n as i64);
105    }
106}
107
108/// Prometheus counters for the NATS sink.
109#[derive(Debug, Clone)]
110#[allow(missing_docs)]
111pub struct NatsSinkMetrics {
112    pub records_total: IntCounter,
113    pub bytes_total: IntCounter,
114    pub publish_errors_total: IntCounter,
115    pub ack_errors_total: IntCounter,
116    /// Publishes the broker dropped as `Nats-Msg-Id` duplicates.
117    pub dedup_total: IntCounter,
118    pub epochs_rolled_back: IntCounter,
119    pub pending_futures: IntGauge,
120}
121
122impl NatsSinkMetrics {
123    /// Registers on `registry` if provided; otherwise on a local one.
124    #[must_use]
125    #[allow(clippy::missing_panics_doc)]
126    pub fn new(registry: Option<&Registry>) -> Self {
127        let mut local = None;
128        let handle = reg_or_local(registry, &mut local);
129        let reg = handle.registry();
130
131        macro_rules! reg_c {
132            ($name:expr, $help:expr) => {{
133                let c = IntCounter::new($name, $help).unwrap();
134                register_collector(reg, $name, &c);
135                c
136            }};
137        }
138        let pending_futures =
139            IntGauge::new("nats_sink_pending_futures", "Outstanding PublishAckFutures").unwrap();
140        register_collector(reg, "nats_sink_pending_futures", &pending_futures);
141
142        Self {
143            records_total: reg_c!("nats_sink_records_total", "Records published"),
144            bytes_total: reg_c!("nats_sink_bytes_total", "Payload bytes published"),
145            publish_errors_total: reg_c!("nats_sink_publish_errors_total", "Publish errors"),
146            ack_errors_total: reg_c!("nats_sink_ack_errors_total", "Publish-ack errors"),
147            dedup_total: reg_c!("nats_sink_dedup_total", "Broker-dropped duplicates"),
148            epochs_rolled_back: reg_c!("nats_sink_epochs_rolled_back_total", "Epochs rolled back"),
149            pending_futures,
150        }
151    }
152
153    #[allow(missing_docs)]
154    pub fn record_published_row(&self, bytes: u64) {
155        self.records_total.inc();
156        self.bytes_total.inc_by(bytes);
157    }
158
159    #[allow(missing_docs)]
160    pub fn record_publish_error(&self) {
161        self.publish_errors_total.inc();
162    }
163
164    #[allow(missing_docs)]
165    pub fn record_ack_error(&self) {
166        self.ack_errors_total.inc();
167    }
168
169    #[allow(missing_docs)]
170    pub fn record_dedup(&self) {
171        self.dedup_total.inc();
172    }
173
174    #[allow(missing_docs)]
175    pub fn record_rollback(&self) {
176        self.epochs_rolled_back.inc();
177    }
178
179    #[allow(missing_docs, clippy::cast_possible_wrap)]
180    pub fn set_pending_futures(&self, n: usize) {
181        self.pending_futures.set(n as i64);
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Every metric name `NatsSourceMetrics::new` registers.
    const SOURCE_METRICS: &[&str] = &[
        "nats_source_records_total",
        "nats_source_bytes_total",
        "nats_source_fetch_errors_total",
        "nats_source_acks_total",
        "nats_source_ack_errors_total",
        "nats_source_pending_acks",
        "nats_source_consumer_lag",
    ];

    /// Every metric name `NatsSinkMetrics::new` registers.
    const SINK_METRICS: &[&str] = &[
        "nats_sink_records_total",
        "nats_sink_bytes_total",
        "nats_sink_publish_errors_total",
        "nats_sink_ack_errors_total",
        "nats_sink_dedup_total",
        "nats_sink_epochs_rolled_back_total",
        "nats_sink_pending_futures",
    ];

    /// Names of all metric families currently exported by `reg`.
    fn gathered_names(reg: &Registry) -> Vec<String> {
        reg.gather().iter().map(|f| f.name().to_string()).collect()
    }

    #[test]
    fn source_initial_zero() {
        let m = NatsSourceMetrics::new(None);
        assert_eq!(m.records_total.get(), 0);
        assert_eq!(m.bytes_total.get(), 0);
        assert_eq!(m.fetch_errors_total.get(), 0);
        assert_eq!(m.acks_total.get(), 0);
        assert_eq!(m.ack_errors_total.get(), 0);
        assert_eq!(m.pending_acks.get(), 0);
        assert_eq!(m.consumer_lag.get(), 0);
    }

    #[test]
    fn source_records_increment() {
        let m = NatsSourceMetrics::new(None);
        m.record_poll(100, 4096);
        m.record_poll(50, 1024);
        m.record_ack();
        m.record_ack();
        m.record_ack_error();
        m.record_fetch_error();
        m.set_pending_acks(7);
        m.set_consumer_lag(42);

        assert_eq!(m.records_total.get(), 150);
        assert_eq!(m.bytes_total.get(), 5120);
        assert_eq!(m.fetch_errors_total.get(), 1);
        assert_eq!(m.acks_total.get(), 2);
        assert_eq!(m.ack_errors_total.get(), 1);
        assert_eq!(m.pending_acks.get(), 7);
        assert_eq!(m.consumer_lag.get(), 42);
    }

    #[test]
    fn source_gauges_are_overwritten_not_accumulated() {
        let m = NatsSourceMetrics::new(None);
        m.set_pending_acks(7);
        m.set_pending_acks(2);
        m.set_consumer_lag(42);
        m.set_consumer_lag(0);
        assert_eq!(m.pending_acks.get(), 2);
        assert_eq!(m.consumer_lag.get(), 0);
    }

    #[test]
    fn sink_initial_zero() {
        let m = NatsSinkMetrics::new(None);
        assert_eq!(m.records_total.get(), 0);
        assert_eq!(m.bytes_total.get(), 0);
        assert_eq!(m.publish_errors_total.get(), 0);
        assert_eq!(m.ack_errors_total.get(), 0);
        assert_eq!(m.dedup_total.get(), 0);
        assert_eq!(m.epochs_rolled_back.get(), 0);
        assert_eq!(m.pending_futures.get(), 0);
    }

    #[test]
    fn sink_records_and_dedup() {
        let m = NatsSinkMetrics::new(None);
        for _ in 0..10 {
            m.record_published_row(200);
        }
        m.record_dedup();
        m.record_dedup();
        m.record_rollback();
        m.record_publish_error();
        m.record_ack_error();
        m.set_pending_futures(3);

        assert_eq!(m.records_total.get(), 10);
        assert_eq!(m.bytes_total.get(), 2000);
        assert_eq!(m.pending_futures.get(), 3);
        assert_eq!(m.dedup_total.get(), 2);
        assert_eq!(m.epochs_rolled_back.get(), 1);
        assert_eq!(m.publish_errors_total.get(), 1);
        assert_eq!(m.ack_errors_total.get(), 1);
    }

    #[test]
    fn metrics_register_on_shared_registry() {
        let reg = Registry::new();
        let _s = NatsSourceMetrics::new(Some(&reg));
        let _k = NatsSinkMetrics::new(Some(&reg));
        let names = gathered_names(&reg);
        for expected in SOURCE_METRICS.iter().chain(SINK_METRICS) {
            assert!(
                names.iter().any(|n| n == expected),
                "missing metric {expected}"
            );
        }
    }

    #[test]
    fn duplicate_registration_is_not_fatal() {
        // A second connector on the same registry only logs a warning.
        let reg = Registry::new();
        let _first = NatsSourceMetrics::new(Some(&reg));
        let _second = NatsSourceMetrics::new(Some(&reg));
        let names = gathered_names(&reg);
        for expected in SOURCE_METRICS {
            let count = names.iter().filter(|n| n == &expected).count();
            assert_eq!(count, 1, "metric {expected} should be exported once");
        }
    }
}