laminar_connectors/lookup/
mod.rs1pub mod cdc_adapter;
5
6pub mod delta_reference;
8
9#[cfg(feature = "delta-lake")]
11pub mod delta_lookup;
12
13#[cfg(feature = "postgres-cdc")]
15pub mod postgres_source;
16
17#[cfg(feature = "postgres-cdc")]
19pub mod postgres_reference;
20
21#[cfg(feature = "postgres-cdc")]
22pub use postgres_source::{PostgresLookupSource, PostgresLookupSourceConfig};
23
24#[cfg(feature = "parquet-lookup")]
26pub mod parquet_source;
27
28#[cfg(feature = "parquet-lookup")]
29pub use parquet_source::{ParquetLookupSource, ParquetLookupSourceConfig};
30
31use async_trait::async_trait;
32
33pub use laminar_core::lookup::{LookupError, LookupResult};
35
36#[async_trait]
55pub trait TableLoader: Send + Sync {
56 async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError>;
64
65 async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
71 let mut results = Vec::with_capacity(keys.len());
72 for key in keys {
73 results.push(self.lookup(key).await?);
74 }
75 Ok(results)
76 }
77
78 fn name(&self) -> &str;
80
81 async fn health_check(&self) -> bool {
86 true
87 }
88
89 async fn close(&self) -> Result<(), LookupError> {
94 Ok(())
95 }
96}
97
/// A [`TableLoader`] that never finds anything.
///
/// Every [`TableLoader::lookup`] call answers [`LookupResult::NotFound`]. It is only
/// compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Clone, Default)]
pub struct NoOpTableLoader;

#[cfg(any(test, feature = "testing"))]
impl NoOpTableLoader {
    /// Creates a no-op loader. Equivalent to `NoOpTableLoader::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}

#[cfg(any(test, feature = "testing"))]
#[async_trait]
impl TableLoader for NoOpTableLoader {
    /// Ignores `_key` and always reports a miss.
    async fn lookup(&self, _key: &[u8]) -> Result<LookupResult, LookupError> {
        let outcome = LookupResult::NotFound;
        Ok(outcome)
    }

    // The literal could be typed `&'static str`. The signature is kept identical to the
    // trait's `fn name(&self) -> &str`, so clippy's suggestion is silenced here.
    #[allow(clippy::unnecessary_literal_bound)]
    fn name(&self) -> &str {
        "no_op"
    }
}
126
127#[cfg(test)]
128use arrow_array::RecordBatch;
129
/// Test-only [`TableLoader`] backed by an in-memory map from key bytes to [`RecordBatch`].
///
/// The map sits behind an `Arc`, so the derived `Clone` produces handles that share
/// the same entries. A write through one handle is visible through all of them.
#[cfg(test)]
#[derive(Debug, Clone)]
pub(crate) struct InMemoryTableLoader {
    /// Key bytes mapped to the batch returned on a hit, behind a `parking_lot` RwLock.
    data: std::sync::Arc<parking_lot::RwLock<rustc_hash::FxHashMap<Vec<u8>, RecordBatch>>>,
    /// Identifier reported through `TableLoader::name`.
    name: String,
}

#[cfg(test)]
impl InMemoryTableLoader {
    /// Creates an empty loader named `"in_memory"`.
    pub fn new() -> Self {
        Self::with_name("in_memory")
    }

    /// Creates an empty loader carrying the supplied `name`.
    pub fn with_name(name: impl Into<String>) -> Self {
        let store = rustc_hash::FxHashMap::default();
        Self {
            name: name.into(),
            data: std::sync::Arc::new(parking_lot::RwLock::new(store)),
        }
    }

    /// Stores `value` under `key`, discarding any batch previously held for that key.
    pub fn insert(&self, key: Vec<u8>, value: RecordBatch) {
        let mut guard = self.data.write();
        guard.insert(key, value);
    }

    /// Takes the batch stored under `key` out of the map, if there is one.
    pub fn remove(&self, key: &[u8]) -> Option<RecordBatch> {
        let mut guard = self.data.write();
        guard.remove(key)
    }

    /// Number of keys currently stored.
    pub fn len(&self) -> usize {
        let guard = self.data.read();
        guard.len()
    }

    /// `true` when the map holds no keys.
    pub fn is_empty(&self) -> bool {
        let guard = self.data.read();
        guard.is_empty()
    }

    /// Drops every stored entry.
    pub fn clear(&self) {
        let mut guard = self.data.write();
        guard.clear();
    }
}
170
#[cfg(test)]
#[async_trait]
impl TableLoader for InMemoryTableLoader {
    /// Answers with a clone of the stored batch, or `NotFound` when the key is absent.
    async fn lookup(&self, key: &[u8]) -> Result<LookupResult, LookupError> {
        let found = self.data.read().get(key).cloned();
        Ok(found.map_or(LookupResult::NotFound, LookupResult::Hit))
    }

