use std::time::Duration;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;

use super::resume_token::ResumeTokenStoreConfig;
use super::timeseries::CollectionKind;
use super::write_model::WriteMode;
/// Controls what MongoDB attaches in the `fullDocument` field of change
/// stream events, mirroring the server's `fullDocument` option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FullDocumentMode {
    /// Emit only the update description (the delta); update events carry
    /// no full document. This matches the server's default behavior.
    #[default]
    Delta,

    /// Look up the current majority-committed version of the document at
    /// read time (MongoDB's `updateLookup`); the result may reflect
    /// writes that happened after the update itself.
    UpdateLookup,

    /// Require the post-image of the document and fail if it is
    /// unavailable; needs pre-/post-images enabled on the collection.
    RequirePostImage,

    /// Attach the post-image when available, without failing otherwise.
    WhenAvailable,
}

str_enum!(FullDocumentMode, lowercase, ConnectorError, "unknown full document mode",
    Delta => "delta";
    UpdateLookup => "updatelookup", "update_lookup";
    RequirePostImage => "requirepostimage", "require_post_image", "required";
    WhenAvailable => "whenavailable", "when_available"
);
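
// Parsing sketch: `FromStr` accepts any alias listed above, and `Display`
// is assumed to emit the first (canonical) form, as the unit tests below
// exercise:
//
//     let mode: FullDocumentMode = "require_post_image".parse()?;
//     assert_eq!(mode.to_string(), "requirepostimage");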

/// Configuration for the MongoDB change-stream (CDC) source connector.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MongoDbSourceConfig {
    /// MongoDB connection string, e.g. `mongodb://localhost:27017`.
    pub connection_uri: String,

    /// Database to watch.
    pub database: String,

    /// Collection to watch; `"*"` watches the whole database.
    pub collection: String,

    /// What update events carry in `fullDocument`.
    pub full_document_mode: FullDocumentMode,

    /// Optional aggregation pipeline applied to the change stream. Stages
    /// must not remove or rewrite `_id`; see
    /// `validate_pipeline_no_id_modification`.
    #[serde(default)]
    pub pipeline: Vec<serde_json::Value>,

    /// Request server-side splitting of oversized change events.
    /// Currently rejected by [`Self::validate`] and must remain `false`.
    #[serde(default)]
    pub split_large_events: bool,

    /// Where resume tokens are persisted across restarts.
    #[serde(default)]
    pub resume_token_store: ResumeTokenStoreConfig,

    /// Maximum time the server waits for new data per `getMore`, in
    /// milliseconds.
    pub max_await_time_ms: Option<u64>,

    /// Change stream cursor batch size.
    pub batch_size: Option<u32>,

    /// Start the stream at a cluster time given as a BSON timestamp
    /// `(seconds, increment)` instead of a stored resume token.
    pub start_at_operation_time: Option<(u32, u32)>,

    /// Upper bound on change events buffered in memory.
    #[serde(default = "default_max_buffered_events")]
    pub max_buffered_events: usize,

    /// Maximum number of events returned by a single poll.
    #[serde(default = "default_max_poll_records")]
    pub max_poll_records: usize,
}
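
// Wire-shape sketch: a minimal JSON value this struct deserializes from.
// Keys mirror the field names; fields without `#[serde(default)]` are
// required, except `Option` fields, which serde defaults to `None`:
//
//     let cfg: MongoDbSourceConfig = serde_json::from_value(serde_json::json!({
//         "connection_uri": "mongodb://localhost:27017",
//         "database": "mydb",
//         "collection": "events",
//         "full_document_mode": "update_lookup"
//     }))?;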

fn default_max_buffered_events() -> usize {
    100_000
}

fn default_max_poll_records() -> usize {
    1000
}

impl Default for MongoDbSourceConfig {
    fn default() -> Self {
        Self {
            connection_uri: "mongodb://localhost:27017".to_string(),
            database: String::new(),
            collection: String::new(),
            full_document_mode: FullDocumentMode::default(),
            pipeline: Vec::new(),
            split_large_events: false,
            resume_token_store: ResumeTokenStoreConfig::default(),
            max_await_time_ms: Some(1000),
            batch_size: Some(1000),
            start_at_operation_time: None,
            max_buffered_events: default_max_buffered_events(),
            max_poll_records: default_max_poll_records(),
        }
    }
}

impl MongoDbSourceConfig {
    /// Creates a source config for the given endpoint with default tuning.
    #[must_use]
    pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
        Self {
            connection_uri: connection_uri.to_string(),
            database: database.to_string(),
            collection: collection.to_string(),
            ..Self::default()
        }
    }

    /// Returns `true` if the connector watches every collection in the
    /// database (collection set to `"*"`).
    #[must_use]
    pub fn is_database_watch(&self) -> bool {
        self.collection == "*"
    }

    /// Validates the configuration.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectorError::ConfigurationError`] if a required field
    /// is empty, a limit is zero, `split_large_events` is enabled, or the
    /// pipeline removes or rewrites `_id`.
    pub fn validate(&self) -> Result<(), ConnectorError> {
        crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
        crate::config::require_non_empty(&self.database, "database")?;
        crate::config::require_non_empty(&self.collection, "collection")?;

        if self.max_poll_records == 0 {
            return Err(ConnectorError::ConfigurationError(
                "max_poll_records must be > 0".to_string(),
            ));
        }
        if self.max_buffered_events == 0 {
            return Err(ConnectorError::ConfigurationError(
                "max_buffered_events must be > 0".to_string(),
            ));
        }

        if self.split_large_events {
            return Err(ConnectorError::ConfigurationError(
                "split_large_events requires raw change stream access not supported \
                 by the mongodb v3 driver; set split_large_events = false"
                    .to_string(),
            ));
        }

        validate_pipeline_no_id_modification(&self.pipeline)?;

        Ok(())
    }

    /// Builds and validates a config from dotted connector properties
    /// (`connection.uri`, `database`, `collection`, and optional tuning
    /// keys).
    ///
    /// # Errors
    ///
    /// Returns an error if a required key is missing, a value fails to
    /// parse, or [`Self::validate`] rejects the result.
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let mut cfg = Self {
            connection_uri: config.require("connection.uri")?.to_string(),
            database: config.require("database")?.to_string(),
            collection: config.require("collection")?.to_string(),
            ..Self::default()
        };

        if let Some(mode) = config.get_parsed::<FullDocumentMode>("full.document.mode")? {
            cfg.full_document_mode = mode;
        }
        if let Some(split) = config.get_parsed::<bool>("split.large.events")? {
            cfg.split_large_events = split;
        }
        if let Some(timeout) = config.get_parsed::<u64>("max.await.time.ms")? {
            cfg.max_await_time_ms = Some(timeout);
        }
        if let Some(batch) = config.get_parsed::<u32>("batch.size")? {
            cfg.batch_size = Some(batch);
        }
        if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
            cfg.max_poll_records = max;
        }
        if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
            cfg.max_buffered_events = max;
        }

        cfg.validate()?;
        Ok(cfg)
    }
}
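
// Usage sketch for the dotted properties handled by from_config above;
// the key names come straight from its require/get_parsed calls:
//
//     let mut props = ConnectorConfig::new("mongodb-cdc");
//     props.set("connection.uri", "mongodb://localhost:27017");
//     props.set("database", "mydb");
//     props.set("collection", "events");
//     props.set("full.document.mode", "when_available");
//     let cfg = MongoDbSourceConfig::from_config(&props)?;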

/// Rejects pipeline stages that would strip or rewrite the `_id` field.
///
/// Change stream events carry the resume token in `_id`, so a pipeline
/// that excludes it (`$project`), removes it (`$unset`), or overwrites it
/// (`$addFields`/`$set`) would break resumability.
fn validate_pipeline_no_id_modification(
    pipeline: &[serde_json::Value],
) -> Result<(), ConnectorError> {
    for (i, stage) in pipeline.iter().enumerate() {
        if let Some(obj) = stage.as_object() {
            if let Some(proj) = obj.get("$project") {
                if let Some(proj_obj) = proj.as_object() {
                    if let Some(id_val) = proj_obj.get("_id") {
                        if id_val == &serde_json::Value::from(0)
                            || id_val == &serde_json::Value::from(false)
                        {
                            return Err(ConnectorError::ConfigurationError(format!(
                                "pipeline stage {i}: $project must not exclude _id field — \
                                 MongoDB change streams require _id for resume tokens"
                            )));
                        }
                    }
                }
            }

            if let Some(unset) = obj.get("$unset") {
                let fields: Vec<&str> = match unset {
                    serde_json::Value::String(s) => vec![s.as_str()],
                    serde_json::Value::Array(arr) => {
                        arr.iter().filter_map(serde_json::Value::as_str).collect()
                    }
                    _ => Vec::new(),
                };
                if fields.contains(&"_id") {
                    return Err(ConnectorError::ConfigurationError(format!(
                        "pipeline stage {i}: $unset must not remove _id field"
                    )));
                }
            }

            for op in &["$addFields", "$set"] {
                if let Some(fields) = obj.get(*op) {
                    if let Some(fields_obj) = fields.as_object() {
                        if fields_obj.contains_key("_id") {
                            return Err(ConnectorError::ConfigurationError(format!(
                                "pipeline stage {i}: {op} must not modify _id field"
                            )));
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
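
// Note: this guard is best-effort over the common stages; other
// document-reshaping stages (e.g. `$replaceRoot`/`$replaceWith`) can
// also drop `_id` and are not inspected here.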

/// The `w` component of a MongoDB write concern.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WriteConcernLevel {
    /// Acknowledged by a majority of replica set members (`w: "majority"`).
    #[default]
    Majority,
    /// Acknowledged by a fixed number of members (`w: <n>`).
    Nodes(u32),
}

/// Write concern applied to sink writes.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WriteConcernConfig {
    /// Acknowledgement level (`w`).
    #[serde(default)]
    pub w: WriteConcernLevel,
    /// Whether writes must be committed to the on-disk journal (`j`).
    #[serde(default = "default_journal")]
    pub journal: bool,
    /// Write concern timeout (`wtimeout`), in milliseconds.
    pub timeout_ms: Option<u64>,
}

fn default_journal() -> bool {
    true
}

impl Default for WriteConcernConfig {
    fn default() -> Self {
        Self {
            w: WriteConcernLevel::default(),
            journal: true,
            timeout_ms: None,
        }
    }
}
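
// Serialized-shape sketch (serde's externally tagged enum encoding turns
// the unit variant into a plain string and the newtype variant into a
// single-key object):
//
//     {"w": "majority", "journal": true, "timeout_ms": null}
//     {"w": {"nodes": 2}, "journal": false, "timeout_ms": 5000}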

/// Configuration for the MongoDB sink connector.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MongoDbSinkConfig {
    /// MongoDB connection string.
    pub connection_uri: String,

    /// Target database.
    pub database: String,

    /// Target collection.
    pub collection: String,

    /// Standard or time series collection semantics.
    #[serde(default)]
    pub collection_kind: CollectionKind,

    /// How records are written (insert, upsert, replace, CDC replay).
    #[serde(default)]
    pub write_mode: WriteMode,

    /// Maximum documents per bulk write.
    #[serde(default = "default_sink_batch_size")]
    pub batch_size: usize,

    /// Maximum time a partial batch may wait before being flushed.
    #[serde(default = "default_flush_interval_ms")]
    pub flush_interval_ms: u64,

    /// Whether bulk writes are ordered. Ordered writes stop at the first
    /// error; unordered writes attempt every operation and may be faster.
    #[serde(default = "default_ordered")]
    pub ordered: bool,

    /// Write concern applied to sink writes.
    #[serde(default)]
    pub write_concern: WriteConcernConfig,
}

fn default_sink_batch_size() -> usize {
    500
}

fn default_flush_interval_ms() -> u64 {
    1000
}

fn default_ordered() -> bool {
    true
}

impl Default for MongoDbSinkConfig {
    fn default() -> Self {
        Self {
            connection_uri: "mongodb://localhost:27017".to_string(),
            database: String::new(),
            collection: String::new(),
            collection_kind: CollectionKind::default(),
            write_mode: WriteMode::default(),
            batch_size: default_sink_batch_size(),
            flush_interval_ms: default_flush_interval_ms(),
            ordered: default_ordered(),
            write_concern: WriteConcernConfig::default(),
        }
    }
}

impl MongoDbSinkConfig {
    /// Creates a sink config for the given endpoint with default tuning.
    #[must_use]
    pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
        Self {
            connection_uri: connection_uri.to_string(),
            database: database.to_string(),
            collection: collection.to_string(),
            ..Self::default()
        }
    }

    /// Returns `flush_interval_ms` as a [`Duration`].
    #[must_use]
    pub fn flush_interval(&self) -> Duration {
        Duration::from_millis(self.flush_interval_ms)
    }

    /// Validates the configuration.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectorError::ConfigurationError`] if a required field
    /// is empty, `batch_size` is zero, or the write mode is invalid for a
    /// time series collection.
    pub fn validate(&self) -> Result<(), ConnectorError> {
        crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
        crate::config::require_non_empty(&self.database, "database")?;
        crate::config::require_non_empty(&self.collection, "collection")?;

        if self.batch_size == 0 {
            return Err(ConnectorError::ConfigurationError(
                "batch_size must be > 0".to_string(),
            ));
        }

        if let CollectionKind::TimeSeries(_) = &self.collection_kind {
            super::write_model::validate_timeseries_write_mode(&self.write_mode)?;
        }

        Ok(())
    }

    /// Builds and validates a config from dotted connector properties.
    ///
    /// # Errors
    ///
    /// Returns an error if a required key is missing, a value fails to
    /// parse, `write.mode` is unrecognized, or [`Self::validate`] rejects
    /// the result.
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let mut cfg = Self {
            connection_uri: config.require("connection.uri")?.to_string(),
            database: config.require("database")?.to_string(),
            collection: config.require("collection")?.to_string(),
            ..Self::default()
        };

        if let Some(batch) = config.get_parsed::<usize>("batch.size")? {
            cfg.batch_size = batch;
        }
        if let Some(interval) = config.get_parsed::<u64>("flush.interval.ms")? {
            cfg.flush_interval_ms = interval;
        }
        if let Some(ordered) = config.get_parsed::<bool>("ordered")? {
            cfg.ordered = ordered;
        }
        if let Some(mode) = config.get("write.mode") {
            cfg.write_mode = match mode {
                "insert" => WriteMode::Insert,
                "cdc_replay" | "cdc-replay" => WriteMode::CdcReplay,
                "upsert" => {
                    let keys = config
                        .require("write.mode.key_fields")?
                        .split(',')
                        .map(|s| s.trim().to_string())
                        .collect();
                    WriteMode::Upsert { key_fields: keys }
                }
                "replace" => WriteMode::Replace {
                    upsert_on_missing: config
                        .get_parsed::<bool>("write.mode.upsert_on_missing")?
                        .unwrap_or(false),
                },
                other => {
                    return Err(ConnectorError::ConfigurationError(format!(
                        "unknown write.mode: {other}"
                    )));
                }
            };
        }
        if let Some(journal) = config.get_parsed::<bool>("write_concern.journal")? {
            cfg.write_concern.journal = journal;
        }
        if let Some(timeout) = config.get_parsed::<u64>("write_concern.timeout_ms")? {
            cfg.write_concern.timeout_ms = Some(timeout);
        }

        cfg.validate()?;
        Ok(cfg)
    }
}
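
// Usage sketch for the write.mode properties handled above (the field
// names tenant_id/order_id are illustrative only):
//
//     props.set("write.mode", "upsert");
//     props.set("write.mode.key_fields", "tenant_id,order_id");
//     // or:
//     props.set("write.mode", "replace");
//     props.set("write.mode.upsert_on_missing", "true");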

#[cfg(test)]
mod tests {
    use super::*;

    // ----- MongoDbSourceConfig -----

    #[test]
    fn test_source_config_default() {
        let cfg = MongoDbSourceConfig::default();
        assert_eq!(cfg.connection_uri, "mongodb://localhost:27017");
        assert_eq!(cfg.full_document_mode, FullDocumentMode::Delta);
        assert!(!cfg.split_large_events);
        assert_eq!(cfg.max_poll_records, 1000);
    }

    #[test]
    fn test_source_config_new() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
        assert_eq!(cfg.connection_uri, "mongodb://db:27017");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.collection, "users");
    }

    #[test]
    fn test_source_config_database_watch() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "*");
        assert!(cfg.is_database_watch());

        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
        assert!(!cfg.is_database_watch());
    }

    #[test]
    fn test_source_config_validate_empty_uri() {
        let cfg = MongoDbSourceConfig::new("", "db", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("connection_uri"));
    }

    #[test]
    fn test_source_config_validate_empty_database() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("database"));
    }

    #[test]
    fn test_source_config_validate_zero_max_poll() {
        let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
        cfg.max_poll_records = 0;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("max_poll_records"));
    }

    #[test]
    fn test_source_config_split_large_events_rejected() {
        let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
        cfg.split_large_events = true;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("split_large_events"));
    }

    #[test]
    fn test_source_config_from_connector_config() {
        let mut config = ConnectorConfig::new("mongodb-cdc");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "events");
        config.set("full.document.mode", "update_lookup");
        config.set("max.poll.records", "500");

        let cfg = MongoDbSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.connection_uri, "mongodb://host:27017");
        assert_eq!(cfg.database, "testdb");
        assert_eq!(cfg.collection, "events");
        assert_eq!(cfg.full_document_mode, FullDocumentMode::UpdateLookup);
        assert_eq!(cfg.max_poll_records, 500);
    }
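
    // Added sketch: exercises the optional tuning keys handled by
    // from_config (max.await.time.ms, batch.size) alongside the required
    // connection keys.
    #[test]
    fn test_source_config_from_config_tuning_keys() {
        let mut config = ConnectorConfig::new("mongodb-cdc");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "events");
        config.set("max.await.time.ms", "250");
        config.set("batch.size", "64");

        let cfg = MongoDbSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.max_await_time_ms, Some(250));
        assert_eq!(cfg.batch_size, Some(64));
    }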

    #[test]
    fn test_source_config_from_config_missing_required() {
        let config = ConnectorConfig::new("mongodb-cdc");
        assert!(MongoDbSourceConfig::from_config(&config).is_err());
    }

    // ----- pipeline validation -----

    #[test]
    fn test_pipeline_valid_match() {
        let pipeline = vec![serde_json::json!({
            "$match": { "operationType": "insert" }
        })];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }

    #[test]
    fn test_pipeline_id_excluded_in_project() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": 0 }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_excluded_false() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": false }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_unset() {
        let pipeline = vec![serde_json::json!({ "$unset": "_id" })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_unset_array() {
        let pipeline = vec![serde_json::json!({ "$unset": ["_id", "other"] })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_addfields() {
        let pipeline = vec![serde_json::json!({
            "$addFields": { "_id": "overwritten" }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_set() {
        let pipeline = vec![serde_json::json!({
            "$set": { "_id": "overwritten" }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_valid_project_includes_id() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": 1, "name": 1 }
        })];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }
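
    // Added sketch: the validator only inspects stages that deserialize
    // to JSON objects, so a non-object stage passes through (presumably
    // left for the server to reject at watch time).
    #[test]
    fn test_pipeline_non_object_stage_ignored() {
        let pipeline = vec![serde_json::json!("not a stage object")];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }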

    // ----- FullDocumentMode parsing and display -----

    #[test]
    fn test_full_document_mode_fromstr() {
        assert_eq!(
            "delta".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::Delta
        );
        assert_eq!(
            "update_lookup".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::UpdateLookup
        );
        assert_eq!(
            "updatelookup".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::UpdateLookup
        );
        assert_eq!(
            "required".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::RequirePostImage
        );
        assert_eq!(
            "when_available".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::WhenAvailable
        );
        assert!("bad".parse::<FullDocumentMode>().is_err());
    }

    #[test]
    fn test_full_document_mode_display() {
        assert_eq!(FullDocumentMode::Delta.to_string(), "delta");
        assert_eq!(FullDocumentMode::UpdateLookup.to_string(), "updatelookup");
    }

    // ----- MongoDbSinkConfig -----

    #[test]
    fn test_sink_config_default() {
        let cfg = MongoDbSinkConfig::default();
        assert_eq!(cfg.batch_size, 500);
        assert_eq!(cfg.flush_interval_ms, 1000);
        assert!(cfg.ordered);
        assert!(matches!(cfg.collection_kind, CollectionKind::Standard));
    }

    #[test]
    fn test_sink_config_new() {
        let cfg = MongoDbSinkConfig::new("mongodb://db:27017", "mydb", "events");
        assert_eq!(cfg.connection_uri, "mongodb://db:27017");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.collection, "events");
    }

    #[test]
    fn test_sink_config_validate_empty_uri() {
        let cfg = MongoDbSinkConfig::new("", "db", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("connection_uri"));
    }

    #[test]
    fn test_sink_config_validate_zero_batch_size() {
        let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "coll");
        cfg.batch_size = 0;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("batch_size"));
    }

    #[test]
    fn test_sink_config_timeseries_upsert_rejected() {
        let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "ts");
        cfg.collection_kind =
            CollectionKind::TimeSeries(super::super::timeseries::TimeSeriesConfig {
                time_field: "ts".to_string(),
                meta_field: None,
                granularity: super::super::timeseries::TimeSeriesGranularity::Seconds,
                expire_after_seconds: None,
            });
        cfg.write_mode = WriteMode::Upsert {
            key_fields: vec!["id".to_string()],
        };
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("time series"));
    }

    #[test]
    fn test_sink_config_flush_interval() {
        let cfg = MongoDbSinkConfig::default();
        assert_eq!(cfg.flush_interval(), Duration::from_secs(1));
    }

    #[test]
    fn test_sink_config_from_connector_config() {
        let mut config = ConnectorConfig::new("mongodb-sink");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "out");
        config.set("batch.size", "1000");
        config.set("ordered", "false");

        let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.batch_size, 1000);
        assert!(!cfg.ordered);
    }
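
    // Added sketch: exercises the upsert branch of write.mode parsing in
    // from_config, which requires write.mode.key_fields and trims
    // whitespace around each comma-separated key.
    #[test]
    fn test_sink_config_from_config_upsert_mode() {
        let mut config = ConnectorConfig::new("mongodb-sink");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "out");
        config.set("write.mode", "upsert");
        config.set("write.mode.key_fields", "region, id");

        let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
        match cfg.write_mode {
            WriteMode::Upsert { key_fields } => {
                assert_eq!(key_fields, vec!["region".to_string(), "id".to_string()]);
            }
            _ => panic!("expected upsert write mode"),
        }
    }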

    #[test]
    fn test_write_concern_default() {
        let wc = WriteConcernConfig::default();
        assert!(matches!(wc.w, WriteConcernLevel::Majority));
        assert!(wc.journal);
        assert!(wc.timeout_ms.is_none());
    }
}