Skip to main content

laminar_core/aggregation/
gossip_aggregates.rs

1//! Gossip-based partial aggregate replication across nodes.
2//!
//! Reuses the chitchat gossip protocol from the gossip discovery module to disseminate
//! partial aggregates across the cluster. Each node publishes its local partials to
//! chitchat, and a merger reads both local and remote partials to produce cluster-wide
//! aggregates.
7//!
8//! ## Key Format
9//!
//! Chitchat key namespace: `agg/{pipeline_id}/{aggregate_name}/{window_scope}`, where
//! `{window_scope}` is either `global` or `window/{window_start}_{window_end}`.
11//!
12//! ## Watermark-Gated Windows
13//!
//! A window is considered final once every node reports a watermark `>= window_end`.
15
16use rustc_hash::FxHashMap;
17
18use serde::{Deserialize, Serialize};
19
20use crate::delta::discovery::NodeId;
21
/// Mergeable partial state for a single aggregate, as produced by one node.
///
/// Partials of the same variant are combined with [`AggregateState::merge`] and reduced to a
/// scalar with [`AggregateState::finalize`]. The type is (de)serializable so it can be carried
/// inside a [`GossipAggregateValue`].
///
/// Variant order and field names are part of the serialized form; do not reorder or rename.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AggregateState {
    /// Count aggregate; partials are merged by addition.
    Count(i64),
    /// Sum aggregate; partials are merged by addition.
    Sum(f64),
    /// Minimum value; merging keeps the smaller value.
    Min(f64),
    /// Maximum value; merging keeps the larger value.
    Max(f64),
    /// Average, kept as a running sum and count so partials merge exactly.
    /// Finalized as `sum / count` (or `0.0` when `count` is not positive).
    Avg {
        /// Running sum.
        sum: f64,
        /// Running count.
        count: i64,
    },
    /// Custom aggregate with opaque serialized state. `merge` does not combine these and
    /// `finalize` yields NaN, since the bytes cannot be interpreted in this module.
    Custom(Vec<u8>),
}
43
44impl AggregateState {
45    /// Merge another partial into this one.
46    pub fn merge(&mut self, other: &Self) {
47        match (self, other) {
48            (Self::Count(a), Self::Count(b)) => *a += b,
49            (Self::Sum(a), Self::Sum(b)) => *a += b,
50            (Self::Min(a), Self::Min(b)) => {
51                if *b < *a {
52                    *a = *b;
53                }
54            }
55            (Self::Max(a), Self::Max(b)) => {
56                if *b > *a {
57                    *a = *b;
58                }
59            }
60            (Self::Avg { sum: s1, count: c1 }, Self::Avg { sum: s2, count: c2 }) => {
61                *s1 += s2;
62                *c1 += c2;
63            }
64            _ => {} // Type mismatch — ignore
65        }
66    }
67
68    /// Finalize the aggregate to a scalar value.
69    #[must_use]
70    #[allow(clippy::cast_precision_loss)]
71    pub fn finalize(&self) -> f64 {
72        match self {
73            Self::Count(n) => *n as f64,
74            Self::Sum(s) | Self::Min(s) | Self::Max(s) => *s,
75            Self::Avg { sum, count } => {
76                if *count > 0 {
77                    sum / (*count as f64)
78                } else {
79                    0.0
80                }
81            }
82            Self::Custom(_) => f64::NAN,
83        }
84    }
85}
86
/// One node's partial aggregate as published over gossip.
///
/// It is (de)serializable so it can be published as a chitchat value (see the module docs).
/// The merger combines these with [`merge_cluster_aggregates`].
/// Field order and names are part of the serialized form; do not reorder or rename.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipAggregateValue {
    /// Node that produced this partial.
    pub node_id: NodeId,
    /// Publishing node's watermark at time of publication (millis since epoch).
    /// `merge_cluster_aggregates` measures staleness against this value.
    pub watermark_ms: i64,
    /// Epoch at time of publication. Not interpreted by the functions in this module.
    pub epoch: u64,
    /// The partial aggregate state.
    pub state: AggregateState,
}
99
/// Builds and parses chitchat keys for one aggregate of one pipeline.
///
/// Keys have the shape `agg/{pipeline_id}/{aggregate_name}/{scope}`, where `{scope}` is
/// `global` or `window/{window_start}_{window_end}`. Segments are joined with `/` without
/// escaping. Therefore `pipeline_id` and `aggregate_name` must not contain `/`; otherwise
/// [`AggregateKeyspace::parse_scope`] will not recognise the generated key.
#[derive(Debug, Clone)]
pub struct AggregateKeyspace {
    /// Pipeline identifier (the segment right after `agg`).
    pub pipeline_id: String,
    /// Aggregate name (the segment after the pipeline identifier).
    pub aggregate_name: String,
}

