laminar_connectors/lakehouse/
delta_metrics.rs1use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry};
4
5use super::metrics::LakehouseSinkMetrics;
6use crate::metrics::ConnectorMetrics;
7
8#[derive(Debug, Clone)]
10pub struct DeltaLakeSinkMetrics {
11 pub common: LakehouseSinkMetrics,
13
14 pub merge_operations: IntCounter,
16
17 pub last_delta_version: IntGauge,
19
20 pub compaction_runs: IntCounter,
22
23 pub compaction_files_added: IntCounter,
25
26 pub compaction_files_removed: IntCounter,
28
29 pub vacuum_files_deleted: IntCounter,
31
32 pub conflicts: IntCounter,
34
35 pub retries: IntCounter,
37
38 pub flush_duration: Histogram,
41}
42
43impl DeltaLakeSinkMetrics {
44 #[must_use]
46 #[allow(clippy::missing_panics_doc)]
47 pub fn new(registry: Option<&Registry>) -> Self {
48 let local;
49 let reg = if let Some(r) = registry {
50 r
51 } else {
52 local = Registry::new();
53 &local
54 };
55
56 let merge_operations = IntCounter::new(
57 "delta_sink_merge_operations_total",
58 "Total MERGE operations (upsert)",
59 )
60 .unwrap();
61 let last_delta_version = IntGauge::new(
62 "delta_sink_last_version",
63 "Last committed Delta table version",
64 )
65 .unwrap();
66 let compaction_runs =
67 IntCounter::new("delta_sink_compaction_runs_total", "Total compaction runs").unwrap();
68 let compaction_files_added = IntCounter::new(
69 "delta_sink_compaction_files_added_total",
70 "Total files added by compaction",
71 )
72 .unwrap();
73 let compaction_files_removed = IntCounter::new(
74 "delta_sink_compaction_files_removed_total",
75 "Total files removed by compaction",
76 )
77 .unwrap();
78 let vacuum_files_deleted = IntCounter::new(
79 "delta_sink_vacuum_files_deleted_total",
80 "Total files deleted by vacuum",
81 )
82 .unwrap();
83 let conflicts = IntCounter::new(
84 "delta_sink_conflicts_total",
85 "Delta Lake optimistic-concurrency conflicts observed",
86 )
87 .unwrap();
88 let retries = IntCounter::new(
89 "delta_sink_retries_total",
90 "Retry attempts kicked off (conflict + timeout)",
91 )
92 .unwrap();
93 let flush_duration = Histogram::with_opts(
94 HistogramOpts::new(
95 "delta_sink_flush_duration_seconds",
96 "End-to-end Delta Lake flush duration (pre-concat → write → checkpoint)",
97 )
98 .buckets(prometheus::exponential_buckets(0.005, 2.0, 16).unwrap()),
99 )
100 .unwrap();
101
102 let _ = reg.register(Box::new(merge_operations.clone()));
103 let _ = reg.register(Box::new(last_delta_version.clone()));
104 let _ = reg.register(Box::new(compaction_runs.clone()));
105 let _ = reg.register(Box::new(compaction_files_added.clone()));
106 let _ = reg.register(Box::new(compaction_files_removed.clone()));
107 let _ = reg.register(Box::new(vacuum_files_deleted.clone()));
108 let _ = reg.register(Box::new(conflicts.clone()));
109 let _ = reg.register(Box::new(retries.clone()));
110 let _ = reg.register(Box::new(flush_duration.clone()));
111
112 Self {
113 common: LakehouseSinkMetrics::new(registry),
114 merge_operations,
115 last_delta_version,
116 compaction_runs,
117 compaction_files_added,
118 compaction_files_removed,
119 vacuum_files_deleted,
120 conflicts,
121 retries,
122 flush_duration,
123 }
124 }
125
126 pub fn record_flush(&self, records: u64, bytes: u64) {
128 self.common.record_flush(records, bytes);
129 }
130
131 #[allow(clippy::cast_possible_wrap)]
133 pub fn record_commit(&self, delta_version: u64) {
134 self.common.record_commit();
135 self.last_delta_version.set(delta_version as i64);
136 }
137
138 pub fn record_error(&self) {
140 self.common.record_error();
141 }
142
143 pub fn record_rollback(&self) {
145 self.common.record_rollback();
146 }
147
148 pub fn record_merge(&self) {
150 self.merge_operations.inc();
151 }
152
153 pub fn record_deletes(&self, count: u64) {
155 self.common.record_deletes(count);
156 }
157
158 pub fn record_compaction(&self, files_added: u64, files_removed: u64) {
160 self.compaction_runs.inc();
161 self.compaction_files_added.inc_by(files_added);
162 self.compaction_files_removed.inc_by(files_removed);
163 }
164
165 pub fn record_vacuum(&self, files_deleted: u64) {
167 self.vacuum_files_deleted.inc_by(files_deleted);
168 }
169
170 pub fn record_conflict(&self) {
172 self.conflicts.inc();
173 }
174
175 pub fn record_retry(&self) {
177 self.retries.inc();
178 }
179
180 pub fn observe_flush_duration(&self, seconds: f64) {
182 self.flush_duration.observe(seconds);
183 }
184
185 #[must_use]
187 #[allow(clippy::cast_precision_loss)]
188 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
189 let mut m = ConnectorMetrics::new();
190 self.common.populate_metrics(&mut m, "delta");
191
192 m.add_custom("delta.merge_operations", self.merge_operations.get() as f64);
193 m.add_custom("delta.last_version", self.last_delta_version.get() as f64);
194 m.add_custom("delta.compaction_runs", self.compaction_runs.get() as f64);
195 m.add_custom(
196 "delta.compaction_files_added",
197 self.compaction_files_added.get() as f64,
198 );
199 m.add_custom(
200 "delta.compaction_files_removed",
201 self.compaction_files_removed.get() as f64,
202 );
203 m.add_custom(
204 "delta.vacuum_files_deleted",
205 self.vacuum_files_deleted.get() as f64,
206 );
207 m.add_custom("delta.conflicts", self.conflicts.get() as f64);
208 m.add_custom("delta.retries", self.retries.get() as f64);
209 m
210 }
211}
212
213impl Default for DeltaLakeSinkMetrics {
214 fn default() -> Self {
215 Self::new(None)
216 }
217}
218
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Returns the value of the custom metric `key`, panicking if it is absent.
    fn custom(cm: &ConnectorMetrics, key: &str) -> f64 {
        cm.custom
            .iter()
            .find(|(k, _)| k == key)
            .unwrap_or_else(|| panic!("missing custom metric {key}"))
            .1
    }

    #[test]
    fn test_initial_zeros() {
        let m = DeltaLakeSinkMetrics::new(None);
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
    }

    #[test]
    fn test_record_flush() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_flush(100, 5000);
        m.record_flush(200, 10_000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15_000);
        assert_eq!(custom(&cm, "delta.flush_count"), 2.0);
    }

    #[test]
    fn test_record_commit() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_commit(1);
        m.record_commit(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.commits"), 2.0);
        assert_eq!(custom(&cm, "delta.last_version"), 5.0);
    }

    #[test]
    fn test_error_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 3);
    }

    #[test]
    fn test_rollback_counting() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_rollback();
        m.record_rollback();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.epochs_rolled_back"), 2.0);
    }

    #[test]
    fn test_merge_operations() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_merge();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.merge_operations"), 1.0);
    }

    #[test]
    fn test_changelog_deletes() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_deletes(50);
        m.record_deletes(30);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.changelog_deletes"), 80.0);
    }

    #[test]
    fn test_compaction_counters() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_compaction(2, 10);
        m.record_compaction(1, 5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.compaction_runs"), 2.0);
        assert_eq!(custom(&cm, "delta.compaction_files_added"), 3.0);
        assert_eq!(custom(&cm, "delta.compaction_files_removed"), 15.0);
    }

    #[test]
    fn test_vacuum_counter() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_vacuum(4);
        m.record_vacuum(0);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.vacuum_files_deleted"), 4.0);
    }

    #[test]
    fn test_conflict_and_retry_counters() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.record_conflict();
        m.record_retry();
        m.record_retry();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.conflicts"), 1.0);
        assert_eq!(custom(&cm, "delta.retries"), 2.0);
    }

    #[test]
    fn test_flush_duration_histogram() {
        let m = DeltaLakeSinkMetrics::new(None);
        m.observe_flush_duration(0.25);
        m.observe_flush_duration(0.75);

        assert_eq!(m.flush_duration.get_sample_count(), 2);
        assert_eq!(m.flush_duration.get_sample_sum(), 1.0);
    }

    #[test]
    fn test_default_starts_at_zero() {
        let m = DeltaLakeSinkMetrics::default();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(custom(&cm, "delta.merge_operations"), 0.0);
        assert_eq!(custom(&cm, "delta.last_version"), 0.0);
    }

    #[test]
    fn test_clone_shares_underlying_counters() {
        let m = DeltaLakeSinkMetrics::new(None);
        let shared = m.clone();
        shared.record_merge();
        shared.record_conflict();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.merge_operations"), 1.0);
        assert_eq!(custom(&cm, "delta.conflicts"), 1.0);
    }
}