laminar_connectors/
partition_assignment.rs1use laminar_core::state::{NodeId, VnodeRegistry};
17
18#[must_use]
25pub fn owned_partitions(
26 total_partitions: i32,
27 registry: &VnodeRegistry,
28 self_id: NodeId,
29) -> Vec<i32> {
30 let vnode_count = registry.vnode_count();
31 (0..total_partitions)
32 .filter(|&p| {
33 let Ok(pid) = u32::try_from(p) else {
34 return false;
35 };
36 registry.owner(pid % vnode_count) == self_id
37 })
38 .collect()
39}
40
41#[cfg(test)]
42mod tests {
43 use super::*;
44 use laminar_core::state::rendezvous_assignment;
45 use std::sync::Arc;
46
47 fn registry_with(vnode_count: u32, owners: &[NodeId]) -> VnodeRegistry {
48 let assignment = rendezvous_assignment(vnode_count, owners);
49 let r = VnodeRegistry::new(vnode_count);
50 r.set_assignment(assignment);
51 r
52 }
53
54 #[test]
55 fn partitions_split_by_modulo_across_two_nodes() {
56 let r = registry_with(4, &[NodeId(1), NodeId(2)]);
58 let expected_node1: Vec<i32> = (0..8)
60 .filter(|&p| r.owner((p % 4) as u32) == NodeId(1))
61 .collect();
62 let expected_node2: Vec<i32> = (0..8)
63 .filter(|&p| r.owner((p % 4) as u32) == NodeId(2))
64 .collect();
65
66 assert_eq!(owned_partitions(8, &r, NodeId(1)), expected_node1);
67 assert_eq!(owned_partitions(8, &r, NodeId(2)), expected_node2);
68 }
69
70 #[test]
71 fn union_of_owners_covers_all_partitions_disjointly() {
72 let r = registry_with(4, &[NodeId(1), NodeId(2), NodeId(3)]);
73 let total = 16;
74 let mut all: Vec<i32> = Vec::new();
75 for owner in [NodeId(1), NodeId(2), NodeId(3)] {
76 all.extend(owned_partitions(total, &r, owner));
77 }
78 all.sort_unstable();
79 assert_eq!(
80 all,
81 (0..total).collect::<Vec<_>>(),
82 "every partition owned exactly once"
83 );
84 }
85
86 #[test]
87 fn single_owner_takes_all_partitions() {
88 let r = registry_with(8, &[NodeId(7)]);
89 assert_eq!(owned_partitions(5, &r, NodeId(7)), vec![0, 1, 2, 3, 4],);
90 }
91
92 #[test]
93 fn unowned_node_gets_nothing() {
94 let r = registry_with(4, &[NodeId(1), NodeId(2)]);
95 assert!(owned_partitions(8, &r, NodeId(99)).is_empty());
96 }
97
98 #[test]
99 fn fewer_partitions_than_vnodes_leaves_some_nodes_empty() {
100 let r = registry_with(4, &[NodeId(1), NodeId(2)]);
102 let expected_node1: Vec<i32> = (0..2)
103 .filter(|&p| r.owner((p % 4) as u32) == NodeId(1))
104 .collect();
105 let expected_node2: Vec<i32> = (0..2)
106 .filter(|&p| r.owner((p % 4) as u32) == NodeId(2))
107 .collect();
108
109 assert_eq!(owned_partitions(2, &r, NodeId(1)), expected_node1);
110 assert_eq!(owned_partitions(2, &r, NodeId(2)), expected_node2);
111 }
112
113 #[test]
114 fn reassignment_moves_partitions() {
115 let r = VnodeRegistry::new(4);
116 r.set_assignment(Arc::from([NodeId(1), NodeId(2), NodeId(1), NodeId(2)]));
117 assert_eq!(owned_partitions(4, &r, NodeId(1)), vec![0, 2]);
118 r.set_assignment(Arc::from([NodeId(1), NodeId(1), NodeId(1), NodeId(1)]));
120 assert_eq!(owned_partitions(4, &r, NodeId(1)), vec![0, 1, 2, 3]);
121 assert!(owned_partitions(4, &r, NodeId(2)).is_empty());
122 }
123}