// laminar_core/tpc/backpressure.rs

//! # Credit-Based Flow Control
//!
//! Implements credit-based backpressure similar to Apache Flink's network stack.
//!
//! ## How It Works
//!
//! ```text
//! ┌──────────┐                      ┌──────────┐
//! │  Sender  │                      │ Receiver │
//! │          │<── Credits (N=4) ────│          │
//! │          │                      │          │
//! │          │── Data + Backlog ───>│          │
//! │          │                      │          │
//! │          │<── Credits (N=2) ────│          │
//! └──────────┘                      └──────────┘
//! ```
//!
//! 1. The receiver grants initial credits (buffer slots) to the sender.
//! 2. The sender spends credits when sending and reports its backlog size.
//! 3. The receiver processes data and returns credits based on its capacity.
//! 4. A sender with no credits must wait or apply its overflow strategy.
//!
//! ## Credit Types
//!
//! - **Exclusive credits**: a fixed per-sender allotment that is always available.
//! - **Floating credits**: a shared pool, allocated based on backlog priority.
//!
//! This design prevents buffer overflow while maximizing throughput.
29
// Credit counts are kept small so that the i64/f64 conversions in this module are
// lossless: the builder caps exclusive and floating credits at 65535 (u16::MAX) each.
// A config written as a struct literal bypasses that cap.
32
33use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
34
35/// Configuration for backpressure handling.
36#[derive(Debug, Clone)]
37pub struct BackpressureConfig {
38    /// Initial exclusive credits per sender (like Flink's buffers-per-channel).
39    /// These credits are always reserved for a specific sender.
40    pub exclusive_credits: usize,
41
42    /// Floating credits shared across all senders (like Flink's floating-buffers-per-gate).
43    /// Allocated dynamically based on backlog priority.
44    pub floating_credits: usize,
45
46    /// Strategy when credits are exhausted.
47    pub overflow_strategy: OverflowStrategy,
48
49    /// High watermark - when queue reaches this %, start throttling.
50    /// Value between 0.0 and 1.0.
51    pub high_watermark: f64,
52
53    /// Low watermark - when queue drops below this %, resume normal flow.
54    /// Value between 0.0 and 1.0.
55    pub low_watermark: f64,
56}
57
58impl Default for BackpressureConfig {
59    fn default() -> Self {
60        Self {
61            exclusive_credits: 4,
62            floating_credits: 8,
63            overflow_strategy: OverflowStrategy::Block,
64            high_watermark: 0.8,
65            low_watermark: 0.5,
66        }
67    }
68}
69
70impl BackpressureConfig {
71    /// Creates a new configuration builder.
72    #[must_use]
73    pub fn builder() -> BackpressureConfigBuilder {
74        BackpressureConfigBuilder::default()
75    }
76
77    /// Total credits available (exclusive + floating).
78    #[must_use]
79    pub fn total_credits(&self) -> usize {
80        self.exclusive_credits + self.floating_credits
81    }
82}
83
84/// Builder for `BackpressureConfig`.
85#[derive(Debug, Default)]
86pub struct BackpressureConfigBuilder {
87    exclusive_credits: Option<usize>,
88    floating_credits: Option<usize>,
89    overflow_strategy: Option<OverflowStrategy>,
90    high_watermark: Option<f64>,
91    low_watermark: Option<f64>,
92}
93
94impl BackpressureConfigBuilder {
95    /// Sets exclusive credits per sender.
96    #[must_use]
97    pub fn exclusive_credits(mut self, credits: usize) -> Self {
98        self.exclusive_credits = Some(credits);
99        self
100    }
101
102    /// Sets floating credits (shared pool).
103    #[must_use]
104    pub fn floating_credits(mut self, credits: usize) -> Self {
105        self.floating_credits = Some(credits);
106        self
107    }
108
109    /// Sets the overflow strategy.
110    #[must_use]
111    pub fn overflow_strategy(mut self, strategy: OverflowStrategy) -> Self {
112        self.overflow_strategy = Some(strategy);
113        self
114    }
115
116    /// Sets the high watermark (0.0 to 1.0).
117    #[must_use]
118    pub fn high_watermark(mut self, watermark: f64) -> Self {
119        self.high_watermark = Some(watermark.clamp(0.0, 1.0));
120        self
121    }
122
123    /// Sets the low watermark (0.0 to 1.0).
124    #[must_use]
125    pub fn low_watermark(mut self, watermark: f64) -> Self {
126        self.low_watermark = Some(watermark.clamp(0.0, 1.0));
127        self
128    }
129
130    /// Builds the configuration.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if `high_watermark < low_watermark`.
135    pub fn build(self) -> std::result::Result<BackpressureConfig, String> {
136        let high = self.high_watermark.unwrap_or(0.8);
137        let low = self.low_watermark.unwrap_or(0.5);
138        if high < low {
139            return Err(format!(
140                "high_watermark ({high}) must be >= low_watermark ({low})"
141            ));
142        }
143        Ok(BackpressureConfig {
144            exclusive_credits: self.exclusive_credits.unwrap_or(4).min(u16::MAX as usize),
145            floating_credits: self.floating_credits.unwrap_or(8).min(u16::MAX as usize),
146            overflow_strategy: self.overflow_strategy.unwrap_or(OverflowStrategy::Block),
147            high_watermark: high,
148            low_watermark: low,
149        })
150    }
151}
152
/// What to do with a send when the credit pool cannot cover it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OverflowStrategy {
    /// Make the sender wait for credits to come back.
    /// Suited to exactly-once pipelines, where losing data is not acceptable.
    Block,

    /// Discard the data and count it in the metrics.
    /// Suited to best-effort streams, where latency outranks completeness.
    Drop,

    /// Fail immediately, neither waiting nor discarding.
    /// The caller chooses how to proceed.
    Error,
}
168
/// Outcome of a credit acquisition attempt.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CreditAcquireResult {
    /// The requested credits were taken from the pool.
    Acquired,
    /// Too few credits were free; the caller would have to wait.
    WouldBlock,
    /// Too few credits were free and the overflow strategy is `Drop`.
    Dropped,
}
179
180/// Manages credits for a single receiver (core).
181///
182/// Thread-safe for use between sender and receiver threads.
183#[derive(Debug)]
184pub struct CreditGate {
185    /// Available credits (can go negative temporarily during high contention).
186    available: AtomicI64,
187    /// Maximum credits (exclusive + floating).
188    max_credits: usize,
189    /// Configuration.
190    config: BackpressureConfig,
191    /// Metrics.
192    metrics: CreditMetrics,
193}
194
195impl CreditGate {
196    /// Creates a new credit gate with the given configuration.
197    #[must_use]
198    pub fn new(config: BackpressureConfig) -> Self {
199        let max_credits = config.total_credits();
200        Self {
201            #[allow(clippy::cast_possible_wrap)] // Safe: bounded by u16::MAX in build()
202            available: AtomicI64::new(max_credits as i64),
203            max_credits,
204            config,
205            metrics: CreditMetrics::new(),
206        }
207    }
208
209    /// Attempts to acquire one credit.
210    ///
211    /// Returns the result based on the overflow strategy.
212    pub fn try_acquire(&self) -> CreditAcquireResult {
213        self.try_acquire_n(1)
214    }
215
216    /// Attempts to acquire N credits.
217    pub fn try_acquire_n(&self, n: usize) -> CreditAcquireResult {
218        let n = i64::try_from(n).unwrap_or(i64::MAX);
219
220        // Try to acquire credits atomically
221        let mut current = self.available.load(Ordering::Acquire);
222        loop {
223            if current < n {
224                // Not enough credits
225                self.metrics.record_blocked();
226                return match self.config.overflow_strategy {
227                    OverflowStrategy::Drop => {
228                        self.metrics.record_dropped(u64::try_from(n).unwrap_or(0));
229                        CreditAcquireResult::Dropped
230                    }
231                    // Both Block and Error return WouldBlock - caller handles differently
232                    OverflowStrategy::Block | OverflowStrategy::Error => {
233                        CreditAcquireResult::WouldBlock
234                    }
235                };
236            }
237
238            match self.available.compare_exchange_weak(
239                current,
240                current - n,
241                Ordering::AcqRel,
242                Ordering::Acquire,
243            ) {
244                Ok(_) => {
245                    self.metrics.record_acquired(u64::try_from(n).unwrap_or(0));
246                    return CreditAcquireResult::Acquired;
247                }
248                Err(actual) => current = actual,
249            }
250        }
251    }
252
253    /// Acquires credits, blocking with progressive backoff until available.
254    ///
255    /// Only use when `OverflowStrategy::Block` is configured.
256    /// Uses spin → yield → park progression to balance latency vs CPU waste.
257    pub fn acquire_blocking(&self, n: usize) {
258        let mut attempt = 0u32;
259        loop {
260            match self.try_acquire_n(n) {
261                // Success or dropped (shouldn't happen with Block strategy)
262                CreditAcquireResult::Acquired | CreditAcquireResult::Dropped => return,
263                CreditAcquireResult::WouldBlock => {
264                    // Progressive backoff: spin → yield → park
265                    if attempt < 64 {
266                        std::hint::spin_loop();
267                    } else if attempt < 128 {
268                        std::thread::yield_now();
269                    } else {
270                        std::thread::park_timeout(std::time::Duration::from_micros(50));
271                    }
272                    attempt = attempt.saturating_add(1);
273                }
274            }
275        }
276    }
277
278    /// Releases credits back to the pool.
279    ///
280    /// Called by receiver after processing data.
281    pub fn release(&self, n: usize) {
282        let n = i64::try_from(n).unwrap_or(i64::MAX);
283        let prev = self.available.fetch_add(n, Ordering::Release);
284
285        // Clamp to max (in case of over-release)
286        let new_val = prev + n;
287        if new_val > {
288            #[allow(clippy::cast_possible_wrap)]
289            let max = self.max_credits as i64;
290            max
291        } {
292            // Try to correct, but don't worry if it fails (another thread may have acquired)
293            let _ = self.available.compare_exchange(
294                new_val,
295                {
296                    #[allow(clippy::cast_possible_wrap)]
297                    let max = self.max_credits as i64;
298                    max
299                },
300                Ordering::AcqRel,
301                Ordering::Relaxed,
302            );
303        }
304
305        self.metrics.record_released(u64::try_from(n).unwrap_or(0));
306    }
307
308    /// Returns the number of available credits.
309    #[must_use]
310    pub fn available(&self) -> usize {
311        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
312        // Safe: load().max(0) is >= 0
313        let val = self.available.load(Ordering::Relaxed).max(0) as usize;
314        val
315    }
316
317    /// Returns the maximum credits.
318    #[must_use]
319    pub fn max_credits(&self) -> usize {
320        self.max_credits
321    }
322
323    /// Returns true if backpressure is active (credits below threshold).
324    #[must_use]
325    pub fn is_backpressured(&self) -> bool {
326        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio
327        let available = self.available() as f64;
328        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio calculation
329        let max = self.max_credits as f64;
330        (available / max) < (1.0 - self.config.high_watermark)
331    }
332
333    /// Returns true if backpressure has cleared (credits above low watermark).
334    #[must_use]
335    pub fn is_recovered(&self) -> bool {
336        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio
337        let available = self.available() as f64;
338        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio calculation
339        let max = self.max_credits as f64;
340        (available / max) >= (1.0 - self.config.low_watermark)
341    }
342
343    /// Returns the configuration.
344    #[must_use]
345    pub fn config(&self) -> &BackpressureConfig {
346        &self.config
347    }
348
349    /// Returns the metrics.
350    #[must_use]
351    pub fn metrics(&self) -> &CreditMetrics {
352        &self.metrics
353    }
354
355    /// Resets the gate to initial state.
356    pub fn reset(&self) {
357        #[allow(clippy::cast_possible_wrap)] // Safe: max_credits bounded to u16::MAX
358        self.available
359            .store(self.max_credits as i64, Ordering::Release);
360        self.metrics.reset();
361    }
362}
363
/// Counters describing credit-based flow control activity.
#[derive(Debug)]
pub struct CreditMetrics {
    /// Running total of credits handed out.
    credits_acquired: AtomicU64,
    /// Running total of credits given back.
    credits_released: AtomicU64,
    /// Number of acquisition attempts that found too few credits.
    times_blocked: AtomicU64,
    /// Running total of items discarded because of overflow.
    items_dropped: AtomicU64,
}
376
377impl CreditMetrics {
378    /// Creates new metrics.
379    fn new() -> Self {
380        Self {
381            credits_acquired: AtomicU64::new(0),
382            credits_released: AtomicU64::new(0),
383            times_blocked: AtomicU64::new(0),
384            items_dropped: AtomicU64::new(0),
385        }
386    }
387
388    fn record_acquired(&self, n: u64) {
389        self.credits_acquired.fetch_add(n, Ordering::Relaxed);
390    }
391
392    fn record_released(&self, n: u64) {
393        self.credits_released.fetch_add(n, Ordering::Relaxed);
394    }
395
396    fn record_blocked(&self) {
397        self.times_blocked.fetch_add(1, Ordering::Relaxed);
398    }
399
400    fn record_dropped(&self, n: u64) {
401        self.items_dropped.fetch_add(n, Ordering::Relaxed);
402    }
403
404    /// Resets all metrics.
405    fn reset(&self) {
406        self.credits_acquired.store(0, Ordering::Relaxed);
407        self.credits_released.store(0, Ordering::Relaxed);
408        self.times_blocked.store(0, Ordering::Relaxed);
409        self.items_dropped.store(0, Ordering::Relaxed);
410    }
411
412    /// Returns total credits acquired.
413    #[must_use]
414    pub fn credits_acquired(&self) -> u64 {
415        self.credits_acquired.load(Ordering::Relaxed)
416    }
417
418    /// Returns total credits released.
419    #[must_use]
420    pub fn credits_released(&self) -> u64 {
421        self.credits_released.load(Ordering::Relaxed)
422    }
423
424    /// Returns times sender was blocked.
425    #[must_use]
426    pub fn times_blocked(&self) -> u64 {
427        self.times_blocked.load(Ordering::Relaxed)
428    }
429
430    /// Returns items dropped due to overflow.
431    #[must_use]
432    pub fn items_dropped(&self) -> u64 {
433        self.items_dropped.load(Ordering::Relaxed)
434    }
435
436    /// Returns a snapshot of all metrics.
437    #[must_use]
438    pub fn snapshot(&self) -> CreditMetricsSnapshot {
439        CreditMetricsSnapshot {
440            credits_acquired: self.credits_acquired(),
441            credits_released: self.credits_released(),
442            times_blocked: self.times_blocked(),
443            items_dropped: self.items_dropped(),
444        }
445    }
446}
447
/// Snapshot of credit metrics for reporting.
///
/// Plain values copied out of the live counters; the snapshot does not change
/// after it is taken.
#[derive(Debug, Clone, Copy)]
pub struct CreditMetricsSnapshot {
    /// Total credits acquired.
    pub credits_acquired: u64,
    /// Total credits released.
    pub credits_released: u64,
    /// Times sender was blocked.
    pub times_blocked: u64,
    /// Items dropped due to overflow.
    pub items_dropped: u64,
}

