Skip to main content

laminar_storage/per_core_wal/
reader.rs

1//! Per-core WAL reader for segment file reading.
2
3use std::fs::File;
4use std::io::{BufReader, Read, Seek, SeekFrom};
5use std::path::Path;
6
7use rkyv::rancor::Error as RkyvError;
8
9use super::entry::PerCoreWalEntry;
10use super::error::PerCoreWalError;
11
/// Byte size of every record header: a little-endian `u32` payload length
/// followed by a little-endian `u32` CRC32C of the payload.
const RECORD_HEADER_SIZE: u64 = 4 + 4;
14
15/// Result of reading a WAL record.
16#[derive(Debug)]
17pub enum WalReadResult {
18    /// Successfully read an entry.
19    Entry(PerCoreWalEntry),
20    /// Reached end of valid records.
21    Eof,
22    /// Torn write detected (partial record at end).
23    TornWrite {
24        /// Position where torn write was detected.
25        position: u64,
26        /// Description of what was incomplete.
27        reason: String,
28    },
29    /// CRC32 checksum mismatch.
30    ChecksumMismatch {
31        /// Position of the corrupted record.
32        position: u64,
33        /// Expected CRC32C value from the record header.
34        expected: u32,
35        /// Actual CRC32C computed from the data.
36        actual: u32,
37    },
38    /// Data corruption detected (e.g., oversized entry).
39    Corrupted {
40        /// Position of the corrupted record.
41        position: u64,
42        /// Description of the corruption.
43        reason: String,
44    },
45}
46
47/// Per-core WAL reader for a single segment file.
48///
49/// Reads entries from a WAL segment, validating CRC32 checksums and
50/// detecting torn writes.
51pub struct PerCoreWalReader {
52    /// Core ID for this segment.
53    core_id: usize,
54    /// Buffered reader.
55    reader: BufReader<File>,
56    /// Current position in the file.
57    position: u64,
58    /// File length.
59    file_len: u64,
60}
61
62impl PerCoreWalReader {
63    /// Opens a WAL segment for reading.
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the file cannot be opened.
68    pub fn open(core_id: usize, path: &Path) -> Result<Self, PerCoreWalError> {
69        let file = File::open(path)?;
70        let file_len = file.metadata()?.len();
71        let reader = BufReader::new(file);
72
73        Ok(Self {
74            core_id,
75            reader,
76            position: 0,
77            file_len,
78        })
79    }
80
81    /// Opens a WAL segment and seeks to a specific position.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the file cannot be opened or seeked.
86    pub fn open_from(core_id: usize, path: &Path, position: u64) -> Result<Self, PerCoreWalError> {
87        let file = File::open(path)?;
88        let file_len = file.metadata()?.len();
89        let mut reader = BufReader::new(file);
90        reader.seek(SeekFrom::Start(position))?;
91
92        Ok(Self {
93            core_id,
94            reader,
95            position,
96            file_len,
97        })
98    }
99
100    /// Returns the core ID for this segment.
101    #[must_use]
102    pub fn core_id(&self) -> usize {
103        self.core_id
104    }
105
106    /// Returns the current position in the file.
107    #[must_use]
108    pub fn position(&self) -> u64 {
109        self.position
110    }
111
112    /// Returns the file length.
113    #[must_use]
114    pub fn file_len(&self) -> u64 {
115        self.file_len
116    }
117
118    /// Reads the next entry with detailed status.
119    ///
120    /// Unlike the Iterator implementation, this method distinguishes between
121    /// clean EOF, torn writes, and checksum errors.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if file reading fails or deserialization fails.
126    pub fn read_next(&mut self) -> Result<WalReadResult, PerCoreWalError> {
127        let remaining = self.file_len.saturating_sub(self.position);
128
129        // Check for EOF
130        if remaining == 0 {
131            return Ok(WalReadResult::Eof);
132        }
133
134        // Check for incomplete header
135        if remaining < RECORD_HEADER_SIZE {
136            return Ok(WalReadResult::TornWrite {
137                position: self.position,
138                reason: format!(
139                    "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
140                ),
141            });
142        }
143
144        let record_start = self.position;
145
146        // Read length
147        let mut len_bytes = [0u8; 4];
148        self.reader.read_exact(&mut len_bytes)?;
149        let len = u64::from(u32::from_le_bytes(len_bytes));
150        self.position += 4;
151
152        // Read expected CRC32
153        let mut crc_bytes = [0u8; 4];
154        self.reader.read_exact(&mut crc_bytes)?;
155        let expected_crc = u32::from_le_bytes(crc_bytes);
156        self.position += 4;
157
158        // Guard against corrupted length field causing OOM (256 MiB)
159        if len > 256 * 1024 * 1024 {
160            return Ok(WalReadResult::Corrupted {
161                position: record_start,
162                reason: format!("entry length {len} exceeds 256 MiB — likely corrupted"),
163            });
164        }
165
166        // Check for incomplete data
167        let data_remaining = self.file_len.saturating_sub(self.position);
168        if data_remaining < len {
169            return Ok(WalReadResult::TornWrite {
170                position: record_start,
171                reason: format!(
172                    "incomplete data: only {data_remaining} bytes remaining, need {len}"
173                ),
174            });
175        }
176
177        // Read data
178        #[allow(clippy::cast_possible_truncation)]
179        // u32 → usize: lossless on all supported platforms
180        let mut data = vec![0u8; len as usize];
181        self.reader.read_exact(&mut data)?;
182        self.position += len;
183
184        // Validate CRC
185        let actual_crc = crc32c::crc32c(&data);
186        if actual_crc != expected_crc {
187            return Ok(WalReadResult::ChecksumMismatch {
188                position: record_start,
189                expected: expected_crc,
190                actual: actual_crc,
191            });
192        }
193
194        // Deserialize entry
195        match rkyv::from_bytes::<PerCoreWalEntry, RkyvError>(&data) {
196            Ok(entry) => Ok(WalReadResult::Entry(entry)),
197            Err(e) => Err(PerCoreWalError::Deserialization(e.to_string())),
198        }
199    }
200
201    /// Reads all valid entries from the segment.
202    ///
203    /// Stops at EOF, torn write, or checksum mismatch.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if reading fails (not on torn writes).
208    pub fn read_all(&mut self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
209        let mut entries = Vec::new();
210
211        while let WalReadResult::Entry(entry) = self.read_next()? {
212            entries.push(entry);
213        }
214
215        Ok(entries)
216    }
217
218    /// Reads entries from the segment up to a specific epoch.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if reading fails.
223    pub fn read_up_to_epoch(
224        &mut self,
225        max_epoch: u64,
226    ) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
227        let mut entries = Vec::new();
228
229        while let WalReadResult::Entry(entry) = self.read_next()? {
230            if entry.epoch > max_epoch {
231                break;
232            }
233            entries.push(entry);
234        }
235
236        Ok(entries)
237    }
238
239    /// Finds the last valid position in the segment.
240    ///
241    /// Used for repair/recovery to truncate at torn writes.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if reading fails.
246    pub fn find_valid_end(&mut self) -> Result<u64, PerCoreWalError> {
247        let mut valid_position = self.position;
248
249        loop {
250            let pos_before = self.position;
251            match self.read_next()? {
252                WalReadResult::Entry(_) => {
253                    valid_position = self.position;
254                }
255                WalReadResult::Eof => {
256                    break;
257                }
258                WalReadResult::TornWrite { .. }
259                | WalReadResult::ChecksumMismatch { .. }
260                | WalReadResult::Corrupted { .. } => {
261                    // Truncate at the start of the invalid record
262                    valid_position = pos_before;
263                    break;
264                }
265            }
266        }
267
268        Ok(valid_position)
269    }
270}
271
272impl Iterator for PerCoreWalReader {
273    type Item = Result<PerCoreWalEntry, PerCoreWalError>;
274
275    fn next(&mut self) -> Option<Self::Item> {
276        match self.read_next() {
277            Ok(WalReadResult::Entry(entry)) => Some(Ok(entry)),
278            Ok(WalReadResult::Eof) => None,
279            Ok(WalReadResult::TornWrite { position, reason }) => {
280                Some(Err(PerCoreWalError::TornWrite {
281                    core_id: self.core_id,
282                    position,
283                    reason,
284                }))
285            }
286            Ok(WalReadResult::ChecksumMismatch {
287                position,
288                expected,
289                actual,
290            }) => Some(Err(PerCoreWalError::ChecksumMismatch {
291                core_id: self.core_id,
292                position,
293                expected,
294                actual,
295            })),
296            Ok(WalReadResult::Corrupted { position, reason }) => {
297                Some(Err(PerCoreWalError::Corrupted {
298                    core_id: self.core_id,
299                    position,
300                    reason,
301                }))
302            }
303            Err(e) => Some(Err(e)),
304        }
305    }
306}
307
308impl std::fmt::Debug for PerCoreWalReader {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        f.debug_struct("PerCoreWalReader")
311            .field("core_id", &self.core_id)
312            .field("position", &self.position)
313            .field("file_len", &self.file_len)
314            .finish_non_exhaustive()
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::per_core_wal::writer::CoreWalWriter;
    use tempfile::TempDir;

    /// Builds `wal-{core_id}.log` holding three puts: `key1` and `key2` at
    /// epoch 1, then `key3` at epoch 2.
    fn setup_test_segment(core_id: usize) -> (TempDir, std::path::PathBuf) {
        let dir = TempDir::new().unwrap();
        let segment = dir.path().join(format!("wal-{core_id}.log"));

        let mut writer = CoreWalWriter::new(core_id, &segment).unwrap();
        writer.set_epoch(1);
        writer.append_put(b"key1", b"value1").unwrap();
        writer.append_put(b"key2", b"value2").unwrap();
        writer.set_epoch(2);
        writer.append_put(b"key3", b"value3").unwrap();
        writer.sync().unwrap();
        drop(writer);

        (dir, segment)
    }

    /// Builds `wal-0.log` holding a single `key1 -> value1` put.
    fn single_entry_segment() -> (TempDir, std::path::PathBuf) {
        let dir = TempDir::new().unwrap();
        let segment = dir.path().join("wal-0.log");

        let mut writer = CoreWalWriter::new(0, &segment).unwrap();
        writer.append_put(b"key1", b"value1").unwrap();
        writer.sync().unwrap();
        drop(writer);

        (dir, segment)
    }

    #[test]
    fn test_reader_open() {
        let (_dir, segment) = setup_test_segment(0);
        let reader = PerCoreWalReader::open(0, &segment).unwrap();

        assert_eq!(reader.core_id(), 0);
        assert_eq!(reader.position(), 0);
        assert!(reader.file_len() > 0);
    }

    #[test]
    fn test_read_all() {
        let (_dir, segment) = setup_test_segment(0);
        let entries = PerCoreWalReader::open(0, &segment)
            .unwrap()
            .read_all()
            .unwrap();
        assert_eq!(entries.len(), 3);

        let first = &entries[0];
        assert!(first.is_put());
        assert_eq!(first.key(), Some(b"key1".as_slice()));
        assert_eq!(first.epoch, 1);

        assert_eq!(entries[2].epoch, 2);
    }

    #[test]
    fn test_read_up_to_epoch() {
        let (_dir, segment) = setup_test_segment(0);
        let epoch_one = PerCoreWalReader::open(0, &segment)
            .unwrap()
            .read_up_to_epoch(1)
            .unwrap();

        // Only the two epoch-1 entries qualify.
        assert_eq!(epoch_one.len(), 2);
    }

    #[test]
    fn test_iterator() {
        let (_dir, segment) = setup_test_segment(0);
        let reader = PerCoreWalReader::open(0, &segment).unwrap();

        let collected: Result<Vec<_>, _> = reader.collect();
        assert_eq!(collected.unwrap().len(), 3);
    }

    #[test]
    fn test_open_from_position() {
        let (_dir, segment) = setup_test_segment(0);

        // Consume one record to learn where the second one begins.
        let pos_after_first = {
            let mut probe = PerCoreWalReader::open(0, &segment).unwrap();
            let _ = probe.read_next().unwrap();
            probe.position()
        };

        let rest = PerCoreWalReader::open_from(0, &segment, pos_after_first)
            .unwrap()
            .read_all()
            .unwrap();

        // The first entry is skipped.
        assert_eq!(rest.len(), 2);
    }

    #[test]
    fn test_empty_segment() {
        let dir = TempDir::new().unwrap();
        let segment = dir.path().join("wal-0.log");

        // Creating and immediately dropping the writer leaves an empty segment.
        drop(CoreWalWriter::new(0, &segment).unwrap());

        let mut reader = PerCoreWalReader::open(0, &segment).unwrap();
        match reader.read_next().unwrap() {
            WalReadResult::Eof => {}
            other => panic!("Expected Eof, got {other:?}"),
        }
    }

    #[test]
    fn test_find_valid_end() {
        let (_dir, segment) = setup_test_segment(0);
        let mut reader = PerCoreWalReader::open(0, &segment).unwrap();

        let end = reader.find_valid_end().unwrap();

        // Every record is intact, so the valid end is the whole file.
        assert_eq!(end, reader.file_len());
    }

    #[test]
    fn test_torn_write_detection() {
        let (_dir, segment) = single_entry_segment();

        // Simulate a torn write by appending 3 bytes, fewer than a full header.
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&segment)
                .unwrap();
            file.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            file.sync_all().unwrap();
        }

        let mut reader = PerCoreWalReader::open(0, &segment).unwrap();

        // The intact record still reads back.
        match reader.read_next().unwrap() {
            WalReadResult::Entry(entry) => {
                assert_eq!(entry.key(), Some(b"key1".as_slice()));
            }
            other => panic!("Expected Entry, got {other:?}"),
        }

        // The dangling bytes are reported as a torn write.
        match reader.read_next().unwrap() {
            WalReadResult::TornWrite { .. } => {}
            other => panic!("Expected TornWrite, got {other:?}"),
        }
    }

    #[test]
    fn test_checksum_mismatch() {
        let (_dir, segment) = single_entry_segment();

        // Overwrite one payload byte; offset 10 lies past the 8-byte header.
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .write(true)
                .open(&segment)
                .unwrap();
            file.seek(SeekFrom::Start(10)).unwrap();
            file.write_all(&[0xFF]).unwrap();
            file.sync_all().unwrap();
        }

        let mut reader = PerCoreWalReader::open(0, &segment).unwrap();
        match reader.read_next().unwrap() {
            WalReadResult::ChecksumMismatch {
                position,
                expected,
                actual,
            } => {
                assert_eq!(position, 0);
                assert_ne!(expected, actual);
            }
            other => panic!("Expected ChecksumMismatch, got {other:?}"),
        }
    }

    #[test]
    fn test_debug_format() {
        let (_dir, segment) = setup_test_segment(0);
        let reader = PerCoreWalReader::open(0, &segment).unwrap();
        let rendered = format!("{reader:?}");

        for needle in ["PerCoreWalReader", "core_id"] {
            assert!(rendered.contains(needle));
        }
    }
}