1use std::fmt;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22
23#[derive(
26 Debug,
27 Clone,
28 Copy,
29 PartialEq,
30 Eq,
31 PartialOrd,
32 Ord,
33 Hash,
34 Serialize,
35 Deserialize,
36 rkyv::Archive,
37 rkyv::Serialize,
38 rkyv::Deserialize,
39)]
40pub struct NodeId(pub u64);
41
42impl NodeId {
43 pub const UNASSIGNED: Self = Self(0);
45
46 #[must_use]
48 pub const fn is_unassigned(&self) -> bool {
49 self.0 == 0
50 }
51}
52
53impl fmt::Display for NodeId {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 write!(f, "node-{}", self.0)
56 }
57}
58
59pub struct VnodeRegistry {
61 vnode_count: u32,
62 assignment: RwLock<Arc<[NodeId]>>,
63 assignment_version: AtomicU64,
64}
65
66impl std::fmt::Debug for VnodeRegistry {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("VnodeRegistry")
69 .field("vnode_count", &self.vnode_count)
70 .field(
71 "assignment_version",
72 &self.assignment_version.load(Ordering::Relaxed),
73 )
74 .finish_non_exhaustive()
75 }
76}
77
78impl VnodeRegistry {
79 #[must_use]
85 pub fn new(vnode_count: u32) -> Self {
86 assert!(vnode_count > 0, "vnode_count must be > 0");
87 let assignment: Arc<[NodeId]> =
88 std::iter::repeat_n(NodeId::UNASSIGNED, vnode_count as usize)
89 .collect::<Vec<_>>()
90 .into();
91 Self {
92 vnode_count,
93 assignment: RwLock::new(assignment),
94 assignment_version: AtomicU64::new(1),
95 }
96 }
97
98 #[must_use]
105 pub fn single_owner(vnode_count: u32, owner: NodeId) -> Self {
106 assert!(vnode_count > 0, "vnode_count must be > 0");
107 let assignment: Arc<[NodeId]> = std::iter::repeat_n(owner, vnode_count as usize)
108 .collect::<Vec<_>>()
109 .into();
110 Self {
111 vnode_count,
112 assignment: RwLock::new(assignment),
113 assignment_version: AtomicU64::new(1),
114 }
115 }
116
117 #[must_use]
119 pub fn vnode_count(&self) -> u32 {
120 self.vnode_count
121 }
122
123 #[must_use]
125 pub fn assignment_version(&self) -> u64 {
126 self.assignment_version.load(Ordering::Acquire)
127 }
128
129 #[must_use]
132 pub fn owner(&self, vnode: u32) -> NodeId {
133 if vnode >= self.vnode_count {
134 return NodeId::UNASSIGNED;
135 }
136 self.assignment.read()[vnode as usize]
137 }
138
139 #[must_use]
142 pub fn snapshot(&self) -> Arc<[NodeId]> {
143 Arc::clone(&self.assignment.read())
144 }
145
146 pub fn set_assignment(&self, new_assignment: Arc<[NodeId]>) {
151 assert_eq!(
152 new_assignment.len(),
153 self.vnode_count as usize,
154 "assignment length mismatch: got {}, expected {}",
155 new_assignment.len(),
156 self.vnode_count,
157 );
158 *self.assignment.write() = new_assignment;
159 self.assignment_version.fetch_add(1, Ordering::AcqRel);
160 }
161
162 pub fn set_assignment_and_version(&self, new_assignment: Arc<[NodeId]>, version: u64) {
170 assert_eq!(
171 new_assignment.len(),
172 self.vnode_count as usize,
173 "assignment length mismatch: got {}, expected {}",
174 new_assignment.len(),
175 self.vnode_count,
176 );
177 let mut guard = self.assignment.write();
178 let current = self.assignment_version.load(Ordering::Acquire);
179 assert!(
180 version >= current,
181 "assignment version must be monotonic: got {version}, current {current}",
182 );
183 *guard = new_assignment;
184 self.assignment_version.store(version, Ordering::Release);
185 }
186
187 #[must_use]
189 pub fn vnode_for_key(&self, key: &[u8]) -> u32 {
190 #[allow(clippy::cast_possible_truncation)]
191 let h = (key_hash(key) % u64::from(self.vnode_count)) as u32;
192 h
193 }
194}
195
196#[must_use]
202pub fn key_hash(key: &[u8]) -> u64 {
203 xxhash_rust::xxh3::xxh3_64(key)
204}
205
206#[must_use]
220pub fn round_robin_assignment(vnode_count: u32, peers: &[NodeId]) -> Arc<[NodeId]> {
221 assert!(
222 !peers.is_empty(),
223 "round_robin_assignment needs at least one peer"
224 );
225 let mut sorted: Vec<NodeId> = peers.to_vec();
226 sorted.sort_by_key(|n| n.0);
227 (0..vnode_count)
228 .map(|v| sorted[(v as usize) % sorted.len()])
229 .collect::<Vec<_>>()
230 .into()
231}
232
233#[must_use]
239pub fn owned_vnodes(registry: &VnodeRegistry, owner: NodeId) -> Vec<u32> {
240 (0..registry.vnode_count())
241 .filter(|&v| registry.owner(v) == owner)
242 .collect()
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_registry_is_unassigned() {
        let r = VnodeRegistry::new(8);
        assert_eq!(r.vnode_count(), 8);
        for v in 0..8 {
            assert!(r.owner(v).is_unassigned());
        }
    }

    #[test]
    fn single_owner_populates_all_slots() {
        let r = VnodeRegistry::single_owner(4, NodeId(42));
        for v in 0..4 {
            assert_eq!(r.owner(v), NodeId(42));
        }
    }

    #[test]
    fn set_assignment_bumps_version() {
        let r = VnodeRegistry::new(4);
        let v0 = r.assignment_version();
        let new_assign: Arc<[NodeId]> = vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into();
        r.set_assignment(new_assign);
        assert!(r.assignment_version() > v0);
        assert_eq!(r.owner(0), NodeId(1));
        assert_eq!(r.owner(1), NodeId(2));
    }

    #[test]
    fn vnode_for_key_in_range() {
        let r = VnodeRegistry::new(16);
        for i in 0..100 {
            let v = r.vnode_for_key(format!("k-{i}").as_bytes());
            assert!(v < 16);
        }
    }

    #[test]
    #[should_panic(expected = "assignment length mismatch")]
    fn set_assignment_rejects_wrong_length() {
        let r = VnodeRegistry::new(4);
        let bad: Arc<[NodeId]> = vec![NodeId(1)].into();
        r.set_assignment(bad);
    }

    #[test]
    fn owner_out_of_range_returns_unassigned() {
        let r = VnodeRegistry::single_owner(4, NodeId(1));
        assert!(r.owner(10).is_unassigned());
    }

    #[test]
    fn vnode_for_key_is_deterministic() {
        let r = VnodeRegistry::new(16);
        assert_eq!(r.vnode_for_key(b"key-x"), r.vnode_for_key(b"key-x"));
    }

    #[test]
    fn owned_vnodes_filters_by_owner() {
        let r = VnodeRegistry::new(4);
        r.set_assignment(vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into());
        assert_eq!(owned_vnodes(&r, NodeId(1)), vec![0, 2]);
        assert_eq!(owned_vnodes(&r, NodeId(2)), vec![1, 3]);
        assert!(owned_vnodes(&r, NodeId(99)).is_empty());
    }

    #[test]
    fn owned_vnodes_single_owner_returns_all() {
        let r = VnodeRegistry::single_owner(8, NodeId(42));
        assert_eq!(owned_vnodes(&r, NodeId(42)), (0..8).collect::<Vec<_>>());
    }

    #[test]
    fn round_robin_is_deterministic_and_balanced() {
        let peers = vec![NodeId(7), NodeId(3), NodeId(5)];
        let assignment = round_robin_assignment(8, &peers);
        assert_eq!(
            &*assignment,
            &[
                NodeId(3),
                NodeId(5),
                NodeId(7),
                NodeId(3),
                NodeId(5),
                NodeId(7),
                NodeId(3),
                NodeId(5),
            ][..]
        );
        let reversed = vec![NodeId(3), NodeId(5), NodeId(7)];
        assert_eq!(round_robin_assignment(8, &reversed), assignment);
    }

    #[test]
    fn round_robin_single_peer_owns_everything() {
        let assignment = round_robin_assignment(4, &[NodeId(99)]);
        assert!(assignment.iter().all(|&n| n == NodeId(99)));
    }

    #[test]
    #[should_panic(expected = "at least one peer")]
    fn round_robin_rejects_empty_peer_list() {
        let _ = round_robin_assignment(4, &[]);
    }

    #[test]
    fn set_assignment_and_version_stores_explicit_version() {
        let r = VnodeRegistry::new(4);
        r.set_assignment_and_version(vec![NodeId(3); 4].into(), 10);
        assert_eq!(r.assignment_version(), 10);
        assert_eq!(owned_vnodes(&r, NodeId(3)), vec![0, 1, 2, 3]);
        // A plain `set_assignment` continues counting from the explicit version.
        r.set_assignment(vec![NodeId(4); 4].into());
        assert_eq!(r.assignment_version(), 11);
    }

    #[test]
    #[should_panic(expected = "assignment version must be monotonic")]
    fn set_assignment_and_version_rejects_regression() {
        let r = VnodeRegistry::new(4);
        r.set_assignment_and_version(vec![NodeId(1); 4].into(), 5);
        r.set_assignment_and_version(vec![NodeId(2); 4].into(), 4);
    }

    #[test]
    fn snapshot_is_unaffected_by_later_updates() {
        let r = VnodeRegistry::single_owner(2, NodeId(1));
        let before = r.snapshot();
        r.set_assignment(vec![NodeId(2), NodeId(3)].into());
        assert_eq!(&*before, &[NodeId(1), NodeId(1)][..]);
        assert_eq!(&*r.snapshot(), &[NodeId(2), NodeId(3)][..]);
    }

    #[test]
    fn node_id_display_and_sentinel() {
        assert_eq!(NodeId(7).to_string(), "node-7");
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(7).is_unassigned());
    }

    #[test]
    #[should_panic(expected = "vnode_count must be > 0")]
    fn new_rejects_zero_vnodes() {
        let _ = VnodeRegistry::new(0);
    }
}