Skip to main content

laminar_connectors/mongodb/
config.rs

//! `MongoDB` connector configuration.
//!
//! Provides [`MongoDbSourceConfig`] for the CDC change stream source and
//! [`MongoDbSinkConfig`] for the write sink. Both can be built from a generic
//! [`ConnectorConfig`] key-value map (`from_config`) or programmatically (`new`).
//!
//! # Pipeline Validation
//!
//! The source config's `validate()` checks that user-supplied aggregation
//! pipeline stages do not modify the `_id` field. `MongoDB` 8.x throws a
//! server-side error if `_id` is projected away; `from_config` runs this check
//! automatically, while configs built with `new` must call `validate()`
//! themselves to catch the problem before the stream is opened.
12
13use std::time::Duration;
14
15use crate::config::ConnectorConfig;
16use crate::error::ConnectorError;
17
18use super::resume_token::ResumeTokenStoreConfig;
19use super::timeseries::{CollectionKind, TimeSeriesConfig, TimeSeriesGranularity};
20use super::write_model::WriteMode;
21
22/// Mode for requesting full documents on update events.
23///
24/// Controls the `fullDocument` option on the change stream cursor.
25/// The choice has significant correctness and performance implications.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum FullDocumentMode {
29    /// Default: only the delta (`updateDescription`) for update events.
30    /// `fullDocument` is `None` on updates.
31    #[default]
32    Delta,
33
34    /// `fullDocument: "updateLookup"` — fetches the current document from
35    /// the collection on update events.
36    ///
37    /// **Warning**: Dangerous on high-churn collections with `$match` filters.
38    /// If the matched document is quickly deleted, the lookup returns `null`
39    /// and can cause "Resume Token Not Found" errors. Prefer `WhenAvailable`
40    /// for high-churn patterns.
41    UpdateLookup,
42
43    /// `fullDocument: "required"` — the collection must have
44    /// `changeStreamPreAndPostImages` enabled. Returns
45    /// `SourceError::PostImageNotEnabled` if not configured.
46    RequirePostImage,
47
48    /// `fullDocument: "whenAvailable"` — safest option for high-churn
49    /// collections. Returns the post-image when available, `null` otherwise.
50    WhenAvailable,
51}
52
53str_enum!(FullDocumentMode, lowercase, ConnectorError, "unknown full document mode",
54    Delta => "delta";
55    UpdateLookup => "updatelookup", "update_lookup";
56    RequirePostImage => "requirepostimage", "require_post_image", "required";
57    WhenAvailable => "whenavailable", "when_available"
58);
59
60/// Configuration for the `MongoDB` CDC source connector.
61#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
62pub struct MongoDbSourceConfig {
63    /// `MongoDB` connection URI (e.g., `mongodb://host:27017`).
64    pub connection_uri: String,
65
66    /// Database name.
67    pub database: String,
68
69    /// Collection name, or `"*"` to watch all collections in the database.
70    pub collection: String,
71
72    /// Full document retrieval mode for update events.
73    pub full_document_mode: FullDocumentMode,
74
75    /// Additional aggregation pipeline stages injected before the change
76    /// stream opens. Callers can inject `$match`, `$project`, etc.
77    ///
78    /// **Constraint**: Pipeline stages must not modify the `_id` field.
79    /// This is validated at construction.
80    #[serde(default)]
81    pub pipeline: Vec<serde_json::Value>,
82
83    /// Whether to enable `$changeStreamSplitLargeEvent` (`MongoDB` ≥ 6.0.9).
84    #[serde(default)]
85    pub split_large_events: bool,
86
87    /// Resume token persistence configuration.
88    #[serde(default)]
89    pub resume_token_store: ResumeTokenStoreConfig,
90
91    /// `getMore` await timeout in milliseconds.
92    pub max_await_time_ms: Option<u64>,
93
94    /// Cursor batch size hint.
95    pub batch_size: Option<u32>,
96
97    /// For fresh starts with no persisted token, start at this operation time.
98    /// Encoded as `{ t: <seconds>, i: <increment> }`.
99    pub start_at_operation_time: Option<(u32, u32)>,
100
101    /// Maximum events to buffer before applying backpressure.
102    #[serde(default = "default_max_buffered_events")]
103    pub max_buffered_events: usize,
104
105    /// Maximum records to return per `poll_batch` call.
106    #[serde(default = "default_max_poll_records")]
107    pub max_poll_records: usize,
108}
109
/// Fallback value for `MongoDbSourceConfig::max_buffered_events`, used both by
/// serde (when the field is absent) and by the `Default` impl.
fn default_max_buffered_events() -> usize {
    const DEFAULT_MAX_BUFFERED_EVENTS: usize = 100_000;
    DEFAULT_MAX_BUFFERED_EVENTS
}
113
/// Fallback value for `MongoDbSourceConfig::max_poll_records`, used both by
/// serde (when the field is absent) and by the `Default` impl.
fn default_max_poll_records() -> usize {
    const DEFAULT_MAX_POLL_RECORDS: usize = 1000;
    DEFAULT_MAX_POLL_RECORDS
}
117
118impl Default for MongoDbSourceConfig {
119    fn default() -> Self {
120        Self {
121            connection_uri: "mongodb://localhost:27017".to_string(),
122            database: String::new(),
123            collection: String::new(),
124            full_document_mode: FullDocumentMode::default(),
125            pipeline: Vec::new(),
126            split_large_events: false,
127            resume_token_store: ResumeTokenStoreConfig::default(),
128            max_await_time_ms: Some(1000),
129            batch_size: Some(1000),
130            start_at_operation_time: None,
131            max_buffered_events: default_max_buffered_events(),
132            max_poll_records: default_max_poll_records(),
133        }
134    }
135}
136
137impl MongoDbSourceConfig {
138    /// Creates a new source config with required fields.
139    #[must_use]
140    pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
141        Self {
142            connection_uri: connection_uri.to_string(),
143            database: database.to_string(),
144            collection: collection.to_string(),
145            ..Self::default()
146        }
147    }
148
149    /// Returns `true` if the source is configured to watch all collections.
150    #[must_use]
151    pub fn is_database_watch(&self) -> bool {
152        self.collection == "*"
153    }
154
155    /// Validates the configuration.
156    ///
157    /// # Errors
158    ///
159    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
160    pub fn validate(&self) -> Result<(), ConnectorError> {
161        crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
162        crate::config::require_non_empty(&self.database, "database")?;
163        crate::config::require_non_empty(&self.collection, "collection")?;
164
165        if self.max_poll_records == 0 {
166            return Err(ConnectorError::ConfigurationError(
167                "max_poll_records must be > 0".to_string(),
168            ));
169        }
170        if self.max_buffered_events == 0 {
171            return Err(ConnectorError::ConfigurationError(
172                "max_buffered_events must be > 0".to_string(),
173            ));
174        }
175
176        // split_large_events requires raw change stream access not available
177        // in the mongodb v3 driver (ChangeStreamEvent drops unknown fields).
178        if self.split_large_events {
179            return Err(ConnectorError::ConfigurationError(
180                "split_large_events requires raw change stream access not supported \
181                 by the mongodb v3 driver; set split_large_events = false"
182                    .to_string(),
183            ));
184        }
185
186        // Validate pipeline does not modify the _id field.
187        validate_pipeline_no_id_modification(&self.pipeline)?;
188
189        Ok(())
190    }
191
192    /// Parses configuration from a generic [`ConnectorConfig`].
193    ///
194    /// # Errors
195    ///
196    /// Returns `ConnectorError` if required keys are missing or invalid.
197    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
198        let mut cfg = Self {
199            connection_uri: config.require("connection.uri")?.to_string(),
200            database: config.require("database")?.to_string(),
201            collection: config.require("collection")?.to_string(),
202            ..Self::default()
203        };
204
205        if let Some(mode) = config.get_parsed::<FullDocumentMode>("full.document.mode")? {
206            cfg.full_document_mode = mode;
207        }
208        if let Some(split) = config.get_parsed::<bool>("split.large.events")? {
209            cfg.split_large_events = split;
210        }
211        if let Some(timeout) = config.get_parsed::<u64>("max.await.time.ms")? {
212            cfg.max_await_time_ms = Some(timeout);
213        }
214        if let Some(batch) = config.get_parsed::<u32>("batch.size")? {
215            cfg.batch_size = Some(batch);
216        }
217        if let Some(max) = config.get_parsed::<usize>("max.poll.records")? {
218            cfg.max_poll_records = max;
219        }
220        if let Some(max) = config.get_parsed::<usize>("max.buffered.events")? {
221            cfg.max_buffered_events = max;
222        }
223
224        cfg.validate()?;
225        Ok(cfg)
226    }
227}
228
229/// Validates that a pipeline does not modify the `_id` field.
230///
231/// `MongoDB` 8.x throws a server error if `_id` is projected away or
232/// modified in change stream pipeline stages. We fail fast at config
233/// time instead.
234fn validate_pipeline_no_id_modification(
235    pipeline: &[serde_json::Value],
236) -> Result<(), ConnectorError> {
237    for (i, stage) in pipeline.iter().enumerate() {
238        if let Some(obj) = stage.as_object() {
239            // Check $project stages.
240            if let Some(proj) = obj.get("$project") {
241                if let Some(proj_obj) = proj.as_object() {
242                    if let Some(id_val) = proj_obj.get("_id") {
243                        // _id: 0 or _id: false means exclusion.
244                        if id_val == &serde_json::Value::from(0)
245                            || id_val == &serde_json::Value::from(false)
246                        {
247                            return Err(ConnectorError::ConfigurationError(format!(
248                                "pipeline stage {i}: $project must not exclude _id field — \
249                                 MongoDB change streams require _id for resume tokens"
250                            )));
251                        }
252                    }
253                }
254            }
255
256            // Check $unset stages.
257            if let Some(unset) = obj.get("$unset") {
258                let fields: Vec<&str> = match unset {
259                    serde_json::Value::String(s) => vec![s.as_str()],
260                    serde_json::Value::Array(arr) => {
261                        arr.iter().filter_map(serde_json::Value::as_str).collect()
262                    }
263                    _ => Vec::new(),
264                };
265                if fields.contains(&"_id") {
266                    return Err(ConnectorError::ConfigurationError(format!(
267                        "pipeline stage {i}: $unset must not remove _id field"
268                    )));
269                }
270            }
271
272            // Check $addFields / $set that overwrite _id.
273            for op in &["$addFields", "$set"] {
274                if let Some(fields) = obj.get(*op) {
275                    if let Some(fields_obj) = fields.as_object() {
276                        if fields_obj.contains_key("_id") {
277                            return Err(ConnectorError::ConfigurationError(format!(
278                                "pipeline stage {i}: {op} must not modify _id field"
279                            )));
280                        }
281                    }
282                }
283            }
284        }
285    }
286    Ok(())
287}
288
289/// Write concern level for `MongoDB` write operations.
290#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
291#[serde(rename_all = "snake_case")]
292pub enum WriteConcernLevel {
293    /// Write acknowledged by the majority of replica set members.
294    #[default]
295    Majority,
296    /// Write acknowledged by the specified number of nodes.
297    Nodes(u32),
298}
299
300/// Write concern configuration for `MongoDB` operations.
301#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
302pub struct WriteConcernConfig {
303    /// The write concern level.
304    #[serde(default)]
305    pub w: WriteConcernLevel,
306    /// Whether to wait for journal commit.
307    #[serde(default = "default_journal")]
308    pub journal: bool,
309    /// Write concern timeout in milliseconds.
310    pub timeout_ms: Option<u64>,
311}
312
/// Serde fallback for `WriteConcernConfig::journal`: journaling is on unless
/// configured otherwise.
fn default_journal() -> bool {
    const JOURNAL_BY_DEFAULT: bool = true;
    JOURNAL_BY_DEFAULT
}
316
317impl Default for WriteConcernConfig {
318    fn default() -> Self {
319        Self {
320            w: WriteConcernLevel::default(),
321            journal: true,
322            timeout_ms: None,
323        }
324    }
325}
326
327/// Configuration for the `MongoDB` sink connector.
328#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
329pub struct MongoDbSinkConfig {
330    /// `MongoDB` connection URI.
331    pub connection_uri: String,
332
333    /// Target database name.
334    pub database: String,
335
336    /// Target collection name.
337    pub collection: String,
338
339    /// Whether the target is a standard or time series collection.
340    #[serde(default)]
341    pub collection_kind: CollectionKind,
342
343    /// Write operation mode.
344    #[serde(default)]
345    pub write_mode: WriteMode,
346
347    /// Maximum documents per batch flush.
348    #[serde(default = "default_sink_batch_size")]
349    pub batch_size: usize,
350
351    /// Maximum time between flushes in milliseconds.
352    #[serde(default = "default_flush_interval_ms")]
353    pub flush_interval_ms: u64,
354
355    /// `true` = ordered bulk write (fail-fast); `false` = unordered
356    /// (higher throughput).
357    ///
358    /// Ordered writes stop on the first error, preserving operation order.
359    /// Unordered writes attempt all operations and report all failures,
360    /// achieving higher throughput at the cost of non-deterministic ordering.
361    #[serde(default = "default_ordered")]
362    pub ordered: bool,
363
364    /// Write concern configuration.
365    #[serde(default)]
366    pub write_concern: WriteConcernConfig,
367}
368
/// Fallback value for `MongoDbSinkConfig::batch_size`, shared by serde and the
/// `Default` impl.
fn default_sink_batch_size() -> usize {
    const DEFAULT_SINK_BATCH_SIZE: usize = 500;
    DEFAULT_SINK_BATCH_SIZE
}
372
/// Fallback value for `MongoDbSinkConfig::flush_interval_ms`, shared by serde
/// and the `Default` impl.
fn default_flush_interval_ms() -> u64 {
    const DEFAULT_FLUSH_INTERVAL_MS: u64 = 1000;
    DEFAULT_FLUSH_INTERVAL_MS
}
376
/// Fallback value for `MongoDbSinkConfig::ordered`: bulk writes are ordered
/// unless configured otherwise.
fn default_ordered() -> bool {
    const ORDERED_BY_DEFAULT: bool = true;
    ORDERED_BY_DEFAULT
}
380
381impl Default for MongoDbSinkConfig {
382    fn default() -> Self {
383        Self {
384            connection_uri: "mongodb://localhost:27017".to_string(),
385            database: String::new(),
386            collection: String::new(),
387            collection_kind: CollectionKind::default(),
388            write_mode: WriteMode::default(),
389            batch_size: default_sink_batch_size(),
390            flush_interval_ms: default_flush_interval_ms(),
391            ordered: default_ordered(),
392            write_concern: WriteConcernConfig::default(),
393        }
394    }
395}
396
397impl MongoDbSinkConfig {
398    /// Creates a new sink config with required fields.
399    #[must_use]
400    pub fn new(connection_uri: &str, database: &str, collection: &str) -> Self {
401        Self {
402            connection_uri: connection_uri.to_string(),
403            database: database.to_string(),
404            collection: collection.to_string(),
405            ..Self::default()
406        }
407    }
408
409    /// Returns the flush interval as a `Duration`.
410    #[must_use]
411    pub fn flush_interval(&self) -> Duration {
412        Duration::from_millis(self.flush_interval_ms)
413    }
414
415    /// Validates the configuration.
416    ///
417    /// # Errors
418    ///
419    /// Returns `ConnectorError::ConfigurationError` for invalid settings.
420    pub fn validate(&self) -> Result<(), ConnectorError> {
421        crate::config::require_non_empty(&self.connection_uri, "connection_uri")?;
422        crate::config::require_non_empty(&self.database, "database")?;
423        crate::config::require_non_empty(&self.collection, "collection")?;
424
425        if self.batch_size == 0 {
426            return Err(ConnectorError::ConfigurationError(
427                "batch_size must be > 0".to_string(),
428            ));
429        }
430
431        // Time series collections only support Insert mode.
432        if let CollectionKind::TimeSeries(_) = &self.collection_kind {
433            super::write_model::validate_timeseries_write_mode(&self.write_mode)?;
434        }
435
436        Ok(())
437    }
438
439    /// Parses configuration from a generic [`ConnectorConfig`].
440    ///
441    /// # Errors
442    ///
443    /// Returns `ConnectorError` if required keys are missing or invalid.
444    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
445        let mut cfg = Self {
446            connection_uri: config.require("connection.uri")?.to_string(),
447            database: config.require("database")?.to_string(),
448            collection: config.require("collection")?.to_string(),
449            ..Self::default()
450        };
451
452        if let Some(batch) = config.get_parsed::<usize>("batch.size")? {
453            cfg.batch_size = batch;
454        }
455        if let Some(interval) = config.get_parsed::<u64>("flush.interval.ms")? {
456            cfg.flush_interval_ms = interval;
457        }
458        if let Some(ordered) = config.get_parsed::<bool>("ordered")? {
459            cfg.ordered = ordered;
460        }
461        if let Some(mode) = config.get("write.mode") {
462            cfg.write_mode = match mode {
463                "insert" => WriteMode::Insert,
464                "cdc_replay" | "cdc-replay" => WriteMode::CdcReplay,
465                "upsert" => {
466                    let keys = config
467                        .require("write.mode.key_fields")?
468                        .split(',')
469                        .map(|s| s.trim().to_string())
470                        .collect();
471                    WriteMode::Upsert { key_fields: keys }
472                }
473                "replace" => WriteMode::Replace {
474                    upsert_on_missing: config
475                        .get_parsed::<bool>("write.mode.upsert_on_missing")?
476                        .unwrap_or(false),
477                },
478                other => {
479                    return Err(ConnectorError::ConfigurationError(format!(
480                        "unknown write.mode: {other}"
481                    )));
482                }
483            };
484        }
485        if let Some(journal) = config.get_parsed::<bool>("write_concern.journal")? {
486            cfg.write_concern.journal = journal;
487        }
488        if let Some(timeout) = config.get_parsed::<u64>("write_concern.timeout_ms")? {
489            cfg.write_concern.timeout_ms = Some(timeout);
490        }
491
492        if let Some(time_field) = config.get("timeseries.time_field") {
493            if time_field.trim().is_empty() {
494                return Err(ConnectorError::ConfigurationError(
495                    "timeseries.time_field must not be empty".to_string(),
496                ));
497            }
498            let meta_field = config.get("timeseries.meta_field").map(String::from);
499            let granularity = if let Some(gran_str) = config.get("timeseries.granularity") {
500                match gran_str.to_lowercase().as_str() {
501                    "seconds" => TimeSeriesGranularity::Seconds,
502                    "minutes" => TimeSeriesGranularity::Minutes,
503                    "hours" => TimeSeriesGranularity::Hours,
504                    "custom" => {
505                        let span =
506                            config.require_parsed::<u32>("timeseries.bucket_max_span_seconds")?;
507                        let rounding =
508                            config.require_parsed::<u32>("timeseries.bucket_rounding_seconds")?;
509                        TimeSeriesGranularity::custom(span, rounding)?
510                    }
511                    other => {
512                        return Err(ConnectorError::ConfigurationError(format!(
513                            "unknown timeseries granularity: {other}"
514                        )));
515                    }
516                }
517            } else {
518                TimeSeriesGranularity::Seconds
519            };
520
521            let expire_after_seconds =
522                config.get_parsed::<u64>("timeseries.expire_after_seconds")?;
523
524            cfg.collection_kind = CollectionKind::TimeSeries(TimeSeriesConfig {
525                time_field: time_field.to_string(),
526                meta_field,
527                granularity,
528                expire_after_seconds,
529            });
530        }
531
532        cfg.validate()?;
533        Ok(cfg)
534    }
535}
536
/// Unit tests for the source/sink configuration types and the pipeline
/// validator. `use super::*` also brings the parent module's imports
/// (`ConnectorConfig`, `Duration`, `WriteMode`, the time series types) into
/// scope.
#[cfg(test)]
mod tests {
    use super::*;

