laminar_core/aggregation/
gossip_aggregates.rs1use rustc_hash::FxHashMap;
17
18use serde::{Deserialize, Serialize};
19
20use crate::delta::discovery::NodeId;
21
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
24pub enum AggregateState {
25 Count(i64),
27 Sum(f64),
29 Min(f64),
31 Max(f64),
33 Avg {
35 sum: f64,
37 count: i64,
39 },
40 Custom(Vec<u8>),
42}
43
44impl AggregateState {
45 pub fn merge(&mut self, other: &Self) {
47 match (self, other) {
48 (Self::Count(a), Self::Count(b)) => *a += b,
49 (Self::Sum(a), Self::Sum(b)) => *a += b,
50 (Self::Min(a), Self::Min(b)) => {
51 if *b < *a {
52 *a = *b;
53 }
54 }
55 (Self::Max(a), Self::Max(b)) => {
56 if *b > *a {
57 *a = *b;
58 }
59 }
60 (Self::Avg { sum: s1, count: c1 }, Self::Avg { sum: s2, count: c2 }) => {
61 *s1 += s2;
62 *c1 += c2;
63 }
64 _ => {} }
66 }
67
68 #[must_use]
70 #[allow(clippy::cast_precision_loss)]
71 pub fn finalize(&self) -> f64 {
72 match self {
73 Self::Count(n) => *n as f64,
74 Self::Sum(s) | Self::Min(s) | Self::Max(s) => *s,
75 Self::Avg { sum, count } => {
76 if *count > 0 {
77 sum / (*count as f64)
78 } else {
79 0.0
80 }
81 }
82 Self::Custom(_) => f64::NAN,
83 }
84 }
85}
86
/// One node's serializable partial aggregate.
///
/// This is the unit that `merge_cluster_aggregates` folds together.
///
/// Field order is left as-is on purpose. For non-self-describing serde
/// formats it may be part of the wire layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipAggregateValue {
    /// Identifier of the node this partial belongs to (per the field name).
    pub node_id: NodeId,
    /// Watermark reported with the partial.
    ///
    /// `merge_cluster_aggregates` subtracts it from
    /// `chrono::Utc::now().timestamp_millis()`, so it is presumably
    /// Unix-epoch milliseconds (TODO confirm with producers).
    pub watermark_ms: i64,
    /// Epoch tag of the partial. No logic in this file reads it.
    /// NOTE(review): document its meaning where it is produced.
    pub epoch: u64,
    /// The partial aggregate state itself.
    pub state: AggregateState,
}
99
/// Builds and parses key strings for one named aggregate of a pipeline.
///
/// Keys have one of two shapes:
/// `agg/{pipeline_id}/{aggregate_name}/global` or
/// `agg/{pipeline_id}/{aggregate_name}/window/{start}_{end}`.
#[derive(Debug, Clone)]
pub struct AggregateKeyspace {
    /// Pipeline the aggregate belongs to (second key segment).
    pub pipeline_id: String,
    /// Name of the aggregate within the pipeline (third key segment).
    pub aggregate_name: String,
}

impl AggregateKeyspace {
    /// Creates a keyspace for `aggregate_name` within `pipeline_id`.
    #[must_use]
    pub fn new(pipeline_id: String, aggregate_name: String) -> Self {
        Self {
            pipeline_id,
            aggregate_name,
        }
    }

    /// Key for the non-windowed (global) value of this aggregate.
    #[must_use]
    pub fn global_key(&self) -> String {
        format!("agg/{}/{}/global", self.pipeline_id, self.aggregate_name)
    }

    /// Key for the value of this aggregate over the window bounded by
    /// `window_start` and `window_end`.
    #[must_use]
    pub fn window_key(&self, window_start: i64, window_end: i64) -> String {
        format!(
            "agg/{}/{}/window/{window_start}_{window_end}",
            self.pipeline_id, self.aggregate_name
        )
    }

