Skip to main content

laminar_core/lookup/
foyer_cache.rs

//! foyer-backed in-memory cache for lookup tables.
//!
//! ## Ring 0 — [`FoyerMemoryCache`]
//!
//! Synchronous [`foyer::Cache`] with S3-FIFO eviction. Checked per-event
//! on the operator hot path — sub-microsecond latency.
//!
//! `RecordBatch` clone is Arc bumps only (~16-48ns), within Ring 0 budget.

10use std::hash::{Hash, Hasher};
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use arrow_array::RecordBatch;
14use equivalent::Equivalent;
15use foyer::{Cache, CacheBuilder};
16
17use crate::lookup::table::LookupResult;
18
/// Composite cache key: table ID + raw key bytes.
///
/// The `table_id` ensures that caches for different lookup tables
/// never collide, even if they share a `foyer::Cache` instance.
///
/// The derived `Hash` feeds `table_id` first, then `key`.
/// [`LookupCacheKeyRef`] must mirror that exact order so borrowed
/// probes land in the same bucket as owned keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct LookupCacheKey {
    /// Lookup table identifier.
    pub table_id: u32,
    /// Raw key bytes.
    pub key: Vec<u8>,
}
30
/// Borrowed view of [`LookupCacheKey`] that avoids heap allocation.
///
/// Used with foyer's `Cache::get<Q>()` where `Q: Hash + Equivalent<K>`.
/// Hashes identically to `LookupCacheKey` because `Vec<u8>` and `[u8]`
/// produce the same hash output.
pub(crate) struct LookupCacheKeyRef<'a> {
    /// Matches `LookupCacheKey::table_id` of the entry being probed.
    pub(crate) table_id: u32,
    /// Borrowed key bytes; compared against `LookupCacheKey::key`.
    pub(crate) key: &'a [u8],
}
40
41impl Hash for LookupCacheKeyRef<'_> {
42    fn hash<H: Hasher>(&self, state: &mut H) {
43        // Must match the derived Hash for LookupCacheKey:
44        // Hash::hash(&self.table_id, state) then Hash::hash(&self.key, state).
45        // Vec<u8>::hash delegates to [u8]::hash, so this is identical.
46        self.table_id.hash(state);
47        self.key.hash(state);
48    }
49}
50
51impl Equivalent<LookupCacheKey> for LookupCacheKeyRef<'_> {
52    fn equivalent(&self, other: &LookupCacheKey) -> bool {
53        self.table_id == other.table_id && self.key == other.key.as_slice()
54    }
55}
56
/// Configuration for [`FoyerMemoryCache`].
///
/// See [`FoyerMemoryCacheConfig::default`] for the defaults
/// (256K entries, 16 shards).
#[derive(Debug, Clone, Copy)]
pub struct FoyerMemoryCacheConfig {
    /// Maximum number of entries in the cache.
    // NOTE(review): foyer's builder capacity is weight-based; this is
    // "entries" assuming the default per-entry weight of 1 — confirm.
    pub capacity: usize,
    /// Number of shards for concurrent access (should be a power of 2).
    pub shards: usize,
}
65
66impl Default for FoyerMemoryCacheConfig {
67    fn default() -> Self {
68        Self {
69            capacity: 256 * 1024, // 256K entries
70            shards: 16,
71        }
72    }
73}
74
/// foyer-backed in-memory lookup table cache.
///
/// Wraps [`foyer::Cache`] with hit/miss counters and lookup-table
/// semantics. Designed for Ring 0 (< 500ns per operation).
///
/// # Thread safety
///
/// `foyer::Cache` is internally sharded and lock-free on the read path.
/// `FoyerMemoryCache` is `Send + Sync`.
pub struct FoyerMemoryCache {
    // Underlying cache; entries are scoped via LookupCacheKey::table_id.
    cache: Cache<LookupCacheKey, RecordBatch>,
    // Table this cache serves; baked into every composite key.
    table_id: u32,
    // Lifetime hit counter; updated with Relaxed ordering (stats only).
    hits: AtomicU64,
    // Lifetime miss counter; updated with Relaxed ordering (stats only).
    misses: AtomicU64,
}
90
91impl FoyerMemoryCache {
92    /// Create a new cache with the given configuration.
93    #[must_use]
94    pub fn new(table_id: u32, config: FoyerMemoryCacheConfig) -> Self {
95        let cache = CacheBuilder::new(config.capacity)
96            .with_shards(config.shards)
97            .build();
98
99        Self {
100            cache,
101            table_id,
102            hits: AtomicU64::new(0),
103            misses: AtomicU64::new(0),
104        }
105    }
106
107    /// Create a cache with default configuration.
108    #[must_use]
109    pub fn with_defaults(table_id: u32) -> Self {
110        Self::new(table_id, FoyerMemoryCacheConfig::default())
111    }
112
113    /// Total cache hits since creation.
114    #[must_use]
115    pub fn hit_count(&self) -> u64 {
116        self.hits.load(Ordering::Relaxed)
117    }
118
119    /// Total cache misses since creation.
120    #[must_use]
121    pub fn miss_count(&self) -> u64 {
122        self.misses.load(Ordering::Relaxed)
123    }
124
125    /// Cache hit ratio (0.0 – 1.0).
126    #[must_use]
127    #[allow(clippy::cast_precision_loss)]
128    pub fn hit_ratio(&self) -> f64 {
129        let hits = self.hits.load(Ordering::Relaxed);
130        let misses = self.misses.load(Ordering::Relaxed);
131        let total = hits + misses;
132        if total == 0 {
133            0.0
134        } else {
135            hits as f64 / total as f64
136        }
137    }
138
139    /// The table ID this cache is associated with.
140    #[must_use]
141    pub fn table_id(&self) -> u32 {
142        self.table_id
143    }
144
145    /// Build a composite key.
146    fn make_key(&self, key: &[u8]) -> LookupCacheKey {
147        LookupCacheKey {
148            table_id: self.table_id,
149            key: key.to_vec(),
150        }
151    }
152
153    /// Look up a key in the in-memory cache only (Ring 0, < 500ns).
154    pub fn get_cached(&self, key: &[u8]) -> LookupResult {
155        let ref_key = LookupCacheKeyRef {
156            table_id: self.table_id,
157            key,
158        };
159        if let Some(entry) = self.cache.get(&ref_key) {
160            let value = entry.value().clone();
161            self.hits.fetch_add(1, Ordering::Relaxed);
162            LookupResult::Hit(value)
163        } else {
164            self.misses.fetch_add(1, Ordering::Relaxed);
165            LookupResult::NotFound
166        }
167    }
168
169    /// Alias for [`get_cached`](Self::get_cached). No slower storage tiers are wired yet.
170    pub fn get(&self, key: &[u8]) -> LookupResult {
171        self.get_cached(key)
172    }
173
174    /// Insert or update a cached entry.
175    pub fn insert(&self, key: &[u8], value: RecordBatch) {
176        let cache_key = self.make_key(key);
177        self.cache.insert(cache_key, value);
178    }
179
180    /// Invalidate a cached entry.
181    pub fn invalidate(&self, key: &[u8]) {
182        let ref_key = LookupCacheKeyRef {
183            table_id: self.table_id,
184            key,
185        };
186        self.cache.remove(&ref_key);
187    }
188
189    /// Number of entries currently in the cache.
190    pub fn len(&self) -> usize {
191        self.cache.entries()
192    }
193
194    /// Whether the cache is empty.
195    #[must_use]
196    pub fn is_empty(&self) -> bool {
197        self.len() == 0
198    }
199}
200
201impl std::fmt::Debug for FoyerMemoryCache {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        f.debug_struct("FoyerMemoryCache")
204            .field("table_id", &self.table_id)
205            .field("entries", &self.cache.entries())
206            .field("hits", &self.hits.load(Ordering::Relaxed))
207            .field("misses", &self.misses.load(Ordering::Relaxed))
208            .finish()
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// One-row, one-column Utf8 batch used as a cache value.
    fn test_batch(val: &str) -> RecordBatch {
        let schema = Schema::new(vec![Field::new("v", DataType::Utf8, false)]);
        let col = StringArray::from(vec![val]);
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(col)]).unwrap()
    }

    /// A 64-entry, 4-shard cache for the given table.
    fn small_cache(table_id: u32) -> FoyerMemoryCache {
        let config = FoyerMemoryCacheConfig {
            capacity: 64,
            shards: 4,
        };
        FoyerMemoryCache::new(table_id, config)
    }

    #[test]
    fn test_foyer_cache_hit_miss() {
        let cache = small_cache(1);

        // Absent key: a miss is recorded.
        assert!(cache.get_cached(b"key1").is_not_found());
        assert_eq!(cache.miss_count(), 1);

        // After insert: a hit is recorded and the batch comes back intact.
        cache.insert(b"key1", test_batch("value1"));
        let result = cache.get_cached(b"key1");
        assert!(result.is_hit());
        assert_eq!(result.into_batch().unwrap().num_rows(), 1);
        assert_eq!(cache.hit_count(), 1);
    }

    #[test]
    fn test_foyer_cache_eviction() {
        let config = FoyerMemoryCacheConfig {
            capacity: 8,
            shards: 1,
        };
        let cache = FoyerMemoryCache::new(1, config);

        // Insert far more entries than fit; eviction must bound len().
        for i in 0..20u8 {
            cache.insert(&[i], test_batch(&format!("v{i}")));
        }

        assert!(cache.len() <= 8, "len {} > capacity 8", cache.len());
    }

    #[test]
    fn test_foyer_cache_invalidation() {
        let cache = small_cache(1);
        cache.insert(b"key1", test_batch("value1"));
        assert!(cache.get_cached(b"key1").is_hit());

        // Removal turns the key back into a miss.
        cache.invalidate(b"key1");
        assert!(cache.get_cached(b"key1").is_not_found());
    }

    #[test]
    fn test_foyer_cache_table_id_isolation() {
        let cache_a = small_cache(1);
        let cache_b = small_cache(2);

        // Same raw key bytes, different tables: values stay separate.
        cache_a.insert(b"key1", test_batch("from_a"));
        cache_b.insert(b"key1", test_batch("from_b"));

        let batch_a = cache_a.get_cached(b"key1").into_batch().unwrap();
        let batch_b = cache_b.get_cached(b"key1").into_batch().unwrap();

        assert_eq!(batch_a.num_rows(), 1);
        assert_eq!(batch_b.num_rows(), 1);
        assert_ne!(batch_a, batch_b);
    }

    #[test]
    fn test_foyer_cache_lookup_methods() {
        let cache = small_cache(1);
        cache.insert(b"k", test_batch("v"));

        assert!(!cache.is_empty());
        assert!(cache.get(b"k").is_hit());
    }

    #[test]
    fn test_foyer_cache_hit_ratio() {
        let cache = small_cache(1);
        cache.insert(b"k1", test_batch("v1"));

        cache.get_cached(b"k1"); // hit
        cache.get_cached(b"k2"); // miss

        assert!((cache.hit_ratio() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_foyer_cache_debug() {
        let rendered = format!("{:?}", small_cache(42));
        assert!(rendered.contains("FoyerMemoryCache"));
        assert!(rendered.contains("table_id: 42"));
    }

    #[test]
    fn test_foyer_cache_default_config() {
        let config = FoyerMemoryCacheConfig::default();
        assert_eq!(config.capacity, 256 * 1024);
        assert_eq!(config.shards, 16);
    }

    #[test]
    fn test_foyer_cache_recordbatch_clone_is_cheap() {
        let cache = small_cache(1);
        let batch = test_batch("value");
        cache.insert(b"k", batch.clone());

        // Two hits hand back equal batches (shared Arc-backed buffers).
        let hit1 = cache.get_cached(b"k").into_batch().unwrap();
        let hit2 = cache.get_cached(b"k").into_batch().unwrap();
        assert_eq!(hit1, hit2);
        assert_eq!(hit1.num_rows(), 1);
    }
}