Skip to main content

laminar_connectors/lakehouse/
metrics.rs

1//! Common metrics for Lakehouse sink connectors.
2//!
3//! Provides shared prometheus-backed counters for tracking statistics across
4//! different table formats (Delta Lake, Iceberg, Hudi, Paimon).
5
6use prometheus::{IntCounter, Registry};
7
8use crate::prom::reg_or_local;
9
10/// Prometheus-backed counters for Lakehouse sink connector statistics.
11#[derive(Debug, Clone)]
12pub struct LakehouseSinkMetrics {
13    /// Total rows flushed to storage (Parquet/ORC/etc.).
14    pub rows_flushed: IntCounter,
15
16    /// Total bytes written to storage.
17    pub bytes_written: IntCounter,
18
19    /// Total number of flush operations.
20    pub flush_count: IntCounter,
21
22    /// Total number of commits (transactions/snapshots).
23    pub commits: IntCounter,
24
25    /// Total errors encountered.
26    pub errors_total: IntCounter,
27
28    /// Total epochs rolled back.
29    pub epochs_rolled_back: IntCounter,
30
31    /// Total changelog DELETE operations processed.
32    pub changelog_deletes: IntCounter,
33}
34
35impl LakehouseSinkMetrics {
36    /// Creates a new metrics instance with all counters at zero.
37    #[must_use]
38    #[allow(clippy::missing_panics_doc)]
39    pub fn new(registry: Option<&Registry>) -> Self {
40        let mut local = None;
41        let reg = reg_or_local(registry, &mut local);
42
43        Self {
44            rows_flushed: reg.counter(
45                "lakehouse_sink_rows_flushed_total",
46                "Total rows flushed to storage",
47            ),
48            bytes_written: reg.counter(
49                "lakehouse_sink_bytes_written_total",
50                "Total bytes written to storage",
51            ),
52            flush_count: reg.counter("lakehouse_sink_flush_count_total", "Total flush operations"),
53            commits: reg.counter(
54                "lakehouse_sink_commits_total",
55                "Total commits (transactions/snapshots)",
56            ),
57            errors_total: reg.counter("lakehouse_sink_errors_total", "Total lakehouse sink errors"),
58            epochs_rolled_back: reg.counter(
59                "lakehouse_sink_epochs_rolled_back_total",
60                "Total epochs rolled back",
61            ),
62            changelog_deletes: reg.counter(
63                "lakehouse_sink_changelog_deletes_total",
64                "Total changelog DELETE operations",
65            ),
66        }
67    }
68
69    /// Records a successful flush of `records` rows totaling `bytes`.
70    pub fn record_flush(&self, records: u64, bytes: u64) {
71        self.rows_flushed.inc_by(records);
72        self.bytes_written.inc_by(bytes);
73        self.flush_count.inc();
74    }
75
76    /// Records a successful commit (transaction/snapshot).
77    pub fn record_commit(&self) {
78        self.commits.inc();
79    }
80
81    /// Records a write or I/O error.
82    pub fn record_error(&self) {
83        self.errors_total.inc();
84    }
85
86    /// Records an epoch rollback.
87    pub fn record_rollback(&self) {
88        self.epochs_rolled_back.inc();
89    }
90
91    /// Records changelog DELETE operations processed.
92    pub fn record_deletes(&self, count: u64) {
93        self.changelog_deletes.inc_by(count);
94    }
95}
96
97impl Default for LakehouseSinkMetrics {
98    fn default() -> Self {
99        Self::new(None)
100    }
101}