laminar_connectors/postgres/
sink_metrics.rs1use prometheus::{IntCounter, Registry};
7
8use crate::prom::reg_or_local;
9
10#[derive(Debug, Clone)]
12pub struct PostgresSinkMetrics {
13 pub records_written: IntCounter,
15
16 pub bytes_written: IntCounter,
18
19 pub errors_total: IntCounter,
21
22 pub batches_flushed: IntCounter,
24
25 pub copy_operations: IntCounter,
27
28 pub upsert_operations: IntCounter,
30
31 pub epochs_committed: IntCounter,
33
34 pub epochs_rolled_back: IntCounter,
36
37 pub changelog_deletes: IntCounter,
39}
40
41impl PostgresSinkMetrics {
42 #[must_use]
44 #[allow(clippy::missing_panics_doc)]
45 pub fn new(registry: Option<&Registry>) -> Self {
46 let mut local = None;
47 let reg = reg_or_local(registry, &mut local);
48
49 Self {
50 records_written: reg.counter(
51 "postgres_sink_records_written_total",
52 "Total records written to PostgreSQL",
53 ),
54 bytes_written: reg.counter(
55 "postgres_sink_bytes_written_total",
56 "Total bytes written to PostgreSQL",
57 ),
58 errors_total: reg.counter("postgres_sink_errors_total", "Total PostgreSQL sink errors"),
59 batches_flushed: reg.counter(
60 "postgres_sink_batches_flushed_total",
61 "Total batches flushed",
62 ),
63 copy_operations: reg.counter(
64 "postgres_sink_copy_operations_total",
65 "Total COPY BINARY operations",
66 ),
67 upsert_operations: reg.counter(
68 "postgres_sink_upsert_operations_total",
69 "Total upsert operations",
70 ),
71 epochs_committed: reg.counter(
72 "postgres_sink_epochs_committed_total",
73 "Total epochs committed",
74 ),
75 epochs_rolled_back: reg.counter(
76 "postgres_sink_epochs_rolled_back_total",
77 "Total epochs rolled back",
78 ),
79 changelog_deletes: reg.counter(
80 "postgres_sink_changelog_deletes_total",
81 "Total changelog deletes applied",
82 ),
83 }
84 }
85
86 pub fn record_write(&self, records: u64, bytes: u64) {
88 self.records_written.inc_by(records);
89 self.bytes_written.inc_by(bytes);
90 }
91
92 pub fn record_flush(&self) {
94 self.batches_flushed.inc();
95 }
96
97 pub fn record_copy(&self) {
99 self.copy_operations.inc();
100 }
101
102 pub fn record_upsert(&self) {
104 self.upsert_operations.inc();
105 }
106
107 pub fn record_error(&self) {
109 self.errors_total.inc();
110 }
111
112 pub fn record_commit(&self) {
114 self.epochs_committed.inc();
115 }
116
117 pub fn record_rollback(&self) {
119 self.epochs_rolled_back.inc();
120 }
121
122 pub fn record_deletes(&self, count: u64) {
124 self.changelog_deletes.inc_by(count);
125 }
126}
127
128impl Default for PostgresSinkMetrics {
129 fn default() -> Self {
130 Self::new(None)
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Freshly created counters read zero.
    #[test]
    fn test_initial_zeros() {
        let metrics = PostgresSinkMetrics::new(None);

        assert_eq!(metrics.records_written.get(), 0);
        assert_eq!(metrics.bytes_written.get(), 0);
        assert_eq!(metrics.errors_total.get(), 0);
    }

    /// Successive writes accumulate in both the record and byte counters.
    #[test]
    fn test_record_write() {
        let metrics = PostgresSinkMetrics::new(None);

        metrics.record_write(100, 5000);
        metrics.record_write(200, 10_000);

        assert_eq!(metrics.records_written.get(), 300);
        assert_eq!(metrics.bytes_written.get(), 15_000);
    }

    /// Flush and COPY events are tracked by separate counters.
    #[test]
    fn test_flush_and_copy_metrics() {
        let metrics = PostgresSinkMetrics::new(None);

        metrics.record_flush();
        metrics.record_flush();
        metrics.record_copy();

        assert_eq!(metrics.batches_flushed.get(), 2);
        assert_eq!(metrics.copy_operations.get(), 1);
    }

    /// Commits and rollbacks are tracked by separate counters.
    #[test]
    fn test_epoch_metrics() {
        let metrics = PostgresSinkMetrics::new(None);

        metrics.record_commit();
        metrics.record_commit();
        metrics.record_rollback();

        assert_eq!(metrics.epochs_committed.get(), 2);
        assert_eq!(metrics.epochs_rolled_back.get(), 1);
    }

    /// Delete counts from multiple calls are summed.
    #[test]
    fn test_changelog_deletes() {
        let metrics = PostgresSinkMetrics::new(None);

        metrics.record_deletes(50);
        metrics.record_deletes(30);

        assert_eq!(metrics.changelog_deletes.get(), 80);
    }

    /// Each recorded error bumps the error counter by one.
    #[test]
    fn test_error_counting() {
        let metrics = PostgresSinkMetrics::new(None);

        metrics.record_error();
        metrics.record_error();
        metrics.record_error();

        assert_eq!(metrics.errors_total.get(), 3);
    }

    /// A single upsert is reflected in the upsert counter.
    #[test]
    fn test_upsert_metric() {
        let metrics = PostgresSinkMetrics::new(None);

        metrics.record_upsert();

        assert_eq!(metrics.upsert_operations.get(), 1);
    }
}