Skip to main content

laminar_db/
metrics.rs

1//! Pipeline observability metrics.
2
3use std::time::Duration;
4
5/// The state of a streaming pipeline.
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum PipelineState {
8    /// Created but not started.
9    Created,
10    /// Starting.
11    Starting,
12    /// Processing events.
13    Running,
14    /// Gracefully shutting down.
15    ShuttingDown,
16    /// Stopped.
17    Stopped,
18}
19
20impl std::fmt::Display for PipelineState {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        match self {
23            Self::Created => write!(f, "Created"),
24            Self::Starting => write!(f, "Starting"),
25            Self::Running => write!(f, "Running"),
26            Self::ShuttingDown => write!(f, "ShuttingDown"),
27            Self::Stopped => write!(f, "Stopped"),
28        }
29    }
30}
31
32/// Pipeline-wide metrics snapshot.
33#[derive(Debug, Clone)]
34pub struct PipelineMetrics {
35    /// Events ingested.
36    pub total_events_ingested: u64,
37    /// Events emitted.
38    pub total_events_emitted: u64,
39    /// Events dropped.
40    pub total_events_dropped: u64,
41    /// Cycles.
42    pub total_cycles: u64,
43    /// Batches.
44    pub total_batches: u64,
45    /// Uptime.
46    pub uptime: Duration,
47    /// State.
48    pub state: PipelineState,
49    /// Sources.
50    pub source_count: usize,
51    /// Streams.
52    pub stream_count: usize,
53    /// Sinks.
54    pub sink_count: usize,
55    /// Min watermark across all sources.
56    pub pipeline_watermark: i64,
57    /// MV updates.
58    pub mv_updates: u64,
59    /// Approximate MV bytes.
60    pub mv_bytes_stored: u64,
61}
62
63/// Metrics for a single registered source.
64#[derive(Debug, Clone)]
65pub struct SourceMetrics {
66    /// Name.
67    pub name: String,
68    /// Total events (sequence number).
69    pub total_events: u64,
70    /// Buffered events.
71    pub pending: usize,
72    /// Capacity.
73    pub capacity: usize,
74    /// >80% full.
75    pub is_backpressured: bool,
76    /// Watermark.
77    pub watermark: i64,
78    /// 0.0..1.0.
79    pub utilization: f64,
80}
81
82/// Metrics for a single registered stream.
83#[derive(Debug, Clone)]
84pub struct StreamMetrics {
85    /// Name.
86    pub name: String,
87    /// Total events.
88    pub total_events: u64,
89    /// Buffered events.
90    pub pending: usize,
91    /// Capacity.
92    pub capacity: usize,
93    /// >80% full.
94    pub is_backpressured: bool,
95    /// Watermark.
96    pub watermark: i64,
97    /// Defining SQL query.
98    pub sql: Option<String>,
99}
100
101const BACKPRESSURE_THRESHOLD: f64 = 0.8;
102
103#[must_use]
104#[allow(clippy::cast_precision_loss)]
105pub(crate) fn is_backpressured(pending: usize, capacity: usize) -> bool {
106    capacity > 0 && (pending as f64 / capacity as f64) > BACKPRESSURE_THRESHOLD
107}
108
109#[must_use]
110#[allow(clippy::cast_precision_loss)]
111pub(crate) fn utilization(pending: usize, capacity: usize) -> f64 {
112    if capacity == 0 {
113        0.0
114    } else {
115        pending as f64 / capacity as f64
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122
123    #[test]
124    fn test_pipeline_state_display() {
125        assert_eq!(PipelineState::Created.to_string(), "Created");
126        assert_eq!(PipelineState::Starting.to_string(), "Starting");
127        assert_eq!(PipelineState::Running.to_string(), "Running");
128        assert_eq!(PipelineState::ShuttingDown.to_string(), "ShuttingDown");
129        assert_eq!(PipelineState::Stopped.to_string(), "Stopped");
130    }
131
132    #[test]
133    fn test_pipeline_state_equality() {
134        assert_eq!(PipelineState::Running, PipelineState::Running);
135        assert_ne!(PipelineState::Created, PipelineState::Running);
136    }
137
138    #[test]
139    fn test_backpressure_detection() {
140        // Empty buffer: not backpressured
141        assert!(!is_backpressured(0, 100));
142        // 50% full: not backpressured
143        assert!(!is_backpressured(50, 100));
144        // 80% full: not backpressured (threshold is >0.8, not >=)
145        assert!(!is_backpressured(80, 100));
146        // 81% full: backpressured
147        assert!(is_backpressured(81, 100));
148        // Full: backpressured
149        assert!(is_backpressured(100, 100));
150        // Zero capacity: not backpressured
151        assert!(!is_backpressured(0, 0));
152    }
153
154    #[test]
155    fn test_utilization() {
156        assert!((utilization(0, 100) - 0.0).abs() < f64::EPSILON);
157        assert!((utilization(50, 100) - 0.5).abs() < f64::EPSILON);
158        assert!((utilization(100, 100) - 1.0).abs() < f64::EPSILON);
159        assert!((utilization(0, 0) - 0.0).abs() < f64::EPSILON);
160    }
161
162    #[test]
163    fn test_pipeline_metrics_clone() {
164        let m = PipelineMetrics {
165            total_events_ingested: 100,
166            total_events_emitted: 50,
167            total_events_dropped: 0,
168            total_cycles: 10,
169            total_batches: 5,
170            uptime: Duration::from_secs(60),
171            state: PipelineState::Running,
172            source_count: 2,
173            stream_count: 1,
174            sink_count: 1,
175            pipeline_watermark: i64::MIN,
176            mv_updates: 0,
177            mv_bytes_stored: 0,
178        };
179        let m2 = m.clone();
180        assert_eq!(m2.total_events_ingested, 100);
181        assert_eq!(m2.state, PipelineState::Running);
182    }
183
184    #[test]
185    fn test_source_metrics_debug() {
186        let m = SourceMetrics {
187            name: "trades".to_string(),
188            total_events: 1000,
189            pending: 50,
190            capacity: 1024,
191            is_backpressured: false,
192            watermark: 12345,
193            utilization: 0.05,
194        };
195        let dbg = format!("{m:?}");
196        assert!(dbg.contains("trades"));
197        assert!(dbg.contains("1000"));
198    }
199
200    #[test]
201    fn test_stream_metrics_with_sql() {
202        let m = StreamMetrics {
203            name: "avg_price".to_string(),
204            total_events: 500,
205            pending: 0,
206            capacity: 1024,
207            is_backpressured: false,
208            watermark: 0,
209            sql: Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol".to_string()),
210        };
211        assert_eq!(
212            m.sql.as_deref(),
213            Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol")
214        );
215    }
216}