// File: laminar_core/io_uring/manager.rs
1//! Per-core `io_uring` manager for thread-per-core architecture.
2//!
3//! Provides a unified interface for managing `io_uring` operations on a single core,
4//! including ring management, buffer pools, and pending operation tracking.
5#![deny(clippy::disallowed_types)]
6
7use io_uring::opcode;
8use io_uring::types::{self, Fd};
9use rustc_hash::FxHashMap;
10use std::io;
11use std::os::fd::RawFd;
12use std::time::{Duration, Instant};
13
14use super::buffer_pool::{BufferPoolStats, RegisteredBufferPool};
15use super::config::{IoUringConfig, RingMode};
16use super::error::IoUringError;
17use super::ring::IoUringRing;
18
/// Pending operation types for tracking.
///
/// One entry exists per in-flight SQE, keyed by its `user_data` value in
/// `CoreRingManager::pending`; the entry is removed when the matching CQE
/// is reaped and folded into the returned [`Completion`].
#[derive(Debug, Clone)]
pub enum PendingOp {
    /// Read operation.
    Read {
        /// Buffer index (registered buffer pool slot).
        buf_index: u16,
        /// Expected length.
        len: u32,
        /// Submission timestamp.
        submitted_at: Instant,
    },
    /// Write operation.
    Write {
        /// Buffer index (registered buffer pool slot).
        buf_index: u16,
        /// Expected length.
        len: u32,
        /// Submission timestamp.
        submitted_at: Instant,
    },
    /// Fsync/fdatasync operation.
    Sync {
        /// Submission timestamp.
        submitted_at: Instant,
    },
    /// Close operation.
    Close {
        /// File descriptor being closed.
        fd: RawFd,
        /// Submission timestamp.
        submitted_at: Instant,
    },
    /// Custom operation with opaque data.
    Custom {
        /// Operation type identifier.
        op_type: u32,
        /// Opaque user data.
        data: u64,
        /// Submission timestamp.
        submitted_at: Instant,
    },
}
62
63impl PendingOp {
64    /// Get the buffer index if this is a read/write operation.
65    #[must_use]
66    pub const fn buf_index(&self) -> Option<u16> {
67        match self {
68            Self::Read { buf_index, .. } | Self::Write { buf_index, .. } => Some(*buf_index),
69            _ => None,
70        }
71    }
72
73    /// Get the submission timestamp.
74    #[must_use]
75    pub const fn submitted_at(&self) -> Instant {
76        match self {
77            Self::Read { submitted_at, .. }
78            | Self::Write { submitted_at, .. }
79            | Self::Sync { submitted_at }
80            | Self::Close { submitted_at, .. }
81            | Self::Custom { submitted_at, .. } => *submitted_at,
82        }
83    }
84}
85
/// Completion result from an `io_uring` operation.
///
/// Mirrors a CQE: `result` is the raw kernel result (bytes transferred on
/// success, negated errno on failure), enriched with the tracked
/// [`PendingOp`] and measured latency when the submission was tracked.
#[derive(Debug)]
pub struct Completion {
    /// User data identifying the operation.
    pub user_data: u64,
    /// Result code (bytes transferred or negative errno).
    pub result: i32,
    /// Flags from the completion.
    pub flags: u32,
    /// The original pending operation, if tracked.
    pub op: Option<PendingOp>,
    /// Latency from submission to completion (only when `op` was tracked).
    pub latency: Option<Duration>,
}
100
101impl Completion {
102    /// Check if the operation succeeded.
103    #[must_use]
104    pub const fn is_success(&self) -> bool {
105        self.result >= 0
106    }
107
108    /// Get the error code if the operation failed.
109    #[must_use]
110    pub fn error(&self) -> Option<io::Error> {
111        if self.result < 0 {
112            Some(io::Error::from_raw_os_error(-self.result))
113        } else {
114            None
115        }
116    }
117
118    /// Get the number of bytes transferred (for read/write operations).
119    #[must_use]
120    #[allow(clippy::cast_sign_loss)]
121    pub const fn bytes_transferred(&self) -> Option<usize> {
122        if self.result >= 0 {
123            Some(self.result as usize)
124        } else {
125            None
126        }
127    }
128}
129
/// Kind of completion for quick matching.
///
/// A flattened, fieldless view of [`PendingOp`] so callers can branch on
/// operation type without destructuring the full variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompletionKind {
    /// Read completed.
    Read,
    /// Write completed.
    Write,
    /// Sync completed.
    Sync,
    /// Close completed.
    Close,
    /// Custom operation completed.
    Custom,
    /// Unknown operation (not tracked).
    Unknown,
}
146
147impl Completion {
148    /// Get the kind of this completion.
149    #[must_use]
150    pub fn kind(&self) -> CompletionKind {
151        match &self.op {
152            Some(PendingOp::Read { .. }) => CompletionKind::Read,
153            Some(PendingOp::Write { .. }) => CompletionKind::Write,
154            Some(PendingOp::Sync { .. }) => CompletionKind::Sync,
155            Some(PendingOp::Close { .. }) => CompletionKind::Close,
156            Some(PendingOp::Custom { .. }) => CompletionKind::Custom,
157            None => CompletionKind::Unknown,
158        }
159    }
160}
161
/// Per-core `io_uring` manager.
///
/// Manages a single core's `io_uring` instance, buffer pool, and pending operations.
/// Designed for integration with the thread-per-core architecture.
pub struct CoreRingManager {
    /// Core ID.
    core_id: usize,
    /// Main ring for general I/O.
    main_ring: IoUringRing,
    /// Registered buffer pool. `None` when `config.buffer_count == 0`.
    buffer_pool: Option<RegisteredBufferPool>,
    /// IOPOLL ring for storage (optional; created when the mode uses IOPOLL).
    iopoll_ring: Option<IoUringRing>,
    /// Pending operations tracking, keyed by the SQE `user_data`.
    pending: FxHashMap<u64, PendingOp>,
    /// Next `user_data` ID (monotonically increasing, never reused).
    next_id: u64,
    /// Metrics.
    metrics: RingMetrics,
    /// Whether the ring is closed. Once set, submissions are rejected.
    closed: bool,
    /// Pre-allocated scratch buffer for CQE data collection.
    /// Avoids per-poll allocation in [`Self::poll_completions_into`].
    cqe_scratch: Vec<(u64, i32, u32)>,
}
187
188impl CoreRingManager {
189    /// Create a new per-core ring manager.
190    ///
191    /// # Arguments
192    ///
193    /// * `core_id` - The core ID this manager is for
194    /// * `config` - `io_uring` configuration
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if ring creation fails.
199    pub fn new(core_id: usize, config: &IoUringConfig) -> Result<Self, IoUringError> {
200        // Create main ring
201        let mut main_ring = IoUringRing::new(config)?;
202
203        // Create buffer pool and register buffers on the actual main ring.
204        // SAFETY: The buffer pool's memory outlives the ring registration because
205        // the pool is stored alongside the ring in this struct, and Rust drops
206        // fields in declaration order (main_ring drops before buffer_pool would,
207        // but we clear pending ops in close() before either drops).
208        let buffer_pool = if config.buffer_count > 0 {
209            Some(RegisteredBufferPool::new(
210                main_ring.ring_mut(),
211                config.buffer_size,
212                config.buffer_count,
213            )?)
214        } else {
215            None
216        };
217
218        // Create IOPOLL ring if requested and mode allows.
219        let iopoll_ring = if config.mode.uses_iopoll() {
220            let iopoll_config = IoUringConfig {
221                mode: RingMode::IoPoll,
222                ..config.clone()
223            };
224            let ring = IoUringRing::new(&iopoll_config)?;
225            Some(ring)
226        } else {
227            None
228        };
229
230        Ok(Self {
231            core_id,
232            main_ring,
233            buffer_pool,
234            iopoll_ring,
235            pending: FxHashMap::default(),
236            next_id: 0,
237            metrics: RingMetrics::default(),
238            closed: false,
239            cqe_scratch: Vec::with_capacity(config.ring_entries as usize),
240        })
241    }
242
243    /// Get the core ID.
244    #[must_use]
245    pub const fn core_id(&self) -> usize {
246        self.core_id
247    }
248
249    /// Check if the ring is closed.
250    #[must_use]
251    pub const fn is_closed(&self) -> bool {
252        self.closed
253    }
254
255    /// Get the main ring mode.
256    #[must_use]
257    pub const fn mode(&self) -> RingMode {
258        self.main_ring.mode()
259    }
260
261    /// Check if SQPOLL is enabled.
262    #[must_use]
263    pub const fn uses_sqpoll(&self) -> bool {
264        self.main_ring.uses_sqpoll()
265    }
266
267    /// Check if IOPOLL is enabled.
268    #[must_use]
269    pub fn has_iopoll_ring(&self) -> bool {
270        self.iopoll_ring.is_some()
271    }
272
273    /// Acquire a buffer from the main pool.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if no buffers are available.
278    pub fn acquire_buffer(&mut self) -> Result<(u16, &mut [u8]), IoUringError> {
279        self.buffer_pool
280            .as_mut()
281            .ok_or(IoUringError::InvalidConfig(
282                "No buffer pool configured".to_string(),
283            ))?
284            .acquire()
285    }
286
287    /// Release a buffer back to the main pool.
288    pub fn release_buffer(&mut self, buf_index: u16) {
289        if let Some(pool) = &mut self.buffer_pool {
290            pool.release(buf_index);
291        }
292    }
293
294    /// Submit a read operation using a registered buffer.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if submission fails.
299    pub fn submit_read(
300        &mut self,
301        fd: RawFd,
302        buf_index: u16,
303        offset: u64,
304        len: u32,
305    ) -> Result<u64, IoUringError> {
306        if self.closed {
307            return Err(IoUringError::RingClosed);
308        }
309
310        let pool = self
311            .buffer_pool
312            .as_mut()
313            .ok_or(IoUringError::InvalidConfig(
314                "No buffer pool configured".to_string(),
315            ))?;
316
317        let user_data =
318            pool.submit_read_fixed(self.main_ring.ring_mut(), fd, buf_index, offset, len)?;
319
320        self.pending.insert(
321            user_data,
322            PendingOp::Read {
323                buf_index,
324                len,
325                submitted_at: Instant::now(),
326            },
327        );
328
329        self.metrics.reads_submitted += 1;
330        Ok(user_data)
331    }
332
333    /// Submit a write operation using a registered buffer.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if submission fails.
338    pub fn submit_write(
339        &mut self,
340        fd: RawFd,
341        buf_index: u16,
342        offset: u64,
343        len: u32,
344    ) -> Result<u64, IoUringError> {
345        if self.closed {
346            return Err(IoUringError::RingClosed);
347        }
348
349        let pool = self
350            .buffer_pool
351            .as_mut()
352            .ok_or(IoUringError::InvalidConfig(
353                "No buffer pool configured".to_string(),
354            ))?;
355
356        let user_data =
357            pool.submit_write_fixed(self.main_ring.ring_mut(), fd, buf_index, offset, len)?;
358
359        self.pending.insert(
360            user_data,
361            PendingOp::Write {
362                buf_index,
363                len,
364                submitted_at: Instant::now(),
365            },
366        );
367
368        self.metrics.writes_submitted += 1;
369        Ok(user_data)
370    }
371
372    /// Submit an fsync/fdatasync operation.
373    ///
374    /// # Arguments
375    ///
376    /// * `fd` - File descriptor to sync
377    /// * `datasync` - If true, use fdatasync (don't sync metadata)
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if submission fails.
382    pub fn submit_sync(&mut self, fd: RawFd, datasync: bool) -> Result<u64, IoUringError> {
383        if self.closed {
384            return Err(IoUringError::RingClosed);
385        }
386
387        let user_data = self.next_user_data();
388
389        let entry = if datasync {
390            opcode::Fsync::new(Fd(fd))
391                .flags(types::FsyncFlags::DATASYNC)
392                .build()
393                .user_data(user_data)
394        } else {
395            opcode::Fsync::new(Fd(fd)).build().user_data(user_data)
396        };
397
398        // SAFETY: We're submitting a valid SQE.
399        unsafe {
400            self.main_ring
401                .ring_mut()
402                .submission()
403                .push(&entry)
404                .map_err(|_| IoUringError::SubmissionQueueFull)?;
405        }
406
407        self.pending.insert(
408            user_data,
409            PendingOp::Sync {
410                submitted_at: Instant::now(),
411            },
412        );
413
414        self.metrics.syncs_submitted += 1;
415        Ok(user_data)
416    }
417
418    /// Submit a close operation.
419    ///
420    /// # Errors
421    ///
422    /// Returns an error if submission fails.
423    pub fn submit_close(&mut self, fd: RawFd) -> Result<u64, IoUringError> {
424        if self.closed {
425            return Err(IoUringError::RingClosed);
426        }
427
428        let user_data = self.next_user_data();
429
430        let entry = opcode::Close::new(Fd(fd)).build().user_data(user_data);
431
432        // SAFETY: We're submitting a valid SQE.
433        unsafe {
434            self.main_ring
435                .ring_mut()
436                .submission()
437                .push(&entry)
438                .map_err(|_| IoUringError::SubmissionQueueFull)?;
439        }
440
441        self.pending.insert(
442            user_data,
443            PendingOp::Close {
444                fd,
445                submitted_at: Instant::now(),
446            },
447        );
448
449        Ok(user_data)
450    }
451
452    /// Submit pending operations to the kernel.
453    ///
454    /// In SQPOLL mode, this is often a no-op since the kernel polls automatically.
455    ///
456    /// # Errors
457    ///
458    /// Returns an error if submission fails.
459    pub fn submit(&mut self) -> Result<usize, IoUringError> {
460        if self.closed {
461            return Err(IoUringError::RingClosed);
462        }
463
464        let submitted = self
465            .main_ring
466            .ring_mut()
467            .submit()
468            .map_err(IoUringError::SubmissionFailed)?;
469
470        self.metrics.submissions += 1;
471        Ok(submitted)
472    }
473
474    /// Submit and wait for at least one completion.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if submission or wait fails.
479    pub fn submit_and_wait(&mut self, want: usize) -> Result<usize, IoUringError> {
480        if self.closed {
481            return Err(IoUringError::RingClosed);
482        }
483
484        let submitted = self
485            .main_ring
486            .ring_mut()
487            .submit_and_wait(want)
488            .map_err(IoUringError::SubmissionFailed)?;
489
490        self.metrics.submissions += 1;
491        Ok(submitted)
492    }
493
494    /// Poll for completions without blocking.
495    ///
496    /// Returns all available completions from both rings.
497    /// **Allocates a Vec per call** — prefer [`Self::poll_completions_into`] on hot paths.
498    #[must_use]
499    pub fn poll_completions(&mut self) -> Vec<Completion> {
500        let mut completions = Vec::new();
501        self.poll_completions_into(&mut completions);
502        completions
503    }
504
505    /// Poll for completions into a caller-provided buffer. **Zero-alloc.**
506    ///
507    /// Appends completions to `out`. The caller should clear `out` between
508    /// iterations to reuse capacity. Uses an internal scratch buffer for
509    /// CQE data collection (pre-allocated at ring creation).
510    pub fn poll_completions_into(&mut self, out: &mut Vec<Completion>) {
511        if self.closed {
512            return;
513        }
514
515        // Poll main ring
516        self.poll_ring_completions_into(out, false);
517
518        // Poll IOPOLL ring if present
519        if self.iopoll_ring.is_some() {
520            self.poll_ring_completions_into(out, true);
521        }
522    }
523
524    /// Poll completions from a specific ring into caller's buffer.
525    /// Uses `self.cqe_scratch` to avoid per-call allocation.
526    fn poll_ring_completions_into(&mut self, completions: &mut Vec<Completion>, iopoll: bool) {
527        // Collect CQE data into pre-allocated scratch buffer to avoid
528        // borrow conflict (iterating CQ borrows ring, process_completion
529        // borrows &mut self).
530        self.cqe_scratch.clear();
531        {
532            let ring = if iopoll {
533                match &mut self.iopoll_ring {
534                    Some(r) => r.ring_mut(),
535                    None => return,
536                }
537            } else {
538                self.main_ring.ring_mut()
539            };
540
541            let cq = ring.completion();
542            for cqe in cq {
543                self.cqe_scratch
544                    .push((cqe.user_data(), cqe.result(), cqe.flags()));
545            }
546        }
547
548        // Process collected completions (ring borrow released).
549        // Index loop rather than drain() because process_completion_data
550        // needs &mut self, which conflicts with drain's borrow.
551        for i in 0..self.cqe_scratch.len() {
552            let (user_data, result, flags) = self.cqe_scratch[i];
553            let completion = self.process_completion_data(user_data, result, flags);
554            completions.push(completion);
555        }
556        self.cqe_scratch.clear();
557    }
558
559    /// Process completion data from a CQE.
560    #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
561    fn process_completion_data(&mut self, user_data: u64, result: i32, flags: u32) -> Completion {
562        let op = self.pending.remove(&user_data);
563        let latency = op.as_ref().map(|o| o.submitted_at().elapsed());
564
565        // Mark buffer as no longer in-flight. The caller is responsible
566        // for calling release_buffer(idx) after consuming the completion.
567        // (UringStorageIo does this in poll_completions; direct callers
568        // of wait_for must release explicitly.)
569        if let Some(ref op) = op {
570            if let Some(buf_idx) = op.buf_index() {
571                if let Some(ref mut pool) = self.buffer_pool {
572                    pool.complete_in_flight(buf_idx);
573                }
574            }
575        }
576
577        // Update metrics
578        if result >= 0 {
579            self.metrics.completions_success += 1;
580            if let Some(ref op) = op {
581                match op {
582                    PendingOp::Read { .. } => {
583                        self.metrics.bytes_read += result as u64;
584                    }
585                    PendingOp::Write { .. } => {
586                        self.metrics.bytes_written += result as u64;
587                    }
588                    _ => {}
589                }
590            }
591        } else {
592            self.metrics.completions_failed += 1;
593        }
594
595        if let Some(lat) = latency {
596            self.metrics.total_latency_ns += lat.as_nanos() as u64;
597            self.metrics.latency_samples += 1;
598        }
599
600        Completion {
601            user_data,
602            result,
603            flags,
604            op,
605            latency,
606        }
607    }
608
609    /// Wait for a specific operation to complete.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if the operation is not found or wait fails.
614    pub fn wait_for(&mut self, user_data: u64) -> Result<Completion, IoUringError> {
615        // First check if already completed
616        loop {
617            // Poll for completions
618            let completions = self.poll_completions();
619
620            for completion in completions {
621                if completion.user_data == user_data {
622                    return Ok(completion);
623                }
624            }
625
626            // If not found and still pending, wait for more
627            if !self.pending.contains_key(&user_data) {
628                return Err(IoUringError::PendingNotFound(user_data));
629            }
630
631            // Submit and wait for at least one completion
632            self.submit_and_wait(1)?;
633        }
634    }
635
636    /// Get the number of pending operations.
637    #[must_use]
638    pub fn pending_count(&self) -> usize {
639        self.pending.len()
640    }
641
642    /// Get buffer pool statistics.
643    #[must_use]
644    pub fn buffer_pool_stats(&self) -> Option<BufferPoolStats> {
645        self.buffer_pool.as_ref().map(RegisteredBufferPool::stats)
646    }
647
648    /// Get ring metrics.
649    #[must_use]
650    pub const fn metrics(&self) -> &RingMetrics {
651        &self.metrics
652    }
653
654    /// Close the ring manager.
655    ///
656    /// Drains all pending operations before closing to prevent in-flight
657    /// operations from accessing freed memory.
658    pub fn close(&mut self) {
659        if self.closed {
660            return;
661        }
662        self.closed = true;
663
664        // Drain pending operations to ensure the kernel is not still accessing
665        // any user-space buffers before we drop them.
666        if !self.pending.is_empty() {
667            // Best-effort drain — ignore errors during shutdown.
668            let _ = self.main_ring.ring_mut().submit_and_wait(0);
669            for _ in self.main_ring.ring_mut().completion() {}
670            if let Some(ref mut iopoll) = self.iopoll_ring {
671                let _ = iopoll.ring_mut().submit_and_wait(0);
672                for _ in iopoll.ring_mut().completion() {}
673            }
674        }
675
676        self.pending.clear();
677    }
678
679    /// Generate the next `user_data` ID.
680    fn next_user_data(&mut self) -> u64 {
681        let id = self.next_id;
682        self.next_id += 1;
683        id
684    }
685}
686
impl Drop for CoreRingManager {
    fn drop(&mut self) {
        // close() is idempotent (checks self.closed) and performs the
        // best-effort completion drain before fields are dropped.
        self.close();
    }
}
692
impl std::fmt::Debug for CoreRingManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Rings, buffer pool, and scratch buffer are intentionally omitted;
        // finish_non_exhaustive() signals the elided fields.
        f.debug_struct("CoreRingManager")
            .field("core_id", &self.core_id)
            .field("mode", &self.main_ring.mode())
            .field("pending_count", &self.pending.len())
            .field("closed", &self.closed)
            .field("metrics", &self.metrics)
            .finish_non_exhaustive()
    }
}
704
/// Metrics for ring operations.
///
/// Plain (non-atomic) counters owned and mutated by a single core's
/// [`CoreRingManager`]; cloned snapshots are safe to hand elsewhere.
#[derive(Debug, Default, Clone)]
pub struct RingMetrics {
    /// Number of read operations submitted.
    pub reads_submitted: u64,
    /// Number of write operations submitted.
    pub writes_submitted: u64,
    /// Number of sync operations submitted.
    pub syncs_submitted: u64,
    /// Total submission calls.
    pub submissions: u64,
    /// Successful completions.
    pub completions_success: u64,
    /// Failed completions.
    pub completions_failed: u64,
    /// Total bytes read.
    pub bytes_read: u64,
    /// Total bytes written.
    pub bytes_written: u64,
    /// Total latency in nanoseconds (summed over tracked completions).
    pub total_latency_ns: u64,
    /// Number of latency samples.
    pub latency_samples: u64,
}
729
730impl RingMetrics {
731    /// Get average latency in nanoseconds.
732    #[must_use]
733    pub fn avg_latency_ns(&self) -> u64 {
734        if self.latency_samples > 0 {
735            self.total_latency_ns / self.latency_samples
736        } else {
737            0
738        }
739    }
740
741    /// Get total operations submitted.
742    #[must_use]
743    pub const fn total_ops(&self) -> u64 {
744        self.reads_submitted + self.writes_submitted + self.syncs_submitted
745    }
746
747    /// Get total completions.
748    #[must_use]
749    pub const fn total_completions(&self) -> u64 {
750        self.completions_success + self.completions_failed
751    }
752
753    /// Get success rate (0.0 to 1.0).
754    #[must_use]
755    #[allow(clippy::cast_precision_loss)]
756    pub fn success_rate(&self) -> f64 {
757        let total = self.total_completions();
758        if total > 0 {
759            self.completions_success as f64 / total as f64
760        } else {
761            1.0
762        }
763    }
764}
765
#[cfg(test)]
#[allow(
    clippy::manual_let_else,
    clippy::single_match_else,
    clippy::items_after_statements
)]
mod tests {
    use super::*;
    use std::fs::OpenOptions;
    use std::io::Write;
    use tempfile::tempdir;

