Skip to main content

laminar_connectors/lakehouse/
metrics.rs

1//! Common metrics for Lakehouse sink connectors.
2//!
3//! Provides shared atomic counters for tracking statistics across different
4//! table formats (Delta Lake, Iceberg, Hudi, Paimon).
5
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use crate::metrics::ConnectorMetrics;
9
10/// Atomic counters for Lakehouse sink connector statistics.
11#[derive(Debug)]
12pub struct LakehouseSinkMetrics {
13    /// Total rows flushed to storage (Parquet/ORC/etc.).
14    pub rows_flushed: AtomicU64,
15
16    /// Total bytes written to storage.
17    pub bytes_written: AtomicU64,
18
19    /// Total number of flush operations.
20    pub flush_count: AtomicU64,
21
22    /// Total number of commits (transactions/snapshots).
23    pub commits: AtomicU64,
24
25    /// Total errors encountered.
26    pub errors_total: AtomicU64,
27
28    /// Total epochs rolled back.
29    pub epochs_rolled_back: AtomicU64,
30
31    /// Total changelog DELETE operations processed.
32    pub changelog_deletes: AtomicU64,
33}
34
35impl LakehouseSinkMetrics {
36    /// Creates a new metrics instance with all counters at zero.
37    #[must_use]
38    pub fn new() -> Self {
39        Self {
40            rows_flushed: AtomicU64::new(0),
41            bytes_written: AtomicU64::new(0),
42            flush_count: AtomicU64::new(0),
43            commits: AtomicU64::new(0),
44            errors_total: AtomicU64::new(0),
45            epochs_rolled_back: AtomicU64::new(0),
46            changelog_deletes: AtomicU64::new(0),
47        }
48    }
49
50    /// Records a successful flush of `records` rows totaling `bytes`.
51    pub fn record_flush(&self, records: u64, bytes: u64) {
52        self.rows_flushed.fetch_add(records, Ordering::Relaxed);
53        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
54        self.flush_count.fetch_add(1, Ordering::Relaxed);
55    }
56
57    /// Records a successful commit (transaction/snapshot).
58    pub fn record_commit(&self) {
59        self.commits.fetch_add(1, Ordering::Relaxed);
60    }
61
62    /// Records a write or I/O error.
63    pub fn record_error(&self) {
64        self.errors_total.fetch_add(1, Ordering::Relaxed);
65    }
66
67    /// Records an epoch rollback.
68    pub fn record_rollback(&self) {
69        self.epochs_rolled_back.fetch_add(1, Ordering::Relaxed);
70    }
71
72    /// Records changelog DELETE operations processed.
73    pub fn record_deletes(&self, count: u64) {
74        self.changelog_deletes.fetch_add(count, Ordering::Relaxed);
75    }
76
77    /// Populates standard `ConnectorMetrics` fields and adds common custom metrics with a prefix.
78    ///
79    /// The `prefix` is used for custom metrics keys, e.g., `"{prefix}.flush_count"`.
80    #[allow(clippy::cast_precision_loss)]
81    pub fn populate_metrics(&self, metrics: &mut ConnectorMetrics, prefix: &str) {
82        metrics.records_total = self.rows_flushed.load(Ordering::Relaxed);
83        metrics.bytes_total = self.bytes_written.load(Ordering::Relaxed);
84        metrics.errors_total = self.errors_total.load(Ordering::Relaxed);
85
86        metrics.add_custom(
87            format!("{prefix}.flush_count"),
88            self.flush_count.load(Ordering::Relaxed) as f64,
89        );
90        metrics.add_custom(
91            format!("{prefix}.commits"),
92            self.commits.load(Ordering::Relaxed) as f64,
93        );
94        metrics.add_custom(
95            format!("{prefix}.epochs_rolled_back"),
96            self.epochs_rolled_back.load(Ordering::Relaxed) as f64,
97        );
98        metrics.add_custom(
99            format!("{prefix}.changelog_deletes"),
100            self.changelog_deletes.load(Ordering::Relaxed) as f64,
101        );
102    }
103}
104
105impl Default for LakehouseSinkMetrics {
106    fn default() -> Self {
107        Self::new()
108    }
109}