1use std::collections::BTreeMap;
17use std::fmt;
18use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
19use std::sync::Arc;
20
21use parking_lot::RwLock;
22use serde::{Deserialize, Serialize};
23
24#[derive(
27 Debug,
28 Clone,
29 Copy,
30 PartialEq,
31 Eq,
32 PartialOrd,
33 Ord,
34 Hash,
35 Serialize,
36 Deserialize,
37 rkyv::Archive,
38 rkyv::Serialize,
39 rkyv::Deserialize,
40)]
41pub struct NodeId(pub u64);
42
43impl NodeId {
44 pub const UNASSIGNED: Self = Self(0);
46
47 #[must_use]
49 pub const fn is_unassigned(&self) -> bool {
50 self.0 == 0
51 }
52}
53
54impl fmt::Display for NodeId {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 write!(f, "node-{}", self.0)
57 }
58}
59
/// Lifecycle state of a single vnode.
///
/// Each state has a fixed one-byte encoding (see [`Self::to_u8`]) so it can
/// be stored in an `AtomicU8` slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VnodeLifecycleState {
    /// Encoded as byte `0`.
    Active,
    /// Encoded as byte `1`.
    Restoring,
}

impl VnodeLifecycleState {
    /// Byte encoding of [`VnodeLifecycleState::Active`].
    const ACTIVE: u8 = 0;
    /// Byte encoding of [`VnodeLifecycleState::Restoring`].
    const RESTORING: u8 = 1;

