// laminar_core/storage_io/mod.rs

//! # Storage I/O Abstraction
//!
//! Platform-abstracted non-blocking storage I/O for the thread-per-core architecture.
//!
//! ## Design
//!
//! Ring 0 cannot block, allocate, or syscall on the hot path. Storage I/O needs
//! a submission-completion model that fits the synchronous spin-loop:
//!
//! 1. **Submit**: push a write or sync request (non-blocking)
//! 2. **Poll**: check for completions on the next loop iteration
//!
//! Two backends implement this interface:
//!
//! - `SyncStorageIo` — executes I/O inline via `std::fs`. Write "completes"
//!   when data is in the kernel page cache. Sync "completes" after `fdatasync`.
//!   Works on all platforms. This is the default.
//!
//! - `UringStorageIo` (Linux + `io-uring` feature) — submits SQEs to a per-core
//!   ring. With SQPOLL, submissions are zero-syscall. Completions arrive via
//!   CQE polling.
//!
//! ## Usage from Ring 0
//!
//! ```text
//! // Cold path (init_core_thread):
//! let mut storage = SyncStorageIo::new();
//! let wal_fd = storage.register_fd(file)?;
//!
//! // Hot path (core_thread_main loop):
//! storage.submit_write(fd, &data, offset)?;
//! storage.flush()?;
//! // ... next iteration ...
//! completions.clear();
//! storage.poll_completions(&mut completions);
//! ```

mod sync_backend;

#[cfg(all(target_os = "linux", feature = "io-uring"))]
mod uring_backend;

pub use sync_backend::SyncStorageIo;

#[cfg(all(target_os = "linux", feature = "io-uring"))]
pub use uring_backend::UringStorageIo;

use std::fmt;

// ── Opaque file descriptor ──────────────────────────────────────────────────

/// Opaque file descriptor managed by a [`StorageIo`] backend.
///
/// Created via [`StorageIo::register_fd`]. The backend maps this to the
/// platform-native handle internally (Unix `RawFd`, Windows `RawHandle`,
/// or `io_uring` registered fd index).
///
/// Four bytes and `Copy`, so passing it by value on the hot path is free.
/// The inner index is `pub(crate)` — only backends may construct one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IoFd(pub(crate) u32);

// ── Completion token ────────────────────────────────────────────────────────

/// Completion of a submitted I/O operation.
///
/// Returned by [`StorageIo::poll_completions`]. The `token` matches the
/// value returned by the corresponding `submit_*` call, letting the caller
/// correlate completions with in-flight requests.
#[derive(Debug, Clone, Copy)]
pub struct IoCompletion {
    /// Token that was returned by `submit_write` / `submit_datasync`.
    pub token: u64,
    /// Bytes transferred (for writes) or 0 (for sync).
    /// Negative values indicate a POSIX errno
    /// (the same sign convention as an `io_uring` CQE `res` field).
    pub result: i32,
}

// ── Error type ──────────────────────────────────────────────────────────────

77/// Errors from the storage I/O layer.
78///
79/// Intentionally small — this is used on the hot path.
80#[derive(Debug)]
81pub enum StorageIoError {
82 /// The submission queue or internal buffer is full.
83 QueueFull,
84 /// An OS-level I/O error occurred.
85 Io(std::io::Error),
86 /// The requested `IoFd` is not registered.
87 BadFd(IoFd),
88 /// The backend is closed / shutting down.
89 Closed,
90 /// Buffer pool exhausted (`io_uring` only).
91 BufferExhausted,
92}
93
94impl fmt::Display for StorageIoError {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 match self {
97 Self::QueueFull => write!(f, "storage I/O submission queue full"),
98 Self::Io(e) => write!(f, "storage I/O error: {e}"),
99 Self::BadFd(fd) => write!(f, "unregistered IoFd({})", fd.0),
100 Self::Closed => write!(f, "storage I/O backend closed"),
101 Self::BufferExhausted => write!(f, "registered buffer pool exhausted"),
102 }
103 }
104}
105
106impl std::error::Error for StorageIoError {
107 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
108 match self {
109 Self::Io(e) => Some(e),
110 _ => None,
111 }
112 }
113}
114
115impl From<std::io::Error> for StorageIoError {
116 fn from(e: std::io::Error) -> Self {
117 Self::Io(e)
118 }
119}

