Skip to main content

laminar_connectors/cdc/postgres/
metrics.rs

1//! `PostgreSQL` CDC source connector metrics.
2//!
3//! Prometheus-backed counters/gauges for tracking CDC replication performance.
4
5use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::metrics::ConnectorMetrics;
8
9/// Metrics for the `PostgreSQL` CDC source connector.
10///
11/// All counters are prometheus-backed and will appear in the scrape
12/// output when a shared registry is provided.
13#[derive(Debug, Clone)]
14pub struct PostgresCdcMetrics {
15    /// Total change events received (insert + update + delete).
16    pub events_received: IntCounter,
17
18    /// Total bytes received from the WAL stream.
19    pub bytes_received: IntCounter,
20
21    /// Total errors encountered.
22    pub errors: IntCounter,
23
24    /// Total batches produced for downstream.
25    pub batches_produced: IntCounter,
26
27    /// Total INSERT operations received.
28    pub inserts: IntCounter,
29
30    /// Total UPDATE operations received.
31    pub updates: IntCounter,
32
33    /// Total DELETE operations received.
34    pub deletes: IntCounter,
35
36    /// Total transactions (commit messages) received.
37    pub transactions: IntCounter,
38
39    /// Current confirmed flush LSN (as raw u64).
40    pub confirmed_flush_lsn: IntGauge,
41
42    /// Current replication lag in bytes (`write_lsn` - `confirmed_flush_lsn`).
43    pub replication_lag_bytes: IntGauge,
44
45    /// Total keepalive/heartbeat messages sent.
46    pub keepalives_sent: IntCounter,
47
48    /// Total events dropped due to buffer cap enforcement.
49    pub events_dropped: IntCounter,
50}
51
52impl PostgresCdcMetrics {
53    /// Creates a new metrics instance with all counters at zero.
54    #[must_use]
55    #[allow(clippy::missing_panics_doc)]
56    pub fn new(registry: Option<&Registry>) -> Self {
57        let local;
58        let reg = if let Some(r) = registry {
59            r
60        } else {
61            local = Registry::new();
62            &local
63        };
64
65        let events_received = IntCounter::new(
66            "postgres_cdc_events_received_total",
67            "Total CDC change events received",
68        )
69        .unwrap();
70        let bytes_received = IntCounter::new(
71            "postgres_cdc_bytes_received_total",
72            "Total bytes from WAL stream",
73        )
74        .unwrap();
75        let errors = IntCounter::new("postgres_cdc_errors_total", "Total CDC errors").unwrap();
76        let batches_produced = IntCounter::new(
77            "postgres_cdc_batches_produced_total",
78            "Total batches produced",
79        )
80        .unwrap();
81        let inserts = IntCounter::new("postgres_cdc_inserts_total", "Total INSERT events").unwrap();
82        let updates = IntCounter::new("postgres_cdc_updates_total", "Total UPDATE events").unwrap();
83        let deletes = IntCounter::new("postgres_cdc_deletes_total", "Total DELETE events").unwrap();
84        let transactions = IntCounter::new(
85            "postgres_cdc_transactions_total",
86            "Total transactions received",
87        )
88        .unwrap();
89        let confirmed_flush_lsn = IntGauge::new(
90            "postgres_cdc_confirmed_flush_lsn",
91            "Current confirmed flush LSN",
92        )
93        .unwrap();
94        let replication_lag_bytes = IntGauge::new(
95            "postgres_cdc_replication_lag_bytes",
96            "Replication lag in bytes",
97        )
98        .unwrap();
99        let keepalives_sent = IntCounter::new(
100            "postgres_cdc_keepalives_sent_total",
101            "Total keepalive messages sent",
102        )
103        .unwrap();
104        let events_dropped = IntCounter::new(
105            "postgres_cdc_events_dropped_total",
106            "Total events dropped (buffer cap)",
107        )
108        .unwrap();
109
110        let _ = reg.register(Box::new(events_received.clone()));
111        let _ = reg.register(Box::new(bytes_received.clone()));
112        let _ = reg.register(Box::new(errors.clone()));
113        let _ = reg.register(Box::new(batches_produced.clone()));
114        let _ = reg.register(Box::new(inserts.clone()));
115        let _ = reg.register(Box::new(updates.clone()));
116        let _ = reg.register(Box::new(deletes.clone()));
117        let _ = reg.register(Box::new(transactions.clone()));
118        let _ = reg.register(Box::new(confirmed_flush_lsn.clone()));
119        let _ = reg.register(Box::new(replication_lag_bytes.clone()));
120        let _ = reg.register(Box::new(keepalives_sent.clone()));
121        let _ = reg.register(Box::new(events_dropped.clone()));
122
123        Self {
124            events_received,
125            bytes_received,
126            errors,
127            batches_produced,
128            inserts,
129            updates,
130            deletes,
131            transactions,
132            confirmed_flush_lsn,
133            replication_lag_bytes,
134            keepalives_sent,
135            events_dropped,
136        }
137    }
138
139    /// Records a received INSERT event.
140    pub fn record_insert(&self) {
141        self.inserts.inc();
142        self.events_received.inc();
143    }
144
145    /// Records a received UPDATE event.
146    pub fn record_update(&self) {
147        self.updates.inc();
148        self.events_received.inc();
149    }
150
151    /// Records a received DELETE event.
152    pub fn record_delete(&self) {
153        self.deletes.inc();
154        self.events_received.inc();
155    }
156
157    /// Records a received transaction commit.
158    pub fn record_transaction(&self) {
159        self.transactions.inc();
160    }
161
162    /// Records bytes received from the WAL stream.
163    pub fn record_bytes(&self, bytes: u64) {
164        self.bytes_received.inc_by(bytes);
165    }
166
167    /// Records an error.
168    pub fn record_error(&self) {
169        self.errors.inc();
170    }
171
172    /// Records a batch produced for downstream.
173    pub fn record_batch(&self) {
174        self.batches_produced.inc();
175    }
176
177    /// Updates the confirmed flush LSN.
178    #[allow(clippy::cast_possible_wrap)]
179    pub fn set_confirmed_flush_lsn(&self, lsn: u64) {
180        self.confirmed_flush_lsn.set(lsn as i64);
181    }
182
183    /// Updates the replication lag in bytes.
184    #[allow(clippy::cast_possible_wrap)]
185    pub fn set_replication_lag_bytes(&self, lag: u64) {
186        self.replication_lag_bytes.set(lag as i64);
187    }
188
189    /// Records a keepalive sent to `PostgreSQL`.
190    pub fn record_keepalive(&self) {
191        self.keepalives_sent.inc();
192    }
193
194    /// Records events dropped due to buffer cap.
195    pub fn record_dropped(&self, count: u64) {
196        self.events_dropped.inc_by(count);
197    }
198
199    /// Converts to the SDK's [`ConnectorMetrics`].
200    #[must_use]
201    #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
202    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
203        let mut m = ConnectorMetrics::new();
204        m.records_total = self.events_received.get();
205        m.bytes_total = self.bytes_received.get();
206        m.errors_total = self.errors.get();
207        m.lag = self.replication_lag_bytes.get() as u64;
208
209        m.add_custom("inserts", self.inserts.get() as f64);
210        m.add_custom("updates", self.updates.get() as f64);
211        m.add_custom("deletes", self.deletes.get() as f64);
212        m.add_custom("transactions", self.transactions.get() as f64);
213        m.add_custom("confirmed_flush_lsn", self.confirmed_flush_lsn.get() as f64);
214        m.add_custom("events_dropped", self.events_dropped.get() as f64);
215        m.add_custom("keepalives_sent", self.keepalives_sent.get() as f64);
216        m
217    }
218}
219
220impl Default for PostgresCdcMetrics {
221    fn default() -> Self {
222        Self::new(None)
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `record_*` helper bumps the counter(s) it is responsible for.
    #[test]
    fn test_record_operations() {
        let m = PostgresCdcMetrics::new(None);
        m.record_insert();
        m.record_insert();
        m.record_update();
        m.record_delete();
        m.record_transaction();
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_keepalive();

        assert_eq!(m.events_received.get(), 4);
        assert_eq!(m.inserts.get(), 2);
        assert_eq!(m.updates.get(), 1);
        assert_eq!(m.deletes.get(), 1);
        assert_eq!(m.transactions.get(), 1);
        assert_eq!(m.bytes_received.get(), 1024);
        assert_eq!(m.errors.get(), 1);
        assert_eq!(m.batches_produced.get(), 1);
        assert_eq!(m.keepalives_sent.get(), 1);
    }

    /// Dropped-event counts accumulate across calls.
    #[test]
    fn test_record_dropped_accumulates() {
        let m = PostgresCdcMetrics::new(None);
        m.record_dropped(3);
        m.record_dropped(4);

        assert_eq!(m.events_dropped.get(), 7);
        // Dropping events does not touch the received counter.
        assert_eq!(m.events_received.get(), 0);
    }

    /// The gauge setters store the values they are given.
    #[test]
    fn test_lsn_and_lag_tracking() {
        let m = PostgresCdcMetrics::new(None);
        m.set_confirmed_flush_lsn(0x1234_ABCD);
        m.set_replication_lag_bytes(4096);

        assert_eq!(m.confirmed_flush_lsn.get(), 0x1234_ABCD_i64);
        assert_eq!(m.replication_lag_bytes.get(), 4096);
    }

    /// The SDK conversion carries over totals, lag and custom metrics.
    #[test]
    fn test_to_connector_metrics() {
        let m = PostgresCdcMetrics::new(None);
        m.record_insert();
        m.record_bytes(512);
        m.record_error();
        m.set_replication_lag_bytes(100);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 1);
        assert_eq!(cm.bytes_total, 512);
        assert_eq!(cm.errors_total, 1);
        assert_eq!(cm.lag, 100);
        assert!(!cm.custom.is_empty());
    }

    /// A supplied registry ends up holding all twelve metrics.
    #[test]
    fn test_shared_registry_exposes_all_metrics() {
        let registry = Registry::new();
        let m = PostgresCdcMetrics::new(Some(&registry));
        m.record_insert();

        assert_eq!(registry.gather().len(), 12);
        assert_eq!(m.inserts.get(), 1);
    }

    /// Building a second instance on the same registry must not panic, and
    /// the two instances keep independent values.
    #[test]
    fn test_second_instance_on_same_registry_is_independent() {
        let registry = Registry::new();
        let first = PostgresCdcMetrics::new(Some(&registry));
        let second = PostgresCdcMetrics::new(Some(&registry));
        second.record_insert();

        assert_eq!(first.inserts.get(), 0);
        assert_eq!(second.inserts.get(), 1);
        assert_eq!(registry.gather().len(), 12);
    }

    /// `Default` yields a fresh instance with everything at zero.
    #[test]
    fn test_default_starts_at_zero() {
        let m = PostgresCdcMetrics::default();

        assert_eq!(m.events_received.get(), 0);
        assert_eq!(m.bytes_received.get(), 0);
        assert_eq!(m.errors.get(), 0);
        assert_eq!(m.confirmed_flush_lsn.get(), 0);
        assert_eq!(m.replication_lag_bytes.get(), 0);
    }
}