1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
9
10use crate::delta::discovery::{NodeId, NodeInfo};
11
12#[derive(Debug, Clone)]
14pub struct AssignmentPlan {
15 pub assignments: HashMap<u32, NodeId>,
17 pub moves: Vec<PartitionMove>,
19 pub stats: AssignmentStats,
21}
22
23#[derive(Debug, Clone)]
25pub struct PartitionMove {
26 pub partition_id: u32,
28 pub from: Option<NodeId>,
30 pub to: NodeId,
32}
33
/// Summary counters attached to an [`AssignmentPlan`].
///
/// `PartialEq`/`Eq` are derived so that whole stats values can be compared
/// directly (e.g. in `assert_eq!`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AssignmentStats {
    /// Number of partitions covered by the plan.
    pub total_partitions: u32,

    /// Number of ownership changes in the plan.
    pub total_moves: u32,

    /// Largest number of partitions owned by a single node.
    pub max_per_node: u32,

    /// Smallest number of partitions owned by a single node.
    pub min_per_node: u32,

    /// Number of distinct failure domains the plan touches.
    ///
    /// NOTE(review): `ConsistentHashAssigner` in this file always reports 0
    /// here; confirm whether another component fills this in.
    pub failure_domains_used: u32,
}
48
/// Limits and preferences an assigner is asked to respect.
///
/// The `Default` value has every numeric field at zero and every collection
/// empty.
#[derive(Debug, Clone, Default)]
pub struct AssignmentConstraints {
    /// Upper bound on partitions per node; `0` means "no limit".
    pub max_partitions_per_node: u32,

    /// Minimum number of failure domains to spread over.
    ///
    /// NOTE(review): not consulted by `ConsistentHashAssigner` in this file.
    pub min_failure_domains: u32,

    /// Groups of partition ids, presumably ones that should not share a node
    /// (TODO confirm; not consulted by `ConsistentHashAssigner` in this file).
    pub anti_affinity_groups: Vec<Vec<u32>>,

    /// Per-node weights, presumably keyed by raw node id (TODO confirm; not
    /// consulted by `ConsistentHashAssigner` in this file).
    pub node_weights: HashMap<u64, f64>,
}
63
64pub trait PartitionAssigner: Send + Sync {
66 fn initial_assignment(
68 &self,
69 num_partitions: u32,
70 nodes: &[NodeInfo],
71 constraints: &AssignmentConstraints,
72 ) -> AssignmentPlan;
73
74 fn rebalance(
76 &self,
77 current: &HashMap<u32, NodeId>,
78 nodes: &[NodeInfo],
79 constraints: &AssignmentConstraints,
80 ) -> AssignmentPlan;
81
82 fn validate_plan(
84 &self,
85 plan: &AssignmentPlan,
86 nodes: &[NodeInfo],
87 constraints: &AssignmentConstraints,
88 ) -> Vec<String>;
89}
90
/// Partition assigner based on a consistent-hash ring of virtual nodes.
#[derive(Debug)]
pub struct ConsistentHashAssigner {
    /// Virtual nodes placed on the ring for each core a node reports.
    pub vnodes_per_core: u32,

