Skip to main content

laminar_connectors/mongodb/
lookup.rs

//! `MongoDB` on-demand lookup source for cache-miss fallback.
//!
//! Implements `LookupSource` via a multi-get on the indexed key
//! (`find({ pk: { $in: [keys] } })`), so all missed keys of a probe fold into
//! one round trip. `MongoDB` is schemaless, so the source projects each
//! returned document into the table's **declared** Arrow schema (from
//! `CREATE LOOKUP TABLE`); [`KeyAligner`] handles key decode and realignment.
//!
//! v1 limits: single-column key; declared column types Int32/Int64/Float64/
//! Boolean/Utf8/LargeUtf8 (any other declared type is rejected when the source
//! is opened). A BSON value that cannot be coerced to a declared numeric or
//! boolean column becomes NULL; a non-string BSON value in a string column is
//! rendered through a string/JSON fallback.
11
12#[cfg(feature = "mongodb-cdc")]
13use std::sync::Arc;
14
15#[cfg(feature = "mongodb-cdc")]
16use arrow_array::{Array, RecordBatch};
17#[cfg(feature = "mongodb-cdc")]
18use arrow_row::SortField;
19#[cfg(feature = "mongodb-cdc")]
20use arrow_schema::{DataType, SchemaRef};
21#[cfg(feature = "mongodb-cdc")]
22use mongodb::bson::{doc, Bson, Document};
23#[cfg(feature = "mongodb-cdc")]
24use mongodb::Client;
25
26#[cfg(feature = "mongodb-cdc")]
27use laminar_core::lookup::predicate::Predicate;
28#[cfg(feature = "mongodb-cdc")]
29use laminar_core::lookup::source::{
30    projection_names, ColumnId, LookupError, LookupSource, LookupSourceCapabilities,
31};
32#[cfg(feature = "mongodb-cdc")]
33use laminar_core::lookup::KeyAligner;
34
/// Connection and schema settings consumed by [`MongoLookupSource::open`].
#[cfg(feature = "mongodb-cdc")]
#[derive(Clone, Debug)]
pub struct MongoLookupSourceConfig {
    /// URI handed to the `MongoDB` driver when the client is built.
    pub connection_uri: String,
    /// Name of the database that holds the lookup collection.
    pub database: String,
    /// Name of the collection that is probed.
    pub collection: String,
    /// Key field names; `open` rejects anything other than exactly one entry.
    pub primary_key_columns: Vec<String>,
    /// Declared Arrow schema that fetched documents are projected into.
    pub schema: SchemaRef,
}
50
/// Lookup source that answers cache misses (on-demand/partial cache mode)
/// by querying a `MongoDB` collection.
#[cfg(feature = "mongodb-cdc")]
pub struct MongoLookupSource {
    /// Driver client built from the configured connection URI.
    client: Client,
    /// Database that holds the lookup collection.
    database: String,
    /// Collection that is probed.
    collection: String,
    /// Name of the single key field used in the `$in` filter.
    pk_field: String,
    /// Declared Arrow schema documents are projected into.
    schema: SchemaRef,
    /// Decodes probe keys and realigns fetched rows to them.
    aligner: KeyAligner,
}
61
#[cfg(feature = "mongodb-cdc")]
impl MongoLookupSource {
    /// Validates the configuration and builds a `MongoDB` client for it.
    ///
    /// Checks run in this order: key arity, key presence in the declared
    /// schema, declared column types, key aligner construction, and finally
    /// URI parsing and client construction.
    ///
    /// # Errors
    ///
    /// Returns `LookupError::Internal` when the key is not exactly one
    /// column, the key column is absent from the declared schema, or a
    /// declared column has a type other than Int32/Int64/Float64/Boolean/
    /// Utf8/LargeUtf8. Returns `LookupError::Connection` when the URI cannot
    /// be parsed or the client cannot be built. Errors raised by
    /// `KeyAligner::new` are propagated.
    pub async fn open(config: MongoLookupSourceConfig) -> Result<Self, LookupError> {
        let MongoLookupSourceConfig {
            connection_uri,
            database,
            collection,
            primary_key_columns,
            schema,
        } = config;

        let [only_key] = primary_key_columns.as_slice() else {
            return Err(LookupError::Internal(format!(
                "mongodb lookup requires exactly one primary key column, got {}",
                primary_key_columns.len()
            )));
        };
        let pk_field = only_key.clone();

        let Ok(pk_idx) = schema.index_of(&pk_field) else {
            return Err(LookupError::Internal(format!(
                "pk column not in declared schema: {pk_field}"
            )));
        };

        // Every declared column (not just the key) must be a type that
        // `docs_to_batch` knows how to build.
        for field in schema.fields() {
            let dt = field.data_type();
            let supported = matches!(
                dt,
                DataType::Int32
                    | DataType::Int64
                    | DataType::Float64
                    | DataType::Boolean
                    | DataType::Utf8
                    | DataType::LargeUtf8
            );
            if !supported {
                return Err(LookupError::Internal(format!(
                    "unsupported field data type in schema for mongodb lookup: {dt}"
                )));
            }
        }

        let key_type = schema.field(pk_idx).data_type().clone();
        let aligner = KeyAligner::new(vec![SortField::new(key_type)], primary_key_columns)?;

        let options = mongodb::options::ClientOptions::parse(&connection_uri)
            .await
            .map_err(|e| LookupError::Connection(format!("mongodb client options: {e}")))?;
        let client = Client::with_options(options)
            .map_err(|e| LookupError::Connection(format!("mongodb client: {e}")))?;

        Ok(Self {
            client,
            database,
            collection,
            pk_field,
            schema,
            aligner,
        })
    }