// ── Trait ────────────────────────────────────────────────────────────────────

/// Non-blocking storage I/O backend for Ring 0.
///
/// All `submit_*` methods are non-blocking. They enqueue work and return a
/// token immediately. The caller polls for completions on the next loop
/// iteration via [`poll_completions`](StorageIo::poll_completions).
///
/// # Ring 0 contract
///
/// Implementations **must not**:
/// - Block (no mutexes, no condition variables)
/// - Allocate on submit/poll hot path (pre-allocate in `new`)
/// - Panic (return errors instead)
///
/// The sync backend executes I/O inline in `submit_*` and stages completions
/// for the next `poll_completions` call. The `io_uring` backend pushes SQEs
/// and polls CQEs.
#[allow(clippy::missing_errors_doc)] // errors documented per-method below
pub trait StorageIo: Send {
    /// Register an already-opened file with this backend.
    ///
    /// Called on the cold path (init / checkpoint setup). Returns an opaque
    /// [`IoFd`] used for subsequent I/O operations.
    ///
    /// Returns [`StorageIoError::QueueFull`] if the fd table is full.
    ///
    /// # Platform notes
    ///
    /// On Unix the backend extracts `RawFd` via `AsRawFd`.
    /// On Windows the backend extracts `RawHandle` via `AsRawHandle`.
    fn register_fd(&mut self, file: std::fs::File) -> Result<IoFd, StorageIoError>;

    /// Deregister and close a previously registered file.
    ///
    /// Called on the cold path (shutdown / WAL truncate-and-reopen).
    /// Returns [`StorageIoError::BadFd`] if the fd is not registered.
    fn deregister_fd(&mut self, fd: IoFd) -> Result<(), StorageIoError>;

    /// Submit a write at the given file offset. **Non-blocking.**
    ///
    /// `data` is copied into backend-managed storage before returning.
    /// The caller's buffer is free to reuse immediately.
    ///
    /// Returns a token for tracking completion, or [`StorageIoError::BadFd`]
    /// / [`StorageIoError::BufferExhausted`] on failure.
    fn submit_write(&mut self, fd: IoFd, data: &[u8], offset: u64) -> Result<u64, StorageIoError>;

    /// Submit an append (write at current end-of-file). **Non-blocking.**
    ///
    /// Semantically equivalent to `submit_write` at `offset = current_size`.
    /// The sync backend uses the file's append mode. The `io_uring` backend
    /// passes `offset = u64::MAX` which means "append" in Linux 6.0+.
    ///
    /// Returns a token for tracking completion, or [`StorageIoError::BadFd`]
    /// / [`StorageIoError::BufferExhausted`] on failure.
    fn submit_append(&mut self, fd: IoFd, data: &[u8]) -> Result<u64, StorageIoError>;

    /// Submit an `fdatasync` (data-only sync, no metadata). **Non-blocking.**
    ///
    /// Returns a token for tracking completion, or [`StorageIoError::BadFd`]
    /// on failure.
    fn submit_datasync(&mut self, fd: IoFd) -> Result<u64, StorageIoError>;

    /// Flush pending submissions to the kernel.
    ///
    /// - **`io_uring` (SQPOLL)**: no-op — kernel polls the SQ automatically.
    /// - **`io_uring` (standard)**: calls `ring.submit()`.
    /// - **Sync backend**: no-op — I/O was executed inline in `submit_*`.
    ///
    /// Returns the number of operations submitted, or
    /// [`StorageIoError::Closed`] if the backend is shut down.
    fn flush(&mut self) -> Result<usize, StorageIoError>;

    /// Poll for completed operations. **Non-blocking.**
    ///
    /// Appends completions to `out`. The caller owns the Vec and should
    /// clear it between iterations to reuse capacity (zero-alloc pattern).
    fn poll_completions(&mut self, out: &mut Vec<IoCompletion>);

    /// Returns the number of operations submitted but not yet completed.
    fn pending_count(&self) -> usize;
}
203}