Skip to main content

laminar_db/
rebalance.rs

1//! Dynamic vnode rebalance control plane.
2
3#![cfg(feature = "cluster")]
4#![allow(clippy::disallowed_types)] // cold path
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use laminar_core::cluster::control::{
10    AssignmentSnapshot, AssignmentSnapshotStore, ClusterController, RotateOutcome,
11};
12use laminar_core::state::{
13    owners_per_domain, rendezvous_assignment, Locality, NodeId, VnodeRegistry,
14};
15use tokio::sync::Notify;
16use tokio::task::JoinHandle;
17use tokio::time::MissedTickBehavior;
18use tracing::{debug, info, warn};
19
20use crate::db::{LaminarDB, SnapshotAdoption};
21use crate::engine_metrics::EngineMetrics;
22
/// Tunables for the rebalance control plane.
///
/// Production values come from the [`Default`] impl. The type is `Copy` and
/// comparable so callers can cheaply pass it around and assert on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RebalanceConfig {
    /// Interval between snapshot-store polls.
    pub watcher_poll: Duration,
    /// Quiet period before a membership change triggers rotation.
    pub rebalance_debounce: Duration,
    /// Upper bound on the pre-rotation forced checkpoint.
    pub checkpoint_timeout: Duration,
    /// Delay before retrying a failed rotation.
    pub retry_delay: Duration,
    /// Locality tier the placement metrics group by (0 = coarsest).
    pub placement_isolation_tier: usize,
}
37
38impl Default for RebalanceConfig {
39    fn default() -> Self {
40        Self {
41            watcher_poll: Duration::from_secs(2),
42            rebalance_debounce: Duration::from_secs(5),
43            // A healthy pre-rotation drain commits in well under a
44            // second; a long budget only delays recovery when a node
45            // dies mid-drain (the drain then cannot succeed and the
46            // rotation is what restores commit availability).
47            checkpoint_timeout: Duration::from_secs(15),
48            retry_delay: Duration::from_secs(2),
49            placement_isolation_tier: 0,
50        }
51    }
52}
53
54impl RebalanceConfig {
55    /// Fast timings for tests — 500ms debounce thrashes in production.
56    #[doc(hidden)]
57    #[must_use]
58    pub fn test_defaults() -> Self {
59        Self {
60            watcher_poll: Duration::from_millis(200),
61            rebalance_debounce: Duration::from_millis(500),
62            checkpoint_timeout: Duration::from_secs(30),
63            retry_delay: Duration::from_millis(500),
64            placement_isolation_tier: 0,
65        }
66    }
67}
68
69/// Surface the rehydration outcome of an adopted snapshot. A node that
70/// gained vnodes pulled their last committed state off the shared backend
71/// in [`LaminarDB::adopt_assignment_snapshot`]; log what moved so operators
72/// have an audit trail of every rebalance-driven state transfer.
73fn log_adoption(source: &str, adoption: &SnapshotAdoption) {
74    if adoption.newly_acquired.is_empty() {
75        return;
76    }
77    info!(
78        source,
79        version = adoption.version,
80        newly_acquired = adoption.newly_acquired.len(),
81        rehydrated = adoption.rehydrated,
82        rehydration_epoch = ?adoption.rehydration_epoch,
83        "rehydrated newly-acquired vnodes after rebalance",
84    );
85}
86
87/// Spawn the per-node snapshot watcher. Exits on `shutdown`.
88pub fn spawn_snapshot_watcher(
89    db: Arc<LaminarDB>,
90    store: Arc<AssignmentSnapshotStore>,
91    registry: Arc<VnodeRegistry>,
92    shutdown: Arc<Notify>,
93    config: RebalanceConfig,
94    controller: Option<Arc<ClusterController>>,
95) -> JoinHandle<()> {
96    tokio::spawn(async move {
97        let mut ticker = tokio::time::interval(config.watcher_poll);
98        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
99        ticker.tick().await; // burn the immediate first tick
100        let mut published_version = 0;
101        loop {
102            tokio::select! {
103                biased;
104                () = shutdown.notified() => return,
105                _ = ticker.tick() => {}
106            }
107            let local = registry.assignment_version();
108
109            let mut remote_newer = true;
110            if let Some(ref c) = controller {
111                if let Some(gossiped_version) = c.read_snapshot_version().await {
112                    if gossiped_version <= local {
113                        remote_newer = false;
114                    }
115                }
116            }
117
118            if remote_newer {
119                match store.load().await {
120                    Ok(Some(snap)) if snap.version > local => {
121                        debug!(local, remote = snap.version, "adopting newer assignment");
122                        let adoption = db.adopt_assignment_snapshot(snap).await;
123                        log_adoption("watcher", &adoption);
124                    }
125                    Ok(_) => {}
126                    Err(e) => warn!(error = %e, "snapshot watcher: load failed"),
127                }
128            }
129
130            // Refresh placement metrics only when the assignment changed.
131            let version = registry.assignment_version();
132            if version != published_version {
133                published_version = version;
134                if let (Some(c), Some(metrics)) = (controller.as_ref(), db.engine_metrics()) {
135                    let nodes = c.assignable_with_locality();
136                    publish_placement_metrics(
137                        &metrics,
138                        &registry,
139                        &nodes,
140                        config.placement_isolation_tier,
141                    );
142                }
143            }
144        }
145    })
146}
147
148/// Publish per-domain owner counts and the blast-radius ratio. The gauge vector
149/// is reset so domains that disappear don't leave stale series.
150#[allow(clippy::cast_precision_loss)]
151fn publish_placement_metrics(
152    metrics: &EngineMetrics,
153    registry: &VnodeRegistry,
154    nodes: &[(NodeId, Locality)],
155    isolation_tier: usize,
156) {
157    let owners = registry.snapshot();
158    let total = owners.len().max(1);
159    let counts = owners_per_domain(&owners, nodes, isolation_tier);
160
161    metrics.placement_vnodes_per_domain.reset();
162    let mut max = 0u32;
163    for (domain, &count) in &counts {
164        let label = if domain.is_empty() {
165            "unknown"
166        } else {
167            domain.as_str()
168        };
169        metrics
170            .placement_vnodes_per_domain
171            .with_label_values(&[label])
172            .set(i64::from(count));
173        max = max.max(count);
174    }
175    metrics
176        .placement_blast_radius_ratio
177        .set(f64::from(max) / total as f64);
178}
179
180/// Spawn the leader-gated rebalance controller. Runs on every node;
181/// leadership is re-checked after the debounce.
182pub fn spawn_rebalance_controller(
183    db: Arc<LaminarDB>,
184    controller: Arc<ClusterController>,
185    store: Arc<AssignmentSnapshotStore>,
186    registry: Arc<VnodeRegistry>,
187    shutdown: Arc<Notify>,
188    config: RebalanceConfig,
189) -> JoinHandle<()> {
190    tokio::spawn(async move {
191        let mut members = controller.members_watch();
192        loop {
193            tokio::select! {
194                biased;
195                () = shutdown.notified() => return,
196                res = members.changed() => {
197                    if res.is_err() {
198                        warn!("membership watch sender dropped; rebalance controller exiting");
199                        return;
200                    }
201                }
202            }
203            debug!("membership change observed; debouncing");
204
205            // Debounce: absorb further churn before acting.
206            loop {
207                tokio::select! {
208                    biased;
209                    () = shutdown.notified() => return,
210                    res = tokio::time::timeout(
211                        config.rebalance_debounce, members.changed()
212                    ) => {
213                        match res {
214                            Ok(Ok(())) => {}       // another change; keep waiting
215                            Ok(Err(_)) => return,  // sender dropped
216                            Err(_) => break,        // quiet period elapsed
217                        }
218                    }
219                }
220            }
221
222            // Retry transient failures so a single hiccup doesn't
223            // leave the cluster on a stale assignment.
224            loop {
225                if !controller.is_leader() {
226                    debug!("membership changed; not the leader — skipping rotation check");
227                    break;
228                }
229                // Use assignable (Active, non-draining) instances so
230                // Draining/Suspected nodes are never handed vnodes. The
231                // weak leader gate above still uses live membership;
232                // `LeaderLeaseManager` is the fencing authority for split-
233                // brain hardening (kept standalone, see leader_lease.rs).
234                let live = controller.assignable_instances();
235                match try_rebalance(&db, &controller, &store, &registry, &live, config).await {
236                    Ok(Some(v)) => {
237                        info!(version = v, "rotated assignment");
238                        break;
239                    }
240                    Ok(None) => {
241                        debug!("live set matches current snapshot; no rotation");
242                        break;
243                    }
244                    Err(e) => {
245                        warn!(error = %e, "rebalance failed; retrying after backoff");
246                        tokio::select! {
247                            biased;
248                            () = shutdown.notified() => return,
249                            () = tokio::time::sleep(config.retry_delay) => {}
250                        }
251                    }
252                }
253            }
254        }
255    })
256}
257
258/// Poll the durable assignment snapshot until `me` owns no vnodes (its
259/// state has been reassigned elsewhere) or `deadline` elapses. Returns
260/// true if fully drained. Used by a draining node to know when it is
261/// safe to exit.
262pub async fn wait_until_drained(
263    store: &AssignmentSnapshotStore,
264    me: NodeId,
265    poll: Duration,
266    deadline: Duration,
267) -> bool {
268    let start = tokio::time::Instant::now();
269    loop {
270        match store.load().await {
271            // No snapshot at all → nothing owns us → drained.
272            Ok(None) => return true,
273            Ok(Some(snap)) => {
274                if !snap.vnodes.values().any(|owner| *owner == me) {
275                    return true;
276                }
277            }
278            Err(e) => warn!(error = %e, "wait_until_drained: snapshot load failed"),
279        }
280        if start.elapsed() >= deadline {
281            return false;
282        }
283        let remaining = deadline.saturating_sub(start.elapsed());
284        tokio::time::sleep(poll.min(remaining)).await;
285        if start.elapsed() >= deadline {
286            return false;
287        }
288    }
289}
290
291/// `Ok(Some(version))` on rotation (ours or a peer's), `Ok(None)` if
292/// no change is needed.
293async fn try_rebalance(
294    db: &Arc<LaminarDB>,
295    controller: &Arc<ClusterController>,
296    store: &Arc<AssignmentSnapshotStore>,
297    registry: &Arc<VnodeRegistry>,
298    live: &[NodeId],
299    config: RebalanceConfig,
300) -> Result<Option<u64>, String> {
301    // No assignable instances (whole cluster draining/joining) — nothing to
302    // rotate to. Hold the current assignment rather than panicking the
303    // placement call on an empty node set.
304    if live.is_empty() {
305        return Ok(None);
306    }
307
308    let current = store
309        .load()
310        .await
311        .map_err(|e| e.to_string())?
312        .ok_or_else(|| "no snapshot on store — boot seed missing".to_string())?;
313
314    let new_assignment = rendezvous_assignment(registry.vnode_count(), live);
315    let new_vnodes = AssignmentSnapshot::vnodes_from_vec(&new_assignment);
316    if new_vnodes == current.vnodes {
317        return Ok(None);
318    }
319
320    // Drain in-flight shuffle rows into durable state at the old
321    // fence version before rotating. When the rotation sheds a DEAD
322    // node, skip the drain entirely: its durability gate needs captures
323    // from a node that will never provide them, so it can only burn its
324    // full timeout and abort — deadlocking rotation against the gate
325    // when rotation is the only thing that restores commit
326    // availability. The dead node's in-flight rows are unrecoverable
327    // regardless; survivors rehydrate its vnodes from the last
328    // committed epoch — the same at-least-once duplication window every
329    // ungraceful failover already has.
330    //
331    // "Dead" must be judged from MEMBERSHIP, not the assignable set:
332    // a Draining node is excluded from assignment but is alive and can
333    // checkpoint — its graceful handoff depends on this drain sealing
334    // its in-flight rows before the rotation takes its vnodes.
335    let shedding_dead = {
336        use laminar_core::cluster::discovery::NodeState;
337        let owners = current.to_vnode_vec(registry.vnode_count());
338        let members = controller.members_watch().borrow().clone();
339        owners.iter().filter(|o| !live.contains(o)).any(|&o| {
340            let dead_in_membership = match members.iter().find(|m| m.id.0 == o.0) {
341                Some(node) => matches!(node.state, NodeState::Suspected | NodeState::Left),
342                None => true,
343            };
344            dead_in_membership || controller.is_recently_unresponsive(o)
345        })
346    };
347    if shedding_dead {
348        warn!(
349            "rotation sheds a dead node — skipping the pre-rotation drain \
350             checkpoint (it cannot seal without the dead node's captures)"
351        );
352    } else {
353        let ckpt = tokio::time::timeout(config.checkpoint_timeout, db.checkpoint())
354            .await
355            .map_err(|_| {
356                format!(
357                    "pre-rotation checkpoint did not complete within {}s",
358                    config.checkpoint_timeout.as_secs()
359                )
360            })?
361            .map_err(|e| e.to_string())?;
362        if !ckpt.success {
363            return Err(ckpt
364                .error
365                .unwrap_or_else(|| "checkpoint returned success=false".into()));
366        }
367    }
368
369    let proposal = current.next(new_vnodes);
370    match store
371        .save_if_version(&proposal, current.version)
372        .await
373        .map_err(|e| e.to_string())?
374    {
375        RotateOutcome::Rotated => {
376            let v = proposal.version;
377            let adoption = db.adopt_assignment_snapshot(proposal).await;
378            log_adoption("rebalance", &adoption);
379            controller.announce_snapshot_version(v).await;
380            // Keep the current plus one prior as slack for in-flight
381            // readers — `prune_before(v - 1)` retains `[v-1, v]`.
382            let prune = v.saturating_sub(1);
383            if let Err(e) = store.prune_before(prune).await {
384                warn!(error = %e, "snapshot prune failed");
385            }
386            Ok(Some(v))
387        }
388        RotateOutcome::Conflict(winner) => {
389            let v = winner.version;
390            let adoption = db.adopt_assignment_snapshot(winner).await;
391            log_adoption("rebalance-conflict", &adoption);
392            controller.announce_snapshot_version(v).await;
393            Ok(Some(v))
394        }
395    }
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    use object_store::memory::InMemory;
    use object_store::ObjectStore;

    /// A fresh snapshot store backed by an in-memory object store.
    fn store() -> AssignmentSnapshotStore {
        let backend: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        AssignmentSnapshotStore::new(backend)
    }

    #[test]
    fn publish_placement_metrics_labels_by_domain() {
        let prom = prometheus::Registry::new();
        let metrics = EngineMetrics::new(&prom);

        // 4 vnodes: node 1 owns two, node 2 owns one, one is unassigned.
        let vreg = VnodeRegistry::new(4);
        let owners = vec![NodeId(1), NodeId(1), NodeId(2), NodeId::UNASSIGNED];
        vreg.set_assignment(owners.into());
        let nodes = vec![
            (NodeId(1), Locality::parse("region=r;zone=z1")),
            (NodeId(2), Locality::parse("region=r;zone=z2")),
        ];

        // isolation_tier 1 = zone
        publish_placement_metrics(&metrics, &vreg, &nodes, 1);

        let per_domain = &metrics.placement_vnodes_per_domain;
        assert_eq!(per_domain.with_label_values(&["r;z1"]).get(), 2);
        assert_eq!(per_domain.with_label_values(&["r;z2"]).get(), 1);
        // The unassigned vnode lands in the "unknown" domain.
        assert_eq!(per_domain.with_label_values(&["unknown"]).get(), 1);
        // Blast radius = largest domain (2) / total vnodes (4).
        assert!((metrics.placement_blast_radius_ratio.get() - 0.5).abs() < 1e-9);
    }

    #[tokio::test]
    async fn wait_until_drained_false_while_owning_vnodes() {
        let s = store();
        let me = NodeId(1);
        let snap = AssignmentSnapshot::empty().next(BTreeMap::from([(0, me), (1, NodeId(2))]));
        s.save_if_absent(&snap).await.unwrap();

        let poll = Duration::from_millis(20);
        let deadline = Duration::from_millis(120);
        let drained = wait_until_drained(&s, me, poll, deadline).await;
        assert!(!drained, "still owns vnode 0 → not drained");
    }

    #[tokio::test]
    async fn wait_until_drained_true_when_owning_none() {
        let s = store();
        let me = NodeId(1);
        let snap =
            AssignmentSnapshot::empty().next(BTreeMap::from([(0, NodeId(2)), (1, NodeId(3))]));
        s.save_if_absent(&snap).await.unwrap();

        let poll = Duration::from_millis(20);
        let deadline = Duration::from_secs(5);
        let drained = wait_until_drained(&s, me, poll, deadline).await;
        assert!(drained, "owns no vnode → drained quickly");
    }

    #[tokio::test]
    async fn wait_until_drained_true_when_no_snapshot() {
        let s = store();

        let poll = Duration::from_millis(20);
        let deadline = Duration::from_secs(5);
        let drained = wait_until_drained(&s, NodeId(1), poll, deadline).await;
        assert!(drained, "no snapshot → treated as drained");
    }
}