Skip to main content

laminar_core/state/
mod.rs

1//! # State Store Module
2//!
3//! High-performance state storage for streaming operators.
4//!
5//! ## Design Goals
6//!
7//! - **< 500ns lookup latency** for point queries
8//! - **Zero-copy** access where possible
9//! - **Lock-free** for single-threaded access
10//! - **Memory-mapped** for large state
11//!
12//! ## State Backends
13//!
14//! - **[`InMemoryStore`]**: BTreeMap-based, fast lookups with O(log n + k) prefix/range scans
15//! - **[`MmapStateStore`]**: Memory-mapped, supports larger-than-memory state with optional persistence
16//!
17//! ## Example
18//!
19//! ```rust
20//! use bytes::Bytes;
21//! use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
22//!
23//! let mut store = InMemoryStore::new();
24//!
25//! // Basic key-value operations
26//! store.put(b"user:1", Bytes::from_static(b"alice")).unwrap();
27//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
28//!
29//! // Typed state access (requires StateStoreExt)
30//! store.put_typed(b"count", &42u64).unwrap();
31//! let count: u64 = store.get_typed(b"count").unwrap().unwrap();
32//! assert_eq!(count, 42);
33//!
34//! // Snapshots for checkpointing
35//! let snapshot = store.snapshot();
36//! store.delete(b"user:1").unwrap();
37//! assert!(store.get(b"user:1").is_none());
38//!
39//! // Restore from snapshot
40//! store.restore(snapshot);
41//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
42//! ```
43//!
44//! ## Memory-Mapped Store Example
45//!
46//! ```rust,no_run
47//! use bytes::Bytes;
48//! use laminar_core::state::{StateStore, MmapStateStore};
49//! use std::path::Path;
50//!
51//! // In-memory mode (fast, not persistent)
52//! let mut store = MmapStateStore::in_memory(1024 * 1024);
53//! store.put(b"key", Bytes::from_static(b"value")).unwrap();
54//!
55//! // Persistent mode (survives restarts)
56//! let mut persistent = MmapStateStore::persistent(
57//!     Path::new("/tmp/state.db"),
58//!     1024 * 1024
59//! ).unwrap();
60//! persistent.put(b"key", Bytes::from_static(b"value")).unwrap();
61//! persistent.flush().unwrap();
62//! ```
63
64use bytes::Bytes;
65use rkyv::{
66    api::high::{self, HighDeserializer, HighSerializer, HighValidator},
67    bytecheck::CheckBytes,
68    rancor::Error as RkyvError,
69    ser::allocator::ArenaHandle,
70    util::AlignedVec,
71    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
72};
73use std::cell::RefCell;
74use std::collections::BTreeMap;
75use std::ops::Bound;
76use std::ops::Range;
77
78/// Compute the lexicographic successor of a byte prefix.
79///
80/// Returns `None` if no successor exists (empty prefix or all bytes are 0xFF).
81/// Used by `BTreeMap::range()` to efficiently bound prefix scans.
82pub(crate) fn prefix_successor(prefix: &[u8]) -> Option<smallvec::SmallVec<[u8; 64]>> {
83    if prefix.is_empty() {
84        return None;
85    }
86    let mut successor = smallvec::SmallVec::<[u8; 64]>::from_slice(prefix);
87    // Walk backwards, incrementing the last non-0xFF byte
88    while let Some(last) = successor.last_mut() {
89        if *last < 0xFF {
90            *last += 1;
91            return Some(successor);
92        }
93        successor.pop();
94    }
95    // All bytes were 0xFF — no successor exists
96    None
97}
98
99/// Trait for state store implementations.
100///
101/// This is the core abstraction for operator state in Ring 0 (hot path).
102/// All implementations must achieve < 500ns lookup latency for point queries.
103///
104/// # Thread Safety
105///
106/// State stores are `Send` but not `Sync`. They are designed for single-threaded
107/// access within a reactor. Cross-thread communication uses SPSC queues.
108///
109/// # Memory Model
110///
111/// - `get()` returns `Bytes` which is a cheap reference-counted handle
112/// - `put()` copies the input to internal storage
113/// - Snapshots are copy-on-write where possible
114///
115/// # Dyn Compatibility
116///
117/// This trait is dyn-compatible for use with `Box<dyn StateStore>`. For generic
118/// convenience methods like `get_typed` and `put_typed`, use the [`StateStoreExt`]
119/// extension trait.
120pub trait StateStore: Send {
121    /// Get a value by key.
122    ///
123    /// Returns `None` if the key does not exist.
124    ///
125    /// # Performance
126    ///
127    /// Target: < 500ns for in-memory stores.
128    fn get(&self, key: &[u8]) -> Option<Bytes>;
129
130    /// Store a key-value pair.
131    ///
132    /// If the key already exists, the value is overwritten.
133    /// Accepts owned `Bytes` to avoid mandatory copy — callers with `&[u8]`
134    /// use `Bytes::copy_from_slice()` at the call site.
135    ///
136    /// # Errors
137    ///
138    /// Returns `StateError` if the operation fails (e.g., disk full for
139    /// memory-mapped stores).
140    fn put(&mut self, key: &[u8], value: Bytes) -> Result<(), StateError>;
141
142    /// Delete a key.
143    ///
144    /// No error is returned if the key does not exist.
145    ///
146    /// # Errors
147    ///
148    /// Returns `StateError` if the operation fails.
149    fn delete(&mut self, key: &[u8]) -> Result<(), StateError>;
150
151    /// Scan all keys with a given prefix.
152    ///
153    /// Returns an iterator over matching (key, value) pairs in
154    /// lexicographic order.
155    ///
156    /// # Performance
157    ///
158    /// O(log n + k) where n is the total number of keys and k is the
159    /// number of matching entries.
160    fn prefix_scan<'a>(&'a self, prefix: &'a [u8])
161        -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
162
163    /// Range scan between two keys (exclusive end).
164    ///
165    /// Returns an iterator over keys where `start <= key < end`
166    /// in lexicographic order.
167    ///
168    /// # Performance
169    ///
170    /// O(log n + k) where n is the total number of keys and k is the
171    /// number of matching entries.
172    fn range_scan<'a>(
173        &'a self,
174        range: Range<&'a [u8]>,
175    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
176
177    /// Check if a key exists.
178    ///
179    /// More efficient than `get()` when you don't need the value.
180    fn contains(&self, key: &[u8]) -> bool {
181        self.get(key).is_some()
182    }
183
184    /// Get approximate size in bytes.
185    ///
186    /// This includes both keys and values. The exact accounting may vary
187    /// by implementation.
188    fn size_bytes(&self) -> usize;
189
190    /// Get the number of entries in the store.
191    fn len(&self) -> usize;
192
193    /// Check if the store is empty.
194    fn is_empty(&self) -> bool {
195        self.len() == 0
196    }
197
198    /// Create a snapshot for checkpointing.
199    ///
200    /// The snapshot captures the current state and can be used to restore
201    /// the store to this point in time. Snapshots are serializable for
202    /// persistence.
203    ///
204    /// # Implementation Notes
205    ///
206    /// For in-memory stores, this clones the data. For memory-mapped stores,
207    /// this may use copy-on-write semantics.
208    fn snapshot(&self) -> StateSnapshot;
209
210    /// Restore from a snapshot.
211    ///
212    /// This replaces the current state with the snapshot's state.
213    /// Any changes since the snapshot was taken are lost.
214    fn restore(&mut self, snapshot: StateSnapshot);
215
216    /// Clear all entries.
217    fn clear(&mut self);
218
219    /// Flush any pending writes to durable storage.
220    ///
221    /// For in-memory stores, this is a no-op. For memory-mapped or
222    /// disk-backed stores, this ensures data is persisted.
223    ///
224    /// # Errors
225    ///
226    /// Returns `StateError` if the flush operation fails.
227    fn flush(&mut self) -> Result<(), StateError> {
228        Ok(()) // Default no-op for in-memory stores
229    }
230
231    /// Get a zero-copy reference to a value by key.
232    ///
233    /// Returns a direct `&[u8]` slice into the store's internal buffer,
234    /// avoiding the `Bytes` ref-count overhead. Only backends that own
235    /// their storage contiguously (e.g., `AHashMapStore`) can implement
236    /// this; others return `None` and callers fall back to [`get`](Self::get).
237    ///
238    /// # Lifetime
239    ///
240    /// The returned slice borrows `self`, so no mutations are allowed
241    /// while the reference is live.
242    fn get_ref(&self, _key: &[u8]) -> Option<&[u8]> {
243        None
244    }
245
246    /// Get a value or insert a default.
247    ///
248    /// If the key doesn't exist, the default is inserted and returned.
249    ///
250    /// # Errors
251    ///
252    /// Returns `StateError` if inserting the default value fails.
253    fn get_or_insert(&mut self, key: &[u8], default: &[u8]) -> Result<Bytes, StateError> {
254        if let Some(value) = self.get(key) {
255            Ok(value)
256        } else {
257            let value = Bytes::copy_from_slice(default);
258            self.put(key, value.clone())?;
259            Ok(value)
260        }
261    }
262}
263
264/// Extension trait for [`StateStore`] providing typed access methods.
265///
266/// These methods use generics and thus cannot be part of the dyn-compatible
267/// `StateStore` trait. Import this trait to use typed access on any state store.
268///
269/// Uses rkyv for zero-copy serialization. Types must derive `Archive`,
270/// `rkyv::Serialize`, and `rkyv::Deserialize`.
271///
272/// # Example
273///
274/// ```rust,ignore
275/// use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
276/// use rkyv::{Archive, Deserialize, Serialize};
277///
278/// #[derive(Archive, Serialize, Deserialize)]
279/// #[rkyv(check_bytes)]
280/// struct Counter { value: u64 }
281///
282/// let mut store = InMemoryStore::new();
283/// store.put_typed(b"count", &Counter { value: 42 }).unwrap();
284/// let count: Counter = store.get_typed(b"count").unwrap().unwrap();
285/// assert_eq!(count.value, 42);
286/// ```
287pub trait StateStoreExt: StateStore {
288    /// Get a value and deserialize it using rkyv.
289    ///
290    /// Uses zero-copy access where possible, falling back to full
291    /// deserialization to return an owned value.
292    ///
293    /// # Errors
294    ///
295    /// Returns `StateError::Serialization` if deserialization fails.
296    fn get_typed<T>(&self, key: &[u8]) -> Result<Option<T>, StateError>
297    where
298        T: Archive,
299        T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
300            + RkyvDeserialize<T, HighDeserializer<RkyvError>>,
301    {
302        // Prefer zero-copy get_ref when available
303        let bytes_ref = self.get_ref(key);
304        let bytes_owned;
305        let data: &[u8] = if let Some(r) = bytes_ref {
306            r
307        } else if let Some(b) = self.get(key) {
308            bytes_owned = b;
309            &bytes_owned
310        } else {
311            return Ok(None);
312        };
313
314        let archived = rkyv::access::<T::Archived, RkyvError>(data)
315            .map_err(|e| StateError::Serialization(e.to_string()))?;
316        let value = rkyv::deserialize::<T, RkyvError>(archived)
317            .map_err(|e| StateError::Serialization(e.to_string()))?;
318        Ok(Some(value))
319    }
320
321    /// Serialize and store a value using rkyv.
322    ///
323    /// Uses a thread-local reusable buffer to avoid per-call heap allocation
324    /// on the hot path. The buffer is cleared and reused between calls.
325    ///
326    /// # Errors
327    ///
328    /// Returns `StateError::Serialization` if serialization fails.
329    fn put_typed<T>(&mut self, key: &[u8], value: &T) -> Result<(), StateError>
330    where
331        T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
332    {
333        thread_local! {
334            static SERIALIZE_BUF: RefCell<AlignedVec> =
335                RefCell::new(AlignedVec::with_capacity(256));
336        }
337
338        SERIALIZE_BUF.with(|buf| {
339            let mut vec = buf.borrow_mut();
340            vec.clear();
341            // Take ownership to pass to to_bytes_in, then put it back
342            let taken = std::mem::take(&mut *vec);
343            match high::to_bytes_in::<_, RkyvError>(value, taken) {
344                Ok(filled) => {
345                    let result = self.put(key, Bytes::copy_from_slice(&filled));
346                    *vec = filled;
347                    result
348                }
349                Err(e) => {
350                    // Restore an empty buffer on error
351                    *vec = AlignedVec::new();
352                    Err(StateError::Serialization(e.to_string()))
353                }
354            }
355        })
356    }
357
358    /// Update a value in place using a closure.
359    ///
360    /// The update function receives the current value (or None) and returns
361    /// the new value. If `None` is returned, the key is deleted.
362    ///
363    /// # Errors
364    ///
365    /// Returns `StateError` if the put or delete operation fails.
366    fn update<F>(&mut self, key: &[u8], f: F) -> Result<(), StateError>
367    where
368        F: FnOnce(Option<Bytes>) -> Option<Vec<u8>>,
369    {
370        let current = self.get(key);
371        match f(current) {
372            Some(new_value) => self.put(key, Bytes::from(new_value)),
373            None => self.delete(key),
374        }
375    }
376}
377
378// Blanket implementation for all StateStore types
379impl<T: StateStore + ?Sized> StateStoreExt for T {}
380
/// A snapshot of state store contents for checkpointing.
///
/// Snapshots can be serialized for persistence and restored later.
/// They capture the complete state at a point in time.
///
/// Uses rkyv for zero-copy deserialization on the hot path.
///
/// All fields are private; read access goes through the accessor methods
/// on this type.
// NOTE(review): the rkyv derives presumably tie the archived byte layout to
// the field order below — confirm before reordering fields.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct StateSnapshot {
    /// Captured entries as owned `(key, value)` byte pairs
    data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Timestamp when snapshot was created (nanoseconds since epoch)
    timestamp_ns: u64,
    /// Version for forward compatibility (`StateSnapshot::new` writes `1`)
    version: u32,
}
396
397impl StateSnapshot {
398    /// Create a new snapshot from key-value pairs.
399    #[must_use]
400    #[allow(clippy::cast_possible_truncation)]
401    pub fn new(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
402        Self {
403            data,
404            // Truncation is acceptable here - we won't hit u64 overflow until ~584 years from epoch
405            timestamp_ns: std::time::SystemTime::now()
406                .duration_since(std::time::UNIX_EPOCH)
407                .map(|d| d.as_nanos() as u64)
408                .unwrap_or(0),
409            version: 1,
410        }
411    }
412
413    /// Get the snapshot data.
414    #[must_use]
415    pub fn data(&self) -> &[(Vec<u8>, Vec<u8>)] {
416        &self.data
417    }
418
419    /// Get the snapshot timestamp.
420    #[must_use]
421    pub fn timestamp_ns(&self) -> u64 {
422        self.timestamp_ns
423    }
424
425    /// Get the number of entries in the snapshot.
426    #[must_use]
427    pub fn len(&self) -> usize {
428        self.data.len()
429    }
430
431    /// Check if the snapshot is empty.
432    #[must_use]
433    pub fn is_empty(&self) -> bool {
434        self.data.is_empty()
435    }
436
437    /// Get the approximate size in bytes.
438    #[must_use]
439    pub fn size_bytes(&self) -> usize {
440        self.data.iter().map(|(k, v)| k.len() + v.len()).sum()
441    }
442
443    /// Serialize the snapshot to bytes using rkyv.
444    ///
445    /// Returns an aligned byte vector for optimal zero-copy access.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if serialization fails.
450    pub fn to_bytes(&self) -> Result<AlignedVec, StateError> {
451        rkyv::to_bytes::<RkyvError>(self).map_err(|e| StateError::Serialization(e.to_string()))
452    }
453
454    /// Deserialize a snapshot from bytes using rkyv.
455    ///
456    /// Uses zero-copy access internally for performance.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if deserialization fails.
461    pub fn from_bytes(bytes: &[u8]) -> Result<Self, StateError> {
462        let archived = rkyv::access::<<Self as Archive>::Archived, RkyvError>(bytes)
463            .map_err(|e| StateError::Serialization(e.to_string()))?;
464        rkyv::deserialize::<Self, RkyvError>(archived)
465            .map_err(|e| StateError::Serialization(e.to_string()))
466    }
467}
468
/// In-memory state store using `BTreeMap` for sorted key access.
///
/// This state store is suitable for state that fits in memory. It uses
/// `BTreeMap` which provides O(log n + k) prefix and range scans, making
/// it efficient for join state and windowed aggregation lookups.
///
/// # Performance Characteristics
///
/// - **Get**: O(log n), < 500ns typical
/// - **Put**: O(log n), may allocate
/// - **Delete**: O(log n)
/// - **Prefix scan**: O(log n + k) where k is matching entries
/// - **Range scan**: O(log n + k) where k is matching entries
///
/// # Memory Usage
///
/// Keys and values are stored as owned `Vec<u8>` and `Bytes` respectively.
/// Use `size_bytes()` to monitor memory usage.
pub struct InMemoryStore {
    /// The underlying sorted map (key bytes -> value handle)
    data: BTreeMap<Vec<u8>, Bytes>,
    /// Running total of key and value lengths, maintained by the mutating
    /// `StateStore` methods and reported by `size_bytes()`
    size_bytes: usize,
}
493
494impl InMemoryStore {
495    /// Creates a new empty in-memory store.
496    #[must_use]
497    pub fn new() -> Self {
498        Self {
499            data: BTreeMap::new(),
500            size_bytes: 0,
501        }
502    }
503
504    /// Creates a new in-memory store.
505    ///
506    /// The capacity hint is accepted for API compatibility but has no
507    /// effect — `BTreeMap` does not support pre-allocation.
508    #[must_use]
509    pub fn with_capacity(_capacity: usize) -> Self {
510        Self::new()
511    }
512
513    /// Returns the number of entries in the store.
514    ///
515    /// `BTreeMap` does not expose a capacity concept, so this returns
516    /// the current entry count.
517    #[must_use]
518    pub fn capacity(&self) -> usize {
519        self.data.len()
520    }
521
522    /// No-op for API compatibility.
523    ///
524    /// `BTreeMap` manages its own memory and does not support
525    /// explicit shrinking.
526    pub fn shrink_to_fit(&mut self) {
527        // BTreeMap does not support shrink_to_fit
528    }
529}
530
531impl Default for InMemoryStore {
532    fn default() -> Self {
533        Self::new()
534    }
535}
536
537impl StateStore for InMemoryStore {
538    #[inline]
539    fn get(&self, key: &[u8]) -> Option<Bytes> {
540        self.data.get(key).cloned()
541    }
542
543    #[inline]
544    fn get_ref(&self, key: &[u8]) -> Option<&[u8]> {
545        self.data.get(key).map(Bytes::as_ref)
546    }
547
548    #[inline]
549    fn put(&mut self, key: &[u8], value: Bytes) -> Result<(), StateError> {
550        // Check-then-insert: avoids key.to_vec() allocation on the
551        // update path, which is the common case for accumulator state.
552        if let Some(existing) = self.data.get_mut(key) {
553            self.size_bytes -= existing.len();
554            self.size_bytes += value.len();
555            *existing = value;
556        } else {
557            self.size_bytes += key.len() + value.len();
558            self.data.insert(key.to_vec(), value);
559        }
560        Ok(())
561    }
562
563    fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
564        if let Some(old_value) = self.data.remove(key) {
565            self.size_bytes -= key.len() + old_value.len();
566        }
567        Ok(())
568    }
569
570    fn prefix_scan<'a>(
571        &'a self,
572        prefix: &'a [u8],
573    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
574        if prefix.is_empty() {
575            // Empty prefix matches everything
576            return Box::new(
577                self.data
578                    .iter()
579                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
580            );
581        }
582        if let Some(end) = prefix_successor(prefix) {
583            Box::new(
584                self.data
585                    .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
586                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
587            )
588        } else {
589            // All-0xFF prefix: scan from prefix to end
590            Box::new(
591                self.data
592                    .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
593                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
594            )
595        }
596    }
597
598    fn range_scan<'a>(
599        &'a self,
600        range: Range<&'a [u8]>,
601    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
602        Box::new(
603            self.data
604                .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
605                .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
606        )
607    }
608
609    #[inline]
610    fn contains(&self, key: &[u8]) -> bool {
611        self.data.contains_key(key.as_ref())
612    }
613
614    fn size_bytes(&self) -> usize {
615        self.size_bytes
616    }
617
618    fn len(&self) -> usize {
619        self.data.len()
620    }
621
622    fn snapshot(&self) -> StateSnapshot {
623        let data: Vec<(Vec<u8>, Vec<u8>)> = self
624            .data
625            .iter()
626            .map(|(k, v)| (k.clone(), v.to_vec()))
627            .collect();
628        StateSnapshot::new(data)
629    }
630
631    fn restore(&mut self, snapshot: StateSnapshot) {
632        self.data.clear();
633        self.size_bytes = 0;
634
635        for (key, value) in snapshot.data {
636            self.size_bytes += key.len() + value.len();
637            self.data.insert(key, Bytes::from(value));
638        }
639    }
640
641    fn clear(&mut self) {
642        self.data.clear();
643        self.size_bytes = 0;
644    }
645}
646
/// Errors that can occur in state operations.
///
/// The `Display` text of each variant is given by its `#[error(...)]`
/// attribute.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// I/O error
    ///
    /// Carries the underlying `std::io::Error`; the `#[from]` attribute
    /// allows `?` to convert one into this variant.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error
    ///
    /// Carries the stringified rkyv error. Within this file it is also the
    /// variant returned when decoding fails (`get_typed`,
    /// `StateSnapshot::from_bytes`).
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Deserialization error
    ///
    /// NOTE(review): nothing in this file constructs this variant; decode
    /// failures here are reported as `Serialization`. Presumably used by
    /// other modules — verify before relying on it.
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// Corruption error
    ///
    /// Not constructed in this file.
    #[error("Corruption error: {0}")]
    Corruption(String),
}
666
// Private module; its `MmapStateStore` type is exposed through the
// re-export at the bottom of this list.
mod mmap;

