laminar_core/cluster/control/
snapshot.rs1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use bytes::Bytes;
14use object_store::path::Path as OsPath;
15use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
16use serde::{Deserialize, Serialize};
17use tokio_stream::StreamExt;
18
19use crate::cluster::discovery::NodeId;
20
21const SNAPSHOT_PREFIX: &str = "control/assignment-snapshots/";
22
23fn snapshot_path(version: u64) -> OsPath {
24 OsPath::from(format!("{SNAPSHOT_PREFIX}v{version:016}.json"))
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct AssignmentSnapshot {
31 pub version: u64,
33 pub vnodes: BTreeMap<u32, NodeId>,
37 pub updated_at_ms: i64,
39}
40
41impl AssignmentSnapshot {
42 #[must_use]
44 pub fn empty() -> Self {
45 Self {
46 version: 0,
47 vnodes: BTreeMap::new(),
48 updated_at_ms: 0,
49 }
50 }
51
52 #[must_use]
54 pub fn next(&self, vnodes: BTreeMap<u32, NodeId>) -> Self {
55 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
56 let now_ms = std::time::SystemTime::now()
57 .duration_since(std::time::UNIX_EPOCH)
58 .map_or(0, |d| d.as_millis() as i64);
59 Self {
60 version: self.version + 1,
61 vnodes,
62 updated_at_ms: now_ms,
63 }
64 }
65
66 #[must_use]
70 pub fn vnodes_from_vec(assignment: &[NodeId]) -> BTreeMap<u32, NodeId> {
71 #[allow(clippy::cast_possible_truncation)]
72 assignment
73 .iter()
74 .enumerate()
75 .map(|(i, n)| (i as u32, *n))
76 .collect()
77 }
78
79 #[must_use]
83 pub fn to_vnode_vec(&self, vnode_count: u32) -> Vec<NodeId> {
84 (0..vnode_count)
85 .map(|v| self.vnodes.get(&v).copied().unwrap_or(NodeId::UNASSIGNED))
86 .collect()
87 }
88}
89
90pub struct AssignmentSnapshotStore {
92 store: Arc<dyn ObjectStore>,
93}
94
95impl std::fmt::Debug for AssignmentSnapshotStore {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("AssignmentSnapshotStore")
98 .finish_non_exhaustive()
99 }
100}
101
102#[derive(Debug, thiserror::Error)]
104pub enum SnapshotError {
105 #[error("object store I/O: {0}")]
107 Io(String),
108 #[error("JSON: {0}")]
110 Json(#[from] serde_json::Error),
111}
112
113impl AssignmentSnapshotStore {
114 #[must_use]
116 pub fn new(store: Arc<dyn ObjectStore>) -> Self {
117 Self { store }
118 }
119
120 async fn list_versions(&self) -> Result<Vec<u64>, SnapshotError> {
124 let prefix = OsPath::from(SNAPSHOT_PREFIX);
125 let mut entries = self.store.list(Some(&prefix));
126 let mut versions: Vec<u64> = Vec::new();
127 while let Some(entry) = entries.next().await {
128 let entry = entry.map_err(|e| SnapshotError::Io(e.to_string()))?;
129 let loc = entry.location.as_ref();
130 let Some(rest) = loc.strip_prefix(SNAPSHOT_PREFIX) else {
133 continue;
134 };
135 let Some(num) = rest.strip_prefix('v').and_then(|s| s.strip_suffix(".json")) else {
136 continue;
137 };
138 if let Ok(v) = num.parse::<u64>() {
139 versions.push(v);
140 }
141 }
142 versions.sort_unstable();
143 Ok(versions)
144 }
145
146 pub async fn load(&self) -> Result<Option<AssignmentSnapshot>, SnapshotError> {
152 let versions = self.list_versions().await?;
153 let Some(&latest) = versions.last() else {
154 return Ok(None);
155 };
156 self.load_version(latest).await
157 }
158
159 pub async fn load_version(
165 &self,
166 version: u64,
167 ) -> Result<Option<AssignmentSnapshot>, SnapshotError> {
168 let path = snapshot_path(version);
169 match self.store.get(&path).await {
170 Ok(res) => {
171 let bytes = res
172 .bytes()
173 .await
174 .map_err(|e| SnapshotError::Io(e.to_string()))?;
175 let snap = serde_json::from_slice(&bytes)?;
176 Ok(Some(snap))
177 }
178 Err(object_store::Error::NotFound { .. }) => Ok(None),
179 Err(e) => Err(SnapshotError::Io(e.to_string())),
180 }
181 }
182
183 pub async fn save_if_absent(
190 &self,
191 snapshot: &AssignmentSnapshot,
192 ) -> Result<Option<AssignmentSnapshot>, SnapshotError> {
193 let path = snapshot_path(snapshot.version);
194 let bytes = serde_json::to_vec_pretty(snapshot)?;
195 let opts = PutOptions {
196 mode: PutMode::Create,
197 ..PutOptions::default()
198 };
199 match self
200 .store
201 .put_opts(&path, PutPayload::from(Bytes::from(bytes)), opts)
202 .await
203 {
204 Ok(_) => Ok(Some(snapshot.clone())),
205 Err(object_store::Error::AlreadyExists { .. }) => Ok(None),
206 Err(e) => Err(SnapshotError::Io(e.to_string())),
207 }
208 }
209
210 pub async fn save_if_version(
219 &self,
220 snapshot: &AssignmentSnapshot,
221 prior_version: u64,
222 ) -> Result<RotateOutcome, SnapshotError> {
223 if snapshot.version != prior_version + 1 {
224 return Err(SnapshotError::Io(format!(
225 "save_if_version requires monotonic +1 bump: prior={prior_version}, \
226 proposed={}",
227 snapshot.version,
228 )));
229 }
230 if self.save_if_absent(snapshot).await?.is_some() {
231 return Ok(RotateOutcome::Rotated);
232 }
233 let winner = self.load_version(snapshot.version).await?.ok_or_else(|| {
234 SnapshotError::Io("CAS conflict but load of winner returned None".into())
235 })?;
236 Ok(RotateOutcome::Conflict(winner))
237 }
238
239 pub async fn prune_before(&self, before: u64) -> Result<(), SnapshotError> {
245 if before == 0 {
246 return Ok(());
247 }
248 let versions = self.list_versions().await?;
249 for v in versions {
250 if v >= before {
251 break;
252 }
253 let path = snapshot_path(v);
254 match self.store.delete(&path).await {
255 Ok(()) | Err(object_store::Error::NotFound { .. }) => {}
256 Err(e) => {
257 tracing::warn!(version = v, error = %e, "snapshot prune: delete failed");
258 }
259 }
260 }
261 Ok(())
262 }
263}
264
265#[derive(Debug, Clone)]
267pub enum RotateOutcome {
268 Rotated,
270 Conflict(AssignmentSnapshot),
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::tempdir;

    /// Builds a snapshot store backed by a local filesystem rooted at `dir`.
    fn store_in(dir: &std::path::Path) -> AssignmentSnapshotStore {
        let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(dir).unwrap());
        AssignmentSnapshotStore::new(fs)
    }

    #[tokio::test]
    async fn load_missing_returns_none() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());
        assert!(s.load().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn save_if_absent_then_load_roundtrip() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut vnodes = BTreeMap::new();
        vnodes.insert(0, NodeId(1));
        vnodes.insert(1, NodeId(2));
        let snap = AssignmentSnapshot::empty().next(vnodes);

        assert_eq!(s.save_if_absent(&snap).await.unwrap().as_ref(), Some(&snap),);
        let loaded = s.load().await.unwrap().unwrap();
        assert_eq!(loaded, snap);
    }

    #[tokio::test]
    async fn load_returns_highest_version() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut v1_map = BTreeMap::new();
        v1_map.insert(0, NodeId(1));
        let v1 = AssignmentSnapshot::empty().next(v1_map);
        s.save_if_absent(&v1).await.unwrap();

        let mut v2_map = BTreeMap::new();
        v2_map.insert(0, NodeId(2));
        let v2 = v1.next(v2_map);
        assert!(matches!(
            s.save_if_version(&v2, v1.version).await.unwrap(),
            RotateOutcome::Rotated,
        ));

        let loaded = s.load().await.unwrap().unwrap();
        assert_eq!(loaded.version, 2);
        assert_eq!(loaded.vnodes.get(&0), Some(&NodeId(2)));

        let v1_loaded = s.load_version(1).await.unwrap().unwrap();
        assert_eq!(v1_loaded, v1);
    }

    #[tokio::test]
    async fn save_if_absent_first_writer_wins() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut first_map = BTreeMap::new();
        first_map.insert(0, NodeId(1));
        first_map.insert(1, NodeId(2));
        let first = AssignmentSnapshot::empty().next(first_map);

        let winner = s.save_if_absent(&first).await.unwrap();
        assert_eq!(winner.as_ref(), Some(&first), "first writer must win");

        let mut second_map = BTreeMap::new();
        second_map.insert(0, NodeId(99));
        let second = AssignmentSnapshot::empty().next(second_map);
        let rejected = s.save_if_absent(&second).await.unwrap();
        assert!(rejected.is_none(), "second writer must lose the CAS");

        let loaded = s.load().await.unwrap().unwrap();
        assert_eq!(loaded, first, "stored snapshot is the first writer's");
    }

    #[tokio::test]
    async fn save_if_version_rejects_non_monotonic_bump() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut m = BTreeMap::new();
        m.insert(0, NodeId(1));
        let v1 = AssignmentSnapshot::empty().next(m);
        s.save_if_absent(&v1).await.unwrap();

        let mut m2 = BTreeMap::new();
        m2.insert(0, NodeId(2));
        let v2 = v1.next(m2);
        let mut m3 = BTreeMap::new();
        m3.insert(0, NodeId(3));
        let v3 = v2.next(m3);
        let err = s.save_if_version(&v3, 1).await.unwrap_err();
        assert!(
            matches!(err, SnapshotError::Io(msg) if msg.contains("monotonic")),
            "non-monotonic bump must surface a clear error",
        );
    }

    #[tokio::test]
    async fn save_if_version_succeeds_on_match() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut v1_map = BTreeMap::new();
        v1_map.insert(0, NodeId(1));
        let first = AssignmentSnapshot::empty().next(v1_map);
        s.save_if_absent(&first).await.unwrap();

        let mut v2_map = BTreeMap::new();
        v2_map.insert(0, NodeId(2));
        let second = first.next(v2_map);
        let outcome = s.save_if_version(&second, first.version).await.unwrap();
        assert!(matches!(outcome, RotateOutcome::Rotated));

        let loaded = s.load().await.unwrap().unwrap();
        assert_eq!(loaded, second);
    }

    #[tokio::test]
    async fn save_if_version_conflict_surfaces_winner() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut seed = BTreeMap::new();
        seed.insert(0, NodeId(1));
        let v1 = AssignmentSnapshot::empty().next(seed);
        s.save_if_absent(&v1).await.unwrap();

        let mut winner_map = BTreeMap::new();
        winner_map.insert(0, NodeId(10));
        let winner = v1.next(winner_map);
        assert!(matches!(
            s.save_if_version(&winner, v1.version).await.unwrap(),
            RotateOutcome::Rotated,
        ));

        let mut loser_map = BTreeMap::new();
        loser_map.insert(0, NodeId(20));
        let loser = v1.next(loser_map);
        match s.save_if_version(&loser, v1.version).await.unwrap() {
            RotateOutcome::Conflict(current) => {
                assert_eq!(
                    current, winner,
                    "conflict must surface the winner's snapshot",
                );
            }
            RotateOutcome::Rotated => {
                panic!("stale-token update must not win the CAS");
            }
        }

        let loaded = s.load().await.unwrap().unwrap();
        assert_eq!(loaded, winner, "stored snapshot is the CAS winner's");
    }

    #[tokio::test]
    async fn prune_before_drops_old_versions() {
        let dir = tempdir().unwrap();
        let s = store_in(dir.path());

        let mut m = BTreeMap::new();
        m.insert(0, NodeId(1));
        let mut current = AssignmentSnapshot::empty().next(m);
        s.save_if_absent(&current).await.unwrap();
        for _ in 0..3 {
            let next = current.next(current.vnodes.clone());
            assert!(matches!(
                s.save_if_version(&next, current.version).await.unwrap(),
                RotateOutcome::Rotated,
            ));
            current = next;
        }

        s.prune_before(3).await.unwrap();

        assert!(s.load_version(1).await.unwrap().is_none());
        assert!(s.load_version(2).await.unwrap().is_none());
        assert!(s.load_version(3).await.unwrap().is_some());
        assert!(s.load_version(4).await.unwrap().is_some());
        assert_eq!(s.load().await.unwrap().unwrap().version, 4);
    }

    #[test]
    fn empty_starts_at_version_zero() {
        let s = AssignmentSnapshot::empty();
        assert_eq!(s.version, 0);
        assert!(s.vnodes.is_empty());
    }

    #[test]
    fn next_bumps_version() {
        let mut vnodes = BTreeMap::new();
        vnodes.insert(0, NodeId(1));
        let s = AssignmentSnapshot::empty().next(vnodes);
        assert_eq!(s.version, 1);
    }

    #[test]
    fn roundtrip_vec_conversions() {
        let assignment = vec![NodeId(1), NodeId(2), NodeId(1), NodeId(2)];
        let map = AssignmentSnapshot::vnodes_from_vec(&assignment);
        let snap = AssignmentSnapshot::empty().next(map);
        let back = snap.to_vnode_vec(u32::try_from(assignment.len()).expect("test len fits u32"));
        assert_eq!(back, assignment);
    }
}