laminar_connectors/websocket/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
7
8use crate::prom::reg_or_local;
9
10#[derive(Debug, Clone)]
12pub struct WebSocketSourceMetrics {
13 pub messages_received: IntCounter,
15 pub messages_dropped_backpressure: IntCounter,
17 pub bytes_received: IntCounter,
19 pub reconnect_count: IntCounter,
21 pub parse_errors: IntCounter,
23 pub sequence_gaps: IntCounter,
25 pub connected_clients: IntGauge,
27}
28
29impl WebSocketSourceMetrics {
30 #[must_use]
32 #[allow(clippy::missing_panics_doc)]
33 pub fn new(registry: Option<&Registry>) -> Self {
34 let mut local = None;
35 let reg = reg_or_local(registry, &mut local);
36
37 Self {
38 messages_received: reg.counter(
39 "ws_source_messages_received_total",
40 "Total WS messages received",
41 ),
42 messages_dropped_backpressure: reg.counter(
43 "ws_source_messages_dropped_backpressure_total",
44 "Total WS messages dropped (backpressure)",
45 ),
46 bytes_received: reg
47 .counter("ws_source_bytes_received_total", "Total WS bytes received"),
48 reconnect_count: reg.counter(
49 "ws_source_reconnect_total",
50 "Total WS reconnection attempts",
51 ),
52 parse_errors: reg.counter(
53 "ws_source_parse_errors_total",
54 "Total WS parse/deserialization errors",
55 ),
56 sequence_gaps: reg.counter(
57 "ws_source_sequence_gaps_total",
58 "Total WS sequence gaps detected",
59 ),
60 connected_clients: reg.gauge(
61 "ws_source_connected_clients",
62 "Current connected WS clients (server mode)",
63 ),
64 }
65 }
66
67 pub fn record_message(&self, bytes: u64) {
69 self.messages_received.inc();
70 self.bytes_received.inc_by(bytes);
71 }
72
73 pub fn record_drop(&self) {
75 self.messages_dropped_backpressure.inc();
76 }
77
78 pub fn record_reconnect(&self) {
80 self.reconnect_count.inc();
81 }
82
83 pub fn record_parse_error(&self) {
85 self.parse_errors.inc();
86 }
87
88 pub fn record_sequence_gap(&self) {
90 self.sequence_gaps.inc();
91 }
92
93 #[allow(clippy::cast_possible_wrap)]
95 pub fn set_connected_clients(&self, n: u64) {
96 self.connected_clients.set(n as i64);
97 }
98}
99
100impl Default for WebSocketSourceMetrics {
101 fn default() -> Self {
102 Self::new(None)
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109
110 #[test]
111 fn test_initial_zeros() {
112 let m = WebSocketSourceMetrics::new(None);
113 assert_eq!(m.messages_received.get(), 0);
114 assert_eq!(m.bytes_received.get(), 0);
115 assert_eq!(m.parse_errors.get(), 0);
116 }
117
118 #[test]
119 fn test_record_message() {
120 let m = WebSocketSourceMetrics::new(None);
121 m.record_message(1024);
122 m.record_message(2048);
123
124 assert_eq!(m.messages_received.get(), 2);
125 assert_eq!(m.bytes_received.get(), 3072);
126 }
127
128 #[test]
129 fn test_record_drop() {
130 let m = WebSocketSourceMetrics::new(None);
131 m.record_drop();
132 m.record_drop();
133 m.record_drop();
134
135 assert_eq!(m.messages_dropped_backpressure.get(), 3);
136 }
137
138 #[test]
139 fn test_record_reconnect() {
140 let m = WebSocketSourceMetrics::new(None);
141 m.record_reconnect();
142 m.record_reconnect();
143
144 assert_eq!(m.reconnect_count.get(), 2);
145 }
146
147 #[test]
148 fn test_record_parse_error() {
149 let m = WebSocketSourceMetrics::new(None);
150 m.record_parse_error();
151
152 assert_eq!(m.parse_errors.get(), 1);
153 }
154
155 #[test]
156 fn test_record_sequence_gap() {
157 let m = WebSocketSourceMetrics::new(None);
158 m.record_sequence_gap();
159 m.record_sequence_gap();
160
161 assert_eq!(m.sequence_gaps.get(), 2);
162 }
163
164 #[test]
165 fn test_set_connected_clients() {
166 let m = WebSocketSourceMetrics::new(None);
167 m.set_connected_clients(5);
168
169 assert_eq!(m.connected_clients.get(), 5);
170
171 m.set_connected_clients(3);
173 assert_eq!(m.connected_clients.get(), 3);
174 }
175
176 #[test]
177 fn test_default() {
178 let m = WebSocketSourceMetrics::default();
179 assert_eq!(m.messages_received.get(), 0);
180 assert_eq!(m.bytes_received.get(), 0);
181 assert_eq!(m.parse_errors.get(), 0);
182 }
183
184 #[test]
185 fn test_combined_operations() {
186 let m = WebSocketSourceMetrics::new(None);
187 m.record_message(100);
188 m.record_message(200);
189 m.record_drop();
190 m.record_reconnect();
191 m.record_parse_error();
192 m.record_sequence_gap();
193 m.set_connected_clients(10);
194
195 assert_eq!(m.messages_received.get(), 2);
196 assert_eq!(m.bytes_received.get(), 300);
197 assert_eq!(m.parse_errors.get(), 1);
198 }
199}