    /// Recovers the scope encoded in a key built by [`Self::global_key`] or
    /// [`Self::window_key`].
    ///
    /// The scope suffix is read from the *end* of the key. This is so that
    /// pipeline IDs and aggregate names that themselves contain `/`
    /// round-trip correctly.
    ///
    /// Returns `None` when the key:
    /// * lacks the `agg/` prefix;
    /// * lacks both a pipeline and an aggregate segment;
    /// * ends in an unknown scope marker;
    /// * has a malformed `{start}_{end}` range;
    /// * has trailing segments after the scope.
    #[must_use]
    pub fn parse_scope(key: &str) -> Option<AggregateScope> {
        let rest = key.strip_prefix("agg/")?;
        let segments: Vec<&str> = rest.split('/').collect();
        // Each pattern demands at least a pipeline and an aggregate segment
        // before the scope suffix. The leading `..` absorbs any extra
        // segments created by `/` inside those names.
        match segments.as_slice() {
            [.., _, _, "global"] => Some(AggregateScope::Global),
            [.., _, _, "window", range] => {
                let (start, end) = range.split_once('_')?;
                Some(AggregateScope::Window {
                    start: start.parse().ok()?,
                    end: end.parse().ok()?,
                })
            }
            _ => None,
        }
    }
}

/// Scope of an aggregate key, as recovered by [`AggregateKeyspace::parse_scope`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AggregateScope {
    /// Key produced by [`AggregateKeyspace::global_key`].
    Global,
    /// Key produced by [`AggregateKeyspace::window_key`].
    Window {
        /// The `window_start` component of the key.
        start: i64,
        /// The `window_end` component of the key.
        end: i64,
    },
}
171
172#[derive(Debug, Clone, PartialEq, Eq)]
174pub enum WatermarkGateStatus {
175 Complete,
177 Incomplete {
179 lagging_nodes: Vec<NodeId>,
181 min_watermark: i64,
183 },
184 Unknown,
186}
187
188#[must_use]
190#[allow(clippy::implicit_hasher)] pub fn check_watermark_gate(
192 window_end: i64,
193 node_watermarks: &FxHashMap<NodeId, i64>,
194) -> WatermarkGateStatus {
195 if node_watermarks.is_empty() {
196 return WatermarkGateStatus::Unknown;
197 }
198
199 let mut lagging = Vec::new();
200 let mut min_wm = i64::MAX;
201
202 for (node_id, &wm) in node_watermarks {
203 min_wm = min_wm.min(wm);
204 if wm < window_end {
205 lagging.push(*node_id);
206 }
207 }
208
209 if lagging.is_empty() {
210 WatermarkGateStatus::Complete
211 } else {
212 WatermarkGateStatus::Incomplete {
213 lagging_nodes: lagging,
214 min_watermark: min_wm,
215 }
216 }
217}
218
219#[derive(Debug, Clone)]
221pub struct ClusterAggregateResult {
222 pub state: AggregateState,
224 pub contributing_nodes: usize,
226 pub is_complete: bool,
228 pub max_staleness_ms: i64,
230}
231
232#[must_use]
234pub fn merge_cluster_aggregates(
235 partials: &[GossipAggregateValue],
236 expected_nodes: usize,
237) -> Option<ClusterAggregateResult> {
238 if partials.is_empty() {
239 return None;
240 }
241
242 let mut merged = partials[0].state.clone();
243 let now = chrono::Utc::now().timestamp_millis();
244 let mut max_staleness: i64 = 0;
245
246 for partial in &partials[1..] {
247 merged.merge(&partial.state);
248 let staleness = now.saturating_sub(partial.watermark_ms);
249 max_staleness = max_staleness.max(staleness);
250 }
251
252 Some(ClusterAggregateResult {
253 state: merged,
254 contributing_nodes: partials.len(),
255 is_complete: partials.len() >= expected_nodes,
256 max_staleness_ms: max_staleness,
257 })
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_aggregate_state_merge_count() {
        let mut a = AggregateState::Count(10);
        a.merge(&AggregateState::Count(5));
        assert_eq!(a, AggregateState::Count(15));
    }

    #[test]
    fn test_aggregate_state_merge_sum() {
        let mut a = AggregateState::Sum(1.5);
        a.merge(&AggregateState::Sum(2.5));
        assert_eq!(a, AggregateState::Sum(4.0));
    }

    #[test]
    fn test_aggregate_state_merge_min() {
        let mut a = AggregateState::Min(10.0);
        a.merge(&AggregateState::Min(5.0));
        assert_eq!(a, AggregateState::Min(5.0));
    }

    #[test]
    fn test_aggregate_state_merge_max() {
        let mut a = AggregateState::Max(5.0);
        a.merge(&AggregateState::Max(10.0));
        assert_eq!(a, AggregateState::Max(10.0));
    }

    #[test]
    fn test_aggregate_state_merge_avg() {
        let mut a = AggregateState::Avg {
            sum: 10.0,
            count: 2,
        };
        a.merge(&AggregateState::Avg {
            sum: 20.0,
            count: 3,
        });
        match a {
            AggregateState::Avg { sum, count } => {
                assert!((sum - 30.0).abs() < f64::EPSILON);
                assert_eq!(count, 5);
            }
            _ => panic!("expected Avg"),
        }
    }

    #[test]
    fn test_aggregate_state_finalize() {
        assert!((AggregateState::Count(42).finalize() - 42.0).abs() < f64::EPSILON);
        assert!(
            (AggregateState::Sum(std::f64::consts::PI).finalize() - std::f64::consts::PI).abs()
                < f64::EPSILON
        );
        // Min/Max finalize to their stored value.
        assert!((AggregateState::Min(-1.5).finalize() + 1.5).abs() < f64::EPSILON);
        assert!((AggregateState::Max(7.25).finalize() - 7.25).abs() < f64::EPSILON);
        let avg = AggregateState::Avg {
            sum: 10.0,
            count: 4,
        };
        assert!((avg.finalize() - 2.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_custom_state_finalize_is_nan_and_merge_is_noop() {
        let mut custom = AggregateState::Custom(vec![1, 2]);
        assert!(custom.finalize().is_nan());
        custom.merge(&AggregateState::Custom(vec![3]));
        assert_eq!(custom, AggregateState::Custom(vec![1, 2]));
    }

    #[test]
    fn test_aggregate_keyspace_global() {
        let ks = AggregateKeyspace::new("pipe1".into(), "total_sales".into());
        assert_eq!(ks.global_key(), "agg/pipe1/total_sales/global");
    }

    #[test]
    fn test_aggregate_keyspace_window() {
        let ks = AggregateKeyspace::new("pipe1".into(), "hourly_count".into());
        assert_eq!(
            ks.window_key(1000, 2000),
            "agg/pipe1/hourly_count/window/1000_2000"
        );
    }

    #[test]
    fn test_aggregate_keyspace_round_trip() {
        let ks = AggregateKeyspace::new("pipe1".into(), "hourly_count".into());
        assert_eq!(
            AggregateKeyspace::parse_scope(&ks.global_key()),
            Some(AggregateScope::Global)
        );
        assert_eq!(
            AggregateKeyspace::parse_scope(&ks.window_key(1000, 2000)),
            Some(AggregateScope::Window {
                start: 1000,
                end: 2000
            })
        );
    }

    #[test]
    fn test_parse_scope_global() {
        let scope = AggregateKeyspace::parse_scope("agg/pipe1/total/global").unwrap();
        assert_eq!(scope, AggregateScope::Global);
    }

    #[test]
    fn test_parse_scope_window() {
        let scope = AggregateKeyspace::parse_scope("agg/pipe1/hourly/window/1000_2000").unwrap();
        assert_eq!(
            scope,
            AggregateScope::Window {
                start: 1000,
                end: 2000
            }
        );
    }

    #[test]
    fn test_parse_scope_negative_window_bounds() {
        assert_eq!(
            AggregateKeyspace::parse_scope("agg/p/n/window/-5_10"),
            Some(AggregateScope::Window { start: -5, end: 10 })
        );
    }

    #[test]
    fn test_parse_scope_invalid() {
        assert!(AggregateKeyspace::parse_scope("invalid").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b/unknown").is_none());
    }

    #[test]
    fn test_parse_scope_malformed_window() {
        assert!(AggregateKeyspace::parse_scope("agg/p/n/window").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/p/n/window/1000").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/p/n/window/x_2000").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/p/n/window/1_2_3").is_none());
    }

    #[test]
    fn test_watermark_gate_complete() {
        let mut wms = FxHashMap::default();
        wms.insert(NodeId(1), 2000);
        wms.insert(NodeId(2), 1500);
        assert_eq!(
            check_watermark_gate(1000, &wms),
            WatermarkGateStatus::Complete
        );
    }

    #[test]
    fn test_watermark_gate_boundary_is_complete() {
        // A watermark exactly at the window end is not lagging.
        let mut wms = FxHashMap::default();
        wms.insert(NodeId(1), 1000);
        assert_eq!(
            check_watermark_gate(1000, &wms),
            WatermarkGateStatus::Complete
        );
    }

    #[test]
    fn test_watermark_gate_incomplete() {
        let mut wms = FxHashMap::default();
        wms.insert(NodeId(1), 2000);
        wms.insert(NodeId(2), 500);
        let status = check_watermark_gate(1000, &wms);
        match status {
            WatermarkGateStatus::Incomplete {
                lagging_nodes,
                min_watermark,
            } => {
                assert_eq!(lagging_nodes, vec![NodeId(2)]);
                assert_eq!(min_watermark, 500);
            }
            _ => panic!("expected Incomplete"),
        }
    }

    #[test]
    fn test_watermark_gate_empty() {
        assert_eq!(
            check_watermark_gate(1000, &FxHashMap::default()),
            WatermarkGateStatus::Unknown
        );
    }

    #[test]
    fn test_merge_cluster_aggregates() {
        let partials = vec![
            GossipAggregateValue {
                node_id: NodeId(1),
                watermark_ms: chrono::Utc::now().timestamp_millis(),
                epoch: 1,
                state: AggregateState::Count(10),
            },
            GossipAggregateValue {
                node_id: NodeId(2),
                watermark_ms: chrono::Utc::now().timestamp_millis(),
                epoch: 1,
                state: AggregateState::Count(20),
            },
        ];

        let result = merge_cluster_aggregates(&partials, 2).unwrap();
        assert_eq!(result.state, AggregateState::Count(30));
        assert_eq!(result.contributing_nodes, 2);
        assert!(result.is_complete);
        assert!(result.max_staleness_ms >= 0);
    }

    #[test]
    fn test_merge_cluster_aggregates_empty() {
        assert!(merge_cluster_aggregates(&[], 3).is_none());
    }

    #[test]
    fn test_merge_cluster_aggregates_incomplete() {
        let partials = vec![GossipAggregateValue {
            node_id: NodeId(1),
            watermark_ms: chrono::Utc::now().timestamp_millis(),
            epoch: 1,
            state: AggregateState::Sum(42.0),
        }];

        let result = merge_cluster_aggregates(&partials, 3).unwrap();
        assert!(!result.is_complete);
        assert_eq!(result.contributing_nodes, 1);
        assert_eq!(result.state, AggregateState::Sum(42.0));
    }

    #[test]
    fn test_gossip_aggregate_value_serialization() {
        let val = GossipAggregateValue {
            node_id: NodeId(1),
            watermark_ms: 1000,
            epoch: 5,
            state: AggregateState::Count(42),
        };
        let json = serde_json::to_string(&val).unwrap();
        let back: GossipAggregateValue = serde_json::from_str(&json).unwrap();
        assert_eq!(back.node_id, NodeId(1));
        assert_eq!(back.watermark_ms, 1000);
        assert_eq!(back.epoch, 5);
        assert_eq!(back.state, AggregateState::Count(42));
    }

    #[test]
    fn test_aggregate_type_mismatch_merge_noop() {
        let mut a = AggregateState::Count(10);
        a.merge(&AggregateState::Sum(5.0));
        assert_eq!(a, AggregateState::Count(10));

        let mut b = AggregateState::Min(1.0);
        b.merge(&AggregateState::Max(-1.0));
        assert_eq!(b, AggregateState::Min(1.0));
    }

    #[test]
    fn test_avg_zero_count_finalize() {
        let avg = AggregateState::Avg { sum: 0.0, count: 0 };
        assert!(avg.finalize().abs() < f64::EPSILON);
    }
}
471}