Skip to main content

laminar_connectors/kafka/
metrics.rs

1//! Prometheus-backed Kafka source metrics.
2
3use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Opts, Registry};
4
5use crate::prom::reg_or_local;
6
7/// Prometheus-backed counters/gauges for Kafka source connector statistics.
8#[derive(Debug, Clone)]
9pub struct KafkaSourceMetrics {
10    /// Total records polled from Kafka.
11    pub records_polled: IntCounter,
12    /// Total bytes polled from Kafka.
13    pub bytes_polled: IntCounter,
14    /// Total deserialization or consumer errors.
15    pub errors: IntCounter,
16    /// Total batches returned from `poll_batch()`.
17    pub batches_polled: IntCounter,
18    /// Total offset commits to Kafka.
19    pub commits: IntCounter,
20    /// Broker rejected the commit.
21    pub commit_failures_rejected: IntCounter,
22    /// Commit `spawn_blocking` task panicked.
23    pub commit_failures_panic: IntCounter,
24    /// `notify_epoch_committed` could not enqueue (commit task gone).
25    pub commit_failures_enqueue_dropped: IntCounter,
26    /// Total consumer group rebalances.
27    pub rebalances: IntCounter,
28    /// Consumer lag (sum across all partitions of `high_watermark - current_offset`).
29    pub lag: IntGauge,
30    /// Duration of a broker offset commit, in seconds.
31    pub broker_commit_duration: Histogram,
32    /// Count of successful Schema Registry discoveries at DDL time.
33    pub sr_discovery_successes: IntCounter,
34    /// Count of Schema Registry discovery failures (HTTP error, parse error).
35    pub sr_discovery_failures: IntCounter,
36    /// Count of Schema Registry discovery timeouts.
37    pub sr_discovery_timeouts: IntCounter,
38}
39
40impl KafkaSourceMetrics {
41    /// If `registry` is `Some`, counters are registered there (visible
42    /// in the Prometheus scrape); otherwise a throwaway registry is used.
43    #[must_use]
44    #[allow(clippy::missing_panics_doc, clippy::too_many_lines)]
45    pub fn new(registry: Option<&Registry>) -> Self {
46        let mut local = None;
47        let handle = reg_or_local(registry, &mut local);
48        let reg = handle.registry();
49
50        // Const-label counters (`reason=...`) share a metric name and
51        // can't go through the shared helper.
52        let make_failure = |reason: &str| {
53            let c = IntCounter::with_opts(
54                Opts::new(
55                    "kafka_source_commit_failures_total",
56                    "Offset commit failures by reason",
57                )
58                .const_label("reason", reason),
59            )
60            .unwrap();
61            // Best-effort registration — ignore `AlreadyReg` if another
62            // Kafka source is already on the same registry.
63            let _ = reg.register(Box::new(c.clone()));
64            c
65        };
66        let commit_failures_rejected = make_failure("rejected");
67        let commit_failures_panic = make_failure("panic");
68        let commit_failures_enqueue_dropped = make_failure("enqueue_dropped");
69
70        let broker_commit_duration = Histogram::with_opts(
71            HistogramOpts::new(
72                "kafka_source_broker_commit_duration_seconds",
73                "Duration of broker offset commits, in seconds",
74            )
75            .buckets(prometheus::exponential_buckets(0.01, 4.0, 5).unwrap()),
76        )
77        .unwrap();
78        let _ = reg.register(Box::new(broker_commit_duration.clone()));
79
80        Self {
81            records_polled: handle.counter(
82                "kafka_source_records_polled_total",
83                "Total records polled from Kafka",
84            ),
85            bytes_polled: handle.counter(
86                "kafka_source_bytes_polled_total",
87                "Total bytes polled from Kafka",
88            ),
89            errors: handle.counter("kafka_source_errors_total", "Total Kafka consumer errors"),
90            batches_polled: handle.counter(
91                "kafka_source_batches_polled_total",
92                "Total batches polled from Kafka",
93            ),
94            commits: handle.counter(
95                "kafka_source_commits_total",
96                "Total offset commits to Kafka",
97            ),
98            commit_failures_rejected,
99            commit_failures_panic,
100            commit_failures_enqueue_dropped,
101            rebalances: handle.counter(
102                "kafka_source_rebalances_total",
103                "Total consumer group rebalances",
104            ),
105            lag: handle.gauge(
106                "kafka_source_consumer_lag",
107                "Consumer lag (sum across partitions)",
108            ),
109            broker_commit_duration,
110            sr_discovery_successes: handle.counter(
111                "kafka_source_sr_discovery_successes_total",
112                "Schema Registry discovery successes",
113            ),
114            sr_discovery_failures: handle.counter(
115                "kafka_source_sr_discovery_failures_total",
116                "Schema Registry discovery failures",
117            ),
118            sr_discovery_timeouts: handle.counter(
119                "kafka_source_sr_discovery_timeouts_total",
120                "Schema Registry discovery timeouts",
121            ),
122        }
123    }
124
125    /// Records a successful poll of `records` records totaling `bytes`.
126    pub fn record_poll(&self, records: u64, bytes: u64) {
127        self.records_polled.inc_by(records);
128        self.bytes_polled.inc_by(bytes);
129        self.batches_polled.inc();
130    }
131
132    /// Records a consumer or deserialization error.
133    pub fn record_error(&self) {
134        self.errors.inc();
135    }
136
137    /// Records a successful offset commit.
138    pub fn record_commit(&self) {
139        self.commits.inc();
140    }
141
142    /// Records a consumer group rebalance event.
143    pub fn record_rebalance(&self) {
144        self.rebalances.inc();
145    }
146
147    /// Updates the consumer lag value.
148    #[allow(clippy::cast_possible_wrap)]
149    pub fn set_lag(&self, lag: u64) {
150        self.lag.set(lag as i64);
151    }
152
153    /// Records a single broker offset commit duration in seconds.
154    pub fn observe_broker_commit_duration(&self, secs: f64) {
155        self.broker_commit_duration.observe(secs);
156    }
157
158    /// Records a successful Schema Registry discovery at DDL time.
159    pub fn record_sr_discovery_success(&self) {
160        self.sr_discovery_successes.inc();
161    }
162
163    /// Records a Schema Registry discovery failure.
164    pub fn record_sr_discovery_failure(&self) {
165        self.sr_discovery_failures.inc();
166    }
167
168    /// Records a Schema Registry discovery timeout.
169    pub fn record_sr_discovery_timeout(&self) {
170        self.sr_discovery_timeouts.inc();
171    }
172}
173
174impl Default for KafkaSourceMetrics {
175    fn default() -> Self {
176        Self::new(None)
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Every counter, the lag gauge and the histogram start out empty.
    #[test]
    fn test_initial_zeros() {
        let m = KafkaSourceMetrics::new(None);
        assert_eq!(m.records_polled.get(), 0);
        assert_eq!(m.bytes_polled.get(), 0);
        assert_eq!(m.errors.get(), 0);
        assert_eq!(m.batches_polled.get(), 0);
        assert_eq!(m.commits.get(), 0);
        assert_eq!(m.commit_failures_rejected.get(), 0);
        assert_eq!(m.commit_failures_panic.get(), 0);
        assert_eq!(m.commit_failures_enqueue_dropped.get(), 0);
        assert_eq!(m.rebalances.get(), 0);
        assert_eq!(m.lag.get(), 0);
        assert_eq!(m.broker_commit_duration.get_sample_count(), 0);
    }

    /// `record_poll` accumulates records and bytes and counts one batch
    /// per call.
    #[test]
    fn test_record_poll() {
        let m = KafkaSourceMetrics::new(None);
        m.record_poll(100, 5000);
        m.record_poll(200, 10000);

        assert_eq!(m.records_polled.get(), 300);
        assert_eq!(m.bytes_polled.get(), 15000);
        assert_eq!(m.batches_polled.get(), 2);
    }

    #[test]
    fn test_record_error_and_commit() {
        let m = KafkaSourceMetrics::new(None);
        m.record_error();
        m.record_error();
        m.record_commit();

        assert_eq!(m.errors.get(), 2);
        assert_eq!(m.commits.get(), 1);
    }

    /// The three `reason=...` counters share a metric name but must count
    /// independently of each other.
    #[test]
    fn test_record_commit_failure() {
        let m = KafkaSourceMetrics::new(None);
        m.commit_failures_rejected.inc();
        m.commit_failures_panic.inc();
        m.commit_failures_enqueue_dropped.inc();

        assert_eq!(m.commit_failures_rejected.get(), 1);
        assert_eq!(m.commit_failures_panic.get(), 1);
        assert_eq!(m.commit_failures_enqueue_dropped.get(), 1);
    }

    #[test]
    fn test_record_rebalance() {
        let m = KafkaSourceMetrics::new(None);
        m.record_rebalance();
        m.record_rebalance();

        assert_eq!(m.rebalances.get(), 2);
    }

    #[test]
    fn test_set_lag() {
        let m = KafkaSourceMetrics::new(None);
        assert_eq!(m.lag.get(), 0);

        m.set_lag(42);
        assert_eq!(m.lag.get(), 42);

        m.set_lag(100);
        assert_eq!(m.lag.get(), 100);
    }

    /// Each observation adds one sample and its value to the histogram sum.
    #[test]
    fn test_observe_broker_commit_duration() {
        let m = KafkaSourceMetrics::new(None);
        m.observe_broker_commit_duration(0.05);
        m.observe_broker_commit_duration(0.5);

        assert_eq!(m.broker_commit_duration.get_sample_count(), 2);
        let sum = m.broker_commit_duration.get_sample_sum();
        assert!((sum - 0.55).abs() < 1e-9, "unexpected sample sum: {sum}");
    }

    #[test]
    fn test_sr_discovery_counters() {
        let m = KafkaSourceMetrics::new(None);
        m.record_sr_discovery_success();
        m.record_sr_discovery_success();
        m.record_sr_discovery_failure();
        m.record_sr_discovery_timeout();

        assert_eq!(m.sr_discovery_successes.get(), 2);
        assert_eq!(m.sr_discovery_failures.get(), 1);
        assert_eq!(m.sr_discovery_timeouts.get(), 1);
    }

    #[test]
    fn test_lag_computation() {
        // Simulates 3 partitions: high_watermark - (offset + 1) for each.
        let partitions = [
            (1000_i64, 900_i64), // lag = 1000 - (900 + 1) = 99
            (500, 499),          // lag = 500 - (499 + 1) = 0
            (2000, 1500),        // lag = 2000 - (1500 + 1) = 499
        ];
        // Negative per-partition lag is clamped to zero before summing.
        let total_lag: u64 = partitions
            .iter()
            .map(|&(hw, off)| (hw - (off + 1)).max(0).unsigned_abs())
            .sum();
        assert_eq!(total_lag, 598);

        let m = KafkaSourceMetrics::new(None);
        m.set_lag(total_lag);
        assert_eq!(m.lag.get(), 598);
    }

    /// Metrics built against a caller-supplied registry must be visible
    /// when that registry is gathered — both the ones created through the
    /// shared helper and the ones registered by hand in `new`.
    #[test]
    fn test_registered_on_prometheus_registry() {
        let reg = Registry::new();
        let m = KafkaSourceMetrics::new(Some(&reg));
        m.record_poll(10, 500);
        m.record_error();

        // Verify the metrics are registered on the registry.
        let families = reg.gather();
        let names: Vec<&str> = families
            .iter()
            .map(prometheus::proto::MetricFamily::name)
            .collect();
        assert!(names.contains(&"kafka_source_records_polled_total"));
        assert!(names.contains(&"kafka_source_errors_total"));
        assert!(names.contains(&"kafka_source_commit_failures_total"));
        assert!(names.contains(&"kafka_source_broker_commit_duration_seconds"));
    }
}
299}