laminar_connectors/websocket/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
/// Lock-free counters and gauges describing a WebSocket source.
///
/// Every field is an [`AtomicU64`], so the values can be updated through a
/// shared reference. The `record_*` / `set_*` methods on this type are the
/// intended way to mutate them; `to_connector_metrics` produces a snapshot.
#[derive(Debug)]
pub struct WebSocketSourceMetrics {
    /// Number of messages received (bumped by `record_message`).
    pub messages_received: AtomicU64,

    /// Number of messages dropped, as counted by `record_drop`.
    pub messages_dropped_backpressure: AtomicU64,

    /// Sum of the byte sizes passed to `record_message`.
    pub bytes_received: AtomicU64,

    /// Number of reconnects counted by `record_reconnect`.
    pub reconnect_count: AtomicU64,

    /// Number of parse errors counted by `record_parse_error`.
    pub parse_errors: AtomicU64,

    /// Number of sequence gaps counted by `record_sequence_gap`.
    pub sequence_gaps: AtomicU64,

    /// Most recent value stored by `set_connected_clients` (a gauge, not a counter).
    pub connected_clients: AtomicU64,
}
33
34impl WebSocketSourceMetrics {
35 #[must_use]
37 pub fn new() -> Self {
38 Self {
39 messages_received: AtomicU64::new(0),
40 messages_dropped_backpressure: AtomicU64::new(0),
41 bytes_received: AtomicU64::new(0),
42 reconnect_count: AtomicU64::new(0),
43 parse_errors: AtomicU64::new(0),
44 sequence_gaps: AtomicU64::new(0),
45 connected_clients: AtomicU64::new(0),
46 }
47 }
48
49 pub fn record_message(&self, bytes: u64) {
51 self.messages_received.fetch_add(1, Ordering::Relaxed);
52 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
53 }
54
55 pub fn record_drop(&self) {
57 self.messages_dropped_backpressure
58 .fetch_add(1, Ordering::Relaxed);
59 }
60
61 pub fn record_reconnect(&self) {
63 self.reconnect_count.fetch_add(1, Ordering::Relaxed);
64 }
65
66 pub fn record_parse_error(&self) {
68 self.parse_errors.fetch_add(1, Ordering::Relaxed);
69 }
70
71 pub fn record_sequence_gap(&self) {
73 self.sequence_gaps.fetch_add(1, Ordering::Relaxed);
74 }
75
76 pub fn set_connected_clients(&self, n: u64) {
78 self.connected_clients.store(n, Ordering::Relaxed);
79 }
80
81 #[must_use]
83 #[allow(clippy::cast_precision_loss)]
84 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
85 let mut m = ConnectorMetrics {
86 records_total: self.messages_received.load(Ordering::Relaxed),
87 bytes_total: self.bytes_received.load(Ordering::Relaxed),
88 errors_total: self.parse_errors.load(Ordering::Relaxed),
89 lag: 0,
90 custom: Vec::new(),
91 };
92 m.add_custom(
93 "ws.messages_dropped_backpressure",
94 self.messages_dropped_backpressure.load(Ordering::Relaxed) as f64,
95 );
96 m.add_custom(
97 "ws.reconnect_count",
98 self.reconnect_count.load(Ordering::Relaxed) as f64,
99 );
100 m.add_custom(
101 "ws.sequence_gaps",
102 self.sequence_gaps.load(Ordering::Relaxed) as f64,
103 );
104 m.add_custom(
105 "ws.connected_clients",
106 self.connected_clients.load(Ordering::Relaxed) as f64,
107 );
108 m
109 }
110}
111
112impl Default for WebSocketSourceMetrics {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Names of the custom metrics emitted by `to_connector_metrics`.
    const DROPPED: &str = "ws.messages_dropped_backpressure";
    const RECONNECTS: &str = "ws.reconnect_count";
    const GAPS: &str = "ws.sequence_gaps";
    const CLIENTS: &str = "ws.connected_clients";

    /// Takes a fresh snapshot of `m` and returns the custom metric named `key`.
    ///
    /// Panics with the metric name if the snapshot does not contain it, so a
    /// missing metric is reported clearly instead of as a bare `unwrap` failure.
    fn custom_value(m: &WebSocketSourceMetrics, key: &str) -> f64 {
        m.to_connector_metrics()
            .custom
            .iter()
            .find(|(k, _)| k == key)
            .map(|(_, v)| *v)
            .unwrap_or_else(|| panic!("custom metric `{key}` missing from snapshot"))
    }

    // Note on float comparisons: every expected value below is a small
    // integer, which `f64` represents exactly, so `assert_eq!` is reliable.

    #[test]
    fn test_initial_zeros() {
        let m = WebSocketSourceMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);

        // The custom metrics must start at zero as well.
        for key in [DROPPED, RECONNECTS, GAPS, CLIENTS] {
            assert_eq!(custom_value(&m, key), 0.0);
        }
    }

    #[test]
    fn test_record_message() {
        let m = WebSocketSourceMetrics::new();
        m.record_message(1024);
        m.record_message(2048);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 2);
        assert_eq!(cm.bytes_total, 3072);
    }

    #[test]
    fn test_record_drop() {
        let m = WebSocketSourceMetrics::new();
        m.record_drop();
        m.record_drop();
        m.record_drop();

        assert_eq!(custom_value(&m, DROPPED), 3.0);
    }

    #[test]
    fn test_record_reconnect() {
        let m = WebSocketSourceMetrics::new();
        m.record_reconnect();
        m.record_reconnect();

        assert_eq!(custom_value(&m, RECONNECTS), 2.0);
    }

    #[test]
    fn test_record_parse_error() {
        let m = WebSocketSourceMetrics::new();
        m.record_parse_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 1);
    }

    #[test]
    fn test_record_sequence_gap() {
        let m = WebSocketSourceMetrics::new();
        m.record_sequence_gap();
        m.record_sequence_gap();

        assert_eq!(custom_value(&m, GAPS), 2.0);
    }

    #[test]
    fn test_set_connected_clients() {
        let m = WebSocketSourceMetrics::new();
        m.set_connected_clients(5);
        assert_eq!(custom_value(&m, CLIENTS), 5.0);

        // The gauge is overwritten, not accumulated.
        m.set_connected_clients(3);
        assert_eq!(custom_value(&m, CLIENTS), 3.0);
    }

    #[test]
    fn test_default() {
        let m = WebSocketSourceMetrics::default();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        assert_eq!(cm.custom.len(), 4);
    }

    #[test]
    fn test_custom_metrics_count() {
        let m = WebSocketSourceMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.custom.len(), 4);
    }

    #[test]
    fn test_combined_operations() {
        let m = WebSocketSourceMetrics::new();
        m.record_message(100);
        m.record_message(200);
        m.record_drop();
        m.record_reconnect();
        m.record_parse_error();
        m.record_sequence_gap();
        m.set_connected_clients(10);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 2);
        assert_eq!(cm.bytes_total, 300);
        assert_eq!(cm.errors_total, 1);

        // Every operation performed above must be visible in the snapshot,
        // and none of them may bleed into another metric.
        assert_eq!(custom_value(&m, DROPPED), 1.0);
        assert_eq!(custom_value(&m, RECONNECTS), 1.0);
        assert_eq!(custom_value(&m, GAPS), 1.0);
        assert_eq!(custom_value(&m, CLIENTS), 10.0);
    }
}