laminar_connectors/kafka/
sink_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::metrics::ConnectorMetrics;
10
/// Atomic counters describing the activity of a Kafka sink.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference (`&self`).
#[derive(Debug)]
pub struct KafkaSinkMetrics {
    /// Running total of the `records` values passed to `record_write`.
    pub records_written: AtomicU64,
    /// Running total of the `bytes` values passed to `record_write`.
    pub bytes_written: AtomicU64,
    /// Number of `record_error` calls.
    pub errors_total: AtomicU64,
    /// Number of `record_commit` calls.
    pub epochs_committed: AtomicU64,
    /// Number of `record_rollback` calls.
    pub epochs_rolled_back: AtomicU64,
    /// Number of `record_dlq` calls.
    pub dlq_records: AtomicU64,
    /// Number of `record_serialization_error` calls.
    pub serialization_errors: AtomicU64,
}
29
30impl KafkaSinkMetrics {
31 #[must_use]
33 pub fn new() -> Self {
34 Self {
35 records_written: AtomicU64::new(0),
36 bytes_written: AtomicU64::new(0),
37 errors_total: AtomicU64::new(0),
38 epochs_committed: AtomicU64::new(0),
39 epochs_rolled_back: AtomicU64::new(0),
40 dlq_records: AtomicU64::new(0),
41 serialization_errors: AtomicU64::new(0),
42 }
43 }
44
45 pub fn record_write(&self, records: u64, bytes: u64) {
47 self.records_written.fetch_add(records, Ordering::Relaxed);
48 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
49 }
50
51 pub fn record_error(&self) {
53 self.errors_total.fetch_add(1, Ordering::Relaxed);
54 }
55
56 pub fn record_commit(&self) {
58 self.epochs_committed.fetch_add(1, Ordering::Relaxed);
59 }
60
61 pub fn record_rollback(&self) {
63 self.epochs_rolled_back.fetch_add(1, Ordering::Relaxed);
64 }
65
66 pub fn record_dlq(&self) {
68 self.dlq_records.fetch_add(1, Ordering::Relaxed);
69 }
70
71 pub fn record_serialization_error(&self) {
73 self.serialization_errors.fetch_add(1, Ordering::Relaxed);
74 }
75
76 #[must_use]
78 #[allow(clippy::cast_precision_loss)]
79 pub fn to_connector_metrics(&self) -> ConnectorMetrics {
80 let mut m = ConnectorMetrics {
81 records_total: self.records_written.load(Ordering::Relaxed),
82 bytes_total: self.bytes_written.load(Ordering::Relaxed),
83 errors_total: self.errors_total.load(Ordering::Relaxed),
84 lag: 0,
85 custom: Vec::new(),
86 };
87 m.add_custom(
88 "kafka.epochs_committed",
89 self.epochs_committed.load(Ordering::Relaxed) as f64,
90 );
91 m.add_custom(
92 "kafka.epochs_rolled_back",
93 self.epochs_rolled_back.load(Ordering::Relaxed) as f64,
94 );
95 m.add_custom(
96 "kafka.dlq_records",
97 self.dlq_records.load(Ordering::Relaxed) as f64,
98 );
99 m.add_custom(
100 "kafka.serialization_errors",
101 self.serialization_errors.load(Ordering::Relaxed) as f64,
102 );
103 m
104 }
105}
106
107impl Default for KafkaSinkMetrics {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value of the custom metric called `name`, if the snapshot has one.
    fn custom_metric(snapshot: &ConnectorMetrics, name: &str) -> Option<f64> {
        snapshot
            .custom
            .iter()
            .find(|(key, _)| key == name)
            .map(|(_, value)| *value)
    }

    /// A fresh metrics set reports zero for the three top-level totals.
    #[test]
    fn test_initial_zeros() {
        let snapshot = KafkaSinkMetrics::new().to_connector_metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    /// Successive writes accumulate in the record and byte totals.
    #[test]
    fn test_record_write() {
        let metrics = KafkaSinkMetrics::new();
        for (records, bytes) in [(100, 5000), (200, 10000)] {
            metrics.record_write(records, bytes);
        }

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.records_total, 300);
        assert_eq!(snapshot.bytes_total, 15000);
    }

    /// Commits and rollbacks surface as the matching custom metrics.
    #[test]
    fn test_epoch_metrics() {
        let metrics = KafkaSinkMetrics::new();
        metrics.record_commit();
        metrics.record_commit();
        metrics.record_rollback();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(
            custom_metric(&snapshot, "kafka.epochs_committed"),
            Some(2.0)
        );
        assert_eq!(
            custom_metric(&snapshot, "kafka.epochs_rolled_back"),
            Some(1.0)
        );
    }

    /// DLQ and serialization counters are reported separately from `errors_total`.
    #[test]
    fn test_dlq_and_serde_errors() {
        let metrics = KafkaSinkMetrics::new();
        metrics.record_dlq();
        metrics.record_dlq();
        metrics.record_serialization_error();
        metrics.record_error();

        let snapshot = metrics.to_connector_metrics();
        assert_eq!(snapshot.errors_total, 1);
        assert_eq!(custom_metric(&snapshot, "kafka.dlq_records"), Some(2.0));
        assert_eq!(
            custom_metric(&snapshot, "kafka.serialization_errors"),
            Some(1.0)
        );
    }
}