1use prometheus::{
4 Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
5 Registry,
6};
7
/// Prometheus metrics exported by the engine.
///
/// Every field is created and registered against a single [`Registry`] by
/// [`EngineMetrics::new`]; the exported metric name is given in each field's doc.
pub struct EngineMetrics {
    /// Events ingested from sources (`events_ingested_total`).
    pub events_ingested: IntCounter,
    /// Events emitted to streams (`events_emitted_total`).
    pub events_emitted: IntCounter,
    /// Events dropped (`events_dropped_total`).
    pub events_dropped: IntCounter,
    /// Processing cycles completed (`cycles_total`).
    pub cycles: IntCounter,
    /// Batches processed (`batches_total`).
    pub batches: IntCounter,
    /// Queries using a compiled `PhysicalExpr` (`queries_compiled_total`).
    pub queries_compiled: IntCounter,
    /// Queries using a cached logical plan (`queries_cached_plan_total`).
    pub queries_cached_plan: IntCounter,
    /// Cycles skipped by backpressure (`cycles_backpressured_total`).
    pub cycles_backpressured: IntCounter,
    /// Materialized view updates (`mv_updates_total`).
    pub mv_updates: IntCounter,
    /// Approximate bytes stored by materialized views (`mv_bytes_stored`).
    pub mv_bytes_stored: IntGauge,
    /// Global pipeline watermark (`pipeline_watermark`).
    // NOTE(review): unit is not stated in the help text; presumably epoch-ms like
    // the per-source/per-stream watermark gauges — TODO confirm.
    pub pipeline_watermark: IntGauge,
    /// Per-source watermark in epoch-ms (`source_watermark_ms`, label `source`).
    pub source_watermark_ms: IntGaugeVec,
    /// Per-stream watermark in epoch-ms (`stream_watermark_ms`, label `stream`).
    pub stream_watermark_ms: IntGaugeVec,
    /// Per-stream input buffer size in bytes (`input_buf_bytes`, label `stream`).
    pub input_buf_bytes: IntGaugeVec,
    /// Rows shed by the `ShedOldest` policy (`shed_records_total`, label `stream`).
    pub shed_records_total: IntCounterVec,
    /// Completed checkpoints (`checkpoints_completed_total`).
    pub checkpoints_completed: IntCounter,
    /// Failed checkpoints (`checkpoints_failed_total`).
    pub checkpoints_failed: IntCounter,
    /// Current checkpoint epoch (`checkpoint_epoch`).
    pub checkpoint_epoch: IntGauge,
    /// Size of the last checkpoint (`checkpoint_size_bytes`).
    pub checkpoint_size_bytes: IntGauge,
    /// Sink write errors (`sink_write_failures_total`).
    pub sink_write_failures: IntCounter,
    /// Sink write timeouts (`sink_write_timeouts_total`).
    pub sink_write_timeouts: IntCounter,
    /// Times the sink task channel was closed (`sink_task_channel_closed_total`).
    pub sink_task_channel_closed: IntCounter,
    /// Rows dropped because the sink filter failed to compile
    /// (`sink_filter_rejected_rows_total`, label `sink`).
    pub sink_filter_rejected_rows: IntCounterVec,
    /// Per-cycle processing duration in seconds (`cycle_duration_seconds`);
    /// buckets span 100 ns to 1 ms.
    pub cycle_duration: Histogram,
    /// Checkpoint cycle duration in seconds (`checkpoint_duration_seconds`).
    pub checkpoint_duration: Histogram,
    /// Sink pre-commit latency in seconds (`sink_precommit_duration_seconds`).
    pub sink_precommit_duration: Histogram,
    /// Sink commit latency in seconds (`sink_commit_duration_seconds`).
    pub sink_commit_duration: Histogram,
    /// Time an operator waits for its slowest input barrier, in seconds
    /// (`barrier_alignment_wait_seconds`, label `operator`).
    pub barrier_alignment_wait: HistogramVec,
    /// Delay between local watermark advancement and peer observation, in seconds
    /// (`watermark_propagation_seconds`).
    pub watermark_propagation: Histogram,
    /// End-to-end event latency from source to sink commit, in seconds
    /// (`pipeline_e2e_latency_seconds`, label `pipeline`).
    pub pipeline_e2e_latency: HistogramVec,
}
78
79impl EngineMetrics {
80 #[must_use]
86 #[allow(clippy::too_many_lines)]
87 pub fn new(registry: &Registry) -> Self {
88 macro_rules! reg {
89 ($m:expr) => {{
90 let m = $m;
91 registry.register(Box::new(m.clone())).unwrap();
92 m
93 }};
94 }
95
96 Self {
97 events_ingested: reg!(IntCounter::new(
98 "events_ingested_total",
99 "Events ingested from sources"
100 )
101 .unwrap()),
102 events_emitted: reg!(IntCounter::new(
103 "events_emitted_total",
104 "Events emitted to streams"
105 )
106 .unwrap()),
107 events_dropped: reg!(IntCounter::new("events_dropped_total", "Events dropped").unwrap()),
108 cycles: reg!(IntCounter::new("cycles_total", "Processing cycles completed").unwrap()),
109 batches: reg!(IntCounter::new("batches_total", "Batches processed").unwrap()),
110 queries_compiled: reg!(IntCounter::new(
111 "queries_compiled_total",
112 "Queries using compiled PhysicalExpr"
113 )
114 .unwrap()),
115 queries_cached_plan: reg!(IntCounter::new(
116 "queries_cached_plan_total",
117 "Queries using cached logical plan"
118 )
119 .unwrap()),
120 cycles_backpressured: reg!(IntCounter::new(
121 "cycles_backpressured_total",
122 "Cycles skipped by backpressure"
123 )
124 .unwrap()),
125 mv_updates: reg!(
126 IntCounter::new("mv_updates_total", "Materialized view updates").unwrap()
127 ),
128 mv_bytes_stored: reg!(
129 IntGauge::new("mv_bytes_stored", "Approximate MV bytes stored").unwrap()
130 ),
131 pipeline_watermark: reg!(IntGauge::new(
132 "pipeline_watermark",
133 "Global pipeline watermark"
134 )
135 .unwrap()),
136 source_watermark_ms: reg!(IntGaugeVec::new(
138 Opts::new("source_watermark_ms", "Per-source watermark (epoch-ms)"),
139 &["source"],
140 )
141 .unwrap()),
142 stream_watermark_ms: reg!(IntGaugeVec::new(
143 Opts::new("stream_watermark_ms", "Per-stream watermark (epoch-ms)"),
144 &["stream"],
145 )
146 .unwrap()),
147 input_buf_bytes: reg!(IntGaugeVec::new(
148 Opts::new("input_buf_bytes", "Per-stream input buffer bytes"),
149 &["stream"],
150 )
151 .unwrap()),
152 shed_records_total: reg!(IntCounterVec::new(
153 Opts::new("shed_records_total", "Rows shed by ShedOldest policy"),
154 &["stream"],
155 )
156 .unwrap()),
157 checkpoints_completed: reg!(IntCounter::new(
158 "checkpoints_completed_total",
159 "Completed checkpoints"
160 )
161 .unwrap()),
162 checkpoints_failed: reg!(IntCounter::new(
163 "checkpoints_failed_total",
164 "Failed checkpoints"
165 )
166 .unwrap()),
167 checkpoint_epoch: reg!(
168 IntGauge::new("checkpoint_epoch", "Current checkpoint epoch").unwrap()
169 ),
170 checkpoint_size_bytes: reg!(IntGauge::new(
171 "checkpoint_size_bytes",
172 "Last checkpoint size"
173 )
174 .unwrap()),
175 sink_write_failures: reg!(IntCounter::new(
176 "sink_write_failures_total",
177 "Sink write errors"
178 )
179 .unwrap()),
180 sink_write_timeouts: reg!(IntCounter::new(
181 "sink_write_timeouts_total",
182 "Sink write timeouts"
183 )
184 .unwrap()),
185 sink_task_channel_closed: reg!(IntCounter::new(
186 "sink_task_channel_closed_total",
187 "Sink task channel closed"
188 )
189 .unwrap()),
190 sink_filter_rejected_rows: reg!(IntCounterVec::new(
191 Opts::new(
192 "sink_filter_rejected_rows_total",
193 "Rows dropped because the sink filter failed to compile",
194 ),
195 &["sink"],
196 )
197 .unwrap()),
198 cycle_duration: reg!(Histogram::with_opts(
199 HistogramOpts::new("cycle_duration_seconds", "Per-cycle processing duration")
200 .buckets(vec![1e-7, 5e-7, 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3]),
201 )
202 .unwrap()),
203 checkpoint_duration: reg!(Histogram::with_opts(
206 HistogramOpts::new("checkpoint_duration_seconds", "Checkpoint cycle duration")
207 .buckets(prometheus::exponential_buckets(0.01, 2.0, 15).unwrap()),
208 )
209 .unwrap()),
210 sink_precommit_duration: reg!(Histogram::with_opts(
212 HistogramOpts::new("sink_precommit_duration_seconds", "Sink pre-commit latency")
213 .buckets(prometheus::exponential_buckets(0.005, 2.0, 14).unwrap()),
214 )
215 .unwrap()),
216 sink_commit_duration: reg!(Histogram::with_opts(
218 HistogramOpts::new("sink_commit_duration_seconds", "Sink commit latency")
219 .buckets(prometheus::exponential_buckets(0.005, 2.0, 15).unwrap()),
220 )
221 .unwrap()),
222 barrier_alignment_wait: reg!(HistogramVec::new(
225 HistogramOpts::new(
226 "barrier_alignment_wait_seconds",
227 "Operator wait for slowest input barrier",
228 )
229 .buckets(prometheus::exponential_buckets(0.0001, 2.0, 16).unwrap()),
230 &["operator"],
231 )
232 .unwrap()),
233 watermark_propagation: reg!(Histogram::with_opts(
236 HistogramOpts::new(
237 "watermark_propagation_seconds",
238 "Delay between local watermark advancement and peer observation",
239 )
240 .buckets(prometheus::exponential_buckets(0.001, 2.0, 14).unwrap()),
241 )
242 .unwrap()),
243 pipeline_e2e_latency: reg!(HistogramVec::new(
246 HistogramOpts::new(
247 "pipeline_e2e_latency_seconds",
248 "End-to-end event latency, source to sink commit",
249 )
250 .buckets(prometheus::exponential_buckets(0.01, 2.0, 15).unwrap()),
251 &["pipeline"],
252 )
253 .unwrap()),
254 }
255 }
256}