Skip to main content

laminar_connectors/mongodb/
mod.rs

1//! `MongoDB` CDC source and sink connectors.
2
3pub mod change_event;
4pub mod config;
5pub mod large_event;
6pub mod lookup;
7pub mod metrics;
8pub mod resume_token;
9pub mod sink;
10pub mod source;
11pub mod timeseries;
12pub mod write_model;
13
14// Re-export primary types at module level.
15pub use config::{
16    FullDocumentMode, MongoDbSinkConfig, MongoDbSourceConfig, WriteConcernConfig, WriteConcernLevel,
17};
18pub use resume_token::{
19    FileResumeTokenStore, InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore,
20    ResumeTokenStoreConfig,
21};
22pub use sink::MongoDbSink;
23pub use source::{mongodb_cdc_envelope_schema, MongoDbCdcSource};
24pub use timeseries::{CollectionKind, TimeSeriesConfig, TimeSeriesGranularity};
25pub use write_model::WriteMode;
26
27#[cfg(feature = "mongodb-cdc")]
28pub use resume_token::MongoResumeTokenStore;
29
30use std::sync::Arc;
31
32use crate::config::{ConfigKeySpec, ConnectorInfo};
33use crate::registry::ConnectorRegistry;
34
35/// Registers the `MongoDB` CDC source connector with the given registry.
36pub fn register_mongodb_cdc_source(registry: &ConnectorRegistry) {
37    let info = ConnectorInfo {
38        name: "mongodb-cdc".to_string(),
39        display_name: "MongoDB CDC Source".to_string(),
40        version: env!("CARGO_PKG_VERSION").to_string(),
41        is_source: true,
42        is_sink: false,
43        config_keys: mongodb_cdc_config_keys(),
44    };
45
46    registry.register_source(
47        "mongodb-cdc",
48        info,
49        Arc::new(|registry: Option<&prometheus::Registry>| {
50            Box::new(MongoDbCdcSource::new(
51                MongoDbSourceConfig::default(),
52                registry,
53            ))
54        }),
55    );
56
57    // On-demand (partial cache mode) lookup source: find({ pk: { $in: [...] } }).
58    registry.register_lookup_source("mongodb", Arc::new(MongoLookupFactory));
59}
60
61struct MongoLookupFactory;
62
63#[async_trait::async_trait]
64impl crate::registry::LookupSourceFactory for MongoLookupFactory {
65    async fn build(
66        &self,
67        config: crate::config::ConnectorConfig,
68        declared_schema: Option<arrow_schema::SchemaRef>,
69    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
70    {
71        use crate::mongodb::lookup::{MongoLookupSource, MongoLookupSourceConfig};
72
73        let schema = declared_schema.ok_or_else(|| {
74            crate::error::ConnectorError::ConfigurationError(
75                "mongodb lookup source requires a declared table schema".into(),
76            )
77        })?;
78
79        let pk_columns: Vec<String> = config
80            .get("_primary_key_columns")
81            .unwrap_or("")
82            .split(',')
83            .map(|s| s.trim().to_string())
84            .filter(|s| !s.is_empty())
85            .collect();
86        if pk_columns.is_empty() {
87            return Err(crate::error::ConnectorError::ConfigurationError(
88                "mongodb lookup source requires primary key columns".into(),
89            ));
90        }
91
92        let src = MongoDbSourceConfig::from_config(&config)?;
93        let lookup_config = MongoLookupSourceConfig {
94            connection_uri: src.connection_uri,
95            database: src.database,
96            collection: src.collection,
97            primary_key_columns: pk_columns,
98            schema,
99        };
100
101        let source = MongoLookupSource::open(lookup_config).await?;
102        Ok(Arc::new(source) as Arc<dyn laminar_core::lookup::source::LookupSourceDyn>)
103    }
104}
105
106/// Registers the `MongoDB` sink connector with the given registry.
107pub fn register_mongodb_sink(registry: &ConnectorRegistry) {
108    use arrow_schema::{DataType, Field, Schema};
109
110    let info = ConnectorInfo {
111        name: "mongodb-sink".to_string(),
112        display_name: "MongoDB Sink".to_string(),
113        version: env!("CARGO_PKG_VERSION").to_string(),
114        is_source: false,
115        is_sink: true,
116        config_keys: mongodb_sink_config_keys(),
117    };
118
119    registry.register_sink(
120        "mongodb-sink",
121        info,
122        Arc::new(|registry: Option<&prometheus::Registry>| {
123            let schema = Arc::new(Schema::new(vec![
124                Field::new("key", DataType::Utf8, true),
125                Field::new("value", DataType::Utf8, false),
126            ]));
127            Box::new(MongoDbSink::new(
128                schema,
129                MongoDbSinkConfig::default(),
130                registry,
131            ))
132        }),
133    );
134}
135
136fn mongodb_cdc_config_keys() -> Vec<ConfigKeySpec> {
137    vec![
138        ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
139        ConfigKeySpec::required("database", "Database name"),
140        ConfigKeySpec::required("collection", "Collection name (* for all)"),
141        ConfigKeySpec::optional(
142            "full.document.mode",
143            "Full document mode (delta/update_lookup/required/when_available)",
144            "delta",
145        ),
146        ConfigKeySpec::optional(
147            "split.large.events",
148            "Enable $changeStreamSplitLargeEvent (requires MongoDB >= 6.0.9)",
149            "false",
150        ),
151        ConfigKeySpec::optional("max.await.time.ms", "getMore await timeout (ms)", "1000"),
152        ConfigKeySpec::optional("batch.size", "Cursor batch size", "1000"),
153        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
154        ConfigKeySpec::optional(
155            "max.buffered.events",
156            "Max events to buffer before backpressure",
157            "100000",
158        ),
159    ]
160}
161
162fn mongodb_sink_config_keys() -> Vec<ConfigKeySpec> {
163    vec![
164        ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
165        ConfigKeySpec::required("database", "Target database name"),
166        ConfigKeySpec::required("collection", "Target collection name"),
167        ConfigKeySpec::optional("batch.size", "Max documents per flush", "500"),
168        ConfigKeySpec::optional("flush.interval.ms", "Max time between flushes (ms)", "1000"),
169        ConfigKeySpec::optional(
170            "ordered",
171            "Ordered writes (fail-fast) vs unordered (higher throughput)",
172            "true",
173        ),
174        ConfigKeySpec::optional(
175            "write.mode",
176            "Write operation mode (insert, upsert, replace, cdc_replay)",
177            "insert",
178        ),
179        ConfigKeySpec::optional(
180            "write.mode.key_fields",
181            "Comma-separated key fields to match documents in upsert mode",
182            "",
183        ),
184        ConfigKeySpec::optional(
185            "write.mode.upsert_on_missing",
186            "Insert the document if not found in replace mode",
187            "false",
188        ),
189        ConfigKeySpec::optional(
190            "write_concern.journal",
191            "Request acknowledgment after the write has been journaled",
192            "true",
193        ),
194        ConfigKeySpec::optional(
195            "write_concern.timeout_ms",
196            "Timeout for the write concern to be satisfied",
197            "",
198        ),
199        ConfigKeySpec::optional(
200            "timeseries.time_field",
201            "The field in each document containing the date",
202            "",
203        ),
204        ConfigKeySpec::optional(
205            "timeseries.meta_field",
206            "An optional field labeling the data source",
207            "",
208        ),
209        ConfigKeySpec::optional(
210            "timeseries.granularity",
211            "Bucketing granularity (seconds, minutes, hours, custom)",
212            "seconds",
213        ),
214        ConfigKeySpec::optional(
215            "timeseries.bucket_max_span_seconds",
216            "Max span of a single bucket in seconds (custom granularity)",
217            "",
218        ),
219        ConfigKeySpec::optional(
220            "timeseries.bucket_rounding_seconds",
221            "Rounding boundary in seconds (custom granularity)",
222            "",
223        ),
224        ConfigKeySpec::optional(
225            "timeseries.expire_after_seconds",
226            "TTL in seconds (automatically delete documents after this span)",
227            "",
228        ),
229    ]
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Collects the names of the required keys from a connector's key specs.
    fn required_keys(keys: &[ConfigKeySpec]) -> Vec<&str> {
        keys.iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect()
    }

    /// Asserts that no configuration key is declared more than once.
    fn assert_unique_keys(keys: &[ConfigKeySpec]) {
        let mut seen = HashSet::new();
        for spec in keys {
            assert!(
                seen.insert(spec.key.as_str()),
                "duplicate config key: {}",
                spec.key
            );
        }
    }

    #[test]
    fn test_register_mongodb_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let info = registry
            .source_info("mongodb-cdc")
            .expect("mongodb-cdc source should be registered");
        assert_eq!(info.name, "mongodb-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_register_mongodb_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let info = registry
            .sink_info("mongodb-sink")
            .expect("mongodb-sink should be registered");
        assert_eq!(info.name, "mongodb-sink");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_cdc_config_keys() {
        let keys = mongodb_cdc_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
    }

    #[test]
    fn test_sink_config_keys() {
        let keys = mongodb_sink_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
    }

    #[test]
    fn test_config_keys_are_unique() {
        // A duplicated key would make the later spec's description/default ambiguous.
        assert_unique_keys(&mongodb_cdc_config_keys());
        assert_unique_keys(&mongodb_sink_config_keys());
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-cdc");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-sink");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}