Skip to main content

laminar_core/delta/partition/
assignment.rs

1//! Partition assignment algorithms for the delta.
2//!
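//! Assigns partitions to nodes using consistent hashing with virtual
//! nodes proportional to each node's core count, optionally capping the
//! number of partitions per node. Failure-domain diversity, anti-affinity
//! groups and per-node weight overrides are declared in
//! `AssignmentConstraints` but are not yet enforced by the consistent-hash
//! assigner.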
6
7#![allow(clippy::disallowed_types)] // cold path: partition assignment planning
8use std::collections::HashMap;
9
10use crate::delta::discovery::{NodeId, NodeInfo};
11
12/// A planned partition-to-node assignment.
13#[derive(Debug, Clone)]
14pub struct AssignmentPlan {
15    /// Mapping from partition ID to assigned node.
16    pub assignments: HashMap<u32, NodeId>,
17    /// Moves required to transition from current to planned state.
18    pub moves: Vec<PartitionMove>,
19    /// Statistics about the plan.
20    pub stats: AssignmentStats,
21}
22
23/// A single partition move in a rebalance plan.
24#[derive(Debug, Clone)]
25pub struct PartitionMove {
26    /// The partition being moved.
27    pub partition_id: u32,
28    /// The current owner (if any).
29    pub from: Option<NodeId>,
30    /// The new owner.
31    pub to: NodeId,
32}
33
/// Summary statistics for an assignment plan.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AssignmentStats {
    /// Total number of partitions covered by the plan.
    pub total_partitions: u32,
    /// Number of partition moves the plan requires.
    pub total_moves: u32,
    /// Largest number of partitions assigned to any single node.
    pub max_per_node: u32,
    /// Smallest number of partitions assigned to any single node.
    pub min_per_node: u32,
    /// Number of distinct failure domains the plan spreads across.
    ///
    /// NOTE(review): `ConsistentHashAssigner` does not track failure domains
    /// yet and always reports 0 here.
    pub failure_domains_used: u32,
}
48
/// Constraints for partition assignment.
///
/// NOTE(review): `ConsistentHashAssigner` currently enforces only
/// `max_partitions_per_node`. `min_failure_domains`, `anti_affinity_groups`
/// and `node_weights` are accepted but not read by it.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct AssignmentConstraints {
    /// Maximum partitions per node (0 = no limit).
    ///
    /// This is a soft cap for `ConsistentHashAssigner`. Once every node has
    /// reached the limit, further partitions fall back to their
    /// unconstrained owner, so the limit can be exceeded.
    pub max_partitions_per_node: u32,
    /// Minimum number of failure domains to spread across.
    pub min_failure_domains: u32,
    /// Anti-affinity groups: partitions in the same group should be
    /// on different nodes when possible.
    pub anti_affinity_groups: Vec<Vec<u32>>,
    /// Per-node weight overrides (raw node ID → weight).
    /// Default weight is proportional to core count.
    pub node_weights: HashMap<u64, f64>,
}
63
64/// Trait for partition assignment algorithms.
65pub trait PartitionAssigner: Send + Sync {
66    /// Generate an initial assignment for the given number of partitions.
67    fn initial_assignment(
68        &self,
69        num_partitions: u32,
70        nodes: &[NodeInfo],
71        constraints: &AssignmentConstraints,
72    ) -> AssignmentPlan;
73
74    /// Generate a rebalance plan given the current assignments and new node set.
75    fn rebalance(
76        &self,
77        current: &HashMap<u32, NodeId>,
78        nodes: &[NodeInfo],
79        constraints: &AssignmentConstraints,
80    ) -> AssignmentPlan;
81
82    /// Validate that an assignment plan satisfies the given constraints.
83    fn validate_plan(
84        &self,
85        plan: &AssignmentPlan,
86        nodes: &[NodeInfo],
87        constraints: &AssignmentConstraints,
88    ) -> Vec<String>;
89}
90
/// Consistent-hash based partition assigner.
///
/// Each node contributes `cores * vnodes_per_core` virtual nodes to the hash
/// ring, so nodes with more cores receive proportionally more partitions.
/// The ring is deterministic for a given node set, `vnodes_per_core` and
/// `seed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsistentHashAssigner {
    /// Number of virtual nodes placed on the ring per CPU core.
    pub vnodes_per_core: u32,
    /// Seed mixed into every ring hash; fixed so placement is reproducible.
    pub seed: u64,
}
103
104impl ConsistentHashAssigner {
105    /// Create a new assigner with default settings.
106    #[must_use]
107    pub fn new() -> Self {
108        Self {
109            vnodes_per_core: 100,
110            seed: 0x517C_C1F0_CAFE_BABE,
111        }
112    }
113
114    /// Create a new assigner with custom virtual node count and seed.
115    #[must_use]
116    pub fn with_config(vnodes_per_core: u32, seed: u64) -> Self {
117        Self {
118            vnodes_per_core,
119            seed,
120        }
121    }
122
123    /// Hash a (`node_id`, `vnode_index`) pair to a position on the ring.
124    fn hash_vnode(&self, node_id: NodeId, vnode_idx: u32) -> u64 {
125        // Simple but deterministic hash using xxhash
126        let mut data = [0u8; 16];
127        data[..8].copy_from_slice(&(node_id.0 ^ self.seed).to_le_bytes());
128        data[8..12].copy_from_slice(&vnode_idx.to_le_bytes());
129        // Use a simple FNV-1a variant for the ring position
130        let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
131        for &byte in &data {
132            hash ^= u64::from(byte);
133            hash = hash.wrapping_mul(0x0100_0000_01b3);
134        }
135        hash
136    }
137
138    /// Build the hash ring from the given nodes.
139    fn build_ring(&self, nodes: &[NodeInfo]) -> Vec<(u64, NodeId)> {
140        let mut ring: Vec<(u64, NodeId)> = Vec::new();
141        for node in nodes {
142            let vnodes = node.metadata.cores * self.vnodes_per_core;
143            for i in 0..vnodes {
144                let hash = self.hash_vnode(node.id, i);
145                ring.push((hash, node.id));
146            }
147        }
148        ring.sort_by_key(|(hash, _)| *hash);
149        ring
150    }
151
152    /// Find the owner of a partition on the ring.
153    fn lookup(&self, ring: &[(u64, NodeId)], partition_id: u32) -> NodeId {
154        if ring.is_empty() {
155            return NodeId::UNASSIGNED;
156        }
157        let key = self.hash_vnode(NodeId(u64::from(partition_id)), 0);
158        // Binary search for the first vnode >= key
159        match ring.binary_search_by_key(&key, |(h, _)| *h) {
160            Ok(idx) => ring[idx].1,
161            Err(idx) => {
162                if idx >= ring.len() {
163                    ring[0].1 // Wrap around
164                } else {
165                    ring[idx].1
166                }
167            }
168        }
169    }
170
171    /// Find owner with exclusion (for failure domain diversity).
172    fn lookup_excluding(
173        &self,
174        ring: &[(u64, NodeId)],
175        partition_id: u32,
176        exclude: &[NodeId],
177    ) -> NodeId {
178        if ring.is_empty() {
179            return NodeId::UNASSIGNED;
180        }
181        let key = self.hash_vnode(NodeId(u64::from(partition_id)), 0);
182        let start = match ring.binary_search_by_key(&key, |(h, _)| *h) {
183            Ok(idx) | Err(idx) => idx % ring.len(),
184        };
185        for i in 0..ring.len() {
186            let idx = (start + i) % ring.len();
187            if !exclude.contains(&ring[idx].1) {
188                return ring[idx].1;
189            }
190        }
191        // All excluded — fall back to default
192        ring[start].1
193    }
194}
195
196impl Default for ConsistentHashAssigner {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202impl PartitionAssigner for ConsistentHashAssigner {
203    fn initial_assignment(
204        &self,
205        num_partitions: u32,
206        nodes: &[NodeInfo],
207        constraints: &AssignmentConstraints,
208    ) -> AssignmentPlan {
209        let ring = self.build_ring(nodes);
210        let mut assignments = HashMap::new();
211        let mut moves = Vec::new();
212        let mut per_node: HashMap<NodeId, u32> = HashMap::new();
213
214        for pid in 0..num_partitions {
215            let owner = if constraints.max_partitions_per_node > 0 {
216                // Respect max constraint
217                let saturated: Vec<NodeId> = per_node
218                    .iter()
219                    .filter(|(_, &count)| count >= constraints.max_partitions_per_node)
220                    .map(|(id, _)| *id)
221                    .collect();
222                self.lookup_excluding(&ring, pid, &saturated)
223            } else {
224                self.lookup(&ring, pid)
225            };
226
227            assignments.insert(pid, owner);
228            *per_node.entry(owner).or_insert(0) += 1;
229            moves.push(PartitionMove {
230                partition_id: pid,
231                from: None,
232                to: owner,
233            });
234        }
235
236        let max_per_node = per_node.values().copied().max().unwrap_or(0);
237        let min_per_node = per_node.values().copied().min().unwrap_or(0);
238
239        AssignmentPlan {
240            assignments,
241            moves,
242            stats: AssignmentStats {
243                total_partitions: num_partitions,
244                total_moves: num_partitions,
245                max_per_node,
246                min_per_node,
247                failure_domains_used: 0,
248            },
249        }
250    }
251
252    fn rebalance(
253        &self,
254        current: &HashMap<u32, NodeId>,
255        nodes: &[NodeInfo],
256        constraints: &AssignmentConstraints,
257    ) -> AssignmentPlan {
258        #[allow(clippy::cast_possible_truncation)]
259        let num_partitions = current.len() as u32;
260        let ideal = self.initial_assignment(num_partitions, nodes, constraints);
261
262        let mut moves = Vec::new();
263        for (pid, new_owner) in &ideal.assignments {
264            let old_owner = current.get(pid).copied();
265            if old_owner != Some(*new_owner) {
266                moves.push(PartitionMove {
267                    partition_id: *pid,
268                    from: old_owner,
269                    to: *new_owner,
270                });
271            }
272        }
273
274        #[allow(clippy::cast_possible_truncation)]
275        let total_moves = moves.len() as u32;
276        AssignmentPlan {
277            assignments: ideal.assignments,
278            moves,
279            stats: AssignmentStats {
280                total_moves,
281                ..ideal.stats
282            },
283        }
284    }
285
286    fn validate_plan(
287        &self,
288        plan: &AssignmentPlan,
289        nodes: &[NodeInfo],
290        constraints: &AssignmentConstraints,
291    ) -> Vec<String> {
292        let mut errors = Vec::new();
293        let valid_nodes: Vec<NodeId> = nodes.iter().map(|n| n.id).collect();
294        let mut per_node: HashMap<NodeId, u32> = HashMap::new();
295
296        for (pid, owner) in &plan.assignments {
297            if !valid_nodes.contains(owner) && !owner.is_unassigned() {
298                errors.push(format!("partition {pid} assigned to unknown node {owner}"));
299            }
300            *per_node.entry(*owner).or_insert(0) += 1;
301        }
302
303        if constraints.max_partitions_per_node > 0 {
304            for (node, count) in &per_node {
305                if *count > constraints.max_partitions_per_node {
306                    errors.push(format!(
307                        "node {node} has {count} partitions (max: {})",
308                        constraints.max_partitions_per_node
309                    ));
310                }
311            }
312        }
313
314        errors
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::delta::discovery::{NodeMetadata, NodeState};

    /// Build an `Active` node whose name and addresses are derived from `id`.
    fn make_node(id: u64, cores: u32) -> NodeInfo {
        let metadata = NodeMetadata {
            cores,
            ..NodeMetadata::default()
        };
        NodeInfo {
            id: NodeId(id),
            name: format!("node-{id}"),
            rpc_address: format!("127.0.0.1:{}", 9000 + id),
            raft_address: format!("127.0.0.1:{}", 9100 + id),
            state: NodeState::Active,
            metadata,
            last_heartbeat_ms: 0,
        }
    }

    /// Count how many partitions in `plan` are owned by `node`.
    fn owned_by(plan: &AssignmentPlan, node: NodeId) -> usize {
        plan.assignments
            .values()
            .filter(|&&owner| owner == node)
            .count()
    }

    #[test]
    fn test_initial_assignment_basic() {
        let assigner = ConsistentHashAssigner::new();
        let nodes: Vec<NodeInfo> = (1..=3).map(|id| make_node(id, 4)).collect();
        let plan = assigner.initial_assignment(12, &nodes, &AssignmentConstraints::default());

        assert_eq!(plan.assignments.len(), 12);
        assert_eq!(plan.moves.len(), 12);
        // Every partition must land on a real node.
        for owner in plan.assignments.values() {
            assert!(!owner.is_unassigned());
        }
    }

    #[test]
    fn test_assignment_determinism() {
        let assigner = ConsistentHashAssigner::new();
        let nodes: Vec<NodeInfo> = (1..=2).map(|id| make_node(id, 4)).collect();
        let no_constraints = AssignmentConstraints::default();
        let first = assigner.initial_assignment(100, &nodes, &no_constraints);
        let second = assigner.initial_assignment(100, &nodes, &no_constraints);

        for pid in 0..100 {
            assert_eq!(first.assignments[&pid], second.assignments[&pid]);
        }
    }

    #[test]
    fn test_weighted_distribution() {
        let assigner = ConsistentHashAssigner::new();
        // Node 1 has 8 cores, node 2 has 2 cores.
        let nodes = vec![make_node(1, 8), make_node(2, 2)];
        let plan = assigner.initial_assignment(100, &nodes, &AssignmentConstraints::default());

        let node1_count = owned_by(&plan, NodeId(1));
        let node2_count = owned_by(&plan, NodeId(2));

        // Roughly a 4:1 split is expected; hashing introduces some variance.
        assert!(
            node1_count > node2_count,
            "node1={node1_count}, node2={node2_count}"
        );
        assert!(
            node1_count > 50,
            "node1 should get majority: got {node1_count}"
        );
    }

    #[test]
    fn test_rebalance_minimal_moves() {
        let assigner = ConsistentHashAssigner::new();
        let nodes: Vec<NodeInfo> = (1..=2).map(|id| make_node(id, 4)).collect();
        let initial = assigner.initial_assignment(10, &nodes, &AssignmentConstraints::default());

        // An unchanged node set must not move anything.
        let rebalanced = assigner.rebalance(
            &initial.assignments,
            &nodes,
            &AssignmentConstraints::default(),
        );
        assert_eq!(rebalanced.stats.total_moves, 0);
    }

    #[test]
    fn test_rebalance_new_node() {
        let assigner = ConsistentHashAssigner::new();
        let two_nodes: Vec<NodeInfo> = (1..=2).map(|id| make_node(id, 4)).collect();
        let initial =
            assigner.initial_assignment(20, &two_nodes, &AssignmentConstraints::default());

        // Grow the cluster to three nodes.
        let three_nodes: Vec<NodeInfo> = (1..=3).map(|id| make_node(id, 4)).collect();
        let rebalanced = assigner.rebalance(
            &initial.assignments,
            &three_nodes,
            &AssignmentConstraints::default(),
        );

        // Some partitions move to the new node, but not all of them.
        assert!(rebalanced.stats.total_moves > 0);
        assert!(rebalanced.stats.total_moves < 20);
    }

    #[test]
    fn test_max_partitions_constraint() {
        let assigner = ConsistentHashAssigner::new();
        let nodes: Vec<NodeInfo> = (1..=2).map(|id| make_node(id, 4)).collect();
        let constraints = AssignmentConstraints {
            max_partitions_per_node: 6,
            ..AssignmentConstraints::default()
        };
        let plan = assigner.initial_assignment(10, &nodes, &constraints);

        let mut per_node: HashMap<NodeId, u32> = HashMap::new();
        for &owner in plan.assignments.values() {
            *per_node.entry(owner).or_default() += 1;
        }
        for &count in per_node.values() {
            assert!(count <= 6, "node exceeded max: {count}");
        }
    }

    #[test]
    fn test_validate_plan() {
        let assigner = ConsistentHashAssigner::new();
        let nodes: Vec<NodeInfo> = (1..=2).map(|id| make_node(id, 4)).collect();
        let no_constraints = AssignmentConstraints::default();
        let plan = assigner.initial_assignment(10, &nodes, &no_constraints);

        let errors = assigner.validate_plan(&plan, &nodes, &no_constraints);
        assert!(errors.is_empty(), "errors: {errors:?}");
    }

    #[test]
    fn test_empty_nodes() {
        let assigner = ConsistentHashAssigner::new();
        let plan = assigner.initial_assignment(10, &[], &AssignmentConstraints::default());

        // With no nodes there is nowhere to place anything.
        for owner in plan.assignments.values() {
            assert!(owner.is_unassigned());
        }
    }

    #[test]
    fn test_single_node() {
        let assigner = ConsistentHashAssigner::new();
        let nodes = vec![make_node(1, 4)];
        let plan = assigner.initial_assignment(10, &nodes, &AssignmentConstraints::default());

        // A lone node owns every partition.
        for &owner in plan.assignments.values() {
            assert_eq!(owner, NodeId(1));
        }
    }
}