Skip to main content

laminar_connectors/
metrics.rs

1//! Connector metrics types.
2//!
3//! Provides metrics reporting for connectors:
4//! - `ConnectorMetrics`: Metrics reported by a connector implementation
5//! - `RuntimeMetrics`: Metrics tracked by the connector runtime
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
/// Metrics reported by a connector implementation.
///
/// Connectors return this from their `metrics()` method to expose
/// internal state to the runtime and monitoring systems.
///
/// The type is plain data: every field is public and may be set directly.
/// It implements `PartialEq` so two reports can be compared as a whole
/// (for example in tests). It is not `Eq` because custom metric values are
/// `f64`; a report containing a `NaN` custom value never compares equal,
/// not even to itself.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ConnectorMetrics {
    /// Total number of records processed.
    pub records_total: u64,

    /// Total bytes processed.
    pub bytes_total: u64,

    /// Number of errors encountered.
    pub errors_total: u64,

    /// Current lag (records behind for sources, pending for sinks).
    pub lag: u64,

    /// Additional connector-specific metrics as `(name, value)` pairs,
    /// kept in insertion order.
    pub custom: Vec<(String, f64)>,
}

impl ConnectorMetrics {
    /// Creates empty metrics: all counters are zero and there are no
    /// custom entries. Equivalent to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a custom metric.
    ///
    /// The entry is appended to [`custom`](Self::custom). Names are not
    /// de-duplicated: adding the same name twice yields two entries.
    pub fn add_custom(&mut self, name: impl Into<String>, value: f64) {
        self.custom.push((name.into(), value));
    }
}
43
44/// Metrics tracked by the connector runtime.
45///
46/// These are maintained by the runtime layer wrapping the connector,
47/// providing consistent metrics across all connector types.
48#[derive(Debug)]
49pub struct RuntimeMetrics {
50    /// Total records ingested (sources) or written (sinks).
51    pub records_total: AtomicU64,
52
53    /// Total bytes processed.
54    pub bytes_total: AtomicU64,
55
56    /// Total errors encountered.
57    pub errors_total: AtomicU64,
58
59    /// Total number of batches processed.
60    pub batches_total: AtomicU64,
61
62    /// Total number of checkpoint commits.
63    pub checkpoints_total: AtomicU64,
64}
65
66impl RuntimeMetrics {
67    /// Creates a new runtime metrics instance.
68    #[must_use]
69    pub fn new() -> Self {
70        Self {
71            records_total: AtomicU64::new(0),
72            bytes_total: AtomicU64::new(0),
73            errors_total: AtomicU64::new(0),
74            batches_total: AtomicU64::new(0),
75            checkpoints_total: AtomicU64::new(0),
76        }
77    }
78
79    /// Records that a batch of records was processed.
80    pub fn record_batch(&self, record_count: u64, byte_count: u64) {
81        self.records_total
82            .fetch_add(record_count, Ordering::Relaxed);
83        self.bytes_total.fetch_add(byte_count, Ordering::Relaxed);
84        self.batches_total.fetch_add(1, Ordering::Relaxed);
85    }
86
87    /// Records an error.
88    pub fn record_error(&self) {
89        self.errors_total.fetch_add(1, Ordering::Relaxed);
90    }
91
92    /// Records a checkpoint commit.
93    pub fn record_checkpoint(&self) {
94        self.checkpoints_total.fetch_add(1, Ordering::Relaxed);
95    }
96
97    /// Returns a snapshot of the current metrics.
98    #[must_use]
99    pub fn snapshot(&self) -> RuntimeMetricsSnapshot {
100        RuntimeMetricsSnapshot {
101            records_total: self.records_total.load(Ordering::Relaxed),
102            bytes_total: self.bytes_total.load(Ordering::Relaxed),
103            errors_total: self.errors_total.load(Ordering::Relaxed),
104            batches_total: self.batches_total.load(Ordering::Relaxed),
105            checkpoints_total: self.checkpoints_total.load(Ordering::Relaxed),
106        }
107    }
108}
109
110impl Default for RuntimeMetrics {
111    fn default() -> Self {
112        Self::new()
113    }
114}
115
/// Point-in-time snapshot of runtime metrics.
///
/// Holds plain `u64` copies of the runtime counters, as returned by
/// `RuntimeMetrics::snapshot`. Snapshots implement `PartialEq`/`Eq`, so two
/// of them can be compared as a whole.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RuntimeMetricsSnapshot {
    /// Total records processed.
    pub records_total: u64,

    /// Total bytes processed.
    pub bytes_total: u64,

    /// Total errors.
    pub errors_total: u64,

    /// Total batches.
    pub batches_total: u64,

    /// Total checkpoints.
    pub checkpoints_total: u64,
}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Fields set on a `ConnectorMetrics` value are readable afterwards, and
    /// `add_custom` stores the entry under the given name.
    #[test]
    fn test_connector_metrics() {
        let mut reported = ConnectorMetrics {
            records_total: 1000,
            bytes_total: 50_000,
            ..ConnectorMetrics::new()
        };
        reported.add_custom("kafka.lag", 42.0);

        assert_eq!(reported.records_total, 1000);
        assert_eq!(reported.custom.len(), 1);
        let (first_name, _) = &reported.custom[0];
        assert_eq!(first_name, "kafka.lag");
    }

    /// Two batches, one error, and one checkpoint are all reflected in the
    /// snapshot taken afterwards.
    #[test]
    fn test_runtime_metrics() {
        let runtime = RuntimeMetrics::new();
        for (records, bytes) in [(100, 5000), (200, 10000)] {
            runtime.record_batch(records, bytes);
        }
        runtime.record_error();
        runtime.record_checkpoint();

        let RuntimeMetricsSnapshot {
            records_total,
            bytes_total,
            errors_total,
            batches_total,
            checkpoints_total,
        } = runtime.snapshot();

        assert_eq!(records_total, 300);
        assert_eq!(bytes_total, 15000);
        assert_eq!(errors_total, 1);
        assert_eq!(batches_total, 2);
        assert_eq!(checkpoints_total, 1);
    }
}