Skip to main content

laminar_connectors/lakehouse/
delta_metrics.rs

1//! Prometheus-backed Delta Lake sink metrics.
2
3use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry};
4
5use super::metrics::LakehouseSinkMetrics;
6use crate::prom::reg_or_local;
7
8/// Prometheus-backed counters/gauges for Delta Lake sink connector statistics.
9#[derive(Debug, Clone)]
10pub struct DeltaLakeSinkMetrics {
11    /// Common metrics (rows flushed, bytes written, commits, etc.).
12    pub common: LakehouseSinkMetrics,
13
14    /// Total MERGE operations (upsert mode).
15    pub merge_operations: IntCounter,
16
17    /// Last Delta Lake table version committed.
18    pub last_delta_version: IntGauge,
19
20    /// Total compaction runs completed.
21    pub compaction_runs: IntCounter,
22
23    /// Total files added by compaction.
24    pub compaction_files_added: IntCounter,
25
26    /// Total files removed by compaction.
27    pub compaction_files_removed: IntCounter,
28
29    /// Total files deleted by vacuum.
30    pub vacuum_files_deleted: IntCounter,
31
32    /// Total optimistic-concurrency conflicts encountered (per retry).
33    pub conflicts: IntCounter,
34
35    /// Total retry attempts kicked off (both conflict and timeout).
36    pub retries: IntCounter,
37
38    /// End-to-end flush duration histogram (concat → write → checkpoint).
39    /// Buckets cover 5ms up to ~160s (0.005 * 2^15).
40    pub flush_duration: Histogram,
41
42    /// Total changelog rows entering collapse (pre-dedup, per upsert flush).
43    pub collapse_rows_in: IntCounter,
44
45    /// Total upsert rows emitted by collapse (`_op = U`).
46    pub collapse_upserts_out: IntCounter,
47
48    /// Total delete rows emitted by collapse (`_op = D`).
49    pub collapse_deletes_out: IntCounter,
50
51    /// Changelog-collapse duration histogram (per upsert flush).
52    /// Buckets cover 100µs up to ~3.3s (0.0001 * 2^15).
53    pub collapse_duration: Histogram,
54}
55
56impl DeltaLakeSinkMetrics {
57    /// Creates a new metrics instance with all counters at zero.
58    #[must_use]
59    #[allow(clippy::missing_panics_doc)]
60    pub fn new(registry: Option<&Registry>) -> Self {
61        let mut local = None;
62        let handle = reg_or_local(registry, &mut local);
63
64        let flush_duration = Histogram::with_opts(
65            HistogramOpts::new(
66                "delta_sink_flush_duration_seconds",
67                "End-to-end Delta Lake flush duration (pre-concat → write → checkpoint)",
68            )
69            .buckets(prometheus::exponential_buckets(0.005, 2.0, 16).unwrap()),
70        )
71        .unwrap();
72        // Best-effort registration (matches `kafka/metrics.rs` pattern):
73        // `AlreadyReg` is benign on re-init; surface anything else so a
74        // dropped histogram doesn't disappear silently.
75        if let Err(e) = handle.registry().register(Box::new(flush_duration.clone())) {
76            tracing::warn!(
77                metric = "delta_sink_flush_duration_seconds",
78                error = %e,
79                "failed to register delta lake flush_duration histogram"
80            );
81        }
82
83        let collapse_duration = Histogram::with_opts(
84            HistogramOpts::new(
85                "delta_sink_collapse_duration_seconds",
86                "Changelog collapse duration per upsert flush (Z-set/CDC dedup)",
87            )
88            .buckets(prometheus::exponential_buckets(0.0001, 2.0, 16).unwrap()),
89        )
90        .unwrap();
91        if let Err(e) = handle
92            .registry()
93            .register(Box::new(collapse_duration.clone()))
94        {
95            tracing::warn!(
96                metric = "delta_sink_collapse_duration_seconds",
97                error = %e,
98                "failed to register delta lake collapse_duration histogram"
99            );
100        }
101
102        Self {
103            common: LakehouseSinkMetrics::new(registry),
104            merge_operations: handle.counter(
105                "delta_sink_merge_operations_total",
106                "Total MERGE operations (upsert)",
107            ),
108            last_delta_version: handle.gauge(
109                "delta_sink_last_version",
110                "Last committed Delta table version",
111            ),
112            compaction_runs: handle
113                .counter("delta_sink_compaction_runs_total", "Total compaction runs"),
114            compaction_files_added: handle.counter(
115                "delta_sink_compaction_files_added_total",
116                "Total files added by compaction",
117            ),
118            compaction_files_removed: handle.counter(
119                "delta_sink_compaction_files_removed_total",
120                "Total files removed by compaction",
121            ),
122            vacuum_files_deleted: handle.counter(
123                "delta_sink_vacuum_files_deleted_total",
124                "Total files deleted by vacuum",
125            ),
126            conflicts: handle.counter(
127                "delta_sink_conflicts_total",
128                "Delta Lake optimistic-concurrency conflicts observed",
129            ),
130            retries: handle.counter(
131                "delta_sink_retries_total",
132                "Retry attempts kicked off (conflict + timeout)",
133            ),
134            flush_duration,
135            collapse_rows_in: handle.counter(
136                "delta_sink_collapse_rows_in_total",
137                "Changelog rows entering collapse (pre-dedup)",
138            ),
139            collapse_upserts_out: handle.counter(
140                "delta_sink_collapse_upserts_out_total",
141                "Upsert rows emitted by collapse (_op = U)",
142            ),
143            collapse_deletes_out: handle.counter(
144                "delta_sink_collapse_deletes_out_total",
145                "Delete rows emitted by collapse (_op = D)",
146            ),
147            collapse_duration,
148        }
149    }
150
151    /// Records a successful flush of `records` rows totaling `bytes`.
152    pub fn record_flush(&self, records: u64, bytes: u64) {
153        self.common.record_flush(records, bytes);
154    }
155
156    /// Records a successful epoch commit.
157    #[allow(clippy::cast_possible_wrap)]
158    pub fn record_commit(&self, delta_version: u64) {
159        self.common.record_commit();
160        self.last_delta_version.set(delta_version as i64);
161    }
162
163    /// Records a write or I/O error.
164    pub fn record_error(&self) {
165        self.common.record_error();
166    }
167
168    /// Records an epoch rollback.
169    pub fn record_rollback(&self) {
170        self.common.record_rollback();
171    }
172
173    /// Records a MERGE operation (upsert mode).
174    pub fn record_merge(&self) {
175        self.merge_operations.inc();
176    }
177
178    /// Records changelog DELETE operations.
179    pub fn record_deletes(&self, count: u64) {
180        self.common.record_deletes(count);
181    }
182
183    /// Records a completed compaction run.
184    pub fn record_compaction(&self, files_added: u64, files_removed: u64) {
185        self.compaction_runs.inc();
186        self.compaction_files_added.inc_by(files_added);
187        self.compaction_files_removed.inc_by(files_removed);
188    }
189
190    /// Records files deleted by vacuum.
191    pub fn record_vacuum(&self, files_deleted: u64) {
192        self.vacuum_files_deleted.inc_by(files_deleted);
193    }
194
195    /// Records an optimistic-concurrency conflict (one per retry-triggering conflict).
196    pub fn record_conflict(&self) {
197        self.conflicts.inc();
198    }
199
200    /// Records a retry attempt.
201    pub fn record_retry(&self) {
202        self.retries.inc();
203    }
204
205    /// Records a completed flush duration (seconds).
206    pub fn observe_flush_duration(&self, seconds: f64) {
207        self.flush_duration.observe(seconds);
208    }
209
210    /// Records one changelog-collapse pass: `rows_in` rows folded down to
211    /// `upserts_out` upserts and `deletes_out` deletes in `seconds`.
212    pub fn observe_collapse(&self, rows_in: u64, upserts_out: u64, deletes_out: u64, seconds: f64) {
213        self.collapse_rows_in.inc_by(rows_in);
214        self.collapse_upserts_out.inc_by(upserts_out);
215        self.collapse_deletes_out.inc_by(deletes_out);
216        self.collapse_duration.observe(seconds);
217    }
218}
219
220impl Default for DeltaLakeSinkMetrics {
221    fn default() -> Self {
222        Self::new(None)
223    }
224}
225
// Unit tests for the Delta Lake sink metrics. Each test builds its own
// instance with `new(None)`, so no registry is shared between tests.
#[cfg(test)]
// Histogram sums are compared exactly; the observed values are chosen to be
// exactly representable in binary floating point.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    #[test]
    fn test_initial_zeros() {
        let m = DeltaLakeSinkMetrics::new(None);
        assert_eq!(m.common.rows_flushed.get(), 0);
        assert_eq!(m.common.bytes_written.get(), 0);
        assert_eq!(m.common.errors_total.get(), 0);

        // Delta-specific metrics start at zero as well.
        assert_eq!(m.merge_operations.get(), 0);
        assert_eq!(m.last_delta_version.get(), 0);
        assert_eq!(m.compaction_runs.get(), 0);
        assert_eq!(m.compaction_files_added.get(), 0);
        assert_eq!(m.compaction_files_removed.get(), 0);
        assert_eq!(m.vacuum_files_deleted.get(), 0);
        assert_eq!(m.conflicts.get(), 0);
        assert_eq!(m.retries.get(), 0);
        assert_eq!(m.collapse_rows_in.get(), 0);
        assert_eq!(m.collapse_upserts_out.get(), 0);
        assert_eq!(m.collapse_deletes_out.get(), 0);
        assert_eq!(m.flush_duration.get_sample_count(), 0);
        assert_eq!(m.collapse_duration.get_sample_count(), 0);
    }

    #[test]
    fn test_record_flush() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_flush(100, 5000);
        m.record_flush(200, 10_000);

        assert_eq!(m.common.rows_flushed.get(), 300);
        assert_eq!(m.common.bytes_written.get(), 15_000);
        assert_eq!(m.common.flush_count.get(), 2);
    }

    #[test]
    fn test_record_commit() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_commit(1);
        m.record_commit(5);

        assert_eq!(m.common.commits.get(), 2);
        assert_eq!(m.last_delta_version.get(), 5);
    }

    #[test]
    fn test_error_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_error();

        assert_eq!(m.common.errors_total.get(), 3);
    }

    #[test]
    fn test_rollback_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_rollback();
        m.record_rollback();

        assert_eq!(m.common.epochs_rolled_back.get(), 2);
    }

    #[test]
    fn test_merge_operations() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_merge();

        assert_eq!(m.merge_operations.get(), 1);
    }

    #[test]
    fn test_changelog_deletes() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_deletes(50);
        m.record_deletes(30);

        assert_eq!(m.common.changelog_deletes.get(), 80);
    }

    #[test]
    fn test_record_compaction() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_compaction(2, 10);
        m.record_compaction(1, 4);

        assert_eq!(m.compaction_runs.get(), 2);
        assert_eq!(m.compaction_files_added.get(), 3);
        assert_eq!(m.compaction_files_removed.get(), 14);
    }

    #[test]
    fn test_record_vacuum() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_vacuum(7);
        m.record_vacuum(0);
        m.record_vacuum(3);

        assert_eq!(m.vacuum_files_deleted.get(), 10);
    }

    #[test]
    fn test_conflicts_and_retries() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_conflict();
        m.record_retry();
        m.record_retry();

        assert_eq!(m.conflicts.get(), 1);
        assert_eq!(m.retries.get(), 2);
    }

    #[test]
    fn test_observe_flush_duration() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.observe_flush_duration(0.25);
        m.observe_flush_duration(0.5);

        assert_eq!(m.flush_duration.get_sample_count(), 2);
        assert_eq!(m.flush_duration.get_sample_sum(), 0.75);
    }

    #[test]
    fn test_observe_collapse() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.observe_collapse(10, 4, 2, 0.25);
        m.observe_collapse(5, 1, 1, 0.5);

        assert_eq!(m.collapse_rows_in.get(), 15);
        assert_eq!(m.collapse_upserts_out.get(), 5);
        assert_eq!(m.collapse_deletes_out.get(), 3);
        assert_eq!(m.collapse_duration.get_sample_count(), 2);
        assert_eq!(m.collapse_duration.get_sample_sum(), 0.75);
    }
}