1use std::collections::VecDeque;
17use std::fmt;
18use std::fs::File;
19use std::io::Write;
20
21use super::{IoCompletion, IoFd, StorageIo, StorageIoError};
22
/// Number of slots in the descriptor table. `SyncStorageIo::new` allocates
/// exactly this many and the table never grows afterwards, so this is also the
/// cap on simultaneously registered files.
const MAX_FDS: usize = 64;
25
/// State kept for one registered file, stored in a slot of the descriptor table.
struct FdEntry {
    /// The registered file; writes, appends and syncs for this descriptor go through it.
    file: File,
    /// Running end-of-file estimate: seeded from the file's length at
    /// registration (0 if the metadata lookup fails) and advanced by the byte
    /// count of every successful append. In this file it is bookkeeping only —
    /// no write is positioned with it.
    append_offset: u64,
}
33
/// Storage I/O backend that executes each operation immediately inside the
/// `submit_*` call and queues the outcome until `poll_completions` collects it.
pub struct SyncStorageIo {
    /// Fixed-size descriptor table; an `IoFd` is an index into it and `None`
    /// marks a free slot.
    fds: Vec<Option<FdEntry>>,
    /// Outcomes of already-executed operations, oldest first, waiting to be polled.
    completions: VecDeque<IoCompletion>,
    /// Token that the next submitted operation will receive; wraps on overflow.
    next_token: u64,
    /// Number of staged completions that have not been polled yet.
    pending: usize,
}
51
52impl SyncStorageIo {
53 #[must_use]
55 pub fn new() -> Self {
56 let mut fds = Vec::with_capacity(MAX_FDS);
57 fds.resize_with(MAX_FDS, || None);
58 Self {
59 fds,
60 completions: VecDeque::with_capacity(256),
61 next_token: 0,
62 pending: 0,
63 }
64 }
65
66 fn next_token(&mut self) -> u64 {
68 let t = self.next_token;
69 self.next_token = self.next_token.wrapping_add(1);
70 t
71 }
72
73 fn find_free_slot(&self) -> Option<u32> {
75 #[allow(clippy::cast_possible_truncation)]
76 self.fds.iter().position(Option::is_none).map(|i| i as u32)
77 }
78
79 fn get_entry(&mut self, fd: IoFd) -> Result<&mut FdEntry, StorageIoError> {
81 self.fds
82 .get_mut(fd.0 as usize)
83 .and_then(|slot| slot.as_mut())
84 .ok_or(StorageIoError::BadFd(fd))
85 }
86
87 fn stage_completion(&mut self, token: u64, result: i32) {
89 debug_assert!(
90 self.completions.len() < self.completions.capacity(),
91 "completion queue at capacity {} — poll_completions not draining fast enough",
92 self.completions.capacity()
93 );
94 self.completions.push_back(IoCompletion { token, result });
95 self.pending += 1;
96 }
97}
98
99impl Default for SyncStorageIo {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl StorageIo for SyncStorageIo {
106 fn register_fd(&mut self, file: File) -> Result<IoFd, StorageIoError> {
107 let slot = self.find_free_slot().ok_or(StorageIoError::QueueFull)?;
108
109 let append_offset = file.metadata().map(|m| m.len()).unwrap_or(0);
111
112 self.fds[slot as usize] = Some(FdEntry {
113 file,
114 append_offset,
115 });
116 Ok(IoFd(slot))
117 }
118
119 fn deregister_fd(&mut self, fd: IoFd) -> Result<(), StorageIoError> {
120 let slot = self
121 .fds
122 .get_mut(fd.0 as usize)
123 .ok_or(StorageIoError::BadFd(fd))?;
124 if slot.is_none() {
125 return Err(StorageIoError::BadFd(fd));
126 }
127 *slot = None;
129 Ok(())
130 }
131
132 fn submit_write(&mut self, fd: IoFd, data: &[u8], offset: u64) -> Result<u64, StorageIoError> {
133 let token = self.next_token();
134 let entry = self.get_entry(fd)?;
135
136 #[cfg(unix)]
138 {
139 use std::os::unix::fs::FileExt;
140 match entry.file.write_at(data, offset) {
141 Ok(n) => {
142 debug_assert!(
143 i32::try_from(n).is_ok(),
144 "write returned {n} bytes > i32::MAX"
145 );
146 let clamped = i32::try_from(n).unwrap_or(i32::MAX);
147 self.stage_completion(token, clamped);
148 }
149 Err(e) => {
150 let errno = e.raw_os_error().unwrap_or(-1);
151 self.stage_completion(token, -errno.abs());
152 }
153 }
154 }
155
156 #[cfg(windows)]
157 {
158 use std::os::windows::fs::FileExt;
159 match entry.file.seek_write(data, offset) {
160 Ok(n) => {
161 debug_assert!(
162 i32::try_from(n).is_ok(),
163 "write returned {n} bytes > i32::MAX"
164 );
165 let clamped = i32::try_from(n).unwrap_or(i32::MAX);
166 self.stage_completion(token, clamped);
167 }
168 Err(e) => {
169 let code = e.raw_os_error().unwrap_or(-1);
170 self.stage_completion(token, -code.abs());
171 }
172 }
173 }
174
175 Ok(token)
176 }
177
178 fn submit_append(&mut self, fd: IoFd, data: &[u8]) -> Result<u64, StorageIoError> {
179 let token = self.next_token();
180 let entry = self.get_entry(fd)?;
181
182 match entry.file.write_all(data) {
186 Ok(()) => {
187 debug_assert!(
188 i32::try_from(data.len()).is_ok(),
189 "append of {} bytes > i32::MAX",
190 data.len()
191 );
192 let n = i32::try_from(data.len()).unwrap_or(i32::MAX);
193 entry.append_offset += data.len() as u64;
194 self.stage_completion(token, n);
195 }
196 Err(e) => {
197 let errno = e.raw_os_error().unwrap_or(-1);
198 self.stage_completion(token, -errno.abs());
199 }
200 }
201
202 Ok(token)
203 }
204
205 fn submit_datasync(&mut self, fd: IoFd) -> Result<u64, StorageIoError> {
206 let token = self.next_token();
207 let entry = self.get_entry(fd)?;
208
209 match entry.file.flush().and_then(|()| entry.file.sync_data()) {
211 Ok(()) => self.stage_completion(token, 0),
212 Err(e) => {
213 let errno = e.raw_os_error().unwrap_or(-1);
214 self.stage_completion(token, -errno.abs());
215 }
216 }
217
218 Ok(token)
219 }
220
221 fn flush(&mut self) -> Result<usize, StorageIoError> {
222 Ok(0)
224 }
225
226 fn poll_completions(&mut self, out: &mut Vec<IoCompletion>) {
227 while let Some(c) = self.completions.pop_front() {
228 out.push(c);
229 self.pending -= 1;
230 }
231 }
232
233 fn pending_count(&self) -> usize {
234 self.pending
235 }
236}
237
238impl fmt::Debug for SyncStorageIo {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 let registered = self.fds.iter().filter(|e| e.is_some()).count();
241 f.debug_struct("SyncStorageIo")
242 .field("registered_fds", ®istered)
243 .field("pending_completions", &self.completions.len())
244 .field("pending", &self.pending)
245 .field("next_token", &self.next_token)
246 .finish()
247 }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;
    use tempfile::NamedTempFile;

    /// Opens an existing file for (non-append) writing.
    fn open_for_write(path: &std::path::Path) -> std::fs::File {
        std::fs::OpenOptions::new().write(true).open(path).unwrap()
    }

    /// Collects everything the backend currently has staged.
    fn drain(backend: &mut SyncStorageIo) -> Vec<IoCompletion> {
        let mut done = Vec::new();
        backend.poll_completions(&mut done);
        done
    }

    #[test]
    fn test_register_and_write() {
        let mut backend = SyncStorageIo::new();
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        let fd = backend.register_fd(open_for_write(&path)).unwrap();
        let token = backend.submit_write(fd, b"hello", 0).unwrap();

        let done = drain(&mut backend);
        assert_eq!(done.len(), 1);
        assert_eq!(done[0].token, token);
        assert_eq!(done[0].result, 5);

        // Read back through an independent handle to check what reached the file.
        let mut contents = String::new();
        let mut reader = std::fs::File::open(&path).unwrap();
        reader.read_to_string(&mut contents).unwrap();
        assert_eq!(contents, "hello");

        backend.deregister_fd(fd).unwrap();
    }

    #[test]
    fn test_append() {
        let mut backend = SyncStorageIo::new();
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        let appender = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        let fd = backend.register_fd(appender).unwrap();

        backend.submit_append(fd, b"hello").unwrap();
        backend.submit_append(fd, b" world").unwrap();

        let done = drain(&mut backend);
        assert_eq!(done.len(), 2);
        assert_eq!(done[0].result, 5);
        assert_eq!(done[1].result, 6);

        let contents = std::fs::read_to_string(&path).unwrap();
        assert_eq!(contents, "hello world");

        backend.deregister_fd(fd).unwrap();
    }

    #[test]
    fn test_datasync() {
        let mut backend = SyncStorageIo::new();
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        let fd = backend.register_fd(open_for_write(&path)).unwrap();
        let token = backend.submit_datasync(fd).unwrap();

        let done = drain(&mut backend);
        assert_eq!(done.len(), 1);
        assert_eq!(done[0].token, token);
        assert_eq!(done[0].result, 0);

        backend.deregister_fd(fd).unwrap();
    }

    #[test]
    fn test_bad_fd() {
        let mut backend = SyncStorageIo::new();
        // Index 99 lies beyond the descriptor table.
        let bad_fd = IoFd(99);
        assert!(backend.submit_write(bad_fd, b"x", 0).is_err());
        assert!(backend.submit_datasync(bad_fd).is_err());
        assert!(backend.deregister_fd(bad_fd).is_err());
    }

    #[test]
    fn test_pending_count() {
        let mut backend = SyncStorageIo::new();
        assert_eq!(backend.pending_count(), 0);

        let tmp = NamedTempFile::new().unwrap();
        let fd = backend.register_fd(open_for_write(tmp.path())).unwrap();

        backend.submit_write(fd, b"x", 0).unwrap();
        assert_eq!(backend.pending_count(), 1);

        backend.submit_write(fd, b"y", 1).unwrap();
        assert_eq!(backend.pending_count(), 2);

        let done = drain(&mut backend);
        assert_eq!(backend.pending_count(), 0);
        assert_eq!(done.len(), 2);
    }

    #[test]
    fn test_flush_is_noop() {
        let mut backend = SyncStorageIo::new();
        assert_eq!(backend.flush().unwrap(), 0);
    }

    #[test]
    fn test_multiple_fds() {
        let mut backend = SyncStorageIo::new();

        let tmp1 = NamedTempFile::new().unwrap();
        let tmp2 = NamedTempFile::new().unwrap();

        let f1 = open_for_write(tmp1.path());
        let f2 = open_for_write(tmp2.path());

        let fd1 = backend.register_fd(f1).unwrap();
        let fd2 = backend.register_fd(f2).unwrap();
        assert_ne!(fd1, fd2);

        backend.submit_write(fd1, b"file1", 0).unwrap();
        backend.submit_write(fd2, b"file2", 0).unwrap();

        let done = drain(&mut backend);
        assert_eq!(done.len(), 2);

        assert_eq!(std::fs::read_to_string(tmp1.path()).unwrap(), "file1");
        assert_eq!(std::fs::read_to_string(tmp2.path()).unwrap(), "file2");
    }
}