// laminar_core/delta/partition/guard.rs

//! Epoch-fenced partition guards for split-brain prevention.
//!
//! A `PartitionGuard` validates that this node still owns a partition
//! at the current epoch before processing events. The hot-path `check()`
//! is a single `AtomicU64::load(Acquire)` — target < 10ns.

#![allow(clippy::disallowed_types)] // cold path: partition guard management
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use crate::delta::discovery::NodeId;
/// Error returned when an epoch fence check fails.
///
/// Produced by [`PartitionGuard::check`] and [`PartitionGuardSet::check`]
/// when ownership of a partition can no longer be asserted.
#[derive(Debug, thiserror::Error)]
pub enum EpochError {
    /// This node no longer owns the partition: the observed epoch has
    /// diverged from the epoch at which ownership was acquired.
    #[error("partition {partition_id} epoch fenced: expected {expected}, got {actual}")]
    Fenced {
        /// The partition that was fenced.
        partition_id: u32,
        /// The epoch this node expected (the epoch it acquired ownership at).
        expected: u64,
        /// The actual current epoch observed at check time.
        actual: u64,
    },

    /// The partition is not assigned to any node.
    #[error("partition {0} is unassigned")]
    Unassigned(u32),

    /// The partition is not in the guard set (this node never tracked it).
    #[error("partition {0} not in guard set")]
    NotTracked(u32),
}

/// An epoch-fenced guard for a single partition.
///
/// The hot-path `check()` is a single atomic load, designed for
/// sub-10ns latency in the event processing loop.
///
/// `epoch` is immutable after construction; only the cached current
/// epoch changes (via [`PartitionGuard::update_cached_epoch`] or the
/// handle from [`PartitionGuard::cached_epoch_handle`]).
#[derive(Debug)]
pub struct PartitionGuard {
    /// The partition this guard protects.
    pub partition_id: u32,
    /// The epoch at which this node acquired ownership.
    pub epoch: u64,
    /// The owning node.
    pub node_id: NodeId,
    /// Cached current epoch — updated periodically from Raft.
    /// When this diverges from `self.epoch`, the guard is fenced.
    /// Shared via `Arc` so an external updater task can hold a handle.
    cached_current_epoch: Arc<AtomicU64>,
}

54impl PartitionGuard {
55    /// Create a new guard for a partition at the given epoch.
56    #[must_use]
57    pub fn new(partition_id: u32, epoch: u64, node_id: NodeId) -> Self {
58        Self {
59            partition_id,
60            epoch,
61            node_id,
62            cached_current_epoch: Arc::new(AtomicU64::new(epoch)),
63        }
64    }
65
66    /// Fast-path epoch check (single atomic load).
67    ///
68    /// Returns `Ok(())` if this node still owns the partition at the
69    /// expected epoch. Returns `Err(EpochError::Fenced)` if the epoch
70    /// has advanced (meaning ownership has changed).
71    ///
72    /// # Errors
73    ///
74    /// Returns [`EpochError::Fenced`] if the partition's epoch has changed.
75    #[inline]
76    pub fn check(&self) -> Result<(), EpochError> {
77        let current = self.cached_current_epoch.load(Ordering::Acquire);
78        if current == self.epoch {
79            Ok(())
80        } else {
81            Err(EpochError::Fenced {
82                partition_id: self.partition_id,
83                expected: self.epoch,
84                actual: current,
85            })
86        }
87    }
88
89    /// Update the cached epoch from an external source (e.g., Raft read).
90    ///
91    /// Called periodically by the health-check task to propagate epoch
92    /// changes from the Raft state machine.
93    pub fn update_cached_epoch(&self, new_epoch: u64) {
94        self.cached_current_epoch
95            .store(new_epoch, Ordering::Release);
96    }
97
98    /// Get a handle to the cached epoch atomic for external updates.
99    #[must_use]
100    pub fn cached_epoch_handle(&self) -> Arc<AtomicU64> {
101        Arc::clone(&self.cached_current_epoch)
102    }
103}
104
/// A set of partition guards managed by a single node.
///
/// All guards share the node's identity; per-partition guards are keyed
/// by partition ID in an internal map.
#[derive(Debug)]
pub struct PartitionGuardSet {
    /// Per-partition guards, keyed by partition ID.
    guards: HashMap<u32, PartitionGuard>,
    /// The node that owns every guard in this set.
    node_id: NodeId,
}

