Skip to main content

laminar_connectors/mongodb/
mod.rs

1//! `MongoDB` CDC source and sink connectors.
2
3pub mod change_event;
4pub mod config;
5pub mod large_event;
6pub mod metrics;
7pub mod resume_token;
8pub mod sink;
9pub mod source;
10pub mod timeseries;
11pub mod write_model;
12
13// Re-export primary types at module level.
14pub use config::{
15    FullDocumentMode, MongoDbSinkConfig, MongoDbSourceConfig, WriteConcernConfig, WriteConcernLevel,
16};
17pub use resume_token::{
18    FileResumeTokenStore, InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore,
19    ResumeTokenStoreConfig,
20};
21pub use sink::MongoDbSink;
22pub use source::{mongodb_cdc_envelope_schema, MongoDbCdcSource};
23pub use timeseries::{CollectionKind, TimeSeriesConfig, TimeSeriesGranularity};
24pub use write_model::WriteMode;
25
26#[cfg(feature = "mongodb-cdc")]
27pub use resume_token::MongoResumeTokenStore;
28
29use std::sync::Arc;
30
31use crate::config::{ConfigKeySpec, ConnectorInfo};
32use crate::registry::ConnectorRegistry;
33
34/// Registers the `MongoDB` CDC source connector with the given registry.
35pub fn register_mongodb_cdc_source(registry: &ConnectorRegistry) {
36    let info = ConnectorInfo {
37        name: "mongodb-cdc".to_string(),
38        display_name: "MongoDB CDC Source".to_string(),
39        version: env!("CARGO_PKG_VERSION").to_string(),
40        is_source: true,
41        is_sink: false,
42        config_keys: mongodb_cdc_config_keys(),
43    };
44
45    registry.register_source(
46        "mongodb-cdc",
47        info,
48        Arc::new(|registry: Option<&prometheus::Registry>| {
49            Box::new(MongoDbCdcSource::new(
50                MongoDbSourceConfig::default(),
51                registry,
52            ))
53        }),
54    );
55}
56
57/// Registers the `MongoDB` sink connector with the given registry.
58pub fn register_mongodb_sink(registry: &ConnectorRegistry) {
59    use arrow_schema::{DataType, Field, Schema};
60
61    let info = ConnectorInfo {
62        name: "mongodb-sink".to_string(),
63        display_name: "MongoDB Sink".to_string(),
64        version: env!("CARGO_PKG_VERSION").to_string(),
65        is_source: false,
66        is_sink: true,
67        config_keys: mongodb_sink_config_keys(),
68    };
69
70    registry.register_sink(
71        "mongodb-sink",
72        info,
73        Arc::new(|registry: Option<&prometheus::Registry>| {
74            let schema = Arc::new(Schema::new(vec![
75                Field::new("key", DataType::Utf8, true),
76                Field::new("value", DataType::Utf8, false),
77            ]));
78            Box::new(MongoDbSink::new(
79                schema,
80                MongoDbSinkConfig::default(),
81                registry,
82            ))
83        }),
84    );
85}
86
87fn mongodb_cdc_config_keys() -> Vec<ConfigKeySpec> {
88    vec![
89        ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
90        ConfigKeySpec::required("database", "Database name"),
91        ConfigKeySpec::required("collection", "Collection name (* for all)"),
92        ConfigKeySpec::optional(
93            "full.document.mode",
94            "Full document mode (delta/update_lookup/required/when_available)",
95            "delta",
96        ),
97        ConfigKeySpec::optional(
98            "split.large.events",
99            "Enable $changeStreamSplitLargeEvent (requires MongoDB >= 6.0.9)",
100            "false",
101        ),
102        ConfigKeySpec::optional("max.await.time.ms", "getMore await timeout (ms)", "1000"),
103        ConfigKeySpec::optional("batch.size", "Cursor batch size", "1000"),
104        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
105        ConfigKeySpec::optional(
106            "max.buffered.events",
107            "Max events to buffer before backpressure",
108            "100000",
109        ),
110    ]
111}
112
113fn mongodb_sink_config_keys() -> Vec<ConfigKeySpec> {
114    vec![
115        ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
116        ConfigKeySpec::required("database", "Target database name"),
117        ConfigKeySpec::required("collection", "Target collection name"),
118        ConfigKeySpec::optional("batch.size", "Max documents per flush", "500"),
119        ConfigKeySpec::optional("flush.interval.ms", "Max time between flushes (ms)", "1000"),
120        ConfigKeySpec::optional(
121            "ordered",
122            "Ordered writes (fail-fast) vs unordered (higher throughput)",
123            "true",
124        ),
125    ]
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Returns the keys flagged as required in `keys`, in declaration order.
    fn required_keys(keys: &[ConfigKeySpec]) -> Vec<&str> {
        keys.iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect()
    }

    /// Panics if any configuration key is declared more than once in `keys`.
    fn assert_unique_keys(keys: &[ConfigKeySpec]) {
        let mut seen = HashSet::new();
        for spec in keys {
            let key = spec.key.as_str();
            assert!(seen.insert(key), "duplicate config key `{key}`");
        }
    }

    #[test]
    fn test_register_mongodb_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let info = registry
            .source_info("mongodb-cdc")
            .expect("mongodb-cdc should be registered as a source");
        assert_eq!(info.name, "mongodb-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_register_mongodb_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let info = registry
            .sink_info("mongodb-sink")
            .expect("mongodb-sink should be registered as a sink");
        assert_eq!(info.name, "mongodb-sink");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_cdc_config_keys() {
        let keys = mongodb_cdc_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
    }

    #[test]
    fn test_sink_config_keys() {
        let keys = mongodb_sink_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
    }

    #[test]
    fn test_config_keys_are_unique() {
        // A duplicated key would make the advertised spec ambiguous.
        assert_unique_keys(&mongodb_cdc_config_keys());
        assert_unique_keys(&mongodb_sink_config_keys());
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-cdc");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok(), "factory should build a mongodb-cdc source");
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-sink");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok(), "factory should build a mongodb-sink sink");
    }
}