Skip to main content

laminar_core/lookup/
table.rs

1//! Lookup table trait and configuration types.
2//!
3//! A [`LookupTable`] provides synchronous key-value access for enriching
4//! stream events via lookup joins. The trait is `Send + Sync` so tables
5//! can be shared across operator instances in a thread-per-core runtime.
6//!
7//! ## Lookup Flow
8//!
9//! 1. Stream event arrives at a lookup join operator
10//! 2. Operator calls [`get_cached`](LookupTable::get_cached) (Ring 0, < 500ns)
11//! 3. On cache miss, calls [`get`](LookupTable::get) (may hit disk/network)
12//! 4. Result is joined with the stream event
13
14use std::time::Duration;
15
16use arrow_array::RecordBatch;
17
18/// Result of a lookup operation.
19#[derive(Debug, Clone)]
20pub enum LookupResult {
21    /// Cache hit — value found in memory.
22    Hit(RecordBatch),
23    /// The lookup is in progress (async source query pending).
24    Pending,
25    /// Key does not exist in the source.
26    NotFound,
27}
28
29impl LookupResult {
30    /// Returns `true` if this is a cache hit.
31    #[must_use]
32    pub const fn is_hit(&self) -> bool {
33        matches!(self, Self::Hit(_))
34    }
35
36    /// Returns `true` if the key was not found.
37    #[must_use]
38    pub const fn is_not_found(&self) -> bool {
39        matches!(self, Self::NotFound)
40    }
41
42    /// Extracts the `RecordBatch` from a `Hit`, consuming `self`.
43    #[must_use]
44    pub fn into_batch(self) -> Option<RecordBatch> {
45        match self {
46            Self::Hit(b) => Some(b),
47            _ => None,
48        }
49    }
50}
51
52impl PartialEq for LookupResult {
53    fn eq(&self, other: &Self) -> bool {
54        match (self, other) {
55            (Self::Hit(a), Self::Hit(b)) => a == b,
56            (Self::Pending, Self::Pending) | (Self::NotFound, Self::NotFound) => true,
57            _ => false,
58        }
59    }
60}
61
62impl Eq for LookupResult {}
63
/// Describes how a lookup table's data is laid out across nodes.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum LookupStrategy {
    /// Every node holds a complete copy; suited to small reference tables.
    #[default]
    Replicated,
    /// Data is split across nodes according to a hash of the key.
    Partitioned {
        /// How many partitions the data is split into.
        num_partitions: u32,
    },
    /// No local cache or copy is kept; queries go straight to the source.
    SourceDirect,
}
78
79/// Configuration for a lookup table instance.
80#[derive(Debug, Clone)]
81pub struct LookupTableConfig {
82    /// How data is distributed.
83    pub strategy: LookupStrategy,
84    /// Time-to-live for cached entries.
85    pub ttl: Option<Duration>,
86    /// Maximum number of cached entries.
87    pub max_cache_entries: usize,
88    /// Source connector name (for async refresh).
89    pub source: Option<String>,
90}
91
92impl Default for LookupTableConfig {
93    fn default() -> Self {
94        Self {
95            strategy: LookupStrategy::Replicated,
96            ttl: None,
97            max_cache_entries: 100_000,
98            source: None,
99        }
100    }
101}
102
103/// Synchronous lookup table interface for Ring 0 operators.
104///
105/// Implementations must be `Send + Sync` to allow sharing across threads
106/// in the thread-per-core runtime. The hot-path method is
107/// [`get_cached`](Self::get_cached) which should be < 500ns.
108///
109/// Writes (insert/invalidate) come from the Ring 1 CDC adapter or
110/// bulk-load path, not from the hot path.
111pub trait LookupTable: Send + Sync {
112    /// Look up a key in the in-memory cache only (Ring 0).
113    ///
114    /// This is the fast path — must not block or perform I/O.
115    /// Returns [`LookupResult::NotFound`] on cache miss; callers should
116    /// then try [`get`](Self::get) for a deeper lookup.
117    ///
118    /// # Performance
119    ///
120    /// Target: < 500ns
121    fn get_cached(&self, key: &[u8]) -> LookupResult;
122
123    /// Look up a key, potentially checking slower storage tiers.
124    ///
125    /// May check a disk cache, remote cache, or trigger an async
126    /// source query. Returns [`LookupResult::Pending`] if the lookup
127    /// requires an async fetch that hasn't completed yet.
128    fn get(&self, key: &[u8]) -> LookupResult;
129
130    /// Insert or update a cached entry.
131    ///
132    /// Called by the CDC adapter or bulk-load path when new data arrives
133    /// from the source. This is NOT on the hot path.
134    fn insert(&self, key: &[u8], value: RecordBatch);
135
136    /// Invalidate a cached entry.
137    ///
138    /// Called when the source signals a delete or the TTL expires.
139    fn invalidate(&self, key: &[u8]);
140
141    /// Number of entries currently in the cache.
142    fn len(&self) -> usize;
143
144    /// Whether the cache is empty.
145    fn is_empty(&self) -> bool {
146        self.len() == 0
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a one-row batch with a single non-nullable `Utf8` column `v`
    /// holding `value`.
    fn batch_with(value: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Utf8, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec![value]))])
            .expect("column is built to match the schema")
    }

    /// Canonical fixture batch used by the tests below.
    fn test_batch() -> RecordBatch {
        batch_with("value")
    }

    #[test]
    fn test_lookup_result_methods() {
        let hit = LookupResult::Hit(test_batch());
        assert!(hit.is_hit());
        assert!(!hit.is_not_found());
        assert!(hit.into_batch().is_some());

        let miss = LookupResult::NotFound;
        assert!(miss.is_not_found());
        assert!(!miss.is_hit());
        assert!(miss.into_batch().is_none());

        let pending = LookupResult::Pending;
        assert!(!pending.is_hit());
        assert!(!pending.is_not_found());
        // `Pending` carries no batch either.
        assert!(pending.into_batch().is_none());
    }

    /// Exercises the hand-written `PartialEq` impl: same variant with an equal
    /// payload compares equal, everything else does not.
    #[test]
    fn test_lookup_result_equality() {
        assert_eq!(
            LookupResult::Hit(test_batch()),
            LookupResult::Hit(test_batch())
        );
        assert_ne!(
            LookupResult::Hit(test_batch()),
            LookupResult::Hit(batch_with("other"))
        );
        assert_eq!(LookupResult::Pending, LookupResult::Pending);
        assert_eq!(LookupResult::NotFound, LookupResult::NotFound);
        assert_ne!(LookupResult::Pending, LookupResult::NotFound);
        assert_ne!(LookupResult::Hit(test_batch()), LookupResult::Pending);
        assert_ne!(LookupResult::NotFound, LookupResult::Hit(test_batch()));
    }

    #[test]
    fn test_lookup_strategy_default() {
        let strategy = LookupStrategy::default();
        assert_eq!(strategy, LookupStrategy::Replicated);
    }

    #[test]
    fn test_lookup_table_config_default() {
        let config = LookupTableConfig::default();
        assert_eq!(config.strategy, LookupStrategy::Replicated);
        assert!(config.ttl.is_none());
        assert_eq!(config.max_cache_entries, 100_000);
        assert!(config.source.is_none());
    }
}