// laminar_core/subscription/batcher.rs
1//! Notification batching for throughput optimization.
2//!
3//! Accumulates multiple [`ChangeEvent`]s before delivering them as a
4//! [`ChangeEventBatch`], reducing per-event dispatch overhead for
5//! high-throughput scenarios.
6//!
7//! # Flush Triggers
8//!
9//! A batch is flushed when either:
10//! - The batch reaches `max_batch_size` events (size trigger)
11//! - The time since the last flush exceeds `max_batch_delay` (time trigger)
12//!
13//! When batching is disabled (`enabled: false`), events pass through
14//! immediately as single-event batches.
15#![deny(clippy::disallowed_types)]
16
17use std::time::{Duration, Instant};
18
19use rustc_hash::FxHashMap;
20
21use crate::subscription::event::{ChangeEvent, ChangeEventBatch};
22
23// ---------------------------------------------------------------------------
24// BatchConfig
25// ---------------------------------------------------------------------------
26
/// Configuration for notification batching.
///
/// Defaults (via [`Default`]): `max_batch_size = 64`,
/// `max_batch_delay = 100 µs`, `enabled = false` (pass-through).
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum events per batch before flushing (size trigger).
    pub max_batch_size: usize,
    /// Maximum time to wait for a batch to fill before flushing (time trigger).
    pub max_batch_delay: Duration,
    /// Whether batching is enabled. When `false`, events pass through
    /// immediately as single-event batches.
    pub enabled: bool,
}
38
39impl Default for BatchConfig {
40    fn default() -> Self {
41        Self {
42            max_batch_size: 64,
43            max_batch_delay: Duration::from_micros(100),
44            enabled: false,
45        }
46    }
47}
48
49// ---------------------------------------------------------------------------
50// NotificationBatcher
51// ---------------------------------------------------------------------------
52
/// Accumulates events per source and flushes as batches.
///
/// Used by the Ring 1 dispatcher to reduce per-event overhead when
/// delivering to subscribers. Each source has its own buffer, so
/// events from different sources are never mixed.
///
/// NOTE(review): per-source map entries are inserted lazily and never
/// removed, so memory grows with the number of distinct `source_id`s —
/// acceptable if source ids are bounded; confirm with callers.
pub struct NotificationBatcher {
    /// Buffered events per `source_id`.
    buffers: FxHashMap<u32, Vec<ChangeEvent>>,
    /// Last flush time per `source_id`, driving the time trigger.
    last_flush: FxHashMap<u32, Instant>,
    /// Flush thresholds and the enable flag.
    config: BatchConfig,
}
66
67impl NotificationBatcher {
68    /// Creates a new batcher with the given configuration.
69    #[must_use]
70    pub fn new(config: BatchConfig) -> Self {
71        Self {
72            buffers: FxHashMap::default(),
73            last_flush: FxHashMap::default(),
74            config,
75        }
76    }
77
78    /// Adds an event to the batcher.
79    ///
80    /// Returns `Some(ChangeEventBatch)` if the batch is ready to deliver
81    /// (size or time trigger), or `None` if the event was buffered.
82    ///
83    /// When batching is disabled, always returns a single-event batch.
84    pub fn add(
85        &mut self,
86        source_id: u32,
87        source_name: &str,
88        event: ChangeEvent,
89    ) -> Option<ChangeEventBatch> {
90        if !self.config.enabled {
91            let seq = event.sequence().unwrap_or(0);
92            return Some(ChangeEventBatch::new(
93                source_name.to_string(),
94                vec![event],
95                seq,
96                seq,
97            ));
98        }
99
100        let buffer = self.buffers.entry(source_id).or_default();
101        buffer.push(event);
102
103        let now = Instant::now();
104        let last = self.last_flush.entry(source_id).or_insert(now);
105
106        if buffer.len() >= self.config.max_batch_size
107            || now.duration_since(*last) >= self.config.max_batch_delay
108        {
109            *last = now;
110            let events = std::mem::take(buffer);
111            let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
112            let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
113            Some(ChangeEventBatch::new(
114                source_name.to_string(),
115                events,
116                first_seq,
117                last_seq,
118            ))
119        } else {
120            None
121        }
122    }
123
124    /// Forces flush of all pending batches.
125    ///
126    /// Returns a vec of `(source_id, batch)` for all non-empty buffers.
127    pub fn flush_all(&mut self) -> Vec<(u32, ChangeEventBatch)> {
128        let mut results = Vec::new();
129
130        for (&source_id, buffer) in &mut self.buffers {
131            if !buffer.is_empty() {
132                let events = std::mem::take(buffer);
133                let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
134                let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
135                results.push((
136                    source_id,
137                    ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
138                ));
139                self.last_flush.insert(source_id, Instant::now());
140            }
141        }
142
143        results
144    }
145
146    /// Flushes batches that have exceeded the `max_batch_delay`.
147    ///
148    /// Returns a vec of `(source_id, batch)` for expired buffers.
149    pub fn flush_expired(&mut self) -> Vec<(u32, ChangeEventBatch)> {
150        let now = Instant::now();
151        let mut results = Vec::new();
152
153        let expired: Vec<u32> = self
154            .buffers
155            .iter()
156            .filter(|(_, buf)| !buf.is_empty())
157            .filter(|(id, _)| {
158                let last = self.last_flush.get(id).copied().unwrap_or(now);
159                now.duration_since(last) >= self.config.max_batch_delay
160            })
161            .map(|(&id, _)| id)
162            .collect();
163
164        for source_id in expired {
165            if let Some(buffer) = self.buffers.get_mut(&source_id) {
166                if !buffer.is_empty() {
167                    let events = std::mem::take(buffer);
168                    let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
169                    let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
170                    self.last_flush.insert(source_id, now);
171                    results.push((
172                        source_id,
173                        ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
174                    ));
175                }
176            }
177        }
178
179        results
180    }
181
182    /// Returns the number of buffered events across all sources.
183    #[must_use]
184    pub fn buffered_count(&self) -> usize {
185        self.buffers.values().map(Vec::len).sum()
186    }
187
188    /// Returns the configuration.
189    #[must_use]
190    pub fn config(&self) -> &BatchConfig {
191        &self.config
192    }
193}
194
195// ===========================================================================
196// Tests
197// ===========================================================================
198
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a one-row insert event carrying `seq` as both the payload
    /// value and the event's sequence number (timestamp is `1000 + seq`).
    ///
    /// The module-level `allow(clippy::cast_possible_wrap)` covers the
    /// `u64 -> i64` casts below (the previous per-statement `#[allow]`s
    /// were redundant); test sequences are small enough not to wrap.
    fn make_event(seq: u64) -> ChangeEvent {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let array = Int64Array::from(vec![seq as i64]);
        let batch = Arc::new(RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap());
        ChangeEvent::insert(batch, 1000 + seq as i64, seq)
    }

    #[test]
    fn test_batcher_immediate_when_disabled() {
        let config = BatchConfig {
            enabled: false,
            ..Default::default()
        };
        let mut batcher = NotificationBatcher::new(config);

        let result = batcher.add(0, "mv_a", make_event(1));
        assert!(result.is_some());
        let batch = result.unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 1);
        assert_eq!(batch.source, "mv_a");
    }

    #[test]
    fn test_batcher_size_trigger() {
        let config = BatchConfig {
            max_batch_size: 3,
            max_batch_delay: Duration::from_secs(60),
            enabled: true,
        };
        let mut batcher = NotificationBatcher::new(config);

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        assert!(batcher.add(0, "mv_a", make_event(2)).is_none());
        assert_eq!(batcher.buffered_count(), 2);

        let batch = batcher.add(0, "mv_a", make_event(3));
        assert!(batch.is_some());
        let batch = batch.unwrap();
        assert_eq!(batch.len(), 3);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 3);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_timeout_trigger() {
        let config = BatchConfig {
            max_batch_size: 1000,
            max_batch_delay: Duration::from_millis(1),
            enabled: true,
        };
        let mut batcher = NotificationBatcher::new(config);

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        std::thread::sleep(Duration::from_millis(5));

        let batch = batcher.add(0, "mv_a", make_event(2));
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().len(), 2);
    }

    #[test]
    fn test_batcher_flush_all() {
        let config = BatchConfig {
            max_batch_size: 100,
            max_batch_delay: Duration::from_secs(60),
            enabled: true,
        };
        let mut batcher = NotificationBatcher::new(config);

        batcher.add(0, "mv_a", make_event(1));
        batcher.add(0, "mv_a", make_event(2));
        batcher.add(1, "mv_b", make_event(3));

        let flushed = batcher.flush_all();
        assert_eq!(flushed.len(), 2);

        let total_events: usize = flushed.iter().map(|(_, b)| b.len()).sum();
        assert_eq!(total_events, 3);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_flush_expired() {
        let config = BatchConfig {
            max_batch_size: 100,
            max_batch_delay: Duration::from_millis(1),
            enabled: true,
        };
        let mut batcher = NotificationBatcher::new(config);

        batcher.add(0, "mv_a", make_event(1));
        batcher.add(0, "mv_a", make_event(2));

        std::thread::sleep(Duration::from_millis(5));

        let expired = batcher.flush_expired();
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0].1.len(), 2);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_multiple_sources() {
        let config = BatchConfig {
            max_batch_size: 2,
            max_batch_delay: Duration::from_secs(60),
            enabled: true,
        };
        let mut batcher = NotificationBatcher::new(config);

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        assert!(batcher.add(1, "mv_b", make_event(2)).is_none());
        let batch = batcher.add(0, "mv_a", make_event(3));
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().len(), 2);
        assert_eq!(batcher.buffered_count(), 1);
    }
}
329}