Skip to main content

laminar_connectors/websocket/
sink_metrics.rs

1//! WebSocket sink connector metrics.
2//!
3//! [`WebSocketSinkMetrics`] provides prometheus-backed counters and gauges
4//! for tracking WebSocket sink statistics.
5
6use prometheus::{IntCounter, IntGauge, Registry};
7
8use crate::prom::reg_or_local;
9
10/// Prometheus-backed counters/gauges for WebSocket sink connector statistics.
11#[derive(Debug, Clone)]
12pub struct WebSocketSinkMetrics {
13    /// Total messages sent to connected clients.
14    pub messages_sent: IntCounter,
15    /// Total messages dropped because a client was too slow.
16    pub messages_dropped_slow_client: IntCounter,
17    /// Total bytes sent (serialized payload).
18    pub bytes_sent: IntCounter,
19    /// Current number of connected clients.
20    pub connected_clients: IntGauge,
21    /// Total client disconnection events.
22    pub client_disconnects: IntCounter,
23    /// Total replay requests received from clients.
24    pub replay_requests: IntCounter,
25    /// Total clients disconnected due to ping timeout.
26    pub ping_timeouts: IntCounter,
27}
28
29impl WebSocketSinkMetrics {
30    /// Creates a new metrics instance with all counters at zero.
31    #[must_use]
32    #[allow(clippy::missing_panics_doc)]
33    pub fn new(registry: Option<&Registry>) -> Self {
34        let mut local = None;
35        let reg = reg_or_local(registry, &mut local);
36
37        Self {
38            messages_sent: reg.counter("ws_sink_messages_sent_total", "Total WS messages sent"),
39            messages_dropped_slow_client: reg.counter(
40                "ws_sink_messages_dropped_slow_client_total",
41                "Total WS messages dropped (slow client)",
42            ),
43            bytes_sent: reg.counter("ws_sink_bytes_sent_total", "Total WS bytes sent"),
44            connected_clients: reg
45                .gauge("ws_sink_connected_clients", "Current connected WS clients"),
46            client_disconnects: reg.counter(
47                "ws_sink_client_disconnects_total",
48                "Total WS client disconnections",
49            ),
50            replay_requests: reg
51                .counter("ws_sink_replay_requests_total", "Total WS replay requests"),
52            ping_timeouts: reg.counter("ws_sink_ping_timeouts_total", "Total WS ping timeouts"),
53        }
54    }
55
56    /// Records a successfully sent message with the given payload size.
57    pub fn record_send(&self, bytes: u64) {
58        self.messages_sent.inc();
59        self.bytes_sent.inc_by(bytes);
60    }
61
62    /// Records a message dropped due to a slow client.
63    pub fn record_drop(&self) {
64        self.messages_dropped_slow_client.inc();
65    }
66
67    /// Records a new client connection.
68    pub fn record_connect(&self) {
69        self.connected_clients.inc();
70    }
71
72    /// Records a client disconnection.
73    pub fn record_disconnect(&self) {
74        self.client_disconnects.inc();
75        // Saturating subtract to avoid underflow on spurious disconnect events.
76        // IntGauge can go negative so we clamp manually.
77        let current = self.connected_clients.get();
78        if current > 0 {
79            self.connected_clients.dec();
80        }
81    }
82
83    /// Records a replay request from a client.
84    pub fn record_replay(&self) {
85        self.replay_requests.inc();
86    }
87
88    /// Records a client disconnected due to ping timeout.
89    pub fn record_ping_timeout(&self) {
90        self.ping_timeouts.inc();
91    }
92}
93
94impl Default for WebSocketSinkMetrics {
95    fn default() -> Self {
96        Self::new(None)
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every counter and the gauge on `m` read zero.
    fn assert_all_zero(m: &WebSocketSinkMetrics) {
        assert_eq!(m.messages_sent.get(), 0);
        assert_eq!(m.messages_dropped_slow_client.get(), 0);
        assert_eq!(m.bytes_sent.get(), 0);
        assert_eq!(m.connected_clients.get(), 0);
        assert_eq!(m.client_disconnects.get(), 0);
        assert_eq!(m.replay_requests.get(), 0);
        assert_eq!(m.ping_timeouts.get(), 0);
    }

    /// A freshly built instance starts with every metric at zero.
    #[test]
    fn test_initial_zeros() {
        let m = WebSocketSinkMetrics::new(None);
        assert_all_zero(&m);
    }

    /// `record_send` bumps the message count and accumulates payload bytes.
    #[test]
    fn test_record_send() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(512);
        m.record_send(1024);

        assert_eq!(m.messages_sent.get(), 2);
        assert_eq!(m.bytes_sent.get(), 1536);
    }

    /// `record_drop` counts each dropped message.
    #[test]
    fn test_record_drop() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_drop();
        m.record_drop();

        assert_eq!(m.messages_dropped_slow_client.get(), 2);
    }

    /// Connects raise the gauge; a disconnect lowers it and is counted.
    #[test]
    fn test_record_connect_disconnect() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_connect();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();

        assert_eq!(m.connected_clients.get(), 2);
        assert_eq!(m.client_disconnects.get(), 1);
    }

    /// A disconnect with no prior connect is counted but keeps the gauge at zero.
    #[test]
    fn test_disconnect_saturates_at_zero() {
        let m = WebSocketSinkMetrics::new(None);
        // Disconnect without any connect should not underflow
        m.record_disconnect();

        assert_eq!(m.connected_clients.get(), 0);
        assert_eq!(m.client_disconnects.get(), 1);
    }

    /// `record_replay` counts each replay request.
    #[test]
    fn test_record_replay() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_replay();
        m.record_replay();
        m.record_replay();

        assert_eq!(m.replay_requests.get(), 3);
    }

    /// `record_ping_timeout` counts each timeout and touches nothing else.
    #[test]
    fn test_record_ping_timeout() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_ping_timeout();
        m.record_ping_timeout();

        assert_eq!(m.ping_timeouts.get(), 2);
        assert_eq!(m.client_disconnects.get(), 0);
        assert_eq!(m.connected_clients.get(), 0);
    }

    /// `Default` yields the same all-zero state as `new(None)`.
    #[test]
    fn test_default() {
        let m = WebSocketSinkMetrics::default();
        assert_all_zero(&m);
    }

    /// Mixed operations update each metric independently of the others.
    #[test]
    fn test_combined_operations() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(100);
        m.record_send(200);
        m.record_send(300);
        m.record_drop();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();
        m.record_replay();

        assert_eq!(m.messages_sent.get(), 3);
        assert_eq!(m.bytes_sent.get(), 600);
        assert_eq!(m.messages_dropped_slow_client.get(), 1);
        assert_eq!(m.connected_clients.get(), 1);
        assert_eq!(m.client_disconnects.get(), 1);
        assert_eq!(m.replay_requests.get(), 1);
        assert_eq!(m.ping_timeouts.get(), 0);
    }
}