laminar_db/
metrics_api.rs1use std::sync::Arc;
6
7use crate::db::{
8 LaminarDB, STATE_CREATED, STATE_RUNNING, STATE_SHUTTING_DOWN, STATE_STARTING, STATE_STOPPED,
9};
10use crate::error::DbError;
11
12impl LaminarDB {
13 pub fn pipeline_state(&self) -> &'static str {
15 match self.state.load(std::sync::atomic::Ordering::Acquire) {
16 STATE_CREATED => "Created",
17 STATE_STARTING => "Starting",
18 STATE_RUNNING => "Running",
19 STATE_SHUTTING_DOWN => "ShuttingDown",
20 STATE_STOPPED => "Stopped",
21 _ => "Unknown",
22 }
23 }
24
25 #[must_use]
30 pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
31 let snap = self.counters.snapshot();
32 crate::metrics::PipelineMetrics {
33 total_events_ingested: snap.events_ingested,
34 total_events_emitted: snap.events_emitted,
35 total_events_dropped: snap.events_dropped,
36 total_cycles: snap.cycles,
37 total_batches: snap.total_batches,
38 uptime: self.start_time.elapsed(),
39 state: self.pipeline_state_enum(),
40 last_cycle_duration_ns: snap.last_cycle_duration_ns,
41 source_count: self.catalog.list_sources().len(),
42 stream_count: self.catalog.list_streams().len(),
43 sink_count: self.catalog.list_sinks().len(),
44 pipeline_watermark: self.pipeline_watermark(),
45 }
46 }
47
48 #[must_use]
50 pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
51 let entry = self.catalog.get_source(name)?;
52 let pending = entry.source.pending();
53 let capacity = entry.source.capacity();
54 Some(crate::metrics::SourceMetrics {
55 name: entry.name.clone(),
56 total_events: entry.source.sequence(),
57 pending,
58 capacity,
59 is_backpressured: crate::metrics::is_backpressured(pending, capacity),
60 watermark: entry.source.current_watermark(),
61 utilization: crate::metrics::utilization(pending, capacity),
62 })
63 }
64
65 #[must_use]
67 pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
68 self.catalog
69 .list_sources()
70 .iter()
71 .filter_map(|name| self.source_metrics(name))
72 .collect()
73 }
74
75 #[must_use]
77 pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
78 let entry = self.catalog.get_stream_entry(name)?;
79 let pending = entry.source.pending();
80 let capacity = entry.source.capacity();
81 let sql = self
82 .connector_manager
83 .lock()
84 .streams()
85 .get(name)
86 .map(|reg| reg.query_sql.clone());
87 Some(crate::metrics::StreamMetrics {
88 name: entry.name.clone(),
89 total_events: entry.source.sequence(),
90 pending,
91 capacity,
92 is_backpressured: crate::metrics::is_backpressured(pending, capacity),
93 watermark: entry.source.current_watermark(),
94 sql,
95 })
96 }
97
98 #[must_use]
100 pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
101 self.catalog
102 .list_streams()
103 .iter()
104 .filter_map(|name| self.stream_metrics(name))
105 .collect()
106 }
107
108 #[must_use]
110 pub fn total_events_processed(&self) -> u64 {
111 let snap = self.counters.snapshot();
112 snap.events_ingested + snap.events_emitted
113 }
114
115 #[must_use]
120 pub fn counters(&self) -> &Arc<crate::metrics::PipelineCounters> {
121 &self.counters
122 }
123
124 #[must_use]
129 pub fn pipeline_watermark(&self) -> i64 {
130 self.pipeline_watermark
131 .load(std::sync::atomic::Ordering::Relaxed)
132 }
133
134 pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
136 match self.state.load(std::sync::atomic::Ordering::Acquire) {
137 STATE_CREATED => crate::metrics::PipelineState::Created,
138 STATE_STARTING => crate::metrics::PipelineState::Starting,
139 STATE_RUNNING => crate::metrics::PipelineState::Running,
140 STATE_SHUTTING_DOWN => crate::metrics::PipelineState::ShuttingDown,
141 _ => crate::metrics::PipelineState::Stopped,
142 }
143 }
144
145 pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
154 if self.catalog.deactivate_query(query_id) {
155 Ok(())
156 } else {
157 Err(DbError::QueryNotFound(query_id.to_string()))
158 }
159 }
160
161 pub fn source_count(&self) -> usize {
163 self.catalog.list_sources().len()
164 }
165
166 pub fn sink_count(&self) -> usize {
168 self.catalog.list_sinks().len()
169 }
170
171 pub fn active_query_count(&self) -> usize {
173 self.catalog
174 .list_queries()
175 .iter()
176 .filter(|(_, _, active)| *active)
177 .count()
178 }
179}