Skip to main content

laminar_connectors/lookup/
mod.rs

1//! Lookup tables for enrichment joins.
2
3/// CDC-to-reference-table adapter for using CDC sources as lookup tables.
4pub mod cdc_adapter;
5
6/// Delta Lake reference table source for lookup/enrichment joins.
7pub mod delta_reference;
8
9/// Delta Lake on-demand lookup source for cache-miss fallback.
10#[cfg(feature = "delta-lake")]
11pub mod delta_lookup;
12
13/// PostgreSQL lookup source with connection pooling and predicate pushdown.
14#[cfg(feature = "postgres-cdc")]
15pub mod postgres_source;
16
17/// PostgreSQL poll-based reference table source (no CDC required).
18#[cfg(feature = "postgres-cdc")]
19pub mod postgres_reference;
20
21#[cfg(feature = "postgres-cdc")]
22pub use postgres_source::{PostgresLookupSource, PostgresLookupSourceConfig};
23
24/// Parquet file lookup source for static/slowly-changing dimension tables.
25#[cfg(feature = "parquet-lookup")]
26pub mod parquet_source;
27
28#[cfg(feature = "parquet-lookup")]
29pub use parquet_source::{ParquetLookupSource, ParquetLookupSourceConfig};
30
31use async_trait::async_trait;
32
33// Re-export the canonical lookup types from laminar-core.
34pub use laminar_core::lookup::{LookupError, LookupResult};
35
36/// Trait for loading data from external reference tables.
37///
38/// Implementations of this trait provide access to external data sources
39/// for enriching streaming events with dimension data.
40///
41/// # Thread Safety
42///
43/// Implementations must be `Send + Sync` to allow concurrent access from
44/// multiple operator instances.
45///
46/// # Performance Considerations
47///
48/// - Lookups may be called frequently (per-event), so implementations
49///   should be efficient
50/// - Consider batch lookups ([`TableLoader::lookup_batch`]) for better
51///   throughput when multiple keys need to be looked up
52/// - The `LookupJoinOperator` (in `laminar-core`)
53///   caches results, so implementations don't need their own cache
54#[async_trait]
55pub trait TableLoader: Send + Sync {
56    /// Looks up a single key in the table.
57    ///
58    /// # Returns
59    ///
60    /// - `Ok(LookupResult::Hit(batch))` if the key exists
61    /// - `Ok(LookupResult::NotFound)` if the key doesn't exist
62    /// - `Err(LookupError)` if the lookup failed
63    async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError>;
64
65    /// Looks up multiple keys in a single batch operation.
66    ///
67    /// Default implementation calls [`lookup`](TableLoader::lookup) for each key.
68    /// Implementations should override this for better performance when the
69    /// underlying system supports batch queries.
70    async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
71        let mut results = Vec::with_capacity(keys.len());
72        for key in keys {
73            results.push(self.lookup(key).await?);
74        }
75        Ok(results)
76    }
77
78    /// Returns the name of this table loader for logging/debugging.
79    fn name(&self) -> &str;
80
81    /// Checks if the table loader is healthy and can accept requests.
82    ///
83    /// Default implementation returns `true`. Override for loaders that
84    /// need to maintain connections to external systems.
85    async fn health_check(&self) -> bool {
86        true
87    }
88
89    /// Closes the table loader and releases any resources.
90    ///
91    /// Default implementation does nothing. Override for loaders that
92    /// hold connections or other resources.
93    async fn close(&self) -> Result<(), LookupError> {
94        Ok(())
95    }
96}
97
/// Table loader stub whose every lookup reports `NotFound`.
///
/// Handy for exercising the lookup join operator when no real data source is
/// available.
#[cfg(any(test, feature = "testing"))]
#[derive(Clone, Debug, Default)]
pub struct NoOpTableLoader;

#[cfg(any(test, feature = "testing"))]
impl NoOpTableLoader {
    /// Constructs the no-op loader; equivalent to `NoOpTableLoader::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
113
#[cfg(any(test, feature = "testing"))]
#[async_trait]
impl TableLoader for NoOpTableLoader {
    /// Ignores the key and always reports a miss.
    async fn lookup(&self, _key: &[u8]) -> Result<LookupResult, LookupError> {
        let miss = LookupResult::NotFound;
        Ok(miss)
    }

    /// Returns the fixed identifier `"no_op"`.
    // The return type mirrors the trait's `&str` signature even though the
    // value is a string literal, hence the lint exemption.
    #[allow(clippy::unnecessary_literal_bound)]
    fn name(&self) -> &str {
        "no_op"
    }
}
126
127#[cfg(test)]
128use arrow_array::RecordBatch;
129
/// Test-only table loader backed by an in-memory hash map.
///
/// The map lives behind an `Arc<RwLock<..>>`, so clones of a loader share the
/// same entries, and every mutating helper takes `&self`.
#[cfg(test)]
#[derive(Debug, Clone)]
pub(crate) struct InMemoryTableLoader {
    /// Key bytes mapped to the batch returned for that key.
    data: std::sync::Arc<parking_lot::RwLock<rustc_hash::FxHashMap<Vec<u8>, RecordBatch>>>,
    /// Identifier reported through `TableLoader::name`.
    name: String,
}

// A derived `Default` would leave `name` empty, so the impl is written by hand
// to keep `default()` and `new()` interchangeable.
#[cfg(test)]
impl Default for InMemoryTableLoader {
    /// Creates an empty loader named `"in_memory"`.
    fn default() -> Self {
        Self::with_name("in_memory")
    }
}

#[cfg(test)]
impl InMemoryTableLoader {
    /// Creates an empty loader named `"in_memory"`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty loader that reports `name` as its identifier.
    pub fn with_name(name: impl Into<String>) -> Self {
        Self {
            data: std::sync::Arc::new(parking_lot::RwLock::new(rustc_hash::FxHashMap::default())),
            name: name.into(),
        }
    }

