laminar_connectors/cdc/postgres/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
6
7use crate::metrics::ConnectorMetrics;
8
/// Counters and gauges describing the activity of the PostgreSQL CDC connector.
///
/// Every field is an [`AtomicU64`], so all updates go through `&self`; the
/// `record_*` and `set_*` methods on this type are the intended writers.
#[derive(Debug)]
pub struct CdcMetrics {
    /// Total row-level change events. Incremented once by each of
    /// [`CdcMetrics::record_insert`], [`CdcMetrics::record_update`] and
    /// [`CdcMetrics::record_delete`].
    pub events_received: AtomicU64,

    /// Running sum of the byte counts passed to [`CdcMetrics::record_bytes`].
    pub bytes_received: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_error`].
    pub errors: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_batch`].
    pub batches_produced: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_insert`].
    pub inserts: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_update`].
    pub updates: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_delete`].
    pub deletes: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_transaction`].
    pub transactions: AtomicU64,

    /// Most recent value passed to [`CdcMetrics::set_confirmed_flush_lsn`]
    /// (overwritten on every call, never accumulated).
    ///
    /// NOTE(review): presumably the replication slot's confirmed flush LSN
    /// encoded as a `u64` -- confirm against the caller.
    pub confirmed_flush_lsn: AtomicU64,

    /// Most recent value passed to [`CdcMetrics::set_replication_lag_bytes`]
    /// (overwritten on every call, never accumulated).
    pub replication_lag_bytes: AtomicU64,

    /// Number of calls to [`CdcMetrics::record_keepalive`].
    pub keepalives_sent: AtomicU64,
}
48
49impl CdcMetrics {
50 #[must_use]
52 pub fn new() -> Self {
53 Self {
54 events_received: AtomicU64::new(0),
55 bytes_received: AtomicU64::new(0),
56 errors: AtomicU64::new(0),
57 batches_produced: AtomicU64::new(0),
58 inserts: AtomicU64::new(0),
59 updates: AtomicU64::new(0),
60 deletes: AtomicU64::new(0),
61 transactions: AtomicU64::new(0),
62 confirmed_flush_lsn: AtomicU64::new(0),
63 replication_lag_bytes: AtomicU64::new(0),
64 keepalives_sent: AtomicU64::new(0),
65 }
66 }
67
68 pub fn record_insert(&self) {
70 self.inserts.fetch_add(1, Ordering::Relaxed);
71 self.events_received.fetch_add(1, Ordering::Relaxed);
72 }
73
74 pub fn record_update(&self) {
76 self.updates.fetch_add(1, Ordering::Relaxed);
77 self.events_received.fetch_add(1, Ordering::Relaxed);
78 }
79
80 pub fn record_delete(&self) {
82 self.deletes.fetch_add(1, Ordering::Relaxed);
83 self.events_received.fetch_add(1, Ordering::Relaxed);
84 }
85
86 pub fn record_transaction(&self) {
88 self.transactions.fetch_add(1, Ordering::Relaxed);
89 }
90
91 pub fn record_bytes(&self, bytes: u64) {
93 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
94 }
95
96 pub fn record_error(&self) {
98 self.errors.fetch_add(1, Ordering::Relaxed);
99 }
100
101 pub fn record_batch(&self) {
103 self.batches_produced.fetch_add(1, Ordering::Relaxed);
104 }
105
106 pub fn set_confirmed_flush_lsn(&self, lsn: u64) {
108 self.confirmed_flush_lsn.store(lsn, Ordering::Relaxed);
109 }
110
111 pub fn set_replication_lag_bytes(&self, lag: u64) {
113 self.replication_lag_bytes.store(lag, Ordering::Relaxed);
114 }
115
116 pub fn record_keepalive(&self) {
118 self.keepalives_sent.fetch_add(1, Ordering::Relaxed);
119 }
120
121 #[must_use]
123 #[allow(clippy::cast_precision_loss)]
124 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
125 let mut m = ConnectorMetrics::new();
126 m.records_total = self.events_received.load(Ordering::Relaxed);
127 m.bytes_total = self.bytes_received.load(Ordering::Relaxed);
128 m.errors_total = self.errors.load(Ordering::Relaxed);
129 m.lag = self.replication_lag_bytes.load(Ordering::Relaxed);
130
131 m.add_custom("inserts", self.inserts.load(Ordering::Relaxed) as f64);
132 m.add_custom("updates", self.updates.load(Ordering::Relaxed) as f64);
133 m.add_custom("deletes", self.deletes.load(Ordering::Relaxed) as f64);
134 m.add_custom(
135 "transactions",
136 self.transactions.load(Ordering::Relaxed) as f64,
137 );
138 m.add_custom(
139 "confirmed_flush_lsn",
140 self.confirmed_flush_lsn.load(Ordering::Relaxed) as f64,
141 );
142 m.add_custom(
143 "keepalives_sent",
144 self.keepalives_sent.load(Ordering::Relaxed) as f64,
145 );
146 m
147 }
148}
149
150impl Default for CdcMetrics {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads a counter with the same ordering the production code uses.
    fn value_of(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Each `record_*` call must move exactly the counters it is responsible for.
    #[test]
    fn test_record_operations() {
        let metrics = CdcMetrics::new();

        // Two inserts, one update and one delete: four row events in total.
        metrics.record_insert();
        metrics.record_insert();
        metrics.record_update();
        metrics.record_delete();
        metrics.record_transaction();
        metrics.record_bytes(1024);
        metrics.record_error();
        metrics.record_batch();
        metrics.record_keepalive();

        let expectations = [
            ("events_received", &metrics.events_received, 4),
            ("inserts", &metrics.inserts, 2),
            ("updates", &metrics.updates, 1),
            ("deletes", &metrics.deletes, 1),
            ("transactions", &metrics.transactions, 1),
            ("bytes_received", &metrics.bytes_received, 1024),
            ("errors", &metrics.errors, 1),
            ("batches_produced", &metrics.batches_produced, 1),
            ("keepalives_sent", &metrics.keepalives_sent, 1),
        ];
        for &(field, counter, expected) in &expectations {
            assert_eq!(value_of(counter), expected, "unexpected value for {}", field);
        }
    }

    /// The two gauges hold whatever value was stored last.
    #[test]
    fn test_lsn_and_lag_tracking() {
        let metrics = CdcMetrics::new();
        metrics.set_confirmed_flush_lsn(0x1234_ABCD);
        metrics.set_replication_lag_bytes(4096);

        assert_eq!(value_of(&metrics.confirmed_flush_lsn), 0x1234_ABCD);
        assert_eq!(value_of(&metrics.replication_lag_bytes), 4096);
    }

    /// The conversion carries totals and lag over and attaches custom metrics.
    #[test]
    fn test_to_connector_metrics() {
        let metrics = CdcMetrics::new();
        metrics.record_insert();
        metrics.record_bytes(512);
        metrics.record_error();
        metrics.set_replication_lag_bytes(100);

        let exported = metrics.to_connector_metrics();

        assert_eq!(exported.records_total, 1);
        assert_eq!(exported.bytes_total, 512);
        assert_eq!(exported.errors_total, 1);
        assert_eq!(exported.lag, 100);
        assert!(!exported.custom.is_empty());
    }
}