/// AHashMap-backed state store with O(1) lookups and zero-copy reads.
pub mod ahash_store;

/// Dictionary key encoder for low-cardinality GROUP BY keys.
pub mod dict_encoder;

// Re-export main types so callers can name them directly from this module.
// `Error` is an alias for `StateError`.
pub use self::StateError as Error;
pub use ahash_store::AHashMapStore;
pub use dict_encoder::DictionaryKeyEncoder;
pub use mmap::MmapStateStore;
680
#[cfg(test)]
mod tests {
    use super::*;

    /// Put / overwrite / delete round trip on a single key.
    #[test]
    fn test_in_memory_store_basic() {
        let mut kv = InMemoryStore::new();

        // Insert, then read back.
        kv.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert_eq!(kv.get(b"key1").as_deref(), Some(&b"value1"[..]));
        assert_eq!(kv.len(), 1);

        // Overwriting keeps the entry count at one.
        kv.put(b"key1", Bytes::from_static(b"value2")).unwrap();
        assert_eq!(kv.get(b"key1").as_deref(), Some(&b"value2"[..]));
        assert_eq!(kv.len(), 1);

        // Deleting removes the entry.
        kv.delete(b"key1").unwrap();
        assert_eq!(kv.get(b"key1"), None);
        assert_eq!(kv.len(), 0);

        // Deleting a key that was never stored is not an error.
        kv.delete(b"nonexistent").unwrap();
    }

