1use std::panic::{catch_unwind, AssertUnwindSafe};
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::Arc;
33use std::thread::{self, JoinHandle};
34
35use tokio::sync::Notify;
36
37#[cfg(all(target_os = "linux", feature = "io-uring"))]
38use crate::io_uring::IoUringConfig;
39
40use crate::alloc::HotPathGuard;
41use crate::budget::TaskBudget;
42use crate::checkpoint::CheckpointBarrier;
43use crate::numa::NumaTopology;
44use crate::operator::{CheckpointCompleteData, Event, Operator, OperatorState, Output};
45use crate::reactor::{Reactor, ReactorConfig};
46use crate::storage_io::{IoCompletion, StorageIo, SyncStorageIo};
47
48use super::backpressure::{
49 BackpressureConfig, CreditAcquireResult, CreditGate, CreditMetrics, OverflowStrategy,
50};
51use super::spsc::SpscQueue;
52use super::TpcError;
53
/// Messages delivered to a core thread through its SPSC inbox.
///
/// Pushed by [`CoreHandle::send`] / [`CoreHandle::try_send`] (credit-gated)
/// and consumed by the core thread's main loop.
#[derive(Debug)]
pub enum CoreMessage {
    /// A data event to run through the operator chain.
    Event {
        /// Index of the upstream source that produced the event; echoed
        /// back on outputs as [`TaggedOutput::source_idx`].
        source_idx: usize,
        /// The event payload.
        event: Event,
    },
    /// Advance the reactor's event-time watermark to this timestamp.
    Watermark(i64),
    /// Trigger a checkpoint; the core replies with an
    /// `Output::CheckpointComplete` carrying this checkpoint id.
    CheckpointRequest(u64),
    /// A checkpoint barrier flowing through the dataflow; the core drains
    /// pending outputs, then forwards the barrier downstream.
    Barrier {
        /// Index of the source the barrier arrived on.
        source_idx: usize,
        /// The barrier itself.
        barrier: CheckpointBarrier,
    },
    /// Wake the core thread and ask it to stop draining the current batch.
    /// Note: the main loop only exits when the shutdown flag is also set
    /// (see `CoreHandle::shutdown`).
    Shutdown,
}
81
/// An operator output tagged with the source index it originated from,
/// as placed on the core's outbox for the consumer to poll.
#[derive(Debug)]
pub struct TaggedOutput {
    /// Source index of the input that (most recently) produced this output.
    pub source_idx: usize,
    /// The output payload.
    pub output: Output,
}
93
/// Configuration for spawning a single core thread.
#[derive(Debug, Clone)]
pub struct CoreConfig {
    /// Logical core identifier; used in the thread name and in errors.
    pub core_id: usize,
    /// OS CPU to pin the core thread to, if any.
    pub cpu_affinity: Option<usize>,
    /// Capacity of the inbound `CoreMessage` SPSC queue.
    pub inbox_capacity: usize,
    /// Capacity of the outbound `TaggedOutput` SPSC queue.
    pub outbox_capacity: usize,
    /// Configuration forwarded to the per-core [`Reactor`].
    pub reactor_config: ReactorConfig,
    /// Credit-based backpressure configuration for the inbox.
    pub backpressure: BackpressureConfig,
    /// When true, bind the thread's memory allocations to its NUMA node
    /// (Linux only; a no-op bind failure is logged, not fatal).
    pub numa_aware: bool,
    /// When true, attach a storage I/O backend to the core thread.
    pub enable_storage_io: bool,
    /// Optional io_uring configuration; when `None` (or init fails) the
    /// core falls back to the synchronous storage backend.
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    pub io_uring_config: Option<IoUringConfig>,
}
121
122impl Default for CoreConfig {
123 fn default() -> Self {
124 Self {
125 core_id: 0,
126 cpu_affinity: None,
127 inbox_capacity: 8192,
128 outbox_capacity: 8192,
129 reactor_config: ReactorConfig::default(),
130 backpressure: BackpressureConfig::default(),
131 numa_aware: false,
132 enable_storage_io: false,
133 #[cfg(all(target_os = "linux", feature = "io-uring"))]
134 io_uring_config: None,
135 }
136 }
137}
138
/// Owning handle to a spawned core thread.
///
/// Producers push [`CoreMessage`]s through the credit-gated inbox and poll
/// [`TaggedOutput`]s from the outbox. Dropping the handle requests shutdown
/// and joins the thread (best-effort).
pub struct CoreHandle {
    // Logical core id (mirrors `CoreConfig::core_id`).
    core_id: usize,
    // NUMA node the core thread runs on (derived from affinity or current).
    numa_node: usize,
    // Producer side of the inbound message queue.
    inbox: Arc<SpscQueue<CoreMessage>>,
    // Consumer side of the outbound output queue.
    outbox: Arc<SpscQueue<TaggedOutput>>,
    // Completions surfaced by the storage I/O backend, if enabled.
    io_completion_outbox: Arc<SpscQueue<IoCompletion>>,
    // Credit gate implementing inbox backpressure.
    credit_gate: Arc<CreditGate>,
    // Join handle; taken by `join()`/`Drop` (hence `Option`).
    thread: Option<JoinHandle<Result<(), TpcError>>>,
    // Cached `Thread` so we can `unpark()` without consuming the JoinHandle.
    core_thread_handle: thread::Thread,
    // Set to request the core loop to exit.
    shutdown: Arc<AtomicBool>,
    // Count of outputs produced by the reactor (monotonic).
    events_processed: Arc<AtomicU64>,
    // Count of outputs dropped because the outbox was full.
    outputs_dropped: Arc<AtomicU64>,
    // True while the core thread's main loop is live.
    is_running: Arc<AtomicBool>,
}
171
172impl CoreHandle {
173 pub fn spawn(config: CoreConfig) -> Result<Self, TpcError> {
179 let notify = Arc::new(Notify::new());
180 let flag = Arc::new(AtomicBool::new(false));
181 Self::spawn_with_notify(config, Vec::new(), flag, notify)
182 }
183
184 #[allow(clippy::needless_pass_by_value)]
190 pub fn spawn_with_operators(
191 config: CoreConfig,
192 operators: Vec<Box<dyn Operator>>,
193 ) -> Result<Self, TpcError> {
194 let notify = Arc::new(Notify::new());
195 let flag = Arc::new(AtomicBool::new(false));
196 Self::spawn_with_notify(config, operators, flag, notify)
197 }
198
199 #[allow(clippy::needless_pass_by_value)]
210 pub fn spawn_with_notify(
211 config: CoreConfig,
212 operators: Vec<Box<dyn Operator>>,
213 has_new_data: Arc<AtomicBool>,
214 output_notify: Arc<Notify>,
215 ) -> Result<Self, TpcError> {
216 let core_id = config.core_id;
217 let cpu_affinity = config.cpu_affinity;
218 let reactor_config = config.reactor_config.clone();
219
220 let topology = NumaTopology::detect();
222 let numa_node =
223 cpu_affinity.map_or_else(|| topology.current_node(), |cpu| topology.node_for_cpu(cpu));
224
225 let inbox = Arc::new(SpscQueue::new(config.inbox_capacity));
226 let outbox: Arc<SpscQueue<TaggedOutput>> = Arc::new(SpscQueue::new(config.outbox_capacity));
227 let io_completion_outbox: Arc<SpscQueue<IoCompletion>> = Arc::new(SpscQueue::new(256));
230 let credit_gate = Arc::new(CreditGate::new(config.backpressure.clone()));
231 let shutdown = Arc::new(AtomicBool::new(false));
232 let events_processed = Arc::new(AtomicU64::new(0));
233 let outputs_dropped = Arc::new(AtomicU64::new(0));
234 let is_running = Arc::new(AtomicBool::new(false));
235
236 let storage_io: Option<Box<dyn StorageIo>> = if config.enable_storage_io {
238 #[cfg(all(target_os = "linux", feature = "io-uring"))]
240 {
241 if let Some(ref uring_cfg) = config.io_uring_config {
242 match crate::storage_io::UringStorageIo::new(core_id, uring_cfg) {
243 Ok(backend) => {
244 tracing::info!(
245 "Core {core_id}: using io_uring storage I/O (SQPOLL={})",
246 backend.uses_sqpoll()
247 );
248 Some(Box::new(backend))
249 }
250 Err(e) => {
251 tracing::warn!(
252 "Core {core_id}: io_uring init failed ({e}), falling back to sync"
253 );
254 Some(Box::new(SyncStorageIo::new()))
255 }
256 }
257 } else {
258 Some(Box::new(SyncStorageIo::new()))
259 }
260 }
261 #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
263 {
264 Some(Box::new(SyncStorageIo::new()))
265 }
266 } else {
267 None
268 };
269
270 let thread_context = CoreThreadContext {
271 core_id,
272 cpu_affinity,
273 reactor_config,
274 numa_aware: config.numa_aware,
275 numa_node,
276 #[cfg(target_os = "linux")]
277 numa_topology: topology,
278 inbox: Arc::clone(&inbox),
279 outbox: Arc::clone(&outbox),
280 credit_gate: Arc::clone(&credit_gate),
281 shutdown: Arc::clone(&shutdown),
282 events_processed: Arc::clone(&events_processed),
283 outputs_dropped: Arc::clone(&outputs_dropped),
284 is_running: Arc::clone(&is_running),
285 has_new_data: Arc::clone(&has_new_data),
286 output_notify: Arc::clone(&output_notify),
287 };
288
289 let thread = thread::Builder::new()
290 .name(format!("laminar-core-{core_id}"))
291 .spawn({
292 let io_cq = Arc::clone(&io_completion_outbox);
293 move || core_thread_main(&thread_context, operators, storage_io, io_cq)
294 })
295 .map_err(|e| TpcError::SpawnFailed {
296 core_id,
297 message: e.to_string(),
298 })?;
299
300 let core_thread_handle = thread.thread().clone();
303
304 while !is_running.load(Ordering::Acquire) {
306 thread::yield_now();
307 }
308
309 Ok(Self {
310 core_id,
311 numa_node,
312 inbox,
313 outbox,
314 io_completion_outbox,
315 credit_gate,
316 thread: Some(thread),
317 core_thread_handle,
318 shutdown,
319 events_processed,
320 outputs_dropped,
321 is_running,
322 })
323 }
324
325 #[must_use]
327 pub fn core_id(&self) -> usize {
328 self.core_id
329 }
330
331 #[must_use]
333 pub fn numa_node(&self) -> usize {
334 self.numa_node
335 }
336
337 #[must_use]
339 pub fn is_running(&self) -> bool {
340 self.is_running.load(Ordering::Acquire)
341 }
342
343 #[must_use]
348 pub fn core_thread_handle(&self) -> &thread::Thread {
349 &self.core_thread_handle
350 }
351
352 #[must_use]
354 pub fn events_processed(&self) -> u64 {
355 self.events_processed.load(Ordering::Relaxed)
356 }
357
358 #[must_use]
360 pub fn outputs_dropped(&self) -> u64 {
361 self.outputs_dropped.load(Ordering::Relaxed)
362 }
363
364 pub fn send(&self, message: CoreMessage) -> Result<(), TpcError> {
375 match self.credit_gate.try_acquire() {
377 CreditAcquireResult::Acquired => {
378 self.inbox.push(message).map_err(|_| TpcError::QueueFull {
380 core_id: self.core_id,
381 })
382 }
383 CreditAcquireResult::WouldBlock => {
384 if self.credit_gate.config().overflow_strategy == OverflowStrategy::Block {
386 self.credit_gate.acquire_blocking(1);
388 self.inbox.push(message).map_err(|_| TpcError::QueueFull {
389 core_id: self.core_id,
390 })
391 } else {
392 Err(TpcError::Backpressure {
394 core_id: self.core_id,
395 })
396 }
397 }
398 CreditAcquireResult::Dropped => {
399 Ok(())
401 }
402 }
403 }
404
405 pub fn try_send(&self, message: CoreMessage) -> Result<(), TpcError> {
414 match self.credit_gate.try_acquire() {
415 CreditAcquireResult::Acquired => {
416 self.inbox.push(message).map_err(|_| TpcError::QueueFull {
417 core_id: self.core_id,
418 })
419 }
420 CreditAcquireResult::WouldBlock | CreditAcquireResult::Dropped => {
421 Err(TpcError::Backpressure {
422 core_id: self.core_id,
423 })
424 }
425 }
426 }
427
428 pub fn send_event(&self, source_idx: usize, event: Event) -> Result<(), TpcError> {
437 self.send(CoreMessage::Event { source_idx, event })
438 }
439
440 pub fn try_send_event(&self, source_idx: usize, event: Event) -> Result<(), TpcError> {
446 self.try_send(CoreMessage::Event { source_idx, event })
447 }
448
449 #[must_use]
451 pub fn inbox(&self) -> &Arc<SpscQueue<CoreMessage>> {
452 &self.inbox
453 }
454
455 #[must_use]
457 pub fn outbox(&self) -> &Arc<SpscQueue<TaggedOutput>> {
458 &self.outbox
459 }
460
461 #[must_use]
470 pub fn poll_outputs(&self, max_count: usize) -> Vec<TaggedOutput> {
471 self.outbox.pop_batch(max_count)
472 }
473
474 #[inline]
479 pub fn poll_outputs_into(&self, buffer: &mut Vec<TaggedOutput>, max_count: usize) -> usize {
480 let start_len = buffer.len();
481
482 self.outbox.pop_each(max_count, |output| {
483 buffer.push(output);
484 true
485 });
486
487 buffer.len() - start_len
488 }
489
490 #[inline]
499 pub fn poll_each<F>(&self, max_count: usize, f: F) -> usize
500 where
501 F: FnMut(TaggedOutput) -> bool,
502 {
503 self.outbox.pop_each(max_count, f)
504 }
505
506 #[must_use]
508 pub fn poll_output(&self) -> Option<TaggedOutput> {
509 self.outbox.pop()
510 }
511
512 pub fn drain_io_completions(&self, out: &mut Vec<IoCompletion>) {
518 self.io_completion_outbox.pop_each(256, |c| {
519 out.push(c);
520 true
521 });
522 }
523
524 #[must_use]
526 pub fn inbox_len(&self) -> usize {
527 self.inbox.len()
528 }
529
530 #[must_use]
532 pub fn outbox_len(&self) -> usize {
533 self.outbox.len()
534 }
535
536 #[must_use]
538 pub fn is_backpressured(&self) -> bool {
539 self.credit_gate.is_backpressured()
540 }
541
542 #[must_use]
544 pub fn available_credits(&self) -> usize {
545 self.credit_gate.available()
546 }
547
548 #[must_use]
550 pub fn max_credits(&self) -> usize {
551 self.credit_gate.max_credits()
552 }
553
554 #[must_use]
556 pub fn credit_metrics(&self) -> &CreditMetrics {
557 self.credit_gate.metrics()
558 }
559
560 pub fn shutdown(&self) {
562 self.shutdown.store(true, Ordering::Release);
563 let _ = self.inbox.push(CoreMessage::Shutdown);
565 self.core_thread_handle.unpark();
567 }
568
569 pub fn join(mut self) -> Result<(), TpcError> {
575 if let Some(handle) = self.thread.take() {
576 handle.join().map_err(|_| TpcError::SpawnFailed {
577 core_id: self.core_id,
578 message: "Thread panicked".to_string(),
579 })?
580 } else {
581 Ok(())
582 }
583 }
584
585 pub fn shutdown_and_join(self) -> Result<(), TpcError> {
591 self.shutdown();
592 self.join()
593 }
594}
595
596impl Drop for CoreHandle {
597 fn drop(&mut self) {
598 self.shutdown.store(true, Ordering::Release);
600 let _ = self.inbox.push(CoreMessage::Shutdown);
602 self.core_thread_handle.unpark();
604
605 if let Some(handle) = self.thread.take() {
607 let _ = handle.join();
608 }
609 }
610}
611
612impl std::fmt::Debug for CoreHandle {
613 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
614 f.debug_struct("CoreHandle")
615 .field("core_id", &self.core_id)
616 .field("numa_node", &self.numa_node)
617 .field("is_running", &self.is_running())
618 .field("events_processed", &self.events_processed())
619 .field("outputs_dropped", &self.outputs_dropped())
620 .field("inbox_len", &self.inbox_len())
621 .field("outbox_len", &self.outbox_len())
622 .field("available_credits", &self.available_credits())
623 .field("is_backpressured", &self.is_backpressured())
624 .finish_non_exhaustive()
625 }
626}
627
/// Everything a core thread needs, moved into the spawned thread's closure.
/// Shared pieces (queues, flags, counters) are `Arc`s also held by the
/// owning [`CoreHandle`].
pub(super) struct CoreThreadContext {
    // Logical core id, used in log messages and errors.
    pub(super) core_id: usize,
    // CPU to pin to, if any (applied in `init_core_thread`).
    cpu_affinity: Option<usize>,
    // Reactor configuration; `cpu_affinity` is copied into it before use.
    reactor_config: ReactorConfig,
    // Whether to bind memory to the local NUMA node (Linux only).
    numa_aware: bool,
    // NUMA node this core runs on (informational/logging).
    numa_node: usize,
    // Topology used for the Linux-only local-memory bind.
    #[cfg(target_os = "linux")]
    numa_topology: NumaTopology,
    pub(super) inbox: Arc<SpscQueue<CoreMessage>>,
    pub(super) outbox: Arc<SpscQueue<TaggedOutput>>,
    pub(super) credit_gate: Arc<CreditGate>,
    pub(super) shutdown: Arc<AtomicBool>,
    pub(super) events_processed: Arc<AtomicU64>,
    pub(super) outputs_dropped: Arc<AtomicU64>,
    pub(super) is_running: Arc<AtomicBool>,
    // Set when new output is available; paired with `output_notify`.
    pub(super) has_new_data: Arc<AtomicBool>,
    // Signalled on the false→true transition of `has_new_data`.
    pub(super) output_notify: Arc<Notify>,
}
647
/// One-time setup on the core thread: pin to the configured CPU, optionally
/// bind memory to the local NUMA node (Linux), build the reactor, and
/// install the operator chain. Returns the ready-to-run reactor.
fn init_core_thread(
    ctx: &CoreThreadContext,
    operators: Vec<Box<dyn Operator>>,
) -> Result<Reactor, TpcError> {
    // Pin first so subsequent allocations/bindings happen on the right CPU.
    if let Some(cpu_id) = ctx.cpu_affinity {
        set_cpu_affinity(ctx.core_id, cpu_id)?;
    }

    // NUMA memory binding is best-effort: a failure is logged, not fatal.
    #[cfg(target_os = "linux")]
    if ctx.numa_aware {
        if let Err(e) = ctx.numa_topology.bind_local_memory() {
            tracing::warn!(core_id = ctx.core_id, ?e, "NUMA memory bind failed");
        }
    }

    if ctx.numa_aware {
        tracing::info!(
            "Core {} starting on NUMA node {}",
            ctx.core_id,
            ctx.numa_node
        );
    }

    // Propagate the pin into the reactor config so it agrees with this
    // thread's actual placement.
    let mut reactor_config = ctx.reactor_config.clone();
    reactor_config.cpu_affinity = ctx.cpu_affinity;

    let mut reactor = Reactor::new(reactor_config).map_err(|e| TpcError::ReactorError {
        core_id: ctx.core_id,
        source: e,
    })?;

    for op in operators {
        reactor.add_operator(op);
    }

    Ok(reactor)
}
691
/// Core thread entry point: initializes the reactor, then runs the main
/// loop (drain inbox → poll storage I/O → poll reactor → publish outputs)
/// until shutdown. Operator panics are caught so the thread can report
/// them as `TpcError::OperatorPanic` instead of aborting silently.
#[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
fn core_thread_main(
    ctx: &CoreThreadContext,
    operators: Vec<Box<dyn Operator>>,
    mut storage_io: Option<Box<dyn StorageIo>>,
    io_completion_outbox: Arc<SpscQueue<IoCompletion>>,
) -> Result<(), TpcError> {
    let mut reactor = init_core_thread(ctx, operators)?;

    // Signal readiness only after init succeeded; the spawner waits on this.
    ctx.is_running.store(true, Ordering::Release);

    // Reusable buffers — allocated once, cleared each iteration to avoid
    // per-iteration allocation on the hot path.
    let mut poll_buffer: Vec<Output> = Vec::with_capacity(256);

    let mut io_completions: Vec<IoCompletion> = Vec::with_capacity(64);

    // Pre-allocated checkpoint payload box, recycled across checkpoints so
    // checkpoint emission does not allocate on the hot path.
    let mut checkpoint_slot: Option<Box<CheckpointCompleteData>> =
        Some(Box::new(CheckpointCompleteData {
            checkpoint_id: 0,
            operator_states: Vec::new(),
        }));
    let mut checkpoint_states_buf: Vec<OperatorState> = Vec::new();

    // Outputs are tagged with the source index of the most recent event.
    let mut last_source_idx: usize = 0;

    // Submit failures are logged with exponential sampling (powers of two)
    // to avoid log flooding under sustained failure.
    let mut submit_error_count: u64 = 0;

    // Idle backoff counter: spin → yield → park as idleness grows.
    let mut idle_spins: u32 = 0;

    let panic_result = catch_unwind(AssertUnwindSafe(|| -> Result<(), TpcError> {
        loop {
            if ctx.shutdown.load(Ordering::Acquire) {
                break;
            }

            let _guard = HotPathGuard::enter("CoreThread::process_inbox");

            // Bound how long one inbox batch may run before we poll outputs.
            let batch_budget = TaskBudget::ring0_batch();

            let mut had_work = false;
            // Credits consumed this batch; released back to the gate below.
            let mut messages_processed = 0usize;

            while let Some(message) = ctx.inbox.pop() {
                match message {
                    CoreMessage::Event { source_idx, event } => {
                        last_source_idx = source_idx;
                        if let Err(e) = reactor.submit(event) {
                            submit_error_count += 1;
                            // Sampled logging: only on counts 1, 2, 4, 8, ...
                            if submit_error_count.is_power_of_two() {
                                tracing::error!(
                                    "Core {}: Failed to submit event (n={}): {e}",
                                    ctx.core_id,
                                    submit_error_count,
                                );
                            }
                        }
                        messages_processed += 1;
                        had_work = true;
                    }
                    CoreMessage::Watermark(timestamp) => {
                        reactor.advance_watermark(timestamp);
                        messages_processed += 1;
                        had_work = true;
                    }
                    CoreMessage::CheckpointRequest(checkpoint_id) => {
                        reactor.trigger_checkpoint_into(&mut checkpoint_states_buf);
                        // Reuse the recycled box; allocate only if it is
                        // still in flight from a previous checkpoint.
                        let mut data = checkpoint_slot.take().unwrap_or_else(|| {
                            Box::new(CheckpointCompleteData {
                                checkpoint_id: 0,
                                operator_states: Vec::new(),
                            })
                        });
                        data.checkpoint_id = checkpoint_id;
                        std::mem::swap(&mut data.operator_states, &mut checkpoint_states_buf);
                        let mut cp_out = TaggedOutput {
                            source_idx: 0,
                            output: Output::CheckpointComplete(data),
                        };
                        // Checkpoint completion must not be dropped: spin
                        // until the outbox accepts it (or shutdown).
                        loop {
                            match ctx.outbox.push(cp_out) {
                                Ok(()) => break,
                                Err(returned) => {
                                    if ctx.shutdown.load(Ordering::Acquire) {
                                        break;
                                    }
                                    cp_out = returned;
                                    std::hint::spin_loop();
                                }
                            }
                        }
                        messages_processed += 1;
                        had_work = true;
                    }
                    CoreMessage::Barrier {
                        source_idx,
                        barrier,
                    } => {
                        // Drain all pending reactor output before the
                        // barrier so downstream sees a consistent cut.
                        loop {
                            poll_buffer.clear();
                            reactor.poll_into(&mut poll_buffer);
                            if poll_buffer.is_empty() {
                                break;
                            }
                            for output in poll_buffer.drain(..) {
                                if ctx
                                    .outbox
                                    .push(TaggedOutput { source_idx, output })
                                    .is_err()
                                {
                                    ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
                                }
                            }
                        }
                        // The barrier itself must not be dropped: spin
                        // until published (or shutdown).
                        let mut barrier_out = TaggedOutput {
                            source_idx,
                            output: Output::Barrier(barrier),
                        };
                        loop {
                            match ctx.outbox.push(barrier_out) {
                                Ok(()) => break,
                                Err(returned) => {
                                    if ctx.shutdown.load(Ordering::Acquire) {
                                        break;
                                    }
                                    barrier_out = returned;
                                    std::hint::spin_loop();
                                }
                            }
                        }
                        messages_processed += 1;
                        had_work = true;
                    }
                    CoreMessage::Shutdown => {
                        // Release credits before leaving the batch so
                        // producers are not starved during drain.
                        if messages_processed > 0 {
                            ctx.credit_gate.release(messages_processed);
                            messages_processed = 0;
                        }
                        break;
                    }
                }

                // Yield to output polling once the batch budget is spent.
                if batch_budget.almost_exceeded() {
                    break;
                }
            }

            if messages_processed > 0 {
                ctx.credit_gate.release(messages_processed);
            }

            // Forward storage I/O completions; like barriers, completions
            // must not be lost, so publication spins on a full queue.
            if let Some(ref mut sio) = storage_io {
                io_completions.clear();
                sio.poll_completions(&mut io_completions);
                for completion in &io_completions {
                    let mut c = *completion;
                    loop {
                        match io_completion_outbox.push(c) {
                            Ok(()) => break,
                            Err(returned) => {
                                if ctx.shutdown.load(Ordering::Acquire) {
                                    break;
                                }
                                c = returned;
                                std::hint::spin_loop();
                            }
                        }
                    }
                }
                if !io_completions.is_empty() {
                    had_work = true;
                }
            }

            // Refill the recycled checkpoint box once the previous payload
            // has been handed to the outbox.
            if checkpoint_slot.is_none() {
                checkpoint_states_buf.clear();
                let mut data = Box::new(CheckpointCompleteData {
                    checkpoint_id: 0,
                    operator_states: Vec::new(),
                });
                std::mem::swap(&mut data.operator_states, &mut checkpoint_states_buf);
                checkpoint_slot = Some(data);
            }

            // Poll the reactor for regular outputs and publish them; these
            // MAY be dropped (counted) when the outbox is full.
            poll_buffer.clear();
            reactor.poll_into(&mut poll_buffer);
            ctx.events_processed
                .fetch_add(poll_buffer.len() as u64, Ordering::Relaxed);

            for output in poll_buffer.drain(..) {
                if ctx
                    .outbox
                    .push(TaggedOutput {
                        source_idx: last_source_idx,
                        output,
                    })
                    .is_err()
                {
                    ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
                }
                had_work = true;
            }

            if had_work {
                // Notify only on the false→true edge to avoid redundant
                // wakeups of the async consumer.
                if !ctx.has_new_data.swap(true, Ordering::Release) {
                    ctx.output_notify.notify_one();
                }
                idle_spins = 0;
            } else {
                // Progressive backoff: spin, then yield, then park briefly.
                idle_spins = idle_spins.saturating_add(1);
                if idle_spins < 64 {
                    std::hint::spin_loop();
                } else if idle_spins < 128 {
                    thread::yield_now();
                } else {
                    thread::park_timeout(std::time::Duration::from_millis(1));
                }
            }
        }

        // Final drain on shutdown: flush whatever the reactor still holds.
        poll_buffer.clear();
        reactor.poll_into(&mut poll_buffer);
        for output in poll_buffer.drain(..) {
            let _ = ctx.outbox.push(TaggedOutput {
                source_idx: last_source_idx,
                output,
            });
        }

        Ok(())
    }));
    // Mark the loop as dead and wake any consumer awaiting output,
    // regardless of whether we exited cleanly or via panic.
    ctx.is_running.store(false, Ordering::Release);
    ctx.output_notify.notify_one();

    match panic_result {
        Ok(inner) => inner,
        Err(payload) => {
            // Panic payloads are usually &str or String; anything else is
            // reported generically.
            let message = if let Some(s) = payload.downcast_ref::<&str>() {
                (*s).to_string()
            } else if let Some(s) = payload.downcast_ref::<String>() {
                s.clone()
            } else {
                "unknown panic".to_string()
            };
            tracing::error!("Core {}: operator panic caught: {message}", ctx.core_id,);
            Err(TpcError::OperatorPanic {
                core_id: ctx.core_id,
                message,
            })
        }
    }
}
1005
/// Pins the *calling* thread to the given OS CPU.
///
/// Platform behavior: Linux uses `sched_setaffinity`, Windows uses
/// `SetThreadAffinityMask` (limited to the first processor group), and
/// other platforms are a no-op returning `Ok(())`.
fn set_cpu_affinity(core_id: usize, cpu_id: usize) -> Result<(), TpcError> {
    #[cfg(target_os = "linux")]
    {
        use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
        use std::mem;

        // SAFETY: `set` is a plain-old-data bitmask, valid to zero-init;
        // CPU_ZERO/CPU_SET only write within it, and sched_setaffinity(0, …)
        // targets the calling thread with a correctly sized, live set.
        #[allow(unsafe_code)]
        unsafe {
            let mut set: cpu_set_t = mem::zeroed();
            CPU_ZERO(&mut set);
            CPU_SET(cpu_id, &mut set);

            let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
            if result != 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "sched_setaffinity failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(target_os = "windows")]
    {
        use windows_sys::Win32::System::Threading::{GetCurrentThread, SetThreadAffinityMask};

        // The affinity mask is one machine word, so only CPUs in the first
        // processor group (< usize::BITS) are addressable here.
        if cpu_id >= usize::BITS as usize {
            return Err(TpcError::AffinityFailed {
                core_id,
                message: format!(
                    "cpu_id {cpu_id} >= {} — Windows processor groups not yet supported",
                    usize::BITS
                ),
            });
        }
        // SAFETY: GetCurrentThread returns a pseudo-handle for the calling
        // thread that needs no closing; the mask has exactly one bit set,
        // validated above to be within the word.
        #[allow(unsafe_code)]
        unsafe {
            let mask: usize = 1 << cpu_id;
            let result = SetThreadAffinityMask(GetCurrentThread(), mask);
            if result == 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "SetThreadAffinityMask failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(not(any(target_os = "linux", target_os = "windows")))]
    {
        // Unsupported platform: silently accept (no pinning available).
        let _ = (core_id, cpu_id);
    }

    Ok(())
}
1075
// Integration-style tests: each spawns a real core thread, drives it with
// messages, and polls the outbox. Timing-based sleeps are used to let the
// core loop make progress; tests therefore assume a lightly loaded machine.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::{OperatorState, OutputVec, Timer};
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;
    use std::time::Duration;

    // Trivial operator that echoes every event unchanged; used to make
    // outputs observable on the outbox.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(
            &mut self,
            event: &Event,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: Timer,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> OperatorState {
            OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    // Builds a single-row Event whose timestamp and column value are both
    // `value`.
    fn make_event(value: i64) -> Event {
        let array = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        Event::new(value, batch)
    }

    #[test]
    fn test_core_handle_spawn() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle = CoreHandle::spawn(config).unwrap();
        assert!(handle.is_running());
        assert_eq!(handle.core_id(), 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_with_operator() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        let event = make_event(42);
        handle.send_event(0, event).unwrap();

        // Give the core loop time to process and publish.
        thread::sleep(Duration::from_millis(50));

        let outputs = handle.poll_outputs(10);
        assert!(!outputs.is_empty());
        assert_eq!(outputs[0].source_idx, 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_multiple_events() {
        let config = CoreConfig {
            core_id: 1,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        for i in 0..100 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        thread::sleep(Duration::from_millis(100));

        // Drain the outbox until empty; passthrough should echo all 100.
        let mut total_outputs = 0;
        loop {
            let outputs = handle.poll_outputs(1000);
            if outputs.is_empty() {
                break;
            }
            total_outputs += outputs.len();
        }

        assert!(total_outputs >= 100);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_handle_shutdown() {
        let config = CoreConfig::default();
        let handle = CoreHandle::spawn(config).unwrap();

        assert!(handle.is_running());

        handle.shutdown();

        // Allow the loop to observe the flag and exit.
        thread::sleep(Duration::from_millis(100));

        assert!(!handle.is_running());
    }

    #[test]
    fn test_core_handle_debug() {
        let config = CoreConfig {
            core_id: 42,
            ..Default::default()
        };
        let handle = CoreHandle::spawn(config).unwrap();

        let debug_str = format!("{handle:?}");
        assert!(debug_str.contains("CoreHandle"));
        assert!(debug_str.contains("42"));

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_core_config_default() {
        let config = CoreConfig::default();
        assert_eq!(config.core_id, 0);
        assert!(config.cpu_affinity.is_none());
        assert_eq!(config.inbox_capacity, 8192);
        assert_eq!(config.outbox_capacity, 8192);
        assert!(!config.numa_aware);
    }

    #[test]
    fn test_core_handle_numa_node() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            numa_aware: true,
            ..Default::default()
        };

        let handle = CoreHandle::spawn(config).unwrap();
        // Sanity bound only; actual node depends on the host topology.
        assert!(handle.numa_node() < 64);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_outputs_into() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        for i in 0..10 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        thread::sleep(Duration::from_millis(100));

        let mut buffer = Vec::with_capacity(100);
        let count = handle.poll_outputs_into(&mut buffer, 100);

        assert!(count > 0);
        assert_eq!(buffer.len(), count);

        // Reusing the buffer must not reallocate: capacity is preserved.
        let cap_before = buffer.capacity();
        buffer.clear();
        let _ = handle.poll_outputs_into(&mut buffer, 100);
        assert_eq!(buffer.capacity(), cap_before);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_each() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        for i in 0..10 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        thread::sleep(Duration::from_millis(100));

        let mut event_count = 0;
        let count = handle.poll_each(100, |tagged| {
            if matches!(tagged.output, Output::Event(_)) {
                event_count += 1;
            }
            true
        });

        assert!(count > 0);
        assert!(event_count > 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_poll_each_early_stop() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        for i in 0..20 {
            handle.send_event(0, make_event(i)).unwrap();
        }

        thread::sleep(Duration::from_millis(100));

        // Returning false from the callback stops consumption after 5.
        let mut processed = 0;
        let count = handle.poll_each(100, |_| {
            processed += 1;
            processed < 5
        });

        assert_eq!(count, 5);
        assert_eq!(processed, 5);

        // Unconsumed outputs stay queued for a later poll.
        let remaining = handle.outbox_len();
        assert!(remaining > 0);

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_watermark_propagation() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        handle.send(CoreMessage::Watermark(5000)).unwrap();

        thread::sleep(Duration::from_millis(50));

        let outputs = handle.poll_outputs(100);
        let has_watermark = outputs
            .iter()
            .any(|o| matches!(o.output, Output::Watermark(_)));
        assert!(
            has_watermark,
            "Expected watermark output after Watermark message"
        );

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_checkpoint_triggering() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        handle.send(CoreMessage::CheckpointRequest(42)).unwrap();

        thread::sleep(Duration::from_millis(50));

        let outputs = handle.poll_outputs(100);
        let checkpoint = outputs
            .iter()
            .find(|o| matches!(o.output, Output::CheckpointComplete(_)));
        assert!(checkpoint.is_some(), "Expected CheckpointComplete output");

        // The completion must carry the requested id and the single
        // passthrough operator's state snapshot.
        if let Some(TaggedOutput {
            output: Output::CheckpointComplete(data),
            ..
        }) = checkpoint
        {
            assert_eq!(data.checkpoint_id, 42);
            assert_eq!(data.operator_states.len(), 1);
            assert_eq!(data.operator_states[0].operator_id, "passthrough");
        }

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_outputs_dropped_counter() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            // Tiny outbox so the passthrough flood forces drops.
            outbox_capacity: 4,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        for i in 0..100 {
            let _ = handle.send_event(0, make_event(i));
        }

        thread::sleep(Duration::from_millis(200));

        let dropped = handle.outputs_dropped();
        assert!(
            dropped > 0,
            "Expected some outputs to be dropped with outbox_capacity=4"
        );

        handle.shutdown_and_join().unwrap();
    }

    #[test]
    fn test_barrier_handling() {
        let config = CoreConfig {
            core_id: 0,
            cpu_affinity: None,
            inbox_capacity: 1024,
            outbox_capacity: 1024,
            reactor_config: ReactorConfig::default(),
            backpressure: super::BackpressureConfig::default(),
            numa_aware: false,
            enable_storage_io: false,
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            io_uring_config: None,
        };

        let handle =
            CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();

        handle.send_event(0, make_event(1)).unwrap();
        handle.send_event(0, make_event(2)).unwrap();
        handle
            .send(CoreMessage::Barrier {
                source_idx: 0,
                barrier: CheckpointBarrier::new(99, 1),
            })
            .unwrap();

        thread::sleep(Duration::from_millis(100));

        let outputs = handle.poll_outputs(100);
        let has_barrier = outputs
            .iter()
            .any(|o| matches!(o.output, Output::Barrier(_)));
        assert!(has_barrier, "Expected Barrier output");

        let barrier_tagged = outputs
            .iter()
            .find(|o| matches!(o.output, Output::Barrier(_)))
            .unwrap();
        assert_eq!(barrier_tagged.source_idx, 0);

        handle.shutdown_and_join().unwrap();
    }
}