Skip to main content

laminar_db/
engine_metrics.rs

1//! Prometheus metrics for the streaming engine.
2
3use prometheus::{
4    Gauge, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
5    Registry,
6};
7
8/// Pipeline metrics registered on an explicit prometheus `Registry`.
9///
10/// Constructed once at startup, `Arc`-shared into `PipelineCallback`,
11/// `CheckpointCoordinator`, and `OperatorGraph`.
12pub struct EngineMetrics {
13    /// Events ingested from sources.
14    pub events_ingested: IntCounter,
15    /// Events emitted to streams.
16    pub events_emitted: IntCounter,
17    /// Events dropped.
18    pub events_dropped: IntCounter,
19    /// Processing cycles completed.
20    pub cycles: IntCounter,
21    /// Batches processed.
22    pub batches: IntCounter,
23    /// Queries using compiled `PhysicalExpr`.
24    pub queries_compiled: IntCounter,
25    /// Queries using cached logical plan.
26    pub queries_cached_plan: IntCounter,
27    /// Cycles skipped by backpressure.
28    pub cycles_backpressured: IntCounter,
29    /// Materialized view updates.
30    pub mv_updates: IntCounter,
31    /// Approximate MV bytes stored.
32    pub mv_bytes_stored: IntGauge,
33    /// Global pipeline watermark.
34    pub pipeline_watermark: IntGauge,
35    /// Per-source watermark (epoch-ms). Label: `source`.
36    pub source_watermark_ms: IntGaugeVec,
37    /// `1` if a source is idle (excluded from the watermark min), else
38    /// `0`. Label: `source`.
39    pub source_idle: IntGaugeVec,
40    /// Per-stream watermark (epoch-ms). Label: `stream`.
41    pub stream_watermark_ms: IntGaugeVec,
42    /// Per-stream input-port buffered bytes. Label: `stream`.
43    pub input_buf_bytes: IntGaugeVec,
44    /// Per-stream rows shed by the `ShedOldest` policy. Label: `stream`.
45    pub shed_records_total: IntCounterVec,
46    /// Completed checkpoints.
47    pub checkpoints_completed: IntCounter,
48    /// Failed checkpoints.
49    pub checkpoints_failed: IntCounter,
50    /// Current checkpoint epoch.
51    pub checkpoint_epoch: IntGauge,
52    /// Last checkpoint size in bytes.
53    pub checkpoint_size_bytes: IntGauge,
54    /// Sink write errors.
55    pub sink_write_failures: IntCounter,
56    /// Sink write timeouts.
57    pub sink_write_timeouts: IntCounter,
58    /// Sink task channel closed.
59    pub sink_task_channel_closed: IntCounter,
60    /// Rows dropped because the sink's WHERE filter failed to compile to
61    /// a `PhysicalExpr` (fail-closed). Label: `sink`.
62    pub sink_filter_rejected_rows: IntCounterVec,
63    /// Rows dropped at operator level past `allowed_lateness` (distinct
64    /// from `events_dropped`, which is source-side).
65    pub window_late_dropped: IntCounter,
66    /// Source rows dropped because the event-time column was null.
67    pub events_null_timestamp: IntCounter,
68    /// Rows currently buffered by temporal-filter operators.
69    pub temporal_filter_buffered: IntGauge,
70    /// Z-set inserts (+1) emitted by temporal-filter operators.
71    pub temporal_filter_inserts: IntCounter,
72    /// Z-set retractions (-1) emitted by temporal-filter operators.
73    pub temporal_filter_retracts: IntCounter,
74    /// Late / born-expired / beyond-horizon rows dropped un-emitted.
75    pub temporal_filter_dropped: IntCounter,
76    /// Per-cycle processing duration.
77    pub cycle_duration: Histogram,
78    /// Checkpoint cycle duration.
79    pub checkpoint_duration: Histogram,
80    /// Pipeline stall per barrier: time the pipeline task is blocked by
81    /// a checkpoint (shuffle alignment + state capture + the Aligned
82    /// resume gate), excluding the background durable tail.
83    pub checkpoint_pipeline_stall_duration: Histogram,
84    /// Time the leader's restorable gate spends polling for vnode
85    /// partials (failed gates that burn the timeout are observed too).
86    /// When this dominates restorable latency at production cadence,
87    /// the push-driven upload-completion-ack follow-up is worth
88    /// building.
89    pub checkpoint_restorable_gate_wait: Histogram,
90    /// Vnode partials written as references to an unchanged base
91    /// instead of re-uploading state.
92    pub checkpoint_unchanged_vnodes: IntCounter,
93    /// Sink pre-commit round-trip (2PC phase 1).
94    pub sink_precommit_duration: Histogram,
95    /// Sink commit round-trip (2PC phase 2).
96    pub sink_commit_duration: Histogram,
97    /// On-demand lookup cache hits (served without a source fetch). Label: `table`.
98    pub lookup_cache_hits: IntCounterVec,
99    /// On-demand lookup cache misses (not in cache). Label: `table`.
100    pub lookup_cache_misses: IntCounterVec,
101    /// On-demand lookup source fetch errors/timeouts. Label: `table`.
102    pub lookup_source_errors: IntCounterVec,
103    /// On-demand lookup rows awaiting a source fetch. Label: `table`.
104    pub lookup_in_flight_rows: IntGaugeVec,
105    /// Output batches dropped when a remote subscriber's routing queue is full
106    /// (cluster mode, best-effort delivery under backpressure).
107    pub remote_subscription_batches_dropped: IntCounter,
108    /// Vnodes owned per failure domain (cluster mode). Label: `domain`.
109    pub placement_vnodes_per_domain: IntGaugeVec,
110    /// Largest single domain's share of all vnodes (`[0, 1]`) — the blast radius.
111    pub placement_blast_radius_ratio: Gauge,
112}
113
114impl EngineMetrics {
115    /// Register all engine metrics on the given registry. Startup only.
116    ///
117    /// # Panics
118    ///
119    /// Panics if metric registration fails (duplicate names).
120    #[must_use]
121    #[allow(clippy::too_many_lines)]
122    pub fn new(registry: &Registry) -> Self {
123        macro_rules! reg {
124            ($m:expr) => {{
125                let m = $m;
126                registry.register(Box::new(m.clone())).unwrap();
127                m
128            }};
129        }
130
131        Self {
132            events_ingested: reg!(IntCounter::new(
133                "events_ingested_total",
134                "Events ingested from sources"
135            )
136            .unwrap()),
137            events_emitted: reg!(IntCounter::new(
138                "events_emitted_total",
139                "Events emitted to streams"
140            )
141            .unwrap()),
142            events_dropped: reg!(IntCounter::new("events_dropped_total", "Events dropped").unwrap()),
143            cycles: reg!(IntCounter::new("cycles_total", "Processing cycles completed").unwrap()),
144            batches: reg!(IntCounter::new("batches_total", "Batches processed").unwrap()),
145            queries_compiled: reg!(IntCounter::new(
146                "queries_compiled_total",
147                "Queries using compiled PhysicalExpr"
148            )
149            .unwrap()),
150            queries_cached_plan: reg!(IntCounter::new(
151                "queries_cached_plan_total",
152                "Queries using cached logical plan"
153            )
154            .unwrap()),
155            cycles_backpressured: reg!(IntCounter::new(
156                "cycles_backpressured_total",
157                "Cycles skipped by backpressure"
158            )
159            .unwrap()),
160            mv_updates: reg!(
161                IntCounter::new("mv_updates_total", "Materialized view updates").unwrap()
162            ),
163            mv_bytes_stored: reg!(
164                IntGauge::new("mv_bytes_stored", "Approximate MV bytes stored").unwrap()
165            ),
166            pipeline_watermark: reg!(IntGauge::new(
167                "pipeline_watermark",
168                "Global pipeline watermark"
169            )
170            .unwrap()),
171            // Labels are catalog-bound, so cardinality is finite.
172            source_watermark_ms: reg!(IntGaugeVec::new(
173                Opts::new("source_watermark_ms", "Per-source watermark (epoch-ms)"),
174                &["source"],
175            )
176            .unwrap()),
177            source_idle: reg!(IntGaugeVec::new(
178                Opts::new(
179                    "source_idle",
180                    "1 if source idle (excluded from watermark min)"
181                ),
182                &["source"],
183            )
184            .unwrap()),
185            stream_watermark_ms: reg!(IntGaugeVec::new(
186                Opts::new("stream_watermark_ms", "Per-stream watermark (epoch-ms)"),
187                &["stream"],
188            )
189            .unwrap()),
190            input_buf_bytes: reg!(IntGaugeVec::new(
191                Opts::new("input_buf_bytes", "Per-stream input buffer bytes"),
192                &["stream"],
193            )
194            .unwrap()),
195            shed_records_total: reg!(IntCounterVec::new(
196                Opts::new("shed_records_total", "Rows shed by ShedOldest policy"),
197                &["stream"],
198            )
199            .unwrap()),
200            checkpoints_completed: reg!(IntCounter::new(
201                "checkpoints_completed_total",
202                "Completed checkpoints"
203            )
204            .unwrap()),
205            checkpoints_failed: reg!(IntCounter::new(
206                "checkpoints_failed_total",
207                "Failed checkpoints"
208            )
209            .unwrap()),
210            checkpoint_epoch: reg!(
211                IntGauge::new("checkpoint_epoch", "Current checkpoint epoch").unwrap()
212            ),
213            checkpoint_size_bytes: reg!(IntGauge::new(
214                "checkpoint_size_bytes",
215                "Last checkpoint size"
216            )
217            .unwrap()),
218            sink_write_failures: reg!(IntCounter::new(
219                "sink_write_failures_total",
220                "Sink write errors"
221            )
222            .unwrap()),
223            sink_write_timeouts: reg!(IntCounter::new(
224                "sink_write_timeouts_total",
225                "Sink write timeouts"
226            )
227            .unwrap()),
228            sink_task_channel_closed: reg!(IntCounter::new(
229                "sink_task_channel_closed_total",
230                "Sink task channel closed"
231            )
232            .unwrap()),
233            sink_filter_rejected_rows: reg!(IntCounterVec::new(
234                Opts::new(
235                    "sink_filter_rejected_rows_total",
236                    "Rows dropped because the sink filter failed to compile",
237                ),
238                &["sink"],
239            )
240            .unwrap()),
241            window_late_dropped: reg!(IntCounter::new(
242                "window_late_dropped_total",
243                "Rows dropped by window operators past allowed_lateness"
244            )
245            .unwrap()),
246            events_null_timestamp: reg!(IntCounter::new(
247                "events_null_timestamp_total",
248                "Source rows dropped because the event-time column was null"
249            )
250            .unwrap()),
251            temporal_filter_buffered: reg!(IntGauge::new(
252                "temporal_filter_buffered",
253                "Rows buffered by retracting temporal-filter operators"
254            )
255            .unwrap()),
256            temporal_filter_inserts: reg!(IntCounter::new(
257                "temporal_filter_inserts_total",
258                "Z-set inserts emitted by temporal-filter operators"
259            )
260            .unwrap()),
261            temporal_filter_retracts: reg!(IntCounter::new(
262                "temporal_filter_retracts_total",
263                "Z-set retractions emitted by temporal-filter operators"
264            )
265            .unwrap()),
266            temporal_filter_dropped: reg!(IntCounter::new(
267                "temporal_filter_dropped_total",
268                "Rows dropped un-emitted by temporal-filter operators"
269            )
270            .unwrap()),
271            cycle_duration: reg!(Histogram::with_opts(
272                HistogramOpts::new("cycle_duration_seconds", "Per-cycle processing duration")
273                    .buckets(vec![
274                        1e-7, 5e-7, 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 5e-3, 1e-2, 5e-2,
275                        1e-1, 5e-1, 1.0,
276                    ]),
277            )
278            .unwrap()),
279            // Checkpoint: serialization_timeout=120s, so max bucket must cover that.
280            // 0.01 * 2^14 = 163.84s.
281            checkpoint_duration: reg!(Histogram::with_opts(
282                HistogramOpts::new("checkpoint_duration_seconds", "Checkpoint cycle duration")
283                    .buckets(prometheus::exponential_buckets(0.01, 2.0, 15).unwrap()),
284            )
285            .unwrap()),
286            // Stall target is sub-second; the resume gate is bounded at
287            // 30s. 0.001 * 2^15 = 32.77s.
288            checkpoint_pipeline_stall_duration: reg!(Histogram::with_opts(
289                HistogramOpts::new(
290                    "checkpoint_pipeline_stall_duration_seconds",
291                    "Pipeline stall per checkpoint barrier (align + capture + resume gate)",
292                )
293                .buckets(prometheus::exponential_buckets(0.001, 2.0, 16).unwrap()),
294            )
295            .unwrap()),
296            // Gate timeout default 10s. 0.001 * 2^14 = 16.38s.
297            checkpoint_restorable_gate_wait: reg!(Histogram::with_opts(
298                HistogramOpts::new(
299                    "checkpoint_restorable_gate_wait_seconds",
300                    "Restorable-gate poll wait per epoch (vnode-partial presence)",
301                )
302                .buckets(prometheus::exponential_buckets(0.001, 2.0, 15).unwrap()),
303            )
304            .unwrap()),
305            checkpoint_unchanged_vnodes: reg!(IntCounter::new(
306                "checkpoint_unchanged_vnodes_total",
307                "Vnode partials written as unchanged-base references"
308            )
309            .unwrap()),
310            // pre_commit_timeout=30s. 0.005 * 2^13 = 40.96s.
311            sink_precommit_duration: reg!(Histogram::with_opts(
312                HistogramOpts::new("sink_precommit_duration_seconds", "Sink pre-commit latency")
313                    .buckets(prometheus::exponential_buckets(0.005, 2.0, 14).unwrap()),
314            )
315            .unwrap()),
316            // commit_timeout=60s. 0.005 * 2^14 = 81.92s.
317            sink_commit_duration: reg!(Histogram::with_opts(
318                HistogramOpts::new("sink_commit_duration_seconds", "Sink commit latency")
319                    .buckets(prometheus::exponential_buckets(0.005, 2.0, 15).unwrap()),
320            )
321            .unwrap()),
322            // Labels are bound to the registered lookup tables, so cardinality is finite.
323            lookup_cache_hits: reg!(IntCounterVec::new(
324                Opts::new("lookup_cache_hits_total", "On-demand lookup cache hits"),
325                &["table"],
326            )
327            .unwrap()),
328            lookup_cache_misses: reg!(IntCounterVec::new(
329                Opts::new("lookup_cache_misses_total", "On-demand lookup cache misses"),
330                &["table"],
331            )
332            .unwrap()),
333            lookup_source_errors: reg!(IntCounterVec::new(
334                Opts::new(
335                    "lookup_source_errors_total",
336                    "On-demand lookup source fetch errors"
337                ),
338                &["table"],
339            )
340            .unwrap()),
341            lookup_in_flight_rows: reg!(IntGaugeVec::new(
342                Opts::new(
343                    "lookup_in_flight_rows",
344                    "On-demand lookup rows awaiting a source fetch"
345                ),
346                &["table"],
347            )
348            .unwrap()),
349            remote_subscription_batches_dropped: reg!(IntCounter::new(
350                "remote_subscription_batches_dropped_total",
351                "Output batches dropped under remote-subscriber backpressure"
352            )
353            .unwrap()),
354            placement_vnodes_per_domain: reg!(IntGaugeVec::new(
355                Opts::new(
356                    "placement_vnodes_per_domain",
357                    "Vnodes owned per failure domain (cluster mode)"
358                ),
359                &["domain"],
360            )
361            .unwrap()),
362            placement_blast_radius_ratio: reg!(Gauge::new(
363                "placement_blast_radius_ratio",
364                "Largest single domain's share of all vnodes (0-1); state that goes Restoring if it fails"
365            )
366            .unwrap()),
367        }
368    }
369}