// laminar_storage/incremental/changelog.rs
1//! Ring 0 changelog buffer for incremental checkpointing.
2//!
3//! This module provides zero-allocation tracking of state mutations for
4//! background WAL writes and incremental checkpoints.
5//!
6//! ## Design
7//!
8//! The changelog buffer sits on the Ring 0 hot path and must:
9//! - Never allocate after warmup (pre-allocated capacity)
10//! - Be O(1) for push operations
11//! - Signal backpressure when full
12//! - Drain efficiently to Ring 1 for WAL writes
13//!
14//! ## Memory Layout
15//!
16//! Each [`StateChangelogEntry`] is 32 bytes:
17//! ```text
18//! ┌─────────┬──────────┬─────────────┬───────────┬────┬─────────┐
19//! │ epoch   │ key_hash │ mmap_offset │ value_len │ op │ padding │
20//! │ 8 bytes │ 8 bytes  │ 8 bytes     │ 4 bytes   │ 1  │ 3 bytes │
21//! └─────────┴──────────┴─────────────┴───────────┴────┴─────────┘
22//! ```
23
24use std::cell::UnsafeCell;
25use std::hash::{BuildHasher, Hasher};
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use rustc_hash::FxBuildHasher;
29
/// State mutation operation type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StateOp {
    /// Put (insert or update) operation.
    Put = 0,
    /// Delete operation.
    Delete = 1,
}

impl StateOp {
    /// Returns the compact one-byte encoding used in changelog entries.
    #[inline]
    #[must_use]
    pub const fn to_u8(self) -> u8 {
        self as u8
    }

