1#![deny(clippy::disallowed_types)]
6
7use io_uring::opcode;
8use io_uring::types::{self, Fd};
9use rustc_hash::FxHashMap;
10use std::io;
11use std::os::fd::RawFd;
12use std::time::{Duration, Instant};
13
14use super::buffer_pool::{BufferPoolStats, RegisteredBufferPool};
15use super::config::{IoUringConfig, RingMode};
16use super::error::IoUringError;
17use super::ring::IoUringRing;
18
19#[derive(Debug, Clone)]
21pub enum PendingOp {
22 Read {
24 buf_index: u16,
26 len: u32,
28 submitted_at: Instant,
30 },
31 Write {
33 buf_index: u16,
35 len: u32,
37 submitted_at: Instant,
39 },
40 Sync {
42 submitted_at: Instant,
44 },
45 Close {
47 fd: RawFd,
49 submitted_at: Instant,
51 },
52 Custom {
54 op_type: u32,
56 data: u64,
58 submitted_at: Instant,
60 },
61}
62
63impl PendingOp {
64 #[must_use]
66 pub const fn buf_index(&self) -> Option<u16> {
67 match self {
68 Self::Read { buf_index, .. } | Self::Write { buf_index, .. } => Some(*buf_index),
69 _ => None,
70 }
71 }
72
73 #[must_use]
75 pub const fn submitted_at(&self) -> Instant {
76 match self {
77 Self::Read { submitted_at, .. }
78 | Self::Write { submitted_at, .. }
79 | Self::Sync { submitted_at }
80 | Self::Close { submitted_at, .. }
81 | Self::Custom { submitted_at, .. } => *submitted_at,
82 }
83 }
84}
85
86#[derive(Debug)]
88pub struct Completion {
89 pub user_data: u64,
91 pub result: i32,
93 pub flags: u32,
95 pub op: Option<PendingOp>,
97 pub latency: Option<Duration>,
99}
100
101impl Completion {
102 #[must_use]
104 pub const fn is_success(&self) -> bool {
105 self.result >= 0
106 }
107
108 #[must_use]
110 pub fn error(&self) -> Option<io::Error> {
111 if self.result < 0 {
112 Some(io::Error::from_raw_os_error(-self.result))
113 } else {
114 None
115 }
116 }
117
118 #[must_use]
120 #[allow(clippy::cast_sign_loss)]
121 pub const fn bytes_transferred(&self) -> Option<usize> {
122 if self.result >= 0 {
123 Some(self.result as usize)
124 } else {
125 None
126 }
127 }
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub enum CompletionKind {
133 Read,
135 Write,
137 Sync,
139 Close,
141 Custom,
143 Unknown,
145}
146
147impl Completion {
148 #[must_use]
150 pub fn kind(&self) -> CompletionKind {
151 match &self.op {
152 Some(PendingOp::Read { .. }) => CompletionKind::Read,
153 Some(PendingOp::Write { .. }) => CompletionKind::Write,
154 Some(PendingOp::Sync { .. }) => CompletionKind::Sync,
155 Some(PendingOp::Close { .. }) => CompletionKind::Close,
156 Some(PendingOp::Custom { .. }) => CompletionKind::Custom,
157 None => CompletionKind::Unknown,
158 }
159 }
160}
161
162pub struct CoreRingManager {
167 core_id: usize,
169 main_ring: IoUringRing,
171 buffer_pool: Option<RegisteredBufferPool>,
173 iopoll_ring: Option<IoUringRing>,
175 pending: FxHashMap<u64, PendingOp>,
177 next_id: u64,
179 metrics: RingMetrics,
181 closed: bool,
183 cqe_scratch: Vec<(u64, i32, u32)>,
186}
187
188impl CoreRingManager {
189 pub fn new(core_id: usize, config: &IoUringConfig) -> Result<Self, IoUringError> {
200 let mut main_ring = IoUringRing::new(config)?;
202
203 let buffer_pool = if config.buffer_count > 0 {
209 Some(RegisteredBufferPool::new(
210 main_ring.ring_mut(),
211 config.buffer_size,
212 config.buffer_count,
213 )?)
214 } else {
215 None
216 };
217
218 let iopoll_ring = if config.mode.uses_iopoll() {
220 let iopoll_config = IoUringConfig {
221 mode: RingMode::IoPoll,
222 ..config.clone()
223 };
224 let ring = IoUringRing::new(&iopoll_config)?;
225 Some(ring)
226 } else {
227 None
228 };
229
230 Ok(Self {
231 core_id,
232 main_ring,
233 buffer_pool,
234 iopoll_ring,
235 pending: FxHashMap::default(),
236 next_id: 0,
237 metrics: RingMetrics::default(),
238 closed: false,
239 cqe_scratch: Vec::with_capacity(config.ring_entries as usize),
240 })
241 }
242
243 #[must_use]
245 pub const fn core_id(&self) -> usize {
246 self.core_id
247 }
248
249 #[must_use]
251 pub const fn is_closed(&self) -> bool {
252 self.closed
253 }
254
255 #[must_use]
257 pub const fn mode(&self) -> RingMode {
258 self.main_ring.mode()
259 }
260
261 #[must_use]
263 pub const fn uses_sqpoll(&self) -> bool {
264 self.main_ring.uses_sqpoll()
265 }
266
267 #[must_use]
269 pub fn has_iopoll_ring(&self) -> bool {
270 self.iopoll_ring.is_some()
271 }
272
273 pub fn acquire_buffer(&mut self) -> Result<(u16, &mut [u8]), IoUringError> {
279 self.buffer_pool
280 .as_mut()
281 .ok_or(IoUringError::InvalidConfig(
282 "No buffer pool configured".to_string(),
283 ))?
284 .acquire()
285 }
286
287 pub fn release_buffer(&mut self, buf_index: u16) {
289 if let Some(pool) = &mut self.buffer_pool {
290 pool.release(buf_index);
291 }
292 }
293
294 pub fn submit_read(
300 &mut self,
301 fd: RawFd,
302 buf_index: u16,
303 offset: u64,
304 len: u32,
305 ) -> Result<u64, IoUringError> {
306 if self.closed {
307 return Err(IoUringError::RingClosed);
308 }
309
310 let pool = self
311 .buffer_pool
312 .as_mut()
313 .ok_or(IoUringError::InvalidConfig(
314 "No buffer pool configured".to_string(),
315 ))?;
316
317 let user_data =
318 pool.submit_read_fixed(self.main_ring.ring_mut(), fd, buf_index, offset, len)?;
319
320 self.pending.insert(
321 user_data,
322 PendingOp::Read {
323 buf_index,
324 len,
325 submitted_at: Instant::now(),
326 },
327 );
328
329 self.metrics.reads_submitted += 1;
330 Ok(user_data)
331 }
332
333 pub fn submit_write(
339 &mut self,
340 fd: RawFd,
341 buf_index: u16,
342 offset: u64,
343 len: u32,
344 ) -> Result<u64, IoUringError> {
345 if self.closed {
346 return Err(IoUringError::RingClosed);
347 }
348
349 let pool = self
350 .buffer_pool
351 .as_mut()
352 .ok_or(IoUringError::InvalidConfig(
353 "No buffer pool configured".to_string(),
354 ))?;
355
356 let user_data =
357 pool.submit_write_fixed(self.main_ring.ring_mut(), fd, buf_index, offset, len)?;
358
359 self.pending.insert(
360 user_data,
361 PendingOp::Write {
362 buf_index,
363 len,
364 submitted_at: Instant::now(),
365 },
366 );
367
368 self.metrics.writes_submitted += 1;
369 Ok(user_data)
370 }
371
372 pub fn submit_sync(&mut self, fd: RawFd, datasync: bool) -> Result<u64, IoUringError> {
383 if self.closed {
384 return Err(IoUringError::RingClosed);
385 }
386
387 let user_data = self.next_user_data();
388
389 let entry = if datasync {
390 opcode::Fsync::new(Fd(fd))
391 .flags(types::FsyncFlags::DATASYNC)
392 .build()
393 .user_data(user_data)
394 } else {
395 opcode::Fsync::new(Fd(fd)).build().user_data(user_data)
396 };
397
398 unsafe {
400 self.main_ring
401 .ring_mut()
402 .submission()
403 .push(&entry)
404 .map_err(|_| IoUringError::SubmissionQueueFull)?;
405 }
406
407 self.pending.insert(
408 user_data,
409 PendingOp::Sync {
410 submitted_at: Instant::now(),
411 },
412 );
413
414 self.metrics.syncs_submitted += 1;
415 Ok(user_data)
416 }
417
418 pub fn submit_close(&mut self, fd: RawFd) -> Result<u64, IoUringError> {
424 if self.closed {
425 return Err(IoUringError::RingClosed);
426 }
427
428 let user_data = self.next_user_data();
429
430 let entry = opcode::Close::new(Fd(fd)).build().user_data(user_data);
431
432 unsafe {
434 self.main_ring
435 .ring_mut()
436 .submission()
437 .push(&entry)
438 .map_err(|_| IoUringError::SubmissionQueueFull)?;
439 }
440
441 self.pending.insert(
442 user_data,
443 PendingOp::Close {
444 fd,
445 submitted_at: Instant::now(),
446 },
447 );
448
449 Ok(user_data)
450 }
451
452 pub fn submit(&mut self) -> Result<usize, IoUringError> {
460 if self.closed {
461 return Err(IoUringError::RingClosed);
462 }
463
464 let submitted = self
465 .main_ring
466 .ring_mut()
467 .submit()
468 .map_err(IoUringError::SubmissionFailed)?;
469
470 self.metrics.submissions += 1;
471 Ok(submitted)
472 }
473
474 pub fn submit_and_wait(&mut self, want: usize) -> Result<usize, IoUringError> {
480 if self.closed {
481 return Err(IoUringError::RingClosed);
482 }
483
484 let submitted = self
485 .main_ring
486 .ring_mut()
487 .submit_and_wait(want)
488 .map_err(IoUringError::SubmissionFailed)?;
489
490 self.metrics.submissions += 1;
491 Ok(submitted)
492 }
493
494 #[must_use]
499 pub fn poll_completions(&mut self) -> Vec<Completion> {
500 let mut completions = Vec::new();
501 self.poll_completions_into(&mut completions);
502 completions
503 }
504
505 pub fn poll_completions_into(&mut self, out: &mut Vec<Completion>) {
511 if self.closed {
512 return;
513 }
514
515 self.poll_ring_completions_into(out, false);
517
518 if self.iopoll_ring.is_some() {
520 self.poll_ring_completions_into(out, true);
521 }
522 }
523
524 fn poll_ring_completions_into(&mut self, completions: &mut Vec<Completion>, iopoll: bool) {
527 self.cqe_scratch.clear();
531 {
532 let ring = if iopoll {
533 match &mut self.iopoll_ring {
534 Some(r) => r.ring_mut(),
535 None => return,
536 }
537 } else {
538 self.main_ring.ring_mut()
539 };
540
541 let cq = ring.completion();
542 for cqe in cq {
543 self.cqe_scratch
544 .push((cqe.user_data(), cqe.result(), cqe.flags()));
545 }
546 }
547
548 for i in 0..self.cqe_scratch.len() {
552 let (user_data, result, flags) = self.cqe_scratch[i];
553 let completion = self.process_completion_data(user_data, result, flags);
554 completions.push(completion);
555 }
556 self.cqe_scratch.clear();
557 }
558
559 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
561 fn process_completion_data(&mut self, user_data: u64, result: i32, flags: u32) -> Completion {
562 let op = self.pending.remove(&user_data);
563 let latency = op.as_ref().map(|o| o.submitted_at().elapsed());
564
565 if let Some(ref op) = op {
570 if let Some(buf_idx) = op.buf_index() {
571 if let Some(ref mut pool) = self.buffer_pool {
572 pool.complete_in_flight(buf_idx);
573 }
574 }
575 }
576
577 if result >= 0 {
579 self.metrics.completions_success += 1;
580 if let Some(ref op) = op {
581 match op {
582 PendingOp::Read { .. } => {
583 self.metrics.bytes_read += result as u64;
584 }
585 PendingOp::Write { .. } => {
586 self.metrics.bytes_written += result as u64;
587 }
588 _ => {}
589 }
590 }
591 } else {
592 self.metrics.completions_failed += 1;
593 }
594
595 if let Some(lat) = latency {
596 self.metrics.total_latency_ns += lat.as_nanos() as u64;
597 self.metrics.latency_samples += 1;
598 }
599
600 Completion {
601 user_data,
602 result,
603 flags,
604 op,
605 latency,
606 }
607 }
608
609 pub fn wait_for(&mut self, user_data: u64) -> Result<Completion, IoUringError> {
615 loop {
617 let completions = self.poll_completions();
619
620 for completion in completions {
621 if completion.user_data == user_data {
622 return Ok(completion);
623 }
624 }
625
626 if !self.pending.contains_key(&user_data) {
628 return Err(IoUringError::PendingNotFound(user_data));
629 }
630
631 self.submit_and_wait(1)?;
633 }
634 }
635
636 #[must_use]
638 pub fn pending_count(&self) -> usize {
639 self.pending.len()
640 }
641
642 #[must_use]
644 pub fn buffer_pool_stats(&self) -> Option<BufferPoolStats> {
645 self.buffer_pool.as_ref().map(RegisteredBufferPool::stats)
646 }
647
648 #[must_use]
650 pub const fn metrics(&self) -> &RingMetrics {
651 &self.metrics
652 }
653
654 pub fn close(&mut self) {
659 if self.closed {
660 return;
661 }
662 self.closed = true;
663
664 if !self.pending.is_empty() {
667 let _ = self.main_ring.ring_mut().submit_and_wait(0);
669 for _ in self.main_ring.ring_mut().completion() {}
670 if let Some(ref mut iopoll) = self.iopoll_ring {
671 let _ = iopoll.ring_mut().submit_and_wait(0);
672 for _ in iopoll.ring_mut().completion() {}
673 }
674 }
675
676 self.pending.clear();
677 }
678
679 fn next_user_data(&mut self) -> u64 {
681 let id = self.next_id;
682 self.next_id += 1;
683 id
684 }
685}
686
687impl Drop for CoreRingManager {
688 fn drop(&mut self) {
689 self.close();
690 }
691}
692
693impl std::fmt::Debug for CoreRingManager {
694 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
695 f.debug_struct("CoreRingManager")
696 .field("core_id", &self.core_id)
697 .field("mode", &self.main_ring.mode())
698 .field("pending_count", &self.pending.len())
699 .field("closed", &self.closed)
700 .field("metrics", &self.metrics)
701 .finish_non_exhaustive()
702 }
703}
704
705#[derive(Debug, Default, Clone)]
707pub struct RingMetrics {
708 pub reads_submitted: u64,
710 pub writes_submitted: u64,
712 pub syncs_submitted: u64,
714 pub submissions: u64,
716 pub completions_success: u64,
718 pub completions_failed: u64,
720 pub bytes_read: u64,
722 pub bytes_written: u64,
724 pub total_latency_ns: u64,
726 pub latency_samples: u64,
728}
729
730impl RingMetrics {
731 #[must_use]
733 pub fn avg_latency_ns(&self) -> u64 {
734 if self.latency_samples > 0 {
735 self.total_latency_ns / self.latency_samples
736 } else {
737 0
738 }
739 }
740
741 #[must_use]
743 pub const fn total_ops(&self) -> u64 {
744 self.reads_submitted + self.writes_submitted + self.syncs_submitted
745 }
746
747 #[must_use]
749 pub const fn total_completions(&self) -> u64 {
750 self.completions_success + self.completions_failed
751 }
752
753 #[must_use]
755 #[allow(clippy::cast_precision_loss)]
756 pub fn success_rate(&self) -> f64 {
757 let total = self.total_completions();
758 if total > 0 {
759 self.completions_success as f64 / total as f64
760 } else {
761 1.0
762 }
763 }
764}
765
766#[cfg(test)]
767#[allow(
768 clippy::manual_let_else,
769 clippy::single_match_else,
770 clippy::items_after_statements
771)]
772mod tests {
773 use super::*;
774 use std::fs::OpenOptions;
775 use std::io::Write;
776 use tempfile::tempdir;
777
778 fn make_config() -> IoUringConfig {
779 IoUringConfig {
780 ring_entries: 32,
781 mode: RingMode::Standard,
782 buffer_size: 4096,
783 buffer_count: 8,
784 ..Default::default()
785 }
786 }
787
788 #[test]
789 fn test_manager_creation() {
790 let config = make_config();
791 let manager = CoreRingManager::new(0, &config);
792
793 match manager {
794 Ok(m) => {
795 assert_eq!(m.core_id(), 0);
796 assert!(!m.is_closed());
797 assert_eq!(m.mode(), RingMode::Standard);
798 }
799 Err(e) => {
800 tracing::error!("io_uring not available: {e}");
801 }
802 }
803 }
804
805 #[test]
806 fn test_buffer_acquire_release() {
807 let config = make_config();
808 let mut manager = match CoreRingManager::new(0, &config) {
809 Ok(m) => m,
810 Err(_) => return,
811 };
812
813 let result = manager.acquire_buffer();
815 match result {
816 Ok((idx, buf)) => {
817 assert_eq!(buf.len(), 4096);
818 manager.release_buffer(idx);
819 }
820 Err(e) => {
821 tracing::error!("Buffer acquire failed: {e}");
822 }
823 }
824 }
825
826 #[test]
827 fn test_metrics() {
828 let metrics = RingMetrics {
829 reads_submitted: 10,
830 writes_submitted: 20,
831 syncs_submitted: 5,
832 submissions: 35,
833 completions_success: 30,
834 completions_failed: 5,
835 bytes_read: 40960,
836 bytes_written: 81920,
837 total_latency_ns: 100_000,
838 latency_samples: 10,
839 };
840
841 assert_eq!(metrics.total_ops(), 35);
842 assert_eq!(metrics.total_completions(), 35);
843 assert_eq!(metrics.avg_latency_ns(), 10000);
844 assert!((metrics.success_rate() - 0.857).abs() < 0.01);
845 }
846
847 #[test]
848 fn test_completion_kind() {
849 let completion = Completion {
850 user_data: 1,
851 result: 100,
852 flags: 0,
853 op: Some(PendingOp::Read {
854 buf_index: 0,
855 len: 100,
856 submitted_at: Instant::now(),
857 }),
858 latency: None,
859 };
860
861 assert_eq!(completion.kind(), CompletionKind::Read);
862 assert!(completion.is_success());
863 assert_eq!(completion.bytes_transferred(), Some(100));
864 }
865
866 #[test]
867 fn test_completion_error() {
868 let completion = Completion {
869 user_data: 1,
870 result: -5, flags: 0,
872 op: None,
873 latency: None,
874 };
875
876 assert!(!completion.is_success());
877 assert!(completion.error().is_some());
878 assert_eq!(completion.kind(), CompletionKind::Unknown);
879 }
880
881 #[test]
882 fn test_write_and_poll() {
883 let config = make_config();
884 let mut manager = match CoreRingManager::new(0, &config) {
885 Ok(m) => m,
886 Err(_) => return,
887 };
888
889 let dir = tempdir().unwrap();
891 let path = dir.path().join("test.dat");
892 let mut file = OpenOptions::new()
893 .read(true)
894 .write(true)
895 .create(true)
896 .truncate(true)
897 .open(&path)
898 .unwrap();
899 file.write_all(&[0u8; 4096]).unwrap();
900 file.flush().unwrap();
901 drop(file);
902
903 let file = OpenOptions::new()
904 .read(true)
905 .write(true)
906 .open(&path)
907 .unwrap();
908 use std::os::unix::io::AsRawFd;
909 let fd = file.as_raw_fd();
910
911 let (idx, buf) = match manager.acquire_buffer() {
913 Ok(x) => x,
914 Err(_) => return,
915 };
916 buf[..5].copy_from_slice(b"hello");
917
918 let user_data = match manager.submit_write(fd, idx, 0, 5) {
920 Ok(ud) => ud,
921 Err(_) => return,
922 };
923
924 let _ = manager.submit();
926
927 match manager.wait_for(user_data) {
929 Ok(completion) => {
930 if !completion.is_success() {
933 tracing::error!("Write completion failed (possibly unsupported environment)");
934 return;
935 }
936 assert_eq!(completion.kind(), CompletionKind::Write);
937 }
938 Err(e) => {
939 tracing::error!("Wait failed: {e}");
940 return;
941 }
942 }
943
944 manager.release_buffer(idx);
945 }
946
947 #[test]
948 fn test_close() {
949 let config = make_config();
950 let mut manager = match CoreRingManager::new(0, &config) {
951 Ok(m) => m,
952 Err(_) => return,
953 };
954
955 assert!(!manager.is_closed());
956 manager.close();
957 assert!(manager.is_closed());
958
959 assert!(matches!(
961 manager.submit_sync(0, true),
962 Err(IoUringError::RingClosed)
963 ));
964 }
965}