1use std::time::Duration;
4
/// Lifecycle state of a pipeline, as reported in [`PipelineMetrics::state`].
///
/// The variants are declared in the order `Created`, `Starting`, `Running`,
/// `ShuttingDown`, `Stopped`.
///
/// NOTE(review): presumably that is also the intended progression over a
/// pipeline's lifetime; nothing in this file enforces transitions — confirm
/// against the code that drives the state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    /// Rendered as `"Created"`.
    Created,
    /// Rendered as `"Starting"`.
    Starting,
    /// Rendered as `"Running"`.
    Running,
    /// Rendered as `"ShuttingDown"`.
    ShuttingDown,
    /// Rendered as `"Stopped"`.
    Stopped,
}

impl std::fmt::Display for PipelineState {
    /// Writes the variant name (e.g. `Running`).
    ///
    /// The name is emitted through [`std::fmt::Formatter::pad`], so width,
    /// fill, alignment and precision flags such as `{:<12}` are honoured,
    /// exactly as they are for a plain `&str`. Output for a bare `{}` is the
    /// variant name with no padding.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match on purpose: adding a variant must fail to compile
        // here until it is given a display name.
        let name = match self {
            Self::Created => "Created",
            Self::Starting => "Starting",
            Self::Running => "Running",
            Self::ShuttingDown => "ShuttingDown",
            Self::Stopped => "Stopped",
        };
        f.pad(name)
    }
}
31
32#[derive(Debug, Clone)]
34pub struct PipelineMetrics {
35 pub total_events_ingested: u64,
37 pub total_events_emitted: u64,
39 pub total_events_dropped: u64,
41 pub total_cycles: u64,
43 pub total_batches: u64,
45 pub uptime: Duration,
47 pub state: PipelineState,
49 pub source_count: usize,
51 pub stream_count: usize,
53 pub sink_count: usize,
55 pub pipeline_watermark: i64,
57 pub mv_updates: u64,
59 pub mv_bytes_stored: u64,
61}
62
/// Snapshot of the metrics reported for a single named source.
///
/// Plain data carrier with public fields; nothing in this file computes or
/// validates the values.
///
/// NOTE(review): the per-field descriptions are derived from the field names
/// and types only — confirm against the code that populates this struct.
#[derive(Clone, Debug)]
pub struct SourceMetrics {
    /// Name of the source.
    pub name: String,
    /// Total number of events seen for this source.
    pub total_events: u64,
    /// Number of pending items (presumably buffered but not yet consumed).
    pub pending: usize,
    /// Capacity that `pending` is measured against.
    pub capacity: usize,
    /// Whether the source is considered backpressured.
    ///
    /// Presumably filled in from the crate-level `is_backpressured` helper —
    /// verify at the call site.
    pub is_backpressured: bool,
    /// Source watermark (units not defined in this file).
    pub watermark: i64,
    /// Buffer utilization as a ratio.
    ///
    /// Presumably `pending / capacity` as produced by the crate-level
    /// `utilization` helper — verify at the call site.
    pub utilization: f64,
}
81
/// Snapshot of the metrics reported for a single named stream.
///
/// Plain data carrier with public fields; nothing in this file computes or
/// validates the values.
///
/// NOTE(review): the per-field descriptions are derived from the field names
/// and types only — confirm against the code that populates this struct.
#[derive(Clone, Debug)]
pub struct StreamMetrics {
    /// Name of the stream.
    pub name: String,
    /// Total number of events seen for this stream.
    pub total_events: u64,
    /// Number of pending items (presumably buffered but not yet consumed).
    pub pending: usize,
    /// Capacity that `pending` is measured against.
    pub capacity: usize,
    /// Whether the stream is considered backpressured.
    pub is_backpressured: bool,
    /// Stream watermark (units not defined in this file).
    pub watermark: i64,
    /// SQL text associated with the stream, if any.
    ///
    /// The unit test stores a `SELECT` statement here; presumably this is the
    /// query that defines the stream — confirm.
    pub sql: Option<String>,
}
100
/// Fill ratio (`pending / capacity`) above which a buffer counts as
/// backpressured. The comparison is strict, so exactly 80% is not over.
const BACKPRESSURE_THRESHOLD: f64 = 0.8;

