// laminar_connectors/mongodb/config.rs
//! `MongoDB` connector configuration.
//!
//! Provides [`MongoDbSourceConfig`] for the CDC change stream source and
//! [`MongoDbSinkConfig`] for the write sink. Both support construction
//! from a generic [`ConnectorConfig`] key-value map and programmatic builders.
//!
//! # Pipeline Validation
//!
//! The source config validates that user-supplied aggregation pipeline stages
//! do not modify the `_id` field. `MongoDB` 8.x throws a server-side error if
//! `_id` is projected away; validating at construction prevents runtime failures.

use std::time::Duration;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;

use super::resume_token::ResumeTokenStoreConfig;
use super::timeseries::CollectionKind;
use super::write_model::WriteMode;

/// Mode for requesting full documents on update events.
///
/// Controls the `fullDocument` option on the change stream cursor.
/// The choice has significant correctness and performance implications.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FullDocumentMode {
    /// Default: only the delta (`updateDescription`) for update events.
    /// `fullDocument` is `None` on updates.
    #[default]
    Delta,

    /// `fullDocument: "updateLookup"` — fetches the current document from
    /// the collection on update events.
    ///
    /// **Warning**: Dangerous on high-churn collections with `$match` filters.
    /// If the matched document is quickly deleted, the lookup returns `null`
    /// and can cause "Resume Token Not Found" errors. Prefer `WhenAvailable`
    /// for high-churn patterns.
    UpdateLookup,

    /// `fullDocument: "required"` — the collection must have
    /// `changeStreamPreAndPostImages` enabled. Returns
    /// `SourceError::PostImageNotEnabled` if not configured.
    RequirePostImage,

    /// `fullDocument: "whenAvailable"` — safest option for high-churn
    /// collections. Returns the post-image when available, `null` otherwise.
    WhenAvailable,
}

// String <-> enum mapping for `FullDocumentMode` (`FromStr` + `Display` via
// the project `str_enum!` macro). Each variant accepts several spellings;
// the first listed string is the canonical `Display` form (e.g. the
// tests assert `UpdateLookup` displays as "updatelookup").
str_enum!(FullDocumentMode, lowercase, ConnectorError, "unknown full document mode",
    Delta => "delta";
    UpdateLookup => "updatelookup", "update_lookup";
    RequirePostImage => "requirepostimage", "require_post_image", "required";
    WhenAvailable => "whenavailable", "when_available"
);

/// Configuration for the `MongoDB` CDC source connector.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MongoDbSourceConfig {
    /// `MongoDB` connection URI (e.g., `mongodb://host:27017`).
    pub connection_uri: String,

    /// Database name.
    pub database: String,

    /// Collection name, or `"*"` to watch all collections in the database.
    /// See [`MongoDbSourceConfig::is_database_watch`].
    pub collection: String,

    /// Full document retrieval mode for update events.
    pub full_document_mode: FullDocumentMode,

    /// Additional aggregation pipeline stages injected before the change
    /// stream opens. Callers can inject `$match`, `$project`, etc.
    ///
    /// **Constraint**: Pipeline stages must not modify the `_id` field.
    /// This is validated at construction (see
    /// [`validate_pipeline_no_id_modification`]).
    #[serde(default)]
    pub pipeline: Vec<serde_json::Value>,

    /// Whether to enable `$changeStreamSplitLargeEvent` (`MongoDB` ≥ 6.0.9).
    /// Currently always rejected by `validate` (unsupported by the driver).
    #[serde(default)]
    pub split_large_events: bool,

    /// Resume token persistence configuration.
    #[serde(default)]
    pub resume_token_store: ResumeTokenStoreConfig,

    /// `getMore` await timeout in milliseconds. `None` leaves the driver's
    /// default in place.
    pub max_await_time_ms: Option<u64>,

    /// Cursor batch size hint.
    pub batch_size: Option<u32>,

    /// For fresh starts with no persisted token, start at this operation time.
    /// Encoded as `{ t: <seconds>, i: <increment> }`.
    pub start_at_operation_time: Option<(u32, u32)>,

    /// Maximum events to buffer before applying backpressure. Must be > 0.
    #[serde(default = "default_max_buffered_events")]
    pub max_buffered_events: usize,

    /// Maximum records to return per `poll_batch` call. Must be > 0.
    #[serde(default = "default_max_poll_records")]
    pub max_poll_records: usize,
}

