laminar_connectors/lakehouse/
delta_metrics.rs1use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry};
4
5use super::metrics::LakehouseSinkMetrics;
6use crate::prom::reg_or_local;
7
8#[derive(Debug, Clone)]
10pub struct DeltaLakeSinkMetrics {
11 pub common: LakehouseSinkMetrics,
13
14 pub merge_operations: IntCounter,
16
17 pub last_delta_version: IntGauge,
19
20 pub compaction_runs: IntCounter,
22
23 pub compaction_files_added: IntCounter,
25
26 pub compaction_files_removed: IntCounter,
28
29 pub vacuum_files_deleted: IntCounter,
31
32 pub conflicts: IntCounter,
34
35 pub retries: IntCounter,
37
38 pub flush_duration: Histogram,
41
42 pub collapse_rows_in: IntCounter,
44
45 pub collapse_upserts_out: IntCounter,
47
48 pub collapse_deletes_out: IntCounter,
50
51 pub collapse_duration: Histogram,
54}
55
56impl DeltaLakeSinkMetrics {
57 #[must_use]
59 #[allow(clippy::missing_panics_doc)]
60 pub fn new(registry: Option<&Registry>) -> Self {
61 let mut local = None;
62 let handle = reg_or_local(registry, &mut local);
63
64 let flush_duration = Histogram::with_opts(
65 HistogramOpts::new(
66 "delta_sink_flush_duration_seconds",
67 "End-to-end Delta Lake flush duration (pre-concat → write → checkpoint)",
68 )
69 .buckets(prometheus::exponential_buckets(0.005, 2.0, 16).unwrap()),
70 )
71 .unwrap();
72 if let Err(e) = handle.registry().register(Box::new(flush_duration.clone())) {
76 tracing::warn!(
77 metric = "delta_sink_flush_duration_seconds",
78 error = %e,
79 "failed to register delta lake flush_duration histogram"
80 );
81 }
82
83 let collapse_duration = Histogram::with_opts(
84 HistogramOpts::new(
85 "delta_sink_collapse_duration_seconds",
86 "Changelog collapse duration per upsert flush (Z-set/CDC dedup)",
87 )
88 .buckets(prometheus::exponential_buckets(0.0001, 2.0, 16).unwrap()),
89 )
90 .unwrap();
91 if let Err(e) = handle
92 .registry()
93 .register(Box::new(collapse_duration.clone()))
94 {
95 tracing::warn!(
96 metric = "delta_sink_collapse_duration_seconds",
97 error = %e,
98 "failed to register delta lake collapse_duration histogram"
99 );
100 }
101
102 Self {
103 common: LakehouseSinkMetrics::new(registry),
104 merge_operations: handle.counter(
105 "delta_sink_merge_operations_total",
106 "Total MERGE operations (upsert)",
107 ),
108 last_delta_version: handle.gauge(
109 "delta_sink_last_version",
110 "Last committed Delta table version",
111 ),
112 compaction_runs: handle
113 .counter("delta_sink_compaction_runs_total", "Total compaction runs"),
114 compaction_files_added: handle.counter(
115 "delta_sink_compaction_files_added_total",
116 "Total files added by compaction",
117 ),
118 compaction_files_removed: handle.counter(
119 "delta_sink_compaction_files_removed_total",
120 "Total files removed by compaction",
121 ),
122 vacuum_files_deleted: handle.counter(
123 "delta_sink_vacuum_files_deleted_total",
124 "Total files deleted by vacuum",
125 ),
126 conflicts: handle.counter(
127 "delta_sink_conflicts_total",
128 "Delta Lake optimistic-concurrency conflicts observed",
129 ),
130 retries: handle.counter(
131 "delta_sink_retries_total",
132 "Retry attempts kicked off (conflict + timeout)",
133 ),
134 flush_duration,
135 collapse_rows_in: handle.counter(
136 "delta_sink_collapse_rows_in_total",
137 "Changelog rows entering collapse (pre-dedup)",
138 ),
139 collapse_upserts_out: handle.counter(
140 "delta_sink_collapse_upserts_out_total",
141 "Upsert rows emitted by collapse (_op = U)",
142 ),
143 collapse_deletes_out: handle.counter(
144 "delta_sink_collapse_deletes_out_total",
145 "Delete rows emitted by collapse (_op = D)",
146 ),
147 collapse_duration,
148 }
149 }
150
151 pub fn record_flush(&self, records: u64, bytes: u64) {
153 self.common.record_flush(records, bytes);
154 }
155
156 #[allow(clippy::cast_possible_wrap)]
158 pub fn record_commit(&self, delta_version: u64) {
159 self.common.record_commit();
160 self.last_delta_version.set(delta_version as i64);
161 }
162
163 pub fn record_error(&self) {
165 self.common.record_error();
166 }
167
168 pub fn record_rollback(&self) {
170 self.common.record_rollback();
171 }
172
173 pub fn record_merge(&self) {
175 self.merge_operations.inc();
176 }
177
178 pub fn record_deletes(&self, count: u64) {
180 self.common.record_deletes(count);
181 }
182
183 pub fn record_compaction(&self, files_added: u64, files_removed: u64) {
185 self.compaction_runs.inc();
186 self.compaction_files_added.inc_by(files_added);
187 self.compaction_files_removed.inc_by(files_removed);
188 }
189
190 pub fn record_vacuum(&self, files_deleted: u64) {
192 self.vacuum_files_deleted.inc_by(files_deleted);
193 }
194
195 pub fn record_conflict(&self) {
197 self.conflicts.inc();
198 }
199
200 pub fn record_retry(&self) {
202 self.retries.inc();
203 }
204
205 pub fn observe_flush_duration(&self, seconds: f64) {
207 self.flush_duration.observe(seconds);
208 }
209
210 pub fn observe_collapse(&self, rows_in: u64, upserts_out: u64, deletes_out: u64, seconds: f64) {
213 self.collapse_rows_in.inc_by(rows_in);
214 self.collapse_upserts_out.inc_by(upserts_out);
215 self.collapse_deletes_out.inc_by(deletes_out);
216 self.collapse_duration.observe(seconds);
217 }
218}
219
220impl Default for DeltaLakeSinkMetrics {
221 fn default() -> Self {
222 Self::new(None)
223 }
224}
225
#[cfg(test)]
// Histogram sums below are compared exactly; every observed value (and sum)
// is exactly representable in binary floating point.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    #[test]
    fn test_initial_zeros() {
        let m = DeltaLakeSinkMetrics::new(None);
        assert_eq!(m.common.rows_flushed.get(), 0);
        assert_eq!(m.common.bytes_written.get(), 0);
        assert_eq!(m.common.errors_total.get(), 0);
    }

    #[test]
    fn test_default_starts_at_zero() {
        let m = DeltaLakeSinkMetrics::default();
        assert_eq!(m.merge_operations.get(), 0);
        assert_eq!(m.last_delta_version.get(), 0);
        assert_eq!(m.flush_duration.get_sample_count(), 0);
        assert_eq!(m.collapse_duration.get_sample_count(), 0);
    }

    #[test]
    fn test_record_flush() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_flush(100, 5000);
        m.record_flush(200, 10_000);

        assert_eq!(m.common.rows_flushed.get(), 300);
        assert_eq!(m.common.bytes_written.get(), 15_000);
        assert_eq!(m.common.flush_count.get(), 2);
    }

    #[test]
    fn test_record_commit() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_commit(1);
        m.record_commit(5);

        assert_eq!(m.common.commits.get(), 2);
        assert_eq!(m.last_delta_version.get(), 5);
    }

    #[test]
    fn test_error_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_error();

        assert_eq!(m.common.errors_total.get(), 3);
    }

    #[test]
    fn test_rollback_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_rollback();
        m.record_rollback();

        assert_eq!(m.common.epochs_rolled_back.get(), 2);
    }

    #[test]
    fn test_merge_operations() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_merge();

        assert_eq!(m.merge_operations.get(), 1);
    }

    #[test]
    fn test_changelog_deletes() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_deletes(50);
        m.record_deletes(30);

        assert_eq!(m.common.changelog_deletes.get(), 80);
    }

    #[test]
    fn test_compaction() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_compaction(3, 10);
        m.record_compaction(1, 4);

        assert_eq!(m.compaction_runs.get(), 2);
        assert_eq!(m.compaction_files_added.get(), 4);
        assert_eq!(m.compaction_files_removed.get(), 14);
    }

    #[test]
    fn test_vacuum() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_vacuum(7);
        m.record_vacuum(0);

        assert_eq!(m.vacuum_files_deleted.get(), 7);
    }

    #[test]
    fn test_conflicts_and_retries() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_conflict();
        m.record_retry();
        m.record_retry();

        assert_eq!(m.conflicts.get(), 1);
        assert_eq!(m.retries.get(), 2);
    }

    #[test]
    fn test_flush_duration_histogram() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.observe_flush_duration(0.5);
        m.observe_flush_duration(1.5);

        assert_eq!(m.flush_duration.get_sample_count(), 2);
        assert_eq!(m.flush_duration.get_sample_sum(), 2.0);
    }

    #[test]
    fn test_collapse() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.observe_collapse(100, 40, 10, 0.25);
        m.observe_collapse(20, 5, 0, 0.25);

        assert_eq!(m.collapse_rows_in.get(), 120);
        assert_eq!(m.collapse_upserts_out.get(), 45);
        assert_eq!(m.collapse_deletes_out.get(), 10);
        assert_eq!(m.collapse_duration.get_sample_count(), 2);
        assert_eq!(m.collapse_duration.get_sample_sum(), 0.5);
    }
}