    /// Turns the key cell at `row` into the BSON value placed in the `$in`
    /// array.
    ///
    /// Returns `Ok(None)` for a NULL cell, which the caller leaves out of the
    /// filter, and `LookupError::Internal` when the array's data type is not
    /// one of Int32/Int64/Float64/Boolean/Utf8/LargeUtf8/Utf8View or the
    /// array cannot be downcast to the matching concrete type.
    fn cell_to_bson(array: &dyn Array, row: usize) -> Result<Option<Bson>, LookupError> {
        use arrow_array::{
            BooleanArray, Float64Array, Int32Array, Int64Array, LargeStringArray, StringArray,
            StringViewArray,
        };

        // Borrow `array` as its concrete Arrow array type.
        fn typed<T: 'static>(array: &dyn Array) -> Result<&T, LookupError> {
            match array.as_any().downcast_ref::<T>() {
                Some(concrete) => Ok(concrete),
                None => Err(LookupError::Internal("pk column downcast failed".into())),
            }
        }

        if array.is_null(row) {
            return Ok(None);
        }

        let value = match array.data_type() {
            DataType::Int32 => Bson::Int32(typed::<Int32Array>(array)?.value(row)),
            DataType::Int64 => Bson::Int64(typed::<Int64Array>(array)?.value(row)),
            DataType::Float64 => Bson::Double(typed::<Float64Array>(array)?.value(row)),
            DataType::Boolean => Bson::Boolean(typed::<BooleanArray>(array)?.value(row)),
            DataType::Utf8 => {
                let text = typed::<StringArray>(array)?.value(row);
                Bson::String(text.to_string())
            }
            DataType::LargeUtf8 => {
                let text = typed::<LargeStringArray>(array)?.value(row);
                Bson::String(text.to_string())
            }
            DataType::Utf8View => {
                let text = typed::<StringViewArray>(array)?.value(row);
                Bson::String(text.to_string())
            }
            dt => {
                return Err(LookupError::Internal(format!(
                    "unsupported PK data type for mongodb lookup: {dt}"
                )));
            }
        };
        Ok(Some(value))
    }

    /// Builds one `RecordBatch` with the layout of `schema` from `docs`, one
    /// row per document in input order.
    ///
    /// Each column is read from the document field with the same name. A
    /// missing field, or a value the numeric/boolean helpers cannot convert,
    /// yields NULL; string columns render any non-null value through
    /// `bson_to_string`. A column type outside Int32/Int64/Float64/Boolean/
    /// Utf8/LargeUtf8, or a failure in `RecordBatch::try_new`, is reported as
    /// `LookupError::Internal`.
    fn docs_to_batch(schema: &SchemaRef, docs: &[Document]) -> Result<RecordBatch, LookupError> {
        use arrow_array::builder::{
            BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, LargeStringBuilder,
            StringBuilder,
        };

        // String cell for `name`: absent or BSON null => NULL, else rendered.
        let text_cell = |doc: &Document, name: &str| -> Option<String> {
            match doc.get(name) {
                None | Some(Bson::Null) => None,
                Some(value) => Some(bson_to_string(value)),
            }
        };

        let row_count = docs.len();
        let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
        for field in schema.fields() {
            let name = field.name().as_str();
            let column: Arc<dyn Array> = match field.data_type() {
                DataType::Int32 => {
                    let mut builder = Int32Builder::with_capacity(row_count);
                    for doc in docs {
                        // Values that do not fit in an i32 become NULL.
                        let narrowed =
                            bson_as_i64(doc.get(name)).and_then(|wide| i32::try_from(wide).ok());
                        builder.append_option(narrowed);
                    }
                    Arc::new(builder.finish())
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(row_count);
                    for doc in docs {
                        builder.append_option(bson_as_i64(doc.get(name)));
                    }
                    Arc::new(builder.finish())
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(row_count);
                    for doc in docs {
                        builder.append_option(bson_as_f64(doc.get(name)));
                    }
                    Arc::new(builder.finish())
                }
                DataType::Boolean => {
                    let mut builder = BooleanBuilder::with_capacity(row_count);
                    for doc in docs {
                        builder.append_option(doc.get(name).and_then(Bson::as_bool));
                    }
                    Arc::new(builder.finish())
                }
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::with_capacity(row_count, row_count * 16);
                    for doc in docs {
                        builder.append_option(text_cell(doc, name));
                    }
                    Arc::new(builder.finish())
                }
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(row_count, row_count * 16);
                    for doc in docs {
                        builder.append_option(text_cell(doc, name));
                    }
                    Arc::new(builder.finish())
                }
                other => {
                    return Err(LookupError::Internal(format!(
                        "unsupported field data type: {:?}",
                        other
                    )));
                }
            };
            columns.push(column);
        }

        RecordBatch::try_new(Arc::clone(schema), columns)
            .map_err(|e| LookupError::Internal(format!("arrow batch construction: {e}")))
    }
}
236
#[cfg(feature = "mongodb-cdc")]
impl LookupSource for MongoLookupSource {
    /// Fetches the rows for `keys` with a single `$in` query and returns the
    /// result produced by the key aligner for those keys.
    ///
    /// * `keys` - encoded probe keys; an empty slice returns an empty vector.
    /// * `_predicates` - ignored by this source.
    /// * `projection` - requested columns; empty means the full declared
    ///   schema. The key field is always fetched so rows can be aligned, and
    ///   is stripped again afterwards when the caller did not request it.
    ///
    /// NULL keys are left out of the filter. When no non-NULL key remains the
    /// query is not sent at all, because `$in: []` cannot match any document.
    ///
    /// # Errors
    ///
    /// Returns `LookupError::Query` when the find or the cursor fails,
    /// `LookupError::Internal` for schema/projection problems, and propagates
    /// errors from key decoding, alignment and batch construction.
    async fn query(
        &self,
        keys: &[&[u8]],
        _predicates: &[Predicate],
        projection: &[ColumnId],
    ) -> Result<Vec<Option<RecordBatch>>, LookupError> {
        use tokio_stream::StreamExt;

        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // The aligner is built with exactly one key column in `open`.
        let pk_arrays = self.aligner.decode_keys(keys)?;
        let pk_array = pk_arrays[0].as_ref();
        let mut in_values: Vec<Bson> = Vec::with_capacity(keys.len());
        for row in 0..pk_array.len() {
            if let Some(b) = Self::cell_to_bson(pk_array, row)? {
                in_values.push(b);
            }
        }
        // Remembered before `in_values` is moved into the filter document.
        let has_probe_values = !in_values.is_empty();

        let filter = doc! { &self.pk_field: doc! { "$in": in_values } };
        let collection = self
            .client
            .database(&self.database)
            .collection::<Document>(&self.collection);

        // Projection pushdown: ask Mongo for only the requested fields (always
        // incl. the key), and build the batch in the matching projected schema.
        let mut find = collection.find(filter);
        // `Some(caller's column names)` only when the key had to be appended
        // and therefore must be stripped from the aligned batches again.
        let mut requested_names = None;
        let out_schema = if projection.is_empty() {
            Arc::clone(&self.schema)
        } else {
            let mut names = projection_names(&self.schema, projection)?;
            let mut idx: Vec<usize> = projection.iter().map(|&c| c as usize).collect();
            if !names.contains(&self.pk_field) {
                requested_names = Some(names.clone());
                names.push(self.pk_field.clone());
                let pk_idx = self
                    .schema
                    .index_of(&self.pk_field)
                    .map_err(|e| LookupError::Internal(format!("pk column index: {e}")))?;
                idx.push(pk_idx);
            }

            let mut proj_doc = Document::new();
            for name in &names {
                proj_doc.insert(name.clone(), 1);
            }
            find = find.projection(proj_doc);
            Arc::new(
                self.schema
                    .project(&idx)
                    .map_err(|e| LookupError::Internal(format!("project mongodb schema: {e}")))?,
            )
        };

        // An empty `$in` list matches nothing, so skip the round trip when
        // every probe key was NULL.
        let mut docs: Vec<Document> = Vec::new();
        if has_probe_values {
            let mut cursor = find
                .await
                .map_err(|e| LookupError::Query(format!("mongodb find: {e}")))?;
            while let Some(next) = cursor.next().await {
                docs.push(next.map_err(|e| LookupError::Query(format!("mongodb cursor: {e}")))?);
            }
        }

        let batches = if docs.is_empty() {
            Vec::new()
        } else {
            vec![Self::docs_to_batch(&out_schema, &docs)?]
        };
        let aligned = self.aligner.align(keys, &batches)?;

        let Some(requested_names) = requested_names else {
            return Ok(aligned);
        };

        // Drop the key column that was fetched only for alignment.
        aligned
            .into_iter()
            .map(|maybe_batch| -> Result<Option<RecordBatch>, LookupError> {
                let Some(batch) = maybe_batch else {
                    return Ok(None);
                };
                let batch_schema = batch.schema();
                let indices = requested_names
                    .iter()
                    .map(|name| {
                        batch_schema.index_of(name).map_err(|e| {
                            LookupError::Internal(format!(
                                "column not found in aligned schema: {e}"
                            ))
                        })
                    })
                    .collect::<Result<Vec<usize>, LookupError>>()?;
                batch
                    .project(&indices)
                    .map(Some)
                    .map_err(|e| LookupError::Internal(format!("project aligned batch: {e}")))
            })
            .collect()
    }

