Skip to main content

laminar_connectors/websocket/
sink_metrics.rs

1//! WebSocket sink connector metrics.
2//!
3//! [`WebSocketSinkMetrics`] provides lock-free atomic counters for
4//! tracking WebSocket sink statistics, convertible to the SDK's
5//! [`ConnectorMetrics`] type.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
11/// Atomic counters for WebSocket sink connector statistics.
12///
13/// All counters use `Relaxed` ordering for maximum throughput on the
14/// hot path. Snapshot reads via [`to_connector_metrics`](Self::to_connector_metrics)
15/// provide a consistent-enough view for monitoring purposes.
16#[derive(Debug)]
17pub struct WebSocketSinkMetrics {
18    /// Total messages sent to connected clients.
19    pub messages_sent: AtomicU64,
20    /// Total messages dropped because a client was too slow.
21    pub messages_dropped_slow_client: AtomicU64,
22    /// Total bytes sent (serialized payload).
23    pub bytes_sent: AtomicU64,
24    /// Current number of connected clients.
25    pub connected_clients: AtomicU64,
26    /// Total client disconnection events.
27    pub client_disconnects: AtomicU64,
28    /// Total replay requests received from clients.
29    pub replay_requests: AtomicU64,
30    /// Total clients disconnected due to ping timeout.
31    pub ping_timeouts: AtomicU64,
32}
33
34impl WebSocketSinkMetrics {
35    /// Creates a new metrics instance with all counters at zero.
36    #[must_use]
37    pub fn new() -> Self {
38        Self {
39            messages_sent: AtomicU64::new(0),
40            messages_dropped_slow_client: AtomicU64::new(0),
41            bytes_sent: AtomicU64::new(0),
42            connected_clients: AtomicU64::new(0),
43            client_disconnects: AtomicU64::new(0),
44            replay_requests: AtomicU64::new(0),
45            ping_timeouts: AtomicU64::new(0),
46        }
47    }
48
49    /// Records a successfully sent message with the given payload size.
50    pub fn record_send(&self, bytes: u64) {
51        self.messages_sent.fetch_add(1, Ordering::Relaxed);
52        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
53    }
54
55    /// Records a message dropped due to a slow client.
56    pub fn record_drop(&self) {
57        self.messages_dropped_slow_client
58            .fetch_add(1, Ordering::Relaxed);
59    }
60
61    /// Records a new client connection.
62    pub fn record_connect(&self) {
63        self.connected_clients.fetch_add(1, Ordering::Relaxed);
64    }
65
66    /// Records a client disconnection.
67    pub fn record_disconnect(&self) {
68        self.client_disconnects.fetch_add(1, Ordering::Relaxed);
69        // Saturating subtract to avoid underflow on spurious disconnect events.
70        self.connected_clients
71            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
72                Some(v.saturating_sub(1))
73            })
74            .ok();
75    }
76
77    /// Records a replay request from a client.
78    pub fn record_replay(&self) {
79        self.replay_requests.fetch_add(1, Ordering::Relaxed);
80    }
81
82    /// Records a client disconnected due to ping timeout.
83    pub fn record_ping_timeout(&self) {
84        self.ping_timeouts.fetch_add(1, Ordering::Relaxed);
85    }
86
87    /// Converts to the SDK's [`ConnectorMetrics`].
88    #[must_use]
89    #[allow(clippy::cast_precision_loss)]
90    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
91        let mut m = ConnectorMetrics {
92            records_total: self.messages_sent.load(Ordering::Relaxed),
93            bytes_total: self.bytes_sent.load(Ordering::Relaxed),
94            errors_total: self.messages_dropped_slow_client.load(Ordering::Relaxed),
95            lag: 0,
96            custom: Vec::new(),
97        };
98        m.add_custom(
99            "ws.connected_clients",
100            self.connected_clients.load(Ordering::Relaxed) as f64,
101        );
102        m.add_custom(
103            "ws.client_disconnects",
104            self.client_disconnects.load(Ordering::Relaxed) as f64,
105        );
106        m.add_custom(
107            "ws.replay_requests",
108            self.replay_requests.load(Ordering::Relaxed) as f64,
109        );
110        m.add_custom(
111            "ws.messages_dropped_slow_client",
112            self.messages_dropped_slow_client.load(Ordering::Relaxed) as f64,
113        );
114        m.add_custom(
115            "ws.ping_timeouts",
116            self.ping_timeouts.load(Ordering::Relaxed) as f64,
117        );
118        m
119    }
120}
121
122impl Default for WebSocketSinkMetrics {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131
132    #[test]
133    fn test_initial_zeros() {
134        let m = WebSocketSinkMetrics::new();
135        let cm = m.to_connector_metrics();
136        assert_eq!(cm.records_total, 0);
137        assert_eq!(cm.bytes_total, 0);
138        assert_eq!(cm.errors_total, 0);
139    }
140
141    #[test]
142    fn test_record_send() {
143        let m = WebSocketSinkMetrics::new();
144        m.record_send(512);
145        m.record_send(1024);
146
147        let cm = m.to_connector_metrics();
148        assert_eq!(cm.records_total, 2);
149        assert_eq!(cm.bytes_total, 1536);
150    }
151
152    #[test]
153    fn test_record_drop() {
154        let m = WebSocketSinkMetrics::new();
155        m.record_drop();
156        m.record_drop();
157
158        let cm = m.to_connector_metrics();
159        assert_eq!(cm.errors_total, 2);
160
161        let dropped = cm
162            .custom
163            .iter()
164            .find(|(k, _)| k == "ws.messages_dropped_slow_client");
165        assert_eq!(dropped.unwrap().1, 2.0);
166    }
167
168    #[test]
169    fn test_record_connect_disconnect() {
170        let m = WebSocketSinkMetrics::new();
171        m.record_connect();
172        m.record_connect();
173        m.record_connect();
174        m.record_disconnect();
175
176        let cm = m.to_connector_metrics();
177        let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
178        assert_eq!(clients.unwrap().1, 2.0);
179
180        let disconnects = cm.custom.iter().find(|(k, _)| k == "ws.client_disconnects");
181        assert_eq!(disconnects.unwrap().1, 1.0);
182    }
183
184    #[test]
185    fn test_disconnect_saturates_at_zero() {
186        let m = WebSocketSinkMetrics::new();
187        // Disconnect without any connect should not underflow
188        m.record_disconnect();
189
190        let cm = m.to_connector_metrics();
191        let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
192        assert_eq!(clients.unwrap().1, 0.0);
193
194        let disconnects = cm.custom.iter().find(|(k, _)| k == "ws.client_disconnects");
195        assert_eq!(disconnects.unwrap().1, 1.0);
196    }
197
198    #[test]
199    fn test_record_replay() {
200        let m = WebSocketSinkMetrics::new();
201        m.record_replay();
202        m.record_replay();
203        m.record_replay();
204
205        let cm = m.to_connector_metrics();
206        let replays = cm.custom.iter().find(|(k, _)| k == "ws.replay_requests");
207        assert_eq!(replays.unwrap().1, 3.0);
208    }
209
210    #[test]
211    fn test_default() {
212        let m = WebSocketSinkMetrics::default();
213        let cm = m.to_connector_metrics();
214        assert_eq!(cm.records_total, 0);
215        assert_eq!(cm.bytes_total, 0);
216        assert_eq!(cm.errors_total, 0);
217        assert_eq!(cm.custom.len(), 5);
218    }
219
220    #[test]
221    fn test_custom_metrics_count() {
222        let m = WebSocketSinkMetrics::new();
223        let cm = m.to_connector_metrics();
224        // Should have exactly 4 custom metrics
225        assert_eq!(cm.custom.len(), 5);
226    }
227
228    #[test]
229    fn test_combined_operations() {
230        let m = WebSocketSinkMetrics::new();
231        m.record_send(100);
232        m.record_send(200);
233        m.record_send(300);
234        m.record_drop();
235        m.record_connect();
236        m.record_connect();
237        m.record_disconnect();
238        m.record_replay();
239
240        let cm = m.to_connector_metrics();
241        assert_eq!(cm.records_total, 3);
242        assert_eq!(cm.bytes_total, 600);
243        assert_eq!(cm.errors_total, 1);
244
245        let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
246        assert_eq!(clients.unwrap().1, 1.0);
247    }
248}