Skip to main content

laminar_connectors/mongodb/
write_model.rs

1//! Write mode configuration for the `MongoDB` sink connector.
2//!
3//! Defines [`WriteMode`] which determines how incoming `RecordBatch` rows
4//! are translated into `MongoDB` write operations.
5
6use crate::error::ConnectorError;
7
8/// Write operation mode for the `MongoDB` sink.
9///
10/// Determines how incoming `RecordBatch` rows are translated into
11/// `MongoDB` write operations.
12#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
13#[serde(tag = "mode", rename_all = "snake_case")]
14pub enum WriteMode {
15    /// Append-only inserts using `insertOne` / `insertMany`.
16    #[default]
17    Insert,
18
19    /// Upsert by caller-supplied key fields. Uses `replaceOne` with
20    /// `upsert: true`, keyed by the specified fields.
21    Upsert {
22        /// Fields used to match existing documents for upsert.
23        key_fields: Vec<String>,
24    },
25
26    /// Full document replacement. Fails if the document is absent
27    /// unless `upsert_on_missing` is `true`.
28    Replace {
29        /// If `true`, insert the document when no match is found.
30        upsert_on_missing: bool,
31    },
32
33    /// Routes operations based on the incoming event's `operationType`.
34    ///
35    /// Only valid for `LaminarEvent<MongoDbChangeEvent>` (CDC fan-out
36    /// replication). Maps operations as follows:
37    ///
38    /// - `insert` → `insertOne`
39    /// - `update` → `updateOne` using `$set`/`$unset` from `updateDescription`
40    /// - `replace` → `replaceOne` with `upsert: true`
41    /// - `delete` → `deleteOne` using `documentKey._id`
42    /// - `drop`/`rename`/`invalidate` → lifecycle events, no write issued
43    CdcReplay,
44}
45
46/// Validates that a write mode is compatible with time series collections.
47///
48/// Time series collections only accept `Insert`. Any other mode returns
49/// an error.
50///
51/// # Errors
52///
53/// Returns `ConnectorError::ConfigurationError` if the mode is not `Insert`.
54pub fn validate_timeseries_write_mode(mode: &WriteMode) -> Result<(), ConnectorError> {
55    if matches!(mode, WriteMode::Insert) {
56        Ok(())
57    } else {
58        Err(ConnectorError::ConfigurationError(format!(
59            "time series collections only support Insert write mode, got: {mode:?}"
60        )))
61    }
62}
63
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived `Default` must select the append-only `Insert` mode.
    #[test]
    fn test_write_mode_default() {
        let default_mode = WriteMode::default();
        assert!(matches!(default_mode, WriteMode::Insert));
    }

    /// `Insert` is accepted for time series collections.
    #[test]
    fn test_validate_timeseries_insert_ok() {
        let outcome = validate_timeseries_write_mode(&WriteMode::Insert);
        outcome.unwrap();
    }

    /// `Upsert` is rejected with a message that mentions time series.
    #[test]
    fn test_validate_timeseries_upsert_fails() {
        let key_fields = vec![String::from("id")];
        let upsert = WriteMode::Upsert { key_fields };

        let message = validate_timeseries_write_mode(&upsert)
            .unwrap_err()
            .to_string();
        assert!(message.contains("time series"));
    }

    /// `CdcReplay` is rejected with a message that mentions time series.
    #[test]
    fn test_validate_timeseries_cdc_replay_fails() {
        let message = validate_timeseries_write_mode(&WriteMode::CdcReplay)
            .unwrap_err()
            .to_string();
        assert!(message.contains("time series"));
    }
}