Skip to main content

laminar_connectors/postgres/
sink_metrics.rs

1//! `PostgreSQL` sink connector metrics.
2//!
3//! [`PostgresSinkMetrics`] provides prometheus-backed counters for
4//! tracking write statistics, convertible to the SDK's
5//! [`ConnectorMetrics`] type.
6
7use prometheus::{IntCounter, Registry};
8
9use crate::metrics::ConnectorMetrics;
10
11/// Prometheus-backed counters for `PostgreSQL` sink connector statistics.
12#[derive(Debug, Clone)]
13pub struct PostgresSinkMetrics {
14    /// Total records written to `PostgreSQL`.
15    pub records_written: IntCounter,
16
17    /// Total bytes written (estimated from `RecordBatch` sizes).
18    pub bytes_written: IntCounter,
19
20    /// Total errors encountered.
21    pub errors_total: IntCounter,
22
23    /// Total batches flushed.
24    pub batches_flushed: IntCounter,
25
26    /// Total COPY BINARY operations (append mode).
27    pub copy_operations: IntCounter,
28
29    /// Total upsert operations (upsert mode).
30    pub upsert_operations: IntCounter,
31
32    /// Total epochs committed (exactly-once).
33    pub epochs_committed: IntCounter,
34
35    /// Total epochs rolled back.
36    pub epochs_rolled_back: IntCounter,
37
38    /// Total changelog deletes applied (Z-set weight -1).
39    pub changelog_deletes: IntCounter,
40}
41
42impl PostgresSinkMetrics {
43    /// Creates a new metrics instance with all counters at zero.
44    #[must_use]
45    #[allow(clippy::missing_panics_doc)]
46    pub fn new(registry: Option<&Registry>) -> Self {
47        let local;
48        let reg = if let Some(r) = registry {
49            r
50        } else {
51            local = Registry::new();
52            &local
53        };
54
55        let records_written = IntCounter::new(
56            "postgres_sink_records_written_total",
57            "Total records written to PostgreSQL",
58        )
59        .unwrap();
60        let bytes_written = IntCounter::new(
61            "postgres_sink_bytes_written_total",
62            "Total bytes written to PostgreSQL",
63        )
64        .unwrap();
65        let errors_total =
66            IntCounter::new("postgres_sink_errors_total", "Total PostgreSQL sink errors").unwrap();
67        let batches_flushed = IntCounter::new(
68            "postgres_sink_batches_flushed_total",
69            "Total batches flushed",
70        )
71        .unwrap();
72        let copy_operations = IntCounter::new(
73            "postgres_sink_copy_operations_total",
74            "Total COPY BINARY operations",
75        )
76        .unwrap();
77        let upsert_operations = IntCounter::new(
78            "postgres_sink_upsert_operations_total",
79            "Total upsert operations",
80        )
81        .unwrap();
82        let epochs_committed = IntCounter::new(
83            "postgres_sink_epochs_committed_total",
84            "Total epochs committed",
85        )
86        .unwrap();
87        let epochs_rolled_back = IntCounter::new(
88            "postgres_sink_epochs_rolled_back_total",
89            "Total epochs rolled back",
90        )
91        .unwrap();
92        let changelog_deletes = IntCounter::new(
93            "postgres_sink_changelog_deletes_total",
94            "Total changelog deletes applied",
95        )
96        .unwrap();
97
98        let _ = reg.register(Box::new(records_written.clone()));
99        let _ = reg.register(Box::new(bytes_written.clone()));
100        let _ = reg.register(Box::new(errors_total.clone()));
101        let _ = reg.register(Box::new(batches_flushed.clone()));
102        let _ = reg.register(Box::new(copy_operations.clone()));
103        let _ = reg.register(Box::new(upsert_operations.clone()));
104        let _ = reg.register(Box::new(epochs_committed.clone()));
105        let _ = reg.register(Box::new(epochs_rolled_back.clone()));
106        let _ = reg.register(Box::new(changelog_deletes.clone()));
107
108        Self {
109            records_written,
110            bytes_written,
111            errors_total,
112            batches_flushed,
113            copy_operations,
114            upsert_operations,
115            epochs_committed,
116            epochs_rolled_back,
117            changelog_deletes,
118        }
119    }
120
121    /// Records a successful write of `records` records totaling `bytes`.
122    pub fn record_write(&self, records: u64, bytes: u64) {
123        self.records_written.inc_by(records);
124        self.bytes_written.inc_by(bytes);
125    }
126
127    /// Records a successful batch flush.
128    pub fn record_flush(&self) {
129        self.batches_flushed.inc();
130    }
131
132    /// Records a COPY BINARY operation.
133    pub fn record_copy(&self) {
134        self.copy_operations.inc();
135    }
136
137    /// Records an upsert operation.
138    pub fn record_upsert(&self) {
139        self.upsert_operations.inc();
140    }
141
142    /// Records a write or connection error.
143    pub fn record_error(&self) {
144        self.errors_total.inc();
145    }
146
147    /// Records a successful epoch commit.
148    pub fn record_commit(&self) {
149        self.epochs_committed.inc();
150    }
151
152    /// Records an epoch rollback.
153    pub fn record_rollback(&self) {
154        self.epochs_rolled_back.inc();
155    }
156
157    /// Records changelog DELETE operations.
158    pub fn record_deletes(&self, count: u64) {
159        self.changelog_deletes.inc_by(count);
160    }
161
162    /// Converts to the SDK's [`ConnectorMetrics`].
163    #[must_use]
164    #[allow(clippy::cast_precision_loss)]
165    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
166        let mut m = ConnectorMetrics {
167            records_total: self.records_written.get(),
168            bytes_total: self.bytes_written.get(),
169            errors_total: self.errors_total.get(),
170            lag: 0,
171            custom: Vec::new(),
172        };
173        m.add_custom("pg.batches_flushed", self.batches_flushed.get() as f64);
174        m.add_custom("pg.copy_operations", self.copy_operations.get() as f64);
175        m.add_custom("pg.upsert_operations", self.upsert_operations.get() as f64);
176        m.add_custom("pg.epochs_committed", self.epochs_committed.get() as f64);
177        m.add_custom(
178            "pg.epochs_rolled_back",
179            self.epochs_rolled_back.get() as f64,
180        );
181        m.add_custom("pg.changelog_deletes", self.changelog_deletes.get() as f64);
182        m
183    }
184}
185
186impl Default for PostgresSinkMetrics {
187    fn default() -> Self {
188        Self::new(None)
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshots `metrics` and returns the value stored under the custom key
    /// `key`. Panics if the snapshot has no such custom entry.
    fn custom_value(metrics: &PostgresSinkMetrics, key: &str) -> f64 {
        let snapshot = metrics.to_connector_metrics();
        let entry = snapshot.custom.iter().find(|(k, _)| k == key);
        entry.unwrap().1
    }

    #[test]
    fn test_initial_zeros() {
        let snapshot = PostgresSinkMetrics::new(None).to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    #[test]
    fn test_record_write() {
        let metrics = PostgresSinkMetrics::new(None);
        metrics.record_write(100, 5000);
        metrics.record_write(200, 10_000);

        // Both writes accumulate into the same pair of counters.
        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 300);
        assert_eq!(snapshot.bytes_total, 15_000);
    }

    #[test]
    fn test_flush_and_copy_metrics() {
        let metrics = PostgresSinkMetrics::new(None);
        metrics.record_flush();
        metrics.record_flush();
        metrics.record_copy();

        assert_eq!(custom_value(&metrics, "pg.batches_flushed"), 2.0);
        assert_eq!(custom_value(&metrics, "pg.copy_operations"), 1.0);
    }

    #[test]
    fn test_epoch_metrics() {
        let metrics = PostgresSinkMetrics::new(None);
        metrics.record_commit();
        metrics.record_commit();
        metrics.record_rollback();

        assert_eq!(custom_value(&metrics, "pg.epochs_committed"), 2.0);
        assert_eq!(custom_value(&metrics, "pg.epochs_rolled_back"), 1.0);
    }

    #[test]
    fn test_changelog_deletes() {
        let metrics = PostgresSinkMetrics::new(None);
        metrics.record_deletes(50);
        metrics.record_deletes(30);

        assert_eq!(custom_value(&metrics, "pg.changelog_deletes"), 80.0);
    }

    #[test]
    fn test_error_counting() {
        let metrics = PostgresSinkMetrics::new(None);
        for _ in 0..3 {
            metrics.record_error();
        }

        assert_eq!(metrics.to_connector_metrics().errors_total, 3);
    }

    #[test]
    fn test_upsert_metric() {
        let metrics = PostgresSinkMetrics::new(None);
        metrics.record_upsert();

        assert_eq!(custom_value(&metrics, "pg.upsert_operations"), 1.0);
    }
}