laminar_connectors/cdc/postgres/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::metrics::ConnectorMetrics;
8
9#[derive(Debug, Clone)]
14pub struct PostgresCdcMetrics {
15 pub events_received: IntCounter,
17
18 pub bytes_received: IntCounter,
20
21 pub errors: IntCounter,
23
24 pub batches_produced: IntCounter,
26
27 pub inserts: IntCounter,
29
30 pub updates: IntCounter,
32
33 pub deletes: IntCounter,
35
36 pub transactions: IntCounter,
38
39 pub confirmed_flush_lsn: IntGauge,
41
42 pub replication_lag_bytes: IntGauge,
44
45 pub keepalives_sent: IntCounter,
47
48 pub events_dropped: IntCounter,
50}
51
52impl PostgresCdcMetrics {
53 #[must_use]
55 #[allow(clippy::missing_panics_doc)]
56 pub fn new(registry: Option<&Registry>) -> Self {
57 let local;
58 let reg = if let Some(r) = registry {
59 r
60 } else {
61 local = Registry::new();
62 &local
63 };
64
65 let events_received = IntCounter::new(
66 "postgres_cdc_events_received_total",
67 "Total CDC change events received",
68 )
69 .unwrap();
70 let bytes_received = IntCounter::new(
71 "postgres_cdc_bytes_received_total",
72 "Total bytes from WAL stream",
73 )
74 .unwrap();
75 let errors = IntCounter::new("postgres_cdc_errors_total", "Total CDC errors").unwrap();
76 let batches_produced = IntCounter::new(
77 "postgres_cdc_batches_produced_total",
78 "Total batches produced",
79 )
80 .unwrap();
81 let inserts = IntCounter::new("postgres_cdc_inserts_total", "Total INSERT events").unwrap();
82 let updates = IntCounter::new("postgres_cdc_updates_total", "Total UPDATE events").unwrap();
83 let deletes = IntCounter::new("postgres_cdc_deletes_total", "Total DELETE events").unwrap();
84 let transactions = IntCounter::new(
85 "postgres_cdc_transactions_total",
86 "Total transactions received",
87 )
88 .unwrap();
89 let confirmed_flush_lsn = IntGauge::new(
90 "postgres_cdc_confirmed_flush_lsn",
91 "Current confirmed flush LSN",
92 )
93 .unwrap();
94 let replication_lag_bytes = IntGauge::new(
95 "postgres_cdc_replication_lag_bytes",
96 "Replication lag in bytes",
97 )
98 .unwrap();
99 let keepalives_sent = IntCounter::new(
100 "postgres_cdc_keepalives_sent_total",
101 "Total keepalive messages sent",
102 )
103 .unwrap();
104 let events_dropped = IntCounter::new(
105 "postgres_cdc_events_dropped_total",
106 "Total events dropped (buffer cap)",
107 )
108 .unwrap();
109
110 let _ = reg.register(Box::new(events_received.clone()));
111 let _ = reg.register(Box::new(bytes_received.clone()));
112 let _ = reg.register(Box::new(errors.clone()));
113 let _ = reg.register(Box::new(batches_produced.clone()));
114 let _ = reg.register(Box::new(inserts.clone()));
115 let _ = reg.register(Box::new(updates.clone()));
116 let _ = reg.register(Box::new(deletes.clone()));
117 let _ = reg.register(Box::new(transactions.clone()));
118 let _ = reg.register(Box::new(confirmed_flush_lsn.clone()));
119 let _ = reg.register(Box::new(replication_lag_bytes.clone()));
120 let _ = reg.register(Box::new(keepalives_sent.clone()));
121 let _ = reg.register(Box::new(events_dropped.clone()));
122
123 Self {
124 events_received,
125 bytes_received,
126 errors,
127 batches_produced,
128 inserts,
129 updates,
130 deletes,
131 transactions,
132 confirmed_flush_lsn,
133 replication_lag_bytes,
134 keepalives_sent,
135 events_dropped,
136 }
137 }
138
139 pub fn record_insert(&self) {
141 self.inserts.inc();
142 self.events_received.inc();
143 }
144
145 pub fn record_update(&self) {
147 self.updates.inc();
148 self.events_received.inc();
149 }
150
151 pub fn record_delete(&self) {
153 self.deletes.inc();
154 self.events_received.inc();
155 }
156
157 pub fn record_transaction(&self) {
159 self.transactions.inc();
160 }
161
162 pub fn record_bytes(&self, bytes: u64) {
164 self.bytes_received.inc_by(bytes);
165 }
166
167 pub fn record_error(&self) {
169 self.errors.inc();
170 }
171
172 pub fn record_batch(&self) {
174 self.batches_produced.inc();
175 }
176
177 #[allow(clippy::cast_possible_wrap)]
179 pub fn set_confirmed_flush_lsn(&self, lsn: u64) {
180 self.confirmed_flush_lsn.set(lsn as i64);
181 }
182
183 #[allow(clippy::cast_possible_wrap)]
185 pub fn set_replication_lag_bytes(&self, lag: u64) {
186 self.replication_lag_bytes.set(lag as i64);
187 }
188
189 pub fn record_keepalive(&self) {
191 self.keepalives_sent.inc();
192 }
193
194 pub fn record_dropped(&self, count: u64) {
196 self.events_dropped.inc_by(count);
197 }
198
199 #[must_use]
201 #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
202 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
203 let mut m = ConnectorMetrics::new();
204 m.records_total = self.events_received.get();
205 m.bytes_total = self.bytes_received.get();
206 m.errors_total = self.errors.get();
207 m.lag = self.replication_lag_bytes.get() as u64;
208
209 m.add_custom("inserts", self.inserts.get() as f64);
210 m.add_custom("updates", self.updates.get() as f64);
211 m.add_custom("deletes", self.deletes.get() as f64);
212 m.add_custom("transactions", self.transactions.get() as f64);
213 m.add_custom("confirmed_flush_lsn", self.confirmed_flush_lsn.get() as f64);
214 m.add_custom("events_dropped", self.events_dropped.get() as f64);
215 m.add_custom("keepalives_sent", self.keepalives_sent.get() as f64);
216 m
217 }
218}
219
220impl Default for PostgresCdcMetrics {
221 fn default() -> Self {
222 Self::new(None)
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `record_*` helper bumps its own counter, and the three row-change
    /// helpers additionally bump `events_received`.
    #[test]
    fn test_record_operations() {
        let m = PostgresCdcMetrics::new(None);
        m.record_insert();
        m.record_insert();
        m.record_update();
        m.record_delete();
        m.record_transaction();
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_keepalive();

        assert_eq!(m.events_received.get(), 4);
        assert_eq!(m.inserts.get(), 2);
        assert_eq!(m.updates.get(), 1);
        assert_eq!(m.deletes.get(), 1);
        assert_eq!(m.transactions.get(), 1);
        assert_eq!(m.bytes_received.get(), 1024);
        assert_eq!(m.errors.get(), 1);
        assert_eq!(m.batches_produced.get(), 1);
        assert_eq!(m.keepalives_sent.get(), 1);
    }

    /// Dropped events accumulate by the given count and do not count as
    /// received events.
    #[test]
    fn test_record_dropped() {
        let m = PostgresCdcMetrics::new(None);
        m.record_dropped(3);
        m.record_dropped(0);
        m.record_dropped(2);

        assert_eq!(m.events_dropped.get(), 5);
        assert_eq!(m.events_received.get(), 0);
    }

    /// The gauge setters store the values that were passed in.
    #[test]
    fn test_lsn_and_lag_tracking() {
        let m = PostgresCdcMetrics::new(None);
        m.set_confirmed_flush_lsn(0x1234_ABCD);
        m.set_replication_lag_bytes(4096);

        assert_eq!(m.confirmed_flush_lsn.get(), 0x1234_ABCD_i64);
        assert_eq!(m.replication_lag_bytes.get(), 4096);
    }

    /// A freshly defaulted instance starts with every value at zero.
    #[test]
    fn test_default_starts_at_zero() {
        let m = PostgresCdcMetrics::default();

        assert_eq!(m.events_received.get(), 0);
        assert_eq!(m.bytes_received.get(), 0);
        assert_eq!(m.errors.get(), 0);
        assert_eq!(m.events_dropped.get(), 0);
        assert_eq!(m.confirmed_flush_lsn.get(), 0);
        assert_eq!(m.replication_lag_bytes.get(), 0);
    }

    /// Building twice against the same registry must not panic, and both
    /// instances must remain usable.
    #[test]
    fn test_shared_registry_is_best_effort() {
        let registry = Registry::new();
        let first = PostgresCdcMetrics::new(Some(&registry));
        let second = PostgresCdcMetrics::new(Some(&registry));

        first.record_insert();
        second.record_error();

        assert_eq!(first.inserts.get(), 1);
        assert_eq!(second.errors.get(), 1);
        assert!(!registry.gather().is_empty());
    }

    /// The snapshot copies the headline totals and attaches custom entries.
    #[test]
    fn test_to_connector_metrics() {
        let m = PostgresCdcMetrics::new(None);
        m.record_insert();
        m.record_bytes(512);
        m.record_error();
        m.set_replication_lag_bytes(100);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 1);
        assert_eq!(cm.bytes_total, 512);
        assert_eq!(cm.errors_total, 1);
        assert_eq!(cm.lag, 100);
        assert!(!cm.custom.is_empty());
    }
}