laminar_core/cluster/control/
leader_lease.rs1use std::sync::Arc;
10use std::time::Duration;
11
12use bytes::Bytes;
13use object_store::path::Path as OsPath;
14use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
15use serde::{Deserialize, Serialize};
16use tokio::sync::watch;
17use tokio_stream::StreamExt;
18
19use crate::cluster::discovery::NodeId;
20
21const LEASE_PREFIX: &str = "control/leader-lease/";
22
23fn lease_path(seq: u64) -> OsPath {
24 OsPath::from(format!("{LEASE_PREFIX}v{seq:016}.json"))
26}
27
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and saturates at
/// `i64::MAX` (instead of silently wrapping) if the millisecond count does not fit
/// in an `i64`.
#[cfg(feature = "cluster")]
fn now_millis() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            // `as_millis` is a `u128`; convert checked so no lint suppression is needed.
            i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
        })
}
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct LeaderLease {
40 pub seq: u64,
42 pub token: u64,
44 pub owner: NodeId,
46 pub expires_at_ms: i64,
48}
49
50impl LeaderLease {
51 #[must_use]
53 pub fn is_expired(&self, now_ms: i64) -> bool {
54 self.expires_at_ms <= now_ms
55 }
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum LeaseOutcome {
61 Acquired(LeaderLease),
63 Held(LeaderLease),
66}
67
68#[derive(Debug, thiserror::Error)]
70pub enum LeaseError {
71 #[error("object store I/O: {0}")]
73 Io(String),
74 #[error("JSON: {0}")]
76 Json(#[from] serde_json::Error),
77}
78
79pub struct LeaderLeaseStore {
81 store: Arc<dyn ObjectStore>,
82 ttl_ms: i64,
83}
84
85impl std::fmt::Debug for LeaderLeaseStore {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("LeaderLeaseStore")
88 .field("ttl_ms", &self.ttl_ms)
89 .finish_non_exhaustive()
90 }
91}
92
93impl LeaderLeaseStore {
94 #[must_use]
96 pub fn new(store: Arc<dyn ObjectStore>, ttl_ms: i64) -> Self {
97 Self { store, ttl_ms }
98 }
99
100 async fn list_seqs(&self) -> Result<Vec<u64>, LeaseError> {
103 let prefix = OsPath::from(LEASE_PREFIX);
104 let mut entries = self.store.list(Some(&prefix));
105 let mut seqs: Vec<u64> = Vec::new();
106 while let Some(entry) = entries.next().await {
107 let entry = entry.map_err(|e| LeaseError::Io(e.to_string()))?;
108 let loc = entry.location.as_ref();
109 let Some(rest) = loc.strip_prefix(LEASE_PREFIX) else {
110 continue;
111 };
112 let Some(num) = rest.strip_prefix('v').and_then(|s| s.strip_suffix(".json")) else {
113 continue;
114 };
115 if let Ok(s) = num.parse::<u64>() {
116 seqs.push(s);
117 }
118 }
119 seqs.sort_unstable();
120 Ok(seqs)
121 }
122
123 pub async fn load(&self) -> Result<Option<LeaderLease>, LeaseError> {
128 let seqs = self.list_seqs().await?;
129 let Some(&latest) = seqs.last() else {
130 return Ok(None);
131 };
132 self.load_seq(latest).await
133 }
134
135 pub async fn load_seq(&self, seq: u64) -> Result<Option<LeaderLease>, LeaseError> {
141 let path = lease_path(seq);
142 match self.store.get(&path).await {
143 Ok(res) => {
144 let bytes = res
145 .bytes()
146 .await
147 .map_err(|e| LeaseError::Io(e.to_string()))?;
148 let lease = serde_json::from_slice(&bytes)?;
149 Ok(Some(lease))
150 }
151 Err(object_store::Error::NotFound { .. }) => Ok(None),
152 Err(e) => Err(LeaseError::Io(e.to_string())),
153 }
154 }
155
156 pub async fn try_acquire(&self, me: NodeId, now_ms: i64) -> Result<LeaseOutcome, LeaseError> {
164 let cur = self.load().await?;
165 let candidate = match cur {
166 None => LeaderLease {
167 seq: 1,
168 token: 1,
169 owner: me,
170 expires_at_ms: now_ms + self.ttl_ms,
171 },
172 Some(ref cur) if cur.owner == me || cur.is_expired(now_ms) => {
173 let token = if cur.owner == me {
174 cur.token
175 } else {
176 cur.token + 1
177 };
178 LeaderLease {
179 seq: cur.seq + 1,
180 token,
181 owner: me,
182 expires_at_ms: now_ms + self.ttl_ms,
183 }
184 }
185 Some(cur) => return Ok(LeaseOutcome::Held(cur)),
187 };
188
189 let path = lease_path(candidate.seq);
190 let bytes = serde_json::to_vec_pretty(&candidate)?;
191 let opts = PutOptions {
192 mode: PutMode::Create,
193 ..PutOptions::default()
194 };
195 match self
196 .store
197 .put_opts(&path, PutPayload::from(Bytes::from(bytes)), opts)
198 .await
199 {
200 Ok(_) => Ok(LeaseOutcome::Acquired(candidate)),
201 Err(object_store::Error::AlreadyExists { .. }) => {
205 let latest = self.load().await?.ok_or_else(|| {
206 LeaseError::Io("CAS conflict but load of winner returned None".into())
207 })?;
208 Ok(LeaseOutcome::Held(latest))
209 }
210 Err(e) => Err(LeaseError::Io(e.to_string())),
211 }
212 }
213}
214
/// Timing parameters for leader-lease maintenance.
#[derive(Debug, Clone, Copy)]
pub struct LeaderLeaseConfig {
    /// Lease time-to-live.
    // NOTE(review): not read by the code visible in this file; presumably used by the
    // caller to derive the store's `ttl_ms` - confirm.
    pub ttl: Duration,
    /// Period between renewal attempts.
    pub renew_interval: Duration,
}

impl Default for LeaderLeaseConfig {
    /// Defaults to a 5-second TTL with a renewal attempt every 2 seconds.
    fn default() -> Self {
        let ttl = Duration::from_secs(5);
        let renew_interval = Duration::from_secs(2);
        Self {
            ttl,
            renew_interval,
        }
    }
}
233
234#[must_use]
236pub fn lease_grants_leadership(lease: &Option<LeaderLease>, me: NodeId, now_ms: i64) -> bool {
237 matches!(lease, Some(l) if l.owner == me && !l.is_expired(now_ms))
238}
239
240pub struct LeaderLeaseManager {
243 store: Arc<LeaderLeaseStore>,
244 me: NodeId,
245 config: LeaderLeaseConfig,
246 tx: watch::Sender<Option<LeaderLease>>,
247}
248
249impl std::fmt::Debug for LeaderLeaseManager {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 f.debug_struct("LeaderLeaseManager")
252 .field("me", &self.me)
253 .field("config", &self.config)
254 .finish_non_exhaustive()
255 }
256}
257
258impl LeaderLeaseManager {
259 #[must_use]
261 pub fn new(store: Arc<LeaderLeaseStore>, me: NodeId, config: LeaderLeaseConfig) -> Self {
262 let (tx, _rx) = watch::channel(None);
263 Self {
264 store,
265 me,
266 config,
267 tx,
268 }
269 }
270
271 #[must_use]
273 pub fn lease_watch(&self) -> watch::Receiver<Option<LeaderLease>> {
274 self.tx.subscribe()
275 }
276
277 #[cfg(feature = "cluster")]
283 #[must_use]
284 pub fn spawn(
285 self,
286 shutdown: tokio_util::sync::CancellationToken,
287 ) -> tokio::task::JoinHandle<()> {
288 tokio::spawn(async move {
289 let mut ticker = tokio::time::interval(self.config.renew_interval);
290 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
291 loop {
292 tokio::select! {
293 biased;
294 () = shutdown.cancelled() => return,
295 _ = ticker.tick() => {}
296 }
297 match self.store.try_acquire(self.me, now_millis()).await {
298 Ok(LeaseOutcome::Acquired(lease) | LeaseOutcome::Held(lease)) => {
299 self.tx.send_replace(Some(lease));
300 }
301 Err(e) => {
302 tracing::warn!(error = %e, "leader lease renewal failed");
303 }
304 }
305 }
306 })
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Builds a lease store over a fresh in-memory object store.
    fn store(ttl_ms: i64) -> LeaderLeaseStore {
        let mem: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        LeaderLeaseStore::new(mem, ttl_ms)
    }

    /// An empty store hands out version 1 with token 1.
    #[tokio::test]
    async fn first_acquire_on_empty_store() {
        let s = store(1_000);
        let me = NodeId(1);
        match s.try_acquire(me, 0).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.seq, 1);
                assert_eq!(l.token, 1);
                assert_eq!(l.owner, me);
                assert_eq!(l.expires_at_ms, 1_000);
            }
            LeaseOutcome::Held(_) => panic!("empty store must yield Acquired"),
        }
    }

    /// Renewal by the current owner bumps `seq`, keeps `token`, and extends the expiry.
    #[tokio::test]
    async fn same_owner_renew_keeps_token() {
        let s = store(1_000);
        let me = NodeId(1);
        s.try_acquire(me, 0).await.unwrap();
        match s.try_acquire(me, 500).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.seq, 2, "seq bumps on every write");
                assert_eq!(l.token, 1, "same owner keeps fencing token");
                assert_eq!(l.expires_at_ms, 1_500, "expiry extended");
            }
            LeaseOutcome::Held(_) => panic!("owner renewal must be Acquired"),
        }
    }

    /// A challenger cannot take a lease that has not expired yet.
    #[tokio::test]
    async fn different_node_blocked_while_live() {
        let s = store(1_000);
        let owner = NodeId(1);
        s.try_acquire(owner, 0).await.unwrap();
        match s.try_acquire(NodeId(2), 500).await.unwrap() {
            LeaseOutcome::Held(l) => {
                assert_eq!(l.owner, owner, "live lease keeps its owner");
                assert_eq!(l.token, 1);
            }
            LeaseOutcome::Acquired(_) => panic!("must not steal a live lease"),
        }
    }

    /// At the expiry instant a challenger takes over and the fencing token increases.
    #[tokio::test]
    async fn different_node_takes_over_after_expiry() {
        let s = store(1_000);
        let owner = NodeId(1);
        s.try_acquire(owner, 0).await.unwrap();
        match s.try_acquire(NodeId(2), 1_000).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.owner, NodeId(2));
                assert_eq!(l.token, 2, "owner change bumps fencing token");
                assert_eq!(l.seq, 2);
            }
            LeaseOutcome::Held(_) => panic!("expired lease must be acquirable"),
        }
    }

    /// Leadership requires a present, unexpired lease owned by the asking node.
    // Purely synchronous, so no async runtime is needed for this test.
    #[test]
    fn grants_leadership_only_for_live_owner() {
        let me = NodeId(1);
        let live = Some(LeaderLease {
            seq: 1,
            token: 1,
            owner: me,
            expires_at_ms: 1_000,
        });
        assert!(lease_grants_leadership(&live, me, 0));
        assert!(!lease_grants_leadership(&live, me, 1_000), "expired");
        assert!(!lease_grants_leadership(&live, NodeId(2), 0), "not owner");
        assert!(!lease_grants_leadership(&None, me, 0), "no lease");
    }

    /// A live lease written directly into the store blocks a challenger.
    #[tokio::test]
    async fn pre_seeded_live_incumbent_blocks_challenger() {
        let s = store(10_000);
        let incumbent = LeaderLease {
            seq: 1,
            token: 1,
            owner: NodeId(9),
            expires_at_ms: 10_000,
        };
        let bytes = serde_json::to_vec_pretty(&incumbent).unwrap();
        let opts = PutOptions {
            mode: PutMode::Create,
            ..PutOptions::default()
        };
        s.store
            .put_opts(&lease_path(1), PutPayload::from(Bytes::from(bytes)), opts)
            .await
            .unwrap();

        match s.try_acquire(NodeId(2), 0).await.unwrap() {
            LeaseOutcome::Held(l) => assert_eq!(l, incumbent),
            LeaseOutcome::Acquired(_) => panic!("live incumbent must block"),
        }
    }
}