// laminar_core/storage_io/uring_backend.rs
//! `io_uring` storage I/O backend (Linux only, requires `io-uring` feature).
//!
//! Wraps [`CoreRingManager`] behind the [`StorageIo`] trait. With SQPOLL enabled,
//! submissions are zero-syscall — the kernel polling thread picks up SQEs
//! automatically. Registered buffers eliminate kernel-side copies.
//!
//! ## Buffer lifecycle
//!
//! `submit_write` acquires a registered buffer, copies data into it, and pushes
//! a `WriteFixed` SQE. The buffer is marked in-flight until the CQE arrives in
//! `poll_completions`, at which point it is released back to the pool.
//!
//! ## Ordering
//!
//! For a per-core WAL writer (single file, single thread), a write followed by
//! an fdatasync is intended to sync all preceding writes. NOTE(review): bare
//! io_uring does not guarantee SQE *execution* order without `IOSQE_IO_LINK`
//! or `IOSQE_IO_DRAIN` — confirm that [`CoreRingManager`] sets one of these
//! (or otherwise serializes operations) before relying on this property.
use rustc_hash::FxHashMap;
use std::fs::File;
use std::os::unix::io::{FromRawFd, IntoRawFd, RawFd};

use crate::io_uring::{Completion, CoreRingManager, IoUringConfig, IoUringError};

use super::{IoCompletion, IoFd, StorageIo, StorageIoError};
26
/// Maximum number of file descriptors that can be registered at once;
/// this is the fixed length of `UringStorageIo::fd_table`.
const MAX_FDS: usize = 64;
29
/// `io_uring` storage I/O backend.
///
/// One instance per core. Uses SQPOLL for zero-syscall submission and
/// registered buffers for zero-copy writes.
pub struct UringStorageIo {
    /// The per-core ring manager that owns the ring, the registered buffer
    /// pool, and in-flight operation state.
    manager: CoreRingManager,
    /// Slot-indexed map from `IoFd` (the slot number) to the OS `RawFd`;
    /// `None` marks a free slot.
    fd_table: Vec<Option<RawFd>>,
    /// Reverse map (`RawFd` → `IoFd`) used for cleanup on deregistration.
    raw_to_io: FxHashMap<RawFd, IoFd>,
    /// Pre-allocated scratch buffer for completions (zero-alloc poll path).
    completion_scratch: Vec<Completion>,
}
44
45impl UringStorageIo {
46    /// Create a new `io_uring` backend for a specific core.
47    ///
48    /// # Errors
49    ///
50    /// Returns an error if ring creation fails (insufficient privileges for
51    /// SQPOLL, unsupported kernel version, etc.).
52    pub fn new(core_id: usize, config: &IoUringConfig) -> Result<Self, IoUringError> {
53        let manager = CoreRingManager::new(core_id, config)?;
54        let mut fd_table = Vec::with_capacity(MAX_FDS);
55        fd_table.resize_with(MAX_FDS, || None);
56        Ok(Self {
57            manager,
58            fd_table,
59            raw_to_io: FxHashMap::default(),
60            completion_scratch: Vec::with_capacity(config.ring_entries as usize),
61        })
62    }
63
64    /// Find a free slot in the fd table.
65    fn find_free_slot(&self) -> Option<u32> {
66        #[allow(clippy::cast_possible_truncation)]
67        self.fd_table
68            .iter()
69            .position(Option::is_none)
70            .map(|i| i as u32)
71    }
72
73    /// Get the raw fd for an `IoFd`.
74    fn raw_fd(&self, fd: IoFd) -> Result<RawFd, StorageIoError> {
75        self.fd_table
76            .get(fd.0 as usize)
77            .and_then(|slot| *slot)
78            .ok_or(StorageIoError::BadFd(fd))
79    }
80
81    /// Returns true if the underlying ring uses SQPOLL.
82    #[must_use]
83    pub fn uses_sqpoll(&self) -> bool {
84        self.manager.uses_sqpoll()
85    }
86}
87
impl StorageIo for UringStorageIo {
    /// Take ownership of `file` and register it under a free slot; the slot
    /// index becomes the `IoFd` handle.
    fn register_fd(&mut self, file: File) -> Result<IoFd, StorageIoError> {
        // NOTE(review): slot exhaustion is reported as QueueFull; a dedicated
        // "fd table full" variant would be less ambiguous — confirm callers.
        let slot = self.find_free_slot().ok_or(StorageIoError::QueueFull)?;
        // We take ownership of the descriptor here; it is closed again either
        // in deregister_fd (via the ring) or in Drop.
        let raw = file.into_raw_fd();
        self.fd_table[slot as usize] = Some(raw);
        let io_fd = IoFd(slot);
        self.raw_to_io.insert(raw, io_fd);
        Ok(io_fd)
    }

