Skip to main content

laminar_db/pipeline/
tpc_runtime.rs

1//! Thread-per-core runtime managing N `CoreHandle`s and their source I/O threads.
2//!
3//! The `TpcRuntime` is the entry point for TPC mode. It spawns core threads,
4//! attaches source connectors via [`SourceIoThread`]s, and provides a unified
5//! interface for polling all core outboxes.
6#![allow(clippy::disallowed_types)] // cold path
7
8use std::sync::Arc;
9
10use laminar_connectors::checkpoint::SourceCheckpoint;
11use laminar_connectors::config::ConnectorConfig;
12use laminar_connectors::connector::SourceConnector;
13use laminar_core::checkpoint::CheckpointBarrierInjector;
14use laminar_core::storage_io::IoCompletion;
15use laminar_core::tpc::TaggedOutput;
16use laminar_core::tpc::{CoreConfig, CoreHandle, TpcConfig, TpcError};
17
18use super::config::PipelineConfig;
19use super::source_adapter::{SourceIoMetrics, SourceIoThread};
20
21/// Thread-per-core runtime managing N `CoreHandle`s and their source I/O threads.
22pub struct TpcRuntime {
23    cores: Vec<CoreHandle>,
24    source_threads: Vec<SourceIoThread>,
25    source_names: Vec<String>,
26    /// `source_idx` → `core_id` routing (round-robin for now).
27    routing: Vec<usize>,
28    /// Round-robin counter for source attachment.
29    next_core: usize,
30    has_new_data: Arc<std::sync::atomic::AtomicBool>,
31    output_notify: Arc<tokio::sync::Notify>,
32    /// Tracks whether each core already has a source attached (SPSC invariant).
33    core_has_source: Vec<bool>,
34}
35
36impl TpcRuntime {
37    /// Create a new TPC runtime with the given configuration.
38    ///
39    /// Spawns `config.num_cores` core threads immediately.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the configuration is invalid or core threads
44    /// fail to spawn.
45    pub fn new(config: &TpcConfig) -> Result<Self, TpcError> {
46        config.validate()?;
47
48        let has_new_data = Arc::new(std::sync::atomic::AtomicBool::new(false));
49        let output_notify = Arc::new(tokio::sync::Notify::new());
50        let mut cores = Vec::with_capacity(config.num_cores);
51        for i in 0..config.num_cores {
52            let core_config = CoreConfig {
53                core_id: i,
54                cpu_affinity: if config.cpu_pinning {
55                    Some(config.cpu_start + i)
56                } else {
57                    None
58                },
59                inbox_capacity: config.inbox_capacity,
60                outbox_capacity: config.outbox_capacity,
61                reactor_config: config.reactor_config.clone(),
62                backpressure: laminar_core::tpc::BackpressureConfig::default(),
63                numa_aware: config.numa_aware,
64                enable_storage_io: config.enable_storage_io,
65                #[cfg(all(target_os = "linux", feature = "io-uring"))]
66                io_uring_config: config.io_uring_config.clone(),
67            };
68            cores.push(CoreHandle::spawn_with_notify(
69                core_config,
70                Vec::new(),
71                Arc::clone(&has_new_data),
72                Arc::clone(&output_notify),
73            )?);
74        }
75
76        let core_count = cores.len();
77        Ok(Self {
78            cores,
79            source_threads: Vec::new(),
80            source_names: Vec::new(),
81            routing: Vec::new(),
82            next_core: 0,
83            has_new_data,
84            output_notify,
85            core_has_source: vec![false; core_count],
86        })
87    }
88
89    /// Attach a source to a core, spawning an I/O thread.
90    ///
91    /// Sources are assigned to cores in round-robin order.
92    ///
93    /// # Panics
94    ///
95    /// Panics if a core already has a source attached (SPSC invariant).
96    /// Ensure `num_cores >= num_sources`.
97    ///
98    /// # Errors
99    ///
100    /// Returns `std::io::Error` if the source I/O thread cannot be spawned.
101    pub fn attach_source(
102        &mut self,
103        source_idx: usize,
104        name: String,
105        connector: Box<dyn SourceConnector>,
106        connector_config: ConnectorConfig,
107        pipeline_config: &PipelineConfig,
108        restore_checkpoint: Option<SourceCheckpoint>,
109    ) -> std::io::Result<()> {
110        let core_id = self.next_core % self.cores.len();
111        assert!(
112            !self.core_has_source[core_id],
113            "SPSC violation: core {core_id} already has a source attached. \
114             Ensure num_cores >= num_sources."
115        );
116        self.core_has_source[core_id] = true;
117        self.next_core += 1;
118
119        let target_inbox = Arc::clone(self.cores[core_id].inbox());
120        let core_thread = self.cores[core_id].core_thread_handle().clone();
121
122        let io_thread = SourceIoThread::spawn(
123            source_idx,
124            name.clone(),
125            connector,
126            connector_config,
127            target_inbox,
128            pipeline_config.max_poll_records,
129            pipeline_config.fallback_poll_interval,
130            core_thread,
131            restore_checkpoint,
132            pipeline_config.delivery_guarantee,
133        )?;
134
135        self.source_threads.push(io_thread);
136        self.source_names.push(name);
137        self.routing.push(core_id);
138        Ok(())
139    }
140
141    /// Drain all core outboxes into the buffer. Returns total outputs collected.
142    pub fn poll_all_outputs(&self, buffer: &mut Vec<TaggedOutput>) -> usize {
143        let mut total = 0;
144        for core in &self.cores {
145            total += core.poll_outputs_into(buffer, 4096);
146        }
147        total
148    }
149
150    /// Get the barrier injector for a source (used by coordinator for checkpoint).
151    #[must_use]
152    pub fn injector(&self, source_idx: usize) -> &CheckpointBarrierInjector {
153        &self.source_threads[source_idx].injector
154    }
155
156    /// Get checkpoint snapshot for a source (lock-free watch read).
157    #[must_use]
158    pub fn source_checkpoint(&self, source_idx: usize) -> SourceCheckpoint {
159        self.source_threads[source_idx]
160            .checkpoint_rx
161            .borrow()
162            .clone()
163    }
164
165    /// Snapshot all source checkpoints (lock-free watch reads).
166    ///
167    /// Returns a map of source name → current checkpoint. Used by
168    /// timer-based checkpoints to capture source offsets without barriers.
169    #[must_use]
170    pub fn snapshot_all_source_checkpoints(
171        &self,
172    ) -> rustc_hash::FxHashMap<String, SourceCheckpoint> {
173        self.source_threads
174            .iter()
175            .zip(self.source_names.iter())
176            .map(|(thread, name)| (name.clone(), thread.checkpoint_rx.borrow().clone()))
177            .collect()
178    }
179
180    /// Returns names of sources whose I/O threads exited without starting
181    /// (open or restore failed). Empty means all sources are healthy or
182    /// still initializing.
183    #[must_use]
184    pub fn failed_sources(&self) -> Vec<String> {
185        self.source_threads
186            .iter()
187            .zip(self.source_names.iter())
188            .filter(|(thread, _)| thread.has_failed())
189            .map(|(_, name)| name.clone())
190            .collect()
191    }
192
193    /// Get metrics for a source.
194    #[must_use]
195    pub fn source_metrics(&self, source_idx: usize) -> &Arc<SourceIoMetrics> {
196        &self.source_threads[source_idx].metrics
197    }
198
199    /// Shutdown all I/O threads and core handles. Returns connectors for `close()`.
200    pub fn shutdown(&mut self) -> Vec<(String, Box<dyn SourceConnector>)> {
201        let mut connectors = Vec::new();
202
203        // Shutdown source I/O threads first (they push to core inboxes)
204        for (i, thread) in self.source_threads.iter_mut().enumerate() {
205            if let Some(connector) = thread.shutdown_and_join() {
206                connectors.push((self.source_names[i].clone(), connector));
207            }
208        }
209
210        // Shutdown core threads and join them (Drop impl joins on take)
211        for core in &self.cores {
212            core.shutdown();
213        }
214        // Drain cores to trigger Drop (which joins the threads)
215        self.cores.drain(..);
216
217        connectors
218    }
219
220    /// Returns the number of cores.
221    #[must_use]
222    pub fn num_cores(&self) -> usize {
223        self.cores.len()
224    }
225
226    /// Returns the source names.
227    #[must_use]
228    pub fn source_names(&self) -> &[String] {
229        &self.source_names
230    }
231
232    /// Returns the number of attached sources.
233    #[must_use]
234    pub fn num_sources(&self) -> usize {
235        self.source_threads.len()
236    }
237
238    /// Drain storage I/O completions from all cores into `out`.
239    ///
240    /// Called by the checkpoint coordinator (Ring 2) before checking WAL
241    /// sync status. Collects completions that Ring 0 pushed after polling
242    /// its `StorageIo` backend.
243    pub fn drain_all_io_completions(&self, out: &mut Vec<IoCompletion>) {
244        for core in &self.cores {
245            core.drain_io_completions(out);
246        }
247    }
248
249    /// Returns the shared output signaling handles.
250    ///
251    /// The coordinator awaits `notified()` on the `Notify` and clears
252    /// the `AtomicBool` after each drain cycle.
253    #[must_use]
254    pub fn output_signal(&self) -> (Arc<std::sync::atomic::AtomicBool>, Arc<tokio::sync::Notify>) {
255        (
256            Arc::clone(&self.has_new_data),
257            Arc::clone(&self.output_notify),
258        )
259    }
260}
261
262impl std::fmt::Debug for TpcRuntime {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        f.debug_struct("TpcRuntime")
265            .field("num_cores", &self.cores.len())
266            .field("num_sources", &self.source_threads.len())
267            .field("source_names", &self.source_names)
268            .field("routing", &self.routing)
269            .finish_non_exhaustive()
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_core::tpc::TpcConfig;

    /// Builds a runtime with `num_cores` cores and CPU pinning disabled,
    /// panicking if construction fails.
    fn unpinned_runtime(num_cores: usize) -> TpcRuntime {
        let config = TpcConfig {
            num_cores,
            cpu_pinning: false,
            ..Default::default()
        };
        TpcRuntime::new(&config).unwrap()
    }

    #[test]
    fn test_tpc_runtime_creation() {
        let runtime = unpinned_runtime(2);
        assert_eq!(runtime.num_cores(), 2);
        assert_eq!(runtime.num_sources(), 0);
    }

    #[test]
    fn test_tpc_runtime_invalid_config() {
        // A runtime with zero cores must be rejected.
        let zero_cores = TpcConfig {
            num_cores: 0,
            ..Default::default()
        };
        assert!(TpcRuntime::new(&zero_cores).is_err());
    }

    #[test]
    fn test_tpc_runtime_poll_empty() {
        let runtime = unpinned_runtime(1);
        let mut outputs = Vec::new();
        let drained = runtime.poll_all_outputs(&mut outputs);
        assert_eq!(drained, 0);
    }

    #[test]
    fn test_tpc_runtime_debug() {
        let runtime = unpinned_runtime(1);
        let rendered = format!("{runtime:?}");
        for needle in ["TpcRuntime", "num_cores"] {
            assert!(rendered.contains(needle));
        }
    }
}