laminar_connectors/websocket/
sink_metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
7
8use crate::prom::reg_or_local;
9
10#[derive(Debug, Clone)]
12pub struct WebSocketSinkMetrics {
13 pub messages_sent: IntCounter,
15 pub messages_dropped_slow_client: IntCounter,
17 pub bytes_sent: IntCounter,
19 pub connected_clients: IntGauge,
21 pub client_disconnects: IntCounter,
23 pub replay_requests: IntCounter,
25 pub ping_timeouts: IntCounter,
27}
28
29impl WebSocketSinkMetrics {
30 #[must_use]
32 #[allow(clippy::missing_panics_doc)]
33 pub fn new(registry: Option<&Registry>) -> Self {
34 let mut local = None;
35 let reg = reg_or_local(registry, &mut local);
36
37 Self {
38 messages_sent: reg.counter("ws_sink_messages_sent_total", "Total WS messages sent"),
39 messages_dropped_slow_client: reg.counter(
40 "ws_sink_messages_dropped_slow_client_total",
41 "Total WS messages dropped (slow client)",
42 ),
43 bytes_sent: reg.counter("ws_sink_bytes_sent_total", "Total WS bytes sent"),
44 connected_clients: reg
45 .gauge("ws_sink_connected_clients", "Current connected WS clients"),
46 client_disconnects: reg.counter(
47 "ws_sink_client_disconnects_total",
48 "Total WS client disconnections",
49 ),
50 replay_requests: reg
51 .counter("ws_sink_replay_requests_total", "Total WS replay requests"),
52 ping_timeouts: reg.counter("ws_sink_ping_timeouts_total", "Total WS ping timeouts"),
53 }
54 }
55
56 pub fn record_send(&self, bytes: u64) {
58 self.messages_sent.inc();
59 self.bytes_sent.inc_by(bytes);
60 }
61
62 pub fn record_drop(&self) {
64 self.messages_dropped_slow_client.inc();
65 }
66
67 pub fn record_connect(&self) {
69 self.connected_clients.inc();
70 }
71
72 pub fn record_disconnect(&self) {
74 self.client_disconnects.inc();
75 let current = self.connected_clients.get();
78 if current > 0 {
79 self.connected_clients.dec();
80 }
81 }
82
83 pub fn record_replay(&self) {
85 self.replay_requests.inc();
86 }
87
88 pub fn record_ping_timeout(&self) {
90 self.ping_timeouts.inc();
91 }
92}
93
94impl Default for WebSocketSinkMetrics {
95 fn default() -> Self {
96 Self::new(None)
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every metric in `m` still reads zero.
    fn assert_all_zero(m: &WebSocketSinkMetrics) {
        assert_eq!(m.messages_sent.get(), 0);
        assert_eq!(m.messages_dropped_slow_client.get(), 0);
        assert_eq!(m.bytes_sent.get(), 0);
        assert_eq!(m.connected_clients.get(), 0);
        assert_eq!(m.client_disconnects.get(), 0);
        assert_eq!(m.replay_requests.get(), 0);
        assert_eq!(m.ping_timeouts.get(), 0);
    }

    #[test]
    fn test_initial_zeros() {
        let m = WebSocketSinkMetrics::new(None);
        assert_all_zero(&m);
    }

    #[test]
    fn test_record_send() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(512);
        m.record_send(1024);

        assert_eq!(m.messages_sent.get(), 2);
        assert_eq!(m.bytes_sent.get(), 1536);
    }

    #[test]
    fn test_record_drop() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_drop();
        m.record_drop();

        assert_eq!(m.messages_dropped_slow_client.get(), 2);
    }

    #[test]
    fn test_record_connect_disconnect() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_connect();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();

        assert_eq!(m.connected_clients.get(), 2);
        assert_eq!(m.client_disconnects.get(), 1);
    }

    #[test]
    fn test_disconnect_saturates_at_zero() {
        let m = WebSocketSinkMetrics::new(None);
        // A disconnect without a prior connect must not push the gauge negative.
        m.record_disconnect();

        assert_eq!(m.connected_clients.get(), 0);
        assert_eq!(m.client_disconnects.get(), 1);
    }

    #[test]
    fn test_record_replay() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_replay();
        m.record_replay();
        m.record_replay();

        assert_eq!(m.replay_requests.get(), 3);
    }

    #[test]
    fn test_record_ping_timeout() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_ping_timeout();
        m.record_ping_timeout();

        assert_eq!(m.ping_timeouts.get(), 2);
    }

    #[test]
    fn test_default() {
        let m = WebSocketSinkMetrics::default();
        assert_all_zero(&m);
    }

    #[test]
    fn test_combined_operations() {
        let m = WebSocketSinkMetrics::new(None);
        m.record_send(100);
        m.record_send(200);
        m.record_send(300);
        m.record_drop();
        m.record_connect();
        m.record_connect();
        m.record_disconnect();
        m.record_replay();

        assert_eq!(m.messages_sent.get(), 3);
        assert_eq!(m.bytes_sent.get(), 600);
        assert_eq!(m.messages_dropped_slow_client.get(), 1);
        assert_eq!(m.connected_clients.get(), 1);
        assert_eq!(m.client_disconnects.get(), 1);
        assert_eq!(m.replay_requests.get(), 1);
        // Nothing above records a ping timeout.
        assert_eq!(m.ping_timeouts.get(), 0);
    }
}