Skip to main content

laminar_connectors/mongodb/
change_event.rs

//! `MongoDB` change stream event types.
//!
//! Represents the structure of change events emitted by `MongoDB` change
//! streams. Maps `MongoDB`'s change event document structure to typed Rust
//! structs for safe downstream processing.
//!
//! # Operation Types
//!
//! `MongoDB` change streams emit events for the following operations:
//!
//! - `insert` — new document created; `full_document` always present
//! - `update` — document modified; `update_description` contains delta
//! - `replace` — full document replacement
//! - `delete` — document removed; only `document_key` present
//! - `drop` — collection dropped (lifecycle event)
//! - `rename` — collection renamed (lifecycle event)
//! - `dropDatabase` — database dropped (lifecycle event)
//! - `invalidate` — stream invalidated; must restart with `startAfter`
18
19use std::collections::HashMap;
20
/// The type of operation described by a change event.
///
/// The serde representation uses camelCase variant names (for example
/// `Insert` becomes `"insert"` and `DropDatabase` becomes `"dropDatabase"`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum OperationType {
    /// A new document was inserted.
    Insert,
    /// An existing document was updated (delta available).
    Update,
    /// An existing document was fully replaced.
    Replace,
    /// A document was deleted.
    Delete,
    /// The collection was dropped.
    Drop,
    /// The collection was renamed.
    Rename,
    /// The change stream was invalidated and must be restarted.
    Invalidate,
    /// The database was dropped.
    DropDatabase,
    /// An operation type not yet mapped by this enum.
    ///
    /// The payload is presumably the raw operation name reported by the
    /// server; `as_str` returns it unchanged.
    //
    // NOTE(review): with serde's default externally tagged enum
    // representation this variant is written as `{"other": "<name>"}`, so a
    // bare unrecognized string will not deserialize into it. Confirm that
    // unknown operation names are mapped to `Other` by the caller rather
    // than by serde.
    Other(String),
}
44
45impl OperationType {
46    /// Returns the single-character code for the operation, compatible with
47    /// the CDC envelope format used elsewhere in `LaminarDB`.
48    #[must_use]
49    pub fn as_str(&self) -> &str {
50        match self {
51            Self::Insert => "I",
52            Self::Update => "U",
53            Self::Replace => "R",
54            Self::Delete => "D",
55            Self::Drop => "DROP",
56            Self::Rename => "RENAME",
57            Self::Invalidate => "INVALIDATE",
58            Self::DropDatabase => "DROP_DATABASE",
59            Self::Other(s) => s.as_str(),
60        }
61    }
62
63    /// Returns `true` if this is a DML operation (insert/update/replace/delete).
64    #[must_use]
65    pub fn is_dml(&self) -> bool {
66        matches!(
67            self,
68            Self::Insert | Self::Update | Self::Replace | Self::Delete
69        )
70    }
71
72    /// Returns `true` if this is a lifecycle event (drop/rename/invalidate).
73    #[must_use]
74    pub fn is_lifecycle(&self) -> bool {
75        !self.is_dml()
76    }
77}
78
79impl std::fmt::Display for OperationType {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.write_str(self.as_str())
82    }
83}
84
/// Describes the fields changed during an `update` operation.
///
/// Mirrors `MongoDB`'s `updateDescription` subdocument in change events.
/// Only populated for `update` operations; `insert`, `replace`, and
/// `delete` events do not include this.
///
/// No serde rename is applied, so the serialized keys are the Rust field
/// names (`updated_fields`, `removed_fields`, `truncated_arrays`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct UpdateDescription {
    /// Fields that were set or modified, as key-value JSON pairs.
    ///
    /// Keyed by field name; presumably nested fields use dot-notation paths
    /// as in `MongoDB`'s own output — TODO confirm against the producer.
    /// Required when deserializing (no serde default).
    pub updated_fields: HashMap<String, serde_json::Value>,

    /// Fields that were removed from the document.
    ///
    /// Required when deserializing (no serde default).
    pub removed_fields: Vec<String>,

    /// Arrays that were truncated (`MongoDB` 6.0+).
    ///
    /// Defaults to an empty list when the key is absent from the input.
    #[serde(default)]
    pub truncated_arrays: Vec<TruncatedArray>,
}
102
/// Describes a truncated array in an update operation (`MongoDB` 6.0+).
///
/// Used as the element type of `UpdateDescription::truncated_arrays`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TruncatedArray {
    /// Dot-notation path of the truncated array field.
    pub field: String,
    /// New size of the array after truncation (number of elements kept).
    pub new_size: u32,
}
111
/// The namespace (database + collection) for a change event.
///
/// Implements `Eq` and `Hash`, so it can be used as a map or set key.
/// Rendered as `db.coll` by `full_name` and by its `Display` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct Namespace {
    /// Database name.
    pub db: String,
    /// Collection name.
    pub coll: String,
}
120
121impl Namespace {
122    /// Returns the fully qualified namespace as `db.coll`.
123    #[must_use]
124    pub fn full_name(&self) -> String {
125        format!("{}.{}", self.db, self.coll)
126    }
127}
128
129impl std::fmt::Display for Namespace {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        write!(f, "{}.{}", self.db, self.coll)
132    }
133}
134
/// A change event from a `MongoDB` change stream.
///
/// This struct captures all fields relevant for CDC processing. The
/// `resume_token` is the opaque `_id` field from the change event document,
/// used for resuming the stream.
///
/// Document payloads (`document_key`, `full_document`, `resume_token`) are
/// carried as JSON text in `String` fields rather than as parsed values.
/// No serde rename is applied, so serialized keys are the Rust field names.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MongoDbChangeEvent {
    /// The operation type.
    pub operation_type: OperationType,

    /// The namespace (database + collection).
    pub namespace: Namespace,

    /// The document key (`_id` and any shard key fields).
    /// Serialized as a JSON string for Arrow compatibility.
    pub document_key: String,

    /// The full document (present for insert, replace, and optionally
    /// for update events depending on `FullDocumentMode`).
    ///
    /// `None` when the event carries no document; when present it is JSON
    /// text, like `document_key`.
    pub full_document: Option<String>,

    /// Update delta (only for `update` operations).
    pub update_description: Option<UpdateDescription>,

    /// Server timestamp of the operation (cluster time).
    ///
    /// Seconds component; presumably seconds since the Unix epoch taken
    /// from the BSON timestamp — TODO confirm against the decoder.
    pub cluster_time_secs: u32,

    /// Cluster time increment.
    ///
    /// Presumably the ordinal that orders operations within the same
    /// `cluster_time_secs` second — TODO confirm.
    pub cluster_time_inc: u32,

    /// The opaque resume token for this event, serialized as JSON.
    pub resume_token: String,

    /// Wall clock timestamp in milliseconds since Unix epoch.
    pub wall_time_ms: i64,
}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, including `DropDatabase` and `Other`, maps to its code.
    #[test]
    fn test_operation_type_as_str() {
        assert_eq!(OperationType::Insert.as_str(), "I");
        assert_eq!(OperationType::Update.as_str(), "U");
        assert_eq!(OperationType::Replace.as_str(), "R");
        assert_eq!(OperationType::Delete.as_str(), "D");
        assert_eq!(OperationType::Drop.as_str(), "DROP");
        assert_eq!(OperationType::Rename.as_str(), "RENAME");
        assert_eq!(OperationType::Invalidate.as_str(), "INVALIDATE");
        assert_eq!(OperationType::DropDatabase.as_str(), "DROP_DATABASE");
        assert_eq!(
            OperationType::Other("createIndexes".to_string()).as_str(),
            "createIndexes"
        );
    }

    /// `Display` must stay in lock-step with `as_str`.
    #[test]
    fn test_operation_type_display() {
        assert_eq!(OperationType::Update.to_string(), "U");
        assert_eq!(OperationType::DropDatabase.to_string(), "DROP_DATABASE");
        assert_eq!(
            OperationType::Other("shardCollection".to_string()).to_string(),
            "shardCollection"
        );
    }

    /// Only insert/update/replace/delete count as DML.
    #[test]
    fn test_operation_type_is_dml() {
        assert!(OperationType::Insert.is_dml());
        assert!(OperationType::Update.is_dml());
        assert!(OperationType::Replace.is_dml());
        assert!(OperationType::Delete.is_dml());
        assert!(!OperationType::Drop.is_dml());
        assert!(!OperationType::Rename.is_dml());
        assert!(!OperationType::Invalidate.is_dml());
        assert!(!OperationType::DropDatabase.is_dml());
        assert!(!OperationType::Other("createIndexes".to_string()).is_dml());
    }

    /// `is_lifecycle` is the complement of `is_dml`, so unknown operations
    /// and `DropDatabase` are classified as lifecycle events too.
    #[test]
    fn test_operation_type_is_lifecycle() {
        assert!(!OperationType::Insert.is_lifecycle());
        assert!(!OperationType::Update.is_lifecycle());
        assert!(!OperationType::Replace.is_lifecycle());
        assert!(!OperationType::Delete.is_lifecycle());
        assert!(OperationType::Drop.is_lifecycle());
        assert!(OperationType::Rename.is_lifecycle());
        assert!(OperationType::Invalidate.is_lifecycle());
        assert!(OperationType::DropDatabase.is_lifecycle());
        assert!(OperationType::Other("createIndexes".to_string()).is_lifecycle());
    }

    /// `full_name` and `Display` agree on the `db.coll` rendering.
    #[test]
    fn test_namespace_full_name() {
        let ns = Namespace {
            db: "mydb".to_string(),
            coll: "users".to_string(),
        };
        assert_eq!(ns.full_name(), "mydb.users");
        assert_eq!(ns.to_string(), "mydb.users");
    }

    /// The derived `Default` yields an empty delta.
    #[test]
    fn test_update_description_default() {
        let ud = UpdateDescription::default();
        assert!(ud.updated_fields.is_empty());
        assert!(ud.removed_fields.is_empty());
        assert!(ud.truncated_arrays.is_empty());
    }

    /// `truncated_arrays` is optional on input thanks to `#[serde(default)]`.
    #[test]
    fn test_update_description_missing_truncated_arrays() {
        let raw = r#"{"updated_fields": {"a": 1}, "removed_fields": ["b"]}"#;
        let ud: UpdateDescription = serde_json::from_str(raw).unwrap();
        assert_eq!(ud.updated_fields.get("a"), Some(&serde_json::json!(1)));
        assert_eq!(ud.removed_fields, vec!["b".to_string()]);
        assert!(ud.truncated_arrays.is_empty());
    }

    /// A full event survives a JSON round-trip with every field intact.
    #[test]
    fn test_change_event_serde_roundtrip() {
        let event = MongoDbChangeEvent {
            operation_type: OperationType::Insert,
            namespace: Namespace {
                db: "test".to_string(),
                coll: "docs".to_string(),
            },
            document_key: r#"{"_id": "abc123"}"#.to_string(),
            full_document: Some(r#"{"_id": "abc123", "name": "Alice"}"#.to_string()),
            update_description: None,
            cluster_time_secs: 1_700_000_000,
            cluster_time_inc: 1,
            resume_token: r#"{"_data": "token123"}"#.to_string(),
            wall_time_ms: 1_700_000_000_000,
        };

        let json = serde_json::to_string(&event).unwrap();
        let deserialized: MongoDbChangeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.operation_type, OperationType::Insert);
        assert_eq!(deserialized.namespace.db, "test");
        assert_eq!(deserialized.namespace, event.namespace);
        assert_eq!(deserialized.document_key, event.document_key);
        assert_eq!(deserialized.full_document, event.full_document);
        assert!(deserialized.update_description.is_none());
        assert_eq!(deserialized.cluster_time_secs, event.cluster_time_secs);
        assert_eq!(deserialized.cluster_time_inc, event.cluster_time_inc);
        assert_eq!(deserialized.resume_token, event.resume_token);
        assert_eq!(deserialized.wall_time_ms, event.wall_time_ms);
    }
}