Skip to main content

laminar_connectors/postgres/
sink_metrics.rs

1//! `PostgreSQL` sink connector metrics.
2//!
3//! [`PostgresSinkMetrics`] provides lock-free atomic counters for
4//! tracking write statistics, convertible to the SDK's
5//! [`ConnectorMetrics`] type.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
11/// Atomic counters for `PostgreSQL` sink connector statistics.
12#[derive(Debug)]
13pub struct PostgresSinkMetrics {
14    /// Total records written to `PostgreSQL`.
15    pub records_written: AtomicU64,
16
17    /// Total bytes written (estimated from `RecordBatch` sizes).
18    pub bytes_written: AtomicU64,
19
20    /// Total errors encountered.
21    pub errors_total: AtomicU64,
22
23    /// Total batches flushed.
24    pub batches_flushed: AtomicU64,
25
26    /// Total COPY BINARY operations (append mode).
27    pub copy_operations: AtomicU64,
28
29    /// Total upsert operations (upsert mode).
30    pub upsert_operations: AtomicU64,
31
32    /// Total epochs committed (exactly-once).
33    pub epochs_committed: AtomicU64,
34
35    /// Total epochs rolled back.
36    pub epochs_rolled_back: AtomicU64,
37
38    /// Total changelog deletes applied (Z-set weight -1).
39    pub changelog_deletes: AtomicU64,
40}
41
42impl PostgresSinkMetrics {
43    /// Creates a new metrics instance with all counters at zero.
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            records_written: AtomicU64::new(0),
48            bytes_written: AtomicU64::new(0),
49            errors_total: AtomicU64::new(0),
50            batches_flushed: AtomicU64::new(0),
51            copy_operations: AtomicU64::new(0),
52            upsert_operations: AtomicU64::new(0),
53            epochs_committed: AtomicU64::new(0),
54            epochs_rolled_back: AtomicU64::new(0),
55            changelog_deletes: AtomicU64::new(0),
56        }
57    }
58
59    /// Records a successful write of `records` records totaling `bytes`.
60    pub fn record_write(&self, records: u64, bytes: u64) {
61        self.records_written.fetch_add(records, Ordering::Relaxed);
62        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
63    }
64
65    /// Records a successful batch flush.
66    pub fn record_flush(&self) {
67        self.batches_flushed.fetch_add(1, Ordering::Relaxed);
68    }
69
70    /// Records a COPY BINARY operation.
71    pub fn record_copy(&self) {
72        self.copy_operations.fetch_add(1, Ordering::Relaxed);
73    }
74
75    /// Records an upsert operation.
76    pub fn record_upsert(&self) {
77        self.upsert_operations.fetch_add(1, Ordering::Relaxed);
78    }
79
80    /// Records a write or connection error.
81    pub fn record_error(&self) {
82        self.errors_total.fetch_add(1, Ordering::Relaxed);
83    }
84
85    /// Records a successful epoch commit.
86    pub fn record_commit(&self) {
87        self.epochs_committed.fetch_add(1, Ordering::Relaxed);
88    }
89
90    /// Records an epoch rollback.
91    pub fn record_rollback(&self) {
92        self.epochs_rolled_back.fetch_add(1, Ordering::Relaxed);
93    }
94
95    /// Records changelog DELETE operations.
96    pub fn record_deletes(&self, count: u64) {
97        self.changelog_deletes.fetch_add(count, Ordering::Relaxed);
98    }
99
100    /// Converts to the SDK's [`ConnectorMetrics`].
101    #[must_use]
102    #[allow(clippy::cast_precision_loss)]
103    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
104        let mut m = ConnectorMetrics {
105            records_total: self.records_written.load(Ordering::Relaxed),
106            bytes_total: self.bytes_written.load(Ordering::Relaxed),
107            errors_total: self.errors_total.load(Ordering::Relaxed),
108            lag: 0,
109            custom: Vec::new(),
110        };
111        m.add_custom(
112            "pg.batches_flushed",
113            self.batches_flushed.load(Ordering::Relaxed) as f64,
114        );
115        m.add_custom(
116            "pg.copy_operations",
117            self.copy_operations.load(Ordering::Relaxed) as f64,
118        );
119        m.add_custom(
120            "pg.upsert_operations",
121            self.upsert_operations.load(Ordering::Relaxed) as f64,
122        );
123        m.add_custom(
124            "pg.epochs_committed",
125            self.epochs_committed.load(Ordering::Relaxed) as f64,
126        );
127        m.add_custom(
128            "pg.epochs_rolled_back",
129            self.epochs_rolled_back.load(Ordering::Relaxed) as f64,
130        );
131        m.add_custom(
132            "pg.changelog_deletes",
133            self.changelog_deletes.load(Ordering::Relaxed) as f64,
134        );
135        m
136    }
137}
138
139impl Default for PostgresSinkMetrics {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148
149    #[test]
150    fn test_initial_zeros() {
151        let m = PostgresSinkMetrics::new();
152        let cm = m.to_connector_metrics();
153        assert_eq!(cm.records_total, 0);
154        assert_eq!(cm.bytes_total, 0);
155        assert_eq!(cm.errors_total, 0);
156    }
157
158    #[test]
159    fn test_record_write() {
160        let m = PostgresSinkMetrics::new();
161        m.record_write(100, 5000);
162        m.record_write(200, 10_000);
163
164        let cm = m.to_connector_metrics();
165        assert_eq!(cm.records_total, 300);
166        assert_eq!(cm.bytes_total, 15_000);
167    }
168
169    #[test]
170    fn test_flush_and_copy_metrics() {
171        let m = PostgresSinkMetrics::new();
172        m.record_flush();
173        m.record_flush();
174        m.record_copy();
175
176        let cm = m.to_connector_metrics();
177        let flushed = cm.custom.iter().find(|(k, _)| k == "pg.batches_flushed");
178        assert_eq!(flushed.unwrap().1, 2.0);
179        let copies = cm.custom.iter().find(|(k, _)| k == "pg.copy_operations");
180        assert_eq!(copies.unwrap().1, 1.0);
181    }
182
183    #[test]
184    fn test_epoch_metrics() {
185        let m = PostgresSinkMetrics::new();
186        m.record_commit();
187        m.record_commit();
188        m.record_rollback();
189
190        let cm = m.to_connector_metrics();
191        let committed = cm.custom.iter().find(|(k, _)| k == "pg.epochs_committed");
192        assert_eq!(committed.unwrap().1, 2.0);
193        let rolled_back = cm.custom.iter().find(|(k, _)| k == "pg.epochs_rolled_back");
194        assert_eq!(rolled_back.unwrap().1, 1.0);
195    }
196
197    #[test]
198    fn test_changelog_deletes() {
199        let m = PostgresSinkMetrics::new();
200        m.record_deletes(50);
201        m.record_deletes(30);
202
203        let cm = m.to_connector_metrics();
204        let deletes = cm.custom.iter().find(|(k, _)| k == "pg.changelog_deletes");
205        assert_eq!(deletes.unwrap().1, 80.0);
206    }
207
208    #[test]
209    fn test_error_counting() {
210        let m = PostgresSinkMetrics::new();
211        m.record_error();
212        m.record_error();
213        m.record_error();
214
215        let cm = m.to_connector_metrics();
216        assert_eq!(cm.errors_total, 3);
217    }
218
219    #[test]
220    fn test_upsert_metric() {
221        let m = PostgresSinkMetrics::new();
222        m.record_upsert();
223
224        let cm = m.to_connector_metrics();
225        let upserts = cm.custom.iter().find(|(k, _)| k == "pg.upsert_operations");
226        assert_eq!(upserts.unwrap().1, 1.0);
227    }
228}