Skip to main content

laminar_connectors/mongodb/
metrics.rs

//! `MongoDB` connector metrics.
//!
//! Prometheus-backed counters for tracking CDC source and sink performance.
4
5use prometheus::{IntCounter, Registry};
6
7use crate::prom::reg_or_local;
8
9/// Metrics for the `MongoDB` CDC source connector.
10#[derive(Debug, Clone)]
11pub struct MongoDbCdcMetrics {
12    /// Total change events received.
13    pub events_received: IntCounter,
14    /// Total bytes received from the change stream.
15    pub bytes_received: IntCounter,
16    /// Total errors encountered.
17    pub errors: IntCounter,
18    /// Total batches produced for downstream.
19    pub batches_produced: IntCounter,
20    /// Total INSERT operations received.
21    pub inserts: IntCounter,
22    /// Total UPDATE operations received.
23    pub updates: IntCounter,
24    /// Total REPLACE operations received.
25    pub replaces: IntCounter,
26    /// Total DELETE operations received.
27    pub deletes: IntCounter,
28    /// Total lifecycle events (drop/rename/invalidate).
29    pub lifecycle_events: IntCounter,
30    /// Total resume token persist operations.
31    pub token_persists: IntCounter,
32    /// Total reconnection attempts.
33    pub reconnects: IntCounter,
34    /// Total large event fragments received.
35    pub large_event_fragments: IntCounter,
36    /// Total large events reassembled.
37    pub large_events_reassembled: IntCounter,
38}
39
40impl MongoDbCdcMetrics {
41    /// Creates a new metrics instance with all counters at zero.
42    #[must_use]
43    #[allow(clippy::missing_panics_doc)]
44    pub fn new(registry: Option<&Registry>) -> Self {
45        let mut local = None;
46        let reg = reg_or_local(registry, &mut local);
47
48        Self {
49            events_received: reg.counter(
50                "mongodb_cdc_events_received_total",
51                "Total MongoDB CDC events received",
52            ),
53            bytes_received: reg.counter(
54                "mongodb_cdc_bytes_received_total",
55                "Total bytes from change stream",
56            ),
57            errors: reg.counter("mongodb_cdc_errors_total", "Total MongoDB CDC errors"),
58            batches_produced: reg.counter(
59                "mongodb_cdc_batches_produced_total",
60                "Total batches produced",
61            ),
62            inserts: reg.counter("mongodb_cdc_inserts_total", "Total INSERT events"),
63            updates: reg.counter("mongodb_cdc_updates_total", "Total UPDATE events"),
64            replaces: reg.counter("mongodb_cdc_replaces_total", "Total REPLACE events"),
65            deletes: reg.counter("mongodb_cdc_deletes_total", "Total DELETE events"),
66            lifecycle_events: reg.counter(
67                "mongodb_cdc_lifecycle_events_total",
68                "Total lifecycle events",
69            ),
70            token_persists: reg.counter(
71                "mongodb_cdc_token_persists_total",
72                "Total resume token persist ops",
73            ),
74            reconnects: reg.counter(
75                "mongodb_cdc_reconnects_total",
76                "Total reconnection attempts",
77            ),
78            large_event_fragments: reg.counter(
79                "mongodb_cdc_large_event_fragments_total",
80                "Total large event fragments",
81            ),
82            large_events_reassembled: reg.counter(
83                "mongodb_cdc_large_events_reassembled_total",
84                "Total large events reassembled",
85            ),
86        }
87    }
88
89    /// Records a received change event by operation type.
90    pub fn record_event(&self, op: &str) {
91        self.events_received.inc();
92        match op {
93            "I" => self.inserts.inc(),
94            "U" => self.updates.inc(),
95            "R" => self.replaces.inc(),
96            "D" => self.deletes.inc(),
97            _ => self.lifecycle_events.inc(),
98        }
99    }
100
101    /// Records bytes received from the change stream.
102    pub fn record_bytes(&self, bytes: u64) {
103        self.bytes_received.inc_by(bytes);
104    }
105
106    /// Records an error.
107    pub fn record_error(&self) {
108        self.errors.inc();
109    }
110
111    /// Records a batch produced for downstream.
112    pub fn record_batch(&self) {
113        self.batches_produced.inc();
114    }
115
116    /// Records a resume token persistence operation.
117    pub fn record_token_persist(&self) {
118        self.token_persists.inc();
119    }
120
121    /// Records a reconnection attempt.
122    pub fn record_reconnect(&self) {
123        self.reconnects.inc();
124    }
125
126    /// Records a large event fragment received.
127    pub fn record_large_event_fragment(&self) {
128        self.large_event_fragments.inc();
129    }
130
131    /// Records a large event reassembled.
132    pub fn record_large_event_reassembled(&self) {
133        self.large_events_reassembled.inc();
134    }
135}
136
137impl Default for MongoDbCdcMetrics {
138    fn default() -> Self {
139        Self::new(None)
140    }
141}
142
143/// Metrics for the `MongoDB` sink connector.
144#[derive(Debug, Clone)]
145pub struct MongoDbSinkMetrics {
146    /// Total records written.
147    pub records_written: IntCounter,
148    /// Total bytes written.
149    pub bytes_written: IntCounter,
150    /// Total errors encountered.
151    pub errors: IntCounter,
152    /// Total batches flushed.
153    pub batches_flushed: IntCounter,
154    /// Total bulk write operations issued.
155    pub bulk_writes: IntCounter,
156    /// Total individual insert operations.
157    pub inserts: IntCounter,
158    /// Total upsert operations.
159    pub upserts: IntCounter,
160    /// Total delete operations.
161    pub deletes: IntCounter,
162}
163
164impl MongoDbSinkMetrics {
165    /// Creates a new metrics instance with all counters at zero.
166    #[must_use]
167    #[allow(clippy::missing_panics_doc)]
168    pub fn new(registry: Option<&Registry>) -> Self {
169        let mut local = None;
170        let reg = reg_or_local(registry, &mut local);
171
172        Self {
173            records_written: reg.counter(
174                "mongodb_sink_records_written_total",
175                "Total MongoDB sink records written",
176            ),
177            bytes_written: reg.counter(
178                "mongodb_sink_bytes_written_total",
179                "Total MongoDB sink bytes written",
180            ),
181            errors: reg.counter("mongodb_sink_errors_total", "Total MongoDB sink errors"),
182            batches_flushed: reg.counter(
183                "mongodb_sink_batches_flushed_total",
184                "Total batches flushed",
185            ),
186            bulk_writes: reg.counter(
187                "mongodb_sink_bulk_writes_total",
188                "Total bulk write operations",
189            ),
190            inserts: reg.counter("mongodb_sink_inserts_total", "Total insert operations"),
191            upserts: reg.counter("mongodb_sink_upserts_total", "Total upsert operations"),
192            deletes: reg.counter("mongodb_sink_deletes_total", "Total delete operations"),
193        }
194    }
195
196    /// Records a successful batch flush.
197    pub fn record_flush(&self, records: u64, bytes: u64) {
198        self.records_written.inc_by(records);
199        self.bytes_written.inc_by(bytes);
200        self.batches_flushed.inc();
201    }
202
203    /// Records a bulk write operation.
204    pub fn record_bulk_write(&self) {
205        self.bulk_writes.inc();
206    }
207
208    /// Records an error.
209    pub fn record_error(&self) {
210        self.errors.inc();
211    }
212
213    /// Records insert operations.
214    pub fn record_inserts(&self, count: u64) {
215        self.inserts.inc_by(count);
216    }
217
218    /// Records upsert operations.
219    pub fn record_upserts(&self, count: u64) {
220        self.upserts.inc_by(count);
221    }
222
223    /// Records delete operations.
224    pub fn record_deletes(&self, count: u64) {
225        self.deletes.inc_by(count);
226    }
227}
228
229impl Default for MongoDbSinkMetrics {
230    fn default() -> Self {
231        Self::new(None)
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives every CDC recorder at least once and checks each counter,
    /// including the `"R"` (replace) arm of `record_event` and the two
    /// large-event counters.
    #[test]
    fn test_source_metrics_record_events() {
        let m = MongoDbCdcMetrics::new(None);
        m.record_event("I");
        m.record_event("I");
        m.record_event("U");
        m.record_event("R");
        m.record_event("D");
        // Any op code other than I/U/R/D is counted as a lifecycle event.
        m.record_event("DROP");
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_token_persist();
        m.record_reconnect();
        m.record_large_event_fragment();
        m.record_large_event_fragment();
        m.record_large_event_reassembled();

        assert_eq!(m.events_received.get(), 6);
        assert_eq!(m.inserts.get(), 2);
        assert_eq!(m.updates.get(), 1);
        assert_eq!(m.replaces.get(), 1);
        assert_eq!(m.deletes.get(), 1);
        assert_eq!(m.lifecycle_events.get(), 1);
        assert_eq!(m.bytes_received.get(), 1024);
        assert_eq!(m.errors.get(), 1);
        assert_eq!(m.batches_produced.get(), 1);
        assert_eq!(m.token_persists.get(), 1);
        assert_eq!(m.reconnects.get(), 1);
        assert_eq!(m.large_event_fragments.get(), 2);
        assert_eq!(m.large_events_reassembled.get(), 1);
    }

    /// Drives every sink recorder once and checks each counter.
    #[test]
    fn test_sink_metrics_record_flush() {
        let m = MongoDbSinkMetrics::new(None);
        m.record_flush(100, 5000);
        m.record_bulk_write();
        m.record_inserts(80);
        m.record_upserts(15);
        m.record_deletes(5);
        m.record_error();

        assert_eq!(m.records_written.get(), 100);
        assert_eq!(m.bytes_written.get(), 5000);
        assert_eq!(m.batches_flushed.get(), 1);
        assert_eq!(m.bulk_writes.get(), 1);
        assert_eq!(m.inserts.get(), 80);
        assert_eq!(m.upserts.get(), 15);
        assert_eq!(m.deletes.get(), 5);
        assert_eq!(m.errors.get(), 1);
    }
}