    /// `contains` follows the lifecycle of a key.
    #[test]
    fn test_contains() {
        let mut kv = InMemoryStore::new();
        assert!(!kv.contains(b"key1"));

        kv.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert!(kv.contains(b"key1"));

        kv.delete(b"key1").unwrap();
        assert!(!kv.contains(b"key1"));
    }

    /// Prefix scans only return matching keys; the empty prefix returns all.
    #[test]
    fn test_prefix_scan() {
        let mut kv = InMemoryStore::new();
        kv.put(b"prefix:1", Bytes::from_static(b"value1")).unwrap();
        kv.put(b"prefix:2", Bytes::from_static(b"value2")).unwrap();
        kv.put(b"prefix:10", Bytes::from_static(b"value10"))
            .unwrap();
        kv.put(b"other:1", Bytes::from_static(b"value3")).unwrap();

        let matching: Vec<_> = kv.prefix_scan(b"prefix:").collect();
        assert_eq!(matching.len(), 3);
        assert!(matching.iter().all(|(key, _)| key.starts_with(b"prefix:")));

        // The empty prefix matches every entry.
        let everything = kv.prefix_scan(b"").count();
        assert_eq!(everything, 4);
    }

    /// Range scans include the start key and exclude the end key.
    #[test]
    fn test_range_scan() {
        let mut kv = InMemoryStore::new();
        kv.put(b"a", Bytes::from_static(b"1")).unwrap();
        kv.put(b"b", Bytes::from_static(b"2")).unwrap();
        kv.put(b"c", Bytes::from_static(b"3")).unwrap();
        kv.put(b"d", Bytes::from_static(b"4")).unwrap();

        let hits: Vec<_> = kv.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
        assert_eq!(hits.len(), 2);

        let found: Vec<&[u8]> = hits.iter().map(|(key, _)| key.as_ref()).collect();
        assert!(found.contains(&b"b".as_slice()));
        assert!(found.contains(&b"c".as_slice()));
    }

