// laminar_connectors/cdc/postgres/metrics.rs

//! `PostgreSQL` CDC source connector metrics.
//!
//! Lock-free atomic counters for tracking CDC replication performance.

use std::sync::atomic::{AtomicU64, Ordering};

use crate::metrics::ConnectorMetrics;

/// Metrics for the `PostgreSQL` CDC source connector.
///
/// Every field is an [`AtomicU64`], so values can be updated through a
/// shared reference without a lock. All counters use relaxed atomic
/// ordering for lock-free access from the Ring 1 async runtime.
#[derive(Debug)]
pub struct CdcMetrics {
    /// Total change events received (insert + update + delete).
    pub events_received: AtomicU64,

    /// Total bytes received from the WAL stream.
    pub bytes_received: AtomicU64,

    /// Total errors encountered.
    pub errors: AtomicU64,

    /// Total batches produced for downstream.
    pub batches_produced: AtomicU64,

    /// Total INSERT operations received.
    pub inserts: AtomicU64,

    /// Total UPDATE operations received.
    pub updates: AtomicU64,

    /// Total DELETE operations received.
    pub deletes: AtomicU64,

    /// Total transactions (commit messages) received.
    pub transactions: AtomicU64,

    /// Current confirmed flush LSN (as raw u64).
    pub confirmed_flush_lsn: AtomicU64,

    /// Current replication lag in bytes (`write_lsn` - `confirmed_flush_lsn`).
    pub replication_lag_bytes: AtomicU64,

    /// Total keepalive/heartbeat messages sent.
    pub keepalives_sent: AtomicU64,
}

49impl CdcMetrics {
50    /// Creates a new metrics instance with all counters at zero.
51    #[must_use]
52    pub fn new() -> Self {
53        Self {
54            events_received: AtomicU64::new(0),
55            bytes_received: AtomicU64::new(0),
56            errors: AtomicU64::new(0),
57            batches_produced: AtomicU64::new(0),
58            inserts: AtomicU64::new(0),
59            updates: AtomicU64::new(0),
60            deletes: AtomicU64::new(0),
61            transactions: AtomicU64::new(0),
62            confirmed_flush_lsn: AtomicU64::new(0),
63            replication_lag_bytes: AtomicU64::new(0),
64            keepalives_sent: AtomicU64::new(0),
65        }
66    }
67
68    /// Records a received INSERT event.
69    pub fn record_insert(&self) {
70        self.inserts.fetch_add(1, Ordering::Relaxed);
71        self.events_received.fetch_add(1, Ordering::Relaxed);
72    }
73
74    /// Records a received UPDATE event.
75    pub fn record_update(&self) {
76        self.updates.fetch_add(1, Ordering::Relaxed);
77        self.events_received.fetch_add(1, Ordering::Relaxed);
78    }
79
80    /// Records a received DELETE event.
81    pub fn record_delete(&self) {
82        self.deletes.fetch_add(1, Ordering::Relaxed);
83        self.events_received.fetch_add(1, Ordering::Relaxed);
84    }
85
86    /// Records a received transaction commit.
87    pub fn record_transaction(&self) {
88        self.transactions.fetch_add(1, Ordering::Relaxed);
89    }
90
91    /// Records bytes received from the WAL stream.
92    pub fn record_bytes(&self, bytes: u64) {
93        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
94    }
95
96    /// Records an error.
97    pub fn record_error(&self) {
98        self.errors.fetch_add(1, Ordering::Relaxed);
99    }
100
101    /// Records a batch produced for downstream.
102    pub fn record_batch(&self) {
103        self.batches_produced.fetch_add(1, Ordering::Relaxed);
104    }
105
106    /// Updates the confirmed flush LSN.
107    pub fn set_confirmed_flush_lsn(&self, lsn: u64) {
108        self.confirmed_flush_lsn.store(lsn, Ordering::Relaxed);
109    }
110
111    /// Updates the replication lag in bytes.
112    pub fn set_replication_lag_bytes(&self, lag: u64) {
113        self.replication_lag_bytes.store(lag, Ordering::Relaxed);
114    }
115
116    /// Records a keepalive sent to `PostgreSQL`.
117    pub fn record_keepalive(&self) {
118        self.keepalives_sent.fetch_add(1, Ordering::Relaxed);
119    }
120
121    /// Converts to the SDK's [`ConnectorMetrics`].
122    #[must_use]
123    #[allow(clippy::cast_precision_loss)]
124    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
125        let mut m = ConnectorMetrics::new();
126        m.records_total = self.events_received.load(Ordering::Relaxed);
127        m.bytes_total = self.bytes_received.load(Ordering::Relaxed);
128        m.errors_total = self.errors.load(Ordering::Relaxed);
129        m.lag = self.replication_lag_bytes.load(Ordering::Relaxed);
130
131        m.add_custom("inserts", self.inserts.load(Ordering::Relaxed) as f64);
132        m.add_custom("updates", self.updates.load(Ordering::Relaxed) as f64);
133        m.add_custom("deletes", self.deletes.load(Ordering::Relaxed) as f64);
134        m.add_custom(
135            "transactions",
136            self.transactions.load(Ordering::Relaxed) as f64,
137        );
138        m.add_custom(
139            "confirmed_flush_lsn",
140            self.confirmed_flush_lsn.load(Ordering::Relaxed) as f64,
141        );
142        m.add_custom(
143            "keepalives_sent",
144            self.keepalives_sent.load(Ordering::Relaxed) as f64,
145        );
146        m
147    }
148}
149
150impl Default for CdcMetrics {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `record_*` call bumps its own counter, and the three row-level
    /// operations also feed the aggregate `events_received` total.
    #[test]
    fn test_record_operations() {
        let m = CdcMetrics::new();
        m.record_insert();
        m.record_insert();
        m.record_update();
        m.record_delete();
        m.record_transaction();
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_keepalive();

        // 2 inserts + 1 update + 1 delete; the transaction is not an event.
        assert_eq!(m.events_received.load(Ordering::Relaxed), 4);
        assert_eq!(m.inserts.load(Ordering::Relaxed), 2);
        assert_eq!(m.updates.load(Ordering::Relaxed), 1);
        assert_eq!(m.deletes.load(Ordering::Relaxed), 1);
        assert_eq!(m.transactions.load(Ordering::Relaxed), 1);
        assert_eq!(m.bytes_received.load(Ordering::Relaxed), 1024);
        assert_eq!(m.errors.load(Ordering::Relaxed), 1);
        assert_eq!(m.batches_produced.load(Ordering::Relaxed), 1);
        assert_eq!(m.keepalives_sent.load(Ordering::Relaxed), 1);
    }

    /// The LSN and lag setters store the value they are given.
    #[test]
    fn test_lsn_and_lag_tracking() {
        let m = CdcMetrics::new();
        m.set_confirmed_flush_lsn(0x1234_ABCD);
        m.set_replication_lag_bytes(4096);

        assert_eq!(m.confirmed_flush_lsn.load(Ordering::Relaxed), 0x1234_ABCD);
        assert_eq!(m.replication_lag_bytes.load(Ordering::Relaxed), 4096);
    }

    /// The SDK conversion carries over the headline totals and attaches
    /// at least one custom metric.
    #[test]
    fn test_to_connector_metrics() {
        let m = CdcMetrics::new();
        m.record_insert();
        m.record_bytes(512);
        m.record_error();
        m.set_replication_lag_bytes(100);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 1);
        assert_eq!(cm.bytes_total, 512);
        assert_eq!(cm.errors_total, 1);
        assert_eq!(cm.lag, 100);
        assert!(!cm.custom.is_empty());
    }
}