    /// Decodes an operation from its one-byte encoding.
    ///
    /// Any value other than `1` decodes as [`StateOp::Put`].
    #[inline]
    #[must_use]
    pub const fn from_u8(val: u8) -> Self {
        if val == 1 { Self::Delete } else { Self::Put }
    }
}
58
/// Zero-allocation changelog entry for Ring 0 hot path.
///
/// This struct is designed for minimal memory footprint (32 bytes, enforced
/// by a compile-time assertion) and cache-efficient access. It stores offset
/// references into the mmap state store rather than copying key/value data.
///
/// # Memory Layout
///
/// The struct is `repr(C)` to ensure predictable memory layout:
/// - `epoch`: 8 bytes - Logical epoch number for ordering
/// - `key_hash`: 8 bytes - `FxHash` of the key for quick comparison
/// - `mmap_offset`: 8 bytes - Offset into the mmap state store
/// - `value_len`: 4 bytes - Length of the value (0 for deletes)
/// - `op`: 1 byte - Operation type (Put/Delete)
/// - `_padding`: 3 bytes - Alignment padding
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct StateChangelogEntry {
    /// Logical epoch number for ordering changelog entries.
    pub epoch: u64,
    /// `FxHash` of the key (for deduplication and lookup).
    pub key_hash: u64,
    /// Offset into the mmap state store where value is stored.
    pub mmap_offset: u64,
    /// Length of the value in bytes (0 for Delete operations).
    pub value_len: u32,
    /// Operation type. Kept private so callers go through `op()` /
    /// `is_put()` / `is_delete()` and cannot store out-of-range encodings.
    op: u8,
    /// Padding for alignment (always zeroed by the constructors).
    _padding: [u8; 3],
}
90
91impl StateChangelogEntry {
92    /// Creates a new Put changelog entry.
93    #[inline]
94    #[must_use]
95    pub fn put(epoch: u64, key_hash: u64, mmap_offset: u64, value_len: u32) -> Self {
96        Self {
97            epoch,
98            key_hash,
99            mmap_offset,
100            value_len,
101            op: StateOp::Put.to_u8(),
102            _padding: [0; 3],
103        }
104    }
105
106    /// Creates a new Delete changelog entry.
107    #[inline]
108    #[must_use]
109    pub fn delete(epoch: u64, key_hash: u64) -> Self {
110        Self {
111            epoch,
112            key_hash,
113            mmap_offset: 0,
114            value_len: 0,
115            op: StateOp::Delete.to_u8(),
116            _padding: [0; 3],
117        }
118    }
119
120    /// Creates a changelog entry from a key with automatic hashing.
121    #[inline]
122    #[must_use]
123    pub fn from_key(key: &[u8], epoch: u64, mmap_offset: u64, value_len: u32, op: StateOp) -> Self {
124        let key_hash = Self::hash_key(key);
125        Self {
126            epoch,
127            key_hash,
128            mmap_offset,
129            value_len,
130            op: op.to_u8(),
131            _padding: [0; 3],
132        }
133    }
134
135    /// Returns the operation type.
136    #[inline]
137    #[must_use]
138    pub fn op(&self) -> StateOp {
139        StateOp::from_u8(self.op)
140    }
141
142    /// Returns true if this is a Put operation.
143    #[inline]
144    #[must_use]
145    pub fn is_put(&self) -> bool {
146        self.op == StateOp::Put.to_u8()
147    }
148
149    /// Returns true if this is a Delete operation.
150    #[inline]
151    #[must_use]
152    pub fn is_delete(&self) -> bool {
153        self.op == StateOp::Delete.to_u8()
154    }
155
156    /// Hash a key using `FxHash` (fast, consistent across entries).
157    #[inline]
158    #[must_use]
159    pub fn hash_key(key: &[u8]) -> u64 {
160        let hasher_builder = FxBuildHasher;
161        let mut hasher = hasher_builder.build_hasher();
162        hasher.write(key);
163        hasher.finish()
164    }
165}
166
// Compile-time check: the entry must stay exactly 32 bytes (two entries per
// 64-byte cache line) — see the Memory Layout doc on StateChangelogEntry.
const _: () = assert!(std::mem::size_of::<StateChangelogEntry>() == 32);
169
/// Ring 0 SPSC changelog buffer for state mutations.
///
/// This buffer is designed for the hot path and must never allocate after
/// initial warmup. It provides backpressure signaling when full.
///
/// ## Thread Safety
///
/// The buffer is designed for single-producer single-consumer access:
/// - Ring 0 (producer): pushes entries via `push()`
/// - Ring 1 (consumer): drains entries via `drain()`
///
/// Use atomic indices for thread-safe access when needed.
///
/// ## Capacity
///
/// Capacity is rounded up to a power of two so positions can wrap with a
/// bitmask; one slot is always left empty to distinguish "full" from
/// "empty", so the usable capacity is `capacity - 1` entries.
///
/// ## Example
///
/// ```rust,no_run
/// use laminar_storage::incremental::{StateChangelogBuffer, StateChangelogEntry};
///
/// // Pre-allocate buffer for 1024 entries
/// let mut buffer = StateChangelogBuffer::with_capacity(1024);
///
/// // Ring 0: Push state mutations (no allocation)
/// let entry = StateChangelogEntry::put(1, 12345, 0, 100);
/// if !buffer.push(entry) {
///     // Buffer full - apply backpressure
/// }
///
/// // Ring 1: Drain for WAL writes
/// let entries: Vec<_> = buffer.drain_all().collect();
/// ```
pub struct StateChangelogBuffer {
    /// Pre-allocated entry storage with per-slot interior mutability.
    ///
    /// Each slot is wrapped in `UnsafeCell` because this is an SPSC ring buffer:
    /// the producer (Ring 0) writes slots via `push()` and the consumer
    /// (Ring 1) reads slots via `pop()`, both through `&self`. Without
    /// `UnsafeCell`, writing through a shared reference would violate Rust's
    /// aliasing rules (UB under Stacked Borrows / Tree Borrows).
    /// This matches the pattern used by `SpscQueue` in `laminar-core`.
    entries: Box<[UnsafeCell<StateChangelogEntry>]>,
    /// Current write position (producer). Always stored pre-masked, so its
    /// value is in `0..capacity`.
    write_pos: AtomicUsize,
    /// Current read position (consumer). Always stored pre-masked, so its
    /// value is in `0..capacity`.
    read_pos: AtomicUsize,
    /// Buffer capacity (always a power of two — see `with_capacity`).
    capacity: usize,
    /// Current epoch for new entries. Plain (non-atomic) field: mutated only
    /// through `&mut self` setters, so exclusivity is borrow-checked.
    current_epoch: u64,
    /// Metrics: total entries successfully pushed.
    total_pushed: AtomicUsize,
    /// Metrics: total entries drained.
    total_drained: AtomicUsize,
    /// Metrics: overflow count (backpressure signals, i.e. rejected pushes).
    overflow_count: AtomicUsize,
    /// Thread ID of the first producer (for debug-mode SPSC violation detection).
    #[cfg(debug_assertions)]
    producer_thread_id: std::sync::Mutex<Option<std::thread::ThreadId>>,
}
228
impl StateChangelogBuffer {
    /// Default buffer capacity (16K entries = 512KB).
    pub const DEFAULT_CAPACITY: usize = 16 * 1024;

