laminar_connectors/kafka/
backpressure.rs1use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9
10#[derive(Debug)]
22pub struct KafkaBackpressureController {
23 high_watermark: f64,
25 low_watermark: f64,
27 channel_capacity: usize,
29 channel_len: Arc<AtomicUsize>,
31 is_paused: bool,
33}
34
35impl KafkaBackpressureController {
36 #[must_use]
45 pub fn new(
46 high_watermark: f64,
47 low_watermark: f64,
48 channel_capacity: usize,
49 channel_len: Arc<AtomicUsize>,
50 ) -> Self {
51 Self {
52 high_watermark,
53 low_watermark,
54 channel_capacity,
55 channel_len,
56 is_paused: false,
57 }
58 }
59
60 #[must_use]
65 pub fn should_pause(&self) -> bool {
66 !self.is_paused && self.fill_ratio() >= self.high_watermark
67 }
68
69 #[must_use]
74 pub fn should_resume(&self) -> bool {
75 self.is_paused && self.fill_ratio() <= self.low_watermark
76 }
77
78 pub fn set_paused(&mut self, paused: bool) {
80 self.is_paused = paused;
81 }
82
83 #[must_use]
85 pub fn is_paused(&self) -> bool {
86 self.is_paused
87 }
88
89 #[must_use]
91 #[allow(clippy::cast_precision_loss)]
92 pub fn fill_ratio(&self) -> f64 {
93 if self.channel_capacity == 0 {
94 return 0.0;
95 }
96 let len = self.channel_len.load(Ordering::Relaxed);
97 len as f64 / self.channel_capacity as f64
98 }
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104
105 fn make_controller(len: usize, capacity: usize) -> KafkaBackpressureController {
106 let channel_len = Arc::new(AtomicUsize::new(len));
107 KafkaBackpressureController::new(0.8, 0.5, capacity, channel_len)
108 }
109
110 #[test]
111 fn test_should_pause_above_high_watermark() {
112 let ctrl = make_controller(90, 100);
113 assert!(ctrl.should_pause()); assert!(!ctrl.should_resume());
115 }
116
117 #[test]
118 fn test_no_pause_below_high_watermark() {
119 let ctrl = make_controller(70, 100);
120 assert!(!ctrl.should_pause()); }
122
123 #[test]
124 fn test_should_resume_below_low_watermark() {
125 let mut ctrl = make_controller(40, 100);
126 ctrl.set_paused(true);
127 assert!(ctrl.should_resume()); assert!(!ctrl.should_pause());
129 }
130
131 #[test]
132 fn test_hysteresis_no_resume_between_watermarks() {
133 let mut ctrl = make_controller(60, 100);
134 ctrl.set_paused(true);
135 assert!(!ctrl.should_resume()); assert!(!ctrl.should_pause()); }
138
139 #[test]
140 fn test_fill_ratio() {
141 let ctrl = make_controller(50, 100);
142 assert!((ctrl.fill_ratio() - 0.5).abs() < f64::EPSILON);
143 }
144
145 #[test]
146 fn test_fill_ratio_empty_channel() {
147 let ctrl = make_controller(0, 0);
148 assert!((ctrl.fill_ratio() - 0.0).abs() < f64::EPSILON);
149 }
150}