diff --git a/src/core/utils/mod.rs b/src/core/utils/mod.rs
index 2bbadb50..4b5330ed 100644
--- a/src/core/utils/mod.rs
+++ b/src/core/utils/mod.rs
@@ -37,7 +37,7 @@ pub use self::{
 	rand::{shuffle, string as random_string},
 	stream::{IterStream, ReadyExt, Tools as StreamTools, TryReadyExt},
 	string::{str_from_bytes, string_from_bytes},
-	sys::compute::parallelism as available_parallelism,
+	sys::compute::available_parallelism,
 	time::{
 		exponential_backoff::{continue_exponential_backoff, continue_exponential_backoff_secs},
 		now_millis as millis_since_unix_epoch, timepoint_ago, timepoint_from_now,
diff --git a/src/core/utils/sys.rs b/src/core/utils/sys.rs
index 5c5564c4..a0d5be52 100644
--- a/src/core/utils/sys.rs
+++ b/src/core/utils/sys.rs
@@ -3,7 +3,7 @@ pub mod storage;
 
 use std::path::PathBuf;
 
-pub use compute::parallelism as available_parallelism;
+pub use compute::available_parallelism;
 
 use crate::{debug, Result};
 
diff --git a/src/core/utils/sys/compute.rs b/src/core/utils/sys/compute.rs
index 9e90fc90..ce2aa504 100644
--- a/src/core/utils/sys/compute.rs
+++ b/src/core/utils/sys/compute.rs
@@ -1,23 +1,31 @@
 //! System utilities related to compute/processing
 
-use std::{cell::Cell, fmt::Debug, sync::LazyLock};
+use std::{cell::Cell, fmt::Debug, path::PathBuf, sync::LazyLock};
 
-use crate::is_equal_to;
+use crate::{is_equal_to, Result};
 
-/// The list of cores available to the process (at startup)
-static CORES_AVAILABLE: LazyLock<u128> = LazyLock::new(|| {
-	core_affinity::get_core_ids()
-		.unwrap_or_default()
-		.into_iter()
-		.map(|core_id| core_id.id)
-		.inspect(|&id| debug_assert!(id < 128, "Core ID must be < 128 at least for now"))
-		.fold(0_u128, |mask, id| mask | (1 << id))
-});
+type Id = usize;
+
+type Mask = u128;
+type Masks = [Mask; MASK_BITS];
+
+const MASK_BITS: usize = 128;
+
+/// The mask of logical cores available to the process (at startup).
+static CORES_AVAILABLE: LazyLock<Mask> = LazyLock::new(|| into_mask(query_cores_available()));
+
+/// Stores the mask of logical-cores with thread/HT/SMT association. Each group
+/// here makes up a physical-core.
+static SMT_TOPOLOGY: LazyLock<Masks> = LazyLock::new(init_smt_topology);
+
+/// Stores the mask of logical-core associations on a node/socket. Bits are set
+/// for all logical cores within all physical cores of the node.
+static NODE_TOPOLOGY: LazyLock<Masks> = LazyLock::new(init_node_topology);
 
 thread_local! {
 	/// Tracks the affinity for this thread. This is updated when affinities
 	/// are set via our set_affinity() interface.
-	static CORE_AFFINITY: Cell<u128> = Cell::default();
+	static CORE_AFFINITY: Cell<Mask> = Cell::default();
 }
 
 /// Set the core affinity for this thread. The ID should be listed in
@@ -28,19 +36,19 @@ thread_local! {
 	fields(
 		id = ?std::thread::current().id(),
 		name = %std::thread::current().name().unwrap_or("None"),
-		set = ?ids.by_ref().collect::<Vec<_>>(),
+		set = ?ids.clone().collect::<Vec<_>>(),
 		CURRENT = %format!("[b{:b}]", CORE_AFFINITY.get()),
 		AVAILABLE = %format!("[b{:b}]", *CORES_AVAILABLE),
 	),
 )]
 pub fn set_affinity<I>(mut ids: I)
 where
-	I: Iterator<Item = usize> + Clone + Debug,
+	I: Iterator<Item = Id> + Clone + Debug,
 {
 	use core_affinity::{set_each_for_current, set_for_current, CoreId};
 
 	let n = ids.clone().count();
-	let mask: u128 = ids.clone().fold(0, |mask, id| {
+	let mask: Mask = ids.clone().fold(0, |mask, id| {
 		debug_assert!(is_core_available(id), "setting affinity to unavailable core");
 		mask | (1 << id)
 	});
@@ -57,35 +65,110 @@ where
 }
 
 /// Get the core affinity for this thread.
-pub fn get_affinity() -> impl Iterator<Item = usize> { iter_bits(CORE_AFFINITY.get()) }
+pub fn get_affinity() -> impl Iterator<Item = Id> { from_mask(CORE_AFFINITY.get()) }
+
+/// List the cores sharing SMT-tier resources
+pub fn smt_siblings() -> impl Iterator<Item = Id> {
+	from_mask(get_affinity().fold(0_u128, |mask, id| {
+		mask | SMT_TOPOLOGY.get(id).expect("ID must not exceed max cpus")
+	}))
+}
+
+/// List the cores sharing Node-tier resources relative to this thread's current
+/// affinity.
+pub fn node_siblings() -> impl Iterator<Item = Id> {
+	from_mask(get_affinity().fold(0_u128, |mask, id| {
+		mask | NODE_TOPOLOGY.get(id).expect("ID must not exceed max cpus")
+	}))
+}
+
+/// Get the cores sharing SMT resources relative to id.
+#[inline]
+pub fn smt_affinity(id: Id) -> impl Iterator<Item = Id> {
+	from_mask(*SMT_TOPOLOGY.get(id).expect("ID must not exceed max cpus"))
+}
+
+/// Get the cores sharing Node resources relative to id.
+#[inline]
+pub fn node_affinity(id: Id) -> impl Iterator<Item = Id> {
+	from_mask(*NODE_TOPOLOGY.get(id).expect("ID must not exceed max cpus"))
+}
+
+/// Get the number of threads which could execute in parallel based on hardware
+/// constraints of this system.
+#[inline]
+#[must_use]
+pub fn available_parallelism() -> usize { cores_available().count() }
 
 /// Gets the ID of the nth core available. This bijects our sequence of cores to
 /// actual ID's which may have gaps for cores which are not available.
 #[inline]
 #[must_use]
-pub fn nth_core_available(i: usize) -> Option<usize> { cores_available().nth(i) }
+pub fn nth_core_available(i: usize) -> Option<Id> { cores_available().nth(i) }
 
 /// Determine if core (by id) is available to the process.
 #[inline]
 #[must_use]
-pub fn is_core_available(id: usize) -> bool { cores_available().any(is_equal_to!(id)) }
+pub fn is_core_available(id: Id) -> bool { cores_available().any(is_equal_to!(id)) }
 
 /// Get the list of cores available. The values were recorded at program start.
 #[inline]
-pub fn cores_available() -> impl Iterator<Item = usize> { iter_bits(*CORES_AVAILABLE) }
+pub fn cores_available() -> impl Iterator<Item = Id> { from_mask(*CORES_AVAILABLE) }
 
-/// Get the number of threads which could execute in parallel based on the
-/// hardware and administrative constraints of this system. This value should be
-/// used to hint the size of thread-pools and divide-and-conquer algorithms.
-///
-/// * <https://doc.rust-lang.org/std/thread/fn.available_parallelism.html>
-#[must_use]
-pub fn parallelism() -> usize {
-	std::thread::available_parallelism()
-		.expect("Unable to query for available parallelism.")
-		.get()
+#[cfg(target_os = "linux")]
+#[inline]
+pub fn getcpu() -> Result<usize> {
+	use crate::{utils::math, Error};
+
+	// SAFETY: This is part of an interface with many low-level calls taking many
+	// raw params, but it's unclear why this specific call is unsafe. Nevertheless
+	// the value obtained here is semantically unsafe because it can change on the
+	// instruction boundary trailing its own acquisition and also any other time.
+	let ret: i32 = unsafe { libc::sched_getcpu() };
+
+	#[cfg(target_os = "linux")]
+	// SAFETY: On modern linux systems with a vdso if we can optimize away the branch checking
+	// for error (see getcpu(2)) then this system call becomes a memory access.
+	unsafe {
+		std::hint::assert_unchecked(ret >= -1); // sched_getcpu(2) returns a cpu number or -1 only
+	};
+
+	if ret == -1 {
+		return Err(Error::from_errno());
+	}
+
+	math::try_into(ret)
 }
 
-fn iter_bits(v: u128) -> impl Iterator<Item = usize> {
-	(0..128).filter(move |&i| (v & (1 << i)) != 0)
+#[cfg(not(target_os = "linux"))]
+#[inline]
+pub fn getcpu() -> Result<usize> { Err(crate::Error::Io(std::io::ErrorKind::Unsupported.into())) }
+
+fn query_cores_available() -> impl Iterator<Item = Id> {
+	core_affinity::get_core_ids()
+		.unwrap_or_default()
+		.into_iter()
+		.map(|core_id| core_id.id)
+}
+
+fn init_smt_topology() -> [Mask; MASK_BITS] { [Mask::default(); MASK_BITS] }
+
+fn init_node_topology() -> [Mask; MASK_BITS] { [Mask::default(); MASK_BITS] }
+
+fn into_mask<I>(ids: I) -> Mask
+where
+	I: Iterator<Item = Id>,
+{
+	ids.inspect(|&id| {
+		debug_assert!(id < MASK_BITS, "Core ID must be < Mask::BITS at least for now");
+	})
+	.fold(Mask::default(), |mask, id| mask | (1 << id))
+}
+
+fn from_mask(v: Mask) -> impl Iterator<Item = Id> {
+	(0..MASK_BITS).filter(move |&i| (v & (1 << i)) != 0)
+}
+
+fn _sys_path(id: usize, suffix: &str) -> PathBuf {
+	format!("/sys/devices/system/cpu/cpu{id}/{suffix}").into()
 }