// laminar_connectors/kafka/metrics.rs
1//! Prometheus-backed Kafka source metrics.
2
3use prometheus::{IntCounter, IntGauge, Registry};
4
5use crate::metrics::ConnectorMetrics;
6
/// Prometheus-backed counters/gauges for Kafka source connector statistics.
///
/// Constructed via [`KafkaSourceMetrics::new`], which registers each collector
/// on a [`Registry`] while this struct keeps its own handles to the same
/// collectors — so increments made through these handles show up in the
/// registry scrape. NOTE(review): this relies on prometheus metric handles
/// being reference-counted clones of shared state — confirm against the
/// `prometheus` crate docs.
#[derive(Debug, Clone)]
pub struct KafkaSourceMetrics {
    /// Total records polled from Kafka.
    pub records_polled: IntCounter,
    /// Total bytes polled from Kafka.
    pub bytes_polled: IntCounter,
    /// Total deserialization or consumer errors.
    pub errors: IntCounter,
    /// Total batches returned from `poll_batch()`.
    pub batches_polled: IntCounter,
    /// Total offset commits to Kafka.
    pub commits: IntCounter,
    /// Total offset commit failures (broker rejected, timeout, task panic).
    pub commit_failures: IntCounter,
    /// Total consumer group rebalances.
    pub rebalances: IntCounter,
    /// Consumer lag (sum across all partitions of `high_watermark - current_offset`).
    pub lag: IntGauge,
    /// Count of successful Schema Registry discoveries at DDL time.
    pub sr_discovery_successes: IntCounter,
    /// Count of Schema Registry discovery failures (HTTP error, parse error).
    pub sr_discovery_failures: IntCounter,
    /// Count of Schema Registry discovery timeouts.
    pub sr_discovery_timeouts: IntCounter,
}
33
34impl KafkaSourceMetrics {
35    /// If `registry` is `Some`, counters are registered there (visible
36    /// in the Prometheus scrape); otherwise a throwaway registry is used.
37    #[must_use]
38    #[allow(clippy::missing_panics_doc)]
39    pub fn new(registry: Option<&Registry>) -> Self {
40        let local;
41        let reg = if let Some(r) = registry {
42            r
43        } else {
44            local = Registry::new();
45            &local
46        };
47
48        let records_polled = IntCounter::new(
49            "kafka_source_records_polled_total",
50            "Total records polled from Kafka",
51        )
52        .unwrap();
53        let bytes_polled = IntCounter::new(
54            "kafka_source_bytes_polled_total",
55            "Total bytes polled from Kafka",
56        )
57        .unwrap();
58        let errors =
59            IntCounter::new("kafka_source_errors_total", "Total Kafka consumer errors").unwrap();
60        let batches_polled = IntCounter::new(
61            "kafka_source_batches_polled_total",
62            "Total batches polled from Kafka",
63        )
64        .unwrap();
65        let commits = IntCounter::new(
66            "kafka_source_commits_total",
67            "Total offset commits to Kafka",
68        )
69        .unwrap();
70        let commit_failures = IntCounter::new(
71            "kafka_source_commit_failures_total",
72            "Total offset commit failures (broker rejection, timeout, panic)",
73        )
74        .unwrap();
75        let rebalances = IntCounter::new(
76            "kafka_source_rebalances_total",
77            "Total consumer group rebalances",
78        )
79        .unwrap();
80        let lag = IntGauge::new(
81            "kafka_source_consumer_lag",
82            "Consumer lag (sum across partitions)",
83        )
84        .unwrap();
85        let sr_discovery_successes = IntCounter::new(
86            "kafka_source_sr_discovery_successes_total",
87            "Schema Registry discovery successes",
88        )
89        .unwrap();
90        let sr_discovery_failures = IntCounter::new(
91            "kafka_source_sr_discovery_failures_total",
92            "Schema Registry discovery failures",
93        )
94        .unwrap();
95        let sr_discovery_timeouts = IntCounter::new(
96            "kafka_source_sr_discovery_timeouts_total",
97            "Schema Registry discovery timeouts",
98        )
99        .unwrap();
100
101        // Best-effort registration — ignore `AlreadyReg` if another
102        // Kafka source is already on the same registry.
103        let _ = reg.register(Box::new(records_polled.clone()));
104        let _ = reg.register(Box::new(bytes_polled.clone()));
105        let _ = reg.register(Box::new(errors.clone()));
106        let _ = reg.register(Box::new(batches_polled.clone()));
107        let _ = reg.register(Box::new(commits.clone()));
108        let _ = reg.register(Box::new(commit_failures.clone()));
109        let _ = reg.register(Box::new(rebalances.clone()));
110        let _ = reg.register(Box::new(lag.clone()));
111        let _ = reg.register(Box::new(sr_discovery_successes.clone()));
112        let _ = reg.register(Box::new(sr_discovery_failures.clone()));
113        let _ = reg.register(Box::new(sr_discovery_timeouts.clone()));
114
115        Self {
116            records_polled,
117            bytes_polled,
118            errors,
119            batches_polled,
120            commits,
121            commit_failures,
122            rebalances,
123            lag,
124            sr_discovery_successes,
125            sr_discovery_failures,
126            sr_discovery_timeouts,
127        }
128    }
129
130    /// Records a successful poll of `records` records totaling `bytes`.
131    pub fn record_poll(&self, records: u64, bytes: u64) {
132        self.records_polled.inc_by(records);
133        self.bytes_polled.inc_by(bytes);
134        self.batches_polled.inc();
135    }
136
137    /// Records a consumer or deserialization error.
138    pub fn record_error(&self) {
139        self.errors.inc();
140    }
141
142    /// Records a successful offset commit.
143    pub fn record_commit(&self) {
144        self.commits.inc();
145    }
146
147    /// Records an offset commit failure (broker rejection, timeout, panic).
148    pub fn record_commit_failure(&self) {
149        self.commit_failures.inc();
150    }
151
152    /// Records a consumer group rebalance event.
153    pub fn record_rebalance(&self) {
154        self.rebalances.inc();
155    }
156
157    /// Updates the consumer lag value.
158    #[allow(clippy::cast_possible_wrap)]
159    pub fn set_lag(&self, lag: u64) {
160        self.lag.set(lag as i64);
161    }
162
163    /// Records a successful Schema Registry discovery at DDL time.
164    pub fn record_sr_discovery_success(&self) {
165        self.sr_discovery_successes.inc();
166    }
167
168    /// Records a Schema Registry discovery failure.
169    pub fn record_sr_discovery_failure(&self) {
170        self.sr_discovery_failures.inc();
171    }
172
173    /// Records a Schema Registry discovery timeout.
174    pub fn record_sr_discovery_timeout(&self) {
175        self.sr_discovery_timeouts.inc();
176    }
177
178    /// Converts to the SDK's [`ConnectorMetrics`].
179    #[must_use]
180    #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
181    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
182        let mut m = ConnectorMetrics {
183            records_total: self.records_polled.get(),
184            bytes_total: self.bytes_polled.get(),
185            errors_total: self.errors.get(),
186            lag: self.lag.get() as u64,
187            custom: Vec::new(),
188        };
189        m.add_custom("kafka.batches_polled", self.batches_polled.get() as f64);
190        m.add_custom("kafka.commits", self.commits.get() as f64);
191        m.add_custom("kafka.commit_failures", self.commit_failures.get() as f64);
192        m.add_custom("kafka.rebalances", self.rebalances.get() as f64);
193        m.add_custom(
194            "kafka.sr_discovery_successes",
195            self.sr_discovery_successes.get() as f64,
196        );
197        m.add_custom(
198            "kafka.sr_discovery_failures",
199            self.sr_discovery_failures.get() as f64,
200        );
201        m.add_custom(
202            "kafka.sr_discovery_timeouts",
203            self.sr_discovery_timeouts.get() as f64,
204        );
205        m
206    }
207}
208
209impl Default for KafkaSourceMetrics {
210    fn default() -> Self {
211        Self::new(None)
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value of the custom metric named `key`, panicking when the
    /// key is absent from `cm.custom`.
    fn custom(cm: &ConnectorMetrics, key: &str) -> f64 {
        cm.custom
            .iter()
            .find_map(|(k, v)| (k == key).then_some(*v))
            .unwrap_or_else(|| panic!("missing custom metric {key}"))
    }

    #[test]
    fn test_initial_zeros() {
        let snapshot = KafkaSourceMetrics::new(None).to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    #[test]
    fn test_record_poll() {
        let metrics = KafkaSourceMetrics::new(None);
        for (records, bytes) in [(100, 5000), (200, 10_000)] {
            metrics.record_poll(records, bytes);
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 300);
        assert_eq!(snapshot.bytes_total, 15_000);
    }

    #[test]
    fn test_record_error_and_commit() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_error();
        metrics.record_error();
        metrics.record_commit();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.errors_total, 2);
        // 4 base (batches_polled, commits, commit_failures, rebalances) +
        // 3 SR-discovery counters.
        assert_eq!(snapshot.custom.len(), 7);
        assert_eq!(custom(&snapshot, "kafka.commits"), 1.0);
    }

    #[test]
    fn test_record_commit_failure() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_commit_failure();
        metrics.record_commit_failure();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom(&snapshot, "kafka.commit_failures"), 2.0);
    }

    #[test]
    fn test_record_rebalance() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_rebalance();
        metrics.record_rebalance();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom(&snapshot, "kafka.rebalances"), 2.0);
    }

    #[test]
    fn test_set_lag() {
        let metrics = KafkaSourceMetrics::new(None);
        assert_eq!(metrics.to_connector_metrics().lag, 0);

        // The gauge holds the latest value, not a running sum.
        for expected in [42, 100] {
            metrics.set_lag(expected);
            assert_eq!(metrics.to_connector_metrics().lag, expected);
        }
    }

    #[test]
    fn test_sr_discovery_counters() {
        let metrics = KafkaSourceMetrics::new(None);
        metrics.record_sr_discovery_success();
        metrics.record_sr_discovery_success();
        metrics.record_sr_discovery_failure();
        metrics.record_sr_discovery_timeout();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(custom(&snapshot, "kafka.sr_discovery_successes"), 2.0);
        assert_eq!(custom(&snapshot, "kafka.sr_discovery_failures"), 1.0);
        assert_eq!(custom(&snapshot, "kafka.sr_discovery_timeouts"), 1.0);
    }

    #[test]
    fn test_lag_computation() {
        // Simulates 3 partitions; per-partition lag is
        // high_watermark - (offset + 1), clamped at zero.
        let partitions = [
            (1000_i64, 900_i64), // lag = 1000 - (900 + 1) = 99
            (500, 499),          // lag = 500 - (499 + 1) = 0
            (2000, 1500),        // lag = 2000 - (1500 + 1) = 499
        ];
        let total_lag: u64 = partitions
            .iter()
            .map(|&(hw, off)| u64::try_from(hw - (off + 1)).unwrap_or(0))
            .sum();
        assert_eq!(total_lag, 598);

        let metrics = KafkaSourceMetrics::new(None);
        metrics.set_lag(total_lag);
        assert_eq!(metrics.to_connector_metrics().lag, 598);
    }

    #[test]
    fn test_registered_on_prometheus_registry() {
        let registry = Registry::new();
        let metrics = KafkaSourceMetrics::new(Some(&registry));
        metrics.record_poll(10, 500);
        metrics.record_error();

        // Family names visible on the registry prove registration succeeded.
        let families = registry.gather();
        let has = |name: &str| families.iter().any(|f| f.name() == name);
        assert!(has("kafka_source_records_polled_total"));
        assert!(has("kafka_source_errors_total"));
    }
}