laminar_connectors/mongodb/
lookup.rs1#[cfg(feature = "mongodb-cdc")]
13use std::sync::Arc;
14
15#[cfg(feature = "mongodb-cdc")]
16use arrow_array::{Array, RecordBatch};
17#[cfg(feature = "mongodb-cdc")]
18use arrow_row::SortField;
19#[cfg(feature = "mongodb-cdc")]
20use arrow_schema::{DataType, SchemaRef};
21#[cfg(feature = "mongodb-cdc")]
22use mongodb::bson::{doc, Bson, Document};
23#[cfg(feature = "mongodb-cdc")]
24use mongodb::Client;
25
26#[cfg(feature = "mongodb-cdc")]
27use laminar_core::lookup::predicate::Predicate;
28#[cfg(feature = "mongodb-cdc")]
29use laminar_core::lookup::source::{
30 projection_names, ColumnId, LookupError, LookupSource, LookupSourceCapabilities,
31};
32#[cfg(feature = "mongodb-cdc")]
33use laminar_core::lookup::KeyAligner;
34
#[cfg(feature = "mongodb-cdc")]
/// Configuration consumed by [`MongoLookupSource::open`].
#[derive(Debug, Clone)]
pub struct MongoLookupSourceConfig {
    /// MongoDB connection string, parsed with `ClientOptions::parse`.
    pub connection_uri: String,
    /// Database that contains `collection`.
    pub database: String,
    /// Collection that lookups are executed against.
    pub collection: String,
    /// Primary-key column names; `open` rejects anything but exactly one entry.
    pub primary_key_columns: Vec<String>,
    /// Declared Arrow schema of the documents. It must contain the primary-key
    /// column, and `open` only accepts `Int32`, `Int64`, `Float64`, `Boolean`,
    /// `Utf8` and `LargeUtf8` columns.
    pub schema: SchemaRef,
}
50
#[cfg(feature = "mongodb-cdc")]
/// Lookup source that resolves primary-key lookups against one MongoDB
/// collection.
///
/// Built with [`MongoLookupSource::open`]; each `query` call issues a single
/// `$in` query on the primary-key field.
pub struct MongoLookupSource {
    /// Driver client shared by queries and health checks.
    client: Client,
    /// Name of the database holding `collection`.
    database: String,
    /// Name of the collection that lookups read from.
    collection: String,
    /// Document field used as the single primary key.
    pk_field: String,
    /// Declared Arrow schema of the documents.
    schema: SchemaRef,
    /// Decodes incoming row-encoded keys (`decode_keys`) and matches fetched
    /// rows back to them (`align`).
    aligner: KeyAligner,
}
61
#[cfg(feature = "mongodb-cdc")]
impl MongoLookupSource {
    /// Validates `config` and builds a MongoDB-backed lookup source.
    ///
    /// Requirements checked, in this order:
    /// 1. exactly one primary-key column is configured;
    /// 2. that column exists in `config.schema`;
    /// 3. every schema column is `Int32`, `Int64`, `Float64`, `Boolean`,
    ///    `Utf8` or `LargeUtf8`.
    ///
    /// Afterwards the connection URI is parsed and a client is created. Use
    /// `health_check` to verify that the server is actually reachable.
    ///
    /// # Errors
    ///
    /// - [`LookupError::Internal`] for an invalid primary-key configuration or
    ///   an unsupported column type;
    /// - any error produced by `KeyAligner::new`;
    /// - [`LookupError::Connection`] if the URI cannot be parsed or the client
    ///   cannot be constructed.
    pub async fn open(config: MongoLookupSourceConfig) -> Result<Self, LookupError> {
        let pk_field = match config.primary_key_columns.as_slice() {
            [only] => only.clone(),
            other => {
                return Err(LookupError::Internal(format!(
                    "mongodb lookup requires exactly one primary key column, got {}",
                    other.len()
                )))
            }
        };

        let Ok(pk_idx) = config.schema.index_of(&pk_field) else {
            return Err(LookupError::Internal(format!(
                "pk column not in declared schema: {pk_field}"
            )));
        };

        // Reject the first column whose type `docs_to_batch` cannot build.
        let unsupported = config
            .schema
            .fields()
            .iter()
            .map(|field| field.data_type())
            .find(|dt| {
                !matches!(
                    dt,
                    DataType::Int32
                        | DataType::Int64
                        | DataType::Float64
                        | DataType::Boolean
                        | DataType::Utf8
                        | DataType::LargeUtf8
                )
            });
        if let Some(dt) = unsupported {
            return Err(LookupError::Internal(format!(
                "unsupported field data type in schema for mongodb lookup: {dt}"
            )));
        }

        let pk_type = config.schema.field(pk_idx).data_type().clone();
        let aligner = KeyAligner::new(vec![SortField::new(pk_type)], config.primary_key_columns)?;

        let options = mongodb::options::ClientOptions::parse(&config.connection_uri)
            .await
            .map_err(|e| LookupError::Connection(format!("mongodb client options: {e}")))?;
        let client = Client::with_options(options)
            .map_err(|e| LookupError::Connection(format!("mongodb client: {e}")))?;

        Ok(Self {
            client,
            pk_field,
            aligner,
            database: config.database,
            collection: config.collection,
            schema: config.schema,
        })
    }

