laminar_connectors/kafka/
sink_metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
4
5use crate::metrics::ConnectorMetrics;
6
7#[derive(Debug, Clone)]
9pub struct KafkaSinkMetrics {
10 pub records_written: IntCounter,
12 pub bytes_written: IntCounter,
14 pub errors_total: IntCounter,
16 pub epochs_committed: IntCounter,
18 pub epochs_rolled_back: IntCounter,
20 pub dlq_records: IntCounter,
22 pub serialization_errors: IntCounter,
24 pub produce_latency_sum_us: IntCounter,
26 pub produce_latency_max_us: IntGauge,
28 pub produce_latency_count: IntCounter,
30}
31
32impl KafkaSinkMetrics {
33 #[must_use]
35 #[allow(clippy::missing_panics_doc)]
36 pub fn new(registry: Option<&Registry>) -> Self {
37 let local;
38 let reg = if let Some(r) = registry {
39 r
40 } else {
41 local = Registry::new();
42 &local
43 };
44
45 macro_rules! reg_c {
46 ($name:expr, $help:expr) => {{
47 let c = IntCounter::new($name, $help).unwrap();
48 let _ = reg.register(Box::new(c.clone()));
49 c
50 }};
51 }
52
53 let produce_latency_max_us = IntGauge::new(
54 "kafka_sink_produce_latency_max_us",
55 "Max produce delivery latency (us)",
56 )
57 .unwrap();
58 let _ = reg.register(Box::new(produce_latency_max_us.clone()));
59
60 Self {
61 records_written: reg_c!(
62 "kafka_sink_records_written_total",
63 "Records written to Kafka"
64 ),
65 bytes_written: reg_c!("kafka_sink_bytes_written_total", "Bytes written to Kafka"),
66 errors_total: reg_c!("kafka_sink_errors_total", "Kafka sink errors"),
67 epochs_committed: reg_c!("kafka_sink_epochs_committed_total", "Epochs committed"),
68 epochs_rolled_back: reg_c!("kafka_sink_epochs_rolled_back_total", "Epochs rolled back"),
69 dlq_records: reg_c!("kafka_sink_dlq_records_total", "Records routed to DLQ"),
70 serialization_errors: reg_c!(
71 "kafka_sink_serialization_errors_total",
72 "Serialization errors"
73 ),
74 produce_latency_sum_us: reg_c!(
75 "kafka_sink_produce_latency_sum_us",
76 "Sum of produce latencies (us)"
77 ),
78 produce_latency_count: reg_c!(
79 "kafka_sink_produce_latency_count",
80 "Produce latency samples"
81 ),
82 produce_latency_max_us,
83 }
84 }
85
86 pub fn record_write(&self, records: u64, bytes: u64) {
88 self.records_written.inc_by(records);
89 self.bytes_written.inc_by(bytes);
90 }
91
92 pub fn record_error(&self) {
94 self.errors_total.inc();
95 }
96
97 pub fn record_commit(&self) {
99 self.epochs_committed.inc();
100 }
101
102 pub fn record_rollback(&self) {
104 self.epochs_rolled_back.inc();
105 }
106
107 pub fn record_dlq(&self) {
109 self.dlq_records.inc();
110 }
111
112 pub fn record_serialization_error(&self) {
114 self.serialization_errors.inc();
115 }
116
117 #[allow(clippy::cast_possible_wrap)]
119 pub fn record_produce_latency(&self, latency_us: u64) {
120 self.produce_latency_sum_us.inc_by(latency_us);
121 self.produce_latency_count.inc();
122 if latency_us as i64 > self.produce_latency_max_us.get() {
123 self.produce_latency_max_us.set(latency_us as i64);
124 }
125 }
126
127 #[must_use]
129 #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
130 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
131 let mut m = ConnectorMetrics {
132 records_total: self.records_written.get(),
133 bytes_total: self.bytes_written.get(),
134 errors_total: self.errors_total.get(),
135 lag: 0,
136 custom: Vec::new(),
137 };
138 m.add_custom("kafka.epochs_committed", self.epochs_committed.get() as f64);
139 m.add_custom(
140 "kafka.epochs_rolled_back",
141 self.epochs_rolled_back.get() as f64,
142 );
143 m.add_custom("kafka.dlq_records", self.dlq_records.get() as f64);
144 m.add_custom(
145 "kafka.serialization_errors",
146 self.serialization_errors.get() as f64,
147 );
148 let count = self.produce_latency_count.get();
149 let sum = self.produce_latency_sum_us.get();
150 let max = self.produce_latency_max_us.get() as u64;
151 let avg = if count > 0 {
152 sum as f64 / count as f64
153 } else {
154 0.0
155 };
156 m.add_custom("kafka.produce_latency_avg_us", avg);
157 m.add_custom("kafka.produce_latency_max_us", max as f64);
158 m
159 }
160}
161
162impl Default for KafkaSinkMetrics {
163 fn default() -> Self {
164 Self::new(None)
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171
172 #[test]
173 fn test_initial_zeros() {
174 let m = KafkaSinkMetrics::new(None);
175 let cm = m.to_connector_metrics();
176 assert_eq!(cm.records_total, 0);
177 assert_eq!(cm.bytes_total, 0);
178 assert_eq!(cm.errors_total, 0);
179 }
180
181 #[test]
182 fn test_record_write() {
183 let m = KafkaSinkMetrics::new(None);
184 m.record_write(100, 5000);
185 m.record_write(200, 10000);
186 let cm = m.to_connector_metrics();
187 assert_eq!(cm.records_total, 300);
188 assert_eq!(cm.bytes_total, 15000);
189 }
190
191 #[test]
192 fn test_produce_latency() {
193 let m = KafkaSinkMetrics::new(None);
194 m.record_produce_latency(100);
195 m.record_produce_latency(300);
196 m.record_produce_latency(50);
197
198 assert_eq!(m.produce_latency_count.get(), 3);
199 assert_eq!(m.produce_latency_sum_us.get(), 450);
200 assert_eq!(m.produce_latency_max_us.get(), 300);
201 }
202}