laminar_connectors/lakehouse/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
7
8use crate::metrics::ConnectorMetrics;
9
/// Atomic counters describing the activity of a lakehouse sink.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference. The `record_*` methods on this type are the intended
/// way to bump them; the fields are public for direct reads.
#[derive(Debug)]
pub struct LakehouseSinkMetrics {
    /// Sum of the `records` values passed to `record_flush`.
    pub rows_flushed: AtomicU64,

    /// Sum of the `bytes` values passed to `record_flush`.
    pub bytes_written: AtomicU64,

    /// Number of `record_flush` calls.
    pub flush_count: AtomicU64,

    /// Number of `record_commit` calls.
    pub commits: AtomicU64,

    /// Number of `record_error` calls.
    pub errors_total: AtomicU64,

    /// Number of `record_rollback` calls.
    pub epochs_rolled_back: AtomicU64,

    /// Sum of the `count` values passed to `record_deletes`.
    pub changelog_deletes: AtomicU64,
}
34
35impl LakehouseSinkMetrics {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 rows_flushed: AtomicU64::new(0),
41 bytes_written: AtomicU64::new(0),
42 flush_count: AtomicU64::new(0),
43 commits: AtomicU64::new(0),
44 errors_total: AtomicU64::new(0),
45 epochs_rolled_back: AtomicU64::new(0),
46 changelog_deletes: AtomicU64::new(0),
47 }
48 }
49
50 pub fn record_flush(&self, records: u64, bytes: u64) {
52 self.rows_flushed.fetch_add(records, Ordering::Relaxed);
53 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
54 self.flush_count.fetch_add(1, Ordering::Relaxed);
55 }
56
57 pub fn record_commit(&self) {
59 self.commits.fetch_add(1, Ordering::Relaxed);
60 }
61
62 pub fn record_error(&self) {
64 self.errors_total.fetch_add(1, Ordering::Relaxed);
65 }
66
67 pub fn record_rollback(&self) {
69 self.epochs_rolled_back.fetch_add(1, Ordering::Relaxed);
70 }
71
72 pub fn record_deletes(&self, count: u64) {
74 self.changelog_deletes.fetch_add(count, Ordering::Relaxed);
75 }
76
77 #[allow(clippy::cast_precision_loss)]
81 pub fn populate_metrics(&self, metrics: &mut ConnectorMetrics, prefix: &str) {
82 metrics.records_total = self.rows_flushed.load(Ordering::Relaxed);
83 metrics.bytes_total = self.bytes_written.load(Ordering::Relaxed);
84 metrics.errors_total = self.errors_total.load(Ordering::Relaxed);
85
86 metrics.add_custom(
87 format!("{prefix}.flush_count"),
88 self.flush_count.load(Ordering::Relaxed) as f64,
89 );
90 metrics.add_custom(
91 format!("{prefix}.commits"),
92 self.commits.load(Ordering::Relaxed) as f64,
93 );
94 metrics.add_custom(
95 format!("{prefix}.epochs_rolled_back"),
96 self.epochs_rolled_back.load(Ordering::Relaxed) as f64,
97 );
98 metrics.add_custom(
99 format!("{prefix}.changelog_deletes"),
100 self.changelog_deletes.load(Ordering::Relaxed) as f64,
101 );
102 }
103}
104
105impl Default for LakehouseSinkMetrics {
106 fn default() -> Self {
107 Self::new()
108 }
109}