laminar_connectors/lakehouse/
metrics.rs1use prometheus::{IntCounter, Registry};
7
8use crate::metrics::ConnectorMetrics;
9
10#[derive(Debug, Clone)]
12pub struct LakehouseSinkMetrics {
13 pub rows_flushed: IntCounter,
15
16 pub bytes_written: IntCounter,
18
19 pub flush_count: IntCounter,
21
22 pub commits: IntCounter,
24
25 pub errors_total: IntCounter,
27
28 pub epochs_rolled_back: IntCounter,
30
31 pub changelog_deletes: IntCounter,
33}
34
35impl LakehouseSinkMetrics {
36 #[must_use]
38 #[allow(clippy::missing_panics_doc)]
39 pub fn new(registry: Option<&Registry>) -> Self {
40 let local;
41 let reg = if let Some(r) = registry {
42 r
43 } else {
44 local = Registry::new();
45 &local
46 };
47
48 let rows_flushed = IntCounter::new(
49 "lakehouse_sink_rows_flushed_total",
50 "Total rows flushed to storage",
51 )
52 .unwrap();
53 let bytes_written = IntCounter::new(
54 "lakehouse_sink_bytes_written_total",
55 "Total bytes written to storage",
56 )
57 .unwrap();
58 let flush_count =
59 IntCounter::new("lakehouse_sink_flush_count_total", "Total flush operations").unwrap();
60 let commits = IntCounter::new(
61 "lakehouse_sink_commits_total",
62 "Total commits (transactions/snapshots)",
63 )
64 .unwrap();
65 let errors_total =
66 IntCounter::new("lakehouse_sink_errors_total", "Total lakehouse sink errors").unwrap();
67 let epochs_rolled_back = IntCounter::new(
68 "lakehouse_sink_epochs_rolled_back_total",
69 "Total epochs rolled back",
70 )
71 .unwrap();
72 let changelog_deletes = IntCounter::new(
73 "lakehouse_sink_changelog_deletes_total",
74 "Total changelog DELETE operations",
75 )
76 .unwrap();
77
78 let _ = reg.register(Box::new(rows_flushed.clone()));
79 let _ = reg.register(Box::new(bytes_written.clone()));
80 let _ = reg.register(Box::new(flush_count.clone()));
81 let _ = reg.register(Box::new(commits.clone()));
82 let _ = reg.register(Box::new(errors_total.clone()));
83 let _ = reg.register(Box::new(epochs_rolled_back.clone()));
84 let _ = reg.register(Box::new(changelog_deletes.clone()));
85
86 Self {
87 rows_flushed,
88 bytes_written,
89 flush_count,
90 commits,
91 errors_total,
92 epochs_rolled_back,
93 changelog_deletes,
94 }
95 }
96
97 pub fn record_flush(&self, records: u64, bytes: u64) {
99 self.rows_flushed.inc_by(records);
100 self.bytes_written.inc_by(bytes);
101 self.flush_count.inc();
102 }
103
104 pub fn record_commit(&self) {
106 self.commits.inc();
107 }
108
109 pub fn record_error(&self) {
111 self.errors_total.inc();
112 }
113
114 pub fn record_rollback(&self) {
116 self.epochs_rolled_back.inc();
117 }
118
119 pub fn record_deletes(&self, count: u64) {
121 self.changelog_deletes.inc_by(count);
122 }
123
124 #[allow(clippy::cast_precision_loss)]
128 pub fn populate_metrics(&self, metrics: &mut ConnectorMetrics, prefix: &str) {
129 metrics.records_total = self.rows_flushed.get();
130 metrics.bytes_total = self.bytes_written.get();
131 metrics.errors_total = self.errors_total.get();
132
133 metrics.add_custom(
134 format!("{prefix}.flush_count"),
135 self.flush_count.get() as f64,
136 );
137 metrics.add_custom(format!("{prefix}.commits"), self.commits.get() as f64);
138 metrics.add_custom(
139 format!("{prefix}.epochs_rolled_back"),
140 self.epochs_rolled_back.get() as f64,
141 );
142 metrics.add_custom(
143 format!("{prefix}.changelog_deletes"),
144 self.changelog_deletes.get() as f64,
145 );
146 }
147}
148
149impl Default for LakehouseSinkMetrics {
150 fn default() -> Self {
151 Self::new(None)
152 }
153}