Skip to main content

laminar_core/operator/
table_cache.rs

1//! LRU cache and Xor filter for reference table partial-cache mode.
2//!
3//! `TableLruCache` provides O(1) slab-based LRU eviction for hot keys.
4//! `TableXorFilter` wraps `xorf::Xor8` for negative-lookup short-circuiting.
5
6use std::hash::{Hash, Hasher};
7
8use arrow::array::RecordBatch;
9use rustc_hash::FxHashMap;
10
11// ── LRU Cache ───────────────────────────────────────────────────────────
12
13/// A slab node in the intrusive doubly-linked LRU list.
14struct LruNode {
15    key: String,
16    value: RecordBatch,
17    prev: usize,
18    next: usize,
19}
20
/// Sentinel index meaning "no node", used for list links and for the
/// head/tail pointers of an empty list.
///
/// `usize::MAX` is never a reachable slab index in practice.
const SENTINEL: usize = usize::MAX;
23
/// O(1) slab-based LRU cache for reference table rows.
///
/// Uses an `FxHashMap<String, usize>` for key→slot lookup and a `Vec<LruNode>`
/// slab with intrusive doubly-linked list pointers for LRU ordering.
/// Slots that are no longer linked into the list are kept on `free` and
/// reused by later inserts, so the slab does not shrink except on `clear`.
pub struct TableLruCache {
    /// Key → slab index lookup; its length is the number of live entries.
    index: FxHashMap<String, usize>,
    /// Slab of LRU nodes (live and recycled).
    slab: Vec<LruNode>,
    /// Free list of recycled slab indices.
    free: Vec<usize>,
    /// Head of the LRU list (most recently used), or `SENTINEL` when empty.
    head: usize,
    /// Tail of the LRU list (least recently used), or `SENTINEL` when empty.
    tail: usize,
    /// Maximum number of entries.
    max_entries: usize,
    /// Total number of `get` calls.
    gets: u64,
    /// Number of cache hits.
    hits: u64,
    /// Total number of evictions.
    evictions: u64,
}
48
49impl TableLruCache {
50    /// Create a new LRU cache with the given maximum capacity.
51    #[must_use]
52    pub fn new(max_entries: usize) -> Self {
53        Self {
54            index: FxHashMap::default(),
55            slab: Vec::new(),
56            free: Vec::new(),
57            head: SENTINEL,
58            tail: SENTINEL,
59            max_entries,
60            gets: 0,
61            hits: 0,
62            evictions: 0,
63        }
64    }
65
66    /// Look up a key, promoting it to most-recently-used on hit.
67    pub fn get(&mut self, key: &str) -> Option<&RecordBatch> {
68        self.gets += 1;
69        let &slot = self.index.get(key)?;
70        self.hits += 1;
71        self.detach(slot);
72        self.push_front(slot);
73        Some(&self.slab[slot].value)
74    }
75
76    /// Insert a key-value pair. Returns the evicted entry if the cache was full.
77    pub fn insert(&mut self, key: String, value: RecordBatch) -> Option<(String, RecordBatch)> {
78        // If key already present, update in place and promote.
79        if let Some(&slot) = self.index.get(&key) {
80            self.slab[slot].value = value;
81            self.detach(slot);
82            self.push_front(slot);
83            return None;
84        }
85
86        // Evict if at capacity.
87        let evicted = if self.index.len() >= self.max_entries {
88            self.evict_tail()
89        } else {
90            None
91        };
92
93        // Allocate a slab slot.
94        let slot = if let Some(free_slot) = self.free.pop() {
95            self.slab[free_slot] = LruNode {
96                key: key.clone(),
97                value,
98                prev: SENTINEL,
99                next: SENTINEL,
100            };
101            free_slot
102        } else {
103            let slot = self.slab.len();
104            self.slab.push(LruNode {
105                key: key.clone(),
106                value,
107                prev: SENTINEL,
108                next: SENTINEL,
109            });
110            slot
111        };
112
113        self.index.insert(key, slot);
114        self.push_front(slot);
115
116        evicted
117    }
118
119    /// Remove a key from the cache. Returns `true` if it was present.
120    pub fn invalidate(&mut self, key: &str) -> bool {
121        if let Some(slot) = self.index.remove(key) {
122            self.detach(slot);
123            self.free.push(slot);
124            true
125        } else {
126            false
127        }
128    }
129
130    /// Clear all entries.
131    pub fn clear(&mut self) {
132        self.index.clear();
133        self.slab.clear();
134        self.free.clear();
135        self.head = SENTINEL;
136        self.tail = SENTINEL;
137    }
138
139    /// Number of entries currently in the cache.
140    #[must_use]
141    pub fn len(&self) -> usize {
142        self.index.len()
143    }
144
145    /// Whether the cache is empty.
146    #[must_use]
147    pub fn is_empty(&self) -> bool {
148        self.index.is_empty()
149    }
150
151    /// Maximum capacity.
152    #[must_use]
153    pub fn max_entries(&self) -> usize {
154        self.max_entries
155    }
156
157    /// Cache hit rate as a fraction in `[0.0, 1.0]`. Returns 0.0 if no gets.
158    #[must_use]
159    #[allow(clippy::cast_precision_loss)]
160    pub fn hit_rate(&self) -> f64 {
161        if self.gets == 0 {
162            0.0
163        } else {
164            self.hits as f64 / self.gets as f64
165        }
166    }
167
168    /// Total number of `get` calls.
169    #[must_use]
170    pub fn total_gets(&self) -> u64 {
171        self.gets
172    }
173
174    /// Total cache hits.
175    #[must_use]
176    pub fn total_hits(&self) -> u64 {
177        self.hits
178    }
179
180    /// Total evictions.
181    #[must_use]
182    pub fn total_evictions(&self) -> u64 {
183        self.evictions
184    }
185
186    // ── Internal linked-list operations ──
187
188    /// Detach a node from the doubly-linked list.
189    fn detach(&mut self, slot: usize) {
190        let prev = self.slab[slot].prev;
191        let next = self.slab[slot].next;
192
193        if prev == SENTINEL {
194            self.head = next;
195        } else {
196            self.slab[prev].next = next;
197        }
198
199        if next == SENTINEL {
200            self.tail = prev;
201        } else {
202            self.slab[next].prev = prev;
203        }
204
205        self.slab[slot].prev = SENTINEL;
206        self.slab[slot].next = SENTINEL;
207    }
208
209    /// Push a node to the front (most recently used).
210    fn push_front(&mut self, slot: usize) {
211        self.slab[slot].prev = SENTINEL;
212        self.slab[slot].next = self.head;
213
214        if self.head != SENTINEL {
215            self.slab[self.head].prev = slot;
216        }
217        self.head = slot;
218
219        if self.tail == SENTINEL {
220            self.tail = slot;
221        }
222    }
223
224    /// Evict the tail (least recently used) entry.
225    fn evict_tail(&mut self) -> Option<(String, RecordBatch)> {
226        if self.tail == SENTINEL {
227            return None;
228        }
229        let slot = self.tail;
230        self.detach(slot);
231
232        let node = &mut self.slab[slot];
233        let key = std::mem::take(&mut node.key);
234        let value = RecordBatch::new_empty(node.value.schema());
235        let evicted_value = std::mem::replace(&mut node.value, value);
236
237        self.index.remove(&key);
238        self.free.push(slot);
239        self.evictions += 1;
240
241        Some((key, evicted_value))
242    }
243}
244
245// ── Xor Filter ──────────────────────────────────────────────────────────
246
/// Wraps `xorf::Xor8` for negative-lookup short-circuiting.
///
/// If the filter says a key does *not* exist, it definitely doesn't (no
/// false negatives). If it says a key *does* exist, there's a ~0.4% chance
/// of a false positive. A false positive only costs one lookup that then
/// finds nothing, which is acceptable in exchange for skipping almost every
/// lookup of an absent key.
pub struct TableXorFilter {
    /// The built filter; `None` until built from a non-empty key set, or after `clear`.
    filter: Option<xorf::Xor8>,
    /// Function mapping string keys to the `u64` hashes the filter is built
    /// from and queried with.
    hash_fn: fn(&str) -> u64,
    /// Number of times `contains()` short-circuited (returned false).
    short_circuit_count: u64,
}
259
260impl TableXorFilter {
261    /// Create a new (empty) xor filter.
262    #[must_use]
263    pub fn new() -> Self {
264        Self {
265            filter: None,
266            hash_fn: hash_key,
267            short_circuit_count: 0,
268        }
269    }
270
271    /// Build the filter from a set of keys.
272    #[must_use]
273    pub fn build(keys: &[String]) -> Self {
274        let mut f = Self::new();
275        f.rebuild(keys);
276        f
277    }
278
279    /// Rebuild the filter in place from a new set of keys.
280    pub fn rebuild(&mut self, keys: &[String]) {
281        if keys.is_empty() {
282            self.filter = None;
283            return;
284        }
285        let hashes: Vec<u64> = keys.iter().map(|k| (self.hash_fn)(k)).collect();
286        self.filter = Some(xorf::Xor8::from(&hashes));
287    }
288
289    /// Check if a key *might* exist.
290    ///
291    /// Returns `true` if the filter is not built (permissive fallback).
292    pub fn contains(&mut self, key: &str) -> bool {
293        match &self.filter {
294            Some(f) => {
295                let h = (self.hash_fn)(key);
296                let result = xorf::Filter::contains(f, &h);
297                if !result {
298                    self.short_circuit_count += 1;
299                }
300                result
301            }
302            None => true, // permissive: no filter means "might exist"
303        }
304    }
305
306    /// Number of times `contains()` definitively returned `false`.
307    #[must_use]
308    pub fn short_circuits(&self) -> u64 {
309        self.short_circuit_count
310    }
311
312    /// Whether the filter has been built.
313    #[must_use]
314    pub fn is_built(&self) -> bool {
315        self.filter.is_some()
316    }
317
318    /// Clear the filter.
319    pub fn clear(&mut self) {
320        self.filter = None;
321        self.short_circuit_count = 0;
322    }
323}
324
325impl Default for TableXorFilter {
326    fn default() -> Self {
327        Self::new()
328    }
329}
330
331/// Hash a string key to u64 using `FxHasher`.
332fn hash_key(key: &str) -> u64 {
333    let mut hasher = rustc_hash::FxHasher::default();
334    key.hash(&mut hasher);
335    hasher.finish()
336}
337
338// ── Metrics ─────────────────────────────────────────────────────────────
339
/// Aggregated cache metrics for a single table.
///
/// A point-in-time snapshot; the LRU cache and xor filter it was taken from
/// keep counting independently afterwards.
#[derive(Debug, Clone)]
pub struct TableCacheMetrics {
    /// Total `get` calls on the LRU cache.
    pub cache_gets: u64,
    /// Total LRU cache hits.
    pub cache_hits: u64,
    /// Cache hit rate (0.0–1.0), i.e. `cache_hits / cache_gets`, or 0.0 with no gets.
    pub cache_hit_rate: f64,
    /// Total LRU evictions.
    pub cache_evictions: u64,
    /// Number of entries currently in the LRU cache.
    pub cache_entries: usize,
    /// Maximum LRU cache capacity.
    pub cache_max_entries: usize,
    /// Whether the xor filter is built.
    pub xor_filter_built: bool,
    /// Number of xor filter short-circuits (definitive negatives).
    pub xor_short_circuits: u64,
}
360
361// ── Lookup Decision ─────────────────────────────────────────────────────
362
/// Decision from a cache-aware lookup.
///
/// NOTE(review): the code that produces these values is not in this file;
/// the variant descriptions follow their original doc comments.
#[derive(Debug)]
pub enum LookupDecision {
    /// Key definitely does not exist (xor filter negative).
    NotFound,
    /// Key found, either as an LRU hit or in the backing store; carries the row.
    Found(RecordBatch),
    /// Key not in cache and may exist in backing store.
    CacheMiss,
}
373
374// ── Helper to collect metrics from LRU + xor filter ─────────────────────
375
376/// Collect metrics from an LRU cache and xor filter pair.
377#[must_use]
378pub fn collect_cache_metrics(lru: &TableLruCache, xor: &TableXorFilter) -> TableCacheMetrics {
379    TableCacheMetrics {
380        cache_gets: lru.total_gets(),
381        cache_hits: lru.total_hits(),
382        cache_hit_rate: lru.hit_rate(),
383        cache_evictions: lru.total_evictions(),
384        cache_entries: lru.len(),
385        cache_max_entries: lru.max_entries(),
386        xor_filter_built: xor.is_built(),
387        xor_short_circuits: xor.short_circuits(),
388    }
389}
390
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]))
    }

    fn make_row(id: i32, name: &str) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int32Array::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
            ],
        )
        .unwrap()
    }

    // ── LRU Cache tests ──

    #[test]
    fn test_lru_insert_and_get() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        let row = cache.get("a").unwrap();
        assert_eq!(row.num_rows(), 1);
    }

    #[test]
    fn test_lru_miss() {
        let mut cache = TableLruCache::new(10);
        assert!(cache.get("missing").is_none());
    }

    #[test]
    fn test_lru_eviction() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));

        // This should evict "a" (tail)
        let evicted = cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert!(evicted.is_some());
        let (key, _) = evicted.unwrap();
        assert_eq!(key, "a");

        assert!(cache.get("a").is_none());
        assert!(cache.get("b").is_some());
        assert!(cache.get("c").is_some());
    }

    #[test]
    fn test_lru_promotes_on_get() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));

        // Access "a" so it becomes MRU; "b" is now LRU
        cache.get("a");

        // Insert "c" should evict "b" (the LRU), not "a"
        let evicted = cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert_eq!(evicted.unwrap().0, "b");
        assert!(cache.get("a").is_some());
    }

    #[test]
    fn test_lru_update_existing_key() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Old"));
        cache.insert("a".to_string(), make_row(1, "New"));

        assert_eq!(cache.len(), 1);
        let row = cache.get("a").unwrap();
        let names = row
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "New");
    }

    #[test]
    fn test_lru_invalidate() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        assert!(cache.invalidate("a"));
        assert!(cache.get("a").is_none());
        assert!(!cache.invalidate("a")); // already gone
    }

    #[test]
    fn test_lru_invalidate_middle_then_reuse() {
        let mut cache = TableLruCache::new(3);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        cache.insert("c".to_string(), make_row(3, "Charlie"));

        // Unlink "b" from the middle of the list; its slot becomes free.
        assert!(cache.invalidate("b"));
        assert_eq!(cache.len(), 2);

        // "d" fits without evicting and reuses the freed slot.
        assert!(cache.insert("d".to_string(), make_row(4, "Dave")).is_none());
        assert_eq!(cache.slab.len(), 3);

        // Full again: "a" is still the least recently used entry.
        let evicted = cache.insert("e".to_string(), make_row(5, "Eve"));
        assert_eq!(evicted.unwrap().0, "a");
    }

    #[test]
    fn test_lru_clear() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        cache.clear();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_lru_metrics() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));

        cache.get("a"); // hit
        cache.get("b"); // miss
        cache.get("a"); // hit

        assert_eq!(cache.total_gets(), 3);
        assert_eq!(cache.total_hits(), 2);
        assert!((cache.hit_rate() - 2.0 / 3.0).abs() < 0.001);
    }

    #[test]
    fn test_lru_eviction_counter() {
        let mut cache = TableLruCache::new(1);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert_eq!(cache.total_evictions(), 2);
    }

    #[test]
    fn test_lru_hit_rate_zero_gets() {
        let cache = TableLruCache::new(10);
        assert!((cache.hit_rate() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_lru_slab_reuse() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        // Evict "a"
        cache.insert("c".to_string(), make_row(3, "Charlie"));
        // Evict "b"
        cache.insert("d".to_string(), make_row(4, "Dave"));

        // Evicted slab slots are reused, so the slab itself never grows past
        // the capacity (the child test module can see the private field).
        assert_eq!(cache.slab.len(), 2);
        assert_eq!(cache.len(), 2);
        assert!(cache.get("c").is_some());
        assert!(cache.get("d").is_some());
    }

    #[test]
    fn test_lru_single_capacity() {
        let mut cache = TableLruCache::new(1);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        let evicted = cache.insert("b".to_string(), make_row(2, "Bob"));
        assert_eq!(evicted.unwrap().0, "a");
        assert_eq!(cache.len(), 1);
        assert!(cache.get("b").is_some());
    }

    // ── Xor Filter tests ──

    #[test]
    fn test_xor_build_and_contains() {
        let keys = vec![
            "apple".to_string(),
            "banana".to_string(),
            "cherry".to_string(),
        ];
        let mut filter = TableXorFilter::build(&keys);

        // These should be found (no false negatives)
        assert!(filter.contains("apple"));
        assert!(filter.contains("banana"));
        assert!(filter.contains("cherry"));
        assert!(filter.is_built());
    }

    #[test]
    fn test_xor_short_circuits_missing() {
        let keys: Vec<String> = (0..1000).map(|i| format!("key_{i}")).collect();
        let mut filter = TableXorFilter::build(&keys);

        // Test many missing keys - some should be short-circuited
        let mut short_circuited = 0;
        for i in 1000..2000 {
            if !filter.contains(&format!("key_{i}")) {
                short_circuited += 1;
            }
        }
        // With 0.4% FPR, most of the 1000 missing keys should short-circuit
        assert!(
            short_circuited > 900,
            "Expected >900 short-circuits, got {short_circuited}"
        );
        assert!(filter.short_circuits() > 900);
    }

    #[test]
    fn test_xor_empty_permissive() {
        let mut filter = TableXorFilter::new();
        // No filter built → permissive
        assert!(!filter.is_built());
        assert!(filter.contains("anything"));
        assert_eq!(filter.short_circuits(), 0);
    }

    #[test]
    fn test_xor_empty_keys_permissive() {
        let mut filter = TableXorFilter::build(&[]);
        assert!(!filter.is_built());
        assert!(filter.contains("anything"));
    }

    #[test]
    fn test_xor_rebuild() {
        let keys1 = vec!["a".to_string(), "b".to_string()];
        let mut filter = TableXorFilter::build(&keys1);
        assert!(filter.is_built());

        let keys2 = vec!["x".to_string(), "y".to_string()];
        filter.rebuild(&keys2);
        assert!(filter.is_built());

        // "x" and "y" should be found
        assert!(filter.contains("x"));
        assert!(filter.contains("y"));
    }

    #[test]
    fn test_xor_clear() {
        let keys = vec!["a".to_string(), "b".to_string()];
        let mut filter = TableXorFilter::build(&keys);
        assert!(filter.is_built());

        filter.clear();
        assert!(!filter.is_built());
        assert_eq!(filter.short_circuits(), 0);
    }

    #[test]
    fn test_xor_default() {
        let filter = TableXorFilter::default();
        assert!(!filter.is_built());
    }

    #[test]
    fn test_xor_short_circuit_counter() {
        let keys = vec!["only_key".to_string()];
        let mut filter = TableXorFilter::build(&keys);

        // The key itself should be found
        assert!(filter.contains("only_key"));
        assert_eq!(filter.short_circuits(), 0);
    }

    // ── Metrics collection test ──

    #[test]
    fn test_collect_cache_metrics() {
        let mut lru = TableLruCache::new(100);
        lru.insert("a".to_string(), make_row(1, "Alice"));
        lru.get("a");
        lru.get("missing");

        let keys = vec!["a".to_string()];
        let xor = TableXorFilter::build(&keys);

        let metrics = collect_cache_metrics(&lru, &xor);
        assert_eq!(metrics.cache_gets, 2);
        assert_eq!(metrics.cache_hits, 1);
        assert_eq!(metrics.cache_entries, 1);
        assert_eq!(metrics.cache_max_entries, 100);
        assert!(metrics.xor_filter_built);
        assert_eq!(metrics.xor_short_circuits, 0);
    }

    // ── LookupDecision test ──

    #[test]
    fn test_lookup_decision_variants() {
        let row = make_row(1, "Alice");
        let found = LookupDecision::Found(row);
        assert!(matches!(found, LookupDecision::Found(_)));
        assert!(matches!(LookupDecision::NotFound, LookupDecision::NotFound));
        assert!(matches!(
            LookupDecision::CacheMiss,
            LookupDecision::CacheMiss
        ));
    }
}