Skip to main content

laminar_db/
metrics_api.rs

//! Pipeline metrics and state query methods for `LaminarDB`.
//!
//! Reopens `impl LaminarDB` to keep the main `db.rs` focused on dispatch.
4
5use std::sync::Arc;
6
7use crate::db::{
8    LaminarDB, STATE_CREATED, STATE_RUNNING, STATE_SHUTTING_DOWN, STATE_STARTING, STATE_STOPPED,
9};
10use crate::error::DbError;
11
12impl LaminarDB {
13    /// Get the current pipeline state as a string.
14    pub fn pipeline_state(&self) -> &'static str {
15        match self.state.load(std::sync::atomic::Ordering::Acquire) {
16            STATE_CREATED => "Created",
17            STATE_STARTING => "Starting",
18            STATE_RUNNING => "Running",
19            STATE_SHUTTING_DOWN => "ShuttingDown",
20            STATE_STOPPED => "Stopped",
21            _ => "Unknown",
22        }
23    }
24
25    /// Get a pipeline-wide metrics snapshot.
26    ///
27    /// Reads shared atomic counters and catalog sizes to produce a
28    /// point-in-time view of pipeline health.
29    #[must_use]
30    pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
31        let snap = self.counters.snapshot();
32        crate::metrics::PipelineMetrics {
33            total_events_ingested: snap.events_ingested,
34            total_events_emitted: snap.events_emitted,
35            total_events_dropped: snap.events_dropped,
36            total_cycles: snap.cycles,
37            total_batches: snap.total_batches,
38            uptime: self.start_time.elapsed(),
39            state: self.pipeline_state_enum(),
40            last_cycle_duration_ns: snap.last_cycle_duration_ns,
41            source_count: self.catalog.list_sources().len(),
42            stream_count: self.catalog.list_streams().len(),
43            sink_count: self.catalog.list_sinks().len(),
44            pipeline_watermark: self.pipeline_watermark(),
45        }
46    }
47
48    /// Get metrics for a single source by name.
49    #[must_use]
50    pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
51        let entry = self.catalog.get_source(name)?;
52        let pending = entry.source.pending();
53        let capacity = entry.source.capacity();
54        Some(crate::metrics::SourceMetrics {
55            name: entry.name.clone(),
56            total_events: entry.source.sequence(),
57            pending,
58            capacity,
59            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
60            watermark: entry.source.current_watermark(),
61            utilization: crate::metrics::utilization(pending, capacity),
62        })
63    }
64
65    /// Get metrics for all registered sources.
66    #[must_use]
67    pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
68        self.catalog
69            .list_sources()
70            .iter()
71            .filter_map(|name| self.source_metrics(name))
72            .collect()
73    }
74
75    /// Get metrics for a single stream by name.
76    #[must_use]
77    pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
78        let entry = self.catalog.get_stream_entry(name)?;
79        let pending = entry.source.pending();
80        let capacity = entry.source.capacity();
81        let sql = self
82            .connector_manager
83            .lock()
84            .streams()
85            .get(name)
86            .map(|reg| reg.query_sql.clone());
87        Some(crate::metrics::StreamMetrics {
88            name: entry.name.clone(),
89            total_events: entry.source.sequence(),
90            pending,
91            capacity,
92            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
93            watermark: entry.source.current_watermark(),
94            sql,
95        })
96    }
97
98    /// Get metrics for all registered streams.
99    #[must_use]
100    pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
101        self.catalog
102            .list_streams()
103            .iter()
104            .filter_map(|name| self.stream_metrics(name))
105            .collect()
106    }
107
108    /// Get the total number of events processed (ingested + emitted).
109    #[must_use]
110    pub fn total_events_processed(&self) -> u64 {
111        let snap = self.counters.snapshot();
112        snap.events_ingested + snap.events_emitted
113    }
114
115    /// Get a reference to the shared pipeline counters.
116    ///
117    /// Useful for external code that needs to read counters directly
118    /// (e.g. a TUI dashboard polling at high frequency).
119    #[must_use]
120    pub fn counters(&self) -> &Arc<crate::metrics::PipelineCounters> {
121        &self.counters
122    }
123
124    /// Returns the global pipeline watermark (minimum across all source watermarks).
125    ///
126    /// Returns `i64::MIN` if no watermark-enabled sources exist or no events
127    /// have been processed.
128    #[must_use]
129    pub fn pipeline_watermark(&self) -> i64 {
130        self.pipeline_watermark
131            .load(std::sync::atomic::Ordering::Relaxed)
132    }
133
134    /// Convert the internal `AtomicU8` state to a `PipelineState` enum.
135    pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
136        match self.state.load(std::sync::atomic::Ordering::Acquire) {
137            STATE_CREATED => crate::metrics::PipelineState::Created,
138            STATE_STARTING => crate::metrics::PipelineState::Starting,
139            STATE_RUNNING => crate::metrics::PipelineState::Running,
140            STATE_SHUTTING_DOWN => crate::metrics::PipelineState::ShuttingDown,
141            _ => crate::metrics::PipelineState::Stopped,
142        }
143    }
144
145    /// Cancel a running query by ID.
146    ///
147    /// Marks the query as inactive in the catalog. Future subscription
148    /// polls for this query will receive no more data.
149    ///
150    /// # Errors
151    ///
152    /// Returns `DbError` if the query is not found.
153    pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
154        if self.catalog.deactivate_query(query_id) {
155            Ok(())
156        } else {
157            Err(DbError::QueryNotFound(query_id.to_string()))
158        }
159    }
160
161    /// Get the number of registered sources.
162    pub fn source_count(&self) -> usize {
163        self.catalog.list_sources().len()
164    }
165
166    /// Get the number of registered sinks.
167    pub fn sink_count(&self) -> usize {
168        self.catalog.list_sinks().len()
169    }
170
171    /// Get the number of active queries.
172    pub fn active_query_count(&self) -> usize {
173        self.catalog
174            .list_queries()
175            .iter()
176            .filter(|(_, _, active)| *active)
177            .count()
178    }
179}