    /// Converts the primary-key value at `row` of `array` into a BSON value.
    ///
    /// Returns `Ok(None)` for a null cell. Handles `Int32`, `Int64`,
    /// `Float64`, `Boolean`, `Utf8`, `LargeUtf8` and `Utf8View` arrays.
    ///
    /// # Errors
    ///
    /// Returns [`LookupError::Internal`] for any other data type, or when the
    /// array cannot be downcast to the concrete type its `data_type` reports.
    fn cell_to_bson(array: &dyn Array, row: usize) -> Result<Option<Bson>, LookupError> {
        use arrow_array::{
            BooleanArray, Float64Array, Int32Array, Int64Array, LargeStringArray, StringArray,
            StringViewArray,
        };

        /// Downcasts `array` to its concrete Arrow array type.
        fn as_typed<T: 'static>(array: &dyn Array) -> Result<&T, LookupError> {
            match array.as_any().downcast_ref::<T>() {
                Some(typed) => Ok(typed),
                None => Err(LookupError::Internal("pk column downcast failed".into())),
            }
        }

        if array.is_null(row) {
            return Ok(None);
        }

        let value = match array.data_type() {
            DataType::Int32 => Bson::Int32(as_typed::<Int32Array>(array)?.value(row)),
            DataType::Int64 => Bson::Int64(as_typed::<Int64Array>(array)?.value(row)),
            DataType::Float64 => Bson::Double(as_typed::<Float64Array>(array)?.value(row)),
            DataType::Boolean => Bson::Boolean(as_typed::<BooleanArray>(array)?.value(row)),
            DataType::Utf8 => Bson::String(as_typed::<StringArray>(array)?.value(row).to_owned()),
            DataType::LargeUtf8 => {
                Bson::String(as_typed::<LargeStringArray>(array)?.value(row).to_owned())
            }
            DataType::Utf8View => {
                Bson::String(as_typed::<StringViewArray>(array)?.value(row).to_owned())
            }
            dt => {
                return Err(LookupError::Internal(format!(
                    "unsupported PK data type for mongodb lookup: {dt}"
                )))
            }
        };
        Ok(Some(value))
    }

    /// Converts MongoDB documents into a [`RecordBatch`] with `schema`.
    ///
    /// Each column is filled by looking up the field of the same name in every
    /// document. A missing field or a BSON `null` becomes an Arrow null. So
    /// does a value the numeric/boolean coercion helpers cannot convert,
    /// including an integer that does not fit an `Int32` column. String
    /// columns stringify any non-null value with `bson_to_string`.
    ///
    /// # Errors
    ///
    /// Returns [`LookupError::Internal`] for a column type this function cannot
    /// build, or when Arrow rejects the assembled batch.
    fn docs_to_batch(schema: &SchemaRef, docs: &[Document]) -> Result<RecordBatch, LookupError> {
        use arrow_array::builder::{
            BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, LargeStringBuilder,
            StringBuilder,
        };

        /// String form of an optional BSON value; missing and `null` map to `None`.
        fn text_cell(value: Option<&Bson>) -> Option<String> {
            match value {
                None | Some(Bson::Null) => None,
                Some(v) => Some(bson_to_string(v)),
            }
        }

        let row_count = docs.len();
        let columns = schema
            .fields()
            .iter()
            .map(|field| -> Result<Arc<dyn Array>, LookupError> {
                let name = field.name().as_str();
                let cells = docs.iter().map(|document| document.get(name));
                let column: Arc<dyn Array> = match field.data_type() {
                    DataType::Int32 => {
                        let mut builder = Int32Builder::with_capacity(row_count);
                        for cell in cells {
                            builder.append_option(
                                bson_as_i64(cell).and_then(|v| i32::try_from(v).ok()),
                            );
                        }
                        Arc::new(builder.finish())
                    }
                    DataType::Int64 => {
                        let mut builder = Int64Builder::with_capacity(row_count);
                        for cell in cells {
                            builder.append_option(bson_as_i64(cell));
                        }
                        Arc::new(builder.finish())
                    }
                    DataType::Float64 => {
                        let mut builder = Float64Builder::with_capacity(row_count);
                        for cell in cells {
                            builder.append_option(bson_as_f64(cell));
                        }
                        Arc::new(builder.finish())
                    }
                    DataType::Boolean => {
                        let mut builder = BooleanBuilder::with_capacity(row_count);
                        for cell in cells {
                            builder.append_option(cell.and_then(Bson::as_bool));
                        }
                        Arc::new(builder.finish())
                    }
                    DataType::Utf8 => {
                        let mut builder = StringBuilder::with_capacity(row_count, row_count * 16);
                        for cell in cells {
                            builder.append_option(text_cell(cell));
                        }
                        Arc::new(builder.finish())
                    }
                    DataType::LargeUtf8 => {
                        let mut builder =
                            LargeStringBuilder::with_capacity(row_count, row_count * 16);
                        for cell in cells {
                            builder.append_option(text_cell(cell));
                        }
                        Arc::new(builder.finish())
                    }
                    other => {
                        return Err(LookupError::Internal(format!(
                            "unsupported field data type: {other:?}"
                        )))
                    }
                };
                Ok(column)
            })
            .collect::<Result<Vec<_>, _>>()?;

        RecordBatch::try_new(Arc::clone(schema), columns)
            .map_err(|e| LookupError::Internal(format!("arrow batch construction: {e}")))
    }
}
236
#[cfg(feature = "mongodb-cdc")]
impl LookupSource for MongoLookupSource {
    /// Fetches the documents whose primary key matches any of `keys`.
    ///
    /// `keys` are row-encoded primary-key values. They are decoded with the
    /// source's [`KeyAligner`] and sent to MongoDB as a single `$in` query on
    /// the primary-key field. Null keys are left out of the filter, and when
    /// every key is null no query is issued at all. `_predicates` are ignored,
    /// because predicate pushdown is not advertised by `capabilities`.
    ///
    /// A non-empty `projection` is pushed down as a MongoDB projection. If it
    /// does not include the primary-key column, that column is fetched as well
    /// because the aligner needs it. It is then removed again from every
    /// returned batch, leaving only the requested columns in the requested
    /// order.
    ///
    /// Returns what `KeyAligner::align` produces for `keys` (presumably one
    /// entry per key). An empty `keys` slice yields an empty vector.
    ///
    /// # Errors
    ///
    /// Propagates key-decoding, alignment and projection errors. Returns
    /// [`LookupError::Query`] if the `find` or the cursor iteration fails, and
    /// [`LookupError::Internal`] for schema or batch construction problems.
    async fn query(
        &self,
        keys: &[&[u8]],
        _predicates: &[Predicate],
        projection: &[ColumnId],
    ) -> Result<Vec<Option<RecordBatch>>, LookupError> {
        use tokio_stream::StreamExt;

        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // `open` guarantees a single primary-key column, so the aligner
        // decodes exactly one array.
        let pk_arrays = self.aligner.decode_keys(keys)?;
        let pk_array = pk_arrays[0].as_ref();
        // Null keys can never match a document, so they are not sent to MongoDB.
        let mut in_values: Vec<Bson> = Vec::with_capacity(keys.len());
        for row in 0..pk_array.len() {
            if let Some(b) = Self::cell_to_bson(pk_array, row)? {
                in_values.push(b);
            }
        }
        let has_lookup_values = !in_values.is_empty();

        let filter = doc! { &self.pk_field: doc! { "$in": in_values } };
        let collection = self
            .client
            .database(&self.database)
            .collection::<Document>(&self.collection);
        let mut find = collection.find(filter);

        // `restore_names` is `Some(requested column names)` when the primary
        // key had to be fetched in addition to the projection and must be
        // stripped again after alignment.
        let (out_schema, restore_names) = if projection.is_empty() {
            (Arc::clone(&self.schema), None)
        } else {
            let requested = projection_names(&self.schema, projection)?;
            let mut idx: Vec<usize> = projection.iter().map(|&c| c as usize).collect();

            let mut proj_doc = Document::new();
            for name in &requested {
                proj_doc.insert(name.clone(), 1);
            }

            let restore_names = if requested.contains(&self.pk_field) {
                None
            } else {
                let pk_idx = self
                    .schema
                    .index_of(&self.pk_field)
                    .map_err(|e| LookupError::Internal(format!("pk column index: {e}")))?;
                proj_doc.insert(self.pk_field.clone(), 1);
                idx.push(pk_idx);
                Some(requested)
            };
            find = find.projection(proj_doc);

            let schema = self
                .schema
                .project(&idx)
                .map_err(|e| LookupError::Internal(format!("project mongodb schema: {e}")))?;
            (Arc::new(schema), restore_names)
        };

        // An empty `$in` matches nothing, but MongoDB would still run the query
        // (potentially scanning the whole collection when the pk field is not
        // indexed). Skip the round-trip when every key was null; the aligner
        // then sees no batches, exactly as if the query had returned nothing.
        let mut docs: Vec<Document> = Vec::new();
        if has_lookup_values {
            let mut cursor = find
                .await
                .map_err(|e| LookupError::Query(format!("mongodb find: {e}")))?;
            while let Some(next) = cursor.next().await {
                docs.push(next.map_err(|e| LookupError::Query(format!("mongodb cursor: {e}")))?);
            }
        }

        let batches = if docs.is_empty() {
            Vec::new()
        } else {
            vec![Self::docs_to_batch(&out_schema, &docs)?]
        };
        let aligned = self.aligner.align(keys, &batches)?;

        let Some(requested) = restore_names else {
            return Ok(aligned);
        };
        // Drop the helper pk column and restore the caller's column order.
        aligned
            .into_iter()
            .map(|maybe_batch| -> Result<Option<RecordBatch>, LookupError> {
                let Some(batch) = maybe_batch else {
                    return Ok(None);
                };
                let batch_schema = batch.schema();
                let indices = requested
                    .iter()
                    .map(|name| {
                        batch_schema.index_of(name).map_err(|e| {
                            LookupError::Internal(format!(
                                "column not found in aligned schema: {e}"
                            ))
                        })
                    })
                    .collect::<Result<Vec<usize>, LookupError>>()?;
                batch
                    .project(&indices)
                    .map(Some)
                    .map_err(|e| LookupError::Internal(format!("project aligned batch: {e}")))
            })
            .collect()
    }

