laminar_connectors/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9#[derive(Debug, Clone, Default)]
14pub struct ConnectorMetrics {
15 pub records_total: u64,
17
18 pub bytes_total: u64,
20
21 pub errors_total: u64,
23
24 pub lag: u64,
26
27 pub custom: Vec<(String, f64)>,
29}
30
31impl ConnectorMetrics {
32 #[must_use]
34 pub fn new() -> Self {
35 Self::default()
36 }
37
38 pub fn add_custom(&mut self, name: impl Into<String>, value: f64) {
40 self.custom.push((name.into(), value));
41 }
42}
43
44#[derive(Debug)]
49pub struct RuntimeMetrics {
50 pub records_total: AtomicU64,
52
53 pub bytes_total: AtomicU64,
55
56 pub errors_total: AtomicU64,
58
59 pub batches_total: AtomicU64,
61
62 pub checkpoints_total: AtomicU64,
64}
65
66impl RuntimeMetrics {
67 #[must_use]
69 pub fn new() -> Self {
70 Self {
71 records_total: AtomicU64::new(0),
72 bytes_total: AtomicU64::new(0),
73 errors_total: AtomicU64::new(0),
74 batches_total: AtomicU64::new(0),
75 checkpoints_total: AtomicU64::new(0),
76 }
77 }
78
79 pub fn record_batch(&self, record_count: u64, byte_count: u64) {
81 self.records_total
82 .fetch_add(record_count, Ordering::Relaxed);
83 self.bytes_total.fetch_add(byte_count, Ordering::Relaxed);
84 self.batches_total.fetch_add(1, Ordering::Relaxed);
85 }
86
87 pub fn record_error(&self) {
89 self.errors_total.fetch_add(1, Ordering::Relaxed);
90 }
91
92 pub fn record_checkpoint(&self) {
94 self.checkpoints_total.fetch_add(1, Ordering::Relaxed);
95 }
96
97 #[must_use]
99 pub fn snapshot(&self) -> RuntimeMetricsSnapshot {
100 RuntimeMetricsSnapshot {
101 records_total: self.records_total.load(Ordering::Relaxed),
102 bytes_total: self.bytes_total.load(Ordering::Relaxed),
103 errors_total: self.errors_total.load(Ordering::Relaxed),
104 batches_total: self.batches_total.load(Ordering::Relaxed),
105 checkpoints_total: self.checkpoints_total.load(Ordering::Relaxed),
106 }
107 }
108}
109
110impl Default for RuntimeMetrics {
111 fn default() -> Self {
112 Self::new()
113 }
114}
115
116#[derive(Debug, Clone, Default)]
118pub struct RuntimeMetricsSnapshot {
119 pub records_total: u64,
121
122 pub bytes_total: u64,
124
125 pub errors_total: u64,
127
128 pub batches_total: u64,
130
131 pub checkpoints_total: u64,
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 #[test]
140 fn test_connector_metrics() {
141 let mut metrics = ConnectorMetrics::new();
142 metrics.records_total = 1000;
143 metrics.bytes_total = 50_000;
144 metrics.add_custom("kafka.lag", 42.0);
145
146 assert_eq!(metrics.records_total, 1000);
147 assert_eq!(metrics.custom.len(), 1);
148 assert_eq!(metrics.custom[0].0, "kafka.lag");
149 }
150
151 #[test]
152 fn test_runtime_metrics() {
153 let metrics = RuntimeMetrics::new();
154 metrics.record_batch(100, 5000);
155 metrics.record_batch(200, 10000);
156 metrics.record_error();
157 metrics.record_checkpoint();
158
159 let snap = metrics.snapshot();
160 assert_eq!(snap.records_total, 300);
161 assert_eq!(snap.bytes_total, 15000);
162 assert_eq!(snap.errors_total, 1);
163 assert_eq!(snap.batches_total, 2);
164 assert_eq!(snap.checkpoints_total, 1);
165 }
166}