    /// Creates a new changelog buffer with the given capacity.
    ///
    /// The buffer is pre-allocated to avoid allocation on the hot path.
    /// The requested capacity is rounded up to the next power of two so
    /// index wrapping is a single bitmask; one slot is kept empty to
    /// distinguish full from empty, so the usable capacity is
    /// `capacity.next_power_of_two() - 1` entries.
    ///
    /// # Panics
    ///
    /// Panics if capacity is 0.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");

        // Round up to power of 2 for fast modulo
        let capacity = capacity.next_power_of_two();

        // Pre-allocate and zero-initialize with per-slot UnsafeCell
        let zero_entry = StateChangelogEntry {
            epoch: 0,
            key_hash: 0,
            mmap_offset: 0,
            value_len: 0,
            op: 0,
            _padding: [0; 3],
        };
        let entries: Vec<UnsafeCell<StateChangelogEntry>> =
            (0..capacity).map(|_| UnsafeCell::new(zero_entry)).collect();

        Self {
            entries: entries.into_boxed_slice(),
            write_pos: AtomicUsize::new(0),
            read_pos: AtomicUsize::new(0),
            capacity,
            current_epoch: 0,
            total_pushed: AtomicUsize::new(0),
            total_drained: AtomicUsize::new(0),
            overflow_count: AtomicUsize::new(0),
            #[cfg(debug_assertions)]
            producer_thread_id: std::sync::Mutex::new(None),
        }
    }

    /// Creates a buffer with the default capacity.
    #[must_use]
    pub fn new() -> Self {
        Self::with_capacity(Self::DEFAULT_CAPACITY)
    }

    /// Sets the current epoch for new entries.
    ///
    /// Takes `&mut self` because the epoch is a plain (non-atomic) field;
    /// exclusive access is enforced by the borrow checker.
    pub fn set_epoch(&mut self, epoch: u64) {
        self.current_epoch = epoch;
    }

    /// Returns the current epoch.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Advances to the next epoch and returns the new epoch value.
    pub fn advance_epoch(&mut self) -> u64 {
        self.current_epoch += 1;
        self.current_epoch
    }

    /// Pushes an entry to the buffer (zero allocation).
    ///
    /// Returns `true` if successful, `false` if buffer is full (backpressure).
    /// A rejected push increments the overflow counter and leaves the buffer
    /// unchanged.
    #[inline]
    #[allow(clippy::missing_panics_doc)] // debug-only SPSC violation check
    pub fn push(&self, entry: StateChangelogEntry) -> bool {
        // Debug builds record the first producer thread and assert that all
        // subsequent pushes come from that same thread (SPSC discipline).
        #[cfg(debug_assertions)]
        {
            let current = std::thread::current().id();
            let mut guard = self.producer_thread_id.lock().unwrap();
            if let Some(expected) = *guard {
                debug_assert_eq!(
                    current, expected,
                    "SPSC violation: push() called from a different thread"
                );
            } else {
                *guard = Some(current);
            }
        }

        // Relaxed is sufficient for write_pos: only this (sole) producer
        // ever stores it. Acquire on read_pos pairs with the consumer's
        // Release store in pop().
        let write_pos = self.write_pos.load(Ordering::Relaxed);
        let read_pos = self.read_pos.load(Ordering::Acquire);

        // Check if buffer is full (one slot stays empty so that
        // write_pos == read_pos unambiguously means "empty").
        let next_pos = (write_pos + 1) & (self.capacity - 1);
        if next_pos == read_pos {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            return false;
        }

        // SAFETY: This is an SPSC ring buffer. We are the sole producer, and
        // `write_pos` has not yet been published (the Release store is below),
        // so the consumer cannot be reading this slot. The slot index is in
        // bounds because `write_pos` is only ever stored pre-masked by
        // `capacity - 1`.
        #[allow(unsafe_code)]
        unsafe {
            self.entries[write_pos].get().write(entry);
        }

        // Publish the entry
        self.write_pos.store(next_pos, Ordering::Release);
        self.total_pushed.fetch_add(1, Ordering::Relaxed);

        true
    }

    /// Pushes a Put operation for the given key, hashing it with `FxHash`
    /// and stamping it with the buffer's current epoch.
    #[inline]
    pub fn push_put(&self, key: &[u8], mmap_offset: u64, value_len: u32) -> bool {
        let entry = StateChangelogEntry::from_key(
            key,
            self.current_epoch,
            mmap_offset,
            value_len,
            StateOp::Put,
        );
        self.push(entry)
    }

    /// Pushes a Delete operation for the given key (zero offset/length),
    /// stamped with the buffer's current epoch.
    #[inline]
    pub fn push_delete(&self, key: &[u8]) -> bool {
        let entry = StateChangelogEntry::from_key(key, self.current_epoch, 0, 0, StateOp::Delete);
        self.push(entry)
    }

    /// Attempts to pop a single entry (for consumer).
    ///
    /// Returns `None` when the buffer is empty.
    #[inline]
    pub fn pop(&self) -> Option<StateChangelogEntry> {
        // Relaxed is sufficient for read_pos: only this (sole) consumer
        // ever stores it. Acquire on write_pos pairs with the producer's
        // Release store in push().
        let read_pos = self.read_pos.load(Ordering::Relaxed);
        let write_pos = self.write_pos.load(Ordering::Acquire);

        if read_pos == write_pos {
            return None;
        }

        // SAFETY: This is an SPSC ring buffer. We are the sole consumer, and
        // `read_pos` points to a slot that was published by the producer (the
        // Acquire load of `write_pos` above synchronizes with the producer's
        // Release store). The producer will not write to this slot until we
        // advance `read_pos` past it (Release store below).
        #[allow(unsafe_code)]
        let entry = unsafe { self.entries[read_pos].get().read() };

        // Advance read position
        let next_pos = (read_pos + 1) & (self.capacity - 1);
        self.read_pos.store(next_pos, Ordering::Release);
        self.total_drained.fetch_add(1, Ordering::Relaxed);

        Some(entry)
    }

    /// Drains up to `max_count` entries from the buffer.
    ///
    /// Returns a lazy iterator over the drained entries; entries are removed
    /// from the buffer as the iterator is advanced, not when `drain` is
    /// called.
    pub fn drain(&self, max_count: usize) -> impl Iterator<Item = StateChangelogEntry> + '_ {
        DrainIter {
            buffer: self,
            remaining: max_count,
        }
    }

    /// Drains all available entries from the buffer.
    pub fn drain_all(&self) -> impl Iterator<Item = StateChangelogEntry> + '_ {
        self.drain(usize::MAX)
    }

    /// Returns the number of entries currently in the buffer.
    #[must_use]
    pub fn len(&self) -> usize {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        // Both positions are < capacity (a power of two), so wrapping
        // subtraction plus the mask yields the correct ring distance.
        write_pos.wrapping_sub(read_pos) & (self.capacity - 1)
    }

    /// Returns true if the buffer is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == self.read_pos.load(Ordering::Acquire)
    }

    /// Returns true if the buffer is full.
    #[must_use]
    pub fn is_full(&self) -> bool {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        ((write_pos + 1) & (self.capacity - 1)) == read_pos
    }

    /// Returns the buffer capacity (power of two; usable space is one less).
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns available space in the buffer.
    ///
    /// One slot is reserved to distinguish full from empty, hence the `- 1`.
    #[must_use]
    pub fn available(&self) -> usize {
        self.capacity - self.len() - 1
    }

    /// Returns the total number of entries successfully pushed.
    ///
    /// Rejected (overflowed) pushes are NOT counted here; see
    /// [`Self::overflow_count`] for those.
    #[must_use]
    pub fn total_pushed(&self) -> usize {
        self.total_pushed.load(Ordering::Relaxed)
    }

    /// Returns the total number of entries drained.
    #[must_use]
    pub fn total_drained(&self) -> usize {
        self.total_drained.load(Ordering::Relaxed)
    }

    /// Returns the number of overflow events (backpressure signals).
    #[must_use]
    pub fn overflow_count(&self) -> usize {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Clears the buffer (for testing/reset).
    ///
    /// NOTE(review): this snapshots `write_pos` and jumps `read_pos` to it,
    /// so entries pushed concurrently after the snapshot survive the clear —
    /// call only while the producer is idle.
    pub fn clear(&self) {
        // Move read position to write position
        let write_pos = self.write_pos.load(Ordering::Acquire);
        self.read_pos.store(write_pos, Ordering::Release);
    }

    /// Creates a checkpoint barrier at the current position.
    ///
    /// Returns the current epoch and write position for recovery. The
    /// position is the ring index (wraps modulo capacity), not a monotonic
    /// count of pushes.
    #[must_use]
    pub fn checkpoint_barrier(&self) -> (u64, usize) {
        (self.current_epoch, self.write_pos.load(Ordering::Acquire))
    }
}
470
471impl Default for StateChangelogBuffer {
472    fn default() -> Self {
473        Self::new()
474    }
475}
476
// SAFETY: StateChangelogBuffer is Send because:
// 1. All fields are either Send or thread-safe (atomics)
// 2. The Box<[UnsafeCell<StateChangelogEntry>]> is pre-allocated and slot access
//    is coordinated via atomic indices with proper Acquire/Release ordering
#[allow(unsafe_code)]
unsafe impl Send for StateChangelogBuffer {}

