// laminar_db/metrics_api.rs

use std::sync::Arc;

use crate::db::{DbState, LaminarDB};
use crate::error::DbError;

10impl LaminarDB {
11 #[must_use]
13 pub fn uptime(&self) -> std::time::Duration {
14 self.start_time.elapsed()
15 }
16
17 pub fn set_engine_metrics(&self, metrics: Arc<crate::engine_metrics::EngineMetrics>) {
19 *self.engine_metrics.lock() = Some(metrics);
20 }
21
22 pub fn set_prometheus_registry(&self, registry: Arc<prometheus::Registry>) {
28 *self.prometheus_registry.lock() = Some(registry);
29 }
30
31 #[must_use]
33 pub fn engine_metrics(&self) -> Option<Arc<crate::engine_metrics::EngineMetrics>> {
34 self.engine_metrics.lock().clone()
35 }
36
37 pub fn pipeline_state(&self) -> &'static str {
39 let raw = self.state.load(std::sync::atomic::Ordering::Acquire);
40 match DbState::from_u8(raw) {
41 Some(DbState::Created) => "Created",
42 Some(DbState::Starting) => "Starting",
43 Some(DbState::Running) => "Running",
44 Some(DbState::ShuttingDown) => "ShuttingDown",
45 Some(DbState::Stopped) => "Stopped",
46 None => "Unknown",
47 }
48 }
49
50 #[must_use]
55 #[allow(clippy::cast_sign_loss)]
56 pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
57 let guard = self.engine_metrics.lock();
58 let (ingested, emitted, dropped, cycles, batches, mv_updates, mv_bytes) =
59 if let Some(ref m) = *guard {
60 (
61 m.events_ingested.get(),
62 m.events_emitted.get(),
63 m.events_dropped.get(),
64 m.cycles.get(),
65 m.batches.get(),
66 m.mv_updates.get(),
67 m.mv_bytes_stored.get() as u64,
68 )
69 } else {
70 (0, 0, 0, 0, 0, 0, 0)
71 };
72 crate::metrics::PipelineMetrics {
73 total_events_ingested: ingested,
74 total_events_emitted: emitted,
75 total_events_dropped: dropped,
76 total_cycles: cycles,
77 total_batches: batches,
78 uptime: self.start_time.elapsed(),
79 state: self.pipeline_state_enum(),
80 source_count: self.catalog.list_sources().len(),
81 stream_count: self.catalog.list_streams().len(),
82 sink_count: self.catalog.list_sinks().len(),
83 pipeline_watermark: self.pipeline_watermark(),
84 mv_updates,
85 mv_bytes_stored: mv_bytes,
86 }
87 }
88
89 #[must_use]
91 pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
92 let entry = self.catalog.get_source(name)?;
93 let pending = entry.source.pending();
94 let capacity = entry.source.capacity();
95 Some(crate::metrics::SourceMetrics {
96 name: entry.name.clone(),
97 total_events: entry.source.sequence(),
98 pending,
99 capacity,
100 is_backpressured: crate::metrics::is_backpressured(pending, capacity),
101 watermark: entry.source.current_watermark(),
102 utilization: crate::metrics::utilization(pending, capacity),
103 })
104 }
105
106 #[must_use]
108 pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
109 self.catalog
110 .list_sources()
111 .iter()
112 .filter_map(|name| self.source_metrics(name))
113 .collect()
114 }
115
116 #[must_use]
118 pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
119 let entry = self.catalog.get_stream_entry(name)?;
120 let pending = entry.source.pending();
121 let capacity = entry.source.capacity();
122 let sql = self
123 .connector_manager
124 .lock()
125 .streams()
126 .get(name)
127 .map(|reg| reg.query_sql.clone());
128 Some(crate::metrics::StreamMetrics {
129 name: entry.name.clone(),
130 total_events: entry.source.sequence(),
131 pending,
132 capacity,
133 is_backpressured: crate::metrics::is_backpressured(pending, capacity),
134 watermark: entry.source.current_watermark(),
135 sql,
136 })
137 }
138
139 #[must_use]
141 pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
142 self.catalog
143 .list_streams()
144 .iter()
145 .filter_map(|name| self.stream_metrics(name))
146 .collect()
147 }
148
149 #[must_use]
151 pub fn total_events_processed(&self) -> u64 {
152 let guard = self.engine_metrics.lock();
153 if let Some(ref m) = *guard {
154 m.events_ingested.get() + m.events_emitted.get()
155 } else {
156 0
157 }
158 }
159
160 #[must_use]
165 pub fn pipeline_watermark(&self) -> i64 {
166 self.pipeline_watermark
167 .load(std::sync::atomic::Ordering::Relaxed)
168 }
169
170 pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
172 let raw = self.state.load(std::sync::atomic::Ordering::Acquire);
173 match DbState::from_u8(raw) {
174 Some(DbState::Created) => crate::metrics::PipelineState::Created,
175 Some(DbState::Starting) => crate::metrics::PipelineState::Starting,
176 Some(DbState::Running) => crate::metrics::PipelineState::Running,
177 Some(DbState::ShuttingDown) => crate::metrics::PipelineState::ShuttingDown,
178 Some(DbState::Stopped) | None => crate::metrics::PipelineState::Stopped,
179 }
180 }
181
182 pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
191 if self.catalog.deactivate_query(query_id) {
192 Ok(())
193 } else {
194 Err(DbError::QueryNotFound(query_id.to_string()))
195 }
196 }
197
198 pub fn source_count(&self) -> usize {
200 self.catalog.list_sources().len()
201 }
202
203 pub fn sink_count(&self) -> usize {
205 self.catalog.list_sinks().len()
206 }
207
208 pub fn checkpoint_stats_nonblocking(
213 &self,
214 ) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
215 let guard = self.coordinator.try_lock().ok()?;
216 guard
217 .as_ref()
218 .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
219 }
220
221 pub fn active_query_count(&self) -> usize {
223 self.catalog
224 .list_queries()
225 .iter()
226 .filter(|(_, _, active)| *active)
227 .count()
228 }
229}