/// Serde default for [`MongoDbSourceConfig::max_buffered_events`].
fn default_max_buffered_events() -> usize {
    100_000
}

/// Serde default for [`MongoDbSourceConfig::max_poll_records`].
fn default_max_poll_records() -> usize {
    1000
}

118impl Default for MongoDbSourceConfig {
119    fn default() -> Self {
120        Self {
121            connection_uri: "mongodb://localhost:27017".to_string(),
122            database: String::new(),
123            collection: String::new(),
124            full_document_mode: FullDocumentMode::default(),
125            pipeline: Vec::new(),
126            split_large_events: false,
127            resume_token_store: ResumeTokenStoreConfig::default(),
128            max_await_time_ms: Some(1000),
129            batch_size: Some(1000),
130            start_at_operation_time: None,
131            max_buffered_events: default_max_buffered_events(),
132            max_poll_records: default_max_poll_records(),
133        }
134    }
135}
136
137impl MongoDbSourceConfig {
138    /// Creates a new source config with required fields.
139    #[must_use]
140    pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
141        Self {
142            connection_uri: connection_uri.to_string(),
143            database: database.to_string(),
144            collection: collection.to_string(),
145            ..Self::default()
146        }
147    }
148
149    /// Returns `true` if the source is configured to watch all collections.
150    #[must_use]
151    pub fn is_database_watch(&self) -> bool {
152        self.collection == "*"
153    }
154
155    /// Validates the configuration.
156    ///
157    /// # Errors
158    ///
159    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
160    pub fn validate(&self) -> Result<(), ConnectorError> {
161        crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
162        crate::config::require_non_empty(&self.database, "database")?;
163        crate::config::require_non_empty(&self.collection, "collection")?;
164
165        if self.max_poll_records == 0 {
166            return Err(ConnectorError::ConfigurationError(
167                "max_poll_records must be > 0".to_string(),
168            ));
169        }
170        if self.max_buffered_events == 0 {
171            return Err(ConnectorError::ConfigurationError(
172                "max_buffered_events must be > 0".to_string(),
173            ));
174        }
175
176        // split_large_events requires raw change stream access not available
177        // in the mongodb v3 driver (ChangeStreamEvent drops unknown fields).
178        if self.split_large_events {
179            return Err(ConnectorError::ConfigurationError(
180                "split_large_events requires raw change stream access not supported \
181                 by the mongodb v3 driver; set split_large_events = false"
182                    .to_string(),
183            ));
184        }
185
186        // Validate pipeline does not modify the _id field.
187        validate_pipeline_no_id_modification(&self.pipeline)?;
188
189        Ok(())
190    }
191
192    /// Parses configuration from a generic [`ConnectorConfig`].
193    ///
194    /// # Errors
195    ///
196    /// Returns `ConnectorError` if required keys are missing or invalid.
197    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
198        let mut cfg = Self {
199            connection_uri: config.require("connection.uri")?.to_string(),
200            database: config.require("database")?.to_string(),
201            collection: config.require("collection")?.to_string(),
202            ..Self::default()
203        };
204
205        if let Some(mode) = config.get_parsed::<FullDocumentMode>("full.document.mode")? {
206            cfg.full_document_mode = mode;
207        }
208        if let Some(split) = config.get_parsed::<bool>("split.large.events")? {
209            cfg.split_large_events = split;
210        }
211        if let Some(timeout) = config.get_parsed::<u64>("max.await.time.ms")? {
212            cfg.max_await_time_ms = Some(timeout);
213        }
214        if let Some(batch) = config.get_parsed::<u32>("batch.size")? {
215            cfg.batch_size = Some(batch);
216        }
217        if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
218            cfg.max_poll_records = max;
219        }
220        if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
221            cfg.max_buffered_events = max;
222        }
223
224        cfg.validate()?;
225        Ok(cfg)
226    }
227}
228
229/// Validates that a pipeline does not modify the `_id` field.
230///
231/// `MongoDB` 8.x throws a server error if `_id` is projected away or
232/// modified in change stream pipeline stages. We fail fast at config
233/// time instead.
234fn validate_pipeline_no_id_modification(
235    pipeline: &[serde_json::Value],
236) -> Result<(), ConnectorError> {
237    for (i, stage) in pipeline.iter().enumerate() {
238        if let Some(obj) = stage.as_object() {
239            // Check $project stages.
240            if let Some(proj) = obj.get("$project") {
241                if let Some(proj_obj) = proj.as_object() {
242                    if let Some(id_val) = proj_obj.get("_id") {
243                        // _id: 0 or _id: false means exclusion.
244                        if id_val == &serde_json::Value::from(0)
245                            || id_val == &serde_json::Value::from(false)
246                        {
247                            return Err(ConnectorError::ConfigurationError(format!(
248                                "pipeline stage {i}: $project must not exclude _id field — \
249                                 MongoDB change streams require _id for resume tokens"
250                            )));
251                        }
252                    }
253                }
254            }
255
256            // Check $unset stages.
257            if let Some(unset) = obj.get("$unset") {
258                let fields: Vec<&str> = match unset {
259                    serde_json::Value::String(s) => vec![s.as_str()],
260                    serde_json::Value::Array(arr) => {
261                        arr.iter().filter_map(serde_json::Value::as_str).collect()
262                    }
263                    _ => Vec::new(),
264                };
265                if fields.contains(&"_id") {
266                    return Err(ConnectorError::ConfigurationError(format!(
267                        "pipeline stage {i}: $unset must not remove _id field"
268                    )));
269                }
270            }
271
272            // Check $addFields / $set that overwrite _id.
273            for op in &["$addFields", "$set"] {
274                if let Some(fields) = obj.get(*op) {
275                    if let Some(fields_obj) = fields.as_object() {
276                        if fields_obj.contains_key("_id") {
277                            return Err(ConnectorError::ConfigurationError(format!(
278                                "pipeline stage {i}: {op} must not modify _id field"
279                            )));
280                        }
281                    }
282                }
283            }
284        }
285    }
286    Ok(())
287}
288
289/// Write concern level for `MongoDB` write operations.
290#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
291#[serde(rename_all = "snake_case")]
292pub enum WriteConcernLevel {
293    /// Write acknowledged by the majority of replica set members.
294    #[default]
295    Majority,
296    /// Write acknowledged by the specified number of nodes.
297    Nodes(u32),
298}
299
/// Write concern configuration for `MongoDB` operations.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WriteConcernConfig {
    /// The write concern level. Defaults to `Majority`.
    #[serde(default)]
    pub w: WriteConcernLevel,
    /// Whether to wait for journal commit. Defaults to `true`.
    #[serde(default = "default_journal")]
    pub journal: bool,
    /// Write concern timeout in milliseconds; `None` means no timeout is set.
    pub timeout_ms: Option<u64>,
}

