laminar_core/numa/
topology.rs1#[cfg(any(feature = "hwloc", target_os = "linux"))]
9use super::NumaError;
10
/// Snapshot of the machine's NUMA layout: which CPUs belong to which node.
///
/// Both lookup directions are stored so that node -> CPUs and CPU -> node
/// queries are plain indexed reads.
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Number of node slots in `cpus_per_node`.
    num_nodes: usize,
    /// CPU ids belonging to each node, indexed by node id.
    cpus_per_node: Vec<Vec<usize>>,
    /// Owning node id for each CPU, indexed by CPU id.
    cpu_to_node: Vec<usize>,
}
24
25impl NumaTopology {
26 #[must_use]
35 pub fn detect() -> Self {
36 #[cfg(feature = "hwloc")]
38 {
39 if let Ok(topo) = Self::detect_hwlocality() {
40 return topo;
41 }
42 }
43
44 #[cfg(target_os = "linux")]
46 {
47 if let Ok(topo) = Self::detect_sysfs() {
48 return topo;
49 }
50 }
51
52 Self::single_node_fallback()
54 }
55
/// Builds the topology from hwlocality's view of the machine.
///
/// Node ids are the positions of the NUMA node objects in hwlocality's
/// iteration order. CPU ids are the indices reported by each node's cpuset.
///
/// The CPU -> node table is sized from the highest CPU index actually
/// seen (and never smaller than the number of PU objects). CPU indices can
/// be sparse, so sizing by the PU *count* alone would silently drop every
/// CPU whose index is greater than or equal to that count.
///
/// # Errors
///
/// Returns `NumaError::TopologyError` if hwlocality fails to initialise or
/// reports no NUMA node objects.
#[cfg(feature = "hwloc")]
fn detect_hwlocality() -> Result<Self, NumaError> {
    use hwlocality::object::types::ObjectType;
    use hwlocality::Topology;

    let hwloc_topo = Topology::new()
        .map_err(|e| NumaError::TopologyError(format!("hwlocality init failed: {e}")))?;

    let numa_nodes: Vec<_> = hwloc_topo
        .objects_with_type(ObjectType::NUMANode)
        .collect();

    if numa_nodes.is_empty() {
        return Err(NumaError::TopologyError(
            "No NUMA nodes found via hwlocality".to_string(),
        ));
    }

    let node_count = numa_nodes.len();

    // Pass 1: gather every node's CPUs without filtering, so the table
    // below can be sized to fit all of them.
    let mut cpus_per_node: Vec<Vec<usize>> = vec![Vec::new(); node_count];
    for (node_idx, numa_node) in numa_nodes.iter().enumerate() {
        if let Some(cpuset) = numa_node.cpuset() {
            cpus_per_node[node_idx].extend(cpuset.iter_set().map(usize::from));
        }
    }

    // Keep the PU count as a lower bound so CPUs that belong to no node
    // still get an (implicit node 0) entry, as before.
    let pu_count = hwloc_topo.objects_with_type(ObjectType::PU).count();
    let highest_cpu = cpus_per_node.iter().flatten().copied().max();
    let table_len = highest_cpu.map_or(pu_count, |cpu| pu_count.max(cpu + 1));

    // Pass 2: fill the reverse mapping. Every index is in bounds because
    // `table_len` exceeds the largest CPU id collected above.
    let mut cpu_to_node = vec![0usize; table_len];
    for (node_idx, cpus) in cpus_per_node.iter().enumerate() {
        for &cpu in cpus {
            cpu_to_node[cpu] = node_idx;
        }
    }

    Ok(Self {
        num_nodes: node_count,
        cpus_per_node,
        cpu_to_node,
    })
}
103
104 #[cfg(target_os = "linux")]
106 fn detect_sysfs() -> Result<Self, NumaError> {
107 use std::fs;
108 use std::path::Path;
109
110 let node_path = Path::new("/sys/devices/system/node");
111 if !node_path.exists() {
112 return Err(NumaError::TopologyError(
113 "sysfs node path not found".to_string(),
114 ));
115 }
116
117 let mut node_dirs: Vec<usize> = Vec::new();
119 for entry in fs::read_dir(node_path)
120 .map_err(|e| NumaError::TopologyError(format!("Failed to read node dir: {e}")))?
121 {
122 let entry = entry
123 .map_err(|e| NumaError::TopologyError(format!("Failed to read entry: {e}")))?;
124 let name = entry.file_name();
125 let name_str = name.to_string_lossy();
126 if let Some(suffix) = name_str.strip_prefix("node") {
127 if let Ok(node_id) = suffix.parse::<usize>() {
128 node_dirs.push(node_id);
129 }
130 }
131 }
132
133 if node_dirs.is_empty() {
134 return Err(NumaError::TopologyError("No NUMA nodes found".to_string()));
135 }
136
137 node_dirs.sort_unstable();
138 let num_nodes = node_dirs.iter().max().map_or(1, |m| m + 1);
139
140 let total_cpus = Self::get_cpu_count();
142
143 let mut cpus_per_node = vec![Vec::new(); num_nodes];
144 let mut cpu_to_node = vec![0usize; total_cpus];
145
146 for node_id in &node_dirs {
147 let node_dir = node_path.join(format!("node{node_id}"));
148
149 let cpulist_path = node_dir.join("cpulist");
151 if let Ok(cpulist) = fs::read_to_string(&cpulist_path) {
152 let cpus = Self::parse_cpulist(cpulist.trim());
153 for cpu in &cpus {
154 if *cpu < total_cpus {
155 cpu_to_node[*cpu] = *node_id;
156 }
157 }
158 cpus_per_node[*node_id] = cpus;
159 }
160 }
161
162 Ok(Self {
163 num_nodes,
164 cpus_per_node,
165 cpu_to_node,
166 })
167 }
168
169 #[cfg(target_os = "linux")]
171 fn get_cpu_count() -> usize {
172 use std::fs;
173
174 let online_path = "/sys/devices/system/cpu/online";
176 if let Ok(online) = fs::read_to_string(online_path) {
177 let cpus = Self::parse_cpulist(online.trim());
178 if let Some(max) = cpus.iter().max() {
179 return max + 1;
180 }
181 }
182
183 std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
185 }
186
/// Parses a comma-separated CPU list such as `"0-3,8,10-11"`.
///
/// Each item is either a single CPU id or an inclusive `start-end` range.
/// Surrounding whitespace is ignored, empty items are skipped, and items
/// that fail to parse are silently dropped. CPUs are returned in the
/// order they appear, without de-duplication.
#[cfg(target_os = "linux")]
fn parse_cpulist(s: &str) -> Vec<usize> {
    let mut cpus = Vec::new();

    let items = s.split(',').map(str::trim).filter(|item| !item.is_empty());
    for item in items {
        match item.split_once('-') {
            Some((first, last)) => {
                if let (Ok(first), Ok(last)) = (first.parse::<usize>(), last.parse::<usize>()) {
                    cpus.extend(first..=last);
                }
            }
            // `extend` with an `Option` pushes the value only when it parsed.
            None => cpus.extend(item.parse::<usize>().ok()),
        }
    }

    cpus
}
209
210 fn single_node_fallback() -> Self {
212 let total_cpus = std::thread::available_parallelism().map_or(1, std::num::NonZero::get);
213 let cpus: Vec<usize> = (0..total_cpus).collect();
214
215 Self {
216 num_nodes: 1,
217 cpus_per_node: vec![cpus],
218 cpu_to_node: vec![0; total_cpus],
219 }
220 }
221
222 #[must_use]
224 pub fn num_nodes(&self) -> usize {
225 self.num_nodes
226 }
227
228 #[must_use]
232 pub fn cpus_for_node(&self, node: usize) -> &[usize] {
233 self.cpus_per_node.get(node).map_or(&[], Vec::as_slice)
234 }
235
236 #[must_use]
240 pub fn node_for_cpu(&self, cpu: usize) -> usize {
241 self.cpu_to_node.get(cpu).copied().unwrap_or(0)
242 }
243
244 #[must_use]
246 pub fn current_node(&self) -> usize {
247 let cpu = Self::current_cpu();
248 self.node_for_cpu(cpu)
249 }
250
251 #[cfg(target_os = "linux")]
260 pub fn bind_local_memory(&self) -> Result<(), NumaError> {
261 let node = self.current_node();
262 let nodemask: libc::c_ulong = 1 << node;
263 let ret = unsafe {
266 libc::syscall(
267 libc::SYS_set_mempolicy,
268 libc::MPOL_BIND,
269 &raw const nodemask,
270 self.num_nodes + 1,
271 )
272 };
273 if ret != 0 {
274 return Err(NumaError::BindFailed(std::io::Error::last_os_error()));
275 }
276 Ok(())
277 }
278
279 #[must_use]
281 pub fn current_cpu() -> usize {
282 #[cfg(target_os = "linux")]
283 {
284 let cpu = unsafe { libc::sched_getcpu() };
286 if cpu >= 0 {
287 #[allow(clippy::cast_sign_loss)]
288 return cpu as usize;
289 }
290 }
291
292 0
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Detection must always produce at least one node.
    #[test]
    fn test_detect() {
        assert!(NumaTopology::detect().num_nodes() > 0);
    }

    /// Node 0 is expected to own at least one CPU.
    #[test]
    fn test_cpus_for_node() {
        let topology = NumaTopology::detect();
        assert!(!topology.cpus_for_node(0).is_empty());
    }

    /// CPU 0 must map to a node inside the detected range.
    #[test]
    fn test_node_for_cpu() {
        let topology = NumaTopology::detect();
        let owner = topology.node_for_cpu(0);
        assert!(owner < topology.num_nodes());
    }

    /// The node of the running CPU must be inside the detected range.
    #[test]
    fn test_current_node() {
        let topology = NumaTopology::detect();
        let here = topology.current_node();
        assert!(here < topology.num_nodes());
    }

    /// Singles, ranges, and mixtures of both parse to the expected ids.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_parse_cpulist() {
        let cases: [(&str, Vec<usize>); 4] = [
            ("0", vec![0]),
            ("0-3", vec![0, 1, 2, 3]),
            ("0,2,4", vec![0, 2, 4]),
            ("0-3,8-11", vec![0, 1, 2, 3, 8, 9, 10, 11]),
        ];
        for (input, expected) in cases {
            assert_eq!(NumaTopology::parse_cpulist(input), expected);
        }
    }

    /// The fallback is a single node that owns CPU 0.
    #[test]
    fn test_single_node_fallback() {
        let fallback = NumaTopology::single_node_fallback();
        assert_eq!(fallback.num_nodes(), 1);
        assert_eq!(fallback.node_for_cpu(0), 0);
    }
}