impl AggregateKeyspace {
    /// Creates a keyspace for `aggregate_name` within `pipeline_id`.
    #[must_use]
    pub fn new(pipeline_id: String, aggregate_name: String) -> Self {
        Self {
            pipeline_id,
            aggregate_name,
        }
    }

    /// Key for the global (un-windowed) aggregate: `agg/{pipeline_id}/{aggregate_name}/global`.
    #[must_use]
    pub fn global_key(&self) -> String {
        format!("agg/{}/{}/global", self.pipeline_id, self.aggregate_name)
    }

    /// Key for a windowed aggregate:
    /// `agg/{pipeline_id}/{aggregate_name}/window/{window_start}_{window_end}`.
    ///
    /// Bounds are written as decimal integers, so negative values are preserved
    /// (e.g. `-5_10`).
    #[must_use]
    pub fn window_key(&self, window_start: i64, window_end: i64) -> String {
        format!(
            "agg/{}/{}/window/{window_start}_{window_end}",
            self.pipeline_id, self.aggregate_name
        )
    }

    /// Extracts the [`AggregateScope`] encoded in a chitchat key.
    ///
    /// This is the inverse of [`Self::global_key`] / [`Self::window_key`]. Returns `None` if
    /// any of the following holds:
    ///
    /// * the first segment is not `agg`;
    /// * the segment count is not exactly that of one of the two formats (4 for global,
    ///   5 for windowed);
    /// * the window range is not two `i64` values separated by a single `_`.
    #[must_use]
    pub fn parse_scope(key: &str) -> Option<AggregateScope> {
        let segments: Vec<&str> = key.split('/').collect();
        // Exact-length patterns: trailing segments (e.g. `.../global/extra`) are rejected
        // rather than silently ignored, so only well-formed keys are recognised.
        match segments.as_slice() {
            ["agg", _, _, "global"] => Some(AggregateScope::Global),
            ["agg", _, _, "window", range] => {
                let (start, end) = range.split_once('_')?;
                Some(AggregateScope::Window {
                    start: start.parse().ok()?,
                    end: end.parse().ok()?,
                })
            }
            _ => None,
        }
    }
}

