// laminar_core/storage_io/sync_backend.rs
//! Synchronous storage I/O backend.
//!
//! Executes all I/O inline via `std::fs`. Writes go through the OS page cache.
//! Sync calls `fdatasync`. Works on all platforms.
//!
//! This is the default backend when `io_uring` is not available (Windows, macOS,
//! older Linux kernels, or when the `io-uring` feature is not enabled).
//!
//! ## Completion model
//!
//! `submit_write` executes `pwrite`/`write` immediately and stages the
//! completion internally. `poll_completions` returns the staged completions
//! on the next call. This matches the submission-completion API contract
//! while doing zero async work.

use std::collections::VecDeque;
use std::fmt;
use std::fs::File;
use std::io::Write;
#[cfg(not(any(unix, windows)))]
use std::io::{Seek, SeekFrom};

use super::{IoCompletion, IoFd, StorageIo, StorageIoError};

/// Maximum number of simultaneously registered file descriptors.
///
/// Bounds the size of the pre-allocated, sparse fd table in `SyncStorageIo`;
/// `register_fd` fails with `QueueFull` once all slots are occupied.
const MAX_FDS: usize = 64;

/// Entry in the file descriptor table.
struct FdEntry {
    /// The owned file handle. Dropping the entry closes the OS descriptor.
    file: File,
    /// Current append position, seeded from file size at registration and
    /// advanced by `submit_append`.
    ///
    /// NOTE(review): this offset is bookkeeping only — no code path in this
    /// file uses it to position a write; appends rely on the file's own
    /// cursor/append mode. Confirm whether callers read it before removing.
    append_offset: u64,
}

/// Synchronous storage I/O backend.
///
/// Executes I/O inline in `submit_*` calls. Completions are staged in an
/// internal ring buffer and returned on the next `poll_completions` call.
///
/// Zero heap allocation on the hot path — the completion buffer and fd table
/// are pre-allocated at construction.
pub struct SyncStorageIo {
    /// Registered file descriptors (sparse array, indexed by `IoFd`).
    fds: Vec<Option<FdEntry>>,
    /// Staged completions from the last submit batch.
    completions: VecDeque<IoCompletion>,
    /// Next token ID; wraps on `u64` overflow (`wrapping_add`).
    next_token: u64,
    /// Number of staged (unpolled) completions.
    ///
    /// NOTE(review): kept in lock-step with `completions.len()` — every
    /// `stage_completion` increments it, every poll drains it.
    pending: usize,
}

52impl SyncStorageIo {
53    /// Create a new synchronous backend.
54    #[must_use]
55    pub fn new() -> Self {
56        let mut fds = Vec::with_capacity(MAX_FDS);
57        fds.resize_with(MAX_FDS, || None);
58        Self {
59            fds,
60            completions: VecDeque::with_capacity(256),
61            next_token: 0,
62            pending: 0,
63        }
64    }
65
66    /// Allocate the next token.
67    fn next_token(&mut self) -> u64 {
68        let t = self.next_token;
69        self.next_token = self.next_token.wrapping_add(1);
70        t
71    }
72
73    /// Find a free slot in the fd table.
74    fn find_free_slot(&self) -> Option<u32> {
75        #[allow(clippy::cast_possible_truncation)]
76        self.fds.iter().position(Option::is_none).map(|i| i as u32)
77    }
78
79    /// Get a mutable reference to an fd entry.
80    fn get_entry(&mut self, fd: IoFd) -> Result<&mut FdEntry, StorageIoError> {
81        self.fds
82            .get_mut(fd.0 as usize)
83            .and_then(|slot| slot.as_mut())
84            .ok_or(StorageIoError::BadFd(fd))
85    }
86
87    /// Stage a completion for the next poll.
88    fn stage_completion(&mut self, token: u64, result: i32) {
89        debug_assert!(
90            self.completions.len() < self.completions.capacity(),
91            "completion queue at capacity {} — poll_completions not draining fast enough",
92            self.completions.capacity()
93        );
94        self.completions.push_back(IoCompletion { token, result });
95        self.pending += 1;
96    }
97}
98
impl Default for SyncStorageIo {
    /// Equivalent to [`SyncStorageIo::new`].
    fn default() -> Self {
        Self::new()
    }
}

