laminar_connectors/websocket/
sink_metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
8
9use crate::metrics::ConnectorMetrics;
10
/// Prometheus metric handles for the WebSocket sink connector.
///
/// Every field is a public `prometheus` counter or gauge. The metric names
/// quoted on the fields are the ones assigned in [`WebSocketSinkMetrics::new`].
///
/// NOTE(review): the struct derives `Clone`; `prometheus` handles are
/// presumably shared on clone rather than copied — confirm against the crate docs.
#[derive(Debug, Clone)]
pub struct WebSocketSinkMetrics {
    /// Total WS messages sent (`ws_sink_messages_sent_total`).
    pub messages_sent: IntCounter,
    /// Total WS messages dropped because of a slow client
    /// (`ws_sink_messages_dropped_slow_client_total`).
    pub messages_dropped_slow_client: IntCounter,
    /// Total WS bytes sent (`ws_sink_bytes_sent_total`).
    pub bytes_sent: IntCounter,
    /// Current connected WS clients (`ws_sink_connected_clients`).
    pub connected_clients: IntGauge,
    /// Total WS client disconnections (`ws_sink_client_disconnects_total`).
    pub client_disconnects: IntCounter,
    /// Total WS replay requests (`ws_sink_replay_requests_total`).
    pub replay_requests: IntCounter,
    /// Total WS ping timeouts (`ws_sink_ping_timeouts_total`).
    pub ping_timeouts: IntCounter,
}
29
30impl WebSocketSinkMetrics {
31 #[must_use]
33 #[allow(clippy::missing_panics_doc)]
34 pub fn new(registry: Option<&Registry>) -> Self {
35 let local;
36 let reg = if let Some(r) = registry {
37 r
38 } else {
39 local = Registry::new();
40 &local
41 };
42
43 let messages_sent =
44 IntCounter::new("ws_sink_messages_sent_total", "Total WS messages sent").unwrap();
45 let messages_dropped_slow_client = IntCounter::new(
46 "ws_sink_messages_dropped_slow_client_total",
47 "Total WS messages dropped (slow client)",
48 )
49 .unwrap();
50 let bytes_sent =
51 IntCounter::new("ws_sink_bytes_sent_total", "Total WS bytes sent").unwrap();
52 let connected_clients =
53 IntGauge::new("ws_sink_connected_clients", "Current connected WS clients").unwrap();
54 let client_disconnects = IntCounter::new(
55 "ws_sink_client_disconnects_total",
56 "Total WS client disconnections",
57 )
58 .unwrap();
59 let replay_requests =
60 IntCounter::new("ws_sink_replay_requests_total", "Total WS replay requests").unwrap();
61 let ping_timeouts =
62 IntCounter::new("ws_sink_ping_timeouts_total", "Total WS ping timeouts").unwrap();
63
64 let _ = reg.register(Box::new(messages_sent.clone()));
65 let _ = reg.register(Box::new(messages_dropped_slow_client.clone()));
66 let _ = reg.register(Box::new(bytes_sent.clone()));
67 let _ = reg.register(Box::new(connected_clients.clone()));
68 let _ = reg.register(Box::new(client_disconnects.clone()));
69 let _ = reg.register(Box::new(replay_requests.clone()));
70 let _ = reg.register(Box::new(ping_timeouts.clone()));
71
72 Self {
73 messages_sent,
74 messages_dropped_slow_client,
75 bytes_sent,
76 connected_clients,
77 client_disconnects,
78 replay_requests,
79 ping_timeouts,
80 }
81 }
82
83 pub fn record_send(&self, bytes: u64) {
85 self.messages_sent.inc();
86 self.bytes_sent.inc_by(bytes);
87 }
88
89 pub fn record_drop(&self) {
91 self.messages_dropped_slow_client.inc();
92 }
93
94 pub fn record_connect(&self) {
96 self.connected_clients.inc();
97 }
98
99 pub fn record_disconnect(&self) {
101 self.client_disconnects.inc();
102 let current = self.connected_clients.get();
105 if current > 0 {
106 self.connected_clients.dec();
107 }
108 }
109
110 pub fn record_replay(&self) {
112 self.replay_requests.inc();
113 }
114
115 pub fn record_ping_timeout(&self) {
117 self.ping_timeouts.inc();
118 }
119
120 #[must_use]
122 #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
123 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
124 let mut m = ConnectorMetrics {
125 records_total: self.messages_sent.get(),
126 bytes_total: self.bytes_sent.get(),
127 errors_total: self.messages_dropped_slow_client.get(),
128 lag: 0,
129 custom: Vec::new(),
130 };
131 m.add_custom("ws.connected_clients", self.connected_clients.get() as f64);
132 m.add_custom(
133 "ws.client_disconnects",
134 self.client_disconnects.get() as f64,
135 );
136 m.add_custom("ws.replay_requests", self.replay_requests.get() as f64);
137 m.add_custom(
138 "ws.messages_dropped_slow_client",
139 self.messages_dropped_slow_client.get() as f64,
140 );
141 m.add_custom("ws.ping_timeouts", self.ping_timeouts.get() as f64);
142 m
143 }
144}
145
146impl Default for WebSocketSinkMetrics {
147 fn default() -> Self {
148 Self::new(None)
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value of the custom metric `key`, panicking with the key
    /// name when the snapshot does not contain it.
    fn custom(cm: &ConnectorMetrics, key: &str) -> f64 {
        cm.custom
            .iter()
            .find(|(k, _)| k == key)
            .map(|(_, v)| *v)
            .unwrap_or_else(|| panic!("custom metric `{}` missing", key))
    }

    #[test]
    fn test_initial_zeros() {
        let m = WebSocketSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
    }

    #[test]
    fn test_record_send() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(512);
        m.record_send(1024);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 2);
        assert_eq!(cm.bytes_total, 1536);
    }

    #[test]
    fn test_record_drop() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_drop();
        m.record_drop();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 2);
        assert_eq!(custom(&cm, "ws.messages_dropped_slow_client"), 2.0);
    }

    #[test]
    fn test_record_connect_disconnect() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_connect();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "ws.connected_clients"), 2.0);
        assert_eq!(custom(&cm, "ws.client_disconnects"), 1.0);
    }

    #[test]
    fn test_disconnect_saturates_at_zero() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_disconnect();

        let cm = m.to_connector_metrics();
        // The gauge stays at zero while the disconnect is still counted.
        assert_eq!(custom(&cm, "ws.connected_clients"), 0.0);
        assert_eq!(custom(&cm, "ws.client_disconnects"), 1.0);
    }

    #[test]
    fn test_record_replay() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_replay();
        m.record_replay();
        m.record_replay();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "ws.replay_requests"), 3.0);
    }

    #[test]
    fn test_record_ping_timeout() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_ping_timeout();
        m.record_ping_timeout();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "ws.ping_timeouts"), 2.0);
    }

    #[test]
    fn test_default() {
        let m = WebSocketSinkMetrics::default();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        assert_eq!(cm.custom.len(), 5);
    }

    #[test]
    fn test_custom_metrics_count() {
        let m = WebSocketSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        assert_eq!(cm.custom.len(), 5);
    }

    #[test]
    fn test_registers_on_supplied_registry() {
        let registry = Registry::new();
        let m = WebSocketSinkMetrics::new(Some(&registry));
        m.record_send(42);

        // One metric family per field of `WebSocketSinkMetrics`.
        assert_eq!(registry.gather().len(), 7);
        assert_eq!(m.to_connector_metrics().bytes_total, 42);
    }

    #[test]
    fn test_duplicate_registration_is_tolerated() {
        let registry = Registry::new();
        let first = WebSocketSinkMetrics::new(Some(&registry));
        // Same metric names on the same registry: registration errors are
        // ignored by `new`, so construction must still succeed.
        let second = WebSocketSinkMetrics::new(Some(&registry));
        second.record_send(10);

        assert_eq!(first.to_connector_metrics().records_total, 0);
        assert_eq!(second.to_connector_metrics().records_total, 1);
        assert_eq!(registry.gather().len(), 7);
    }

    #[test]
    fn test_combined_operations() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(100);
        m.record_send(200);
        m.record_send(300);
        m.record_drop();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();
        m.record_replay();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 3);
        assert_eq!(cm.bytes_total, 600);
        assert_eq!(cm.errors_total, 1);
        assert_eq!(custom(&cm, "ws.connected_clients"), 1.0);
    }
}