laminar_connectors/kafka/sink_metrics.rs

//! Kafka sink connector metrics.
//!
//! [`KafkaSinkMetrics`] provides lock-free atomic counters for
//! tracking production statistics, convertible to the SDK's
//! [`ConnectorMetrics`] type.
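//!
//! A minimal usage sketch (marked `ignore`: the surrounding connector
//! wiring is assumed, so it is not run as a doctest):
//!
//! ```ignore
//! let metrics = KafkaSinkMetrics::new();
//! metrics.record_write(100, 4_096); // 100 records, 4 KiB of payload
//! metrics.record_commit();
//! let snapshot = metrics.to_connector_metrics();
//! assert_eq!(snapshot.records_total, 100);
//! assert_eq!(snapshot.bytes_total, 4_096);
//! ```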

use std::sync::atomic::{AtomicU64, Ordering};

use crate::metrics::ConnectorMetrics;

/// Atomic counters for Kafka sink connector statistics.
#[derive(Debug)]
pub struct KafkaSinkMetrics {
    /// Total records written to Kafka.
    pub records_written: AtomicU64,
    /// Total bytes written to Kafka (payload only).
    pub bytes_written: AtomicU64,
    /// Total errors encountered.
    pub errors_total: AtomicU64,
    /// Total epochs committed.
    pub epochs_committed: AtomicU64,
    /// Total epochs rolled back.
    pub epochs_rolled_back: AtomicU64,
    /// Total records routed to dead letter queue.
    pub dlq_records: AtomicU64,
    /// Total serialization errors.
    pub serialization_errors: AtomicU64,
}

impl KafkaSinkMetrics {
    /// Creates a new `KafkaSinkMetrics` with all counters initialized to zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            records_written: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            errors_total: AtomicU64::new(0),
            epochs_committed: AtomicU64::new(0),
            epochs_rolled_back: AtomicU64::new(0),
            dlq_records: AtomicU64::new(0),
            serialization_errors: AtomicU64::new(0),
        }
    }

    /// Records a successful write of `records` messages totaling `bytes` payload bytes.
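    ///
    /// Because the counters are atomic, a shared handle can be updated from
    /// multiple threads without locking. A minimal sketch (the thread setup
    /// here is illustrative, so the example is not run as a doctest):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let metrics = Arc::new(KafkaSinkMetrics::new());
    /// let handle = Arc::clone(&metrics);
    /// std::thread::spawn(move || handle.record_write(1, 128)).join().unwrap();
    /// metrics.record_write(1, 128);
    /// assert_eq!(metrics.to_connector_metrics().records_total, 2);
    /// ```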
    pub fn record_write(&self, records: u64, bytes: u64) {
        self.records_written.fetch_add(records, Ordering::Relaxed);
        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Records a production or serialization error.
    pub fn record_error(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a successful epoch commit.
    pub fn record_commit(&self) {
        self.epochs_committed.fetch_add(1, Ordering::Relaxed);
    }

    /// Records an epoch rollback.
    pub fn record_rollback(&self) {
        self.epochs_rolled_back.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a DLQ routing event.
    pub fn record_dlq(&self) {
        self.dlq_records.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a serialization error (counted separately from `errors_total`).
    pub fn record_serialization_error(&self) {
        self.serialization_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Converts a snapshot of the current counters to the SDK's [`ConnectorMetrics`].
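    ///
    /// Kafka-specific counters are exported as `custom` entries under a
    /// `kafka.` prefix. A lookup sketch, mirroring the tests below (marked
    /// `ignore` since the surrounding crate wiring is assumed):
    ///
    /// ```ignore
    /// let metrics = KafkaSinkMetrics::new();
    /// metrics.record_dlq();
    /// let cm = metrics.to_connector_metrics();
    /// let dlq = cm.custom.iter().find(|(k, _)| k == "kafka.dlq_records");
    /// assert_eq!(dlq.unwrap().1, 1.0);
    /// ```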
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
        let mut m = ConnectorMetrics {
            records_total: self.records_written.load(Ordering::Relaxed),
            bytes_total: self.bytes_written.load(Ordering::Relaxed),
            errors_total: self.errors_total.load(Ordering::Relaxed),
            lag: 0,
            custom: Vec::new(),
        };
        m.add_custom(
            "kafka.epochs_committed",
            self.epochs_committed.load(Ordering::Relaxed) as f64,
        );
        m.add_custom(
            "kafka.epochs_rolled_back",
            self.epochs_rolled_back.load(Ordering::Relaxed) as f64,
        );
        m.add_custom(
            "kafka.dlq_records",
            self.dlq_records.load(Ordering::Relaxed) as f64,
        );
        m.add_custom(
            "kafka.serialization_errors",
            self.serialization_errors.load(Ordering::Relaxed) as f64,
        );
        m
    }
}

impl Default for KafkaSinkMetrics {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_initial_zeros() {
        let m = KafkaSinkMetrics::new();
        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 0);
        assert_eq!(cm.bytes_total, 0);
        assert_eq!(cm.errors_total, 0);
    }

    #[test]
    fn test_record_write() {
        let m = KafkaSinkMetrics::new();
        m.record_write(100, 5000);
        m.record_write(200, 10000);

        let cm = m.to_connector_metrics();
        assert_eq!(cm.records_total, 300);
        assert_eq!(cm.bytes_total, 15000);
    }

    #[test]
    fn test_epoch_metrics() {
        let m = KafkaSinkMetrics::new();
        m.record_commit();
        m.record_commit();
        m.record_rollback();

        let cm = m.to_connector_metrics();
        let committed = cm
            .custom
            .iter()
            .find(|(k, _)| k == "kafka.epochs_committed");
        assert_eq!(committed.unwrap().1, 2.0);
        let rolled_back = cm
            .custom
            .iter()
            .find(|(k, _)| k == "kafka.epochs_rolled_back");
        assert_eq!(rolled_back.unwrap().1, 1.0);
    }

    #[test]
    fn test_dlq_and_serde_errors() {
        let m = KafkaSinkMetrics::new();
        m.record_dlq();
        m.record_dlq();
        m.record_serialization_error();
        m.record_error();

        let cm = m.to_connector_metrics();
        assert_eq!(cm.errors_total, 1);
        let dlq = cm.custom.iter().find(|(k, _)| k == "kafka.dlq_records");
        assert_eq!(dlq.unwrap().1, 2.0);
        let serde = cm
            .custom
            .iter()
            .find(|(k, _)| k == "kafka.serialization_errors");
        assert_eq!(serde.unwrap().1, 1.0);
    }
}