laminar_connectors/cdc/mysql/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
6
7use crate::metrics::ConnectorMetrics;
8
9#[derive(Debug, Default)]
11pub struct MySqlCdcMetrics {
12 pub events_received: AtomicU64,
14
15 pub inserts: AtomicU64,
17
18 pub updates: AtomicU64,
20
21 pub deletes: AtomicU64,
23
24 pub transactions: AtomicU64,
26
27 pub table_maps: AtomicU64,
29
30 pub bytes_received: AtomicU64,
32
33 pub errors: AtomicU64,
35
36 pub heartbeats: AtomicU64,
38
39 pub ddl_events: AtomicU64,
41
42 pub binlog_position: AtomicU64,
44}
45
46impl MySqlCdcMetrics {
47 #[must_use]
49 pub fn new() -> Self {
50 Self::default()
51 }
52
53 pub fn inc_events_received(&self) {
55 self.events_received.fetch_add(1, Ordering::Relaxed);
56 }
57
58 pub fn inc_inserts(&self, count: u64) {
60 self.inserts.fetch_add(count, Ordering::Relaxed);
61 }
62
63 pub fn inc_updates(&self, count: u64) {
65 self.updates.fetch_add(count, Ordering::Relaxed);
66 }
67
68 pub fn inc_deletes(&self, count: u64) {
70 self.deletes.fetch_add(count, Ordering::Relaxed);
71 }
72
73 pub fn inc_transactions(&self) {
75 self.transactions.fetch_add(1, Ordering::Relaxed);
76 }
77
78 pub fn inc_table_maps(&self) {
80 self.table_maps.fetch_add(1, Ordering::Relaxed);
81 }
82
83 pub fn add_bytes_received(&self, bytes: u64) {
85 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
86 }
87
88 pub fn inc_errors(&self) {
90 self.errors.fetch_add(1, Ordering::Relaxed);
91 }
92
93 pub fn inc_heartbeats(&self) {
95 self.heartbeats.fetch_add(1, Ordering::Relaxed);
96 }
97
98 pub fn inc_ddl_events(&self) {
100 self.ddl_events.fetch_add(1, Ordering::Relaxed);
101 }
102
103 pub fn set_binlog_position(&self, position: u64) {
105 self.binlog_position.store(position, Ordering::Relaxed);
106 }
107
108 #[must_use]
110 pub fn get_binlog_position(&self) -> u64 {
111 self.binlog_position.load(Ordering::Relaxed)
112 }
113
114 #[must_use]
116 pub fn total_row_events(&self) -> u64 {
117 self.inserts.load(Ordering::Relaxed)
118 + self.updates.load(Ordering::Relaxed)
119 + self.deletes.load(Ordering::Relaxed)
120 }
121
122 #[must_use]
124 pub fn snapshot(&self) -> MetricsSnapshot {
125 MetricsSnapshot {
126 events_received: self.events_received.load(Ordering::Relaxed),
127 inserts: self.inserts.load(Ordering::Relaxed),
128 updates: self.updates.load(Ordering::Relaxed),
129 deletes: self.deletes.load(Ordering::Relaxed),
130 transactions: self.transactions.load(Ordering::Relaxed),
131 table_maps: self.table_maps.load(Ordering::Relaxed),
132 bytes_received: self.bytes_received.load(Ordering::Relaxed),
133 errors: self.errors.load(Ordering::Relaxed),
134 heartbeats: self.heartbeats.load(Ordering::Relaxed),
135 ddl_events: self.ddl_events.load(Ordering::Relaxed),
136 binlog_position: self.binlog_position.load(Ordering::Relaxed),
137 }
138 }
139
140 #[must_use]
142 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
143 ConnectorMetrics {
144 records_total: self.total_row_events(),
145 bytes_total: self.bytes_received.load(Ordering::Relaxed),
146 errors_total: self.errors.load(Ordering::Relaxed),
147 ..ConnectorMetrics::default()
148 }
149 }
150
151 pub fn reset(&self) {
153 self.events_received.store(0, Ordering::Relaxed);
154 self.inserts.store(0, Ordering::Relaxed);
155 self.updates.store(0, Ordering::Relaxed);
156 self.deletes.store(0, Ordering::Relaxed);
157 self.transactions.store(0, Ordering::Relaxed);
158 self.table_maps.store(0, Ordering::Relaxed);
159 self.bytes_received.store(0, Ordering::Relaxed);
160 self.errors.store(0, Ordering::Relaxed);
161 self.heartbeats.store(0, Ordering::Relaxed);
162 self.ddl_events.store(0, Ordering::Relaxed);
163 self.binlog_position.store(0, Ordering::Relaxed);
164 }
165}
166
167#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct MetricsSnapshot {
170 pub events_received: u64,
172 pub inserts: u64,
174 pub updates: u64,
176 pub deletes: u64,
178 pub transactions: u64,
180 pub table_maps: u64,
182 pub bytes_received: u64,
184 pub errors: u64,
186 pub heartbeats: u64,
188 pub ddl_events: u64,
190 pub binlog_position: u64,
192}
193
194impl MetricsSnapshot {
195 #[must_use]
197 pub fn total_row_events(&self) -> u64 {
198 self.inserts + self.updates + self.deletes
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205
206 #[test]
207 fn test_new_metrics() {
208 let m = MySqlCdcMetrics::new();
209 assert_eq!(m.events_received.load(Ordering::Relaxed), 0);
210 assert_eq!(m.inserts.load(Ordering::Relaxed), 0);
211 assert_eq!(m.errors.load(Ordering::Relaxed), 0);
212 }
213
214 #[test]
215 fn test_inc_events_received() {
216 let m = MySqlCdcMetrics::new();
217 m.inc_events_received();
218 m.inc_events_received();
219 assert_eq!(m.events_received.load(Ordering::Relaxed), 2);
220 }
221
222 #[test]
223 fn test_inc_row_events() {
224 let m = MySqlCdcMetrics::new();
225 m.inc_inserts(5);
226 m.inc_updates(3);
227 m.inc_deletes(2);
228 assert_eq!(m.total_row_events(), 10);
229 }
230
231 #[test]
232 fn test_add_bytes() {
233 let m = MySqlCdcMetrics::new();
234 m.add_bytes_received(100);
235 m.add_bytes_received(50);
236 assert_eq!(m.bytes_received.load(Ordering::Relaxed), 150);
237 }
238
239 #[test]
240 fn test_binlog_position() {
241 let m = MySqlCdcMetrics::new();
242 m.set_binlog_position(12345);
243 assert_eq!(m.get_binlog_position(), 12345);
244 }
245
246 #[test]
247 fn test_snapshot() {
248 let m = MySqlCdcMetrics::new();
249 m.inc_inserts(10);
250 m.inc_updates(5);
251 m.inc_transactions();
252
253 let snap = m.snapshot();
254 assert_eq!(snap.inserts, 10);
255 assert_eq!(snap.updates, 5);
256 assert_eq!(snap.transactions, 1);
257 assert_eq!(snap.total_row_events(), 15);
258 }
259
260 #[test]
261 fn test_to_connector_metrics() {
262 let m = MySqlCdcMetrics::new();
263 m.inc_inserts(10);
264 m.add_bytes_received(1000);
265 m.inc_errors();
266
267 let cm = m.to_connector_metrics();
268 assert_eq!(cm.records_total, 10);
269 assert_eq!(cm.bytes_total, 1000);
270 assert_eq!(cm.errors_total, 1);
271 }
272
273 #[test]
274 fn test_reset() {
275 let m = MySqlCdcMetrics::new();
276 m.inc_inserts(10);
277 m.inc_errors();
278 m.set_binlog_position(12345);
279
280 m.reset();
281
282 assert_eq!(m.inserts.load(Ordering::Relaxed), 0);
283 assert_eq!(m.errors.load(Ordering::Relaxed), 0);
284 assert_eq!(m.binlog_position.load(Ordering::Relaxed), 0);
285 }
286
287 #[test]
288 fn test_inc_all_counters() {
289 let m = MySqlCdcMetrics::new();
290
291 m.inc_events_received();
292 m.inc_inserts(1);
293 m.inc_updates(1);
294 m.inc_deletes(1);
295 m.inc_transactions();
296 m.inc_table_maps();
297 m.add_bytes_received(100);
298 m.inc_errors();
299 m.inc_heartbeats();
300 m.inc_ddl_events();
301
302 let snap = m.snapshot();
303 assert_eq!(snap.events_received, 1);
304 assert_eq!(snap.inserts, 1);
305 assert_eq!(snap.updates, 1);
306 assert_eq!(snap.deletes, 1);
307 assert_eq!(snap.transactions, 1);
308 assert_eq!(snap.table_maps, 1);
309 assert_eq!(snap.bytes_received, 100);
310 assert_eq!(snap.errors, 1);
311 assert_eq!(snap.heartbeats, 1);
312 assert_eq!(snap.ddl_events, 1);
313 }
314}