Skip to main content

laminar_connectors/cdc/mysql/
metrics.rs

//! MySQL CDC source connector metrics.
//!
//! Prometheus-backed counters/gauges for CDC event processing statistics.
4
5use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::prom::reg_or_local;
8
9/// Metrics for MySQL CDC source connector.
10#[derive(Debug, Clone)]
11pub struct MySqlCdcMetrics {
12    /// Total number of binlog events received.
13    pub events_received: IntCounter,
14
15    /// Total number of INSERT row events processed.
16    pub inserts: IntCounter,
17
18    /// Total number of UPDATE row events processed.
19    pub updates: IntCounter,
20
21    /// Total number of DELETE row events processed.
22    pub deletes: IntCounter,
23
24    /// Total number of transactions seen.
25    pub transactions: IntCounter,
26
27    /// Total number of TABLE_MAP events processed.
28    pub table_maps: IntCounter,
29
30    /// Total number of bytes received from binlog.
31    pub bytes_received: IntCounter,
32
33    /// Total number of errors encountered.
34    pub errors: IntCounter,
35
36    /// Total number of heartbeats received.
37    pub heartbeats: IntCounter,
38
39    /// Total number of DDL (query) events.
40    pub ddl_events: IntCounter,
41
42    /// Current binlog position (low 32 bits).
43    pub binlog_position: IntGauge,
44}
45
46impl MySqlCdcMetrics {
47    /// Creates new metrics with all counters at zero.
48    #[must_use]
49    #[allow(clippy::missing_panics_doc)]
50    pub fn new(registry: Option<&Registry>) -> Self {
51        let mut local = None;
52        let reg = reg_or_local(registry, &mut local);
53
54        Self {
55            events_received: reg.counter(
56                "mysql_cdc_events_received_total",
57                "Total binlog events received",
58            ),
59            inserts: reg.counter("mysql_cdc_inserts_total", "Total INSERT row events"),
60            updates: reg.counter("mysql_cdc_updates_total", "Total UPDATE row events"),
61            deletes: reg.counter("mysql_cdc_deletes_total", "Total DELETE row events"),
62            transactions: reg.counter("mysql_cdc_transactions_total", "Total transactions"),
63            table_maps: reg.counter("mysql_cdc_table_maps_total", "Total TABLE_MAP events"),
64            bytes_received: reg
65                .counter("mysql_cdc_bytes_received_total", "Total bytes from binlog"),
66            errors: reg.counter("mysql_cdc_errors_total", "Total CDC errors"),
67            heartbeats: reg.counter("mysql_cdc_heartbeats_total", "Total heartbeats received"),
68            ddl_events: reg.counter("mysql_cdc_ddl_events_total", "Total DDL events"),
69            binlog_position: reg.gauge("mysql_cdc_binlog_position", "Current binlog position"),
70        }
71    }
72
73    /// Increments the binlog events received counter.
74    pub fn inc_events_received(&self) {
75        self.events_received.inc();
76    }
77
78    /// Increments the INSERT row event counter by `count`.
79    pub fn inc_inserts(&self, count: u64) {
80        self.inserts.inc_by(count);
81    }
82
83    /// Increments the UPDATE row event counter by `count`.
84    pub fn inc_updates(&self, count: u64) {
85        self.updates.inc_by(count);
86    }
87
88    /// Increments the DELETE row event counter by `count`.
89    pub fn inc_deletes(&self, count: u64) {
90        self.deletes.inc_by(count);
91    }
92
93    /// Increments the transaction counter.
94    pub fn inc_transactions(&self) {
95        self.transactions.inc();
96    }
97
98    /// Increments the TABLE_MAP event counter.
99    pub fn inc_table_maps(&self) {
100        self.table_maps.inc();
101    }
102
103    /// Adds bytes to the bytes received counter.
104    pub fn add_bytes_received(&self, bytes: u64) {
105        self.bytes_received.inc_by(bytes);
106    }
107
108    /// Increments the error counter.
109    pub fn inc_errors(&self) {
110        self.errors.inc();
111    }
112
113    /// Increments the heartbeat counter.
114    pub fn inc_heartbeats(&self) {
115        self.heartbeats.inc();
116    }
117
118    /// Increments the DDL event counter.
119    pub fn inc_ddl_events(&self) {
120        self.ddl_events.inc();
121    }
122
123    /// Updates the current binlog position.
124    #[allow(clippy::cast_possible_wrap)]
125    pub fn set_binlog_position(&self, position: u64) {
126        self.binlog_position.set(position as i64);
127    }
128
129    /// Returns the current binlog position.
130    #[must_use]
131    #[allow(clippy::cast_sign_loss)]
132    pub fn get_binlog_position(&self) -> u64 {
133        self.binlog_position.get() as u64
134    }
135
136    /// Returns the total number of row events (insert + update + delete).
137    #[must_use]
138    pub fn total_row_events(&self) -> u64 {
139        self.inserts.get() + self.updates.get() + self.deletes.get()
140    }
141
142    /// Returns a snapshot of all metrics.
143    #[must_use]
144    #[allow(clippy::cast_sign_loss)]
145    pub fn snapshot(&self) -> MetricsSnapshot {
146        MetricsSnapshot {
147            events_received: self.events_received.get(),
148            inserts: self.inserts.get(),
149            updates: self.updates.get(),
150            deletes: self.deletes.get(),
151            transactions: self.transactions.get(),
152            table_maps: self.table_maps.get(),
153            bytes_received: self.bytes_received.get(),
154            errors: self.errors.get(),
155            heartbeats: self.heartbeats.get(),
156            ddl_events: self.ddl_events.get(),
157            binlog_position: self.binlog_position.get() as u64,
158        }
159    }
160}
161
162impl Default for MySqlCdcMetrics {
163    fn default() -> Self {
164        Self::new(None)
165    }
166}
167
/// Plain-value copy of the CDC metrics taken at one point in time.
///
/// All fields are public `u64` values, so a snapshot can be compared, cloned,
/// or constructed by hand (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsSnapshot {
    /// Total binlog events received.
    pub events_received: u64,
    /// Total INSERT operations.
    pub inserts: u64,
    /// Total UPDATE operations.
    pub updates: u64,
    /// Total DELETE operations.
    pub deletes: u64,
    /// Total transactions.
    pub transactions: u64,
    /// Total TABLE_MAP events.
    pub table_maps: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Total errors.
    pub errors: u64,
    /// Total heartbeats.
    pub heartbeats: u64,
    /// Total DDL events.
    pub ddl_events: u64,
    /// Current binlog position.
    pub binlog_position: u64,
}

