Skip to main content

laminar_core/state/
vnode.rs

1//! [`VnodeRegistry`] — runtime-configurable virtual node topology.
2//!
3//! Replaces the compile-time `VNODE_COUNT` constant that previously
4//! lived in `laminar-storage`. A registry owns:
5//!
6//! - the current vnode count (configurable; 256 by default),
7//! - the node-per-vnode assignment (for distributed modes),
8//! - a monotonically increasing `assignment_version` used by
9//!   [`ObjectStoreBackend`](super::object_store::ObjectStoreBackend) to
10//!   fence out stale writers.
11//!
//! A row's vnode is derived from its primary key: [`key_hash`] (xxh3)
//! modulo `vnode_count`. Connectors that need a vnode ID for an event
//! call [`VnodeRegistry::vnode_for_key`].
15
16use std::fmt;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22
23/// Unique identifier for a node. Also the owner id for vnodes; cluster
24/// membership and vnode ownership identify the same thing.
25#[derive(
26    Debug,
27    Clone,
28    Copy,
29    PartialEq,
30    Eq,
31    PartialOrd,
32    Ord,
33    Hash,
34    Serialize,
35    Deserialize,
36    rkyv::Archive,
37    rkyv::Serialize,
38    rkyv::Deserialize,
39)]
40pub struct NodeId(pub u64);
41
42impl NodeId {
43    /// Sentinel meaning "unassigned".
44    pub const UNASSIGNED: Self = Self(0);
45
46    /// True if this is the unassigned sentinel.
47    #[must_use]
48    pub const fn is_unassigned(&self) -> bool {
49        self.0 == 0
50    }
51}
52
53impl fmt::Display for NodeId {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        write!(f, "node-{}", self.0)
56    }
57}
58
59/// Runtime registry of vnode topology and assignment.
60pub struct VnodeRegistry {
61    vnode_count: u32,
62    assignment: RwLock<Arc<[NodeId]>>,
63    assignment_version: AtomicU64,
64}
65
66impl std::fmt::Debug for VnodeRegistry {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("VnodeRegistry")
69            .field("vnode_count", &self.vnode_count)
70            .field(
71                "assignment_version",
72                &self.assignment_version.load(Ordering::Relaxed),
73            )
74            .finish_non_exhaustive()
75    }
76}
77
78impl VnodeRegistry {
79    /// Create a registry sized for `vnode_count` vnodes, all marked
80    /// as [`NodeId::UNASSIGNED`]. The assignment version starts at 1.
81    ///
82    /// # Panics
83    /// Panics if `vnode_count == 0`.
84    #[must_use]
85    pub fn new(vnode_count: u32) -> Self {
86        assert!(vnode_count > 0, "vnode_count must be > 0");
87        let assignment: Arc<[NodeId]> =
88            std::iter::repeat_n(NodeId::UNASSIGNED, vnode_count as usize)
89                .collect::<Vec<_>>()
90                .into();
91        Self {
92            vnode_count,
93            assignment: RwLock::new(assignment),
94            assignment_version: AtomicU64::new(1),
95        }
96    }
97
98    /// Create a registry where every vnode is owned by the same node.
99    ///
100    /// Used by single-instance / embedded deployments.
101    ///
102    /// # Panics
103    /// Panics if `vnode_count == 0`.
104    #[must_use]
105    pub fn single_owner(vnode_count: u32, owner: NodeId) -> Self {
106        assert!(vnode_count > 0, "vnode_count must be > 0");
107        let assignment: Arc<[NodeId]> = std::iter::repeat_n(owner, vnode_count as usize)
108            .collect::<Vec<_>>()
109            .into();
110        Self {
111            vnode_count,
112            assignment: RwLock::new(assignment),
113            assignment_version: AtomicU64::new(1),
114        }
115    }
116
117    /// Number of vnodes.
118    #[must_use]
119    pub fn vnode_count(&self) -> u32 {
120        self.vnode_count
121    }
122
123    /// Current monotonic assignment version.
124    #[must_use]
125    pub fn assignment_version(&self) -> u64 {
126        self.assignment_version.load(Ordering::Acquire)
127    }
128
129    /// Owner of a given vnode. Returns [`NodeId::UNASSIGNED`] if the
130    /// vnode is out of range or unassigned.
131    #[must_use]
132    pub fn owner(&self, vnode: u32) -> NodeId {
133        if vnode >= self.vnode_count {
134            return NodeId::UNASSIGNED;
135        }
136        self.assignment.read()[vnode as usize]
137    }
138
139    /// Snapshot the current assignment vector. Cheap — internally an
140    /// `Arc::clone`.
141    #[must_use]
142    pub fn snapshot(&self) -> Arc<[NodeId]> {
143        Arc::clone(&self.assignment.read())
144    }
145
146    /// Replace the full assignment and bump the version.
147    ///
148    /// # Panics
149    /// Panics if `new_assignment.len() != self.vnode_count`.
150    pub fn set_assignment(&self, new_assignment: Arc<[NodeId]>) {
151        assert_eq!(
152            new_assignment.len(),
153            self.vnode_count as usize,
154            "assignment length mismatch: got {}, expected {}",
155            new_assignment.len(),
156            self.vnode_count,
157        );
158        *self.assignment.write() = new_assignment;
159        self.assignment_version.fetch_add(1, Ordering::AcqRel);
160    }
161
162    /// Replace the full assignment and set the version to `version`
163    /// atomically. For recovery paths that must restore the registry to
164    /// a persisted fence generation, not a fresh bump.
165    ///
166    /// # Panics
167    /// Panics on length mismatch, or if `version` is less than the
168    /// current one (assignment versions are monotonic).
169    pub fn set_assignment_and_version(&self, new_assignment: Arc<[NodeId]>, version: u64) {
170        assert_eq!(
171            new_assignment.len(),
172            self.vnode_count as usize,
173            "assignment length mismatch: got {}, expected {}",
174            new_assignment.len(),
175            self.vnode_count,
176        );
177        let mut guard = self.assignment.write();
178        let current = self.assignment_version.load(Ordering::Acquire);
179        assert!(
180            version >= current,
181            "assignment version must be monotonic: got {version}, current {current}",
182        );
183        *guard = new_assignment;
184        self.assignment_version.store(version, Ordering::Release);
185    }
186
187    /// Map a primary key to a vnode.
188    #[must_use]
189    pub fn vnode_for_key(&self, key: &[u8]) -> u32 {
190        #[allow(clippy::cast_possible_truncation)]
191        let h = (key_hash(key) % u64::from(self.vnode_count)) as u32;
192        h
193    }
194}
195
196/// Hash a key to a 64-bit value. Used to derive vnode IDs and for any
197/// other keyed-partitioning decisions.
198///
199/// Fixed to xxh3 so all pipeline stages produce the same vnode for the
200/// same key without needing to share a hasher instance.
201#[must_use]
202pub fn key_hash(key: &[u8]) -> u64 {
203    xxhash_rust::xxh3::xxh3_64(key)
204}
205
206/// Build a vnode-to-owner assignment by round-robin across sorted peers.
207///
208/// Deterministic for a given `(vnode_count, peers)` input — every node
209/// computing this independently agrees, given the same peer list. Uses
210/// sort-by-id + modulo: vnode `v` → `peers[v % peers.len()]` after
211/// sorting. Simple and fine for uniform key distributions.
212///
213/// **Not** consistent hashing — a peer join/leave reshuffles every
214/// vnode. Acceptable for the current phase since reassignment happens
215/// only at cluster start; dynamic rebalance is deferred work.
216///
217/// # Panics
218/// Panics if `peers` is empty.
219#[must_use]
220pub fn round_robin_assignment(vnode_count: u32, peers: &[NodeId]) -> Arc<[NodeId]> {
221    assert!(
222        !peers.is_empty(),
223        "round_robin_assignment needs at least one peer"
224    );
225    let mut sorted: Vec<NodeId> = peers.to_vec();
226    sorted.sort_by_key(|n| n.0);
227    (0..vnode_count)
228        .map(|v| sorted[(v as usize) % sorted.len()])
229        .collect::<Vec<_>>()
230        .into()
231}
232
233/// Vnodes currently assigned to `owner`.
234///
235/// Used by the checkpoint coordinator to decide which vnodes' durability
236/// markers it is responsible for writing each epoch, and by the leader's
237/// `epoch_complete` gate to know the full set to check.
238#[must_use]
239pub fn owned_vnodes(registry: &VnodeRegistry, owner: NodeId) -> Vec<u32> {
240    (0..registry.vnode_count())
241        .filter(|&v| registry.owner(v) == owner)
242        .collect()
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_registry_is_unassigned() {
        let r = VnodeRegistry::new(8);
        assert_eq!(r.vnode_count(), 8);
        assert_eq!(r.assignment_version(), 1);
        for v in 0..8 {
            assert!(r.owner(v).is_unassigned());
        }
    }

    #[test]
    #[should_panic(expected = "vnode_count must be > 0")]
    fn new_rejects_zero_vnodes() {
        let _ = VnodeRegistry::new(0);
    }

    #[test]
    fn single_owner_populates_all_slots() {
        let r = VnodeRegistry::single_owner(4, NodeId(42));
        assert_eq!(r.assignment_version(), 1);
        for v in 0..4 {
            assert_eq!(r.owner(v), NodeId(42));
        }
    }

    #[test]
    fn set_assignment_bumps_version() {
        let r = VnodeRegistry::new(4);
        let v0 = r.assignment_version();
        let new_assign: Arc<[NodeId]> = vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into();
        r.set_assignment(new_assign);
        assert!(r.assignment_version() > v0);
        assert_eq!(r.owner(0), NodeId(1));
        assert_eq!(r.owner(1), NodeId(2));
    }

    #[test]
    fn set_assignment_and_version_restores_exact_version() {
        let r = VnodeRegistry::new(2);
        r.set_assignment_and_version(vec![NodeId(5), NodeId(6)].into(), 42);
        assert_eq!(r.assignment_version(), 42);
        assert_eq!(r.owner(0), NodeId(5));
        assert_eq!(r.owner(1), NodeId(6));
        // Re-applying the same generation is allowed (`>=`, not `>`).
        r.set_assignment_and_version(vec![NodeId(6), NodeId(5)].into(), 42);
        assert_eq!(r.assignment_version(), 42);
        assert_eq!(r.owner(0), NodeId(6));
    }

    #[test]
    #[should_panic(expected = "assignment version must be monotonic")]
    fn set_assignment_and_version_rejects_regression() {
        let r = VnodeRegistry::new(2);
        r.set_assignment_and_version(vec![NodeId(1), NodeId(1)].into(), 10);
        r.set_assignment_and_version(vec![NodeId(1), NodeId(1)].into(), 9);
    }

    #[test]
    fn snapshot_reflects_latest_assignment() {
        let r = VnodeRegistry::single_owner(3, NodeId(7));
        assert!(r.snapshot().iter().all(|&n| n == NodeId(7)));
        r.set_assignment(vec![NodeId(1), NodeId(2), NodeId(3)].into());
        assert_eq!(&*r.snapshot(), &[NodeId(1), NodeId(2), NodeId(3)][..]);
    }

    #[test]
    fn vnode_for_key_in_range() {
        let r = VnodeRegistry::new(16);
        for i in 0..100 {
            let v = r.vnode_for_key(format!("k-{i}").as_bytes());
            assert!(v < 16);
        }
    }

    #[test]
    #[should_panic(expected = "assignment length mismatch")]
    fn set_assignment_rejects_wrong_length() {
        let r = VnodeRegistry::new(4);
        let bad: Arc<[NodeId]> = vec![NodeId(1)].into();
        r.set_assignment(bad);
    }

    #[test]
    fn owner_out_of_range_returns_unassigned() {
        let r = VnodeRegistry::single_owner(4, NodeId(1));
        assert!(r.owner(10).is_unassigned());
    }

    #[test]
    fn vnode_for_key_is_deterministic() {
        let r = VnodeRegistry::new(16);
        assert_eq!(r.vnode_for_key(b"key-x"), r.vnode_for_key(b"key-x"));
    }

    #[test]
    fn owned_vnodes_filters_by_owner() {
        let r = VnodeRegistry::new(4);
        r.set_assignment(vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into());
        assert_eq!(owned_vnodes(&r, NodeId(1)), vec![0, 2]);
        assert_eq!(owned_vnodes(&r, NodeId(2)), vec![1, 3]);
        assert!(owned_vnodes(&r, NodeId(99)).is_empty());
    }

    #[test]
    fn owned_vnodes_single_owner_returns_all() {
        let r = VnodeRegistry::single_owner(8, NodeId(42));
        assert_eq!(owned_vnodes(&r, NodeId(42)), (0..8).collect::<Vec<_>>());
    }

    #[test]
    fn node_id_display_and_sentinel() {
        assert_eq!(NodeId(7).to_string(), "node-7");
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(7).is_unassigned());
    }

    #[test]
    fn round_robin_is_deterministic_and_balanced() {
        // 8 vnodes, 3 peers → 3+3+2 distribution after modulo.
        let peers = vec![NodeId(7), NodeId(3), NodeId(5)];
        let assignment = round_robin_assignment(8, &peers);
        // Sorted: [3, 5, 7]. vnode v owned by sorted[v % 3].
        assert_eq!(
            &*assignment,
            &[
                NodeId(3),
                NodeId(5),
                NodeId(7),
                NodeId(3),
                NodeId(5),
                NodeId(7),
                NodeId(3),
                NodeId(5),
            ][..]
        );
        // Input order doesn't matter: a different permutation of the same
        // peers yields the same assignment.
        let shuffled = vec![NodeId(5), NodeId(7), NodeId(3)];
        assert_eq!(round_robin_assignment(8, &shuffled), assignment);
    }

    #[test]
    fn round_robin_single_peer_owns_everything() {
        let assignment = round_robin_assignment(4, &[NodeId(99)]);
        assert!(assignment.iter().all(|&n| n == NodeId(99)));
    }

    #[test]
    #[should_panic(expected = "at least one peer")]
    fn round_robin_rejects_empty_peer_list() {
        let _ = round_robin_assignment(4, &[]);
    }
}