Skip to main content

laminar_connectors/lakehouse/
delta_metrics.rs

1//! Delta Lake sink connector metrics.
2//!
3//! [`DeltaLakeSinkMetrics`] provides lock-free atomic counters for
4//! tracking write statistics, convertible to the SDK's
5//! [`ConnectorMetrics`] type.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use super::metrics::LakehouseSinkMetrics;
10use crate::metrics::ConnectorMetrics;
11
12/// Atomic counters for Delta Lake sink connector statistics.
13#[derive(Debug)]
14pub struct DeltaLakeSinkMetrics {
15    /// Common metrics (rows flushed, bytes written, commits, etc.).
16    pub common: LakehouseSinkMetrics,
17
18    /// Total MERGE operations (upsert mode).
19    pub merge_operations: AtomicU64,
20
21    /// Last Delta Lake table version committed.
22    pub last_delta_version: AtomicU64,
23
24    /// Total compaction runs completed.
25    pub compaction_runs: AtomicU64,
26
27    /// Total files added by compaction.
28    pub compaction_files_added: AtomicU64,
29
30    /// Total files removed by compaction.
31    pub compaction_files_removed: AtomicU64,
32
33    /// Total files deleted by vacuum.
34    pub vacuum_files_deleted: AtomicU64,
35}
36
37impl DeltaLakeSinkMetrics {
38    /// Creates a new metrics instance with all counters at zero.
39    #[must_use]
40    pub fn new() -> Self {
41        Self {
42            common: LakehouseSinkMetrics::new(),
43            merge_operations: AtomicU64::new(0),
44            last_delta_version: AtomicU64::new(0),
45            compaction_runs: AtomicU64::new(0),
46            compaction_files_added: AtomicU64::new(0),
47            compaction_files_removed: AtomicU64::new(0),
48            vacuum_files_deleted: AtomicU64::new(0),
49        }
50    }
51
52    /// Records a successful flush of `records` rows totaling `bytes`.
53    pub fn record_flush(&self, records: u64, bytes: u64) {
54        self.common.record_flush(records, bytes);
55    }
56
57    /// Records a successful epoch commit.
58    pub fn record_commit(&self, delta_version: u64) {
59        self.common.record_commit();
60        self.last_delta_version
61            .store(delta_version, Ordering::Relaxed);
62    }
63
64    /// Records a write or I/O error.
65    pub fn record_error(&self) {
66        self.common.record_error();
67    }
68
69    /// Records an epoch rollback.
70    pub fn record_rollback(&self) {
71        self.common.record_rollback();
72    }
73
74    /// Records a MERGE operation (upsert mode).
75    pub fn record_merge(&self) {
76        self.merge_operations.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Records changelog DELETE operations.
80    pub fn record_deletes(&self, count: u64) {
81        self.common.record_deletes(count);
82    }
83
84    /// Records a completed compaction run.
85    pub fn record_compaction(&self, files_added: u64, files_removed: u64) {
86        self.compaction_runs.fetch_add(1, Ordering::Relaxed);
87        self.compaction_files_added
88            .fetch_add(files_added, Ordering::Relaxed);
89        self.compaction_files_removed
90            .fetch_add(files_removed, Ordering::Relaxed);
91    }
92
93    /// Records files deleted by vacuum.
94    pub fn record_vacuum(&self, files_deleted: u64) {
95        self.vacuum_files_deleted
96            .fetch_add(files_deleted, Ordering::Relaxed);
97    }
98
99    /// Converts to the SDK's [`ConnectorMetrics`].
100    #[must_use]
101    #[allow(clippy::cast_precision_loss)]
102    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
103        let mut m = ConnectorMetrics::new();
104        self.common.populate_metrics(&mut m, "delta");
105
106        m.add_custom(
107            "delta.merge_operations",
108            self.merge_operations.load(Ordering::Relaxed) as f64,
109        );
110        m.add_custom(
111            "delta.last_version",
112            self.last_delta_version.load(Ordering::Relaxed) as f64,
113        );
114        m.add_custom(
115            "delta.compaction_runs",
116            self.compaction_runs.load(Ordering::Relaxed) as f64,
117        );
118        m.add_custom(
119            "delta.compaction_files_added",
120            self.compaction_files_added.load(Ordering::Relaxed) as f64,
121        );
122        m.add_custom(
123            "delta.compaction_files_removed",
124            self.compaction_files_removed.load(Ordering::Relaxed) as f64,
125        );
126        m.add_custom(
127            "delta.vacuum_files_deleted",
128            self.vacuum_files_deleted.load(Ordering::Relaxed) as f64,
129        );
130        m
131    }
132}
133
134impl Default for DeltaLakeSinkMetrics {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
#[cfg(test)]
// The asserted metric values are small whole numbers, so comparing the
// exported `f64` values exactly is intentional.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Keys of the Delta-specific custom metrics emitted by
    /// `to_connector_metrics`.
    const DELTA_KEYS: [&str; 6] = [
        "delta.merge_operations",
        "delta.last_version",
        "delta.compaction_runs",
        "delta.compaction_files_added",
        "delta.compaction_files_removed",
        "delta.vacuum_files_deleted",
    ];

    /// Returns the value of the first custom metric named `key`.
    ///
    /// Panics with the key name when the metric is absent, so a failing
    /// test points at the missing metric instead of a bare `unwrap`.
    fn custom(cm: &ConnectorMetrics, key: &str) -> f64 {
        cm.custom
            .iter()
            .find(|(k, _)| k == key)
            .unwrap_or_else(|| panic!("custom metric `{}` not found", key))
            .1
    }

    #[test]
    fn test_initial_zeros() {
        let m = DeltaLakeSinkMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);

        // Every Delta-specific counter must be exported and start at zero.
        for key in DELTA_KEYS {
            assert_eq!(custom(&cm, key), 0.0, "{} should start at zero", key);
        }
    }

    #[test]
    fn test_default_matches_new() {
        let cm = DeltaLakeSinkMetrics::default().to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
        for key in DELTA_KEYS {
            assert_eq!(custom(&cm, key), 0.0, "{} should start at zero", key);
        }
    }

    #[test]
    fn test_record_flush() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_flush(100, 5000);
        m.record_flush(200, 10_000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15_000);
        assert_eq!(custom(&cm, "delta.flush_count"), 2.0);
    }

    #[test]
    fn test_record_commit() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_commit(1);
        m.record_commit(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.commits"), 2.0);
        assert_eq!(custom(&cm, "delta.last_version"), 5.0);
    }

    #[test]
    fn test_error_counting() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_error();
        m.record_error();
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 3);
    }

    #[test]
    fn test_rollback_counting() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_rollback();
        m.record_rollback();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.epochs_rolled_back"), 2.0);
    }

    #[test]
    fn test_merge_operations() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_merge();

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.merge_operations"), 1.0);
    }

    #[test]
    fn test_changelog_deletes() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_deletes(50);
        m.record_deletes(30);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.changelog_deletes"), 80.0);
    }

    #[test]
    fn test_record_compaction() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_compaction(2, 10);
        m.record_compaction(1, 4);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.compaction_runs"), 2.0);
        assert_eq!(custom(&cm, "delta.compaction_files_added"), 3.0);
        assert_eq!(custom(&cm, "delta.compaction_files_removed"), 14.0);
    }

    #[test]
    fn test_record_vacuum() {
        let m = DeltaLakeSinkMetrics::new();
        m.record_vacuum(7);
        m.record_vacuum(5);

        let cm = m.to_connector_metrics();
        assert_eq!(custom(&cm, "delta.vacuum_files_deleted"), 12.0);
    }
}