1use prometheus::{
4 Gauge, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
5 Registry,
6};
7
/// Handles to every engine-level Prometheus metric.
///
/// Each field is created and registered by [`EngineMetrics::new`]; the
/// exported metric name, help text and label set for a field are the ones
/// passed to its constructor there.
pub struct EngineMetrics {
    /// `events_ingested_total`: events ingested from sources.
    pub events_ingested: IntCounter,
    /// `events_emitted_total`: events emitted to streams.
    pub events_emitted: IntCounter,
    /// `events_dropped_total`: events dropped.
    pub events_dropped: IntCounter,
    /// `cycles_total`: processing cycles completed.
    pub cycles: IntCounter,
    /// `batches_total`: batches processed.
    pub batches: IntCounter,
    /// `queries_compiled_total`: queries using a compiled `PhysicalExpr`.
    pub queries_compiled: IntCounter,
    /// `queries_cached_plan_total`: queries using a cached logical plan.
    pub queries_cached_plan: IntCounter,
    /// `cycles_backpressured_total`: cycles skipped by backpressure.
    pub cycles_backpressured: IntCounter,
    /// `mv_updates_total`: materialized view updates.
    pub mv_updates: IntCounter,
    /// `mv_bytes_stored`: approximate materialized-view bytes stored.
    pub mv_bytes_stored: IntGauge,
    /// `pipeline_watermark`: global pipeline watermark.
    pub pipeline_watermark: IntGauge,
    /// `source_watermark_ms`: per-source watermark in epoch milliseconds,
    /// labelled by `source`.
    pub source_watermark_ms: IntGaugeVec,
    /// `source_idle`: 1 if the source is idle (excluded from the watermark
    /// minimum), labelled by `source`.
    pub source_idle: IntGaugeVec,
    /// `stream_watermark_ms`: per-stream watermark in epoch milliseconds,
    /// labelled by `stream`.
    pub stream_watermark_ms: IntGaugeVec,
    /// `input_buf_bytes`: per-stream input buffer bytes, labelled by `stream`.
    pub input_buf_bytes: IntGaugeVec,
    /// `shed_records_total`: rows shed by the `ShedOldest` policy, labelled
    /// by `stream`.
    pub shed_records_total: IntCounterVec,
    /// `checkpoints_completed_total`: completed checkpoints.
    pub checkpoints_completed: IntCounter,
    /// `checkpoints_failed_total`: failed checkpoints.
    pub checkpoints_failed: IntCounter,
    /// `checkpoint_epoch`: current checkpoint epoch.
    pub checkpoint_epoch: IntGauge,
    /// `checkpoint_size_bytes`: size of the last checkpoint.
    pub checkpoint_size_bytes: IntGauge,
    /// `sink_write_failures_total`: sink write errors.
    pub sink_write_failures: IntCounter,
    /// `sink_write_timeouts_total`: sink write timeouts.
    pub sink_write_timeouts: IntCounter,
    /// `sink_task_channel_closed_total`: sink task channel closed.
    pub sink_task_channel_closed: IntCounter,
    /// `sink_filter_rejected_rows_total`: rows dropped because the sink
    /// filter failed to compile, labelled by `sink`.
    pub sink_filter_rejected_rows: IntCounterVec,
    /// `window_late_dropped_total`: rows dropped by window operators past
    /// `allowed_lateness`.
    pub window_late_dropped: IntCounter,
    /// `events_null_timestamp_total`: source rows dropped because the
    /// event-time column was null.
    pub events_null_timestamp: IntCounter,
    /// `temporal_filter_buffered`: rows buffered by retracting
    /// temporal-filter operators.
    pub temporal_filter_buffered: IntGauge,
    /// `temporal_filter_inserts_total`: Z-set inserts emitted by
    /// temporal-filter operators.
    pub temporal_filter_inserts: IntCounter,
    /// `temporal_filter_retracts_total`: Z-set retractions emitted by
    /// temporal-filter operators.
    pub temporal_filter_retracts: IntCounter,
    /// `temporal_filter_dropped_total`: rows dropped un-emitted by
    /// temporal-filter operators.
    pub temporal_filter_dropped: IntCounter,
    /// `cycle_duration_seconds`: per-cycle processing duration.
    pub cycle_duration: Histogram,
    /// `checkpoint_duration_seconds`: checkpoint cycle duration.
    pub checkpoint_duration: Histogram,
    /// `checkpoint_pipeline_stall_duration_seconds`: pipeline stall per
    /// checkpoint barrier (align + capture + resume gate).
    pub checkpoint_pipeline_stall_duration: Histogram,
    /// `checkpoint_restorable_gate_wait_seconds`: restorable-gate poll wait
    /// per epoch (vnode-partial presence).
    pub checkpoint_restorable_gate_wait: Histogram,
    /// `checkpoint_unchanged_vnodes_total`: vnode partials written as
    /// unchanged-base references.
    pub checkpoint_unchanged_vnodes: IntCounter,
    /// `sink_precommit_duration_seconds`: sink pre-commit latency.
    pub sink_precommit_duration: Histogram,
    /// `sink_commit_duration_seconds`: sink commit latency.
    pub sink_commit_duration: Histogram,
    /// `lookup_cache_hits_total`: on-demand lookup cache hits, labelled by
    /// `table`.
    pub lookup_cache_hits: IntCounterVec,
    /// `lookup_cache_misses_total`: on-demand lookup cache misses, labelled
    /// by `table`.
    pub lookup_cache_misses: IntCounterVec,
    /// `lookup_source_errors_total`: on-demand lookup source fetch errors,
    /// labelled by `table`.
    pub lookup_source_errors: IntCounterVec,
    /// `lookup_in_flight_rows`: on-demand lookup rows awaiting a source
    /// fetch, labelled by `table`.
    pub lookup_in_flight_rows: IntGaugeVec,
    /// `remote_subscription_batches_dropped_total`: output batches dropped
    /// under remote-subscriber backpressure.
    pub remote_subscription_batches_dropped: IntCounter,
    /// `placement_vnodes_per_domain`: vnodes owned per failure domain
    /// (cluster mode), labelled by `domain`.
    pub placement_vnodes_per_domain: IntGaugeVec,
    /// `placement_blast_radius_ratio`: largest single domain's share of all
    /// vnodes (0-1); the state that goes Restoring if that domain fails.
    pub placement_blast_radius_ratio: Gauge,
}
113
114impl EngineMetrics {
115 #[must_use]
121 #[allow(clippy::too_many_lines)]
122 pub fn new(registry: &Registry) -> Self {
123 macro_rules! reg {
124 ($m:expr) => {{
125 let m = $m;
126 registry.register(Box::new(m.clone())).unwrap();
127 m
128 }};
129 }
130
131 Self {
132 events_ingested: reg!(IntCounter::new(
133 "events_ingested_total",
134 "Events ingested from sources"
135 )
136 .unwrap()),
137 events_emitted: reg!(IntCounter::new(
138 "events_emitted_total",
139 "Events emitted to streams"
140 )
141 .unwrap()),
142 events_dropped: reg!(IntCounter::new("events_dropped_total", "Events dropped").unwrap()),
143 cycles: reg!(IntCounter::new("cycles_total", "Processing cycles completed").unwrap()),
144 batches: reg!(IntCounter::new("batches_total", "Batches processed").unwrap()),
145 queries_compiled: reg!(IntCounter::new(
146 "queries_compiled_total",
147 "Queries using compiled PhysicalExpr"
148 )
149 .unwrap()),
150 queries_cached_plan: reg!(IntCounter::new(
151 "queries_cached_plan_total",
152 "Queries using cached logical plan"
153 )
154 .unwrap()),
155 cycles_backpressured: reg!(IntCounter::new(
156 "cycles_backpressured_total",
157 "Cycles skipped by backpressure"
158 )
159 .unwrap()),
160 mv_updates: reg!(
161 IntCounter::new("mv_updates_total", "Materialized view updates").unwrap()
162 ),
163 mv_bytes_stored: reg!(
164 IntGauge::new("mv_bytes_stored", "Approximate MV bytes stored").unwrap()
165 ),
166 pipeline_watermark: reg!(IntGauge::new(
167 "pipeline_watermark",
168 "Global pipeline watermark"
169 )
170 .unwrap()),
171 source_watermark_ms: reg!(IntGaugeVec::new(
173 Opts::new("source_watermark_ms", "Per-source watermark (epoch-ms)"),
174 &["source"],
175 )
176 .unwrap()),
177 source_idle: reg!(IntGaugeVec::new(
178 Opts::new(
179 "source_idle",
180 "1 if source idle (excluded from watermark min)"
181 ),
182 &["source"],
183 )
184 .unwrap()),
185 stream_watermark_ms: reg!(IntGaugeVec::new(
186 Opts::new("stream_watermark_ms", "Per-stream watermark (epoch-ms)"),
187 &["stream"],
188 )
189 .unwrap()),
190 input_buf_bytes: reg!(IntGaugeVec::new(
191 Opts::new("input_buf_bytes", "Per-stream input buffer bytes"),
192 &["stream"],
193 )
194 .unwrap()),
195 shed_records_total: reg!(IntCounterVec::new(
196 Opts::new("shed_records_total", "Rows shed by ShedOldest policy"),
197 &["stream"],
198 )
199 .unwrap()),
200 checkpoints_completed: reg!(IntCounter::new(
201 "checkpoints_completed_total",
202 "Completed checkpoints"
203 )
204 .unwrap()),
205 checkpoints_failed: reg!(IntCounter::new(
206 "checkpoints_failed_total",
207 "Failed checkpoints"
208 )
209 .unwrap()),
210 checkpoint_epoch: reg!(
211 IntGauge::new("checkpoint_epoch", "Current checkpoint epoch").unwrap()
212 ),
213 checkpoint_size_bytes: reg!(IntGauge::new(
214 "checkpoint_size_bytes",
215 "Last checkpoint size"
216 )
217 .unwrap()),
218 sink_write_failures: reg!(IntCounter::new(
219 "sink_write_failures_total",
220 "Sink write errors"
221 )
222 .unwrap()),
223 sink_write_timeouts: reg!(IntCounter::new(
224 "sink_write_timeouts_total",
225 "Sink write timeouts"
226 )
227 .unwrap()),
228 sink_task_channel_closed: reg!(IntCounter::new(
229 "sink_task_channel_closed_total",
230 "Sink task channel closed"
231 )
232 .unwrap()),
233 sink_filter_rejected_rows: reg!(IntCounterVec::new(
234 Opts::new(
235 "sink_filter_rejected_rows_total",
236 "Rows dropped because the sink filter failed to compile",
237 ),
238 &["sink"],
239 )
240 .unwrap()),
241 window_late_dropped: reg!(IntCounter::new(
242 "window_late_dropped_total",
243 "Rows dropped by window operators past allowed_lateness"
244 )
245 .unwrap()),
246 events_null_timestamp: reg!(IntCounter::new(
247 "events_null_timestamp_total",
248 "Source rows dropped because the event-time column was null"
249 )
250 .unwrap()),
251 temporal_filter_buffered: reg!(IntGauge::new(
252 "temporal_filter_buffered",
253 "Rows buffered by retracting temporal-filter operators"
254 )
255 .unwrap()),
256 temporal_filter_inserts: reg!(IntCounter::new(
257 "temporal_filter_inserts_total",
258 "Z-set inserts emitted by temporal-filter operators"
259 )
260 .unwrap()),
261 temporal_filter_retracts: reg!(IntCounter::new(
262 "temporal_filter_retracts_total",
263 "Z-set retractions emitted by temporal-filter operators"
264 )
265 .unwrap()),
266 temporal_filter_dropped: reg!(IntCounter::new(
267 "temporal_filter_dropped_total",
268 "Rows dropped un-emitted by temporal-filter operators"
269 )
270 .unwrap()),
271 cycle_duration: reg!(Histogram::with_opts(
272 HistogramOpts::new("cycle_duration_seconds", "Per-cycle processing duration")
273 .buckets(vec![
274 1e-7, 5e-7, 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 5e-3, 1e-2, 5e-2,
275 1e-1, 5e-1, 1.0,
276 ]),
277 )
278 .unwrap()),
279 checkpoint_duration: reg!(Histogram::with_opts(
282 HistogramOpts::new("checkpoint_duration_seconds", "Checkpoint cycle duration")
283 .buckets(prometheus::exponential_buckets(0.01, 2.0, 15).unwrap()),
284 )
285 .unwrap()),
286 checkpoint_pipeline_stall_duration: reg!(Histogram::with_opts(
289 HistogramOpts::new(
290 "checkpoint_pipeline_stall_duration_seconds",
291 "Pipeline stall per checkpoint barrier (align + capture + resume gate)",
292 )
293 .buckets(prometheus::exponential_buckets(0.001, 2.0, 16).unwrap()),
294 )
295 .unwrap()),
296 checkpoint_restorable_gate_wait: reg!(Histogram::with_opts(
298 HistogramOpts::new(
299 "checkpoint_restorable_gate_wait_seconds",
300 "Restorable-gate poll wait per epoch (vnode-partial presence)",
301 )
302 .buckets(prometheus::exponential_buckets(0.001, 2.0, 15).unwrap()),
303 )
304 .unwrap()),
305 checkpoint_unchanged_vnodes: reg!(IntCounter::new(
306 "checkpoint_unchanged_vnodes_total",
307 "Vnode partials written as unchanged-base references"
308 )
309 .unwrap()),
310 sink_precommit_duration: reg!(Histogram::with_opts(
312 HistogramOpts::new("sink_precommit_duration_seconds", "Sink pre-commit latency")
313 .buckets(prometheus::exponential_buckets(0.005, 2.0, 14).unwrap()),
314 )
315 .unwrap()),
316 sink_commit_duration: reg!(Histogram::with_opts(
318 HistogramOpts::new("sink_commit_duration_seconds", "Sink commit latency")
319 .buckets(prometheus::exponential_buckets(0.005, 2.0, 15).unwrap()),
320 )
321 .unwrap()),
322 lookup_cache_hits: reg!(IntCounterVec::new(
324 Opts::new("lookup_cache_hits_total", "On-demand lookup cache hits"),
325 &["table"],
326 )
327 .unwrap()),
328 lookup_cache_misses: reg!(IntCounterVec::new(
329 Opts::new("lookup_cache_misses_total", "On-demand lookup cache misses"),
330 &["table"],
331 )
332 .unwrap()),
333 lookup_source_errors: reg!(IntCounterVec::new(
334 Opts::new(
335 "lookup_source_errors_total",
336 "On-demand lookup source fetch errors"
337 ),
338 &["table"],
339 )
340 .unwrap()),
341 lookup_in_flight_rows: reg!(IntGaugeVec::new(
342 Opts::new(
343 "lookup_in_flight_rows",
344 "On-demand lookup rows awaiting a source fetch"
345 ),
346 &["table"],
347 )
348 .unwrap()),
349 remote_subscription_batches_dropped: reg!(IntCounter::new(
350 "remote_subscription_batches_dropped_total",
351 "Output batches dropped under remote-subscriber backpressure"
352 )
353 .unwrap()),
354 placement_vnodes_per_domain: reg!(IntGaugeVec::new(
355 Opts::new(
356 "placement_vnodes_per_domain",
357 "Vnodes owned per failure domain (cluster mode)"
358 ),
359 &["domain"],
360 )
361 .unwrap()),
362 placement_blast_radius_ratio: reg!(Gauge::new(
363 "placement_blast_radius_ratio",
364 "Largest single domain's share of all vnodes (0-1); state that goes Restoring if it fails"
365 )
366 .unwrap()),
367 }
368 }
369}