1#![allow(clippy::disallowed_types)] use std::sync::Arc;
9
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_connectors::config::ConnectorConfig;
12use laminar_connectors::connector::SourceConnector;
13use laminar_core::checkpoint::CheckpointBarrierInjector;
14use laminar_core::storage_io::IoCompletion;
15use laminar_core::tpc::TaggedOutput;
16use laminar_core::tpc::{CoreConfig, CoreHandle, TpcConfig, TpcError};
17
18use super::config::PipelineConfig;
19use super::source_adapter::{SourceIoMetrics, SourceIoThread};
20
21pub struct TpcRuntime {
23 cores: Vec<CoreHandle>,
24 source_threads: Vec<SourceIoThread>,
25 source_names: Vec<String>,
26 routing: Vec<usize>,
28 next_core: usize,
30 has_new_data: Arc<std::sync::atomic::AtomicBool>,
31 output_notify: Arc<tokio::sync::Notify>,
32 core_has_source: Vec<bool>,
34}
35
36impl TpcRuntime {
37 pub fn new(config: &TpcConfig) -> Result<Self, TpcError> {
46 config.validate()?;
47
48 let has_new_data = Arc::new(std::sync::atomic::AtomicBool::new(false));
49 let output_notify = Arc::new(tokio::sync::Notify::new());
50 let mut cores = Vec::with_capacity(config.num_cores);
51 for i in 0..config.num_cores {
52 let core_config = CoreConfig {
53 core_id: i,
54 cpu_affinity: if config.cpu_pinning {
55 Some(config.cpu_start + i)
56 } else {
57 None
58 },
59 inbox_capacity: config.inbox_capacity,
60 outbox_capacity: config.outbox_capacity,
61 reactor_config: config.reactor_config.clone(),
62 backpressure: laminar_core::tpc::BackpressureConfig::default(),
63 numa_aware: config.numa_aware,
64 enable_storage_io: config.enable_storage_io,
65 #[cfg(all(target_os = "linux", feature = "io-uring"))]
66 io_uring_config: config.io_uring_config.clone(),
67 };
68 cores.push(CoreHandle::spawn_with_notify(
69 core_config,
70 Vec::new(),
71 Arc::clone(&has_new_data),
72 Arc::clone(&output_notify),
73 )?);
74 }
75
76 let core_count = cores.len();
77 Ok(Self {
78 cores,
79 source_threads: Vec::new(),
80 source_names: Vec::new(),
81 routing: Vec::new(),
82 next_core: 0,
83 has_new_data,
84 output_notify,
85 core_has_source: vec![false; core_count],
86 })
87 }
88
89 pub fn attach_source(
102 &mut self,
103 source_idx: usize,
104 name: String,
105 connector: Box<dyn SourceConnector>,
106 connector_config: ConnectorConfig,
107 pipeline_config: &PipelineConfig,
108 restore_checkpoint: Option<SourceCheckpoint>,
109 ) -> std::io::Result<()> {
110 let core_id = self.next_core % self.cores.len();
111 assert!(
112 !self.core_has_source[core_id],
113 "SPSC violation: core {core_id} already has a source attached. \
114 Ensure num_cores >= num_sources."
115 );
116 self.core_has_source[core_id] = true;
117 self.next_core += 1;
118
119 let target_inbox = Arc::clone(self.cores[core_id].inbox());
120 let core_thread = self.cores[core_id].core_thread_handle().clone();
121
122 let io_thread = SourceIoThread::spawn(
123 source_idx,
124 name.clone(),
125 connector,
126 connector_config,
127 target_inbox,
128 pipeline_config.max_poll_records,
129 pipeline_config.fallback_poll_interval,
130 core_thread,
131 restore_checkpoint,
132 pipeline_config.delivery_guarantee,
133 )?;
134
135 self.source_threads.push(io_thread);
136 self.source_names.push(name);
137 self.routing.push(core_id);
138 Ok(())
139 }
140
141 pub fn poll_all_outputs(&self, buffer: &mut Vec<TaggedOutput>) -> usize {
143 let mut total = 0;
144 for core in &self.cores {
145 total += core.poll_outputs_into(buffer, 4096);
146 }
147 total
148 }
149
150 #[must_use]
152 pub fn injector(&self, source_idx: usize) -> &CheckpointBarrierInjector {
153 &self.source_threads[source_idx].injector
154 }
155
156 #[must_use]
158 pub fn source_checkpoint(&self, source_idx: usize) -> SourceCheckpoint {
159 self.source_threads[source_idx]
160 .checkpoint_rx
161 .borrow()
162 .clone()
163 }
164
165 #[must_use]
170 pub fn snapshot_all_source_checkpoints(
171 &self,
172 ) -> rustc_hash::FxHashMap<String, SourceCheckpoint> {
173 self.source_threads
174 .iter()
175 .zip(self.source_names.iter())
176 .map(|(thread, name)| (name.clone(), thread.checkpoint_rx.borrow().clone()))
177 .collect()
178 }
179
180 #[must_use]
184 pub fn failed_sources(&self) -> Vec<String> {
185 self.source_threads
186 .iter()
187 .zip(self.source_names.iter())
188 .filter(|(thread, _)| thread.has_failed())
189 .map(|(_, name)| name.clone())
190 .collect()
191 }
192
193 #[must_use]
195 pub fn source_metrics(&self, source_idx: usize) -> &Arc<SourceIoMetrics> {
196 &self.source_threads[source_idx].metrics
197 }
198
199 pub fn shutdown(&mut self) -> Vec<(String, Box<dyn SourceConnector>)> {
201 let mut connectors = Vec::new();
202
203 for (i, thread) in self.source_threads.iter_mut().enumerate() {
205 if let Some(connector) = thread.shutdown_and_join() {
206 connectors.push((self.source_names[i].clone(), connector));
207 }
208 }
209
210 for core in &self.cores {
212 core.shutdown();
213 }
214 self.cores.drain(..);
216
217 connectors
218 }
219
220 #[must_use]
222 pub fn num_cores(&self) -> usize {
223 self.cores.len()
224 }
225
226 #[must_use]
228 pub fn source_names(&self) -> &[String] {
229 &self.source_names
230 }
231
232 #[must_use]
234 pub fn num_sources(&self) -> usize {
235 self.source_threads.len()
236 }
237
238 pub fn drain_all_io_completions(&self, out: &mut Vec<IoCompletion>) {
244 for core in &self.cores {
245 core.drain_io_completions(out);
246 }
247 }
248
249 #[must_use]
254 pub fn output_signal(&self) -> (Arc<std::sync::atomic::AtomicBool>, Arc<tokio::sync::Notify>) {
255 (
256 Arc::clone(&self.has_new_data),
257 Arc::clone(&self.output_notify),
258 )
259 }
260}
261
262impl std::fmt::Debug for TpcRuntime {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 f.debug_struct("TpcRuntime")
265 .field("num_cores", &self.cores.len())
266 .field("num_sources", &self.source_threads.len())
267 .field("source_names", &self.source_names)
268 .field("routing", &self.routing)
269 .finish_non_exhaustive()
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use laminar_core::tpc::TpcConfig;
277
278 #[test]
279 fn test_tpc_runtime_creation() {
280 let config = TpcConfig {
281 num_cores: 2,
282 cpu_pinning: false,
283 ..Default::default()
284 };
285 let runtime = TpcRuntime::new(&config).unwrap();
286 assert_eq!(runtime.num_cores(), 2);
287 assert_eq!(runtime.num_sources(), 0);
288 }
289
290 #[test]
291 fn test_tpc_runtime_invalid_config() {
292 let config = TpcConfig {
293 num_cores: 0,
294 ..Default::default()
295 };
296 assert!(TpcRuntime::new(&config).is_err());
297 }
298
299 #[test]
300 fn test_tpc_runtime_poll_empty() {
301 let config = TpcConfig {
302 num_cores: 1,
303 cpu_pinning: false,
304 ..Default::default()
305 };
306 let runtime = TpcRuntime::new(&config).unwrap();
307 let mut buffer = Vec::new();
308 let count = runtime.poll_all_outputs(&mut buffer);
309 assert_eq!(count, 0);
310 }
311
312 #[test]
313 fn test_tpc_runtime_debug() {
314 let config = TpcConfig {
315 num_cores: 1,
316 cpu_pinning: false,
317 ..Default::default()
318 };
319 let runtime = TpcRuntime::new(&config).unwrap();
320 let debug = format!("{runtime:?}");
321 assert!(debug.contains("TpcRuntime"));
322 assert!(debug.contains("num_cores"));
323 }
324}