Skip to main content

laminar_connectors/
partition_assignment.rs

//! Engine-controlled Kafka partition → vnode assignment.
//!
//! In cluster mode LaminarDB owns the partition-to-node mapping rather than
//! delegating it to Kafka's consumer-group coordinator: each Kafka partition
//! is bound to a vnode by `partition % vnode_count`, and a node consumes a
//! partition iff it owns that vnode in the current
//! [`VnodeRegistry`](laminar_core::state::VnodeRegistry) assignment. This keeps
//! Kafka ingestion co-located with the vnode state it feeds, so a vnode and the
//! partitions whose data lands in it always move together on rebalance.
//!
//! This module holds only the pure mapping logic (no rdkafka), so it lives
//! outside the `kafka` feature gate and can be compiled and unit-tested without
//! a native Kafka/OpenSSL toolchain or a live broker. The Kafka source consumes
//! it (under the `kafka` feature) and applies the result via a manual `assign()`.
15
16use laminar_core::state::{NodeId, VnodeRegistry};
17
18/// The Kafka partitions (of a `total_partitions`-partition topic) this node
19/// owns under the registry's current assignment.
20///
21/// Partition `p` maps to vnode `p % vnode_count`; it is owned iff that vnode's
22/// owner is `self_id`. Returned ascending. Negative partition ids (never
23/// produced by Kafka metadata) are skipped defensively.
24#[must_use]
25pub fn owned_partitions(
26    total_partitions: i32,
27    registry: &VnodeRegistry,
28    self_id: NodeId,
29) -> Vec<i32> {
30    let vnode_count = registry.vnode_count();
31    (0..total_partitions)
32        .filter(|&p| {
33            let Ok(pid) = u32::try_from(p) else {
34                return false;
35            };
36            registry.owner(pid % vnode_count) == self_id
37        })
38        .collect()
39}
40
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_core::state::rendezvous_assignment;
    use std::sync::Arc;

    /// Builds a registry with `vnode_count` vnodes assigned across `owners`
    /// via `rendezvous_assignment`.
    fn registry_with(vnode_count: u32, owners: &[NodeId]) -> VnodeRegistry {
        let r = VnodeRegistry::new(vnode_count);
        r.set_assignment(rendezvous_assignment(vnode_count, owners));
        r
    }

    /// Reference statement of the mapping rule for a 4-vnode registry:
    /// partition `p` in `0..total` belongs to `node` iff vnode `p % 4` is
    /// owned by `node`. Only use with registries built with 4 vnodes.
    fn expected_with_four_vnodes(r: &VnodeRegistry, total: i32, node: NodeId) -> Vec<i32> {
        (0..total)
            .filter(|&p| r.owner((p % 4) as u32) == node)
            .collect()
    }

    #[test]
    fn partitions_split_by_modulo_across_two_nodes() {
        // 4 vnodes, nodes 1 and 2; 8 partitions: p % 4 → vnode → owner.
        let r = registry_with(4, &[NodeId(1), NodeId(2)]);

        assert_eq!(
            owned_partitions(8, &r, NodeId(1)),
            expected_with_four_vnodes(&r, 8, NodeId(1))
        );
        assert_eq!(
            owned_partitions(8, &r, NodeId(2)),
            expected_with_four_vnodes(&r, 8, NodeId(2))
        );
    }

    #[test]
    fn union_of_owners_covers_all_partitions_disjointly() {
        let r = registry_with(4, &[NodeId(1), NodeId(2), NodeId(3)]);
        let total = 16;
        let mut all: Vec<i32> = [NodeId(1), NodeId(2), NodeId(3)]
            .into_iter()
            .flat_map(|owner| owned_partitions(total, &r, owner))
            .collect();
        all.sort_unstable();
        assert_eq!(
            all,
            (0..total).collect::<Vec<_>>(),
            "every partition owned exactly once"
        );
    }

    #[test]
    fn single_owner_takes_all_partitions() {
        let r = registry_with(8, &[NodeId(7)]);
        assert_eq!(owned_partitions(5, &r, NodeId(7)), vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn unowned_node_gets_nothing() {
        let r = registry_with(4, &[NodeId(1), NodeId(2)]);
        assert!(owned_partitions(8, &r, NodeId(99)).is_empty());
    }

    #[test]
    fn fewer_partitions_than_vnodes_leaves_some_nodes_empty() {
        // 4 vnodes, only 2 partitions: vnodes 2 and 3 receive no partitions,
        // and depending on the rendezvous outcome one node may own nothing.
        let r = registry_with(4, &[NodeId(1), NodeId(2)]);
        let node1 = owned_partitions(2, &r, NodeId(1));
        let node2 = owned_partitions(2, &r, NodeId(2));

        assert_eq!(node1, expected_with_four_vnodes(&r, 2, NodeId(1)));
        assert_eq!(node2, expected_with_four_vnodes(&r, 2, NodeId(2)));

        // Whatever the split, the two real partitions are each owned once.
        let mut all: Vec<i32> = node1.into_iter().chain(node2).collect();
        all.sort_unstable();
        assert_eq!(all, vec![0, 1], "each partition owned exactly once");
    }

    #[test]
    fn non_positive_partition_count_yields_nothing() {
        // A sole owner would receive every partition, so any non-empty result
        // here would be visible.
        let r = registry_with(4, &[NodeId(1)]);
        assert!(owned_partitions(0, &r, NodeId(1)).is_empty());
        assert!(owned_partitions(-3, &r, NodeId(1)).is_empty());
    }

    #[test]
    fn reassignment_moves_partitions() {
        let r = VnodeRegistry::new(4);
        r.set_assignment(Arc::from([NodeId(1), NodeId(2), NodeId(1), NodeId(2)]));
        assert_eq!(owned_partitions(4, &r, NodeId(1)), vec![0, 2]);
        // Rotate every vnode to node 1 — it should now own all partitions.
        r.set_assignment(Arc::from([NodeId(1), NodeId(1), NodeId(1), NodeId(1)]));
        assert_eq!(owned_partitions(4, &r, NodeId(1)), vec![0, 1, 2, 3]);
        assert!(owned_partitions(4, &r, NodeId(2)).is_empty());
    }
}
123}