// laminar_core/lookup/foyer_cache.rs

use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};

use arrow_array::RecordBatch;
use equivalent::Equivalent;
use foyer::{Cache, CacheBuilder};

use crate::lookup::table::LookupResult;
/// Owned cache key: a lookup key scoped to one table.
///
/// `Hash` is derived, so `table_id` is hashed before `key` — the manual
/// `Hash` impl on `LookupCacheKeyRef` must hash fields in the same order
/// for borrowed (allocation-free) lookups to find owned entries.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct LookupCacheKey {
    /// Identifier of the lookup table this entry belongs to.
    pub table_id: u32,
    /// Serialized lookup key bytes.
    pub key: Vec<u8>,
}
30
/// Borrowed view of a [`LookupCacheKey`], used to probe the cache without
/// allocating an owned `Vec<u8>` on every lookup.
pub(crate) struct LookupCacheKeyRef<'a> {
    /// Must match [`LookupCacheKey::table_id`] for an entry to be found.
    pub(crate) table_id: u32,
    /// Borrowed key bytes; compared against [`LookupCacheKey::key`].
    pub(crate) key: &'a [u8],
}
40
impl Hash for LookupCacheKeyRef<'_> {
    // INVARIANT: must produce the same hash as the derived `Hash` on
    // `LookupCacheKey` (table_id first, then key bytes — `Vec<u8>` and
    // `&[u8]` hash identically as slices), otherwise `Equivalent`-based
    // lookups will miss entries that are actually present.
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.table_id.hash(state);
        self.key.hash(state);
    }
}
50
51impl Equivalent<LookupCacheKey> for LookupCacheKeyRef<'_> {
52 fn equivalent(&self, other: &LookupCacheKey) -> bool {
53 self.table_id == other.table_id && self.key == other.key.as_slice()
54 }
55}
56
/// Sizing knobs for a [`FoyerMemoryCache`].
#[derive(Debug, Clone, Copy)]
pub struct FoyerMemoryCacheConfig {
    /// Maximum number of entries the cache holds before evicting.
    pub capacity: usize,
    /// Number of internal shards (reduces lock contention under concurrency).
    pub shards: usize,
}
65
66impl Default for FoyerMemoryCacheConfig {
67 fn default() -> Self {
68 Self {
69 capacity: 256 * 1024, shards: 16,
71 }
72 }
73}
74
/// In-memory lookup cache for one table, backed by a sharded foyer [`Cache`].
///
/// Tracks hit/miss counters with relaxed atomics so lookups stay cheap.
pub struct FoyerMemoryCache {
    /// Underlying foyer cache; keys embed `table_id` so caches for different
    /// tables never collide even if they shared a store.
    cache: Cache<LookupCacheKey, RecordBatch>,
    /// Table this cache serves; baked into every key.
    table_id: u32,
    /// Count of successful lookups (relaxed ordering — stats only).
    hits: AtomicU64,
    /// Count of failed lookups (relaxed ordering — stats only).
    misses: AtomicU64,
}
90
91impl FoyerMemoryCache {
92 #[must_use]
94 pub fn new(table_id: u32, config: FoyerMemoryCacheConfig) -> Self {
95 let cache = CacheBuilder::new(config.capacity)
96 .with_shards(config.shards)
97 .build();
98
99 Self {
100 cache,
101 table_id,
102 hits: AtomicU64::new(0),
103 misses: AtomicU64::new(0),
104 }
105 }
106
107 #[must_use]
109 pub fn with_defaults(table_id: u32) -> Self {
110 Self::new(table_id, FoyerMemoryCacheConfig::default())
111 }
112
113 #[must_use]
115 pub fn hit_count(&self) -> u64 {
116 self.hits.load(Ordering::Relaxed)
117 }
118
119 #[must_use]
121 pub fn miss_count(&self) -> u64 {
122 self.misses.load(Ordering::Relaxed)
123 }
124
125 #[must_use]
127 #[allow(clippy::cast_precision_loss)]
128 pub fn hit_ratio(&self) -> f64 {
129 let hits = self.hits.load(Ordering::Relaxed);
130 let misses = self.misses.load(Ordering::Relaxed);
131 let total = hits + misses;
132 if total == 0 {
133 0.0
134 } else {
135 hits as f64 / total as f64
136 }
137 }
138
139 #[must_use]
141 pub fn table_id(&self) -> u32 {
142 self.table_id
143 }
144
145 fn make_key(&self, key: &[u8]) -> LookupCacheKey {
147 LookupCacheKey {
148 table_id: self.table_id,
149 key: key.to_vec(),
150 }
151 }
152
153 pub fn get_cached(&self, key: &[u8]) -> LookupResult {
155 let ref_key = LookupCacheKeyRef {
156 table_id: self.table_id,
157 key,
158 };
159 if let Some(entry) = self.cache.get(&ref_key) {
160 let value = entry.value().clone();
161 self.hits.fetch_add(1, Ordering::Relaxed);
162 LookupResult::Hit(value)
163 } else {
164 self.misses.fetch_add(1, Ordering::Relaxed);
165 LookupResult::NotFound
166 }
167 }
168
169 pub fn get(&self, key: &[u8]) -> LookupResult {
171 self.get_cached(key)
172 }
173
174 pub fn insert(&self, key: &[u8], value: RecordBatch) {
176 let cache_key = self.make_key(key);
177 self.cache.insert(cache_key, value);
178 }
179
180 pub fn invalidate(&self, key: &[u8]) {
182 let ref_key = LookupCacheKeyRef {
183 table_id: self.table_id,
184 key,
185 };
186 self.cache.remove(&ref_key);
187 }
188
189 pub fn len(&self) -> usize {
191 self.cache.entries()
192 }
193
194 #[must_use]
196 pub fn is_empty(&self) -> bool {
197 self.len() == 0
198 }
199}
200
201impl std::fmt::Debug for FoyerMemoryCache {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 f.debug_struct("FoyerMemoryCache")
204 .field("table_id", &self.table_id)
205 .field("entries", &self.cache.entries())
206 .field("hits", &self.hits.load(Ordering::Relaxed))
207 .field("misses", &self.misses.load(Ordering::Relaxed))
208 .finish()
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Single-row, single-column (`v: Utf8`) batch holding `val`.
    fn test_batch(val: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Utf8, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec![val]))]).unwrap()
    }

    /// Small cache (64 entries, 4 shards) for tests that don't exercise eviction.
    fn small_cache(table_id: u32) -> FoyerMemoryCache {
        FoyerMemoryCache::new(
            table_id,
            FoyerMemoryCacheConfig {
                capacity: 64,
                shards: 4,
            },
        )
    }

    // Miss before insert, hit after; both counters advance accordingly.
    #[test]
    fn test_foyer_cache_hit_miss() {
        let cache = small_cache(1);

        let result = cache.get_cached(b"key1");
        assert!(result.is_not_found());
        assert_eq!(cache.miss_count(), 1);

        cache.insert(b"key1", test_batch("value1"));
        let result = cache.get_cached(b"key1");
        assert!(result.is_hit());
        let batch = result.into_batch().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(cache.hit_count(), 1);
    }

    // Inserting more entries than capacity must trigger eviction so the
    // entry count never exceeds the configured capacity.
    #[test]
    fn test_foyer_cache_eviction() {
        let cache = FoyerMemoryCache::new(
            1,
            FoyerMemoryCacheConfig {
                capacity: 8,
                shards: 1,
            },
        );

        for i in 0..20u8 {
            cache.insert(&[i], test_batch(&format!("v{i}")));
        }

        assert!(cache.len() <= 8, "len {} > capacity 8", cache.len());
    }

    // invalidate() removes the entry; the subsequent lookup is a miss.
    #[test]
    fn test_foyer_cache_invalidation() {
        let cache = small_cache(1);

        cache.insert(b"key1", test_batch("value1"));
        assert!(cache.get_cached(b"key1").is_hit());

        cache.invalidate(b"key1");
        assert!(cache.get_cached(b"key1").is_not_found());
    }

    // The same byte key under different table_ids must map to distinct
    // entries (table_id is part of the cache key).
    #[test]
    fn test_foyer_cache_table_id_isolation() {
        let cache_a = small_cache(1);
        let cache_b = small_cache(2);

        cache_a.insert(b"key1", test_batch("from_a"));
        cache_b.insert(b"key1", test_batch("from_b"));

        let batch_a = cache_a.get_cached(b"key1").into_batch().unwrap();
        let batch_b = cache_b.get_cached(b"key1").into_batch().unwrap();

        assert_eq!(batch_a.num_rows(), 1);
        assert_eq!(batch_b.num_rows(), 1);
        assert_ne!(batch_a, batch_b);
    }

    // get() delegates to get_cached(); is_empty() reflects inserts.
    #[test]
    fn test_foyer_cache_lookup_methods() {
        let cache = small_cache(1);

        cache.insert(b"k", test_batch("v"));
        assert!(!cache.is_empty());
        assert!(cache.get(b"k").is_hit());
    }

    // One hit (k1) + one miss (k2) => ratio exactly 0.5.
    #[test]
    fn test_foyer_cache_hit_ratio() {
        let cache = small_cache(1);
        cache.insert(b"k1", test_batch("v1"));

        cache.get_cached(b"k1");
        cache.get_cached(b"k2");

        assert!((cache.hit_ratio() - 0.5).abs() < f64::EPSILON);
    }

    // The manual Debug impl includes the struct name and table_id field.
    #[test]
    fn test_foyer_cache_debug() {
        let cache = small_cache(42);
        let debug = format!("{cache:?}");
        assert!(debug.contains("FoyerMemoryCache"));
        assert!(debug.contains("table_id: 42"));
    }

    // Default config matches the documented 256 Ki capacity / 16 shards.
    #[test]
    fn test_foyer_cache_default_config() {
        let config = FoyerMemoryCacheConfig::default();
        assert_eq!(config.capacity, 256 * 1024);
        assert_eq!(config.shards, 16);
    }

    // Two hits on the same key return equal batches — the clone performed
    // on each hit is a shallow (Arc) copy, not a data copy.
    #[test]
    fn test_foyer_cache_recordbatch_clone_is_cheap() {
        let cache = small_cache(1);
        let batch = test_batch("value");
        cache.insert(b"k", batch.clone());

        let hit1 = cache.get_cached(b"k").into_batch().unwrap();
        let hit2 = cache.get_cached(b"k").into_batch().unwrap();
        assert_eq!(hit1, hit2);
        assert_eq!(hit1.num_rows(), 1);
    }
}