laminar_connectors/postgres/
sink_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
11#[derive(Debug)]
13pub struct PostgresSinkMetrics {
14 pub records_written: AtomicU64,
16
17 pub bytes_written: AtomicU64,
19
20 pub errors_total: AtomicU64,
22
23 pub batches_flushed: AtomicU64,
25
26 pub copy_operations: AtomicU64,
28
29 pub upsert_operations: AtomicU64,
31
32 pub epochs_committed: AtomicU64,
34
35 pub epochs_rolled_back: AtomicU64,
37
38 pub changelog_deletes: AtomicU64,
40}
41
42impl PostgresSinkMetrics {
43 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 records_written: AtomicU64::new(0),
48 bytes_written: AtomicU64::new(0),
49 errors_total: AtomicU64::new(0),
50 batches_flushed: AtomicU64::new(0),
51 copy_operations: AtomicU64::new(0),
52 upsert_operations: AtomicU64::new(0),
53 epochs_committed: AtomicU64::new(0),
54 epochs_rolled_back: AtomicU64::new(0),
55 changelog_deletes: AtomicU64::new(0),
56 }
57 }
58
59 pub fn record_write(&self, records: u64, bytes: u64) {
61 self.records_written.fetch_add(records, Ordering::Relaxed);
62 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
63 }
64
65 pub fn record_flush(&self) {
67 self.batches_flushed.fetch_add(1, Ordering::Relaxed);
68 }
69
70 pub fn record_copy(&self) {
72 self.copy_operations.fetch_add(1, Ordering::Relaxed);
73 }
74
75 pub fn record_upsert(&self) {
77 self.upsert_operations.fetch_add(1, Ordering::Relaxed);
78 }
79
80 pub fn record_error(&self) {
82 self.errors_total.fetch_add(1, Ordering::Relaxed);
83 }
84
85 pub fn record_commit(&self) {
87 self.epochs_committed.fetch_add(1, Ordering::Relaxed);
88 }
89
90 pub fn record_rollback(&self) {
92 self.epochs_rolled_back.fetch_add(1, Ordering::Relaxed);
93 }
94
95 pub fn record_deletes(&self, count: u64) {
97 self.changelog_deletes.fetch_add(count, Ordering::Relaxed);
98 }
99
100 #[must_use]
102 #[allow(clippy::cast_precision_loss)]
103 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
104 let mut m = ConnectorMetrics {
105 records_total: self.records_written.load(Ordering::Relaxed),
106 bytes_total: self.bytes_written.load(Ordering::Relaxed),
107 errors_total: self.errors_total.load(Ordering::Relaxed),
108 lag: 0,
109 custom: Vec::new(),
110 };
111 m.add_custom(
112 "pg.batches_flushed",
113 self.batches_flushed.load(Ordering::Relaxed) as f64,
114 );
115 m.add_custom(
116 "pg.copy_operations",
117 self.copy_operations.load(Ordering::Relaxed) as f64,
118 );
119 m.add_custom(
120 "pg.upsert_operations",
121 self.upsert_operations.load(Ordering::Relaxed) as f64,
122 );
123 m.add_custom(
124 "pg.epochs_committed",
125 self.epochs_committed.load(Ordering::Relaxed) as f64,
126 );
127 m.add_custom(
128 "pg.epochs_rolled_back",
129 self.epochs_rolled_back.load(Ordering::Relaxed) as f64,
130 );
131 m.add_custom(
132 "pg.changelog_deletes",
133 self.changelog_deletes.load(Ordering::Relaxed) as f64,
134 );
135 m
136 }
137}
138
139impl Default for PostgresSinkMetrics {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148
149 #[test]
150 fn test_initial_zeros() {
151 let m = PostgresSinkMetrics::new();
152 let cm = m.to_connector_metrics();
153 assert_eq!(cm.records_total, 0);
154 assert_eq!(cm.bytes_total, 0);
155 assert_eq!(cm.errors_total, 0);
156 }
157
158 #[test]
159 fn test_record_write() {
160 let m = PostgresSinkMetrics::new();
161 m.record_write(100, 5000);
162 m.record_write(200, 10_000);
163
164 let cm = m.to_connector_metrics();
165 assert_eq!(cm.records_total, 300);
166 assert_eq!(cm.bytes_total, 15_000);
167 }
168
169 #[test]
170 fn test_flush_and_copy_metrics() {
171 let m = PostgresSinkMetrics::new();
172 m.record_flush();
173 m.record_flush();
174 m.record_copy();
175
176 let cm = m.to_connector_metrics();
177 let flushed = cm.custom.iter().find(|(k, _)| k == "pg.batches_flushed");
178 assert_eq!(flushed.unwrap().1, 2.0);
179 let copies = cm.custom.iter().find(|(k, _)| k == "pg.copy_operations");
180 assert_eq!(copies.unwrap().1, 1.0);
181 }
182
183 #[test]
184 fn test_epoch_metrics() {
185 let m = PostgresSinkMetrics::new();
186 m.record_commit();
187 m.record_commit();
188 m.record_rollback();
189
190 let cm = m.to_connector_metrics();
191 let committed = cm.custom.iter().find(|(k, _)| k == "pg.epochs_committed");
192 assert_eq!(committed.unwrap().1, 2.0);
193 let rolled_back = cm.custom.iter().find(|(k, _)| k == "pg.epochs_rolled_back");
194 assert_eq!(rolled_back.unwrap().1, 1.0);
195 }
196
197 #[test]
198 fn test_changelog_deletes() {
199 let m = PostgresSinkMetrics::new();
200 m.record_deletes(50);
201 m.record_deletes(30);
202
203 let cm = m.to_connector_metrics();
204 let deletes = cm.custom.iter().find(|(k, _)| k == "pg.changelog_deletes");
205 assert_eq!(deletes.unwrap().1, 80.0);
206 }
207
208 #[test]
209 fn test_error_counting() {
210 let m = PostgresSinkMetrics::new();
211 m.record_error();
212 m.record_error();
213 m.record_error();
214
215 let cm = m.to_connector_metrics();
216 assert_eq!(cm.errors_total, 3);
217 }
218
219 #[test]
220 fn test_upsert_metric() {
221 let m = PostgresSinkMetrics::new();
222 m.record_upsert();
223
224 let cm = m.to_connector_metrics();
225 let upserts = cm.custom.iter().find(|(k, _)| k == "pg.upsert_operations");
226 assert_eq!(upserts.unwrap().1, 1.0);
227 }
228}