// laminar_core/state/mmap.rs
1//! Memory-mapped state store implementation.
2//!
3//! This module provides a high-performance key-value state store using memory-mapped
4//! files for persistence and `BTreeMap` for sorted key access. It supports both in-memory
5//! and persistent modes.
6//!
7//! # Design
8//!
9//! The store uses a two-tier architecture:
10//! - **Index tier**: `BTreeMap` mapping keys to value entries (offset, length)
11//! - **Data tier**: Either arena-allocated memory or memory-mapped file
12//!
13//! # Performance Characteristics
14//!
15//! - **Get**: O(log n), < 500ns typical (tree lookup + pointer follow)
16//! - **Put**: O(log n), may trigger file growth
17//! - **Prefix scan**: O(log n + k) where k is matching entries
18//!
19//! # Usage
20//!
21//! ```rust,no_run
22//! use laminar_core::state::MmapStateStore;
23//! use std::path::Path;
24//!
25//! // In-memory mode (fast, not persistent)
26//! let mut store = MmapStateStore::in_memory(1024 * 1024); // 1MB arena
27//!
28//! // Persistent mode (file-backed)
29//! let mut store = MmapStateStore::persistent(Path::new("/tmp/state.db"), 1024 * 1024).unwrap();
30//! ```
31
32use bytes::Bytes;
33use memmap2::MmapMut;
34use std::collections::BTreeMap;
35use std::fs::{File, OpenOptions};
36use std::io::Write;
37use std::ops::{Bound, Range};
38use std::path::{Path, PathBuf};
39
40use super::{prefix_successor, StateError, StateSnapshot, StateStore};
41
42/// Advise the kernel to back an mmap region with huge pages (Linux only).
43///
44/// Uses `MADV_HUGEPAGE` to request transparent huge pages for the given region,
45/// eliminating TLB misses for large state stores. This is advisory — if huge
46/// pages are unavailable the kernel falls back to regular pages silently.
47#[cfg(target_os = "linux")]
48fn advise_hugepages(mmap: &MmapMut) {
49    #[allow(unsafe_code)]
50    // SAFETY: `mmap.as_ptr()` points to a valid mmap region of `mmap.len()` bytes.
51    // MADV_HUGEPAGE is an advisory hint that cannot cause memory unsafety even if
52    // it fails or the kernel ignores it.
53    unsafe {
54        libc::madvise(
55            mmap.as_ptr() as *mut libc::c_void,
56            mmap.len(),
57            libc::MADV_HUGEPAGE,
58        );
59    }
60}
61
62#[cfg(not(target_os = "linux"))]
63fn advise_hugepages(_mmap: &MmapMut) {
64    // No-op on non-Linux platforms.
65}
66
/// Header size in the mmap file (magic + version + entry count + data offset).
/// The fields written today occupy bytes 0..28; bytes 28..32 are reserved.
const MMAP_HEADER_SIZE: usize = 32;
/// Magic number for mmap file identification (ASCII "LAMINAR", zero-padded to 8 bytes).
const MMAP_MAGIC: u64 = 0x004C_414D_494E_4152;
/// Current mmap file format version (checked on load; mismatch is a corruption error).
const MMAP_VERSION: u32 = 1;
/// Default growth factor when file needs to expand.
const GROWTH_FACTOR: f64 = 1.5;
/// Extension of the sidecar index file written next to the state file (`<state>.idx`).
const INDEX_EXTENSION: &str = "idx";
77
/// Entry metadata stored in the `BTreeMap` index.
///
/// Each entry records where a value lives in the data region; the key itself
/// is held by the index, not by storage.
#[derive(Debug, Clone, Copy, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
// Validation via check_bytes is implicit in v0.8 with features, or handled differently
struct ValueEntry {
    /// Offset in the data region (arena or mmap).
    offset: usize,
    /// Length of the value in bytes.
    len: usize,
}
87
/// Storage backend for the mmap store.
///
/// Value offsets in `ValueEntry` are relative to the data region: for `Arena`
/// that is the start of the buffer, for `Mmap` the data region begins
/// `MMAP_HEADER_SIZE` bytes into the file (see `Storage::get`).
enum Storage {
    /// In-memory mode (fastest, not persistent).
    Arena {
        /// Pre-allocated buffer for data.
        data: Vec<u8>,
        /// Current write position (number of data bytes in use).
        write_pos: usize,
    },
    /// Memory-mapped file (persistent, supports larger-than-memory).
    Mmap {
        /// Writable mapping of the whole file (header + data region).
        mmap: MmapMut,
        /// Kept open so the file can be grown via `set_len` on write.
        file: File,
        /// Backing file path (exposed through `MmapStateStore::path`).
        path: PathBuf,
        /// Current write position in the data region.
        write_pos: usize,
        /// Total capacity of the file.
        capacity: usize,
    },
}
108
109impl Storage {
110    /// Get a slice of data at the given offset and length.
111    ///
112    /// Returns `None` if the offset+length is out of bounds (e.g., corrupted index).
113    fn get(&self, offset: usize, len: usize) -> Option<&[u8]> {
114        let end = offset.checked_add(len)?;
115        match self {
116            Storage::Arena { data, .. } => data.get(offset..end),
117            Storage::Mmap { mmap, .. } => {
118                let start = MMAP_HEADER_SIZE.checked_add(offset)?;
119                mmap.get(start..start.checked_add(len)?)
120            }
121        }
122    }
123
124    /// Write data and return the offset where it was written.
125    fn write(&mut self, data: &[u8]) -> Result<usize, StateError> {
126        match self {
127            Storage::Arena {
128                data: buffer,
129                write_pos,
130            } => {
131                let offset = *write_pos;
132                let end = offset + data.len();
133
134                // Grow buffer if needed
135                if end > buffer.len() {
136                    // Growth calculation: precision loss is acceptable for buffer sizing
137                    #[allow(
138                        clippy::cast_possible_truncation,
139                        clippy::cast_sign_loss,
140                        clippy::cast_precision_loss
141                    )]
142                    let new_size = (end as f64 * GROWTH_FACTOR) as usize;
143                    buffer.resize(new_size, 0);
144                }
145
146                buffer[offset..end].copy_from_slice(data);
147                *write_pos = end;
148                Ok(offset)
149            }
150            Storage::Mmap {
151                mmap,
152                file,
153                path: _,
154                write_pos,
155                capacity,
156            } => {
157                let offset = *write_pos;
158                let end = offset + data.len();
159                let required = MMAP_HEADER_SIZE + end;
160
161                // Grow file if needed
162                if required > *capacity {
163                    // Growth calculation: precision loss is acceptable for capacity sizing
164                    #[allow(
165                        clippy::cast_possible_truncation,
166                        clippy::cast_sign_loss,
167                        clippy::cast_precision_loss
168                    )]
169                    let new_capacity = (required as f64 * GROWTH_FACTOR) as usize;
170                    file.set_len(new_capacity as u64)?;
171
172                    // Re-map with new size
173                    // SAFETY: We just resized the file and hold exclusive access.
174                    // The file descriptor is valid because we just successfully called set_len.
175                    // No other code has access to the file while we hold &mut self.
176                    #[allow(unsafe_code)]
177                    {
178                        *mmap = unsafe { MmapMut::map_mut(&*file)? };
179                    }
180                    advise_hugepages(mmap);
181                    *capacity = new_capacity;
182                }
183
184                mmap[MMAP_HEADER_SIZE + offset..MMAP_HEADER_SIZE + end].copy_from_slice(data);
185                *write_pos = end;
186                Ok(offset)
187            }
188        }
189    }
190
191    /// Get the current write position (used data size).
192    fn used_bytes(&self) -> usize {
193        match self {
194            Storage::Arena { write_pos, .. } | Storage::Mmap { write_pos, .. } => *write_pos,
195        }
196    }
197
198    /// Flush to disk (only meaningful for mmap).
199    fn flush(&mut self) -> Result<(), StateError> {
200        match self {
201            Storage::Arena { .. } => Ok(()),
202            Storage::Mmap { mmap, .. } => {
203                mmap.flush()?;
204                Ok(())
205            }
206        }
207    }
208
209    /// Reset the write position (for clear operation).
210    fn reset(&mut self) {
211        match self {
212            Storage::Arena { write_pos, .. } | Storage::Mmap { write_pos, .. } => *write_pos = 0,
213        }
214    }
215
216    /// Check if this is persistent storage.
217    fn is_persistent(&self) -> bool {
218        matches!(self, Storage::Mmap { .. })
219    }
220}
221
/// Memory-mapped state store implementation.
///
/// This store provides high-performance key-value storage with optional
/// persistence via memory-mapped files. It achieves sub-500ns lookup latency
/// by using `BTreeMap` for the index (enabling O(log n + k) prefix/range scans)
/// and direct memory access for values.
///
/// # Modes
///
/// - **In-memory**: Uses an arena allocator, fastest but not persistent
/// - **Persistent**: Uses memory-mapped file, survives restarts
///
/// # Thread Safety
///
/// This store is `Send` but not `Sync`. It's designed for single-threaded
/// access within a reactor.
pub struct MmapStateStore {
    /// Index mapping keys to value entries.
    index: BTreeMap<Vec<u8>, ValueEntry>,
    /// Storage backend (arena or mmap).
    storage: Storage,
    /// Total size of keys + values for live entries only — dead bytes left by
    /// deletes/overwrites are tracked separately as fragmentation.
    size_bytes: usize,
    /// Next version number (persisted in index file for format compatibility).
    next_version: u64,
}
248
249impl MmapStateStore {
250    /// Creates a new in-memory state store with the given initial capacity.
251    ///
252    /// This mode is the fastest but data is lost when the process exits.
253    ///
254    /// # Arguments
255    ///
256    /// * `capacity` - Initial capacity in bytes for the data buffer
257    #[must_use]
258    pub fn in_memory(capacity: usize) -> Self {
259        Self {
260            index: BTreeMap::new(),
261            storage: Storage::Arena {
262                data: vec![0u8; capacity],
263                write_pos: 0,
264            },
265            size_bytes: 0,
266            next_version: 1,
267        }
268    }
269
270    /// Creates a new persistent state store backed by a memory-mapped file.
271    ///
272    /// If the file exists, it will be opened and validated. If it doesn't exist,
273    /// a new file will be created with the given initial capacity.
274    ///
275    /// # Arguments
276    ///
277    /// * `path` - Path to the state file
278    /// * `initial_capacity` - Initial file size if creating new
279    ///
280    /// # Errors
281    ///
282    /// Returns `StateError::Io` if file operations fail, or `StateError::Corruption`
283    /// if the file exists but has an invalid format.
284    pub fn persistent(path: &Path, initial_capacity: usize) -> Result<Self, StateError> {
285        let file_exists = path.exists();
286
287        let file = OpenOptions::new()
288            .read(true)
289            .write(true)
290            .create(true)
291            .truncate(false)
292            .open(path)?;
293
294        let capacity = if file_exists {
295            let metadata = file.metadata()?;
296            // On 32-bit systems this could truncate, but mmap files > 4GB aren't practical there anyway
297            #[allow(clippy::cast_possible_truncation)]
298            let cap = metadata.len() as usize;
299            cap
300        } else {
301            let capacity = initial_capacity.max(MMAP_HEADER_SIZE + 1024);
302            file.set_len(capacity as u64)?;
303            capacity
304        };
305
306        // SAFETY: We have exclusive write access to the file - it was just created or opened
307        // with read/write permissions, and no other code has access to it yet.
308        #[allow(unsafe_code)]
309        let mut mmap = unsafe { MmapMut::map_mut(&file)? };
310
311        advise_hugepages(&mmap);
312
313        let (index, write_pos, next_version) = if file_exists {
314            // Try to load existing data
315            if capacity >= MMAP_HEADER_SIZE {
316                // Try to load persisted index first
317                if let Ok(loaded) = Self::load_index(path) {
318                    // Verify consistency with mmap size?
319                    // For now trust the index file if it loads
320                    loaded
321                } else {
322                    // Fallback to loading from mmap (legacy or corrupted index)
323                    Self::load_from_mmap(&mmap)?
324                }
325            } else {
326                // Initialize new file header but empty structure
327                Self::init_mmap_header(&mut mmap);
328                (BTreeMap::new(), 0, 1)
329            }
330        } else {
331            // Initialize new file
332            Self::init_mmap_header(&mut mmap);
333            (BTreeMap::new(), 0, 1)
334        };
335
336        let size_bytes = index.iter().map(|(k, v)| k.len() + v.len).sum();
337
338        Ok(Self {
339            index,
340            storage: Storage::Mmap {
341                mmap,
342                file,
343                path: path.to_path_buf(),
344                write_pos,
345                capacity,
346            },
347            size_bytes,
348            next_version,
349        })
350    }
351
352    /// Initialize the mmap header for a new file.
353    fn init_mmap_header(mmap: &mut MmapMut) {
354        mmap[0..8].copy_from_slice(&MMAP_MAGIC.to_le_bytes());
355        mmap[8..12].copy_from_slice(&MMAP_VERSION.to_le_bytes());
356        mmap[12..20].copy_from_slice(&0u64.to_le_bytes()); // entry count
357        mmap[20..28].copy_from_slice(&0u64.to_le_bytes()); // data offset
358    }
359
360    /// Load index from existing mmap file.
361    #[allow(clippy::type_complexity)]
362    fn load_from_mmap(
363        mmap: &MmapMut,
364    ) -> Result<(BTreeMap<Vec<u8>, ValueEntry>, usize, u64), StateError> {
365        if mmap.len() < 12 {
366            return Err(StateError::Corruption("State file too short".to_string()));
367        }
368
369        // Check magic number
370        let magic = u64::from_le_bytes(mmap[0..8].try_into().unwrap());
371        if magic != MMAP_MAGIC {
372            return Err(StateError::Corruption(
373                "Invalid magic number in state file".to_string(),
374            ));
375        }
376
377        // Check version
378        let version = u32::from_le_bytes(mmap[8..12].try_into().unwrap());
379        if version != MMAP_VERSION {
380            return Err(StateError::Corruption(format!(
381                "Unsupported state file version: {version}"
382            )));
383        }
384
385        Err(StateError::Corruption(
386            "mmap file exists but no .idx index file found — \
387             data cannot be recovered without the index; \
388             call save_index()/flush() before closing the store"
389                .to_string(),
390        ))
391    }
392
393    /// Check if this store is persistent.
394    #[must_use]
395    pub fn is_persistent(&self) -> bool {
396        self.storage.is_persistent()
397    }
398
399    /// Get the path to the backing file (if persistent).
400    #[must_use]
401    pub fn path(&self) -> Option<&Path> {
402        match &self.storage {
403            Storage::Arena { .. } => None,
404            Storage::Mmap { path, .. } => Some(path),
405        }
406    }
407
408    /// Compact the store by rewriting live data.
409    ///
410    /// This removes holes left by deleted entries and reduces file/memory usage.
411    ///
412    /// # Errors
413    ///
414    /// Returns `StateError` if the compaction fails.
415    pub fn compact(&mut self) -> Result<(), StateError> {
416        // Collect all live data
417        let live_data: Vec<(Vec<u8>, Vec<u8>)> = self
418            .index
419            .iter()
420            .filter_map(|(k, entry)| {
421                let value = self.storage.get(entry.offset, entry.len)?;
422                Some((k.clone(), value.to_vec()))
423            })
424            .collect();
425
426        // Reset storage
427        self.storage.reset();
428        self.index.clear();
429        self.size_bytes = 0;
430
431        // Rewrite all data
432        for (key, value) in live_data {
433            let offset = self.storage.write(&value)?;
434            self.index.insert(
435                key.clone(),
436                ValueEntry {
437                    offset,
438                    len: value.len(),
439                },
440            );
441            self.next_version += 1;
442            self.size_bytes += key.len() + value.len();
443        }
444
445        Ok(())
446    }
447
448    /// Get the fragmentation ratio (wasted space / total space).
449    #[must_use]
450    #[allow(clippy::cast_precision_loss)]
451    pub fn fragmentation(&self) -> f64 {
452        let used = self.storage.used_bytes();
453        if used == 0 {
454            return 0.0;
455        }
456        let live: usize = self.index.values().map(|e| e.len).sum();
457        // Precision loss is acceptable for a ratio calculation
458        1.0 - (live as f64 / used as f64)
459    }
460
461    /// Save the index to disk.
462    ///
463    /// This writes the `BTreeMap` index to a separate `.idx` file.
464    /// Format: `[magic: 8B][version: 4B][last_write_pos: 8B][next_version: 8B][rkyv data]`
465    ///
466    /// # Errors
467    ///
468    /// Returns `StateError::Io` if the file cannot be created or written.
469    /// Returns `StateError::Serialization` if the index cannot be serialized.
470    pub fn save_index(&self) -> Result<(), StateError> {
471        let path = match self.path() {
472            Some(p) => p.with_extension(INDEX_EXTENSION),
473            None => return Ok(()), // Can't save index for in-memory store
474        };
475
476        let file = File::create(&path)?;
477        let mut writer = std::io::BufWriter::new(file);
478
479        // serialize index
480        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&self.index)
481            .map_err(|e| StateError::Serialization(e.to_string()))?;
482
483        // Write header
484        writer.write_all(&MMAP_MAGIC.to_le_bytes())?;
485        writer.write_all(&MMAP_VERSION.to_le_bytes())?;
486        writer.write_all(&[0u8; 4])?; // Padding for 8-byte alignment (32 bytes total)
487
488        let write_pos = self.storage.used_bytes() as u64;
489        writer.write_all(&write_pos.to_le_bytes())?;
490        writer.write_all(&self.next_version.to_le_bytes())?;
491
492        // Write data
493        writer.write_all(&bytes)?;
494        writer.flush()?;
495
496        Ok(())
497    }
498
499    /// Load index from disk.
500    #[allow(clippy::type_complexity)]
501    fn load_index(
502        state_path: &Path,
503    ) -> Result<(BTreeMap<Vec<u8>, ValueEntry>, usize, u64), StateError> {
504        let path = state_path.with_extension(INDEX_EXTENSION);
505        if !path.exists() {
506            return Err(StateError::Io(std::io::Error::new(
507                std::io::ErrorKind::NotFound,
508                "Index file not found",
509            )));
510        }
511
512        let mut file = File::open(path)?;
513        let mut buffer = Vec::new();
514        std::io::Read::read_to_end(&mut file, &mut buffer)?;
515
516        if buffer.len() < 32 {
517            // 8+4+4+8+8
518            return Err(StateError::Corruption("Index file too short".to_string()));
519        }
520
521        // Validate magic
522        let magic_bytes: [u8; 8] = buffer[0..8].try_into().unwrap();
523        if u64::from_le_bytes(magic_bytes) != MMAP_MAGIC {
524            return Err(StateError::Corruption("Invalid index magic".to_string()));
525        }
526
527        // Validate version
528        let version_bytes: [u8; 4] = buffer[8..12].try_into().unwrap();
529        if u32::from_le_bytes(version_bytes) != MMAP_VERSION {
530            return Err(StateError::Corruption("Invalid index version".to_string()));
531        }
532
533        // Skip padding (12..16)
534
535        let write_pos = usize::try_from(u64::from_le_bytes(buffer[16..24].try_into().unwrap()))
536            .map_err(|_| {
537                StateError::Corruption("write_pos exceeds platform address space".to_string())
538            })?;
539        let next_version = u64::from_le_bytes(buffer[24..32].try_into().unwrap());
540
541        let index: BTreeMap<Vec<u8>, ValueEntry> =
542            rkyv::from_bytes::<BTreeMap<Vec<u8>, ValueEntry>, rkyv::rancor::Error>(&buffer[32..])
543                .map_err(|e| StateError::Deserialization(e.to_string()))?;
544
545        Ok((index, write_pos, next_version))
546    }
547}
548
549impl StateStore for MmapStateStore {
550    #[inline]
551    fn get(&self, key: &[u8]) -> Option<Bytes> {
552        self.index.get(key).and_then(|entry| {
553            let data = self.storage.get(entry.offset, entry.len)?;
554            Some(Bytes::copy_from_slice(data))
555        })
556    }
557
558    #[inline]
559    fn get_ref(&self, key: &[u8]) -> Option<&[u8]> {
560        self.index
561            .get(key)
562            .and_then(|entry| self.storage.get(entry.offset, entry.len))
563    }
564
565    #[inline]
566    fn put(&mut self, key: &[u8], value: Bytes) -> Result<(), StateError> {
567        // Write value to storage
568        let offset = self.storage.write(&value)?;
569
570        let entry = ValueEntry {
571            offset,
572            len: value.len(),
573        };
574
575        // Fast path: update existing key without allocating key.to_vec()
576        if let Some(existing) = self.index.get_mut(key) {
577            self.size_bytes = self.size_bytes - existing.len + value.len();
578            *existing = entry;
579        } else {
580            self.size_bytes += key.len() + value.len();
581            self.index.insert(key.to_vec(), entry);
582        }
583
584        // Auto-compact when fragmentation exceeds 50% and store is non-trivial.
585        // Failure is non-fatal — the store is still correct, just wastes space.
586        if self.len() > 100 && self.fragmentation() > 0.5 {
587            if let Err(e) = self.compact() {
588                tracing::warn!(error = %e, "mmap auto-compact failed, will retry later");
589            }
590        }
591
592        Ok(())
593    }
594
595    fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
596        if let Some(entry) = self.index.remove(key) {
597            self.size_bytes -= key.len() + entry.len;
598            // Note: The space in storage becomes fragmentation
599            // Use compact() to reclaim it
600        }
601        Ok(())
602    }
603
604    fn prefix_scan<'a>(
605        &'a self,
606        prefix: &'a [u8],
607    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
608        if prefix.is_empty() {
609            return Box::new(self.index.iter().filter_map(|(k, entry)| {
610                let value = self.storage.get(entry.offset, entry.len)?;
611                Some((Bytes::copy_from_slice(k), Bytes::copy_from_slice(value)))
612            }));
613        }
614        if let Some(end) = prefix_successor(prefix) {
615            Box::new(
616                self.index
617                    .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
618                    .filter_map(|(k, entry)| {
619                        let value = self.storage.get(entry.offset, entry.len)?;
620                        Some((Bytes::copy_from_slice(k), Bytes::copy_from_slice(value)))
621                    }),
622            )
623        } else {
624            Box::new(
625                self.index
626                    .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
627                    .filter_map(|(k, entry)| {
628                        let value = self.storage.get(entry.offset, entry.len)?;
629                        Some((Bytes::copy_from_slice(k), Bytes::copy_from_slice(value)))
630                    }),
631            )
632        }
633    }
634
635    fn range_scan<'a>(
636        &'a self,
637        range: Range<&'a [u8]>,
638    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
639        Box::new(
640            self.index
641                .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
642                .filter_map(|(k, entry)| {
643                    let value = self.storage.get(entry.offset, entry.len)?;
644                    Some((Bytes::copy_from_slice(k), Bytes::copy_from_slice(value)))
645                }),
646        )
647    }
648
    #[inline]
    fn contains(&self, key: &[u8]) -> bool {
        // Index-only check; never touches the data region.
        self.index.contains_key(key)
    }

    // Logical size: sum of key + value lengths for live entries (excludes
    // fragmentation in the data region).
    fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    // Number of live entries in the index.
    fn len(&self) -> usize {
        self.index.len()
    }
