laminar_connectors/lookup/
mod.rs1pub mod cdc_adapter;
38
39#[cfg(feature = "postgres-cdc")]
41pub mod postgres_source;
42
43#[cfg(feature = "postgres-cdc")]
44pub use postgres_source::{PostgresLookupSource, PostgresLookupSourceConfig};
45
46#[cfg(feature = "parquet-lookup")]
48pub mod parquet_source;
49
50#[cfg(feature = "parquet-lookup")]
51pub use parquet_source::{ParquetLookupSource, ParquetLookupSourceConfig};
52
53use arrow_array::RecordBatch;
54use async_trait::async_trait;
55use thiserror::Error;
56
57#[derive(Debug, Error)]
59pub enum LookupError {
60 #[error("Key not found")]
62 KeyNotFound,
63
64 #[error("Connection failed: {0}")]
66 ConnectionFailed(String),
67
68 #[error("Query failed: {0}")]
70 QueryFailed(String),
71
72 #[error("Timeout after {0}ms")]
74 Timeout(u64),
75
76 #[error("Serialization error: {0}")]
78 SerializationError(String),
79
80 #[error("Loader not available: {0}")]
82 NotAvailable(String),
83}
84
85#[derive(Debug, Clone)]
87pub enum LookupResult {
88 Found(RecordBatch),
90 NotFound,
92}
93
94impl LookupResult {
95 #[must_use]
97 pub fn is_found(&self) -> bool {
98 matches!(self, LookupResult::Found(_))
99 }
100
101 #[must_use]
103 pub fn into_batch(self) -> Option<RecordBatch> {
104 match self {
105 LookupResult::Found(batch) => Some(batch),
106 LookupResult::NotFound => None,
107 }
108 }
109}
110
111#[async_trait]
130pub trait TableLoader: Send + Sync {
131 async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError>;
143
144 async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
158 let mut results = Vec::with_capacity(keys.len());
159 for key in keys {
160 results.push(self.lookup(key).await?);
161 }
162 Ok(results)
163 }
164
165 fn name(&self) -> &str;
167
168 async fn health_check(&self) -> bool {
173 true
174 }
175
176 async fn close(&self) -> Result<(), LookupError> {
181 Ok(())
182 }
183}
184
/// Loader that holds no data.
///
/// Compiled only for tests or when the `testing` feature is enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Clone, Default)]
pub struct NoOpTableLoader;

#[cfg(any(test, feature = "testing"))]
impl NoOpTableLoader {
    /// Creates a new no-op loader; equivalent to `NoOpTableLoader::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
200
#[cfg(any(test, feature = "testing"))]
#[async_trait]
impl TableLoader for NoOpTableLoader {
    /// Ignores the key and always reports [`LookupResult::NotFound`].
    async fn lookup(&self, _key: &[u8]) -> Result<LookupResult, LookupError> {
        Ok(LookupResult::NotFound)
    }

    /// Returns the fixed loader name `"no_op"`.
    // The return type mirrors the trait's `&str` signature, so the lint that
    // would ask for `&'static str` on a literal return is silenced here.
    #[allow(clippy::unnecessary_literal_bound)]
    fn name(&self) -> &str {
        "no_op"
    }
}
213
/// Test-only loader backed by an in-memory map from key bytes to
/// `RecordBatch`.
#[cfg(test)]
#[derive(Debug, Clone)]
pub(crate) struct InMemoryTableLoader {
    /// Key-to-batch storage. It sits behind `Arc<RwLock<..>>`, so clones of
    /// the loader share the same map.
    data: std::sync::Arc<parking_lot::RwLock<rustc_hash::FxHashMap<Vec<u8>, RecordBatch>>>,
    /// Name reported through `TableLoader::name`.
    name: String,
}
220
#[cfg(test)]
impl InMemoryTableLoader {
    /// Creates an empty loader named `"in_memory"`.
    pub fn new() -> Self {
        Self::with_name("in_memory")
    }

    /// Creates an empty loader that reports the given name.
    pub fn with_name(name: impl Into<String>) -> Self {
        let empty: rustc_hash::FxHashMap<Vec<u8>, RecordBatch> = Default::default();
        let shared = std::sync::Arc::new(parking_lot::RwLock::new(empty));
        Self {
            data: shared,
            name: name.into(),
        }
    }

    /// Stores `value` under `key`, replacing any batch already stored there.
    pub fn insert(&self, key: Vec<u8>, value: RecordBatch) {
        let mut table = self.data.write();
        table.insert(key, value);
    }

    /// Removes the entry for `key`, returning its batch if one was present.
    pub fn remove(&self, key: &[u8]) -> Option<RecordBatch> {
        let mut table = self.data.write();
        table.remove(key)
    }

