1use std::time::Duration;
14
15use crate::config::ConnectorConfig;
16use crate::error::ConnectorError;
17
18use super::resume_token::ResumeTokenStoreConfig;
19use super::timeseries::{CollectionKind, TimeSeriesConfig, TimeSeriesGranularity};
20use super::write_model::WriteMode;
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum FullDocumentMode {
29 #[default]
32 Delta,
33
34 UpdateLookup,
42
43 RequirePostImage,
47
48 WhenAvailable,
51}
52
53str_enum!(FullDocumentMode, lowercase, ConnectorError, "unknown full document mode",
54 Delta => "delta";
55 UpdateLookup => "updatelookup", "update_lookup";
56 RequirePostImage => "requirepostimage", "require_post_image", "required";
57 WhenAvailable => "whenavailable", "when_available"
58);
59
60#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
62pub struct MongoDbSourceConfig {
63 pub connection_uri: String,
65
66 pub database: String,
68
69 pub collection: String,
71
72 pub full_document_mode: FullDocumentMode,
74
75 #[serde(default)]
81 pub pipeline: Vec<serde_json::Value>,
82
83 #[serde(default)]
85 pub split_large_events: bool,
86
87 #[serde(default)]
89 pub resume_token_store: ResumeTokenStoreConfig,
90
91 pub max_await_time_ms: Option<u64>,
93
94 pub batch_size: Option<u32>,
96
97 pub start_at_operation_time: Option<(u32, u32)>,
100
101 #[serde(default = "default_max_buffered_events")]
103 pub max_buffered_events: usize,
104
105 #[serde(default = "default_max_poll_records")]
107 pub max_poll_records: usize,
108}
109
110fn default_max_buffered_events() -> usize {
111 100_000
112}
113
114fn default_max_poll_records() -> usize {
115 1000
116}
117
118impl Default for MongoDbSourceConfig {
119 fn default() -> Self {
120 Self {
121 connection_uri: "mongodb://localhost:27017".to_string(),
122 database: String::new(),
123 collection: String::new(),
124 full_document_mode: FullDocumentMode::default(),
125 pipeline: Vec::new(),
126 split_large_events: false,
127 resume_token_store: ResumeTokenStoreConfig::default(),
128 max_await_time_ms: Some(1000),
129 batch_size: Some(1000),
130 start_at_operation_time: None,
131 max_buffered_events: default_max_buffered_events(),
132 max_poll_records: default_max_poll_records(),
133 }
134 }
135}
136
137impl MongoDbSourceConfig {
138 #[must_use]
140 pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
141 Self {
142 connection_uri: connection_uri.to_string(),
143 database: database.to_string(),
144 collection: collection.to_string(),
145 ..Self::default()
146 }
147 }
148
149 #[must_use]
151 pub fn is_database_watch(&self) -> bool {
152 self.collection == "*"
153 }
154
155 pub fn validate(&self) -> Result<(), ConnectorError> {
161 crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
162 crate::config::require_non_empty(&self.database, "database")?;
163 crate::config::require_non_empty(&self.collection, "collection")?;
164
165 if self.max_poll_records == 0 {
166 return Err(ConnectorError::ConfigurationError(
167 "max_poll_records must be > 0".to_string(),
168 ));
169 }
170 if self.max_buffered_events == 0 {
171 return Err(ConnectorError::ConfigurationError(
172 "max_buffered_events must be > 0".to_string(),
173 ));
174 }
175
176 if self.split_large_events {
179 return Err(ConnectorError::ConfigurationError(
180 "split_large_events requires raw change stream access not supported \
181 by the mongodb v3 driver; set split_large_events = false"
182 .to_string(),
183 ));
184 }
185
186 validate_pipeline_no_id_modification(&self.pipeline)?;
188
189 Ok(())
190 }
191
192 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
198 let mut cfg = Self {
199 connection_uri: config.require("connection.uri")?.to_string(),
200 database: config.require("database")?.to_string(),
201 collection: config.require("collection")?.to_string(),
202 ..Self::default()
203 };
204
205 if let Some(mode) = config.get_parsed::<FullDocumentMode>("full.document.mode")? {
206 cfg.full_document_mode = mode;
207 }
208 if let Some(split) = config.get_parsed::<bool>("split.large.events")? {
209 cfg.split_large_events = split;
210 }
211 if let Some(timeout) = config.get_parsed::<u64>("max.await.time.ms")? {
212 cfg.max_await_time_ms = Some(timeout);
213 }
214 if let Some(batch) = config.get_parsed::<u32>("batch.size")? {
215 cfg.batch_size = Some(batch);
216 }
217 if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
218 cfg.max_poll_records = max;
219 }
220 if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
221 cfg.max_buffered_events = max;
222 }
223
224 cfg.validate()?;
225 Ok(cfg)
226 }
227}
228
229fn validate_pipeline_no_id_modification(
235 pipeline: &[serde_json::Value],
236) -> Result<(), ConnectorError> {
237 for (i, stage) in pipeline.iter().enumerate() {
238 if let Some(obj) = stage.as_object() {
239 if let Some(proj) = obj.get("$project") {
241 if let Some(proj_obj) = proj.as_object() {
242 if let Some(id_val) = proj_obj.get("_id") {
243 if id_val == &serde_json::Value::from(0)
245 || id_val == &serde_json::Value::from(false)
246 {
247 return Err(ConnectorError::ConfigurationError(format!(
248 "pipeline stage {i}: $project must not exclude _id field — \
249 MongoDB change streams require _id for resume tokens"
250 )));
251 }
252 }
253 }
254 }
255
256 if let Some(unset) = obj.get("$unset") {
258 let fields: Vec<&str> = match unset {
259 serde_json::Value::String(s) => vec![s.as_str()],
260 serde_json::Value::Array(arr) => {
261 arr.iter().filter_map(serde_json::Value::as_str).collect()
262 }
263 _ => Vec::new(),
264 };
265 if fields.contains(&"_id") {
266 return Err(ConnectorError::ConfigurationError(format!(
267 "pipeline stage {i}: $unset must not remove _id field"
268 )));
269 }
270 }
271
272 for op in &["$addFields", "$set"] {
274 if let Some(fields) = obj.get(*op) {
275 if let Some(fields_obj) = fields.as_object() {
276 if fields_obj.contains_key("_id") {
277 return Err(ConnectorError::ConfigurationError(format!(
278 "pipeline stage {i}: {op} must not modify _id field"
279 )));
280 }
281 }
282 }
283 }
284 }
285 }
286 Ok(())
287}
288
289#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
291#[serde(rename_all = "snake_case")]
292pub enum WriteConcernLevel {
293 #[default]
295 Majority,
296 Nodes(u32),
298}
299
300#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
302pub struct WriteConcernConfig {
303 #[serde(default)]
305 pub w: WriteConcernLevel,
306 #[serde(default = "default_journal")]
308 pub journal: bool,
309 pub timeout_ms: Option<u64>,
311}
312
313fn default_journal() -> bool {
314 true
315}
316
317impl Default for WriteConcernConfig {
318 fn default() -> Self {
319 Self {
320 w: WriteConcernLevel::default(),
321 journal: true,
322 timeout_ms: None,
323 }
324 }
325}
326
327#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
329pub struct MongoDbSinkConfig {
330 pub connection_uri: String,
332
333 pub database: String,
335
336 pub collection: String,
338
339 #[serde(default)]
341 pub collection_kind: CollectionKind,
342
343 #[serde(default)]
345 pub write_mode: WriteMode,
346
347 #[serde(default = "default_sink_batch_size")]
349 pub batch_size: usize,
350
351 #[serde(default = "default_flush_interval_ms")]
353 pub flush_interval_ms: u64,
354
355 #[serde(default = "default_ordered")]
362 pub ordered: bool,
363
364 #[serde(default)]
366 pub write_concern: WriteConcernConfig,
367}
368
369fn default_sink_batch_size() -> usize {
370 500
371}
372
373fn default_flush_interval_ms() -> u64 {
374 1000
375}
376
377fn default_ordered() -> bool {
378 true
379}
380
381impl Default for MongoDbSinkConfig {
382 fn default() -> Self {
383 Self {
384 connection_uri: "mongodb://localhost:27017".to_string(),
385 database: String::new(),
386 collection: String::new(),
387 collection_kind: CollectionKind::default(),
388 write_mode: WriteMode::default(),
389 batch_size: default_sink_batch_size(),
390 flush_interval_ms: default_flush_interval_ms(),
391 ordered: default_ordered(),
392 write_concern: WriteConcernConfig::default(),
393 }
394 }
395}
396
397impl MongoDbSinkConfig {
398 #[must_use]
400 pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
401 Self {
402 connection_uri: connection_uri.to_string(),
403 database: database.to_string(),
404 collection: collection.to_string(),
405 ..Self::default()
406 }
407 }
408
409 #[must_use]
411 pub fn flush_interval(&self) -> Duration {
412 Duration::from_millis(self.flush_interval_ms)
413 }
414
415 pub fn validate(&self) -> Result<(), ConnectorError> {
421 crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
422 crate::config::require_non_empty(&self.database, "database")?;
423 crate::config::require_non_empty(&self.collection, "collection")?;
424
425 if self.batch_size == 0 {
426 return Err(ConnectorError::ConfigurationError(
427 "batch_size must be > 0".to_string(),
428 ));
429 }
430
431 if let CollectionKind::TimeSeries(_) = &self.collection_kind {
433 super::write_model::validate_timeseries_write_mode(&self.write_mode)?;
434 }
435
436 Ok(())
437 }
438
439 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
445 let mut cfg = Self {
446 connection_uri: config.require("connection.uri")?.to_string(),
447 database: config.require("database")?.to_string(),
448 collection: config.require("collection")?.to_string(),
449 ..Self::default()
450 };
451
452 if let Some(batch) = config.get_parsed::<usize>("batch.size")? {
453 cfg.batch_size = batch;
454 }
455 if let Some(interval) = config.get_parsed::<u64>("flush.interval.ms")? {
456 cfg.flush_interval_ms = interval;
457 }
458 if let Some(ordered) = config.get_parsed::<bool>("ordered")? {
459 cfg.ordered = ordered;
460 }
461 if let Some(mode) = config.get("write.mode") {
462 cfg.write_mode = match mode {
463 "insert" => WriteMode::Insert,
464 "cdc_replay" | "cdc-replay" => WriteMode::CdcReplay,
465 "upsert" => {
466 let keys = config
467 .require("write.mode.key_fields")?
468 .split(',')
469 .map(|s| s.trim().to_string())
470 .collect();
471 WriteMode::Upsert { key_fields: keys }
472 }
473 "replace" => WriteMode::Replace {
474 upsert_on_missing: config
475 .get_parsed::<bool>("write.mode.upsert_on_missing")?
476 .unwrap_or(false),
477 },
478 other => {
479 return Err(ConnectorError::ConfigurationError(format!(
480 "unknown write.mode: {other}"
481 )));
482 }
483 };
484 }
485 if let Some(journal) = config.get_parsed::<bool>("write_concern.journal")? {
486 cfg.write_concern.journal = journal;
487 }
488 if let Some(timeout) = config.get_parsed::<u64>("write_concern.timeout_ms")? {
489 cfg.write_concern.timeout_ms = Some(timeout);
490 }
491
492 if let Some(time_field) = config.get("timeseries.time_field") {
493 if time_field.trim().is_empty() {
494 return Err(ConnectorError::ConfigurationError(
495 "timeseries.time_field must not be empty".to_string(),
496 ));
497 }
498 let meta_field = config.get("timeseries.meta_field").map(String::from);
499 let granularity = if let Some(gran_str) = config.get("timeseries.granularity") {
500 match gran_str.to_lowercase().as_str() {
501 "seconds" => TimeSeriesGranularity::Seconds,
502 "minutes" => TimeSeriesGranularity::Minutes,
503 "hours" => TimeSeriesGranularity::Hours,
504 "custom" => {
505 let span =
506 config.require_parsed::<u32>("timeseries.bucket_max_span_seconds")?;
507 let rounding =
508 config.require_parsed::<u32>("timeseries.bucket_rounding_seconds")?;
509 TimeSeriesGranularity::custom(span, rounding)?
510 }
511 other => {
512 return Err(ConnectorError::ConfigurationError(format!(
513 "unknown timeseries granularity: {other}"
514 )));
515 }
516 }
517 } else {
518 TimeSeriesGranularity::Seconds
519 };
520
521 let expire_after_seconds =
522 config.get_parsed::<u64>("timeseries.expire_after_seconds")?;
523
524 cfg.collection_kind = CollectionKind::TimeSeries(TimeSeriesConfig {
525 time_field: time_field.to_string(),
526 meta_field,
527 granularity,
528 expire_after_seconds,
529 });
530 }
531
532 cfg.validate()?;
533 Ok(cfg)
534 }
535}
536
537#[cfg(test)]
538mod tests {
539 use super::*;
540
541 #[test]
544 fn test_source_config_default() {
545 let cfg = MongoDbSourceConfig::default();
546 assert_eq!(cfg.connection_uri, "mongodb://localhost:27017");
547 assert_eq!(cfg.full_document_mode, FullDocumentMode::Delta);
548 assert!(!cfg.split_large_events);
549 assert_eq!(cfg.max_poll_records, 1000);
550 }
551
552 #[test]
553 fn test_source_config_new() {
554 let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
555 assert_eq!(cfg.connection_uri, "mongodb://db:27017");
556 assert_eq!(cfg.database, "mydb");
557 assert_eq!(cfg.collection, "users");
558 }
559
560 #[test]
561 fn test_source_config_database_watch() {
562 let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "*");
563 assert!(cfg.is_database_watch());
564
565 let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
566 assert!(!cfg.is_database_watch());
567 }
568
569 #[test]
570 fn test_source_config_validate_empty_uri() {
571 let mut cfg = MongoDbSourceConfig::new("", "db", "coll");
572 let err = cfg.validate().unwrap_err();
573 assert!(err.to_string().contains("connection_uri"));
574 }
575
576 #[test]
577 fn test_source_config_validate_empty_database() {
578 let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "", "coll");
579 let err = cfg.validate().unwrap_err();
580 assert!(err.to_string().contains("database"));
581 }
582
583 #[test]
584 fn test_source_config_validate_zero_max_poll() {
585 let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
586 cfg.max_poll_records = 0;
587 let err = cfg.validate().unwrap_err();
588 assert!(err.to_string().contains("max_poll_records"));
589 }
590
591 #[test]
592 fn test_source_config_split_large_events_rejected() {
593 let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
594 cfg.split_large_events = true;
595 let err = cfg.validate().unwrap_err();
596 assert!(err.to_string().contains("split_large_events"));
597 }
598
599 #[test]
600 fn test_source_config_from_connector_config() {
601 let mut config = ConnectorConfig::new("mongodb-cdc");
602 config.set("connection.uri", "mongodb://host:27017");
603 config.set("database", "testdb");
604 config.set("collection", "events");
605 config.set("full.document.mode", "update_lookup");
606 config.set("max.poll.records", "500");
607
608 let cfg = MongoDbSourceConfig::from_config(&config).unwrap();
609 assert_eq!(cfg.connection_uri, "mongodb://host:27017");
610 assert_eq!(cfg.database, "testdb");
611 assert_eq!(cfg.collection, "events");
612 assert_eq!(cfg.full_document_mode, FullDocumentMode::UpdateLookup);
613 assert_eq!(cfg.max_poll_records, 500);
614 }
615
616 #[test]
617 fn test_source_config_from_config_missing_required() {
618 let config = ConnectorConfig::new("mongodb-cdc");
619 assert!(MongoDbSourceConfig::from_config(&config).is_err());
620 }
621
622 #[test]
625 fn test_pipeline_valid_match() {
626 let pipeline = vec![serde_json::json!({
627 "$match": { "operationType": "insert" }
628 })];
629 validate_pipeline_no_id_modification(&pipeline).unwrap();
630 }
631
632 #[test]
633 fn test_pipeline_id_excluded_in_project() {
634 let pipeline = vec![serde_json::json!({
635 "$project": { "_id": 0 }
636 })];
637 let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
638 assert!(err.to_string().contains("_id"));
639 }
640
641 #[test]
642 fn test_pipeline_id_excluded_false() {
643 let pipeline = vec![serde_json::json!({
644 "$project": { "_id": false }
645 })];
646 let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
647 assert!(err.to_string().contains("_id"));
648 }
649
650 #[test]
651 fn test_pipeline_id_unset() {
652 let pipeline = vec![serde_json::json!({ "$unset": "_id" })];
653 let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
654 assert!(err.to_string().contains("_id"));
655 }
656
657 #[test]
658 fn test_pipeline_id_unset_array() {
659 let pipeline = vec![serde_json::json!({ "$unset": ["_id", "other"] })];
660 let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
661 assert!(err.to_string().contains("_id"));
662 }
663
664 #[test]
665 fn test_pipeline_id_addfields() {
666 let pipeline = vec![serde_json::json!({
667 "$addFields": { "_id": "overwritten" }
668 })];
669 let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
670 assert!(err.to_string().contains("_id"));
671 }
672
673 #[test]
674 fn test_pipeline_id_set() {
675 let pipeline = vec![serde_json::json!({
676 "$set": { "_id": "overwritten" }
677 })];
678 let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
679 assert!(err.to_string().contains("_id"));
680 }
681
682 #[test]
683 fn test_pipeline_valid_project_includes_id() {
684 let pipeline = vec![serde_json::json!({
685 "$project": { "_id": 1, "name": 1 }
686 })];
687 validate_pipeline_no_id_modification(&pipeline).unwrap();
688 }
689
690 #[test]
693 fn test_full_document_mode_fromstr() {
694 assert_eq!(
695 "delta".parse::<FullDocumentMode>().unwrap(),
696 FullDocumentMode::Delta
697 );
698 assert_eq!(
699 "update_lookup".parse::<FullDocumentMode>().unwrap(),
700 FullDocumentMode::UpdateLookup
701 );
702 assert_eq!(
703 "updatelookup".parse::<FullDocumentMode>().unwrap(),
704 FullDocumentMode::UpdateLookup
705 );
706 assert_eq!(
707 "required".parse::<FullDocumentMode>().unwrap(),
708 FullDocumentMode::RequirePostImage
709 );
710 assert_eq!(
711 "when_available".parse::<FullDocumentMode>().unwrap(),
712 FullDocumentMode::WhenAvailable
713 );
714 assert!("bad".parse::<FullDocumentMode>().is_err());
715 }
716
717 #[test]
718 fn test_full_document_mode_display() {
719 assert_eq!(FullDocumentMode::Delta.to_string(), "delta");
720 assert_eq!(FullDocumentMode::UpdateLookup.to_string(), "updatelookup");
721 }
722
723 #[test]
726 fn test_sink_config_default() {
727 let cfg = MongoDbSinkConfig::default();
728 assert_eq!(cfg.batch_size, 500);
729 assert_eq!(cfg.flush_interval_ms, 1000);
730 assert!(cfg.ordered);
731 assert!(matches!(cfg.collection_kind, CollectionKind::Standard));
732 }
733
734 #[test]
735 fn test_sink_config_new() {
736 let cfg = MongoDbSinkConfig::new("mongodb://db:27017", "mydb", "events");
737 assert_eq!(cfg.connection_uri, "mongodb://db:27017");
738 assert_eq!(cfg.database, "mydb");
739 assert_eq!(cfg.collection, "events");
740 }
741
742 #[test]
743 fn test_sink_config_validate_empty_uri() {
744 let cfg = MongoDbSinkConfig::new("", "db", "coll");
745 let err = cfg.validate().unwrap_err();
746 assert!(err.to_string().contains("connection_uri"));
747 }
748
749 #[test]
750 fn test_sink_config_validate_zero_batch_size() {
751 let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "coll");
752 cfg.batch_size = 0;
753 let err = cfg.validate().unwrap_err();
754 assert!(err.to_string().contains("batch_size"));
755 }
756
757 #[test]
758 fn test_sink_config_timeseries_upsert_rejected() {
759 let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "ts");
760 cfg.collection_kind =
761 CollectionKind::TimeSeries(super::super::timeseries::TimeSeriesConfig {
762 time_field: "ts".to_string(),
763 meta_field: None,
764 granularity: super::super::timeseries::TimeSeriesGranularity::Seconds,
765 expire_after_seconds: None,
766 });
767 cfg.write_mode = WriteMode::Upsert {
768 key_fields: vec!["id".to_string()],
769 };
770 let err = cfg.validate().unwrap_err();
771 assert!(err.to_string().contains("time series"));
772 }
773
774 #[test]
775 fn test_sink_config_flush_interval() {
776 let cfg = MongoDbSinkConfig::default();
777 assert_eq!(cfg.flush_interval(), Duration::from_secs(1));
778 }
779
780 #[test]
781 fn test_sink_config_from_connector_config() {
782 let mut config = ConnectorConfig::new("mongodb-sink");
783 config.set("connection.uri", "mongodb://host:27017");
784 config.set("database", "testdb");
785 config.set("collection", "out");
786 config.set("batch.size", "1000");
787 config.set("ordered", "false");
788
789 let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
790 assert_eq!(cfg.batch_size, 1000);
791 assert!(!cfg.ordered);
792 }
793
794 #[test]
795 fn test_sink_config_timeseries_parsing() {
796 let mut config = ConnectorConfig::new("mongodb-sink");
798 config.set("connection.uri", "mongodb://host:27017");
799 config.set("database", "testdb");
800 config.set("collection", "ts_out");
801 config.set("timeseries.time_field", "timestamp");
802 config.set("timeseries.meta_field", "sensor_id");
803 config.set("timeseries.granularity", "minutes");
804 config.set("timeseries.expire_after_seconds", "86400");
805
806 let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
807 if let CollectionKind::TimeSeries(ts) = cfg.collection_kind {
808 assert_eq!(ts.time_field, "timestamp");
809 assert_eq!(ts.meta_field.as_deref(), Some("sensor_id"));
810 assert_eq!(ts.granularity, TimeSeriesGranularity::Minutes);
811 assert_eq!(ts.expire_after_seconds, Some(86400));
812 } else {
813 panic!("Expected TimeSeries collection kind");
814 }
815
816 let mut config_custom = ConnectorConfig::new("mongodb-sink");
818 config_custom.set("connection.uri", "mongodb://host:27017");
819 config_custom.set("database", "testdb");
820 config_custom.set("collection", "ts_custom");
821 config_custom.set("timeseries.time_field", "timestamp");
822 config_custom.set("timeseries.granularity", "custom");
823 config_custom.set("timeseries.bucket_max_span_seconds", "3600");
824 config_custom.set("timeseries.bucket_rounding_seconds", "3600");
825
826 let cfg_custom = MongoDbSinkConfig::from_config(&config_custom).unwrap();
827 if let CollectionKind::TimeSeries(ts) = cfg_custom.collection_kind {
828 assert_eq!(ts.time_field, "timestamp");
829 assert_eq!(
830 ts.granularity,
831 TimeSeriesGranularity::Custom {
832 bucket_max_span_seconds: 3600,
833 bucket_rounding_seconds: 3600,
834 }
835 );
836 } else {
837 panic!("Expected TimeSeries collection kind");
838 }
839 }
840
841 #[test]
842 fn test_sink_config_timeseries_empty_time_field_rejected() {
843 let mut config = ConnectorConfig::new("mongodb-sink");
844 config.set("connection.uri", "mongodb://host:27017");
845 config.set("database", "testdb");
846 config.set("collection", "ts");
847 config.set("timeseries.time_field", " ");
848 let err = MongoDbSinkConfig::from_config(&config).unwrap_err();
849 assert!(err.to_string().contains("time_field"));
850 }
851
852 #[test]
853 fn test_write_concern_default() {
854 let wc = WriteConcernConfig::default();
855 assert!(matches!(wc.w, WriteConcernLevel::Majority));
856 assert!(wc.journal);
857 assert!(wc.timeout_ms.is_none());
858 }
859}