impl MetricsSnapshot {
    /// Returns the number of row events (`inserts + updates + deletes`).
    ///
    /// Because the fields are public and may hold arbitrary values, the sum
    /// saturates at `u64::MAX` instead of panicking (debug builds) or wrapping
    /// (release builds) on overflow.
    #[must_use]
    pub fn total_row_events(&self) -> u64 {
        self.inserts
            .saturating_add(self.updates)
            .saturating_add(self.deletes)
    }
}
202
/// Unit tests for the metric helpers; each test builds its own metric set via
/// `MySqlCdcMetrics::new(None)` so no registry is shared between tests.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_metrics() {
        let m = MySqlCdcMetrics::new(None);
        assert_eq!(m.events_received.get(), 0);
        assert_eq!(m.inserts.get(), 0);
        assert_eq!(m.errors.get(), 0);
    }

    #[test]
    fn test_default_matches_new() {
        // `Default` delegates to `new(None)`, so both start from the same state.
        let from_default = MySqlCdcMetrics::default().snapshot();
        let from_new = MySqlCdcMetrics::new(None).snapshot();
        assert_eq!(from_default, from_new);
    }

    #[test]
    fn test_inc_events_received() {
        let m = MySqlCdcMetrics::new(None);
        m.inc_events_received();
        m.inc_events_received();
        assert_eq!(m.events_received.get(), 2);
    }

    #[test]
    fn test_inc_row_events() {
        let m = MySqlCdcMetrics::new(None);
        m.inc_inserts(5);
        m.inc_updates(3);
        m.inc_deletes(2);
        assert_eq!(m.total_row_events(), 10);
    }

    #[test]
    fn test_add_bytes() {
        let m = MySqlCdcMetrics::new(None);
        m.add_bytes_received(100);
        m.add_bytes_received(50);
        assert_eq!(m.bytes_received.get(), 150);
    }

    #[test]
    fn test_binlog_position() {
        let m = MySqlCdcMetrics::new(None);
        m.set_binlog_position(12345);
        assert_eq!(m.get_binlog_position(), 12345);
    }

    #[test]
    fn test_binlog_position_round_trips_large_values() {
        // Values above `i64::MAX` wrap inside the gauge but must read back intact.
        let m = MySqlCdcMetrics::new(None);
        m.set_binlog_position(u64::MAX);
        assert_eq!(m.get_binlog_position(), u64::MAX);
        assert_eq!(m.snapshot().binlog_position, u64::MAX);
    }

    #[test]
    fn test_snapshot() {
        let m = MySqlCdcMetrics::new(None);
        m.inc_inserts(10);
        m.inc_updates(5);
        m.inc_transactions();

        let snap = m.snapshot();
        assert_eq!(snap.inserts, 10);
        assert_eq!(snap.updates, 5);
        assert_eq!(snap.transactions, 1);
        assert_eq!(snap.total_row_events(), 15);
    }

    #[test]
    fn test_inc_all_counters() {
        let m = MySqlCdcMetrics::new(None);

        m.inc_events_received();
        m.inc_inserts(1);
        m.inc_updates(1);
        m.inc_deletes(1);
        m.inc_transactions();
        m.inc_table_maps();
        m.add_bytes_received(100);
        m.inc_errors();
        m.inc_heartbeats();
        m.inc_ddl_events();
        m.set_binlog_position(4);

        let snap = m.snapshot();
        assert_eq!(snap.events_received, 1);
        assert_eq!(snap.inserts, 1);
        assert_eq!(snap.updates, 1);
        assert_eq!(snap.deletes, 1);
        assert_eq!(snap.transactions, 1);
        assert_eq!(snap.table_maps, 1);
        assert_eq!(snap.bytes_received, 100);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.heartbeats, 1);
        assert_eq!(snap.ddl_events, 1);
        assert_eq!(snap.binlog_position, 4);
        assert_eq!(snap.total_row_events(), 3);
    }
}