laminar_connectors/kafka/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
/// Thread-safe, monotonically increasing counters for a Kafka source.
///
/// All counters use [`AtomicU64`] with relaxed ordering so they can be
/// bumped cheaply from the hot poll loop without synchronization; exact
/// cross-counter consistency is not required for metrics reporting.
#[derive(Debug)]
pub struct KafkaSourceMetrics {
    /// Total records returned across all polls.
    pub records_polled: AtomicU64,
    /// Total bytes returned across all polls.
    pub bytes_polled: AtomicU64,
    /// Total errors observed by the source.
    pub errors: AtomicU64,
    /// Total poll batches (incremented once per `record_poll` call).
    pub batches_polled: AtomicU64,
    /// Total offset commits performed.
    pub commits: AtomicU64,
    /// Total consumer-group rebalances observed.
    pub rebalances: AtomicU64,
}
27
28impl KafkaSourceMetrics {
29 #[must_use]
31 pub fn new() -> Self {
32 Self {
33 records_polled: AtomicU64::new(0),
34 bytes_polled: AtomicU64::new(0),
35 errors: AtomicU64::new(0),
36 batches_polled: AtomicU64::new(0),
37 commits: AtomicU64::new(0),
38 rebalances: AtomicU64::new(0),
39 }
40 }
41
42 pub fn record_poll(&self, records: u64, bytes: u64) {
44 self.records_polled.fetch_add(records, Ordering::Relaxed);
45 self.bytes_polled.fetch_add(bytes, Ordering::Relaxed);
46 self.batches_polled.fetch_add(1, Ordering::Relaxed);
47 }
48
49 pub fn record_error(&self) {
51 self.errors.fetch_add(1, Ordering::Relaxed);
52 }
53
54 pub fn record_commit(&self) {
56 self.commits.fetch_add(1, Ordering::Relaxed);
57 }
58
59 pub fn record_rebalance(&self) {
61 self.rebalances.fetch_add(1, Ordering::Relaxed);
62 }
63
64 #[must_use]
66 #[allow(clippy::cast_precision_loss)]
67 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
68 let mut m = ConnectorMetrics {
69 records_total: self.records_polled.load(Ordering::Relaxed),
70 bytes_total: self.bytes_polled.load(Ordering::Relaxed),
71 errors_total: self.errors.load(Ordering::Relaxed),
72 lag: 0,
73 custom: Vec::new(),
74 };
75 m.add_custom(
76 "kafka.batches_polled",
77 self.batches_polled.load(Ordering::Relaxed) as f64,
78 );
79 m.add_custom("kafka.commits", self.commits.load(Ordering::Relaxed) as f64);
80 m.add_custom(
81 "kafka.rebalances",
82 self.rebalances.load(Ordering::Relaxed) as f64,
83 );
84 m
85 }
86}
87
88impl Default for KafkaSourceMetrics {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh metrics instance converts to an all-zero snapshot.
    #[test]
    fn test_initial_zeros() {
        let m = KafkaSourceMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
    }

    /// Record and byte totals accumulate across multiple polls.
    #[test]
    fn test_record_poll() {
        let m = KafkaSourceMetrics::new();
        m.record_poll(100, 5000);
        m.record_poll(200, 10000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15000);
    }

    /// Errors count into the snapshot and commits appear as a custom metric.
    #[test]
    fn test_record_error_and_commit() {
        let m = KafkaSourceMetrics::new();
        m.record_error();
        m.record_error();
        m.record_commit();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 2);
        // Three custom entries are always emitted: batches, commits, rebalances.
        assert_eq!(cm.custom.len(), 3);
        let commits = cm.custom.iter().find(|(k, _)| k == "kafka.commits");
        assert_eq!(commits.unwrap().1, 1.0);
    }

    /// Rebalances are surfaced under the `kafka.rebalances` custom key.
    #[test]
    fn test_record_rebalance() {
        let m = KafkaSourceMetrics::new();
        m.record_rebalance();
        m.record_rebalance();

        let cm = m.to_connector_metrics();
        let rebalances = cm.custom.iter().find(|(k, _)| k == "kafka.rebalances");
        assert_eq!(rebalances.unwrap().1, 2.0);
    }
}