laminar_core/delta/partition/
guard.rs1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use crate::delta::discovery::NodeId;
13
14#[derive(Debug, thiserror::Error)]
16pub enum EpochError {
17 #[error("partition {partition_id} epoch fenced: expected {expected}, got {actual}")]
19 Fenced {
20 partition_id: u32,
22 expected: u64,
24 actual: u64,
26 },
27
28 #[error("partition {0} is unassigned")]
30 Unassigned(u32),
31
32 #[error("partition {0} not in guard set")]
34 NotTracked(u32),
35}
36
37#[derive(Debug)]
42pub struct PartitionGuard {
43 pub partition_id: u32,
45 pub epoch: u64,
47 pub node_id: NodeId,
49 cached_current_epoch: Arc<AtomicU64>,
52}
53
54impl PartitionGuard {
55 #[must_use]
57 pub fn new(partition_id: u32, epoch: u64, node_id: NodeId) -> Self {
58 Self {
59 partition_id,
60 epoch,
61 node_id,
62 cached_current_epoch: Arc::new(AtomicU64::new(epoch)),
63 }
64 }
65
66 #[inline]
76 pub fn check(&self) -> Result<(), EpochError> {
77 let current = self.cached_current_epoch.load(Ordering::Acquire);
78 if current == self.epoch {
79 Ok(())
80 } else {
81 Err(EpochError::Fenced {
82 partition_id: self.partition_id,
83 expected: self.epoch,
84 actual: current,
85 })
86 }
87 }
88
89 pub fn update_cached_epoch(&self, new_epoch: u64) {
94 self.cached_current_epoch
95 .store(new_epoch, Ordering::Release);
96 }
97
98 #[must_use]
100 pub fn cached_epoch_handle(&self) -> Arc<AtomicU64> {
101 Arc::clone(&self.cached_current_epoch)
102 }
103}
104
105#[derive(Debug)]
107pub struct PartitionGuardSet {
108 guards: HashMap<u32, PartitionGuard>,
109 node_id: NodeId,
110}
111
112impl PartitionGuardSet {
113 #[must_use]
115 pub fn new(node_id: NodeId) -> Self {
116 Self {
117 guards: HashMap::new(),
118 node_id,
119 }
120 }
121
122 pub fn insert(&mut self, partition_id: u32, epoch: u64) {
124 self.guards.insert(
125 partition_id,
126 PartitionGuard::new(partition_id, epoch, self.node_id),
127 );
128 }
129
130 pub fn remove(&mut self, partition_id: u32) -> Option<PartitionGuard> {
132 self.guards.remove(&partition_id)
133 }
134
135 pub fn check(&self, partition_id: u32) -> Result<(), EpochError> {
142 self.guards
143 .get(&partition_id)
144 .ok_or(EpochError::NotTracked(partition_id))?
145 .check()
146 }
147
148 #[must_use]
150 pub fn get(&self, partition_id: u32) -> Option<&PartitionGuard> {
151 self.guards.get(&partition_id)
152 }
153
154 #[must_use]
156 pub fn len(&self) -> usize {
157 self.guards.len()
158 }
159
160 #[must_use]
162 pub fn is_empty(&self) -> bool {
163 self.guards.is_empty()
164 }
165
166 pub fn iter(&self) -> impl Iterator<Item = (&u32, &PartitionGuard)> {
168 self.guards.iter()
169 }
170
171 pub fn update_epoch(&self, partition_id: u32, new_epoch: u64) {
173 if let Some(guard) = self.guards.get(&partition_id) {
174 guard.update_cached_epoch(new_epoch);
175 }
176 }
177}
178
179#[derive(Debug, Clone)]
184pub struct ConditionalPut {
185 pub partition_id: u32,
187 pub expected_epoch: u64,
189 pub writer_node: NodeId,
191}
192
193impl ConditionalPut {
194 #[must_use]
196 pub const fn new(partition_id: u32, expected_epoch: u64, writer_node: NodeId) -> Self {
197 Self {
198 partition_id,
199 expected_epoch,
200 writer_node,
201 }
202 }
203
204 #[must_use]
206 pub fn to_metadata(&self) -> Vec<(String, String)> {
207 vec![
208 ("x-amz-meta-epoch".into(), self.expected_epoch.to_string()),
209 (
210 "x-amz-meta-writer-node".into(),
211 self.writer_node.0.to_string(),
212 ),
213 (
214 "x-amz-meta-partition-id".into(),
215 self.partition_id.to_string(),
216 ),
217 ]
218 }
219}
220
/// Unit tests for the guard, the guard set and the conditional-put metadata.
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created guard passes its own check.
    #[test]
    fn test_guard_check_ok() {
        let guard = PartitionGuard::new(0, 1, NodeId(1));
        assert!(guard.check().is_ok());
    }

    /// Publishing a different epoch fences the guard and reports both epochs.
    #[test]
    fn test_guard_check_fenced() {
        let guard = PartitionGuard::new(0, 1, NodeId(1));
        guard.update_cached_epoch(2);
        let err = guard.check().unwrap_err();

        // Pin the user-visible message as well as the structured fields.
        assert_eq!(
            err.to_string(),
            "partition 0 epoch fenced: expected 1, got 2"
        );
        match err {
            EpochError::Fenced {
                partition_id,
                expected,
                actual,
            } => {
                assert_eq!(partition_id, 0);
                assert_eq!(expected, 1);
                assert_eq!(actual, 2);
            }
            // Show the unexpected variant instead of hiding it.
            other => panic!("expected Fenced error, got {:?}", other),
        }
    }

    /// Publishing the guard's own epoch again lifts the fence.
    #[test]
    fn test_guard_check_recovers_when_epoch_restored() {
        let guard = PartitionGuard::new(3, 7, NodeId(1));
        guard.update_cached_epoch(8);
        assert!(guard.check().is_err());

        guard.update_cached_epoch(7);
        assert!(guard.check().is_ok());
    }

    #[test]
    fn test_guard_set_insert_check() {
        let mut set = PartitionGuardSet::new(NodeId(1));
        set.insert(0, 1);
        set.insert(1, 1);
        assert_eq!(set.len(), 2);
        assert!(set.check(0).is_ok());
        assert!(set.check(1).is_ok());
    }

    #[test]
    fn test_guard_set_not_tracked() {
        let set = PartitionGuardSet::new(NodeId(1));
        let err = set.check(99).unwrap_err();
        assert!(matches!(err, EpochError::NotTracked(99)));
        assert_eq!(err.to_string(), "partition 99 not in guard set");
    }

    #[test]
    fn test_guard_set_remove() {
        let mut set = PartitionGuardSet::new(NodeId(1));
        set.insert(0, 1);
        assert_eq!(set.len(), 1);
        let removed = set.remove(0);
        assert!(removed.is_some());
        assert!(set.is_empty());
        // Removing again finds nothing.
        assert!(set.remove(0).is_none());
    }

    #[test]
    fn test_guard_set_update_epoch() {
        let mut set = PartitionGuardSet::new(NodeId(1));
        set.insert(0, 1);
        assert!(set.check(0).is_ok());

        set.update_epoch(0, 2);
        assert!(matches!(
            set.check(0),
            Err(EpochError::Fenced {
                partition_id: 0,
                expected: 1,
                actual: 2,
            })
        ));
    }

    /// Updating an id the set does not hold neither panics nor creates a guard.
    #[test]
    fn test_guard_set_update_epoch_untracked_is_noop() {
        let set = PartitionGuardSet::new(NodeId(1));
        set.update_epoch(7, 2);
        assert!(set.is_empty());
        assert!(matches!(set.check(7), Err(EpochError::NotTracked(7))));
    }

    /// Inserting over an existing id replaces the guard rather than adding one.
    #[test]
    fn test_guard_set_insert_replaces_existing() {
        let mut set = PartitionGuardSet::new(NodeId(1));
        set.insert(0, 1);
        set.update_epoch(0, 2);
        assert!(set.check(0).is_err());

        set.insert(0, 2);
        assert_eq!(set.len(), 1);
        assert!(set.check(0).is_ok());
    }

    #[test]
    fn test_guard_set_get() {
        let mut set = PartitionGuardSet::new(NodeId(1));
        set.insert(4, 9);

        let guard = set.get(4).expect("guard for partition 4 was just inserted");
        assert_eq!(guard.partition_id, 4);
        assert_eq!(guard.epoch, 9);
        assert!(set.get(5).is_none());
    }

    #[test]
    fn test_conditional_put_metadata() {
        let cp = ConditionalPut::new(5, 10, NodeId(3));
        let meta = cp.to_metadata();
        assert_eq!(meta.len(), 3);
        assert!(meta
            .iter()
            .any(|(k, v)| k == "x-amz-meta-epoch" && v == "10"));
        assert!(meta
            .iter()
            .any(|(k, v)| k == "x-amz-meta-writer-node" && v == "3"));
        // The partition id entry was previously never verified.
        assert!(meta
            .iter()
            .any(|(k, v)| k == "x-amz-meta-partition-id" && v == "5"));
    }

    #[test]
    fn test_guard_atomic_handle() {
        let guard = PartitionGuard::new(0, 5, NodeId(1));
        let handle = guard.cached_epoch_handle();
        assert_eq!(handle.load(Ordering::Relaxed), 5);

        // A store through the cloned handle is seen by the guard's check.
        handle.store(6, Ordering::Release);
        assert!(guard.check().is_err());
    }

    #[test]
    fn test_guard_set_iter() {
        let mut set = PartitionGuardSet::new(NodeId(1));
        set.insert(0, 1);
        set.insert(1, 2);

        // Map iteration order is unspecified, so sort before comparing.
        let mut partitions: Vec<u32> = set.iter().map(|(id, _)| *id).collect();
        partitions.sort_unstable();
        assert_eq!(partitions, vec![0, 1]);

        // Each yielded guard belongs to the key it is paired with.
        assert!(set.iter().all(|(id, guard)| *id == guard.partition_id));
    }
}