/// Reports whether a buffer holding `pending` items out of `capacity` is
/// backpressured.
///
/// Returns `true` only when `capacity` is non-zero and `pending / capacity`
/// is strictly greater than [`BACKPRESSURE_THRESHOLD`]. A zero-capacity
/// buffer is never reported as backpressured.
#[must_use]
// usize -> f64 may round for very large values; that is acceptable for a
// coarse ratio comparison.
#[allow(clippy::cast_precision_loss)]
pub(crate) fn is_backpressured(pending: usize, capacity: usize) -> bool {
    // Guard first so the division below never sees a zero denominator.
    if capacity == 0 {
        return false;
    }
    let fill_ratio = pending as f64 / capacity as f64;
    fill_ratio > BACKPRESSURE_THRESHOLD
}
108
/// Computes the fill ratio `pending / capacity` as an `f64`.
///
/// Returns `0.0` when `capacity` is zero. The result is not clamped, so a
/// `pending` larger than `capacity` yields a value above `1.0`.
#[must_use]
// usize -> f64 may round for very large values; that is acceptable for a
// reporting ratio.
#[allow(clippy::cast_precision_loss)]
pub(crate) fn utilization(pending: usize, capacity: usize) -> f64 {
    match capacity {
        // Avoid dividing by zero: an empty-capacity buffer reports no usage.
        0 => 0.0,
        slots => pending as f64 / slots as f64,
    }
}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Every state renders as its variant name.
    #[test]
    fn test_pipeline_state_display() {
        let expected = [
            (PipelineState::Created, "Created"),
            (PipelineState::Starting, "Starting"),
            (PipelineState::Running, "Running"),
            (PipelineState::ShuttingDown, "ShuttingDown"),
            (PipelineState::Stopped, "Stopped"),
        ];
        for (state, label) in expected {
            assert_eq!(state.to_string(), label);
        }
    }

    /// Equal variants compare equal; distinct variants do not.
    #[test]
    fn test_pipeline_state_equality() {
        let running = PipelineState::Running;
        assert_eq!(running, PipelineState::Running);
        assert_ne!(PipelineState::Created, running);
    }

    /// The threshold is strict (80% is not over) and zero capacity is never
    /// backpressured.
    #[test]
    fn test_backpressure_detection() {
        // (pending, capacity, expected)
        let cases = [
            (0, 100, false),
            (50, 100, false),
            (80, 100, false),
            (81, 100, true),
            (100, 100, true),
            (0, 0, false),
        ];
        for (pending, capacity, expected) in cases {
            assert!(is_backpressured(pending, capacity) == expected);
        }
    }

    /// Utilization is the plain ratio, with zero capacity mapped to 0.0.
    #[test]
    fn test_utilization() {
        // (pending, capacity, expected ratio)
        let cases = [
            (0, 100, 0.0),
            (50, 100, 0.5),
            (100, 100, 1.0),
            (0, 0, 0.0),
        ];
        for (pending, capacity, want) in cases {
            let got = utilization(pending, capacity);
            assert!((got - want).abs() < f64::EPSILON);
        }
    }

    /// Cloning preserves field values.
    #[test]
    fn test_pipeline_metrics_clone() {
        let original = PipelineMetrics {
            total_events_ingested: 100,
            total_events_emitted: 50,
            total_events_dropped: 0,
            total_cycles: 10,
            total_batches: 5,
            uptime: Duration::from_secs(60),
            state: PipelineState::Running,
            source_count: 2,
            stream_count: 1,
            sink_count: 1,
            pipeline_watermark: i64::MIN,
            mv_updates: 0,
            mv_bytes_stored: 0,
        };
        let copy = original.clone();
        assert_eq!(copy.total_events_ingested, 100);
        assert_eq!(copy.state, PipelineState::Running);
    }

    /// The derived `Debug` output includes the name and the event count.
    #[test]
    fn test_source_metrics_debug() {
        let metrics = SourceMetrics {
            name: "trades".to_string(),
            total_events: 1000,
            pending: 50,
            capacity: 1024,
            is_backpressured: false,
            watermark: 12345,
            utilization: 0.05,
        };
        let rendered = format!("{metrics:?}");
        for needle in ["trades", "1000"] {
            assert!(rendered.contains(needle));
        }
    }

    /// The optional SQL text round-trips unchanged.
    #[test]
    fn test_stream_metrics_with_sql() {
        let query = "SELECT symbol, AVG(price) FROM trades GROUP BY symbol";
        let metrics = StreamMetrics {
            name: "avg_price".to_string(),
            total_events: 500,
            pending: 0,
            capacity: 1024,
            is_backpressured: false,
            watermark: 0,
            sql: Some(query.to_string()),
        };
        assert_eq!(metrics.sql.as_deref(), Some(query));
    }
}