/// Serde default for [`WriteConcernConfig::journal`].
fn default_journal() -> bool {
    true
}

317impl Default for WriteConcernConfig {
318    fn default() -> Self {
319        Self {
320            w: WriteConcernLevel::default(),
321            journal: true,
322            timeout_ms: None,
323        }
324    }
325}
326
/// Configuration for the `MongoDB` sink connector.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MongoDbSinkConfig {
    /// `MongoDB` connection URI.
    pub connection_uri: String,

    /// Target database name.
    pub database: String,

    /// Target collection name.
    pub collection: String,

    /// Whether the target is a standard or time series collection.
    /// Time series collections only accept `WriteMode::Insert`
    /// (enforced in `validate`).
    #[serde(default)]
    pub collection_kind: CollectionKind,

    /// Write operation mode.
    #[serde(default)]
    pub write_mode: WriteMode,

    /// Maximum documents per batch flush. Must be > 0.
    #[serde(default = "default_sink_batch_size")]
    pub batch_size: usize,

    /// Maximum time between flushes in milliseconds.
    /// See [`MongoDbSinkConfig::flush_interval`].
    #[serde(default = "default_flush_interval_ms")]
    pub flush_interval_ms: u64,

    /// `true` = ordered bulk write (fail-fast); `false` = unordered
    /// (higher throughput).
    ///
    /// Ordered writes stop on the first error, preserving operation order.
    /// Unordered writes attempt all operations and report all failures,
    /// achieving higher throughput at the cost of non-deterministic ordering.
    #[serde(default = "default_ordered")]
    pub ordered: bool,

    /// Write concern configuration.
    #[serde(default)]
    pub write_concern: WriteConcernConfig,
}

