Skip to main content

laminar_connectors/websocket/
backpressure.rs

//! Backpressure strategies for WebSocket connectors.
//!
//! When the internal Ring 0 bounded channel is full and cannot accept more
//! messages from a WebSocket source, a [`BackpressureStrategy`] determines
//! what happens to incoming data.
6
7use serde::{Deserialize, Serialize};
8
9/// Strategy applied when the Ring 0 channel is full and cannot accept more messages.
10///
11/// WebSocket sources produce data at the rate of the remote sender. When the
12/// downstream processing pipeline cannot keep up, one of these strategies
13/// governs how the connector handles the overflow.
14///
15/// **Implementation status** (2026-03-07):
16/// - `Block`: fully implemented (TCP backpressure propagation)
17/// - `DropNewest`: implemented in `source.rs` via `try_send`
18/// - `DropOldest`, `Buffer`, `Sample`: parsed from SQL WITH, fall back to
19///   `DropNewest` behavior with a debug log. Full dispatch planned for Phase 6c.
20#[derive(Debug, Clone, Default, Serialize, Deserialize)]
21pub enum BackpressureStrategy {
22    /// Block WS read -- TCP backpressure propagates to sender.
23    ///
24    /// This is the safest option: the WebSocket read loop simply stops
25    /// reading, which causes the TCP window to fill, eventually slowing
26    /// the remote sender. No data is lost, but latency increases.
27    #[default]
28    Block,
29    /// Drop oldest messages from bounded channel when full.
30    ///
31    /// The channel evicts the oldest buffered message to make room for
32    /// each new arrival. Good for "latest value wins" use cases.
33    DropOldest,
34    /// Drop incoming message when channel full (don't enqueue).
35    ///
36    /// The newly arriving message is silently discarded. Useful when
37    /// freshness of already-buffered data matters more than completeness.
38    DropNewest,
39    /// Buffer up to `max_bytes`, then drop oldest.
40    ///
41    /// An intermediate strategy: an additional byte-bounded buffer sits
42    /// in front of the channel. Once the buffer exceeds `max_bytes`, the
43    /// oldest buffered messages are evicted.
44    Buffer {
45        /// Maximum buffer size in bytes before eviction kicks in.
46        max_bytes: usize,
47    },
48    /// Sample: keep every Nth message.
49    ///
50    /// Under sustained backpressure, only every `rate`-th message is
51    /// forwarded. All others are dropped. Useful for high-frequency
52    /// telemetry where statistical sampling is acceptable.
53    Sample {
54        /// Keep every Nth message (e.g., `rate = 10` keeps 10% of messages).
55        rate: u32,
56    },
57}
58
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `strategy` as JSON and decodes it again.
    ///
    /// Panics (failing the calling test) if either serde step errors.
    fn json_roundtrip(strategy: &BackpressureStrategy) -> BackpressureStrategy {
        let encoded = serde_json::to_string(strategy).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    // --- Construction, Debug and Clone ---

    #[test]
    fn test_default_is_block() {
        assert!(matches!(
            BackpressureStrategy::default(),
            BackpressureStrategy::Block
        ));
    }

    #[test]
    fn test_debug_format() {
        let rendered = format!("{:?}", BackpressureStrategy::Block);
        assert_eq!(rendered, "Block");
    }

    #[test]
    fn test_buffer_variant() {
        let built = BackpressureStrategy::Buffer {
            max_bytes: 1_048_576,
        };
        let BackpressureStrategy::Buffer { max_bytes } = built else {
            panic!("expected Buffer variant");
        };
        assert_eq!(max_bytes, 1_048_576);
    }

    #[test]
    fn test_sample_variant() {
        let built = BackpressureStrategy::Sample { rate: 10 };
        let BackpressureStrategy::Sample { rate } = built else {
            panic!("expected Sample variant");
        };
        assert_eq!(rate, 10);
    }

    #[test]
    fn test_clone() {
        let source = BackpressureStrategy::DropOldest;
        assert!(matches!(source.clone(), BackpressureStrategy::DropOldest));
    }

    // --- Serde JSON round-trips, one per variant ---

    #[test]
    fn test_serde_roundtrip_block() {
        let restored = json_roundtrip(&BackpressureStrategy::Block);
        assert!(matches!(restored, BackpressureStrategy::Block));
    }

    #[test]
    fn test_serde_roundtrip_buffer() {
        let restored = json_roundtrip(&BackpressureStrategy::Buffer { max_bytes: 65_536 });
        let BackpressureStrategy::Buffer { max_bytes } = restored else {
            panic!("expected Buffer variant after deserialization");
        };
        assert_eq!(max_bytes, 65_536);
    }

    #[test]
    fn test_serde_roundtrip_sample() {
        let restored = json_roundtrip(&BackpressureStrategy::Sample { rate: 5 });
        let BackpressureStrategy::Sample { rate } = restored else {
            panic!("expected Sample variant after deserialization");
        };
        assert_eq!(rate, 5);
    }

    #[test]
    fn test_serde_roundtrip_drop_oldest() {
        let restored = json_roundtrip(&BackpressureStrategy::DropOldest);
        assert!(matches!(restored, BackpressureStrategy::DropOldest));
    }

    #[test]
    fn test_serde_roundtrip_drop_newest() {
        let restored = json_roundtrip(&BackpressureStrategy::DropNewest);
        assert!(matches!(restored, BackpressureStrategy::DropNewest));
    }
}