laminar_core/budget/
task_budget.rs1use std::time::Instant;
4
/// A running time budget for a single task.
///
/// The clock starts when the value is constructed (`start` is captured with
/// `Instant::now()` by every constructor) and all queries compare the time
/// elapsed since then against `budget_ns`.
#[derive(Debug)]
pub struct TaskBudget {
    /// Moment the budget was created; elapsed time is measured from here.
    start: Instant,
    /// Allowed duration for the task, in nanoseconds.
    budget_ns: u64,
    /// Static label identifying the task (included in the trace event on drop).
    name: &'static str,
    /// Ring number the task is attributed to (the built-in presets use 0 and 1).
    ring: u8,
    /// When `true`, dropping the budget emits a trace event if it was exceeded.
    record_metrics: bool,
}
44
45impl TaskBudget {
46 pub const RING0_EVENT_NS: u64 = 500;
50
51 pub const RING0_BATCH_NS: u64 = 5_000;
53
54 pub const RING0_LOOKUP_NS: u64 = 200;
56
57 pub const RING0_WINDOW_NS: u64 = 10_000;
59
60 pub const RING0_ITERATION_NS: u64 = 10_000;
62
63 pub const RING1_CHUNK_NS: u64 = 1_000_000;
67
68 pub const RING1_CHECKPOINT_NS: u64 = 10_000_000;
70
71 pub const RING1_WAL_FLUSH_NS: u64 = 100_000;
73
74 pub const RING1_COMPACTION_NS: u64 = 5_000_000;
76
77 #[inline]
81 #[must_use]
82 pub fn ring0_event() -> Self {
83 Self {
84 start: Instant::now(),
85 budget_ns: Self::RING0_EVENT_NS,
86 name: "ring0_event",
87 ring: 0,
88 record_metrics: true,
89 }
90 }
91
92 #[inline]
95 #[must_use]
96 pub fn ring0_event_untracked() -> Self {
97 Self {
98 start: Instant::now(),
99 budget_ns: Self::RING0_EVENT_NS,
100 name: "ring0_event",
101 ring: 0,
102 record_metrics: false,
103 }
104 }
105
106 #[inline]
108 #[must_use]
109 pub fn ring0_batch() -> Self {
110 Self {
111 start: Instant::now(),
112 budget_ns: Self::RING0_BATCH_NS,
113 name: "ring0_batch",
114 ring: 0,
115 record_metrics: true,
116 }
117 }
118
119 #[inline]
121 #[must_use]
122 pub fn ring0_lookup() -> Self {
123 Self {
124 start: Instant::now(),
125 budget_ns: Self::RING0_LOOKUP_NS,
126 name: "ring0_lookup",
127 ring: 0,
128 record_metrics: true,
129 }
130 }
131
132 #[inline]
134 #[must_use]
135 pub fn ring0_window() -> Self {
136 Self {
137 start: Instant::now(),
138 budget_ns: Self::RING0_WINDOW_NS,
139 name: "ring0_window",
140 ring: 0,
141 record_metrics: true,
142 }
143 }
144
145 #[inline]
147 #[must_use]
148 pub fn ring0_iteration() -> Self {
149 Self {
150 start: Instant::now(),
151 budget_ns: Self::RING0_ITERATION_NS,
152 name: "ring0_iteration",
153 ring: 0,
154 record_metrics: true,
155 }
156 }
157
158 #[inline]
162 #[must_use]
163 pub fn ring1_chunk() -> Self {
164 Self {
165 start: Instant::now(),
166 budget_ns: Self::RING1_CHUNK_NS,
167 name: "ring1_chunk",
168 ring: 1,
169 record_metrics: true,
170 }
171 }
172
173 #[inline]
175 #[must_use]
176 pub fn ring1_checkpoint() -> Self {
177 Self {
178 start: Instant::now(),
179 budget_ns: Self::RING1_CHECKPOINT_NS,
180 name: "ring1_checkpoint",
181 ring: 1,
182 record_metrics: true,
183 }
184 }
185
186 #[inline]
188 #[must_use]
189 pub fn ring1_wal_flush() -> Self {
190 Self {
191 start: Instant::now(),
192 budget_ns: Self::RING1_WAL_FLUSH_NS,
193 name: "ring1_wal_flush",
194 ring: 1,
195 record_metrics: true,
196 }
197 }
198
199 #[inline]
201 #[must_use]
202 pub fn ring1_compaction() -> Self {
203 Self {
204 start: Instant::now(),
205 budget_ns: Self::RING1_COMPACTION_NS,
206 name: "ring1_compaction",
207 ring: 1,
208 record_metrics: true,
209 }
210 }
211
212 #[inline]
222 #[must_use]
223 pub fn custom(name: &'static str, ring: u8, budget_ns: u64) -> Self {
224 Self {
225 start: Instant::now(),
226 budget_ns,
227 name,
228 ring,
229 record_metrics: true,
230 }
231 }
232
233 #[inline]
235 #[must_use]
236 pub fn custom_untracked(name: &'static str, ring: u8, budget_ns: u64) -> Self {
237 Self {
238 start: Instant::now(),
239 budget_ns,
240 name,
241 ring,
242 record_metrics: false,
243 }
244 }
245
246 #[inline]
250 #[must_use]
251 pub fn name(&self) -> &'static str {
252 self.name
253 }
254
255 #[inline]
257 #[must_use]
258 pub fn ring(&self) -> u8 {
259 self.ring
260 }
261
262 #[inline]
264 #[must_use]
265 pub fn budget_ns(&self) -> u64 {
266 self.budget_ns
267 }
268
269 #[inline]
277 #[must_use]
278 #[allow(clippy::cast_possible_truncation)]
279 pub fn elapsed_ns(&self) -> u64 {
280 self.start.elapsed().as_nanos() as u64
281 }
282
283 #[inline]
289 #[must_use]
290 #[allow(clippy::cast_possible_wrap)]
291 pub fn remaining_ns(&self) -> i64 {
292 self.budget_ns as i64 - self.elapsed_ns() as i64
296 }
297
298 #[inline]
300 #[must_use]
301 pub fn exceeded(&self) -> bool {
302 self.elapsed_ns() > self.budget_ns
303 }
304
305 #[inline]
309 #[must_use]
310 pub fn almost_exceeded(&self) -> bool {
311 self.elapsed_ns() > (self.budget_ns * 8) / 10
312 }
313
314 #[inline]
318 #[must_use]
319 pub fn half_used(&self) -> bool {
320 self.elapsed_ns() > self.budget_ns / 2
321 }
322
323 #[inline]
325 #[must_use]
326 pub fn percentage_used(&self) -> u64 {
327 let elapsed = self.elapsed_ns();
328 if self.budget_ns == 0 {
329 return 100;
330 }
331 (elapsed * 100) / self.budget_ns
332 }
333}
334
335impl Drop for TaskBudget {
336 fn drop(&mut self) {
337 if self.record_metrics {
338 let elapsed = self.elapsed_ns();
339 if elapsed > self.budget_ns {
340 tracing::trace!(
341 task = self.name,
342 ring = self.ring,
343 budget_ns = self.budget_ns,
344 elapsed_ns = elapsed,
345 "budget exceeded",
346 );
347 }
348 }
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Two readings separated by a sleep must be strictly increasing.
    #[test]
    fn test_elapsed_increases() {
        let budget = TaskBudget::ring1_chunk();
        let t1 = budget.elapsed_ns();
        thread::sleep(Duration::from_micros(100));
        let t2 = budget.elapsed_ns();
        assert!(t2 > t1);
    }

    /// The remaining time must shrink as the clock advances.
    #[test]
    fn test_remaining_decreases() {
        let budget = TaskBudget::ring1_chunk();
        let r1 = budget.remaining_ns();
        thread::sleep(Duration::from_micros(100));
        let r2 = budget.remaining_ns();
        assert!(r2 < r1);
    }

    /// A fresh budget reports a low percentage; a zero budget reports 100.
    #[test]
    fn test_percentage_used() {
        // One-second budget: the check only fails if the thread is stalled for
        // half a second, so it stays stable on loaded CI machines.
        let budget = TaskBudget::custom("test", 0, 1_000_000_000);
        let pct = budget.percentage_used();
        assert!(pct < 50, "Early percentage {pct} should be low");

        // Deterministic branch: a zero budget short-circuits to 100.
        let zero = TaskBudget::custom_untracked("zero", 0, 0);
        assert_eq!(zero.percentage_used(), 100);
    }

    /// `half_used` is false on a fresh roomy budget and true once more than
    /// half of a tight budget has elapsed.
    #[test]
    fn test_half_used() {
        let roomy = TaskBudget::custom("test", 0, 1_000_000_000);
        assert!(!roomy.half_used());

        // 1 ms budget + 1 ms sleep: `sleep` waits at least the requested time,
        // so elapsed is guaranteed to be past the 0.5 ms midpoint.
        let tight = TaskBudget::custom_untracked("test", 0, 1_000_000);
        thread::sleep(Duration::from_millis(1));
        assert!(tight.half_used());
    }

    /// The `*_untracked` constructors must disable metric recording.
    #[test]
    fn test_untracked_budget() {
        let budget = TaskBudget::ring0_event_untracked();
        assert!(!budget.record_metrics);

        let budget2 = TaskBudget::custom_untracked("test", 0, 1000);
        assert!(!budget2.record_metrics);
    }

    /// The ring-0 iteration preset carries its own name and constant.
    #[test]
    fn test_ring0_iteration() {
        let budget = TaskBudget::ring0_iteration();
        assert_eq!(budget.name(), "ring0_iteration");
        assert_eq!(budget.budget_ns(), TaskBudget::RING0_ITERATION_NS);
    }

    /// The ring-1 compaction preset carries its own name and constant.
    #[test]
    fn test_ring1_compaction() {
        let budget = TaskBudget::ring1_compaction();
        assert_eq!(budget.name(), "ring1_compaction");
        assert_eq!(budget.budget_ns(), TaskBudget::RING1_COMPACTION_NS);
    }
}