laminar_connectors/cdc/mysql/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::metrics::ConnectorMetrics;
8
9#[derive(Debug, Clone)]
11pub struct MySqlCdcMetrics {
12 pub events_received: IntCounter,
14
15 pub inserts: IntCounter,
17
18 pub updates: IntCounter,
20
21 pub deletes: IntCounter,
23
24 pub transactions: IntCounter,
26
27 pub table_maps: IntCounter,
29
30 pub bytes_received: IntCounter,
32
33 pub errors: IntCounter,
35
36 pub heartbeats: IntCounter,
38
39 pub ddl_events: IntCounter,
41
42 pub binlog_position: IntGauge,
44}
45
46impl MySqlCdcMetrics {
47 #[must_use]
49 #[allow(clippy::missing_panics_doc)]
50 pub fn new(registry: Option<&Registry>) -> Self {
51 let local;
52 let reg = if let Some(r) = registry {
53 r
54 } else {
55 local = Registry::new();
56 &local
57 };
58
59 let events_received = IntCounter::new(
60 "mysql_cdc_events_received_total",
61 "Total binlog events received",
62 )
63 .unwrap();
64 let inserts =
65 IntCounter::new("mysql_cdc_inserts_total", "Total INSERT row events").unwrap();
66 let updates =
67 IntCounter::new("mysql_cdc_updates_total", "Total UPDATE row events").unwrap();
68 let deletes =
69 IntCounter::new("mysql_cdc_deletes_total", "Total DELETE row events").unwrap();
70 let transactions =
71 IntCounter::new("mysql_cdc_transactions_total", "Total transactions").unwrap();
72 let table_maps =
73 IntCounter::new("mysql_cdc_table_maps_total", "Total TABLE_MAP events").unwrap();
74 let bytes_received =
75 IntCounter::new("mysql_cdc_bytes_received_total", "Total bytes from binlog").unwrap();
76 let errors = IntCounter::new("mysql_cdc_errors_total", "Total CDC errors").unwrap();
77 let heartbeats =
78 IntCounter::new("mysql_cdc_heartbeats_total", "Total heartbeats received").unwrap();
79 let ddl_events = IntCounter::new("mysql_cdc_ddl_events_total", "Total DDL events").unwrap();
80 let binlog_position =
81 IntGauge::new("mysql_cdc_binlog_position", "Current binlog position").unwrap();
82
83 let _ = reg.register(Box::new(events_received.clone()));
84 let _ = reg.register(Box::new(inserts.clone()));
85 let _ = reg.register(Box::new(updates.clone()));
86 let _ = reg.register(Box::new(deletes.clone()));
87 let _ = reg.register(Box::new(transactions.clone()));
88 let _ = reg.register(Box::new(table_maps.clone()));
89 let _ = reg.register(Box::new(bytes_received.clone()));
90 let _ = reg.register(Box::new(errors.clone()));
91 let _ = reg.register(Box::new(heartbeats.clone()));
92 let _ = reg.register(Box::new(ddl_events.clone()));
93 let _ = reg.register(Box::new(binlog_position.clone()));
94
95 Self {
96 events_received,
97 inserts,
98 updates,
99 deletes,
100 transactions,
101 table_maps,
102 bytes_received,
103 errors,
104 heartbeats,
105 ddl_events,
106 binlog_position,
107 }
108 }
109
110 pub fn inc_events_received(&self) {
112 self.events_received.inc();
113 }
114
115 pub fn inc_inserts(&self, count: u64) {
117 self.inserts.inc_by(count);
118 }
119
120 pub fn inc_updates(&self, count: u64) {
122 self.updates.inc_by(count);
123 }
124
125 pub fn inc_deletes(&self, count: u64) {
127 self.deletes.inc_by(count);
128 }
129
130 pub fn inc_transactions(&self) {
132 self.transactions.inc();
133 }
134
135 pub fn inc_table_maps(&self) {
137 self.table_maps.inc();
138 }
139
140 pub fn add_bytes_received(&self, bytes: u64) {
142 self.bytes_received.inc_by(bytes);
143 }
144
145 pub fn inc_errors(&self) {
147 self.errors.inc();
148 }
149
150 pub fn inc_heartbeats(&self) {
152 self.heartbeats.inc();
153 }
154
155 pub fn inc_ddl_events(&self) {
157 self.ddl_events.inc();
158 }
159
160 #[allow(clippy::cast_possible_wrap)]
162 pub fn set_binlog_position(&self, position: u64) {
163 self.binlog_position.set(position as i64);
164 }
165
166 #[must_use]
168 #[allow(clippy::cast_sign_loss)]
169 pub fn get_binlog_position(&self) -> u64 {
170 self.binlog_position.get() as u64
171 }
172
173 #[must_use]
175 pub fn total_row_events(&self) -> u64 {
176 self.inserts.get() + self.updates.get() + self.deletes.get()
177 }
178
179 #[must_use]
181 #[allow(clippy::cast_sign_loss)]
182 pub fn snapshot(&self) -> MetricsSnapshot {
183 MetricsSnapshot {
184 events_received: self.events_received.get(),
185 inserts: self.inserts.get(),
186 updates: self.updates.get(),
187 deletes: self.deletes.get(),
188 transactions: self.transactions.get(),
189 table_maps: self.table_maps.get(),
190 bytes_received: self.bytes_received.get(),
191 errors: self.errors.get(),
192 heartbeats: self.heartbeats.get(),
193 ddl_events: self.ddl_events.get(),
194 binlog_position: self.binlog_position.get() as u64,
195 }
196 }
197
198 #[must_use]
200 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
201 ConnectorMetrics {
202 records_total: self.total_row_events(),
203 bytes_total: self.bytes_received.get(),
204 errors_total: self.errors.get(),
205 ..ConnectorMetrics::default()
206 }
207 }
208}
209
210impl Default for MySqlCdcMetrics {
211 fn default() -> Self {
212 Self::new(None)
213 }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq)]
218pub struct MetricsSnapshot {
219 pub events_received: u64,
221 pub inserts: u64,
223 pub updates: u64,
225 pub deletes: u64,
227 pub transactions: u64,
229 pub table_maps: u64,
231 pub bytes_received: u64,
233 pub errors: u64,
235 pub heartbeats: u64,
237 pub ddl_events: u64,
239 pub binlog_position: u64,
241}
242
243impl MetricsSnapshot {
244 #[must_use]
246 pub fn total_row_events(&self) -> u64 {
247 self.inserts + self.updates + self.deletes
248 }
249}
250
251#[cfg(test)]
252mod tests {
253 use super::*;
254
255 #[test]
256 fn test_new_metrics() {
257 let m = MySqlCdcMetrics::new(None);
258 assert_eq!(m.events_received.get(), 0);
259 assert_eq!(m.inserts.get(), 0);
260 assert_eq!(m.errors.get(), 0);
261 }
262
263 #[test]
264 fn test_inc_events_received() {
265 let m = MySqlCdcMetrics::new(None);
266 m.inc_events_received();
267 m.inc_events_received();
268 assert_eq!(m.events_received.get(), 2);
269 }
270
271 #[test]
272 fn test_inc_row_events() {
273 let m = MySqlCdcMetrics::new(None);
274 m.inc_inserts(5);
275 m.inc_updates(3);
276 m.inc_deletes(2);
277 assert_eq!(m.total_row_events(), 10);
278 }
279
280 #[test]
281 fn test_add_bytes() {
282 let m = MySqlCdcMetrics::new(None);
283 m.add_bytes_received(100);
284 m.add_bytes_received(50);
285 assert_eq!(m.bytes_received.get(), 150);
286 }
287
288 #[test]
289 fn test_binlog_position() {
290 let m = MySqlCdcMetrics::new(None);
291 m.set_binlog_position(12345);
292 assert_eq!(m.get_binlog_position(), 12345);
293 }
294
295 #[test]
296 fn test_snapshot() {
297 let m = MySqlCdcMetrics::new(None);
298 m.inc_inserts(10);
299 m.inc_updates(5);
300 m.inc_transactions();
301
302 let snap = m.snapshot();
303 assert_eq!(snap.inserts, 10);
304 assert_eq!(snap.updates, 5);
305 assert_eq!(snap.transactions, 1);
306 assert_eq!(snap.total_row_events(), 15);
307 }
308
309 #[test]
310 fn test_to_connector_metrics() {
311 let m = MySqlCdcMetrics::new(None);
312 m.inc_inserts(10);
313 m.add_bytes_received(1000);
314 m.inc_errors();
315
316 let cm = m.to_connector_metrics();
317 assert_eq!(cm.records_total, 10);
318 assert_eq!(cm.bytes_total, 1000);
319 assert_eq!(cm.errors_total, 1);
320 }
321
322 #[test]
323 fn test_inc_all_counters() {
324 let m = MySqlCdcMetrics::new(None);
325
326 m.inc_events_received();
327 m.inc_inserts(1);
328 m.inc_updates(1);
329 m.inc_deletes(1);
330 m.inc_transactions();
331 m.inc_table_maps();
332 m.add_bytes_received(100);
333 m.inc_errors();
334 m.inc_heartbeats();
335 m.inc_ddl_events();
336
337 let snap = m.snapshot();
338 assert_eq!(snap.events_received, 1);
339 assert_eq!(snap.inserts, 1);
340 assert_eq!(snap.updates, 1);
341 assert_eq!(snap.deletes, 1);
342 assert_eq!(snap.transactions, 1);
343 assert_eq!(snap.table_maps, 1);
344 assert_eq!(snap.bytes_received, 100);
345 assert_eq!(snap.errors, 1);
346 assert_eq!(snap.heartbeats, 1);
347 assert_eq!(snap.ddl_events, 1);
348 }
349}