Skip to main content

laminar_connectors/websocket/
sink_metrics.rs

//! WebSocket sink connector metrics.
//!
//! [`WebSocketSinkMetrics`] provides Prometheus-backed counters and gauges
//! for tracking WebSocket sink statistics, and can be converted into the
//! SDK's [`ConnectorMetrics`] type.
6
7use prometheus::{IntCounter, IntGauge, Registry};
8
9use crate::metrics::ConnectorMetrics;
10
11/// Prometheus-backed counters/gauges for WebSocket sink connector statistics.
12#[derive(Debug, Clone)]
13pub struct WebSocketSinkMetrics {
14    /// Total messages sent to connected clients.
15    pub messages_sent: IntCounter,
16    /// Total messages dropped because a client was too slow.
17    pub messages_dropped_slow_client: IntCounter,
18    /// Total bytes sent (serialized payload).
19    pub bytes_sent: IntCounter,
20    /// Current number of connected clients.
21    pub connected_clients: IntGauge,
22    /// Total client disconnection events.
23    pub client_disconnects: IntCounter,
24    /// Total replay requests received from clients.
25    pub replay_requests: IntCounter,
26    /// Total clients disconnected due to ping timeout.
27    pub ping_timeouts: IntCounter,
28}
29
30impl WebSocketSinkMetrics {
31    /// Creates a new metrics instance with all counters at zero.
32    #[must_use]
33    #[allow(clippy::missing_panics_doc)]
34    pub fn new(registry: Option<&Registry>) -> Self {
35        let local;
36        let reg = if let Some(r) = registry {
37            r
38        } else {
39            local = Registry::new();
40            &local
41        };
42
43        let messages_sent =
44            IntCounter::new("ws_sink_messages_sent_total", "Total WS messages sent").unwrap();
45        let messages_dropped_slow_client = IntCounter::new(
46            "ws_sink_messages_dropped_slow_client_total",
47            "Total WS messages dropped (slow client)",
48        )
49        .unwrap();
50        let bytes_sent =
51            IntCounter::new("ws_sink_bytes_sent_total", "Total WS bytes sent").unwrap();
52        let connected_clients =
53            IntGauge::new("ws_sink_connected_clients", "Current connected WS clients").unwrap();
54        let client_disconnects = IntCounter::new(
55            "ws_sink_client_disconnects_total",
56            "Total WS client disconnections",
57        )
58        .unwrap();
59        let replay_requests =
60            IntCounter::new("ws_sink_replay_requests_total", "Total WS replay requests").unwrap();
61        let ping_timeouts =
62            IntCounter::new("ws_sink_ping_timeouts_total", "Total WS ping timeouts").unwrap();
63
64        let _ = reg.register(Box::new(messages_sent.clone()));
65        let _ = reg.register(Box::new(messages_dropped_slow_client.clone()));
66        let _ = reg.register(Box::new(bytes_sent.clone()));
67        let _ = reg.register(Box::new(connected_clients.clone()));
68        let _ = reg.register(Box::new(client_disconnects.clone()));
69        let _ = reg.register(Box::new(replay_requests.clone()));
70        let _ = reg.register(Box::new(ping_timeouts.clone()));
71
72        Self {
73            messages_sent,
74            messages_dropped_slow_client,
75            bytes_sent,
76            connected_clients,
77            client_disconnects,
78            replay_requests,
79            ping_timeouts,
80        }
81    }
82
83    /// Records a successfully sent message with the given payload size.
84    pub fn record_send(&self, bytes: u64) {
85        self.messages_sent.inc();
86        self.bytes_sent.inc_by(bytes);
87    }
88
89    /// Records a message dropped due to a slow client.
90    pub fn record_drop(&self) {
91        self.messages_dropped_slow_client.inc();
92    }
93
94    /// Records a new client connection.
95    pub fn record_connect(&self) {
96        self.connected_clients.inc();
97    }
98
99    /// Records a client disconnection.
100    pub fn record_disconnect(&self) {
101        self.client_disconnects.inc();
102        // Saturating subtract to avoid underflow on spurious disconnect events.
103        // IntGauge can go negative so we clamp manually.
104        let current = self.connected_clients.get();
105        if current > 0 {
106            self.connected_clients.dec();
107        }
108    }
109
110    /// Records a replay request from a client.
111    pub fn record_replay(&self) {
112        self.replay_requests.inc();
113    }
114
115    /// Records a client disconnected due to ping timeout.
116    pub fn record_ping_timeout(&self) {
117        self.ping_timeouts.inc();
118    }
119
120    /// Converts to the SDK's [`ConnectorMetrics`].
121    #[must_use]
122    #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
123    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
124        let mut m = ConnectorMetrics {
125            records_total: self.messages_sent.get(),
126            bytes_total: self.bytes_sent.get(),
127            errors_total: self.messages_dropped_slow_client.get(),
128            lag: 0,
129            custom: Vec::new(),
130        };
131        m.add_custom("ws.connected_clients", self.connected_clients.get() as f64);
132        m.add_custom(
133            "ws.client_disconnects",
134            self.client_disconnects.get() as f64,
135        );
136        m.add_custom("ws.replay_requests", self.replay_requests.get() as f64);
137        m.add_custom(
138            "ws.messages_dropped_slow_client",
139            self.messages_dropped_slow_client.get() as f64,
140        );
141        m.add_custom("ws.ping_timeouts", self.ping_timeouts.get() as f64);
142        m
143    }
144}
145
146impl Default for WebSocketSinkMetrics {
147    fn default() -> Self {
148        Self::new(None)
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value of the custom metric called `name`, panicking with a
    /// descriptive message when no such entry exists.
    fn custom_value(cm: &ConnectorMetrics, name: &str) -> f64 {
        cm.custom
            .iter()
            .find(|(k, _)| k == name)
            .map(|(_, v)| *v)
            .unwrap_or_else(|| panic!("custom metric `{name}` is missing"))
    }

    #[test]
    fn test_initial_zeros() {
        let m = WebSocketSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
    }

    #[test]
    fn test_record_send() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(512);
        m.record_send(1024);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 2);
        assert_eq!(cm.bytes_total, 1536);
    }

    #[test]
    fn test_record_drop() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_drop();
        m.record_drop();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 2);
        assert_eq!(custom_value(&cm, "ws.messages_dropped_slow_client"), 2.0);
    }

    #[test]
    fn test_record_connect_disconnect() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_connect();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.connected_clients"), 2.0);
        assert_eq!(custom_value(&cm, "ws.client_disconnects"), 1.0);
    }

    #[test]
    fn test_disconnect_saturates_at_zero() {
        let m = WebSocketSinkMetrics::new(None);
        // A disconnect with no prior connect must leave the gauge at zero
        // while still being counted as a disconnect event.
        m.record_disconnect();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.connected_clients"), 0.0);
        assert_eq!(custom_value(&cm, "ws.client_disconnects"), 1.0);
    }

    #[test]
    fn test_record_replay() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_replay();
        m.record_replay();
        m.record_replay();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.replay_requests"), 3.0);
    }

    #[test]
    fn test_record_ping_timeout() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_ping_timeout();
        m.record_ping_timeout();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.ping_timeouts"), 2.0);
        // A ping timeout is tracked separately from drops and disconnects.
        assert_eq!(cm.errors_total, 0);
        assert_eq!(custom_value(&cm, "ws.client_disconnects"), 0.0);
    }

    #[test]
    fn test_default() {
        let m = WebSocketSinkMetrics::default();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        assert_eq!(cm.custom.len(), 5);
    }

    #[test]
    fn test_custom_metrics_count() {
        let m = WebSocketSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        // Should have exactly 5 custom metrics
        assert_eq!(cm.custom.len(), 5);
    }

    #[test]
    fn test_registers_with_supplied_registry() {
        let registry = Registry::new();
        let first = WebSocketSinkMetrics::new(Some(&registry));
        first.record_send(10);

        // One metric family per collector created in `new`.
        assert_eq!(registry.gather().len(), 7);

        // A second instance on the same registry must not panic even though
        // its registrations cannot succeed; it still counts locally.
        let second = WebSocketSinkMetrics::new(Some(&registry));
        second.record_send(5);
        assert_eq!(second.to_connector_metrics().bytes_total, 5);
        assert_eq!(first.to_connector_metrics().bytes_total, 10);
        assert_eq!(registry.gather().len(), 7);
    }

    #[test]
    fn test_combined_operations() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(100);
        m.record_send(200);
        m.record_send(300);
        m.record_drop();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();
        m.record_replay();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 3);
        assert_eq!(cm.bytes_total, 600);
        assert_eq!(cm.errors_total, 1);
        assert_eq!(custom_value(&cm, "ws.connected_clients"), 1.0);
    }
}