Skip to main content

laminar_connectors/lakehouse/
delta_metrics.rs

1//! Prometheus-backed Delta Lake sink metrics.
2
3use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry};
4
5use super::metrics::LakehouseSinkMetrics;
6use crate::metrics::ConnectorMetrics;
7
8/// Prometheus-backed counters/gauges for Delta Lake sink connector statistics.
9#[derive(Debug, Clone)]
10pub struct DeltaLakeSinkMetrics {
11    /// Common metrics (rows flushed, bytes written, commits, etc.).
12    pub common: LakehouseSinkMetrics,
13
14    /// Total MERGE operations (upsert mode).
15    pub merge_operations: IntCounter,
16
17    /// Last Delta Lake table version committed.
18    pub last_delta_version: IntGauge,
19
20    /// Total compaction runs completed.
21    pub compaction_runs: IntCounter,
22
23    /// Total files added by compaction.
24    pub compaction_files_added: IntCounter,
25
26    /// Total files removed by compaction.
27    pub compaction_files_removed: IntCounter,
28
29    /// Total files deleted by vacuum.
30    pub vacuum_files_deleted: IntCounter,
31
32    /// Total optimistic-concurrency conflicts encountered (per retry).
33    pub conflicts: IntCounter,
34
35    /// Total retry attempts kicked off (both conflict and timeout).
36    pub retries: IntCounter,
37
38    /// End-to-end flush duration histogram (concat → write → checkpoint).
39    /// Buckets cover 5ms up to ~160s (0.005 * 2^15).
40    pub flush_duration: Histogram,
41}
42
43impl DeltaLakeSinkMetrics {
44    /// Creates a new metrics instance with all counters at zero.
45    #[must_use]
46    #[allow(clippy::missing_panics_doc)]
47    pub fn new(registry: Option<&Registry>) -> Self {
48        let local;
49        let reg = if let Some(r) = registry {
50            r
51        } else {
52            local = Registry::new();
53            &local
54        };
55
56        let merge_operations = IntCounter::new(
57            "delta_sink_merge_operations_total",
58            "Total MERGE operations (upsert)",
59        )
60        .unwrap();
61        let last_delta_version = IntGauge::new(
62            "delta_sink_last_version",
63            "Last committed Delta table version",
64        )
65        .unwrap();
66        let compaction_runs =
67            IntCounter::new("delta_sink_compaction_runs_total", "Total compaction runs").unwrap();
68        let compaction_files_added = IntCounter::new(
69            "delta_sink_compaction_files_added_total",
70            "Total files added by compaction",
71        )
72        .unwrap();
73        let compaction_files_removed = IntCounter::new(
74            "delta_sink_compaction_files_removed_total",
75            "Total files removed by compaction",
76        )
77        .unwrap();
78        let vacuum_files_deleted = IntCounter::new(
79            "delta_sink_vacuum_files_deleted_total",
80            "Total files deleted by vacuum",
81        )
82        .unwrap();
83        let conflicts = IntCounter::new(
84            "delta_sink_conflicts_total",
85            "Delta Lake optimistic-concurrency conflicts observed",
86        )
87        .unwrap();
88        let retries = IntCounter::new(
89            "delta_sink_retries_total",
90            "Retry attempts kicked off (conflict + timeout)",
91        )
92        .unwrap();
93        let flush_duration = Histogram::with_opts(
94            HistogramOpts::new(
95                "delta_sink_flush_duration_seconds",
96                "End-to-end Delta Lake flush duration (pre-concat → write → checkpoint)",
97            )
98            .buckets(prometheus::exponential_buckets(0.005, 2.0, 16).unwrap()),
99        )
100        .unwrap();
101
102        let _ = reg.register(Box::new(merge_operations.clone()));
103        let _ = reg.register(Box::new(last_delta_version.clone()));
104        let _ = reg.register(Box::new(compaction_runs.clone()));
105        let _ = reg.register(Box::new(compaction_files_added.clone()));
106        let _ = reg.register(Box::new(compaction_files_removed.clone()));
107        let _ = reg.register(Box::new(vacuum_files_deleted.clone()));
108        let _ = reg.register(Box::new(conflicts.clone()));
109        let _ = reg.register(Box::new(retries.clone()));
110        let _ = reg.register(Box::new(flush_duration.clone()));
111
112        Self {
113            common: LakehouseSinkMetrics::new(registry),
114            merge_operations,
115            last_delta_version,
116            compaction_runs,
117            compaction_files_added,
118            compaction_files_removed,
119            vacuum_files_deleted,
120            conflicts,
121            retries,
122            flush_duration,
123        }
124    }
125
126    /// Records a successful flush of `records` rows totaling `bytes`.
127    pub fn record_flush(&self, records: u64, bytes: u64) {
128        self.common.record_flush(records, bytes);
129    }
130
131    /// Records a successful epoch commit.
132    #[allow(clippy::cast_possible_wrap)]
133    pub fn record_commit(&self, delta_version: u64) {
134        self.common.record_commit();
135        self.last_delta_version.set(delta_version as i64);
136    }
137
138    /// Records a write or I/O error.
139    pub fn record_error(&self) {
140        self.common.record_error();
141    }
142
143    /// Records an epoch rollback.
144    pub fn record_rollback(&self) {
145        self.common.record_rollback();
146    }
147
148    /// Records a MERGE operation (upsert mode).
149    pub fn record_merge(&self) {
150        self.merge_operations.inc();
151    }
152
153    /// Records changelog DELETE operations.
154    pub fn record_deletes(&self, count: u64) {
155        self.common.record_deletes(count);
156    }
157
158    /// Records a completed compaction run.
159    pub fn record_compaction(&self, files_added: u64, files_removed: u64) {
160        self.compaction_runs.inc();
161        self.compaction_files_added.inc_by(files_added);
162        self.compaction_files_removed.inc_by(files_removed);
163    }
164
165    /// Records files deleted by vacuum.
166    pub fn record_vacuum(&self, files_deleted: u64) {
167        self.vacuum_files_deleted.inc_by(files_deleted);
168    }
169
170    /// Records an optimistic-concurrency conflict (one per retry-triggering conflict).
171    pub fn record_conflict(&self) {
172        self.conflicts.inc();
173    }
174
175    /// Records a retry attempt.
176    pub fn record_retry(&self) {
177        self.retries.inc();
178    }
179
180    /// Records a completed flush duration (seconds).
181    pub fn observe_flush_duration(&self, seconds: f64) {
182        self.flush_duration.observe(seconds);
183    }
184
185    /// Converts to the SDK's [`ConnectorMetrics`].
186    #[must_use]
187    #[allow(clippy::cast_precision_loss)]
188    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
189        let mut m = ConnectorMetrics::new();
190        self.common.populate_metrics(&mut m, "delta");
191
192        m.add_custom("delta.merge_operations", self.merge_operations.get() as f64);
193        m.add_custom("delta.last_version", self.last_delta_version.get() as f64);
194        m.add_custom("delta.compaction_runs", self.compaction_runs.get() as f64);
195        m.add_custom(
196            "delta.compaction_files_added",
197            self.compaction_files_added.get() as f64,
198        );
199        m.add_custom(
200            "delta.compaction_files_removed",
201            self.compaction_files_removed.get() as f64,
202        );
203        m.add_custom(
204            "delta.vacuum_files_deleted",
205            self.vacuum_files_deleted.get() as f64,
206        );
207        m.add_custom("delta.conflicts", self.conflicts.get() as f64);
208        m.add_custom("delta.retries", self.retries.get() as f64);
209        m
210    }
211}
212
213impl Default for DeltaLakeSinkMetrics {
214    fn default() -> Self {
215        Self::new(None)
216    }
217}
218
#[cfg(test)]
// Exact float comparison is fine here: every expected value is a small
// integer or an exactly representable binary fraction.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Returns the value of custom metric `key` from a snapshot, if present.
    fn custom(cm: &ConnectorMetrics, key: &str) -> Option<f64> {
        cm.custom.iter().find(|(k, _)| k == key).map(|(_, v)| *v)
    }

    #[test]
    fn test_initial_zeros() {
        let m = DeltaLakeSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        assert_eq!(custom(&cm, "delta.last_version"), Some(0.0));
    }

    #[test]
    fn test_record_flush() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_flush(100, 5000);
        m.record_flush(200, 10_000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15_000);
        assert_eq!(custom(&cm, "delta.flush_count"), Some(2.0));
    }

    #[test]
    fn test_record_commit() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_commit(1);
        m.record_commit(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.commits"), Some(2.0));
        assert_eq!(custom(&cm, "delta.last_version"), Some(5.0));
    }

    #[test]
    fn test_error_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 3);
    }

    #[test]
    fn test_rollback_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_rollback();
        m.record_rollback();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.epochs_rolled_back"), Some(2.0));
    }

    #[test]
    fn test_merge_operations() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_merge();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.merge_operations"), Some(1.0));
    }

    #[test]
    fn test_changelog_deletes() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_deletes(50);
        m.record_deletes(30);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.changelog_deletes"), Some(80.0));
    }

    #[test]
    fn test_compaction_and_vacuum() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_compaction(3, 10);
        m.record_compaction(1, 4);
        m.record_vacuum(7);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.compaction_runs"), Some(2.0));
        assert_eq!(custom(&cm, "delta.compaction_files_added"), Some(4.0));
        assert_eq!(custom(&cm, "delta.compaction_files_removed"), Some(14.0));
        assert_eq!(custom(&cm, "delta.vacuum_files_deleted"), Some(7.0));
    }

    #[test]
    fn test_conflicts_and_retries() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_conflict();
        m.record_retry();
        m.record_retry();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.conflicts"), Some(1.0));
        assert_eq!(custom(&cm, "delta.retries"), Some(2.0));
    }

    #[test]
    fn test_flush_duration_histogram() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.observe_flush_duration(0.25);
        m.observe_flush_duration(1.5);

        assert_eq!(m.flush_duration.get_sample_count(), 2);
        assert_eq!(m.flush_duration.get_sample_sum(), 1.75);
    }

    #[test]
    fn test_registers_into_supplied_registry() {
        let registry = Registry::new();
        let m = DeltaLakeSinkMetrics::new(Some(&registry));
        m.record_merge();

        // At least the nine Delta-specific metric families must be exported.
        assert!(registry.gather().len() >= 9);
    }
}