laminar_connectors/lakehouse/
metrics.rs1use prometheus::{IntCounter, Registry};
7
8use crate::prom::reg_or_local;
9
10#[derive(Debug, Clone)]
12pub struct LakehouseSinkMetrics {
13 pub rows_flushed: IntCounter,
15
16 pub bytes_written: IntCounter,
18
19 pub flush_count: IntCounter,
21
22 pub commits: IntCounter,
24
25 pub errors_total: IntCounter,
27
28 pub epochs_rolled_back: IntCounter,
30
31 pub changelog_deletes: IntCounter,
33}
34
35impl LakehouseSinkMetrics {
36 #[must_use]
38 #[allow(clippy::missing_panics_doc)]
39 pub fn new(registry: Option<&Registry>) -> Self {
40 let mut local = None;
41 let reg = reg_or_local(registry, &mut local);
42
43 Self {
44 rows_flushed: reg.counter(
45 "lakehouse_sink_rows_flushed_total",
46 "Total rows flushed to storage",
47 ),
48 bytes_written: reg.counter(
49 "lakehouse_sink_bytes_written_total",
50 "Total bytes written to storage",
51 ),
52 flush_count: reg.counter("lakehouse_sink_flush_count_total", "Total flush operations"),
53 commits: reg.counter(
54 "lakehouse_sink_commits_total",
55 "Total commits (transactions/snapshots)",
56 ),
57 errors_total: reg.counter("lakehouse_sink_errors_total", "Total lakehouse sink errors"),
58 epochs_rolled_back: reg.counter(
59 "lakehouse_sink_epochs_rolled_back_total",
60 "Total epochs rolled back",
61 ),
62 changelog_deletes: reg.counter(
63 "lakehouse_sink_changelog_deletes_total",
64 "Total changelog DELETE operations",
65 ),
66 }
67 }
68
69 pub fn record_flush(&self, records: u64, bytes: u64) {
71 self.rows_flushed.inc_by(records);
72 self.bytes_written.inc_by(bytes);
73 self.flush_count.inc();
74 }
75
76 pub fn record_commit(&self) {
78 self.commits.inc();
79 }
80
81 pub fn record_error(&self) {
83 self.errors_total.inc();
84 }
85
86 pub fn record_rollback(&self) {
88 self.epochs_rolled_back.inc();
89 }
90
91 pub fn record_deletes(&self, count: u64) {
93 self.changelog_deletes.inc_by(count);
94 }
95}
96
97impl Default for LakehouseSinkMetrics {
98 fn default() -> Self {
99 Self::new(None)
100 }
101}