Skip to main content

laminar_connectors/websocket/
connection.rs

1//! WebSocket connection management: reconnection, failover, heartbeat.
2//!
3//! Provides exponential backoff with jitter for reconnection attempts
4//! and multi-URL failover support.
5
6use std::time::Duration;
7
8use tracing::{debug, warn};
9
10use super::source_config::ReconnectConfig;
11
12/// Manages WebSocket reconnection with exponential backoff and URL failover.
13pub struct ConnectionManager {
14    /// Reconnection configuration.
15    config: ReconnectConfig,
16    /// Available URLs for failover (source client mode).
17    urls: Vec<String>,
18    /// Index of the currently active URL.
19    current_url_index: usize,
20    /// Current retry attempt number.
21    attempt: u32,
22    /// Current backoff delay.
23    current_delay: Duration,
24    /// Per-instance seed for jitter (prevents thundering herd when
25    /// multiple instances reconnect simultaneously).
26    jitter_seed: u32,
27}
28
29impl ConnectionManager {
30    /// Creates a new connection manager.
31    ///
32    /// # Arguments
33    ///
34    /// * `urls` - One or more WebSocket URLs for failover. First is primary.
35    /// * `config` - Reconnection settings.
36    #[must_use]
37    #[allow(clippy::cast_possible_truncation)]
38    pub fn new(urls: Vec<String>, config: ReconnectConfig) -> Self {
39        let initial_delay = config.initial_delay;
40        // Seed jitter from current time so each instance gets a unique
41        // offset, preventing thundering-herd on simultaneous reconnects.
42        let jitter_seed = std::time::SystemTime::now()
43            .duration_since(std::time::UNIX_EPOCH)
44            .unwrap_or_default()
45            .subsec_nanos();
46        Self {
47            config,
48            urls,
49            current_url_index: 0,
50            attempt: 0,
51            current_delay: initial_delay,
52            jitter_seed,
53        }
54    }
55
56    /// Returns the currently active URL.
57    #[must_use]
58    pub fn current_url(&self) -> &str {
59        &self.urls[self.current_url_index]
60    }
61
62    /// Returns the current retry attempt count.
63    #[must_use]
64    pub fn attempt(&self) -> u32 {
65        self.attempt
66    }
67
68    /// Returns whether reconnection is enabled.
69    #[must_use]
70    pub fn reconnect_enabled(&self) -> bool {
71        self.config.enabled
72    }
73
74    /// Returns whether the maximum retry count has been exceeded.
75    #[must_use]
76    pub fn max_retries_exceeded(&self) -> bool {
77        self.config
78            .max_retries
79            .is_some_and(|max| self.attempt >= max)
80    }
81
82    /// Resets the retry state after a successful connection.
83    pub fn reset(&mut self) {
84        self.attempt = 0;
85        self.current_delay = self.config.initial_delay;
86        debug!(url = %self.current_url(), "connection established, reset retry state");
87    }
88
89    /// Computes the next backoff delay and advances the failover state.
90    ///
91    /// Returns `None` if reconnection is disabled or max retries exceeded.
92    /// Otherwise returns the duration to wait before the next attempt.
93    #[allow(
94        clippy::cast_precision_loss,
95        clippy::cast_possible_truncation,
96        clippy::cast_sign_loss
97    )]
98    pub fn next_backoff(&mut self) -> Option<Duration> {
99        if !self.config.enabled {
100            return None;
101        }
102
103        if self.max_retries_exceeded() {
104            warn!(
105                attempts = self.attempt,
106                max = ?self.config.max_retries,
107                "max reconnection retries exceeded"
108            );
109            return None;
110        }
111
112        self.attempt += 1;
113
114        // Cycle to the next URL for failover.
115        if self.urls.len() > 1 {
116            self.current_url_index = (self.current_url_index + 1) % self.urls.len();
117        }
118
119        let delay = self.current_delay;
120
121        // Apply jitter: ±25% of the delay. Uses per-instance seed XOR
122        // attempt so different instances get different offsets.
123        let delay = if self.config.jitter {
124            let jitter_range = delay.as_millis() as f64 * 0.25;
125            let mixed = self.jitter_seed ^ (self.attempt.wrapping_mul(2_654_435_761));
126            let jitter_offset =
127                (f64::from(mixed % 1000) / 1000.0 * jitter_range * 2.0) - jitter_range;
128            let jittered_ms = (delay.as_millis() as f64 + jitter_offset).max(1.0);
129            Duration::from_millis(jittered_ms as u64)
130        } else {
131            delay
132        };
133
134        // Increase delay for next attempt.
135        let next_ms =
136            (self.current_delay.as_millis() as f64 * self.config.backoff_multiplier) as u64;
137        self.current_delay = Duration::from_millis(next_ms).min(self.config.max_delay);
138
139        warn!(
140            attempt = self.attempt,
141            delay_ms = delay.as_millis(),
142            next_url = %self.current_url(),
143            "scheduling reconnection attempt"
144        );
145
146        Some(delay)
147    }
148
149    /// Returns a list of all configured URLs.
150    #[must_use]
151    pub fn urls(&self) -> &[String] {
152        &self.urls
153    }
154}
155
156impl std::fmt::Debug for ConnectionManager {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("ConnectionManager")
159            .field("current_url", &self.current_url())
160            .field("attempt", &self.attempt)
161            .field("urls", &self.urls.len())
162            .field("reconnect_enabled", &self.config.enabled)
163            .finish_non_exhaustive()
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration: reconnection enabled, 100 ms initial delay
    /// doubling up to 30 s, unlimited retries, jitter off so delays are exact.
    fn test_config() -> ReconnectConfig {
        ReconnectConfig {
            enabled: true,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            max_retries: None,
            jitter: false,
        }
    }

    #[test]
    fn test_current_url() {
        let mgr = ConnectionManager::new(vec!["ws://a".into(), "ws://b".into()], test_config());
        assert_eq!(mgr.current_url(), "ws://a");
    }

    #[test]
    fn test_failover_cycles_urls() {
        let mut mgr = ConnectionManager::new(
            vec!["ws://a".into(), "ws://b".into(), "ws://c".into()],
            test_config(),
        );

        // Each scheduled attempt moves to the next URL, wrapping at the end.
        for expected in ["ws://b", "ws://c", "ws://a"] {
            let _ = mgr.next_backoff();
            assert_eq!(mgr.current_url(), expected);
        }
    }

    #[test]
    fn test_exponential_backoff() {
        let mut mgr = ConnectionManager::new(vec!["ws://a".into()], test_config());

        for expected_ms in [100, 200, 400] {
            let delay = mgr.next_backoff().unwrap();
            assert_eq!(delay, Duration::from_millis(expected_ms));
        }
    }

    #[test]
    fn test_max_delay_cap() {
        let config = ReconnectConfig {
            initial_delay: Duration::from_secs(20),
            ..test_config()
        };
        let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);

        let d1 = mgr.next_backoff().unwrap();
        assert_eq!(d1, Duration::from_secs(20));

        // Would be 40s, capped to 30s.
        let d2 = mgr.next_backoff().unwrap();
        assert_eq!(d2, Duration::from_secs(30));

        // The cap keeps holding on later attempts.
        let d3 = mgr.next_backoff().unwrap();
        assert_eq!(d3, Duration::from_secs(30));
    }

    #[test]
    fn test_max_retries() {
        let config = ReconnectConfig {
            max_retries: Some(2),
            ..test_config()
        };
        let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);

        assert!(mgr.next_backoff().is_some()); // attempt 1
        assert!(mgr.next_backoff().is_some()); // attempt 2
        assert!(mgr.max_retries_exceeded());
        assert!(mgr.next_backoff().is_none()); // exceeded
    }

    #[test]
    fn test_reset() {
        let mut mgr = ConnectionManager::new(vec!["ws://a".into()], test_config());

        let _ = mgr.next_backoff();
        let _ = mgr.next_backoff();
        assert_eq!(mgr.attempt(), 2);

        mgr.reset();
        assert_eq!(mgr.attempt(), 0);

        let d = mgr.next_backoff().unwrap();
        assert_eq!(d, Duration::from_millis(100)); // reset to initial
    }

    #[test]
    fn test_disabled_reconnect() {
        let config = ReconnectConfig {
            enabled: false,
            ..test_config()
        };
        let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);
        assert!(mgr.next_backoff().is_none());
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let config = ReconnectConfig {
            jitter: true,
            ..test_config()
        };
        let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);

        // Jitter is ±25% of the un-jittered delay, which doubles per attempt.
        let mut base_ms: u128 = 100;
        for _ in 0..4 {
            let delay_ms = mgr.next_backoff().unwrap().as_millis();
            assert!(
                delay_ms >= base_ms * 3 / 4,
                "delay {delay_ms}ms below 75% of {base_ms}ms"
            );
            assert!(
                delay_ms <= base_ms * 5 / 4,
                "delay {delay_ms}ms above 125% of {base_ms}ms"
            );
            base_ms *= 2;
        }
    }
}