    /// Restoring a snapshot undoes every later modification.
    #[test]
    fn test_snapshot_and_restore() {
        let mut kv = InMemoryStore::new();
        kv.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        kv.put(b"key2", Bytes::from_static(b"value2")).unwrap();

        let checkpoint = kv.snapshot();
        assert_eq!(checkpoint.len(), 2);

        // Diverge from the checkpoint: overwrite, add, and remove.
        kv.put(b"key1", Bytes::from_static(b"modified")).unwrap();
        kv.put(b"key3", Bytes::from_static(b"value3")).unwrap();
        kv.delete(b"key2").unwrap();

        assert_eq!(kv.len(), 2);
        assert_eq!(kv.get(b"key1").as_deref(), Some(&b"modified"[..]));

        // Roll back.
        kv.restore(checkpoint);

        assert_eq!(kv.len(), 2);
        assert_eq!(kv.get(b"key1").as_deref(), Some(&b"value1"[..]));
        assert_eq!(kv.get(b"key2").as_deref(), Some(&b"value2"[..]));
        assert_eq!(kv.get(b"key3"), None);
    }

    /// A snapshot survives a serialize / deserialize round trip.
    #[test]
    fn test_snapshot_serialization() {
        let mut kv = InMemoryStore::new();
        kv.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        kv.put(b"key2", Bytes::from_static(b"value2")).unwrap();

        let original = kv.snapshot();

        let encoded = original.to_bytes().unwrap();
        let decoded = StateSnapshot::from_bytes(&encoded).unwrap();

        assert_eq!(decoded.len(), original.len());
        assert_eq!(decoded.data(), original.data());
    }

