Skip to main content

laminar_connectors/lookup/
mod.rs

1//! # External Lookup Tables for Enrichment Joins
2//!
3//! This module provides the `TableLoader` trait for loading data from external
4//! reference tables (dimension tables) to enrich streaming events.
5//!
6//! ## Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────┐
10//! │                    LookupJoinOperator                       │
11//! │  (laminar-core)                                             │
12//! │  ┌─────────┐    ┌─────────┐    ┌─────────────────────────┐ │
13//! │  │ Event   │───▶│  Cache  │───▶│  Output (enriched)      │ │
14//! │  │ Stream  │    │ (State) │    │                         │ │
15//! │  └─────────┘    └────┬────┘    └─────────────────────────┘ │
16//! │                      │ miss                                 │
17//! │                      ▼                                      │
18//! │               ┌─────────────┐                               │
19//! │               │ TableLoader │  (trait, implemented here)    │
20//! │               └──────┬──────┘                               │
21//! └──────────────────────┼──────────────────────────────────────┘
22//!                        │
23//!                        ▼
24//!             ┌──────────────────────┐
25//!             │   External Systems   │
26//!             │ (Redis, PostgreSQL,  │
27//!             │  HTTP APIs, etc.)    │
28//!             └──────────────────────┘
29//! ```
30//!
31//! ## Implementations
32//!
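//! - `NoOpTableLoader` - Always returns `NotFound`; compiled only for tests or
//!   with the `testing` feature
//! - `InMemoryTableLoader` - Crate-internal, test-only loader backed by an
//!   in-memory map
//!
//! Related submodules: `cdc_adapter` (CDC sources as reference tables),
//! `postgres_source` (`postgres-cdc` feature), and `parquet_source`
//! (`parquet-lookup` feature).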
35
/// CDC-to-reference-table adapter for using CDC sources as lookup tables.
pub mod cdc_adapter;

/// PostgreSQL lookup source with connection pooling and predicate pushdown.
#[cfg(feature = "postgres-cdc")]
pub mod postgres_source;

// Re-exported so callers can name the PostgreSQL source types directly from
// this module; gated on the same feature as the module itself.
#[cfg(feature = "postgres-cdc")]
pub use postgres_source::{PostgresLookupSource, PostgresLookupSourceConfig};

/// Parquet file lookup source for static/slowly-changing dimension tables.
#[cfg(feature = "parquet-lookup")]
pub mod parquet_source;

// Re-exported so callers can name the Parquet source types directly from
// this module; gated on the same feature as the module itself.
#[cfg(feature = "parquet-lookup")]
pub use parquet_source::{ParquetLookupSource, ParquetLookupSourceConfig};
52
53use arrow_array::RecordBatch;
54use async_trait::async_trait;
55use thiserror::Error;
56
57/// Errors that can occur during table lookup operations.
58#[derive(Debug, Error)]
59pub enum LookupError {
60    /// The requested key was not found in the table.
61    #[error("Key not found")]
62    KeyNotFound,
63
64    /// Connection to the external system failed.
65    #[error("Connection failed: {0}")]
66    ConnectionFailed(String),
67
68    /// Query execution failed.
69    #[error("Query failed: {0}")]
70    QueryFailed(String),
71
72    /// Timeout waiting for response.
73    #[error("Timeout after {0}ms")]
74    Timeout(u64),
75
76    /// Serialization/deserialization error.
77    #[error("Serialization error: {0}")]
78    SerializationError(String),
79
80    /// The table loader is not available (e.g., not initialized).
81    #[error("Loader not available: {0}")]
82    NotAvailable(String),
83}
84
85/// Result of a lookup operation.
86#[derive(Debug, Clone)]
87pub enum LookupResult {
88    /// The key was found with the associated data.
89    Found(RecordBatch),
90    /// The key was not found in the table.
91    NotFound,
92}
93
94impl LookupResult {
95    /// Returns `true` if the lookup found a result.
96    #[must_use]
97    pub fn is_found(&self) -> bool {
98        matches!(self, LookupResult::Found(_))
99    }
100
101    /// Returns the found batch, or `None` if not found.
102    #[must_use]
103    pub fn into_batch(self) -> Option<RecordBatch> {
104        match self {
105            LookupResult::Found(batch) => Some(batch),
106            LookupResult::NotFound => None,
107        }
108    }
109}
110
111/// Trait for loading data from external reference tables.
112///
113/// Implementations of this trait provide access to external data sources
114/// for enriching streaming events with dimension data.
115///
116/// # Thread Safety
117///
118/// Implementations must be `Send + Sync` to allow concurrent access from
119/// multiple operator instances.
120///
121/// # Performance Considerations
122///
123/// - Lookups may be called frequently (per-event), so implementations
124///   should be efficient
125/// - Consider batch lookups ([`TableLoader::lookup_batch`]) for better
126///   throughput when multiple keys need to be looked up
127/// - The `LookupJoinOperator` (in `laminar-core`)
128///   caches results, so implementations don't need their own cache
129#[async_trait]
130pub trait TableLoader: Send + Sync {
131    /// Looks up a single key in the table.
132    ///
133    /// # Arguments
134    ///
135    /// * `key` - The key to look up (typically the join column value)
136    ///
137    /// # Returns
138    ///
139    /// - `Ok(LookupResult::Found(batch))` if the key exists
140    /// - `Ok(LookupResult::NotFound)` if the key doesn't exist
141    /// - `Err(LookupError)` if the lookup failed
142    async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError>;
143
144    /// Looks up multiple keys in a single batch operation.
145    ///
146    /// Default implementation calls [`lookup`](TableLoader::lookup) for each key.
147    /// Implementations should override this for better performance when the
148    /// underlying system supports batch queries.
149    ///
150    /// # Arguments
151    ///
152    /// * `keys` - The keys to look up
153    ///
154    /// # Returns
155    ///
156    /// A vector of results in the same order as the input keys.
157    async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
158        let mut results = Vec::with_capacity(keys.len());
159        for key in keys {
160            results.push(self.lookup(key).await?);
161        }
162        Ok(results)
163    }
164
165    /// Returns the name of this table loader for logging/debugging.
166    fn name(&self) -> &str;
167
168    /// Checks if the table loader is healthy and can accept requests.
169    ///
170    /// Default implementation returns `true`. Override for loaders that
171    /// need to maintain connections to external systems.
172    async fn health_check(&self) -> bool {
173        true
174    }
175
176    /// Closes the table loader and releases any resources.
177    ///
178    /// Default implementation does nothing. Override for loaders that
179    /// hold connections or other resources.
180    async fn close(&self) -> Result<(), LookupError> {
181        Ok(())
182    }
183}
184
/// Table loader that never finds anything.
///
/// Every lookup answers [`LookupResult::NotFound`], which makes it a handy
/// stand-in when testing the lookup join operator without a real data
/// source.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Clone, Default)]
pub struct NoOpTableLoader;

#[cfg(any(test, feature = "testing"))]
impl NoOpTableLoader {
    /// Builds a no-op loader; equivalent to `NoOpTableLoader::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}

#[cfg(any(test, feature = "testing"))]
#[async_trait]
impl TableLoader for NoOpTableLoader {
    async fn lookup(&self, _key: &[u8]) -> Result<LookupResult, LookupError> {
        // The key is intentionally ignored: this loader holds no data.
        Ok(LookupResult::NotFound)
    }

    // Kept as `&str` to mirror the `TableLoader::name` signature exactly,
    // rather than narrowing the return type to `&'static str`.
    #[allow(clippy::unnecessary_literal_bound)]
    fn name(&self) -> &str {
        "no_op"
    }
}
213
/// Test-only table loader backed by an in-memory hash map.
///
/// The map lives behind an `Arc`, so clones of a loader share the same data:
/// entries inserted through one handle are visible through every clone.
#[cfg(test)]
#[derive(Debug, Clone)]
pub(crate) struct InMemoryTableLoader {
    /// Key bytes mapped to the batch returned for that key.
    data: std::sync::Arc<parking_lot::RwLock<rustc_hash::FxHashMap<Vec<u8>, RecordBatch>>>,
    /// Name reported by this loader.
    name: String,
}

#[cfg(test)]
impl InMemoryTableLoader {
    /// Creates an empty loader named `"in_memory"`.
    pub fn new() -> Self {
        Self::with_name("in_memory")
    }

    /// Creates an empty loader with the given name.
    pub fn with_name(name: impl Into<String>) -> Self {
        let table = rustc_hash::FxHashMap::default();
        let data = std::sync::Arc::new(parking_lot::RwLock::new(table));
        Self {
            data,
            name: name.into(),
        }
    }

    /// Stores `value` under `key`, replacing any existing entry.
    pub fn insert(&self, key: Vec<u8>, value: RecordBatch) {
        let mut table = self.data.write();
        table.insert(key, value);
    }

    /// Removes the entry for `key`, returning its batch if one was present.
    pub fn remove(&self, key: &[u8]) -> Option<RecordBatch> {
        let mut table = self.data.write();
        table.remove(key)
    }

    /// Number of stored entries.
    pub fn len(&self) -> usize {
        let table = self.data.read();
        table.len()
    }

    /// Returns `true` when no entries are stored.
    pub fn is_empty(&self) -> bool {
        let table = self.data.read();
        table.is_empty()
    }

    /// Removes every stored entry.
    pub fn clear(&self) {
        let mut table = self.data.write();
        table.clear();
    }
}
254
#[cfg(test)]
#[async_trait]
impl TableLoader for InMemoryTableLoader {
    async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError> {
        // Clone the batch out; the read guard is a temporary and is released
        // at the end of this statement.
        let hit = self.data.read().get(key).cloned();
        Ok(hit.map_or(LookupResult::NotFound, LookupResult::Found))
    }

    async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
        // A single read lock covers the whole batch.
        let table = self.data.read();
        let mut out = Vec::with_capacity(keys.len());
        for &key in keys {
            let result = match table.get(key) {
                Some(batch) => LookupResult::Found(batch.clone()),
                None => LookupResult::NotFound,
            };
            out.push(result);
        }
        Ok(out)
    }

    fn name(&self) -> &str {
        self.name.as_str()
    }
}
282
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    fn create_customer_batch(id: &str, name: &str, tier: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("customer_id", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
                Arc::new(StringArray::from(vec![tier])),
            ],
        )
        .unwrap()
    }

    /// Loader whose `lookup` fails for one specific key and reports every
    /// other key as `NotFound`. It does not override `lookup_batch`, so it
    /// exercises the trait's default implementation.
    struct FailOnKeyLoader(&'static [u8]);

    #[async_trait]
    impl TableLoader for FailOnKeyLoader {
        async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError> {
            if key == self.0 {
                Err(LookupError::QueryFailed("boom".to_string()))
            } else {
                Ok(LookupResult::NotFound)
            }
        }

        // Mirrors the `TableLoader::name` signature exactly.
        #[allow(clippy::unnecessary_literal_bound)]
        fn name(&self) -> &str {
            "fail_on_key"
        }
    }

    #[tokio::test]
    async fn test_in_memory_loader_basic() {
        let loader = InMemoryTableLoader::new();

        // Insert test data
        loader.insert(
            b"cust_1".to_vec(),
            create_customer_batch("cust_1", "Alice", "gold"),
        );
        loader.insert(
            b"cust_2".to_vec(),
            create_customer_batch("cust_2", "Bob", "silver"),
        );

        assert_eq!(loader.len(), 2);

        // Lookup existing key
        let result = loader.lookup(b"cust_1").await.unwrap();
        assert!(result.is_found());
        let batch = result.into_batch().unwrap();
        assert_eq!(batch.num_rows(), 1);

        // Lookup missing key
        let result = loader.lookup(b"cust_999").await.unwrap();
        assert!(!result.is_found());
    }

    #[tokio::test]
    async fn test_in_memory_loader_batch_lookup() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k3".to_vec(), create_customer_batch("k3", "C", "bronze"));

        let keys: Vec<&[u8]> = vec![b"k1", b"k2", b"k3"];
        let results = loader.lookup_batch(&keys).await.unwrap();

        assert_eq!(results.len(), 3);
        assert!(results[0].is_found()); // k1 exists
        assert!(!results[1].is_found()); // k2 doesn't exist
        assert!(results[2].is_found()); // k3 exists
    }

    #[tokio::test]
    async fn test_in_memory_loader_remove() {
        let loader = InMemoryTableLoader::new();
        loader.insert(
            b"key".to_vec(),
            create_customer_batch("key", "Test", "gold"),
        );

        assert_eq!(loader.len(), 1);

        let removed = loader.remove(b"key");
        assert!(removed.is_some());
        assert_eq!(loader.len(), 0);

        let result = loader.lookup(b"key").await.unwrap();
        assert!(!result.is_found());
    }

    #[tokio::test]
    async fn test_no_op_loader() {
        let loader = NoOpTableLoader::new();

        let result = loader.lookup(b"any_key").await.unwrap();
        assert!(!result.is_found());
        assert_eq!(loader.name(), "no_op");
    }

    #[tokio::test]
    async fn test_in_memory_loader_clear() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k2".to_vec(), create_customer_batch("k2", "B", "silver"));

        assert!(!loader.is_empty());
        loader.clear();
        assert!(loader.is_empty());
        assert_eq!(loader.len(), 0);
    }

    #[tokio::test]
    async fn test_table_loader_health_check() {
        let loader = InMemoryTableLoader::new();
        assert!(loader.health_check().await);
    }

    #[test]
    fn test_in_memory_loader_names() {
        assert_eq!(InMemoryTableLoader::new().name(), "in_memory");
        assert_eq!(InMemoryTableLoader::with_name("customers").name(), "customers");
    }

    #[tokio::test]
    async fn test_default_lookup_batch_returns_one_result_per_key() {
        // `NoOpTableLoader` relies on the trait's default `lookup_batch`.
        let loader = NoOpTableLoader::new();

        let keys: Vec<&[u8]> = vec![b"a", b"b", b"c"];
        let results = loader.lookup_batch(&keys).await.unwrap();
        assert_eq!(results.len(), keys.len());
        assert!(results.iter().all(|r| !r.is_found()));

        let no_keys: [&[u8]; 0] = [];
        let empty = loader.lookup_batch(&no_keys).await.unwrap();
        assert!(empty.is_empty());
    }

    #[tokio::test]
    async fn test_default_lookup_batch_propagates_first_error() {
        let loader = FailOnKeyLoader(b"bad");
        let keys: Vec<&[u8]> = vec![b"ok", b"bad", b"ok2"];

        let err = loader.lookup_batch(&keys).await.unwrap_err();
        assert!(matches!(err, LookupError::QueryFailed(_)));
    }

    #[tokio::test]
    async fn test_default_health_check_and_close() {
        let loader = NoOpTableLoader::new();
        assert!(loader.health_check().await);
        assert!(loader.close().await.is_ok());
    }
}
389    async fn test_table_loader_health_check() {
390        let loader = InMemoryTableLoader::new();
391        assert!(loader.health_check().await);
392    }
393}