laminar_connectors/lakehouse/
delta_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use super::metrics::LakehouseSinkMetrics;
10use crate::metrics::ConnectorMetrics;
11
12#[derive(Debug)]
14pub struct DeltaLakeSinkMetrics {
15 pub common: LakehouseSinkMetrics,
17
18 pub merge_operations: AtomicU64,
20
21 pub last_delta_version: AtomicU64,
23
24 pub compaction_runs: AtomicU64,
26
27 pub compaction_files_added: AtomicU64,
29
30 pub compaction_files_removed: AtomicU64,
32
33 pub vacuum_files_deleted: AtomicU64,
35}
36
37impl DeltaLakeSinkMetrics {
38 #[must_use]
40 pub fn new() -> Self {
41 Self {
42 common: LakehouseSinkMetrics::new(),
43 merge_operations: AtomicU64::new(0),
44 last_delta_version: AtomicU64::new(0),
45 compaction_runs: AtomicU64::new(0),
46 compaction_files_added: AtomicU64::new(0),
47 compaction_files_removed: AtomicU64::new(0),
48 vacuum_files_deleted: AtomicU64::new(0),
49 }
50 }
51
52 pub fn record_flush(&self, records: u64, bytes: u64) {
54 self.common.record_flush(records, bytes);
55 }
56
57 pub fn record_commit(&self, delta_version: u64) {
59 self.common.record_commit();
60 self.last_delta_version
61 .store(delta_version, Ordering::Relaxed);
62 }
63
64 pub fn record_error(&self) {
66 self.common.record_error();
67 }
68
69 pub fn record_rollback(&self) {
71 self.common.record_rollback();
72 }
73
74 pub fn record_merge(&self) {
76 self.merge_operations.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn record_deletes(&self, count: u64) {
81 self.common.record_deletes(count);
82 }
83
84 pub fn record_compaction(&self, files_added: u64, files_removed: u64) {
86 self.compaction_runs.fetch_add(1, Ordering::Relaxed);
87 self.compaction_files_added
88 .fetch_add(files_added, Ordering::Relaxed);
89 self.compaction_files_removed
90 .fetch_add(files_removed, Ordering::Relaxed);
91 }
92
93 pub fn record_vacuum(&self, files_deleted: u64) {
95 self.vacuum_files_deleted
96 .fetch_add(files_deleted, Ordering::Relaxed);
97 }
98
99 #[must_use]
101 #[allow(clippy::cast_precision_loss)]
102 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
103 let mut m = ConnectorMetrics::new();
104 self.common.populate_metrics(&mut m, "delta");
105
106 m.add_custom(
107 "delta.merge_operations",
108 self.merge_operations.load(Ordering::Relaxed) as f64,
109 );
110 m.add_custom(
111 "delta.last_version",
112 self.last_delta_version.load(Ordering::Relaxed) as f64,
113 );
114 m.add_custom(
115 "delta.compaction_runs",
116 self.compaction_runs.load(Ordering::Relaxed) as f64,
117 );
118 m.add_custom(
119 "delta.compaction_files_added",
120 self.compaction_files_added.load(Ordering::Relaxed) as f64,
121 );
122 m.add_custom(
123 "delta.compaction_files_removed",
124 self.compaction_files_removed.load(Ordering::Relaxed) as f64,
125 );
126 m.add_custom(
127 "delta.vacuum_files_deleted",
128 self.vacuum_files_deleted.load(Ordering::Relaxed) as f64,
129 );
130 m
131 }
132}
133
134impl Default for DeltaLakeSinkMetrics {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
#[cfg(test)]
// Every asserted value is a small whole number, which `f64` represents exactly.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Returns the custom metric named `key`, panicking with the key name
    /// when the snapshot does not contain it.
    fn custom_metric(cm: &ConnectorMetrics, key: &str) -> f64 {
        cm.custom
            .iter()
            .find(|(k, _)| k == key)
            .unwrap_or_else(|| panic!("custom metric not exported: {}", key))
            .1
    }

    #[test]
    fn test_initial_zeros() {
        let m = DeltaLakeSinkMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
    }

    #[test]
    fn test_initial_delta_counters_are_zero() {
        let cm = DeltaLakeSinkMetrics::new().to_connector_metrics();
        for key in [
            "delta.merge_operations",
            "delta.last_version",
            "delta.compaction_runs",
            "delta.compaction_files_added",
            "delta.compaction_files_removed",
            "delta.vacuum_files_deleted",
        ] {
            assert_eq!(custom_metric(&cm, key), 0.0, "{}", key);
        }
    }

    #[test]
    fn test_default_starts_at_zero() {
        let cm = DeltaLakeSinkMetrics::default().to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(custom_metric(&cm, "delta.merge_operations"), 0.0);
        assert_eq!(custom_metric(&cm, "delta.last_version"), 0.0);
    }

    #[test]
    fn test_record_flush() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_flush(100, 5000);
        m.record_flush(200, 10_000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15_000);
        assert_eq!(custom_metric(&cm, "delta.flush_count"), 2.0);
    }

    #[test]
    fn test_record_commit() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_commit(1);
        m.record_commit(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom_metric(&cm, "delta.commits"), 2.0);
        assert_eq!(custom_metric(&cm, "delta.last_version"), 5.0);
    }

    #[test]
    fn test_error_counting() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_error();
        m.record_error();
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 3);
    }

    #[test]
    fn test_rollback_counting() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_rollback();
        m.record_rollback();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_metric(&cm, "delta.epochs_rolled_back"), 2.0);
    }

    #[test]
    fn test_merge_operations() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_merge();

        let cm = m.to_connector_metrics();
        assert_eq!(custom_metric(&cm, "delta.merge_operations"), 1.0);
    }

    #[test]
    fn test_changelog_deletes() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_deletes(50);
        m.record_deletes(30);

        let cm = m.to_connector_metrics();
        assert_eq!(custom_metric(&cm, "delta.changelog_deletes"), 80.0);
    }

    #[test]
    fn test_record_compaction() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_compaction(3, 10);
        m.record_compaction(1, 4);

        let cm = m.to_connector_metrics();
        assert_eq!(custom_metric(&cm, "delta.compaction_runs"), 2.0);
        assert_eq!(custom_metric(&cm, "delta.compaction_files_added"), 4.0);
        assert_eq!(custom_metric(&cm, "delta.compaction_files_removed"), 14.0);
    }

    #[test]
    fn test_record_vacuum() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_vacuum(7);
        m.record_vacuum(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom_metric(&cm, "delta.vacuum_files_deleted"), 12.0);
    }
}