// laminar_core/compiler/policy.rs

//! Batching and backpressure policies for the Ring 0 / Ring 1 pipeline bridge.
//!
//! [`BatchPolicy`] controls when the [`BridgeConsumer`](super::pipeline_bridge::BridgeConsumer)
//! flushes accumulated rows into a `RecordBatch`. [`BackpressureStrategy`] determines how the
//! [`PipelineBridge`](super::pipeline_bridge::PipelineBridge) handles a full SPSC queue.
6
7use std::time::Duration;
8
/// Controls when the bridge consumer flushes accumulated rows into a `RecordBatch`.
///
/// The consumer flushes when **any** of these conditions is met:
/// - The row count reaches [`max_rows`](Self::max_rows).
/// - The time since the first row exceeds [`max_latency`](Self::max_latency).
/// - A watermark message arrives and [`flush_on_watermark`](Self::flush_on_watermark) is `true`.
/// - A checkpoint barrier arrives (always flushes).
/// - An EOF message arrives (always flushes).
///
/// Policies are plain configuration values and compare field by field, so two
/// policies are equal exactly when all three settings match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchPolicy {
    /// Maximum number of rows before a flush.
    pub max_rows: usize,
    /// Maximum time a row may wait before flushing.
    pub max_latency: Duration,
    /// Whether a watermark advance triggers a flush of pending rows.
    ///
    /// When `true` (the default), rows accumulated before a watermark are flushed
    /// as a batch *before* the watermark is forwarded to Ring 1. This prevents
    /// partial-batch emissions tied to arbitrary batch boundaries (Issue #55).
    pub flush_on_watermark: bool,
}
30
31impl Default for BatchPolicy {
32    fn default() -> Self {
33        Self {
34            max_rows: 1024,
35            max_latency: Duration::from_millis(10),
36            flush_on_watermark: true,
37        }
38    }
39}
40
41impl BatchPolicy {
42    /// Sets the maximum row count before a flush.
43    #[must_use]
44    pub const fn with_max_rows(mut self, max_rows: usize) -> Self {
45        self.max_rows = max_rows;
46        self
47    }
48
49    /// Sets the maximum latency before a flush.
50    #[must_use]
51    pub const fn with_max_latency(mut self, max_latency: Duration) -> Self {
52        self.max_latency = max_latency;
53        self
54    }
55
56    /// Sets whether watermark advances flush pending rows.
57    #[must_use]
58    pub const fn with_flush_on_watermark(mut self, flush_on_watermark: bool) -> Self {
59        self.flush_on_watermark = flush_on_watermark;
60        self
61    }
62}
63
/// How the bridge producer handles a full SPSC queue.
///
/// Strategies are plain configuration values and compare by variant and
/// payload, so they can be checked with `==` or `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Drop the newest event and increment a drop counter (best-effort delivery).
    ///
    /// This is the default strategy.
    #[default]
    DropNewest,
    /// Signal the upstream source to pause until capacity is available (exactly-once).
    PauseSource,
    /// Spill events to the WAL on disk up to the given byte limit (burst-tolerant).
    SpillToDisk {
        /// Maximum number of bytes to spill before applying backpressure upstream.
        max_spill_bytes: usize,
    },
}
78
#[cfg(test)]
mod tests {
    use super::*;

    /// The default policy is 1024 rows / 10 ms / flush on watermark.
    #[test]
    fn batch_policy_defaults() {
        let BatchPolicy {
            max_rows,
            max_latency,
            flush_on_watermark,
        } = BatchPolicy::default();

        assert_eq!(max_rows, 1024);
        assert_eq!(max_latency, Duration::from_millis(10));
        assert!(flush_on_watermark);
    }

    /// Every builder setter overrides exactly the field it names.
    #[test]
    fn batch_policy_builder() {
        let five_ms = Duration::from_millis(5);
        let built = BatchPolicy::default()
            .with_max_rows(512)
            .with_max_latency(five_ms)
            .with_flush_on_watermark(false);

        assert_eq!(built.max_rows, 512);
        assert_eq!(built.max_latency, Duration::from_millis(5));
        assert!(!built.flush_on_watermark);
    }

    /// `DropNewest` is the default backpressure strategy.
    #[test]
    fn backpressure_strategy_default() {
        let is_drop_newest = matches!(
            BackpressureStrategy::default(),
            BackpressureStrategy::DropNewest
        );
        assert!(is_drop_newest);
    }

    /// `SpillToDisk` carries its byte limit through construction unchanged.
    #[test]
    fn backpressure_strategy_spill() {
        let one_mib = 1024 * 1024;
        let strategy = BackpressureStrategy::SpillToDisk {
            max_spill_bytes: one_mib,
        };

        if let BackpressureStrategy::SpillToDisk { max_spill_bytes } = strategy {
            assert_eq!(max_spill_bytes, 1024 * 1024);
        } else {
            panic!("expected SpillToDisk");
        }
    }

    /// The derived `Debug` output names the type, a field, and the default row limit.
    #[test]
    fn batch_policy_debug() {
        let policy = BatchPolicy::default();
        let rendered = format!("{policy:?}");

        for needle in ["BatchPolicy", "max_rows", "1024"] {
            assert!(rendered.contains(needle));
        }
    }

    /// Cloning preserves every field, including one changed via the builder.
    #[test]
    fn batch_policy_clone() {
        let source = BatchPolicy::default().with_max_rows(256);
        let duplicate = source.clone();

        assert_eq!(duplicate.max_rows, 256);
        assert_eq!(duplicate.max_latency, source.max_latency);
        assert_eq!(duplicate.flush_on_watermark, source.flush_on_watermark);
    }
}