1use std::cmp::Ordering;
4#[allow(clippy::disallowed_types)] use std::collections::HashMap;
6
// Data types for per-core WAL entries. They live in a private module so the
// module-wide `missing_docs` allowance below stays scoped to these declarations.
mod entry_types {
    // NOTE(review): presumably required because the rkyv derives generate
    // companion items that carry no documentation of their own — confirm.
    #![allow(missing_docs)]

    #[allow(clippy::disallowed_types)]
    use std::collections::HashMap;

    use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};

    /// Operation recorded by a single WAL entry.
    ///
    /// NOTE(review): variant and field order presumably feed into the rkyv
    /// archived representation — confirm before reordering anything here.
    #[derive(Debug, Clone, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
    pub enum WalOperation {
        /// A key/value write.
        Put {
            /// Raw key bytes.
            key: Vec<u8>,
            /// Raw value bytes.
            value: Vec<u8>,
        },
        /// A key removal.
        Delete {
            /// Raw key bytes.
            key: Vec<u8>,
        },
        /// A checkpoint marker.
        Checkpoint {
            /// Identifier of the checkpoint.
            id: u64,
        },
        /// A commit record.
        Commit {
            /// Named `u64` offsets.
            // NOTE(review): keys look like source/topic names — confirm with the producer.
            offsets: HashMap<String, u64>,
            /// Optional watermark value.
            watermark: Option<i64>,
        },
        /// An epoch barrier marker.
        EpochBarrier {
            /// Epoch number carried by the barrier.
            epoch: u64,
        },
    }

    /// One WAL entry: ordering metadata plus the operation it records.
    ///
    /// The `Ord` implementation for this type compares `epoch`, `timestamp_ns`,
    /// `core_id` and `sequence` (in that order) and ignores `operation`.
    #[derive(Debug, Clone, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
    pub struct PerCoreWalEntry {
        /// Epoch the entry belongs to; the most significant sort key.
        pub epoch: u64,
        /// Sequence number; the least significant sort key.
        // NOTE(review): presumably assigned per core — confirm against the writer.
        pub sequence: u64,
        /// Identifier of the core associated with the entry.
        pub core_id: u16,
        /// Entry timestamp in nanoseconds.
        // NOTE(review): `now_ns` yields nanoseconds since the Unix epoch; confirm callers use it.
        pub timestamp_ns: i64,
        /// The recorded operation.
        pub operation: WalOperation,
    }
}
68
69pub use entry_types::{PerCoreWalEntry, WalOperation};
70
71impl PerCoreWalEntry {
72 #[must_use]
78 pub fn put(
79 core_id: u16,
80 epoch: u64,
81 sequence: u64,
82 key: Vec<u8>,
83 value: Vec<u8>,
84 timestamp_ns: i64,
85 ) -> Self {
86 Self {
87 epoch,
88 sequence,
89 core_id,
90 timestamp_ns,
91 operation: WalOperation::Put { key, value },
92 }
93 }
94
95 #[must_use]
101 pub fn delete(
102 core_id: u16,
103 epoch: u64,
104 sequence: u64,
105 key: Vec<u8>,
106 timestamp_ns: i64,
107 ) -> Self {
108 Self {
109 epoch,
110 sequence,
111 core_id,
112 timestamp_ns,
113 operation: WalOperation::Delete { key },
114 }
115 }
116
117 #[must_use]
123 pub fn checkpoint(
124 core_id: u16,
125 epoch: u64,
126 sequence: u64,
127 checkpoint_id: u64,
128 timestamp_ns: i64,
129 ) -> Self {
130 Self {
131 epoch,
132 sequence,
133 core_id,
134 timestamp_ns,
135 operation: WalOperation::Checkpoint { id: checkpoint_id },
136 }
137 }
138
139 #[must_use]
145 pub fn commit(
146 core_id: u16,
147 epoch: u64,
148 sequence: u64,
149 offsets: HashMap<String, u64>,
150 watermark: Option<i64>,
151 timestamp_ns: i64,
152 ) -> Self {
153 Self {
154 epoch,
155 sequence,
156 core_id,
157 timestamp_ns,
158 operation: WalOperation::Commit { offsets, watermark },
159 }
160 }
161
162 #[must_use]
168 pub fn epoch_barrier(core_id: u16, epoch: u64, sequence: u64, timestamp_ns: i64) -> Self {
169 Self {
170 epoch,
171 sequence,
172 core_id,
173 timestamp_ns,
174 operation: WalOperation::EpochBarrier { epoch },
175 }
176 }
177
178 #[must_use]
180 pub fn is_put(&self) -> bool {
181 matches!(self.operation, WalOperation::Put { .. })
182 }
183
184 #[must_use]
186 pub fn is_delete(&self) -> bool {
187 matches!(self.operation, WalOperation::Delete { .. })
188 }
189
190 #[must_use]
192 pub fn is_checkpoint(&self) -> bool {
193 matches!(self.operation, WalOperation::Checkpoint { .. })
194 }
195
196 #[must_use]
198 pub fn is_state_operation(&self) -> bool {
199 self.is_put() || self.is_delete()
200 }
201
202 #[must_use]
204 pub fn key(&self) -> Option<&[u8]> {
205 match &self.operation {
206 WalOperation::Put { key, .. } | WalOperation::Delete { key } => Some(key),
207 _ => None,
208 }
209 }
210
211 #[must_use]
213 pub fn value(&self) -> Option<&[u8]> {
214 match &self.operation {
215 WalOperation::Put { value, .. } => Some(value),
216 _ => None,
217 }
218 }
219
220 #[must_use]
225 #[allow(clippy::cast_possible_truncation)] pub fn now_ns() -> i64 {
227 use std::time::{SystemTime, UNIX_EPOCH};
228 SystemTime::now()
229 .duration_since(UNIX_EPOCH)
230 .map_or(0, |d| d.as_nanos() as i64)
231 }
232}
233
234impl Ord for PerCoreWalEntry {
235 fn cmp(&self, other: &Self) -> Ordering {
236 match self.epoch.cmp(&other.epoch) {
238 Ordering::Equal => {
239 match self.timestamp_ns.cmp(&other.timestamp_ns) {
241 Ordering::Equal => {
242 match self.core_id.cmp(&other.core_id) {
244 Ordering::Equal => self.sequence.cmp(&other.sequence),
245 ord => ord,
246 }
247 }
248 ord => ord,
249 }
250 }
251 ord => ord,
252 }
253 }
254}
255
256impl PartialOrd for PerCoreWalEntry {
257 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
258 Some(self.cmp(other))
259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Put` entry via a struct literal (deliberately not through the
    /// constructors) whose key and value are both the single byte `tag`.
    fn put_entry(
        epoch: u64,
        sequence: u64,
        core_id: u16,
        timestamp_ns: i64,
        tag: u8,
    ) -> PerCoreWalEntry {
        PerCoreWalEntry {
            epoch,
            sequence,
            core_id,
            timestamp_ns,
            operation: WalOperation::Put {
                key: vec![tag],
                value: vec![tag],
            },
        }
    }

    /// A lower epoch sorts first even when its timestamp and sequence are larger.
    #[test]
    fn test_entry_ordering_by_epoch() {
        let earlier = put_entry(1, 100, 0, 1000, 1);
        let later = put_entry(2, 1, 1, 500, 2);

        assert!(earlier < later);
    }

    /// Within one epoch the smaller timestamp sorts first.
    #[test]
    fn test_entry_ordering_by_timestamp() {
        let earlier = put_entry(1, 1, 0, 1000, 1);
        let later = put_entry(1, 2, 1, 2000, 2);

        assert!(earlier < later);
    }

    /// With equal epoch and timestamp the remaining keys decide the order.
    #[test]
    fn test_entry_ordering_tiebreaker() {
        let first = put_entry(1, 1, 0, 1000, 1);
        let second = put_entry(1, 2, 1, 1000, 2);

        assert!(first < second);
    }

    /// Constructors produce the expected variant and the accessors expose its payload.
    #[test]
    fn test_entry_constructors() {
        let ts = PerCoreWalEntry::now_ns();

        let put = PerCoreWalEntry::put(0, 1, 1, b"key".to_vec(), b"value".to_vec(), ts);
        assert!(put.is_put());
        assert!(!put.is_delete());
        assert_eq!(put.key(), Some(&b"key"[..]));
        assert_eq!(put.value(), Some(&b"value"[..]));

        let delete = PerCoreWalEntry::delete(1, 1, 2, b"key2".to_vec(), ts);
        assert!(delete.is_delete());
        assert!(!delete.is_put());
        assert_eq!(delete.key(), Some(&b"key2"[..]));
        assert!(delete.value().is_none());

        let checkpoint = PerCoreWalEntry::checkpoint(0, 1, 3, 100, ts);
        assert!(checkpoint.is_checkpoint());
        assert!(!checkpoint.is_state_operation());
    }

    /// Sorting a shuffled batch yields (epoch, timestamp) in ascending order.
    #[test]
    fn test_sorting_multiple_entries() {
        let mut entries = [
            put_entry(2, 1, 0, 100, 1),
            put_entry(1, 2, 1, 200, 2),
            put_entry(1, 1, 0, 100, 3),
            put_entry(2, 2, 1, 200, 4),
        ];

        entries.sort();

        let order: Vec<(u64, i64)> = entries
            .iter()
            .map(|entry| (entry.epoch, entry.timestamp_ns))
            .collect();
        assert_eq!(order, [(1, 100), (1, 200), (2, 100), (2, 200)]);
    }

    /// A commit entry keeps the offsets map and the watermark it was given.
    #[test]
    fn test_commit_entry() {
        let mut offsets = HashMap::new();
        for &(topic, offset) in &[("topic1", 100), ("topic2", 200)] {
            offsets.insert(topic.to_string(), offset);
        }

        let commit =
            PerCoreWalEntry::commit(0, 1, 1, offsets, Some(12345), PerCoreWalEntry::now_ns());

        if let WalOperation::Commit { offsets, watermark } = &commit.operation {
            assert_eq!(offsets.get("topic1").copied(), Some(100));
            assert_eq!(offsets.get("topic2").copied(), Some(200));
            assert_eq!(*watermark, Some(12345));
        } else {
            panic!("Expected Commit operation");
        }
    }

    /// An epoch barrier entry carries the epoch it was created with.
    #[test]
    fn test_epoch_barrier_entry() {
        let barrier = PerCoreWalEntry::epoch_barrier(0, 5, 100, PerCoreWalEntry::now_ns());

        if let WalOperation::EpochBarrier { epoch } = &barrier.operation {
            assert_eq!(*epoch, 5);
        } else {
            panic!("Expected EpochBarrier operation");
        }
    }
}