laminar_connectors/websocket/
sink_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
11#[derive(Debug)]
17pub struct WebSocketSinkMetrics {
18 pub messages_sent: AtomicU64,
20 pub messages_dropped_slow_client: AtomicU64,
22 pub bytes_sent: AtomicU64,
24 pub connected_clients: AtomicU64,
26 pub client_disconnects: AtomicU64,
28 pub replay_requests: AtomicU64,
30 pub ping_timeouts: AtomicU64,
32}
33
34impl WebSocketSinkMetrics {
35 #[must_use]
37 pub fn new() -> Self {
38 Self {
39 messages_sent: AtomicU64::new(0),
40 messages_dropped_slow_client: AtomicU64::new(0),
41 bytes_sent: AtomicU64::new(0),
42 connected_clients: AtomicU64::new(0),
43 client_disconnects: AtomicU64::new(0),
44 replay_requests: AtomicU64::new(0),
45 ping_timeouts: AtomicU64::new(0),
46 }
47 }
48
49 pub fn record_send(&self, bytes: u64) {
51 self.messages_sent.fetch_add(1, Ordering::Relaxed);
52 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
53 }
54
55 pub fn record_drop(&self) {
57 self.messages_dropped_slow_client
58 .fetch_add(1, Ordering::Relaxed);
59 }
60
61 pub fn record_connect(&self) {
63 self.connected_clients.fetch_add(1, Ordering::Relaxed);
64 }
65
66 pub fn record_disconnect(&self) {
68 self.client_disconnects.fetch_add(1, Ordering::Relaxed);
69 self.connected_clients
71 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
72 Some(v.saturating_sub(1))
73 })
74 .ok();
75 }
76
77 pub fn record_replay(&self) {
79 self.replay_requests.fetch_add(1, Ordering::Relaxed);
80 }
81
82 pub fn record_ping_timeout(&self) {
84 self.ping_timeouts.fetch_add(1, Ordering::Relaxed);
85 }
86
87 #[must_use]
89 #[allow(clippy::cast_precision_loss)]
90 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
91 let mut m = ConnectorMetrics {
92 records_total: self.messages_sent.load(Ordering::Relaxed),
93 bytes_total: self.bytes_sent.load(Ordering::Relaxed),
94 errors_total: self.messages_dropped_slow_client.load(Ordering::Relaxed),
95 lag: 0,
96 custom: Vec::new(),
97 };
98 m.add_custom(
99 "ws.connected_clients",
100 self.connected_clients.load(Ordering::Relaxed) as f64,
101 );
102 m.add_custom(
103 "ws.client_disconnects",
104 self.client_disconnects.load(Ordering::Relaxed) as f64,
105 );
106 m.add_custom(
107 "ws.replay_requests",
108 self.replay_requests.load(Ordering::Relaxed) as f64,
109 );
110 m.add_custom(
111 "ws.messages_dropped_slow_client",
112 self.messages_dropped_slow_client.load(Ordering::Relaxed) as f64,
113 );
114 m.add_custom(
115 "ws.ping_timeouts",
116 self.ping_timeouts.load(Ordering::Relaxed) as f64,
117 );
118 m
119 }
120}
121
122impl Default for WebSocketSinkMetrics {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131
132 #[test]
133 fn test_initial_zeros() {
134 let m = WebSocketSinkMetrics::new();
135 let cm = m.to_connector_metrics();
136 assert_eq!(cm.records_total, 0);
137 assert_eq!(cm.bytes_total, 0);
138 assert_eq!(cm.errors_total, 0);
139 }
140
141 #[test]
142 fn test_record_send() {
143 let m = WebSocketSinkMetrics::new();
144 m.record_send(512);
145 m.record_send(1024);
146
147 let cm = m.to_connector_metrics();
148 assert_eq!(cm.records_total, 2);
149 assert_eq!(cm.bytes_total, 1536);
150 }
151
152 #[test]
153 fn test_record_drop() {
154 let m = WebSocketSinkMetrics::new();
155 m.record_drop();
156 m.record_drop();
157
158 let cm = m.to_connector_metrics();
159 assert_eq!(cm.errors_total, 2);
160
161 let dropped = cm
162 .custom
163 .iter()
164 .find(|(k, _)| k == "ws.messages_dropped_slow_client");
165 assert_eq!(dropped.unwrap().1, 2.0);
166 }
167
168 #[test]
169 fn test_record_connect_disconnect() {
170 let m = WebSocketSinkMetrics::new();
171 m.record_connect();
172 m.record_connect();
173 m.record_connect();
174 m.record_disconnect();
175
176 let cm = m.to_connector_metrics();
177 let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
178 assert_eq!(clients.unwrap().1, 2.0);
179
180 let disconnects = cm.custom.iter().find(|(k, _)| k == "ws.client_disconnects");
181 assert_eq!(disconnects.unwrap().1, 1.0);
182 }
183
184 #[test]
185 fn test_disconnect_saturates_at_zero() {
186 let m = WebSocketSinkMetrics::new();
187 m.record_disconnect();
189
190 let cm = m.to_connector_metrics();
191 let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
192 assert_eq!(clients.unwrap().1, 0.0);
193
194 let disconnects = cm.custom.iter().find(|(k, _)| k == "ws.client_disconnects");
195 assert_eq!(disconnects.unwrap().1, 1.0);
196 }
197
198 #[test]
199 fn test_record_replay() {
200 let m = WebSocketSinkMetrics::new();
201 m.record_replay();
202 m.record_replay();
203 m.record_replay();
204
205 let cm = m.to_connector_metrics();
206 let replays = cm.custom.iter().find(|(k, _)| k == "ws.replay_requests");
207 assert_eq!(replays.unwrap().1, 3.0);
208 }
209
210 #[test]
211 fn test_default() {
212 let m = WebSocketSinkMetrics::default();
213 let cm = m.to_connector_metrics();
214 assert_eq!(cm.records_total, 0);
215 assert_eq!(cm.bytes_total, 0);
216 assert_eq!(cm.errors_total, 0);
217 assert_eq!(cm.custom.len(), 5);
218 }
219
220 #[test]
221 fn test_custom_metrics_count() {
222 let m = WebSocketSinkMetrics::new();
223 let cm = m.to_connector_metrics();
224 assert_eq!(cm.custom.len(), 5);
226 }
227
228 #[test]
229 fn test_combined_operations() {
230 let m = WebSocketSinkMetrics::new();
231 m.record_send(100);
232 m.record_send(200);
233 m.record_send(300);
234 m.record_drop();
235 m.record_connect();
236 m.record_connect();
237 m.record_disconnect();
238 m.record_replay();
239
240 let cm = m.to_connector_metrics();
241 assert_eq!(cm.records_total, 3);
242 assert_eq!(cm.bytes_total, 600);
243 assert_eq!(cm.errors_total, 1);
244
245 let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
246 assert_eq!(clients.unwrap().1, 1.0);
247 }
248}