// laminar_storage/per_core_wal/writer.rs
//! Per-core WAL writer for lock-free segment writes.
//!
//! Supports two write paths:
//! - **Direct** (`append` / `sync`): uses `BufWriter<File>` — always available.
//! - **Storage I/O** (`append_via_storage_io` / `sync_via_storage_io`): uses the
//!   [`StorageIo`] trait for non-blocking writes. On Linux with `io_uring` +
//!   SQPOLL this is zero-syscall. On Windows it falls back to synchronous I/O
//!   through the same trait interface.
10use std::fs::{File, OpenOptions};
11use std::io::{BufWriter, Write};
12use std::path::{Path, PathBuf};
13use std::time::Instant;
14
15use laminar_core::storage_io::{IoCompletion, IoFd, StorageIo, StorageIoError};
16use rkyv::api::high;
17use rkyv::rancor::Error as RkyvError;
18use rkyv::util::AlignedVec;
19
20use super::entry::PerCoreWalEntry;
21use super::error::PerCoreWalError;
22
/// Size in bytes of the header preceding each record's payload:
/// 4-byte little-endian length followed by a 4-byte CRC32C checksum.
const RECORD_HEADER_SIZE: u64 = 8;
25
26/// Per-core WAL writer.
27///
28/// Each core owns its own writer, eliminating cross-core synchronization on the write path.
29/// Record format is compatible with `[length: 4][crc32: 4][data: length]`
30pub struct CoreWalWriter {
31    /// Core ID this writer belongs to.
32    core_id: usize,
33    /// Buffered writer for efficient writes.
34    writer: BufWriter<File>,
35    /// Path to the segment file.
36    path: PathBuf,
37    /// Current write position in bytes (includes buffered, un-synced data).
38    position: u64,
39    /// Last synced position (data confirmed durable via `fdatasync`).
40    ///
41    /// Only this position is safe for checkpoint manifests — data beyond
42    /// it may be lost on crash.
43    synced_position: u64,
44    /// Current epoch (set by manager during checkpoint).
45    epoch: u64,
46    /// Core-local sequence number (monotonically increasing).
47    sequence: u64,
48    /// Last sync time for group commit.
49    last_sync: Instant,
50    /// Number of entries since last sync.
51    entries_since_sync: u64,
52    /// Pre-allocated write buffer reused across `append()` calls.
53    /// Grows to high-water mark and stays, eliminating per-append allocation.
54    write_buffer: Vec<u8>,
55    /// Reusable rkyv serialization buffer (avoids `AlignedVec` alloc per append).
56    serialize_buffer: AlignedVec,
57    /// Pending sync state: (token, position boundary at submission time).
58    /// The boundary is the `position` when `sync_via_storage_io` was called.
59    /// On completion, `synced_position` advances to the boundary — not to
60    /// the current `position`, which may have advanced from later appends.
61    pending_sync: Option<PendingSync>,
62}
63
/// An in-flight `fdatasync`: the completion token plus the write position
/// captured when the sync was submitted.
#[derive(Debug, Clone, Copy)]
struct PendingSync {
    /// Completion token returned by the I/O backend at submission.
    token: u64,
    /// Durability boundary: only bytes up to this offset are guaranteed
    /// durable once the matching completion arrives.
    boundary: u64,
}
71
72impl CoreWalWriter {
73    /// Creates a new per-core WAL writer.
74    ///
75    /// # Arguments
76    ///
77    /// * `core_id` - The core ID for this writer
78    /// * `path` - Path to the segment file
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the file cannot be created or opened.
83    pub fn new(core_id: usize, path: &Path) -> Result<Self, PerCoreWalError> {
84        let file = OpenOptions::new().create(true).append(true).open(path)?;
85
86        let position = file.metadata()?.len();
87
88        Ok(Self {
89            core_id,
90            writer: BufWriter::with_capacity(64 * 1024, file), // 64KB buffer
91            path: path.to_path_buf(),
92            position,
93            synced_position: position,
94            epoch: 0,
95            sequence: 0,
96            last_sync: Instant::now(),
97            entries_since_sync: 0,
98            write_buffer: Vec::with_capacity(4096),
99            serialize_buffer: AlignedVec::with_capacity(256),
100            pending_sync: None,
101        })
102    }
103
104    /// Opens an existing segment file at a specific position.
105    ///
106    /// Used during recovery to resume writing.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the file cannot be opened or truncated.
111    pub fn open_at(core_id: usize, path: &Path, position: u64) -> Result<Self, PerCoreWalError> {
112        let file = OpenOptions::new().write(true).open(path)?;
113
114        // Truncate to the specified position (in case of torn writes)
115        file.set_len(position)?;
116
117        let file = OpenOptions::new().append(true).open(path)?;
118
119        Ok(Self {
120            core_id,
121            writer: BufWriter::with_capacity(64 * 1024, file),
122            path: path.to_path_buf(),
123            position,
124            synced_position: position,
125            epoch: 0,
126            sequence: 0,
127            last_sync: Instant::now(),
128            entries_since_sync: 0,
129            write_buffer: Vec::with_capacity(4096),
130            serialize_buffer: AlignedVec::with_capacity(256),
131            pending_sync: None,
132        })
133    }
134
135    /// Returns the core ID for this writer.
136    #[must_use]
137    pub fn core_id(&self) -> usize {
138        self.core_id
139    }
140
141    /// Returns the current write position in bytes (includes un-synced data).
142    #[must_use]
143    pub fn position(&self) -> u64 {
144        self.position
145    }
146
147    /// Returns the last synced position (durable after `fdatasync`).
148    ///
149    /// Only data up to this position is guaranteed to survive a crash.
150    /// Use this for checkpoint manifests instead of [`position()`](Self::position).
151    #[must_use]
152    pub fn synced_position(&self) -> u64 {
153        self.synced_position
154    }
155
156    /// Returns the current epoch.
157    #[must_use]
158    pub fn epoch(&self) -> u64 {
159        self.epoch
160    }
161
162    /// Returns the current sequence number.
163    #[must_use]
164    pub fn sequence(&self) -> u64 {
165        self.sequence
166    }
167
168    /// Returns the path to the segment file.
169    #[must_use]
170    pub fn path(&self) -> &Path {
171        &self.path
172    }
173
174    /// Returns the number of entries since last sync.
175    #[must_use]
176    pub fn entries_since_sync(&self) -> u64 {
177        self.entries_since_sync
178    }
179
180    /// Sets the current epoch (called by manager during checkpoint).
181    pub fn set_epoch(&mut self, epoch: u64) {
182        self.epoch = epoch;
183    }
184
185    /// Appends a Put operation to the WAL.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if serialization or I/O fails.
190    #[inline]
191    #[allow(clippy::cast_possible_truncation)] // core_id bounded by physical CPU count (< u16::MAX)
192    pub fn append_put(&mut self, key: &[u8], value: &[u8]) -> Result<u64, PerCoreWalError> {
193        let ts = PerCoreWalEntry::now_ns();
194        let entry = PerCoreWalEntry::put(
195            self.core_id as u16,
196            self.epoch,
197            self.sequence,
198            key.to_vec(),
199            value.to_vec(),
200            ts,
201        );
202        self.append(&entry)
203    }
204
205    /// Appends a Delete operation to the WAL.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if serialization or I/O fails.
210    #[inline]
211    #[allow(clippy::cast_possible_truncation)] // core_id bounded by physical CPU count (< u16::MAX)
212    pub fn append_delete(&mut self, key: &[u8]) -> Result<u64, PerCoreWalError> {
213        let ts = PerCoreWalEntry::now_ns();
214        let entry = PerCoreWalEntry::delete(
215            self.core_id as u16,
216            self.epoch,
217            self.sequence,
218            key.to_vec(),
219            ts,
220        );
221        self.append(&entry)
222    }
223
224    /// Appends a Put operation via [`StorageIo`].
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if serialization or I/O submission fails.
229    #[inline]
230    #[allow(clippy::cast_possible_truncation)]
231    pub fn append_put_via_storage_io(
232        &mut self,
233        key: &[u8],
234        value: &[u8],
235        sio: &mut dyn StorageIo,
236        fd: IoFd,
237    ) -> Result<u64, PerCoreWalError> {
238        let ts = PerCoreWalEntry::now_ns();
239        let entry = PerCoreWalEntry::put(
240            self.core_id as u16,
241            self.epoch,
242            self.sequence,
243            key.to_vec(),
244            value.to_vec(),
245            ts,
246        );
247        self.append_via_storage_io(&entry, sio, fd)
248    }
249
250    /// Appends a Delete operation via [`StorageIo`].
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if serialization or I/O submission fails.
255    #[inline]
256    #[allow(clippy::cast_possible_truncation)]
257    pub fn append_delete_via_storage_io(
258        &mut self,
259        key: &[u8],
260        sio: &mut dyn StorageIo,
261        fd: IoFd,
262    ) -> Result<u64, PerCoreWalError> {
263        let ts = PerCoreWalEntry::now_ns();
264        let entry = PerCoreWalEntry::delete(
265            self.core_id as u16,
266            self.epoch,
267            self.sequence,
268            key.to_vec(),
269            ts,
270        );
271        self.append_via_storage_io(&entry, sio, fd)
272    }
273
274    /// Serialize an entry into `self.write_buffer` (header + CRC + data).
275    ///
276    /// Shared by [`append`] and [`append_via_storage_io`]. After this call,
277    /// `self.write_buffer` contains the complete record ready for I/O.
278    fn serialize_entry(&mut self, entry: &PerCoreWalEntry) -> Result<(), PerCoreWalError> {
279        self.serialize_buffer.clear();
280        let taken = std::mem::take(&mut self.serialize_buffer);
281        let bytes = high::to_bytes_in::<_, RkyvError>(entry, taken)
282            .map_err(|e| PerCoreWalError::Serialization(e.to_string()))?;
283
284        let crc = crc32c::crc32c(&bytes);
285
286        #[allow(clippy::cast_possible_truncation)]
287        let len = bytes.len() as u32;
288        self.write_buffer.clear();
289        #[allow(clippy::cast_possible_truncation)]
290        self.write_buffer
291            .reserve(RECORD_HEADER_SIZE as usize + bytes.len());
292        self.write_buffer.extend_from_slice(&len.to_le_bytes());
293        self.write_buffer.extend_from_slice(&crc.to_le_bytes());
294        self.write_buffer.extend_from_slice(&bytes);
295
296        self.serialize_buffer = bytes;
297        Ok(())
298    }
299
300    /// Advance position/sequence counters after a successful write.
301    fn advance_position(&mut self) {
302        #[allow(clippy::cast_possible_truncation)]
303        let data_len = self.write_buffer.len() as u64 - RECORD_HEADER_SIZE;
304        self.position += RECORD_HEADER_SIZE + data_len;
305        self.sequence += 1;
306        self.entries_since_sync += 1;
307    }
308
309    /// Appends a raw entry to the WAL.
310    ///
311    /// Record format: `[length: 4 bytes][crc32: 4 bytes][data: length bytes]`
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if serialization or I/O fails.
316    pub fn append(&mut self, entry: &PerCoreWalEntry) -> Result<u64, PerCoreWalError> {
317        let start_pos = self.position;
318        self.serialize_entry(entry)?;
319        self.writer.write_all(&self.write_buffer)?;
320        self.advance_position();
321        Ok(start_pos)
322    }
323
324    /// Appends a raw entry to the WAL via a [`StorageIo`] backend.
325    ///
326    /// Identical to [`append`](Self::append) but uses non-blocking I/O.
327    /// The caller must poll `sio.poll_completions()` to drain completions.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if serialization fails or the I/O submission fails.
332    pub fn append_via_storage_io(
333        &mut self,
334        entry: &PerCoreWalEntry,
335        sio: &mut dyn StorageIo,
336        fd: IoFd,
337    ) -> Result<u64, PerCoreWalError> {
338        let start_pos = self.position;
339        self.serialize_entry(entry)?;
340        sio.submit_append(fd, &self.write_buffer)
341            .map_err(storage_io_to_wal_error)?;
342        self.advance_position();
343        Ok(start_pos)
344    }
345
346    /// Syncs the WAL segment via a [`StorageIo`] backend.
347    ///
348    /// Submits an `fdatasync` through the I/O backend. Non-blocking — the
349    /// caller must poll for the completion token.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if the submission fails.
354    pub fn sync_via_storage_io(
355        &mut self,
356        sio: &mut dyn StorageIo,
357        fd: IoFd,
358    ) -> Result<u64, PerCoreWalError> {
359        if self.pending_sync.is_some() {
360            return Err(PerCoreWalError::Io(std::io::Error::other(
361                "sync already in flight — wait for completion before submitting another",
362            )));
363        }
364        let token = sio.submit_datasync(fd).map_err(storage_io_to_wal_error)?;
365        self.pending_sync = Some(PendingSync {
366            token,
367            boundary: self.position,
368        });
369        Ok(token)
370    }
371
372    /// Check I/O completions and update sync state.
373    ///
374    /// Scans `completions` for the pending sync token. If found and
375    /// successful, calls [`mark_synced`](Self::mark_synced). If found
376    /// and failed (negative result = errno), returns an error.
377    ///
378    /// Returns `Ok(true)` if the sync completed successfully,
379    /// `Ok(false)` if no matching completion was found,
380    /// or `Err` if the sync failed.
381    ///
382    /// Call this after every `StorageIo::poll_completions` round.
383    ///
384    /// # Errors
385    ///
386    /// Returns `PerCoreWalError::Io` if the kernel reported a sync failure.
387    pub fn check_completions(
388        &mut self,
389        completions: &[IoCompletion],
390    ) -> Result<bool, PerCoreWalError> {
391        let Some(pending) = self.pending_sync else {
392            return Ok(false);
393        };
394        for c in completions {
395            if c.token == pending.token {
396                self.pending_sync = None;
397                if c.result >= 0 {
398                    // Advance synced_position only to the boundary that was
399                    // current when the sync was submitted — not to the
400                    // current position, which may include later appends.
401                    self.synced_position = pending.boundary;
402                    self.last_sync = Instant::now();
403                    self.entries_since_sync = 0;
404                    return Ok(true);
405                }
406                return Err(PerCoreWalError::Io(std::io::Error::from_raw_os_error(
407                    -c.result,
408                )));
409            }
410        }
411        Ok(false)
412    }
413
414    /// Returns `true` if a sync is pending (submitted but not yet completed).
415    #[must_use]
416    pub fn is_sync_pending(&self) -> bool {
417        self.pending_sync.is_some()
418    }
419
420    /// Mark the current write position as synced (durable).
421    ///
422    /// Used by the direct (non-`StorageIo`) sync path after a blocking
423    /// `fdatasync` where all data up to `position` is guaranteed durable.
424    /// The `StorageIo` path uses [`Self::check_completions`] instead, which
425    /// advances only to the boundary recorded at submission time.
426    pub fn mark_synced(&mut self) {
427        self.synced_position = self.position;
428        self.last_sync = Instant::now();
429        self.entries_since_sync = 0;
430    }
431
432    /// Appends a Checkpoint marker to the WAL.
433    ///
434    /// # Errors
435    ///
436    /// Returns an error if serialization or I/O fails.
437    #[allow(clippy::cast_possible_truncation)] // core_id bounded by physical CPU count (< u16::MAX)
438    pub fn append_checkpoint(&mut self, checkpoint_id: u64) -> Result<u64, PerCoreWalError> {
439        let ts = PerCoreWalEntry::now_ns();
440        let entry = PerCoreWalEntry::checkpoint(
441            self.core_id as u16,
442            self.epoch,
443            self.sequence,
444            checkpoint_id,
445            ts,
446        );
447        self.append(&entry)
448    }
449
450    /// Appends an `EpochBarrier` to the WAL.
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if serialization or I/O fails.
455    #[allow(clippy::cast_possible_truncation)] // core_id bounded by physical CPU count (< u16::MAX)
456    pub fn append_epoch_barrier(&mut self) -> Result<u64, PerCoreWalError> {
457        let ts = PerCoreWalEntry::now_ns();
458        let entry =
459            PerCoreWalEntry::epoch_barrier(self.core_id as u16, self.epoch, self.sequence, ts);
460        self.append(&entry)
461    }
462
463    /// Appends a Commit entry to the WAL.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if serialization or I/O fails.
468    #[allow(clippy::cast_possible_truncation)] // core_id bounded by physical CPU count (< u16::MAX)
469    #[allow(clippy::disallowed_types)] // cold path: WAL coordination
470    pub fn append_commit(
471        &mut self,
472        offsets: std::collections::HashMap<String, u64>,
473        watermark: Option<i64>,
474    ) -> Result<u64, PerCoreWalError> {
475        let ts = PerCoreWalEntry::now_ns();
476        let entry = PerCoreWalEntry::commit(
477            self.core_id as u16,
478            self.epoch,
479            self.sequence,
480            offsets,
481            watermark,
482            ts,
483        );
484        self.append(&entry)
485    }
486
487    /// Syncs the WAL segment to disk using fdatasync.
488    ///
489    /// Uses `sync_data()` instead of `sync_all()` for better performance.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if the sync fails.
494    pub fn sync(&mut self) -> Result<(), PerCoreWalError> {
495        self.writer.flush()?;
496        // Use sync_data() (fdatasync) instead of sync_all() (fsync)
497        self.writer.get_ref().sync_data()?;
498        self.synced_position = self.position;
499        self.last_sync = Instant::now();
500        self.entries_since_sync = 0;
501        Ok(())
502    }
503
504    /// Truncates the segment file at the specified position.
505    ///
506    /// Used after checkpoint to remove entries that have been checkpointed.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if truncation fails.
511    pub fn truncate(&mut self, position: u64) -> Result<(), PerCoreWalError> {
512        self.sync()?;
513
514        // Close current writer by dropping, then truncate
515        let file = OpenOptions::new()
516            .write(true)
517            .truncate(false)
518            .open(&self.path)?;
519
520        // Sync after truncation to make it durable.
521        // Without this, a crash could leave the file at its old length.
522        file.set_len(position)?;
523        file.sync_all()?;
524
525        // Reopen for append
526        let file = OpenOptions::new().append(true).open(&self.path)?;
527
528        self.writer = BufWriter::with_capacity(64 * 1024, file);
529        self.position = position;
530        self.synced_position = position;
531
532        Ok(())
533    }
534
535    /// Resets the segment (truncates to zero).
536    ///
537    /// Used after successful checkpoint to clear the WAL.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if truncation fails.
542    pub fn reset(&mut self) -> Result<(), PerCoreWalError> {
543        self.truncate(0)?;
544        self.sequence = 0;
545        Ok(())
546    }
547}
548
549/// Convert a [`StorageIoError`] to a [`PerCoreWalError`].
550fn storage_io_to_wal_error(e: StorageIoError) -> PerCoreWalError {
551    match e {
552        StorageIoError::Io(io_err) => PerCoreWalError::Io(io_err),
553        other => PerCoreWalError::Io(std::io::Error::other(other.to_string())),
554    }
555}
556
557impl std::fmt::Debug for CoreWalWriter {
558    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
559        f.debug_struct("CoreWalWriter")
560            .field("core_id", &self.core_id)
561            .field("path", &self.path)
562            .field("position", &self.position)
563            .field("synced_position", &self.synced_position)
564            .field("epoch", &self.epoch)
565            .field("sequence", &self.sequence)
566            .field("entries_since_sync", &self.entries_since_sync)
567            .finish_non_exhaustive()
568    }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a writer backed by a fresh temp directory.
    ///
    /// The returned `TempDir` must outlive the writer, or the segment file
    /// disappears from under it.
    fn create_temp_writer(core_id: usize) -> (CoreWalWriter, TempDir) {
        let dir = TempDir::new().unwrap();
        let segment = dir.path().join(format!("wal-{core_id}.log"));
        let w = CoreWalWriter::new(core_id, &segment).unwrap();
        (w, dir)
    }

    #[test]
    fn test_writer_creation() {
        let (w, _dir) = create_temp_writer(0);
        assert_eq!(w.core_id(), 0);
        assert_eq!(w.position(), 0);
        assert_eq!(w.epoch(), 0);
        assert_eq!(w.sequence(), 0);
    }

    #[test]
    fn test_append_put() {
        let (mut w, _dir) = create_temp_writer(0);

        let first = w.append_put(b"key1", b"value1").unwrap();
        assert_eq!(first, 0);
        assert!(w.position() > 0);
        assert_eq!(w.sequence(), 1);

        let second = w.append_put(b"key2", b"value2").unwrap();
        assert!(second > first);
        assert_eq!(w.sequence(), 2);
    }

    #[test]
    fn test_append_delete() {
        let (mut w, _dir) = create_temp_writer(1);

        let offset = w.append_delete(b"key1").unwrap();
        assert_eq!(offset, 0);
        assert!(w.position() > 0);
    }

    #[test]
    fn test_sync() {
        let (mut w, _dir) = create_temp_writer(0);

        w.append_put(b"key1", b"value1").unwrap();
        assert_eq!(w.entries_since_sync(), 1);

        w.sync().unwrap();
        assert_eq!(w.entries_since_sync(), 0);
    }

    #[test]
    fn test_epoch_setting() {
        let (mut w, _dir) = create_temp_writer(0);

        assert_eq!(w.epoch(), 0);
        w.set_epoch(5);
        assert_eq!(w.epoch(), 5);
    }

    #[test]
    fn test_truncate() {
        let (mut w, _dir) = create_temp_writer(0);

        w.append_put(b"key1", b"value1").unwrap();
        let after_first = w.position();

        w.append_put(b"key2", b"value2").unwrap();
        let after_second = w.position();
        assert!(after_second > after_first);

        w.truncate(after_first).unwrap();
        assert_eq!(w.position(), after_first);
    }

    #[test]
    fn test_reset() {
        let (mut w, _dir) = create_temp_writer(0);

        w.append_put(b"key1", b"value1").unwrap();
        w.append_put(b"key2", b"value2").unwrap();
        assert!(w.position() > 0);
        assert_eq!(w.sequence(), 2);

        w.reset().unwrap();
        assert_eq!(w.position(), 0);
        assert_eq!(w.sequence(), 0);
    }

    #[test]
    fn test_append_checkpoint() {
        let (mut w, _dir) = create_temp_writer(0);

        let offset = w.append_checkpoint(100).unwrap();
        assert_eq!(offset, 0);
        assert!(w.position() > 0);
    }

    #[test]
    fn test_append_epoch_barrier() {
        let (mut w, _dir) = create_temp_writer(0);
        w.set_epoch(5);

        let offset = w.append_epoch_barrier().unwrap();
        assert_eq!(offset, 0);
        assert!(w.position() > 0);
    }

    #[test]
    fn test_append_commit() {
        let (mut w, _dir) = create_temp_writer(0);

        #[allow(clippy::disallowed_types)] // cold path: WAL coordination
        let mut offsets = std::collections::HashMap::new();
        offsets.insert("topic1".to_string(), 100);

        let offset = w.append_commit(offsets, Some(12345)).unwrap();
        assert_eq!(offset, 0);
        assert!(w.position() > 0);
    }

    #[test]
    fn test_synced_position_tracks_sync() {
        let (mut w, _dir) = create_temp_writer(0);

        // Both positions start at zero.
        assert_eq!(w.position(), 0);
        assert_eq!(w.synced_position(), 0);

        // A write advances `position` but not `synced_position`.
        w.append_put(b"key1", b"value1").unwrap();
        assert!(w.position() > 0);
        assert_eq!(w.synced_position(), 0);

        // Sync brings `synced_position` up to date.
        let before_sync = w.position();
        w.sync().unwrap();
        assert_eq!(w.synced_position(), before_sync);

        // A further un-synced write re-opens the gap.
        w.append_put(b"key2", b"value2").unwrap();
        assert!(w.position() > w.synced_position());
        assert_eq!(w.synced_position(), before_sync);
    }

    #[test]
    fn test_debug_format() {
        let (w, _dir) = create_temp_writer(42);
        let rendered = format!("{w:?}");
        assert!(rendered.contains("CoreWalWriter"));
        assert!(rendered.contains("42"));
    }

    #[test]
    fn test_open_at() {
        let dir = TempDir::new().unwrap();
        let segment = dir.path().join("wal-0.log");

        // Write some data, then drop the writer.
        {
            let mut w = CoreWalWriter::new(0, &segment).unwrap();
            w.append_put(b"key1", b"value1").unwrap();
            w.append_put(b"key2", b"value2").unwrap();
            w.sync().unwrap();
        }

        let full_size = std::fs::metadata(&segment).unwrap().len();

        // Reopening at offset 0 truncates everything.
        let w = CoreWalWriter::open_at(0, &segment, 0).unwrap();
        assert_eq!(w.position(), 0);

        let truncated_size = std::fs::metadata(&segment).unwrap().len();
        assert!(truncated_size < full_size);
    }

    #[test]
    fn test_append_via_storage_io() {
        use laminar_core::storage_io::SyncStorageIo;

        let dir = TempDir::new().unwrap();
        let segment = dir.path().join("wal-sio.log");

        // Writer owns the BufWriter-based direct path.
        let mut w = CoreWalWriter::new(0, &segment).unwrap();

        // Register the same file with a SyncStorageIo backend.
        let mut sio = SyncStorageIo::new();
        let backend_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&segment)
            .unwrap();
        let fd = sio.register_fd(backend_file).unwrap();

        // Append through the storage-I/O path.
        let offset = w
            .append_put_via_storage_io(b"key1", b"value1", &mut sio, fd)
            .unwrap();
        assert_eq!(offset, 0);
        assert!(w.position() > 0);
        assert_eq!(w.sequence(), 1);

        // Submit a sync — the token is tracked internally.
        assert!(!w.is_sync_pending());
        let _token = w.sync_via_storage_io(&mut sio, fd).unwrap();
        assert!(w.is_sync_pending());
        assert_eq!(w.synced_position(), 0); // not durable yet

        // Drain completions (write + sync).
        let mut completions = Vec::new();
        sio.poll_completions(&mut completions);
        assert!(completions.len() >= 2);

        // Matching the sync token advances synced_position.
        let synced = w.check_completions(&completions).unwrap();
        assert!(synced);
        assert!(!w.is_sync_pending());
        assert_eq!(w.synced_position(), w.position());

        // Data must have reached the file.
        let size = std::fs::metadata(&segment).unwrap().len();
        assert!(size > 0);
    }

    #[test]
    fn test_check_completions_no_match() {
        use laminar_core::storage_io::IoCompletion;

        let (mut w, _dir) = create_temp_writer(0);

        // With no pending sync, any completion list yields Ok(false).
        let completions = vec![IoCompletion {
            token: 999,
            result: 0,
        }];
        assert!(!w.check_completions(&completions).unwrap());

        // An empty completion list also yields Ok(false).
        assert!(!w.check_completions(&[]).unwrap());
    }

    #[test]
    fn test_sync_boundary_not_advanced_past_submission() {
        use laminar_core::storage_io::SyncStorageIo;

        let dir = TempDir::new().unwrap();
        let segment = dir.path().join("wal-boundary.log");
        let mut w = CoreWalWriter::new(0, &segment).unwrap();

        let mut sio = SyncStorageIo::new();
        let backend_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&segment)
            .unwrap();
        let fd = sio.register_fd(backend_file).unwrap();

        // Append, then submit a sync.
        w.append_put_via_storage_io(b"k1", b"v1", &mut sio, fd)
            .unwrap();
        let boundary = w.position();
        let _token = w.sync_via_storage_io(&mut sio, fd).unwrap();

        // More data lands AFTER the sync was submitted.
        w.append_put_via_storage_io(b"k2", b"v2", &mut sio, fd)
            .unwrap();
        assert!(w.position() > boundary);

        // On completion, synced_position stops at the submission boundary,
        // not at the current position.
        let mut completions = Vec::new();
        sio.poll_completions(&mut completions);
        let synced = w.check_completions(&completions).unwrap();
        assert!(synced);
        assert_eq!(w.synced_position(), boundary);
        assert!(w.synced_position() < w.position());
    }

    #[test]
    fn test_overlapping_sync_rejected() {
        use laminar_core::storage_io::SyncStorageIo;

        let dir = TempDir::new().unwrap();
        let segment = dir.path().join("wal-overlap.log");
        let mut w = CoreWalWriter::new(0, &segment).unwrap();

        let mut sio = SyncStorageIo::new();
        let backend_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&segment)
            .unwrap();
        let fd = sio.register_fd(backend_file).unwrap();

        w.append_put_via_storage_io(b"k1", b"v1", &mut sio, fd)
            .unwrap();
        w.sync_via_storage_io(&mut sio, fd).unwrap();

        // A second sync while the first is pending must be rejected.
        assert!(w.sync_via_storage_io(&mut sio, fd).is_err());
    }
}