laminar_connectors/mongodb/
metrics.rs1use prometheus::{IntCounter, Registry};
6
7use crate::prom::reg_or_local;
8
9#[derive(Debug, Clone)]
11pub struct MongoDbCdcMetrics {
12 pub events_received: IntCounter,
14 pub bytes_received: IntCounter,
16 pub errors: IntCounter,
18 pub batches_produced: IntCounter,
20 pub inserts: IntCounter,
22 pub updates: IntCounter,
24 pub replaces: IntCounter,
26 pub deletes: IntCounter,
28 pub lifecycle_events: IntCounter,
30 pub token_persists: IntCounter,
32 pub reconnects: IntCounter,
34 pub large_event_fragments: IntCounter,
36 pub large_events_reassembled: IntCounter,
38}
39
40impl MongoDbCdcMetrics {
41 #[must_use]
43 #[allow(clippy::missing_panics_doc)]
44 pub fn new(registry: Option<&Registry>) -> Self {
45 let mut local = None;
46 let reg = reg_or_local(registry, &mut local);
47
48 Self {
49 events_received: reg.counter(
50 "mongodb_cdc_events_received_total",
51 "Total MongoDB CDC events received",
52 ),
53 bytes_received: reg.counter(
54 "mongodb_cdc_bytes_received_total",
55 "Total bytes from change stream",
56 ),
57 errors: reg.counter("mongodb_cdc_errors_total", "Total MongoDB CDC errors"),
58 batches_produced: reg.counter(
59 "mongodb_cdc_batches_produced_total",
60 "Total batches produced",
61 ),
62 inserts: reg.counter("mongodb_cdc_inserts_total", "Total INSERT events"),
63 updates: reg.counter("mongodb_cdc_updates_total", "Total UPDATE events"),
64 replaces: reg.counter("mongodb_cdc_replaces_total", "Total REPLACE events"),
65 deletes: reg.counter("mongodb_cdc_deletes_total", "Total DELETE events"),
66 lifecycle_events: reg.counter(
67 "mongodb_cdc_lifecycle_events_total",
68 "Total lifecycle events",
69 ),
70 token_persists: reg.counter(
71 "mongodb_cdc_token_persists_total",
72 "Total resume token persist ops",
73 ),
74 reconnects: reg.counter(
75 "mongodb_cdc_reconnects_total",
76 "Total reconnection attempts",
77 ),
78 large_event_fragments: reg.counter(
79 "mongodb_cdc_large_event_fragments_total",
80 "Total large event fragments",
81 ),
82 large_events_reassembled: reg.counter(
83 "mongodb_cdc_large_events_reassembled_total",
84 "Total large events reassembled",
85 ),
86 }
87 }
88
89 pub fn record_event(&self, op: &str) {
91 self.events_received.inc();
92 match op {
93 "I" => self.inserts.inc(),
94 "U" => self.updates.inc(),
95 "R" => self.replaces.inc(),
96 "D" => self.deletes.inc(),
97 _ => self.lifecycle_events.inc(),
98 }
99 }
100
101 pub fn record_bytes(&self, bytes: u64) {
103 self.bytes_received.inc_by(bytes);
104 }
105
106 pub fn record_error(&self) {
108 self.errors.inc();
109 }
110
111 pub fn record_batch(&self) {
113 self.batches_produced.inc();
114 }
115
116 pub fn record_token_persist(&self) {
118 self.token_persists.inc();
119 }
120
121 pub fn record_reconnect(&self) {
123 self.reconnects.inc();
124 }
125
126 pub fn record_large_event_fragment(&self) {
128 self.large_event_fragments.inc();
129 }
130
131 pub fn record_large_event_reassembled(&self) {
133 self.large_events_reassembled.inc();
134 }
135}
136
137impl Default for MongoDbCdcMetrics {
138 fn default() -> Self {
139 Self::new(None)
140 }
141}
142
143#[derive(Debug, Clone)]
145pub struct MongoDbSinkMetrics {
146 pub records_written: IntCounter,
148 pub bytes_written: IntCounter,
150 pub errors: IntCounter,
152 pub batches_flushed: IntCounter,
154 pub bulk_writes: IntCounter,
156 pub inserts: IntCounter,
158 pub upserts: IntCounter,
160 pub deletes: IntCounter,
162}
163
164impl MongoDbSinkMetrics {
165 #[must_use]
167 #[allow(clippy::missing_panics_doc)]
168 pub fn new(registry: Option<&Registry>) -> Self {
169 let mut local = None;
170 let reg = reg_or_local(registry, &mut local);
171
172 Self {
173 records_written: reg.counter(
174 "mongodb_sink_records_written_total",
175 "Total MongoDB sink records written",
176 ),
177 bytes_written: reg.counter(
178 "mongodb_sink_bytes_written_total",
179 "Total MongoDB sink bytes written",
180 ),
181 errors: reg.counter("mongodb_sink_errors_total", "Total MongoDB sink errors"),
182 batches_flushed: reg.counter(
183 "mongodb_sink_batches_flushed_total",
184 "Total batches flushed",
185 ),
186 bulk_writes: reg.counter(
187 "mongodb_sink_bulk_writes_total",
188 "Total bulk write operations",
189 ),
190 inserts: reg.counter("mongodb_sink_inserts_total", "Total insert operations"),
191 upserts: reg.counter("mongodb_sink_upserts_total", "Total upsert operations"),
192 deletes: reg.counter("mongodb_sink_deletes_total", "Total delete operations"),
193 }
194 }
195
196 pub fn record_flush(&self, records: u64, bytes: u64) {
198 self.records_written.inc_by(records);
199 self.bytes_written.inc_by(bytes);
200 self.batches_flushed.inc();
201 }
202
203 pub fn record_bulk_write(&self) {
205 self.bulk_writes.inc();
206 }
207
208 pub fn record_error(&self) {
210 self.errors.inc();
211 }
212
213 pub fn record_inserts(&self, count: u64) {
215 self.inserts.inc_by(count);
216 }
217
218 pub fn record_upserts(&self, count: u64) {
220 self.upserts.inc_by(count);
221 }
222
223 pub fn record_deletes(&self, count: u64) {
225 self.deletes.inc_by(count);
226 }
227}
228
229impl Default for MongoDbSinkMetrics {
230 fn default() -> Self {
231 Self::new(None)
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238
239 #[test]
240 fn test_source_metrics_record_events() {
241 let m = MongoDbCdcMetrics::new(None);
242 m.record_event("I");
243 m.record_event("I");
244 m.record_event("U");
245 m.record_event("D");
246 m.record_event("DROP");
247 m.record_bytes(1024);
248 m.record_error();
249 m.record_batch();
250 m.record_token_persist();
251 m.record_reconnect();
252
253 assert_eq!(m.events_received.get(), 5);
254 assert_eq!(m.inserts.get(), 2);
255 assert_eq!(m.updates.get(), 1);
256 assert_eq!(m.deletes.get(), 1);
257 assert_eq!(m.lifecycle_events.get(), 1);
258 assert_eq!(m.bytes_received.get(), 1024);
259 assert_eq!(m.errors.get(), 1);
260 assert_eq!(m.batches_produced.get(), 1);
261 assert_eq!(m.token_persists.get(), 1);
262 assert_eq!(m.reconnects.get(), 1);
263 }
264
265 #[test]
266 fn test_sink_metrics_record_flush() {
267 let m = MongoDbSinkMetrics::new(None);
268 m.record_flush(100, 5000);
269 m.record_bulk_write();
270 m.record_inserts(80);
271 m.record_upserts(15);
272 m.record_deletes(5);
273 m.record_error();
274
275 assert_eq!(m.records_written.get(), 100);
276 assert_eq!(m.bytes_written.get(), 5000);
277 assert_eq!(m.batches_flushed.get(), 1);
278 assert_eq!(m.bulk_writes.get(), 1);
279 assert_eq!(m.inserts.get(), 80);
280 assert_eq!(m.upserts.get(), 15);
281 assert_eq!(m.deletes.get(), 5);
282 assert_eq!(m.errors.get(), 1);
283 }
284}