Skip to main content

laminar_connectors/cdc/mysql/
metrics.rs

//! MySQL CDC source connector metrics.
//!
//! Prometheus-backed counters and gauges that track CDC event-processing statistics.
4
5use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::metrics::ConnectorMetrics;
8
9/// Metrics for MySQL CDC source connector.
10#[derive(Debug, Clone)]
11pub struct MySqlCdcMetrics {
12    /// Total number of binlog events received.
13    pub events_received: IntCounter,
14
15    /// Total number of INSERT row events processed.
16    pub inserts: IntCounter,
17
18    /// Total number of UPDATE row events processed.
19    pub updates: IntCounter,
20
21    /// Total number of DELETE row events processed.
22    pub deletes: IntCounter,
23
24    /// Total number of transactions seen.
25    pub transactions: IntCounter,
26
27    /// Total number of TABLE_MAP events processed.
28    pub table_maps: IntCounter,
29
30    /// Total number of bytes received from binlog.
31    pub bytes_received: IntCounter,
32
33    /// Total number of errors encountered.
34    pub errors: IntCounter,
35
36    /// Total number of heartbeats received.
37    pub heartbeats: IntCounter,
38
39    /// Total number of DDL (query) events.
40    pub ddl_events: IntCounter,
41
42    /// Current binlog position (low 32 bits).
43    pub binlog_position: IntGauge,
44}
45
46impl MySqlCdcMetrics {
47    /// Creates new metrics with all counters at zero.
48    #[must_use]
49    #[allow(clippy::missing_panics_doc)]
50    pub fn new(registry: Option<&Registry>) -> Self {
51        let local;
52        let reg = if let Some(r) = registry {
53            r
54        } else {
55            local = Registry::new();
56            &local
57        };
58
59        let events_received = IntCounter::new(
60            "mysql_cdc_events_received_total",
61            "Total binlog events received",
62        )
63        .unwrap();
64        let inserts =
65            IntCounter::new("mysql_cdc_inserts_total", "Total INSERT row events").unwrap();
66        let updates =
67            IntCounter::new("mysql_cdc_updates_total", "Total UPDATE row events").unwrap();
68        let deletes =
69            IntCounter::new("mysql_cdc_deletes_total", "Total DELETE row events").unwrap();
70        let transactions =
71            IntCounter::new("mysql_cdc_transactions_total", "Total transactions").unwrap();
72        let table_maps =
73            IntCounter::new("mysql_cdc_table_maps_total", "Total TABLE_MAP events").unwrap();
74        let bytes_received =
75            IntCounter::new("mysql_cdc_bytes_received_total", "Total bytes from binlog").unwrap();
76        let errors = IntCounter::new("mysql_cdc_errors_total", "Total CDC errors").unwrap();
77        let heartbeats =
78            IntCounter::new("mysql_cdc_heartbeats_total", "Total heartbeats received").unwrap();
79        let ddl_events = IntCounter::new("mysql_cdc_ddl_events_total", "Total DDL events").unwrap();
80        let binlog_position =
81            IntGauge::new("mysql_cdc_binlog_position", "Current binlog position").unwrap();
82
83        let _ = reg.register(Box::new(events_received.clone()));
84        let _ = reg.register(Box::new(inserts.clone()));
85        let _ = reg.register(Box::new(updates.clone()));
86        let _ = reg.register(Box::new(deletes.clone()));
87        let _ = reg.register(Box::new(transactions.clone()));
88        let _ = reg.register(Box::new(table_maps.clone()));
89        let _ = reg.register(Box::new(bytes_received.clone()));
90        let _ = reg.register(Box::new(errors.clone()));
91        let _ = reg.register(Box::new(heartbeats.clone()));
92        let _ = reg.register(Box::new(ddl_events.clone()));
93        let _ = reg.register(Box::new(binlog_position.clone()));
94
95        Self {
96            events_received,
97            inserts,
98            updates,
99            deletes,
100            transactions,
101            table_maps,
102            bytes_received,
103            errors,
104            heartbeats,
105            ddl_events,
106            binlog_position,
107        }
108    }
109
110    /// Increments the binlog events received counter.
111    pub fn inc_events_received(&self) {
112        self.events_received.inc();
113    }
114
115    /// Increments the INSERT row event counter by `count`.
116    pub fn inc_inserts(&self, count: u64) {
117        self.inserts.inc_by(count);
118    }
119
120    /// Increments the UPDATE row event counter by `count`.
121    pub fn inc_updates(&self, count: u64) {
122        self.updates.inc_by(count);
123    }
124
125    /// Increments the DELETE row event counter by `count`.
126    pub fn inc_deletes(&self, count: u64) {
127        self.deletes.inc_by(count);
128    }
129
130    /// Increments the transaction counter.
131    pub fn inc_transactions(&self) {
132        self.transactions.inc();
133    }
134
135    /// Increments the TABLE_MAP event counter.
136    pub fn inc_table_maps(&self) {
137        self.table_maps.inc();
138    }
139
140    /// Adds bytes to the bytes received counter.
141    pub fn add_bytes_received(&self, bytes: u64) {
142        self.bytes_received.inc_by(bytes);
143    }
144
145    /// Increments the error counter.
146    pub fn inc_errors(&self) {
147        self.errors.inc();
148    }
149
150    /// Increments the heartbeat counter.
151    pub fn inc_heartbeats(&self) {
152        self.heartbeats.inc();
153    }
154
155    /// Increments the DDL event counter.
156    pub fn inc_ddl_events(&self) {
157        self.ddl_events.inc();
158    }
159
160    /// Updates the current binlog position.
161    #[allow(clippy::cast_possible_wrap)]
162    pub fn set_binlog_position(&self, position: u64) {
163        self.binlog_position.set(position as i64);
164    }
165
166    /// Returns the current binlog position.
167    #[must_use]
168    #[allow(clippy::cast_sign_loss)]
169    pub fn get_binlog_position(&self) -> u64 {
170        self.binlog_position.get() as u64
171    }
172
173    /// Returns the total number of row events (insert + update + delete).
174    #[must_use]
175    pub fn total_row_events(&self) -> u64 {
176        self.inserts.get() + self.updates.get() + self.deletes.get()
177    }
178
179    /// Returns a snapshot of all metrics.
180    #[must_use]
181    #[allow(clippy::cast_sign_loss)]
182    pub fn snapshot(&self) -> MetricsSnapshot {
183        MetricsSnapshot {
184            events_received: self.events_received.get(),
185            inserts: self.inserts.get(),
186            updates: self.updates.get(),
187            deletes: self.deletes.get(),
188            transactions: self.transactions.get(),
189            table_maps: self.table_maps.get(),
190            bytes_received: self.bytes_received.get(),
191            errors: self.errors.get(),
192            heartbeats: self.heartbeats.get(),
193            ddl_events: self.ddl_events.get(),
194            binlog_position: self.binlog_position.get() as u64,
195        }
196    }
197
198    /// Converts to generic connector metrics.
199    #[must_use]
200    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
201        ConnectorMetrics {
202            records_total: self.total_row_events(),
203            bytes_total: self.bytes_received.get(),
204            errors_total: self.errors.get(),
205            ..ConnectorMetrics::default()
206        }
207    }
208}
209
210impl Default for MySqlCdcMetrics {
211    fn default() -> Self {
212        Self::new(None)
213    }
214}
215
/// Plain-value copy of the connector metrics at one point in time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsSnapshot {
    /// Total binlog events received.
    pub events_received: u64,
    /// Total INSERT operations.
    pub inserts: u64,
    /// Total UPDATE operations.
    pub updates: u64,
    /// Total DELETE operations.
    pub deletes: u64,
    /// Total transactions.
    pub transactions: u64,
    /// Total TABLE_MAP events.
    pub table_maps: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Total errors.
    pub errors: u64,
    /// Total heartbeats.
    pub heartbeats: u64,
    /// Total DDL events.
    pub ddl_events: u64,
    /// Current binlog position.
    pub binlog_position: u64,
}

