Skip to main content

laminar_connectors/kafka/
metrics.rs

1//! Kafka source connector metrics.
2//!
3//! [`KafkaSourceMetrics`] provides lock-free atomic counters for
4//! tracking consumption statistics, convertible to the SDK's
5//! [`ConnectorMetrics`] type.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
/// Lock-free statistics counters for a Kafka source connector.
///
/// Every field is an [`AtomicU64`], so producers of metrics can update
/// them concurrently without any locking.
#[derive(Debug)]
pub struct KafkaSourceMetrics {
    /// Running count of records consumed from Kafka.
    pub records_polled: AtomicU64,
    /// Running count of bytes consumed from Kafka.
    pub bytes_polled: AtomicU64,
    /// Running count of deserialization or consumer errors.
    pub errors: AtomicU64,
    /// Running count of batches returned from `poll_batch()`.
    pub batches_polled: AtomicU64,
    /// Running count of offset commits sent to Kafka.
    pub commits: AtomicU64,
    /// Running count of consumer group rebalance events.
    pub rebalances: AtomicU64,
}
27
28impl KafkaSourceMetrics {
29    /// All counters start at zero.
30    #[must_use]
31    pub fn new() -> Self {
32        Self {
33            records_polled: AtomicU64::new(0),
34            bytes_polled: AtomicU64::new(0),
35            errors: AtomicU64::new(0),
36            batches_polled: AtomicU64::new(0),
37            commits: AtomicU64::new(0),
38            rebalances: AtomicU64::new(0),
39        }
40    }
41
42    /// Records a successful poll of `records` records totaling `bytes`.
43    pub fn record_poll(&self, records: u64, bytes: u64) {
44        self.records_polled.fetch_add(records, Ordering::Relaxed);
45        self.bytes_polled.fetch_add(bytes, Ordering::Relaxed);
46        self.batches_polled.fetch_add(1, Ordering::Relaxed);
47    }
48
49    /// Records a consumer or deserialization error.
50    pub fn record_error(&self) {
51        self.errors.fetch_add(1, Ordering::Relaxed);
52    }
53
54    /// Records a successful offset commit.
55    pub fn record_commit(&self) {
56        self.commits.fetch_add(1, Ordering::Relaxed);
57    }
58
59    /// Records a consumer group rebalance event.
60    pub fn record_rebalance(&self) {
61        self.rebalances.fetch_add(1, Ordering::Relaxed);
62    }
63
64    /// Converts to the SDK's [`ConnectorMetrics`].
65    #[must_use]
66    #[allow(clippy::cast_precision_loss)]
67    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
68        let mut m = ConnectorMetrics {
69            records_total: self.records_polled.load(Ordering::Relaxed),
70            bytes_total: self.bytes_polled.load(Ordering::Relaxed),
71            errors_total: self.errors.load(Ordering::Relaxed),
72            lag: 0,
73            custom: Vec::new(),
74        };
75        m.add_custom(
76            "kafka.batches_polled",
77            self.batches_polled.load(Ordering::Relaxed) as f64,
78        );
79        m.add_custom("kafka.commits", self.commits.load(Ordering::Relaxed) as f64);
80        m.add_custom(
81            "kafka.rebalances",
82            self.rebalances.load(Ordering::Relaxed) as f64,
83        );
84        m
85    }
86}
87
88impl Default for KafkaSourceMetrics {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_initial_zeros() {
        let metrics = KafkaSourceMetrics::new();
        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    #[test]
    fn test_record_poll() {
        let metrics = KafkaSourceMetrics::new();
        for (records, bytes) in [(100, 5000), (200, 10000)] {
            metrics.record_poll(records, bytes);
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 300);
        assert_eq!(snapshot.bytes_total, 15000);
    }

    #[test]
    fn test_record_error_and_commit() {
        let metrics = KafkaSourceMetrics::new();
        metrics.record_error();
        metrics.record_error();
        metrics.record_commit();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.errors_total, 2);
        assert_eq!(snapshot.custom.len(), 3);
        // The commit count is exported as a custom metric.
        let commits = snapshot
            .custom
            .iter()
            .find(|(name, _)| name == "kafka.commits")
            .expect("kafka.commits custom metric present");
        assert_eq!(commits.1, 1.0);
    }

    #[test]
    fn test_record_rebalance() {
        let metrics = KafkaSourceMetrics::new();
        metrics.record_rebalance();
        metrics.record_rebalance();

        let snapshot = metrics.to_connector_metrics();
        let rebalances = snapshot
            .custom
            .iter()
            .find(|(name, _)| name == "kafka.rebalances")
            .expect("kafka.rebalances custom metric present");
        assert_eq!(rebalances.1, 2.0);
    }
}