    /// Advertises batch lookup and projection pushdown; every other
    /// capability keeps the value from `LookupSourceCapabilities::none()`.
    fn capabilities(&self) -> LookupSourceCapabilities {
        LookupSourceCapabilities {
            supports_batch_lookup: true,
            supports_projection_pushdown: true,
            ..LookupSourceCapabilities::none()
        }
    }

    /// Name reported for this source (`"mongodb"`).
    // The return type is written as `&str` here, so the literal cannot be
    // declared `&'static str` as the lint would suggest.
    #[allow(clippy::unnecessary_literal_bound)]
    fn source_name(&self) -> &str {
        "mongodb"
    }

    /// Returns the full declared Arrow schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Runs a `ping` command against the configured database.
    ///
    /// # Errors
    ///
    /// Returns `LookupError::Connection` when the command fails.
    async fn health_check(&self) -> Result<(), LookupError> {
        self.client
            .database(&self.database)
            .run_command(doc! { "ping": 1 })
            .await
            .map(|_| ())
            .map_err(|e| LookupError::Connection(format!("health check: {e}")))
    }
}
367
/// Extract a numeric BSON field as `i64` (Int32/Int64/Double).
///
/// Doubles are truncated toward zero (`3.9` becomes `3`). A double that is
/// NaN, infinite, or outside the `i64` range yields `None` instead of the
/// `0` / saturated value a bare `as` cast would produce, so such cells end
/// up NULL rather than holding a fabricated integer. `None` input and every
/// non-numeric BSON type also yield `None`.
#[cfg(feature = "mongodb-cdc")]
fn bson_as_i64(b: Option<&Bson>) -> Option<i64> {
    // 2^63: the smallest f64 above `i64::MAX`; its negation is `i64::MIN`.
    const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;

    match b? {
        Bson::Int32(v) => Some(i64::from(*v)),
        Bson::Int64(v) => Some(*v),
        // The range test is false for NaN and for both infinities, so only
        // values that fit in an i64 after truncation reach the cast.
        #[allow(clippy::cast_possible_truncation)]
        Bson::Double(v) if (-TWO_POW_63..TWO_POW_63).contains(v) => Some(*v as i64),
        _ => None,
    }
}
379
/// Read a numeric BSON field as `f64`.
///
/// Accepts Double, Int32 and Int64; `None` input and every other BSON type
/// give `None`. Int64 goes through an `as` cast, so very large magnitudes
/// may lose precision.
#[cfg(feature = "mongodb-cdc")]
fn bson_as_f64(b: Option<&Bson>) -> Option<f64> {
    let widened = match b? {
        Bson::Double(v) => *v,
        Bson::Int32(v) => f64::from(*v),
        #[allow(clippy::cast_precision_loss)]
        Bson::Int64(v) => *v as f64,
        _ => return None,
    };
    Some(widened)
}
391
/// Produce the text stored in a string column for a BSON value.
///
/// Strings are copied as-is, an `ObjectId` becomes its hex form, and
/// Int32/Int64/Double/Boolean use their Rust `to_string` output. Any other
/// variant falls back to the `Bson` value's own `to_string` rendering.
#[cfg(feature = "mongodb-cdc")]
fn bson_to_string(b: &Bson) -> String {
    if let Bson::String(text) = b {
        return text.clone();
    }
    match b {
        Bson::ObjectId(oid) => oid.to_hex(),
        Bson::Int32(n) => n.to_string(),
        Bson::Int64(n) => n.to_string(),
        Bson::Double(x) => x.to_string(),
        Bson::Boolean(flag) => flag.to_string(),
        fallback => fallback.to_string(),
    }
}
405
#[cfg(all(test, feature = "mongodb-cdc"))]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};

    /// Int64 and Utf8 key cells map to the matching BSON variant; a NULL
    /// cell maps to `None`.
    #[test]
    fn cell_to_bson_types_and_null() {
        let ints = Int64Array::from(vec![7i64]);
        let from_int = MongoLookupSource::cell_to_bson(&ints, 0).unwrap();
        assert_eq!(from_int, Some(Bson::Int64(7)));

        let strings = StringArray::from(vec!["k"]);
        let from_str = MongoLookupSource::cell_to_bson(&strings, 0).unwrap();
        assert_eq!(from_str, Some(Bson::String("k".into())));

        let nullable = Int64Array::from(vec![None, Some(1)]);
        let from_null = MongoLookupSource::cell_to_bson(&nullable, 0).unwrap();
        assert!(from_null.is_none());
    }

    /// A key array of a type outside the supported set is an error.
    #[test]
    fn cell_to_bson_rejects_unsupported_type() {
        let dates = arrow_array::Date32Array::from(vec![1]);
        let outcome = MongoLookupSource::cell_to_bson(&dates, 0);
        assert!(outcome.is_err());
    }

    /// Numeric helpers coerce across BSON numeric types and return `None`
    /// for non-numeric or absent values.
    #[test]
    fn bson_numeric_coercion() {
        let fractional = Bson::Double(3.9);
        assert_eq!(bson_as_i64(Some(&fractional)), Some(3));

        let wide = Bson::Int64(5);
        assert_eq!(bson_as_f64(Some(&wide)), Some(5.0));

        let text = Bson::String("x".into());
        assert_eq!(bson_as_i64(Some(&text)), None);

        assert_eq!(bson_as_i64(None), None);
    }
}
441}