    /// Stores `value` under `key`, replacing any batch already stored there.
    pub fn insert(&self, key: Vec<u8>, value: RecordBatch) {
        self.data.write().insert(key, value);
    }

    /// Removes the entry for `key`, returning its batch if one was stored.
    pub fn remove(&self, key: &[u8]) -> Option<RecordBatch> {
        self.data.write().remove(key)
    }

    /// Number of keys currently stored.
    pub fn len(&self) -> usize {
        self.data.read().len()
    }

    /// Returns `true` when no keys are stored.
    pub fn is_empty(&self) -> bool {
        self.data.read().is_empty()
    }

    /// Removes every stored entry.
    pub fn clear(&self) {
        self.data.write().clear();
    }
}
170
#[cfg(test)]
#[async_trait]
impl TableLoader for InMemoryTableLoader {
    /// Returns a clone of the batch stored under `key`, or `NotFound`.
    async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError> {
        // The read guard is a temporary that is released at the end of this
        // statement, after the batch has been cloned out of the map.
        let found = self.data.read().get(key).cloned();
        Ok(found.map_or(LookupResult::NotFound, LookupResult::Hit))
    }

    /// Resolves all `keys` under a single read lock, preserving their order.
    async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
        let table = self.data.read();
        let mut results = Vec::with_capacity(keys.len());
        for &key in keys {
            let entry = table.get(key).cloned();
            results.push(entry.map_or(LookupResult::NotFound, LookupResult::Hit));
        }
        Ok(results)
    }

    /// Returns the identifier this loader was constructed with.
    fn name(&self) -> &str {
        self.name.as_str()
    }
}
198
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a single-row batch with `customer_id`, `name` and `tier`
    /// columns, all non-nullable UTF-8.
    fn create_customer_batch(id: &str, name: &str, tier: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("customer_id", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
                Arc::new(StringArray::from(vec![tier])),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_in_memory_loader_basic() {
        let loader = InMemoryTableLoader::new();

        // Insert test data
        loader.insert(
            b"cust_1".to_vec(),
            create_customer_batch("cust_1", "Alice", "gold"),
        );
        loader.insert(
            b"cust_2".to_vec(),
            create_customer_batch("cust_2", "Bob", "silver"),
        );

        assert_eq!(loader.len(), 2);

        // Lookup existing key
        let result = loader.lookup(b"cust_1").await.unwrap();
        assert!(result.is_hit());
        let batch = result.into_batch().unwrap();
        assert_eq!(batch.num_rows(), 1);

        // Lookup missing key
        let result = loader.lookup(b"cust_999").await.unwrap();
        assert!(!result.is_hit());
    }

    #[tokio::test]
    async fn test_in_memory_loader_batch_lookup() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k3".to_vec(), create_customer_batch("k3", "C", "bronze"));

        let keys: Vec<&[u8]> = vec![b"k1", b"k2", b"k3"];
        let results = loader.lookup_batch(&keys).await.unwrap();

        assert_eq!(results.len(), 3);
        assert!(results[0].is_hit()); // k1 exists
        assert!(!results[1].is_hit()); // k2 doesn't exist
        assert!(results[2].is_hit()); // k3 exists
    }

    #[tokio::test]
    async fn test_in_memory_loader_remove() {
        let loader = InMemoryTableLoader::new();
        loader.insert(
            b"key".to_vec(),
            create_customer_batch("key", "Test", "gold"),
        );

        assert_eq!(loader.len(), 1);

        let removed = loader.remove(b"key");
        assert!(removed.is_some());
        assert_eq!(loader.len(), 0);

        let result = loader.lookup(b"key").await.unwrap();
        assert!(!result.is_hit());
    }

    #[tokio::test]
    async fn test_no_op_loader() {
        let loader = NoOpTableLoader::new();

        let result = loader.lookup(b"any_key").await.unwrap();
        assert!(!result.is_hit());
        assert_eq!(loader.name(), "no_op");
    }

    #[tokio::test]
    async fn test_in_memory_loader_clear() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k2".to_vec(), create_customer_batch("k2", "B", "silver"));

        assert!(!loader.is_empty());
        loader.clear();
        assert!(loader.is_empty());
        assert_eq!(loader.len(), 0);
    }

    #[tokio::test]
    async fn test_table_loader_health_check() {
        let loader = InMemoryTableLoader::new();
        assert!(loader.health_check().await);
    }

    #[tokio::test]
    async fn test_trait_default_methods() {
        // `NoOpTableLoader` overrides neither `lookup_batch`, `health_check`
        // nor `close`, so these calls run the trait's default implementations.
        let loader = NoOpTableLoader::new();

        let keys: Vec<&[u8]> = vec![b"k1", b"k2"];
        let results = loader.lookup_batch(&keys).await.unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| !r.is_hit()));

        // An empty key slice yields an empty result vector.
        let no_keys: Vec<&[u8]> = Vec::new();
        assert!(loader.lookup_batch(&no_keys).await.unwrap().is_empty());

        assert!(loader.health_check().await);
        assert!(loader.close().await.is_ok());
    }

    #[test]
    fn test_in_memory_loader_name() {
        assert_eq!(InMemoryTableLoader::new().name(), "in_memory");
        assert_eq!(
            InMemoryTableLoader::with_name("customers").name(),
            "customers"
        );
    }
}