Skip to main content

laminar_connectors/kafka/
backpressure.rs

1//! Backpressure controller for Kafka source consumption.
2//!
3//! [`KafkaBackpressureController`] monitors downstream channel utilization
4//! and signals when the consumer should pause or resume partition
5//! consumption to prevent unbounded memory growth.
6
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9
10/// Controls consumption rate based on downstream channel fill level.
11///
12/// Uses hysteresis (high/low watermark) to avoid rapid pause/resume
13/// oscillation. When the channel fill ratio exceeds the high watermark,
14/// consumption pauses. It resumes only when the ratio drops below the
15/// low watermark.
16///
17/// **Callers must wire the `channel_len` `Arc<AtomicUsize>`**: increment
18/// when sending messages into the downstream channel, decrement when
19/// receiving. Without this wiring, `fill_ratio()` always returns 0.0
20/// and backpressure never triggers.
21#[derive(Debug)]
22pub struct KafkaBackpressureController {
23    /// Fill ratio above which to pause (e.g., 0.8 = 80%).
24    high_watermark: f64,
25    /// Fill ratio below which to resume (e.g., 0.5 = 50%).
26    low_watermark: f64,
27    /// Maximum channel capacity.
28    channel_capacity: usize,
29    /// Current number of items in the channel.
30    channel_len: Arc<AtomicUsize>,
31    /// Whether consumption is currently paused.
32    is_paused: bool,
33}
34
35impl KafkaBackpressureController {
36    /// Creates a new backpressure controller.
37    ///
38    /// # Arguments
39    ///
40    /// * `high_watermark` - Fill ratio to trigger pause (0.0-1.0)
41    /// * `low_watermark` - Fill ratio to trigger resume (0.0-1.0)
42    /// * `channel_capacity` - Maximum channel capacity
43    /// * `channel_len` - Shared atomic counter of current channel size
44    #[must_use]
45    pub fn new(
46        high_watermark: f64,
47        low_watermark: f64,
48        channel_capacity: usize,
49        channel_len: Arc<AtomicUsize>,
50    ) -> Self {
51        Self {
52            high_watermark,
53            low_watermark,
54            channel_capacity,
55            channel_len,
56            is_paused: false,
57        }
58    }
59
60    /// Returns `true` if consumption should be paused.
61    ///
62    /// Triggers when the fill ratio exceeds the high watermark
63    /// and the controller is not already paused.
64    #[must_use]
65    pub fn should_pause(&self) -> bool {
66        !self.is_paused && self.fill_ratio() >= self.high_watermark
67    }
68
69    /// Returns `true` if consumption should resume.
70    ///
71    /// Triggers when the fill ratio drops below the low watermark
72    /// and the controller is currently paused.
73    #[must_use]
74    pub fn should_resume(&self) -> bool {
75        self.is_paused && self.fill_ratio() <= self.low_watermark
76    }
77
78    /// Sets the paused state.
79    pub fn set_paused(&mut self, paused: bool) {
80        self.is_paused = paused;
81    }
82
83    /// Returns whether consumption is currently paused.
84    #[must_use]
85    pub fn is_paused(&self) -> bool {
86        self.is_paused
87    }
88
89    /// Returns the current channel fill ratio (0.0-1.0).
90    #[must_use]
91    #[allow(clippy::cast_precision_loss)]
92    pub fn fill_ratio(&self) -> f64 {
93        if self.channel_capacity == 0 {
94            return 0.0;
95        }
96        let len = self.channel_len.load(Ordering::Relaxed);
97        len as f64 / self.channel_capacity as f64
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104
105    fn make_controller(len: usize, capacity: usize) -> KafkaBackpressureController {
106        let channel_len = Arc::new(AtomicUsize::new(len));
107        KafkaBackpressureController::new(0.8, 0.5, capacity, channel_len)
108    }
109
110    #[test]
111    fn test_should_pause_above_high_watermark() {
112        let ctrl = make_controller(90, 100);
113        assert!(ctrl.should_pause()); // 90% > 80%
114        assert!(!ctrl.should_resume());
115    }
116
117    #[test]
118    fn test_no_pause_below_high_watermark() {
119        let ctrl = make_controller(70, 100);
120        assert!(!ctrl.should_pause()); // 70% < 80%
121    }
122
123    #[test]
124    fn test_should_resume_below_low_watermark() {
125        let mut ctrl = make_controller(40, 100);
126        ctrl.set_paused(true);
127        assert!(ctrl.should_resume()); // 40% < 50%
128        assert!(!ctrl.should_pause());
129    }
130
131    #[test]
132    fn test_hysteresis_no_resume_between_watermarks() {
133        let mut ctrl = make_controller(60, 100);
134        ctrl.set_paused(true);
135        assert!(!ctrl.should_resume()); // 60% is between 50% and 80%
136        assert!(!ctrl.should_pause()); // already paused
137    }
138
139    #[test]
140    fn test_fill_ratio() {
141        let ctrl = make_controller(50, 100);
142        assert!((ctrl.fill_ratio() - 0.5).abs() < f64::EPSILON);
143    }
144
145    #[test]
146    fn test_fill_ratio_empty_channel() {
147        let ctrl = make_controller(0, 0);
148        assert!((ctrl.fill_ratio() - 0.0).abs() < f64::EPSILON);
149    }
150}