/// The scope of a gossip aggregate, as encoded in the last segment(s) of its chitchat key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AggregateScope {
    /// Global (un-windowed) aggregate.
    Global,
    /// Windowed aggregate.
    Window {
        /// Window start (millis since epoch).
        start: i64,
        /// Window end (millis since epoch).
        end: i64,
    },
}
171
/// Outcome of [`check_watermark_gate`]: whether a window can be treated as final.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatermarkGateStatus {
    /// Every node in the supplied watermark map has a watermark `>= window_end`.
    Complete,
    /// At least one node's watermark is still below `window_end`.
    Incomplete {
        /// Nodes whose watermark is `< window_end`. `check_watermark_gate` lists them in
        /// hash-map iteration order; they are not sorted.
        lagging_nodes: Vec<NodeId>,
        /// Smallest watermark across all nodes in the map, lagging or not.
        min_watermark: i64,
    },
    /// Not enough information to decide. `check_watermark_gate` returns this when the
    /// watermark map is empty (unknown node set).
    Unknown,
}
187
188/// Checks watermark convergence for window completion.
189#[must_use]
190#[allow(clippy::implicit_hasher)] // intentionally requires FxHashMap for hot-path hashing
191pub fn check_watermark_gate(
192    window_end: i64,
193    node_watermarks: &FxHashMap<NodeId, i64>,
194) -> WatermarkGateStatus {
195    if node_watermarks.is_empty() {
196        return WatermarkGateStatus::Unknown;
197    }
198
199    let mut lagging = Vec::new();
200    let mut min_wm = i64::MAX;
201
202    for (node_id, &wm) in node_watermarks {
203        min_wm = min_wm.min(wm);
204        if wm < window_end {
205            lagging.push(*node_id);
206        }
207    }
208
209    if lagging.is_empty() {
210        WatermarkGateStatus::Complete
211    } else {
212        WatermarkGateStatus::Incomplete {
213            lagging_nodes: lagging,
214            min_watermark: min_wm,
215        }
216    }
217}
218
/// Cluster-wide aggregate produced by [`merge_cluster_aggregates`].
#[derive(Debug, Clone)]
pub struct ClusterAggregateResult {
    /// The merged aggregate state.
    pub state: AggregateState,
    /// Number of partials that were merged; `merge_cluster_aggregates` sets this to
    /// `partials.len()`. A node that appears twice in the input is counted twice.
    pub contributing_nodes: usize,
    /// `true` when `contributing_nodes >= expected_nodes`.
    pub is_complete: bool,
    /// Largest observed gap (millis) between wall-clock time at merge and a partial's
    /// `watermark_ms`; never negative.
    pub max_staleness_ms: i64,
}
231
232/// Merges partial aggregates from multiple nodes.
233#[must_use]
234pub fn merge_cluster_aggregates(
235    partials: &[GossipAggregateValue],
236    expected_nodes: usize,
237) -> Option<ClusterAggregateResult> {
238    if partials.is_empty() {
239        return None;
240    }
241
242    let mut merged = partials[0].state.clone();
243    let now = chrono::Utc::now().timestamp_millis();
244    let mut max_staleness: i64 = 0;
245
246    for partial in &partials[1..] {
247        merged.merge(&partial.state);
248        let staleness = now.saturating_sub(partial.watermark_ms);
249        max_staleness = max_staleness.max(staleness);
250    }
251
252    Some(ClusterAggregateResult {
253        state: merged,
254        contributing_nodes: partials.len(),
255        is_complete: partials.len() >= expected_nodes,
256        max_staleness_ms: max_staleness,
257    })
258}
259
#[cfg(test)]
mod tests {
    //! Unit tests for aggregate merging, key building/parsing, watermark gating and
    //! cluster-wide merging.

    use super::*;

    #[test]
    fn test_aggregate_state_merge_count() {
        let mut a = AggregateState::Count(10);
        a.merge(&AggregateState::Count(5));
        assert_eq!(a, AggregateState::Count(15));
    }

    #[test]
    fn test_aggregate_state_merge_sum() {
        let mut a = AggregateState::Sum(1.5);
        a.merge(&AggregateState::Sum(2.5));
        assert_eq!(a, AggregateState::Sum(4.0));
    }

    #[test]
    fn test_aggregate_state_merge_min() {
        let mut a = AggregateState::Min(10.0);
        a.merge(&AggregateState::Min(5.0));
        assert_eq!(a, AggregateState::Min(5.0));
    }

    #[test]
    fn test_aggregate_state_merge_max() {
        let mut a = AggregateState::Max(5.0);
        a.merge(&AggregateState::Max(10.0));
        assert_eq!(a, AggregateState::Max(10.0));
    }

    #[test]
    fn test_aggregate_state_merge_keeps_existing_extreme() {
        // The other side does not improve on the current extreme, so nothing changes.
        let mut min = AggregateState::Min(1.0);
        min.merge(&AggregateState::Min(5.0));
        assert_eq!(min, AggregateState::Min(1.0));

        let mut max = AggregateState::Max(5.0);
        max.merge(&AggregateState::Max(1.0));
        assert_eq!(max, AggregateState::Max(5.0));
    }

