laminar_core/tpc/
runtime.rs1use std::ops::Deref;
7
8use crate::operator::Output;
9use crate::reactor::ReactorConfig;
10
11#[derive(Debug)]
36pub struct OutputBuffer {
37 items: Vec<Output>,
39}
40
41impl OutputBuffer {
42 #[must_use]
46 pub fn with_capacity(capacity: usize) -> Self {
47 Self {
48 items: Vec::with_capacity(capacity),
49 }
50 }
51
52 #[inline]
56 pub fn clear(&mut self) {
57 self.items.clear();
58 }
59
60 #[inline]
62 #[must_use]
63 pub fn len(&self) -> usize {
64 self.items.len()
65 }
66
67 #[inline]
69 #[must_use]
70 pub fn is_empty(&self) -> bool {
71 self.items.is_empty()
72 }
73
74 #[inline]
76 #[must_use]
77 pub fn capacity(&self) -> usize {
78 self.items.capacity()
79 }
80
81 #[inline]
83 #[must_use]
84 pub fn remaining(&self) -> usize {
85 self.items.capacity() - self.items.len()
86 }
87
88 #[inline]
90 #[must_use]
91 pub fn as_slice(&self) -> &[Output] {
92 &self.items
93 }
94
95 #[inline]
97 pub fn iter(&self) -> impl Iterator<Item = &Output> {
98 self.items.iter()
99 }
100
101 #[must_use]
103 pub fn into_vec(self) -> Vec<Output> {
104 self.items
105 }
106
107 #[inline]
112 pub fn extend<I: IntoIterator<Item = Output>>(&mut self, iter: I) {
113 self.items.extend(iter);
114 }
115
116 #[inline]
120 pub fn push(&mut self, output: Output) {
121 self.items.push(output);
122 }
123
124 #[inline]
128 pub fn as_vec_mut(&mut self) -> &mut Vec<Output> {
129 &mut self.items
130 }
131}
132
133impl Default for OutputBuffer {
134 fn default() -> Self {
135 Self::with_capacity(1024)
136 }
137}
138
139impl Deref for OutputBuffer {
140 type Target = [Output];
141
142 fn deref(&self) -> &Self::Target {
143 &self.items
144 }
145}
146
147impl<'a> IntoIterator for &'a OutputBuffer {
148 type Item = &'a Output;
149 type IntoIter = std::slice::Iter<'a, Output>;
150
151 fn into_iter(self) -> Self::IntoIter {
152 self.items.iter()
153 }
154}
155
156impl IntoIterator for OutputBuffer {
157 type Item = Output;
158 type IntoIter = std::vec::IntoIter<Output>;
159
160 fn into_iter(self) -> Self::IntoIter {
161 self.items.into_iter()
162 }
163}
164
165use super::router::KeySpec;
166use super::TpcError;
167
168#[derive(Debug, Clone)]
170pub struct TpcConfig {
171 pub num_cores: usize,
173 pub key_spec: KeySpec,
175 pub cpu_pinning: bool,
177 pub cpu_start: usize,
179 pub inbox_capacity: usize,
181 pub outbox_capacity: usize,
183 pub reactor_config: ReactorConfig,
185 pub numa_aware: bool,
187 pub enable_storage_io: bool,
193 #[cfg(all(target_os = "linux", feature = "io-uring"))]
198 pub io_uring_config: Option<crate::io_uring::IoUringConfig>,
199}
200
201impl Default for TpcConfig {
202 fn default() -> Self {
203 Self {
204 num_cores: std::thread::available_parallelism().map_or(1, std::num::NonZero::get),
205 key_spec: KeySpec::RoundRobin,
206 cpu_pinning: false,
207 cpu_start: 0,
208 inbox_capacity: 8192,
209 outbox_capacity: 8192,
210 reactor_config: ReactorConfig::default(),
211 numa_aware: false,
212 enable_storage_io: false,
213 #[cfg(all(target_os = "linux", feature = "io-uring"))]
214 io_uring_config: None,
215 }
216 }
217}
218
219impl TpcConfig {
220 #[must_use]
222 pub fn builder() -> TpcConfigBuilder {
223 TpcConfigBuilder::default()
224 }
225
226 #[must_use]
242 pub fn auto() -> Self {
243 let caps = crate::detect::SystemCapabilities::detect();
244 let recommended = caps.recommended_config();
245
246 Self {
247 num_cores: recommended.num_cores,
248 key_spec: KeySpec::RoundRobin,
249 cpu_pinning: recommended.cpu_pinning,
250 cpu_start: 0,
251 inbox_capacity: recommended.queue_capacity,
252 outbox_capacity: recommended.queue_capacity,
253 reactor_config: ReactorConfig::default(),
254 numa_aware: recommended.numa_aware,
255 enable_storage_io: false,
256 #[cfg(all(target_os = "linux", feature = "io-uring"))]
257 io_uring_config: None,
258 }
259 }
260
261 pub fn validate(&self) -> Result<(), TpcError> {
267 if self.num_cores == 0 {
268 return Err(TpcError::InvalidConfig("num_cores must be > 0".to_string()));
269 }
270 if self.inbox_capacity == 0 {
271 return Err(TpcError::InvalidConfig(
272 "inbox_capacity must be > 0".to_string(),
273 ));
274 }
275 if self.outbox_capacity == 0 {
276 return Err(TpcError::InvalidConfig(
277 "outbox_capacity must be > 0".to_string(),
278 ));
279 }
280 if self.cpu_pinning {
281 let available = std::thread::available_parallelism()
282 .map(std::num::NonZero::get)
283 .unwrap_or(1);
284 let max_cpu = self.cpu_start + self.num_cores;
285 if max_cpu > available {
286 return Err(TpcError::InvalidConfig(format!(
287 "cpu_start({}) + num_cores({}) = {max_cpu} exceeds available CPUs ({available})",
288 self.cpu_start, self.num_cores,
289 )));
290 }
291 }
292 Ok(())
293 }
294}
295
296#[derive(Debug, Default)]
298pub struct TpcConfigBuilder {
299 num_cores: Option<usize>,
300 key_spec: Option<KeySpec>,
301 cpu_pinning: Option<bool>,
302 cpu_start: Option<usize>,
303 inbox_capacity: Option<usize>,
304 outbox_capacity: Option<usize>,
305 reactor_config: Option<ReactorConfig>,
306 numa_aware: Option<bool>,
307 enable_storage_io: Option<bool>,
308 #[cfg(all(target_os = "linux", feature = "io-uring"))]
309 io_uring_config: Option<crate::io_uring::IoUringConfig>,
310}
311
312impl TpcConfigBuilder {
313 #[must_use]
315 pub fn num_cores(mut self, n: usize) -> Self {
316 self.num_cores = Some(n);
317 self
318 }
319
320 #[must_use]
322 pub fn key_spec(mut self, spec: KeySpec) -> Self {
323 self.key_spec = Some(spec);
324 self
325 }
326
327 #[must_use]
329 pub fn key_columns(self, columns: Vec<String>) -> Self {
330 self.key_spec(KeySpec::Columns(columns))
331 }
332
333 #[must_use]
335 pub fn cpu_pinning(mut self, enabled: bool) -> Self {
336 self.cpu_pinning = Some(enabled);
337 self
338 }
339
340 #[must_use]
342 pub fn cpu_start(mut self, cpu: usize) -> Self {
343 self.cpu_start = Some(cpu);
344 self
345 }
346
347 #[must_use]
349 pub fn inbox_capacity(mut self, capacity: usize) -> Self {
350 self.inbox_capacity = Some(capacity);
351 self
352 }
353
354 #[must_use]
356 pub fn outbox_capacity(mut self, capacity: usize) -> Self {
357 self.outbox_capacity = Some(capacity);
358 self
359 }
360
361 #[must_use]
363 pub fn reactor_config(mut self, config: ReactorConfig) -> Self {
364 self.reactor_config = Some(config);
365 self
366 }
367
368 #[must_use]
373 pub fn numa_aware(mut self, enabled: bool) -> Self {
374 self.numa_aware = Some(enabled);
375 self
376 }
377
378 #[must_use]
380 pub fn enable_storage_io(mut self, enabled: bool) -> Self {
381 self.enable_storage_io = Some(enabled);
382 self
383 }
384
385 #[cfg(all(target_os = "linux", feature = "io-uring"))]
387 #[must_use]
388 pub fn io_uring_config(mut self, config: crate::io_uring::IoUringConfig) -> Self {
389 self.io_uring_config = Some(config);
390 self
391 }
392
393 pub fn build(self) -> Result<TpcConfig, TpcError> {
399 let config = TpcConfig {
400 num_cores: self.num_cores.unwrap_or_else(|| {
401 std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
402 }),
403 key_spec: self.key_spec.unwrap_or_default(),
404 cpu_pinning: self.cpu_pinning.unwrap_or(false),
405 cpu_start: self.cpu_start.unwrap_or(0),
406 inbox_capacity: self.inbox_capacity.unwrap_or(8192),
407 outbox_capacity: self.outbox_capacity.unwrap_or(8192),
408 reactor_config: self.reactor_config.unwrap_or_default(),
409 numa_aware: self.numa_aware.unwrap_or(false),
410 enable_storage_io: self.enable_storage_io.unwrap_or(false),
411 #[cfg(all(target_os = "linux", feature = "io-uring"))]
412 io_uring_config: self.io_uring_config,
413 };
414 config.validate()?;
415 Ok(config)
416 }
417}