Skip to main content

laminar_connectors/cdc/postgres/
metrics.rs

//! `PostgreSQL` CDC source connector metrics.
//!
//! Prometheus-backed counters and gauges for tracking CDC replication performance.
4
5use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::prom::reg_or_local;
8
9/// Metrics for the `PostgreSQL` CDC source connector.
10///
11/// All counters are prometheus-backed and will appear in the scrape
12/// output when a shared registry is provided.
13#[derive(Debug, Clone)]
14pub struct PostgresCdcMetrics {
15    /// Total change events received (insert + update + delete).
16    pub events_received: IntCounter,
17
18    /// Total bytes received from the WAL stream.
19    pub bytes_received: IntCounter,
20
21    /// Total errors encountered.
22    pub errors: IntCounter,
23
24    /// Total batches produced for downstream.
25    pub batches_produced: IntCounter,
26
27    /// Total INSERT operations received.
28    pub inserts: IntCounter,
29
30    /// Total UPDATE operations received.
31    pub updates: IntCounter,
32
33    /// Total DELETE operations received.
34    pub deletes: IntCounter,
35
36    /// Total transactions (commit messages) received.
37    pub transactions: IntCounter,
38
39    /// Current confirmed flush LSN (as raw u64).
40    pub confirmed_flush_lsn: IntGauge,
41
42    /// Current replication lag in bytes (`write_lsn` - `confirmed_flush_lsn`).
43    pub replication_lag_bytes: IntGauge,
44
45    /// Total keepalive/heartbeat messages sent.
46    pub keepalives_sent: IntCounter,
47
48    /// Total events dropped due to buffer cap enforcement.
49    pub events_dropped: IntCounter,
50}
51
52impl PostgresCdcMetrics {
53    /// Creates a new metrics instance with all counters at zero.
54    #[must_use]
55    #[allow(clippy::missing_panics_doc)]
56    pub fn new(registry: Option<&Registry>) -> Self {
57        let mut local = None;
58        let reg = reg_or_local(registry, &mut local);
59
60        Self {
61            events_received: reg.counter(
62                "postgres_cdc_events_received_total",
63                "Total CDC change events received",
64            ),
65            bytes_received: reg.counter(
66                "postgres_cdc_bytes_received_total",
67                "Total bytes from WAL stream",
68            ),
69            errors: reg.counter("postgres_cdc_errors_total", "Total CDC errors"),
70            batches_produced: reg.counter(
71                "postgres_cdc_batches_produced_total",
72                "Total batches produced",
73            ),
74            inserts: reg.counter("postgres_cdc_inserts_total", "Total INSERT events"),
75            updates: reg.counter("postgres_cdc_updates_total", "Total UPDATE events"),
76            deletes: reg.counter("postgres_cdc_deletes_total", "Total DELETE events"),
77            transactions: reg.counter(
78                "postgres_cdc_transactions_total",
79                "Total transactions received",
80            ),
81            confirmed_flush_lsn: reg.gauge(
82                "postgres_cdc_confirmed_flush_lsn",
83                "Current confirmed flush LSN",
84            ),
85            replication_lag_bytes: reg.gauge(
86                "postgres_cdc_replication_lag_bytes",
87                "Replication lag in bytes",
88            ),
89            keepalives_sent: reg.counter(
90                "postgres_cdc_keepalives_sent_total",
91                "Total keepalive messages sent",
92            ),
93            events_dropped: reg.counter(
94                "postgres_cdc_events_dropped_total",
95                "Total events dropped (buffer cap)",
96            ),
97        }
98    }
99
100    /// Records a received INSERT event.
101    pub fn record_insert(&self) {
102        self.inserts.inc();
103        self.events_received.inc();
104    }
105
106    /// Records a received UPDATE event.
107    pub fn record_update(&self) {
108        self.updates.inc();
109        self.events_received.inc();
110    }
111
112    /// Records a received DELETE event.
113    pub fn record_delete(&self) {
114        self.deletes.inc();
115        self.events_received.inc();
116    }
117
118    /// Records a received transaction commit.
119    pub fn record_transaction(&self) {
120        self.transactions.inc();
121    }
122
123    /// Records bytes received from the WAL stream.
124    pub fn record_bytes(&self, bytes: u64) {
125        self.bytes_received.inc_by(bytes);
126    }
127
128    /// Records an error.
129    pub fn record_error(&self) {
130        self.errors.inc();
131    }
132
133    /// Records a batch produced for downstream.
134    pub fn record_batch(&self) {
135        self.batches_produced.inc();
136    }
137
138    /// Updates the confirmed flush LSN.
139    #[allow(clippy::cast_possible_wrap)]
140    pub fn set_confirmed_flush_lsn(&self, lsn: u64) {
141        self.confirmed_flush_lsn.set(lsn as i64);
142    }
143
144    /// Updates the replication lag in bytes.
145    #[allow(clippy::cast_possible_wrap)]
146    pub fn set_replication_lag_bytes(&self, lag: u64) {
147        self.replication_lag_bytes.set(lag as i64);
148    }
149
150    /// Records a keepalive sent to `PostgreSQL`.
151    pub fn record_keepalive(&self) {
152        self.keepalives_sent.inc();
153    }
154
155    /// Records events dropped due to buffer cap.
156    pub fn record_dropped(&self, count: u64) {
157        self.events_dropped.inc_by(count);
158    }
159}
160
161impl Default for PostgresCdcMetrics {
162    fn default() -> Self {
163        Self::new(None)
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` documents that every metric starts at zero.
    #[test]
    fn test_new_starts_at_zero() {
        let m = PostgresCdcMetrics::new(None);
        assert_eq!(m.events_received.get(), 0);
        assert_eq!(m.bytes_received.get(), 0);
        assert_eq!(m.errors.get(), 0);
        assert_eq!(m.batches_produced.get(), 0);
        assert_eq!(m.inserts.get(), 0);
        assert_eq!(m.updates.get(), 0);
        assert_eq!(m.deletes.get(), 0);
        assert_eq!(m.transactions.get(), 0);
        assert_eq!(m.confirmed_flush_lsn.get(), 0);
        assert_eq!(m.replication_lag_bytes.get(), 0);
        assert_eq!(m.keepalives_sent.get(), 0);
        assert_eq!(m.events_dropped.get(), 0);
    }

    /// `Default` yields the same zeroed state as `new(None)`.
    #[test]
    fn test_default_starts_at_zero() {
        let m = PostgresCdcMetrics::default();
        assert_eq!(m.events_received.get(), 0);
        assert_eq!(m.events_dropped.get(), 0);
        assert_eq!(m.confirmed_flush_lsn.get(), 0);
    }

    #[test]
    fn test_record_operations() {
        let m = PostgresCdcMetrics::new(None);
        m.record_insert();
        m.record_insert();
        m.record_update();
        m.record_delete();
        m.record_transaction();
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_keepalive();

        // Transactions are not change events: 2 inserts + 1 update + 1 delete.
        assert_eq!(m.events_received.get(), 4);
        assert_eq!(m.inserts.get(), 2);
        assert_eq!(m.updates.get(), 1);
        assert_eq!(m.deletes.get(), 1);
        assert_eq!(m.transactions.get(), 1);
        assert_eq!(m.bytes_received.get(), 1024);
        assert_eq!(m.errors.get(), 1);
        assert_eq!(m.batches_produced.get(), 1);
        assert_eq!(m.keepalives_sent.get(), 1);
    }

    /// `record_dropped` accumulates counts and does not touch `events_received`.
    #[test]
    fn test_record_dropped() {
        let m = PostgresCdcMetrics::new(None);
        m.record_dropped(3);
        m.record_dropped(0);
        m.record_dropped(2);

        assert_eq!(m.events_dropped.get(), 5);
        assert_eq!(m.events_received.get(), 0);
    }

    #[test]
    fn test_lsn_and_lag_tracking() {
        let m = PostgresCdcMetrics::new(None);
        m.set_confirmed_flush_lsn(0x1234_ABCD);
        m.set_replication_lag_bytes(4096);

        assert_eq!(m.confirmed_flush_lsn.get(), 0x1234_ABCD_i64);
        assert_eq!(m.replication_lag_bytes.get(), 4096);

        // Gauges hold the latest value, so lag can drop back once caught up.
        m.set_replication_lag_bytes(0);
        assert_eq!(m.replication_lag_bytes.get(), 0);
    }
}
204}