    // Shared test configuration: small standard-mode ring with an 8-slot
    // pool of 4 KiB buffers.
    fn make_config() -> IoUringConfig {
        IoUringConfig {
            ring_entries: 32,
            mode: RingMode::Standard,
            buffer_size: 4096,
            buffer_count: 8,
            ..Default::default()
        }
    }

    #[test]
    fn test_manager_creation() {
        let config = make_config();
        let manager = CoreRingManager::new(0, &config);

        // Creation may legitimately fail where the kernel/container lacks
        // io_uring support; treat that as a logged skip, not a failure.
        match manager {
            Ok(m) => {
                assert_eq!(m.core_id(), 0);
                assert!(!m.is_closed());
                assert_eq!(m.mode(), RingMode::Standard);
            }
            Err(e) => {
                tracing::error!("io_uring not available: {e}");
            }
        }
    }

    #[test]
    fn test_buffer_acquire_release() {
        let config = make_config();
        // Skip silently when io_uring is unavailable in this environment.
        let mut manager = match CoreRingManager::new(0, &config) {
            Ok(m) => m,
            Err(_) => return,
        };

        // Acquire a buffer
        let result = manager.acquire_buffer();
        match result {
            Ok((idx, buf)) => {
                // Buffer length must match the configured buffer_size.
                assert_eq!(buf.len(), 4096);
                manager.release_buffer(idx);
            }
            Err(e) => {
                tracing::error!("Buffer acquire failed: {e}");
            }
        }
    }

