1use std::hash::{Hash, Hasher};
7
8use arrow::array::RecordBatch;
9use rustc_hash::FxHashMap;
10
/// One entry of the slab-backed, doubly linked recency list used by
/// `TableLruCache`. Links are slab indices; `SENTINEL` means "no neighbour".
struct LruNode {
    /// Cache key; a copy of the same key is stored in `TableLruCache::index`.
    key: String,
    /// The cached batch for `key`.
    value: RecordBatch,
    /// Slab index of the more-recently-used neighbour, or `SENTINEL` when this
    /// node is the list head.
    prev: usize,
    /// Slab index of the less-recently-used neighbour, or `SENTINEL` when this
    /// node is the list tail.
    next: usize,
}
20
/// Marker slab index meaning "no node", used for `prev`/`next` links and for
/// `head`/`tail` of an empty list.
const SENTINEL: usize = usize::MAX;
23
/// Capacity-bounded least-recently-used cache from string keys to `RecordBatch`es.
///
/// Entries live in a slab (`Vec<LruNode>`) and are threaded into a doubly
/// linked list ordered from most recently used (`head`) to least recently used
/// (`tail`). `index` maps each live key to its slab slot so a lookup does not
/// walk the list. Vacated slots are remembered in `free` and reused by inserts.
pub struct TableLruCache {
    /// Live key -> slab slot holding its node.
    index: FxHashMap<String, usize>,
    /// Node storage; slots listed in `free` are vacant.
    slab: Vec<LruNode>,
    /// Vacant slab slots, popped by `insert` before the slab is grown.
    free: Vec<usize>,
    /// Slot of the most recently used entry, or `SENTINEL` when empty.
    head: usize,
    /// Slot of the least recently used entry (next eviction victim), or
    /// `SENTINEL` when empty.
    tail: usize,
    /// Entry limit checked when a new key is inserted.
    max_entries: usize,
    /// Number of `get` calls.
    gets: u64,
    /// Number of `get` calls that found their key.
    hits: u64,
    /// Number of entries evicted to make room for new keys.
    evictions: u64,
}
48
49impl TableLruCache {
50 #[must_use]
52 pub fn new(max_entries: usize) -> Self {
53 Self {
54 index: FxHashMap::default(),
55 slab: Vec::new(),
56 free: Vec::new(),
57 head: SENTINEL,
58 tail: SENTINEL,
59 max_entries,
60 gets: 0,
61 hits: 0,
62 evictions: 0,
63 }
64 }
65
66 pub fn get(&mut self, key: &str) -> Option<&RecordBatch> {
68 self.gets += 1;
69 let &slot = self.index.get(key)?;
70 self.hits += 1;
71 self.detach(slot);
72 self.push_front(slot);
73 Some(&self.slab[slot].value)
74 }
75
76 pub fn insert(&mut self, key: String, value: RecordBatch) -> Option<(String, RecordBatch)> {
78 if let Some(&slot) = self.index.get(&key) {
80 self.slab[slot].value = value;
81 self.detach(slot);
82 self.push_front(slot);
83 return None;
84 }
85
86 let evicted = if self.index.len() >= self.max_entries {
88 self.evict_tail()
89 } else {
90 None
91 };
92
93 let slot = if let Some(free_slot) = self.free.pop() {
95 self.slab[free_slot] = LruNode {
96 key: key.clone(),
97 value,
98 prev: SENTINEL,
99 next: SENTINEL,
100 };
101 free_slot
102 } else {
103 let slot = self.slab.len();
104 self.slab.push(LruNode {
105 key: key.clone(),
106 value,
107 prev: SENTINEL,
108 next: SENTINEL,
109 });
110 slot
111 };
112
113 self.index.insert(key, slot);
114 self.push_front(slot);
115
116 evicted
117 }
118
119 pub fn invalidate(&mut self, key: &str) -> bool {
121 if let Some(slot) = self.index.remove(key) {
122 self.detach(slot);
123 self.free.push(slot);
124 true
125 } else {
126 false
127 }
128 }
129
130 pub fn clear(&mut self) {
132 self.index.clear();
133 self.slab.clear();
134 self.free.clear();
135 self.head = SENTINEL;
136 self.tail = SENTINEL;
137 }
138
139 #[must_use]
141 pub fn len(&self) -> usize {
142 self.index.len()
143 }
144
145 #[must_use]
147 pub fn is_empty(&self) -> bool {
148 self.index.is_empty()
149 }
150
151 #[must_use]
153 pub fn max_entries(&self) -> usize {
154 self.max_entries
155 }
156
157 #[must_use]
159 #[allow(clippy::cast_precision_loss)]
160 pub fn hit_rate(&self) -> f64 {
161 if self.gets == 0 {
162 0.0
163 } else {
164 self.hits as f64 / self.gets as f64
165 }
166 }
167
168 #[must_use]
170 pub fn total_gets(&self) -> u64 {
171 self.gets
172 }
173
174 #[must_use]
176 pub fn total_hits(&self) -> u64 {
177 self.hits
178 }
179
180 #[must_use]
182 pub fn total_evictions(&self) -> u64 {
183 self.evictions
184 }
185
186 fn detach(&mut self, slot: usize) {
190 let prev = self.slab[slot].prev;
191 let next = self.slab[slot].next;
192
193 if prev == SENTINEL {
194 self.head = next;
195 } else {
196 self.slab[prev].next = next;
197 }
198
199 if next == SENTINEL {
200 self.tail = prev;
201 } else {
202 self.slab[next].prev = prev;
203 }
204
205 self.slab[slot].prev = SENTINEL;
206 self.slab[slot].next = SENTINEL;
207 }
208
209 fn push_front(&mut self, slot: usize) {
211 self.slab[slot].prev = SENTINEL;
212 self.slab[slot].next = self.head;
213
214 if self.head != SENTINEL {
215 self.slab[self.head].prev = slot;
216 }
217 self.head = slot;
218
219 if self.tail == SENTINEL {
220 self.tail = slot;
221 }
222 }
223
224 fn evict_tail(&mut self) -> Option<(String, RecordBatch)> {
226 if self.tail == SENTINEL {
227 return None;
228 }
229 let slot = self.tail;
230 self.detach(slot);
231
232 let node = &mut self.slab[slot];
233 let key = std::mem::take(&mut node.key);
234 let value = RecordBatch::new_empty(node.value.schema());
235 let evicted_value = std::mem::replace(&mut node.value, value);
236
237 self.index.remove(&key);
238 self.free.push(slot);
239 self.evictions += 1;
240
241 Some((key, evicted_value))
242 }
243}
244
/// Optional xor filter over table keys, used to skip lookups for keys that were
/// not in the key set it was built from.
///
/// A `false` answer from `contains` means the key's hash was not in the build
/// set (xor filters have no false negatives); `true` may be a false positive.
/// When no filter is built, every key is reported as possibly present.
pub struct TableXorFilter {
    /// Filter over hashed keys; `None` until built from a non-empty key set.
    filter: Option<xorf::Xor8>,
    /// Hash applied to keys both when building and when querying.
    hash_fn: fn(&str) -> u64,
    /// Number of `contains` calls that returned `false`.
    short_circuit_count: u64,
}
259
260impl TableXorFilter {
261 #[must_use]
263 pub fn new() -> Self {
264 Self {
265 filter: None,
266 hash_fn: hash_key,
267 short_circuit_count: 0,
268 }
269 }
270
271 #[must_use]
273 pub fn build(keys: &[String]) -> Self {
274 let mut f = Self::new();
275 f.rebuild(keys);
276 f
277 }
278
279 pub fn rebuild(&mut self, keys: &[String]) {
281 if keys.is_empty() {
282 self.filter = None;
283 return;
284 }
285 let hashes: Vec<u64> = keys.iter().map(|k| (self.hash_fn)(k)).collect();
286 self.filter = Some(xorf::Xor8::from(&hashes));
287 }
288
289 pub fn contains(&mut self, key: &str) -> bool {
293 match &self.filter {
294 Some(f) => {
295 let h = (self.hash_fn)(key);
296 let result = xorf::Filter::contains(f, &h);
297 if !result {
298 self.short_circuit_count += 1;
299 }
300 result
301 }
302 None => true, }
304 }
305
306 #[must_use]
308 pub fn short_circuits(&self) -> u64 {
309 self.short_circuit_count
310 }
311
312 #[must_use]
314 pub fn is_built(&self) -> bool {
315 self.filter.is_some()
316 }
317
318 pub fn clear(&mut self) {
320 self.filter = None;
321 self.short_circuit_count = 0;
322 }
323}
324
325impl Default for TableXorFilter {
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
331fn hash_key(key: &str) -> u64 {
333 let mut hasher = rustc_hash::FxHasher::default();
334 key.hash(&mut hasher);
335 hasher.finish()
336}
337
/// Point-in-time snapshot of `TableLruCache` and `TableXorFilter` counters,
/// produced by `collect_cache_metrics`.
#[derive(Debug, Clone)]
pub struct TableCacheMetrics {
    /// Total `TableLruCache::get` calls.
    pub cache_gets: u64,
    /// `get` calls that found their key.
    pub cache_hits: u64,
    /// `cache_hits / cache_gets`, or `0.0` when there have been no gets.
    pub cache_hit_rate: f64,
    /// Entries evicted to make room for new keys.
    pub cache_evictions: u64,
    /// Entries currently cached.
    pub cache_entries: usize,
    /// Capacity the cache was created with.
    pub cache_max_entries: usize,
    /// Whether the xor filter currently holds a built filter.
    pub xor_filter_built: bool,
    /// `TableXorFilter::contains` calls that returned `false`.
    pub xor_short_circuits: u64,
}
360
/// Outcome of a cached table lookup.
///
/// NOTE(review): nothing in this file constructs these variants outside tests;
/// the meanings below are inferred from the names and should be confirmed
/// against the caller that produces them.
#[derive(Debug)]
pub enum LookupDecision {
    /// Presumably: the key is known to be absent (e.g. rejected by the xor
    /// filter) — confirm.
    NotFound,
    /// Presumably: the matching row batch for the key — confirm.
    Found(RecordBatch),
    /// Presumably: the key is not cached and the backing table must be
    /// consulted — confirm.
    CacheMiss,
}
373
374#[must_use]
378pub fn collect_cache_metrics(lru: &TableLruCache, xor: &TableXorFilter) -> TableCacheMetrics {
379 TableCacheMetrics {
380 cache_gets: lru.total_gets(),
381 cache_hits: lru.total_hits(),
382 cache_hit_rate: lru.hit_rate(),
383 cache_evictions: lru.total_evictions(),
384 cache_entries: lru.len(),
385 cache_max_entries: lru.max_entries(),
386 xor_filter_built: xor.is_built(),
387 xor_short_circuits: xor.short_circuits(),
388 }
389}
390
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Two-column schema (`id: Int32`, `name: Utf8`) shared by all test rows.
    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]))
    }

    /// Builds a single-row batch with the given id and name.
    fn make_row(id: i32, name: &str) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int32Array::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_lru_insert_and_get() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        let row = cache.get("a").unwrap();
        assert_eq!(row.num_rows(), 1);
    }

    #[test]
    fn test_lru_miss() {
        let mut cache = TableLruCache::new(10);
        assert!(cache.get("missing").is_none());
    }

    #[test]
    fn test_lru_eviction() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));

        // "a" is the least recently used entry, so it is the one evicted.
        let evicted = cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert!(evicted.is_some());
        let (key, _) = evicted.unwrap();
        assert_eq!(key, "a");

        assert!(cache.get("a").is_none());
        assert!(cache.get("b").is_some());
        assert!(cache.get("c").is_some());
    }

    #[test]
    fn test_lru_promotes_on_get() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));

        // Touching "a" makes "b" the least recently used entry.
        cache.get("a");

        let evicted = cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert_eq!(evicted.unwrap().0, "b");
        assert!(cache.get("a").is_some());
    }

    #[test]
    fn test_lru_update_existing_key() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Old"));
        cache.insert("a".to_string(), make_row(1, "New"));

        assert_eq!(cache.len(), 1);
        let row = cache.get("a").unwrap();
        let names = row
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "New");
    }

    #[test]
    fn test_lru_invalidate() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        assert!(cache.invalidate("a"));
        assert!(cache.get("a").is_none());
        // Invalidating an absent key reports `false`.
        assert!(!cache.invalidate("a"));

        // Re-inserting reuses the freed slot instead of growing the slab.
        cache.insert("a".to_string(), make_row(2, "Alice"));
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.slab.len(), 1);
        assert!(cache.get("a").is_some());
    }

    #[test]
    fn test_lru_invalidate_middle_keeps_order() {
        let mut cache = TableLruCache::new(3);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        cache.insert("c".to_string(), make_row(3, "Charlie"));

        // Recency order is c, b, a; removing the middle node must relink c <-> a.
        assert!(cache.invalidate("b"));
        // Fills the freed slot without evicting: order becomes d, c, a.
        assert!(cache.insert("d".to_string(), make_row(4, "Dave")).is_none());

        let evicted = cache.insert("e".to_string(), make_row(5, "Eve"));
        assert_eq!(evicted.unwrap().0, "a");
        assert!(cache.get("b").is_none());
        assert!(cache.get("c").is_some());
        assert!(cache.get("d").is_some());
        assert!(cache.get("e").is_some());
    }

    #[test]
    fn test_lru_clear() {
        let mut cache = TableLruCache::new(10);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        cache.clear();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        // The cache remains fully usable after clearing.
        cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert_eq!(cache.len(), 1);
        assert!(cache.get("c").is_some());
    }

    #[test]
    fn test_lru_metrics() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));

        cache.get("a"); // hit
        cache.get("b"); // miss
        cache.get("a"); // hit
        assert_eq!(cache.total_gets(), 3);
        assert_eq!(cache.total_hits(), 2);
        assert!((cache.hit_rate() - 2.0 / 3.0).abs() < 0.001);
    }

    #[test]
    fn test_lru_eviction_counter() {
        let mut cache = TableLruCache::new(1);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        cache.insert("c".to_string(), make_row(3, "Charlie"));
        assert_eq!(cache.total_evictions(), 2);
    }

    #[test]
    fn test_lru_hit_rate_zero_gets() {
        let cache = TableLruCache::new(10);
        assert!((cache.hit_rate() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_lru_slab_reuse() {
        let mut cache = TableLruCache::new(2);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        cache.insert("b".to_string(), make_row(2, "Bob"));
        // Evicts "a"; its slot is recycled for "c".
        cache.insert("c".to_string(), make_row(3, "Charlie"));
        // Evicts "b"; its slot is recycled for "d".
        cache.insert("d".to_string(), make_row(4, "Dave"));

        assert_eq!(cache.len(), 2);
        assert_eq!(cache.total_evictions(), 2);
        // Evicted slots were reused rather than appended.
        assert_eq!(cache.slab.len(), 2);
        assert!(cache.get("c").is_some());
        assert!(cache.get("d").is_some());
    }

    #[test]
    fn test_lru_single_capacity() {
        let mut cache = TableLruCache::new(1);
        cache.insert("a".to_string(), make_row(1, "Alice"));
        let evicted = cache.insert("b".to_string(), make_row(2, "Bob"));
        assert_eq!(evicted.unwrap().0, "a");
        assert_eq!(cache.len(), 1);
        assert!(cache.get("b").is_some());
    }

    #[test]
    fn test_xor_build_and_contains() {
        let keys = vec![
            "apple".to_string(),
            "banana".to_string(),
            "cherry".to_string(),
        ];
        let mut filter = TableXorFilter::build(&keys);

        // Xor filters have no false negatives: every build key is admitted.
        assert!(filter.contains("apple"));
        assert!(filter.contains("banana"));
        assert!(filter.contains("cherry"));
        assert!(filter.is_built());
    }

    #[test]
    fn test_xor_short_circuits_missing() {
        let keys: Vec<String> = (0..1000).map(|i| format!("key_{i}")).collect();
        let mut filter = TableXorFilter::build(&keys);

        // Keys outside the build set should be rejected almost always.
        let mut short_circuited = 0;
        for i in 1000..2000 {
            if !filter.contains(&format!("key_{i}")) {
                short_circuited += 1;
            }
        }
        // Allow for a small false-positive rate.
        assert!(
            short_circuited > 900,
            "Expected >900 short-circuits, got {short_circuited}"
        );
        assert!(filter.short_circuits() > 900);
    }

    #[test]
    fn test_xor_empty_permissive() {
        let mut filter = TableXorFilter::new();
        assert!(!filter.is_built());
        // Without a filter every key is reported as possibly present.
        assert!(filter.contains("anything"));
        assert_eq!(filter.short_circuits(), 0);
    }

    #[test]
    fn test_xor_empty_keys_permissive() {
        let mut filter = TableXorFilter::build(&[]);
        assert!(!filter.is_built());
        assert!(filter.contains("anything"));
    }

    #[test]
    fn test_xor_rebuild() {
        let keys1 = vec!["a".to_string(), "b".to_string()];
        let mut filter = TableXorFilter::build(&keys1);
        assert!(filter.is_built());

        let keys2 = vec!["x".to_string(), "y".to_string()];
        filter.rebuild(&keys2);
        assert!(filter.is_built());

        // The rebuilt filter admits the new key set.
        assert!(filter.contains("x"));
        assert!(filter.contains("y"));
    }

    #[test]
    fn test_xor_clear() {
        let keys = vec!["a".to_string(), "b".to_string()];
        let mut filter = TableXorFilter::build(&keys);
        assert!(filter.is_built());

        filter.clear();
        assert!(!filter.is_built());
        assert_eq!(filter.short_circuits(), 0);
    }

    #[test]
    fn test_xor_default() {
        let filter = TableXorFilter::default();
        assert!(!filter.is_built());
    }

    #[test]
    fn test_xor_short_circuit_counter() {
        let keys = vec!["only_key".to_string()];
        let mut filter = TableXorFilter::build(&keys);

        // A hit must not bump the short-circuit counter.
        assert!(filter.contains("only_key"));
        assert_eq!(filter.short_circuits(), 0);
    }

    #[test]
    fn test_collect_cache_metrics() {
        let mut lru = TableLruCache::new(100);
        lru.insert("a".to_string(), make_row(1, "Alice"));
        lru.get("a");
        lru.get("missing");

        let keys = vec!["a".to_string()];
        let xor = TableXorFilter::build(&keys);

        let metrics = collect_cache_metrics(&lru, &xor);
        assert_eq!(metrics.cache_gets, 2);
        assert_eq!(metrics.cache_hits, 1);
        assert_eq!(metrics.cache_entries, 1);
        assert_eq!(metrics.cache_max_entries, 100);
        assert!(metrics.xor_filter_built);
        assert_eq!(metrics.xor_short_circuits, 0);
    }

    #[test]
    fn test_lookup_decision_variants() {
        let row = make_row(1, "Alice");
        let found = LookupDecision::Found(row);
        assert!(matches!(found, LookupDecision::Found(_)));
        assert!(matches!(LookupDecision::NotFound, LookupDecision::NotFound));
        assert!(matches!(
            LookupDecision::CacheMiss,
            LookupDecision::CacheMiss
        ));
    }
}