Skip to main content

laminar_connectors/websocket/
metrics.rs

1//! `WebSocket` source connector metrics.
2//!
3//! [`WebSocketSourceMetrics`] provides prometheus-backed counters and gauges
4//! for tracking WebSocket source statistics, convertible to the SDK's
5//! [`ConnectorMetrics`] type.
6
7use prometheus::{IntCounter, IntGauge, Registry};
8
9use crate::metrics::ConnectorMetrics;
10
11/// Prometheus-backed counters/gauges for WebSocket source connector statistics.
12#[derive(Debug, Clone)]
13pub struct WebSocketSourceMetrics {
14    /// Total messages received from the WebSocket connection.
15    pub messages_received: IntCounter,
16    /// Total messages dropped due to backpressure.
17    pub messages_dropped_backpressure: IntCounter,
18    /// Total bytes received (raw payload, before parsing).
19    pub bytes_received: IntCounter,
20    /// Total number of reconnection attempts.
21    pub reconnect_count: IntCounter,
22    /// Total parse/deserialization errors.
23    pub parse_errors: IntCounter,
24    /// Total detected sequence gaps (application-level).
25    pub sequence_gaps: IntCounter,
26    /// Current number of connected clients (for server-mode source).
27    pub connected_clients: IntGauge,
28}
29
30impl WebSocketSourceMetrics {
31    /// Creates a new metrics instance with all counters at zero.
32    #[must_use]
33    #[allow(clippy::missing_panics_doc)]
34    pub fn new(registry: Option<&Registry>) -> Self {
35        let local;
36        let reg = if let Some(r) = registry {
37            r
38        } else {
39            local = Registry::new();
40            &local
41        };
42
43        let messages_received = IntCounter::new(
44            "ws_source_messages_received_total",
45            "Total WS messages received",
46        )
47        .unwrap();
48        let messages_dropped_backpressure = IntCounter::new(
49            "ws_source_messages_dropped_backpressure_total",
50            "Total WS messages dropped (backpressure)",
51        )
52        .unwrap();
53        let bytes_received =
54            IntCounter::new("ws_source_bytes_received_total", "Total WS bytes received").unwrap();
55        let reconnect_count = IntCounter::new(
56            "ws_source_reconnect_total",
57            "Total WS reconnection attempts",
58        )
59        .unwrap();
60        let parse_errors = IntCounter::new(
61            "ws_source_parse_errors_total",
62            "Total WS parse/deserialization errors",
63        )
64        .unwrap();
65        let sequence_gaps = IntCounter::new(
66            "ws_source_sequence_gaps_total",
67            "Total WS sequence gaps detected",
68        )
69        .unwrap();
70        let connected_clients = IntGauge::new(
71            "ws_source_connected_clients",
72            "Current connected WS clients (server mode)",
73        )
74        .unwrap();
75
76        let _ = reg.register(Box::new(messages_received.clone()));
77        let _ = reg.register(Box::new(messages_dropped_backpressure.clone()));
78        let _ = reg.register(Box::new(bytes_received.clone()));
79        let _ = reg.register(Box::new(reconnect_count.clone()));
80        let _ = reg.register(Box::new(parse_errors.clone()));
81        let _ = reg.register(Box::new(sequence_gaps.clone()));
82        let _ = reg.register(Box::new(connected_clients.clone()));
83
84        Self {
85            messages_received,
86            messages_dropped_backpressure,
87            bytes_received,
88            reconnect_count,
89            parse_errors,
90            sequence_gaps,
91            connected_clients,
92        }
93    }
94
95    /// Records a successfully received message with the given payload size.
96    pub fn record_message(&self, bytes: u64) {
97        self.messages_received.inc();
98        self.bytes_received.inc_by(bytes);
99    }
100
101    /// Records a message dropped due to backpressure.
102    pub fn record_drop(&self) {
103        self.messages_dropped_backpressure.inc();
104    }
105
106    /// Records a reconnection attempt.
107    pub fn record_reconnect(&self) {
108        self.reconnect_count.inc();
109    }
110
111    /// Records a parse/deserialization error.
112    pub fn record_parse_error(&self) {
113        self.parse_errors.inc();
114    }
115
116    /// Records a detected sequence gap.
117    pub fn record_sequence_gap(&self) {
118        self.sequence_gaps.inc();
119    }
120
121    /// Sets the current number of connected clients (server-mode source).
122    #[allow(clippy::cast_possible_wrap)]
123    pub fn set_connected_clients(&self, n: u64) {
124        self.connected_clients.set(n as i64);
125    }
126
127    /// Converts to the SDK's [`ConnectorMetrics`].
128    #[must_use]
129    #[allow(clippy::cast_precision_loss)]
130    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
131        let mut m = ConnectorMetrics {
132            records_total: self.messages_received.get(),
133            bytes_total: self.bytes_received.get(),
134            errors_total: self.parse_errors.get(),
135            lag: 0,
136            custom: Vec::new(),
137        };
138        m.add_custom(
139            "ws.messages_dropped_backpressure",
140            self.messages_dropped_backpressure.get() as f64,
141        );
142        m.add_custom("ws.reconnect_count", self.reconnect_count.get() as f64);
143        m.add_custom("ws.sequence_gaps", self.sequence_gaps.get() as f64);
144        m.add_custom("ws.connected_clients", self.connected_clients.get() as f64);
145        m
146    }
147}
148
149impl Default for WebSocketSourceMetrics {
150    fn default() -> Self {
151        Self::new(None)
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Looks up a custom metric by name in a snapshot, panicking if absent.
    fn custom_value(snapshot: &ConnectorMetrics, key: &str) -> f64 {
        let (_, value) = snapshot
            .custom
            .iter()
            .find(|(name, _)| name == key)
            .unwrap();
        *value
    }

    /// A freshly built instance reports zero for all core totals.
    #[test]
    fn test_initial_zeros() {
        let snapshot = WebSocketSourceMetrics::new(None).to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    /// Each recorded message bumps the record count and adds its byte size.
    #[test]
    fn test_record_message() {
        let metrics = WebSocketSourceMetrics::new(None);
        for size in [1024, 2048] {
            metrics.record_message(size);
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 2);
        assert_eq!(snapshot.bytes_total, 3072);
    }

    /// Drops are surfaced through the backpressure custom metric.
    #[test]
    fn test_record_drop() {
        let metrics = WebSocketSourceMetrics::new(None);
        for _ in 0..3 {
            metrics.record_drop();
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(
            custom_value(&snapshot, "ws.messages_dropped_backpressure"),
            3.0
        );
    }

    /// Reconnect attempts are surfaced through the reconnect custom metric.
    #[test]
    fn test_record_reconnect() {
        let metrics = WebSocketSourceMetrics::new(None);
        for _ in 0..2 {
            metrics.record_reconnect();
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom_value(&snapshot, "ws.reconnect_count"), 2.0);
    }

    /// Parse errors feed the SDK-level error total.
    #[test]
    fn test_record_parse_error() {
        let metrics = WebSocketSourceMetrics::new(None);
        metrics.record_parse_error();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.errors_total, 1);
    }

    /// Sequence gaps are surfaced through the sequence-gap custom metric.
    #[test]
    fn test_record_sequence_gap() {
        let metrics = WebSocketSourceMetrics::new(None);
        for _ in 0..2 {
            metrics.record_sequence_gap();
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom_value(&snapshot, "ws.sequence_gaps"), 2.0);
    }

    /// The connected-clients gauge reflects the latest value only.
    #[test]
    fn test_set_connected_clients() {
        let metrics = WebSocketSourceMetrics::new(None);

        metrics.set_connected_clients(5);
        let first = metrics.to_connector_metrics();
        assert_eq!(custom_value(&first, "ws.connected_clients"), 5.0);

        // A second call must overwrite the gauge rather than accumulate.
        metrics.set_connected_clients(3);
        let second = metrics.to_connector_metrics();
        assert_eq!(custom_value(&second, "ws.connected_clients"), 3.0);
    }

    /// `Default` yields a zeroed instance exposing four custom metrics.
    #[test]
    fn test_default() {
        let snapshot = WebSocketSourceMetrics::default().to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
        assert_eq!(snapshot.custom.len(), 4);
    }

    /// The conversion always emits exactly four custom metrics.
    #[test]
    fn test_custom_metrics_count() {
        let snapshot = WebSocketSourceMetrics::new(None).to_connector_metrics();
        assert_eq!(snapshot.custom.len(), 4);
    }

    /// Mixed operations keep the core totals independent of each other.
    #[test]
    fn test_combined_operations() {
        let metrics = WebSocketSourceMetrics::new(None);
        metrics.record_message(100);
        metrics.record_message(200);
        metrics.record_drop();
        metrics.record_reconnect();
        metrics.record_parse_error();
        metrics.record_sequence_gap();
        metrics.set_connected_clients(10);

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 2);
        assert_eq!(snapshot.bytes_total, 300);
        assert_eq!(snapshot.errors_total, 1);
    }
}