/// Serde default for [`MongoDbSinkConfig::batch_size`].
fn default_sink_batch_size() -> usize {
    500
}

/// Serde default for [`MongoDbSinkConfig::flush_interval_ms`] (1 second).
fn default_flush_interval_ms() -> u64 {
    1000
}

/// Serde default for [`MongoDbSinkConfig::ordered`] (fail-fast ordering).
fn default_ordered() -> bool {
    true
}

381impl Default for MongoDbSinkConfig {
382    fn default() -> Self {
383        Self {
384            connection_uri: "mongodb://localhost:27017".to_string(),
385            database: String::new(),
386            collection: String::new(),
387            collection_kind: CollectionKind::default(),
388            write_mode: WriteMode::default(),
389            batch_size: default_sink_batch_size(),
390            flush_interval_ms: default_flush_interval_ms(),
391            ordered: default_ordered(),
392            write_concern: WriteConcernConfig::default(),
393        }
394    }
395}
396
397impl MongoDbSinkConfig {
398    /// Creates a new sink config with required fields.
399    #[must_use]
400    pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
401        Self {
402            connection_uri: connection_uri.to_string(),
403            database: database.to_string(),
404            collection: collection.to_string(),
405            ..Self::default()
406        }
407    }
408
409    /// Returns the flush interval as a `Duration`.
410    #[must_use]
411    pub fn flush_interval(&self) -> Duration {
412        Duration::from_millis(self.flush_interval_ms)
413    }
414
415    /// Validates the configuration.
416    ///
417    /// # Errors
418    ///
419    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
420    pub fn validate(&self) -> Result<(), ConnectorError> {
421        crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
422        crate::config::require_non_empty(&self.database, "database")?;
423        crate::config::require_non_empty(&self.collection, "collection")?;
424
425        if self.batch_size == 0 {
426            return Err(ConnectorError::ConfigurationError(
427                "batch_size must be > 0".to_string(),
428            ));
429        }
430
431        // Time series collections only support Insert mode.
432        if let CollectionKind::TimeSeries(_) = &self.collection_kind {
433            super::write_model::validate_timeseries_write_mode(&self.write_mode)?;
434        }
435
436        Ok(())
437    }
438
439    /// Parses configuration from a generic [`ConnectorConfig`].
440    ///
441    /// # Errors
442    ///
443    /// Returns `ConnectorError` if required keys are missing or invalid.
444    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
445        let mut cfg = Self {
446            connection_uri: config.require("connection.uri")?.to_string(),
447            database: config.require("database")?.to_string(),
448            collection: config.require("collection")?.to_string(),
449            ..Self::default()
450        };
451
452        if let Some(batch) = config.get_parsed::<usize>("batch.size")? {
453            cfg.batch_size = batch;
454        }
455        if let Some(interval) = config.get_parsed::<u64>("flush.interval.ms")? {
456            cfg.flush_interval_ms = interval;
457        }
458        if let Some(ordered) = config.get_parsed::<bool>("ordered")? {
459            cfg.ordered = ordered;
460        }
461        if let Some(mode) = config.get("write.mode") {
462            cfg.write_mode = match mode {
463                "insert" => WriteMode::Insert,
464                "cdc_replay" | "cdc-replay" => WriteMode::CdcReplay,
465                "upsert" => {
466                    let keys = config
467                        .require("write.mode.key_fields")?
468                        .split(',')
469                        .map(|s| s.trim().to_string())
470                        .collect();
471                    WriteMode::Upsert { key_fields: keys }
472                }
473                "replace" => WriteMode::Replace {
474                    upsert_on_missing: config
475                        .get_parsed::<bool>("write.mode.upsert_on_missing")?
476                        .unwrap_or(false),
477                },
478                other => {
479                    return Err(ConnectorError::ConfigurationError(format!(
480                        "unknown write.mode: {other}"
481                    )));
482                }
483            };
484        }
485        if let Some(journal) = config.get_parsed::<bool>("write_concern.journal")? {
486            cfg.write_concern.journal = journal;
487        }
488        if let Some(timeout) = config.get_parsed::<u64>("write_concern.timeout_ms")? {
489            cfg.write_concern.timeout_ms = Some(timeout);
490        }
491
492        cfg.validate()?;
493        Ok(cfg)
494    }
495}
496
#[cfg(test)]
mod tests {
    use super::*;