    /// Close the fd through the ring and free its slot once the close CQE
    /// has been observed.
    fn deregister_fd(&mut self, fd: IoFd) -> Result<(), StorageIoError> {
        let raw = self.raw_fd(fd)?;

        // Submit close and poll until its CQE arrives. Non-close
        // completions are processed normally (buffers released) so
        // we don't leak buffers that complete while we wait.
        let close_token = self.manager.submit_close(raw).map_err(|e| match e {
            IoUringError::RingClosed => StorageIoError::Closed,
            other => StorageIoError::Io(std::io::Error::other(other.to_string())),
        })?;
        // Kick the SQE toward the kernel; errors are ignored because the
        // wait loop below re-submits anyway.
        let _ = self.manager.submit();

        let mut close_result: Option<i32> = None;
        while close_result.is_none() {
            self.completion_scratch.clear();
            self.manager
                .poll_completions_into(&mut self.completion_scratch);

            for c in self.completion_scratch.drain(..) {
                if c.user_data == close_token {
                    close_result = Some(c.result);
                } else {
                    // Release buffer for non-close completions.
                    if let Some(ref op) = c.op {
                        if let Some(buf_idx) = op.buf_index() {
                            self.manager.release_buffer(buf_idx);
                        }
                    }
                }
            }

            if close_result.is_none() {
                // No CQEs yet — submit and wait for at least one.
                // NOTE(review): the submit_and_wait error is discarded; if the
                // ring is closed and the close CQE can never arrive, this loop
                // spins forever — confirm CoreRingManager's behavior here.
                let _ = self.manager.submit_and_wait(1);
            }
        }

        // The loop only exits once close_result is Some, so unwrap_or(0) is a
        // defensive default and should be unreachable.
        let result = close_result.unwrap_or(0);
        if result < 0 {
            // Negative CQE results carry a negated errno value.
            // NOTE(review): on this error path the table entry is kept, so a
            // caller could retry deregistration — verify that is intended.
            return Err(StorageIoError::Io(std::io::Error::from_raw_os_error(
                -result,
            )));
        }

        self.fd_table[fd.0 as usize] = None;
        self.raw_to_io.remove(&raw);
        Ok(())
    }

