Skip to main content

laminar_connectors/cdc/mysql/
metrics.rs

1//! MySQL CDC source connector metrics.
2//!
3//! Lock-free atomic counters for CDC event processing statistics.
4
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use crate::metrics::ConnectorMetrics;
8
9/// Metrics for MySQL CDC source connector.
10#[derive(Debug, Default)]
11pub struct MySqlCdcMetrics {
12    /// Total number of binlog events received.
13    pub events_received: AtomicU64,
14
15    /// Total number of INSERT row events processed.
16    pub inserts: AtomicU64,
17
18    /// Total number of UPDATE row events processed.
19    pub updates: AtomicU64,
20
21    /// Total number of DELETE row events processed.
22    pub deletes: AtomicU64,
23
24    /// Total number of transactions seen.
25    pub transactions: AtomicU64,
26
27    /// Total number of TABLE_MAP events processed.
28    pub table_maps: AtomicU64,
29
30    /// Total number of bytes received from binlog.
31    pub bytes_received: AtomicU64,
32
33    /// Total number of errors encountered.
34    pub errors: AtomicU64,
35
36    /// Total number of heartbeats received.
37    pub heartbeats: AtomicU64,
38
39    /// Total number of DDL (query) events.
40    pub ddl_events: AtomicU64,
41
42    /// Current binlog position (low 32 bits).
43    pub binlog_position: AtomicU64,
44}
45
46impl MySqlCdcMetrics {
47    /// Creates new metrics with all counters at zero.
48    #[must_use]
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    /// Increments the binlog events received counter.
54    pub fn inc_events_received(&self) {
55        self.events_received.fetch_add(1, Ordering::Relaxed);
56    }
57
58    /// Increments the INSERT row event counter by `count`.
59    pub fn inc_inserts(&self, count: u64) {
60        self.inserts.fetch_add(count, Ordering::Relaxed);
61    }
62
63    /// Increments the UPDATE row event counter by `count`.
64    pub fn inc_updates(&self, count: u64) {
65        self.updates.fetch_add(count, Ordering::Relaxed);
66    }
67
68    /// Increments the DELETE row event counter by `count`.
69    pub fn inc_deletes(&self, count: u64) {
70        self.deletes.fetch_add(count, Ordering::Relaxed);
71    }
72
73    /// Increments the transaction counter.
74    pub fn inc_transactions(&self) {
75        self.transactions.fetch_add(1, Ordering::Relaxed);
76    }
77
78    /// Increments the TABLE_MAP event counter.
79    pub fn inc_table_maps(&self) {
80        self.table_maps.fetch_add(1, Ordering::Relaxed);
81    }
82
83    /// Adds bytes to the bytes received counter.
84    pub fn add_bytes_received(&self, bytes: u64) {
85        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
86    }
87
88    /// Increments the error counter.
89    pub fn inc_errors(&self) {
90        self.errors.fetch_add(1, Ordering::Relaxed);
91    }
92
93    /// Increments the heartbeat counter.
94    pub fn inc_heartbeats(&self) {
95        self.heartbeats.fetch_add(1, Ordering::Relaxed);
96    }
97
98    /// Increments the DDL event counter.
99    pub fn inc_ddl_events(&self) {
100        self.ddl_events.fetch_add(1, Ordering::Relaxed);
101    }
102
103    /// Updates the current binlog position.
104    pub fn set_binlog_position(&self, position: u64) {
105        self.binlog_position.store(position, Ordering::Relaxed);
106    }
107
108    /// Returns the current binlog position.
109    #[must_use]
110    pub fn get_binlog_position(&self) -> u64 {
111        self.binlog_position.load(Ordering::Relaxed)
112    }
113
114    /// Returns the total number of row events (insert + update + delete).
115    #[must_use]
116    pub fn total_row_events(&self) -> u64 {
117        self.inserts.load(Ordering::Relaxed)
118            + self.updates.load(Ordering::Relaxed)
119            + self.deletes.load(Ordering::Relaxed)
120    }
121
122    /// Returns a snapshot of all metrics.
123    #[must_use]
124    pub fn snapshot(&self) -> MetricsSnapshot {
125        MetricsSnapshot {
126            events_received: self.events_received.load(Ordering::Relaxed),
127            inserts: self.inserts.load(Ordering::Relaxed),
128            updates: self.updates.load(Ordering::Relaxed),
129            deletes: self.deletes.load(Ordering::Relaxed),
130            transactions: self.transactions.load(Ordering::Relaxed),
131            table_maps: self.table_maps.load(Ordering::Relaxed),
132            bytes_received: self.bytes_received.load(Ordering::Relaxed),
133            errors: self.errors.load(Ordering::Relaxed),
134            heartbeats: self.heartbeats.load(Ordering::Relaxed),
135            ddl_events: self.ddl_events.load(Ordering::Relaxed),
136            binlog_position: self.binlog_position.load(Ordering::Relaxed),
137        }
138    }
139
140    /// Converts to generic connector metrics.
141    #[must_use]
142    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
143        ConnectorMetrics {
144            records_total: self.total_row_events(),
145            bytes_total: self.bytes_received.load(Ordering::Relaxed),
146            errors_total: self.errors.load(Ordering::Relaxed),
147            ..ConnectorMetrics::default()
148        }
149    }
150
151    /// Resets all counters to zero.
152    pub fn reset(&self) {
153        self.events_received.store(0, Ordering::Relaxed);
154        self.inserts.store(0, Ordering::Relaxed);
155        self.updates.store(0, Ordering::Relaxed);
156        self.deletes.store(0, Ordering::Relaxed);
157        self.transactions.store(0, Ordering::Relaxed);
158        self.table_maps.store(0, Ordering::Relaxed);
159        self.bytes_received.store(0, Ordering::Relaxed);
160        self.errors.store(0, Ordering::Relaxed);
161        self.heartbeats.store(0, Ordering::Relaxed);
162        self.ddl_events.store(0, Ordering::Relaxed);
163        self.binlog_position.store(0, Ordering::Relaxed);
164    }
165}
166
167/// A snapshot of metrics values at a point in time.
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct MetricsSnapshot {
170    /// Total binlog events received.
171    pub events_received: u64,
172    /// Total INSERT operations.
173    pub inserts: u64,
174    /// Total UPDATE operations.
175    pub updates: u64,
176    /// Total DELETE operations.
177    pub deletes: u64,
178    /// Total transactions.
179    pub transactions: u64,
180    /// Total TABLE_MAP events.
181    pub table_maps: u64,
182    /// Total bytes received.
183    pub bytes_received: u64,
184    /// Total errors.
185    pub errors: u64,
186    /// Total heartbeats.
187    pub heartbeats: u64,
188    /// Total DDL events.
189    pub ddl_events: u64,
190    /// Current binlog position.
191    pub binlog_position: u64,
192}
193
194impl MetricsSnapshot {
195    /// Returns the total row events.
196    #[must_use]
197    pub fn total_row_events(&self) -> u64 {
198        self.inserts + self.updates + self.deletes
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205
206    #[test]
207    fn test_new_metrics() {
208        let m = MySqlCdcMetrics::new();
209        assert_eq!(m.events_received.load(Ordering::Relaxed), 0);
210        assert_eq!(m.inserts.load(Ordering::Relaxed), 0);
211        assert_eq!(m.errors.load(Ordering::Relaxed), 0);
212    }
213
214    #[test]
215    fn test_inc_events_received() {
216        let m = MySqlCdcMetrics::new();
217        m.inc_events_received();
218        m.inc_events_received();
219        assert_eq!(m.events_received.load(Ordering::Relaxed), 2);
220    }
221
222    #[test]
223    fn test_inc_row_events() {
224        let m = MySqlCdcMetrics::new();
225        m.inc_inserts(5);
226        m.inc_updates(3);
227        m.inc_deletes(2);
228        assert_eq!(m.total_row_events(), 10);
229    }
230
231    #[test]
232    fn test_add_bytes() {
233        let m = MySqlCdcMetrics::new();
234        m.add_bytes_received(100);
235        m.add_bytes_received(50);
236        assert_eq!(m.bytes_received.load(Ordering::Relaxed), 150);
237    }
238
239    #[test]
240    fn test_binlog_position() {
241        let m = MySqlCdcMetrics::new();
242        m.set_binlog_position(12345);
243        assert_eq!(m.get_binlog_position(), 12345);
244    }
245
246    #[test]
247    fn test_snapshot() {
248        let m = MySqlCdcMetrics::new();
249        m.inc_inserts(10);
250        m.inc_updates(5);
251        m.inc_transactions();
252
253        let snap = m.snapshot();
254        assert_eq!(snap.inserts, 10);
255        assert_eq!(snap.updates, 5);
256        assert_eq!(snap.transactions, 1);
257        assert_eq!(snap.total_row_events(), 15);
258    }
259
260    #[test]
261    fn test_to_connector_metrics() {
262        let m = MySqlCdcMetrics::new();
263        m.inc_inserts(10);
264        m.add_bytes_received(1000);
265        m.inc_errors();
266
267        let cm = m.to_connector_metrics();
268        assert_eq!(cm.records_total, 10);
269        assert_eq!(cm.bytes_total, 1000);
270        assert_eq!(cm.errors_total, 1);
271    }
272
273    #[test]
274    fn test_reset() {
275        let m = MySqlCdcMetrics::new();
276        m.inc_inserts(10);
277        m.inc_errors();
278        m.set_binlog_position(12345);
279
280        m.reset();
281
282        assert_eq!(m.inserts.load(Ordering::Relaxed), 0);
283        assert_eq!(m.errors.load(Ordering::Relaxed), 0);
284        assert_eq!(m.binlog_position.load(Ordering::Relaxed), 0);
285    }
286
287    #[test]
288    fn test_inc_all_counters() {
289        let m = MySqlCdcMetrics::new();
290
291        m.inc_events_received();
292        m.inc_inserts(1);
293        m.inc_updates(1);
294        m.inc_deletes(1);
295        m.inc_transactions();
296        m.inc_table_maps();
297        m.add_bytes_received(100);
298        m.inc_errors();
299        m.inc_heartbeats();
300        m.inc_ddl_events();
301
302        let snap = m.snapshot();
303        assert_eq!(snap.events_received, 1);
304        assert_eq!(snap.inserts, 1);
305        assert_eq!(snap.updates, 1);
306        assert_eq!(snap.deletes, 1);
307        assert_eq!(snap.transactions, 1);
308        assert_eq!(snap.table_maps, 1);
309        assert_eq!(snap.bytes_received, 100);
310        assert_eq!(snap.errors, 1);
311        assert_eq!(snap.heartbeats, 1);
312        assert_eq!(snap.ddl_events, 1);
313    }
314}