Skip to main content

laminar_core/state/
ahash_store.rs

1//! AHashMap-backed state store with dual-structure design.
2//!
3//! [`AHashMapStore`] uses `AHashMap<Vec<u8>, Bytes>` for O(1) point lookups
4//! and a `BTreeSet<Vec<u8>>` key index for efficient prefix/range scans.
5//! This is the first backend that supports zero-copy `get_ref`.
6//!
7//! ## Performance Characteristics
8//!
9//! - **Get**: O(1) average via AHashMap, < 200ns typical
10//! - **Get (zero-copy)**: O(1) via `get_ref()`, < 150ns typical
11//! - **Put**: O(1) amortized (hash) + O(log n) (BTreeSet index, new keys only)
12//! - **Delete**: O(1) (hash) + O(log n) (BTreeSet index)
13//! - **Prefix scan**: O(log n + k) via BTreeSet index
14//! - **Range scan**: O(log n + k) via BTreeSet index
15
16use ahash::AHashMap;
17use bytes::Bytes;
18use std::collections::BTreeSet;
19use std::ops::Bound;
20use std::ops::Range;
21
22use super::{prefix_successor, StateError, StateSnapshot, StateStore};
23
24/// High-performance state store using `AHashMap` for point lookups and
25/// `BTreeMap` for ordered scans.
26///
27/// This dual-structure design provides:
28/// - O(1) point lookups (vs O(log n) for `InMemoryStore`)
29/// - Zero-copy reads via [`get_ref`](StateStore::get_ref)
30/// - Same O(log n + k) scan performance as `InMemoryStore`
31///
32/// Trade-off: ~2x memory for keys (stored in both maps) and slightly
33/// slower writes due to dual-map maintenance.
34pub struct AHashMapStore {
35    /// Primary data store for O(1) point lookups.
36    /// Values are `Bytes` so `get()` returns a cheap Arc bump (~2ns) instead
37    /// of `Bytes::copy_from_slice` (alloc + memcpy).
38    data: AHashMap<Vec<u8>, Bytes>,
39    /// Sorted index for prefix/range scans (keys only).
40    index: BTreeSet<Vec<u8>>,
41    /// Track total size in bytes (keys + values).
42    size_bytes: usize,
43}
44
45impl AHashMapStore {
46    /// Creates a new empty store.
47    #[must_use]
48    pub fn new() -> Self {
49        Self {
50            data: AHashMap::new(),
51            index: BTreeSet::new(),
52            size_bytes: 0,
53        }
54    }
55
56    /// Creates a new store with pre-allocated capacity for the hash map.
57    #[must_use]
58    pub fn with_capacity(capacity: usize) -> Self {
59        Self {
60            data: AHashMap::with_capacity(capacity),
61            index: BTreeSet::new(),
62            size_bytes: 0,
63        }
64    }
65}
66
67impl Default for AHashMapStore {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl StateStore for AHashMapStore {
74    #[inline]
75    fn get(&self, key: &[u8]) -> Option<Bytes> {
76        self.data.get(key).cloned() // Arc bump ~2ns, not copy
77    }
78
79    #[inline]
80    fn get_ref(&self, key: &[u8]) -> Option<&[u8]> {
81        self.data.get(key).map(Bytes::as_ref)
82    }
83
84    #[inline]
85    fn put(&mut self, key: &[u8], value: Bytes) -> Result<(), StateError> {
86        if let Some(old_value) = self.data.get_mut(key) {
87            self.size_bytes -= old_value.len();
88            self.size_bytes += value.len();
89            *old_value = value;
90        } else {
91            self.size_bytes += key.len() + value.len();
92            let key_vec = key.to_vec();
93            self.index.insert(key_vec.clone());
94            self.data.insert(key_vec, value);
95        }
96        Ok(())
97    }
98
99    #[inline]
100    fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
101        if let Some(old_value) = self.data.remove(key) {
102            self.size_bytes -= key.len() + old_value.len();
103            self.index.remove(key);
104        }
105        Ok(())
106    }
107
108    fn prefix_scan<'a>(
109        &'a self,
110        prefix: &'a [u8],
111    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
112        if prefix.is_empty() {
113            return Box::new(self.index.iter().map(move |k| {
114                let v = &self.data[k.as_slice()];
115                (Bytes::copy_from_slice(k), v.clone())
116            }));
117        }
118        if let Some(end) = prefix_successor(prefix) {
119            Box::new(
120                self.index
121                    .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
122                    .map(move |k| {
123                        let v = &self.data[k.as_slice()];
124                        (Bytes::copy_from_slice(k), v.clone())
125                    }),
126            )
127        } else {
128            Box::new(
129                self.index
130                    .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
131                    .map(move |k| {
132                        let v = &self.data[k.as_slice()];
133                        (Bytes::copy_from_slice(k), v.clone())
134                    }),
135            )
136        }
137    }
138
139    fn range_scan<'a>(
140        &'a self,
141        range: Range<&'a [u8]>,
142    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
143        Box::new(
144            self.index
145                .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
146                .map(move |k| {
147                    let v = &self.data[k.as_slice()];
148                    (Bytes::copy_from_slice(k), v.clone())
149                }),
150        )
151    }
152
153    #[inline]
154    fn contains(&self, key: &[u8]) -> bool {
155        self.data.contains_key(key)
156    }
157
158    fn size_bytes(&self) -> usize {
159        self.size_bytes
160    }
161
162    fn len(&self) -> usize {
163        self.data.len()
164    }
165
166    fn snapshot(&self) -> StateSnapshot {
167        let data: Vec<(Vec<u8>, Vec<u8>)> = self
168            .index
169            .iter()
170            .map(|k| {
171                let v = self.data[k.as_slice()].to_vec();
172                (k.clone(), v)
173            })
174            .collect();
175        StateSnapshot::new(data)
176    }
177
178    fn restore(&mut self, snapshot: StateSnapshot) {
179        self.data.clear();
180        self.index.clear();
181        self.size_bytes = 0;
182
183        for (key, value) in snapshot.data() {
184            self.size_bytes += key.len() + value.len();
185            self.index.insert(key.clone());
186            self.data.insert(key.clone(), Bytes::copy_from_slice(value));
187        }
188    }
189
190    fn clear(&mut self) {
191        self.data.clear();
192        self.index.clear();
193        self.size_bytes = 0;
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Put, overwrite, and delete a single key, checking `get` and `len`.
    #[test]
    fn test_basic_operations() {
        let mut store = AHashMapStore::new();

        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.len(), 1);

        // Overwrite
        store.put(b"key1", Bytes::from_static(b"value2")).unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
        assert_eq!(store.len(), 1);

        // Delete
        store.delete(b"key1").unwrap();
        assert!(store.get(b"key1").is_none());
        assert_eq!(store.len(), 0);
    }

    /// `get_ref` exposes the stored bytes as a borrowed slice.
    #[test]
    fn test_get_ref_zero_copy() {
        let mut store = AHashMapStore::new();
        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();

        // get_ref returns a direct slice
        let slice = store.get_ref(b"key1").unwrap();
        assert_eq!(slice, b"value1");

        // Missing key returns None
        assert!(store.get_ref(b"missing").is_none());
    }

    /// `contains` tracks insertion and deletion.
    #[test]
    fn test_contains() {
        let mut store = AHashMapStore::new();
        assert!(!store.contains(b"key1"));

        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert!(store.contains(b"key1"));

        store.delete(b"key1").unwrap();
        assert!(!store.contains(b"key1"));
    }

    /// Prefix scans return only matching keys; the empty prefix returns all.
    #[test]
    fn test_prefix_scan() {
        let mut store = AHashMapStore::new();
        store
            .put(b"prefix:1", Bytes::from_static(b"value1"))
            .unwrap();
        store
            .put(b"prefix:2", Bytes::from_static(b"value2"))
            .unwrap();
        store
            .put(b"prefix:10", Bytes::from_static(b"value10"))
            .unwrap();
        store
            .put(b"other:1", Bytes::from_static(b"value3"))
            .unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        assert_eq!(results.len(), 3);
        for (key, _) in &results {
            assert!(key.starts_with(b"prefix:"));
        }

        // Empty prefix returns all
        let all: Vec<_> = store.prefix_scan(b"").collect();
        assert_eq!(all.len(), 4);

        // A prefix that matches no key returns nothing
        assert_eq!(store.prefix_scan(b"zzz").count(), 0);
    }

    /// Prefix scan results come back in ascending key order regardless of
    /// insertion order, paired with the right values.
    #[test]
    fn test_prefix_scan_sorted() {
        let mut store = AHashMapStore::new();
        store.put(b"prefix:c", Bytes::from_static(b"3")).unwrap();
        store.put(b"prefix:a", Bytes::from_static(b"1")).unwrap();
        store.put(b"prefix:b", Bytes::from_static(b"2")).unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref().to_vec()).collect();
        assert_eq!(
            keys,
            vec![
                b"prefix:a".to_vec(),
                b"prefix:b".to_vec(),
                b"prefix:c".to_vec()
            ]
        );

        let values: Vec<_> = results.iter().map(|(_, v)| v.as_ref().to_vec()).collect();
        assert_eq!(values, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
    }

    /// Range scans are half-open (`start` included, `end` excluded) and
    /// ordered; a range whose bounds are equal is empty.
    #[test]
    fn test_range_scan() {
        let mut store = AHashMapStore::new();
        store.put(b"a", Bytes::from_static(b"1")).unwrap();
        store.put(b"b", Bytes::from_static(b"2")).unwrap();
        store.put(b"c", Bytes::from_static(b"3")).unwrap();
        store.put(b"d", Bytes::from_static(b"4")).unwrap();

        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
        assert_eq!(results.len(), 2);

        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
        assert_eq!(keys, vec![b"b".as_slice(), b"c".as_slice()]);

        // Equal bounds describe an empty range
        let empty = store.range_scan(b"b".as_slice()..b"b".as_slice()).count();
        assert_eq!(empty, 0);
    }

    /// Restoring a snapshot discards later modifications and rebuilds the
    /// entry count and byte total.
    #[test]
    fn test_snapshot_and_restore() {
        let mut store = AHashMapStore::new();
        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();

        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);

        store.put(b"key1", Bytes::from_static(b"modified")).unwrap();
        store.put(b"key3", Bytes::from_static(b"value3")).unwrap();
        store.delete(b"key2").unwrap();

        store.restore(snapshot);
        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
        assert!(store.get(b"key3").is_none());

        // Byte accounting and the scan index are rebuilt as well
        assert_eq!(store.size_bytes(), (4 + 6) * 2);
        assert_eq!(store.prefix_scan(b"key").count(), 2);
    }

    /// `size_bytes` follows inserts, overwrites, deletes, and `clear`.
    #[test]
    fn test_size_tracking() {
        let mut store = AHashMapStore::new();
        assert_eq!(store.size_bytes(), 0);

        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
        assert_eq!(store.size_bytes(), (4 + 6) * 2);

        // Overwrite with smaller value
        store.put(b"key1", Bytes::from_static(b"v1")).unwrap();
        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6);

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
        assert_eq!(store.len(), 0);
    }

    /// A pre-sized store starts out empty.
    #[test]
    fn test_with_capacity() {
        let store = AHashMapStore::with_capacity(1000);
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
    }

    /// Deleting a key that was never inserted succeeds and changes nothing.
    #[test]
    fn test_delete_nonexistent() {
        let mut store = AHashMapStore::new();
        store.delete(b"nonexistent").unwrap();
        assert_eq!(store.len(), 0);
        assert_eq!(store.size_bytes(), 0);
    }
}