Skip to main content

laminar_connectors/mongodb/
metrics.rs

//! `MongoDB` connector metrics.
//!
//! Prometheus-backed counters for tracking CDC source and sink performance.
4
5use prometheus::{IntCounter, Registry};
6
7use crate::metrics::ConnectorMetrics;
8
9/// Metrics for the `MongoDB` CDC source connector.
10#[derive(Debug, Clone)]
11pub struct MongoDbCdcMetrics {
12    /// Total change events received.
13    pub events_received: IntCounter,
14    /// Total bytes received from the change stream.
15    pub bytes_received: IntCounter,
16    /// Total errors encountered.
17    pub errors: IntCounter,
18    /// Total batches produced for downstream.
19    pub batches_produced: IntCounter,
20    /// Total INSERT operations received.
21    pub inserts: IntCounter,
22    /// Total UPDATE operations received.
23    pub updates: IntCounter,
24    /// Total REPLACE operations received.
25    pub replaces: IntCounter,
26    /// Total DELETE operations received.
27    pub deletes: IntCounter,
28    /// Total lifecycle events (drop/rename/invalidate).
29    pub lifecycle_events: IntCounter,
30    /// Total resume token persist operations.
31    pub token_persists: IntCounter,
32    /// Total reconnection attempts.
33    pub reconnects: IntCounter,
34    /// Total large event fragments received.
35    pub large_event_fragments: IntCounter,
36    /// Total large events reassembled.
37    pub large_events_reassembled: IntCounter,
38}
39
40impl MongoDbCdcMetrics {
41    /// Creates a new metrics instance with all counters at zero.
42    #[must_use]
43    #[allow(clippy::missing_panics_doc)]
44    pub fn new(registry: Option<&Registry>) -> Self {
45        let local;
46        let reg = if let Some(r) = registry {
47            r
48        } else {
49            local = Registry::new();
50            &local
51        };
52
53        let events_received = IntCounter::new(
54            "mongodb_cdc_events_received_total",
55            "Total MongoDB CDC events received",
56        )
57        .unwrap();
58        let bytes_received = IntCounter::new(
59            "mongodb_cdc_bytes_received_total",
60            "Total bytes from change stream",
61        )
62        .unwrap();
63        let errors =
64            IntCounter::new("mongodb_cdc_errors_total", "Total MongoDB CDC errors").unwrap();
65        let batches_produced = IntCounter::new(
66            "mongodb_cdc_batches_produced_total",
67            "Total batches produced",
68        )
69        .unwrap();
70        let inserts = IntCounter::new("mongodb_cdc_inserts_total", "Total INSERT events").unwrap();
71        let updates = IntCounter::new("mongodb_cdc_updates_total", "Total UPDATE events").unwrap();
72        let replaces =
73            IntCounter::new("mongodb_cdc_replaces_total", "Total REPLACE events").unwrap();
74        let deletes = IntCounter::new("mongodb_cdc_deletes_total", "Total DELETE events").unwrap();
75        let lifecycle_events = IntCounter::new(
76            "mongodb_cdc_lifecycle_events_total",
77            "Total lifecycle events",
78        )
79        .unwrap();
80        let token_persists = IntCounter::new(
81            "mongodb_cdc_token_persists_total",
82            "Total resume token persist ops",
83        )
84        .unwrap();
85        let reconnects = IntCounter::new(
86            "mongodb_cdc_reconnects_total",
87            "Total reconnection attempts",
88        )
89        .unwrap();
90        let large_event_fragments = IntCounter::new(
91            "mongodb_cdc_large_event_fragments_total",
92            "Total large event fragments",
93        )
94        .unwrap();
95        let large_events_reassembled = IntCounter::new(
96            "mongodb_cdc_large_events_reassembled_total",
97            "Total large events reassembled",
98        )
99        .unwrap();
100
101        let _ = reg.register(Box::new(events_received.clone()));
102        let _ = reg.register(Box::new(bytes_received.clone()));
103        let _ = reg.register(Box::new(errors.clone()));
104        let _ = reg.register(Box::new(batches_produced.clone()));
105        let _ = reg.register(Box::new(inserts.clone()));
106        let _ = reg.register(Box::new(updates.clone()));
107        let _ = reg.register(Box::new(replaces.clone()));
108        let _ = reg.register(Box::new(deletes.clone()));
109        let _ = reg.register(Box::new(lifecycle_events.clone()));
110        let _ = reg.register(Box::new(token_persists.clone()));
111        let _ = reg.register(Box::new(reconnects.clone()));
112        let _ = reg.register(Box::new(large_event_fragments.clone()));
113        let _ = reg.register(Box::new(large_events_reassembled.clone()));
114
115        Self {
116            events_received,
117            bytes_received,
118            errors,
119            batches_produced,
120            inserts,
121            updates,
122            replaces,
123            deletes,
124            lifecycle_events,
125            token_persists,
126            reconnects,
127            large_event_fragments,
128            large_events_reassembled,
129        }
130    }
131
132    /// Records a received change event by operation type.
133    pub fn record_event(&self, op: &str) {
134        self.events_received.inc();
135        match op {
136            "I" => self.inserts.inc(),
137            "U" => self.updates.inc(),
138            "R" => self.replaces.inc(),
139            "D" => self.deletes.inc(),
140            _ => self.lifecycle_events.inc(),
141        }
142    }
143
144    /// Records bytes received from the change stream.
145    pub fn record_bytes(&self, bytes: u64) {
146        self.bytes_received.inc_by(bytes);
147    }
148
149    /// Records an error.
150    pub fn record_error(&self) {
151        self.errors.inc();
152    }
153
154    /// Records a batch produced for downstream.
155    pub fn record_batch(&self) {
156        self.batches_produced.inc();
157    }
158
159    /// Records a resume token persistence operation.
160    pub fn record_token_persist(&self) {
161        self.token_persists.inc();
162    }
163
164    /// Records a reconnection attempt.
165    pub fn record_reconnect(&self) {
166        self.reconnects.inc();
167    }
168
169    /// Records a large event fragment received.
170    pub fn record_large_event_fragment(&self) {
171        self.large_event_fragments.inc();
172    }
173
174    /// Records a large event reassembled.
175    pub fn record_large_event_reassembled(&self) {
176        self.large_events_reassembled.inc();
177    }
178
179    /// Converts to the SDK's [`ConnectorMetrics`].
180    #[must_use]
181    #[allow(clippy::cast_precision_loss)]
182    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
183        let mut m = ConnectorMetrics::new();
184        m.records_total = self.events_received.get();
185        m.bytes_total = self.bytes_received.get();
186        m.errors_total = self.errors.get();
187
188        m.add_custom("inserts", self.inserts.get() as f64);
189        m.add_custom("updates", self.updates.get() as f64);
190        m.add_custom("replaces", self.replaces.get() as f64);
191        m.add_custom("deletes", self.deletes.get() as f64);
192        m.add_custom("lifecycle_events", self.lifecycle_events.get() as f64);
193        m.add_custom("reconnects", self.reconnects.get() as f64);
194        m.add_custom(
195            "large_events_reassembled",
196            self.large_events_reassembled.get() as f64,
197        );
198        m
199    }
200}
201
202impl Default for MongoDbCdcMetrics {
203    fn default() -> Self {
204        Self::new(None)
205    }
206}
207
208/// Metrics for the `MongoDB` sink connector.
209#[derive(Debug, Clone)]
210pub struct MongoDbSinkMetrics {
211    /// Total records written.
212    pub records_written: IntCounter,
213    /// Total bytes written.
214    pub bytes_written: IntCounter,
215    /// Total errors encountered.
216    pub errors: IntCounter,
217    /// Total batches flushed.
218    pub batches_flushed: IntCounter,
219    /// Total bulk write operations issued.
220    pub bulk_writes: IntCounter,
221    /// Total individual insert operations.
222    pub inserts: IntCounter,
223    /// Total upsert operations.
224    pub upserts: IntCounter,
225    /// Total delete operations.
226    pub deletes: IntCounter,
227}
228
229impl MongoDbSinkMetrics {
230    /// Creates a new metrics instance with all counters at zero.
231    #[must_use]
232    #[allow(clippy::missing_panics_doc)]
233    pub fn new(registry: Option<&Registry>) -> Self {
234        let local;
235        let reg = if let Some(r) = registry {
236            r
237        } else {
238            local = Registry::new();
239            &local
240        };
241
242        let records_written = IntCounter::new(
243            "mongodb_sink_records_written_total",
244            "Total MongoDB sink records written",
245        )
246        .unwrap();
247        let bytes_written = IntCounter::new(
248            "mongodb_sink_bytes_written_total",
249            "Total MongoDB sink bytes written",
250        )
251        .unwrap();
252        let errors =
253            IntCounter::new("mongodb_sink_errors_total", "Total MongoDB sink errors").unwrap();
254        let batches_flushed = IntCounter::new(
255            "mongodb_sink_batches_flushed_total",
256            "Total batches flushed",
257        )
258        .unwrap();
259        let bulk_writes = IntCounter::new(
260            "mongodb_sink_bulk_writes_total",
261            "Total bulk write operations",
262        )
263        .unwrap();
264        let inserts =
265            IntCounter::new("mongodb_sink_inserts_total", "Total insert operations").unwrap();
266        let upserts =
267            IntCounter::new("mongodb_sink_upserts_total", "Total upsert operations").unwrap();
268        let deletes =
269            IntCounter::new("mongodb_sink_deletes_total", "Total delete operations").unwrap();
270
271        let _ = reg.register(Box::new(records_written.clone()));
272        let _ = reg.register(Box::new(bytes_written.clone()));
273        let _ = reg.register(Box::new(errors.clone()));
274        let _ = reg.register(Box::new(batches_flushed.clone()));
275        let _ = reg.register(Box::new(bulk_writes.clone()));
276        let _ = reg.register(Box::new(inserts.clone()));
277        let _ = reg.register(Box::new(upserts.clone()));
278        let _ = reg.register(Box::new(deletes.clone()));
279
280        Self {
281            records_written,
282            bytes_written,
283            errors,
284            batches_flushed,
285            bulk_writes,
286            inserts,
287            upserts,
288            deletes,
289        }
290    }
291
292    /// Records a successful batch flush.
293    pub fn record_flush(&self, records: u64, bytes: u64) {
294        self.records_written.inc_by(records);
295        self.bytes_written.inc_by(bytes);
296        self.batches_flushed.inc();
297    }
298
299    /// Records a bulk write operation.
300    pub fn record_bulk_write(&self) {
301        self.bulk_writes.inc();
302    }
303
304    /// Records an error.
305    pub fn record_error(&self) {
306        self.errors.inc();
307    }
308
309    /// Records insert operations.
310    pub fn record_inserts(&self, count: u64) {
311        self.inserts.inc_by(count);
312    }
313
314    /// Records upsert operations.
315    pub fn record_upserts(&self, count: u64) {
316        self.upserts.inc_by(count);
317    }
318
319    /// Records delete operations.
320    pub fn record_deletes(&self, count: u64) {
321        self.deletes.inc_by(count);
322    }
323
324    /// Converts to the SDK's [`ConnectorMetrics`].
325    #[must_use]
326    #[allow(clippy::cast_precision_loss)]
327    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
328        let mut m = ConnectorMetrics::new();
329        m.records_total = self.records_written.get();
330        m.bytes_total = self.bytes_written.get();
331        m.errors_total = self.errors.get();
332
333        m.add_custom("inserts", self.inserts.get() as f64);
334        m.add_custom("upserts", self.upserts.get() as f64);
335        m.add_custom("deletes", self.deletes.get() as f64);
336        m.add_custom("bulk_writes", self.bulk_writes.get() as f64);
337        m.add_custom("batches_flushed", self.batches_flushed.get() as f64);
338        m
339    }
340}
341
342impl Default for MongoDbSinkMetrics {
343    fn default() -> Self {
344        Self::new(None)
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    // Exercises every operation code handled by `record_event`, plus the
    // simple single-counter recorders.
    #[test]
    fn test_source_metrics_record_events() {
        let m = MongoDbCdcMetrics::new(None);
        m.record_event("I");
        m.record_event("I");
        m.record_event("U");
        m.record_event("R");
        m.record_event("D");
        m.record_event("DROP");
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_token_persist();
        m.record_reconnect();

        assert_eq!(m.events_received.get(), 6);
        assert_eq!(m.inserts.get(), 2);
        assert_eq!(m.updates.get(), 1);
        assert_eq!(m.replaces.get(), 1);
        assert_eq!(m.deletes.get(), 1);
        assert_eq!(m.lifecycle_events.get(), 1);
        assert_eq!(m.bytes_received.get(), 1024);
        assert_eq!(m.errors.get(), 1);
        assert_eq!(m.batches_produced.get(), 1);
        assert_eq!(m.token_persists.get(), 1);
        assert_eq!(m.reconnects.get(), 1);
    }

    // The large-event recorders only touch their own counters.
    #[test]
    fn test_source_metrics_large_events() {
        let m = MongoDbCdcMetrics::new(None);
        m.record_large_event_fragment();
        m.record_large_event_fragment();
        m.record_large_event_reassembled();

        assert_eq!(m.large_event_fragments.get(), 2);
        assert_eq!(m.large_events_reassembled.get(), 1);
        assert_eq!(m.events_received.get(), 0);
    }

    #[test]
    fn test_source_metrics_to_connector_metrics() {
        let m = MongoDbCdcMetrics::new(None);
        m.record_event("I");
        m.record_bytes(512);
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 1);
        assert_eq!(cm.bytes_total, 512);
        assert_eq!(cm.errors_total, 1);
        assert!(!cm.custom.is_empty());
    }

    #[test]
    fn test_sink_metrics_record_flush() {
        let m = MongoDbSinkMetrics::new(None);
        m.record_flush(100, 5000);
        m.record_bulk_write();
        m.record_inserts(80);
        m.record_upserts(15);
        m.record_deletes(5);
        m.record_error();

        assert_eq!(m.records_written.get(), 100);
        assert_eq!(m.bytes_written.get(), 5000);
        assert_eq!(m.batches_flushed.get(), 1);
        assert_eq!(m.bulk_writes.get(), 1);
        assert_eq!(m.inserts.get(), 80);
        assert_eq!(m.upserts.get(), 15);
        assert_eq!(m.deletes.get(), 5);
        assert_eq!(m.errors.get(), 1);
    }

    #[test]
    fn test_sink_metrics_to_connector_metrics() {
        let m = MongoDbSinkMetrics::new(None);
        m.record_flush(50, 2500);
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 50);
        assert_eq!(cm.bytes_total, 2500);
        assert_eq!(cm.errors_total, 1);
    }

    // With a caller-supplied registry, all 13 source and 8 sink counters are
    // exported. A second instance on the same registry must not panic and
    // must still get independent, working counters.
    #[test]
    fn test_metrics_register_with_shared_registry() {
        let registry = Registry::new();
        let source = MongoDbCdcMetrics::new(Some(&registry));
        let sink = MongoDbSinkMetrics::new(Some(&registry));
        assert_eq!(registry.gather().len(), 21);

        let duplicate = MongoDbCdcMetrics::new(Some(&registry));
        duplicate.record_event("I");
        sink.record_error();

        assert_eq!(duplicate.inserts.get(), 1);
        assert_eq!(source.inserts.get(), 0);
        assert_eq!(sink.errors.get(), 1);
        assert_eq!(registry.gather().len(), 21);
    }

    // `Default` yields usable counters that start at zero.
    #[test]
    fn test_default_metrics_start_at_zero() {
        let source = MongoDbCdcMetrics::default();
        let sink = MongoDbSinkMetrics::default();

        assert_eq!(source.events_received.get(), 0);
        assert_eq!(sink.records_written.get(), 0);

        source.record_error();
        sink.record_bulk_write();
        assert_eq!(source.errors.get(), 1);
        assert_eq!(sink.bulk_writes.get(), 1);
    }
}