661
662    fn snapshot(&self) -> StateSnapshot {
663        let data: Vec<(Vec<u8>, Vec<u8>)> = self
664            .index
665            .iter()
666            .filter_map(|(k, entry)| {
667                let value = self.storage.get(entry.offset, entry.len)?;
668                Some((k.clone(), value.to_vec()))
669            })
670            .collect();
671        StateSnapshot::new(data)
672    }
673
674    fn restore(&mut self, snapshot: StateSnapshot) {
675        self.index.clear();
676        self.storage.reset();
677        self.size_bytes = 0;
678        self.next_version = 1;
679
680        for (key, value) in snapshot.data() {
681            let offset = self
682                .storage
683                .write(value)
684                .expect("storage write failed during restore — state is irrecoverable");
685            self.index.insert(
686                key.clone(),
687                ValueEntry {
688                    offset,
689                    len: value.len(),
690                },
691            );
692            self.next_version += 1;
693            self.size_bytes += key.len() + value.len();
694        }
695    }
696
697    fn clear(&mut self) {
698        self.index.clear();
699        self.storage.reset();
700        self.size_bytes = 0;
701    }
702
703    fn flush(&mut self) -> Result<(), StateError> {
704        self.storage.flush()?;
705        if self.is_persistent() {
706            self.save_index()?;
707        }
708        Ok(())
709    }
710}
711
712#[cfg(test)]
713mod tests {
714    use super::*;
715    use tempfile::tempdir;
716
717    #[test]
718    fn test_in_memory_basic() {
719        let mut store = MmapStateStore::in_memory(1024);
720
721        // Test put and get
722        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
723        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
724        assert_eq!(store.len(), 1);
725
726        // Test overwrite
727        store.put(b"key1", Bytes::from_static(b"value2")).unwrap();
728        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
729        assert_eq!(store.len(), 1);
730
731        // Test delete
732        store.delete(b"key1").unwrap();
733        assert!(store.get(b"key1").is_none());
734        assert_eq!(store.len(), 0);
735    }
736
737    #[test]
738    fn test_persistent_basic() {
739        let dir = tempdir().unwrap();
740        let path = dir.path().join("state.db");
741
742        // Create store and write data
743        {
744            let mut store = MmapStateStore::persistent(&path, 4096).unwrap();
745            store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
746            store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
747            store.flush().unwrap();
748        }
749
750        // Reopen and verify (note: current implementation doesn't persist index)
751        // Full persistence would require storing the index in the file
752        {
753            let store = MmapStateStore::persistent(&path, 4096).unwrap();
754            assert!(store.is_persistent());
755            assert_eq!(store.path(), Some(path.as_path()));
756        }
757    }
758
759    #[test]
760    fn test_contains() {
761        let mut store = MmapStateStore::in_memory(1024);
762        assert!(!store.contains(b"key1"));
763
764        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
765        assert!(store.contains(b"key1"));
766
767        store.delete(b"key1").unwrap();
768        assert!(!store.contains(b"key1"));
769    }
770
771    #[test]
772    fn test_prefix_scan() {
773        let mut store = MmapStateStore::in_memory(4096);
774        store
775            .put(b"prefix:1", Bytes::from_static(b"value1"))
776            .unwrap();
777        store
778            .put(b"prefix:2", Bytes::from_static(b"value2"))
779            .unwrap();
780        store
781            .put(b"prefix:10", Bytes::from_static(b"value10"))
782            .unwrap();
783        store
784            .put(b"other:1", Bytes::from_static(b"value3"))
785            .unwrap();
786
787        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
788        assert_eq!(results.len(), 3);
789
790        for (key, _) in &results {
791            assert!(key.starts_with(b"prefix:"));
792        }
793    }
794
795    #[test]
796    fn test_range_scan() {
797        let mut store = MmapStateStore::in_memory(4096);
798        store.put(b"a", Bytes::from_static(b"1")).unwrap();
799        store.put(b"b", Bytes::from_static(b"2")).unwrap();
800        store.put(b"c", Bytes::from_static(b"3")).unwrap();
801        store.put(b"d", Bytes::from_static(b"4")).unwrap();
802
803        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
804        assert_eq!(results.len(), 2);
805
806        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
807        assert!(keys.contains(&b"b".as_slice()));
808        assert!(keys.contains(&b"c".as_slice()));
809    }
810
811    #[test]
812    fn test_snapshot_and_restore() {
813        let mut store = MmapStateStore::in_memory(4096);
814        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
815        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
816
817        // Take snapshot
818        let snapshot = store.snapshot();
819        assert_eq!(snapshot.len(), 2);
820
821        // Modify store
822        store.put(b"key1", Bytes::from_static(b"modified")).unwrap();
823        store.put(b"key3", Bytes::from_static(b"value3")).unwrap();
824        store.delete(b"key2").unwrap();
825
826        assert_eq!(store.len(), 2);
827        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("modified"));
828
829        // Restore from snapshot
830        store.restore(snapshot);
831
832        assert_eq!(store.len(), 2);
833        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
834        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
835        assert!(store.get(b"key3").is_none());
836    }
837
838    #[test]
839    fn test_size_tracking() {
840        let mut store = MmapStateStore::in_memory(4096);
841        assert_eq!(store.size_bytes(), 0);
842
843        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
844        assert_eq!(store.size_bytes(), 4 + 6); // "key1" + "value1"
845
846        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
847        assert_eq!(store.size_bytes(), (4 + 6) * 2);
848
849        // Overwrite with smaller value (old value becomes fragmentation)
850        store.put(b"key1", Bytes::from_static(b"v1")).unwrap();
851        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6);
852
853        store.delete(b"key1").unwrap();
854        assert_eq!(store.size_bytes(), 4 + 6);
855
856        store.clear();
857        assert_eq!(store.size_bytes(), 0);
858    }
859
860    #[test]
861    fn test_compact() {
862        let mut store = MmapStateStore::in_memory(4096);
863
864        // Add some data
865        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
866        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
867        store.put(b"key3", Bytes::from_static(b"value3")).unwrap();
868
869        // Delete middle key to create fragmentation
870        store.delete(b"key2").unwrap();
871
872        // Overwrite to create more fragmentation
873        store
874            .put(b"key1", Bytes::from_static(b"new_value1"))
875            .unwrap();
876
877        let frag_before = store.fragmentation();
878        assert!(frag_before > 0.0);
879
880        // Compact
881        store.compact().unwrap();
882
883        let frag_after = store.fragmentation();
884        assert!(frag_after < frag_before);
885        assert!(frag_after.abs() < f64::EPSILON); // Should be zero after compaction
886
887        // Verify data integrity
888        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("new_value1"));
889        assert!(store.get(b"key2").is_none());
890        assert_eq!(store.get(b"key3").unwrap(), Bytes::from("value3"));
891    }
892
893    #[test]
894    fn test_growth() {
895        // Start with very small capacity
896        let mut store = MmapStateStore::in_memory(32);
897
898        // Add data that exceeds initial capacity
899        for i in 0..100 {
900            let key = format!("key{i:04}");
901            let value = format!("value{i:04}");
902            store.put(key.as_bytes(), Bytes::from(value)).unwrap();
903        }
904
905        assert_eq!(store.len(), 100);
906
907        // Verify all data is accessible
908        for i in 0..100 {
909            let key = format!("key{i:04}");
910            let expected = format!("value{i:04}");
911            assert_eq!(
912                store.get(key.as_bytes()).unwrap().as_ref(),
913                expected.as_bytes()
914            );
915        }
916    }
917
918    #[test]
919    fn test_clear() {
920        let mut store = MmapStateStore::in_memory(4096);
921        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
922        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
923
924        assert_eq!(store.len(), 2);
925        assert!(store.size_bytes() > 0);
926
927        store.clear();
928
929        assert_eq!(store.len(), 0);
930        assert_eq!(store.size_bytes(), 0);
931        assert!(store.get(b"key1").is_none());
932    }
933
934    #[test]
935    fn test_empty_store() {
936        let store = MmapStateStore::in_memory(1024);
937        assert!(store.is_empty());
938        assert_eq!(store.len(), 0);
939        assert_eq!(store.size_bytes(), 0);
940        assert!(store.get(b"nonexistent").is_none());
941        assert!(!store.contains(b"nonexistent"));
942    }
943
944    #[test]
945    fn test_large_values() {
946        let mut store = MmapStateStore::in_memory(1024 * 1024);
947
948        // 100KB value
949        let large_value = vec![0xABu8; 100 * 1024];
950        store
951            .put(b"large", Bytes::from(large_value.clone()))
952            .unwrap();
953
954        let retrieved = store.get(b"large").unwrap();
955        assert_eq!(retrieved.len(), large_value.len());
956        assert_eq!(retrieved.as_ref(), &large_value[..]);
957    }
958
959    #[test]
960    fn test_binary_keys_and_values() {
961        let mut store = MmapStateStore::in_memory(4096);
962
963        // Binary key with null bytes
964        let key = [0x00, 0x01, 0x02, 0xFF, 0xFE];
965        let value = [0xDE, 0xAD, 0xBE, 0xEF];
966
967        store.put(&key, Bytes::copy_from_slice(&value)).unwrap();
968        assert_eq!(store.get(&key).unwrap().as_ref(), &value);
969    }
970
971    #[test]
972    fn test_index_persistence() {
973        let temp_dir = tempfile::tempdir().unwrap();
974        let db_path = temp_dir.path().join("test_index.db");
975
976        // 1. Create persistent store and add data
977        {
978            let mut store = MmapStateStore::persistent(&db_path, 1024 * 1024).unwrap();
979            store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
980            store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
981            // Flush should save the index
982            store.flush().unwrap();
983        }
984
985        // 2. Verify index file exists
986        let idx_path = db_path.with_extension(INDEX_EXTENSION);
987        assert!(idx_path.exists());
988
989        // 3. Re-open store and verify data "instant" availability (via index load)
990        {
991            let store = MmapStateStore::persistent(&db_path, 1024 * 1024).unwrap();
992            // Store size should be consistent
993            assert_eq!(store.len(), 2);
994            assert_eq!(store.get(b"key1").unwrap().as_ref(), b"value1");
995        }
996    }
997}