105impl StorageIo for SyncStorageIo {
106    fn register_fd(&mut self, file: File) -> Result<IoFd, StorageIoError> {
107        let slot = self.find_free_slot().ok_or(StorageIoError::QueueFull)?;
108
109        // Get current file size for append tracking
110        let append_offset = file.metadata().map(|m| m.len()).unwrap_or(0);
111
112        self.fds[slot as usize] = Some(FdEntry {
113            file,
114            append_offset,
115        });
116        Ok(IoFd(slot))
117    }
118
119    fn deregister_fd(&mut self, fd: IoFd) -> Result<(), StorageIoError> {
120        let slot = self
121            .fds
122            .get_mut(fd.0 as usize)
123            .ok_or(StorageIoError::BadFd(fd))?;
124        if slot.is_none() {
125            return Err(StorageIoError::BadFd(fd));
126        }
127        // Drop the File (closes the fd)
128        *slot = None;
129        Ok(())
130    }
131
132    fn submit_write(&mut self, fd: IoFd, data: &[u8], offset: u64) -> Result<u64, StorageIoError> {
133        let token = self.next_token();
134        let entry = self.get_entry(fd)?;
135
136        // Execute pwrite immediately
137        #[cfg(unix)]
138        {
139            use std::os::unix::fs::FileExt;
140            match entry.file.write_at(data, offset) {
141                Ok(n) => {
142                    debug_assert!(
143                        i32::try_from(n).is_ok(),
144                        "write returned {n} bytes > i32::MAX"
145                    );
146                    let clamped = i32::try_from(n).unwrap_or(i32::MAX);
147                    self.stage_completion(token, clamped);
148                }
149                Err(e) => {
150                    let errno = e.raw_os_error().unwrap_or(-1);
151                    self.stage_completion(token, -errno.abs());
152                }
153            }
154        }
155
156        #[cfg(windows)]
157        {
158            use std::os::windows::fs::FileExt;
159            match entry.file.seek_write(data, offset) {
160                Ok(n) => {
161                    debug_assert!(
162                        i32::try_from(n).is_ok(),
163                        "write returned {n} bytes > i32::MAX"
164                    );
165                    let clamped = i32::try_from(n).unwrap_or(i32::MAX);
166                    self.stage_completion(token, clamped);
167                }
168                Err(e) => {
169                    let code = e.raw_os_error().unwrap_or(-1);
170                    self.stage_completion(token, -code.abs());
171                }
172            }
173        }
174
175        Ok(token)
176    }
177
178    fn submit_append(&mut self, fd: IoFd, data: &[u8]) -> Result<u64, StorageIoError> {
179        let token = self.next_token();
180        let entry = self.get_entry(fd)?;
181
182        // Write at tracked append position, then advance it.
183        // Using write_all instead of pwrite for append semantics —
184        // the file was opened in append mode, so write() appends atomically.
185        match entry.file.write_all(data) {
186            Ok(()) => {
187                debug_assert!(
188                    i32::try_from(data.len()).is_ok(),
189                    "append of {} bytes > i32::MAX",
190                    data.len()
191                );
192                let n = i32::try_from(data.len()).unwrap_or(i32::MAX);
193                entry.append_offset += data.len() as u64;
194                self.stage_completion(token, n);
195            }
196            Err(e) => {
197                let errno = e.raw_os_error().unwrap_or(-1);
198                self.stage_completion(token, -errno.abs());
199            }
200        }
201
202        Ok(token)
203    }
204
205    fn submit_datasync(&mut self, fd: IoFd) -> Result<u64, StorageIoError> {
206        let token = self.next_token();
207        let entry = self.get_entry(fd)?;
208
209        // Flush + fdatasync immediately
210        match entry.file.flush().and_then(|()| entry.file.sync_data()) {
211            Ok(()) => self.stage_completion(token, 0),
212            Err(e) => {
213                let errno = e.raw_os_error().unwrap_or(-1);
214                self.stage_completion(token, -errno.abs());
215            }
216        }
217
218        Ok(token)
219    }
220
221    fn flush(&mut self) -> Result<usize, StorageIoError> {
222        // No-op: sync backend executes I/O inline.
223        Ok(0)
224    }
225
226    fn poll_completions(&mut self, out: &mut Vec<IoCompletion>) {
227        while let Some(c) = self.completions.pop_front() {
228            out.push(c);
229            self.pending -= 1;
230        }
231    }
232
233    fn pending_count(&self) -> usize {
234        self.pending
235    }
236}
237
238impl fmt::Debug for SyncStorageIo {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        let registered = self.fds.iter().filter(|e| e.is_some()).count();
241        f.debug_struct("SyncStorageIo")
242            .field("registered_fds", &registered)
243            .field("pending_completions", &self.completions.len())
244            .field("pending", &self.pending)
245            .field("next_token", &self.next_token)
246            .finish()
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;
    use tempfile::NamedTempFile;

    /// Write at offset 0, poll the staged completion, verify on-disk bytes.
    #[test]
    fn test_register_and_write() {
        let mut io = SyncStorageIo::new();
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        // Register the file with the backend.
        let file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
        let fd = io.register_fd(file).unwrap();

        // Submit a write at offset 0.
        let token = io.submit_write(fd, b"hello", 0).unwrap();

        // Completion should be staged and returned on the first poll.
        let mut polled = Vec::new();
        io.poll_completions(&mut polled);
        assert_eq!(polled.len(), 1);
        assert_eq!(polled[0].token, token);
        assert_eq!(polled[0].result, 5); // all 5 bytes written

        // Data must actually be on disk.
        let mut on_disk = String::new();
        std::fs::File::open(&path)
            .unwrap()
            .read_to_string(&mut on_disk)
            .unwrap();
        assert_eq!(on_disk, "hello");

        io.deregister_fd(fd).unwrap();
    }

    /// Two appends land back-to-back and produce one completion each.
    #[test]
    fn test_append() {
        let mut io = SyncStorageIo::new();
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        let file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        let fd = io.register_fd(file).unwrap();

        io.submit_append(fd, b"hello").unwrap();
        io.submit_append(fd, b" world").unwrap();

        let mut polled = Vec::new();
        io.poll_completions(&mut polled);
        assert_eq!(polled.len(), 2);
        assert_eq!(polled[0].result, 5);
        assert_eq!(polled[1].result, 6);

        assert_eq!(std::fs::read_to_string(&path).unwrap(), "hello world");

        io.deregister_fd(fd).unwrap();
    }

    /// A datasync completes inline with result 0.
    #[test]
    fn test_datasync() {
        let mut io = SyncStorageIo::new();
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        let file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
        let fd = io.register_fd(file).unwrap();

        let token = io.submit_datasync(fd).unwrap();

        let mut polled = Vec::new();
        io.poll_completions(&mut polled);
        assert_eq!(polled.len(), 1);
        assert_eq!(polled[0].token, token);
        assert_eq!(polled[0].result, 0); // success

        io.deregister_fd(fd).unwrap();
    }

    /// Every operation on an unregistered fd reports BadFd.
    #[test]
    fn test_bad_fd() {
        let mut io = SyncStorageIo::new();
        let bogus = IoFd(99);
        assert!(io.submit_write(bogus, b"x", 0).is_err());
        assert!(io.submit_datasync(bogus).is_err());
        assert!(io.deregister_fd(bogus).is_err());
    }

    /// pending_count tracks staged completions and drops to zero after a poll.
    #[test]
    fn test_pending_count() {
        let mut io = SyncStorageIo::new();
        assert_eq!(io.pending_count(), 0);

        let tmp = NamedTempFile::new().unwrap();
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(tmp.path())
            .unwrap();
        let fd = io.register_fd(file).unwrap();

        io.submit_write(fd, b"x", 0).unwrap();
        assert_eq!(io.pending_count(), 1);

        io.submit_write(fd, b"y", 1).unwrap();
        assert_eq!(io.pending_count(), 2);

        let mut polled = Vec::new();
        io.poll_completions(&mut polled);
        assert_eq!(io.pending_count(), 0);
        assert_eq!(polled.len(), 2);
    }

    /// flush() has nothing to do in the sync backend and returns 0.
    #[test]
    fn test_flush_is_noop() {
        let mut io = SyncStorageIo::new();
        assert_eq!(io.flush().unwrap(), 0);
    }

    /// Writes through two registered fds go to the right files.
    #[test]
    fn test_multiple_fds() {
        let mut io = SyncStorageIo::new();

        let tmp_a = NamedTempFile::new().unwrap();
        let tmp_b = NamedTempFile::new().unwrap();

        let file_a = std::fs::OpenOptions::new()
            .write(true)
            .open(tmp_a.path())
            .unwrap();
        let file_b = std::fs::OpenOptions::new()
            .write(true)
            .open(tmp_b.path())
            .unwrap();

        let fd_a = io.register_fd(file_a).unwrap();
        let fd_b = io.register_fd(file_b).unwrap();
        assert_ne!(fd_a, fd_b);

        io.submit_write(fd_a, b"file1", 0).unwrap();
        io.submit_write(fd_b, b"file2", 0).unwrap();

        let mut polled = Vec::new();
        io.poll_completions(&mut polled);
        assert_eq!(polled.len(), 2);

        assert_eq!(std::fs::read_to_string(tmp_a.path()).unwrap(), "file1");
        assert_eq!(std::fs::read_to_string(tmp_b.path()).unwrap(), "file2");
    }
}