Skip to main content

laminar_connectors/nats/
metrics.rs

//! NATS connector metrics.
//!
//! Metrics carry no per-subject labels: subjects are wildcard-addressable,
//! so their cardinality is unbounded.
3
4use prometheus::core::Collector;
5use prometheus::{Error as PromError, IntCounter, IntGauge, Registry};
6use tracing::warn;
7
8use crate::metrics::ConnectorMetrics;
9
10fn register_collector<C: Collector + Clone + 'static>(reg: &Registry, name: &str, c: &C) {
11    match reg.register(Box::new(c.clone())) {
12        Ok(()) => {}
13        // Multiple connectors on a shared registry can collide; the
14        // second registration silently drops so its counts won't scrape.
15        Err(PromError::AlreadyReg) => {
16            warn!(
17                metric = name,
18                "metric already registered; use separate registries per connector"
19            );
20        }
21        Err(e) => warn!(metric = name, error = ?e, "failed to register metric"),
22    }
23}
24
25/// Prometheus counters for the NATS source.
26#[derive(Debug, Clone)]
27#[allow(missing_docs)]
28pub struct NatsSourceMetrics {
29    pub records_total: IntCounter,
30    pub bytes_total: IntCounter,
31    pub fetch_errors_total: IntCounter,
32    pub acks_total: IntCounter,
33    pub ack_errors_total: IntCounter,
34    pub pending_acks: IntGauge,
35    /// Stream messages not yet delivered to the consumer.
36    pub consumer_lag: IntGauge,
37}
38
39impl NatsSourceMetrics {
40    /// Registers on `registry` if provided; otherwise on a local one.
41    #[must_use]
42    #[allow(clippy::missing_panics_doc)]
43    pub fn new(registry: Option<&Registry>) -> Self {
44        let local;
45        let reg = if let Some(r) = registry {
46            r
47        } else {
48            local = Registry::new();
49            &local
50        };
51
52        macro_rules! reg_c {
53            ($name:expr, $help:expr) => {{
54                let c = IntCounter::new($name, $help).unwrap();
55                register_collector(reg, $name, &c);
56                c
57            }};
58        }
59        let pending_acks =
60            IntGauge::new("nats_source_pending_acks", "Unacked JetStream messages").unwrap();
61        register_collector(reg, "nats_source_pending_acks", &pending_acks);
62        let consumer_lag = IntGauge::new(
63            "nats_source_consumer_lag",
64            "Stream messages not yet delivered to the consumer",
65        )
66        .unwrap();
67        register_collector(reg, "nats_source_consumer_lag", &consumer_lag);
68
69        Self {
70            records_total: reg_c!("nats_source_records_total", "Records delivered"),
71            bytes_total: reg_c!("nats_source_bytes_total", "Payload bytes delivered"),
72            fetch_errors_total: reg_c!("nats_source_fetch_errors_total", "Fetch errors"),
73            acks_total: reg_c!("nats_source_acks_total", "Successful acks"),
74            ack_errors_total: reg_c!("nats_source_ack_errors_total", "Failed acks"),
75            pending_acks,
76            consumer_lag,
77        }
78    }
79
80    #[allow(missing_docs)]
81    pub fn record_poll(&self, records: u64, bytes: u64) {
82        self.records_total.inc_by(records);
83        self.bytes_total.inc_by(bytes);
84    }
85
86    #[allow(missing_docs)]
87    pub fn record_fetch_error(&self) {
88        self.fetch_errors_total.inc();
89    }
90
91    #[allow(missing_docs)]
92    pub fn record_ack(&self) {
93        self.acks_total.inc();
94    }
95
96    #[allow(missing_docs)]
97    pub fn record_ack_error(&self) {
98        self.ack_errors_total.inc();
99    }
100
101    #[allow(missing_docs, clippy::cast_possible_wrap)]
102    pub fn set_pending_acks(&self, n: usize) {
103        self.pending_acks.set(n as i64);
104    }
105
106    #[allow(missing_docs, clippy::cast_possible_wrap)]
107    pub fn set_consumer_lag(&self, n: u64) {
108        self.consumer_lag.set(n as i64);
109    }
110
111    /// `lag` comes from the most recent `consumer.info()` poll.
112    #[must_use]
113    #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
114    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
115        let mut m = ConnectorMetrics {
116            records_total: self.records_total.get(),
117            bytes_total: self.bytes_total.get(),
118            errors_total: self.fetch_errors_total.get() + self.ack_errors_total.get(),
119            lag: self.consumer_lag.get().max(0) as u64,
120            custom: Vec::new(),
121        };
122        m.add_custom("nats.acks", self.acks_total.get() as f64);
123        m.add_custom("nats.ack_errors", self.ack_errors_total.get() as f64);
124        m.add_custom("nats.pending_acks", self.pending_acks.get() as f64);
125        m
126    }
127}
128
129/// Prometheus counters for the NATS sink.
130#[derive(Debug, Clone)]
131#[allow(missing_docs)]
132pub struct NatsSinkMetrics {
133    pub records_total: IntCounter,
134    pub bytes_total: IntCounter,
135    pub publish_errors_total: IntCounter,
136    pub ack_errors_total: IntCounter,
137    /// Publishes the broker dropped as `Nats-Msg-Id` duplicates.
138    pub dedup_total: IntCounter,
139    pub epochs_rolled_back: IntCounter,
140    pub pending_futures: IntGauge,
141}
142
143impl NatsSinkMetrics {
144    /// Registers on `registry` if provided; otherwise on a local one.
145    #[must_use]
146    #[allow(clippy::missing_panics_doc)]
147    pub fn new(registry: Option<&Registry>) -> Self {
148        let local;
149        let reg = if let Some(r) = registry {
150            r
151        } else {
152            local = Registry::new();
153            &local
154        };
155
156        macro_rules! reg_c {
157            ($name:expr, $help:expr) => {{
158                let c = IntCounter::new($name, $help).unwrap();
159                register_collector(reg, $name, &c);
160                c
161            }};
162        }
163        let pending_futures =
164            IntGauge::new("nats_sink_pending_futures", "Outstanding PublishAckFutures").unwrap();
165        register_collector(reg, "nats_sink_pending_futures", &pending_futures);
166
167        Self {
168            records_total: reg_c!("nats_sink_records_total", "Records published"),
169            bytes_total: reg_c!("nats_sink_bytes_total", "Payload bytes published"),
170            publish_errors_total: reg_c!("nats_sink_publish_errors_total", "Publish errors"),
171            ack_errors_total: reg_c!("nats_sink_ack_errors_total", "Publish-ack errors"),
172            dedup_total: reg_c!("nats_sink_dedup_total", "Broker-dropped duplicates"),
173            epochs_rolled_back: reg_c!("nats_sink_epochs_rolled_back_total", "Epochs rolled back"),
174            pending_futures,
175        }
176    }
177
178    #[allow(missing_docs)]
179    pub fn record_published_row(&self, bytes: u64) {
180        self.records_total.inc();
181        self.bytes_total.inc_by(bytes);
182    }
183
184    #[allow(missing_docs)]
185    pub fn record_publish_error(&self) {
186        self.publish_errors_total.inc();
187    }
188
189    #[allow(missing_docs)]
190    pub fn record_ack_error(&self) {
191        self.ack_errors_total.inc();
192    }
193
194    #[allow(missing_docs)]
195    pub fn record_dedup(&self) {
196        self.dedup_total.inc();
197    }
198
199    #[allow(missing_docs)]
200    pub fn record_rollback(&self) {
201        self.epochs_rolled_back.inc();
202    }
203
204    #[allow(missing_docs, clippy::cast_possible_wrap)]
205    pub fn set_pending_futures(&self, n: usize) {
206        self.pending_futures.set(n as i64);
207    }
208
209    #[must_use]
210    #[allow(missing_docs, clippy::cast_precision_loss, clippy::cast_sign_loss)]
211    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
212        let mut m = ConnectorMetrics {
213            records_total: self.records_total.get(),
214            bytes_total: self.bytes_total.get(),
215            errors_total: self.publish_errors_total.get() + self.ack_errors_total.get(),
216            lag: self.pending_futures.get() as u64,
217            custom: Vec::new(),
218        };
219        m.add_custom("nats.dedup", self.dedup_total.get() as f64);
220        m.add_custom(
221            "nats.epochs_rolled_back",
222            self.epochs_rolled_back.get() as f64,
223        );
224        m
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// True if `cm` exports a custom value named `key` equal to `want`.
    fn has_custom(cm: &ConnectorMetrics, key: &str, want: f64) -> bool {
        cm.custom.iter().any(|(k, v)| k == key && *v == want)
    }

    #[test]
    fn source_initial_zero() {
        let cm = NatsSourceMetrics::new(None).to_connector_metrics();
        assert_eq!(
            (cm.records_total, cm.bytes_total, cm.errors_total, cm.lag),
            (0, 0, 0, 0)
        );
    }

    #[test]
    fn source_records_increment() {
        let metrics = NatsSourceMetrics::new(None);
        for (records, bytes) in [(100, 4096), (50, 1024)] {
            metrics.record_poll(records, bytes);
        }
        (0..2).for_each(|_| metrics.record_ack());
        metrics.record_fetch_error();
        metrics.set_pending_acks(7);
        metrics.set_consumer_lag(42);

        let cm = metrics.to_connector_metrics();
        assert_eq!(cm.records_total, 150);
        assert_eq!(cm.bytes_total, 5120);
        // Only the single fetch error contributes so far.
        assert_eq!(cm.errors_total, 1);
        assert_eq!(cm.lag, 42, "lag reflects the latest consumer.info() poll");
        assert!(has_custom(&cm, "nats.acks", 2.0));
        assert!(has_custom(&cm, "nats.pending_acks", 7.0));
    }

    #[test]
    fn sink_records_and_dedup() {
        let metrics = NatsSinkMetrics::new(None);
        (0..10).for_each(|_| metrics.record_published_row(200));
        (0..2).for_each(|_| metrics.record_dedup());
        metrics.record_rollback();
        metrics.set_pending_futures(3);

        let cm = metrics.to_connector_metrics();
        assert_eq!(cm.records_total, 10);
        assert_eq!(cm.bytes_total, 2000);
        assert_eq!(cm.lag, 3);
        assert!(has_custom(&cm, "nats.dedup", 2.0));
        assert!(has_custom(&cm, "nats.epochs_rolled_back", 1.0));
    }

    #[test]
    fn metrics_register_on_shared_registry() {
        let reg = Registry::new();
        let _source = NatsSourceMetrics::new(Some(&reg));
        let _sink = NatsSinkMetrics::new(Some(&reg));

        let names: Vec<String> = reg.gather().iter().map(|f| f.name().to_string()).collect();
        for expected in [
            "nats_source_records_total",
            "nats_sink_records_total",
            "nats_sink_dedup_total",
        ] {
            assert!(names.iter().any(|n| n == expected));
        }
    }
}