laminar_connectors/mongodb/
change_event.rs1use std::collections::HashMap;
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub enum OperationType {
25 Insert,
27 Update,
29 Replace,
31 Delete,
33 Drop,
35 Rename,
37 Invalidate,
39 DropDatabase,
41 Other(String),
43}
44
45impl OperationType {
46 #[must_use]
49 pub fn as_str(&self) -> &str {
50 match self {
51 Self::Insert => "I",
52 Self::Update => "U",
53 Self::Replace => "R",
54 Self::Delete => "D",
55 Self::Drop => "DROP",
56 Self::Rename => "RENAME",
57 Self::Invalidate => "INVALIDATE",
58 Self::DropDatabase => "DROP_DATABASE",
59 Self::Other(s) => s.as_str(),
60 }
61 }
62
63 #[must_use]
65 pub fn is_dml(&self) -> bool {
66 matches!(
67 self,
68 Self::Insert | Self::Update | Self::Replace | Self::Delete
69 )
70 }
71
72 #[must_use]
74 pub fn is_lifecycle(&self) -> bool {
75 !self.is_dml()
76 }
77}
78
79impl std::fmt::Display for OperationType {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.write_str(self.as_str())
82 }
83}
84
85#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
91pub struct UpdateDescription {
92 pub updated_fields: HashMap<String, serde_json::Value>,
94
95 pub removed_fields: Vec<String>,
97
98 #[serde(default)]
100 pub truncated_arrays: Vec<TruncatedArray>,
101}
102
103#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
105pub struct TruncatedArray {
106 pub field: String,
108 pub new_size: u32,
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
114pub struct Namespace {
115 pub db: String,
117 pub coll: String,
119}
120
121impl Namespace {
122 #[must_use]
124 pub fn full_name(&self) -> String {
125 format!("{}.{}", self.db, self.coll)
126 }
127}
128
129impl std::fmt::Display for Namespace {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 write!(f, "{}.{}", self.db, self.coll)
132 }
133}
134
135#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
141pub struct MongoDbChangeEvent {
142 pub operation_type: OperationType,
144
145 pub namespace: Namespace,
147
148 pub document_key: String,
151
152 pub full_document: Option<String>,
155
156 pub update_description: Option<UpdateDescription>,
158
159 pub cluster_time_secs: u32,
161
162 pub cluster_time_inc: u32,
164
165 pub resume_token: String,
167
168 pub wall_time_ms: i64,
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn test_operation_type_as_str() {
178 assert_eq!(OperationType::Insert.as_str(), "I");
179 assert_eq!(OperationType::Update.as_str(), "U");
180 assert_eq!(OperationType::Replace.as_str(), "R");
181 assert_eq!(OperationType::Delete.as_str(), "D");
182 assert_eq!(OperationType::Drop.as_str(), "DROP");
183 assert_eq!(OperationType::Rename.as_str(), "RENAME");
184 assert_eq!(OperationType::Invalidate.as_str(), "INVALIDATE");
185 }
186
187 #[test]
188 fn test_operation_type_is_dml() {
189 assert!(OperationType::Insert.is_dml());
190 assert!(OperationType::Update.is_dml());
191 assert!(OperationType::Replace.is_dml());
192 assert!(OperationType::Delete.is_dml());
193 assert!(!OperationType::Drop.is_dml());
194 assert!(!OperationType::Rename.is_dml());
195 assert!(!OperationType::Invalidate.is_dml());
196 }
197
198 #[test]
199 fn test_operation_type_is_lifecycle() {
200 assert!(!OperationType::Insert.is_lifecycle());
201 assert!(OperationType::Drop.is_lifecycle());
202 assert!(OperationType::Rename.is_lifecycle());
203 assert!(OperationType::Invalidate.is_lifecycle());
204 }
205
206 #[test]
207 fn test_namespace_full_name() {
208 let ns = Namespace {
209 db: "mydb".to_string(),
210 coll: "users".to_string(),
211 };
212 assert_eq!(ns.full_name(), "mydb.users");
213 assert_eq!(ns.to_string(), "mydb.users");
214 }
215
216 #[test]
217 fn test_update_description_default() {
218 let ud = UpdateDescription::default();
219 assert!(ud.updated_fields.is_empty());
220 assert!(ud.removed_fields.is_empty());
221 assert!(ud.truncated_arrays.is_empty());
222 }
223
224 #[test]
225 fn test_change_event_serde_roundtrip() {
226 let event = MongoDbChangeEvent {
227 operation_type: OperationType::Insert,
228 namespace: Namespace {
229 db: "test".to_string(),
230 coll: "docs".to_string(),
231 },
232 document_key: r#"{"_id": "abc123"}"#.to_string(),
233 full_document: Some(r#"{"_id": "abc123", "name": "Alice"}"#.to_string()),
234 update_description: None,
235 cluster_time_secs: 1_700_000_000,
236 cluster_time_inc: 1,
237 resume_token: r#"{"_data": "token123"}"#.to_string(),
238 wall_time_ms: 1_700_000_000_000,
239 };
240
241 let json = serde_json::to_string(&event).unwrap();
242 let deserialized: MongoDbChangeEvent = serde_json::from_str(&json).unwrap();
243 assert_eq!(deserialized.operation_type, OperationType::Insert);
244 assert_eq!(deserialized.namespace.db, "test");
245 }
246}