diff --git a/Cargo.lock b/Cargo.lock index 0c1890c4..7f9ef547 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -937,8 +937,7 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "core_affinity" version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622892f5635ce1fc38c8f16dfc938553ed64af482edb5e150bf4caedbfcb2304" +source = "git+https://github.com/jevolk/core_affinity_rs?rev=9c8e51510c35077df888ee72a36b4b05637147da#9c8e51510c35077df888ee72a36b4b05637147da" dependencies = [ "libc", "num_cpus", diff --git a/Cargo.toml b/Cargo.toml index 022baaa3..c66dfcff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -535,6 +535,10 @@ rev = "96d7e0fc026d8f708b19bc9267a382676a50354c" git = "https://github.com/jevolk/async-channel" rev = "fefa543ca5eddf21237d75776fce98b7e09e924a" +[patch.crates-io.core_affinity] +git = "https://github.com/jevolk/core_affinity_rs" +rev = "9c8e51510c35077df888ee72a36b4b05637147da" + # # Our crates # diff --git a/src/core/utils/sys/compute.rs b/src/core/utils/sys/compute.rs index e947b579..9e90fc90 100644 --- a/src/core/utils/sys/compute.rs +++ b/src/core/utils/sys/compute.rs @@ -1,6 +1,6 @@ //! System utilities related to compute/processing -use std::{cell::Cell, sync::LazyLock}; +use std::{cell::Cell, fmt::Debug, sync::LazyLock}; use crate::is_equal_to; @@ -22,33 +22,48 @@ thread_local! { /// Set the core affinity for this thread. The ID should be listed in /// CORES_AVAILABLE. Empty input is a no-op; prior affinity unchanged. -pub fn set_affinity(ids: I) +#[tracing::instrument( + level = "debug", + skip_all, + fields( + id = ?std::thread::current().id(), + name = %std::thread::current().name().unwrap_or("None"), + set = ?ids.by_ref().collect::>(), + CURRENT = %format!("[b{:b}]", CORE_AFFINITY.get()), + AVAILABLE = %format!("[b{:b}]", *CORES_AVAILABLE), + ), +)] +pub fn set_affinity(mut ids: I) where - I: Iterator, + I: Iterator + Clone + Debug, { - use core_affinity::{set_for_current, CoreId}; + use core_affinity::{set_each_for_current, set_for_current, CoreId}; - let mask: u128 = ids.fold(0, |mask, id| { + let n = ids.clone().count(); + let mask: u128 = ids.clone().fold(0, |mask, id| { debug_assert!(is_core_available(id), "setting affinity to unavailable core"); - set_for_current(CoreId { id }); mask | (1 << id) }); + if n > 1 { + set_each_for_current(ids.map(|id| CoreId { id })); + } else if n > 0 { + set_for_current(CoreId { id: ids.next().expect("n > 0") }); + } + if mask.count_ones() > 0 { CORE_AFFINITY.replace(mask); } } /// Get the core affinity for this thread. -pub fn get_affinity() -> impl Iterator { - (0..128).filter(|&i| ((CORE_AFFINITY.get() & (1 << i)) != 0)) -} +pub fn get_affinity() -> impl Iterator { iter_bits(CORE_AFFINITY.get()) } /// Gets the ID of the nth core available. This bijects our sequence of cores to /// actual ID's which may have gaps for cores which are not available. #[inline] #[must_use] -pub fn get_core_available(i: usize) -> Option { cores_available().nth(i) } +pub fn nth_core_available(i: usize) -> Option { cores_available().nth(i) } /// Determine if core (by id) is available to the process. #[inline] @@ -57,9 +72,7 @@ pub fn is_core_available(id: usize) -> bool { cores_available().any(is_equal_to! /// Get the list of cores available. The values were recorded at program start. #[inline] -pub fn cores_available() -> impl Iterator { - (0..128).filter(|&i| ((*CORES_AVAILABLE & (1 << i)) != 0)) -} +pub fn cores_available() -> impl Iterator { iter_bits(*CORES_AVAILABLE) } /// Get the number of threads which could execute in parallel based on the /// hardware and administrative constraints of this system. This value should be @@ -72,3 +85,7 @@ pub fn parallelism() -> usize { .expect("Unable to query for available parallelism.") .get() } + +fn iter_bits(v: u128) -> impl Iterator { + (0..128).filter(move |&i| (v & (1 << i)) != 0) +} diff --git a/src/database/pool.rs b/src/database/pool.rs index 1c55c456..8182f217 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -13,7 +13,7 @@ use conduwuit::{ debug, debug_warn, defer, err, implement, result::DebugInspect, trace, - utils::sys::compute::{get_affinity, get_core_available, set_affinity}, + utils::sys::compute::{get_affinity, nth_core_available, set_affinity}, Result, Server, }; use futures::{channel::oneshot, TryFutureExt}; @@ -270,7 +270,7 @@ fn worker_init(&self, id: usize) { .enumerate() .filter(|_| self.queues.len() > 1) .filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id)) - .filter_map(get_core_available); + .filter_map(nth_core_available); // affinity is empty (no-op) if there's only one queue set_affinity(affinity.clone()); diff --git a/src/main/runtime.rs b/src/main/runtime.rs index bfd2ef74..b9dfc866 100644 --- a/src/main/runtime.rs +++ b/src/main/runtime.rs @@ -9,7 +9,7 @@ use std::{ }; use conduwuit::{ - utils::sys::compute::{get_core_available, set_affinity}, + utils::sys::compute::{nth_core_available, set_affinity}, Result, }; use tokio::runtime::Builder; @@ -100,7 +100,7 @@ fn set_worker_affinity() { return; } - let Some(id) = get_core_available(i) else { + let Some(id) = nth_core_available(i) else { return; };