    #[test]
    fn test_aggregate_state_merge_avg() {
        let mut a = AggregateState::Avg {
            sum: 10.0,
            count: 2,
        };
        a.merge(&AggregateState::Avg {
            sum: 20.0,
            count: 3,
        });
        match a {
            AggregateState::Avg { sum, count } => {
                assert!((sum - 30.0).abs() < f64::EPSILON);
                assert_eq!(count, 5);
            }
            _ => panic!("expected Avg"),
        }
    }

    #[test]
    fn test_aggregate_state_finalize() {
        assert!((AggregateState::Count(42).finalize() - 42.0).abs() < f64::EPSILON);
        assert!(
            (AggregateState::Sum(std::f64::consts::PI).finalize() - std::f64::consts::PI).abs()
                < f64::EPSILON
        );
        let avg = AggregateState::Avg {
            sum: 10.0,
            count: 4,
        };
        assert!((avg.finalize() - 2.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_custom_finalize_is_nan() {
        assert!(AggregateState::Custom(vec![1, 2, 3]).finalize().is_nan());
    }

    #[test]
    fn test_aggregate_keyspace_global() {
        let ks = AggregateKeyspace::new("pipe1".into(), "total_sales".into());
        assert_eq!(ks.global_key(), "agg/pipe1/total_sales/global");
    }

    #[test]
    fn test_aggregate_keyspace_window() {
        let ks = AggregateKeyspace::new("pipe1".into(), "hourly_count".into());
        assert_eq!(
            ks.window_key(1000, 2000),
            "agg/pipe1/hourly_count/window/1000_2000"
        );
    }

    #[test]
    fn test_keyspace_round_trip() {
        // Keys produced by the builder must parse back to the scope they encode,
        // including negative window bounds.
        let ks = AggregateKeyspace::new("pipe1".into(), "hourly".into());
        assert_eq!(
            AggregateKeyspace::parse_scope(&ks.global_key()),
            Some(AggregateScope::Global)
        );
        assert_eq!(
            AggregateKeyspace::parse_scope(&ks.window_key(-3_600_000, 0)),
            Some(AggregateScope::Window {
                start: -3_600_000,
                end: 0
            })
        );
    }

    #[test]
    fn test_parse_scope_global() {
        let scope = AggregateKeyspace::parse_scope("agg/pipe1/total/global").unwrap();
        assert_eq!(scope, AggregateScope::Global);
    }

    #[test]
    fn test_parse_scope_window() {
        let scope = AggregateKeyspace::parse_scope("agg/pipe1/hourly/window/1000_2000").unwrap();
        assert_eq!(
            scope,
            AggregateScope::Window {
                start: 1000,
                end: 2000
            }
        );
    }

    #[test]
    fn test_parse_scope_invalid() {
        assert!(AggregateKeyspace::parse_scope("invalid").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b/unknown").is_none());
    }

    #[test]
    fn test_parse_scope_malformed_window() {
        assert!(AggregateKeyspace::parse_scope("agg/a/b/window").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b/window/1000").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b/window/1_2_3").is_none());
        assert!(AggregateKeyspace::parse_scope("agg/a/b/window/x_2").is_none());
        assert!(AggregateKeyspace::parse_scope("foo/a/b/global").is_none());
    }

    #[test]
    fn test_watermark_gate_complete() {
        let mut wms = FxHashMap::default();
        wms.insert(NodeId(1), 2000);
        wms.insert(NodeId(2), 1500);
        assert_eq!(
            check_watermark_gate(1000, &wms),
            WatermarkGateStatus::Complete
        );
    }

    #[test]
    fn test_watermark_gate_boundary_is_complete() {
        // The gate is `>=`: a watermark exactly at `window_end` closes the window.
        let mut wms = FxHashMap::default();
        wms.insert(NodeId(1), 1000);
        assert_eq!(
            check_watermark_gate(1000, &wms),
            WatermarkGateStatus::Complete
        );
    }

    #[test]
    fn test_watermark_gate_incomplete() {
        let mut wms = FxHashMap::default();
        wms.insert(NodeId(1), 2000);
        wms.insert(NodeId(2), 500);
        let status = check_watermark_gate(1000, &wms);
        match status {
            WatermarkGateStatus::Incomplete {
                lagging_nodes,
                min_watermark,
            } => {
                assert_eq!(lagging_nodes, vec![NodeId(2)]);
                assert_eq!(min_watermark, 500);
            }
            _ => panic!("expected Incomplete"),
        }
    }

    #[test]
    fn test_watermark_gate_empty() {
        assert_eq!(
            check_watermark_gate(1000, &FxHashMap::default()),
            WatermarkGateStatus::Unknown
        );
    }

    #[test]
    fn test_merge_cluster_aggregates() {
        let partials = vec![
            GossipAggregateValue {
                node_id: NodeId(1),
                watermark_ms: chrono::Utc::now().timestamp_millis(),
                epoch: 1,
                state: AggregateState::Count(10),
            },
            GossipAggregateValue {
                node_id: NodeId(2),
                watermark_ms: chrono::Utc::now().timestamp_millis(),
                epoch: 1,
                state: AggregateState::Count(20),
            },
        ];

        let result = merge_cluster_aggregates(&partials, 2).unwrap();
        assert_eq!(result.state, AggregateState::Count(30));
        assert_eq!(result.contributing_nodes, 2);
        assert!(result.is_complete);
    }

    #[test]
    fn test_merge_cluster_aggregates_avg_finalize() {
        let now = chrono::Utc::now().timestamp_millis();
        let partials = vec![
            GossipAggregateValue {
                node_id: NodeId(1),
                watermark_ms: now,
                epoch: 1,
                state: AggregateState::Avg {
                    sum: 10.0,
                    count: 2,
                },
            },
            GossipAggregateValue {
                node_id: NodeId(2),
                watermark_ms: now,
                epoch: 1,
                state: AggregateState::Avg {
                    sum: 20.0,
                    count: 3,
                },
            },
        ];

        let result = merge_cluster_aggregates(&partials, 2).unwrap();
        assert!((result.state.finalize() - 6.0).abs() < f64::EPSILON);
        assert!(result.max_staleness_ms >= 0);
    }

    #[test]
    fn test_merge_cluster_aggregates_empty() {
        assert!(merge_cluster_aggregates(&[], 3).is_none());
    }

    #[test]
    fn test_merge_cluster_aggregates_incomplete() {
        let partials = vec![GossipAggregateValue {
            node_id: NodeId(1),
            watermark_ms: chrono::Utc::now().timestamp_millis(),
            epoch: 1,
            state: AggregateState::Sum(42.0),
        }];

        let result = merge_cluster_aggregates(&partials, 3).unwrap();
        assert!(!result.is_complete);
        assert_eq!(result.contributing_nodes, 1);
    }

    #[test]
    fn test_gossip_aggregate_value_serialization() {
        let val = GossipAggregateValue {
            node_id: NodeId(1),
            watermark_ms: 1000,
            epoch: 5,
            state: AggregateState::Count(42),
        };
        let json = serde_json::to_string(&val).unwrap();
        let back: GossipAggregateValue = serde_json::from_str(&json).unwrap();
        assert_eq!(back.node_id, NodeId(1));
        assert_eq!(back.epoch, 5);
    }

    #[test]
    fn test_aggregate_type_mismatch_merge_noop() {
        let mut a = AggregateState::Count(10);
        a.merge(&AggregateState::Sum(5.0));
        // Type mismatch should be a no-op
        assert_eq!(a, AggregateState::Count(10));
    }

    #[test]
    fn test_avg_zero_count_finalize() {
        let avg = AggregateState::Avg { sum: 0.0, count: 0 };
        assert!((avg.finalize()).abs() < f64::EPSILON);
    }
}