laminar_connectors/mongodb/
metrics.rs1use prometheus::{IntCounter, Registry};
6
7use crate::metrics::ConnectorMetrics;
8
9#[derive(Debug, Clone)]
11pub struct MongoDbCdcMetrics {
12 pub events_received: IntCounter,
14 pub bytes_received: IntCounter,
16 pub errors: IntCounter,
18 pub batches_produced: IntCounter,
20 pub inserts: IntCounter,
22 pub updates: IntCounter,
24 pub replaces: IntCounter,
26 pub deletes: IntCounter,
28 pub lifecycle_events: IntCounter,
30 pub token_persists: IntCounter,
32 pub reconnects: IntCounter,
34 pub large_event_fragments: IntCounter,
36 pub large_events_reassembled: IntCounter,
38}
39
40impl MongoDbCdcMetrics {
41 #[must_use]
43 #[allow(clippy::missing_panics_doc)]
44 pub fn new(registry: Option<&Registry>) -> Self {
45 let local;
46 let reg = if let Some(r) = registry {
47 r
48 } else {
49 local = Registry::new();
50 &local
51 };
52
53 let events_received = IntCounter::new(
54 "mongodb_cdc_events_received_total",
55 "Total MongoDB CDC events received",
56 )
57 .unwrap();
58 let bytes_received = IntCounter::new(
59 "mongodb_cdc_bytes_received_total",
60 "Total bytes from change stream",
61 )
62 .unwrap();
63 let errors =
64 IntCounter::new("mongodb_cdc_errors_total", "Total MongoDB CDC errors").unwrap();
65 let batches_produced = IntCounter::new(
66 "mongodb_cdc_batches_produced_total",
67 "Total batches produced",
68 )
69 .unwrap();
70 let inserts = IntCounter::new("mongodb_cdc_inserts_total", "Total INSERT events").unwrap();
71 let updates = IntCounter::new("mongodb_cdc_updates_total", "Total UPDATE events").unwrap();
72 let replaces =
73 IntCounter::new("mongodb_cdc_replaces_total", "Total REPLACE events").unwrap();
74 let deletes = IntCounter::new("mongodb_cdc_deletes_total", "Total DELETE events").unwrap();
75 let lifecycle_events = IntCounter::new(
76 "mongodb_cdc_lifecycle_events_total",
77 "Total lifecycle events",
78 )
79 .unwrap();
80 let token_persists = IntCounter::new(
81 "mongodb_cdc_token_persists_total",
82 "Total resume token persist ops",
83 )
84 .unwrap();
85 let reconnects = IntCounter::new(
86 "mongodb_cdc_reconnects_total",
87 "Total reconnection attempts",
88 )
89 .unwrap();
90 let large_event_fragments = IntCounter::new(
91 "mongodb_cdc_large_event_fragments_total",
92 "Total large event fragments",
93 )
94 .unwrap();
95 let large_events_reassembled = IntCounter::new(
96 "mongodb_cdc_large_events_reassembled_total",
97 "Total large events reassembled",
98 )
99 .unwrap();
100
101 let _ = reg.register(Box::new(events_received.clone()));
102 let _ = reg.register(Box::new(bytes_received.clone()));
103 let _ = reg.register(Box::new(errors.clone()));
104 let _ = reg.register(Box::new(batches_produced.clone()));
105 let _ = reg.register(Box::new(inserts.clone()));
106 let _ = reg.register(Box::new(updates.clone()));
107 let _ = reg.register(Box::new(replaces.clone()));
108 let _ = reg.register(Box::new(deletes.clone()));
109 let _ = reg.register(Box::new(lifecycle_events.clone()));
110 let _ = reg.register(Box::new(token_persists.clone()));
111 let _ = reg.register(Box::new(reconnects.clone()));
112 let _ = reg.register(Box::new(large_event_fragments.clone()));
113 let _ = reg.register(Box::new(large_events_reassembled.clone()));
114
115 Self {
116 events_received,
117 bytes_received,
118 errors,
119 batches_produced,
120 inserts,
121 updates,
122 replaces,
123 deletes,
124 lifecycle_events,
125 token_persists,
126 reconnects,
127 large_event_fragments,
128 large_events_reassembled,
129 }
130 }
131
132 pub fn record_event(&self, op: &str) {
134 self.events_received.inc();
135 match op {
136 "I" => self.inserts.inc(),
137 "U" => self.updates.inc(),
138 "R" => self.replaces.inc(),
139 "D" => self.deletes.inc(),
140 _ => self.lifecycle_events.inc(),
141 }
142 }
143
144 pub fn record_bytes(&self, bytes: u64) {
146 self.bytes_received.inc_by(bytes);
147 }
148
149 pub fn record_error(&self) {
151 self.errors.inc();
152 }
153
154 pub fn record_batch(&self) {
156 self.batches_produced.inc();
157 }
158
159 pub fn record_token_persist(&self) {
161 self.token_persists.inc();
162 }
163
164 pub fn record_reconnect(&self) {
166 self.reconnects.inc();
167 }
168
169 pub fn record_large_event_fragment(&self) {
171 self.large_event_fragments.inc();
172 }
173
174 pub fn record_large_event_reassembled(&self) {
176 self.large_events_reassembled.inc();
177 }
178
179 #[must_use]
181 #[allow(clippy::cast_precision_loss)]
182 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
183 let mut m = ConnectorMetrics::new();
184 m.records_total = self.events_received.get();
185 m.bytes_total = self.bytes_received.get();
186 m.errors_total = self.errors.get();
187
188 m.add_custom("inserts", self.inserts.get() as f64);
189 m.add_custom("updates", self.updates.get() as f64);
190 m.add_custom("replaces", self.replaces.get() as f64);
191 m.add_custom("deletes", self.deletes.get() as f64);
192 m.add_custom("lifecycle_events", self.lifecycle_events.get() as f64);
193 m.add_custom("reconnects", self.reconnects.get() as f64);
194 m.add_custom(
195 "large_events_reassembled",
196 self.large_events_reassembled.get() as f64,
197 );
198 m
199 }
200}
201
202impl Default for MongoDbCdcMetrics {
203 fn default() -> Self {
204 Self::new(None)
205 }
206}
207
208#[derive(Debug, Clone)]
210pub struct MongoDbSinkMetrics {
211 pub records_written: IntCounter,
213 pub bytes_written: IntCounter,
215 pub errors: IntCounter,
217 pub batches_flushed: IntCounter,
219 pub bulk_writes: IntCounter,
221 pub inserts: IntCounter,
223 pub upserts: IntCounter,
225 pub deletes: IntCounter,
227}
228
229impl MongoDbSinkMetrics {
230 #[must_use]
232 #[allow(clippy::missing_panics_doc)]
233 pub fn new(registry: Option<&Registry>) -> Self {
234 let local;
235 let reg = if let Some(r) = registry {
236 r
237 } else {
238 local = Registry::new();
239 &local
240 };
241
242 let records_written = IntCounter::new(
243 "mongodb_sink_records_written_total",
244 "Total MongoDB sink records written",
245 )
246 .unwrap();
247 let bytes_written = IntCounter::new(
248 "mongodb_sink_bytes_written_total",
249 "Total MongoDB sink bytes written",
250 )
251 .unwrap();
252 let errors =
253 IntCounter::new("mongodb_sink_errors_total", "Total MongoDB sink errors").unwrap();
254 let batches_flushed = IntCounter::new(
255 "mongodb_sink_batches_flushed_total",
256 "Total batches flushed",
257 )
258 .unwrap();
259 let bulk_writes = IntCounter::new(
260 "mongodb_sink_bulk_writes_total",
261 "Total bulk write operations",
262 )
263 .unwrap();
264 let inserts =
265 IntCounter::new("mongodb_sink_inserts_total", "Total insert operations").unwrap();
266 let upserts =
267 IntCounter::new("mongodb_sink_upserts_total", "Total upsert operations").unwrap();
268 let deletes =
269 IntCounter::new("mongodb_sink_deletes_total", "Total delete operations").unwrap();
270
271 let _ = reg.register(Box::new(records_written.clone()));
272 let _ = reg.register(Box::new(bytes_written.clone()));
273 let _ = reg.register(Box::new(errors.clone()));
274 let _ = reg.register(Box::new(batches_flushed.clone()));
275 let _ = reg.register(Box::new(bulk_writes.clone()));
276 let _ = reg.register(Box::new(inserts.clone()));
277 let _ = reg.register(Box::new(upserts.clone()));
278 let _ = reg.register(Box::new(deletes.clone()));
279
280 Self {
281 records_written,
282 bytes_written,
283 errors,
284 batches_flushed,
285 bulk_writes,
286 inserts,
287 upserts,
288 deletes,
289 }
290 }
291
292 pub fn record_flush(&self, records: u64, bytes: u64) {
294 self.records_written.inc_by(records);
295 self.bytes_written.inc_by(bytes);
296 self.batches_flushed.inc();
297 }
298
299 pub fn record_bulk_write(&self) {
301 self.bulk_writes.inc();
302 }
303
304 pub fn record_error(&self) {
306 self.errors.inc();
307 }
308
309 pub fn record_inserts(&self, count: u64) {
311 self.inserts.inc_by(count);
312 }
313
314 pub fn record_upserts(&self, count: u64) {
316 self.upserts.inc_by(count);
317 }
318
319 pub fn record_deletes(&self, count: u64) {
321 self.deletes.inc_by(count);
322 }
323
324 #[must_use]
326 #[allow(clippy::cast_precision_loss)]
327 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
328 let mut m = ConnectorMetrics::new();
329 m.records_total = self.records_written.get();
330 m.bytes_total = self.bytes_written.get();
331 m.errors_total = self.errors.get();
332
333 m.add_custom("inserts", self.inserts.get() as f64);
334 m.add_custom("upserts", self.upserts.get() as f64);
335 m.add_custom("deletes", self.deletes.get() as f64);
336 m.add_custom("bulk_writes", self.bulk_writes.get() as f64);
337 m.add_custom("batches_flushed", self.batches_flushed.get() as f64);
338 m
339 }
340}
341
342impl Default for MongoDbSinkMetrics {
343 fn default() -> Self {
344 Self::new(None)
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use super::*;
351
352 #[test]
353 fn test_source_metrics_record_events() {
354 let m = MongoDbCdcMetrics::new(None);
355 m.record_event("I");
356 m.record_event("I");
357 m.record_event("U");
358 m.record_event("D");
359 m.record_event("DROP");
360 m.record_bytes(1024);
361 m.record_error();
362 m.record_batch();
363 m.record_token_persist();
364 m.record_reconnect();
365
366 assert_eq!(m.events_received.get(), 5);
367 assert_eq!(m.inserts.get(), 2);
368 assert_eq!(m.updates.get(), 1);
369 assert_eq!(m.deletes.get(), 1);
370 assert_eq!(m.lifecycle_events.get(), 1);
371 assert_eq!(m.bytes_received.get(), 1024);
372 assert_eq!(m.errors.get(), 1);
373 assert_eq!(m.batches_produced.get(), 1);
374 assert_eq!(m.token_persists.get(), 1);
375 assert_eq!(m.reconnects.get(), 1);
376 }
377
378 #[test]
379 fn test_source_metrics_to_connector_metrics() {
380 let m = MongoDbCdcMetrics::new(None);
381 m.record_event("I");
382 m.record_bytes(512);
383 m.record_error();
384
385 let cm = m.to_connector_metrics();
386 assert_eq!(cm.records_total, 1);
387 assert_eq!(cm.bytes_total, 512);
388 assert_eq!(cm.errors_total, 1);
389 assert!(!cm.custom.is_empty());
390 }
391
392 #[test]
393 fn test_sink_metrics_record_flush() {
394 let m = MongoDbSinkMetrics::new(None);
395 m.record_flush(100, 5000);
396 m.record_bulk_write();
397 m.record_inserts(80);
398 m.record_upserts(15);
399 m.record_deletes(5);
400 m.record_error();
401
402 assert_eq!(m.records_written.get(), 100);
403 assert_eq!(m.bytes_written.get(), 5000);
404 assert_eq!(m.batches_flushed.get(), 1);
405 assert_eq!(m.bulk_writes.get(), 1);
406 assert_eq!(m.inserts.get(), 80);
407 assert_eq!(m.upserts.get(), 15);
408 assert_eq!(m.deletes.get(), 5);
409 assert_eq!(m.errors.get(), 1);
410 }
411
412 #[test]
413 fn test_sink_metrics_to_connector_metrics() {
414 let m = MongoDbSinkMetrics::new(None);
415 m.record_flush(50, 2500);
416 m.record_error();
417
418 let cm = m.to_connector_metrics();
419 assert_eq!(cm.records_total, 50);
420 assert_eq!(cm.bytes_total, 2500);
421 assert_eq!(cm.errors_total, 1);
422 }
423}