Skip to main content

laminar_core/state/
vnode.rs

1//! [`VnodeRegistry`] — runtime-configurable virtual node topology.
2//!
//! Replaces the compile-time `VNODE_COUNT` constant that previously
//! lived in `laminar-storage`. A registry owns:
//!
//! - the current vnode count (configurable; 256 by default),
//! - the vnode-to-node assignment (for distributed modes),
//! - a monotonically increasing `assignment_version` used by
//!   [`ObjectStoreBackend`](super::object_store::ObjectStoreBackend) to
//!   fence out stale writers,
//! - a per-vnode lifecycle state (`Active` / `Restoring`) that lets
//!   operators suppress emission while newly acquired vnodes are restored.
//!
//! A row's vnode is `key_hash(primary_key) % vnode_count`, where
//! [`key_hash`] is xxh3. Connectors that need a vnode ID for an event
//! call [`VnodeRegistry::vnode_for_key`].
15
16use std::collections::BTreeMap;
17use std::fmt;
18use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
19use std::sync::Arc;
20
21use parking_lot::RwLock;
22use serde::{Deserialize, Serialize};
23
24/// Unique identifier for a node. Also the owner id for vnodes; cluster
25/// membership and vnode ownership identify the same thing.
26#[derive(
27    Debug,
28    Clone,
29    Copy,
30    PartialEq,
31    Eq,
32    PartialOrd,
33    Ord,
34    Hash,
35    Serialize,
36    Deserialize,
37    rkyv::Archive,
38    rkyv::Serialize,
39    rkyv::Deserialize,
40)]
41pub struct NodeId(pub u64);
42
43impl NodeId {
44    /// Sentinel meaning "unassigned".
45    pub const UNASSIGNED: Self = Self(0);
46
47    /// True if this is the unassigned sentinel.
48    #[must_use]
49    pub const fn is_unassigned(&self) -> bool {
50        self.0 == 0
51    }
52}
53
54impl fmt::Display for NodeId {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        write!(f, "node-{}", self.0)
57    }
58}
59
/// Lifecycle of a single vnode, tracked separately from ownership.
///
/// A vnode this node owns may still be [`Restoring`](Self::Restoring)
/// while its committed state is rehydrated from durable storage after a
/// rebalance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VnodeLifecycleState {
    /// Fully owned and serving — state is consistent.
    Active,
    /// Newly acquired in a rebalance; durable state is still being
    /// applied. Operators suppress emission for keys in this vnode until
    /// it flips back to [`Active`](Self::Active).
    Restoring,
}

impl VnodeLifecycleState {
    /// Byte stored in a lifecycle slot for [`Active`](Self::Active).
    const ACTIVE: u8 = 0;
    /// Byte stored in a lifecycle slot for [`Restoring`](Self::Restoring).
    const RESTORING: u8 = 1;