    /// Typed access round-trips integers, strings and vectors.
    #[test]
    fn test_typed_access() {
        let mut kv = InMemoryStore::new();

        // Integer
        kv.put_typed(b"count", &42u64).unwrap();
        let count: Option<u64> = kv.get_typed(b"count").unwrap();
        assert_eq!(count, Some(42));

        // String
        kv.put_typed(b"name", &String::from("alice")).unwrap();
        let name: Option<String> = kv.get_typed(b"name").unwrap();
        assert_eq!(name.as_deref(), Some("alice"));

        // Vector (complex type)
        let numbers = vec![1i64, 2, 3, 4, 5];
        kv.put_typed(b"nums", &numbers).unwrap();
        let decoded: Vec<i64> = kv.get_typed(b"nums").unwrap().unwrap();
        assert_eq!(decoded, numbers);

        // A missing key decodes to `None` rather than an error.
        let absent: Option<u64> = kv.get_typed(b"missing").unwrap();
        assert_eq!(absent, None);
    }

    /// `get_or_insert` inserts once and afterwards returns what is stored.
    #[test]
    fn test_get_or_insert() {
        let mut kv = InMemoryStore::new();

        // Key absent: the default is stored and returned.
        let first = kv.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(first.as_ref(), b"default");
        assert_eq!(kv.len(), 1);

        // Key present: the stored value wins over the default.
        kv.put(b"key1", Bytes::from_static(b"modified")).unwrap();
        let second = kv.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(second.as_ref(), b"modified");
    }

