Skip to main content

laminar_connectors/websocket/
metrics.rs

//! WebSocket source connector metrics.
//!
//! [`WebSocketSourceMetrics`] provides lock-free atomic counters for
//! tracking WebSocket source statistics. A snapshot of the counters can be
//! converted into the SDK's [`ConnectorMetrics`] type.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
/// Atomic counters describing the activity of a WebSocket source connector.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference. The methods on this type use `Relaxed` ordering for all
/// updates and reads.
/// [`to_connector_metrics`](Self::to_connector_metrics) loads each counter
/// separately, so a snapshot is an approximate view intended for monitoring
/// rather than one atomic view across all fields.
#[derive(Debug)]
pub struct WebSocketSourceMetrics {
    /// Number of messages received from the WebSocket connection.
    pub messages_received: AtomicU64,
    /// Number of messages dropped because of backpressure.
    pub messages_dropped_backpressure: AtomicU64,
    /// Number of bytes received (raw payload size, before parsing).
    pub bytes_received: AtomicU64,
    /// Number of reconnection attempts.
    pub reconnect_count: AtomicU64,
    /// Number of parse/deserialization errors.
    pub parse_errors: AtomicU64,
    /// Number of detected sequence gaps (application-level).
    pub sequence_gaps: AtomicU64,
    /// Current number of connected clients (for a server-mode source).
    ///
    /// Unlike the other fields this is a gauge: it is overwritten, not
    /// accumulated.
    pub connected_clients: AtomicU64,
}
33
34impl WebSocketSourceMetrics {
35    /// Creates a new metrics instance with all counters at zero.
36    #[must_use]
37    pub fn new() -> Self {
38        Self {
39            messages_received: AtomicU64::new(0),
40            messages_dropped_backpressure: AtomicU64::new(0),
41            bytes_received: AtomicU64::new(0),
42            reconnect_count: AtomicU64::new(0),
43            parse_errors: AtomicU64::new(0),
44            sequence_gaps: AtomicU64::new(0),
45            connected_clients: AtomicU64::new(0),
46        }
47    }
48
49    /// Records a successfully received message with the given payload size.
50    pub fn record_message(&self, bytes: u64) {
51        self.messages_received.fetch_add(1, Ordering::Relaxed);
52        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
53    }
54
55    /// Records a message dropped due to backpressure.
56    pub fn record_drop(&self) {
57        self.messages_dropped_backpressure
58            .fetch_add(1, Ordering::Relaxed);
59    }
60
61    /// Records a reconnection attempt.
62    pub fn record_reconnect(&self) {
63        self.reconnect_count.fetch_add(1, Ordering::Relaxed);
64    }
65
66    /// Records a parse/deserialization error.
67    pub fn record_parse_error(&self) {
68        self.parse_errors.fetch_add(1, Ordering::Relaxed);
69    }
70
71    /// Records a detected sequence gap.
72    pub fn record_sequence_gap(&self) {
73        self.sequence_gaps.fetch_add(1, Ordering::Relaxed);
74    }
75
76    /// Sets the current number of connected clients (server-mode source).
77    pub fn set_connected_clients(&self, n: u64) {
78        self.connected_clients.store(n, Ordering::Relaxed);
79    }
80
81    /// Converts to the SDK's [`ConnectorMetrics`].
82    #[must_use]
83    #[allow(clippy::cast_precision_loss)]
84    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
85        let mut m = ConnectorMetrics {
86            records_total: self.messages_received.load(Ordering::Relaxed),
87            bytes_total: self.bytes_received.load(Ordering::Relaxed),
88            errors_total: self.parse_errors.load(Ordering::Relaxed),
89            lag: 0,
90            custom: Vec::new(),
91        };
92        m.add_custom(
93            "ws.messages_dropped_backpressure",
94            self.messages_dropped_backpressure.load(Ordering::Relaxed) as f64,
95        );
96        m.add_custom(
97            "ws.reconnect_count",
98            self.reconnect_count.load(Ordering::Relaxed) as f64,
99        );
100        m.add_custom(
101            "ws.sequence_gaps",
102            self.sequence_gaps.load(Ordering::Relaxed) as f64,
103        );
104        m.add_custom(
105            "ws.connected_clients",
106            self.connected_clients.load(Ordering::Relaxed) as f64,
107        );
108        m
109    }
110}
111
112impl Default for WebSocketSourceMetrics {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Names of the custom metrics emitted by `to_connector_metrics`.
    const CUSTOM_NAMES: [&str; 4] = [
        "ws.messages_dropped_backpressure",
        "ws.reconnect_count",
        "ws.sequence_gaps",
        "ws.connected_clients",
    ];

    /// Looks up a custom metric by name in a snapshot.
    ///
    /// Returns `None` when no custom metric with that name is present.
    fn custom_value(cm: &ConnectorMetrics, name: &str) -> Option<f64> {
        cm.custom
            .iter()
            .find(|(k, _)| k == name)
            .map(|(_, v)| *v)
    }

    #[test]
    fn test_initial_zeros() {
        let m = WebSocketSourceMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        // Custom metrics must be present and start at zero as well.
        for name in CUSTOM_NAMES.iter() {
            assert_eq!(custom_value(&cm, name), Some(0.0), "{}", name);
        }
    }

    #[test]
    fn test_record_message() {
        let m = WebSocketSourceMetrics::new();
        m.record_message(1024);
        m.record_message(2048);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 2);
        assert_eq!(cm.bytes_total, 3072);
    }

    #[test]
    fn test_record_drop() {
        let m = WebSocketSourceMetrics::new();
        m.record_drop();
        m.record_drop();
        m.record_drop();

        let cm = m.to_connector_metrics();
        assert_eq!(
            custom_value(&cm, "ws.messages_dropped_backpressure"),
            Some(3.0)
        );
    }

    #[test]
    fn test_record_reconnect() {
        let m = WebSocketSourceMetrics::new();
        m.record_reconnect();
        m.record_reconnect();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.reconnect_count"), Some(2.0));
    }

    #[test]
    fn test_record_parse_error() {
        let m = WebSocketSourceMetrics::new();
        m.record_parse_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 1);
    }

    #[test]
    fn test_record_sequence_gap() {
        let m = WebSocketSourceMetrics::new();
        m.record_sequence_gap();
        m.record_sequence_gap();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.sequence_gaps"), Some(2.0));
    }

    #[test]
    fn test_set_connected_clients() {
        let m = WebSocketSourceMetrics::new();
        m.set_connected_clients(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "ws.connected_clients"), Some(5.0));

        // Verify it overwrites (not accumulates)
        m.set_connected_clients(3);
        let cm2 = m.to_connector_metrics();
        assert_eq!(custom_value(&cm2, "ws.connected_clients"), Some(3.0));
    }

    #[test]
    fn test_default() {
        let m = WebSocketSourceMetrics::default();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        assert_eq!(cm.custom.len(), 4);
    }

    #[test]
    fn test_custom_metrics_count() {
        let m = WebSocketSourceMetrics::new();
        let cm = m.to_connector_metrics();
        // Should have exactly 4 custom metrics
        assert_eq!(cm.custom.len(), 4);
    }

    #[test]
    fn test_combined_operations() {
        let m = WebSocketSourceMetrics::new();
        m.record_message(100);
        m.record_message(200);
        m.record_drop();
        m.record_reconnect();
        m.record_parse_error();
        m.record_sequence_gap();
        m.set_connected_clients(10);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 2);
        assert_eq!(cm.bytes_total, 300);
        assert_eq!(cm.errors_total, 1);
        // Every recorded counter must show up in the same snapshot.
        assert_eq!(
            custom_value(&cm, "ws.messages_dropped_backpressure"),
            Some(1.0)
        );
        assert_eq!(custom_value(&cm, "ws.reconnect_count"), Some(1.0));
        assert_eq!(custom_value(&cm, "ws.sequence_gaps"), Some(1.0));
        assert_eq!(custom_value(&cm, "ws.connected_clients"), Some(10.0));
    }
}