laminar_connectors/cdc/mysql/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::prom::reg_or_local;
8
9#[derive(Debug, Clone)]
11pub struct MySqlCdcMetrics {
12 pub events_received: IntCounter,
14
15 pub inserts: IntCounter,
17
18 pub updates: IntCounter,
20
21 pub deletes: IntCounter,
23
24 pub transactions: IntCounter,
26
27 pub table_maps: IntCounter,
29
30 pub bytes_received: IntCounter,
32
33 pub errors: IntCounter,
35
36 pub heartbeats: IntCounter,
38
39 pub ddl_events: IntCounter,
41
42 pub binlog_position: IntGauge,
44}
45
46impl MySqlCdcMetrics {
47 #[must_use]
49 #[allow(clippy::missing_panics_doc)]
50 pub fn new(registry: Option<&Registry>) -> Self {
51 let mut local = None;
52 let reg = reg_or_local(registry, &mut local);
53
54 Self {
55 events_received: reg.counter(
56 "mysql_cdc_events_received_total",
57 "Total binlog events received",
58 ),
59 inserts: reg.counter("mysql_cdc_inserts_total", "Total INSERT row events"),
60 updates: reg.counter("mysql_cdc_updates_total", "Total UPDATE row events"),
61 deletes: reg.counter("mysql_cdc_deletes_total", "Total DELETE row events"),
62 transactions: reg.counter("mysql_cdc_transactions_total", "Total transactions"),
63 table_maps: reg.counter("mysql_cdc_table_maps_total", "Total TABLE_MAP events"),
64 bytes_received: reg
65 .counter("mysql_cdc_bytes_received_total", "Total bytes from binlog"),
66 errors: reg.counter("mysql_cdc_errors_total", "Total CDC errors"),
67 heartbeats: reg.counter("mysql_cdc_heartbeats_total", "Total heartbeats received"),
68 ddl_events: reg.counter("mysql_cdc_ddl_events_total", "Total DDL events"),
69 binlog_position: reg.gauge("mysql_cdc_binlog_position", "Current binlog position"),
70 }
71 }
72
73 pub fn inc_events_received(&self) {
75 self.events_received.inc();
76 }
77
78 pub fn inc_inserts(&self, count: u64) {
80 self.inserts.inc_by(count);
81 }
82
83 pub fn inc_updates(&self, count: u64) {
85 self.updates.inc_by(count);
86 }
87
88 pub fn inc_deletes(&self, count: u64) {
90 self.deletes.inc_by(count);
91 }
92
93 pub fn inc_transactions(&self) {
95 self.transactions.inc();
96 }
97
98 pub fn inc_table_maps(&self) {
100 self.table_maps.inc();
101 }
102
103 pub fn add_bytes_received(&self, bytes: u64) {
105 self.bytes_received.inc_by(bytes);
106 }
107
108 pub fn inc_errors(&self) {
110 self.errors.inc();
111 }
112
113 pub fn inc_heartbeats(&self) {
115 self.heartbeats.inc();
116 }
117
118 pub fn inc_ddl_events(&self) {
120 self.ddl_events.inc();
121 }
122
123 #[allow(clippy::cast_possible_wrap)]
125 pub fn set_binlog_position(&self, position: u64) {
126 self.binlog_position.set(position as i64);
127 }
128
129 #[must_use]
131 #[allow(clippy::cast_sign_loss)]
132 pub fn get_binlog_position(&self) -> u64 {
133 self.binlog_position.get() as u64
134 }
135
136 #[must_use]
138 pub fn total_row_events(&self) -> u64 {
139 self.inserts.get() + self.updates.get() + self.deletes.get()
140 }
141
142 #[must_use]
144 #[allow(clippy::cast_sign_loss)]
145 pub fn snapshot(&self) -> MetricsSnapshot {
146 MetricsSnapshot {
147 events_received: self.events_received.get(),
148 inserts: self.inserts.get(),
149 updates: self.updates.get(),
150 deletes: self.deletes.get(),
151 transactions: self.transactions.get(),
152 table_maps: self.table_maps.get(),
153 bytes_received: self.bytes_received.get(),
154 errors: self.errors.get(),
155 heartbeats: self.heartbeats.get(),
156 ddl_events: self.ddl_events.get(),
157 binlog_position: self.binlog_position.get() as u64,
158 }
159 }
160}
161
162impl Default for MySqlCdcMetrics {
163 fn default() -> Self {
164 Self::new(None)
165 }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
170pub struct MetricsSnapshot {
171 pub events_received: u64,
173 pub inserts: u64,
175 pub updates: u64,
177 pub deletes: u64,
179 pub transactions: u64,
181 pub table_maps: u64,
183 pub bytes_received: u64,
185 pub errors: u64,
187 pub heartbeats: u64,
189 pub ddl_events: u64,
191 pub binlog_position: u64,
193}
194
195impl MetricsSnapshot {
196 #[must_use]
198 pub fn total_row_events(&self) -> u64 {
199 self.inserts + self.updates + self.deletes
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206
207 #[test]
208 fn test_new_metrics() {
209 let m = MySqlCdcMetrics::new(None);
210 assert_eq!(m.events_received.get(), 0);
211 assert_eq!(m.inserts.get(), 0);
212 assert_eq!(m.errors.get(), 0);
213 }
214
215 #[test]
216 fn test_inc_events_received() {
217 let m = MySqlCdcMetrics::new(None);
218 m.inc_events_received();
219 m.inc_events_received();
220 assert_eq!(m.events_received.get(), 2);
221 }
222
223 #[test]
224 fn test_inc_row_events() {
225 let m = MySqlCdcMetrics::new(None);
226 m.inc_inserts(5);
227 m.inc_updates(3);
228 m.inc_deletes(2);
229 assert_eq!(m.total_row_events(), 10);
230 }
231
232 #[test]
233 fn test_add_bytes() {
234 let m = MySqlCdcMetrics::new(None);
235 m.add_bytes_received(100);
236 m.add_bytes_received(50);
237 assert_eq!(m.bytes_received.get(), 150);
238 }
239
240 #[test]
241 fn test_binlog_position() {
242 let m = MySqlCdcMetrics::new(None);
243 m.set_binlog_position(12345);
244 assert_eq!(m.get_binlog_position(), 12345);
245 }
246
247 #[test]
248 fn test_snapshot() {
249 let m = MySqlCdcMetrics::new(None);
250 m.inc_inserts(10);
251 m.inc_updates(5);
252 m.inc_transactions();
253
254 let snap = m.snapshot();
255 assert_eq!(snap.inserts, 10);
256 assert_eq!(snap.updates, 5);
257 assert_eq!(snap.transactions, 1);
258 assert_eq!(snap.total_row_events(), 15);
259 }
260
261 #[test]
262 fn test_inc_all_counters() {
263 let m = MySqlCdcMetrics::new(None);
264
265 m.inc_events_received();
266 m.inc_inserts(1);
267 m.inc_updates(1);
268 m.inc_deletes(1);
269 m.inc_transactions();
270 m.inc_table_maps();
271 m.add_bytes_received(100);
272 m.inc_errors();
273 m.inc_heartbeats();
274 m.inc_ddl_events();
275
276 let snap = m.snapshot();
277 assert_eq!(snap.events_received, 1);
278 assert_eq!(snap.inserts, 1);
279 assert_eq!(snap.updates, 1);
280 assert_eq!(snap.deletes, 1);
281 assert_eq!(snap.transactions, 1);
282 assert_eq!(snap.table_maps, 1);
283 assert_eq!(snap.bytes_received, 100);
284 assert_eq!(snap.errors, 1);
285 assert_eq!(snap.heartbeats, 1);
286 assert_eq!(snap.ddl_events, 1);
287 }
288}