// laminar_core/lookup/table.rs
//! Lookup table trait and configuration types.
//!
//! A [`LookupTable`] provides synchronous key-value access for enriching
//! stream events via lookup joins. The trait is `Send + Sync` so tables
//! can be shared across operator instances in a thread-per-core runtime.
//!
//! ## Lookup Flow
//!
//! 1. Stream event arrives at a lookup join operator
//! 2. Operator calls [`get_cached`](LookupTable::get_cached) (Ring 0, < 500ns)
//! 3. On cache miss, calls [`get`](LookupTable::get) (may hit disk/network)
//! 4. Result is joined with the stream event

use std::time::Duration;

use arrow_array::RecordBatch;
17
18/// Result of a lookup operation.
19#[derive(Debug, Clone)]
20pub enum LookupResult {
21 /// Cache hit — value found in memory.
22 Hit(RecordBatch),
23 /// The lookup is in progress (async source query pending).
24 Pending,
25 /// Key does not exist in the source.
26 NotFound,
27}
28
29impl LookupResult {
30 /// Returns `true` if this is a cache hit.
31 #[must_use]
32 pub const fn is_hit(&self) -> bool {
33 matches!(self, Self::Hit(_))
34 }
35
36 /// Returns `true` if the key was not found.
37 #[must_use]
38 pub const fn is_not_found(&self) -> bool {
39 matches!(self, Self::NotFound)
40 }
41
42 /// Extracts the `RecordBatch` from a `Hit`, consuming `self`.
43 #[must_use]
44 pub fn into_batch(self) -> Option<RecordBatch> {
45 match self {
46 Self::Hit(b) => Some(b),
47 _ => None,
48 }
49 }
50}
51
52impl PartialEq for LookupResult {
53 fn eq(&self, other: &Self) -> bool {
54 match (self, other) {
55 (Self::Hit(a), Self::Hit(b)) => a == b,
56 (Self::Pending, Self::Pending) | (Self::NotFound, Self::NotFound) => true,
57 _ => false,
58 }
59 }
60}
61
62impl Eq for LookupResult {}
63
/// Strategy for how lookup table data is distributed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LookupStrategy {
    /// Every node holds a full copy — suited to small reference tables.
    #[default]
    Replicated,
    /// Rows are spread across nodes by hashing the key.
    Partitioned {
        /// Total number of hash partitions.
        num_partitions: u32,
    },
    /// No local copy or cache; every query goes straight to the source.
    SourceDirect,
}
78
79/// Configuration for a lookup table instance.
80#[derive(Debug, Clone)]
81pub struct LookupTableConfig {
82 /// How data is distributed.
83 pub strategy: LookupStrategy,
84 /// Time-to-live for cached entries.
85 pub ttl: Option<Duration>,
86 /// Maximum number of cached entries.
87 pub max_cache_entries: usize,
88 /// Source connector name (for async refresh).
89 pub source: Option<String>,
90}
91
92impl Default for LookupTableConfig {
93 fn default() -> Self {
94 Self {
95 strategy: LookupStrategy::Replicated,
96 ttl: None,
97 max_cache_entries: 100_000,
98 source: None,
99 }
100 }
101}
102
103/// Synchronous lookup table interface for Ring 0 operators.
104///
105/// Implementations must be `Send + Sync` to allow sharing across threads
106/// in the thread-per-core runtime. The hot-path method is
107/// [`get_cached`](Self::get_cached) which should be < 500ns.
108///
109/// Writes (insert/invalidate) come from the Ring 1 CDC adapter or
110/// bulk-load path, not from the hot path.
111pub trait LookupTable: Send + Sync {
112 /// Look up a key in the in-memory cache only (Ring 0).
113 ///
114 /// This is the fast path — must not block or perform I/O.
115 /// Returns [`LookupResult::NotFound`] on cache miss; callers should
116 /// then try [`get`](Self::get) for a deeper lookup.
117 ///
118 /// # Performance
119 ///
120 /// Target: < 500ns
121 fn get_cached(&self, key: &[u8]) -> LookupResult;
122
123 /// Look up a key, potentially checking slower storage tiers.
124 ///
125 /// May check a disk cache, remote cache, or trigger an async
126 /// source query. Returns [`LookupResult::Pending`] if the lookup
127 /// requires an async fetch that hasn't completed yet.
128 fn get(&self, key: &[u8]) -> LookupResult;
129
130 /// Insert or update a cached entry.
131 ///
132 /// Called by the CDC adapter or bulk-load path when new data arrives
133 /// from the source. This is NOT on the hot path.
134 fn insert(&self, key: &[u8], value: RecordBatch);
135
136 /// Invalidate a cached entry.
137 ///
138 /// Called when the source signals a delete or the TTL expires.
139 fn invalidate(&self, key: &[u8]);
140
141 /// Number of entries currently in the cache.
142 fn len(&self) -> usize;
143
144 /// Whether the cache is empty.
145 fn is_empty(&self) -> bool {
146 self.len() == 0
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use super::*;
153 use arrow_array::StringArray;
154 use arrow_schema::{DataType, Field, Schema};
155 use std::sync::Arc;
156
157 fn test_batch() -> RecordBatch {
158 let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Utf8, false)]));
159 RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["value"]))]).unwrap()
160 }
161
162 #[test]
163 fn test_lookup_result_methods() {
164 let hit = LookupResult::Hit(test_batch());
165 assert!(hit.is_hit());
166 assert!(!hit.is_not_found());
167 assert!(hit.into_batch().is_some());
168
169 let miss = LookupResult::NotFound;
170 assert!(miss.is_not_found());
171 assert!(!miss.is_hit());
172 assert!(miss.into_batch().is_none());
173
174 let pending = LookupResult::Pending;
175 assert!(!pending.is_hit());
176 assert!(!pending.is_not_found());
177 }
178
179 #[test]
180 fn test_lookup_strategy_default() {
181 let strategy = LookupStrategy::default();
182 assert_eq!(strategy, LookupStrategy::Replicated);
183 }
184
185 #[test]
186 fn test_lookup_table_config_default() {
187 let config = LookupTableConfig::default();
188 assert_eq!(config.strategy, LookupStrategy::Replicated);
189 assert!(config.ttl.is_none());
190 assert_eq!(config.max_cache_entries, 100_000);
191 assert!(config.source.is_none());
192 }
193}