Skip to main content

laminar_storage/per_core_wal/
entry.rs

1//! WAL entry types with epoch-based ordering for per-core WAL.
2
3use std::cmp::Ordering;
4#[allow(clippy::disallowed_types)] // cold path: WAL coordination
5use std::collections::HashMap;
6
// WAL entry types with derive macros.
//
// The types live in their own module so that the inner `allow(missing_docs)`
// below applies only to them and to the code their derives generate.
mod entry_types {
    #![allow(missing_docs)] // Allow for derive-generated code

    #[allow(clippy::disallowed_types)] // cold path: WAL coordination
    use std::collections::HashMap;

    use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};

    /// Operations that can be logged in the WAL.
    ///
    /// Each variant is the payload of one [`PerCoreWalEntry`].
    #[derive(Debug, Clone, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
    pub enum WalOperation {
        /// Put a key-value pair.
        Put {
            /// The key.
            key: Vec<u8>,
            /// The value.
            value: Vec<u8>,
        },
        /// Delete a key.
        Delete {
            /// The key to delete.
            key: Vec<u8>,
        },
        /// Checkpoint marker.
        Checkpoint {
            /// Checkpoint ID.
            id: u64,
        },
        /// Commit offsets for exactly-once semantics.
        Commit {
            /// Map of topic/partition to offset.
            offsets: HashMap<String, u64>,
            /// Current watermark at commit time (`None` when no watermark is recorded).
            watermark: Option<i64>,
        },
        /// Barrier for epoch boundary.
        EpochBarrier {
            /// The epoch that just completed.
            epoch: u64,
        },
    }

    /// Per-core WAL entry with epoch for cross-core ordering.
    ///
    /// The manual `Ord` implementation for this type compares `epoch` first,
    /// then `timestamp_ns`, then `core_id`, then `sequence`; the `operation`
    /// payload takes no part in the ordering.
    ///
    /// NOTE(review): this type was previously described as a "32-byte compact
    /// layout". The four fixed-width fields alone sum to 26 bytes and
    /// `operation` adds a variable-size payload on top, so confirm what that
    /// figure referred to before relying on it.
    #[derive(Debug, Clone, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
    pub struct PerCoreWalEntry {
        /// Global epoch (for ordering during recovery).
        pub epoch: u64,
        /// Core-local sequence number (monotonically increasing per core).
        pub sequence: u64,
        /// Core ID that wrote this entry.
        pub core_id: u16,
        /// Timestamp in nanoseconds since Unix epoch.
        pub timestamp_ns: i64,
        /// The actual operation.
        pub operation: WalOperation,
    }
}
68
69pub use entry_types::{PerCoreWalEntry, WalOperation};
70
71impl PerCoreWalEntry {
72    /// Creates a new Put entry.
73    ///
74    /// # Arguments
75    ///
76    /// * `timestamp_ns` - Pre-cached timestamp from [`now_ns()`](Self::now_ns)
77    #[must_use]
78    pub fn put(
79        core_id: u16,
80        epoch: u64,
81        sequence: u64,
82        key: Vec<u8>,
83        value: Vec<u8>,
84        timestamp_ns: i64,
85    ) -> Self {
86        Self {
87            epoch,
88            sequence,
89            core_id,
90            timestamp_ns,
91            operation: WalOperation::Put { key, value },
92        }
93    }
94
95    /// Creates a new Delete entry.
96    ///
97    /// # Arguments
98    ///
99    /// * `timestamp_ns` - Pre-cached timestamp from [`now_ns()`](Self::now_ns)
100    #[must_use]
101    pub fn delete(
102        core_id: u16,
103        epoch: u64,
104        sequence: u64,
105        key: Vec<u8>,
106        timestamp_ns: i64,
107    ) -> Self {
108        Self {
109            epoch,
110            sequence,
111            core_id,
112            timestamp_ns,
113            operation: WalOperation::Delete { key },
114        }
115    }
116
117    /// Creates a new Checkpoint entry.
118    ///
119    /// # Arguments
120    ///
121    /// * `timestamp_ns` - Pre-cached timestamp from [`now_ns()`](Self::now_ns)
122    #[must_use]
123    pub fn checkpoint(
124        core_id: u16,
125        epoch: u64,
126        sequence: u64,
127        checkpoint_id: u64,
128        timestamp_ns: i64,
129    ) -> Self {
130        Self {
131            epoch,
132            sequence,
133            core_id,
134            timestamp_ns,
135            operation: WalOperation::Checkpoint { id: checkpoint_id },
136        }
137    }
138
139    /// Creates a new Commit entry.
140    ///
141    /// # Arguments
142    ///
143    /// * `timestamp_ns` - Pre-cached timestamp from [`now_ns()`](Self::now_ns)
144    #[must_use]
145    pub fn commit(
146        core_id: u16,
147        epoch: u64,
148        sequence: u64,
149        offsets: HashMap<String, u64>,
150        watermark: Option<i64>,
151        timestamp_ns: i64,
152    ) -> Self {
153        Self {
154            epoch,
155            sequence,
156            core_id,
157            timestamp_ns,
158            operation: WalOperation::Commit { offsets, watermark },
159        }
160    }
161
162    /// Creates a new `EpochBarrier` entry.
163    ///
164    /// # Arguments
165    ///
166    /// * `timestamp_ns` - Pre-cached timestamp from [`now_ns()`](Self::now_ns)
167    #[must_use]
168    pub fn epoch_barrier(core_id: u16, epoch: u64, sequence: u64, timestamp_ns: i64) -> Self {
169        Self {
170            epoch,
171            sequence,
172            core_id,
173            timestamp_ns,
174            operation: WalOperation::EpochBarrier { epoch },
175        }
176    }
177
178    /// Returns true if this is a Put operation.
179    #[must_use]
180    pub fn is_put(&self) -> bool {
181        matches!(self.operation, WalOperation::Put { .. })
182    }
183
184    /// Returns true if this is a Delete operation.
185    #[must_use]
186    pub fn is_delete(&self) -> bool {
187        matches!(self.operation, WalOperation::Delete { .. })
188    }
189
190    /// Returns true if this is a Checkpoint operation.
191    #[must_use]
192    pub fn is_checkpoint(&self) -> bool {
193        matches!(self.operation, WalOperation::Checkpoint { .. })
194    }
195
196    /// Returns true if this is a state-modifying operation (Put or Delete).
197    #[must_use]
198    pub fn is_state_operation(&self) -> bool {
199        self.is_put() || self.is_delete()
200    }
201
202    /// Gets the key if this is a Put or Delete operation.
203    #[must_use]
204    pub fn key(&self) -> Option<&[u8]> {
205        match &self.operation {
206            WalOperation::Put { key, .. } | WalOperation::Delete { key } => Some(key),
207            _ => None,
208        }
209    }
210
211    /// Gets the value if this is a Put operation.
212    #[must_use]
213    pub fn value(&self) -> Option<&[u8]> {
214        match &self.operation {
215            WalOperation::Put { value, .. } => Some(value),
216            _ => None,
217        }
218    }
219
220    /// Returns current timestamp in nanoseconds.
221    ///
222    /// Call once and pass the result to entry constructors to avoid
223    /// repeated `SystemTime::now()` syscalls in tight loops.
224    #[must_use]
225    #[allow(clippy::cast_possible_truncation)] // i64 ns won't overflow for ~292 years
226    pub fn now_ns() -> i64 {
227        use std::time::{SystemTime, UNIX_EPOCH};
228        SystemTime::now()
229            .duration_since(UNIX_EPOCH)
230            .map_or(0, |d| d.as_nanos() as i64)
231    }
232}
233
234impl Ord for PerCoreWalEntry {
235    fn cmp(&self, other: &Self) -> Ordering {
236        // Order by epoch first, then by timestamp within epoch
237        match self.epoch.cmp(&other.epoch) {
238            Ordering::Equal => {
239                // Within same epoch, order by timestamp
240                match self.timestamp_ns.cmp(&other.timestamp_ns) {
241                    Ordering::Equal => {
242                        // Tie-breaker: use core_id then sequence
243                        match self.core_id.cmp(&other.core_id) {
244                            Ordering::Equal => self.sequence.cmp(&other.sequence),
245                            ord => ord,
246                        }
247                    }
248                    ord => ord,
249                }
250            }
251            ord => ord,
252        }
253    }
254}
255
256impl PartialOrd for PerCoreWalEntry {
257    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
258        Some(self.cmp(other))
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a Put entry whose key and value are both the single byte `tag`.
    fn single_byte_put(
        epoch: u64,
        sequence: u64,
        core_id: u16,
        timestamp_ns: i64,
        tag: u8,
    ) -> PerCoreWalEntry {
        PerCoreWalEntry {
            epoch,
            sequence,
            core_id,
            timestamp_ns,
            operation: WalOperation::Put {
                key: vec![tag],
                value: vec![tag],
            },
        }
    }

    #[test]
    fn test_entry_ordering_by_epoch() {
        let first_epoch = single_byte_put(1, 100, 0, 1000, 1);
        // Smaller timestamp, but it belongs to the later epoch.
        let second_epoch = single_byte_put(2, 1, 1, 500, 2);

        // The epoch outranks the timestamp.
        assert!(first_epoch < second_epoch);
    }

    #[test]
    fn test_entry_ordering_by_timestamp() {
        let older = single_byte_put(1, 1, 0, 1000, 1);
        let newer = single_byte_put(1, 2, 1, 2000, 2);

        // Equal epochs, so the timestamp decides.
        assert!(older < newer);
    }

    #[test]
    fn test_entry_ordering_tiebreaker() {
        let from_core_0 = single_byte_put(1, 1, 0, 1000, 1);
        // Identical epoch and timestamp; only core and sequence differ.
        let from_core_1 = single_byte_put(1, 2, 1, 1000, 2);

        // With epoch and timestamp tied, the core id decides.
        assert!(from_core_0 < from_core_1);
    }

    #[test]
    fn test_entry_constructors() {
        let stamp = PerCoreWalEntry::now_ns();

        let put_entry = PerCoreWalEntry::put(0, 1, 1, b"key".to_vec(), b"value".to_vec(), stamp);
        assert!(put_entry.is_put());
        assert!(!put_entry.is_delete());
        assert_eq!(put_entry.key(), Some(&b"key"[..]));
        assert_eq!(put_entry.value(), Some(&b"value"[..]));

        let delete_entry = PerCoreWalEntry::delete(1, 1, 2, b"key2".to_vec(), stamp);
        assert!(delete_entry.is_delete());
        assert!(!delete_entry.is_put());
        assert_eq!(delete_entry.key(), Some(&b"key2"[..]));
        assert_eq!(delete_entry.value(), None);

        let checkpoint_entry = PerCoreWalEntry::checkpoint(0, 1, 3, 100, stamp);
        assert!(checkpoint_entry.is_checkpoint());
        assert!(!checkpoint_entry.is_state_operation());
    }

    #[test]
    fn test_sorting_multiple_entries() {
        let mut entries = [
            single_byte_put(2, 1, 0, 100, 1),
            single_byte_put(1, 2, 1, 200, 2),
            single_byte_put(1, 1, 0, 100, 3),
            single_byte_put(2, 2, 1, 200, 4),
        ];

        entries.sort();

        // Epoch 1 entries come first (by timestamp), followed by epoch 2 entries.
        let observed: Vec<(u64, i64)> = entries
            .iter()
            .map(|entry| (entry.epoch, entry.timestamp_ns))
            .collect();
        assert_eq!(observed, [(1, 100), (1, 200), (2, 100), (2, 200)]);
    }

    #[test]
    fn test_commit_entry() {
        let offsets = HashMap::from([("topic1".to_string(), 100), ("topic2".to_string(), 200)]);
        let stamp = PerCoreWalEntry::now_ns();

        let commit = PerCoreWalEntry::commit(0, 1, 1, offsets, Some(12345), stamp);

        match &commit.operation {
            WalOperation::Commit { offsets, watermark } => {
                assert_eq!(offsets.get("topic1"), Some(&100));
                assert_eq!(offsets.get("topic2"), Some(&200));
                assert_eq!(*watermark, Some(12345));
            }
            _ => panic!("Expected Commit operation"),
        }
    }

    #[test]
    fn test_epoch_barrier_entry() {
        let stamp = PerCoreWalEntry::now_ns();
        let barrier = PerCoreWalEntry::epoch_barrier(0, 5, 100, stamp);

        match &barrier.operation {
            WalOperation::EpochBarrier { epoch } => {
                assert_eq!(*epoch, 5);
            }
            _ => panic!("Expected EpochBarrier operation"),
        }
    }
}