laminar_core/subscription/
batcher.rs1#![deny(clippy::disallowed_types)]
16
17use std::time::{Duration, Instant};
18
19use rustc_hash::FxHashMap;
20
21use crate::subscription::event::{ChangeEvent, ChangeEventBatch};
22
/// Tuning knobs for notification batching.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Number of buffered events at which a source's batch is emitted.
    pub max_batch_size: usize,
    /// Time since a source's last flush after which its buffer is emitted.
    pub max_batch_delay: Duration,
    /// When `false`, batching is bypassed and every event is delivered on
    /// its own as a single-event batch.
    pub enabled: bool,
}

impl Default for BatchConfig {
    /// Batching switched off, with a 64-event size cap and a 100 µs delay cap.
    fn default() -> Self {
        const DEFAULT_MAX_BATCH_SIZE: usize = 64;
        const DEFAULT_MAX_BATCH_DELAY: Duration = Duration::from_micros(100);

        Self {
            max_batch_size: DEFAULT_MAX_BATCH_SIZE,
            max_batch_delay: DEFAULT_MAX_BATCH_DELAY,
            enabled: false,
        }
    }
}
48
49pub struct NotificationBatcher {
59 buffers: FxHashMap<u32, Vec<ChangeEvent>>,
61 last_flush: FxHashMap<u32, Instant>,
63 config: BatchConfig,
65}
66
67impl NotificationBatcher {
68 #[must_use]
70 pub fn new(config: BatchConfig) -> Self {
71 Self {
72 buffers: FxHashMap::default(),
73 last_flush: FxHashMap::default(),
74 config,
75 }
76 }
77
78 pub fn add(
85 &mut self,
86 source_id: u32,
87 source_name: &str,
88 event: ChangeEvent,
89 ) -> Option<ChangeEventBatch> {
90 if !self.config.enabled {
91 let seq = event.sequence().unwrap_or(0);
92 return Some(ChangeEventBatch::new(
93 source_name.to_string(),
94 vec![event],
95 seq,
96 seq,
97 ));
98 }
99
100 let buffer = self.buffers.entry(source_id).or_default();
101 buffer.push(event);
102
103 let now = Instant::now();
104 let last = self.last_flush.entry(source_id).or_insert(now);
105
106 if buffer.len() >= self.config.max_batch_size
107 || now.duration_since(*last) >= self.config.max_batch_delay
108 {
109 *last = now;
110 let events = std::mem::take(buffer);
111 let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
112 let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
113 Some(ChangeEventBatch::new(
114 source_name.to_string(),
115 events,
116 first_seq,
117 last_seq,
118 ))
119 } else {
120 None
121 }
122 }
123
124 pub fn flush_all(&mut self) -> Vec<(u32, ChangeEventBatch)> {
128 let mut results = Vec::new();
129
130 for (&source_id, buffer) in &mut self.buffers {
131 if !buffer.is_empty() {
132 let events = std::mem::take(buffer);
133 let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
134 let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
135 results.push((
136 source_id,
137 ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
138 ));
139 self.last_flush.insert(source_id, Instant::now());
140 }
141 }
142
143 results
144 }
145
146 pub fn flush_expired(&mut self) -> Vec<(u32, ChangeEventBatch)> {
150 let now = Instant::now();
151 let mut results = Vec::new();
152
153 let expired: Vec<u32> = self
154 .buffers
155 .iter()
156 .filter(|(_, buf)| !buf.is_empty())
157 .filter(|(id, _)| {
158 let last = self.last_flush.get(id).copied().unwrap_or(now);
159 now.duration_since(last) >= self.config.max_batch_delay
160 })
161 .map(|(&id, _)| id)
162 .collect();
163
164 for source_id in expired {
165 if let Some(buffer) = self.buffers.get_mut(&source_id) {
166 if !buffer.is_empty() {
167 let events = std::mem::take(buffer);
168 let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
169 let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
170 self.last_flush.insert(source_id, now);
171 results.push((
172 source_id,
173 ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
174 ));
175 }
176 }
177 }
178
179 results
180 }
181
182 #[must_use]
184 pub fn buffered_count(&self) -> usize {
185 self.buffers.values().map(Vec::len).sum()
186 }
187
188 #[must_use]
190 pub fn config(&self) -> &BatchConfig {
191 &self.config
192 }
193}
194
#[cfg(test)]
// `u64 -> i64` casts below only ever see tiny test sequence numbers.
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds an insert event with sequence `seq`, a one-row `Int64` column
    /// `v` holding `seq`, and `1000 + seq` as the second `insert` argument.
    fn make_event(seq: u64) -> ChangeEvent {
        let value = seq as i64;
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let column = Int64Array::from(vec![value]);
        let batch = Arc::new(RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap());
        ChangeEvent::insert(batch, 1000 + value, seq)
    }

    /// Batcher with batching switched on and the given thresholds.
    fn enabled_batcher(max_batch_size: usize, max_batch_delay: Duration) -> NotificationBatcher {
        NotificationBatcher::new(BatchConfig {
            max_batch_size,
            max_batch_delay,
            enabled: true,
        })
    }

    #[test]
    fn test_batcher_immediate_when_disabled() {
        let mut batcher = NotificationBatcher::new(BatchConfig {
            enabled: false,
            ..Default::default()
        });

        let batch = batcher
            .add(0, "mv_a", make_event(1))
            .expect("disabled batcher must emit immediately");

        assert_eq!(batch.len(), 1);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 1);
        assert_eq!(batch.source, "mv_a");
    }

    #[test]
    fn test_batcher_size_trigger() {
        let mut batcher = enabled_batcher(3, Duration::from_secs(60));

        // The first two events stay buffered.
        for seq in [1, 2] {
            assert!(batcher.add(0, "mv_a", make_event(seq)).is_none());
        }
        assert_eq!(batcher.buffered_count(), 2);

        // The third event reaches the size threshold.
        let batch = batcher
            .add(0, "mv_a", make_event(3))
            .expect("size threshold must trigger a batch");
        assert_eq!(batch.len(), 3);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 3);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_timeout_trigger() {
        let mut batcher = enabled_batcher(1000, Duration::from_millis(1));

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        std::thread::sleep(Duration::from_millis(5));

        let batch = batcher
            .add(0, "mv_a", make_event(2))
            .expect("elapsed delay must trigger a batch");
        assert_eq!(batch.len(), 2);
    }

    #[test]
    fn test_batcher_flush_all() {
        let mut batcher = enabled_batcher(100, Duration::from_secs(60));

        let _ = batcher.add(0, "mv_a", make_event(1));
        let _ = batcher.add(0, "mv_a", make_event(2));
        let _ = batcher.add(1, "mv_b", make_event(3));

        let flushed = batcher.flush_all();
        assert_eq!(flushed.len(), 2);

        let total_events: usize = flushed.iter().map(|(_, batch)| batch.len()).sum();
        assert_eq!(total_events, 3);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_flush_expired() {
        let mut batcher = enabled_batcher(100, Duration::from_millis(1));

        let _ = batcher.add(0, "mv_a", make_event(1));
        let _ = batcher.add(0, "mv_a", make_event(2));

        std::thread::sleep(Duration::from_millis(5));

        let expired = batcher.flush_expired();
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0].1.len(), 2);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_multiple_sources() {
        let mut batcher = enabled_batcher(2, Duration::from_secs(60));

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        assert!(batcher.add(1, "mv_b", make_event(2)).is_none());

        // Only source 0 reaches its size threshold; source 1 keeps its event.
        let batch = batcher
            .add(0, "mv_a", make_event(3))
            .expect("source 0 reached its size threshold");
        assert_eq!(batch.len(), 2);
        assert_eq!(batcher.buffered_count(), 1);
    }
}