    /// Converts the state to its one-byte encoding.
    const fn to_u8(self) -> u8 {
        match self {
            Self::Restoring => Self::RESTORING,
            Self::Active => Self::ACTIVE,
        }
    }
}
84
85pub struct VnodeRegistry {
87 vnode_count: u32,
88 assignment: RwLock<Arc<[NodeId]>>,
89 assignment_version: AtomicU64,
90 lifecycle: Arc<[AtomicU8]>,
96}
97
98impl std::fmt::Debug for VnodeRegistry {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("VnodeRegistry")
101 .field("vnode_count", &self.vnode_count)
102 .field(
103 "assignment_version",
104 &self.assignment_version.load(Ordering::Relaxed),
105 )
106 .finish_non_exhaustive()
107 }
108}
109
110impl VnodeRegistry {
111 #[must_use]
117 pub fn new(vnode_count: u32) -> Self {
118 assert!(vnode_count > 0, "vnode_count must be > 0");
119 let assignment: Arc<[NodeId]> =
120 std::iter::repeat_n(NodeId::UNASSIGNED, vnode_count as usize)
121 .collect::<Vec<_>>()
122 .into();
123 Self {
124 vnode_count,
125 assignment: RwLock::new(assignment),
126 assignment_version: AtomicU64::new(1),
127 lifecycle: new_lifecycle(vnode_count),
128 }
129 }
130
131 #[must_use]
138 pub fn single_owner(vnode_count: u32, owner: NodeId) -> Self {
139 assert!(vnode_count > 0, "vnode_count must be > 0");
140 let assignment: Arc<[NodeId]> = std::iter::repeat_n(owner, vnode_count as usize)
141 .collect::<Vec<_>>()
142 .into();
143 Self {
144 vnode_count,
145 assignment: RwLock::new(assignment),
146 assignment_version: AtomicU64::new(1),
147 lifecycle: new_lifecycle(vnode_count),
148 }
149 }
150
151 #[must_use]
153 pub fn vnode_count(&self) -> u32 {
154 self.vnode_count
155 }
156
157 #[must_use]
159 pub fn assignment_version(&self) -> u64 {
160 self.assignment_version.load(Ordering::Acquire)
161 }
162
163 #[must_use]
166 pub fn owner(&self, vnode: u32) -> NodeId {
167 if vnode >= self.vnode_count {
168 return NodeId::UNASSIGNED;
169 }
170 self.assignment.read()[vnode as usize]
171 }
172
173 #[must_use]
176 pub fn snapshot(&self) -> Arc<[NodeId]> {
177 Arc::clone(&self.assignment.read())
178 }
179
180 pub fn set_assignment(&self, new_assignment: Arc<[NodeId]>) {
185 assert_eq!(
186 new_assignment.len(),
187 self.vnode_count as usize,
188 "assignment length mismatch: got {}, expected {}",
189 new_assignment.len(),
190 self.vnode_count,
191 );
192 *self.assignment.write() = new_assignment;
193 self.assignment_version.fetch_add(1, Ordering::AcqRel);
194 }
195
196 pub fn set_assignment_and_version(&self, new_assignment: Arc<[NodeId]>, version: u64) {
204 assert_eq!(
205 new_assignment.len(),
206 self.vnode_count as usize,
207 "assignment length mismatch: got {}, expected {}",
208 new_assignment.len(),
209 self.vnode_count,
210 );
211 let mut guard = self.assignment.write();
212 let current = self.assignment_version.load(Ordering::Acquire);
213 assert!(
214 version >= current,
215 "assignment version must be monotonic: got {version}, current {current}",
216 );
217 *guard = new_assignment;
218 self.assignment_version.store(version, Ordering::Release);
219 }
220
221 #[must_use]
223 pub fn vnode_for_key(&self, key: &[u8]) -> u32 {
224 #[allow(clippy::cast_possible_truncation)]
225 let h = (key_hash(key) % u64::from(self.vnode_count)) as u32;
226 h
227 }
228
229 pub fn mark_restoring(&self, vnodes: &[u32]) {
235 self.set_lifecycle(vnodes, VnodeLifecycleState::Restoring);
236 }
237
238 pub fn mark_active(&self, vnodes: &[u32]) {
244 self.set_lifecycle(vnodes, VnodeLifecycleState::Active);
245 }
246
247 fn set_lifecycle(&self, vnodes: &[u32], state: VnodeLifecycleState) {
248 let byte = state.to_u8();
249 for &v in vnodes {
250 if let Some(slot) = self.lifecycle.get(v as usize) {
251 slot.store(byte, Ordering::Release);
252 }
253 }
254 }
255
256 #[must_use]
259 pub fn is_restoring(&self, vnode: u32) -> bool {
260 self.lifecycle
261 .get(vnode as usize)
262 .is_some_and(|s| s.load(Ordering::Acquire) == VnodeLifecycleState::RESTORING)
263 }
264
265 #[must_use]
268 pub fn any_restoring(&self) -> bool {
269 self.lifecycle
270 .iter()
271 .any(|s| s.load(Ordering::Acquire) == VnodeLifecycleState::RESTORING)
272 }
273
274 #[must_use]
276 #[allow(clippy::cast_possible_truncation)] pub fn restoring_vnodes(&self) -> Vec<u32> {
278 self.lifecycle
279 .iter()
280 .enumerate()
281 .filter_map(|(i, s)| {
282 (s.load(Ordering::Acquire) == VnodeLifecycleState::RESTORING).then_some(i as u32)
283 })
284 .collect()
285 }
286}
287
288fn new_lifecycle(vnode_count: u32) -> Arc<[AtomicU8]> {
290 std::iter::repeat_with(|| AtomicU8::new(VnodeLifecycleState::ACTIVE))
291 .take(vnode_count as usize)
292 .collect::<Vec<_>>()
293 .into()
294}
295
296#[must_use]
302pub fn key_hash(key: &[u8]) -> u64 {
303 xxhash_rust::xxh3::xxh3_64(key)
304}
305
306#[must_use]
314pub fn rendezvous_assignment(vnode_count: u32, peers: &[NodeId]) -> Arc<[NodeId]> {
315 assert!(
316 !peers.is_empty(),
317 "rendezvous_assignment needs at least one peer"
318 );
319 let mut sorted_peers = peers.to_vec();
320 sorted_peers.sort_by_key(|n| n.0);
321
322 let mut assignment = Vec::with_capacity(vnode_count as usize);
323 for v in 0..vnode_count {
324 let mut max_weight = 0;
325 let mut selected_node = sorted_peers[0];
326
327 for &node in &sorted_peers {
328 let mut buf = [0u8; 16];
330 buf[0..8].copy_from_slice(&u64::from(v).to_le_bytes());
331 buf[8..16].copy_from_slice(&node.0.to_le_bytes());
332 let weight = xxhash_rust::xxh3::xxh3_64(&buf);
333
334 if weight > max_weight || (weight == max_weight && node.0 > selected_node.0) {
336 max_weight = weight;
337 selected_node = node;
338 }
339 }
340 assignment.push(selected_node);
341 }
342 assignment.into()
343}
344
/// Ordered list of locality tier values for a node, outermost tier first
/// (for example region, then zone, then rack).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Locality {
    /// Tier values; index 0 is the outermost tier.
    tiers: Vec<String>,
}