impl CreditMetricsSnapshot {
    /// Returns credits currently in flight (acquired - released).
    ///
    /// The result is negative when more credits were released than acquired.
    /// The subtraction is carried out in `i128` and then saturated to the `i64`
    /// range, so counters at or above 2^63 cannot wrap into a wrong sign the way
    /// a direct `u64 as i64` cast would.
    #[must_use]
    pub fn credits_in_flight(&self) -> i64 {
        let difference = i128::from(self.credits_acquired) - i128::from(self.credits_released);
        i64::try_from(difference).unwrap_or(if difference < 0 {
            i64::MIN
        } else {
            i64::MAX
        })
    }
}
469
#[cfg(test)]
mod tests {
    //! Unit tests for the credit gate, its metrics, and the config builder.

    use super::*;

    /// Acquire until empty, observe `WouldBlock`, then release and re-acquire.
    #[test]
    fn test_credit_gate_basic() {
        let config = BackpressureConfig {
            exclusive_credits: 2,
            floating_credits: 2,
            overflow_strategy: OverflowStrategy::Block,
            ..Default::default()
        };
        let gate = CreditGate::new(config);

        assert_eq!(gate.available(), 4);
        assert_eq!(gate.max_credits(), 4);

        // Acquire all credits
        for _ in 0..4 {
            assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        }
        assert_eq!(gate.available(), 0);

        // Should block now
        assert_eq!(gate.try_acquire(), CreditAcquireResult::WouldBlock);

        // Release some
        gate.release(2);
        assert_eq!(gate.available(), 2);

        // Can acquire again
        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 1);
    }

    /// With the `Drop` strategy an exhausted gate reports `Dropped` and counts it.
    #[test]
    fn test_credit_gate_drop_strategy() {
        let config = BackpressureConfig {
            exclusive_credits: 1,
            floating_credits: 0,
            overflow_strategy: OverflowStrategy::Drop,
            ..Default::default()
        };
        let gate = CreditGate::new(config);

        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        assert_eq!(gate.try_acquire(), CreditAcquireResult::Dropped);

        assert_eq!(gate.metrics().items_dropped(), 1);
    }

    /// Multi-credit requests are all-or-nothing.
    #[test]
    fn test_credit_gate_batch_acquire() {
        let config = BackpressureConfig {
            exclusive_credits: 4,
            floating_credits: 4,
            overflow_strategy: OverflowStrategy::Block,
            ..Default::default()
        };
        let gate = CreditGate::new(config);

        assert_eq!(gate.try_acquire_n(5), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 3);

        assert_eq!(gate.try_acquire_n(4), CreditAcquireResult::WouldBlock);
        assert_eq!(gate.try_acquire_n(3), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 0);
    }

    /// Releasing more than was acquired must not grow the pool past its capacity.
    #[test]
    fn test_release_is_clamped_to_max() {
        let config = BackpressureConfig {
            exclusive_credits: 2,
            floating_credits: 2,
            ..Default::default()
        };
        let gate = CreditGate::new(config);

        gate.release(10);
        assert_eq!(gate.available(), 4);

        assert_eq!(gate.try_acquire_n(3), CreditAcquireResult::Acquired);
        gate.release(10);
        assert_eq!(gate.available(), 4);
    }

    /// Watermark predicates track the free fraction of the pool.
    #[test]
    fn test_backpressure_watermarks() {
        let config = BackpressureConfig {
            exclusive_credits: 10,
            floating_credits: 0,
            high_watermark: 0.8, // Backpressure when <20% available
            low_watermark: 0.5,  // Recovered when >=50% available
            ..Default::default()
        };
        let gate = CreditGate::new(config);

        // Initially not backpressured
        assert!(!gate.is_backpressured());
        assert!(gate.is_recovered());

        // Use 9 credits (10% available) - should be backpressured
        for _ in 0..9 {
            assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        }
        assert!(gate.is_backpressured());
        assert!(!gate.is_recovered());

        // Release 4 (50% available) - should be recovered
        gate.release(4);
        assert!(!gate.is_backpressured());
        assert!(gate.is_recovered());
    }

    /// A snapshot reflects the acquire/release totals at the time it is taken.
    #[test]
    fn test_metrics_snapshot() {
        let config = BackpressureConfig::default();
        let gate = CreditGate::new(config);

        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        gate.release(1);

        let snapshot = gate.metrics().snapshot();
        assert_eq!(snapshot.credits_acquired, 2);
        assert_eq!(snapshot.credits_released, 1);
        assert_eq!(snapshot.credits_in_flight(), 1);
    }

    /// Every builder setter ends up in the built configuration.
    #[test]
    fn test_config_builder() {
        let config = BackpressureConfig::builder()
            .exclusive_credits(8)
            .floating_credits(16)
            .overflow_strategy(OverflowStrategy::Drop)
            .high_watermark(0.9)
            .low_watermark(0.6)
            .build()
            .unwrap();

        assert_eq!(config.exclusive_credits, 8);
        assert_eq!(config.floating_credits, 16);
        assert_eq!(config.overflow_strategy, OverflowStrategy::Drop);
        assert!((config.high_watermark - 0.9).abs() < f64::EPSILON);
        assert!((config.low_watermark - 0.6).abs() < f64::EPSILON);
        assert_eq!(config.total_credits(), 24);
    }

    /// `build` refuses a high watermark that lies below the low watermark.
    #[test]
    fn test_config_builder_rejects_inverted_watermarks() {
        let result = BackpressureConfig::builder()
            .high_watermark(0.3)
            .low_watermark(0.6)
            .build();
        assert!(result.is_err());
    }

    /// One thread acquires while another releases; the gate must stay consistent.
    #[test]
    fn test_concurrent_acquire_release() {
        use std::sync::Arc;
        use std::thread;

        let config = BackpressureConfig {
            exclusive_credits: 100,
            floating_credits: 0,
            overflow_strategy: OverflowStrategy::Block,
            ..Default::default()
        };
        let gate = Arc::new(CreditGate::new(config));

        let gate_sender = Arc::clone(&gate);
        let gate_receiver = Arc::clone(&gate);

        let sender = thread::spawn(move || {
            let mut acquired = 0;
            for _ in 0..1000 {
                if gate_sender.try_acquire() == CreditAcquireResult::Acquired {
                    acquired += 1;
                }
                // Simulate some work
                std::hint::spin_loop();
            }
            acquired
        });

        let receiver = thread::spawn(move || {
            let mut released = 0;
            for _ in 0..500 {
                gate_receiver.release(1);
                released += 1;
                std::hint::spin_loop();
            }
            released
        });

        let acquired = sender.join().unwrap();
        let released = receiver.join().unwrap();

        // Should have acquired some, and never more than the credits that ever
        // existed (100 initial + 500 released).
        assert!(acquired > 0);
        assert!(acquired <= 600);
        assert_eq!(released, 500);
        assert!(gate.available() <= gate.max_credits());
    }
}