Skip to main content

laminar_db/
metrics_api.rs

//! Pipeline metrics and state query methods for `LaminarDB`.
//!
//! Reopens `impl LaminarDB` in a separate module so the main `db.rs` stays focused on dispatch.
4
5use std::sync::Arc;
6
7use crate::db::{DbState, LaminarDB};
8use crate::error::DbError;
9
10impl LaminarDB {
11    /// Time elapsed since the database was created.
12    #[must_use]
13    pub fn uptime(&self) -> std::time::Duration {
14        self.start_time.elapsed()
15    }
16
17    /// Inject prometheus engine metrics. Called once at startup before `start()`.
18    pub fn set_engine_metrics(&self, metrics: Arc<crate::engine_metrics::EngineMetrics>) {
19        *self.engine_metrics.lock() = Some(metrics);
20    }
21
22    /// Inject a shared Prometheus registry for connector-level metrics.
23    ///
24    /// Called once at startup, after the registry is constructed but before
25    /// `start()`. Connectors created after this call will register their
26    /// metrics on this registry so they appear in the scrape output.
27    pub fn set_prometheus_registry(&self, registry: Arc<prometheus::Registry>) {
28        *self.prometheus_registry.lock() = Some(registry);
29    }
30
31    /// Get the engine metrics if set.
32    #[must_use]
33    pub fn engine_metrics(&self) -> Option<Arc<crate::engine_metrics::EngineMetrics>> {
34        self.engine_metrics.lock().clone()
35    }
36
37    /// Get the current pipeline state as a string.
38    pub fn pipeline_state(&self) -> &'static str {
39        let raw = self.state.load(std::sync::atomic::Ordering::Acquire);
40        match DbState::from_u8(raw) {
41            Some(DbState::Created) => "Created",
42            Some(DbState::Starting) => "Starting",
43            Some(DbState::Running) => "Running",
44            Some(DbState::ShuttingDown) => "ShuttingDown",
45            Some(DbState::Stopped) => "Stopped",
46            None => "Unknown",
47        }
48    }
49
50    /// Get a pipeline-wide metrics snapshot.
51    ///
52    /// Reads prometheus engine metrics and catalog sizes to produce a
53    /// point-in-time view of pipeline health.
54    #[must_use]
55    #[allow(clippy::cast_sign_loss)]
56    pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
57        let guard = self.engine_metrics.lock();
58        let (ingested, emitted, dropped, cycles, batches, mv_updates, mv_bytes) =
59            if let Some(ref m) = *guard {
60                (
61                    m.events_ingested.get(),
62                    m.events_emitted.get(),
63                    m.events_dropped.get(),
64                    m.cycles.get(),
65                    m.batches.get(),
66                    m.mv_updates.get(),
67                    m.mv_bytes_stored.get() as u64,
68                )
69            } else {
70                (0, 0, 0, 0, 0, 0, 0)
71            };
72        crate::metrics::PipelineMetrics {
73            total_events_ingested: ingested,
74            total_events_emitted: emitted,
75            total_events_dropped: dropped,
76            total_cycles: cycles,
77            total_batches: batches,
78            uptime: self.start_time.elapsed(),
79            state: self.pipeline_state_enum(),
80            source_count: self.catalog.list_sources().len(),
81            stream_count: self.catalog.list_streams().len(),
82            sink_count: self.catalog.list_sinks().len(),
83            pipeline_watermark: self.pipeline_watermark(),
84            mv_updates,
85            mv_bytes_stored: mv_bytes,
86        }
87    }
88
89    /// Get metrics for a single source by name.
90    #[must_use]
91    pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
92        let entry = self.catalog.get_source(name)?;
93        let pending = entry.source.pending();
94        let capacity = entry.source.capacity();
95        Some(crate::metrics::SourceMetrics {
96            name: entry.name.clone(),
97            total_events: entry.source.sequence(),
98            pending,
99            capacity,
100            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
101            watermark: entry.source.current_watermark(),
102            utilization: crate::metrics::utilization(pending, capacity),
103        })
104    }
105
106    /// Get metrics for all registered sources.
107    #[must_use]
108    pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
109        self.catalog
110            .list_sources()
111            .iter()
112            .filter_map(|name| self.source_metrics(name))
113            .collect()
114    }
115
116    /// Get metrics for a single stream by name.
117    #[must_use]
118    pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
119        let entry = self.catalog.get_stream_entry(name)?;
120        let pending = entry.source.pending();
121        let capacity = entry.source.capacity();
122        let sql = self
123            .connector_manager
124            .lock()
125            .streams()
126            .get(name)
127            .map(|reg| reg.query_sql.clone());
128        Some(crate::metrics::StreamMetrics {
129            name: entry.name.clone(),
130            total_events: entry.source.sequence(),
131            pending,
132            capacity,
133            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
134            watermark: entry.source.current_watermark(),
135            sql,
136        })
137    }
138
139    /// Get metrics for all registered streams.
140    #[must_use]
141    pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
142        self.catalog
143            .list_streams()
144            .iter()
145            .filter_map(|name| self.stream_metrics(name))
146            .collect()
147    }
148
149    /// Get the total number of events processed (ingested + emitted).
150    #[must_use]
151    pub fn total_events_processed(&self) -> u64 {
152        let guard = self.engine_metrics.lock();
153        if let Some(ref m) = *guard {
154            m.events_ingested.get() + m.events_emitted.get()
155        } else {
156            0
157        }
158    }
159
160    /// Returns the global pipeline watermark (minimum across all source watermarks).
161    ///
162    /// Returns `i64::MIN` if no watermark-enabled sources exist or no events
163    /// have been processed.
164    #[must_use]
165    pub fn pipeline_watermark(&self) -> i64 {
166        self.pipeline_watermark
167            .load(std::sync::atomic::Ordering::Relaxed)
168    }
169
170    /// Convert the internal `AtomicU8` state to a `PipelineState` enum.
171    pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
172        let raw = self.state.load(std::sync::atomic::Ordering::Acquire);
173        match DbState::from_u8(raw) {
174            Some(DbState::Created) => crate::metrics::PipelineState::Created,
175            Some(DbState::Starting) => crate::metrics::PipelineState::Starting,
176            Some(DbState::Running) => crate::metrics::PipelineState::Running,
177            Some(DbState::ShuttingDown) => crate::metrics::PipelineState::ShuttingDown,
178            Some(DbState::Stopped) | None => crate::metrics::PipelineState::Stopped,
179        }
180    }
181
182    /// Cancel a running query by ID.
183    ///
184    /// Marks the query as inactive in the catalog. Future subscription
185    /// polls for this query will receive no more data.
186    ///
187    /// # Errors
188    ///
189    /// Returns `DbError` if the query is not found.
190    pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
191        if self.catalog.deactivate_query(query_id) {
192            Ok(())
193        } else {
194            Err(DbError::QueryNotFound(query_id.to_string()))
195        }
196    }
197
198    /// Get the number of registered sources.
199    pub fn source_count(&self) -> usize {
200        self.catalog.list_sources().len()
201    }
202
203    /// Get the number of registered sinks.
204    pub fn sink_count(&self) -> usize {
205        self.catalog.list_sinks().len()
206    }
207
208    /// Returns checkpoint statistics if available (non-blocking).
209    ///
210    /// Uses `try_lock()` on the coordinator mutex. Returns `None` if
211    /// the coordinator is not initialized or the lock is contended.
212    pub fn checkpoint_stats_nonblocking(
213        &self,
214    ) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
215        let guard = self.coordinator.try_lock().ok()?;
216        guard
217            .as_ref()
218            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
219    }
220
221    /// Get the number of active queries.
222    pub fn active_query_count(&self) -> usize {
223        self.catalog
224            .list_queries()
225            .iter()
226            .filter(|(_, _, active)| *active)
227            .count()
228    }
229}