1#![cfg(feature = "cluster")]
4#![allow(clippy::disallowed_types)] use std::sync::Arc;
7use std::time::Duration;
8
9use laminar_core::cluster::control::{
10 AssignmentSnapshot, AssignmentSnapshotStore, ClusterController, RotateOutcome,
11};
12use laminar_core::state::{
13 owners_per_domain, rendezvous_assignment, Locality, NodeId, VnodeRegistry,
14};
15use tokio::sync::Notify;
16use tokio::task::JoinHandle;
17use tokio::time::MissedTickBehavior;
18use tracing::{debug, info, warn};
19
20use crate::db::{LaminarDB, SnapshotAdoption};
21use crate::engine_metrics::EngineMetrics;
22
/// Tunables shared by the assignment snapshot watcher and the rebalance controller.
#[derive(Debug, Clone, Copy)]
pub struct RebalanceConfig {
    /// Period between snapshot-watcher polls of the gossiped version / snapshot store.
    pub watcher_poll: Duration,
    /// Quiet window membership must hold (no further changes) before the
    /// controller attempts a rotation.
    pub rebalance_debounce: Duration,
    /// Upper bound on the pre-rotation checkpoint taken before a rotation.
    pub checkpoint_timeout: Duration,
    /// Back-off before retrying a rebalance attempt that returned an error.
    pub retry_delay: Duration,
    /// Locality tier handed to `owners_per_domain` when grouping vnode owners
    /// into failure domains for the placement metrics.
    pub placement_isolation_tier: usize,
}

impl Default for RebalanceConfig {
    /// Production timings: 2s polling, 5s debounce, 15s checkpoint budget,
    /// 2s retry back-off, isolation tier 0.
    fn default() -> Self {
        let two_seconds = Duration::from_secs(2);
        Self {
            placement_isolation_tier: 0,
            retry_delay: two_seconds,
            checkpoint_timeout: Duration::from_secs(15),
            rebalance_debounce: Duration::from_secs(5),
            watcher_poll: two_seconds,
        }
    }
}