impl MetricsSnapshot {
    /// Returns the total number of row events (`inserts + updates + deletes`).
    ///
    /// The fields are public and may hold any `u64`, so the sum saturates at
    /// `u64::MAX` instead of panicking (debug builds) or wrapping (release
    /// builds) on overflow.
    #[must_use]
    pub fn total_row_events(&self) -> u64 {
        self.inserts
            .saturating_add(self.updates)
            .saturating_add(self.deletes)
    }
}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshot in which every value is zero, used as the expected initial state.
    fn zero_snapshot() -> MetricsSnapshot {
        MetricsSnapshot {
            events_received: 0,
            inserts: 0,
            updates: 0,
            deletes: 0,
            transactions: 0,
            table_maps: 0,
            bytes_received: 0,
            errors: 0,
            heartbeats: 0,
            ddl_events: 0,
            binlog_position: 0,
        }
    }

    #[test]
    fn test_new_metrics() {
        // Every metric, not just a sample, must start at zero.
        let metrics = MySqlCdcMetrics::new(None);
        assert_eq!(metrics.snapshot(), zero_snapshot());
    }

    #[test]
    fn test_default_matches_new() {
        assert_eq!(MySqlCdcMetrics::default().snapshot(), zero_snapshot());
    }

    #[test]
    fn test_registers_with_supplied_registry() {
        let registry = Registry::new();
        let metrics = MySqlCdcMetrics::new(Some(&registry));
        metrics.inc_inserts(3);

        // Ten counters plus one gauge are exposed through the registry.
        assert_eq!(registry.gather().len(), 11);
        assert_eq!(metrics.inserts.get(), 3);
    }

    #[test]
    fn test_duplicate_registration_is_ignored() {
        let registry = Registry::new();
        let first = MySqlCdcMetrics::new(Some(&registry));
        // Registering the same names again must not panic.
        let second = MySqlCdcMetrics::new(Some(&registry));

        second.inc_errors();
        assert_eq!(first.errors.get(), 0);
        assert_eq!(second.errors.get(), 1);
        assert_eq!(registry.gather().len(), 11);
    }

    #[test]
    fn test_inc_events_received() {
        let metrics = MySqlCdcMetrics::new(None);
        metrics.inc_events_received();
        metrics.inc_events_received();
        assert_eq!(metrics.events_received.get(), 2);
    }

    #[test]
    fn test_inc_row_events() {
        let metrics = MySqlCdcMetrics::new(None);
        metrics.inc_inserts(5);
        metrics.inc_updates(3);
        metrics.inc_deletes(2);
        assert_eq!(metrics.total_row_events(), 10);
    }

    #[test]
    fn test_add_bytes() {
        let metrics = MySqlCdcMetrics::new(None);
        metrics.add_bytes_received(100);
        metrics.add_bytes_received(50);
        assert_eq!(metrics.bytes_received.get(), 150);
    }

    #[test]
    fn test_binlog_position() {
        let metrics = MySqlCdcMetrics::new(None);
        metrics.set_binlog_position(12345);
        assert_eq!(metrics.get_binlog_position(), 12345);
    }

    #[test]
    fn test_snapshot() {
        let metrics = MySqlCdcMetrics::new(None);
        metrics.inc_inserts(10);
        metrics.inc_updates(5);
        metrics.inc_transactions();

        let snap = metrics.snapshot();
        assert_eq!(snap.inserts, 10);
        assert_eq!(snap.updates, 5);
        assert_eq!(snap.transactions, 1);
        assert_eq!(snap.total_row_events(), 15);
    }

    #[test]
    fn test_to_connector_metrics() {
        let metrics = MySqlCdcMetrics::new(None);
        metrics.inc_inserts(10);
        metrics.add_bytes_received(1000);
        metrics.inc_errors();

        let generic = metrics.to_connector_metrics();
        assert_eq!(generic.records_total, 10);
        assert_eq!(generic.bytes_total, 1000);
        assert_eq!(generic.errors_total, 1);
    }

    #[test]
    fn test_inc_all_counters() {
        let metrics = MySqlCdcMetrics::new(None);

        metrics.inc_events_received();
        metrics.inc_inserts(1);
        metrics.inc_updates(1);
        metrics.inc_deletes(1);
        metrics.inc_transactions();
        metrics.inc_table_maps();
        metrics.add_bytes_received(100);
        metrics.inc_errors();
        metrics.inc_heartbeats();
        metrics.inc_ddl_events();
        metrics.set_binlog_position(4096);

        let snap = metrics.snapshot();
        assert_eq!(snap.events_received, 1);
        assert_eq!(snap.inserts, 1);
        assert_eq!(snap.updates, 1);
        assert_eq!(snap.deletes, 1);
        assert_eq!(snap.transactions, 1);
        assert_eq!(snap.table_maps, 1);
        assert_eq!(snap.bytes_received, 100);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.heartbeats, 1);
        assert_eq!(snap.ddl_events, 1);
        assert_eq!(snap.binlog_position, 4096);
    }
}