Skip to main content

laminar_connectors/
retry.rs

//! Retry/backoff helper shared by connectors.
//!
//! Centralises exponential backoff with jitter so every reconnect path
//! behaves the same way: capped exponent (no shift overflow), capped
//! delay, jittered to break thundering-herd on broker-wide outages.
6
7use std::time::Duration;
8
9use rand::RngExt;
10
/// Parameters of an exponential backoff schedule: a base delay that is
/// doubled per attempt, a ceiling for that delay, and a jitter fraction.
#[derive(Clone, Copy, Debug)]
pub struct Backoff {
    /// Base delay; the un-jittered delay for attempt `n` is this value
    /// multiplied by `2^n` (with the exponent capped).
    initial: Duration,
    /// Ceiling applied to the doubled delay before any jitter is added.
    max: Duration,
    /// Multiplicative jitter range, expressed as a fraction in `[0.0, 1.0]`.
    /// For example `0.25` makes the actual delay uniform on
    /// `[delay * 0.75, delay * 1.25]`.
    jitter: f64,
}
21
22impl Backoff {
23    /// New backoff with the given bounds and jitter fraction.
24    #[must_use]
25    pub const fn new(initial: Duration, max: Duration, jitter: f64) -> Self {
26        Self {
27            initial,
28            max,
29            jitter,
30        }
31    }
32
33    /// Default broker-reconnect schedule: 1s → 30s, ±25 % jitter.
34    /// Matches what Kafka clients do; safe under simultaneous broker
35    /// restarts (jitter avoids reconnect storms).
36    #[must_use]
37    pub const fn broker_reconnect() -> Self {
38        Self::new(Duration::from_secs(1), Duration::from_secs(30), 0.25)
39    }
40
41    /// Compute the delay for `attempt` (0-indexed). Caps the exponent at
42    /// 30 to prevent shift overflow with adversarial inputs, then caps
43    /// the resulting duration at `self.max`, then applies jitter.
44    #[must_use]
45    pub fn delay(&self, attempt: u32) -> Duration {
46        // 2^30 seconds is ~34 years; capping the exponent at 30 keeps
47        // the multiplication in u64 range for any sane `initial`.
48        let shift = attempt.min(30);
49        let factor = 1u64 << shift;
50        let raw_nanos = self
51            .initial
52            .as_nanos()
53            .saturating_mul(u128::from(factor))
54            .min(u128::from(u64::MAX));
55        #[allow(clippy::cast_possible_truncation)]
56        let raw = Duration::from_nanos(raw_nanos as u64).min(self.max);
57
58        if self.jitter <= 0.0 {
59            return raw;
60        }
61        let mut rng = rand::rng();
62        let frac: f64 = rng.random_range(-self.jitter..=self.jitter);
63        #[allow(
64            clippy::cast_precision_loss,
65            clippy::cast_sign_loss,
66            clippy::cast_possible_truncation
67        )]
68        let jittered_nanos = (raw.as_nanos() as f64 * (1.0 + frac)).max(0.0) as u64;
69        Duration::from_nanos(jittered_nanos)
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    /// With jitter disabled the schedule doubles from `initial` and then
    /// stays pinned at `max`, however large the attempt number gets.
    #[test]
    fn delay_caps_at_max() {
        let backoff = Backoff::new(Duration::from_secs(1), Duration::from_secs(30), 0.0);
        let expected_secs: [(u32, u64); 6] = [
            (0, 1),
            (1, 2),
            (4, 16),
            (5, 30),
            // Past saturation still capped, no panic.
            (100, 30),
            (u32::MAX, 30),
        ];
        for (attempt, secs) in expected_secs {
            assert_eq!(backoff.delay(attempt), Duration::from_secs(secs));
        }
    }

    /// Every jittered delay must land within ±25 % of the un-jittered
    /// value (checked with a small tolerance: 0.74 / 1.26).
    #[test]
    fn jitter_stays_inside_bounds() {
        let initial = Duration::from_secs(1);
        let max = Duration::from_secs(60);
        let backoff = Backoff::new(initial, max, 0.25);
        for attempt in 0..7 {
            let actual = backoff.delay(attempt).as_secs_f64();
            let base = initial
                .saturating_mul(1u32 << attempt)
                .min(max)
                .as_secs_f64();
            let (lo, hi) = (base * 0.74, base * 1.26);
            assert!(
                (lo..=hi).contains(&actual),
                "attempt {attempt}: {actual} not in [{lo}, {hi}]"
            );
        }
    }

    /// The previous hand-rolled `1u64 << consecutive_failures` panicked
    /// for attempt >= 64. `Backoff` must not.
    #[test]
    fn shift_overflow_protected() {
        let backoff = Backoff::broker_reconnect();
        for attempt in [64, u32::MAX] {
            let _ = backoff.delay(attempt);
        }
    }
}