laminar_db/
metrics_api.rs1use std::sync::Arc;
6
7use crate::db::{DbState, LaminarDB};
8use crate::error::DbError;
9
10impl LaminarDB {
11 #[must_use]
13 pub fn uptime(&self) -> std::time::Duration {
14 self.start_time.elapsed()
15 }
16
17 pub fn set_engine_metrics(&self, metrics: Arc<crate::engine_metrics::EngineMetrics>) {
19 *self.engine_metrics.lock() = Some(metrics);
20 }
21
22 pub fn set_prometheus_registry(&self, registry: Arc<prometheus::Registry>) {
28 *self.prometheus_registry.lock() = Some(registry);
29 }
30
31 #[must_use]
33 pub fn engine_metrics(&self) -> Option<Arc<crate::engine_metrics::EngineMetrics>> {
34 self.engine_metrics.lock().clone()
35 }
36
37 pub fn pipeline_state(&self) -> &'static str {
39 match DbState::load(&self.state) {
40 DbState::Created => "Created",
41 DbState::Starting => "Starting",
42 DbState::Running => "Running",
43 DbState::ShuttingDown => "ShuttingDown",
44 DbState::Stopped => "Stopped",
45 }
46 }
47
48 #[must_use]
53 #[allow(clippy::cast_sign_loss)]
54 pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
55 let guard = self.engine_metrics.lock();
56 let (ingested, emitted, dropped, cycles, batches, mv_updates, mv_bytes) =
57 if let Some(ref m) = *guard {
58 (
59 m.events_ingested.get(),
60 m.events_emitted.get(),
61 m.events_dropped.get(),
62 m.cycles.get(),
63 m.batches.get(),
64 m.mv_updates.get(),
65 m.mv_bytes_stored.get() as u64,
66 )
67 } else {
68 (0, 0, 0, 0, 0, 0, 0)
69 };
70 crate::metrics::PipelineMetrics {
71 total_events_ingested: ingested,
72 total_events_emitted: emitted,
73 total_events_dropped: dropped,
74 total_cycles: cycles,
75 total_batches: batches,
76 uptime: self.start_time.elapsed(),
77 state: self.pipeline_state_enum(),
78 source_count: self.catalog.list_sources().len(),
79 stream_count: self.catalog.list_streams().len(),
80 sink_count: self.catalog.list_sinks().len(),
81 pipeline_watermark: self.pipeline_watermark(),
82 mv_updates,
83 mv_bytes_stored: mv_bytes,
84 }
85 }
86
87 #[must_use]
89 pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
90 let entry = self.catalog.get_source(name)?;
91 let pending = entry.source.pending();
92 let capacity = entry.source.capacity();
93 Some(crate::metrics::SourceMetrics {
94 name: entry.name.clone(),
95 total_events: entry.source.sequence(),
96 pending,
97 capacity,
98 is_backpressured: crate::metrics::is_backpressured(pending, capacity),
99 watermark: entry.source.current_watermark(),
100 utilization: crate::metrics::utilization(pending, capacity),
101 })
102 }
103
104 #[must_use]
106 pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
107 self.catalog
108 .list_sources()
109 .iter()
110 .filter_map(|name| self.source_metrics(name))
111 .collect()
112 }
113
114 #[must_use]
116 pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
117 let entry = self.catalog.get_stream_entry(name)?;
118 let pending = entry.source.pending();
119 let capacity = entry.source.capacity();
120 let sql = self
121 .connector_manager
122 .lock()
123 .streams()
124 .get(name)
125 .map(|reg| reg.query_sql.clone());
126 Some(crate::metrics::StreamMetrics {
127 name: entry.name.clone(),
128 total_events: entry.source.sequence(),
129 pending,
130 capacity,
131 is_backpressured: crate::metrics::is_backpressured(pending, capacity),
132 watermark: entry.source.current_watermark(),
133 sql,
134 })
135 }
136
137 #[must_use]
139 pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
140 self.catalog
141 .list_streams()
142 .iter()
143 .filter_map(|name| self.stream_metrics(name))
144 .collect()
145 }
146
147 #[must_use]
149 pub fn total_events_processed(&self) -> u64 {
150 let guard = self.engine_metrics.lock();
151 if let Some(ref m) = *guard {
152 m.events_ingested.get() + m.events_emitted.get()
153 } else {
154 0
155 }
156 }
157
158 #[must_use]
163 pub fn pipeline_watermark(&self) -> i64 {
164 self.pipeline_watermark
165 .load(std::sync::atomic::Ordering::Relaxed)
166 }
167
168 pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
169 match DbState::load(&self.state) {
170 DbState::Created => crate::metrics::PipelineState::Created,
171 DbState::Starting => crate::metrics::PipelineState::Starting,
172 DbState::Running => crate::metrics::PipelineState::Running,
173 DbState::ShuttingDown => crate::metrics::PipelineState::ShuttingDown,
174 DbState::Stopped => crate::metrics::PipelineState::Stopped,
175 }
176 }
177
178 pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
187 if self.catalog.deactivate_query(query_id) {
188 Ok(())
189 } else {
190 Err(DbError::QueryNotFound(query_id.to_string()))
191 }
192 }
193
194 pub fn source_count(&self) -> usize {
196 self.catalog.list_sources().len()
197 }
198
199 pub fn sink_count(&self) -> usize {
201 self.catalog.list_sinks().len()
202 }
203
204 pub fn checkpoint_stats_nonblocking(
209 &self,
210 ) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
211 let guard = self.coordinator.try_lock().ok()?;
212 guard
213 .as_ref()
214 .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
215 }
216
217 pub fn active_query_count(&self) -> usize {
219 self.catalog
220 .list_queries()
221 .iter()
222 .filter(|(_, _, active)| *active)
223 .count()
224 }
225}