laminar_connectors/cdc/postgres/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
6
7use crate::prom::reg_or_local;
8
9#[derive(Debug, Clone)]
14pub struct PostgresCdcMetrics {
15 pub events_received: IntCounter,
17
18 pub bytes_received: IntCounter,
20
21 pub errors: IntCounter,
23
24 pub batches_produced: IntCounter,
26
27 pub inserts: IntCounter,
29
30 pub updates: IntCounter,
32
33 pub deletes: IntCounter,
35
36 pub transactions: IntCounter,
38
39 pub confirmed_flush_lsn: IntGauge,
41
42 pub replication_lag_bytes: IntGauge,
44
45 pub keepalives_sent: IntCounter,
47
48 pub events_dropped: IntCounter,
50}
51
52impl PostgresCdcMetrics {
53 #[must_use]
55 #[allow(clippy::missing_panics_doc)]
56 pub fn new(registry: Option<&Registry>) -> Self {
57 let mut local = None;
58 let reg = reg_or_local(registry, &mut local);
59
60 Self {
61 events_received: reg.counter(
62 "postgres_cdc_events_received_total",
63 "Total CDC change events received",
64 ),
65 bytes_received: reg.counter(
66 "postgres_cdc_bytes_received_total",
67 "Total bytes from WAL stream",
68 ),
69 errors: reg.counter("postgres_cdc_errors_total", "Total CDC errors"),
70 batches_produced: reg.counter(
71 "postgres_cdc_batches_produced_total",
72 "Total batches produced",
73 ),
74 inserts: reg.counter("postgres_cdc_inserts_total", "Total INSERT events"),
75 updates: reg.counter("postgres_cdc_updates_total", "Total UPDATE events"),
76 deletes: reg.counter("postgres_cdc_deletes_total", "Total DELETE events"),
77 transactions: reg.counter(
78 "postgres_cdc_transactions_total",
79 "Total transactions received",
80 ),
81 confirmed_flush_lsn: reg.gauge(
82 "postgres_cdc_confirmed_flush_lsn",
83 "Current confirmed flush LSN",
84 ),
85 replication_lag_bytes: reg.gauge(
86 "postgres_cdc_replication_lag_bytes",
87 "Replication lag in bytes",
88 ),
89 keepalives_sent: reg.counter(
90 "postgres_cdc_keepalives_sent_total",
91 "Total keepalive messages sent",
92 ),
93 events_dropped: reg.counter(
94 "postgres_cdc_events_dropped_total",
95 "Total events dropped (buffer cap)",
96 ),
97 }
98 }
99
100 pub fn record_insert(&self) {
102 self.inserts.inc();
103 self.events_received.inc();
104 }
105
106 pub fn record_update(&self) {
108 self.updates.inc();
109 self.events_received.inc();
110 }
111
112 pub fn record_delete(&self) {
114 self.deletes.inc();
115 self.events_received.inc();
116 }
117
118 pub fn record_transaction(&self) {
120 self.transactions.inc();
121 }
122
123 pub fn record_bytes(&self, bytes: u64) {
125 self.bytes_received.inc_by(bytes);
126 }
127
128 pub fn record_error(&self) {
130 self.errors.inc();
131 }
132
133 pub fn record_batch(&self) {
135 self.batches_produced.inc();
136 }
137
138 #[allow(clippy::cast_possible_wrap)]
140 pub fn set_confirmed_flush_lsn(&self, lsn: u64) {
141 self.confirmed_flush_lsn.set(lsn as i64);
142 }
143
144 #[allow(clippy::cast_possible_wrap)]
146 pub fn set_replication_lag_bytes(&self, lag: u64) {
147 self.replication_lag_bytes.set(lag as i64);
148 }
149
150 pub fn record_keepalive(&self) {
152 self.keepalives_sent.inc();
153 }
154
155 pub fn record_dropped(&self, count: u64) {
157 self.events_dropped.inc_by(count);
158 }
159}
160
161impl Default for PostgresCdcMetrics {
162 fn default() -> Self {
163 Self::new(None)
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `record_*` helper must bump its own counter, and the three
    /// row-level helpers must also bump `events_received`.
    #[test]
    fn test_record_operations() {
        let m = PostgresCdcMetrics::new(None);
        m.record_insert();
        m.record_insert();
        m.record_update();
        m.record_delete();
        m.record_transaction();
        m.record_bytes(1024);
        m.record_error();
        m.record_batch();
        m.record_keepalive();
        m.record_dropped(3);

        // 2 inserts + 1 update + 1 delete.
        assert_eq!(m.events_received.get(), 4);
        assert_eq!(m.inserts.get(), 2);
        assert_eq!(m.updates.get(), 1);
        assert_eq!(m.deletes.get(), 1);
        assert_eq!(m.transactions.get(), 1);
        assert_eq!(m.bytes_received.get(), 1024);
        assert_eq!(m.errors.get(), 1);
        assert_eq!(m.batches_produced.get(), 1);
        assert_eq!(m.keepalives_sent.get(), 1);
        // Dropped events are counted separately and are not part of
        // `events_received`.
        assert_eq!(m.events_dropped.get(), 3);
    }

    /// The gauge setters must store exactly the value they were given for
    /// ordinary in-range inputs.
    #[test]
    fn test_lsn_and_lag_tracking() {
        let m = PostgresCdcMetrics::new(None);
        m.set_confirmed_flush_lsn(0x1234_ABCD);
        m.set_replication_lag_bytes(4096);

        assert_eq!(m.confirmed_flush_lsn.get(), 0x1234_ABCD_i64);
        assert_eq!(m.replication_lag_bytes.get(), 4096);
    }
}