Skip to main content

laminar_connectors/websocket/
metrics.rs

1//! `WebSocket` source connector metrics.
2//!
3//! [`WebSocketSourceMetrics`] provides prometheus-backed counters and gauges
4//! for tracking WebSocket source statistics.
5
6use prometheus::{IntCounter, IntGauge, Registry};
7
8use crate::prom::reg_or_local;
9
10/// Prometheus-backed counters/gauges for WebSocket source connector statistics.
11#[derive(Debug, Clone)]
12pub struct WebSocketSourceMetrics {
13    /// Total messages received from the WebSocket connection.
14    pub messages_received: IntCounter,
15    /// Total messages dropped due to backpressure.
16    pub messages_dropped_backpressure: IntCounter,
17    /// Total bytes received (raw payload, before parsing).
18    pub bytes_received: IntCounter,
19    /// Total number of reconnection attempts.
20    pub reconnect_count: IntCounter,
21    /// Total parse/deserialization errors.
22    pub parse_errors: IntCounter,
23    /// Total detected sequence gaps (application-level).
24    pub sequence_gaps: IntCounter,
25    /// Current number of connected clients (for server-mode source).
26    pub connected_clients: IntGauge,
27}
28
29impl WebSocketSourceMetrics {
30    /// Creates a new metrics instance with all counters at zero.
31    #[must_use]
32    #[allow(clippy::missing_panics_doc)]
33    pub fn new(registry: Option<&Registry>) -> Self {
34        let mut local = None;
35        let reg = reg_or_local(registry, &mut local);
36
37        Self {
38            messages_received: reg.counter(
39                "ws_source_messages_received_total",
40                "Total WS messages received",
41            ),
42            messages_dropped_backpressure: reg.counter(
43                "ws_source_messages_dropped_backpressure_total",
44                "Total WS messages dropped (backpressure)",
45            ),
46            bytes_received: reg
47                .counter("ws_source_bytes_received_total", "Total WS bytes received"),
48            reconnect_count: reg.counter(
49                "ws_source_reconnect_total",
50                "Total WS reconnection attempts",
51            ),
52            parse_errors: reg.counter(
53                "ws_source_parse_errors_total",
54                "Total WS parse/deserialization errors",
55            ),
56            sequence_gaps: reg.counter(
57                "ws_source_sequence_gaps_total",
58                "Total WS sequence gaps detected",
59            ),
60            connected_clients: reg.gauge(
61                "ws_source_connected_clients",
62                "Current connected WS clients (server mode)",
63            ),
64        }
65    }
66
67    /// Records a successfully received message with the given payload size.
68    pub fn record_message(&self, bytes: u64) {
69        self.messages_received.inc();
70        self.bytes_received.inc_by(bytes);
71    }
72
73    /// Records a message dropped due to backpressure.
74    pub fn record_drop(&self) {
75        self.messages_dropped_backpressure.inc();
76    }
77
78    /// Records a reconnection attempt.
79    pub fn record_reconnect(&self) {
80        self.reconnect_count.inc();
81    }
82
83    /// Records a parse/deserialization error.
84    pub fn record_parse_error(&self) {
85        self.parse_errors.inc();
86    }
87
88    /// Records a detected sequence gap.
89    pub fn record_sequence_gap(&self) {
90        self.sequence_gaps.inc();
91    }
92
93    /// Sets the current number of connected clients (server-mode source).
94    #[allow(clippy::cast_possible_wrap)]
95    pub fn set_connected_clients(&self, n: u64) {
96        self.connected_clients.set(n as i64);
97    }
98}
99
100impl Default for WebSocketSourceMetrics {
101    fn default() -> Self {
102        Self::new(None)
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109
110    #[test]
111    fn test_initial_zeros() {
112        let m = WebSocketSourceMetrics::new(None);
113        assert_eq!(m.messages_received.get(), 0);
114        assert_eq!(m.bytes_received.get(), 0);
115        assert_eq!(m.parse_errors.get(), 0);
116    }
117
118    #[test]
119    fn test_record_message() {
120        let m = WebSocketSourceMetrics::new(None);
121        m.record_message(1024);
122        m.record_message(2048);
123
124        assert_eq!(m.messages_received.get(), 2);
125        assert_eq!(m.bytes_received.get(), 3072);
126    }
127
128    #[test]
129    fn test_record_drop() {
130        let m = WebSocketSourceMetrics::new(None);
131        m.record_drop();
132        m.record_drop();
133        m.record_drop();
134
135        assert_eq!(m.messages_dropped_backpressure.get(), 3);
136    }
137
138    #[test]
139    fn test_record_reconnect() {
140        let m = WebSocketSourceMetrics::new(None);
141        m.record_reconnect();
142        m.record_reconnect();
143
144        assert_eq!(m.reconnect_count.get(), 2);
145    }
146
147    #[test]
148    fn test_record_parse_error() {
149        let m = WebSocketSourceMetrics::new(None);
150        m.record_parse_error();
151
152        assert_eq!(m.parse_errors.get(), 1);
153    }
154
155    #[test]
156    fn test_record_sequence_gap() {
157        let m = WebSocketSourceMetrics::new(None);
158        m.record_sequence_gap();
159        m.record_sequence_gap();
160
161        assert_eq!(m.sequence_gaps.get(), 2);
162    }
163
164    #[test]
165    fn test_set_connected_clients() {
166        let m = WebSocketSourceMetrics::new(None);
167        m.set_connected_clients(5);
168
169        assert_eq!(m.connected_clients.get(), 5);
170
171        // Verify it overwrites (not accumulates)
172        m.set_connected_clients(3);
173        assert_eq!(m.connected_clients.get(), 3);
174    }
175
176    #[test]
177    fn test_default() {
178        let m = WebSocketSourceMetrics::default();
179        assert_eq!(m.messages_received.get(), 0);
180        assert_eq!(m.bytes_received.get(), 0);
181        assert_eq!(m.parse_errors.get(), 0);
182    }
183
184    #[test]
185    fn test_combined_operations() {
186        let m = WebSocketSourceMetrics::new(None);
187        m.record_message(100);
188        m.record_message(200);
189        m.record_drop();
190        m.record_reconnect();
191        m.record_parse_error();
192        m.record_sequence_gap();
193        m.set_connected_clients(10);
194
195        assert_eq!(m.messages_received.get(), 2);
196        assert_eq!(m.bytes_received.get(), 300);
197        assert_eq!(m.parse_errors.get(), 1);
198    }
199}