Skip to main content

laminar_storage/
changelog_drainer.rs

//! Ring 1 changelog drainer.
//!
//! Consumes entries from the Ring 0 [`StateChangelogBuffer`] to relieve
//! SPSC backpressure. Runs in the background (Ring 1) on a periodic
//! or checkpoint-triggered schedule.
//!
//! ## Design
//!
//! - Drains the SPSC buffer without allocation (reads pre-allocated entries)
//! - Tracks drain metrics for observability
//! - Supports explicit flush for checkpoint coordination
//! - Bounded `pending` buffer prevents unbounded memory growth
13
14use crate::incremental::{StateChangelogBuffer, StateChangelogEntry};
15
16/// Drains a Ring 0 [`StateChangelogBuffer`] from Ring 1.
17///
18/// The drainer is the consumer side of the SPSC changelog buffer.
19/// It pops entries to relieve Ring 0 backpressure and tracks
20/// metadata for observability. Pending entries are cleared after
21/// each successful checkpoint via [`clear_pending()`](Self::clear_pending).
22pub struct ChangelogDrainer {
23    /// Reference to the shared changelog buffer (producer: Ring 0, consumer: this).
24    buffer: std::sync::Arc<StateChangelogBuffer>,
25    /// Accumulated entries since last flush.
26    pending: Vec<StateChangelogEntry>,
27    /// Maximum entries to pop per drain call.
28    max_batch_size: usize,
29    /// Upper bound on `pending.len()` — older entries are discarded when exceeded.
30    max_pending: usize,
31    /// Total entries drained over the lifetime of this drainer.
32    total_drained: u64,
33}
34
/// Pending-entry bound used when the caller does not configure one:
/// 2^18 = 262,144 entries (≈ 8 MB if an entry is 32 bytes — entry size is
/// defined elsewhere; TODO confirm).
const DEFAULT_MAX_PENDING: usize = 1 << 18;
37
38impl ChangelogDrainer {
39    /// Creates a new drainer for the given changelog buffer.
40    #[must_use]
41    pub fn new(buffer: std::sync::Arc<StateChangelogBuffer>, max_batch_size: usize) -> Self {
42        Self {
43            buffer,
44            pending: Vec::with_capacity(max_batch_size),
45            max_batch_size,
46            max_pending: DEFAULT_MAX_PENDING,
47            total_drained: 0,
48        }
49    }
50
51    /// Sets the upper bound on pending entries.
52    ///
53    /// When `pending` exceeds this limit during [`drain()`](Self::drain),
54    /// older entries are discarded to prevent unbounded memory growth.
55    #[must_use]
56    pub fn with_max_pending(mut self, max_pending: usize) -> Self {
57        self.max_pending = max_pending;
58        self
59    }
60
61    /// Drains available entries from the buffer into the pending batch.
62    ///
63    /// If `pending` is at the `max_pending` limit, the oldest half of
64    /// pending entries are discarded to make room. Returns the number
65    /// of new entries drained from the buffer.
66    pub fn drain(&mut self) -> usize {
67        // Enforce max_pending: if we're at the limit, shed the oldest half
68        // to make room while preserving recent entries. This is preferable
69        // to clearing everything — the newest entries are most likely to
70        // be needed for the next checkpoint.
71        if self.pending.len() >= self.max_pending {
72            let keep = self.max_pending / 2;
73            let drop_count = self.pending.len() - keep;
74            tracing::warn!(
75                dropped = drop_count,
76                kept = keep,
77                max_pending = self.max_pending,
78                "changelog drainer pending buffer at limit, shedding oldest entries"
79            );
80            self.pending.drain(..drop_count);
81        }
82
83        let room = self.max_pending.saturating_sub(self.pending.len());
84        let limit = self.max_batch_size.min(room);
85
86        let mut count = 0;
87        while count < limit {
88            match self.buffer.pop() {
89                Some(entry) => {
90                    self.pending.push(entry);
91                    count += 1;
92                }
93                None => break,
94            }
95        }
96        self.total_drained += count as u64;
97        count
98    }
99
100    /// Takes the pending batch, leaving an empty pending buffer.
101    ///
102    /// After calling this, the drainer's pending buffer is cleared and
103    /// ready to accumulate more entries. The allocation is NOT reused.
104    pub fn take_pending(&mut self) -> Vec<StateChangelogEntry> {
105        std::mem::take(&mut self.pending)
106    }
107
108    /// Clears the pending buffer, reusing the existing allocation.
109    ///
110    /// Call this after a successful checkpoint to release the metadata
111    /// entries — they're no longer needed once the checkpoint has
112    /// captured the full state.
113    pub fn clear_pending(&mut self) {
114        self.pending.clear();
115    }
116
117    /// Returns the number of pending (un-taken) entries.
118    #[must_use]
119    pub fn pending_count(&self) -> usize {
120        self.pending.len()
121    }
122
123    /// Returns a reference to the pending entries.
124    #[must_use]
125    pub fn pending(&self) -> &[StateChangelogEntry] {
126        &self.pending
127    }
128
129    /// Returns the total number of entries drained over the drainer's lifetime.
130    #[must_use]
131    pub fn total_drained(&self) -> u64 {
132        self.total_drained
133    }
134
135    /// Returns a reference to the underlying changelog buffer.
136    #[must_use]
137    pub fn buffer(&self) -> &StateChangelogBuffer {
138        &self.buffer
139    }
140}
141
142impl std::fmt::Debug for ChangelogDrainer {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("ChangelogDrainer")
145            .field("pending", &self.pending.len())
146            .field("max_batch_size", &self.max_batch_size)
147            .field("max_pending", &self.max_pending)
148            .field("total_drained", &self.total_drained)
149            .finish_non_exhaustive()
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::StateChangelogEntry;
    use std::sync::Arc;

    /// Draining an empty buffer moves nothing and leaves all counters at zero.
    #[test]
    fn test_drainer_empty_buffer() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        let mut drainer = ChangelogDrainer::new(buf, 100);

        assert_eq!(drainer.drain(), 0);
        assert_eq!(drainer.pending_count(), 0);
        assert_eq!(drainer.total_drained(), 0);
    }

    /// A single drain moves every buffered entry when the batch size allows.
    #[test]
    fn test_drainer_basic_drain() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));

        // Push some entries
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
        buf.push(StateChangelogEntry::delete(1, 300));

        let mut drainer = ChangelogDrainer::new(buf, 100);
        let count = drainer.drain();

        assert_eq!(count, 3);
        assert_eq!(drainer.pending_count(), 3);
        assert_eq!(drainer.total_drained(), 3);
    }

    /// `take_pending` hands over the entries in drain order and empties the batch.
    #[test]
    fn test_drainer_take_pending() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));

        let mut drainer = ChangelogDrainer::new(buf, 100);
        drainer.drain();

        let entries = drainer.take_pending();
        assert_eq!(entries.len(), 2);
        assert_eq!(drainer.pending_count(), 0);

        // Verify entry contents
        assert!(entries[0].is_put());
        assert_eq!(entries[0].key_hash, 100);
        assert!(entries[1].is_put());
        assert_eq!(entries[1].key_hash, 200);
    }

    /// Each drain call pops at most `max_batch_size` entries.
    #[test]
    fn test_drainer_respects_max_batch_size() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));

        // Push more entries than the max batch size
        for i in 0..10 {
            buf.push(StateChangelogEntry::put(1, i, 0, 1));
        }

        let mut drainer = ChangelogDrainer::new(buf, 3);
        let count = drainer.drain();

        // Should only drain 3
        assert_eq!(count, 3);
        assert_eq!(drainer.pending_count(), 3);

        // Drain again for next batch
        let count2 = drainer.drain();
        assert_eq!(count2, 3);
        assert_eq!(drainer.pending_count(), 6);
    }

    /// The lifetime counter keeps growing across take/drain cycles.
    #[test]
    fn test_drainer_multiple_drain_cycles() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));

        // First cycle
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        let mut drainer = ChangelogDrainer::new(buf.clone(), 100);
        drainer.drain();
        let batch1 = drainer.take_pending();
        assert_eq!(batch1.len(), 1);

        // Second cycle
        buf.push(StateChangelogEntry::delete(2, 200));
        buf.push(StateChangelogEntry::put(2, 300, 20, 30));
        drainer.drain();
        let batch2 = drainer.take_pending();
        assert_eq!(batch2.len(), 2);

        assert_eq!(drainer.total_drained(), 3);
    }

    /// The `Debug` output names the type and reports the pending length.
    #[test]
    fn test_drainer_debug() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        let drainer = ChangelogDrainer::new(buf, 100);
        let debug = format!("{drainer:?}");
        assert!(debug.contains("ChangelogDrainer"));
        assert!(debug.contains("pending: 0"));
        assert!(debug.contains("max_pending"));
    }

    /// `clear_pending` empties the batch but keeps its allocation and the
    /// lifetime counter.
    #[test]
    fn test_clear_pending_reuses_allocation() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));

        let mut drainer = ChangelogDrainer::new(buf, 100);
        drainer.drain();
        assert_eq!(drainer.pending_count(), 2);

        let capacity_before = drainer.pending.capacity();
        drainer.clear_pending();
        assert_eq!(drainer.pending_count(), 0);
        // The backing allocation survives the clear.
        assert_eq!(drainer.pending.capacity(), capacity_before);
        // total_drained is not reset
        assert_eq!(drainer.total_drained(), 2);
    }

    /// Once `pending` reaches `max_pending`, the oldest half is shed and the
    /// newest entries are kept, in order.
    #[test]
    fn test_max_pending_bounds() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));

        // Push 10 entries with key hashes 0..=9
        for i in 0..10 {
            buf.push(StateChangelogEntry::put(1, i, 0, 1));
        }

        // Create drainer with max_pending = 6
        let mut drainer = ChangelogDrainer::new(buf.clone(), 100).with_max_pending(6);

        // First drain: pending is empty, so room = 6 and exactly 6 entries
        // (key hashes 0..=5) are drained.
        let count = drainer.drain();
        assert_eq!(count, 6);
        assert_eq!(drainer.pending_count(), 6);

        // Second drain: pending is at max_pending (6 >= 6), so the oldest 3
        // are shed and the newest 3 kept. Four entries remain in the buffer,
        // but room is only 6 - 3 = 3, so only 3 more are drained.
        let count2 = drainer.drain();
        assert_eq!(count2, 3);
        assert_eq!(drainer.pending_count(), 6); // 3 kept + 3 new
        assert_eq!(drainer.total_drained(), 9);
        let hashes: Vec<_> = drainer.pending().iter().map(|e| e.key_hash).collect();
        assert_eq!(hashes, vec![3, 4, 5, 6, 7, 8]);

        // Third drain: again at the limit, shed the oldest 3, keep 3, then
        // drain the single entry left in the buffer.
        let count3 = drainer.drain();
        assert_eq!(count3, 1);
        assert_eq!(drainer.pending_count(), 4); // 3 kept + 1 new
        assert_eq!(drainer.total_drained(), 10);
        let hashes: Vec<_> = drainer.pending().iter().map(|e| e.key_hash).collect();
        assert_eq!(hashes, vec![6, 7, 8, 9]);
    }

    /// Below the limit nothing is shed.
    #[test]
    fn test_max_pending_does_not_exceed() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        for i in 0..3 {
            buf.push(StateChangelogEntry::put(1, i, 0, 1));
        }

        let mut drainer = ChangelogDrainer::new(buf, 100).with_max_pending(5);
        let count = drainer.drain();
        assert_eq!(count, 3);
        assert_eq!(drainer.pending_count(), 3);
        // Still below max_pending, so every drained entry is retained in order.
        let hashes: Vec<_> = drainer.pending().iter().map(|e| e.key_hash).collect();
        assert_eq!(hashes, vec![0, 1, 2]);
    }
}