    // ── Source config tests ──

    #[test]
    fn test_source_config_default() {
        let cfg = MongoDbSourceConfig::default();
        assert_eq!(cfg.connection_uri, "mongodb://localhost:27017");
        assert_eq!(cfg.full_document_mode, FullDocumentMode::Delta);
        assert!(!cfg.split_large_events);
        assert_eq!(cfg.max_poll_records, 1000);
    }

    #[test]
    fn test_source_config_new() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
        assert_eq!(cfg.connection_uri, "mongodb://db:27017");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.collection, "users");
    }

    #[test]
    fn test_source_config_database_watch() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "*");
        assert!(cfg.is_database_watch());

        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "mydb", "users");
        assert!(!cfg.is_database_watch());
    }

    #[test]
    fn test_source_config_validate_empty_uri() {
        // Not mutated, so no `mut` (avoids the `unused_mut` warning).
        let cfg = MongoDbSourceConfig::new("", "db", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("connection_uri"));
    }

    #[test]
    fn test_source_config_validate_empty_database() {
        let cfg = MongoDbSourceConfig::new("mongodb://db:27017", "", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("database"));
    }

    #[test]
    fn test_source_config_validate_zero_max_poll() {
        let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
        cfg.max_poll_records = 0;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("max_poll_records"));
    }

    #[test]
    fn test_source_config_split_large_events_rejected() {
        let mut cfg = MongoDbSourceConfig::new("mongodb://db:27017", "db", "coll");
        cfg.split_large_events = true;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("split_large_events"));
    }

    #[test]
    fn test_source_config_from_connector_config() {
        let mut config = ConnectorConfig::new("mongodb-cdc");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "events");
        config.set("full.document.mode", "update_lookup");
        config.set("max.poll.records", "500");

        let cfg = MongoDbSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.connection_uri, "mongodb://host:27017");
        assert_eq!(cfg.database, "testdb");
        assert_eq!(cfg.collection, "events");
        assert_eq!(cfg.full_document_mode, FullDocumentMode::UpdateLookup);
        assert_eq!(cfg.max_poll_records, 500);
    }

    #[test]
    fn test_source_config_from_config_missing_required() {
        let config = ConnectorConfig::new("mongodb-cdc");
        assert!(MongoDbSourceConfig::from_config(&config).is_err());
    }

    // ── Pipeline validation tests ──

    #[test]
    fn test_pipeline_valid_match() {
        let pipeline = vec![serde_json::json!({
            "$match": { "operationType": "insert" }
        })];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }

    #[test]
    fn test_pipeline_id_excluded_in_project() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": 0 }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_excluded_false() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": false }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_unset() {
        let pipeline = vec![serde_json::json!({ "$unset": "_id" })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_unset_array() {
        let pipeline = vec![serde_json::json!({ "$unset": ["_id", "other"] })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_addfields() {
        let pipeline = vec![serde_json::json!({
            "$addFields": { "_id": "overwritten" }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_id_set() {
        let pipeline = vec![serde_json::json!({
            "$set": { "_id": "overwritten" }
        })];
        let err = validate_pipeline_no_id_modification(&pipeline).unwrap_err();
        assert!(err.to_string().contains("_id"));
    }

    #[test]
    fn test_pipeline_valid_project_includes_id() {
        let pipeline = vec![serde_json::json!({
            "$project": { "_id": 1, "name": 1 }
        })];
        validate_pipeline_no_id_modification(&pipeline).unwrap();
    }

    // ── Full document mode tests ──

    #[test]
    fn test_full_document_mode_fromstr() {
        assert_eq!(
            "delta".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::Delta
        );
        assert_eq!(
            "update_lookup".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::UpdateLookup
        );
        assert_eq!(
            "updatelookup".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::UpdateLookup
        );
        assert_eq!(
            "required".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::RequirePostImage
        );
        assert_eq!(
            "when_available".parse::<FullDocumentMode>().unwrap(),
            FullDocumentMode::WhenAvailable
        );
        assert!("bad".parse::<FullDocumentMode>().is_err());
    }

    #[test]
    fn test_full_document_mode_display() {
        assert_eq!(FullDocumentMode::Delta.to_string(), "delta");
        assert_eq!(FullDocumentMode::UpdateLookup.to_string(), "updatelookup");
    }

    // ── Sink config tests ──

    #[test]
    fn test_sink_config_default() {
        let cfg = MongoDbSinkConfig::default();
        assert_eq!(cfg.batch_size, 500);
        assert_eq!(cfg.flush_interval_ms, 1000);
        assert!(cfg.ordered);
        assert!(matches!(cfg.collection_kind, CollectionKind::Standard));
    }

    #[test]
    fn test_sink_config_new() {
        let cfg = MongoDbSinkConfig::new("mongodb://db:27017", "mydb", "events");
        assert_eq!(cfg.connection_uri, "mongodb://db:27017");
        assert_eq!(cfg.database, "mydb");
        assert_eq!(cfg.collection, "events");
    }

    #[test]
    fn test_sink_config_validate_empty_uri() {
        let cfg = MongoDbSinkConfig::new("", "db", "coll");
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("connection_uri"));
    }

    #[test]
    fn test_sink_config_validate_zero_batch_size() {
        let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "coll");
        cfg.batch_size = 0;
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("batch_size"));
    }

    #[test]
    fn test_sink_config_timeseries_upsert_rejected() {
        let mut cfg = MongoDbSinkConfig::new("mongodb://db:27017", "db", "ts");
        // The time series types are already in scope through `use super::*`.
        cfg.collection_kind = CollectionKind::TimeSeries(TimeSeriesConfig {
            time_field: "ts".to_string(),
            meta_field: None,
            granularity: TimeSeriesGranularity::Seconds,
            expire_after_seconds: None,
        });
        cfg.write_mode = WriteMode::Upsert {
            key_fields: vec!["id".to_string()],
        };
        let err = cfg.validate().unwrap_err();
        assert!(err.to_string().contains("time series"));
    }

    #[test]
    fn test_sink_config_flush_interval() {
        let cfg = MongoDbSinkConfig::default();
        assert_eq!(cfg.flush_interval(), Duration::from_secs(1));
    }

    #[test]
    fn test_sink_config_from_connector_config() {
        let mut config = ConnectorConfig::new("mongodb-sink");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "out");
        config.set("batch.size", "1000");
        config.set("ordered", "false");

        let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.batch_size, 1000);
        assert!(!cfg.ordered);
    }

    #[test]
    fn test_sink_config_timeseries_parsing() {
        // Test standard granularity with metadata and TTL
        let mut config = ConnectorConfig::new("mongodb-sink");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "ts_out");
        config.set("timeseries.time_field", "timestamp");
        config.set("timeseries.meta_field", "sensor_id");
        config.set("timeseries.granularity", "minutes");
        config.set("timeseries.expire_after_seconds", "86400");

        let cfg = MongoDbSinkConfig::from_config(&config).unwrap();
        if let CollectionKind::TimeSeries(ts) = cfg.collection_kind {
            assert_eq!(ts.time_field, "timestamp");
            assert_eq!(ts.meta_field.as_deref(), Some("sensor_id"));
            assert_eq!(ts.granularity, TimeSeriesGranularity::Minutes);
            assert_eq!(ts.expire_after_seconds, Some(86400));
        } else {
            panic!("Expected TimeSeries collection kind");
        }

        // Test custom granularity
        let mut config_custom = ConnectorConfig::new("mongodb-sink");
        config_custom.set("connection.uri", "mongodb://host:27017");
        config_custom.set("database", "testdb");
        config_custom.set("collection", "ts_custom");
        config_custom.set("timeseries.time_field", "timestamp");
        config_custom.set("timeseries.granularity", "custom");
        config_custom.set("timeseries.bucket_max_span_seconds", "3600");
        config_custom.set("timeseries.bucket_rounding_seconds", "3600");

        let cfg_custom = MongoDbSinkConfig::from_config(&config_custom).unwrap();
        if let CollectionKind::TimeSeries(ts) = cfg_custom.collection_kind {
            assert_eq!(ts.time_field, "timestamp");
            assert_eq!(
                ts.granularity,
                TimeSeriesGranularity::Custom {
                    bucket_max_span_seconds: 3600,
                    bucket_rounding_seconds: 3600,
                }
            );
        } else {
            panic!("Expected TimeSeries collection kind");
        }
    }

    #[test]
    fn test_sink_config_timeseries_empty_time_field_rejected() {
        let mut config = ConnectorConfig::new("mongodb-sink");
        config.set("connection.uri", "mongodb://host:27017");
        config.set("database", "testdb");
        config.set("collection", "ts");
        config.set("timeseries.time_field", "  ");
        let err = MongoDbSinkConfig::from_config(&config).unwrap_err();
        assert!(err.to_string().contains("time_field"));
    }

    #[test]
    fn test_write_concern_default() {
        let wc = WriteConcernConfig::default();
        assert!(matches!(wc.w, WriteConcernLevel::Majority));
        assert!(wc.journal);
        assert!(wc.timeout_ms.is_none());
    }
}