laminar_connectors/websocket/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
8
9use crate::metrics::ConnectorMetrics;
10
11#[derive(Debug, Clone)]
13pub struct WebSocketSourceMetrics {
14 pub messages_received: IntCounter,
16 pub messages_dropped_backpressure: IntCounter,
18 pub bytes_received: IntCounter,
20 pub reconnect_count: IntCounter,
22 pub parse_errors: IntCounter,
24 pub sequence_gaps: IntCounter,
26 pub connected_clients: IntGauge,
28}
29
30impl WebSocketSourceMetrics {
31 #[must_use]
33 #[allow(clippy::missing_panics_doc)]
34 pub fn new(registry: Option<&Registry>) -> Self {
35 let local;
36 let reg = if let Some(r) = registry {
37 r
38 } else {
39 local = Registry::new();
40 &local
41 };
42
43 let messages_received = IntCounter::new(
44 "ws_source_messages_received_total",
45 "Total WS messages received",
46 )
47 .unwrap();
48 let messages_dropped_backpressure = IntCounter::new(
49 "ws_source_messages_dropped_backpressure_total",
50 "Total WS messages dropped (backpressure)",
51 )
52 .unwrap();
53 let bytes_received =
54 IntCounter::new("ws_source_bytes_received_total", "Total WS bytes received").unwrap();
55 let reconnect_count = IntCounter::new(
56 "ws_source_reconnect_total",
57 "Total WS reconnection attempts",
58 )
59 .unwrap();
60 let parse_errors = IntCounter::new(
61 "ws_source_parse_errors_total",
62 "Total WS parse/deserialization errors",
63 )
64 .unwrap();
65 let sequence_gaps = IntCounter::new(
66 "ws_source_sequence_gaps_total",
67 "Total WS sequence gaps detected",
68 )
69 .unwrap();
70 let connected_clients = IntGauge::new(
71 "ws_source_connected_clients",
72 "Current connected WS clients (server mode)",
73 )
74 .unwrap();
75
76 let _ = reg.register(Box::new(messages_received.clone()));
77 let _ = reg.register(Box::new(messages_dropped_backpressure.clone()));
78 let _ = reg.register(Box::new(bytes_received.clone()));
79 let _ = reg.register(Box::new(reconnect_count.clone()));
80 let _ = reg.register(Box::new(parse_errors.clone()));
81 let _ = reg.register(Box::new(sequence_gaps.clone()));
82 let _ = reg.register(Box::new(connected_clients.clone()));
83
84 Self {
85 messages_received,
86 messages_dropped_backpressure,
87 bytes_received,
88 reconnect_count,
89 parse_errors,
90 sequence_gaps,
91 connected_clients,
92 }
93 }
94
95 pub fn record_message(&self, bytes: u64) {
97 self.messages_received.inc();
98 self.bytes_received.inc_by(bytes);
99 }
100
101 pub fn record_drop(&self) {
103 self.messages_dropped_backpressure.inc();
104 }
105
106 pub fn record_reconnect(&self) {
108 self.reconnect_count.inc();
109 }
110
111 pub fn record_parse_error(&self) {
113 self.parse_errors.inc();
114 }
115
116 pub fn record_sequence_gap(&self) {
118 self.sequence_gaps.inc();
119 }
120
121 #[allow(clippy::cast_possible_wrap)]
123 pub fn set_connected_clients(&self, n: u64) {
124 self.connected_clients.set(n as i64);
125 }
126
127 #[must_use]
129 #[allow(clippy::cast_precision_loss)]
130 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
131 let mut m = ConnectorMetrics {
132 records_total: self.messages_received.get(),
133 bytes_total: self.bytes_received.get(),
134 errors_total: self.parse_errors.get(),
135 lag: 0,
136 custom: Vec::new(),
137 };
138 m.add_custom(
139 "ws.messages_dropped_backpressure",
140 self.messages_dropped_backpressure.get() as f64,
141 );
142 m.add_custom("ws.reconnect_count", self.reconnect_count.get() as f64);
143 m.add_custom("ws.sequence_gaps", self.sequence_gaps.get() as f64);
144 m.add_custom("ws.connected_clients", self.connected_clients.get() as f64);
145 m
146 }
147}
148
149impl Default for WebSocketSourceMetrics {
150 fn default() -> Self {
151 Self::new(None)
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158
159 #[test]
160 fn test_initial_zeros() {
161 let m = WebSocketSourceMetrics::new(None);
162 let cm = m.to_connector_metrics();
163 assert_eq!(cm.records_total, 0);
164 assert_eq!(cm.bytes_total, 0);
165 assert_eq!(cm.errors_total, 0);
166 }
167
168 #[test]
169 fn test_record_message() {
170 let m = WebSocketSourceMetrics::new(None);
171 m.record_message(1024);
172 m.record_message(2048);
173
174 let cm = m.to_connector_metrics();
175 assert_eq!(cm.records_total, 2);
176 assert_eq!(cm.bytes_total, 3072);
177 }
178
179 #[test]
180 fn test_record_drop() {
181 let m = WebSocketSourceMetrics::new(None);
182 m.record_drop();
183 m.record_drop();
184 m.record_drop();
185
186 let cm = m.to_connector_metrics();
187 let dropped = cm
188 .custom
189 .iter()
190 .find(|(k, _)| k == "ws.messages_dropped_backpressure");
191 assert_eq!(dropped.unwrap().1, 3.0);
192 }
193
194 #[test]
195 fn test_record_reconnect() {
196 let m = WebSocketSourceMetrics::new(None);
197 m.record_reconnect();
198 m.record_reconnect();
199
200 let cm = m.to_connector_metrics();
201 let reconnects = cm.custom.iter().find(|(k, _)| k == "ws.reconnect_count");
202 assert_eq!(reconnects.unwrap().1, 2.0);
203 }
204
205 #[test]
206 fn test_record_parse_error() {
207 let m = WebSocketSourceMetrics::new(None);
208 m.record_parse_error();
209
210 let cm = m.to_connector_metrics();
211 assert_eq!(cm.errors_total, 1);
212 }
213
214 #[test]
215 fn test_record_sequence_gap() {
216 let m = WebSocketSourceMetrics::new(None);
217 m.record_sequence_gap();
218 m.record_sequence_gap();
219
220 let cm = m.to_connector_metrics();
221 let gaps = cm.custom.iter().find(|(k, _)| k == "ws.sequence_gaps");
222 assert_eq!(gaps.unwrap().1, 2.0);
223 }
224
225 #[test]
226 fn test_set_connected_clients() {
227 let m = WebSocketSourceMetrics::new(None);
228 m.set_connected_clients(5);
229
230 let cm = m.to_connector_metrics();
231 let clients = cm.custom.iter().find(|(k, _)| k == "ws.connected_clients");
232 assert_eq!(clients.unwrap().1, 5.0);
233
234 m.set_connected_clients(3);
236 let cm2 = m.to_connector_metrics();
237 let clients2 = cm2.custom.iter().find(|(k, _)| k == "ws.connected_clients");
238 assert_eq!(clients2.unwrap().1, 3.0);
239 }
240
241 #[test]
242 fn test_default() {
243 let m = WebSocketSourceMetrics::default();
244 let cm = m.to_connector_metrics();
245 assert_eq!(cm.records_total, 0);
246 assert_eq!(cm.bytes_total, 0);
247 assert_eq!(cm.errors_total, 0);
248 assert_eq!(cm.custom.len(), 4);
249 }
250
251 #[test]
252 fn test_custom_metrics_count() {
253 let m = WebSocketSourceMetrics::new(None);
254 let cm = m.to_connector_metrics();
255 assert_eq!(cm.custom.len(), 4);
257 }
258
259 #[test]
260 fn test_combined_operations() {
261 let m = WebSocketSourceMetrics::new(None);
262 m.record_message(100);
263 m.record_message(200);
264 m.record_drop();
265 m.record_reconnect();
266 m.record_parse_error();
267 m.record_sequence_gap();
268 m.set_connected_clients(10);
269
270 let cm = m.to_connector_metrics();
271 assert_eq!(cm.records_total, 2);
272 assert_eq!(cm.bytes_total, 300);
273 assert_eq!(cm.errors_total, 1);
274 }
275}