    /// Encode this state as the byte kept in the registry's atomic slot.
    const fn to_u8(self) -> u8 {
        match self {
            Self::Restoring => Self::RESTORING,
            Self::Active => Self::ACTIVE,
        }
    }
}
84
85/// Runtime registry of vnode topology and assignment.
86pub struct VnodeRegistry {
87    vnode_count: u32,
88    assignment: RwLock<Arc<[NodeId]>>,
89    assignment_version: AtomicU64,
90    /// Per-vnode lifecycle, indexed by vnode id. `0` = `Active`,
91    /// `1` = `Restoring`. Lock-free: rebalance flips individual entries
92    /// and the hot emission gate reads them without taking a lock. Not
93    /// serialized — rebuilt (all `Active`) from the `AssignmentSnapshot`
94    /// on boot, so adding it never touches a wire format.
95    lifecycle: Arc<[AtomicU8]>,
96}
97
98impl std::fmt::Debug for VnodeRegistry {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("VnodeRegistry")
101            .field("vnode_count", &self.vnode_count)
102            .field(
103                "assignment_version",
104                &self.assignment_version.load(Ordering::Relaxed),
105            )
106            .finish_non_exhaustive()
107    }
108}
109
110impl VnodeRegistry {
111    /// Create a registry sized for `vnode_count` vnodes, all marked
112    /// as [`NodeId::UNASSIGNED`]. The assignment version starts at 1.
113    ///
114    /// # Panics
115    /// Panics if `vnode_count == 0`.
116    #[must_use]
117    pub fn new(vnode_count: u32) -> Self {
118        assert!(vnode_count > 0, "vnode_count must be > 0");
119        let assignment: Arc<[NodeId]> =
120            std::iter::repeat_n(NodeId::UNASSIGNED, vnode_count as usize)
121                .collect::<Vec<_>>()
122                .into();
123        Self {
124            vnode_count,
125            assignment: RwLock::new(assignment),
126            assignment_version: AtomicU64::new(1),
127            lifecycle: new_lifecycle(vnode_count),
128        }
129    }
130
131    /// Create a registry where every vnode is owned by the same node.
132    ///
133    /// Used by single-instance / embedded deployments.
134    ///
135    /// # Panics
136    /// Panics if `vnode_count == 0`.
137    #[must_use]
138    pub fn single_owner(vnode_count: u32, owner: NodeId) -> Self {
139        assert!(vnode_count > 0, "vnode_count must be > 0");
140        let assignment: Arc<[NodeId]> = std::iter::repeat_n(owner, vnode_count as usize)
141            .collect::<Vec<_>>()
142            .into();
143        Self {
144            vnode_count,
145            assignment: RwLock::new(assignment),
146            assignment_version: AtomicU64::new(1),
147            lifecycle: new_lifecycle(vnode_count),
148        }
149    }
150
151    /// Number of vnodes.
152    #[must_use]
153    pub fn vnode_count(&self) -> u32 {
154        self.vnode_count
155    }
156
157    /// Current monotonic assignment version.
158    #[must_use]
159    pub fn assignment_version(&self) -> u64 {
160        self.assignment_version.load(Ordering::Acquire)
161    }
162
163    /// Owner of a given vnode. Returns [`NodeId::UNASSIGNED`] if the
164    /// vnode is out of range or unassigned.
165    #[must_use]
166    pub fn owner(&self, vnode: u32) -> NodeId {
167        if vnode >= self.vnode_count {
168            return NodeId::UNASSIGNED;
169        }
170        self.assignment.read()[vnode as usize]
171    }
172
173    /// Snapshot the current assignment vector. Cheap — internally an
174    /// `Arc::clone`.
175    #[must_use]
176    pub fn snapshot(&self) -> Arc<[NodeId]> {
177        Arc::clone(&self.assignment.read())
178    }
179
180    /// Replace the full assignment and bump the version.
181    ///
182    /// # Panics
183    /// Panics if `new_assignment.len() != self.vnode_count`.
184    pub fn set_assignment(&self, new_assignment: Arc<[NodeId]>) {
185        assert_eq!(
186            new_assignment.len(),
187            self.vnode_count as usize,
188            "assignment length mismatch: got {}, expected {}",
189            new_assignment.len(),
190            self.vnode_count,
191        );
192        *self.assignment.write() = new_assignment;
193        self.assignment_version.fetch_add(1, Ordering::AcqRel);
194    }
195
196    /// Replace the full assignment and set the version to `version`
197    /// atomically. For recovery paths that must restore the registry to
198    /// a persisted fence generation, not a fresh bump.
199    ///
200    /// # Panics
201    /// Panics on length mismatch, or if `version` is less than the
202    /// current one (assignment versions are monotonic).
203    pub fn set_assignment_and_version(&self, new_assignment: Arc<[NodeId]>, version: u64) {
204        assert_eq!(
205            new_assignment.len(),
206            self.vnode_count as usize,
207            "assignment length mismatch: got {}, expected {}",
208            new_assignment.len(),
209            self.vnode_count,
210        );
211        let mut guard = self.assignment.write();
212        let current = self.assignment_version.load(Ordering::Acquire);
213        assert!(
214            version >= current,
215            "assignment version must be monotonic: got {version}, current {current}",
216        );
217        *guard = new_assignment;
218        self.assignment_version.store(version, Ordering::Release);
219    }
220
221    /// Map a primary key to a vnode.
222    #[must_use]
223    pub fn vnode_for_key(&self, key: &[u8]) -> u32 {
224        #[allow(clippy::cast_possible_truncation)]
225        let h = (key_hash(key) % u64::from(self.vnode_count)) as u32;
226        h
227    }
228
229    /// Mark `vnodes` as [`Restoring`](VnodeLifecycleState::Restoring).
230    ///
231    /// Called during a rebalance for the vnodes a node newly acquires,
232    /// before their committed state has been applied. Out-of-range ids
233    /// are ignored.
234    pub fn mark_restoring(&self, vnodes: &[u32]) {
235        self.set_lifecycle(vnodes, VnodeLifecycleState::Restoring);
236    }
237
238    /// Mark `vnodes` as [`Active`](VnodeLifecycleState::Active).
239    ///
240    /// Called once a newly-acquired vnode's state has been applied (or
241    /// immediately for vnodes that had no durable state to restore).
242    /// Out-of-range ids are ignored.
243    pub fn mark_active(&self, vnodes: &[u32]) {
244        self.set_lifecycle(vnodes, VnodeLifecycleState::Active);
245    }
246
247    fn set_lifecycle(&self, vnodes: &[u32], state: VnodeLifecycleState) {
248        let byte = state.to_u8();
249        for &v in vnodes {
250            if let Some(slot) = self.lifecycle.get(v as usize) {
251                slot.store(byte, Ordering::Release);
252            }
253        }
254    }
255
256    /// Whether `vnode` is currently [`Restoring`](VnodeLifecycleState::Restoring).
257    /// Out-of-range ids are reported as not restoring.
258    #[must_use]
259    pub fn is_restoring(&self, vnode: u32) -> bool {
260        self.lifecycle
261            .get(vnode as usize)
262            .is_some_and(|s| s.load(Ordering::Acquire) == VnodeLifecycleState::RESTORING)
263    }
264
265    /// Whether any vnode is currently restoring. Cheap pre-check the
266    /// emission gate uses to skip per-row work in the common case.
267    #[must_use]
268    pub fn any_restoring(&self) -> bool {
269        self.lifecycle
270            .iter()
271            .any(|s| s.load(Ordering::Acquire) == VnodeLifecycleState::RESTORING)
272    }
273
274    /// Vnodes currently [`Restoring`](VnodeLifecycleState::Restoring), ascending.
275    #[must_use]
276    #[allow(clippy::cast_possible_truncation)] // index < vnode_count, which is u32
277    pub fn restoring_vnodes(&self) -> Vec<u32> {
278        self.lifecycle
279            .iter()
280            .enumerate()
281            .filter_map(|(i, s)| {
282                (s.load(Ordering::Acquire) == VnodeLifecycleState::RESTORING).then_some(i as u32)
283            })
284            .collect()
285    }
286}
287
288/// Build a fresh lifecycle array with every vnode [`Active`].
289fn new_lifecycle(vnode_count: u32) -> Arc<[AtomicU8]> {
290    std::iter::repeat_with(|| AtomicU8::new(VnodeLifecycleState::ACTIVE))
291        .take(vnode_count as usize)
292        .collect::<Vec<_>>()
293        .into()
294}
295
296/// Hash a key to a 64-bit value. Used to derive vnode IDs and for any
297/// other keyed-partitioning decisions.
298///
299/// Fixed to xxh3 so all pipeline stages produce the same vnode for the
300/// same key without needing to share a hasher instance.
301#[must_use]
302pub fn key_hash(key: &[u8]) -> u64 {
303    xxhash_rust::xxh3::xxh3_64(key)
304}
305
306/// Build a vnode-to-owner assignment using Rendezvous Hashing (Highest Random Weight).
307///
308/// Deterministic for a given `(vnode_count, peers)` input. Minimizes partition
309/// reshuffling on membership changes (node joins/leaves).
310///
311/// # Panics
312/// Panics if `peers` is empty.
313#[must_use]
314pub fn rendezvous_assignment(vnode_count: u32, peers: &[NodeId]) -> Arc<[NodeId]> {
315    assert!(
316        !peers.is_empty(),
317        "rendezvous_assignment needs at least one peer"
318    );
319    let mut sorted_peers = peers.to_vec();
320    sorted_peers.sort_by_key(|n| n.0);
321
322    let mut assignment = Vec::with_capacity(vnode_count as usize);
323    for v in 0..vnode_count {
324        let mut max_weight = 0;
325        let mut selected_node = sorted_peers[0];
326
327        for &node in &sorted_peers {
328            // Hash the combination of vnode ID and node ID
329            let mut buf = [0u8; 16];
330            buf[0..8].copy_from_slice(&u64::from(v).to_le_bytes());
331            buf[8..16].copy_from_slice(&node.0.to_le_bytes());
332            let weight = xxhash_rust::xxh3::xxh3_64(&buf);
333
334            // Highest weight wins, tie-break by NodeId
335            if weight > max_weight || (weight == max_weight && node.0 > selected_node.0) {
336                max_weight = weight;
337                selected_node = node;
338            }
339        }
340        assignment.push(selected_node);
341    }
342    assignment.into()
343}
344
345/// A node's failure-domain locality: ordered tier values, coarsest first
346/// (e.g. `["us-east-1", "us-east-1a", "r17"]`). Parsed from the node's
347/// `failure_domain` gossip string.
348#[derive(Debug, Clone, Default, PartialEq, Eq)]
349pub struct Locality {
350    tiers: Vec<String>,
351}
352
353impl Locality {
354    /// Build from ordered tier values, coarsest first.
355    #[must_use]
356    pub fn new(tiers: Vec<String>) -> Self {
357        Self { tiers }
358    }
359
360    /// Parse `"region=us-east-1;zone=us-east-1a;rack=r17"` (or a bare label
361    /// `"rack17"`) into its tier values. Tier names are ignored.
362    #[must_use]
363    pub fn parse(s: &str) -> Self {
364        let tiers = s
365            .split(';')
366            .map(str::trim)
367            .filter(|seg| !seg.is_empty())
368            .map(|seg| {
369                seg.split_once('=')
370                    .map_or(seg, |(_, v)| v.trim())
371                    .to_string()
372            })
373            .collect();
374        Self { tiers }
375    }
376
377    /// Failure-domain key at `tier`: the `;`-joined value prefix `0..=tier`.
378    /// Two nodes share a domain iff equal; an unlabeled node yields the empty key.
379    #[must_use]
380    pub fn domain_at(&self, tier: usize) -> String {
381        if self.tiers.is_empty() {
382            return String::new();
383        }
384        let end = tier.min(self.tiers.len() - 1);
385        self.tiers[..=end].join(";")
386    }
387}
388
389/// Resolve each node's failure-domain key at `isolation_tier`.
390fn resolve_domains(nodes: &[(NodeId, Locality)], isolation_tier: usize) -> Vec<(NodeId, String)> {
391    nodes
392        .iter()
393        .map(|(id, loc)| (*id, loc.domain_at(isolation_tier)))
394        .collect()
395}
396
397/// Owner counts per failure domain at `isolation_tier`. The largest value over
398/// `vnode_count` is the blast radius — the share of state that goes `Restoring`
399/// if that one domain fails at once.
400#[must_use]
401pub fn owners_per_domain(
402    owners: &[NodeId],
403    nodes: &[(NodeId, Locality)],
404    isolation_tier: usize,
405) -> BTreeMap<String, u32> {
406    let dom: BTreeMap<NodeId, String> =
407        resolve_domains(nodes, isolation_tier).into_iter().collect();
408    let mut counts = BTreeMap::new();
409    for &o in owners {
410        *counts
411            .entry(dom.get(&o).cloned().unwrap_or_default())
412            .or_default() += 1;
413    }
414    counts
415}
416
417/// Vnodes currently assigned to `owner`.
418///
419/// Used by the checkpoint coordinator to decide which vnodes' durability
420/// markers it is responsible for writing each epoch, and by the leader's
421/// `epoch_complete` gate to know the full set to check.
422#[must_use]
423pub fn owned_vnodes(registry: &VnodeRegistry, owner: NodeId) -> Vec<u32> {
424    (0..registry.vnode_count())
425        .filter(|&v| registry.owner(v) == owner)
426        .collect()
427}
428
429/// Distinct assigned nodes other than `self_id`, sorted by id — the peer set a
430/// node fans checkpoint barriers and shuffle data out to.
431#[must_use]
432pub fn peer_owners(registry: &VnodeRegistry, self_id: NodeId) -> Vec<NodeId> {
433    let mut peers: Vec<NodeId> = (0..registry.vnode_count())
434        .map(|v| registry.owner(v))
435        .filter(|o| !o.is_unassigned() && *o != self_id)
436        .collect();
437    peers.sort_unstable();
438    peers.dedup();
439    peers
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445
446    #[test]
447    fn new_registry_is_unassigned() {
448        let r = VnodeRegistry::new(8);
449        assert_eq!(r.vnode_count(), 8);
450        for v in 0..8 {
451            assert!(r.owner(v).is_unassigned());
452        }
453    }
454
455    #[test]
456    fn single_owner_populates_all_slots() {
457        let r = VnodeRegistry::single_owner(4, NodeId(42));
458        for v in 0..4 {
459            assert_eq!(r.owner(v), NodeId(42));
460        }
461    }
462
463    #[test]
464    fn set_assignment_bumps_version() {
465        let r = VnodeRegistry::new(4);
466        let v0 = r.assignment_version();
467        let new_assign: Arc<[NodeId]> = vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into();
468        r.set_assignment(new_assign);
469        assert!(r.assignment_version() > v0);
470        assert_eq!(r.owner(0), NodeId(1));
471        assert_eq!(r.owner(1), NodeId(2));
472    }
473
474    #[test]
475    fn vnode_for_key_in_range() {
476        let r = VnodeRegistry::new(16);
477        for i in 0..100 {
478            let v = r.vnode_for_key(format!("k-{i}").as_bytes());
479            assert!(v < 16);
480        }
481    }
482
483    #[test]
484    #[should_panic(expected = "assignment length mismatch")]
485    fn set_assignment_rejects_wrong_length() {
486        let r = VnodeRegistry::new(4);
487        let bad: Arc<[NodeId]> = vec![NodeId(1)].into();
488        r.set_assignment(bad);
489    }
490
491    #[test]
492    fn owner_out_of_range_returns_unassigned() {
493        let r = VnodeRegistry::single_owner(4, NodeId(1));
494        assert!(r.owner(10).is_unassigned());
495    }
496
497    #[test]
498    fn vnode_for_key_is_deterministic() {
499        let r = VnodeRegistry::new(16);
500        assert_eq!(r.vnode_for_key(b"key-x"), r.vnode_for_key(b"key-x"));
501    }
502
503    #[test]
504    fn owned_vnodes_filters_by_owner() {
505        let r = VnodeRegistry::new(4);
506        r.set_assignment(vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)].into());
507        assert_eq!(owned_vnodes(&r, NodeId(1)), vec![0, 2]);
508        assert_eq!(owned_vnodes(&r, NodeId(2)), vec![1, 3]);
509        assert!(owned_vnodes(&r, NodeId(99)).is_empty());
510    }
511
512    #[test]
513    fn owned_vnodes_single_owner_returns_all() {
514        let r = VnodeRegistry::single_owner(8, NodeId(42));
515        assert_eq!(owned_vnodes(&r, NodeId(42)), (0..8).collect::<Vec<_>>());
516    }
517
518    #[test]
519    fn rendezvous_is_deterministic() {
520        let peers = vec![NodeId(7), NodeId(3), NodeId(5)];
521        let assignment = rendezvous_assignment(8, &peers);
522        // Input order doesn't matter.
523        let reversed = vec![NodeId(3), NodeId(5), NodeId(7)];
524        assert_eq!(rendezvous_assignment(8, &reversed), assignment);
525    }
526
527    #[test]
528    fn rendezvous_single_peer_owns_everything() {
529        let assignment = rendezvous_assignment(4, &[NodeId(99)]);
530        assert!(assignment.iter().all(|&n| n == NodeId(99)));
531    }
532
533    #[test]
534    #[should_panic(expected = "needs at least one peer")]
535    fn rendezvous_rejects_empty_peer_list() {
536        let _ = rendezvous_assignment(4, &[]);
537    }
538
539    #[test]
540    fn rendezvous_minimizes_state_movement() {
541        let peers3 = vec![NodeId(1), NodeId(2), NodeId(3)];
542        let peers4 = vec![NodeId(1), NodeId(2), NodeId(3), NodeId(4)];
543
544        let a3 = rendezvous_assignment(256, &peers3);
545        let a4 = rendezvous_assignment(256, &peers4);
546
547        let mut moved = 0;
548        let mut moved_between_existing = 0;
549
550        for v in 0..256usize {
551            let o3 = a3[v];
552            let o4 = a4[v];
553            if o3 != o4 {
554                moved += 1;
555                if o4 != NodeId(4) {
556                    moved_between_existing += 1;
557                }
558            }
559        }
560
561        assert_eq!(
562            moved_between_existing, 0,
563            "No vnode should move between existing peers on a node join"
564        );
565        assert!(
566            moved > 40 && moved < 90,
567            "Expected roughly 25% of vnodes to move to the new peer, got {moved}"
568        );
569
570        for v in 0..256usize {
571            if a3[v] != a4[v] {
572                assert_eq!(a4[v], NodeId(4));
573            }
574        }
575    }
576
577    #[test]
578    fn vnodes_start_active() {
579        let r = VnodeRegistry::new(4);
580        assert!(!r.any_restoring());
581        for v in 0..4 {
582            assert!(!r.is_restoring(v));
583        }
584        assert!(r.restoring_vnodes().is_empty());
585    }
586
587    #[test]
588    fn mark_restoring_and_active_round_trip() {
589        let r = VnodeRegistry::new(4);
590        r.mark_restoring(&[1, 3]);
591        assert!(r.any_restoring());
592        assert!(r.is_restoring(1));
593        assert!(r.is_restoring(3));
594        assert!(!r.is_restoring(0));
595        assert_eq!(r.restoring_vnodes(), vec![1, 3]);
596
597        r.mark_active(&[1]);
598        assert!(!r.is_restoring(1));
599        assert_eq!(r.restoring_vnodes(), vec![3]);
600
601        r.mark_active(&[3]);
602        assert!(!r.any_restoring());
603    }
604
605    #[test]
606    fn lifecycle_ignores_out_of_range() {
607        let r = VnodeRegistry::new(2);
608        r.mark_restoring(&[5, 99]); // no panic
609        assert!(!r.is_restoring(5));
610        assert!(!r.any_restoring());
611    }
612
613    #[test]
614    fn lifecycle_independent_of_assignment() {
615        // Reassigning ownership must not clear lifecycle state — the two
616        // are orthogonal and the caller drives the Restoring→Active flip.
617        let r = VnodeRegistry::new(4);
618        r.mark_restoring(&[2]);
619        r.set_assignment(vec![NodeId(1), NodeId(1), NodeId(1), NodeId(1)].into());
620        assert!(r.is_restoring(2));
621    }
622
623    // -- Topology-aware placement --------------------------------------------
624
625    /// A node at (region, zone, rack).
626    fn node(id: u64, region: &str, zone: &str, rack: &str) -> (NodeId, Locality) {
627        (
628            NodeId(id),
629            Locality::new(vec![region.into(), zone.into(), rack.into()]),
630        )
631    }
632
633    const TIER_ZONE: usize = 1;
634
635    #[test]
636    fn locality_parse_and_domain_at() {
637        let l = Locality::parse("region=us-east-1;zone=us-east-1a;rack=r17");
638        assert_eq!(l.domain_at(0), "us-east-1");
639        assert_eq!(l.domain_at(1), "us-east-1;us-east-1a");
640        assert_eq!(l.domain_at(2), "us-east-1;us-east-1a;r17");
641        assert_eq!(l.domain_at(99), "us-east-1;us-east-1a;r17"); // clamps to finest
642        assert_eq!(Locality::parse("rack17").domain_at(0), "rack17"); // bare label
643        assert_eq!(Locality::parse("").domain_at(0), ""); // unknown → empty domain
644    }
645
646    #[test]
647    fn owners_per_domain_counts_by_zone() {
648        let nodes = vec![node(1, "r", "z1", "a"), node(2, "r", "z2", "a")];
649        // z1 owns 2, z2 owns 1, and an unassigned owner folds into the empty domain.
650        let owners = [NodeId(1), NodeId(1), NodeId(2), NodeId::UNASSIGNED];
651        let counts = owners_per_domain(&owners, &nodes, TIER_ZONE);
652        assert_eq!(counts["r;z1"], 2);
653        assert_eq!(counts["r;z2"], 1);
654        assert_eq!(counts[""], 1);
655    }
656}