// laminar_db/rebalance.rs

//! Dynamic vnode rebalance control plane. See
//! `docs/plans/cluster-production-readiness.md`.

#![cfg(feature = "cluster-unstable")]
#![allow(clippy::disallowed_types)] // cold path

use std::sync::Arc;
use std::time::Duration;

use laminar_core::cluster::control::{
    AssignmentSnapshot, AssignmentSnapshotStore, ClusterController, RotateOutcome,
};
use laminar_core::state::{round_robin_assignment, NodeId, VnodeRegistry};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, warn};

use crate::db::LaminarDB;

/// Tunables for the rebalance control plane.
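///
/// Individual fields can be overridden with struct update syntax; the
/// value below is illustrative, not a recommendation:
///
/// ```ignore
/// let cfg = RebalanceConfig {
///     // A churn-heavy cluster may want a longer quiet window.
///     rebalance_debounce: Duration::from_secs(15),
///     ..RebalanceConfig::default()
/// };
/// ```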
#[derive(Debug, Clone, Copy)]
pub struct RebalanceConfig {
    /// Interval between snapshot-store polls.
    pub watcher_poll: Duration,
    /// Quiet period before a membership change triggers rotation.
    pub rebalance_debounce: Duration,
    /// Upper bound on the pre-rotation forced checkpoint.
    pub checkpoint_timeout: Duration,
    /// Delay before retrying a failed rotation.
    pub retry_delay: Duration,
}

impl Default for RebalanceConfig {
    fn default() -> Self {
        Self {
            watcher_poll: Duration::from_secs(2),
            rebalance_debounce: Duration::from_secs(5),
            checkpoint_timeout: Duration::from_secs(60),
            retry_delay: Duration::from_secs(10),
        }
    }
}

impl RebalanceConfig {
    /// Fast timings for tests; a 500ms debounce would thrash in production.
    #[doc(hidden)]
    #[must_use]
    pub fn test_defaults() -> Self {
        Self {
            watcher_poll: Duration::from_millis(200),
            rebalance_debounce: Duration::from_millis(500),
            checkpoint_timeout: Duration::from_secs(30),
            retry_delay: Duration::from_millis(500),
        }
    }
}

/// Spawn the per-node snapshot watcher. Exits on `shutdown`.
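///
/// A minimal wiring sketch; the `db`, `store`, and `registry` handles are
/// assumed to be built by the embedding application (not compiled as a
/// doctest):
///
/// ```ignore
/// let shutdown = Arc::new(Notify::new());
/// let watcher = spawn_snapshot_watcher(
///     db.clone(),
///     store.clone(),
///     registry.clone(),
///     shutdown.clone(),
///     RebalanceConfig::default(),
/// );
/// // `notify_one` stores a permit, so the signal is not lost even if the
/// // watcher is mid-poll when it fires.
/// shutdown.notify_one();
/// watcher.await.expect("watcher task panicked");
/// ```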
pub fn spawn_snapshot_watcher(
    db: Arc<LaminarDB>,
    store: Arc<AssignmentSnapshotStore>,
    registry: Arc<VnodeRegistry>,
    shutdown: Arc<Notify>,
    config: RebalanceConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(config.watcher_poll);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        ticker.tick().await; // burn the immediate first tick
        loop {
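            // `biased` polls branches in declared order, so a pending
            // shutdown always wins over a ready tick.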
            tokio::select! {
                biased;
                () = shutdown.notified() => return,
                _ = ticker.tick() => {}
            }
            let local = registry.assignment_version();
            match store.load().await {
                Ok(Some(snap)) if snap.version > local => {
                    debug!(local, remote = snap.version, "adopting newer assignment");
                    db.adopt_assignment_snapshot(snap).await;
                }
                Ok(_) => {}
                Err(e) => warn!(error = %e, "snapshot watcher: load failed"),
            }
        }
    })
}

/// Spawn the leader-gated rebalance controller. Runs on every node;
/// leadership is re-checked after the debounce.
pub fn spawn_rebalance_controller(
    db: Arc<LaminarDB>,
    controller: Arc<ClusterController>,
    store: Arc<AssignmentSnapshotStore>,
    registry: Arc<VnodeRegistry>,
    shutdown: Arc<Notify>,
    config: RebalanceConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut members = controller.members_watch();
        loop {
            tokio::select! {
                biased;
                () = shutdown.notified() => return,
                res = members.changed() => {
                    if res.is_err() { return; }
                }
            }

            // Debounce: absorb further churn before acting.
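            // Each membership change restarts the timeout, so rotation
            // happens only after a full `rebalance_debounce` of quiet.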
            loop {
                tokio::select! {
                    biased;
                    () = shutdown.notified() => return,
                    res = tokio::time::timeout(
                        config.rebalance_debounce, members.changed()
                    ) => {
                        match res {
                            Ok(Ok(())) => {}       // another change; keep waiting
                            Ok(Err(_)) => return,  // sender dropped
                            Err(_) => break,       // quiet period elapsed
                        }
                    }
                }
            }

            // Retry transient failures so a single hiccup doesn't
            // leave the cluster on a stale assignment.
            loop {
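                // Leadership may have moved during the debounce; if so,
                // defer to the new leader rather than racing it.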
                if !controller.is_leader() {
                    break;
                }
                let live = controller.live_instances();
                match try_rebalance(&db, &store, &registry, &live, config).await {
                    Ok(Some(v)) => {
                        info!(version = v, "rotated assignment");
                        break;
                    }
                    Ok(None) => {
                        debug!("live set matches current snapshot; no rotation");
                        break;
                    }
                    Err(e) => {
                        warn!(error = %e, "rebalance failed; retrying after backoff");
                        tokio::select! {
                            biased;
                            () = shutdown.notified() => return,
                            () = tokio::time::sleep(config.retry_delay) => {}
                        }
                    }
                }
            }
        }
    })
}

/// `Ok(Some(version))` on rotation (ours or a peer's), `Ok(None)` if
/// no change is needed.
async fn try_rebalance(
    db: &Arc<LaminarDB>,
    store: &Arc<AssignmentSnapshotStore>,
    registry: &Arc<VnodeRegistry>,
    live: &[NodeId],
    config: RebalanceConfig,
) -> Result<Option<u64>, String> {
    let current = store
        .load()
        .await
        .map_err(|e| e.to_string())?
        .ok_or_else(|| "no snapshot on store; boot seed missing".to_string())?;

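    // Recompute the ideal placement for the live set; if it matches the
    // published snapshot, the membership churn was a no-op.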
    let new_assignment = round_robin_assignment(registry.vnode_count(), live);
    let new_vnodes = AssignmentSnapshot::vnodes_from_vec(&new_assignment);
    if new_vnodes == current.vnodes {
        return Ok(None);
    }

    // Drain in-flight shuffle rows into durable state at the old
    // fence version before rotating.
    let ckpt = tokio::time::timeout(config.checkpoint_timeout, db.checkpoint())
        .await
        .map_err(|_| {
            format!(
                "pre-rotation checkpoint did not complete within {}s",
                config.checkpoint_timeout.as_secs()
            )
        })?
        .map_err(|e| e.to_string())?;
    if !ckpt.success {
        return Err(ckpt
            .error
            .unwrap_or_else(|| "checkpoint returned success=false".into()));
    }

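    // Optimistic concurrency: the save succeeds only if the store still
    // holds `current.version`, so racing leaders cannot clobber each
    // other's rotations.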
    let proposal = current.next(new_vnodes);
    match store
        .save_if_version(&proposal, current.version)
        .await
        .map_err(|e| e.to_string())?
    {
        RotateOutcome::Rotated => {
            let v = proposal.version;
            db.adopt_assignment_snapshot(proposal).await;
            // Keep the current plus one prior as slack for in-flight
            // readers; `prune_before(v - 1)` retains `[v-1, v]`.
            let prune = v.saturating_sub(1);
            if let Err(e) = store.prune_before(prune).await {
                warn!(error = %e, "snapshot prune failed");
            }
            Ok(Some(v))
        }
        RotateOutcome::Conflict(winner) => {
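            // A peer rotated first; adopt its snapshot so this node is
            // current without waiting for the watcher's next poll.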
            let v = winner.version;
            db.adopt_assignment_snapshot(winner).await;
            Ok(Some(v))
        }
    }
}