Skip to main content

laminar_db/
metrics_api.rs

//! Pipeline metrics and state query methods for `LaminarDB`.
//!
//! Reopens `impl LaminarDB` so the main `db.rs` can stay focused on dispatch.

5use std::sync::Arc;
6
7use crate::db::{DbState, LaminarDB};
8use crate::error::DbError;
9
10impl LaminarDB {
11    /// Time elapsed since the database was created.
12    #[must_use]
13    pub fn uptime(&self) -> std::time::Duration {
14        self.start_time.elapsed()
15    }
16
17    /// Inject prometheus engine metrics. Called once at startup before `start()`.
18    pub fn set_engine_metrics(&self, metrics: Arc<crate::engine_metrics::EngineMetrics>) {
19        *self.engine_metrics.lock() = Some(metrics);
20    }
21
22    /// Inject a shared Prometheus registry for connector-level metrics.
23    ///
24    /// Called once at startup, after the registry is constructed but before
25    /// `start()`. Connectors created after this call will register their
26    /// metrics on this registry so they appear in the scrape output.
27    pub fn set_prometheus_registry(&self, registry: Arc<prometheus::Registry>) {
28        *self.prometheus_registry.lock() = Some(registry);
29    }
30
31    /// Get the engine metrics if set.
32    #[must_use]
33    pub fn engine_metrics(&self) -> Option<Arc<crate::engine_metrics::EngineMetrics>> {
34        self.engine_metrics.lock().clone()
35    }
36
37    /// Get the current pipeline state as a string.
38    pub fn pipeline_state(&self) -> &'static str {
39        match DbState::load(&self.state) {
40            DbState::Created => "Created",
41            DbState::Starting => "Starting",
42            DbState::Running => "Running",
43            DbState::ShuttingDown => "ShuttingDown",
44            DbState::Stopped => "Stopped",
45        }
46    }
47
48    /// Get a pipeline-wide metrics snapshot.
49    ///
50    /// Reads prometheus engine metrics and catalog sizes to produce a
51    /// point-in-time view of pipeline health.
52    #[must_use]
53    #[allow(clippy::cast_sign_loss)]
54    pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
55        let guard = self.engine_metrics.lock();
56        let (ingested, emitted, dropped, cycles, batches, mv_updates, mv_bytes) =
57            if let Some(ref m) = *guard {
58                (
59                    m.events_ingested.get(),
60                    m.events_emitted.get(),
61                    m.events_dropped.get(),
62                    m.cycles.get(),
63                    m.batches.get(),
64                    m.mv_updates.get(),
65                    m.mv_bytes_stored.get() as u64,
66                )
67            } else {
68                (0, 0, 0, 0, 0, 0, 0)
69            };
70        crate::metrics::PipelineMetrics {
71            total_events_ingested: ingested,
72            total_events_emitted: emitted,
73            total_events_dropped: dropped,
74            total_cycles: cycles,
75            total_batches: batches,
76            uptime: self.start_time.elapsed(),
77            state: self.pipeline_state_enum(),
78            source_count: self.catalog.list_sources().len(),
79            stream_count: self.catalog.list_streams().len(),
80            sink_count: self.catalog.list_sinks().len(),
81            pipeline_watermark: self.pipeline_watermark(),
82            mv_updates,
83            mv_bytes_stored: mv_bytes,
84        }
85    }
86
87    /// Get metrics for a single source by name.
88    #[must_use]
89    pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
90        let entry = self.catalog.get_source(name)?;
91        let pending = entry.source.pending();
92        let capacity = entry.source.capacity();
93        Some(crate::metrics::SourceMetrics {
94            name: entry.name.clone(),
95            total_events: entry.source.sequence(),
96            pending,
97            capacity,
98            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
99            watermark: entry.source.current_watermark(),
100            utilization: crate::metrics::utilization(pending, capacity),
101        })
102    }
103
104    /// Get metrics for all registered sources.
105    #[must_use]
106    pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
107        self.catalog
108            .list_sources()
109            .iter()
110            .filter_map(|name| self.source_metrics(name))
111            .collect()
112    }
113
114    /// Get metrics for a single stream by name.
115    #[must_use]
116    pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
117        let entry = self.catalog.get_stream_entry(name)?;
118        let pending = entry.source.pending();
119        let capacity = entry.source.capacity();
120        let sql = self
121            .connector_manager
122            .lock()
123            .streams()
124            .get(name)
125            .map(|reg| reg.query_sql.clone());
126        Some(crate::metrics::StreamMetrics {
127            name: entry.name.clone(),
128            total_events: entry.source.sequence(),
129            pending,
130            capacity,
131            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
132            watermark: entry.source.current_watermark(),
133            sql,
134        })
135    }
136
137    /// Get metrics for all registered streams.
138    #[must_use]
139    pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
140        self.catalog
141            .list_streams()
142            .iter()
143            .filter_map(|name| self.stream_metrics(name))
144            .collect()
145    }
146
147    /// Get the total number of events processed (ingested + emitted).
148    #[must_use]
149    pub fn total_events_processed(&self) -> u64 {
150        let guard = self.engine_metrics.lock();
151        if let Some(ref m) = *guard {
152            m.events_ingested.get() + m.events_emitted.get()
153        } else {
154            0
155        }
156    }
157
158    /// Returns the global pipeline watermark (minimum across all source watermarks).
159    ///
160    /// Returns `i64::MIN` if no watermark-enabled sources exist or no events
161    /// have been processed.
162    #[must_use]
163    pub fn pipeline_watermark(&self) -> i64 {
164        self.pipeline_watermark
165            .load(std::sync::atomic::Ordering::Relaxed)
166    }
167
168    pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
169        match DbState::load(&self.state) {
170            DbState::Created => crate::metrics::PipelineState::Created,
171            DbState::Starting => crate::metrics::PipelineState::Starting,
172            DbState::Running => crate::metrics::PipelineState::Running,
173            DbState::ShuttingDown => crate::metrics::PipelineState::ShuttingDown,
174            DbState::Stopped => crate::metrics::PipelineState::Stopped,
175        }
176    }
177
178    /// Cancel a running query by ID.
179    ///
180    /// Marks the query as inactive in the catalog. Future subscription
181    /// polls for this query will receive no more data.
182    ///
183    /// # Errors
184    ///
185    /// Returns `DbError` if the query is not found.
186    pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
187        if self.catalog.deactivate_query(query_id) {
188            Ok(())
189        } else {
190            Err(DbError::QueryNotFound(query_id.to_string()))
191        }
192    }
193
194    /// Get the number of registered sources.
195    pub fn source_count(&self) -> usize {
196        self.catalog.list_sources().len()
197    }
198
199    /// Get the number of registered sinks.
200    pub fn sink_count(&self) -> usize {
201        self.catalog.list_sinks().len()
202    }
203
204    /// Returns checkpoint statistics if available (non-blocking).
205    ///
206    /// Uses `try_lock()` on the coordinator mutex. Returns `None` if
207    /// the coordinator is not initialized or the lock is contended.
208    pub fn checkpoint_stats_nonblocking(
209        &self,
210    ) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
211        let guard = self.coordinator.try_lock().ok()?;
212        guard
213            .as_ref()
214            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
215    }
216
217    /// Get the number of active queries.
218    pub fn active_query_count(&self) -> usize {
219        self.catalog
220            .list_queries()
221            .iter()
222            .filter(|(_, _, active)| *active)
223            .count()
224    }
225}