laminar_connectors/kafka/
sink_metrics.rs1use prometheus::{IntCounter, IntGauge, Registry};
4
5use crate::prom::reg_or_local;
6
7#[derive(Debug, Clone)]
9pub struct KafkaSinkMetrics {
10 pub records_written: IntCounter,
12 pub bytes_written: IntCounter,
14 pub errors_total: IntCounter,
16 pub epochs_committed: IntCounter,
18 pub epochs_rolled_back: IntCounter,
20 pub dlq_records: IntCounter,
22 pub serialization_errors: IntCounter,
24 pub produce_latency_sum_us: IntCounter,
26 pub produce_latency_max_us: IntGauge,
28 pub produce_latency_count: IntCounter,
30}
31
32impl KafkaSinkMetrics {
33 #[must_use]
35 #[allow(clippy::missing_panics_doc)]
36 pub fn new(registry: Option<&Registry>) -> Self {
37 let mut local = None;
38 let reg = reg_or_local(registry, &mut local);
39
40 Self {
41 records_written: reg.counter(
42 "kafka_sink_records_written_total",
43 "Records written to Kafka",
44 ),
45 bytes_written: reg.counter("kafka_sink_bytes_written_total", "Bytes written to Kafka"),
46 errors_total: reg.counter("kafka_sink_errors_total", "Kafka sink errors"),
47 epochs_committed: reg.counter("kafka_sink_epochs_committed_total", "Epochs committed"),
48 epochs_rolled_back: reg
49 .counter("kafka_sink_epochs_rolled_back_total", "Epochs rolled back"),
50 dlq_records: reg.counter("kafka_sink_dlq_records_total", "Records routed to DLQ"),
51 serialization_errors: reg.counter(
52 "kafka_sink_serialization_errors_total",
53 "Serialization errors",
54 ),
55 produce_latency_sum_us: reg.counter(
56 "kafka_sink_produce_latency_sum_us",
57 "Sum of produce latencies (us)",
58 ),
59 produce_latency_count: reg.counter(
60 "kafka_sink_produce_latency_count",
61 "Produce latency samples",
62 ),
63 produce_latency_max_us: reg.gauge(
64 "kafka_sink_produce_latency_max_us",
65 "Max produce delivery latency (us)",
66 ),
67 }
68 }
69
70 pub fn record_write(&self, records: u64, bytes: u64) {
72 self.records_written.inc_by(records);
73 self.bytes_written.inc_by(bytes);
74 }
75
76 pub fn record_error(&self) {
78 self.errors_total.inc();
79 }
80
81 pub fn record_commit(&self) {
83 self.epochs_committed.inc();
84 }
85
86 pub fn record_rollback(&self) {
88 self.epochs_rolled_back.inc();
89 }
90
91 pub fn record_dlq(&self) {
93 self.dlq_records.inc();
94 }
95
96 pub fn record_serialization_error(&self) {
98 self.serialization_errors.inc();
99 }
100
101 #[allow(clippy::cast_possible_wrap)]
103 pub fn record_produce_latency(&self, latency_us: u64) {
104 self.produce_latency_sum_us.inc_by(latency_us);
105 self.produce_latency_count.inc();
106 if latency_us as i64 > self.produce_latency_max_us.get() {
107 self.produce_latency_max_us.set(latency_us as i64);
108 }
109 }
110}
111
112impl Default for KafkaSinkMetrics {
113 fn default() -> Self {
114 Self::new(None)
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built metric set reports zero for the write and error counters.
    #[test]
    fn test_initial_zeros() {
        let metrics = KafkaSinkMetrics::new(None);
        let counters = [
            &metrics.records_written,
            &metrics.bytes_written,
            &metrics.errors_total,
        ];
        for counter in counters {
            assert_eq!(counter.get(), 0);
        }
    }

    /// Successive writes accumulate in both the record and byte counters.
    #[test]
    fn test_record_write() {
        let metrics = KafkaSinkMetrics::new(None);
        for (records, bytes) in [(100, 5000), (200, 10000)] {
            metrics.record_write(records, bytes);
        }
        assert_eq!(metrics.records_written.get(), 300);
        assert_eq!(metrics.bytes_written.get(), 15000);
    }

    /// Latency samples update the count, the sum, and the running maximum.
    #[test]
    fn test_produce_latency() {
        let metrics = KafkaSinkMetrics::new(None);
        for sample_us in [100, 300, 50] {
            metrics.record_produce_latency(sample_us);
        }

        assert_eq!(metrics.produce_latency_count.get(), 3);
        assert_eq!(metrics.produce_latency_sum_us.get(), 450);
        assert_eq!(metrics.produce_latency_max_us.get(), 300);
    }
}
150}