laminar_connectors/mongodb/
mod.rs1pub mod change_event;
4pub mod config;
5pub mod large_event;
6pub mod lookup;
7pub mod metrics;
8pub mod resume_token;
9pub mod sink;
10pub mod source;
11pub mod timeseries;
12pub mod write_model;
13
14pub use config::{
16 FullDocumentMode, MongoDbSinkConfig, MongoDbSourceConfig, WriteConcernConfig, WriteConcernLevel,
17};
18pub use resume_token::{
19 FileResumeTokenStore, InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore,
20 ResumeTokenStoreConfig,
21};
22pub use sink::MongoDbSink;
23pub use source::{mongodb_cdc_envelope_schema, MongoDbCdcSource};
24pub use timeseries::{CollectionKind, TimeSeriesConfig, TimeSeriesGranularity};
25pub use write_model::WriteMode;
26
27#[cfg(feature = "mongodb-cdc")]
28pub use resume_token::MongoResumeTokenStore;
29
30use std::sync::Arc;
31
32use crate::config::{ConfigKeySpec, ConnectorInfo};
33use crate::registry::ConnectorRegistry;
34
35pub fn register_mongodb_cdc_source(registry: &ConnectorRegistry) {
37 let info = ConnectorInfo {
38 name: "mongodb-cdc".to_string(),
39 display_name: "MongoDB CDC Source".to_string(),
40 version: env!("CARGO_PKG_VERSION").to_string(),
41 is_source: true,
42 is_sink: false,
43 config_keys: mongodb_cdc_config_keys(),
44 };
45
46 registry.register_source(
47 "mongodb-cdc",
48 info,
49 Arc::new(|registry: Option<&prometheus::Registry>| {
50 Box::new(MongoDbCdcSource::new(
51 MongoDbSourceConfig::default(),
52 registry,
53 ))
54 }),
55 );
56
57 registry.register_lookup_source("mongodb", Arc::new(MongoLookupFactory));
59}
60
61struct MongoLookupFactory;
62
63#[async_trait::async_trait]
64impl crate::registry::LookupSourceFactory for MongoLookupFactory {
65 async fn build(
66 &self,
67 config: crate::config::ConnectorConfig,
68 declared_schema: Option<arrow_schema::SchemaRef>,
69 ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
70 {
71 use crate::mongodb::lookup::{MongoLookupSource, MongoLookupSourceConfig};
72
73 let schema = declared_schema.ok_or_else(|| {
74 crate::error::ConnectorError::ConfigurationError(
75 "mongodb lookup source requires a declared table schema".into(),
76 )
77 })?;
78
79 let pk_columns: Vec<String> = config
80 .get("_primary_key_columns")
81 .unwrap_or("")
82 .split(',')
83 .map(|s| s.trim().to_string())
84 .filter(|s| !s.is_empty())
85 .collect();
86 if pk_columns.is_empty() {
87 return Err(crate::error::ConnectorError::ConfigurationError(
88 "mongodb lookup source requires primary key columns".into(),
89 ));
90 }
91
92 let src = MongoDbSourceConfig::from_config(&config)?;
93 let lookup_config = MongoLookupSourceConfig {
94 connection_uri: src.connection_uri,
95 database: src.database,
96 collection: src.collection,
97 primary_key_columns: pk_columns,
98 schema,
99 };
100
101 let source = MongoLookupSource::open(lookup_config).await?;
102 Ok(Arc::new(source) as Arc<dyn laminar_core::lookup::source::LookupSourceDyn>)
103 }
104}
105
106pub fn register_mongodb_sink(registry: &ConnectorRegistry) {
108 use arrow_schema::{DataType, Field, Schema};
109
110 let info = ConnectorInfo {
111 name: "mongodb-sink".to_string(),
112 display_name: "MongoDB Sink".to_string(),
113 version: env!("CARGO_PKG_VERSION").to_string(),
114 is_source: false,
115 is_sink: true,
116 config_keys: mongodb_sink_config_keys(),
117 };
118
119 registry.register_sink(
120 "mongodb-sink",
121 info,
122 Arc::new(|registry: Option<&prometheus::Registry>| {
123 let schema = Arc::new(Schema::new(vec![
124 Field::new("key", DataType::Utf8, true),
125 Field::new("value", DataType::Utf8, false),
126 ]));
127 Box::new(MongoDbSink::new(
128 schema,
129 MongoDbSinkConfig::default(),
130 registry,
131 ))
132 }),
133 );
134}
135
136fn mongodb_cdc_config_keys() -> Vec<ConfigKeySpec> {
137 vec![
138 ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
139 ConfigKeySpec::required("database", "Database name"),
140 ConfigKeySpec::required("collection", "Collection name (* for all)"),
141 ConfigKeySpec::optional(
142 "full.document.mode",
143 "Full document mode (delta/update_lookup/required/when_available)",
144 "delta",
145 ),
146 ConfigKeySpec::optional(
147 "split.large.events",
148 "Enable $changeStreamSplitLargeEvent (requires MongoDB >= 6.0.9)",
149 "false",
150 ),
151 ConfigKeySpec::optional("max.await.time.ms", "getMore await timeout (ms)", "1000"),
152 ConfigKeySpec::optional("batch.size", "Cursor batch size", "1000"),
153 ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
154 ConfigKeySpec::optional(
155 "max.buffered.events",
156 "Max events to buffer before backpressure",
157 "100000",
158 ),
159 ]
160}
161
162fn mongodb_sink_config_keys() -> Vec<ConfigKeySpec> {
163 vec![
164 ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
165 ConfigKeySpec::required("database", "Target database name"),
166 ConfigKeySpec::required("collection", "Target collection name"),
167 ConfigKeySpec::optional("batch.size", "Max documents per flush", "500"),
168 ConfigKeySpec::optional("flush.interval.ms", "Max time between flushes (ms)", "1000"),
169 ConfigKeySpec::optional(
170 "ordered",
171 "Ordered writes (fail-fast) vs unordered (higher throughput)",
172 "true",
173 ),
174 ConfigKeySpec::optional(
175 "write.mode",
176 "Write operation mode (insert, upsert, replace, cdc_replay)",
177 "insert",
178 ),
179 ConfigKeySpec::optional(
180 "write.mode.key_fields",
181 "Comma-separated key fields to match documents in upsert mode",
182 "",
183 ),
184 ConfigKeySpec::optional(
185 "write.mode.upsert_on_missing",
186 "Insert the document if not found in replace mode",
187 "false",
188 ),
189 ConfigKeySpec::optional(
190 "write_concern.journal",
191 "Request acknowledgment after the write has been journaled",
192 "true",
193 ),
194 ConfigKeySpec::optional(
195 "write_concern.timeout_ms",
196 "Timeout for the write concern to be satisfied",
197 "",
198 ),
199 ConfigKeySpec::optional(
200 "timeseries.time_field",
201 "The field in each document containing the date",
202 "",
203 ),
204 ConfigKeySpec::optional(
205 "timeseries.meta_field",
206 "An optional field labeling the data source",
207 "",
208 ),
209 ConfigKeySpec::optional(
210 "timeseries.granularity",
211 "Bucketing granularity (seconds, minutes, hours, custom)",
212 "seconds",
213 ),
214 ConfigKeySpec::optional(
215 "timeseries.bucket_max_span_seconds",
216 "Max span of a single bucket in seconds (custom granularity)",
217 "",
218 ),
219 ConfigKeySpec::optional(
220 "timeseries.bucket_rounding_seconds",
221 "Rounding boundary in seconds (custom granularity)",
222 "",
223 ),
224 ConfigKeySpec::optional(
225 "timeseries.expire_after_seconds",
226 "TTL in seconds (automatically delete documents after this span)",
227 "",
228 ),
229 ]
230}
231
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Returns the names of the keys marked as required, in list order.
    fn required_keys(keys: &[ConfigKeySpec]) -> Vec<&str> {
        keys.iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect()
    }

    /// Fails if any configuration key name appears more than once.
    fn assert_unique_keys(keys: &[ConfigKeySpec]) {
        let mut seen = HashSet::new();
        for spec in keys {
            assert!(
                seen.insert(spec.key.as_str()),
                "duplicate config key: {}",
                spec.key
            );
        }
    }

    #[test]
    fn test_register_mongodb_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let info = registry
            .source_info("mongodb-cdc")
            .expect("mongodb-cdc source should be registered");
        assert_eq!(info.name, "mongodb-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_register_mongodb_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let info = registry
            .sink_info("mongodb-sink")
            .expect("mongodb-sink sink should be registered");
        assert_eq!(info.name, "mongodb-sink");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_cdc_config_keys() {
        let keys = mongodb_cdc_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
    }

    #[test]
    fn test_sink_config_keys() {
        let keys = mongodb_sink_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
    }

    #[test]
    fn test_config_keys_are_unique() {
        assert_unique_keys(&mongodb_cdc_config_keys());
        assert_unique_keys(&mongodb_sink_config_keys());
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-cdc");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-sink");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}