    /// Returns the number of stored entries.
    pub fn len(&self) -> usize {
        let table = self.data.read();
        table.len()
    }

    /// Returns `true` when no entries are stored.
    pub fn is_empty(&self) -> bool {
        let table = self.data.read();
        table.is_empty()
    }

    /// Removes every stored entry.
    pub fn clear(&self) {
        let mut table = self.data.write();
        table.clear();
    }
}
254
#[cfg(test)]
#[async_trait]
impl TableLoader for InMemoryTableLoader {
    /// Returns a clone of the batch stored under `key`, or
    /// [`LookupResult::NotFound`] when the key is absent. Never fails.
    async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError> {
        // The read guard is a temporary that is released at the end of this
        // statement, after the batch has been cloned out of the map.
        let hit = self.data.read().get(key).cloned();
        Ok(hit.map_or(LookupResult::NotFound, LookupResult::Found))
    }

    /// Resolves every key under a single read lock, producing one result per
    /// key in input order. Never fails.
    async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
        let table = self.data.read();
        let mut outcomes = Vec::with_capacity(keys.len());
        for &key in keys {
            let hit = table.get(key).cloned();
            outcomes.push(hit.map_or(LookupResult::NotFound, LookupResult::Found));
        }
        Ok(outcomes)
    }

    /// Returns the name supplied at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }
}
282
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a one-row batch with the non-nullable UTF-8 columns
    /// `customer_id`, `name` and `tier`.
    fn create_customer_batch(id: &str, name: &str, tier: &str) -> RecordBatch {
        let fields = vec![
            Field::new("customer_id", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));

        let id_column = StringArray::from(vec![id]);
        let name_column = StringArray::from(vec![name]);
        let tier_column = StringArray::from(vec![tier]);

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(id_column),
                Arc::new(name_column),
                Arc::new(tier_column),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_in_memory_loader_basic() {
        let loader = InMemoryTableLoader::new();

        // Seed two customers.
        let alice = create_customer_batch("cust_1", "Alice", "gold");
        loader.insert(b"cust_1".to_vec(), alice);
        let bob = create_customer_batch("cust_2", "Bob", "silver");
        loader.insert(b"cust_2".to_vec(), bob);

        assert_eq!(loader.len(), 2);

        // A stored key resolves to its single-row batch.
        let hit = loader.lookup(b"cust_1").await.unwrap();
        assert!(hit.is_found());
        let batch = hit.into_batch().unwrap();
        assert_eq!(batch.num_rows(), 1);

        // An unknown key is reported as not found.
        let miss = loader.lookup(b"cust_999").await.unwrap();
        assert!(!miss.is_found());
    }

    #[tokio::test]
    async fn test_in_memory_loader_batch_lookup() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k3".to_vec(), create_customer_batch("k3", "C", "bronze"));

        let keys: Vec<&[u8]> = vec![b"k1", b"k2", b"k3"];
        let results = loader.lookup_batch(&keys).await.unwrap();

        assert_eq!(results.len(), 3);
        // Results follow key order: k1 present, k2 absent, k3 present.
        let found: Vec<bool> = results.iter().map(LookupResult::is_found).collect();
        assert_eq!(found, [true, false, true]);
    }

    #[tokio::test]
    async fn test_in_memory_loader_remove() {
        let loader = InMemoryTableLoader::new();
        let batch = create_customer_batch("key", "Test", "gold");
        loader.insert(b"key".to_vec(), batch);

        assert_eq!(loader.len(), 1);

        let removed = loader.remove(b"key");
        assert!(removed.is_some());
        assert_eq!(loader.len(), 0);

        // The removed key no longer resolves.
        let after = loader.lookup(b"key").await.unwrap();
        assert!(!after.is_found());
    }

    #[tokio::test]
    async fn test_no_op_loader() {
        let loader = NoOpTableLoader::new();

        let outcome = loader.lookup(b"any_key").await.unwrap();
        assert!(!outcome.is_found());
        assert_eq!(loader.name(), "no_op");
    }

    #[tokio::test]
    async fn test_in_memory_loader_clear() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k2".to_vec(), create_customer_batch("k2", "B", "silver"));

        assert!(!loader.is_empty());
        loader.clear();
        assert!(loader.is_empty());
        assert_eq!(loader.len(), 0);
    }

    #[tokio::test]
    async fn test_table_loader_health_check() {
        // Exercises the trait's default `health_check`.
        let loader = InMemoryTableLoader::new();
        let healthy = loader.health_check().await;
        assert!(healthy);
    }
}