additional affinity utils
Signed-off-by: Jason Volk <jason@zemos.net>
parent 7045481fae
commit df3eb95d4f
3 changed files with 116 additions and 33 deletions
@@ -37,7 +37,7 @@ pub use self::{
 	rand::{shuffle, string as random_string},
 	stream::{IterStream, ReadyExt, Tools as StreamTools, TryReadyExt},
 	string::{str_from_bytes, string_from_bytes},
-	sys::compute::parallelism as available_parallelism,
+	sys::compute::available_parallelism,
 	time::{
 		exponential_backoff::{continue_exponential_backoff, continue_exponential_backoff_secs},
 		now_millis as millis_since_unix_epoch, timepoint_ago, timepoint_from_now,
@@ -3,7 +3,7 @@ pub mod storage;

 use std::path::PathBuf;

-pub use compute::parallelism as available_parallelism;
+pub use compute::available_parallelism;

 use crate::{debug, Result};

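Note: the two hunks above only rename the re-export (`parallelism as available_parallelism` becomes a direct `available_parallelism`). As a hypothetical illustration, assuming the `crate::utils` re-export path shown in the first hunk, a call site like the one below keeps compiling unchanged, but now resolves to the mask-based implementation added in the next file rather than the removed `parallelism()` wrapper around `std::thread::available_parallelism()`:

	// hypothetical call site elsewhere in the crate (path assumed from the hunk above)
	let workers: usize = crate::utils::available_parallelism();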
@@ -1,23 +1,31 @@
 //! System utilities related to compute/processing

-use std::{cell::Cell, fmt::Debug, sync::LazyLock};
+use std::{cell::Cell, fmt::Debug, path::PathBuf, sync::LazyLock};

-use crate::is_equal_to;
+use crate::{is_equal_to, Result};

-/// The list of cores available to the process (at startup)
-static CORES_AVAILABLE: LazyLock<u128> = LazyLock::new(|| {
-	core_affinity::get_core_ids()
-		.unwrap_or_default()
-		.into_iter()
-		.map(|core_id| core_id.id)
-		.inspect(|&id| debug_assert!(id < 128, "Core ID must be < 128 at least for now"))
-		.fold(0_u128, |mask, id| mask | (1 << id))
-});
+type Id = usize;
+
+type Mask = u128;
+type Masks = [Mask; MASK_BITS];
+
+const MASK_BITS: usize = 128;
+
+/// The mask of logical cores available to the process (at startup).
+static CORES_AVAILABLE: LazyLock<Mask> = LazyLock::new(|| into_mask(query_cores_available()));
+
+/// Stores the mask of logical-cores with thread/HT/SMT association. Each group
+/// here makes up a physical-core.
+static SMT_TOPOLOGY: LazyLock<Masks> = LazyLock::new(init_smt_topology);
+
+/// Stores the mask of logical-core associations on a node/socket. Bits are set
+/// for all logical cores within all physical cores of the node.
+static NODE_TOPOLOGY: LazyLock<Masks> = LazyLock::new(init_node_topology);

 thread_local! {
 	/// Tracks the affinity for this thread. This is updated when affinities
 	/// are set via our set_affinity() interface.
-	static CORE_AFFINITY: Cell<u128> = Cell::default();
+	static CORE_AFFINITY: Cell<Mask> = Cell::default();
 }

 /// Set the core affinity for this thread. The ID should be listed in
@@ -28,19 +36,19 @@ thread_local! {
 	fields(
 		id = ?std::thread::current().id(),
 		name = %std::thread::current().name().unwrap_or("None"),
-		set = ?ids.by_ref().collect::<Vec<_>>(),
+		set = ?ids.clone().collect::<Vec<_>>(),
 		CURRENT = %format!("[b{:b}]", CORE_AFFINITY.get()),
 		AVAILABLE = %format!("[b{:b}]", *CORES_AVAILABLE),
 	),
 )]
 pub fn set_affinity<I>(mut ids: I)
 where
-	I: Iterator<Item = usize> + Clone + Debug,
+	I: Iterator<Item = Id> + Clone + Debug,
 {
 	use core_affinity::{set_each_for_current, set_for_current, CoreId};

 	let n = ids.clone().count();
-	let mask: u128 = ids.clone().fold(0, |mask, id| {
+	let mask: Mask = ids.clone().fold(0, |mask, id| {
 		debug_assert!(is_core_available(id), "setting affinity to unavailable core");
 		mask | (1 << id)
 	});
@@ -57,35 +65,110 @@ where
 }

 /// Get the core affinity for this thread.
-pub fn get_affinity() -> impl Iterator<Item = usize> { iter_bits(CORE_AFFINITY.get()) }
+pub fn get_affinity() -> impl Iterator<Item = Id> { from_mask(CORE_AFFINITY.get()) }
+
+/// List the cores sharing SMT-tier resources
+pub fn smt_siblings() -> impl Iterator<Item = Id> {
+	from_mask(get_affinity().fold(0_u128, |mask, id| {
+		mask | SMT_TOPOLOGY.get(id).expect("ID must not exceed max cpus")
+	}))
+}
+
+/// List the cores sharing Node-tier resources relative to this threads current
+/// affinity.
+pub fn node_siblings() -> impl Iterator<Item = Id> {
+	from_mask(get_affinity().fold(0_u128, |mask, id| {
+		mask | NODE_TOPOLOGY.get(id).expect("Id must not exceed max cpus")
+	}))
+}
+
+/// Get the cores sharing SMT resources relative to id.
+#[inline]
+pub fn smt_affinity(id: Id) -> impl Iterator<Item = Id> {
+	from_mask(*SMT_TOPOLOGY.get(id).expect("ID must not exceed max cpus"))
+}
+
+/// Get the cores sharing Node resources relative to id.
+#[inline]
+pub fn node_affinity(id: Id) -> impl Iterator<Item = Id> {
+	from_mask(*NODE_TOPOLOGY.get(id).expect("ID must not exceed max cpus"))
+}
+
+/// Get the number of threads which could execute in parallel based on hardware
+/// constraints of this system.
+#[inline]
+#[must_use]
+pub fn available_parallelism() -> usize { cores_available().count() }

 /// Gets the ID of the nth core available. This bijects our sequence of cores to
 /// actual ID's which may have gaps for cores which are not available.
 #[inline]
 #[must_use]
-pub fn nth_core_available(i: usize) -> Option<usize> { cores_available().nth(i) }
+pub fn nth_core_available(i: usize) -> Option<Id> { cores_available().nth(i) }

 /// Determine if core (by id) is available to the process.
 #[inline]
 #[must_use]
-pub fn is_core_available(id: usize) -> bool { cores_available().any(is_equal_to!(id)) }
+pub fn is_core_available(id: Id) -> bool { cores_available().any(is_equal_to!(id)) }

 /// Get the list of cores available. The values were recorded at program start.
 #[inline]
-pub fn cores_available() -> impl Iterator<Item = usize> { iter_bits(*CORES_AVAILABLE) }
+pub fn cores_available() -> impl Iterator<Item = Id> { from_mask(*CORES_AVAILABLE) }

-/// Get the number of threads which could execute in parallel based on the
-/// hardware and administrative constraints of this system. This value should be
-/// used to hint the size of thread-pools and divide-and-conquer algorithms.
-///
-/// * <https://doc.rust-lang.org/std/thread/fn.available_parallelism.html>
-#[must_use]
-pub fn parallelism() -> usize {
-	std::thread::available_parallelism()
-		.expect("Unable to query for available parallelism.")
-		.get()
-}
-
-fn iter_bits(v: u128) -> impl Iterator<Item = usize> {
-	(0..128).filter(move |&i| (v & (1 << i)) != 0)
-}
+#[cfg(target_os = "linux")]
+#[inline]
+pub fn getcpu() -> Result<usize> {
+	use crate::{utils::math, Error};
+
+	// SAFETY: This is part of an interface with many low-level calls taking many
+	// raw params, but it's unclear why this specific call is unsafe. Nevertheless
+	// the value obtained here is semantically unsafe because it can change on the
+	// instruction boundary trailing its own acquisition and also any other time.
+	let ret: i32 = unsafe { libc::sched_getcpu() };
+
+	#[cfg(target_os = "linux")]
+	// SAFETY: On modern linux systems with a vdso if we can optimize away the branch checking
+	// for error (see getcpu(2)) then this system call becomes a memory access.
+	unsafe {
+		std::hint::assert_unchecked(ret >= 0);
+	};
+
+	if ret == -1 {
+		return Err(Error::from_errno());
+	}
+
+	math::try_into(ret)
+}
+
+#[cfg(not(target_os = "linux"))]
+#[inline]
+pub fn getcpu() -> Result<usize> { Err(crate::Error::Io(std::io::ErrorKind::Unsupported.into())) }
+
+fn query_cores_available() -> impl Iterator<Item = Id> {
+	core_affinity::get_core_ids()
+		.unwrap_or_default()
+		.into_iter()
+		.map(|core_id| core_id.id)
+}
+
+fn init_smt_topology() -> [Mask; MASK_BITS] { [Mask::default(); MASK_BITS] }
+
+fn init_node_topology() -> [Mask; MASK_BITS] { [Mask::default(); MASK_BITS] }
+
+fn into_mask<I>(ids: I) -> Mask
+where
+	I: Iterator<Item = Id>,
+{
+	ids.inspect(|&id| {
+		debug_assert!(id < MASK_BITS, "Core ID must be < Mask::BITS at least for now");
+	})
+	.fold(Mask::default(), |mask, id| mask | (1 << id))
+}
+
+fn from_mask(v: Mask) -> impl Iterator<Item = Id> {
+	(0..MASK_BITS).filter(move |&i| (v & (1 << i)) != 0)
+}
+
+fn _sys_path(id: usize, suffix: &str) -> PathBuf {
+	format!("/sys/devices/system/cpu/cpu{id}/{suffix}").into()
+}
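Note: the following is a usage sketch only, not part of the commit. It assumes it lives in the same crate with the new items in scope (e.g. `use crate::utils::sys::compute::*` plus the crate's `Result`) and that `getcpu()` runs on Linux. Also note that in this commit `init_smt_topology()`/`init_node_topology()` still return all-zero masks, so the SMT/node sibling queries yield nothing until a later change populates them; the sketch falls back to every available core for that reason.

	// Hypothetical worker setup exercising the API added above.
	fn pin_to_current_physical_core() -> Result<()> {
		// Logical core this thread is executing on right now (sched_getcpu).
		let cur = getcpu()?;

		// Cores sharing SMT resources with `cur`; empty while the topology
		// masks are still the all-zero placeholders, hence the fallback to
		// every core available to the process.
		let mut ids: Vec<usize> = smt_affinity(cur).collect();
		if ids.is_empty() {
			ids = cores_available().collect();
		}

		// set_affinity() wants an Iterator<Item = usize> + Clone + Debug;
		// a Vec's into_iter() satisfies all three, and the call records the
		// resulting mask in the thread-local CORE_AFFINITY.
		set_affinity(ids.into_iter());

		// available_parallelism() now counts the bits of CORES_AVAILABLE
		// instead of calling std::thread::available_parallelism().
		let _threads = available_parallelism();
		Ok(())
	}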