Skip to main content

laminar_db/
engine_metrics.rs

1//! Prometheus metrics for the streaming engine.
2
3use prometheus::{
4    Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
5    Registry,
6};
7
8/// Pipeline metrics registered on an explicit prometheus `Registry`.
9///
10/// Constructed once at startup, `Arc`-shared into `PipelineCallback`,
11/// `CheckpointCoordinator`, and `OperatorGraph`.
12pub struct EngineMetrics {
13    /// Events ingested from sources.
14    pub events_ingested: IntCounter,
15    /// Events emitted to streams.
16    pub events_emitted: IntCounter,
17    /// Events dropped.
18    pub events_dropped: IntCounter,
19    /// Processing cycles completed.
20    pub cycles: IntCounter,
21    /// Batches processed.
22    pub batches: IntCounter,
23    /// Queries using compiled `PhysicalExpr`.
24    pub queries_compiled: IntCounter,
25    /// Queries using cached logical plan.
26    pub queries_cached_plan: IntCounter,
27    /// Cycles skipped by backpressure.
28    pub cycles_backpressured: IntCounter,
29    /// Materialized view updates.
30    pub mv_updates: IntCounter,
31    /// Approximate MV bytes stored.
32    pub mv_bytes_stored: IntGauge,
33    /// Global pipeline watermark.
34    pub pipeline_watermark: IntGauge,
35    /// Per-source watermark (epoch-ms). Label: `source`.
36    pub source_watermark_ms: IntGaugeVec,
37    /// Per-stream watermark (epoch-ms). Label: `stream`.
38    pub stream_watermark_ms: IntGaugeVec,
39    /// Per-stream input-port buffered bytes. Label: `stream`.
40    pub input_buf_bytes: IntGaugeVec,
41    /// Per-stream rows shed by the `ShedOldest` policy. Label: `stream`.
42    pub shed_records_total: IntCounterVec,
43    /// Completed checkpoints.
44    pub checkpoints_completed: IntCounter,
45    /// Failed checkpoints.
46    pub checkpoints_failed: IntCounter,
47    /// Current checkpoint epoch.
48    pub checkpoint_epoch: IntGauge,
49    /// Last checkpoint size in bytes.
50    pub checkpoint_size_bytes: IntGauge,
51    /// Sink write errors.
52    pub sink_write_failures: IntCounter,
53    /// Sink write timeouts.
54    pub sink_write_timeouts: IntCounter,
55    /// Sink task channel closed.
56    pub sink_task_channel_closed: IntCounter,
57    /// Rows dropped because the sink's WHERE filter failed to compile to
58    /// a `PhysicalExpr` (fail-closed). Label: `sink`.
59    pub sink_filter_rejected_rows: IntCounterVec,
60    /// Per-cycle processing duration.
61    pub cycle_duration: Histogram,
62    /// Checkpoint cycle duration.
63    pub checkpoint_duration: Histogram,
64    /// Sink pre-commit round-trip (2PC phase 1).
65    pub sink_precommit_duration: Histogram,
66    /// Sink commit round-trip (2PC phase 2).
67    pub sink_commit_duration: Histogram,
68    /// Time an operator spent waiting for the slowest input to deliver
69    /// its checkpoint barrier. Label: `operator`. Zero for single-input.
70    pub barrier_alignment_wait: HistogramVec,
71    /// Delay between local watermark advancement and its cluster-wide
72    /// observation. Written by the cluster watermark bus.
73    pub watermark_propagation: Histogram,
74    /// End-to-end event latency, source ingest to sink commit.
75    /// Label: `pipeline`. The primary SLA metric.
76    pub pipeline_e2e_latency: HistogramVec,
77}
78
79impl EngineMetrics {
80    /// Register all engine metrics on the given registry. Startup only.
81    ///
82    /// # Panics
83    ///
84    /// Panics if metric registration fails (duplicate names).
85    #[must_use]
86    #[allow(clippy::too_many_lines)]
87    pub fn new(registry: &Registry) -> Self {
88        macro_rules! reg {
89            ($m:expr) => {{
90                let m = $m;
91                registry.register(Box::new(m.clone())).unwrap();
92                m
93            }};
94        }
95
96        Self {
97            events_ingested: reg!(IntCounter::new(
98                "events_ingested_total",
99                "Events ingested from sources"
100            )
101            .unwrap()),
102            events_emitted: reg!(IntCounter::new(
103                "events_emitted_total",
104                "Events emitted to streams"
105            )
106            .unwrap()),
107            events_dropped: reg!(IntCounter::new("events_dropped_total", "Events dropped").unwrap()),
108            cycles: reg!(IntCounter::new("cycles_total", "Processing cycles completed").unwrap()),
109            batches: reg!(IntCounter::new("batches_total", "Batches processed").unwrap()),
110            queries_compiled: reg!(IntCounter::new(
111                "queries_compiled_total",
112                "Queries using compiled PhysicalExpr"
113            )
114            .unwrap()),
115            queries_cached_plan: reg!(IntCounter::new(
116                "queries_cached_plan_total",
117                "Queries using cached logical plan"
118            )
119            .unwrap()),
120            cycles_backpressured: reg!(IntCounter::new(
121                "cycles_backpressured_total",
122                "Cycles skipped by backpressure"
123            )
124            .unwrap()),
125            mv_updates: reg!(
126                IntCounter::new("mv_updates_total", "Materialized view updates").unwrap()
127            ),
128            mv_bytes_stored: reg!(
129                IntGauge::new("mv_bytes_stored", "Approximate MV bytes stored").unwrap()
130            ),
131            pipeline_watermark: reg!(IntGauge::new(
132                "pipeline_watermark",
133                "Global pipeline watermark"
134            )
135            .unwrap()),
136            // Labels are catalog-bound, so cardinality is finite.
137            source_watermark_ms: reg!(IntGaugeVec::new(
138                Opts::new("source_watermark_ms", "Per-source watermark (epoch-ms)"),
139                &["source"],
140            )
141            .unwrap()),
142            stream_watermark_ms: reg!(IntGaugeVec::new(
143                Opts::new("stream_watermark_ms", "Per-stream watermark (epoch-ms)"),
144                &["stream"],
145            )
146            .unwrap()),
147            input_buf_bytes: reg!(IntGaugeVec::new(
148                Opts::new("input_buf_bytes", "Per-stream input buffer bytes"),
149                &["stream"],
150            )
151            .unwrap()),
152            shed_records_total: reg!(IntCounterVec::new(
153                Opts::new("shed_records_total", "Rows shed by ShedOldest policy"),
154                &["stream"],
155            )
156            .unwrap()),
157            checkpoints_completed: reg!(IntCounter::new(
158                "checkpoints_completed_total",
159                "Completed checkpoints"
160            )
161            .unwrap()),
162            checkpoints_failed: reg!(IntCounter::new(
163                "checkpoints_failed_total",
164                "Failed checkpoints"
165            )
166            .unwrap()),
167            checkpoint_epoch: reg!(
168                IntGauge::new("checkpoint_epoch", "Current checkpoint epoch").unwrap()
169            ),
170            checkpoint_size_bytes: reg!(IntGauge::new(
171                "checkpoint_size_bytes",
172                "Last checkpoint size"
173            )
174            .unwrap()),
175            sink_write_failures: reg!(IntCounter::new(
176                "sink_write_failures_total",
177                "Sink write errors"
178            )
179            .unwrap()),
180            sink_write_timeouts: reg!(IntCounter::new(
181                "sink_write_timeouts_total",
182                "Sink write timeouts"
183            )
184            .unwrap()),
185            sink_task_channel_closed: reg!(IntCounter::new(
186                "sink_task_channel_closed_total",
187                "Sink task channel closed"
188            )
189            .unwrap()),
190            sink_filter_rejected_rows: reg!(IntCounterVec::new(
191                Opts::new(
192                    "sink_filter_rejected_rows_total",
193                    "Rows dropped because the sink filter failed to compile",
194                ),
195                &["sink"],
196            )
197            .unwrap()),
198            cycle_duration: reg!(Histogram::with_opts(
199                HistogramOpts::new("cycle_duration_seconds", "Per-cycle processing duration")
200                    .buckets(vec![1e-7, 5e-7, 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3]),
201            )
202            .unwrap()),
203            // Checkpoint: serialization_timeout=120s, so max bucket must cover that.
204            // 0.01 * 2^14 = 163.84s.
205            checkpoint_duration: reg!(Histogram::with_opts(
206                HistogramOpts::new("checkpoint_duration_seconds", "Checkpoint cycle duration")
207                    .buckets(prometheus::exponential_buckets(0.01, 2.0, 15).unwrap()),
208            )
209            .unwrap()),
210            // pre_commit_timeout=30s. 0.005 * 2^13 = 40.96s.
211            sink_precommit_duration: reg!(Histogram::with_opts(
212                HistogramOpts::new("sink_precommit_duration_seconds", "Sink pre-commit latency")
213                    .buckets(prometheus::exponential_buckets(0.005, 2.0, 14).unwrap()),
214            )
215            .unwrap()),
216            // commit_timeout=60s. 0.005 * 2^14 = 81.92s.
217            sink_commit_duration: reg!(Histogram::with_opts(
218                HistogramOpts::new("sink_commit_duration_seconds", "Sink commit latency")
219                    .buckets(prometheus::exponential_buckets(0.005, 2.0, 15).unwrap()),
220            )
221            .unwrap()),
222            // Alignment wait: typical <1ms, worst-case tens of seconds
223            // under skew. 0.0001s * 2^16 = 6.5s base range; tail captured.
224            barrier_alignment_wait: reg!(HistogramVec::new(
225                HistogramOpts::new(
226                    "barrier_alignment_wait_seconds",
227                    "Operator wait for slowest input barrier",
228                )
229                .buckets(prometheus::exponential_buckets(0.0001, 2.0, 16).unwrap()),
230                &["operator"],
231            )
232            .unwrap()),
233            // Watermark propagation: gossip is ~100ms–1s; coordinator-
234            // mediated could push down to 20–50ms. Bucketing covers both.
235            watermark_propagation: reg!(Histogram::with_opts(
236                HistogramOpts::new(
237                    "watermark_propagation_seconds",
238                    "Delay between local watermark advancement and peer observation",
239                )
240                .buckets(prometheus::exponential_buckets(0.001, 2.0, 14).unwrap()),
241            )
242            .unwrap()),
243            // E2E latency: dominated by checkpoint cadence, so same
244            // bucket shape as checkpoint_duration.
245            pipeline_e2e_latency: reg!(HistogramVec::new(
246                HistogramOpts::new(
247                    "pipeline_e2e_latency_seconds",
248                    "End-to-end event latency, source to sink commit",
249                )
250                .buckets(prometheus::exponential_buckets(0.01, 2.0, 15).unwrap()),
251                &["pipeline"],
252            )
253            .unwrap()),
254        }
255    }
256}