// SAFETY: StateChangelogBuffer is Sync because:
// 1. It is an SPSC ring buffer — exactly one producer and one consumer
// 2. Slot access is coordinated via atomic write_pos/read_pos with
//    Release (producer) and Acquire (consumer) ordering
// 3. The producer only writes to unpublished slots; the consumer only
//    reads from published slots — no data race is possible
// Callers must uphold the SPSC discipline themselves; debug builds assert
// it on the producer side via `producer_thread_id`.
#[allow(unsafe_code)]
unsafe impl Sync for StateChangelogBuffer {}
492
/// Drain iterator for the changelog buffer.
///
/// Lazily pops entries from the underlying buffer; see
/// `StateChangelogBuffer::drain`.
struct DrainIter<'a> {
    /// Buffer being drained (entries are removed via `pop()` as iterated).
    buffer: &'a StateChangelogBuffer,
    /// Maximum number of entries still allowed to be yielded.
    remaining: usize,
}
498
499impl Iterator for DrainIter<'_> {
500    type Item = StateChangelogEntry;
501
502    fn next(&mut self) -> Option<Self::Item> {
503        if self.remaining == 0 {
504            return None;
505        }
506        self.remaining -= 1;
507        self.buffer.pop()
508    }
509}
510
// Unit tests: single-threaded coverage of entry encoding, SPSC ring
// mechanics (full/empty/wraparound), drain iterators, epochs, and metrics.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_state_op_roundtrip() {
        assert_eq!(StateOp::from_u8(StateOp::Put.to_u8()), StateOp::Put);
        assert_eq!(StateOp::from_u8(StateOp::Delete.to_u8()), StateOp::Delete);
        assert_eq!(StateOp::from_u8(255), StateOp::Put); // Default to Put
    }

    #[test]
    fn test_changelog_entry_size() {
        // Must stay in sync with the const assertion in the module body.
        assert_eq!(std::mem::size_of::<StateChangelogEntry>(), 32);
    }

    #[test]
    fn test_changelog_entry_put() {
        let entry = StateChangelogEntry::put(1, 12345, 100, 50);
        assert_eq!(entry.epoch, 1);
        assert_eq!(entry.key_hash, 12345);
        assert_eq!(entry.mmap_offset, 100);
        assert_eq!(entry.value_len, 50);
        assert!(entry.is_put());
        assert!(!entry.is_delete());
    }

    #[test]
    fn test_changelog_entry_delete() {
        // Deletes carry no payload: offset and length must be zero.
        let entry = StateChangelogEntry::delete(2, 67890);
        assert_eq!(entry.epoch, 2);
        assert_eq!(entry.key_hash, 67890);
        assert_eq!(entry.mmap_offset, 0);
        assert_eq!(entry.value_len, 0);
        assert!(entry.is_delete());
        assert!(!entry.is_put());
    }

    #[test]
    fn test_changelog_entry_from_key() {
        let entry = StateChangelogEntry::from_key(b"test_key", 5, 200, 75, StateOp::Put);
        assert_eq!(entry.epoch, 5);
        assert_eq!(entry.mmap_offset, 200);
        assert_eq!(entry.value_len, 75);
        assert!(entry.is_put());

        // Same key should produce same hash
        let entry2 = StateChangelogEntry::from_key(b"test_key", 6, 300, 80, StateOp::Delete);
        assert_eq!(entry.key_hash, entry2.key_hash);
    }

    #[test]
    fn test_buffer_basic_operations() {
        let buffer = StateChangelogBuffer::with_capacity(16);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 16);

        let entry = StateChangelogEntry::put(1, 100, 0, 10);
        assert!(buffer.push(entry));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);

        let popped = buffer.pop().unwrap();
        assert_eq!(popped.key_hash, 100);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_buffer_full() {
        let buffer = StateChangelogBuffer::with_capacity(4);

        // Fill buffer (capacity - 1 = 3 entries for ring buffer)
        for i in 0..3 {
            assert!(buffer.push(StateChangelogEntry::put(1, i, 0, 10)));
        }

        // Should be full
        assert!(buffer.is_full());
        assert!(!buffer.push(StateChangelogEntry::put(1, 999, 0, 10)));
        assert_eq!(buffer.overflow_count(), 1);
    }

    #[test]
    fn test_buffer_drain() {
        let buffer = StateChangelogBuffer::with_capacity(16);

        for i in 0..5 {
            buffer.push(StateChangelogEntry::put(1, i, 0, 10));
        }

        assert_eq!(buffer.len(), 5);

        // Drain 3 entries
        let drained: Vec<_> = buffer.drain(3).collect();
        assert_eq!(drained.len(), 3);
        assert_eq!(buffer.len(), 2);

        // Drain remaining
        let remaining: Vec<_> = buffer.drain_all().collect();
        assert_eq!(remaining.len(), 2);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_buffer_epoch() {
        let mut buffer = StateChangelogBuffer::with_capacity(16);
        assert_eq!(buffer.epoch(), 0);

        buffer.set_epoch(10);
        assert_eq!(buffer.epoch(), 10);

        assert_eq!(buffer.advance_epoch(), 11);
        assert_eq!(buffer.epoch(), 11);
    }

    #[test]
    fn test_buffer_push_helpers() {
        let buffer = StateChangelogBuffer::with_capacity(16);

        assert!(buffer.push_put(b"key1", 100, 50));
        assert!(buffer.push_delete(b"key2"));

        // Drain preserves FIFO order: put first, then delete.
        let entries: Vec<_> = buffer.drain_all().collect();
        assert_eq!(entries.len(), 2);
        assert!(entries[0].is_put());
        assert!(entries[1].is_delete());
    }

    #[test]
    fn test_buffer_clear() {
        let buffer = StateChangelogBuffer::with_capacity(16);

        for i in 0..5 {
            buffer.push(StateChangelogEntry::put(1, i, 0, 10));
        }
        assert_eq!(buffer.len(), 5);

        buffer.clear();
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_buffer_checkpoint_barrier() {
        let mut buffer = StateChangelogBuffer::with_capacity(16);
        buffer.set_epoch(42);

        buffer.push(StateChangelogEntry::put(42, 1, 0, 10));
        buffer.push(StateChangelogEntry::put(42, 2, 0, 10));

        let (epoch, pos) = buffer.checkpoint_barrier();
        assert_eq!(epoch, 42);
        assert_eq!(pos, 2);
    }

    #[test]
    fn test_buffer_metrics() {
        let buffer = StateChangelogBuffer::with_capacity(8);

        for i in 0..5 {
            buffer.push(StateChangelogEntry::put(1, i, 0, 10));
        }
        assert_eq!(buffer.total_pushed(), 5);
        assert_eq!(buffer.total_drained(), 0);

        let _ = buffer.pop();
        let _ = buffer.pop();
        assert_eq!(buffer.total_drained(), 2);
    }

    #[test]
    fn test_entry_from_key() {
        let key_hash = StateChangelogEntry::hash_key(b"mykey");

        let put = StateChangelogEntry::put(100, key_hash, 500, 75);
        assert_eq!(put.epoch, 100);
        assert_eq!(put.mmap_offset, 500);
        assert_eq!(put.value_len, 75);
        assert!(put.is_put());

        let delete = StateChangelogEntry::delete(100, key_hash);
        assert_eq!(delete.epoch, 100);
        assert!(delete.is_delete());

        // Same key produces same hash
        assert_eq!(put.key_hash, delete.key_hash);
    }

    #[test]
    fn test_buffer_wraparound() {
        let buffer = StateChangelogBuffer::with_capacity(4);

        // Fill and drain multiple times to test wraparound
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(
                    buffer.push(StateChangelogEntry::put(1, i + iteration * 10, 0, 10)),
                    "Failed at iteration {iteration}, entry {i}"
                );
            }

            let drained: Vec<_> = buffer.drain_all().collect();
            assert_eq!(drained.len(), 3, "Failed at iteration {iteration}");
        }
    }

    #[test]
    fn test_key_hash_consistency() {
        let key = b"consistent_key";
        let hash1 = StateChangelogEntry::hash_key(key);
        let hash2 = StateChangelogEntry::hash_key(key);
        assert_eq!(hash1, hash2);

        let different_key = b"different_key";
        let hash3 = StateChangelogEntry::hash_key(different_key);
        assert_ne!(hash1, hash3);
    }
}