1use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
34
35#[derive(Debug, Clone)]
37pub struct BackpressureConfig {
38 pub exclusive_credits: usize,
41
42 pub floating_credits: usize,
45
46 pub overflow_strategy: OverflowStrategy,
48
49 pub high_watermark: f64,
52
53 pub low_watermark: f64,
56}
57
58impl Default for BackpressureConfig {
59 fn default() -> Self {
60 Self {
61 exclusive_credits: 4,
62 floating_credits: 8,
63 overflow_strategy: OverflowStrategy::Block,
64 high_watermark: 0.8,
65 low_watermark: 0.5,
66 }
67 }
68}
69
70impl BackpressureConfig {
71 #[must_use]
73 pub fn builder() -> BackpressureConfigBuilder {
74 BackpressureConfigBuilder::default()
75 }
76
77 #[must_use]
79 pub fn total_credits(&self) -> usize {
80 self.exclusive_credits + self.floating_credits
81 }
82}
83
84#[derive(Debug, Default)]
86pub struct BackpressureConfigBuilder {
87 exclusive_credits: Option<usize>,
88 floating_credits: Option<usize>,
89 overflow_strategy: Option<OverflowStrategy>,
90 high_watermark: Option<f64>,
91 low_watermark: Option<f64>,
92}
93
94impl BackpressureConfigBuilder {
95 #[must_use]
97 pub fn exclusive_credits(mut self, credits: usize) -> Self {
98 self.exclusive_credits = Some(credits);
99 self
100 }
101
102 #[must_use]
104 pub fn floating_credits(mut self, credits: usize) -> Self {
105 self.floating_credits = Some(credits);
106 self
107 }
108
109 #[must_use]
111 pub fn overflow_strategy(mut self, strategy: OverflowStrategy) -> Self {
112 self.overflow_strategy = Some(strategy);
113 self
114 }
115
116 #[must_use]
118 pub fn high_watermark(mut self, watermark: f64) -> Self {
119 self.high_watermark = Some(watermark.clamp(0.0, 1.0));
120 self
121 }
122
123 #[must_use]
125 pub fn low_watermark(mut self, watermark: f64) -> Self {
126 self.low_watermark = Some(watermark.clamp(0.0, 1.0));
127 self
128 }
129
130 pub fn build(self) -> std::result::Result<BackpressureConfig, String> {
136 let high = self.high_watermark.unwrap_or(0.8);
137 let low = self.low_watermark.unwrap_or(0.5);
138 if high < low {
139 return Err(format!(
140 "high_watermark ({high}) must be >= low_watermark ({low})"
141 ));
142 }
143 Ok(BackpressureConfig {
144 exclusive_credits: self.exclusive_credits.unwrap_or(4).min(u16::MAX as usize),
145 floating_credits: self.floating_credits.unwrap_or(8).min(u16::MAX as usize),
146 overflow_strategy: self.overflow_strategy.unwrap_or(OverflowStrategy::Block),
147 high_watermark: high,
148 low_watermark: low,
149 })
150 }
151}
152
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
155pub enum OverflowStrategy {
156 Block,
159
160 Drop,
163
164 Error,
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
171pub enum CreditAcquireResult {
172 Acquired,
174 WouldBlock,
176 Dropped,
178}
179
180#[derive(Debug)]
184pub struct CreditGate {
185 available: AtomicI64,
187 max_credits: usize,
189 config: BackpressureConfig,
191 metrics: CreditMetrics,
193}
194
195impl CreditGate {
196 #[must_use]
198 pub fn new(config: BackpressureConfig) -> Self {
199 let max_credits = config.total_credits();
200 Self {
201 #[allow(clippy::cast_possible_wrap)] available: AtomicI64::new(max_credits as i64),
203 max_credits,
204 config,
205 metrics: CreditMetrics::new(),
206 }
207 }
208
209 pub fn try_acquire(&self) -> CreditAcquireResult {
213 self.try_acquire_n(1)
214 }
215
216 pub fn try_acquire_n(&self, n: usize) -> CreditAcquireResult {
218 let n = i64::try_from(n).unwrap_or(i64::MAX);
219
220 let mut current = self.available.load(Ordering::Acquire);
222 loop {
223 if current < n {
224 self.metrics.record_blocked();
226 return match self.config.overflow_strategy {
227 OverflowStrategy::Drop => {
228 self.metrics.record_dropped(u64::try_from(n).unwrap_or(0));
229 CreditAcquireResult::Dropped
230 }
231 OverflowStrategy::Block | OverflowStrategy::Error => {
233 CreditAcquireResult::WouldBlock
234 }
235 };
236 }
237
238 match self.available.compare_exchange_weak(
239 current,
240 current - n,
241 Ordering::AcqRel,
242 Ordering::Acquire,
243 ) {
244 Ok(_) => {
245 self.metrics.record_acquired(u64::try_from(n).unwrap_or(0));
246 return CreditAcquireResult::Acquired;
247 }
248 Err(actual) => current = actual,
249 }
250 }
251 }
252
253 pub fn acquire_blocking(&self, n: usize) {
258 let mut attempt = 0u32;
259 loop {
260 match self.try_acquire_n(n) {
261 CreditAcquireResult::Acquired | CreditAcquireResult::Dropped => return,
263 CreditAcquireResult::WouldBlock => {
264 if attempt < 64 {
266 std::hint::spin_loop();
267 } else if attempt < 128 {
268 std::thread::yield_now();
269 } else {
270 std::thread::park_timeout(std::time::Duration::from_micros(50));
271 }
272 attempt = attempt.saturating_add(1);
273 }
274 }
275 }
276 }
277
278 pub fn release(&self, n: usize) {
282 let n = i64::try_from(n).unwrap_or(i64::MAX);
283 let prev = self.available.fetch_add(n, Ordering::Release);
284
285 let new_val = prev + n;
287 if new_val > {
288 #[allow(clippy::cast_possible_wrap)]
289 let max = self.max_credits as i64;
290 max
291 } {
292 let _ = self.available.compare_exchange(
294 new_val,
295 {
296 #[allow(clippy::cast_possible_wrap)]
297 let max = self.max_credits as i64;
298 max
299 },
300 Ordering::AcqRel,
301 Ordering::Relaxed,
302 );
303 }
304
305 self.metrics.record_released(u64::try_from(n).unwrap_or(0));
306 }
307
308 #[must_use]
310 pub fn available(&self) -> usize {
311 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
312 let val = self.available.load(Ordering::Relaxed).max(0) as usize;
314 val
315 }
316
317 #[must_use]
319 pub fn max_credits(&self) -> usize {
320 self.max_credits
321 }
322
323 #[must_use]
325 pub fn is_backpressured(&self) -> bool {
326 #[allow(clippy::cast_precision_loss)] let available = self.available() as f64;
328 #[allow(clippy::cast_precision_loss)] let max = self.max_credits as f64;
330 (available / max) < (1.0 - self.config.high_watermark)
331 }
332
333 #[must_use]
335 pub fn is_recovered(&self) -> bool {
336 #[allow(clippy::cast_precision_loss)] let available = self.available() as f64;
338 #[allow(clippy::cast_precision_loss)] let max = self.max_credits as f64;
340 (available / max) >= (1.0 - self.config.low_watermark)
341 }
342
343 #[must_use]
345 pub fn config(&self) -> &BackpressureConfig {
346 &self.config
347 }
348
349 #[must_use]
351 pub fn metrics(&self) -> &CreditMetrics {
352 &self.metrics
353 }
354
355 pub fn reset(&self) {
357 #[allow(clippy::cast_possible_wrap)] self.available
359 .store(self.max_credits as i64, Ordering::Release);
360 self.metrics.reset();
361 }
362}
363
364#[derive(Debug)]
366pub struct CreditMetrics {
367 credits_acquired: AtomicU64,
369 credits_released: AtomicU64,
371 times_blocked: AtomicU64,
373 items_dropped: AtomicU64,
375}
376
377impl CreditMetrics {
378 fn new() -> Self {
380 Self {
381 credits_acquired: AtomicU64::new(0),
382 credits_released: AtomicU64::new(0),
383 times_blocked: AtomicU64::new(0),
384 items_dropped: AtomicU64::new(0),
385 }
386 }
387
388 fn record_acquired(&self, n: u64) {
389 self.credits_acquired.fetch_add(n, Ordering::Relaxed);
390 }
391
392 fn record_released(&self, n: u64) {
393 self.credits_released.fetch_add(n, Ordering::Relaxed);
394 }
395
396 fn record_blocked(&self) {
397 self.times_blocked.fetch_add(1, Ordering::Relaxed);
398 }
399
400 fn record_dropped(&self, n: u64) {
401 self.items_dropped.fetch_add(n, Ordering::Relaxed);
402 }
403
404 fn reset(&self) {
406 self.credits_acquired.store(0, Ordering::Relaxed);
407 self.credits_released.store(0, Ordering::Relaxed);
408 self.times_blocked.store(0, Ordering::Relaxed);
409 self.items_dropped.store(0, Ordering::Relaxed);
410 }
411
412 #[must_use]
414 pub fn credits_acquired(&self) -> u64 {
415 self.credits_acquired.load(Ordering::Relaxed)
416 }
417
418 #[must_use]
420 pub fn credits_released(&self) -> u64 {
421 self.credits_released.load(Ordering::Relaxed)
422 }
423
424 #[must_use]
426 pub fn times_blocked(&self) -> u64 {
427 self.times_blocked.load(Ordering::Relaxed)
428 }
429
430 #[must_use]
432 pub fn items_dropped(&self) -> u64 {
433 self.items_dropped.load(Ordering::Relaxed)
434 }
435
436 #[must_use]
438 pub fn snapshot(&self) -> CreditMetricsSnapshot {
439 CreditMetricsSnapshot {
440 credits_acquired: self.credits_acquired(),
441 credits_released: self.credits_released(),
442 times_blocked: self.times_blocked(),
443 items_dropped: self.items_dropped(),
444 }
445 }
446}
447
448#[derive(Debug, Clone, Copy)]
450pub struct CreditMetricsSnapshot {
451 pub credits_acquired: u64,
453 pub credits_released: u64,
455 pub times_blocked: u64,
457 pub items_dropped: u64,
459}
460
461impl CreditMetricsSnapshot {
462 #[allow(clippy::cast_possible_wrap)] #[must_use]
465 pub fn credits_in_flight(&self) -> i64 {
466 (self.credits_acquired as i64).saturating_sub(self.credits_released as i64)
467 }
468}
469
470#[cfg(test)]
471mod tests {
472 use super::*;
473
474 #[test]
475 fn test_credit_gate_basic() {
476 let config = BackpressureConfig {
477 exclusive_credits: 2,
478 floating_credits: 2,
479 overflow_strategy: OverflowStrategy::Block,
480 ..Default::default()
481 };
482 let gate = CreditGate::new(config);
483
484 assert_eq!(gate.available(), 4);
485 assert_eq!(gate.max_credits(), 4);
486
487 assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
489 assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
490 assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
491 assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
492 assert_eq!(gate.available(), 0);
493
494 assert_eq!(gate.try_acquire(), CreditAcquireResult::WouldBlock);
496
497 gate.release(2);
499 assert_eq!(gate.available(), 2);
500
501 assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
503 assert_eq!(gate.available(), 1);
504 }
505
506 #[test]
507 fn test_credit_gate_drop_strategy() {
508 let config = BackpressureConfig {
509 exclusive_credits: 1,
510 floating_credits: 0,
511 overflow_strategy: OverflowStrategy::Drop,
512 ..Default::default()
513 };
514 let gate = CreditGate::new(config);
515
516 assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
517 assert_eq!(gate.try_acquire(), CreditAcquireResult::Dropped);
518
519 assert_eq!(gate.metrics().items_dropped(), 1);
520 }
521
522 #[test]
523 fn test_credit_gate_batch_acquire() {
524 let config = BackpressureConfig {
525 exclusive_credits: 4,
526 floating_credits: 4,
527 overflow_strategy: OverflowStrategy::Block,
528 ..Default::default()
529 };
530 let gate = CreditGate::new(config);
531
532 assert_eq!(gate.try_acquire_n(5), CreditAcquireResult::Acquired);
533 assert_eq!(gate.available(), 3);
534
535 assert_eq!(gate.try_acquire_n(4), CreditAcquireResult::WouldBlock);
536 assert_eq!(gate.try_acquire_n(3), CreditAcquireResult::Acquired);
537 assert_eq!(gate.available(), 0);
538 }
539
540 #[test]
541 fn test_backpressure_watermarks() {
542 let config = BackpressureConfig {
543 exclusive_credits: 10,
544 floating_credits: 0,
545 high_watermark: 0.8, low_watermark: 0.5, ..Default::default()
548 };
549 let gate = CreditGate::new(config);
550
551 assert!(!gate.is_backpressured());
553 assert!(gate.is_recovered());
554
555 for _ in 0..9 {
557 gate.try_acquire();
558 }
559 assert!(gate.is_backpressured());
560 assert!(!gate.is_recovered());
561
562 gate.release(4);
564 assert!(!gate.is_backpressured());
565 assert!(gate.is_recovered());
566 }
567
568 #[test]
569 fn test_metrics_snapshot() {
570 let config = BackpressureConfig::default();
571 let gate = CreditGate::new(config);
572
573 gate.try_acquire();
574 gate.try_acquire();
575 gate.release(1);
576
577 let snapshot = gate.metrics().snapshot();
578 assert_eq!(snapshot.credits_acquired, 2);
579 assert_eq!(snapshot.credits_released, 1);
580 assert_eq!(snapshot.credits_in_flight(), 1);
581 }
582
583 #[test]
584 fn test_config_builder() {
585 let config = BackpressureConfig::builder()
586 .exclusive_credits(8)
587 .floating_credits(16)
588 .overflow_strategy(OverflowStrategy::Drop)
589 .high_watermark(0.9)
590 .low_watermark(0.6)
591 .build()
592 .unwrap();
593
594 assert_eq!(config.exclusive_credits, 8);
595 assert_eq!(config.floating_credits, 16);
596 assert_eq!(config.overflow_strategy, OverflowStrategy::Drop);
597 assert!((config.high_watermark - 0.9).abs() < f64::EPSILON);
598 assert!((config.low_watermark - 0.6).abs() < f64::EPSILON);
599 assert_eq!(config.total_credits(), 24);
600 }
601
602 #[test]
603 fn test_concurrent_acquire_release() {
604 use std::sync::Arc;
605 use std::thread;
606
607 let config = BackpressureConfig {
608 exclusive_credits: 100,
609 floating_credits: 0,
610 overflow_strategy: OverflowStrategy::Block,
611 ..Default::default()
612 };
613 let gate = Arc::new(CreditGate::new(config));
614
615 let gate_sender = Arc::clone(&gate);
616 let gate_receiver = Arc::clone(&gate);
617
618 let sender = thread::spawn(move || {
619 let mut acquired = 0;
620 for _ in 0..1000 {
621 if gate_sender.try_acquire() == CreditAcquireResult::Acquired {
622 acquired += 1;
623 }
624 std::hint::spin_loop();
626 }
627 acquired
628 });
629
630 let receiver = thread::spawn(move || {
631 let mut released = 0;
632 for _ in 0..500 {
633 gate_receiver.release(1);
634 released += 1;
635 std::hint::spin_loop();
636 }
637 released
638 });
639
640 let acquired = sender.join().unwrap();
641 let released = receiver.join().unwrap();
642
643 assert!(acquired > 0);
645 assert_eq!(released, 500);
646 }
647}