laminar_connectors/websocket/
connection.rs1use std::time::Duration;
7
8use tracing::{debug, warn};
9
10use super::source_config::ReconnectConfig;
11
12pub struct ConnectionManager {
14 config: ReconnectConfig,
16 urls: Vec<String>,
18 current_url_index: usize,
20 attempt: u32,
22 current_delay: Duration,
24 jitter_seed: u32,
27}
28
29impl ConnectionManager {
30 #[must_use]
37 #[allow(clippy::cast_possible_truncation)]
38 pub fn new(urls: Vec<String>, config: ReconnectConfig) -> Self {
39 let initial_delay = config.initial_delay;
40 let jitter_seed = std::time::SystemTime::now()
43 .duration_since(std::time::UNIX_EPOCH)
44 .unwrap_or_default()
45 .subsec_nanos();
46 Self {
47 config,
48 urls,
49 current_url_index: 0,
50 attempt: 0,
51 current_delay: initial_delay,
52 jitter_seed,
53 }
54 }
55
56 #[must_use]
58 pub fn current_url(&self) -> &str {
59 &self.urls[self.current_url_index]
60 }
61
62 #[must_use]
64 pub fn attempt(&self) -> u32 {
65 self.attempt
66 }
67
68 #[must_use]
70 pub fn reconnect_enabled(&self) -> bool {
71 self.config.enabled
72 }
73
74 #[must_use]
76 pub fn max_retries_exceeded(&self) -> bool {
77 self.config
78 .max_retries
79 .is_some_and(|max| self.attempt >= max)
80 }
81
82 pub fn reset(&mut self) {
84 self.attempt = 0;
85 self.current_delay = self.config.initial_delay;
86 debug!(url = %self.current_url(), "connection established, reset retry state");
87 }
88
89 #[allow(
94 clippy::cast_precision_loss,
95 clippy::cast_possible_truncation,
96 clippy::cast_sign_loss
97 )]
98 pub fn next_backoff(&mut self) -> Option<Duration> {
99 if !self.config.enabled {
100 return None;
101 }
102
103 if self.max_retries_exceeded() {
104 warn!(
105 attempts = self.attempt,
106 max = ?self.config.max_retries,
107 "max reconnection retries exceeded"
108 );
109 return None;
110 }
111
112 self.attempt += 1;
113
114 if self.urls.len() > 1 {
116 self.current_url_index = (self.current_url_index + 1) % self.urls.len();
117 }
118
119 let delay = self.current_delay;
120
121 let delay = if self.config.jitter {
124 let jitter_range = delay.as_millis() as f64 * 0.25;
125 let mixed = self.jitter_seed ^ (self.attempt.wrapping_mul(2_654_435_761));
126 let jitter_offset =
127 (f64::from(mixed % 1000) / 1000.0 * jitter_range * 2.0) - jitter_range;
128 let jittered_ms = (delay.as_millis() as f64 + jitter_offset).max(1.0);
129 Duration::from_millis(jittered_ms as u64)
130 } else {
131 delay
132 };
133
134 let next_ms =
136 (self.current_delay.as_millis() as f64 * self.config.backoff_multiplier) as u64;
137 self.current_delay = Duration::from_millis(next_ms).min(self.config.max_delay);
138
139 warn!(
140 attempt = self.attempt,
141 delay_ms = delay.as_millis(),
142 next_url = %self.current_url(),
143 "scheduling reconnection attempt"
144 );
145
146 Some(delay)
147 }
148
149 #[must_use]
151 pub fn urls(&self) -> &[String] {
152 &self.urls
153 }
154}
155
156impl std::fmt::Debug for ConnectionManager {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("ConnectionManager")
159 .field("current_url", &self.current_url())
160 .field("attempt", &self.attempt)
161 .field("urls", &self.urls.len())
162 .field("reconnect_enabled", &self.config.enabled)
163 .finish_non_exhaustive()
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 fn test_config() -> ReconnectConfig {
172 ReconnectConfig {
173 enabled: true,
174 initial_delay: Duration::from_millis(100),
175 max_delay: Duration::from_secs(30),
176 backoff_multiplier: 2.0,
177 max_retries: None,
178 jitter: false,
179 }
180 }
181
182 #[test]
183 fn test_current_url() {
184 let mgr = ConnectionManager::new(vec!["ws://a".into(), "ws://b".into()], test_config());
185 assert_eq!(mgr.current_url(), "ws://a");
186 }
187
188 #[test]
189 fn test_failover_cycles_urls() {
190 let mut mgr = ConnectionManager::new(
191 vec!["ws://a".into(), "ws://b".into(), "ws://c".into()],
192 test_config(),
193 );
194
195 mgr.next_backoff();
196 assert_eq!(mgr.current_url(), "ws://b");
197
198 mgr.next_backoff();
199 assert_eq!(mgr.current_url(), "ws://c");
200
201 mgr.next_backoff();
202 assert_eq!(mgr.current_url(), "ws://a");
203 }
204
205 #[test]
206 fn test_exponential_backoff() {
207 let mut mgr = ConnectionManager::new(vec!["ws://a".into()], test_config());
208
209 let d1 = mgr.next_backoff().unwrap();
210 assert_eq!(d1, Duration::from_millis(100));
211
212 let d2 = mgr.next_backoff().unwrap();
213 assert_eq!(d2, Duration::from_millis(200));
214
215 let d3 = mgr.next_backoff().unwrap();
216 assert_eq!(d3, Duration::from_millis(400));
217 }
218
219 #[test]
220 fn test_max_delay_cap() {
221 let config = ReconnectConfig {
222 enabled: true,
223 initial_delay: Duration::from_secs(20),
224 max_delay: Duration::from_secs(30),
225 backoff_multiplier: 2.0,
226 max_retries: None,
227 jitter: false,
228 };
229 let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);
230
231 mgr.next_backoff(); let d2 = mgr.next_backoff().unwrap(); assert_eq!(d2, Duration::from_secs(30));
234 }
235
236 #[test]
237 fn test_max_retries() {
238 let config = ReconnectConfig {
239 max_retries: Some(2),
240 ..test_config()
241 };
242 let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);
243
244 assert!(mgr.next_backoff().is_some()); assert!(mgr.next_backoff().is_some()); assert!(mgr.next_backoff().is_none()); }
248
249 #[test]
250 fn test_reset() {
251 let mut mgr = ConnectionManager::new(vec!["ws://a".into()], test_config());
252
253 mgr.next_backoff();
254 mgr.next_backoff();
255 assert_eq!(mgr.attempt(), 2);
256
257 mgr.reset();
258 assert_eq!(mgr.attempt(), 0);
259
260 let d = mgr.next_backoff().unwrap();
261 assert_eq!(d, Duration::from_millis(100)); }
263
264 #[test]
265 fn test_disabled_reconnect() {
266 let config = ReconnectConfig {
267 enabled: false,
268 ..test_config()
269 };
270 let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);
271 assert!(mgr.next_backoff().is_none());
272 }
273
274 #[test]
275 fn test_jitter_varies_delay() {
276 let config = ReconnectConfig {
277 jitter: true,
278 ..test_config()
279 };
280 let mut mgr = ConnectionManager::new(vec!["ws://a".into()], config);
281
282 let d1 = mgr.next_backoff().unwrap();
283 assert!(d1.as_millis() > 0);
285 assert!(d1.as_millis() <= 150); }
287}