1#![cfg(feature = "cluster-unstable")]
5#![allow(clippy::disallowed_types)] use std::sync::Arc;
8use std::time::Duration;
9
10use laminar_core::cluster::control::{
11 AssignmentSnapshot, AssignmentSnapshotStore, ClusterController, RotateOutcome,
12};
13use laminar_core::state::{round_robin_assignment, NodeId, VnodeRegistry};
14use tokio::sync::Notify;
15use tokio::task::JoinHandle;
16use tokio::time::MissedTickBehavior;
17use tracing::{debug, info, warn};
18
19use crate::db::LaminarDB;
20
/// Timing knobs for the snapshot watcher and rebalance controller tasks.
#[derive(Debug, Clone, Copy)]
pub struct RebalanceConfig {
    /// How often the snapshot watcher polls the store for a newer assignment.
    pub watcher_poll: Duration,
    /// Quiet period after a membership change before a rebalance is attempted.
    pub rebalance_debounce: Duration,
    /// Upper bound on the pre-rotation checkpoint's duration.
    pub checkpoint_timeout: Duration,
    /// Delay before retrying after a failed rebalance attempt.
    pub retry_delay: Duration,
}
33
34impl Default for RebalanceConfig {
35 fn default() -> Self {
36 Self {
37 watcher_poll: Duration::from_secs(2),
38 rebalance_debounce: Duration::from_secs(5),
39 checkpoint_timeout: Duration::from_secs(60),
40 retry_delay: Duration::from_secs(10),
41 }
42 }
43}
44
45impl RebalanceConfig {
46 #[doc(hidden)]
48 #[must_use]
49 pub fn test_defaults() -> Self {
50 Self {
51 watcher_poll: Duration::from_millis(200),
52 rebalance_debounce: Duration::from_millis(500),
53 checkpoint_timeout: Duration::from_secs(30),
54 retry_delay: Duration::from_millis(500),
55 }
56 }
57}
58
59pub fn spawn_snapshot_watcher(
61 db: Arc<LaminarDB>,
62 store: Arc<AssignmentSnapshotStore>,
63 registry: Arc<VnodeRegistry>,
64 shutdown: Arc<Notify>,
65 config: RebalanceConfig,
66) -> JoinHandle<()> {
67 tokio::spawn(async move {
68 let mut ticker = tokio::time::interval(config.watcher_poll);
69 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
70 ticker.tick().await; loop {
72 tokio::select! {
73 biased;
74 () = shutdown.notified() => return,
75 _ = ticker.tick() => {}
76 }
77 let local = registry.assignment_version();
78 match store.load().await {
79 Ok(Some(snap)) if snap.version > local => {
80 debug!(local, remote = snap.version, "adopting newer assignment");
81 db.adopt_assignment_snapshot(snap).await;
82 }
83 Ok(_) => {}
84 Err(e) => warn!(error = %e, "snapshot watcher: load failed"),
85 }
86 }
87 })
88}
89
/// Spawns the leader-side rebalance loop: waits for cluster membership
/// changes, debounces them, and — while this node is leader — repeatedly
/// attempts [`try_rebalance`], backing off `retry_delay` between failures.
///
/// The task exits when `shutdown` is notified or the membership watch closes.
pub fn spawn_rebalance_controller(
    db: Arc<LaminarDB>,
    controller: Arc<ClusterController>,
    store: Arc<AssignmentSnapshotStore>,
    registry: Arc<VnodeRegistry>,
    shutdown: Arc<Notify>,
    config: RebalanceConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut members = controller.members_watch();
        loop {
            // Phase 1: block until a membership change arrives (or shutdown).
            tokio::select! {
                biased;
                () = shutdown.notified() => return,
                res = members.changed() => {
                    // Watch sender dropped: controller is gone, stop the task.
                    if res.is_err() { return; }
                }
            }

            // Phase 2: debounce — keep absorbing further membership changes
            // until the watch stays quiet for `rebalance_debounce`.
            loop {
                tokio::select! {
                    biased;
                    () = shutdown.notified() => return,
                    res = tokio::time::timeout(
                        config.rebalance_debounce, members.changed()
                    ) => {
                        // Ok(Ok): another change landed — keep debouncing.
                        // Ok(Err): watch closed — exit the task.
                        // Err: timeout elapsed quietly — proceed to rebalance.
                        match res {
                            Ok(Ok(())) => {} Ok(Err(_)) => return, Err(_) => break, }
                    }
                }
            }

            // Phase 3: attempt the rebalance, retrying on error until it
            // succeeds, is unnecessary, leadership is lost, or shutdown.
            loop {
                // Only the leader may rotate assignments.
                if !controller.is_leader() {
                    break;
                }
                let live = controller.live_instances();
                match try_rebalance(&db, &store, &registry, &live, config).await {
                    Ok(Some(v)) => {
                        info!(version = v, "rotated assignment");
                        break;
                    }
                    Ok(None) => {
                        debug!("live set matches current snapshot; no rotation");
                        break;
                    }
                    Err(e) => {
                        warn!(error = %e, "rebalance failed; retrying after backoff");
                        // Back off, but stay responsive to shutdown.
                        tokio::select! {
                            biased;
                            () = shutdown.notified() => return,
                            () = tokio::time::sleep(config.retry_delay) => {}
                        }
                    }
                }
            }
        }
    })
}
157
158async fn try_rebalance(
161 db: &Arc<LaminarDB>,
162 store: &Arc<AssignmentSnapshotStore>,
163 registry: &Arc<VnodeRegistry>,
164 live: &[NodeId],
165 config: RebalanceConfig,
166) -> Result<Option<u64>, String> {
167 let current = store
168 .load()
169 .await
170 .map_err(|e| e.to_string())?
171 .ok_or_else(|| "no snapshot on store — boot seed missing".to_string())?;
172
173 let new_assignment = round_robin_assignment(registry.vnode_count(), live);
174 let new_vnodes = AssignmentSnapshot::vnodes_from_vec(&new_assignment);
175 if new_vnodes == current.vnodes {
176 return Ok(None);
177 }
178
179 let ckpt = tokio::time::timeout(config.checkpoint_timeout, db.checkpoint())
182 .await
183 .map_err(|_| {
184 format!(
185 "pre-rotation checkpoint did not complete within {}s",
186 config.checkpoint_timeout.as_secs()
187 )
188 })?
189 .map_err(|e| e.to_string())?;
190 if !ckpt.success {
191 return Err(ckpt
192 .error
193 .unwrap_or_else(|| "checkpoint returned success=false".into()));
194 }
195
196 let proposal = current.next(new_vnodes);
197 match store
198 .save_if_version(&proposal, current.version)
199 .await
200 .map_err(|e| e.to_string())?
201 {
202 RotateOutcome::Rotated => {
203 let v = proposal.version;
204 db.adopt_assignment_snapshot(proposal).await;
205 let prune = v.saturating_sub(1);
208 if let Err(e) = store.prune_before(prune).await {
209 warn!(error = %e, "snapshot prune failed");
210 }
211 Ok(Some(v))
212 }
213 RotateOutcome::Conflict(winner) => {
214 let v = winner.version;
215 db.adopt_assignment_snapshot(winner).await;
216 Ok(Some(v))
217 }
218 }
219}