1use bytes::Bytes;
65use rkyv::{
66 api::high::{self, HighDeserializer, HighSerializer, HighValidator},
67 bytecheck::CheckBytes,
68 rancor::Error as RkyvError,
69 ser::allocator::ArenaHandle,
70 util::AlignedVec,
71 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
72};
73use std::cell::RefCell;
74use std::collections::BTreeMap;
75use std::ops::Bound;
76use std::ops::Range;
77
78pub(crate) fn prefix_successor(prefix: &[u8]) -> Option<smallvec::SmallVec<[u8; 64]>> {
83 if prefix.is_empty() {
84 return None;
85 }
86 let mut successor = smallvec::SmallVec::<[u8; 64]>::from_slice(prefix);
87 while let Some(last) = successor.last_mut() {
89 if *last < 0xFF {
90 *last += 1;
91 return Some(successor);
92 }
93 successor.pop();
94 }
95 None
97}
98
99pub trait StateStore: Send {
121 fn get(&self, key: &[u8]) -> Option<Bytes>;
129
130 fn put(&mut self, key: &[u8], value: Bytes) -> Result<(), StateError>;
141
142 fn delete(&mut self, key: &[u8]) -> Result<(), StateError>;
150
151 fn prefix_scan<'a>(&'a self, prefix: &'a [u8])
161 -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
162
163 fn range_scan<'a>(
173 &'a self,
174 range: Range<&'a [u8]>,
175 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
176
177 fn contains(&self, key: &[u8]) -> bool {
181 self.get(key).is_some()
182 }
183
184 fn size_bytes(&self) -> usize;
189
190 fn len(&self) -> usize;
192
193 fn is_empty(&self) -> bool {
195 self.len() == 0
196 }
197
198 fn snapshot(&self) -> StateSnapshot;
209
210 fn restore(&mut self, snapshot: StateSnapshot);
215
216 fn clear(&mut self);
218
219 fn flush(&mut self) -> Result<(), StateError> {
228 Ok(()) }
230
231 fn get_ref(&self, _key: &[u8]) -> Option<&[u8]> {
243 None
244 }
245
246 fn get_or_insert(&mut self, key: &[u8], default: &[u8]) -> Result<Bytes, StateError> {
254 if let Some(value) = self.get(key) {
255 Ok(value)
256 } else {
257 let value = Bytes::copy_from_slice(default);
258 self.put(key, value.clone())?;
259 Ok(value)
260 }
261 }
262}
263
264pub trait StateStoreExt: StateStore {
288 fn get_typed<T>(&self, key: &[u8]) -> Result<Option<T>, StateError>
297 where
298 T: Archive,
299 T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
300 + RkyvDeserialize<T, HighDeserializer<RkyvError>>,
301 {
302 let bytes_ref = self.get_ref(key);
304 let bytes_owned;
305 let data: &[u8] = if let Some(r) = bytes_ref {
306 r
307 } else if let Some(b) = self.get(key) {
308 bytes_owned = b;
309 &bytes_owned
310 } else {
311 return Ok(None);
312 };
313
314 let archived = rkyv::access::<T::Archived, RkyvError>(data)
315 .map_err(|e| StateError::Serialization(e.to_string()))?;
316 let value = rkyv::deserialize::<T, RkyvError>(archived)
317 .map_err(|e| StateError::Serialization(e.to_string()))?;
318 Ok(Some(value))
319 }
320
321 fn put_typed<T>(&mut self, key: &[u8], value: &T) -> Result<(), StateError>
330 where
331 T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
332 {
333 thread_local! {
334 static SERIALIZE_BUF: RefCell<AlignedVec> =
335 RefCell::new(AlignedVec::with_capacity(256));
336 }
337
338 SERIALIZE_BUF.with(|buf| {
339 let mut vec = buf.borrow_mut();
340 vec.clear();
341 let taken = std::mem::take(&mut *vec);
343 match high::to_bytes_in::<_, RkyvError>(value, taken) {
344 Ok(filled) => {
345 let result = self.put(key, Bytes::copy_from_slice(&filled));
346 *vec = filled;
347 result
348 }
349 Err(e) => {
350 *vec = AlignedVec::new();
352 Err(StateError::Serialization(e.to_string()))
353 }
354 }
355 })
356 }
357
358 fn update<F>(&mut self, key: &[u8], f: F) -> Result<(), StateError>
367 where
368 F: FnOnce(Option<Bytes>) -> Option<Vec<u8>>,
369 {
370 let current = self.get(key);
371 match f(current) {
372 Some(new_value) => self.put(key, Bytes::from(new_value)),
373 None => self.delete(key),
374 }
375 }
376}
377
378impl<T: StateStore + ?Sized> StateStoreExt for T {}
380
/// A point-in-time copy of a store's key/value pairs.
///
/// The struct is serialized with rkyv (derived below), and its archived form follows
/// these field declarations. Reordering or retyping fields therefore changes the
/// on-the-wire format.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct StateSnapshot {
    /// Captured `(key, value)` pairs, in the order the producing store emitted them.
    data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Creation time in nanoseconds since the Unix epoch, as set by `StateSnapshot::new`.
    timestamp_ns: u64,
    /// Format version; `StateSnapshot::new` always writes 1.
    // NOTE(review): nothing in this file reads `version`; confirm whether restore paths should check it.
    version: u32,
}
396
397impl StateSnapshot {
398 #[must_use]
400 #[allow(clippy::cast_possible_truncation)]
401 pub fn new(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
402 Self {
403 data,
404 timestamp_ns: std::time::SystemTime::now()
406 .duration_since(std::time::UNIX_EPOCH)
407 .map(|d| d.as_nanos() as u64)
408 .unwrap_or(0),
409 version: 1,
410 }
411 }
412
413 #[must_use]
415 pub fn data(&self) -> &[(Vec<u8>, Vec<u8>)] {
416 &self.data
417 }
418
419 #[must_use]
421 pub fn timestamp_ns(&self) -> u64 {
422 self.timestamp_ns
423 }
424
425 #[must_use]
427 pub fn len(&self) -> usize {
428 self.data.len()
429 }
430
431 #[must_use]
433 pub fn is_empty(&self) -> bool {
434 self.data.is_empty()
435 }
436
437 #[must_use]
439 pub fn size_bytes(&self) -> usize {
440 self.data.iter().map(|(k, v)| k.len() + v.len()).sum()
441 }
442
443 pub fn to_bytes(&self) -> Result<AlignedVec, StateError> {
451 rkyv::to_bytes::<RkyvError>(self).map_err(|e| StateError::Serialization(e.to_string()))
452 }
453
454 pub fn from_bytes(bytes: &[u8]) -> Result<Self, StateError> {
462 let archived = rkyv::access::<<Self as Archive>::Archived, RkyvError>(bytes)
463 .map_err(|e| StateError::Serialization(e.to_string()))?;
464 rkyv::deserialize::<Self, RkyvError>(archived)
465 .map_err(|e| StateError::Serialization(e.to_string()))
466 }
467}
468
469pub struct InMemoryStore {
488 data: BTreeMap<Vec<u8>, Bytes>,
490 size_bytes: usize,
492}
493
494impl InMemoryStore {
495 #[must_use]
497 pub fn new() -> Self {
498 Self {
499 data: BTreeMap::new(),
500 size_bytes: 0,
501 }
502 }
503
504 #[must_use]
509 pub fn with_capacity(_capacity: usize) -> Self {
510 Self::new()
511 }
512
513 #[must_use]
518 pub fn capacity(&self) -> usize {
519 self.data.len()
520 }
521
522 pub fn shrink_to_fit(&mut self) {
527 }
529}
530
531impl Default for InMemoryStore {
532 fn default() -> Self {
533 Self::new()
534 }
535}
536
537impl StateStore for InMemoryStore {
538 #[inline]
539 fn get(&self, key: &[u8]) -> Option<Bytes> {
540 self.data.get(key).cloned()
541 }
542
543 #[inline]
544 fn get_ref(&self, key: &[u8]) -> Option<&[u8]> {
545 self.data.get(key).map(Bytes::as_ref)
546 }
547
548 #[inline]
549 fn put(&mut self, key: &[u8], value: Bytes) -> Result<(), StateError> {
550 if let Some(existing) = self.data.get_mut(key) {
553 self.size_bytes -= existing.len();
554 self.size_bytes += value.len();
555 *existing = value;
556 } else {
557 self.size_bytes += key.len() + value.len();
558 self.data.insert(key.to_vec(), value);
559 }
560 Ok(())
561 }
562
563 fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
564 if let Some(old_value) = self.data.remove(key) {
565 self.size_bytes -= key.len() + old_value.len();
566 }
567 Ok(())
568 }
569
570 fn prefix_scan<'a>(
571 &'a self,
572 prefix: &'a [u8],
573 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
574 if prefix.is_empty() {
575 return Box::new(
577 self.data
578 .iter()
579 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
580 );
581 }
582 if let Some(end) = prefix_successor(prefix) {
583 Box::new(
584 self.data
585 .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
586 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
587 )
588 } else {
589 Box::new(
591 self.data
592 .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
593 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
594 )
595 }
596 }
597
598 fn range_scan<'a>(
599 &'a self,
600 range: Range<&'a [u8]>,
601 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
602 Box::new(
603 self.data
604 .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
605 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
606 )
607 }
608
609 #[inline]
610 fn contains(&self, key: &[u8]) -> bool {
611 self.data.contains_key(key.as_ref())
612 }
613
614 fn size_bytes(&self) -> usize {
615 self.size_bytes
616 }
617
618 fn len(&self) -> usize {
619 self.data.len()
620 }
621
622 fn snapshot(&self) -> StateSnapshot {
623 let data: Vec<(Vec<u8>, Vec<u8>)> = self
624 .data
625 .iter()
626 .map(|(k, v)| (k.clone(), v.to_vec()))
627 .collect();
628 StateSnapshot::new(data)
629 }
630
631 fn restore(&mut self, snapshot: StateSnapshot) {
632 self.data.clear();
633 self.size_bytes = 0;
634
635 for (key, value) in snapshot.data {
636 self.size_bytes += key.len() + value.len();
637 self.data.insert(key, Bytes::from(value));
638 }
639 }
640
641 fn clear(&mut self) {
642 self.data.clear();
643 self.size_bytes = 0;
644 }
645}
646
647#[derive(Debug, thiserror::Error)]
649pub enum StateError {
650 #[error("I/O error: {0}")]
652 Io(#[from] std::io::Error),
653
654 #[error("Serialization error: {0}")]
656 Serialization(String),
657
658 #[error("Deserialization error: {0}")]
660 Deserialization(String),
661
662 #[error("Corruption error: {0}")]
664 Corruption(String),
665}
666
667mod mmap;
668
669pub mod ahash_store;
671
672pub mod dict_encoder;
674
675pub use self::StateError as Error;
677pub use ahash_store::AHashMapStore;
678pub use dict_encoder::DictionaryKeyEncoder;
679pub use mmap::MmapStateStore;
680
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store pre-populated with `(key, value)` pairs given as string literals.
    fn store_with(entries: &[(&str, &'static str)]) -> InMemoryStore {
        let mut store = InMemoryStore::new();
        for &(key, value) in entries {
            store
                .put(key.as_bytes(), Bytes::from_static(value.as_bytes()))
                .unwrap();
        }
        store
    }

    #[test]
    fn test_in_memory_store_basic() {
        let mut store = InMemoryStore::new();

        // The first write inserts; the second write to the same key overwrites it.
        for value in ["value1", "value2"] {
            store.put(b"key1", Bytes::from_static(value.as_bytes())).unwrap();
            assert_eq!(store.get(b"key1"), Some(Bytes::from(value)));
            assert_eq!(store.len(), 1);
        }

        store.delete(b"key1").unwrap();
        assert_eq!(store.get(b"key1"), None);
        assert_eq!(store.len(), 0);

        // Deleting a key that was never written is not an error.
        store.delete(b"nonexistent").unwrap();
    }

    #[test]
    fn test_contains() {
        let mut store = InMemoryStore::new();
        assert!(!store.contains(b"key1"), "fresh store must not contain key1");

        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert!(store.contains(b"key1"));

        store.delete(b"key1").unwrap();
        assert!(!store.contains(b"key1"), "deleted key must be gone");
    }

    #[test]
    fn test_prefix_scan() {
        let store = store_with(&[
            ("prefix:1", "value1"),
            ("prefix:2", "value2"),
            ("prefix:10", "value10"),
            ("other:1", "value3"),
        ]);

        let matched: Vec<(Bytes, Bytes)> = store.prefix_scan(b"prefix:").collect();
        assert_eq!(matched.len(), 3);
        assert!(matched.iter().all(|(key, _)| key.starts_with(b"prefix:")));

        // The empty prefix matches every key.
        assert_eq!(store.prefix_scan(b"").count(), 4);
    }

    #[test]
    fn test_range_scan() {
        let store = store_with(&[("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")]);

        let keys: Vec<Bytes> = store
            .range_scan(b"b".as_slice()..b"d".as_slice())
            .map(|(key, _)| key)
            .collect();
        assert_eq!(keys.len(), 2);
        assert!(keys.contains(&Bytes::from_static(b"b")));
        assert!(keys.contains(&Bytes::from_static(b"c")));
    }

    #[test]
    fn test_snapshot_and_restore() {
        let mut store = store_with(&[("key1", "value1"), ("key2", "value2")]);

        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);

        // Drift away from the snapshot: overwrite one key, add one, remove one.
        store.put(b"key1", Bytes::from_static(b"modified")).unwrap();
        store.put(b"key3", Bytes::from_static(b"value3")).unwrap();
        store.delete(b"key2").unwrap();
        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1"), Some(Bytes::from("modified")));

        store.restore(snapshot);
        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1"), Some(Bytes::from("value1")));
        assert_eq!(store.get(b"key2"), Some(Bytes::from("value2")));
        assert_eq!(store.get(b"key3"), None);
    }

    #[test]
    fn test_snapshot_serialization() {
        let snapshot = store_with(&[("key1", "value1"), ("key2", "value2")]).snapshot();

        let encoded = snapshot.to_bytes().unwrap();
        let decoded = StateSnapshot::from_bytes(&encoded).unwrap();

        assert_eq!(decoded.len(), snapshot.len());
        assert_eq!(decoded.data(), snapshot.data());
    }

    #[test]
    fn test_typed_access() {
        let mut store = InMemoryStore::new();

        store.put_typed(b"count", &42u64).unwrap();
        assert_eq!(store.get_typed::<u64>(b"count").unwrap(), Some(42));

        store.put_typed(b"name", &String::from("alice")).unwrap();
        assert_eq!(
            store.get_typed::<String>(b"name").unwrap().as_deref(),
            Some("alice")
        );

        let nums = vec![1i64, 2, 3, 4, 5];
        store.put_typed(b"nums", &nums).unwrap();
        assert_eq!(store.get_typed::<Vec<i64>>(b"nums").unwrap(), Some(nums));

        assert!(store.get_typed::<u64>(b"missing").unwrap().is_none());
    }

    #[test]
    fn test_get_or_insert() {
        let mut store = InMemoryStore::new();

        // Absent key: the default is written and returned.
        let first = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(first, Bytes::from("default"));
        assert_eq!(store.len(), 1);

        // Present key: the stored value wins over the default.
        store.put(b"key1", Bytes::from_static(b"modified")).unwrap();
        let second = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(second, Bytes::from("modified"));
    }

    #[test]
    fn test_update() {
        let mut store = InMemoryStore::new();
        store
            .put(b"counter", Bytes::from_static(b"\x00\x00\x00\x00"))
            .unwrap();

        // Decode a little-endian u32 (treating absent or malformed values as 0) and add one.
        let increment = |current: Option<Bytes>| {
            let count = current
                .and_then(|bytes| <[u8; 4]>::try_from(&bytes[..]).ok())
                .map_or(0, u32::from_le_bytes);
            Some((count + 1).to_le_bytes().to_vec())
        };
        store.update(b"counter", increment).unwrap();

        let stored = store.get(b"counter").unwrap();
        let count = u32::from_le_bytes(<[u8; 4]>::try_from(&stored[..]).unwrap());
        assert_eq!(count, 1);

        // Returning `None` from the closure deletes the key.
        store.update(b"counter", |_| None).unwrap();
        assert!(store.get(b"counter").is_none());
    }

    #[test]
    fn test_size_tracking() {
        let mut store = InMemoryStore::new();
        assert_eq!(store.size_bytes(), 0);

        // Each entry contributes key length + value length.
        store.put(b"key1", Bytes::from_static(b"value1")).unwrap();
        assert_eq!(store.size_bytes(), 10);
        store.put(b"key2", Bytes::from_static(b"value2")).unwrap();
        assert_eq!(store.size_bytes(), 20);

        // An overwrite only swaps the value's contribution.
        store.put(b"key1", Bytes::from_static(b"v1")).unwrap();
        assert_eq!(store.size_bytes(), 16);

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 10);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
    }

    #[test]
    fn test_with_capacity() {
        // The capacity hint is ignored; a new store is simply empty.
        let store = InMemoryStore::with_capacity(1000);
        assert_eq!(store.capacity(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn test_clear() {
        let mut store = store_with(&[("key1", "value1"), ("key2", "value2")]);
        assert_eq!(store.len(), 2);
        assert_ne!(store.size_bytes(), 0);

        store.clear();

        assert_eq!((store.len(), store.size_bytes()), (0, 0));
        assert!(store.get(b"key1").is_none());
    }

    #[test]
    fn test_prefix_successor() {
        /// Owned form of the successor, for easy comparison.
        fn successor(prefix: &[u8]) -> Option<Vec<u8>> {
            prefix_successor(prefix).map(|bytes| bytes.to_vec())
        }

        assert_eq!(successor(b"abc"), Some(b"abd".to_vec()));

        // No finite bound: empty prefix, or all bytes saturated.
        assert_eq!(successor(b""), None);
        assert_eq!(successor(&[0xFF, 0xFF, 0xFF]), None);
        assert_eq!(successor(&[0xFF]), None);

        // Trailing 0xFF bytes are dropped before incrementing.
        assert_eq!(successor(&[0x01, 0xFF]), Some(vec![0x02]));
        assert_eq!(successor(&[0x01, 0x02, 0xFF]), Some(vec![0x01, 0x03]));

        // Single-byte boundaries.
        assert_eq!(successor(&[0x00]), Some(vec![0x01]));
        assert_eq!(successor(&[0xFE]), Some(vec![0xFF]));
    }

    #[test]
    fn test_prefix_scan_binary_keys() {
        let mut store = InMemoryStore::new();

        let prefix_a: [u8; 2] = [0x00, 0x01];
        let prefix_b: [u8; 2] = [0x00, 0x02];
        let entries = [
            ([0x00u8, 0x01, 0xAA], b"val1"),
            ([0x00, 0x01, 0xBB], b"val2"),
            ([0x00, 0x02, 0xCC], b"val3"),
            ([0x00, 0x02, 0xDD], b"val4"),
            ([0x01, 0x01, 0xEE], b"val5"),
        ];
        for (key, value) in entries {
            store.put(&key, Bytes::from_static(value)).unwrap();
        }

        for prefix in [prefix_a, prefix_b] {
            let hits: Vec<_> = store.prefix_scan(&prefix).collect();
            assert_eq!(hits.len(), 2);
            assert!(hits.iter().all(|(key, _)| key.starts_with(&prefix)));
        }

        // An all-0xFF prefix has no successor and matches nothing here.
        assert_eq!(store.prefix_scan(&[0xFF, 0xFF]).count(), 0);
    }

    #[test]
    fn test_prefix_scan_returns_sorted() {
        // Inserted out of order on purpose.
        let store = store_with(&[("prefix:c", "3"), ("prefix:a", "1"), ("prefix:b", "2")]);

        let keys: Vec<Vec<u8>> = store
            .prefix_scan(b"prefix:")
            .map(|(key, _)| key.to_vec())
            .collect();
        assert_eq!(
            keys,
            [
                b"prefix:a".to_vec(),
                b"prefix:b".to_vec(),
                b"prefix:c".to_vec(),
            ]
        );
    }
}