laminar_connectors/kafka/
metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
4
5use crate::metrics::ConnectorMetrics;
6
/// Prometheus metrics for a Kafka source connector.
///
/// Counters are monotonically increasing; `lag` is a gauge holding the most
/// recently observed value. `new` registers clones of these handles on the
/// registry, so increments made through the fields are visible to scrapes
/// (prometheus handle types share their underlying state across clones).
#[derive(Debug, Clone)]
pub struct KafkaSourceMetrics {
    /// Total records polled from Kafka.
    pub records_polled: IntCounter,
    /// Total bytes polled from Kafka.
    pub bytes_polled: IntCounter,
    /// Total Kafka consumer errors.
    pub errors: IntCounter,
    /// Total batches polled (one increment per `record_poll` call).
    pub batches_polled: IntCounter,
    /// Total successful offset commits.
    pub commits: IntCounter,
    /// Total offset commit failures (broker rejection, timeout, panic).
    pub commit_failures: IntCounter,
    /// Total consumer group rebalances.
    pub rebalances: IntCounter,
    /// Consumer lag, summed across partitions (gauge, last observed value).
    pub lag: IntGauge,
    /// Schema Registry discovery successes.
    pub sr_discovery_successes: IntCounter,
    /// Schema Registry discovery failures.
    pub sr_discovery_failures: IntCounter,
    /// Schema Registry discovery timeouts.
    pub sr_discovery_timeouts: IntCounter,
}
33
34impl KafkaSourceMetrics {
35 #[must_use]
38 #[allow(clippy::missing_panics_doc)]
39 pub fn new(registry: Option<&Registry>) -> Self {
40 let local;
41 let reg = if let Some(r) = registry {
42 r
43 } else {
44 local = Registry::new();
45 &local
46 };
47
48 let records_polled = IntCounter::new(
49 "kafka_source_records_polled_total",
50 "Total records polled from Kafka",
51 )
52 .unwrap();
53 let bytes_polled = IntCounter::new(
54 "kafka_source_bytes_polled_total",
55 "Total bytes polled from Kafka",
56 )
57 .unwrap();
58 let errors =
59 IntCounter::new("kafka_source_errors_total", "Total Kafka consumer errors").unwrap();
60 let batches_polled = IntCounter::new(
61 "kafka_source_batches_polled_total",
62 "Total batches polled from Kafka",
63 )
64 .unwrap();
65 let commits = IntCounter::new(
66 "kafka_source_commits_total",
67 "Total offset commits to Kafka",
68 )
69 .unwrap();
70 let commit_failures = IntCounter::new(
71 "kafka_source_commit_failures_total",
72 "Total offset commit failures (broker rejection, timeout, panic)",
73 )
74 .unwrap();
75 let rebalances = IntCounter::new(
76 "kafka_source_rebalances_total",
77 "Total consumer group rebalances",
78 )
79 .unwrap();
80 let lag = IntGauge::new(
81 "kafka_source_consumer_lag",
82 "Consumer lag (sum across partitions)",
83 )
84 .unwrap();
85 let sr_discovery_successes = IntCounter::new(
86 "kafka_source_sr_discovery_successes_total",
87 "Schema Registry discovery successes",
88 )
89 .unwrap();
90 let sr_discovery_failures = IntCounter::new(
91 "kafka_source_sr_discovery_failures_total",
92 "Schema Registry discovery failures",
93 )
94 .unwrap();
95 let sr_discovery_timeouts = IntCounter::new(
96 "kafka_source_sr_discovery_timeouts_total",
97 "Schema Registry discovery timeouts",
98 )
99 .unwrap();
100
101 let _ = reg.register(Box::new(records_polled.clone()));
104 let _ = reg.register(Box::new(bytes_polled.clone()));
105 let _ = reg.register(Box::new(errors.clone()));
106 let _ = reg.register(Box::new(batches_polled.clone()));
107 let _ = reg.register(Box::new(commits.clone()));
108 let _ = reg.register(Box::new(commit_failures.clone()));
109 let _ = reg.register(Box::new(rebalances.clone()));
110 let _ = reg.register(Box::new(lag.clone()));
111 let _ = reg.register(Box::new(sr_discovery_successes.clone()));
112 let _ = reg.register(Box::new(sr_discovery_failures.clone()));
113 let _ = reg.register(Box::new(sr_discovery_timeouts.clone()));
114
115 Self {
116 records_polled,
117 bytes_polled,
118 errors,
119 batches_polled,
120 commits,
121 commit_failures,
122 rebalances,
123 lag,
124 sr_discovery_successes,
125 sr_discovery_failures,
126 sr_discovery_timeouts,
127 }
128 }
129
130 pub fn record_poll(&self, records: u64, bytes: u64) {
132 self.records_polled.inc_by(records);
133 self.bytes_polled.inc_by(bytes);
134 self.batches_polled.inc();
135 }
136
137 pub fn record_error(&self) {
139 self.errors.inc();
140 }
141
142 pub fn record_commit(&self) {
144 self.commits.inc();
145 }
146
147 pub fn record_commit_failure(&self) {
149 self.commit_failures.inc();
150 }
151
152 pub fn record_rebalance(&self) {
154 self.rebalances.inc();
155 }
156
157 #[allow(clippy::cast_possible_wrap)]
159 pub fn set_lag(&self, lag: u64) {
160 self.lag.set(lag as i64);
161 }
162
163 pub fn record_sr_discovery_success(&self) {
165 self.sr_discovery_successes.inc();
166 }
167
168 pub fn record_sr_discovery_failure(&self) {
170 self.sr_discovery_failures.inc();
171 }
172
173 pub fn record_sr_discovery_timeout(&self) {
175 self.sr_discovery_timeouts.inc();
176 }
177
178 #[must_use]
180 #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
181 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
182 let mut m = ConnectorMetrics {
183 records_total: self.records_polled.get(),
184 bytes_total: self.bytes_polled.get(),
185 errors_total: self.errors.get(),
186 lag: self.lag.get() as u64,
187 custom: Vec::new(),
188 };
189 m.add_custom("kafka.batches_polled", self.batches_polled.get() as f64);
190 m.add_custom("kafka.commits", self.commits.get() as f64);
191 m.add_custom("kafka.commit_failures", self.commit_failures.get() as f64);
192 m.add_custom("kafka.rebalances", self.rebalances.get() as f64);
193 m.add_custom(
194 "kafka.sr_discovery_successes",
195 self.sr_discovery_successes.get() as f64,
196 );
197 m.add_custom(
198 "kafka.sr_discovery_failures",
199 self.sr_discovery_failures.get() as f64,
200 );
201 m.add_custom(
202 "kafka.sr_discovery_timeouts",
203 self.sr_discovery_timeouts.get() as f64,
204 );
205 m
206 }
207}
208
impl Default for KafkaSourceMetrics {
    /// Equivalent to `Self::new(None)`: metrics backed by a private,
    /// unexported registry.
    fn default() -> Self {
        Self::new(None)
    }
}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value of the `custom` entry with the given key,
    /// panicking when the key is absent.
    fn custom(cm: &ConnectorMetrics, key: &str) -> f64 {
        cm.custom.iter().find(|(k, _)| k == key).unwrap().1
    }

    #[test]
    fn test_initial_zeros() {
        let metrics = KafkaSourceMetrics::new(None);
        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    #[test]
    fn test_record_poll() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_poll(100, 5000);
        metrics.record_poll(200, 10000);

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 300);
        assert_eq!(snapshot.bytes_total, 15000);
    }

    #[test]
    fn test_record_error_and_commit() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_error();
        metrics.record_error();
        metrics.record_commit();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.errors_total, 2);
        // One custom entry per Kafka-specific counter.
        assert_eq!(snapshot.custom.len(), 7);
        assert_eq!(custom(&snapshot, "kafka.commits"), 1.0);
    }

    #[test]
    fn test_record_commit_failure() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_commit_failure();
        metrics.record_commit_failure();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom(&snapshot, "kafka.commit_failures"), 2.0);
    }

    #[test]
    fn test_record_rebalance() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_rebalance();
        metrics.record_rebalance();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom(&snapshot, "kafka.rebalances"), 2.0);
    }

    #[test]
    fn test_set_lag() {
        let metrics = KafkaSourceMetrics::new(None);
        assert_eq!(metrics.to_connector_metrics().lag, 0);

        metrics.set_lag(42);
        assert_eq!(metrics.to_connector_metrics().lag, 42);

        metrics.set_lag(100);
        assert_eq!(metrics.to_connector_metrics().lag, 100);
    }

    #[test]
    fn test_sr_discovery_counters() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_sr_discovery_success();
        metrics.record_sr_discovery_success();
        metrics.record_sr_discovery_failure();
        metrics.record_sr_discovery_timeout();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom(&snapshot, "kafka.sr_discovery_successes"), 2.0);
        assert_eq!(custom(&snapshot, "kafka.sr_discovery_failures"), 1.0);
        assert_eq!(custom(&snapshot, "kafka.sr_discovery_timeouts"), 1.0);
    }

    #[test]
    fn test_lag_computation() {
        // (high watermark, last committed offset) per partition; lag per
        // partition is hw - (offset + 1), clamped at zero.
        let partitions = [(1000_i64, 900_i64), (500, 499), (2000, 1500)];
        let total_lag: u64 = partitions
            .iter()
            .map(|&(high_watermark, committed)| {
                let per_partition = high_watermark - (committed + 1);
                if per_partition > 0 {
                    per_partition as u64
                } else {
                    0
                }
            })
            .sum();
        assert_eq!(total_lag, 598);

        let metrics = KafkaSourceMetrics::new(None);
        metrics.set_lag(total_lag);
        assert_eq!(metrics.to_connector_metrics().lag, 598);
    }

    #[test]
    fn test_registered_on_prometheus_registry() {
        let registry = Registry::new();
        let metrics = KafkaSourceMetrics::new(Some(&registry));
        metrics.record_poll(10, 500);
        metrics.record_error();

        let families = registry.gather();
        let family_names: Vec<&str> = families
            .iter()
            .map(prometheus::proto::MetricFamily::name)
            .collect();
        assert!(family_names.contains(&"kafka_source_records_polled_total"));
        assert!(family_names.contains(&"kafka_source_errors_total"));
    }
}