// laminar_connectors/kafka/sink_metrics.rs

//! Kafka sink connector metrics.

use prometheus::{IntCounter, IntGauge, Registry};

use crate::metrics::ConnectorMetrics;
6
/// Prometheus-backed counters for Kafka sink connector statistics.
///
/// Handles are cheap to clone: `new` registers a clone of each metric and
/// keeps the original, so clones must share the same underlying values.
#[derive(Debug, Clone)]
pub struct KafkaSinkMetrics {
    /// Records written to Kafka.
    pub records_written: IntCounter,
    /// Bytes written to Kafka (payload only).
    pub bytes_written: IntCounter,
    /// Errors encountered while producing.
    pub errors_total: IntCounter,
    /// Epochs committed.
    pub epochs_committed: IntCounter,
    /// Epochs rolled back.
    pub epochs_rolled_back: IntCounter,
    /// Records routed to the dead letter queue.
    pub dlq_records: IntCounter,
    /// Serialization errors.
    pub serialization_errors: IntCounter,
    /// Sum of produce delivery latencies in microseconds
    /// (pair with `produce_latency_count` to derive the mean).
    pub produce_latency_sum_us: IntCounter,
    /// Maximum produce delivery latency in microseconds.
    pub produce_latency_max_us: IntGauge,
    /// Number of produce delivery latency samples.
    pub produce_latency_count: IntCounter,
}
31
32impl KafkaSinkMetrics {
33    /// All counters start at zero. Registers on `registry` if provided.
34    #[must_use]
35    #[allow(clippy::missing_panics_doc)]
36    pub fn new(registry: Option<&Registry>) -> Self {
37        let local;
38        let reg = if let Some(r) = registry {
39            r
40        } else {
41            local = Registry::new();
42            &local
43        };
44
45        macro_rules! reg_c {
46            ($name:expr, $help:expr) => {{
47                let c = IntCounter::new($name, $help).unwrap();
48                let _ = reg.register(Box::new(c.clone()));
49                c
50            }};
51        }
52
53        let produce_latency_max_us = IntGauge::new(
54            "kafka_sink_produce_latency_max_us",
55            "Max produce delivery latency (us)",
56        )
57        .unwrap();
58        let _ = reg.register(Box::new(produce_latency_max_us.clone()));
59
60        Self {
61            records_written: reg_c!(
62                "kafka_sink_records_written_total",
63                "Records written to Kafka"
64            ),
65            bytes_written: reg_c!("kafka_sink_bytes_written_total", "Bytes written to Kafka"),
66            errors_total: reg_c!("kafka_sink_errors_total", "Kafka sink errors"),
67            epochs_committed: reg_c!("kafka_sink_epochs_committed_total", "Epochs committed"),
68            epochs_rolled_back: reg_c!("kafka_sink_epochs_rolled_back_total", "Epochs rolled back"),
69            dlq_records: reg_c!("kafka_sink_dlq_records_total", "Records routed to DLQ"),
70            serialization_errors: reg_c!(
71                "kafka_sink_serialization_errors_total",
72                "Serialization errors"
73            ),
74            produce_latency_sum_us: reg_c!(
75                "kafka_sink_produce_latency_sum_us",
76                "Sum of produce latencies (us)"
77            ),
78            produce_latency_count: reg_c!(
79                "kafka_sink_produce_latency_count",
80                "Produce latency samples"
81            ),
82            produce_latency_max_us,
83        }
84    }
85
86    /// Records a successful write of `records` records totaling `bytes`.
87    pub fn record_write(&self, records: u64, bytes: u64) {
88        self.records_written.inc_by(records);
89        self.bytes_written.inc_by(bytes);
90    }
91
92    /// Records a production error.
93    pub fn record_error(&self) {
94        self.errors_total.inc();
95    }
96
97    /// Records a successful epoch commit.
98    pub fn record_commit(&self) {
99        self.epochs_committed.inc();
100    }
101
102    /// Records an epoch rollback.
103    pub fn record_rollback(&self) {
104        self.epochs_rolled_back.inc();
105    }
106
107    /// Records a DLQ routing event.
108    pub fn record_dlq(&self) {
109        self.dlq_records.inc();
110    }
111
112    /// Records a serialization error.
113    pub fn record_serialization_error(&self) {
114        self.serialization_errors.inc();
115    }
116
117    /// Records a produce delivery latency sample in microseconds.
118    #[allow(clippy::cast_possible_wrap)]
119    pub fn record_produce_latency(&self, latency_us: u64) {
120        self.produce_latency_sum_us.inc_by(latency_us);
121        self.produce_latency_count.inc();
122        if latency_us as i64 > self.produce_latency_max_us.get() {
123            self.produce_latency_max_us.set(latency_us as i64);
124        }
125    }
126
127    /// Converts to the SDK's [`ConnectorMetrics`].
128    #[must_use]
129    #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
130    pub fn to_connector_metrics(&self) -> ConnectorMetrics {
131        let mut m = ConnectorMetrics {
132            records_total: self.records_written.get(),
133            bytes_total: self.bytes_written.get(),
134            errors_total: self.errors_total.get(),
135            lag: 0,
136            custom: Vec::new(),
137        };
138        m.add_custom("kafka.epochs_committed", self.epochs_committed.get() as f64);
139        m.add_custom(
140            "kafka.epochs_rolled_back",
141            self.epochs_rolled_back.get() as f64,
142        );
143        m.add_custom("kafka.dlq_records", self.dlq_records.get() as f64);
144        m.add_custom(
145            "kafka.serialization_errors",
146            self.serialization_errors.get() as f64,
147        );
148        let count = self.produce_latency_count.get();
149        let sum = self.produce_latency_sum_us.get();
150        let max = self.produce_latency_max_us.get() as u64;
151        let avg = if count > 0 {
152            sum as f64 / count as f64
153        } else {
154            0.0
155        };
156        m.add_custom("kafka.produce_latency_avg_us", avg);
157        m.add_custom("kafka.produce_latency_max_us", max as f64);
158        m
159    }
160}
161
162impl Default for KafkaSinkMetrics {
163    fn default() -> Self {
164        Self::new(None)
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh metric set must report all-zero totals.
    #[test]
    fn test_initial_zeros() {
        let metrics = KafkaSinkMetrics::new(None);
        let snapshot = metrics.to_connector_metrics();
        assert_eq!(
            (snapshot.records_total, snapshot.bytes_total, snapshot.errors_total),
            (0, 0, 0)
        );
    }

    /// Writes accumulate across calls in both record and byte counters.
    #[test]
    fn test_record_write() {
        let metrics = KafkaSinkMetrics::new(None);
        for (records, bytes) in [(100, 5000), (200, 10000)] {
            metrics.record_write(records, bytes);
        }
        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 300);
        assert_eq!(snapshot.bytes_total, 15000);
    }

    /// Latency samples update count, sum, and running max.
    #[test]
    fn test_produce_latency() {
        let metrics = KafkaSinkMetrics::new(None);
        for sample in [100, 300, 50] {
            metrics.record_produce_latency(sample);
        }
        assert_eq!(metrics.produce_latency_count.get(), 3);
        assert_eq!(metrics.produce_latency_sum_us.get(), 450);
        assert_eq!(metrics.produce_latency_max_us.get(), 300);
    }
}
202}