    /// `update` can rewrite a value and can delete the key.
    #[test]
    fn test_update() {
        let mut kv = InMemoryStore::new();
        kv.put(b"counter", Bytes::from_static(b"\x00\x00\x00\x00"))
            .unwrap();

        // Increment the little-endian counter in place.
        kv.update(b"counter", |current| {
            let previous = match current {
                Some(raw) => u32::from_le_bytes(raw.as_ref().try_into().unwrap_or([0; 4])),
                None => 0u32,
            };
            Some((previous + 1).to_le_bytes().to_vec())
        })
        .unwrap();

        let raw = kv.get(b"counter").unwrap();
        let counter = u32::from_le_bytes(raw.as_ref().try_into().unwrap());
        assert_eq!(counter, 1);

        // Returning `None` from the closure deletes the key.
        kv.update(b"counter", |_| None).unwrap();
        assert_eq!(kv.get(b"counter"), None);
    }

    /// `size_bytes` tracks key and value lengths through every mutation.
    #[test]
    fn test_size_tracking() {
        let mut kv = InMemoryStore::new();
        assert_eq!(kv.size_bytes(), 0);

        // "key1" + "value1"
        kv.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert_eq!(kv.size_bytes(), 4 + 6);

        kv.put(b"key2", Bytes::from_static(b"value2")).unwrap();
        assert_eq!(kv.size_bytes(), (4 + 6) * 2);

        // Overwrite with a smaller value: "key1" + "v1" + "key2" + "value2"
        kv.put(b"key1", Bytes::from_static(b"v1")).unwrap();
        assert_eq!(kv.size_bytes(), 4 + 2 + 4 + 6);

        kv.delete(b"key1").unwrap();
        assert_eq!(kv.size_bytes(), 4 + 6);

        kv.clear();
        assert_eq!(kv.size_bytes(), 0);
    }