    /// Advertises batched lookups and projection pushdown. Every other
    /// capability keeps its `LookupSourceCapabilities::none()` value.
    fn capabilities(&self) -> LookupSourceCapabilities {
        LookupSourceCapabilities {
            supports_batch_lookup: true,
            supports_projection_pushdown: true,
            ..LookupSourceCapabilities::none()
        }
    }

    /// Identifies this source as `"mongodb"`.
    // The `&str` return tied to `&self` is presumably dictated by the trait
    // signature, hence the lint suppression for the literal.
    #[allow(clippy::unnecessary_literal_bound)]
    fn source_name(&self) -> &str {
        "mongodb"
    }

    /// Returns the full declared schema (not a projection of it).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Sends a `ping` command to the configured database.
    ///
    /// # Errors
    ///
    /// Returns [`LookupError::Connection`] if the command fails.
    async fn health_check(&self) -> Result<(), LookupError> {
        self.client
            .database(&self.database)
            .run_command(doc! { "ping": 1 })
            .await
            .map(|_| ())
            .map_err(|e| LookupError::Connection(format!("health check: {e}")))
    }
}
367
#[cfg(feature = "mongodb-cdc")]
/// Coerces an optional BSON numeric value to `i64`.
///
/// - `Int32` widens losslessly and `Int64` passes through.
/// - `Double` is truncated toward zero (for example `3.9` becomes `3`), but
///   only when the result is representable. NaN, ±infinity and values outside
///   the `i64` range yield `None`, instead of the `0` / `i64::MIN` /
///   `i64::MAX` that a bare `as` cast would fabricate.
/// - A missing value or any other BSON type yields `None`.
fn bson_as_i64(b: Option<&Bson>) -> Option<i64> {
    /// 2^63: the smallest `f64` above `i64::MAX`, and the negation of `i64::MIN`.
    const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;

    match b? {
        Bson::Int32(v) => Some(i64::from(*v)),
        Bson::Int64(v) => Some(*v),
        // Every double in [-2^63, 2^63) truncates into range; NaN fails the
        // range check, so it is rejected too.
        #[allow(clippy::cast_possible_truncation)]
        Bson::Double(v) if (-TWO_POW_63..TWO_POW_63).contains(v) => Some(*v as i64),
        _ => None,
    }
}
379
#[cfg(feature = "mongodb-cdc")]
/// Coerces an optional BSON numeric value to `f64`.
///
/// `Double` passes through, `Int32` widens losslessly, and `Int64` is
/// converted with possible precision loss for very large magnitudes. A missing
/// value or any other BSON type yields `None`.
fn bson_as_f64(b: Option<&Bson>) -> Option<f64> {
    b.and_then(|value| match *value {
        Bson::Double(x) => Some(x),
        Bson::Int32(n) => Some(f64::from(n)),
        #[allow(clippy::cast_precision_loss)]
        Bson::Int64(n) => Some(n as f64),
        _ => None,
    })
}
391
#[cfg(feature = "mongodb-cdc")]
/// Renders a BSON value as plain text for string columns.
///
/// Strings are returned unquoted, object ids as hex, and numbers/booleans via
/// their Rust `Display` form. Every other variant falls back to the BSON
/// crate's `Display` implementation.
fn bson_to_string(b: &Bson) -> String {
    match b {
        Bson::ObjectId(oid) => oid.to_hex(),
        Bson::String(text) => text.to_owned(),
        Bson::Boolean(flag) => flag.to_string(),
        Bson::Int32(n) => n.to_string(),
        Bson::Int64(n) => n.to_string(),
        Bson::Double(x) => x.to_string(),
        fallback => fallback.to_string(),
    }
}
405
#[cfg(all(test, feature = "mongodb-cdc"))]
mod tests {
    use super::*;
    use arrow_array::{Date32Array, Int64Array, StringArray};

