laminar_connectors/mongodb/
mod.rs1pub mod change_event;
4pub mod config;
5pub mod large_event;
6pub mod metrics;
7pub mod resume_token;
8pub mod sink;
9pub mod source;
10pub mod timeseries;
11pub mod write_model;
12
13pub use config::{
15 FullDocumentMode, MongoDbSinkConfig, MongoDbSourceConfig, WriteConcernConfig, WriteConcernLevel,
16};
17pub use resume_token::{
18 FileResumeTokenStore, InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore,
19 ResumeTokenStoreConfig,
20};
21pub use sink::MongoDbSink;
22pub use source::{mongodb_cdc_envelope_schema, MongoDbCdcSource};
23pub use timeseries::{CollectionKind, TimeSeriesConfig, TimeSeriesGranularity};
24pub use write_model::WriteMode;
25
26#[cfg(feature = "mongodb-cdc")]
27pub use resume_token::MongoResumeTokenStore;
28
29use std::sync::Arc;
30
31use crate::config::{ConfigKeySpec, ConnectorInfo};
32use crate::registry::ConnectorRegistry;
33
34pub fn register_mongodb_cdc_source(registry: &ConnectorRegistry) {
36 let info = ConnectorInfo {
37 name: "mongodb-cdc".to_string(),
38 display_name: "MongoDB CDC Source".to_string(),
39 version: env!("CARGO_PKG_VERSION").to_string(),
40 is_source: true,
41 is_sink: false,
42 config_keys: mongodb_cdc_config_keys(),
43 };
44
45 registry.register_source(
46 "mongodb-cdc",
47 info,
48 Arc::new(|registry: Option<&prometheus::Registry>| {
49 Box::new(MongoDbCdcSource::new(
50 MongoDbSourceConfig::default(),
51 registry,
52 ))
53 }),
54 );
55}
56
57pub fn register_mongodb_sink(registry: &ConnectorRegistry) {
59 use arrow_schema::{DataType, Field, Schema};
60
61 let info = ConnectorInfo {
62 name: "mongodb-sink".to_string(),
63 display_name: "MongoDB Sink".to_string(),
64 version: env!("CARGO_PKG_VERSION").to_string(),
65 is_source: false,
66 is_sink: true,
67 config_keys: mongodb_sink_config_keys(),
68 };
69
70 registry.register_sink(
71 "mongodb-sink",
72 info,
73 Arc::new(|registry: Option<&prometheus::Registry>| {
74 let schema = Arc::new(Schema::new(vec![
75 Field::new("key", DataType::Utf8, true),
76 Field::new("value", DataType::Utf8, false),
77 ]));
78 Box::new(MongoDbSink::new(
79 schema,
80 MongoDbSinkConfig::default(),
81 registry,
82 ))
83 }),
84 );
85}
86
87fn mongodb_cdc_config_keys() -> Vec<ConfigKeySpec> {
88 vec![
89 ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
90 ConfigKeySpec::required("database", "Database name"),
91 ConfigKeySpec::required("collection", "Collection name (* for all)"),
92 ConfigKeySpec::optional(
93 "full.document.mode",
94 "Full document mode (delta/update_lookup/required/when_available)",
95 "delta",
96 ),
97 ConfigKeySpec::optional(
98 "split.large.events",
99 "Enable $changeStreamSplitLargeEvent (requires MongoDB >= 6.0.9)",
100 "false",
101 ),
102 ConfigKeySpec::optional("max.await.time.ms", "getMore await timeout (ms)", "1000"),
103 ConfigKeySpec::optional("batch.size", "Cursor batch size", "1000"),
104 ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
105 ConfigKeySpec::optional(
106 "max.buffered.events",
107 "Max events to buffer before backpressure",
108 "100000",
109 ),
110 ]
111}
112
113fn mongodb_sink_config_keys() -> Vec<ConfigKeySpec> {
114 vec![
115 ConfigKeySpec::required("connection.uri", "MongoDB connection URI"),
116 ConfigKeySpec::required("database", "Target database name"),
117 ConfigKeySpec::required("collection", "Target collection name"),
118 ConfigKeySpec::optional("batch.size", "Max documents per flush", "500"),
119 ConfigKeySpec::optional("flush.interval.ms", "Max time between flushes (ms)", "1000"),
120 ConfigKeySpec::optional(
121 "ordered",
122 "Ordered writes (fail-fast) vs unordered (higher throughput)",
123 "true",
124 ),
125 ]
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the names of the keys in `keys` that are marked required.
    fn required_keys(keys: &[ConfigKeySpec]) -> Vec<&str> {
        keys.iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect()
    }

    #[test]
    fn test_register_mongodb_cdc_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let info = registry
            .source_info("mongodb-cdc")
            .expect("mongodb-cdc should be registered as a source");
        assert_eq!(info.name, "mongodb-cdc");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_register_mongodb_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let info = registry
            .sink_info("mongodb-sink")
            .expect("mongodb-sink should be registered as a sink");
        assert_eq!(info.name, "mongodb-sink");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_cdc_config_keys() {
        let keys = mongodb_cdc_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
        // Keys declared with a default must not be reported as required.
        assert!(!required.contains(&"full.document.mode"));
        assert!(!required.contains(&"batch.size"));
    }

    #[test]
    fn test_sink_config_keys() {
        let keys = mongodb_sink_config_keys();
        let required = required_keys(&keys);
        assert!(required.contains(&"connection.uri"));
        assert!(required.contains(&"database"));
        assert!(required.contains(&"collection"));
        // Keys declared with a default must not be reported as required.
        assert!(!required.contains(&"batch.size"));
        assert!(!required.contains(&"ordered"));
    }

    #[test]
    fn test_config_keys_are_unique() {
        // A duplicated key would make one of the two specs unreachable.
        for keys in [mongodb_cdc_config_keys(), mongodb_sink_config_keys()] {
            let mut seen = std::collections::HashSet::new();
            for spec in &keys {
                assert!(
                    seen.insert(spec.key.as_str()),
                    "duplicate config key: {}",
                    spec.key
                );
            }
        }
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_mongodb_cdc_source(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-cdc");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_mongodb_sink(&registry);

        let config = crate::config::ConnectorConfig::new("mongodb-sink");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}