Skip to main content

laminar_core/tpc/
runtime.rs

1//! Thread-per-core runtime configuration and output buffer.
2//!
3//! Provides [`TpcConfig`] for configuring per-core reactors and
4//! [`OutputBuffer`] for zero-allocation output collection.
5
6use std::ops::Deref;
7
8use crate::operator::Output;
9use crate::reactor::ReactorConfig;
10
11// OutputBuffer - Pre-allocated buffer for zero-allocation polling
12
13/// A pre-allocated buffer for collecting outputs without allocation.
14///
15/// This buffer can be reused across multiple poll cycles, avoiding
16/// memory allocation on the hot path.
17///
18/// # Example
19///
20/// ```rust,ignore
21/// use laminar_core::tpc::OutputBuffer;
22///
23/// // Create buffer once at startup
24/// let mut buffer = OutputBuffer::with_capacity(1024);
25///
26/// // Poll loop - no allocation after warmup
27/// loop {
28///     let count = runtime.poll_into(&mut buffer, 256);
29///     for output in buffer.iter() {
30///         process(output);
31///     }
32///     buffer.clear();
33/// }
34/// ```
35#[derive(Debug)]
36pub struct OutputBuffer {
37    /// Internal storage (pre-allocated)
38    items: Vec<Output>,
39}
40
41impl OutputBuffer {
42    /// Creates a new output buffer with the given capacity.
43    ///
44    /// The buffer will not allocate until `capacity` items are added.
45    #[must_use]
46    pub fn with_capacity(capacity: usize) -> Self {
47        Self {
48            items: Vec::with_capacity(capacity),
49        }
50    }
51
52    /// Clears the buffer for reuse (no deallocation).
53    ///
54    /// The capacity remains unchanged, allowing zero-allocation reuse.
55    #[inline]
56    pub fn clear(&mut self) {
57        self.items.clear();
58    }
59
60    /// Returns the number of items in the buffer.
61    #[inline]
62    #[must_use]
63    pub fn len(&self) -> usize {
64        self.items.len()
65    }
66
67    /// Returns true if the buffer is empty.
68    #[inline]
69    #[must_use]
70    pub fn is_empty(&self) -> bool {
71        self.items.is_empty()
72    }
73
74    /// Returns the current capacity of the buffer.
75    #[inline]
76    #[must_use]
77    pub fn capacity(&self) -> usize {
78        self.items.capacity()
79    }
80
81    /// Returns the remaining capacity before reallocation.
82    #[inline]
83    #[must_use]
84    pub fn remaining(&self) -> usize {
85        self.items.capacity() - self.items.len()
86    }
87
88    /// Returns a slice of the collected outputs.
89    #[inline]
90    #[must_use]
91    pub fn as_slice(&self) -> &[Output] {
92        &self.items
93    }
94
95    /// Returns an iterator over the outputs.
96    #[inline]
97    pub fn iter(&self) -> impl Iterator<Item = &Output> {
98        self.items.iter()
99    }
100
101    /// Consumes the buffer and returns the inner Vec.
102    #[must_use]
103    pub fn into_vec(self) -> Vec<Output> {
104        self.items
105    }
106
107    /// Extends the buffer with outputs from an iterator.
108    ///
109    /// Note: This may allocate if the iterator produces more items than
110    /// the remaining capacity.
111    #[inline]
112    pub fn extend<I: IntoIterator<Item = Output>>(&mut self, iter: I) {
113        self.items.extend(iter);
114    }
115
116    /// Pushes a single output to the buffer.
117    ///
118    /// Note: This may allocate if the buffer is at capacity.
119    #[inline]
120    pub fn push(&mut self, output: Output) {
121        self.items.push(output);
122    }
123
124    /// Returns a mutable reference to the internal Vec.
125    ///
126    /// This is useful for passing to functions that expect `&mut Vec<Output>`.
127    #[inline]
128    pub fn as_vec_mut(&mut self) -> &mut Vec<Output> {
129        &mut self.items
130    }
131}
132
133impl Default for OutputBuffer {
134    fn default() -> Self {
135        Self::with_capacity(1024)
136    }
137}
138
139impl Deref for OutputBuffer {
140    type Target = [Output];
141
142    fn deref(&self) -> &Self::Target {
143        &self.items
144    }
145}
146
147impl<'a> IntoIterator for &'a OutputBuffer {
148    type Item = &'a Output;
149    type IntoIter = std::slice::Iter<'a, Output>;
150
151    fn into_iter(self) -> Self::IntoIter {
152        self.items.iter()
153    }
154}
155
156impl IntoIterator for OutputBuffer {
157    type Item = Output;
158    type IntoIter = std::vec::IntoIter<Output>;
159
160    fn into_iter(self) -> Self::IntoIter {
161        self.items.into_iter()
162    }
163}
164
165use super::router::KeySpec;
166use super::TpcError;
167
168/// Configuration for the thread-per-core runtime.
169#[derive(Debug, Clone)]
170pub struct TpcConfig {
171    /// Number of cores to use
172    pub num_cores: usize,
173    /// Key specification for routing
174    pub key_spec: KeySpec,
175    /// Whether to pin cores to CPUs
176    pub cpu_pinning: bool,
177    /// Starting CPU ID for pinning (cores use `cpu_start`, `cpu_start+1`, ...)
178    pub cpu_start: usize,
179    /// Inbox queue capacity per core
180    pub inbox_capacity: usize,
181    /// Outbox queue capacity per core
182    pub outbox_capacity: usize,
183    /// Reactor configuration (applied to all cores)
184    pub reactor_config: ReactorConfig,
185    /// Enable NUMA-aware memory allocation
186    pub numa_aware: bool,
187    /// Enable per-core storage I/O backend.
188    ///
189    /// When true, each core gets a [`StorageIo`](crate::storage_io::StorageIo)
190    /// instance. On Linux with `io-uring` feature, uses `io_uring` with SQPOLL.
191    /// Everywhere else, uses synchronous `std::fs`.
192    pub enable_storage_io: bool,
193    /// `io_uring` configuration (Linux only, requires `io-uring` feature).
194    ///
195    /// When `enable_storage_io` is true and this is `Some`, the `io_uring`
196    /// backend is used. When `None`, falls back to the sync backend.
197    #[cfg(all(target_os = "linux", feature = "io-uring"))]
198    pub io_uring_config: Option<crate::io_uring::IoUringConfig>,
199}
200
201impl Default for TpcConfig {
202    fn default() -> Self {
203        Self {
204            num_cores: std::thread::available_parallelism().map_or(1, std::num::NonZero::get),
205            key_spec: KeySpec::RoundRobin,
206            cpu_pinning: false,
207            cpu_start: 0,
208            inbox_capacity: 8192,
209            outbox_capacity: 8192,
210            reactor_config: ReactorConfig::default(),
211            numa_aware: false,
212            enable_storage_io: false,
213            #[cfg(all(target_os = "linux", feature = "io-uring"))]
214            io_uring_config: None,
215        }
216    }
217}
218
219impl TpcConfig {
220    /// Creates a new configuration builder.
221    #[must_use]
222    pub fn builder() -> TpcConfigBuilder {
223        TpcConfigBuilder::default()
224    }
225
226    /// Creates configuration with automatic detection.
227    ///
228    /// Detects system capabilities and generates an optimal configuration:
229    /// - Uses all available physical cores (minus 1 on systems with >8 cores)
230    /// - Enables CPU pinning on multi-core systems
231    /// - Enables NUMA-aware allocation on multi-socket systems
232    ///
233    /// # Example
234    ///
235    /// ```rust,ignore
236    /// use laminar_core::tpc::TpcConfig;
237    ///
238    /// let config = TpcConfig::auto();
239    /// println!("Using {} cores", config.num_cores);
240    /// ```
241    #[must_use]
242    pub fn auto() -> Self {
243        let caps = crate::detect::SystemCapabilities::detect();
244        let recommended = caps.recommended_config();
245
246        Self {
247            num_cores: recommended.num_cores,
248            key_spec: KeySpec::RoundRobin,
249            cpu_pinning: recommended.cpu_pinning,
250            cpu_start: 0,
251            inbox_capacity: recommended.queue_capacity,
252            outbox_capacity: recommended.queue_capacity,
253            reactor_config: ReactorConfig::default(),
254            numa_aware: recommended.numa_aware,
255            enable_storage_io: false,
256            #[cfg(all(target_os = "linux", feature = "io-uring"))]
257            io_uring_config: None,
258        }
259    }
260
261    /// Validates the configuration.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the configuration is invalid.
266    pub fn validate(&self) -> Result<(), TpcError> {
267        if self.num_cores == 0 {
268            return Err(TpcError::InvalidConfig("num_cores must be > 0".to_string()));
269        }
270        if self.inbox_capacity == 0 {
271            return Err(TpcError::InvalidConfig(
272                "inbox_capacity must be > 0".to_string(),
273            ));
274        }
275        if self.outbox_capacity == 0 {
276            return Err(TpcError::InvalidConfig(
277                "outbox_capacity must be > 0".to_string(),
278            ));
279        }
280        if self.cpu_pinning {
281            let available = std::thread::available_parallelism()
282                .map(std::num::NonZero::get)
283                .unwrap_or(1);
284            let max_cpu = self.cpu_start + self.num_cores;
285            if max_cpu > available {
286                return Err(TpcError::InvalidConfig(format!(
287                    "cpu_start({}) + num_cores({}) = {max_cpu} exceeds available CPUs ({available})",
288                    self.cpu_start, self.num_cores,
289                )));
290            }
291        }
292        Ok(())
293    }
294}
295
296/// Builder for `TpcConfig`.
297#[derive(Debug, Default)]
298pub struct TpcConfigBuilder {
299    num_cores: Option<usize>,
300    key_spec: Option<KeySpec>,
301    cpu_pinning: Option<bool>,
302    cpu_start: Option<usize>,
303    inbox_capacity: Option<usize>,
304    outbox_capacity: Option<usize>,
305    reactor_config: Option<ReactorConfig>,
306    numa_aware: Option<bool>,
307    enable_storage_io: Option<bool>,
308    #[cfg(all(target_os = "linux", feature = "io-uring"))]
309    io_uring_config: Option<crate::io_uring::IoUringConfig>,
310}
311
312impl TpcConfigBuilder {
313    /// Sets the number of cores.
314    #[must_use]
315    pub fn num_cores(mut self, n: usize) -> Self {
316        self.num_cores = Some(n);
317        self
318    }
319
320    /// Sets the key specification for routing.
321    #[must_use]
322    pub fn key_spec(mut self, spec: KeySpec) -> Self {
323        self.key_spec = Some(spec);
324        self
325    }
326
327    /// Sets key columns for routing (convenience method).
328    #[must_use]
329    pub fn key_columns(self, columns: Vec<String>) -> Self {
330        self.key_spec(KeySpec::Columns(columns))
331    }
332
333    /// Enables or disables CPU pinning.
334    #[must_use]
335    pub fn cpu_pinning(mut self, enabled: bool) -> Self {
336        self.cpu_pinning = Some(enabled);
337        self
338    }
339
340    /// Sets the starting CPU ID for pinning.
341    #[must_use]
342    pub fn cpu_start(mut self, cpu: usize) -> Self {
343        self.cpu_start = Some(cpu);
344        self
345    }
346
347    /// Sets the inbox capacity per core.
348    #[must_use]
349    pub fn inbox_capacity(mut self, capacity: usize) -> Self {
350        self.inbox_capacity = Some(capacity);
351        self
352    }
353
354    /// Sets the outbox capacity per core.
355    #[must_use]
356    pub fn outbox_capacity(mut self, capacity: usize) -> Self {
357        self.outbox_capacity = Some(capacity);
358        self
359    }
360
361    /// Sets the reactor configuration.
362    #[must_use]
363    pub fn reactor_config(mut self, config: ReactorConfig) -> Self {
364        self.reactor_config = Some(config);
365        self
366    }
367
368    /// Enables or disables NUMA-aware memory allocation.
369    ///
370    /// When enabled, per-core state stores and buffers are allocated
371    /// on the NUMA node local to that core, improving memory access latency.
372    #[must_use]
373    pub fn numa_aware(mut self, enabled: bool) -> Self {
374        self.numa_aware = Some(enabled);
375        self
376    }
377
378    /// Enables per-core storage I/O backend.
379    #[must_use]
380    pub fn enable_storage_io(mut self, enabled: bool) -> Self {
381        self.enable_storage_io = Some(enabled);
382        self
383    }
384
385    /// Sets the `io_uring` configuration (Linux only).
386    #[cfg(all(target_os = "linux", feature = "io-uring"))]
387    #[must_use]
388    pub fn io_uring_config(mut self, config: crate::io_uring::IoUringConfig) -> Self {
389        self.io_uring_config = Some(config);
390        self
391    }
392
393    /// Builds the configuration.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the configuration is invalid.
398    pub fn build(self) -> Result<TpcConfig, TpcError> {
399        let config = TpcConfig {
400            num_cores: self.num_cores.unwrap_or_else(|| {
401                std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
402            }),
403            key_spec: self.key_spec.unwrap_or_default(),
404            cpu_pinning: self.cpu_pinning.unwrap_or(false),
405            cpu_start: self.cpu_start.unwrap_or(0),
406            inbox_capacity: self.inbox_capacity.unwrap_or(8192),
407            outbox_capacity: self.outbox_capacity.unwrap_or(8192),
408            reactor_config: self.reactor_config.unwrap_or_default(),
409            numa_aware: self.numa_aware.unwrap_or(false),
410            enable_storage_io: self.enable_storage_io.unwrap_or(false),
411            #[cfg(all(target_os = "linux", feature = "io-uring"))]
412            io_uring_config: self.io_uring_config,
413        };
414        config.validate()?;
415        Ok(config)
416    }
417}