    /// The capacity hint is ignored; `capacity()` mirrors `len()`.
    #[test]
    fn test_with_capacity() {
        let kv = InMemoryStore::with_capacity(1000);
        // Nothing is pre-allocated, so an empty store reports zero.
        assert_eq!(kv.capacity(), 0);
        assert!(kv.is_empty());
    }

    /// `clear` drops every entry and resets the size counter.
    #[test]
    fn test_clear() {
        let mut kv = InMemoryStore::new();
        kv.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        kv.put(b"key2", Bytes::from_static(b"value2")).unwrap();

        assert_eq!(kv.len(), 2);
        assert!(kv.size_bytes() > 0);

        kv.clear();

        assert_eq!(kv.len(), 0);
        assert_eq!(kv.size_bytes(), 0);
        assert_eq!(kv.get(b"key1"), None);
    }

    /// Successor computation across ordinary and boundary prefixes.
    #[test]
    fn test_prefix_successor() {
        // Ordinary prefix: last byte is bumped.
        assert_eq!(prefix_successor(b"abc").as_deref(), Some(b"abd".as_slice()));

        // Empty prefix has no successor.
        assert!(prefix_successor(b"").is_none());

        // Neither does a prefix made only of 0xFF bytes.
        assert!(prefix_successor(&[0xFF, 0xFF, 0xFF]).is_none());

        // Trailing 0xFF bytes are dropped and the byte before them is bumped.
        assert_eq!(
            prefix_successor(&[0x01, 0xFF]).as_deref(),
            Some([0x02].as_slice())
        );
        assert_eq!(
            prefix_successor(&[0x01, 0x02, 0xFF]).as_deref(),
            Some([0x01, 0x03].as_slice())
        );

        // Single-byte prefixes.
        assert_eq!(
            prefix_successor(&[0x00]).as_deref(),
            Some([0x01].as_slice())
        );
        assert_eq!(
            prefix_successor(&[0xFE]).as_deref(),
            Some([0xFF].as_slice())
        );
        assert!(prefix_successor(&[0xFF]).is_none());
    }