impl RebalanceConfig {
    /// Shortened timings for tests: sub-second polling, debounce and retry.
    /// The checkpoint budget is deliberately *larger* than the production
    /// default (30s), presumably to tolerate slow CI machines.
    #[doc(hidden)]
    #[must_use]
    pub fn test_defaults() -> Self {
        Self {
            watcher_poll: Duration::from_millis(200),
            rebalance_debounce: Duration::from_millis(500),
            retry_delay: Duration::from_millis(500),
            checkpoint_timeout: Duration::from_secs(30),
            ..Self::default()
        }
    }
}
68
69fn log_adoption(source: &str, adoption: &SnapshotAdoption) {
74 if adoption.newly_acquired.is_empty() {
75 return;
76 }
77 info!(
78 source,
79 version = adoption.version,
80 newly_acquired = adoption.newly_acquired.len(),
81 rehydrated = adoption.rehydrated,
82 rehydration_epoch = ?adoption.rehydration_epoch,
83 "rehydrated newly-acquired vnodes after rebalance",
84 );
85}
86
87pub fn spawn_snapshot_watcher(
89 db: Arc<LaminarDB>,
90 store: Arc<AssignmentSnapshotStore>,
91 registry: Arc<VnodeRegistry>,
92 shutdown: Arc<Notify>,
93 config: RebalanceConfig,
94 controller: Option<Arc<ClusterController>>,
95) -> JoinHandle<()> {
96 tokio::spawn(async move {
97 let mut ticker = tokio::time::interval(config.watcher_poll);
98 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
99 ticker.tick().await; let mut published_version = 0;
101 loop {
102 tokio::select! {
103 biased;
104 () = shutdown.notified() => return,
105 _ = ticker.tick() => {}
106 }
107 let local = registry.assignment_version();
108
109 let mut remote_newer = true;
110 if let Some(ref c) = controller {
111 if let Some(gossiped_version) = c.read_snapshot_version().await {
112 if gossiped_version <= local {
113 remote_newer = false;
114 }
115 }
116 }
117
118 if remote_newer {
119 match store.load().await {
120 Ok(Some(snap)) if snap.version > local => {
121 debug!(local, remote = snap.version, "adopting newer assignment");
122 let adoption = db.adopt_assignment_snapshot(snap).await;
123 log_adoption("watcher", &adoption);
124 }
125 Ok(_) => {}
126 Err(e) => warn!(error = %e, "snapshot watcher: load failed"),
127 }
128 }
129
130 let version = registry.assignment_version();
132 if version != published_version {
133 published_version = version;
134 if let (Some(c), Some(metrics)) = (controller.as_ref(), db.engine_metrics()) {
135 let nodes = c.assignable_with_locality();
136 publish_placement_metrics(
137 &metrics,
138 ®istry,
139 &nodes,
140 config.placement_isolation_tier,
141 );
142 }
143 }
144 }
145 })
146}
147
148#[allow(clippy::cast_precision_loss)]
151fn publish_placement_metrics(
152 metrics: &EngineMetrics,
153 registry: &VnodeRegistry,
154 nodes: &[(NodeId, Locality)],
155 isolation_tier: usize,
156) {
157 let owners = registry.snapshot();
158 let total = owners.len().max(1);
159 let counts = owners_per_domain(&owners, nodes, isolation_tier);
160
161 metrics.placement_vnodes_per_domain.reset();
162 let mut max = 0u32;
163 for (domain, &count) in &counts {
164 let label = if domain.is_empty() {
165 "unknown"
166 } else {
167 domain.as_str()
168 };
169 metrics
170 .placement_vnodes_per_domain
171 .with_label_values(&[label])
172 .set(i64::from(count));
173 max = max.max(count);
174 }
175 metrics
176 .placement_blast_radius_ratio
177 .set(f64::from(max) / total as f64);
178}
179
180pub fn spawn_rebalance_controller(
183 db: Arc<LaminarDB>,
184 controller: Arc<ClusterController>,
185 store: Arc<AssignmentSnapshotStore>,
186 registry: Arc<VnodeRegistry>,
187 shutdown: Arc<Notify>,
188 config: RebalanceConfig,
189) -> JoinHandle<()> {
190 tokio::spawn(async move {
191 let mut members = controller.members_watch();
192 loop {
193 tokio::select! {
194 biased;
195 () = shutdown.notified() => return,
196 res = members.changed() => {
197 if res.is_err() {
198 warn!("membership watch sender dropped; rebalance controller exiting");
199 return;
200 }
201 }
202 }
203 debug!("membership change observed; debouncing");
204
205 loop {
207 tokio::select! {
208 biased;
209 () = shutdown.notified() => return,
210 res = tokio::time::timeout(
211 config.rebalance_debounce, members.changed()
212 ) => {
213 match res {
214 Ok(Ok(())) => {} Ok(Err(_)) => return, Err(_) => break, }
218 }
219 }
220 }
221
222 loop {
225 if !controller.is_leader() {
226 debug!("membership changed; not the leader — skipping rotation check");
227 break;
228 }
229 let live = controller.assignable_instances();
235 match try_rebalance(&db, &controller, &store, ®istry, &live, config).await {
236 Ok(Some(v)) => {
237 info!(version = v, "rotated assignment");
238 break;
239 }
240 Ok(None) => {
241 debug!("live set matches current snapshot; no rotation");
242 break;
243 }
244 Err(e) => {
245 warn!(error = %e, "rebalance failed; retrying after backoff");
246 tokio::select! {
247 biased;
248 () = shutdown.notified() => return,
249 () = tokio::time::sleep(config.retry_delay) => {}
250 }
251 }
252 }
253 }
254 }
255 })
256}
257
258pub async fn wait_until_drained(
263 store: &AssignmentSnapshotStore,
264 me: NodeId,
265 poll: Duration,
266 deadline: Duration,
267) -> bool {
268 let start = tokio::time::Instant::now();
269 loop {
270 match store.load().await {
271 Ok(None) => return true,
273 Ok(Some(snap)) => {
274 if !snap.vnodes.values().any(|owner| *owner == me) {
275 return true;
276 }
277 }
278 Err(e) => warn!(error = %e, "wait_until_drained: snapshot load failed"),
279 }
280 if start.elapsed() >= deadline {
281 return false;
282 }
283 let remaining = deadline.saturating_sub(start.elapsed());
284 tokio::time::sleep(poll.min(remaining)).await;
285 if start.elapsed() >= deadline {
286 return false;
287 }
288 }
289}
290
/// Makes one leader-side rebalance attempt. If the rendezvous placement over
/// `live` differs from the stored snapshot, the stored assignment is rotated
/// to that placement.
///
/// Steps, in order:
/// 1. Returns `Ok(None)` when `live` is empty, or when the rendezvous
///    assignment over `live` equals the stored snapshot's vnode map.
/// 2. Takes a pre-rotation checkpoint bounded by `config.checkpoint_timeout`.
///    The checkpoint is skipped when the rotation sheds an owner that looks
///    dead (see `shedding_dead` below).
/// 3. Writes the next snapshot with a compare-and-swap against the current
///    version (`save_if_version`).
///    - `Rotated`: adopts the new snapshot locally and announces its version.
///      It then prunes older snapshots on a best-effort basis; prune failures
///      are only logged.
///    - `Conflict`: another writer won the CAS. This node adopts and announces
///      the winner's snapshot instead and returns its version.
///
/// # Errors
/// Returns a string error in any of these cases:
/// - the store load or CAS write fails;
/// - the store holds no snapshot at all (a boot seed is expected to exist);
/// - the pre-rotation checkpoint times out, errors, or reports `success == false`.
///
/// `spawn_rebalance_controller` retries failed attempts after `retry_delay`.
async fn try_rebalance(
    db: &Arc<LaminarDB>,
    controller: &Arc<ClusterController>,
    store: &Arc<AssignmentSnapshotStore>,
    registry: &Arc<VnodeRegistry>,
    live: &[NodeId],
    config: RebalanceConfig,
) -> Result<Option<u64>, String> {
    // No assignable nodes to place vnodes onto: leave the current snapshot
    // untouched rather than compute an assignment over an empty set.
    if live.is_empty() {
        return Ok(None);
    }

    let current = store
        .load()
        .await
        .map_err(|e| e.to_string())?
        .ok_or_else(|| "no snapshot on store — boot seed missing".to_string())?;

    let new_assignment = rendezvous_assignment(registry.vnode_count(), live);
    let new_vnodes = AssignmentSnapshot::vnodes_from_vec(&new_assignment);
    // Placement already matches the live set: nothing to rotate.
    if new_vnodes == current.vnodes {
        return Ok(None);
    }

    // True when some current owner is leaving the assignment (absent from
    // `live`) and also looks dead: it is Suspected/Left in membership, is
    // unknown to membership, or the controller has seen it recently
    // unresponsive. Per the warning below, a drain checkpoint cannot seal
    // without that node's captures, so it is skipped.
    // NOTE(review): if the snapshot can contain `NodeId::UNASSIGNED` owners,
    // those are never in `live` or in membership. They would therefore count
    // as "dead" here and suppress the checkpoint. Confirm this is intended.
    let shedding_dead = {
        use laminar_core::cluster::discovery::NodeState;
        let owners = current.to_vnode_vec(registry.vnode_count());
        let members = controller.members_watch().borrow().clone();
        owners.iter().filter(|o| !live.contains(o)).any(|&o| {
            let dead_in_membership = match members.iter().find(|m| m.id.0 == o.0) {
                Some(node) => matches!(node.state, NodeState::Suspected | NodeState::Left),
                None => true,
            };
            dead_in_membership || controller.is_recently_unresponsive(o)
        })
    };
    if shedding_dead {
        warn!(
            "rotation sheds a dead node — skipping the pre-rotation drain \
             checkpoint (it cannot seal without the dead node's captures)"
        );
    } else {
        // Checkpoint before the ownership change; a timeout, error or
        // unsuccessful checkpoint aborts this attempt (caller retries).
        let ckpt = tokio::time::timeout(config.checkpoint_timeout, db.checkpoint())
            .await
            .map_err(|_| {
                format!(
                    "pre-rotation checkpoint did not complete within {}s",
                    config.checkpoint_timeout.as_secs()
                )
            })?
            .map_err(|e| e.to_string())?;
        if !ckpt.success {
            return Err(ckpt
                .error
                .unwrap_or_else(|| "checkpoint returned success=false".into()));
        }
    }

    let proposal = current.next(new_vnodes);
    // CAS on the version we read: a concurrent writer yields `Conflict`.
    match store
        .save_if_version(&proposal, current.version)
        .await
        .map_err(|e| e.to_string())?
    {
        RotateOutcome::Rotated => {
            let v = proposal.version;
            let adoption = db.adopt_assignment_snapshot(proposal).await;
            log_adoption("rebalance", &adoption);
            controller.announce_snapshot_version(v).await;
            // Prune with a cutoff of `v - 1`.
            // NOTE(review): presumably this keeps the immediately previous
            // snapshot alongside `v`; confirm `prune_before`'s bound semantics.
            let prune = v.saturating_sub(1);
            if let Err(e) = store.prune_before(prune).await {
                warn!(error = %e, "snapshot prune failed");
            }
            Ok(Some(v))
        }
        RotateOutcome::Conflict(winner) => {
            // Another writer rotated first; converge on its snapshot.
            let v = winner.version;
            let adoption = db.adopt_assignment_snapshot(winner).await;
            log_adoption("rebalance-conflict", &adoption);
            controller.announce_snapshot_version(v).await;
            Ok(Some(v))
        }
    }
}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    use object_store::memory::InMemory;
    use object_store::ObjectStore;

    /// Fresh snapshot store backed by an in-memory object store.
    fn store() -> AssignmentSnapshotStore {
        let mem: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        AssignmentSnapshotStore::new(mem)
    }

    #[test]
    fn publish_placement_metrics_labels_by_domain() {
        let prom = prometheus::Registry::new();
        let metrics = EngineMetrics::new(&prom);

        // 4 vnodes: two on node 1 (zone z1), one on node 2 (zone z2), one unassigned.
        let vreg = VnodeRegistry::new(4);
        vreg.set_assignment(vec![NodeId(1), NodeId(1), NodeId(2), NodeId::UNASSIGNED].into());
        let nodes = vec![
            (NodeId(1), Locality::parse("region=r;zone=z1")),
            (NodeId(2), Locality::parse("region=r;zone=z2")),
        ];

        // Isolation tier 1: the asserted labels show domains keyed as "region;zone".
        publish_placement_metrics(&metrics, &vreg, &nodes, 1);
        let g = &metrics.placement_vnodes_per_domain;
        assert_eq!(g.with_label_values(&["r;z1"]).get(), 2);
        assert_eq!(g.with_label_values(&["r;z2"]).get(), 1);
        // The unassigned vnode falls into the empty-named domain, labelled "unknown".
        assert_eq!(g.with_label_values(&["unknown"]).get(), 1);
        // Largest domain (z1, 2 vnodes) over 4 total vnodes.
        assert!((metrics.placement_blast_radius_ratio.get() - 0.5).abs() < 1e-9);
    }

    #[tokio::test]
    async fn wait_until_drained_false_while_owning_vnodes() {
        let s = store();
        let me = NodeId(1);
        // `me` still owns vnode 0, so the short deadline must expire.
        let mut vnodes = BTreeMap::new();
        vnodes.insert(0, me);
        vnodes.insert(1, NodeId(2));
        let snap = AssignmentSnapshot::empty().next(vnodes);
        s.save_if_absent(&snap).await.unwrap();

        let drained = wait_until_drained(
            &s,
            me,
            Duration::from_millis(20),
            Duration::from_millis(120),
        )
        .await;
        assert!(!drained, "still owns vnode 0 → not drained");
    }

    #[tokio::test]
    async fn wait_until_drained_true_when_owning_none() {
        let s = store();
        let me = NodeId(1);
        // Every vnode belongs to some other node.
        let mut vnodes = BTreeMap::new();
        vnodes.insert(0, NodeId(2));
        vnodes.insert(1, NodeId(3));
        let snap = AssignmentSnapshot::empty().next(vnodes);
        s.save_if_absent(&snap).await.unwrap();

        // Generous deadline: the first poll should already report drained.
        let drained =
            wait_until_drained(&s, me, Duration::from_millis(20), Duration::from_secs(5)).await;
        assert!(drained, "owns no vnode → drained quickly");
    }

    #[tokio::test]
    async fn wait_until_drained_true_when_no_snapshot() {
        // Empty store: no snapshot was ever written.
        let s = store();
        let drained = wait_until_drained(
            &s,
            NodeId(1),
            Duration::from_millis(20),
            Duration::from_secs(5),
        )
        .await;
        assert!(drained, "no snapshot → treated as drained");
    }
}