    #[test]
    fn cell_to_bson_types_and_null() {
        let ints = Int64Array::from(vec![7i64]);
        let strings = StringArray::from(vec!["k"]);
        let with_null = Int64Array::from(vec![None, Some(1)]);

        let int_value = MongoLookupSource::cell_to_bson(&ints, 0).unwrap();
        assert_eq!(int_value, Some(Bson::Int64(7)));

        let string_value = MongoLookupSource::cell_to_bson(&strings, 0).unwrap();
        assert_eq!(string_value, Some(Bson::String("k".into())));

        let null_value = MongoLookupSource::cell_to_bson(&with_null, 0).unwrap();
        assert!(null_value.is_none());
    }

    #[test]
    fn cell_to_bson_rejects_unsupported_type() {
        let dates = Date32Array::from(vec![1]);
        let outcome = MongoLookupSource::cell_to_bson(&dates, 0);
        assert!(outcome.is_err());
    }

    #[test]
    fn bson_numeric_coercion() {
        let truncated = bson_as_i64(Some(&Bson::Double(3.9)));
        let widened = bson_as_f64(Some(&Bson::Int64(5)));
        let non_numeric = bson_as_i64(Some(&Bson::String("x".into())));
        let missing = bson_as_i64(None);

        assert_eq!(truncated, Some(3));
        assert_eq!(widened, Some(5.0));
        assert_eq!(non_numeric, None);
        assert_eq!(missing, None);
    }
}