    /// Value mixed into every hash; different seeds give different rings.
    pub seed: u64,
}
103
104impl ConsistentHashAssigner {
105 #[must_use]
107 pub fn new() -> Self {
108 Self {
109 vnodes_per_core: 100,
110 seed: 0x517C_C1F0_CAFE_BABE,
111 }
112 }
113
114 #[must_use]
116 pub fn with_config(vnodes_per_core: u32, seed: u64) -> Self {
117 Self {
118 vnodes_per_core,
119 seed,
120 }
121 }
122
123 fn hash_vnode(&self, node_id: NodeId, vnode_idx: u32) -> u64 {
125 let mut data = [0u8; 16];
127 data[..8].copy_from_slice(&(node_id.0 ^ self.seed).to_le_bytes());
128 data[8..12].copy_from_slice(&vnode_idx.to_le_bytes());
129 let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
131 for &byte in &data {
132 hash ^= u64::from(byte);
133 hash = hash.wrapping_mul(0x0100_0000_01b3);
134 }
135 hash
136 }
137
138 fn build_ring(&self, nodes: &[NodeInfo]) -> Vec<(u64, NodeId)> {
140 let mut ring: Vec<(u64, NodeId)> = Vec::new();
141 for node in nodes {
142 let vnodes = node.metadata.cores * self.vnodes_per_core;
143 for i in 0..vnodes {
144 let hash = self.hash_vnode(node.id, i);
145 ring.push((hash, node.id));
146 }
147 }
148 ring.sort_by_key(|(hash, _)| *hash);
149 ring
150 }
151
152 fn lookup(&self, ring: &[(u64, NodeId)], partition_id: u32) -> NodeId {
154 if ring.is_empty() {
155 return NodeId::UNASSIGNED;
156 }
157 let key = self.hash_vnode(NodeId(u64::from(partition_id)), 0);
158 match ring.binary_search_by_key(&key, |(h, _)| *h) {
160 Ok(idx) => ring[idx].1,
161 Err(idx) => {
162 if idx >= ring.len() {
163 ring[0].1 } else {
165 ring[idx].1
166 }
167 }
168 }
169 }
170
171 fn lookup_excluding(
173 &self,
174 ring: &[(u64, NodeId)],
175 partition_id: u32,
176 exclude: &[NodeId],
177 ) -> NodeId {
178 if ring.is_empty() {
179 return NodeId::UNASSIGNED;
180 }
181 let key = self.hash_vnode(NodeId(u64::from(partition_id)), 0);
182 let start = match ring.binary_search_by_key(&key, |(h, _)| *h) {
183 Ok(idx) | Err(idx) => idx % ring.len(),
184 };
185 for i in 0..ring.len() {
186 let idx = (start + i) % ring.len();
187 if !exclude.contains(&ring[idx].1) {
188 return ring[idx].1;
189 }
190 }
191 ring[start].1
193 }
194}
195
196impl Default for ConsistentHashAssigner {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
202impl PartitionAssigner for ConsistentHashAssigner {
203 fn initial_assignment(
204 &self,
205 num_partitions: u32,
206 nodes: &[NodeInfo],
207 constraints: &AssignmentConstraints,
208 ) -> AssignmentPlan {
209 let ring = self.build_ring(nodes);
210 let mut assignments = HashMap::new();
211 let mut moves = Vec::new();
212 let mut per_node: HashMap<NodeId, u32> = HashMap::new();
213
214 for pid in 0..num_partitions {
215 let owner = if constraints.max_partitions_per_node > 0 {
216 let saturated: Vec<NodeId> = per_node
218 .iter()
219 .filter(|(_, &count)| count >= constraints.max_partitions_per_node)
220 .map(|(id, _)| *id)
221 .collect();
222 self.lookup_excluding(&ring, pid, &saturated)
223 } else {
224 self.lookup(&ring, pid)
225 };
226
227 assignments.insert(pid, owner);
228 *per_node.entry(owner).or_insert(0) += 1;
229 moves.push(PartitionMove {
230 partition_id: pid,
231 from: None,
232 to: owner,
233 });
234 }
235
236 let max_per_node = per_node.values().copied().max().unwrap_or(0);
237 let min_per_node = per_node.values().copied().min().unwrap_or(0);
238
239 AssignmentPlan {
240 assignments,
241 moves,
242 stats: AssignmentStats {
243 total_partitions: num_partitions,
244 total_moves: num_partitions,
245 max_per_node,
246 min_per_node,
247 failure_domains_used: 0,
248 },
249 }
250 }
251
252 fn rebalance(
253 &self,
254 current: &HashMap<u32, NodeId>,
255 nodes: &[NodeInfo],
256 constraints: &AssignmentConstraints,
257 ) -> AssignmentPlan {
258 #[allow(clippy::cast_possible_truncation)]
259 let num_partitions = current.len() as u32;
260 let ideal = self.initial_assignment(num_partitions, nodes, constraints);
261
262 let mut moves = Vec::new();
263 for (pid, new_owner) in &ideal.assignments {
264 let old_owner = current.get(pid).copied();
265 if old_owner != Some(*new_owner) {
266 moves.push(PartitionMove {
267 partition_id: *pid,
268 from: old_owner,
269 to: *new_owner,
270 });
271 }
272 }
273
274 #[allow(clippy::cast_possible_truncation)]
275 let total_moves = moves.len() as u32;
276 AssignmentPlan {
277 assignments: ideal.assignments,
278 moves,
279 stats: AssignmentStats {
280 total_moves,
281 ..ideal.stats
282 },
283 }
284 }
285
286 fn validate_plan(
287 &self,
288 plan: &AssignmentPlan,
289 nodes: &[NodeInfo],
290 constraints: &AssignmentConstraints,
291 ) -> Vec<String> {
292 let mut errors = Vec::new();
293 let valid_nodes: Vec<NodeId> = nodes.iter().map(|n| n.id).collect();
294 let mut per_node: HashMap<NodeId, u32> = HashMap::new();
295
296 for (pid, owner) in &plan.assignments {
297 if !valid_nodes.contains(owner) && !owner.is_unassigned() {
298 errors.push(format!("partition {pid} assigned to unknown node {owner}"));
299 }
300 *per_node.entry(*owner).or_insert(0) += 1;
301 }
302
303 if constraints.max_partitions_per_node > 0 {
304 for (node, count) in &per_node {
305 if *count > constraints.max_partitions_per_node {
306 errors.push(format!(
307 "node {node} has {count} partitions (max: {})",
308 constraints.max_partitions_per_node
309 ));
310 }
311 }
312 }
313
314 errors
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::delta::discovery::{NodeMetadata, NodeState};

    /// Builds an active node with `cores` cores and loopback addresses derived
    /// from `id`.
    fn make_node(id: u64, cores: u32) -> NodeInfo {
        NodeInfo {
            id: NodeId(id),
            name: format!("node-{id}"),
            rpc_address: format!("127.0.0.1:{}", 9000 + id),
            raft_address: format!("127.0.0.1:{}", 9100 + id),
            state: NodeState::Active,
            metadata: NodeMetadata {
                cores,
                ..NodeMetadata::default()
            },
            last_heartbeat_ms: 0,
        }
    }

    /// Constraints with every limit disabled.
    fn unconstrained() -> AssignmentConstraints {
        AssignmentConstraints::default()
    }

    /// Number of partitions `plan` gives to `node`.
    fn owned_by(plan: &AssignmentPlan, node: NodeId) -> usize {
        plan.assignments
            .values()
            .filter(|owner| **owner == node)
            .count()
    }

    #[test]
    fn test_initial_assignment_basic() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4), make_node(2, 4), make_node(3, 4)];
        let plan = assigner.initial_assignment(12, &nodes, &unconstrained());

        assert_eq!(plan.assignments.len(), 12);
        assert_eq!(plan.moves.len(), 12);
        assert!(plan.assignments.values().all(|owner| !owner.is_unassigned()));
    }

    #[test]
    fn test_assignment_determinism() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4), make_node(2, 4)];
        let plan1 = assigner.initial_assignment(100, &nodes, &unconstrained());
        let plan2 = assigner.initial_assignment(100, &nodes, &unconstrained());

        for pid in 0..100 {
            assert_eq!(plan1.assignments[&pid], plan2.assignments[&pid]);
        }
    }

    #[test]
    fn test_weighted_distribution() {
        let assigner = ConsistentHashAssigner::new();
        // Node 1 has four times the cores, hence four times the vnodes.
        let nodes = vec![make_node(1, 8), make_node(2, 2)];
        let plan = assigner.initial_assignment(100, &nodes, &unconstrained());

        let node1_count = owned_by(&plan, NodeId(1));
        let node2_count = owned_by(&plan, NodeId(2));

        assert!(
            node1_count > node2_count,
            "node1={node1_count}, node2={node2_count}"
        );
        assert!(
            node1_count > 50,
            "node1 should get majority: got {node1_count}"
        );
    }

    #[test]
    fn test_rebalance_minimal_moves() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4), make_node(2, 4)];
        let initial = assigner.initial_assignment(10, &nodes, &unconstrained());

        // Same cluster, same partitions: nothing should move.
        let rebalanced = assigner.rebalance(&initial.assignments, &nodes, &unconstrained());
        assert_eq!(rebalanced.stats.total_moves, 0);
    }

    #[test]
    fn test_rebalance_new_node() {
        let assigner = ConsistentHashAssigner::new();
        let nodes2 = vec![make_node(1, 4), make_node(2, 4)];
        let initial = assigner.initial_assignment(20, &nodes2, &unconstrained());

        let nodes3 = vec![make_node(1, 4), make_node(2, 4), make_node(3, 4)];
        let rebalanced = assigner.rebalance(&initial.assignments, &nodes3, &unconstrained());

        // Adding a node moves some partitions, but not all of them.
        assert!(rebalanced.stats.total_moves > 0);
        assert!(rebalanced.stats.total_moves < 20);
    }

    #[test]
    fn test_max_partitions_constraint() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4), make_node(2, 4)];
        let constraints = AssignmentConstraints {
            max_partitions_per_node: 6,
            ..AssignmentConstraints::default()
        };
        let plan = assigner.initial_assignment(10, &nodes, &constraints);

        // Guard against a vacuous pass on an empty plan.
        assert_eq!(plan.assignments.len(), 10);

        let mut per_node: HashMap<NodeId, u32> = HashMap::new();
        for owner in plan.assignments.values() {
            *per_node.entry(*owner).or_insert(0) += 1;
        }
        assert_eq!(per_node.values().sum::<u32>(), 10);
        for count in per_node.values() {
            assert!(*count <= 6, "node exceeded max: {count}");
        }
    }

    #[test]
    fn test_validate_plan() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4), make_node(2, 4)];
        let plan = assigner.initial_assignment(10, &nodes, &unconstrained());
        let errors = assigner.validate_plan(&plan, &nodes, &unconstrained());
        assert!(errors.is_empty(), "errors: {errors:?}");
    }

    #[test]
    fn test_empty_nodes() {
        let assigner = ConsistentHashAssigner::new();
        let plan = assigner.initial_assignment(10, &[], &unconstrained());

        // Every partition is still listed, but none has an owner.
        assert_eq!(plan.assignments.len(), 10);
        assert!(plan.assignments.values().all(|owner| owner.is_unassigned()));
    }

    #[test]
    fn test_single_node() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4)];
        let plan = assigner.initial_assignment(10, &nodes, &unconstrained());

        assert_eq!(plan.assignments.len(), 10);
        for owner in plan.assignments.values() {
            assert_eq!(*owner, NodeId(1));
        }
    }
}