Skip to main content

laminar_core/tpc/
mod.rs

//! # Thread-Per-Core (TPC) Module
//!
//! Implements a thread-per-core architecture for linear scaling on multi-core systems.
//!
//! ## Components
//!
//! - [`SpscQueue`] - Lock-free single-producer single-consumer queue
//! - [`CoreHandle`] - Manages a single core's reactor thread
//! - [`CreditGate`] - Credit-based backpressure
//! - [`TpcConfig`] / [`OutputBuffer`] - Runtime configuration and output collection

12mod backpressure;
13mod core_handle;
14mod partitioned_router;
15mod router;
16mod runtime;
17mod spsc;
18#[cfg(test)]
19mod zero_alloc_tests;
20
21pub use backpressure::{
22    BackpressureConfig, BackpressureConfigBuilder, CreditAcquireResult, CreditGate, CreditMetrics,
23    CreditMetricsSnapshot, OverflowStrategy,
24};
25pub use core_handle::{CoreConfig, CoreHandle, CoreMessage, TaggedOutput};
26pub use partitioned_router::PartitionedRouter;
27pub use router::{KeySpec, RouterError};
28pub use runtime::{OutputBuffer, TpcConfig, TpcConfigBuilder};
29pub use spsc::{CachePadded, SpscQueue};
30
31/// Errors that can occur in the TPC runtime.
32#[derive(Debug, thiserror::Error)]
33pub enum TpcError {
34    /// Failed to spawn a core thread
35    #[error("Failed to spawn core {core_id}: {message}")]
36    SpawnFailed {
37        /// The core ID that failed to spawn
38        core_id: usize,
39        /// Error message
40        message: String,
41    },
42
43    /// Failed to set CPU affinity
44    #[error("Failed to set CPU affinity for core {core_id}: {message}")]
45    AffinityFailed {
46        /// The core ID
47        core_id: usize,
48        /// Error message
49        message: String,
50    },
51
52    /// Queue is full, cannot accept more events
53    #[error("Queue full for core {core_id}")]
54    QueueFull {
55        /// The core ID whose queue is full
56        core_id: usize,
57    },
58
59    /// Backpressure active, no credits available
60    #[error("Backpressure active for core {core_id}")]
61    Backpressure {
62        /// The core ID that is backpressured
63        core_id: usize,
64    },
65
66    /// Runtime is not running
67    #[error("Runtime is not running")]
68    NotRunning,
69
70    /// Runtime is already running
71    #[error("Runtime is already running")]
72    AlreadyRunning,
73
74    /// Invalid configuration
75    #[error("Invalid configuration: {0}")]
76    InvalidConfig(String),
77
78    /// Reactor error from a core
79    #[error("Reactor error on core {core_id}: {source}")]
80    ReactorError {
81        /// The core ID
82        core_id: usize,
83        /// The underlying reactor error
84        #[source]
85        source: crate::reactor::ReactorError,
86    },
87
88    /// An operator panicked inside Ring 0.
89    #[error("Operator panic on core {core_id}: {message}")]
90    OperatorPanic {
91        /// The core ID where the panic occurred.
92        core_id: usize,
93        /// Panic message extracted from the payload.
94        message: String,
95    },
96
97    /// Key extraction failed
98    #[error("Key extraction failed: {0}")]
99    KeyExtractionFailed(String),
100
101    /// Router error (zero-allocation variant)
102    #[error("Router error: {0}")]
103    RouterError(#[from] RouterError),
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// Variants that carry only a `core_id` interpolate it into the message.
    #[test]
    fn test_error_display() {
        let err = TpcError::QueueFull { core_id: 3 };
        assert_eq!(err.to_string(), "Queue full for core 3");

        let err = TpcError::Backpressure { core_id: 7 };
        assert_eq!(err.to_string(), "Backpressure active for core 7");
    }

    /// Variants with both a `core_id` and a `message` render both fields.
    #[test]
    fn test_error_display_with_message() {
        let err = TpcError::SpawnFailed {
            core_id: 1,
            message: "boom".to_string(),
        };
        assert_eq!(err.to_string(), "Failed to spawn core 1: boom");

        let err = TpcError::AffinityFailed {
            core_id: 2,
            message: "denied".to_string(),
        };
        assert_eq!(
            err.to_string(),
            "Failed to set CPU affinity for core 2: denied"
        );

        let err = TpcError::OperatorPanic {
            core_id: 4,
            message: "oops".to_string(),
        };
        assert_eq!(err.to_string(), "Operator panic on core 4: oops");
    }

    /// Unit variants and single-`String` tuple variants have fixed prefixes.
    #[test]
    fn test_error_display_without_core() {
        assert_eq!(TpcError::NotRunning.to_string(), "Runtime is not running");
        assert_eq!(
            TpcError::AlreadyRunning.to_string(),
            "Runtime is already running"
        );
        assert_eq!(
            TpcError::InvalidConfig("zero cores".to_string()).to_string(),
            "Invalid configuration: zero cores"
        );
        assert_eq!(
            TpcError::KeyExtractionFailed("missing column".to_string()).to_string(),
            "Key extraction failed: missing column"
        );
    }
}
115}