laminar_storage/per_core_wal/
reader.rs1use std::fs::File;
4use std::io::{BufReader, Read, Seek, SeekFrom};
5use std::path::Path;
6
7use rkyv::rancor::Error as RkyvError;
8
9use super::entry::PerCoreWalEntry;
10use super::error::PerCoreWalError;
11
/// Size in bytes of the fixed header that precedes every record payload: a 4-byte
/// little-endian payload length followed by a 4-byte little-endian checksum.
const RECORD_HEADER_SIZE: u64 = 8;
14
/// Outcome of attempting to read a single record from a per-core WAL segment.
///
/// Everything except [`WalReadResult::Entry`] describes why no entry could be produced
/// at the reported byte offset.
#[derive(Debug)]
pub enum WalReadResult {
    /// A record whose checksum matched and whose payload deserialized successfully.
    Entry(PerCoreWalEntry),
    /// No bytes remain between the current position and the recorded file length.
    Eof,
    /// The file ends before the record is complete (header or payload cut short).
    TornWrite {
        /// Byte offset at which the incomplete record starts.
        position: u64,
        /// Human-readable description of what was missing.
        reason: String,
    },
    /// The checksum stored in the record header does not match the payload.
    ChecksumMismatch {
        /// Byte offset at which the record starts.
        position: u64,
        /// Checksum read from the record header.
        expected: u32,
        /// Checksum computed over the payload bytes that were read.
        actual: u32,
    },
    /// The record header is implausible (for example, an oversized declared length).
    Corrupted {
        /// Byte offset at which the record starts.
        position: u64,
        /// Human-readable description of the problem.
        reason: String,
    },
}
46
/// Sequential reader over a single core's WAL segment file.
pub struct PerCoreWalReader {
    /// Identifier of the core this segment belongs to; attached to iterator errors.
    core_id: usize,
    /// Buffered handle on the segment file.
    reader: BufReader<File>,
    /// Current byte offset within the file, advanced as records are read.
    position: u64,
    /// Length of the file in bytes, sampled once when the reader was opened.
    file_len: u64,
}
61
62impl PerCoreWalReader {
63 pub fn open(core_id: usize, path: &Path) -> Result<Self, PerCoreWalError> {
69 let file = File::open(path)?;
70 let file_len = file.metadata()?.len();
71 let reader = BufReader::new(file);
72
73 Ok(Self {
74 core_id,
75 reader,
76 position: 0,
77 file_len,
78 })
79 }
80
81 pub fn open_from(core_id: usize, path: &Path, position: u64) -> Result<Self, PerCoreWalError> {
87 let file = File::open(path)?;
88 let file_len = file.metadata()?.len();
89 let mut reader = BufReader::new(file);
90 reader.seek(SeekFrom::Start(position))?;
91
92 Ok(Self {
93 core_id,
94 reader,
95 position,
96 file_len,
97 })
98 }
99
100 #[must_use]
102 pub fn core_id(&self) -> usize {
103 self.core_id
104 }
105
106 #[must_use]
108 pub fn position(&self) -> u64 {
109 self.position
110 }
111
112 #[must_use]
114 pub fn file_len(&self) -> u64 {
115 self.file_len
116 }
117
118 pub fn read_next(&mut self) -> Result<WalReadResult, PerCoreWalError> {
127 let remaining = self.file_len.saturating_sub(self.position);
128
129 if remaining == 0 {
131 return Ok(WalReadResult::Eof);
132 }
133
134 if remaining < RECORD_HEADER_SIZE {
136 return Ok(WalReadResult::TornWrite {
137 position: self.position,
138 reason: format!(
139 "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
140 ),
141 });
142 }
143
144 let record_start = self.position;
145
146 let mut len_bytes = [0u8; 4];
148 self.reader.read_exact(&mut len_bytes)?;
149 let len = u64::from(u32::from_le_bytes(len_bytes));
150 self.position += 4;
151
152 let mut crc_bytes = [0u8; 4];
154 self.reader.read_exact(&mut crc_bytes)?;
155 let expected_crc = u32::from_le_bytes(crc_bytes);
156 self.position += 4;
157
158 if len > 256 * 1024 * 1024 {
160 return Ok(WalReadResult::Corrupted {
161 position: record_start,
162 reason: format!("entry length {len} exceeds 256 MiB — likely corrupted"),
163 });
164 }
165
166 let data_remaining = self.file_len.saturating_sub(self.position);
168 if data_remaining < len {
169 return Ok(WalReadResult::TornWrite {
170 position: record_start,
171 reason: format!(
172 "incomplete data: only {data_remaining} bytes remaining, need {len}"
173 ),
174 });
175 }
176
177 #[allow(clippy::cast_possible_truncation)]
179 let mut data = vec![0u8; len as usize];
181 self.reader.read_exact(&mut data)?;
182 self.position += len;
183
184 let actual_crc = crc32c::crc32c(&data);
186 if actual_crc != expected_crc {
187 return Ok(WalReadResult::ChecksumMismatch {
188 position: record_start,
189 expected: expected_crc,
190 actual: actual_crc,
191 });
192 }
193
194 match rkyv::from_bytes::<PerCoreWalEntry, RkyvError>(&data) {
196 Ok(entry) => Ok(WalReadResult::Entry(entry)),
197 Err(e) => Err(PerCoreWalError::Deserialization(e.to_string())),
198 }
199 }
200
201 pub fn read_all(&mut self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
209 let mut entries = Vec::new();
210
211 while let WalReadResult::Entry(entry) = self.read_next()? {
212 entries.push(entry);
213 }
214
215 Ok(entries)
216 }
217
218 pub fn read_up_to_epoch(
224 &mut self,
225 max_epoch: u64,
226 ) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
227 let mut entries = Vec::new();
228
229 while let WalReadResult::Entry(entry) = self.read_next()? {
230 if entry.epoch > max_epoch {
231 break;
232 }
233 entries.push(entry);
234 }
235
236 Ok(entries)
237 }
238
239 pub fn find_valid_end(&mut self) -> Result<u64, PerCoreWalError> {
247 let mut valid_position = self.position;
248
249 loop {
250 let pos_before = self.position;
251 match self.read_next()? {
252 WalReadResult::Entry(_) => {
253 valid_position = self.position;
254 }
255 WalReadResult::Eof => {
256 break;
257 }
258 WalReadResult::TornWrite { .. }
259 | WalReadResult::ChecksumMismatch { .. }
260 | WalReadResult::Corrupted { .. } => {
261 valid_position = pos_before;
263 break;
264 }
265 }
266 }
267
268 Ok(valid_position)
269 }
270}
271
272impl Iterator for PerCoreWalReader {
273 type Item = Result<PerCoreWalEntry, PerCoreWalError>;
274
275 fn next(&mut self) -> Option<Self::Item> {
276 match self.read_next() {
277 Ok(WalReadResult::Entry(entry)) => Some(Ok(entry)),
278 Ok(WalReadResult::Eof) => None,
279 Ok(WalReadResult::TornWrite { position, reason }) => {
280 Some(Err(PerCoreWalError::TornWrite {
281 core_id: self.core_id,
282 position,
283 reason,
284 }))
285 }
286 Ok(WalReadResult::ChecksumMismatch {
287 position,
288 expected,
289 actual,
290 }) => Some(Err(PerCoreWalError::ChecksumMismatch {
291 core_id: self.core_id,
292 position,
293 expected,
294 actual,
295 })),
296 Ok(WalReadResult::Corrupted { position, reason }) => {
297 Some(Err(PerCoreWalError::Corrupted {
298 core_id: self.core_id,
299 position,
300 reason,
301 }))
302 }
303 Err(e) => Some(Err(e)),
304 }
305 }
306}
307
308impl std::fmt::Debug for PerCoreWalReader {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 f.debug_struct("PerCoreWalReader")
311 .field("core_id", &self.core_id)
312 .field("position", &self.position)
313 .field("file_len", &self.file_len)
314 .finish_non_exhaustive()
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::per_core_wal::writer::CoreWalWriter;
    use tempfile::TempDir;

    /// Creates a fresh temp directory and returns it with the path `wal-{core_id}.log`
    /// inside it. The directory must be kept alive for as long as the path is used.
    fn temp_segment_path(core_id: usize) -> (TempDir, std::path::PathBuf) {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join(format!("wal-{core_id}.log"));
        (temp_dir, path)
    }

    /// Writes a three-entry segment: two puts under epoch 1 and one put under epoch 2.
    fn setup_test_segment(core_id: usize) -> (TempDir, std::path::PathBuf) {
        let (temp_dir, path) = temp_segment_path(core_id);

        let mut writer = CoreWalWriter::new(core_id, &path).unwrap();
        writer.set_epoch(1);
        writer.append_put(b"key1", b"value1").unwrap();
        writer.append_put(b"key2", b"value2").unwrap();
        writer.set_epoch(2);
        writer.append_put(b"key3", b"value3").unwrap();
        writer.sync().unwrap();
        // Release the writer before any reader opens the file.
        drop(writer);

        (temp_dir, path)
    }

    /// Writes a core-0 segment holding a single `key1 -> value1` put.
    fn setup_single_put_segment() -> (TempDir, std::path::PathBuf) {
        let (temp_dir, path) = temp_segment_path(0);

        let mut writer = CoreWalWriter::new(0, &path).unwrap();
        writer.append_put(b"key1", b"value1").unwrap();
        writer.sync().unwrap();
        drop(writer);

        (temp_dir, path)
    }

    #[test]
    fn test_reader_open() {
        let (_temp_dir, path) = setup_test_segment(0);
        let reader = PerCoreWalReader::open(0, &path).unwrap();

        assert_eq!(reader.core_id(), 0);
        assert_eq!(reader.position(), 0);
        assert!(reader.file_len() > 0);
    }

    #[test]
    fn test_read_all() {
        let (_temp_dir, path) = setup_test_segment(0);
        let mut reader = PerCoreWalReader::open(0, &path).unwrap();

        let entries = reader.read_all().unwrap();
        assert_eq!(entries.len(), 3);

        let first = &entries[0];
        assert!(first.is_put());
        assert_eq!(first.key(), Some(b"key1".as_slice()));
        assert_eq!(first.epoch, 1);

        assert_eq!(entries[2].epoch, 2);
    }

    #[test]
    fn test_read_up_to_epoch() {
        let (_temp_dir, path) = setup_test_segment(0);
        let mut reader = PerCoreWalReader::open(0, &path).unwrap();

        // Only the two entries written under epoch 1 qualify.
        let entries = reader.read_up_to_epoch(1).unwrap();
        assert_eq!(entries.len(), 2);
    }

    #[test]
    fn test_iterator() {
        let (_temp_dir, path) = setup_test_segment(0);
        let reader = PerCoreWalReader::open(0, &path).unwrap();

        let mut entries = Vec::new();
        for item in reader {
            entries.push(item.unwrap());
        }
        assert_eq!(entries.len(), 3);
    }

    #[test]
    fn test_open_from_position() {
        let (_temp_dir, path) = setup_test_segment(0);

        // Consume the first record to learn where the second one starts.
        let mut reader1 = PerCoreWalReader::open(0, &path).unwrap();
        let _ = reader1.read_next().unwrap();
        let pos_after_first = reader1.position();

        // A reader opened at that offset sees the remaining two entries.
        let mut reader2 = PerCoreWalReader::open_from(0, &path, pos_after_first).unwrap();
        let entries = reader2.read_all().unwrap();

        assert_eq!(entries.len(), 2);
    }

    #[test]
    fn test_empty_segment() {
        let (_temp_dir, path) = temp_segment_path(0);

        // Create the segment without appending anything.
        drop(CoreWalWriter::new(0, &path).unwrap());

        let mut reader = PerCoreWalReader::open(0, &path).unwrap();
        let other = reader.read_next().unwrap();
        assert!(
            matches!(other, WalReadResult::Eof),
            "Expected Eof, got {other:?}"
        );
    }

    #[test]
    fn test_find_valid_end() {
        let (_temp_dir, path) = setup_test_segment(0);
        let mut reader = PerCoreWalReader::open(0, &path).unwrap();

        // A cleanly written segment is valid all the way to its end.
        let valid_end = reader.find_valid_end().unwrap();
        assert_eq!(valid_end, reader.file_len());
    }

    #[test]
    fn test_torn_write_detection() {
        let (_temp_dir, path) = setup_single_put_segment();

        // Simulate a torn write: three stray bytes, fewer than a record header.
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            file.sync_all().unwrap();
        }

        let mut reader = PerCoreWalReader::open(0, &path).unwrap();

        // The intact record still reads back.
        let entry = match reader.read_next().unwrap() {
            WalReadResult::Entry(entry) => entry,
            other => panic!("Expected Entry, got {other:?}"),
        };
        assert_eq!(entry.key(), Some(b"key1".as_slice()));

        // The trailing bytes are reported as a torn write.
        let other = reader.read_next().unwrap();
        assert!(
            matches!(other, WalReadResult::TornWrite { .. }),
            "Expected TornWrite, got {other:?}"
        );
    }

    #[test]
    fn test_checksum_mismatch() {
        let (_temp_dir, path) = setup_single_put_segment();

        // Overwrite one byte at offset 10, which lies past the 8-byte record header.
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
            file.seek(SeekFrom::Start(10)).unwrap();
            file.write_all(&[0xFF]).unwrap();
            file.sync_all().unwrap();
        }

        let mut reader = PerCoreWalReader::open(0, &path).unwrap();

        match reader.read_next().unwrap() {
            WalReadResult::ChecksumMismatch {
                position,
                expected,
                actual,
            } => {
                assert_eq!(position, 0);
                assert_ne!(expected, actual);
            }
            other => panic!("Expected ChecksumMismatch, got {other:?}"),
        }
    }

    #[test]
    fn test_debug_format() {
        let (_temp_dir, path) = setup_test_segment(0);
        let reader = PerCoreWalReader::open(0, &path).unwrap();

        let debug_str = format!("{reader:?}");
        assert!(debug_str.contains("PerCoreWalReader"));
        assert!(debug_str.contains("core_id"));
    }
}