Skip to main content

laminar_connectors/postgres/
sink_metrics.rs

1//! `PostgreSQL` sink connector metrics.
2//!
3//! [`PostgresSinkMetrics`] provides prometheus-backed counters for
4//! tracking write statistics.
5
6use prometheus::{IntCounter, Registry};
7
8use crate::prom::reg_or_local;
9
10/// Prometheus-backed counters for `PostgreSQL` sink connector statistics.
11#[derive(Debug, Clone)]
12pub struct PostgresSinkMetrics {
13    /// Total records written to `PostgreSQL`.
14    pub records_written: IntCounter,
15
16    /// Total bytes written (estimated from `RecordBatch` sizes).
17    pub bytes_written: IntCounter,
18
19    /// Total errors encountered.
20    pub errors_total: IntCounter,
21
22    /// Total batches flushed.
23    pub batches_flushed: IntCounter,
24
25    /// Total COPY BINARY operations (append mode).
26    pub copy_operations: IntCounter,
27
28    /// Total upsert operations (upsert mode).
29    pub upsert_operations: IntCounter,
30
31    /// Total epochs committed (exactly-once).
32    pub epochs_committed: IntCounter,
33
34    /// Total epochs rolled back.
35    pub epochs_rolled_back: IntCounter,
36
37    /// Total changelog deletes applied (Z-set weight -1).
38    pub changelog_deletes: IntCounter,
39}
40
41impl PostgresSinkMetrics {
42    /// Creates a new metrics instance with all counters at zero.
43    #[must_use]
44    #[allow(clippy::missing_panics_doc)]
45    pub fn new(registry: Option<&Registry>) -> Self {
46        let mut local = None;
47        let reg = reg_or_local(registry, &mut local);
48
49        Self {
50            records_written: reg.counter(
51                "postgres_sink_records_written_total",
52                "Total records written to PostgreSQL",
53            ),
54            bytes_written: reg.counter(
55                "postgres_sink_bytes_written_total",
56                "Total bytes written to PostgreSQL",
57            ),
58            errors_total: reg.counter("postgres_sink_errors_total", "Total PostgreSQL sink errors"),
59            batches_flushed: reg.counter(
60                "postgres_sink_batches_flushed_total",
61                "Total batches flushed",
62            ),
63            copy_operations: reg.counter(
64                "postgres_sink_copy_operations_total",
65                "Total COPY BINARY operations",
66            ),
67            upsert_operations: reg.counter(
68                "postgres_sink_upsert_operations_total",
69                "Total upsert operations",
70            ),
71            epochs_committed: reg.counter(
72                "postgres_sink_epochs_committed_total",
73                "Total epochs committed",
74            ),
75            epochs_rolled_back: reg.counter(
76                "postgres_sink_epochs_rolled_back_total",
77                "Total epochs rolled back",
78            ),
79            changelog_deletes: reg.counter(
80                "postgres_sink_changelog_deletes_total",
81                "Total changelog deletes applied",
82            ),
83        }
84    }
85
86    /// Records a successful write of `records` records totaling `bytes`.
87    pub fn record_write(&self, records: u64, bytes: u64) {
88        self.records_written.inc_by(records);
89        self.bytes_written.inc_by(bytes);
90    }
91
92    /// Records a successful batch flush.
93    pub fn record_flush(&self) {
94        self.batches_flushed.inc();
95    }
96
97    /// Records a COPY BINARY operation.
98    pub fn record_copy(&self) {
99        self.copy_operations.inc();
100    }
101
102    /// Records an upsert operation.
103    pub fn record_upsert(&self) {
104        self.upsert_operations.inc();
105    }
106
107    /// Records a write or connection error.
108    pub fn record_error(&self) {
109        self.errors_total.inc();
110    }
111
112    /// Records a successful epoch commit.
113    pub fn record_commit(&self) {
114        self.epochs_committed.inc();
115    }
116
117    /// Records an epoch rollback.
118    pub fn record_rollback(&self) {
119        self.epochs_rolled_back.inc();
120    }
121
122    /// Records changelog DELETE operations.
123    pub fn record_deletes(&self, count: u64) {
124        self.changelog_deletes.inc_by(count);
125    }
126}
127
128impl Default for PostgresSinkMetrics {
129    fn default() -> Self {
130        Self::new(None)
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every counter on `m` currently reads zero.
    fn assert_all_zero(m: &PostgresSinkMetrics) {
        assert_eq!(m.records_written.get(), 0);
        assert_eq!(m.bytes_written.get(), 0);
        assert_eq!(m.errors_total.get(), 0);
        assert_eq!(m.batches_flushed.get(), 0);
        assert_eq!(m.copy_operations.get(), 0);
        assert_eq!(m.upsert_operations.get(), 0);
        assert_eq!(m.epochs_committed.get(), 0);
        assert_eq!(m.epochs_rolled_back.get(), 0);
        assert_eq!(m.changelog_deletes.get(), 0);
    }

    // Covers all nine counters, not just the write/error ones.
    #[test]
    fn test_initial_zeros() {
        let m = PostgresSinkMetrics::new(None);
        assert_all_zero(&m);
    }

    // `Default` delegates to `new(None)`, so it must behave the same way.
    #[test]
    fn test_default_starts_at_zero_and_counts() {
        let m = PostgresSinkMetrics::default();
        assert_all_zero(&m);

        m.record_write(1, 2);
        assert_eq!(m.records_written.get(), 1);
        assert_eq!(m.bytes_written.get(), 2);
    }

    #[test]
    fn test_record_write() {
        let m = PostgresSinkMetrics::new(None);
        m.record_write(100, 5000);
        m.record_write(200, 10_000);

        assert_eq!(m.records_written.get(), 300);
        assert_eq!(m.bytes_written.get(), 15_000);
    }

    #[test]
    fn test_flush_and_copy_metrics() {
        let m = PostgresSinkMetrics::new(None);
        m.record_flush();
        m.record_flush();
        m.record_copy();

        assert_eq!(m.batches_flushed.get(), 2);
        assert_eq!(m.copy_operations.get(), 1);
        // Copy and upsert are tracked separately.
        assert_eq!(m.upsert_operations.get(), 0);
    }

    #[test]
    fn test_epoch_metrics() {
        let m = PostgresSinkMetrics::new(None);
        m.record_commit();
        m.record_commit();
        m.record_rollback();

        assert_eq!(m.epochs_committed.get(), 2);
        assert_eq!(m.epochs_rolled_back.get(), 1);
    }

    #[test]
    fn test_changelog_deletes() {
        let m = PostgresSinkMetrics::new(None);
        m.record_deletes(50);
        m.record_deletes(30);

        assert_eq!(m.changelog_deletes.get(), 80);
    }

    #[test]
    fn test_error_counting() {
        let m = PostgresSinkMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_error();

        assert_eq!(m.errors_total.get(), 3);
    }

    #[test]
    fn test_upsert_metric() {
        let m = PostgresSinkMetrics::new(None);
        m.record_upsert();

        assert_eq!(m.upsert_operations.get(), 1);
        // Upsert must not leak into the COPY counter.
        assert_eq!(m.copy_operations.get(), 0);
    }
}