Skip to main content

laminar_core/tpc/
core_handle.rs

1//! # Core Handle
2//!
3//! Manages a single core's reactor thread with SPSC queue communication.
4//!
5//! ## Architecture
6//!
7//! Each `CoreHandle` spawns a dedicated thread that:
8//! 1. Sets CPU affinity to pin to a specific core
9//! 2. Creates a `Reactor` with its own state partition
10//! 3. Optionally initializes an `io_uring` ring for async I/O (Linux only)
11//! 4. Drains the inbox SPSC queue for incoming events
12//! 5. Processes events through the reactor
13//! 6. Pushes outputs to the outbox SPSC queue
14//!
15//! ## Communication
16//!
17//! - **Inbox**: Main thread → Core thread (events, watermarks, commands)
18//! - **Outbox**: Core thread → Main thread (outputs)
19//!
20//! Both use lock-free SPSC queues for minimal latency.
21//!
22//! ## `io_uring` Integration
23//!
24//! On Linux with the `io-uring` feature enabled, each core thread can have its own
25//! `io_uring` ring for high-performance async I/O. This enables:
26//! - SQPOLL mode for syscall-free submission
27//! - Registered buffers for zero-copy I/O
28//! - Per-core I/O isolation for thread-per-core architecture
29
30use std::panic::{catch_unwind, AssertUnwindSafe};
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::Arc;
33use std::thread::{self, JoinHandle};
34
35use tokio::sync::Notify;
36
37#[cfg(all(target_os = "linux", feature = "io-uring"))]
38use crate::io_uring::IoUringConfig;
39
40use crate::alloc::HotPathGuard;
41use crate::budget::TaskBudget;
42use crate::checkpoint::CheckpointBarrier;
43use crate::numa::NumaTopology;
44use crate::operator::{CheckpointCompleteData, Event, Operator, OperatorState, Output};
45use crate::reactor::{Reactor, ReactorConfig};
46use crate::storage_io::{IoCompletion, StorageIo, SyncStorageIo};
47
48use super::backpressure::{
49    BackpressureConfig, CreditAcquireResult, CreditGate, CreditMetrics, OverflowStrategy,
50};
51use super::spsc::SpscQueue;
52use super::TpcError;
53
/// Messages sent to a core thread.
///
/// Pushed into the core's inbox SPSC queue (main thread or source I/O
/// threads produce, the core thread's main loop consumes).
#[derive(Debug)]
pub enum CoreMessage {
    /// Process an event from a specific source.
    Event {
        /// Index of the source that produced this event.
        source_idx: usize,
        /// The event data.
        event: Event,
    },
    /// Advance the reactor's event-time watermark to this timestamp.
    Watermark(i64),
    /// Request a checkpoint with the given checkpoint ID.
    ///
    /// The core snapshots operator state and replies with
    /// `Output::CheckpointComplete` on the outbox.
    CheckpointRequest(u64),
    /// Checkpoint barrier from a source.
    ///
    /// The core thread flushes the reactor before forwarding this
    /// as `Output::Barrier` so the coordinator can track alignment.
    Barrier {
        /// Index of the source that emitted this barrier.
        source_idx: usize,
        /// The checkpoint barrier.
        barrier: CheckpointBarrier,
    },
    /// Graceful shutdown; also sent by `CoreHandle::shutdown` / `Drop`
    /// to wake the core thread so it observes the shutdown flag.
    Shutdown,
}
81
/// Output tagged with its source index for coordinator routing.
///
/// Core threads wrap every `Output` in a `TaggedOutput` so the
/// coordinator can map outputs back to the originating source.
#[derive(Debug)]
pub struct TaggedOutput {
    /// Index of the source that produced this output.
    /// Checkpoint completions are tagged with source 0 by convention.
    pub source_idx: usize,
    /// The operator output.
    pub output: Output,
}
93
/// Configuration for a core handle.
///
/// See [`Default`] for the baseline values (core 0, no pinning,
/// 8192-entry queues, storage I/O and NUMA-awareness off).
#[derive(Debug, Clone)]
pub struct CoreConfig {
    /// Core ID (0-indexed)
    pub core_id: usize,
    /// CPU ID to pin to (None = no pinning). Also determines the NUMA
    /// node reported by [`CoreHandle::numa_node`].
    pub cpu_affinity: Option<usize>,
    /// Inbox queue capacity (main thread → core messages)
    pub inbox_capacity: usize,
    /// Outbox queue capacity (core → main thread outputs)
    pub outbox_capacity: usize,
    /// Reactor configuration (the core's `cpu_affinity` is copied into it
    /// before the reactor is created)
    pub reactor_config: ReactorConfig,
    /// Backpressure configuration
    pub backpressure: BackpressureConfig,
    /// Enable NUMA-aware memory allocation
    pub numa_aware: bool,
    /// Enable per-core storage I/O backend.
    ///
    /// When true, creates a [`StorageIo`] instance for this core. On Linux
    /// with the `io-uring` feature and a supported kernel, this uses
    /// `io_uring` with SQPOLL. Otherwise falls back to synchronous I/O.
    pub enable_storage_io: bool,
    /// `io_uring` configuration (Linux only, requires `io-uring` feature).
    /// `None` means the sync fallback is used even when storage I/O is enabled.
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    pub io_uring_config: Option<IoUringConfig>,
}
121
122impl Default for CoreConfig {
123    fn default() -> Self {
124        Self {
125            core_id: 0,
126            cpu_affinity: None,
127            inbox_capacity: 8192,
128            outbox_capacity: 8192,
129            reactor_config: ReactorConfig::default(),
130            backpressure: BackpressureConfig::default(),
131            numa_aware: false,
132            enable_storage_io: false,
133            #[cfg(all(target_os = "linux", feature = "io-uring"))]
134            io_uring_config: None,
135        }
136    }
137}
138
/// Handle to a core's reactor thread.
///
/// Provides lock-free communication with the core via SPSC queues
/// and credit-based flow control for backpressure. Dropping the handle
/// signals shutdown and joins the thread.
pub struct CoreHandle {
    /// Core ID
    core_id: usize,
    /// NUMA node for this core (derived from `cpu_affinity` at spawn time)
    numa_node: usize,
    /// Inbox queue (main thread writes, core reads)
    inbox: Arc<SpscQueue<CoreMessage>>,
    /// Outbox queue (core writes, main thread reads)
    outbox: Arc<SpscQueue<TaggedOutput>>,
    /// Storage I/O completion queue (Ring 0 writes, Ring 2 reads).
    /// Ring 0 pushes completions from `StorageIo::poll_completions`.
    /// Ring 2 drains during checkpoint to feed `PerCoreWalManager::check_all_completions`.
    io_completion_outbox: Arc<SpscQueue<IoCompletion>>,
    /// Credit gate for backpressure (sender acquires, receiver releases)
    credit_gate: Arc<CreditGate>,
    /// Thread handle (None if thread hasn't started or has been joined)
    thread: Option<JoinHandle<Result<(), TpcError>>>,
    /// Core thread's `Thread` handle for `unpark()` wakeups.
    /// Captured before the `JoinHandle` is stored, since `join()` consumes it.
    core_thread_handle: thread::Thread,
    /// Shutdown flag (set by `shutdown()` and `Drop`, polled by the core thread)
    shutdown: Arc<AtomicBool>,
    /// Events processed counter (incremented by the core thread)
    events_processed: Arc<AtomicU64>,
    /// Outputs dropped due to full outbox
    outputs_dropped: Arc<AtomicU64>,
    /// Running state (set true by the core thread once initialization succeeds)
    is_running: Arc<AtomicBool>,
}
171
172impl CoreHandle {
173    /// Spawns a new core thread with the given configuration.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the thread cannot be spawned.
178    pub fn spawn(config: CoreConfig) -> Result<Self, TpcError> {
179        let notify = Arc::new(Notify::new());
180        let flag = Arc::new(AtomicBool::new(false));
181        Self::spawn_with_notify(config, Vec::new(), flag, notify)
182    }
183
184    /// Spawns a new core thread with operators.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the thread cannot be spawned.
189    #[allow(clippy::needless_pass_by_value)]
190    pub fn spawn_with_operators(
191        config: CoreConfig,
192        operators: Vec<Box<dyn Operator>>,
193    ) -> Result<Self, TpcError> {
194        let notify = Arc::new(Notify::new());
195        let flag = Arc::new(AtomicBool::new(false));
196        Self::spawn_with_notify(config, operators, flag, notify)
197    }
198
199    /// Spawns a new core thread with operators and shared signaling.
200    ///
201    /// `has_new_data` is a lock-free flag set by the core thread when
202    /// outputs are available. Only the false→true transition triggers
203    /// `output_notify.notify_one()`, ensuring at most one mutex
204    /// acquisition per coordinator drain cycle across all cores.
205    ///
206    /// # Errors
207    ///
208    /// Returns an error if the thread cannot be spawned.
209    #[allow(clippy::needless_pass_by_value)]
210    pub fn spawn_with_notify(
211        config: CoreConfig,
212        operators: Vec<Box<dyn Operator>>,
213        has_new_data: Arc<AtomicBool>,
214        output_notify: Arc<Notify>,
215    ) -> Result<Self, TpcError> {
216        let core_id = config.core_id;
217        let cpu_affinity = config.cpu_affinity;
218        let reactor_config = config.reactor_config.clone();
219
220        // Detect NUMA topology for NUMA-aware allocation
221        let topology = NumaTopology::detect();
222        let numa_node =
223            cpu_affinity.map_or_else(|| topology.current_node(), |cpu| topology.node_for_cpu(cpu));
224
225        let inbox = Arc::new(SpscQueue::new(config.inbox_capacity));
226        let outbox: Arc<SpscQueue<TaggedOutput>> = Arc::new(SpscQueue::new(config.outbox_capacity));
227        // I/O completion queue: Ring 0 produces, Ring 2 consumes during checkpoint.
228        // Capacity 256 matches default io_uring ring_entries.
229        let io_completion_outbox: Arc<SpscQueue<IoCompletion>> = Arc::new(SpscQueue::new(256));
230        let credit_gate = Arc::new(CreditGate::new(config.backpressure.clone()));
231        let shutdown = Arc::new(AtomicBool::new(false));
232        let events_processed = Arc::new(AtomicU64::new(0));
233        let outputs_dropped = Arc::new(AtomicU64::new(0));
234        let is_running = Arc::new(AtomicBool::new(false));
235
236        // Create per-core storage I/O backend (cold path — allocation is fine here).
237        let storage_io: Option<Box<dyn StorageIo>> = if config.enable_storage_io {
238            // On Linux with io-uring feature, try io_uring first.
239            #[cfg(all(target_os = "linux", feature = "io-uring"))]
240            {
241                if let Some(ref uring_cfg) = config.io_uring_config {
242                    match crate::storage_io::UringStorageIo::new(core_id, uring_cfg) {
243                        Ok(backend) => {
244                            tracing::info!(
245                                "Core {core_id}: using io_uring storage I/O (SQPOLL={})",
246                                backend.uses_sqpoll()
247                            );
248                            Some(Box::new(backend))
249                        }
250                        Err(e) => {
251                            tracing::warn!(
252                                "Core {core_id}: io_uring init failed ({e}), falling back to sync"
253                            );
254                            Some(Box::new(SyncStorageIo::new()))
255                        }
256                    }
257                } else {
258                    Some(Box::new(SyncStorageIo::new()))
259                }
260            }
261            // On non-Linux or without io-uring feature, use sync backend.
262            #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
263            {
264                Some(Box::new(SyncStorageIo::new()))
265            }
266        } else {
267            None
268        };
269
270        let thread_context = CoreThreadContext {
271            core_id,
272            cpu_affinity,
273            reactor_config,
274            numa_aware: config.numa_aware,
275            numa_node,
276            #[cfg(target_os = "linux")]
277            numa_topology: topology,
278            inbox: Arc::clone(&inbox),
279            outbox: Arc::clone(&outbox),
280            credit_gate: Arc::clone(&credit_gate),
281            shutdown: Arc::clone(&shutdown),
282            events_processed: Arc::clone(&events_processed),
283            outputs_dropped: Arc::clone(&outputs_dropped),
284            is_running: Arc::clone(&is_running),
285            has_new_data: Arc::clone(&has_new_data),
286            output_notify: Arc::clone(&output_notify),
287        };
288
289        let thread = thread::Builder::new()
290            .name(format!("laminar-core-{core_id}"))
291            .spawn({
292                let io_cq = Arc::clone(&io_completion_outbox);
293                move || core_thread_main(&thread_context, operators, storage_io, io_cq)
294            })
295            .map_err(|e| TpcError::SpawnFailed {
296                core_id,
297                message: e.to_string(),
298            })?;
299
300        // Capture the thread handle for unpark() wakeups before we store
301        // the JoinHandle (which consumes it on join).
302        let core_thread_handle = thread.thread().clone();
303
304        // Wait for thread to signal it's running
305        while !is_running.load(Ordering::Acquire) {
306            thread::yield_now();
307        }
308
309        Ok(Self {
310            core_id,
311            numa_node,
312            inbox,
313            outbox,
314            io_completion_outbox,
315            credit_gate,
316            thread: Some(thread),
317            core_thread_handle,
318            shutdown,
319            events_processed,
320            outputs_dropped,
321            is_running,
322        })
323    }
324
325    /// Returns the core ID.
326    #[must_use]
327    pub fn core_id(&self) -> usize {
328        self.core_id
329    }
330
331    /// Returns the NUMA node for this core.
332    #[must_use]
333    pub fn numa_node(&self) -> usize {
334        self.numa_node
335    }
336
337    /// Returns true if the core thread is running.
338    #[must_use]
339    pub fn is_running(&self) -> bool {
340        self.is_running.load(Ordering::Acquire)
341    }
342
343    /// Returns the core thread's handle for `unpark()` wakeups.
344    ///
345    /// Source I/O threads call `unpark()` on this handle after pushing
346    /// to the inbox SPSC queue, waking the core from `park_timeout()`.
347    #[must_use]
348    pub fn core_thread_handle(&self) -> &thread::Thread {
349        &self.core_thread_handle
350    }
351
352    /// Returns the number of events processed by this core.
353    #[must_use]
354    pub fn events_processed(&self) -> u64 {
355        self.events_processed.load(Ordering::Relaxed)
356    }
357
358    /// Returns the number of outputs dropped due to a full outbox.
359    #[must_use]
360    pub fn outputs_dropped(&self) -> u64 {
361        self.outputs_dropped.load(Ordering::Relaxed)
362    }
363
364    /// Sends a message to the core with credit-based flow control.
365    ///
366    /// This method respects the backpressure configuration:
367    /// - `Block`: Spins until credits available, then sends
368    /// - `Drop`: Returns Ok but drops the message if no credits
369    /// - `Error`: Returns error if no credits available
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the inbox queue is full or credits exhausted (with Error strategy).
374    pub fn send(&self, message: CoreMessage) -> Result<(), TpcError> {
375        // Try to acquire a credit
376        match self.credit_gate.try_acquire() {
377            CreditAcquireResult::Acquired => {
378                // Have credit, try to push
379                self.inbox.push(message).map_err(|_| TpcError::QueueFull {
380                    core_id: self.core_id,
381                })
382            }
383            CreditAcquireResult::WouldBlock => {
384                // Check overflow strategy
385                if self.credit_gate.config().overflow_strategy == OverflowStrategy::Block {
386                    // Spin until we get a credit
387                    self.credit_gate.acquire_blocking(1);
388                    self.inbox.push(message).map_err(|_| TpcError::QueueFull {
389                        core_id: self.core_id,
390                    })
391                } else {
392                    // Error strategy - return error
393                    Err(TpcError::Backpressure {
394                        core_id: self.core_id,
395                    })
396                }
397            }
398            CreditAcquireResult::Dropped => {
399                // Drop strategy - silently drop, already recorded in metrics
400                Ok(())
401            }
402        }
403    }
404
405    /// Tries to send a message without blocking.
406    ///
407    /// Returns `Err` if no credits available or queue is full.
408    /// Does not block regardless of overflow strategy.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if credits exhausted or queue full.
413    pub fn try_send(&self, message: CoreMessage) -> Result<(), TpcError> {
414        match self.credit_gate.try_acquire() {
415            CreditAcquireResult::Acquired => {
416                self.inbox.push(message).map_err(|_| TpcError::QueueFull {
417                    core_id: self.core_id,
418                })
419            }
420            CreditAcquireResult::WouldBlock | CreditAcquireResult::Dropped => {
421                Err(TpcError::Backpressure {
422                    core_id: self.core_id,
423                })
424            }
425        }
426    }
427
428    /// Sends an event to the core with credit-based flow control.
429    ///
430    /// The `source_idx` identifies which source produced this event,
431    /// allowing the coordinator to route outputs back to the correct source.
432    ///
433    /// # Errors
434    ///
435    /// Returns an error if the inbox queue is full or backpressure applies.
436    pub fn send_event(&self, source_idx: usize, event: Event) -> Result<(), TpcError> {
437        self.send(CoreMessage::Event { source_idx, event })
438    }
439
440    /// Tries to send an event without blocking.
441    ///
442    /// # Errors
443    ///
444    /// Returns an error if credits exhausted or queue full.
445    pub fn try_send_event(&self, source_idx: usize, event: Event) -> Result<(), TpcError> {
446        self.try_send(CoreMessage::Event { source_idx, event })
447    }
448
449    /// Returns a reference to the inbox queue (for direct SPSC push from I/O threads).
450    #[must_use]
451    pub fn inbox(&self) -> &Arc<SpscQueue<CoreMessage>> {
452        &self.inbox
453    }
454
455    /// Returns a reference to the outbox queue (for direct SPSC pop).
456    #[must_use]
457    pub fn outbox(&self) -> &Arc<SpscQueue<TaggedOutput>> {
458        &self.outbox
459    }
460
461    /// Polls the outbox for tagged outputs.
462    ///
463    /// Returns up to `max_count` outputs, each tagged with the source index.
464    ///
465    /// # Note
466    ///
467    /// This method allocates memory. For zero-allocation polling, use
468    /// [`poll_outputs_into`](Self::poll_outputs_into) or [`poll_each`](Self::poll_each) instead.
469    #[must_use]
470    pub fn poll_outputs(&self, max_count: usize) -> Vec<TaggedOutput> {
471        self.outbox.pop_batch(max_count)
472    }
473
474    /// Polls the outbox for tagged outputs into a pre-allocated buffer (zero-allocation).
475    ///
476    /// Outputs are appended to `buffer`. Returns the number of outputs added.
477    /// The buffer should have sufficient capacity to avoid reallocation.
478    #[inline]
479    pub fn poll_outputs_into(&self, buffer: &mut Vec<TaggedOutput>, max_count: usize) -> usize {
480        let start_len = buffer.len();
481
482        self.outbox.pop_each(max_count, |output| {
483            buffer.push(output);
484            true
485        });
486
487        buffer.len() - start_len
488    }
489
490    /// Polls the outbox with a callback for each tagged output (zero-allocation).
491    ///
492    /// Processing stops when:
493    /// - `max_count` outputs have been processed
494    /// - The outbox becomes empty
495    /// - The callback returns `false`
496    ///
497    /// Returns the number of outputs processed.
498    #[inline]
499    pub fn poll_each<F>(&self, max_count: usize, f: F) -> usize
500    where
501        F: FnMut(TaggedOutput) -> bool,
502    {
503        self.outbox.pop_each(max_count, f)
504    }
505
506    /// Polls a single tagged output from the outbox.
507    #[must_use]
508    pub fn poll_output(&self) -> Option<TaggedOutput> {
509        self.outbox.pop()
510    }
511
512    /// Drain storage I/O completions from this core.
513    ///
514    /// Ring 0 pushes completions from `StorageIo::poll_completions` into a
515    /// per-core SPSC queue. Ring 2 (checkpoint coordinator) calls this to
516    /// retrieve them and pass to `PerCoreWalManager::check_all_completions`.
517    pub fn drain_io_completions(&self, out: &mut Vec<IoCompletion>) {
518        self.io_completion_outbox.pop_each(256, |c| {
519            out.push(c);
520            true
521        });
522    }
523
524    /// Returns the number of pending messages in the inbox.
525    #[must_use]
526    pub fn inbox_len(&self) -> usize {
527        self.inbox.len()
528    }
529
530    /// Returns the number of pending outputs in the outbox.
531    #[must_use]
532    pub fn outbox_len(&self) -> usize {
533        self.outbox.len()
534    }
535
536    /// Returns true if backpressure is currently active.
537    #[must_use]
538    pub fn is_backpressured(&self) -> bool {
539        self.credit_gate.is_backpressured()
540    }
541
542    /// Returns the number of available credits.
543    #[must_use]
544    pub fn available_credits(&self) -> usize {
545        self.credit_gate.available()
546    }
547
548    /// Returns the maximum credits configured.
549    #[must_use]
550    pub fn max_credits(&self) -> usize {
551        self.credit_gate.max_credits()
552    }
553
554    /// Returns the credit metrics.
555    #[must_use]
556    pub fn credit_metrics(&self) -> &CreditMetrics {
557        self.credit_gate.metrics()
558    }
559
560    /// Signals the core to shut down gracefully.
561    pub fn shutdown(&self) {
562        self.shutdown.store(true, Ordering::Release);
563        // Also send a shutdown message to wake up the thread
564        let _ = self.inbox.push(CoreMessage::Shutdown);
565        // Unpark the core thread in case it's sleeping in park_timeout()
566        self.core_thread_handle.unpark();
567    }
568
569    /// Waits for the core thread to finish.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if the thread panicked or returned an error.
574    pub fn join(mut self) -> Result<(), TpcError> {
575        if let Some(handle) = self.thread.take() {
576            handle.join().map_err(|_| TpcError::SpawnFailed {
577                core_id: self.core_id,
578                message: "Thread panicked".to_string(),
579            })?
580        } else {
581            Ok(())
582        }
583    }
584
585    /// Sends a shutdown signal and waits for the thread to finish.
586    ///
587    /// # Errors
588    ///
589    /// Returns an error if the thread cannot be joined cleanly.
590    pub fn shutdown_and_join(self) -> Result<(), TpcError> {
591        self.shutdown();
592        self.join()
593    }
594}
595
596impl Drop for CoreHandle {
597    fn drop(&mut self) {
598        // Signal shutdown if not already done
599        self.shutdown.store(true, Ordering::Release);
600        // Try to send shutdown message (may fail if queue is full, that's OK)
601        let _ = self.inbox.push(CoreMessage::Shutdown);
602        // Unpark the core thread in case it's sleeping in park_timeout()
603        self.core_thread_handle.unpark();
604
605        // Join the thread if we haven't already
606        if let Some(handle) = self.thread.take() {
607            let _ = handle.join();
608        }
609    }
610}
611
612impl std::fmt::Debug for CoreHandle {
613    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
614        f.debug_struct("CoreHandle")
615            .field("core_id", &self.core_id)
616            .field("numa_node", &self.numa_node)
617            .field("is_running", &self.is_running())
618            .field("events_processed", &self.events_processed())
619            .field("outputs_dropped", &self.outputs_dropped())
620            .field("inbox_len", &self.inbox_len())
621            .field("outbox_len", &self.outbox_len())
622            .field("available_credits", &self.available_credits())
623            .field("is_backpressured", &self.is_backpressured())
624            .finish_non_exhaustive()
625    }
626}
627
/// Context passed to the core thread.
///
/// Built in `CoreHandle::spawn_with_notify`; every `Arc` field is a clone
/// of the corresponding handle stored in [`CoreHandle`].
pub(super) struct CoreThreadContext {
    /// Core ID (0-indexed), used in log messages and error values.
    pub(super) core_id: usize,
    /// CPU to pin this thread to (None = no pinning).
    cpu_affinity: Option<usize>,
    /// Reactor configuration; `cpu_affinity` is injected before the reactor is created.
    reactor_config: ReactorConfig,
    /// Whether to bind memory to the local NUMA node and log NUMA info.
    numa_aware: bool,
    /// NUMA node this core belongs to (for logging).
    numa_node: usize,
    /// Detected NUMA topology used for local memory binding (Linux only).
    #[cfg(target_os = "linux")]
    numa_topology: NumaTopology,
    /// Inbox queue (main thread writes, this thread reads).
    pub(super) inbox: Arc<SpscQueue<CoreMessage>>,
    /// Outbox queue (this thread writes, main thread reads).
    pub(super) outbox: Arc<SpscQueue<TaggedOutput>>,
    /// Credit gate; this thread releases credits as messages are processed.
    pub(super) credit_gate: Arc<CreditGate>,
    /// Shutdown flag polled by the main loop.
    pub(super) shutdown: Arc<AtomicBool>,
    /// Events processed counter.
    pub(super) events_processed: Arc<AtomicU64>,
    /// Outputs dropped due to a full outbox.
    pub(super) outputs_dropped: Arc<AtomicU64>,
    /// Set true once initialization succeeds; the spawner waits on this.
    pub(super) is_running: Arc<AtomicBool>,
    /// Flag set on new outputs; only the false→true edge triggers a notify.
    pub(super) has_new_data: Arc<AtomicBool>,
    /// Notifier for the coordinator's drain cycle.
    pub(super) output_notify: Arc<Notify>,
}
647
648/// Initializes the core thread: sets CPU affinity, NUMA allocator, `io_uring`, and creates the reactor.
649fn init_core_thread(
650    ctx: &CoreThreadContext,
651    operators: Vec<Box<dyn Operator>>,
652) -> Result<Reactor, TpcError> {
653    // Set CPU affinity if requested
654    if let Some(cpu_id) = ctx.cpu_affinity {
655        set_cpu_affinity(ctx.core_id, cpu_id)?;
656    }
657
658    // Bind memory allocations to local NUMA node (Linux only, after CPU affinity)
659    #[cfg(target_os = "linux")]
660    if ctx.numa_aware {
661        if let Err(e) = ctx.numa_topology.bind_local_memory() {
662            tracing::warn!(core_id = ctx.core_id, ?e, "NUMA memory bind failed");
663        }
664    }
665
666    // Log NUMA information if NUMA-aware mode is enabled
667    if ctx.numa_aware {
668        tracing::info!(
669            "Core {} starting on NUMA node {}",
670            ctx.core_id,
671            ctx.numa_node
672        );
673    }
674
675    // Create the reactor with configured settings
676    let mut reactor_config = ctx.reactor_config.clone();
677    reactor_config.cpu_affinity = ctx.cpu_affinity;
678
679    let mut reactor = Reactor::new(reactor_config).map_err(|e| TpcError::ReactorError {
680        core_id: ctx.core_id,
681        source: e,
682    })?;
683
684    // Add operators
685    for op in operators {
686        reactor.add_operator(op);
687    }
688
689    Ok(reactor)
690}
691
692/// Main function for the core thread.
693#[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
694fn core_thread_main(
695    ctx: &CoreThreadContext,
696    operators: Vec<Box<dyn Operator>>,
697    mut storage_io: Option<Box<dyn StorageIo>>,
698    io_completion_outbox: Arc<SpscQueue<IoCompletion>>,
699) -> Result<(), TpcError> {
700    let mut reactor = init_core_thread(ctx, operators)?;
701
702    // Signal that we're running
703    ctx.is_running.store(true, Ordering::Release);
704
705    // Reusable buffer for reactor outputs (avoids per-poll Vec allocation)
706    let mut poll_buffer: Vec<Output> = Vec::with_capacity(256);
707
708    // Reusable buffer for storage I/O completions (zero-alloc across iterations)
709    let mut io_completions: Vec<IoCompletion> = Vec::with_capacity(64);
710
711    // Pre-allocated checkpoint Box reused across checkpoint cycles.
712    // Taken when a CheckpointRequest arrives, replenished after the inbox drain.
713    let mut checkpoint_slot: Option<Box<CheckpointCompleteData>> =
714        Some(Box::new(CheckpointCompleteData {
715            checkpoint_id: 0,
716            operator_states: Vec::new(),
717        }));
718    // Reusable buffer for operator states (avoids Vec alloc per checkpoint)
719    let mut checkpoint_states_buf: Vec<OperatorState> = Vec::new();
720
721    // Track the last source_idx seen for tagging reactor outputs.
722    // When events from multiple sources are interleaved, reactor outputs
723    // are attributed to source 0 (the coordinator resolves this).
724    let mut last_source_idx: usize = 0;
725
726    // Rate-limit submit errors to avoid allocation storms on the hot path.
727    let mut submit_error_count: u64 = 0;
728
729    // Tiered idle strategy: tracks consecutive iterations with no work.
730    // 0..64   → spin_loop() (PAUSE/YIELD hint, ~ns wake)
731    // 64..128 → thread::yield_now() (OS scheduler yield, ~μs wake)
732    // 128+    → thread::park_timeout(1ms) (sleep, woken by unpark())
733    let mut idle_spins: u32 = 0;
734
735    // Wrap the main loop in catch_unwind so an operator panic does not
736    // silently kill this core thread.  On panic we set is_running = false
737    // (the coordinator checks this) and propagate the error.
738    let panic_result = catch_unwind(AssertUnwindSafe(|| -> Result<(), TpcError> {
739        // Main loop
740        loop {
741            // Check for shutdown
742            if ctx.shutdown.load(Ordering::Acquire) {
743                break;
744            }
745
746            // Hot path guard for inbox processing
747            let _guard = HotPathGuard::enter("CoreThread::process_inbox");
748
749            // Task budget tracking for batch processing
750            let batch_budget = TaskBudget::ring0_batch();
751
752            // Drain inbox and track messages processed for credit release
753            let mut had_work = false;
754            let mut messages_processed = 0usize;
755
756            while let Some(message) = ctx.inbox.pop() {
757                match message {
758                    CoreMessage::Event { source_idx, event } => {
759                        last_source_idx = source_idx;
760                        if let Err(e) = reactor.submit(event) {
761                            submit_error_count += 1;
762                            if submit_error_count.is_power_of_two() {
763                                tracing::error!(
764                                    "Core {}: Failed to submit event (n={}): {e}",
765                                    ctx.core_id,
766                                    submit_error_count,
767                                );
768                            }
769                        }
770                        messages_processed += 1;
771                        had_work = true;
772                    }
773                    CoreMessage::Watermark(timestamp) => {
774                        // Advance the reactor's watermark so downstream operators
775                        // see the updated event-time progress on the next poll().
776                        reactor.advance_watermark(timestamp);
777                        messages_processed += 1;
778                        had_work = true;
779                    }
780                    CoreMessage::CheckpointRequest(checkpoint_id) => {
781                        // Snapshot operator states into the pre-allocated Box.
782                        // The Box is taken and replenished after the inbox drain loop.
783                        reactor.trigger_checkpoint_into(&mut checkpoint_states_buf);
784                        let mut data = checkpoint_slot.take().unwrap_or_else(|| {
785                            Box::new(CheckpointCompleteData {
786                                checkpoint_id: 0,
787                                operator_states: Vec::new(),
788                            })
789                        });
790                        data.checkpoint_id = checkpoint_id;
791                        std::mem::swap(&mut data.operator_states, &mut checkpoint_states_buf);
792                        // Checkpoint completion must not be dropped — the coordinator
793                        // would stall waiting for a response that never arrives.
794                        // Spin-wait: checkpoints are infrequent (seconds apart).
795                        let mut cp_out = TaggedOutput {
796                            source_idx: 0,
797                            output: Output::CheckpointComplete(data),
798                        };
799                        loop {
800                            match ctx.outbox.push(cp_out) {
801                                Ok(()) => break,
802                                Err(returned) => {
803                                    if ctx.shutdown.load(Ordering::Acquire) {
804                                        break;
805                                    }
806                                    cp_out = returned;
807                                    std::hint::spin_loop();
808                                }
809                            }
810                        }
811                        messages_processed += 1;
812                        had_work = true;
813                    }
814                    CoreMessage::Barrier {
815                        source_idx,
816                        barrier,
817                    } => {
818                        // Fully drain the reactor before forwarding the barrier.
819                        // A single poll_into may only process batch_size events;
820                        // we must loop until the queue is empty so no pre-barrier
821                        // event slips past the barrier.
822                        loop {
823                            poll_buffer.clear();
824                            reactor.poll_into(&mut poll_buffer);
825                            if poll_buffer.is_empty() {
826                                break;
827                            }
828                            for output in poll_buffer.drain(..) {
829                                if ctx
830                                    .outbox
831                                    .push(TaggedOutput { source_idx, output })
832                                    .is_err()
833                                {
834                                    ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
835                                }
836                            }
837                        }
838                        // Barrier must not be dropped — a missing barrier breaks
839                        // checkpoint alignment. Spin-wait until outbox has space.
840                        let mut barrier_out = TaggedOutput {
841                            source_idx,
842                            output: Output::Barrier(barrier),
843                        };
844                        loop {
845                            match ctx.outbox.push(barrier_out) {
846                                Ok(()) => break,
847                                Err(returned) => {
848                                    if ctx.shutdown.load(Ordering::Acquire) {
849                                        break;
850                                    }
851                                    barrier_out = returned;
852                                    std::hint::spin_loop();
853                                }
854                            }
855                        }
856                        messages_processed += 1;
857                        had_work = true;
858                    }
859                    CoreMessage::Shutdown => {
860                        // Release credits for any messages we've processed so far
861                        if messages_processed > 0 {
862                            ctx.credit_gate.release(messages_processed);
863                            messages_processed = 0;
864                        }
865                        break;
866                    }
867                }
868
869                // Check if batch budget is almost exceeded and break to process reactor
870                // This ensures Ring 0 latency guarantees by limiting batch processing time
871                if batch_budget.almost_exceeded() {
872                    break;
873                }
874            }
875
876            // Release credits for processed messages
877            // This signals to senders that we have capacity for more
878            if messages_processed > 0 {
879                ctx.credit_gate.release(messages_processed);
880            }
881
882            // Poll storage I/O completions (non-blocking, zero-alloc).
883            // Push completions to the SPSC outbox for Ring 2 (checkpoint coordinator)
884            // to drain and feed to PerCoreWalManager::check_all_completions().
885            if let Some(ref mut sio) = storage_io {
886                io_completions.clear();
887                sio.poll_completions(&mut io_completions);
888                for completion in &io_completions {
889                    // Spin-wait on push failure. A dropped completion
890                    // permanently stalls the WAL writer (pending_sync
891                    // never clears, blocking all future syncs).
892                    let mut c = *completion;
893                    loop {
894                        match io_completion_outbox.push(c) {
895                            Ok(()) => break,
896                            Err(returned) => {
897                                if ctx.shutdown.load(Ordering::Acquire) {
898                                    break;
899                                }
900                                c = returned;
901                                std::hint::spin_loop();
902                            }
903                        }
904                    }
905                }
906                if !io_completions.is_empty() {
907                    had_work = true;
908                }
909            }
910
911            // Replenish checkpoint slot if it was consumed (infrequent — once per checkpoint).
912            // checkpoint_states_buf was drained by the swap above; move its capacity into the
913            // new Box so both buffers retain their allocations across cycles.
914            if checkpoint_slot.is_none() {
915                checkpoint_states_buf.clear();
916                let mut data = Box::new(CheckpointCompleteData {
917                    checkpoint_id: 0,
918                    operator_states: Vec::new(),
919                });
920                std::mem::swap(&mut data.operator_states, &mut checkpoint_states_buf);
921                checkpoint_slot = Some(data);
922            }
923
924            // Process events in reactor (reuses poll_buffer capacity across iterations)
925            poll_buffer.clear();
926            reactor.poll_into(&mut poll_buffer);
927            ctx.events_processed
928                .fetch_add(poll_buffer.len() as u64, Ordering::Relaxed);
929
930            // Push outputs to outbox tagged with the last source index
931            for output in poll_buffer.drain(..) {
932                if ctx
933                    .outbox
934                    .push(TaggedOutput {
935                        source_idx: last_source_idx,
936                        output,
937                    })
938                    .is_err()
939                {
940                    ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
941                }
942                had_work = true;
943            }
944
945            // Tiered idle: progressively back off to avoid burning CPU
946            // when no work is available. Source I/O threads call unpark()
947            // after pushing to the inbox, giving sub-ms wake latency.
948            if had_work {
949                // Signal the coordinator that outputs are available.
950                // Lock-free: atomic swap is ~5ns. Only the false→true
951                // transition calls notify_one(), so at most ONE core per
952                // coordinator cycle hits the Notify mutex — no N-way contention.
953                if !ctx.has_new_data.swap(true, Ordering::Release) {
954                    ctx.output_notify.notify_one();
955                }
956                idle_spins = 0;
957            } else {
958                idle_spins = idle_spins.saturating_add(1);
959                if idle_spins < 64 {
960                    std::hint::spin_loop();
961                } else if idle_spins < 128 {
962                    thread::yield_now();
963                } else {
964                    thread::park_timeout(std::time::Duration::from_millis(1));
965                }
966            }
967        }
968
969        // Drain any remaining events before shutdown
970        poll_buffer.clear();
971        reactor.poll_into(&mut poll_buffer);
972        for output in poll_buffer.drain(..) {
973            let _ = ctx.outbox.push(TaggedOutput {
974                source_idx: last_source_idx,
975                output,
976            });
977        }
978
979        Ok(())
980    })); // end catch_unwind
981
982    // Always clear is_running so coordinator detects core death.
983    ctx.is_running.store(false, Ordering::Release);
984    // Signal coordinator — it may be parked waiting for output.
985    ctx.output_notify.notify_one();
986
987    match panic_result {
988        Ok(inner) => inner,
989        Err(payload) => {
990            let message = if let Some(s) = payload.downcast_ref::<&str>() {
991                (*s).to_string()
992            } else if let Some(s) = payload.downcast_ref::<String>() {
993                s.clone()
994            } else {
995                "unknown panic".to_string()
996            };
997            tracing::error!("Core {}: operator panic caught: {message}", ctx.core_id,);
998            Err(TpcError::OperatorPanic {
999                core_id: ctx.core_id,
1000                message,
1001            })
1002        }
1003    }
1004}
1005
/// Sets CPU affinity for the current thread.
///
/// Pins the calling thread to the logical CPU `cpu_id`. `core_id` is used
/// only for error attribution, so failures can be traced back to the
/// reactor core that requested the pinning.
///
/// Platform behavior:
/// - **Linux**: `sched_setaffinity(2)` with a single-CPU mask.
/// - **Windows**: `SetThreadAffinityMask`; `cpu_id`s at or beyond
///   `usize::BITS` (i.e. outside the first processor group) are rejected
///   because multi-group affinity is not implemented.
/// - **Other platforms**: no-op, always returns `Ok(())`.
///
/// # Errors
///
/// Returns [`TpcError::AffinityFailed`] with the OS error message when the
/// underlying syscall fails (e.g. `cpu_id` does not exist on this machine),
/// or on Windows when `cpu_id` exceeds the single-group limit.
fn set_cpu_affinity(core_id: usize, cpu_id: usize) -> Result<(), TpcError> {
    #[cfg(target_os = "linux")]
    {
        use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
        use std::mem;

        // SAFETY: We're calling libc functions with valid parameters.
        // The cpu_set_t is properly initialized with CPU_ZERO.
        // The process ID 0 refers to the current thread.
        #[allow(unsafe_code)]
        unsafe {
            let mut set: cpu_set_t = mem::zeroed();
            CPU_ZERO(&mut set);
            CPU_SET(cpu_id, &mut set);

            // pid 0 targets the calling thread; size must match the kernel's
            // expected cpu_set_t length or the call fails with EINVAL.
            let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
            if result != 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "sched_setaffinity failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(target_os = "windows")]
    {
        use windows_sys::Win32::System::Threading::{GetCurrentThread, SetThreadAffinityMask};

        // A usize mask can only address the first processor group
        // (usize::BITS CPUs); fail loudly rather than pin the wrong CPU.
        if cpu_id >= usize::BITS as usize {
            return Err(TpcError::AffinityFailed {
                core_id,
                message: format!(
                    "cpu_id {cpu_id} >= {} — Windows processor groups not yet supported",
                    usize::BITS
                ),
            });
        }
        // SAFETY: We're calling Windows API functions with valid parameters.
        // GetCurrentThread returns a pseudo-handle that doesn't need to be closed.
        // The mask is a valid CPU mask for the specified core.
        #[allow(unsafe_code)]
        unsafe {
            let mask: usize = 1 << cpu_id;
            // Returns the previous mask on success, 0 on failure.
            let result = SetThreadAffinityMask(GetCurrentThread(), mask);
            if result == 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "SetThreadAffinityMask failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(not(any(target_os = "linux", target_os = "windows")))]
    {
        let _ = (core_id, cpu_id);
        // No-op on other platforms
    }

    Ok(())
}
1075
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::{OperatorState, OutputVec, Timer};
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;
    use std::time::Duration;

    // Simple passthrough operator for testing: echoes every input event
    // unchanged, ignores timers, and checkpoints an empty "passthrough" state.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(
            &mut self,
            event: &Event,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: Timer,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> OperatorState {
            OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    /// Standard test configuration: no CPU pinning (CI runners vary),
    /// 1024-deep inbox/outbox, NUMA and storage I/O disabled.
    ///
    /// Tests needing a variation layer struct-update syntax on top, e.g.
    /// `CoreConfig { outbox_capacity: 4, ..test_config(0) }`.
    fn test_config(core_id: usize) -> CoreConfig {
        CoreConfig {
            core_id,
            cpu_affinity: None, // Don't pin in tests
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        }
    }

    /// Builds a single-row event whose `value` column and event timestamp
    /// both equal `value`.
    fn make_event(value: i64) -> Event {
        let array = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        Event::new(value, batch)
    }

    #[test]
    fn test_core_handle_spawn() {
        let handle = CoreHandle::spawn(test_config(0)).unwrap();
        assert!(handle.is_running());
        assert_eq!(handle.core_id(), 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_with_operator() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send an event
        let event = make_event(42);
        handle.send_event(0, event).unwrap();

        // Wait a bit for processing
        thread::sleep(Duration::from_millis(50));

        // Poll for outputs
        let outputs = handle.poll_outputs(10);
        assert!(!outputs.is_empty());
        // Verify source_idx is preserved
        assert_eq!(outputs[0].source_idx, 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_multiple_events() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(1), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send multiple events from source 0
        for i in 0..100 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll all outputs
        let mut total_outputs = 0;
        loop {
            let outputs = handle.poll_outputs(1000);
            if outputs.is_empty() {
                break;
            }
            total_outputs += outputs.len();
        }

        // Should have at least some outputs (may have watermarks too)
        assert!(total_outputs >= 100);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_shutdown() {
        let config = CoreConfig::default();
        let handle = CoreHandle::spawn(config).unwrap();

        assert!(handle.is_running());

        handle.shutdown();

        // Wait for shutdown
        thread::sleep(Duration::from_millis(100));

        // Thread should have stopped
        assert!(!handle.is_running());
    }

    #[test]
    fn test_core_handle_debug() {
        let config = CoreConfig {
            core_id: 42,
            ..Default::default()
        };
        let handle = CoreHandle::spawn(config).unwrap();

        let debug_str = format!("{handle:?}");
        assert!(debug_str.contains("CoreHandle"));
        assert!(debug_str.contains("42"));

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_config_default() {
        let config = CoreConfig::default();
        assert_eq!(config.core_id, 0);
        assert!(config.cpu_affinity.is_none());
        assert_eq!(config.inbox_capacity, 8192);
        assert_eq!(config.outbox_capacity, 8192);
        assert!(!config.numa_aware);
    }

    #[test]
    fn test_core_handle_numa_node() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            numa_aware: true,
            ..Default::default()
        };

        let handle = CoreHandle::spawn(config).unwrap();
        // On any system, numa_node should be a valid value (0 on non-NUMA systems)
        assert!(handle.numa_node() < 64);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_outputs_into() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send events
        for i in 0..10 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll into pre-allocated buffer
        let mut buffer = Vec::with_capacity(100);
        let count = handle.poll_outputs_into(&mut buffer, 100);

        assert!(count > 0);
        assert_eq!(buffer.len(), count);

        // Reuse buffer - no new allocation
        let cap_before = buffer.capacity();
        buffer.clear();
        let _ = handle.poll_outputs_into(&mut buffer, 100);
        assert_eq!(buffer.capacity(), cap_before); // Capacity unchanged

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_each() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send events
        for i in 0..10 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll with callback
        let mut event_count = 0;
        let count = handle.poll_each(100, |tagged| {
            if matches!(tagged.output, Output::Event(_)) {
                event_count += 1;
            }
            true
        });

        assert!(count > 0);
        assert!(event_count > 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_each_early_stop() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send events
        for i in 0..20 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll with early stop after 5 items
        let mut processed = 0;
        let count = handle.poll_each(100, |_| {
            processed += 1;
            processed < 5 // Stop after 5
        });

        assert_eq!(count, 5);
        assert_eq!(processed, 5);

        // There should be more items remaining
        let remaining = handle.outbox_len();
        assert!(remaining > 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_watermark_propagation() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send a watermark advancement
        handle.send(CoreMessage::Watermark(5000)).unwrap();

        // Wait for processing
        thread::sleep(Duration::from_millis(50));

        // Poll outputs - should contain a watermark
        let outputs = handle.poll_outputs(100);
        let has_watermark = outputs
            .iter()
            .any(|o| matches!(o.output, Output::Watermark(_)));
        assert!(
            has_watermark,
            "Expected watermark output after Watermark message"
        );

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_checkpoint_triggering() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send a checkpoint request
        handle.send(CoreMessage::CheckpointRequest(42)).unwrap();

        // Wait for processing
        thread::sleep(Duration::from_millis(50));

        // Poll outputs - should contain a CheckpointComplete
        let outputs = handle.poll_outputs(100);
        let checkpoint = outputs
            .iter()
            .find(|o| matches!(o.output, Output::CheckpointComplete(_)));
        assert!(checkpoint.is_some(), "Expected CheckpointComplete output");

        if let Some(TaggedOutput {
            output: Output::CheckpointComplete(data),
            ..
        }) = checkpoint
        {
            assert_eq!(data.checkpoint_id, 42);
            // One operator (PassthroughOperator)
            assert_eq!(data.operator_states.len(), 1);
            assert_eq!(data.operator_states[0].operator_id, "passthrough");
        }

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_outputs_dropped_counter() {
        let config = CoreConfig {
            outbox_capacity: 4, // Very small outbox to force drops
            ..test_config(0)
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        // Send many events without polling - outbox should fill up
        for i in 0..100 {
            let _ = handle.send_event(0, make_event(i));
        }

        // Wait for processing to fill and overflow the outbox
        thread::sleep(Duration::from_millis(200));

        // Some outputs should have been dropped
        let dropped = handle.outputs_dropped();
        assert!(
            dropped > 0,
            "Expected some outputs to be dropped with outbox_capacity=4"
        );

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_barrier_handling() {
        let handle =
            CoreHandle::spawn_with_operators(test_config(0), vec![Box::new(PassthroughOperator)])
                .unwrap();

        // Send some events then a barrier
        handle.send_event(0, make_event(1)).unwrap();
        handle.send_event(0, make_event(2)).unwrap();
        handle
            .send(CoreMessage::Barrier {
                source_idx: 0,
                barrier: CheckpointBarrier::new(99, 1),
            })
            .unwrap();

        // Wait for processing
        thread::sleep(Duration::from_millis(100));

        // Poll outputs - should contain events then a barrier
        let outputs = handle.poll_outputs(100);
        let has_barrier = outputs
            .iter()
            .any(|o| matches!(o.output, Output::Barrier(_)));
        assert!(has_barrier, "Expected Barrier output");

        // Barrier should have correct source_idx
        let barrier_tagged = outputs
            .iter()
            .find(|o| matches!(o.output, Output::Barrier(_)))
            .unwrap();
        assert_eq!(barrier_tagged.source_idx, 0);

        handle.shutdown_and_join().unwrap();
    }
}