// laminar_core/storage_io/uring_backend.rs

use rustc_hash::FxHashMap;
use std::fs::File;
use std::os::unix::io::{FromRawFd, IntoRawFd, RawFd};

use crate::io_uring::{Completion, CoreRingManager, IoUringConfig, IoUringError};

use super::{IoCompletion, IoFd, StorageIo, StorageIoError};
/// Capacity of the fixed fd slot table: at most this many files may be
/// registered with a `UringStorageIo` at once.
const MAX_FDS: usize = 64;
29
/// Storage I/O backend built on a per-core io_uring ring.
pub struct UringStorageIo {
    /// Per-core ring manager; all submissions and completions go through it.
    manager: CoreRingManager,
    /// Slot-indexed fd table: `IoFd(n)` resolves to `fd_table[n]`.
    /// `None` marks a free slot.
    fd_table: Vec<Option<RawFd>>,
    /// Reverse lookup from a raw fd back to its `IoFd` handle.
    raw_to_io: FxHashMap<RawFd, IoFd>,
    /// Reusable scratch buffer for draining completions without
    /// allocating on every poll; sized to the ring's entry count.
    completion_scratch: Vec<Completion>,
}
44
45impl UringStorageIo {
46 pub fn new(core_id: usize, config: &IoUringConfig) -> Result<Self, IoUringError> {
53 let manager = CoreRingManager::new(core_id, config)?;
54 let mut fd_table = Vec::with_capacity(MAX_FDS);
55 fd_table.resize_with(MAX_FDS, || None);
56 Ok(Self {
57 manager,
58 fd_table,
59 raw_to_io: FxHashMap::default(),
60 completion_scratch: Vec::with_capacity(config.ring_entries as usize),
61 })
62 }
63
64 fn find_free_slot(&self) -> Option<u32> {
66 #[allow(clippy::cast_possible_truncation)]
67 self.fd_table
68 .iter()
69 .position(Option::is_none)
70 .map(|i| i as u32)
71 }
72
73 fn raw_fd(&self, fd: IoFd) -> Result<RawFd, StorageIoError> {
75 self.fd_table
76 .get(fd.0 as usize)
77 .and_then(|slot| *slot)
78 .ok_or(StorageIoError::BadFd(fd))
79 }
80
81 #[must_use]
83 pub fn uses_sqpoll(&self) -> bool {
84 self.manager.uses_sqpoll()
85 }
86}
87
88impl StorageIo for UringStorageIo {
89 fn register_fd(&mut self, file: File) -> Result<IoFd, StorageIoError> {
90 let slot = self.find_free_slot().ok_or(StorageIoError::QueueFull)?;
91 let raw = file.into_raw_fd();
92 self.fd_table[slot as usize] = Some(raw);
93 let io_fd = IoFd(slot);
94 self.raw_to_io.insert(raw, io_fd);
95 Ok(io_fd)
96 }
97
98 fn deregister_fd(&mut self, fd: IoFd) -> Result<(), StorageIoError> {
99 let raw = self.raw_fd(fd)?;
100
101 let close_token = self.manager.submit_close(raw).map_err(|e| match e {
105 IoUringError::RingClosed => StorageIoError::Closed,
106 other => StorageIoError::Io(std::io::Error::other(other.to_string())),
107 })?;
108 let _ = self.manager.submit();
109
110 let mut close_result: Option<i32> = None;
111 while close_result.is_none() {
112 self.completion_scratch.clear();
113 self.manager
114 .poll_completions_into(&mut self.completion_scratch);
115
116 for c in self.completion_scratch.drain(..) {
117 if c.user_data == close_token {
118 close_result = Some(c.result);
119 } else {
120 if let Some(ref op) = c.op {
122 if let Some(buf_idx) = op.buf_index() {
123 self.manager.release_buffer(buf_idx);
124 }
125 }
126 }
127 }
128
129 if close_result.is_none() {
130 let _ = self.manager.submit_and_wait(1);
132 }
133 }
134
135 let result = close_result.unwrap_or(0);
136 if result < 0 {
137 return Err(StorageIoError::Io(std::io::Error::from_raw_os_error(
138 -result,
139 )));
140 }
141
142 self.fd_table[fd.0 as usize] = None;
143 self.raw_to_io.remove(&raw);
144 Ok(())
145 }
146
147 fn submit_write(&mut self, fd: IoFd, data: &[u8], offset: u64) -> Result<u64, StorageIoError> {
148 let raw = self.raw_fd(fd)?;
149
150 let (buf_idx, buf) = self
152 .manager
153 .acquire_buffer()
154 .map_err(|_| StorageIoError::BufferExhausted)?;
155
156 if data.len() > buf.len() {
159 self.manager.release_buffer(buf_idx);
160 return Err(StorageIoError::Io(std::io::Error::new(
161 std::io::ErrorKind::InvalidInput,
162 "data exceeds registered buffer size",
163 )));
164 }
165 buf[..data.len()].copy_from_slice(data);
166
167 #[allow(clippy::cast_possible_truncation)]
169 let token = match self
170 .manager
171 .submit_write(raw, buf_idx, offset, data.len() as u32)
172 {
173 Ok(t) => t,
174 Err(e) => {
175 self.manager.release_buffer(buf_idx);
176 return Err(match e {
177 IoUringError::SubmissionQueueFull => StorageIoError::QueueFull,
178 IoUringError::RingClosed => StorageIoError::Closed,
179 other => StorageIoError::Io(std::io::Error::other(other.to_string())),
180 });
181 }
182 };
183
184 Ok(token)
185 }
186
187 fn submit_append(&mut self, fd: IoFd, data: &[u8]) -> Result<u64, StorageIoError> {
188 self.submit_write(fd, data, u64::MAX)
192 }
193
194 fn submit_datasync(&mut self, fd: IoFd) -> Result<u64, StorageIoError> {
195 let raw = self.raw_fd(fd)?;
196
197 let token = self
198 .manager
199 .submit_sync(raw, true) .map_err(|e| match e {
201 IoUringError::SubmissionQueueFull => StorageIoError::QueueFull,
202 IoUringError::RingClosed => StorageIoError::Closed,
203 other => StorageIoError::Io(std::io::Error::other(other.to_string())),
204 })?;
205
206 Ok(token)
207 }
208
209 fn flush(&mut self) -> Result<usize, StorageIoError> {
210 self.manager.submit().map_err(|e| match e {
213 IoUringError::RingClosed => StorageIoError::Closed,
214 other => StorageIoError::Io(std::io::Error::other(other.to_string())),
215 })
216 }
217
218 fn poll_completions(&mut self, out: &mut Vec<IoCompletion>) {
219 self.completion_scratch.clear();
222 self.manager
223 .poll_completions_into(&mut self.completion_scratch);
224 for c in self.completion_scratch.drain(..) {
225 if let Some(ref op) = c.op {
228 if let Some(buf_idx) = op.buf_index() {
229 self.manager.release_buffer(buf_idx);
230 }
231 }
232 out.push(IoCompletion {
233 token: c.user_data,
234 result: c.result,
235 });
236 }
237 }
238
239 fn pending_count(&self) -> usize {
240 self.manager.pending_count()
241 }
242}
243
244impl Drop for UringStorageIo {
245 fn drop(&mut self) {
246 self.manager.close();
248
249 for slot in &mut self.fd_table {
251 if let Some(raw) = slot.take() {
252 drop(unsafe { File::from_raw_fd(raw) });
254 }
255 }
256 self.raw_to_io.clear();
257 }
258}
259
260impl std::fmt::Debug for UringStorageIo {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 let registered = self.fd_table.iter().filter(|e| e.is_some()).count();
263 f.debug_struct("UringStorageIo")
264 .field("core_id", &self.manager.core_id())
265 .field("uses_sqpoll", &self.manager.uses_sqpoll())
266 .field("registered_fds", ®istered)
267 .field("reverse_map_len", &self.raw_to_io.len())
268 .field(
269 "completion_scratch_cap",
270 &self.completion_scratch.capacity(),
271 )
272 .field("pending", &self.manager.pending_count())
273 .finish()
274 }
275}