laminar_connectors/kafka/
metrics.rs1use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Opts, Registry};
4
5use crate::prom::reg_or_local;
6
7#[derive(Debug, Clone)]
9pub struct KafkaSourceMetrics {
10 pub records_polled: IntCounter,
12 pub bytes_polled: IntCounter,
14 pub errors: IntCounter,
16 pub batches_polled: IntCounter,
18 pub commits: IntCounter,
20 pub commit_failures_rejected: IntCounter,
22 pub commit_failures_panic: IntCounter,
24 pub commit_failures_enqueue_dropped: IntCounter,
26 pub rebalances: IntCounter,
28 pub lag: IntGauge,
30 pub broker_commit_duration: Histogram,
32 pub sr_discovery_successes: IntCounter,
34 pub sr_discovery_failures: IntCounter,
36 pub sr_discovery_timeouts: IntCounter,
38}
39
40impl KafkaSourceMetrics {
41 #[must_use]
44 #[allow(clippy::missing_panics_doc, clippy::too_many_lines)]
45 pub fn new(registry: Option<&Registry>) -> Self {
46 let mut local = None;
47 let handle = reg_or_local(registry, &mut local);
48 let reg = handle.registry();
49
50 let make_failure = |reason: &str| {
53 let c = IntCounter::with_opts(
54 Opts::new(
55 "kafka_source_commit_failures_total",
56 "Offset commit failures by reason",
57 )
58 .const_label("reason", reason),
59 )
60 .unwrap();
61 let _ = reg.register(Box::new(c.clone()));
64 c
65 };
66 let commit_failures_rejected = make_failure("rejected");
67 let commit_failures_panic = make_failure("panic");
68 let commit_failures_enqueue_dropped = make_failure("enqueue_dropped");
69
70 let broker_commit_duration = Histogram::with_opts(
71 HistogramOpts::new(
72 "kafka_source_broker_commit_duration_seconds",
73 "Duration of broker offset commits, in seconds",
74 )
75 .buckets(prometheus::exponential_buckets(0.01, 4.0, 5).unwrap()),
76 )
77 .unwrap();
78 let _ = reg.register(Box::new(broker_commit_duration.clone()));
79
80 Self {
81 records_polled: handle.counter(
82 "kafka_source_records_polled_total",
83 "Total records polled from Kafka",
84 ),
85 bytes_polled: handle.counter(
86 "kafka_source_bytes_polled_total",
87 "Total bytes polled from Kafka",
88 ),
89 errors: handle.counter("kafka_source_errors_total", "Total Kafka consumer errors"),
90 batches_polled: handle.counter(
91 "kafka_source_batches_polled_total",
92 "Total batches polled from Kafka",
93 ),
94 commits: handle.counter(
95 "kafka_source_commits_total",
96 "Total offset commits to Kafka",
97 ),
98 commit_failures_rejected,
99 commit_failures_panic,
100 commit_failures_enqueue_dropped,
101 rebalances: handle.counter(
102 "kafka_source_rebalances_total",
103 "Total consumer group rebalances",
104 ),
105 lag: handle.gauge(
106 "kafka_source_consumer_lag",
107 "Consumer lag (sum across partitions)",
108 ),
109 broker_commit_duration,
110 sr_discovery_successes: handle.counter(
111 "kafka_source_sr_discovery_successes_total",
112 "Schema Registry discovery successes",
113 ),
114 sr_discovery_failures: handle.counter(
115 "kafka_source_sr_discovery_failures_total",
116 "Schema Registry discovery failures",
117 ),
118 sr_discovery_timeouts: handle.counter(
119 "kafka_source_sr_discovery_timeouts_total",
120 "Schema Registry discovery timeouts",
121 ),
122 }
123 }
124
125 pub fn record_poll(&self, records: u64, bytes: u64) {
127 self.records_polled.inc_by(records);
128 self.bytes_polled.inc_by(bytes);
129 self.batches_polled.inc();
130 }
131
132 pub fn record_error(&self) {
134 self.errors.inc();
135 }
136
137 pub fn record_commit(&self) {
139 self.commits.inc();
140 }
141
142 pub fn record_rebalance(&self) {
144 self.rebalances.inc();
145 }
146
147 #[allow(clippy::cast_possible_wrap)]
149 pub fn set_lag(&self, lag: u64) {
150 self.lag.set(lag as i64);
151 }
152
153 pub fn observe_broker_commit_duration(&self, secs: f64) {
155 self.broker_commit_duration.observe(secs);
156 }
157
158 pub fn record_sr_discovery_success(&self) {
160 self.sr_discovery_successes.inc();
161 }
162
163 pub fn record_sr_discovery_failure(&self) {
165 self.sr_discovery_failures.inc();
166 }
167
168 pub fn record_sr_discovery_timeout(&self) {
170 self.sr_discovery_timeouts.inc();
171 }
172}
173
174impl Default for KafkaSourceMetrics {
175 fn default() -> Self {
176 Self::new(None)
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Freshly created counters start at zero.
    #[test]
    fn test_initial_zeros() {
        let m = KafkaSourceMetrics::new(None);
        assert_eq!(m.records_polled.get(), 0);
        assert_eq!(m.bytes_polled.get(), 0);
        assert_eq!(m.errors.get(), 0);
    }

    /// `record_poll` accumulates records and bytes across calls.
    #[test]
    fn test_record_poll() {
        let m = KafkaSourceMetrics::new(None);
        m.record_poll(100, 5000);
        m.record_poll(200, 10000);

        assert_eq!(m.records_polled.get(), 300);
        assert_eq!(m.bytes_polled.get(), 15000);
    }

    /// Error and commit counters advance independently.
    #[test]
    fn test_record_error_and_commit() {
        let m = KafkaSourceMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_commit();

        assert_eq!(m.errors.get(), 2);
        assert_eq!(m.commits.get(), 1);
    }

    /// Commit-failure counters with different reasons count separately.
    #[test]
    fn test_record_commit_failure() {
        let m = KafkaSourceMetrics::new(None);
        m.commit_failures_rejected.inc();
        m.commit_failures_panic.inc();

        let total = m.commit_failures_rejected.get() + m.commit_failures_panic.get();
        assert_eq!(total, 2);
    }

    /// `record_rebalance` increments the rebalance counter.
    #[test]
    fn test_record_rebalance() {
        let m = KafkaSourceMetrics::new(None);
        m.record_rebalance();
        m.record_rebalance();

        assert_eq!(m.rebalances.get(), 2);
    }

    /// `set_lag` overwrites the gauge rather than adding to it.
    #[test]
    fn test_set_lag() {
        let m = KafkaSourceMetrics::new(None);
        assert_eq!(m.lag.get(), 0);

        m.set_lag(42);
        assert_eq!(m.lag.get(), 42);

        m.set_lag(100);
        assert_eq!(m.lag.get(), 100);
    }

    /// Schema Registry discovery outcomes are counted per outcome.
    #[test]
    fn test_sr_discovery_counters() {
        let m = KafkaSourceMetrics::new(None);
        m.record_sr_discovery_success();
        m.record_sr_discovery_success();
        m.record_sr_discovery_failure();
        m.record_sr_discovery_timeout();

        assert_eq!(m.sr_discovery_successes.get(), 2);
        assert_eq!(m.sr_discovery_failures.get(), 1);
        assert_eq!(m.sr_discovery_timeouts.get(), 1);
    }

    /// Per-partition lag is `high_watermark - (offset + 1)`, floored at zero,
    /// and the sum is what gets stored in the gauge.
    #[test]
    fn test_lag_computation() {
        // (high watermark, current offset)
        let partitions = [
            (1000_i64, 900_i64), // lag = 99
            (500, 499),          // lag = 0
            (2000, 1500),        // lag = 499
        ];
        let total_lag: u64 = partitions
            .iter()
            // A negative difference fails the conversion and counts as zero.
            .map(|&(hw, off)| u64::try_from(hw - (off + 1)).unwrap_or(0))
            .sum();
        assert_eq!(total_lag, 598);

        let m = KafkaSourceMetrics::new(None);
        m.set_lag(total_lag);
        assert_eq!(m.lag.get(), 598);
    }

    /// Metrics created with an explicit registry show up in its `gather()`.
    #[test]
    fn test_registered_on_prometheus_registry() {
        let registry = Registry::new();
        let m = KafkaSourceMetrics::new(Some(&registry));
        m.record_poll(10, 500);
        m.record_error();

        let families = registry.gather();
        let names: Vec<&str> = families
            .iter()
            .map(prometheus::proto::MetricFamily::name)
            .collect();
        assert!(names.contains(&"kafka_source_records_polled_total"));
        assert!(names.contains(&"kafka_source_errors_total"));
    }
}