112impl PartitionGuardSet {
113    /// Create a new empty guard set for the given node.
114    #[must_use]
115    pub fn new(node_id: NodeId) -> Self {
116        Self {
117            guards: HashMap::new(),
118            node_id,
119        }
120    }
121
122    /// Add a guard for a partition.
123    pub fn insert(&mut self, partition_id: u32, epoch: u64) {
124        self.guards.insert(
125            partition_id,
126            PartitionGuard::new(partition_id, epoch, self.node_id),
127        );
128    }
129
130    /// Remove a guard for a partition.
131    pub fn remove(&mut self, partition_id: u32) -> Option<PartitionGuard> {
132        self.guards.remove(&partition_id)
133    }
134
135    /// Check that a partition is still owned at the expected epoch.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`EpochError::NotTracked`] if the partition is not in the set,
140    /// or [`EpochError::Fenced`] if the epoch has changed.
141    pub fn check(&self, partition_id: u32) -> Result<(), EpochError> {
142        self.guards
143            .get(&partition_id)
144            .ok_or(EpochError::NotTracked(partition_id))?
145            .check()
146    }
147
148    /// Get a guard by partition ID.
149    #[must_use]
150    pub fn get(&self, partition_id: u32) -> Option<&PartitionGuard> {
151        self.guards.get(&partition_id)
152    }
153
154    /// Get the number of tracked partitions.
155    #[must_use]
156    pub fn len(&self) -> usize {
157        self.guards.len()
158    }
159
160    /// Returns `true` if no partitions are tracked.
161    #[must_use]
162    pub fn is_empty(&self) -> bool {
163        self.guards.is_empty()
164    }
165
166    /// Iterate over all guards.
167    pub fn iter(&self) -> impl Iterator<Item = (&u32, &PartitionGuard)> {
168        self.guards.iter()
169    }
170
171    /// Update the cached epoch for a partition from Raft state.
172    pub fn update_epoch(&self, partition_id: u32, new_epoch: u64) {
173        if let Some(guard) = self.guards.get(&partition_id) {
174            guard.update_cached_epoch(new_epoch);
175        }
176    }
177}
178
/// Conditional PUT parameters for object storage epoch fencing.
///
/// Used when writing checkpoint data to S3/GCS/Azure to ensure that
/// only the current epoch owner can write.
#[derive(Debug, Clone)]
pub struct ConditionalPut {
    /// The partition being checkpointed.
    pub partition_id: u32,
    /// The epoch that must match for the write to succeed.
    pub expected_epoch: u64,
    /// The node that is writing.
    pub writer_node: NodeId,
}

193impl ConditionalPut {
194    /// Create conditional PUT parameters.
195    #[must_use]
196    pub const fn new(partition_id: u32, expected_epoch: u64, writer_node: NodeId) -> Self {
197        Self {
198            partition_id,
199            expected_epoch,
200            writer_node,
201        }
202    }
203
204    /// Convert to HTTP header metadata for S3 conditional PUT.
205    #[must_use]
206    pub fn to_metadata(&self) -> Vec<(String, String)> {
207        vec![
208            ("x-amz-meta-epoch".into(), self.expected_epoch.to_string()),
209            (
210                "x-amz-meta-writer-node".into(),
211                self.writer_node.0.to_string(),
212            ),
213            (
214                "x-amz-meta-partition-id".into(),
215                self.partition_id.to_string(),
216            ),
217        ]
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_guard_check_ok() {
        // A fresh guard's cached epoch equals its acquisition epoch.
        let g = PartitionGuard::new(0, 1, NodeId(1));
        assert!(g.check().is_ok());
    }

    #[test]
    fn test_guard_check_fenced() {
        let g = PartitionGuard::new(0, 1, NodeId(1));
        g.update_cached_epoch(2);
        let fence_err = g.check().unwrap_err();
        assert!(matches!(
            fence_err,
            EpochError::Fenced {
                partition_id: 0,
                expected: 1,
                actual: 2,
            }
        ));
    }

    #[test]
    fn test_guard_set_insert_check() {
        let mut guards = PartitionGuardSet::new(NodeId(1));
        for pid in [0, 1] {
            guards.insert(pid, 1);
        }
        assert_eq!(guards.len(), 2);
        assert!(guards.check(0).is_ok());
        assert!(guards.check(1).is_ok());
    }

    #[test]
    fn test_guard_set_not_tracked() {
        let guards = PartitionGuardSet::new(NodeId(1));
        assert!(matches!(
            guards.check(99).unwrap_err(),
            EpochError::NotTracked(99)
        ));
    }

    #[test]
    fn test_guard_set_remove() {
        let mut guards = PartitionGuardSet::new(NodeId(1));
        guards.insert(0, 1);
        assert_eq!(guards.len(), 1);
        assert!(guards.remove(0).is_some());
        assert!(guards.is_empty());
    }

    #[test]
    fn test_guard_set_update_epoch() {
        let mut guards = PartitionGuardSet::new(NodeId(1));
        guards.insert(0, 1);
        assert!(guards.check(0).is_ok());

        // Advancing the cached epoch fences the partition.
        guards.update_epoch(0, 2);
        assert!(guards.check(0).is_err());
    }

    #[test]
    fn test_conditional_put_metadata() {
        let put = ConditionalPut::new(5, 10, NodeId(3));
        let headers = put.to_metadata();
        assert_eq!(headers.len(), 3);
        let has = |key: &str, val: &str| headers.iter().any(|(k, v)| k == key && v == val);
        assert!(has("x-amz-meta-epoch", "10"));
        assert!(has("x-amz-meta-writer-node", "3"));
    }

    #[test]
    fn test_guard_atomic_handle() {
        let g = PartitionGuard::new(0, 5, NodeId(1));
        let epoch_atomic = g.cached_epoch_handle();
        assert_eq!(epoch_atomic.load(Ordering::Relaxed), 5);

        // External update via handle
        epoch_atomic.store(6, Ordering::Release);
        assert!(g.check().is_err());
    }

    #[test]
    fn test_guard_set_iter() {
        let mut guards = PartitionGuardSet::new(NodeId(1));
        guards.insert(0, 1);
        guards.insert(1, 2);

        let ids: Vec<u32> = guards.iter().map(|(&id, _)| id).collect();
        assert_eq!(ids.len(), 2);
    }
}