    /// Stage `data` into a registered buffer and submit a fixed-buffer write
    /// at `offset`. Returns the completion token for matching the CQE.
    fn submit_write(&mut self, fd: IoFd, data: &[u8], offset: u64) -> Result<u64, StorageIoError> {
        let raw = self.raw_fd(fd)?;

        // Acquire a registered buffer and copy data into it; the kernel reads
        // from this buffer, so `data` need not outlive the call.
        let (buf_idx, buf) = self
            .manager
            .acquire_buffer()
            .map_err(|_| StorageIoError::BufferExhausted)?;

        // Reject writes that exceed the registered buffer size instead
        // of silently truncating.
        if data.len() > buf.len() {
            self.manager.release_buffer(buf_idx);
            return Err(StorageIoError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "data exceeds registered buffer size",
            )));
        }
        buf[..data.len()].copy_from_slice(data);

        // Submit WriteFixed SQE — release buffer on failure so it is not
        // leaked from the pool.
        #[allow(clippy::cast_possible_truncation)]
        let token = match self
            .manager
            .submit_write(raw, buf_idx, offset, data.len() as u32)
        {
            Ok(t) => t,
            Err(e) => {
                self.manager.release_buffer(buf_idx);
                return Err(match e {
                    IoUringError::SubmissionQueueFull => StorageIoError::QueueFull,
                    IoUringError::RingClosed => StorageIoError::Closed,
                    other => StorageIoError::Io(std::io::Error::other(other.to_string())),
                });
            }
        };

        Ok(token)
    }

    fn submit_append(&mut self, fd: IoFd, data: &[u8]) -> Result<u64, StorageIoError> {
        // offset == u64::MAX is the io_uring encoding of offset -1, meaning
        // "use the file's current position" (write(2) semantics). For files
        // opened with O_APPEND this appends atomically; for other files it
        // writes at the current offset. NOTE(review): an earlier comment
        // attributed this to IORING_FILE_INDEX_ALLOC, which is unrelated (that
        // flag concerns fixed-file slot allocation, not write offsets).
        self.submit_write(fd, data, u64::MAX)
    }

    /// Submit an fdatasync for `fd`; returns the completion token.
    fn submit_datasync(&mut self, fd: IoFd) -> Result<u64, StorageIoError> {
        let raw = self.raw_fd(fd)?;

        let token = self
            .manager
            .submit_sync(raw, true) // datasync=true → FDATASYNC
            .map_err(|e| match e {
                IoUringError::SubmissionQueueFull => StorageIoError::QueueFull,
                IoUringError::RingClosed => StorageIoError::Closed,
                other => StorageIoError::Io(std::io::Error::other(other.to_string())),
            })?;

        Ok(token)
    }

    /// Push any staged SQEs to the kernel; returns the number submitted.
    fn flush(&mut self) -> Result<usize, StorageIoError> {
        // In SQPOLL mode this is typically a no-op (the kernel thread picks
        // up SQEs itself). In standard mode this calls ring.submit().
        self.manager.submit().map_err(|e| match e {
            IoUringError::RingClosed => StorageIoError::Closed,
            other => StorageIoError::Io(std::io::Error::other(other.to_string())),
        })
    }

    /// Drain available CQEs into `out`, releasing registered buffers as
    /// their operations complete.
    fn poll_completions(&mut self, out: &mut Vec<IoCompletion>) {
        // Use the zero-alloc path: poll into pre-allocated scratch buffer,
        // then convert to IoCompletion in the caller's output buffer.
        self.completion_scratch.clear();
        self.manager
            .poll_completions_into(&mut self.completion_scratch);
        for c in self.completion_scratch.drain(..) {
            // Release the registered buffer back to the pool now that the
            // kernel is done with it.
            if let Some(ref op) = c.op {
                if let Some(buf_idx) = op.buf_index() {
                    self.manager.release_buffer(buf_idx);
                }
            }
            out.push(IoCompletion {
                token: c.user_data,
                result: c.result,
            });
        }
    }

    /// Number of operations submitted but not yet completed.
    fn pending_count(&self) -> usize {
        self.manager.pending_count()
    }
}
243
244impl Drop for UringStorageIo {
245    fn drop(&mut self) {
246        // Drain the ring first so no in-flight SQEs reference our fds.
247        self.manager.close();
248
249        // Now safe to close fds — kernel is done with them.
250        for slot in &mut self.fd_table {
251            if let Some(raw) = slot.take() {
252                // SAFETY: we own this fd (into_raw_fd in register_fd).
253                drop(unsafe { File::from_raw_fd(raw) });
254            }
255        }
256        self.raw_to_io.clear();
257    }
258}
259
260impl std::fmt::Debug for UringStorageIo {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        let registered = self.fd_table.iter().filter(|e| e.is_some()).count();
263        f.debug_struct("UringStorageIo")
264            .field("core_id", &self.manager.core_id())
265            .field("uses_sqpoll", &self.manager.uses_sqpoll())
266            .field("registered_fds", &registered)
267            .field("reverse_map_len", &self.raw_to_io.len())
268            .field(
269                "completion_scratch_cap",
270                &self.completion_scratch.capacity(),
271            )
272            .field("pending", &self.manager.pending_count())
273            .finish()
274    }
275}