1use std::fs::{File, OpenOptions};
11use std::io::{BufWriter, Write};
12use std::path::{Path, PathBuf};
13use std::time::Instant;
14
15use laminar_core::storage_io::{IoCompletion, IoFd, StorageIo, StorageIoError};
16use rkyv::api::high;
17use rkyv::rancor::Error as RkyvError;
18use rkyv::util::AlignedVec;
19
20use super::entry::PerCoreWalEntry;
21use super::error::PerCoreWalError;
22
/// Bytes of framing in front of every record's payload: a little-endian `u32`
/// payload length followed by a little-endian `u32` CRC32C of the payload.
const RECORD_HEADER_SIZE: u64 = (2 * std::mem::size_of::<u32>()) as u64;
25
26pub struct CoreWalWriter {
31 core_id: usize,
33 writer: BufWriter<File>,
35 path: PathBuf,
37 position: u64,
39 synced_position: u64,
44 epoch: u64,
46 sequence: u64,
48 last_sync: Instant,
50 entries_since_sync: u64,
52 write_buffer: Vec<u8>,
55 serialize_buffer: AlignedVec,
57 pending_sync: Option<PendingSync>,
62}
63
/// A datasync submitted through [`StorageIo`] that has not completed yet.
///
/// The boundary is captured at submission time, so entries appended while the
/// sync is in flight are not reported as durable when it completes.
#[derive(Clone, Copy, Debug)]
struct PendingSync {
    /// Completion token returned by `StorageIo::submit_datasync`.
    token: u64,
    /// Writer position at submission; the durable mark to apply on success.
    boundary: u64,
}
71
72impl CoreWalWriter {
73 pub fn new(core_id: usize, path: &Path) -> Result<Self, PerCoreWalError> {
84 let file = OpenOptions::new().create(true).append(true).open(path)?;
85
86 let position = file.metadata()?.len();
87
88 Ok(Self {
89 core_id,
90 writer: BufWriter::with_capacity(64 * 1024, file), path: path.to_path_buf(),
92 position,
93 synced_position: position,
94 epoch: 0,
95 sequence: 0,
96 last_sync: Instant::now(),
97 entries_since_sync: 0,
98 write_buffer: Vec::with_capacity(4096),
99 serialize_buffer: AlignedVec::with_capacity(256),
100 pending_sync: None,
101 })
102 }
103
104 pub fn open_at(core_id: usize, path: &Path, position: u64) -> Result<Self, PerCoreWalError> {
112 let file = OpenOptions::new().write(true).open(path)?;
113
114 file.set_len(position)?;
116
117 let file = OpenOptions::new().append(true).open(path)?;
118
119 Ok(Self {
120 core_id,
121 writer: BufWriter::with_capacity(64 * 1024, file),
122 path: path.to_path_buf(),
123 position,
124 synced_position: position,
125 epoch: 0,
126 sequence: 0,
127 last_sync: Instant::now(),
128 entries_since_sync: 0,
129 write_buffer: Vec::with_capacity(4096),
130 serialize_buffer: AlignedVec::with_capacity(256),
131 pending_sync: None,
132 })
133 }
134
135 #[must_use]
137 pub fn core_id(&self) -> usize {
138 self.core_id
139 }
140
141 #[must_use]
143 pub fn position(&self) -> u64 {
144 self.position
145 }
146
147 #[must_use]
152 pub fn synced_position(&self) -> u64 {
153 self.synced_position
154 }
155
156 #[must_use]
158 pub fn epoch(&self) -> u64 {
159 self.epoch
160 }
161
162 #[must_use]
164 pub fn sequence(&self) -> u64 {
165 self.sequence
166 }
167
168 #[must_use]
170 pub fn path(&self) -> &Path {
171 &self.path
172 }
173
174 #[must_use]
176 pub fn entries_since_sync(&self) -> u64 {
177 self.entries_since_sync
178 }
179
180 pub fn set_epoch(&mut self, epoch: u64) {
182 self.epoch = epoch;
183 }
184
185 #[inline]
191 #[allow(clippy::cast_possible_truncation)] pub fn append_put(&mut self, key: &[u8], value: &[u8]) -> Result<u64, PerCoreWalError> {
193 let ts = PerCoreWalEntry::now_ns();
194 let entry = PerCoreWalEntry::put(
195 self.core_id as u16,
196 self.epoch,
197 self.sequence,
198 key.to_vec(),
199 value.to_vec(),
200 ts,
201 );
202 self.append(&entry)
203 }
204
205 #[inline]
211 #[allow(clippy::cast_possible_truncation)] pub fn append_delete(&mut self, key: &[u8]) -> Result<u64, PerCoreWalError> {
213 let ts = PerCoreWalEntry::now_ns();
214 let entry = PerCoreWalEntry::delete(
215 self.core_id as u16,
216 self.epoch,
217 self.sequence,
218 key.to_vec(),
219 ts,
220 );
221 self.append(&entry)
222 }
223
224 #[inline]
230 #[allow(clippy::cast_possible_truncation)]
231 pub fn append_put_via_storage_io(
232 &mut self,
233 key: &[u8],
234 value: &[u8],
235 sio: &mut dyn StorageIo,
236 fd: IoFd,
237 ) -> Result<u64, PerCoreWalError> {
238 let ts = PerCoreWalEntry::now_ns();
239 let entry = PerCoreWalEntry::put(
240 self.core_id as u16,
241 self.epoch,
242 self.sequence,
243 key.to_vec(),
244 value.to_vec(),
245 ts,
246 );
247 self.append_via_storage_io(&entry, sio, fd)
248 }
249
250 #[inline]
256 #[allow(clippy::cast_possible_truncation)]
257 pub fn append_delete_via_storage_io(
258 &mut self,
259 key: &[u8],
260 sio: &mut dyn StorageIo,
261 fd: IoFd,
262 ) -> Result<u64, PerCoreWalError> {
263 let ts = PerCoreWalEntry::now_ns();
264 let entry = PerCoreWalEntry::delete(
265 self.core_id as u16,
266 self.epoch,
267 self.sequence,
268 key.to_vec(),
269 ts,
270 );
271 self.append_via_storage_io(&entry, sio, fd)
272 }
273
274 fn serialize_entry(&mut self, entry: &PerCoreWalEntry) -> Result<(), PerCoreWalError> {
279 self.serialize_buffer.clear();
280 let taken = std::mem::take(&mut self.serialize_buffer);
281 let bytes = high::to_bytes_in::<_, RkyvError>(entry, taken)
282 .map_err(|e| PerCoreWalError::Serialization(e.to_string()))?;
283
284 let crc = crc32c::crc32c(&bytes);
285
286 #[allow(clippy::cast_possible_truncation)]
287 let len = bytes.len() as u32;
288 self.write_buffer.clear();
289 #[allow(clippy::cast_possible_truncation)]
290 self.write_buffer
291 .reserve(RECORD_HEADER_SIZE as usize + bytes.len());
292 self.write_buffer.extend_from_slice(&len.to_le_bytes());
293 self.write_buffer.extend_from_slice(&crc.to_le_bytes());
294 self.write_buffer.extend_from_slice(&bytes);
295
296 self.serialize_buffer = bytes;
297 Ok(())
298 }
299
300 fn advance_position(&mut self) {
302 #[allow(clippy::cast_possible_truncation)]
303 let data_len = self.write_buffer.len() as u64 - RECORD_HEADER_SIZE;
304 self.position += RECORD_HEADER_SIZE + data_len;
305 self.sequence += 1;
306 self.entries_since_sync += 1;
307 }
308
309 pub fn append(&mut self, entry: &PerCoreWalEntry) -> Result<u64, PerCoreWalError> {
317 let start_pos = self.position;
318 self.serialize_entry(entry)?;
319 self.writer.write_all(&self.write_buffer)?;
320 self.advance_position();
321 Ok(start_pos)
322 }
323
324 pub fn append_via_storage_io(
333 &mut self,
334 entry: &PerCoreWalEntry,
335 sio: &mut dyn StorageIo,
336 fd: IoFd,
337 ) -> Result<u64, PerCoreWalError> {
338 let start_pos = self.position;
339 self.serialize_entry(entry)?;
340 sio.submit_append(fd, &self.write_buffer)
341 .map_err(storage_io_to_wal_error)?;
342 self.advance_position();
343 Ok(start_pos)
344 }
345
346 pub fn sync_via_storage_io(
355 &mut self,
356 sio: &mut dyn StorageIo,
357 fd: IoFd,
358 ) -> Result<u64, PerCoreWalError> {
359 if self.pending_sync.is_some() {
360 return Err(PerCoreWalError::Io(std::io::Error::other(
361 "sync already in flight — wait for completion before submitting another",
362 )));
363 }
364 let token = sio.submit_datasync(fd).map_err(storage_io_to_wal_error)?;
365 self.pending_sync = Some(PendingSync {
366 token,
367 boundary: self.position,
368 });
369 Ok(token)
370 }
371
372 pub fn check_completions(
388 &mut self,
389 completions: &[IoCompletion],
390 ) -> Result<bool, PerCoreWalError> {
391 let Some(pending) = self.pending_sync else {
392 return Ok(false);
393 };
394 for c in completions {
395 if c.token == pending.token {
396 self.pending_sync = None;
397 if c.result >= 0 {
398 self.synced_position = pending.boundary;
402 self.last_sync = Instant::now();
403 self.entries_since_sync = 0;
404 return Ok(true);
405 }
406 return Err(PerCoreWalError::Io(std::io::Error::from_raw_os_error(
407 -c.result,
408 )));
409 }
410 }
411 Ok(false)
412 }
413
414 #[must_use]
416 pub fn is_sync_pending(&self) -> bool {
417 self.pending_sync.is_some()
418 }
419
420 pub fn mark_synced(&mut self) {
427 self.synced_position = self.position;
428 self.last_sync = Instant::now();
429 self.entries_since_sync = 0;
430 }
431
432 #[allow(clippy::cast_possible_truncation)] pub fn append_checkpoint(&mut self, checkpoint_id: u64) -> Result<u64, PerCoreWalError> {
439 let ts = PerCoreWalEntry::now_ns();
440 let entry = PerCoreWalEntry::checkpoint(
441 self.core_id as u16,
442 self.epoch,
443 self.sequence,
444 checkpoint_id,
445 ts,
446 );
447 self.append(&entry)
448 }
449
450 #[allow(clippy::cast_possible_truncation)] pub fn append_epoch_barrier(&mut self) -> Result<u64, PerCoreWalError> {
457 let ts = PerCoreWalEntry::now_ns();
458 let entry =
459 PerCoreWalEntry::epoch_barrier(self.core_id as u16, self.epoch, self.sequence, ts);
460 self.append(&entry)
461 }
462
463 #[allow(clippy::cast_possible_truncation)] #[allow(clippy::disallowed_types)] pub fn append_commit(
471 &mut self,
472 offsets: std::collections::HashMap<String, u64>,
473 watermark: Option<i64>,
474 ) -> Result<u64, PerCoreWalError> {
475 let ts = PerCoreWalEntry::now_ns();
476 let entry = PerCoreWalEntry::commit(
477 self.core_id as u16,
478 self.epoch,
479 self.sequence,
480 offsets,
481 watermark,
482 ts,
483 );
484 self.append(&entry)
485 }
486
487 pub fn sync(&mut self) -> Result<(), PerCoreWalError> {
495 self.writer.flush()?;
496 self.writer.get_ref().sync_data()?;
498 self.synced_position = self.position;
499 self.last_sync = Instant::now();
500 self.entries_since_sync = 0;
501 Ok(())
502 }
503
504 pub fn truncate(&mut self, position: u64) -> Result<(), PerCoreWalError> {
512 self.sync()?;
513
514 let file = OpenOptions::new()
516 .write(true)
517 .truncate(false)
518 .open(&self.path)?;
519
520 file.set_len(position)?;
523 file.sync_all()?;
524
525 let file = OpenOptions::new().append(true).open(&self.path)?;
527
528 self.writer = BufWriter::with_capacity(64 * 1024, file);
529 self.position = position;
530 self.synced_position = position;
531
532 Ok(())
533 }
534
535 pub fn reset(&mut self) -> Result<(), PerCoreWalError> {
543 self.truncate(0)?;
544 self.sequence = 0;
545 Ok(())
546 }
547}
548
549fn storage_io_to_wal_error(e: StorageIoError) -> PerCoreWalError {
551 match e {
552 StorageIoError::Io(io_err) => PerCoreWalError::Io(io_err),
553 other => PerCoreWalError::Io(std::io::Error::other(other.to_string())),
554 }
555}
556
557impl std::fmt::Debug for CoreWalWriter {
558 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
559 f.debug_struct("CoreWalWriter")
560 .field("core_id", &self.core_id)
561 .field("path", &self.path)
562 .field("position", &self.position)
563 .field("synced_position", &self.synced_position)
564 .field("epoch", &self.epoch)
565 .field("sequence", &self.sequence)
566 .field("entries_since_sync", &self.entries_since_sync)
567 .finish_non_exhaustive()
568 }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_core::storage_io::SyncStorageIo;
    use tempfile::TempDir;

    /// Creates a writer for `core_id` on a fresh WAL file in a temporary directory.
    ///
    /// Keep the returned `TempDir` alive for as long as the writer is used.
    fn create_temp_writer(core_id: usize) -> (CoreWalWriter, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join(format!("wal-{core_id}.log"));
        let writer = CoreWalWriter::new(core_id, &path).unwrap();
        (writer, temp_dir)
    }

    /// Creates a core-0 writer on `file_name`, plus a `SyncStorageIo`.
    ///
    /// The `SyncStorageIo` has a second append-mode handle on the same file,
    /// registered as the returned fd.
    fn create_sio_writer(file_name: &str) -> (CoreWalWriter, SyncStorageIo, IoFd, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join(file_name);
        let writer = CoreWalWriter::new(0, &path).unwrap();

        let mut sio = SyncStorageIo::new();
        let sio_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        let fd = sio.register_fd(sio_file).unwrap();
        (writer, sio, fd, temp_dir)
    }

    #[test]
    fn test_writer_creation() {
        let (writer, _temp_dir) = create_temp_writer(0);
        assert_eq!(writer.core_id(), 0);
        assert_eq!(writer.position(), 0);
        assert_eq!(writer.epoch(), 0);
        assert_eq!(writer.sequence(), 0);
    }

    #[test]
    fn test_append_put() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        let pos = writer.append_put(b"key1", b"value1").unwrap();
        assert_eq!(pos, 0);
        assert!(writer.position() > 0);
        assert_eq!(writer.sequence(), 1);

        let pos2 = writer.append_put(b"key2", b"value2").unwrap();
        assert!(pos2 > pos);
        assert_eq!(writer.sequence(), 2);
    }

    #[test]
    fn test_append_delete() {
        let (mut writer, _temp_dir) = create_temp_writer(1);

        let pos = writer.append_delete(b"key1").unwrap();
        assert_eq!(pos, 0);
        assert!(writer.position() > 0);
    }

    #[test]
    fn test_sync() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        writer.append_put(b"key1", b"value1").unwrap();
        assert_eq!(writer.entries_since_sync(), 1);

        writer.sync().unwrap();
        assert_eq!(writer.entries_since_sync(), 0);
    }

    #[test]
    fn test_epoch_setting() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        assert_eq!(writer.epoch(), 0);
        writer.set_epoch(5);
        assert_eq!(writer.epoch(), 5);
    }

    #[test]
    fn test_truncate() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        writer.append_put(b"key1", b"value1").unwrap();
        let pos1 = writer.position();

        writer.append_put(b"key2", b"value2").unwrap();
        let pos2 = writer.position();

        assert!(pos2 > pos1);

        writer.truncate(pos1).unwrap();
        assert_eq!(writer.position(), pos1);
    }

    #[test]
    fn test_reset() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        writer.append_put(b"key1", b"value1").unwrap();
        writer.append_put(b"key2", b"value2").unwrap();
        assert!(writer.position() > 0);
        assert_eq!(writer.sequence(), 2);

        writer.reset().unwrap();
        assert_eq!(writer.position(), 0);
        assert_eq!(writer.sequence(), 0);
    }

    #[test]
    fn test_append_checkpoint() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        let pos = writer.append_checkpoint(100).unwrap();
        assert_eq!(pos, 0);
        assert!(writer.position() > 0);
    }

    #[test]
    fn test_append_epoch_barrier() {
        let (mut writer, _temp_dir) = create_temp_writer(0);
        writer.set_epoch(5);

        let pos = writer.append_epoch_barrier().unwrap();
        assert_eq!(pos, 0);
        assert!(writer.position() > 0);
    }

    #[test]
    fn test_append_commit() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        // `append_commit` takes a std HashMap, which the project's clippy config disallows.
        #[allow(clippy::disallowed_types)]
        let mut offsets = std::collections::HashMap::new();
        offsets.insert("topic1".to_string(), 100);

        let pos = writer.append_commit(offsets, Some(12345)).unwrap();
        assert_eq!(pos, 0);
        assert!(writer.position() > 0);
    }

    #[test]
    fn test_synced_position_tracks_sync() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        // Fresh file: nothing written, nothing to sync.
        assert_eq!(writer.position(), 0);
        assert_eq!(writer.synced_position(), 0);

        // Appending advances only the logical end.
        writer.append_put(b"key1", b"value1").unwrap();
        assert!(writer.position() > 0);
        assert_eq!(writer.synced_position(), 0);

        // A sync catches the durable mark up.
        let pos_before_sync = writer.position();
        writer.sync().unwrap();
        assert_eq!(writer.synced_position(), pos_before_sync);

        // Later appends are not durable until the next sync.
        writer.append_put(b"key2", b"value2").unwrap();
        assert!(writer.position() > writer.synced_position());
        assert_eq!(writer.synced_position(), pos_before_sync);
    }

    #[test]
    fn test_record_framing() {
        let (mut writer, _temp_dir) = create_temp_writer(0);
        writer.append_put(b"key1", b"value1").unwrap();
        writer.sync().unwrap();

        let bytes = std::fs::read(writer.path()).unwrap();
        assert_eq!(
            std::fs::metadata(writer.path()).unwrap().len(),
            writer.position()
        );

        // Header: u32 LE payload length, then u32 LE CRC32C of the payload.
        let header = usize::try_from(RECORD_HEADER_SIZE).unwrap();
        let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
        let crc = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
        assert_eq!(header + usize::try_from(len).unwrap(), bytes.len());
        assert_eq!(crc, crc32c::crc32c(&bytes[header..]));
    }

    #[test]
    fn test_new_resumes_at_end_of_existing_file() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join("wal-0.log");

        let end = {
            let mut writer = CoreWalWriter::new(0, &path).unwrap();
            writer.append_put(b"key1", b"value1").unwrap();
            writer.sync().unwrap();
            writer.position()
        };

        let writer = CoreWalWriter::new(0, &path).unwrap();
        assert_eq!(writer.position(), end);
        assert_eq!(writer.synced_position(), end);
    }

    #[test]
    fn test_debug_format() {
        let (writer, _temp_dir) = create_temp_writer(42);
        let debug_str = format!("{writer:?}");
        assert!(debug_str.contains("CoreWalWriter"));
        assert!(debug_str.contains("42"));
    }

    #[test]
    fn test_open_at() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join("wal-0.log");

        {
            let mut writer = CoreWalWriter::new(0, &path).unwrap();
            writer.append_put(b"key1", b"value1").unwrap();
            writer.append_put(b"key2", b"value2").unwrap();
            writer.sync().unwrap();
        }

        let file_size = std::fs::metadata(&path).unwrap().len();

        // Reopening at offset 0 discards everything written above.
        let writer = CoreWalWriter::open_at(0, &path, 0).unwrap();
        assert_eq!(writer.position(), 0);

        let new_size = std::fs::metadata(&path).unwrap().len();
        assert!(new_size < file_size);
    }

    #[test]
    fn test_append_via_storage_io() {
        let (mut writer, mut sio, fd, _temp_dir) = create_sio_writer("wal-sio.log");

        let pos = writer
            .append_put_via_storage_io(b"key1", b"value1", &mut sio, fd)
            .unwrap();
        assert_eq!(pos, 0);
        assert!(writer.position() > 0);
        assert_eq!(writer.sequence(), 1);

        assert!(!writer.is_sync_pending());
        let _token = writer.sync_via_storage_io(&mut sio, fd).unwrap();
        assert!(writer.is_sync_pending());
        // Not durable until the completion is observed.
        assert_eq!(writer.synced_position(), 0);

        // Expect at least the append and the datasync completions.
        let mut completions = Vec::new();
        sio.poll_completions(&mut completions);
        assert!(completions.len() >= 2);

        let synced = writer.check_completions(&completions).unwrap();
        assert!(synced);
        assert!(!writer.is_sync_pending());
        assert_eq!(writer.synced_position(), writer.position());

        let file_size = std::fs::metadata(writer.path()).unwrap().len();
        assert!(file_size > 0);
    }

    #[test]
    fn test_check_completions_no_match() {
        let (mut writer, _temp_dir) = create_temp_writer(0);

        // No sync pending: any completion batch is ignored.
        let completions = vec![IoCompletion {
            token: 999,
            result: 0,
        }];
        assert!(!writer.check_completions(&completions).unwrap());

        assert!(!writer.check_completions(&[]).unwrap());
    }

    #[test]
    fn test_failed_sync_completion_reports_error() {
        let (mut writer, mut sio, fd, _temp_dir) = create_sio_writer("wal-sync-err.log");

        writer
            .append_put_via_storage_io(b"k1", b"v1", &mut sio, fd)
            .unwrap();
        let token = writer.sync_via_storage_io(&mut sio, fd).unwrap();

        // A negative result is an errno; the durable mark must not move.
        let completions = [IoCompletion { token, result: -5 }];
        assert!(writer.check_completions(&completions).is_err());
        assert!(!writer.is_sync_pending());
        assert_eq!(writer.synced_position(), 0);
    }

    #[test]
    fn test_sync_boundary_not_advanced_past_submission() {
        let (mut writer, mut sio, fd, _temp_dir) = create_sio_writer("wal-boundary.log");

        writer
            .append_put_via_storage_io(b"k1", b"v1", &mut sio, fd)
            .unwrap();
        let pos_at_sync = writer.position();
        let _token = writer.sync_via_storage_io(&mut sio, fd).unwrap();

        // Appended while the sync is in flight: must not count as synced.
        writer
            .append_put_via_storage_io(b"k2", b"v2", &mut sio, fd)
            .unwrap();
        assert!(writer.position() > pos_at_sync);

        let mut completions = Vec::new();
        sio.poll_completions(&mut completions);
        let synced = writer.check_completions(&completions).unwrap();
        assert!(synced);
        assert_eq!(writer.synced_position(), pos_at_sync);
        assert!(writer.synced_position() < writer.position());
    }

    #[test]
    fn test_overlapping_sync_rejected() {
        let (mut writer, mut sio, fd, _temp_dir) = create_sio_writer("wal-overlap.log");

        writer
            .append_put_via_storage_io(b"k1", b"v1", &mut sio, fd)
            .unwrap();
        writer.sync_via_storage_io(&mut sio, fd).unwrap();

        // A second sync before the first completes is refused.
        let result = writer.sync_via_storage_io(&mut sio, fd);
        assert!(result.is_err());
    }
}