laminar_connectors/mongodb/write_model.rs
1//! Write mode configuration for the `MongoDB` sink connector.
2//!
3//! Defines [`WriteMode`] which determines how incoming `RecordBatch` rows
4//! are translated into `MongoDB` write operations.
5
6use crate::error::ConnectorError;
7
8/// Write operation mode for the `MongoDB` sink.
9///
10/// Determines how incoming `RecordBatch` rows are translated into
11/// `MongoDB` write operations.
12#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
13#[serde(tag = "mode", rename_all = "snake_case")]
14pub enum WriteMode {
15 /// Append-only inserts using `insertOne` / `insertMany`.
16 #[default]
17 Insert,
18
19 /// Upsert by caller-supplied key fields. Uses `replaceOne` with
20 /// `upsert: true`, keyed by the specified fields.
21 Upsert {
22 /// Fields used to match existing documents for upsert.
23 key_fields: Vec<String>,
24 },
25
26 /// Full document replacement. Fails if the document is absent
27 /// unless `upsert_on_missing` is `true`.
28 Replace {
29 /// If `true`, insert the document when no match is found.
30 upsert_on_missing: bool,
31 },
32
33 /// Routes operations based on the incoming event's `operationType`.
34 ///
35 /// Only valid for `LaminarEvent<MongoDbChangeEvent>` (CDC fan-out
36 /// replication). Maps operations as follows:
37 ///
38 /// - `insert` → `insertOne`
39 /// - `update` → `updateOne` using `$set`/`$unset` from `updateDescription`
40 /// - `replace` → `replaceOne` with `upsert: true`
41 /// - `delete` → `deleteOne` using `documentKey._id`
42 /// - `drop`/`rename`/`invalidate` → lifecycle events, no write issued
43 CdcReplay,
44}
45
46/// Validates that a write mode is compatible with time series collections.
47///
48/// Time series collections only accept `Insert`. Any other mode returns
49/// an error.
50///
51/// # Errors
52///
53/// Returns `ConnectorError::ConfigurationError` if the mode is not `Insert`.
54pub fn validate_timeseries_write_mode(mode: &WriteMode) -> Result<(), ConnectorError> {
55 if matches!(mode, WriteMode::Insert) {
56 Ok(())
57 } else {
58 Err(ConnectorError::ConfigurationError(format!(
59 "time series collections only support Insert write mode, got: {mode:?}"
60 )))
61 }
62}
63
#[cfg(test)]
mod tests {
    //! Unit tests for the default write mode and the time series validation.
    //! Every non-`Insert` variant is checked so none can slip through the
    //! validator unnoticed.

    use super::*;

    /// The derived `Default` must resolve to the `#[default]` variant.
    #[test]
    fn test_write_mode_default() {
        assert!(matches!(WriteMode::default(), WriteMode::Insert));
    }

    /// `Insert` is the one mode the validator accepts.
    #[test]
    fn test_validate_timeseries_insert_ok() {
        validate_timeseries_write_mode(&WriteMode::Insert).unwrap();
    }

    #[test]
    fn test_validate_timeseries_upsert_fails() {
        let mode = WriteMode::Upsert {
            key_fields: vec!["id".to_string()],
        };
        let err = validate_timeseries_write_mode(&mode).unwrap_err();
        assert!(err.to_string().contains("time series"));
    }

    /// `Replace` must be rejected regardless of `upsert_on_missing`.
    #[test]
    fn test_validate_timeseries_replace_fails() {
        for upsert_on_missing in [false, true] {
            let mode = WriteMode::Replace { upsert_on_missing };
            let err = validate_timeseries_write_mode(&mode).unwrap_err();
            assert!(err.to_string().contains("time series"));
        }
    }

    #[test]
    fn test_validate_timeseries_cdc_replay_fails() {
        let err = validate_timeseries_write_mode(&WriteMode::CdcReplay).unwrap_err();
        assert!(err.to_string().contains("time series"));
    }
}
92}