    #[test]
    fn test_metrics() {
        // Pure-computation test: exercises the derived metric accessors
        // without touching io_uring at all.
        let metrics = RingMetrics {
            reads_submitted: 10,
            writes_submitted: 20,
            syncs_submitted: 5,
            submissions: 35,
            completions_success: 30,
            completions_failed: 5,
            bytes_read: 40960,
            bytes_written: 81920,
            total_latency_ns: 100_000,
            latency_samples: 10,
        };

        assert_eq!(metrics.total_ops(), 35);
        assert_eq!(metrics.total_completions(), 35);
        assert_eq!(metrics.avg_latency_ns(), 10000);
        // 30 / 35 ≈ 0.857
        assert!((metrics.success_rate() - 0.857).abs() < 0.01);
    }

    #[test]
    fn test_completion_kind() {
        let completion = Completion {
            user_data: 1,
            result: 100,
            flags: 0,
            op: Some(PendingOp::Read {
                buf_index: 0,
                len: 100,
                submitted_at: Instant::now(),
            }),
            latency: None,
        };

        assert_eq!(completion.kind(), CompletionKind::Read);
        assert!(completion.is_success());
        assert_eq!(completion.bytes_transferred(), Some(100));
    }

    #[test]
    fn test_completion_error() {
        let completion = Completion {
            user_data: 1,
            result: -5, // EIO
            flags: 0,
            op: None,
            latency: None,
        };

        assert!(!completion.is_success());
        assert!(completion.error().is_some());
        // No tracked op -> kind is Unknown.
        assert_eq!(completion.kind(), CompletionKind::Unknown);
    }