impl Locality {
    /// Wraps an already-split list of tier values.
    #[must_use]
    pub fn new(tiers: Vec<String>) -> Self {
        Self { tiers }
    }

    /// Parses a `;`-separated locality string.
    ///
    /// Each segment is trimmed and blank segments are dropped. A segment of
    /// the form `key=value` contributes its trimmed `value` (everything after
    /// the first `=`); a segment without `=` contributes itself.
    #[must_use]
    pub fn parse(s: &str) -> Self {
        let mut tiers = Vec::new();
        for raw in s.split(';') {
            let segment = raw.trim();
            if segment.is_empty() {
                continue;
            }
            let value = match segment.split_once('=') {
                Some((_, v)) => v.trim(),
                None => segment,
            };
            tiers.push(value.to_string());
        }
        Self { tiers }
    }

    /// Returns the tier values from index 0 through `tier`, joined with `;`.
    ///
    /// A `tier` past the last index is clamped to the last tier. A locality
    /// with no tiers yields the empty string.
    #[must_use]
    pub fn domain_at(&self, tier: usize) -> String {
        match self.tiers.len() {
            0 => String::new(),
            len => self.tiers[..=tier.min(len - 1)].join(";"),
        }
    }
}
388
389fn resolve_domains(nodes: &[(NodeId, Locality)], isolation_tier: usize) -> Vec<(NodeId, String)> {
391 nodes
392 .iter()
393 .map(|(id, loc)| (*id, loc.domain_at(isolation_tier)))
394 .collect()
395}
396
397#[must_use]
401pub fn owners_per_domain(
402 owners: &[NodeId],
403 nodes: &[(NodeId, Locality)],
404 isolation_tier: usize,
405) -> BTreeMap<String, u32> {
406 let dom: BTreeMap<NodeId, String> =
407 resolve_domains(nodes, isolation_tier).into_iter().collect();
408 let mut counts = BTreeMap::new();
409 for &o in owners {
410 *counts
411 .entry(dom.get(&o).cloned().unwrap_or_default())
412 .or_default() += 1;
413 }
414 counts
415}
416
417#[must_use]
423pub fn owned_vnodes(registry: &VnodeRegistry, owner: NodeId) -> Vec<u32> {
424 (0..registry.vnode_count())
425 .filter(|&v| registry.owner(v) == owner)
426 .collect()
427}
428
429#[must_use]
432pub fn peer_owners(registry: &VnodeRegistry, self_id: NodeId) -> Vec<NodeId> {
433 let mut peers: Vec<NodeId> = (0..registry.vnode_count())
434 .map(|v| registry.owner(v))
435 .filter(|o| !o.is_unassigned() && *o != self_id)
436 .collect();
437 peers.sort_unstable();
438 peers.dedup();
439 peers
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Index of the zone tier in localities built by [`node`].
    const TIER_ZONE: usize = 1;

    /// Builds a `(NodeId, Locality)` fixture with region, zone and rack tiers.
    fn node(id: u64, region: &str, zone: &str, rack: &str) -> (NodeId, Locality) {
        let tiers = [region, zone, rack]
            .into_iter()
            .map(String::from)
            .collect();
        (NodeId(id), Locality::new(tiers))
    }

    #[test]
    fn new_registry_is_unassigned() {
        let registry = VnodeRegistry::new(8);
        assert_eq!(registry.vnode_count(), 8);
        assert!((0..8).all(|vnode| registry.owner(vnode).is_unassigned()));
    }

    #[test]
    fn single_owner_populates_all_slots() {
        let owner = NodeId(42);
        let registry = VnodeRegistry::single_owner(4, owner);
        for vnode in 0..4 {
            assert_eq!(registry.owner(vnode), owner);
        }
    }

    #[test]
    fn set_assignment_bumps_version() {
        let registry = VnodeRegistry::new(4);
        let before = registry.assignment_version();
        let table: Arc<[NodeId]> = Arc::from(vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)]);
        registry.set_assignment(table);
        assert!(registry.assignment_version() > before);
        assert_eq!(registry.owner(0), NodeId(1));
        assert_eq!(registry.owner(1), NodeId(2));
    }

    #[test]
    fn vnode_for_key_in_range() {
        let registry = VnodeRegistry::new(16);
        assert!((0..100)
            .map(|i| format!("k-{i}"))
            .all(|key| registry.vnode_for_key(key.as_bytes()) < 16));
    }

    #[test]
    #[should_panic(expected = "assignment length mismatch")]
    fn set_assignment_rejects_wrong_length() {
        let registry = VnodeRegistry::new(4);
        // One entry for a four-vnode registry must be rejected.
        let too_short: Arc<[NodeId]> = Arc::from(vec![NodeId(1)]);
        registry.set_assignment(too_short);
    }

    #[test]
    fn owner_out_of_range_returns_unassigned() {
        let registry = VnodeRegistry::single_owner(4, NodeId(1));
        assert!(registry.owner(10).is_unassigned());
    }

    #[test]
    fn vnode_for_key_is_deterministic() {
        let registry = VnodeRegistry::new(16);
        let first = registry.vnode_for_key(b"key-x");
        let second = registry.vnode_for_key(b"key-x");
        assert_eq!(first, second);
    }

    #[test]
    fn owned_vnodes_filters_by_owner() {
        let registry = VnodeRegistry::new(4);
        registry.set_assignment(vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into());
        assert_eq!(owned_vnodes(&registry, NodeId(1)), vec![0, 2]);
        assert_eq!(owned_vnodes(&registry, NodeId(2)), vec![1, 3]);
        assert!(owned_vnodes(&registry, NodeId(99)).is_empty());
    }

    #[test]
    fn owned_vnodes_single_owner_returns_all() {
        let registry = VnodeRegistry::single_owner(8, NodeId(42));
        let every_vnode: Vec<u32> = (0..8).collect();
        assert_eq!(owned_vnodes(&registry, NodeId(42)), every_vnode);
    }

    #[test]
    fn rendezvous_is_deterministic() {
        // The same peer set in a different order must give the same table.
        let peers = vec![NodeId(7), NodeId(3), NodeId(5)];
        let reordered = vec![NodeId(3), NodeId(5), NodeId(7)];
        let assignment = rendezvous_assignment(8, &peers);
        assert_eq!(rendezvous_assignment(8, &reordered), assignment);
    }

    #[test]
    fn rendezvous_single_peer_owns_everything() {
        let sole_peer = NodeId(99);
        let assignment = rendezvous_assignment(4, &[sole_peer]);
        assert!(assignment.iter().all(|&owner| owner == sole_peer));
    }

    #[test]
    #[should_panic(expected = "needs at least one peer")]
    fn rendezvous_rejects_empty_peer_list() {
        let _ = rendezvous_assignment(4, &[]);
    }

    #[test]
    fn rendezvous_minimizes_state_movement() {
        let newcomer = NodeId(4);
        let before = rendezvous_assignment(256, &[NodeId(1), NodeId(2), NodeId(3)]);
        let after = rendezvous_assignment(256, &[NodeId(1), NodeId(2), NodeId(3), newcomer]);

        // Every vnode whose owner changed, as (old owner, new owner).
        let relocated: Vec<(NodeId, NodeId)> = before
            .iter()
            .copied()
            .zip(after.iter().copied())
            .filter(|(old, new)| old != new)
            .collect();
        let moved = relocated.len();
        let moved_between_existing = relocated
            .iter()
            .filter(|(_, new)| *new != newcomer)
            .count();

        assert_eq!(
            moved_between_existing, 0,
            "No vnode should move between existing peers on a node join"
        );
        assert!(
            moved > 40 && moved < 90,
            "Expected roughly 25% of vnodes to move to the new peer, got {moved}"
        );

        for (_, new_owner) in &relocated {
            assert_eq!(*new_owner, newcomer);
        }
    }

    #[test]
    fn vnodes_start_active() {
        let registry = VnodeRegistry::new(4);
        assert!(!registry.any_restoring());
        assert!((0..4).all(|vnode| !registry.is_restoring(vnode)));
        assert!(registry.restoring_vnodes().is_empty());
    }

    #[test]
    fn mark_restoring_and_active_round_trip() {
        let registry = VnodeRegistry::new(4);

        registry.mark_restoring(&[1, 3]);
        assert!(registry.any_restoring());
        assert!(registry.is_restoring(1));
        assert!(registry.is_restoring(3));
        assert!(!registry.is_restoring(0));
        assert_eq!(registry.restoring_vnodes(), vec![1, 3]);

        registry.mark_active(&[1]);
        assert!(!registry.is_restoring(1));
        assert_eq!(registry.restoring_vnodes(), vec![3]);

        registry.mark_active(&[3]);
        assert!(!registry.any_restoring());
    }

    #[test]
    fn lifecycle_ignores_out_of_range() {
        let registry = VnodeRegistry::new(2);
        // Neither index exists in a two-vnode registry.
        registry.mark_restoring(&[5, 99]);
        assert!(!registry.is_restoring(5));
        assert!(!registry.any_restoring());
    }

    #[test]
    fn lifecycle_independent_of_assignment() {
        let registry = VnodeRegistry::new(4);
        registry.mark_restoring(&[2]);
        // Swapping the assignment must leave the lifecycle state untouched.
        registry.set_assignment(vec![NodeId(1); 4].into());
        assert!(registry.is_restoring(2));
    }

    #[test]
    fn locality_parse_and_domain_at() {
        let parsed = Locality::parse("region=us-east-1;zone=us-east-1a;rack=r17");
        let expected_by_tier = [
            (0, "us-east-1"),
            (1, "us-east-1;us-east-1a"),
            (2, "us-east-1;us-east-1a;r17"),
            // A tier past the end clamps to the last tier.
            (99, "us-east-1;us-east-1a;r17"),
        ];
        for (tier, expected) in expected_by_tier {
            assert_eq!(parsed.domain_at(tier), expected);
        }
        // A segment without `key=` is used verbatim.
        assert_eq!(Locality::parse("rack17").domain_at(0), "rack17");
        // No tiers at all yields the empty domain.
        assert_eq!(Locality::parse("").domain_at(0), "");
    }

    #[test]
    fn owners_per_domain_counts_by_zone() {
        let nodes = vec![node(1, "r", "z1", "a"), node(2, "r", "z2", "a")];
        let owners = [NodeId(1), NodeId(1), NodeId(2), NodeId::UNASSIGNED];
        let counts = owners_per_domain(&owners, &nodes, TIER_ZONE);
        assert_eq!(counts.get("r;z1"), Some(&2));
        assert_eq!(counts.get("r;z2"), Some(&1));
        // The unassigned owner is not in `nodes`, so it lands in "".
        assert_eq!(counts.get(""), Some(&1));
    }
}