    /// Prefix scans over binary keys shaped like join state.
    #[test]
    fn test_prefix_scan_binary_keys() {
        let mut kv = InMemoryStore::new();

        // Key layout: partition_prefix + key_hash
        let stream_one = [0x00, 0x01]; // partition 0, stream 1
        let stream_two = [0x00, 0x02]; // partition 0, stream 2

        kv.put(&[0x00, 0x01, 0xAA], Bytes::from_static(b"val1"))
            .unwrap();
        kv.put(&[0x00, 0x01, 0xBB], Bytes::from_static(b"val2"))
            .unwrap();
        kv.put(&[0x00, 0x02, 0xCC], Bytes::from_static(b"val3"))
            .unwrap();
        kv.put(&[0x00, 0x02, 0xDD], Bytes::from_static(b"val4"))
            .unwrap();
        kv.put(&[0x01, 0x01, 0xEE], Bytes::from_static(b"val5"))
            .unwrap();

        // Stream 1 of partition 0
        let first: Vec<_> = kv.prefix_scan(&stream_one).collect();
        assert_eq!(first.len(), 2);
        assert!(first.iter().all(|(key, _)| key.starts_with(&stream_one)));

        // Stream 2 of partition 0
        let second: Vec<_> = kv.prefix_scan(&stream_two).collect();
        assert_eq!(second.len(), 2);
        assert!(second.iter().all(|(key, _)| key.starts_with(&stream_two)));

        // An all-0xFF prefix matches nothing in this data set.
        let none = kv.prefix_scan(&[0xFF, 0xFF]).count();
        assert_eq!(none, 0);
    }

    /// Prefix scan results come back in key order, not insertion order.
    #[test]
    fn test_prefix_scan_returns_sorted() {
        let mut kv = InMemoryStore::new();
        kv.put(b"prefix:c", Bytes::from_static(b"3")).unwrap();
        kv.put(b"prefix:a", Bytes::from_static(b"1")).unwrap();
        kv.put(b"prefix:b", Bytes::from_static(b"2")).unwrap();

        let ordered: Vec<Vec<u8>> = kv
            .prefix_scan(b"prefix:")
            .map(|(key, _)| key.to_vec())
            .collect();
        let expected = vec![
            b"prefix:a".to_vec(),
            b"prefix:b".to_vec(),
            b"prefix:c".to_vec(),
        ];
        assert_eq!(ordered, expected);
    }
}