Skip to main content

laminar_core/cluster/control/
leader_lease.rs

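//! CAS-backed leader lease with a TTL and a monotonic fencing token.
//!
//! Every write creates a new object, one per sequence, at
//! `control/leader-lease/v{seq:016}.json` using `PutMode::Create`, so the
//! compare-and-swap works on every backend (`LocalFileSystem` included).
//! Two nodes racing for the same sequence cannot both win: exactly one create
//! succeeds and the loser observes the winner's record. A stale leader whose
//! lease has expired therefore cannot overwrite whoever acquires next.
//! Because expiry is judged against each node's wall clock, the lease alone
//! does not rule out a brief overlap under clock skew. The fencing `token`,
//! which advances only on an owner change, lets followers reject writes
//! carrying a stale token.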
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use bytes::Bytes;
13use object_store::path::Path as OsPath;
14use object_store::{ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
15use serde::{Deserialize, Serialize};
16use tokio::sync::watch;
17use tokio_stream::StreamExt;
18
19use crate::cluster::discovery::NodeId;
20
21const LEASE_PREFIX: &str = "control/leader-lease/";
22
23fn lease_path(seq: u64) -> OsPath {
24    // Fixed-width so lexicographic list order matches numeric order.
25    OsPath::from(format!("{LEASE_PREFIX}v{seq:016}.json"))
26}
27
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. Saturates
/// at `i64::MAX` instead of silently wrapping if the millisecond count ever
/// exceeds `i64` (the `u128 -> i64` narrowing is checked, not an `as` cast).
// Used only by the renewal loop, which is cluster-gated.
#[cfg(feature = "cluster")]
fn now_millis() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
}
36
37/// A durable leader lease record.
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct LeaderLease {
40    /// CAS sequence; bumps on every write.
41    pub seq: u64,
42    /// Fencing token; bumps only when ownership changes.
43    pub token: u64,
44    /// Current lease holder.
45    pub owner: NodeId,
46    /// Expiry, millis since epoch. The lease is dead once `now >= this`.
47    pub expires_at_ms: i64,
48}
49
50impl LeaderLease {
51    /// Whether the lease has expired as of `now_ms`.
52    #[must_use]
53    pub fn is_expired(&self, now_ms: i64) -> bool {
54        self.expires_at_ms <= now_ms
55    }
56}
57
58/// Result of a [`LeaderLeaseStore::try_acquire`] attempt.
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum LeaseOutcome {
61    /// We now hold the lease (fresh acquire or renew).
62    Acquired(LeaderLease),
63    /// A live lease is held by someone else; the attached record is the
64    /// current durable lease.
65    Held(LeaderLease),
66}
67
68/// Errors loading or saving a [`LeaderLease`].
69#[derive(Debug, thiserror::Error)]
70pub enum LeaseError {
71    /// Underlying object store I/O failure.
72    #[error("object store I/O: {0}")]
73    Io(String),
74    /// JSON de/serialization failure.
75    #[error("JSON: {0}")]
76    Json(#[from] serde_json::Error),
77}
78
79/// I/O wrapper for [`LeaderLease`] on an object store.
80pub struct LeaderLeaseStore {
81    store: Arc<dyn ObjectStore>,
82    ttl_ms: i64,
83}
84
85impl std::fmt::Debug for LeaderLeaseStore {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("LeaderLeaseStore")
88            .field("ttl_ms", &self.ttl_ms)
89            .finish_non_exhaustive()
90    }
91}
92
93impl LeaderLeaseStore {
94    /// Wrap a pre-constructed object store with the given lease TTL.
95    #[must_use]
96    pub fn new(store: Arc<dyn ObjectStore>, ttl_ms: i64) -> Self {
97        Self { store, ttl_ms }
98    }
99
100    /// Scan the lease prefix and return every stored sequence in
101    /// ascending order.
102    async fn list_seqs(&self) -> Result<Vec<u64>, LeaseError> {
103        let prefix = OsPath::from(LEASE_PREFIX);
104        let mut entries = self.store.list(Some(&prefix));
105        let mut seqs: Vec<u64> = Vec::new();
106        while let Some(entry) = entries.next().await {
107            let entry = entry.map_err(|e| LeaseError::Io(e.to_string()))?;
108            let loc = entry.location.as_ref();
109            let Some(rest) = loc.strip_prefix(LEASE_PREFIX) else {
110                continue;
111            };
112            let Some(num) = rest.strip_prefix('v').and_then(|s| s.strip_suffix(".json")) else {
113                continue;
114            };
115            if let Ok(s) = num.parse::<u64>() {
116                seqs.push(s);
117            }
118        }
119        seqs.sort_unstable();
120        Ok(seqs)
121    }
122
123    /// Load the current (highest-seq) lease; `Ok(None)` if none exists.
124    ///
125    /// # Errors
126    /// Object-store I/O or JSON decode failure.
127    pub async fn load(&self) -> Result<Option<LeaderLease>, LeaseError> {
128        let seqs = self.list_seqs().await?;
129        let Some(&latest) = seqs.last() else {
130            return Ok(None);
131        };
132        self.load_seq(latest).await
133    }
134
135    /// Load a specific sequence's lease. `Ok(None)` if it was never
136    /// written.
137    ///
138    /// # Errors
139    /// Object-store I/O or JSON decode failure.
140    pub async fn load_seq(&self, seq: u64) -> Result<Option<LeaderLease>, LeaseError> {
141        let path = lease_path(seq);
142        match self.store.get(&path).await {
143            Ok(res) => {
144                let bytes = res
145                    .bytes()
146                    .await
147                    .map_err(|e| LeaseError::Io(e.to_string()))?;
148                let lease = serde_json::from_slice(&bytes)?;
149                Ok(Some(lease))
150            }
151            Err(object_store::Error::NotFound { .. }) => Ok(None),
152            Err(e) => Err(LeaseError::Io(e.to_string())),
153        }
154    }
155
156    /// Attempt to acquire or renew the lease for `me` as of `now_ms`.
157    ///
158    /// Takes `now_ms` explicitly so the decision stays deterministic and
159    /// unit-testable; the renewal manager supplies wall-clock time.
160    ///
161    /// # Errors
162    /// Object-store I/O or JSON encode failure.
163    pub async fn try_acquire(&self, me: NodeId, now_ms: i64) -> Result<LeaseOutcome, LeaseError> {
164        let cur = self.load().await?;
165        let candidate = match cur {
166            None => LeaderLease {
167                seq: 1,
168                token: 1,
169                owner: me,
170                expires_at_ms: now_ms + self.ttl_ms,
171            },
172            Some(ref cur) if cur.owner == me || cur.is_expired(now_ms) => {
173                let token = if cur.owner == me {
174                    cur.token
175                } else {
176                    cur.token + 1
177                };
178                LeaderLease {
179                    seq: cur.seq + 1,
180                    token,
181                    owner: me,
182                    expires_at_ms: now_ms + self.ttl_ms,
183                }
184            }
185            // Live lease held by another node; back off.
186            Some(cur) => return Ok(LeaseOutcome::Held(cur)),
187        };
188
189        let path = lease_path(candidate.seq);
190        let bytes = serde_json::to_vec_pretty(&candidate)?;
191        let opts = PutOptions {
192            mode: PutMode::Create,
193            ..PutOptions::default()
194        };
195        match self
196            .store
197            .put_opts(&path, PutPayload::from(Bytes::from(bytes)), opts)
198            .await
199        {
200            Ok(_) => Ok(LeaseOutcome::Acquired(candidate)),
201            // A racer landed at our seq first. A single reload settles
202            // it; force the caller to re-evaluate next tick rather than
203            // spin here.
204            Err(object_store::Error::AlreadyExists { .. }) => {
205                let latest = self.load().await?.ok_or_else(|| {
206                    LeaseError::Io("CAS conflict but load of winner returned None".into())
207                })?;
208                Ok(LeaseOutcome::Held(latest))
209            }
210            Err(e) => Err(LeaseError::Io(e.to_string())),
211        }
212    }
213}
214
/// Knobs for the lease renewal loop.
#[derive(Clone, Copy, Debug)]
pub struct LeaderLeaseConfig {
    /// Lease lifetime; a lease is dead once `now >= expires_at_ms`.
    ///
    /// NOTE(review): the renewal loop only reads `renew_interval`. The TTL
    /// actually written is the `ttl_ms` given to `LeaderLeaseStore::new`.
    /// Keep the two in sync.
    pub ttl: Duration,
    /// Period between acquire/renew attempts. Keep it well below `ttl`
    /// so the holder renews before its lease lapses.
    pub renew_interval: Duration,
}

impl Default for LeaderLeaseConfig {
    /// A 5 s TTL, renewed every 2 s.
    fn default() -> Self {
        let ttl = Duration::from_millis(5_000);
        let renew_interval = Duration::from_millis(2_000);
        Self {
            ttl,
            renew_interval,
        }
    }
}
233
234/// True iff `lease` is held by `me` and not expired at `now_ms`.
235#[must_use]
236pub fn lease_grants_leadership(lease: &Option<LeaderLease>, me: NodeId, now_ms: i64) -> bool {
237    matches!(lease, Some(l) if l.owner == me && !l.is_expired(now_ms))
238}
239
240/// Periodically renews the leader lease and publishes the latest record
241/// on a watch channel so other tasks can gate on the fencing token.
242pub struct LeaderLeaseManager {
243    store: Arc<LeaderLeaseStore>,
244    me: NodeId,
245    config: LeaderLeaseConfig,
246    tx: watch::Sender<Option<LeaderLease>>,
247}
248
249impl std::fmt::Debug for LeaderLeaseManager {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        f.debug_struct("LeaderLeaseManager")
252            .field("me", &self.me)
253            .field("config", &self.config)
254            .finish_non_exhaustive()
255    }
256}
257
258impl LeaderLeaseManager {
259    /// Build a manager. The watch starts at `None` until the first tick.
260    #[must_use]
261    pub fn new(store: Arc<LeaderLeaseStore>, me: NodeId, config: LeaderLeaseConfig) -> Self {
262        let (tx, _rx) = watch::channel(None);
263        Self {
264            store,
265            me,
266            config,
267            tx,
268        }
269    }
270
271    /// Subscribe to the latest observed lease.
272    #[must_use]
273    pub fn lease_watch(&self) -> watch::Receiver<Option<LeaderLease>> {
274        self.tx.subscribe()
275    }
276
277    /// Spawn the renewal loop. Every `renew_interval` it `try_acquire`s
278    /// and publishes the resulting lease — `Acquired` when we own it,
279    /// otherwise the `Held` record so followers learn the current
280    /// fencing token. Errors are logged and retried next tick. Stops
281    /// when `shutdown` is cancelled.
282    #[cfg(feature = "cluster")]
283    #[must_use]
284    pub fn spawn(
285        self,
286        shutdown: tokio_util::sync::CancellationToken,
287    ) -> tokio::task::JoinHandle<()> {
288        tokio::spawn(async move {
289            let mut ticker = tokio::time::interval(self.config.renew_interval);
290            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
291            loop {
292                tokio::select! {
293                    biased;
294                    () = shutdown.cancelled() => return,
295                    _ = ticker.tick() => {}
296                }
297                match self.store.try_acquire(self.me, now_millis()).await {
298                    Ok(LeaseOutcome::Acquired(lease) | LeaseOutcome::Held(lease)) => {
299                        self.tx.send_replace(Some(lease));
300                    }
301                    Err(e) => {
302                        tracing::warn!(error = %e, "leader lease renewal failed");
303                    }
304                }
305            }
306        })
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Lease store over a fresh in-memory object store.
    fn store(ttl_ms: i64) -> LeaderLeaseStore {
        let mem: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        LeaderLeaseStore::new(mem, ttl_ms)
    }

    #[tokio::test]
    async fn first_acquire_on_empty_store() {
        let s = store(1_000);
        let me = NodeId(1);
        match s.try_acquire(me, 0).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.seq, 1);
                assert_eq!(l.token, 1);
                assert_eq!(l.owner, me);
                assert_eq!(l.expires_at_ms, 1_000);
            }
            LeaseOutcome::Held(_) => panic!("empty store must yield Acquired"),
        }
    }

    #[tokio::test]
    async fn load_returns_highest_seq() {
        let s = store(1_000);
        assert_eq!(s.load().await.unwrap(), None, "empty store has no lease");
        let me = NodeId(1);
        for now in [0, 100, 200] {
            s.try_acquire(me, now).await.unwrap();
        }
        let latest = s.load().await.unwrap().expect("lease after three writes");
        assert_eq!(latest.seq, 3, "load picks the highest sequence");
        assert_eq!(latest.expires_at_ms, 1_200);
    }

    #[tokio::test]
    async fn same_owner_renew_keeps_token() {
        let s = store(1_000);
        let me = NodeId(1);
        s.try_acquire(me, 0).await.unwrap();
        match s.try_acquire(me, 500).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.seq, 2, "seq bumps on every write");
                assert_eq!(l.token, 1, "same owner keeps fencing token");
                assert_eq!(l.expires_at_ms, 1_500, "expiry extended");
            }
            LeaseOutcome::Held(_) => panic!("owner renewal must be Acquired"),
        }
    }

    #[tokio::test]
    async fn own_expired_lease_reacquired_keeps_token() {
        let s = store(1_000);
        let me = NodeId(1);
        s.try_acquire(me, 0).await.unwrap();
        // Well past expiry, but no other node took over in between.
        match s.try_acquire(me, 5_000).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.seq, 2);
                assert_eq!(l.token, 1, "no owner change, so no token bump");
                assert_eq!(l.expires_at_ms, 6_000);
            }
            LeaseOutcome::Held(_) => panic!("own expired lease must be re-acquirable"),
        }
    }

    #[tokio::test]
    async fn different_node_blocked_while_live() {
        let s = store(1_000);
        let owner = NodeId(1);
        s.try_acquire(owner, 0).await.unwrap();
        // Challenger arrives before expiry.
        match s.try_acquire(NodeId(2), 500).await.unwrap() {
            LeaseOutcome::Held(l) => {
                assert_eq!(l.owner, owner, "live lease keeps its owner");
                assert_eq!(l.token, 1);
            }
            LeaseOutcome::Acquired(_) => panic!("must not steal a live lease"),
        }
    }

    #[tokio::test]
    async fn different_node_takes_over_after_expiry() {
        let s = store(1_000);
        let owner = NodeId(1);
        s.try_acquire(owner, 0).await.unwrap();
        // Challenger arrives at expiry boundary (>= expires_at_ms).
        match s.try_acquire(NodeId(2), 1_000).await.unwrap() {
            LeaseOutcome::Acquired(l) => {
                assert_eq!(l.owner, NodeId(2));
                assert_eq!(l.token, 2, "owner change bumps fencing token");
                assert_eq!(l.seq, 2);
            }
            LeaseOutcome::Held(_) => panic!("expired lease must be acquirable"),
        }
    }

    // Pure predicate: no async runtime needed.
    #[test]
    fn grants_leadership_only_for_live_owner() {
        let me = NodeId(1);
        let live = Some(LeaderLease {
            seq: 1,
            token: 1,
            owner: me,
            expires_at_ms: 1_000,
        });
        assert!(lease_grants_leadership(&live, me, 0));
        assert!(!lease_grants_leadership(&live, me, 1_000), "expired");
        assert!(!lease_grants_leadership(&live, NodeId(2), 0), "not owner");
        assert!(!lease_grants_leadership(&None, me, 0), "no lease");
    }

    #[tokio::test]
    async fn pre_seeded_live_incumbent_blocks_challenger() {
        // Pre-seed seq=1 directly (as a racer's write would land), then a
        // challenger sees the live incumbent on load and backs off.
        let s = store(10_000);
        let incumbent = LeaderLease {
            seq: 1,
            token: 1,
            owner: NodeId(9),
            expires_at_ms: 10_000,
        };
        let bytes = serde_json::to_vec_pretty(&incumbent).unwrap();
        let opts = PutOptions {
            mode: PutMode::Create,
            ..PutOptions::default()
        };
        s.store
            .put_opts(&lease_path(1), PutPayload::from(Bytes::from(bytes)), opts)
            .await
            .unwrap();

        match s.try_acquire(NodeId(2), 0).await.unwrap() {
            LeaseOutcome::Held(l) => assert_eq!(l, incumbent),
            LeaseOutcome::Acquired(_) => panic!("live incumbent must block"),
        }
    }
}