    #[test]
    fn test_write_and_poll() {
        let config = make_config();
        // Skip silently when io_uring is unavailable in this environment.
        let mut manager = match CoreRingManager::new(0, &config) {
            Ok(m) => m,
            Err(_) => return,
        };

        // Create temp file, pre-sized to one buffer so offset-0 writes land
        // within the file.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.dat");
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .unwrap();
        file.write_all(&[0u8; 4096]).unwrap();
        file.flush().unwrap();
        drop(file);

        // Reopen so the fd used for io_uring is independent of the writer.
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .unwrap();
        use std::os::unix::io::AsRawFd;
        let fd = file.as_raw_fd();

        // Acquire buffer and write data
        let (idx, buf) = match manager.acquire_buffer() {
            Ok(x) => x,
            Err(_) => return,
        };
        buf[..5].copy_from_slice(b"hello");

        // Submit write
        let user_data = match manager.submit_write(fd, idx, 0, 5) {
            Ok(ud) => ud,
            Err(_) => return,
        };

        // Submit to kernel
        let _ = manager.submit();

        // Wait for completion
        match manager.wait_for(user_data) {
            Ok(completion) => {
                // In some CI environments, io_uring operations may fail
                // (e.g., containers without proper io_uring support)
                if !completion.is_success() {
                    tracing::error!("Write completion failed (possibly unsupported environment)");
                    return;
                }
                assert_eq!(completion.kind(), CompletionKind::Write);
            }
            Err(e) => {
                tracing::error!("Wait failed: {e}");
                return;
            }
        }

        manager.release_buffer(idx);
    }

    #[test]
    fn test_close() {
        let config = make_config();
        // Skip silently when io_uring is unavailable in this environment.
        let mut manager = match CoreRingManager::new(0, &config) {
            Ok(m) => m,
            Err(_) => return,
        };

        assert!(!manager.is_closed());
        manager.close();
        assert!(manager.is_closed());

        // Operations should fail after close
        assert!(matches!(
            manager.submit_sync(0, true),
            Err(IoUringError::RingClosed)
        ));
    }
}
965}