Skip to main content

laminar_connectors/kafka/
sink_metrics.rs

1//! Kafka sink connector metrics.
2
3use prometheus::{IntCounter, IntGauge, Registry};
4
5use crate::prom::reg_or_local;
6
7/// Prometheus-backed counters for Kafka sink connector statistics.
8#[derive(Debug, Clone)]
9pub struct KafkaSinkMetrics {
10    /// Records written to Kafka.
11    pub records_written: IntCounter,
12    /// Bytes written to Kafka (payload only).
13    pub bytes_written: IntCounter,
14    /// Errors encountered.
15    pub errors_total: IntCounter,
16    /// Epochs committed.
17    pub epochs_committed: IntCounter,
18    /// Epochs rolled back.
19    pub epochs_rolled_back: IntCounter,
20    /// Records routed to dead letter queue.
21    pub dlq_records: IntCounter,
22    /// Serialization errors.
23    pub serialization_errors: IntCounter,
24    /// Sum of produce delivery latencies in microseconds.
25    pub produce_latency_sum_us: IntCounter,
26    /// Maximum produce delivery latency in microseconds.
27    pub produce_latency_max_us: IntGauge,
28    /// Number of produce delivery latency samples.
29    pub produce_latency_count: IntCounter,
30}
31
32impl KafkaSinkMetrics {
33    /// All counters start at zero. Registers on `registry` if provided.
34    #[must_use]
35    #[allow(clippy::missing_panics_doc)]
36    pub fn new(registry: Option<&Registry>) -> Self {
37        let mut local = None;
38        let reg = reg_or_local(registry, &mut local);
39
40        Self {
41            records_written: reg.counter(
42                "kafka_sink_records_written_total",
43                "Records written to Kafka",
44            ),
45            bytes_written: reg.counter("kafka_sink_bytes_written_total", "Bytes written to Kafka"),
46            errors_total: reg.counter("kafka_sink_errors_total", "Kafka sink errors"),
47            epochs_committed: reg.counter("kafka_sink_epochs_committed_total", "Epochs committed"),
48            epochs_rolled_back: reg
49                .counter("kafka_sink_epochs_rolled_back_total", "Epochs rolled back"),
50            dlq_records: reg.counter("kafka_sink_dlq_records_total", "Records routed to DLQ"),
51            serialization_errors: reg.counter(
52                "kafka_sink_serialization_errors_total",
53                "Serialization errors",
54            ),
55            produce_latency_sum_us: reg.counter(
56                "kafka_sink_produce_latency_sum_us",
57                "Sum of produce latencies (us)",
58            ),
59            produce_latency_count: reg.counter(
60                "kafka_sink_produce_latency_count",
61                "Produce latency samples",
62            ),
63            produce_latency_max_us: reg.gauge(
64                "kafka_sink_produce_latency_max_us",
65                "Max produce delivery latency (us)",
66            ),
67        }
68    }
69
70    /// Records a successful write of `records` records totaling `bytes`.
71    pub fn record_write(&self, records: u64, bytes: u64) {
72        self.records_written.inc_by(records);
73        self.bytes_written.inc_by(bytes);
74    }
75
76    /// Records a production error.
77    pub fn record_error(&self) {
78        self.errors_total.inc();
79    }
80
81    /// Records a successful epoch commit.
82    pub fn record_commit(&self) {
83        self.epochs_committed.inc();
84    }
85
86    /// Records an epoch rollback.
87    pub fn record_rollback(&self) {
88        self.epochs_rolled_back.inc();
89    }
90
91    /// Records a DLQ routing event.
92    pub fn record_dlq(&self) {
93        self.dlq_records.inc();
94    }
95
96    /// Records a serialization error.
97    pub fn record_serialization_error(&self) {
98        self.serialization_errors.inc();
99    }
100
101    /// Records a produce delivery latency sample in microseconds.
102    #[allow(clippy::cast_possible_wrap)]
103    pub fn record_produce_latency(&self, latency_us: u64) {
104        self.produce_latency_sum_us.inc_by(latency_us);
105        self.produce_latency_count.inc();
106        if latency_us as i64 > self.produce_latency_max_us.get() {
107            self.produce_latency_max_us.set(latency_us as i64);
108        }
109    }
110}
111
112impl Default for KafkaSinkMetrics {
113    fn default() -> Self {
114        Self::new(None)
115    }
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121
122    #[test]
123    fn test_initial_zeros() {
124        let m = KafkaSinkMetrics::new(None);
125        assert_eq!(m.records_written.get(), 0);
126        assert_eq!(m.bytes_written.get(), 0);
127        assert_eq!(m.errors_total.get(), 0);
128    }
129
130    #[test]
131    fn test_record_write() {
132        let m = KafkaSinkMetrics::new(None);
133        m.record_write(100, 5000);
134        m.record_write(200, 10000);
135        assert_eq!(m.records_written.get(), 300);
136        assert_eq!(m.bytes_written.get(), 15000);
137    }
138
139    #[test]
140    fn test_produce_latency() {
141        let m = KafkaSinkMetrics::new(None);
142        m.record_produce_latency(100);
143        m.record_produce_latency(300);
144        m.record_produce_latency(50);
145
146        assert_eq!(m.produce_latency_count.get(), 3);
147        assert_eq!(m.produce_latency_sum_us.get(), 450);
148        assert_eq!(m.produce_latency_max_us.get(), 300);
149    }
150}