Skip to main content

laminar_connectors/lakehouse/
metrics.rs

//! Common metrics for Lakehouse sink connectors.
//!
//! Provides shared Prometheus-backed counters for tracking statistics across
//! different table formats (Delta Lake, Iceberg, Hudi, Paimon).
5
6use prometheus::{IntCounter, Registry};
7
8use crate::metrics::ConnectorMetrics;
9
10/// Prometheus-backed counters for Lakehouse sink connector statistics.
11#[derive(Debug, Clone)]
12pub struct LakehouseSinkMetrics {
13    /// Total rows flushed to storage (Parquet/ORC/etc.).
14    pub rows_flushed: IntCounter,
15
16    /// Total bytes written to storage.
17    pub bytes_written: IntCounter,
18
19    /// Total number of flush operations.
20    pub flush_count: IntCounter,
21
22    /// Total number of commits (transactions/snapshots).
23    pub commits: IntCounter,
24
25    /// Total errors encountered.
26    pub errors_total: IntCounter,
27
28    /// Total epochs rolled back.
29    pub epochs_rolled_back: IntCounter,
30
31    /// Total changelog DELETE operations processed.
32    pub changelog_deletes: IntCounter,
33}
34
35impl LakehouseSinkMetrics {
36    /// Creates a new metrics instance with all counters at zero.
37    #[must_use]
38    #[allow(clippy::missing_panics_doc)]
39    pub fn new(registry: Option<&Registry>) -> Self {
40        let local;
41        let reg = if let Some(r) = registry {
42            r
43        } else {
44            local = Registry::new();
45            &local
46        };
47
48        let rows_flushed = IntCounter::new(
49            "lakehouse_sink_rows_flushed_total",
50            "Total rows flushed to storage",
51        )
52        .unwrap();
53        let bytes_written = IntCounter::new(
54            "lakehouse_sink_bytes_written_total",
55            "Total bytes written to storage",
56        )
57        .unwrap();
58        let flush_count =
59            IntCounter::new("lakehouse_sink_flush_count_total", "Total flush operations").unwrap();
60        let commits = IntCounter::new(
61            "lakehouse_sink_commits_total",
62            "Total commits (transactions/snapshots)",
63        )
64        .unwrap();
65        let errors_total =
66            IntCounter::new("lakehouse_sink_errors_total", "Total lakehouse sink errors").unwrap();
67        let epochs_rolled_back = IntCounter::new(
68            "lakehouse_sink_epochs_rolled_back_total",
69            "Total epochs rolled back",
70        )
71        .unwrap();
72        let changelog_deletes = IntCounter::new(
73            "lakehouse_sink_changelog_deletes_total",
74            "Total changelog DELETE operations",
75        )
76        .unwrap();
77
78        let _ = reg.register(Box::new(rows_flushed.clone()));
79        let _ = reg.register(Box::new(bytes_written.clone()));
80        let _ = reg.register(Box::new(flush_count.clone()));
81        let _ = reg.register(Box::new(commits.clone()));
82        let _ = reg.register(Box::new(errors_total.clone()));
83        let _ = reg.register(Box::new(epochs_rolled_back.clone()));
84        let _ = reg.register(Box::new(changelog_deletes.clone()));
85
86        Self {
87            rows_flushed,
88            bytes_written,
89            flush_count,
90            commits,
91            errors_total,
92            epochs_rolled_back,
93            changelog_deletes,
94        }
95    }
96
97    /// Records a successful flush of `records` rows totaling `bytes`.
98    pub fn record_flush(&self, records: u64, bytes: u64) {
99        self.rows_flushed.inc_by(records);
100        self.bytes_written.inc_by(bytes);
101        self.flush_count.inc();
102    }
103
104    /// Records a successful commit (transaction/snapshot).
105    pub fn record_commit(&self) {
106        self.commits.inc();
107    }
108
109    /// Records a write or I/O error.
110    pub fn record_error(&self) {
111        self.errors_total.inc();
112    }
113
114    /// Records an epoch rollback.
115    pub fn record_rollback(&self) {
116        self.epochs_rolled_back.inc();
117    }
118
119    /// Records changelog DELETE operations processed.
120    pub fn record_deletes(&self, count: u64) {
121        self.changelog_deletes.inc_by(count);
122    }
123
124    /// Populates standard `ConnectorMetrics` fields and adds common custom metrics with a prefix.
125    ///
126    /// The `prefix` is used for custom metrics keys, e.g., `"{prefix}.flush_count"`.
127    #[allow(clippy::cast_precision_loss)]
128    pub fn populate_metrics(&self, metrics: &mut ConnectorMetrics, prefix: &str) {
129        metrics.records_total = self.rows_flushed.get();
130        metrics.bytes_total = self.bytes_written.get();
131        metrics.errors_total = self.errors_total.get();
132
133        metrics.add_custom(
134            format!("{prefix}.flush_count"),
135            self.flush_count.get() as f64,
136        );
137        metrics.add_custom(format!("{prefix}.commits"), self.commits.get() as f64);
138        metrics.add_custom(
139            format!("{prefix}.epochs_rolled_back"),
140            self.epochs_rolled_back.get() as f64,
141        );
142        metrics.add_custom(
143            format!("{prefix}.changelog_deletes"),
144            self.changelog_deletes.get() as f64,
145        );
146    }
147}
148
149impl Default for LakehouseSinkMetrics {
150    fn default() -> Self {
151        Self::new(None)
152    }
153}