    // ── Source config tests ──

    #[test]
    fn test_source_config_default() {
        let cfg = MongoDbSourceConfig::default();
        assert_eq!(cfg.connection_uri, "mongodb://localhost:27017");
        assert_eq!(cfg.full_document_mode, FullDocumentMode::Delta);
        assert!(!cfg.split_large_events);
        assert_eq!(cfg.max_poll_records, 1000);
    }

    #[test]
    fn test_source_config_new() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
        assert_eq!(cfg.connection_uri, "mongodb://db:27017");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.collection, "users");
    }

    #[test]
    fn test_source_config_database_watch() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "*");
        assert!(cfg.is_database_watch());

        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
        assert!(!cfg.is_database_watch());
    }

    #[test]
    fn test_source_config_validate_empty_uri() {
        // Fixed: was `let mut cfg`, but the config is never mutated
        // (unused-mut warning).
        let cfg = MongoDbSourceConfig::new("", "db", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("connection_uri"));
    }

    #[test]
    fn test_source_config_validate_empty_database() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("database"));
    }

    #[test]
    fn test_source_config_validate_zero_max_poll() {
        let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
        cfg.max_poll_records = 0;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("max_poll_records"));
    }

    #[test]
    fn test_source_config_split_large_events_rejected() {
        let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
        cfg.split_large_events = true;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("split_large_events"));
    }

    #[test]
    fn test_source_config_from_connector_config() {
        let mut config = ConnectorConfig::new("mongodb-cdc");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "events");
        config.set("full.document.mode", "update_lookup");
        config.set("max.poll.records", "500");

        let cfg = MongoDbSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.connection_uri, "mongodb://host:27017");
        assert_eq!(cfg.database, "testdb");
        assert_eq!(cfg.collection, "events");
        assert_eq!(cfg.full_document_mode, FullDocumentMode::UpdateLookup);
        assert_eq!(cfg.max_poll_records, 500);
    }

    #[test]
    fn test_source_config_from_config_missing_required() {
        let config = ConnectorConfig::new("mongodb-cdc");
        assert!(MongoDbSourceConfig::from_config(&config).is_err());
    }

    // ── Pipeline validation tests ──

    #[test]
    fn test_pipeline_valid_match() {
        let pipeline = vec![serde_json::json!({
            "$match": { "operationType": "insert" }
        })];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }

    #[test]
    fn test_pipeline_id_excluded_in_project() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": 0 }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_excluded_false() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": false }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_unset() {
        let pipeline = vec![serde_json::json!({ "$unset": "_id" })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_unset_array() {
        let pipeline = vec![serde_json::json!({ "$unset": ["_id", "other"] })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_addfields() {
        let pipeline = vec![serde_json::json!({
            "$addFields": { "_id": "overwritten" }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_set() {
        let pipeline = vec![serde_json::json!({
            "$set": { "_id": "overwritten" }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_valid_project_includes_id() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": 1, "name": 1 }
        })];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }

    // ── Full document mode tests ──

    #[test]
    fn test_full_document_mode_fromstr() {
        assert_eq!(
            "delta".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::Delta
        );
        assert_eq!(
            "update_lookup".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::UpdateLookup
        );
        assert_eq!(
            "updatelookup".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::UpdateLookup
        );
        assert_eq!(
            "required".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::RequirePostImage
        );
        assert_eq!(
            "when_available".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::WhenAvailable
        );
        assert!("bad".parse::<FullDocumentMode>().is_err());
    }

    #[test]
    fn test_full_document_mode_display() {
        assert_eq!(FullDocumentMode::Delta.to_string(), "delta");
        assert_eq!(FullDocumentMode::UpdateLookup.to_string(), "updatelookup");
    }

    // ── Sink config tests ──

    #[test]
    fn test_sink_config_default() {
        let cfg = MongoDbSinkConfig::default();
        assert_eq!(cfg.batch_size, 500);
        assert_eq!(cfg.flush_interval_ms, 1000);
        assert!(cfg.ordered);
        assert!(matches!(cfg.collection_kind, CollectionKind::Standard));
    }

    #[test]
    fn test_sink_config_new() {
        let cfg = MongoDbSinkConfig::new("mongodb://db:27017", "mydb", "events");
        assert_eq!(cfg.connection_uri, "mongodb://db:27017");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.collection, "events");
    }

    #[test]
    fn test_sink_config_validate_empty_uri() {
        let cfg = MongoDbSinkConfig::new("", "db", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("connection_uri"));
    }

    #[test]
    fn test_sink_config_validate_zero_batch_size() {
        let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "coll");
        cfg.batch_size = 0;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("batch_size"));
    }

    #[test]
    fn test_sink_config_timeseries_upsert_rejected() {
        let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "ts");
        cfg.collection_kind =
            CollectionKind::TimeSeries(super::super::timeseries::TimeSeriesConfig {
                time_field: "ts".to_string(),
                meta_field: None,
                granularity: super::super::timeseries::TimeSeriesGranularity::Seconds,
                expire_after_seconds: None,
            });
        cfg.write_mode = WriteMode::Upsert {
            key_fields: vec!["id".to_string()],
        };
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("time series"));
    }

    #[test]
    fn test_sink_config_flush_interval() {
        let cfg = MongoDbSinkConfig::default();
        assert_eq!(cfg.flush_interval(), Duration::from_secs(1));
    }

    #[test]
    fn test_sink_config_from_connector_config() {
        let mut config = ConnectorConfig::new("mongodb-sink");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "out");
        config.set("batch.size", "1000");
        config.set("ordered", "false");

        let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.batch_size, 1000);
        assert!(!cfg.ordered);
    }

    #[test]
    fn test_write_concern_default() {
        let wc = WriteConcernConfig::default();
        assert!(matches!(wc.w, WriteConcernLevel::Majority));
        assert!(wc.journal);
        assert!(wc.timeout_ms.is_none());
    }
}
761}