1use crate::incremental::{StateChangelogBuffer, StateChangelogEntry};
15
16pub struct ChangelogDrainer {
23 buffer: std::sync::Arc<StateChangelogBuffer>,
25 pending: Vec<StateChangelogEntry>,
27 max_batch_size: usize,
29 max_pending: usize,
31 total_drained: u64,
33}
34
35const DEFAULT_MAX_PENDING: usize = 256 * 1024;
37
38impl ChangelogDrainer {
39 #[must_use]
41 pub fn new(buffer: std::sync::Arc<StateChangelogBuffer>, max_batch_size: usize) -> Self {
42 Self {
43 buffer,
44 pending: Vec::with_capacity(max_batch_size),
45 max_batch_size,
46 max_pending: DEFAULT_MAX_PENDING,
47 total_drained: 0,
48 }
49 }
50
51 #[must_use]
56 pub fn with_max_pending(mut self, max_pending: usize) -> Self {
57 self.max_pending = max_pending;
58 self
59 }
60
61 pub fn drain(&mut self) -> usize {
67 if self.pending.len() >= self.max_pending {
72 let keep = self.max_pending / 2;
73 let drop_count = self.pending.len() - keep;
74 tracing::warn!(
75 dropped = drop_count,
76 kept = keep,
77 max_pending = self.max_pending,
78 "changelog drainer pending buffer at limit, shedding oldest entries"
79 );
80 self.pending.drain(..drop_count);
81 }
82
83 let room = self.max_pending.saturating_sub(self.pending.len());
84 let limit = self.max_batch_size.min(room);
85
86 let mut count = 0;
87 while count < limit {
88 match self.buffer.pop() {
89 Some(entry) => {
90 self.pending.push(entry);
91 count += 1;
92 }
93 None => break,
94 }
95 }
96 self.total_drained += count as u64;
97 count
98 }
99
100 pub fn take_pending(&mut self) -> Vec<StateChangelogEntry> {
105 std::mem::take(&mut self.pending)
106 }
107
108 pub fn clear_pending(&mut self) {
114 self.pending.clear();
115 }
116
117 #[must_use]
119 pub fn pending_count(&self) -> usize {
120 self.pending.len()
121 }
122
123 #[must_use]
125 pub fn pending(&self) -> &[StateChangelogEntry] {
126 &self.pending
127 }
128
129 #[must_use]
131 pub fn total_drained(&self) -> u64 {
132 self.total_drained
133 }
134
135 #[must_use]
137 pub fn buffer(&self) -> &StateChangelogBuffer {
138 &self.buffer
139 }
140}
141
142impl std::fmt::Debug for ChangelogDrainer {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("ChangelogDrainer")
145 .field("pending", &self.pending.len())
146 .field("max_batch_size", &self.max_batch_size)
147 .field("max_pending", &self.max_pending)
148 .field("total_drained", &self.total_drained)
149 .finish_non_exhaustive()
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use crate::incremental::StateChangelogEntry;
157 use std::sync::Arc;
158
159 #[test]
160 fn test_drainer_empty_buffer() {
161 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
162 let mut drainer = ChangelogDrainer::new(buf, 100);
163
164 assert_eq!(drainer.drain(), 0);
165 assert_eq!(drainer.pending_count(), 0);
166 assert_eq!(drainer.total_drained(), 0);
167 }
168
169 #[test]
170 fn test_drainer_basic_drain() {
171 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
172
173 buf.push(StateChangelogEntry::put(1, 100, 0, 10));
175 buf.push(StateChangelogEntry::put(1, 200, 10, 20));
176 buf.push(StateChangelogEntry::delete(1, 300));
177
178 let mut drainer = ChangelogDrainer::new(buf, 100);
179 let count = drainer.drain();
180
181 assert_eq!(count, 3);
182 assert_eq!(drainer.pending_count(), 3);
183 assert_eq!(drainer.total_drained(), 3);
184 }
185
186 #[test]
187 fn test_drainer_take_pending() {
188 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
189 buf.push(StateChangelogEntry::put(1, 100, 0, 10));
190 buf.push(StateChangelogEntry::put(1, 200, 10, 20));
191
192 let mut drainer = ChangelogDrainer::new(buf, 100);
193 drainer.drain();
194
195 let entries = drainer.take_pending();
196 assert_eq!(entries.len(), 2);
197 assert_eq!(drainer.pending_count(), 0);
198
199 assert!(entries[0].is_put());
201 assert_eq!(entries[0].key_hash, 100);
202 assert!(entries[1].is_put());
203 assert_eq!(entries[1].key_hash, 200);
204 }
205
206 #[test]
207 fn test_drainer_respects_max_batch_size() {
208 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
209
210 for i in 0..10 {
212 buf.push(StateChangelogEntry::put(1, i, 0, 1));
213 }
214
215 let mut drainer = ChangelogDrainer::new(buf, 3);
216 let count = drainer.drain();
217
218 assert_eq!(count, 3);
220 assert_eq!(drainer.pending_count(), 3);
221
222 let count2 = drainer.drain();
224 assert_eq!(count2, 3);
225 assert_eq!(drainer.pending_count(), 6);
226 }
227
228 #[test]
229 fn test_drainer_multiple_drain_cycles() {
230 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
231
232 buf.push(StateChangelogEntry::put(1, 100, 0, 10));
234 let mut drainer = ChangelogDrainer::new(buf.clone(), 100);
235 drainer.drain();
236 let batch1 = drainer.take_pending();
237 assert_eq!(batch1.len(), 1);
238
239 buf.push(StateChangelogEntry::delete(2, 200));
241 buf.push(StateChangelogEntry::put(2, 300, 20, 30));
242 drainer.drain();
243 let batch2 = drainer.take_pending();
244 assert_eq!(batch2.len(), 2);
245
246 assert_eq!(drainer.total_drained(), 3);
247 }
248
249 #[test]
250 fn test_drainer_debug() {
251 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
252 let drainer = ChangelogDrainer::new(buf, 100);
253 let debug = format!("{drainer:?}");
254 assert!(debug.contains("ChangelogDrainer"));
255 assert!(debug.contains("pending: 0"));
256 assert!(debug.contains("max_pending"));
257 }
258
259 #[test]
260 fn test_clear_pending_reuses_allocation() {
261 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
262 buf.push(StateChangelogEntry::put(1, 100, 0, 10));
263 buf.push(StateChangelogEntry::put(1, 200, 10, 20));
264
265 let mut drainer = ChangelogDrainer::new(buf, 100);
266 drainer.drain();
267 assert_eq!(drainer.pending_count(), 2);
268
269 drainer.clear_pending();
270 assert_eq!(drainer.pending_count(), 0);
271 assert_eq!(drainer.total_drained(), 2);
273 }
274
275 #[test]
276 fn test_max_pending_bounds() {
277 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
278
279 for i in 0..10 {
281 buf.push(StateChangelogEntry::put(1, i, 0, 1));
282 }
283
284 let mut drainer = ChangelogDrainer::new(buf.clone(), 100).with_max_pending(6);
286
287 let count = drainer.drain();
289 assert_eq!(count, 6);
290 assert_eq!(drainer.pending_count(), 6);
291
292 let count2 = drainer.drain();
296 assert_eq!(count2, 3);
297 assert_eq!(drainer.pending_count(), 6); assert_eq!(drainer.total_drained(), 9);
299
300 let count3 = drainer.drain();
303 assert_eq!(count3, 1);
304 assert_eq!(drainer.pending_count(), 4); assert_eq!(drainer.total_drained(), 10);
306 }
307
308 #[test]
309 fn test_max_pending_does_not_exceed() {
310 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
311 for i in 0..3 {
312 buf.push(StateChangelogEntry::put(1, i, 0, 1));
313 }
314
315 let mut drainer = ChangelogDrainer::new(buf, 100).with_max_pending(5);
316 let count = drainer.drain();
317 assert_eq!(count, 3);
318 assert_eq!(drainer.pending_count(), 3);
319 }
321}