laminar_connectors/postgres/
sink_metrics.rs1use prometheus::{IntCounter, Registry};
8
9use crate::metrics::ConnectorMetrics;
10
11#[derive(Debug, Clone)]
13pub struct PostgresSinkMetrics {
14 pub records_written: IntCounter,
16
17 pub bytes_written: IntCounter,
19
20 pub errors_total: IntCounter,
22
23 pub batches_flushed: IntCounter,
25
26 pub copy_operations: IntCounter,
28
29 pub upsert_operations: IntCounter,
31
32 pub epochs_committed: IntCounter,
34
35 pub epochs_rolled_back: IntCounter,
37
38 pub changelog_deletes: IntCounter,
40}
41
42impl PostgresSinkMetrics {
43 #[must_use]
45 #[allow(clippy::missing_panics_doc)]
46 pub fn new(registry: Option<&Registry>) -> Self {
47 let local;
48 let reg = if let Some(r) = registry {
49 r
50 } else {
51 local = Registry::new();
52 &local
53 };
54
55 let records_written = IntCounter::new(
56 "postgres_sink_records_written_total",
57 "Total records written to PostgreSQL",
58 )
59 .unwrap();
60 let bytes_written = IntCounter::new(
61 "postgres_sink_bytes_written_total",
62 "Total bytes written to PostgreSQL",
63 )
64 .unwrap();
65 let errors_total =
66 IntCounter::new("postgres_sink_errors_total", "Total PostgreSQL sink errors").unwrap();
67 let batches_flushed = IntCounter::new(
68 "postgres_sink_batches_flushed_total",
69 "Total batches flushed",
70 )
71 .unwrap();
72 let copy_operations = IntCounter::new(
73 "postgres_sink_copy_operations_total",
74 "Total COPY BINARY operations",
75 )
76 .unwrap();
77 let upsert_operations = IntCounter::new(
78 "postgres_sink_upsert_operations_total",
79 "Total upsert operations",
80 )
81 .unwrap();
82 let epochs_committed = IntCounter::new(
83 "postgres_sink_epochs_committed_total",
84 "Total epochs committed",
85 )
86 .unwrap();
87 let epochs_rolled_back = IntCounter::new(
88 "postgres_sink_epochs_rolled_back_total",
89 "Total epochs rolled back",
90 )
91 .unwrap();
92 let changelog_deletes = IntCounter::new(
93 "postgres_sink_changelog_deletes_total",
94 "Total changelog deletes applied",
95 )
96 .unwrap();
97
98 let _ = reg.register(Box::new(records_written.clone()));
99 let _ = reg.register(Box::new(bytes_written.clone()));
100 let _ = reg.register(Box::new(errors_total.clone()));
101 let _ = reg.register(Box::new(batches_flushed.clone()));
102 let _ = reg.register(Box::new(copy_operations.clone()));
103 let _ = reg.register(Box::new(upsert_operations.clone()));
104 let _ = reg.register(Box::new(epochs_committed.clone()));
105 let _ = reg.register(Box::new(epochs_rolled_back.clone()));
106 let _ = reg.register(Box::new(changelog_deletes.clone()));
107
108 Self {
109 records_written,
110 bytes_written,
111 errors_total,
112 batches_flushed,
113 copy_operations,
114 upsert_operations,
115 epochs_committed,
116 epochs_rolled_back,
117 changelog_deletes,
118 }
119 }
120
121 pub fn record_write(&self, records: u64, bytes: u64) {
123 self.records_written.inc_by(records);
124 self.bytes_written.inc_by(bytes);
125 }
126
127 pub fn record_flush(&self) {
129 self.batches_flushed.inc();
130 }
131
132 pub fn record_copy(&self) {
134 self.copy_operations.inc();
135 }
136
137 pub fn record_upsert(&self) {
139 self.upsert_operations.inc();
140 }
141
142 pub fn record_error(&self) {
144 self.errors_total.inc();
145 }
146
147 pub fn record_commit(&self) {
149 self.epochs_committed.inc();
150 }
151
152 pub fn record_rollback(&self) {
154 self.epochs_rolled_back.inc();
155 }
156
157 pub fn record_deletes(&self, count: u64) {
159 self.changelog_deletes.inc_by(count);
160 }
161
162 #[must_use]
164 #[allow(clippy::cast_precision_loss)]
165 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
166 let mut m = ConnectorMetrics {
167 records_total: self.records_written.get(),
168 bytes_total: self.bytes_written.get(),
169 errors_total: self.errors_total.get(),
170 lag: 0,
171 custom: Vec::new(),
172 };
173 m.add_custom("pg.batches_flushed", self.batches_flushed.get() as f64);
174 m.add_custom("pg.copy_operations", self.copy_operations.get() as f64);
175 m.add_custom("pg.upsert_operations", self.upsert_operations.get() as f64);
176 m.add_custom("pg.epochs_committed", self.epochs_committed.get() as f64);
177 m.add_custom(
178 "pg.epochs_rolled_back",
179 self.epochs_rolled_back.get() as f64,
180 );
181 m.add_custom("pg.changelog_deletes", self.changelog_deletes.get() as f64);
182 m
183 }
184}
185
186impl Default for PostgresSinkMetrics {
187 fn default() -> Self {
188 Self::new(None)
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value stored under `key` in the snapshot's custom metrics,
    /// or `None` when no such key is present.
    fn custom_value(snapshot: &ConnectorMetrics, key: &str) -> Option<f64> {
        snapshot
            .custom
            .iter()
            .find(|(name, _)| name == key)
            .map(|(_, value)| *value)
    }

    #[test]
    fn test_initial_zeros() {
        let m = PostgresSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);

        // Every custom metric must be present and start at zero as well.
        for key in [
            "pg.batches_flushed",
            "pg.copy_operations",
            "pg.upsert_operations",
            "pg.epochs_committed",
            "pg.epochs_rolled_back",
            "pg.changelog_deletes",
        ] {
            assert_eq!(custom_value(&cm, key), Some(0.0), "metric {}", key);
        }
    }

    #[test]
    fn test_record_write() {
        let m = PostgresSinkMetrics::new(None);
        m.record_write(100, 5000);
        m.record_write(200, 10_000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15_000);
    }

    #[test]
    fn test_flush_and_copy_metrics() {
        let m = PostgresSinkMetrics::new(None);
        m.record_flush();
        m.record_flush();
        m.record_copy();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "pg.batches_flushed"), Some(2.0));
        assert_eq!(custom_value(&cm, "pg.copy_operations"), Some(1.0));
    }

    #[test]
    fn test_epoch_metrics() {
        let m = PostgresSinkMetrics::new(None);
        m.record_commit();
        m.record_commit();
        m.record_rollback();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "pg.epochs_committed"), Some(2.0));
        assert_eq!(custom_value(&cm, "pg.epochs_rolled_back"), Some(1.0));
    }

    #[test]
    fn test_changelog_deletes() {
        let m = PostgresSinkMetrics::new(None);
        m.record_deletes(50);
        m.record_deletes(30);

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "pg.changelog_deletes"), Some(80.0));
    }

    #[test]
    fn test_error_counting() {
        let m = PostgresSinkMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 3);
    }

    #[test]
    fn test_upsert_metric() {
        let m = PostgresSinkMetrics::new(None);
        m.record_upsert();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_value(&cm, "pg.upsert_operations"), Some(1.0));
    }

    #[test]
    fn test_registers_with_provided_registry() {
        let registry = Registry::new();
        let _metrics = PostgresSinkMetrics::new(Some(&registry));

        // One metric family per counter.
        assert_eq!(registry.gather().len(), 9);
    }

    #[test]
    fn test_duplicate_registration_is_ignored() {
        let registry = Registry::new();
        let first = PostgresSinkMetrics::new(Some(&registry));
        // The second registration of the same names fails inside `new`, which
        // must ignore the error instead of panicking.
        let second = PostgresSinkMetrics::new(Some(&registry));

        second.record_error();
        assert_eq!(first.to_connector_metrics().errors_total, 0);
        assert_eq!(second.to_connector_metrics().errors_total, 1);
        assert_eq!(registry.gather().len(), 9);
    }
}