    /// Resolves all `keys` while holding one read lock. Output order matches input order.
    async fn lookup_batch(&self, keys: &[&[u8]]) -> Result<Vec<LookupResult>, LookupError> {
        let guard = self.data.read();
        let mut out = Vec::with_capacity(keys.len());
        for &key in keys {
            let entry = guard.get(key).cloned();
            out.push(entry.map_or(LookupResult::NotFound, LookupResult::Hit));
        }
        Ok(out)
    }

    fn name(&self) -> &str {
        self.name.as_str()
    }
}
198
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a one-row `(customer_id, name, tier)` batch of non-nullable Utf8 columns.
    fn create_customer_batch(id: &str, name: &str, tier: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("customer_id", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
                Arc::new(StringArray::from(vec![tier])),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_in_memory_loader_basic() {
        let loader = InMemoryTableLoader::new();

        loader.insert(b"cust_1".to_vec(), create_customer_batch("cust_1", "Alice", "gold"));
        loader.insert(b"cust_2".to_vec(), create_customer_batch("cust_2", "Bob", "silver"));

        assert_eq!(loader.len(), 2);

        // A present key yields exactly the batch stored under it.
        let result = loader.lookup(b"cust_1").await.unwrap();
        assert!(result.is_hit());
        let batch = result.into_batch().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch, create_customer_batch("cust_1", "Alice", "gold"));

        // An absent key is a miss, not an error.
        let result = loader.lookup(b"cust_999").await.unwrap();
        assert!(!result.is_hit());
    }

    #[tokio::test]
    async fn test_in_memory_loader_batch_lookup() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k3".to_vec(), create_customer_batch("k3", "C", "bronze"));

        let keys: Vec<&[u8]> = vec![b"k1", b"k2", b"k3"];
        let results = loader.lookup_batch(&keys).await.unwrap();

        // One result per key, in input order: k1 hit, k2 miss, k3 hit.
        assert_eq!(results.len(), 3);
        let hits: Vec<bool> = results.iter().map(LookupResult::is_hit).collect();
        assert_eq!(hits, [true, false, true]);

        // The hits carry the batches stored for their own keys.
        let mut results = results.into_iter();
        let first = results.next().unwrap().into_batch().unwrap();
        assert_eq!(first, create_customer_batch("k1", "A", "gold"));
        let last = results.last().unwrap().into_batch().unwrap();
        assert_eq!(last, create_customer_batch("k3", "C", "bronze"));
    }

    #[tokio::test]
    async fn test_in_memory_loader_remove() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"key".to_vec(), create_customer_batch("key", "Test", "gold"));

        assert_eq!(loader.len(), 1);

        let removed = loader.remove(b"key");
        assert!(removed.is_some());
        assert_eq!(loader.len(), 0);

        let result = loader.lookup(b"key").await.unwrap();
        assert!(!result.is_hit());
    }

    #[tokio::test]
    async fn test_no_op_loader() {
        let loader = NoOpTableLoader::new();

        let result = loader.lookup(b"any_key").await.unwrap();
        assert!(!result.is_hit());
        assert_eq!(loader.name(), "no_op");
    }

    #[tokio::test]
    async fn test_default_lookup_batch() {
        // NoOpTableLoader does not override `lookup_batch`, so this exercises the trait default.
        let loader = NoOpTableLoader::new();

        let keys: Vec<&[u8]> = vec![b"a", b"b", b"c"];
        let results = loader.lookup_batch(&keys).await.unwrap();
        assert_eq!(results.len(), keys.len());
        assert!(results.iter().all(|r| !r.is_hit()));

        let no_keys: [&[u8]; 0] = [];
        let empty = loader.lookup_batch(&no_keys).await.unwrap();
        assert!(empty.is_empty());
    }

    // Nothing here awaits, so a plain synchronous test is sufficient.
    #[test]
    fn test_in_memory_loader_clear() {
        let loader = InMemoryTableLoader::new();
        loader.insert(b"k1".to_vec(), create_customer_batch("k1", "A", "gold"));
        loader.insert(b"k2".to_vec(), create_customer_batch("k2", "B", "silver"));

        assert!(!loader.is_empty());
        loader.clear();
        assert!(loader.is_empty());
        assert_eq!(loader.len(), 0);
    }

    #[tokio::test]
    async fn test_table_loader_health_check() {
        let loader = InMemoryTableLoader::new();
        assert!(loader.health_check().await);
    }

    #[tokio::test]
    async fn test_table_loader_default_close() {
        // InMemoryTableLoader does not override `close`, so this exercises the trait default.
        let loader = InMemoryTableLoader::new();
        assert!(loader.close().await.is_ok());
    }
}