diff --git a/Cargo.lock b/Cargo.lock index d65ae18f..6522aa55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -631,7 +631,6 @@ dependencies = [ "conduwuit_service", "console-subscriber", "const-str", - "core_affinity", "hardened_malloc-rs", "log", "opentelemetry", @@ -716,6 +715,7 @@ dependencies = [ "clap", "conduwuit_macros", "const-str", + "core_affinity", "ctor", "cyborgtime", "either", @@ -740,6 +740,7 @@ dependencies = [ "serde_json", "serde_regex", "serde_yaml", + "sysinfo", "thiserror 2.0.7", "tikv-jemalloc-ctl", "tikv-jemalloc-sys", @@ -1675,7 +1676,7 @@ checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" dependencies = [ "cfg-if", "libc", - "windows", + "windows 0.52.0", ] [[package]] @@ -4075,6 +4076,18 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "sysinfo" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "948512566b1895f93b1592c7574baeb2de842f224f2aab158799ecadb8ebbb46" +dependencies = [ + "core-foundation-sys", + "libc", + "serde", + "windows 0.57.0", +] + [[package]] name = "tendril" version = "0.4.3" @@ -4932,7 +4945,17 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "windows-core", + "windows-core 0.52.0", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" +dependencies = [ + "windows-core 0.57.0", "windows-targets 0.52.6", ] @@ -4945,17 +4968,60 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result 0.1.2", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-registry" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ - "windows-result", + "windows-result 0.2.0", "windows-strings", "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-result" version = "0.2.0" @@ -4971,7 +5037,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" dependencies = [ - "windows-result", + "windows-result 0.2.0", "windows-targets 0.52.6", ] diff --git a/Cargo.toml b/Cargo.toml index 15f054bf..689f2b5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -497,6 +497,11 @@ version = "1.3.0" [workspace.dependencies.core_affinity] version = "0.8.1" +[workspace.dependencies.sysinfo] +version = "0.33.0" +default-features = false +features = ["disk", "serde"] + # # Patches # diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index 27c6da52..d249f647 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -64,6 +64,7 @@ chrono.workspace = true clap.workspace = true conduwuit-macros.workspace = true const-str.workspace = true +core_affinity.workspace = true ctor.workspace = true cyborgtime.workspace = true either.workspace = true @@ -86,6 +87,7 @@ serde_json.workspace = true serde_regex.workspace = true serde_yaml.workspace = true serde.workspace = true +sysinfo.workspace = true thiserror.workspace = true tikv-jemallocator.optional = true tikv-jemallocator.workspace = true diff --git a/src/core/utils/mod.rs b/src/core/utils/mod.rs index a9b73fb6..38232820 100644 --- a/src/core/utils/mod.rs +++ b/src/core/utils/mod.rs @@ -36,7 +36,7 @@ pub use self::{ rand::{shuffle, string as random_string}, stream::{IterStream, ReadyExt, Tools as StreamTools, TryReadyExt}, string::{str_from_bytes, string_from_bytes}, - sys::available_parallelism, + sys::compute::parallelism as available_parallelism, time::{now_millis as millis_since_unix_epoch, timepoint_ago, timepoint_from_now}, }; diff --git a/src/core/utils/sys.rs b/src/core/utils/sys.rs index 05ef12ca..5c5564c4 100644 --- a/src/core/utils/sys.rs +++ b/src/core/utils/sys.rs @@ -1,3 +1,10 @@ +pub mod compute; +pub mod storage; + +use std::path::PathBuf; + +pub use compute::parallelism as available_parallelism; + use crate::{debug, Result}; /// This is needed for opening lots of file descriptors, which tends to @@ -21,18 +28,6 @@ pub fn maximize_fd_limit() -> Result<(), nix::errno::Errno> { Ok(()) } -/// Get the number of threads which could execute in parallel based on the -/// hardware and administrative constraints of this system. This value should be -/// used to hint the size of thread-pools and divide-and-conquer algorithms. -/// -/// * -#[must_use] -pub fn available_parallelism() -> usize { - std::thread::available_parallelism() - .expect("Unable to query for available parallelism.") - .get() -} - /// Return a possibly corrected std::env::current_exe() even if the path is /// marked deleted. /// @@ -40,9 +35,7 @@ pub fn available_parallelism() -> usize { /// This function is declared unsafe because the original result was altered for /// security purposes, and altering it back ignores those urposes and should be /// understood by the user. -pub unsafe fn current_exe() -> Result { - use std::path::PathBuf; - +pub unsafe fn current_exe() -> Result { let exe = std::env::current_exe()?; match exe.to_str() { | None => Ok(exe), diff --git a/src/core/utils/sys/compute.rs b/src/core/utils/sys/compute.rs new file mode 100644 index 00000000..4e9ef743 --- /dev/null +++ b/src/core/utils/sys/compute.rs @@ -0,0 +1,74 @@ +//! System utilities related to compute/processing + +use std::{cell::Cell, sync::LazyLock}; + +use crate::is_equal_to; + +/// The list of cores available to the process (at startup) +static CORES_AVAILABLE: LazyLock = LazyLock::new(|| { + core_affinity::get_core_ids() + .unwrap_or_default() + .into_iter() + .map(|core_id| core_id.id) + .inspect(|&id| assert!(id < 128, "Core ID must be < 128 at least for now")) + .fold(0_u128, |mask, id| mask | (1 << id)) +}); + +thread_local! { + /// Tracks the affinity for this thread. This is updated when affinities + /// are set via our set_affinity() interface. + static CORE_AFFINITY: Cell = Cell::default(); +} + +/// Set the core affinity for this thread. The ID should be listed in +/// CORES_AVAILABLE. Empty input is a no-op; prior affinity unchanged. +pub fn set_affinity(ids: I) +where + I: Iterator, +{ + use core_affinity::{set_for_current, CoreId}; + + let mask: u128 = ids.fold(0, |mask, id| { + debug_assert!(is_core_available(id), "setting affinity to unavailable core"); + set_for_current(CoreId { id }); + mask | (1 << id) + }); + + if mask.count_ones() > 0 { + CORE_AFFINITY.replace(mask); + } +} + +/// Get the core affinity for this thread. +pub fn get_affinity() -> impl Iterator { + (0..128).filter(|&i| ((CORE_AFFINITY.get() & (1 << i)) != 0)) +} + +/// Gets the ID of the nth core available. This bijects our sequence of cores to +/// actual ID's which may have gaps for cores which are not available. +#[inline] +#[must_use] +pub fn get_core_available(i: usize) -> Option { cores_available().nth(i) } + +/// Determine if core (by id) is available to the process. +#[inline] +#[must_use] +pub fn is_core_available(id: usize) -> bool { cores_available().any(is_equal_to!(id)) } + +/// Get the list of cores available. The values were recorded at program start. +#[inline] +pub fn cores_available() -> impl Iterator { + (0..128).filter(|&i| ((*CORES_AVAILABLE & (1 << i)) != 0)) +} + +/// Get the number of threads which could execute in parallel based on the +/// hardware and administrative constraints of this system. This value should be +/// used to hint the size of thread-pools and divide-and-conquer algorithms. +/// +/// * +#[must_use] +pub fn parallelism() -> usize { + std::thread::available_parallelism() + .expect("Unable to query for available parallelism.") + .get() +} diff --git a/src/core/utils/sys/storage.rs b/src/core/utils/sys/storage.rs new file mode 100644 index 00000000..8dc75236 --- /dev/null +++ b/src/core/utils/sys/storage.rs @@ -0,0 +1,112 @@ +//! System utilities related to devices/peripherals + +use std::{ + ffi::{OsStr, OsString}, + fs, + fs::{read_to_string, FileType}, + iter::IntoIterator, + path::Path, +}; + +use crate::{result::FlatOk, Result}; + +/// Device characteristics useful for random access throughput +#[derive(Clone, Debug, Default)] +pub struct Parallelism { + /// Number of requests for the device. + pub nr_requests: Option, + + /// Individual queue characteristics. + pub mq: Vec, +} + +/// Device queue characteristics +#[derive(Clone, Debug, Default)] +pub struct Queue { + /// Queue's indice. + pub id: usize, + + /// Number of requests for the queue. + pub nr_tags: Option, + + /// CPU affinities for the queue. + pub cpu_list: Vec, +} + +/// Get device characteristics useful for random access throughput by name. +#[must_use] +pub fn parallelism(name: &OsStr) -> Parallelism { + let name = name + .to_str() + .expect("device name expected to be utf-8 representable"); + + let block_path = Path::new("/").join("sys/").join("block/"); + + let mq_path = Path::new(&block_path).join(format!("{name}/mq/")); + + let nr_requests_path = Path::new(&block_path).join(format!("{name}/queue/nr_requests")); + + Parallelism { + nr_requests: read_to_string(&nr_requests_path) + .ok() + .as_deref() + .map(str::trim) + .map(str::parse) + .flat_ok(), + + mq: fs::read_dir(&mq_path) + .into_iter() + .flat_map(IntoIterator::into_iter) + .filter_map(Result::ok) + .filter(|entry| entry.file_type().as_ref().is_ok_and(FileType::is_dir)) + .map(|dir| queue_parallelism(&dir.path())) + .collect(), + } +} + +/// Get device queue characteristics by mq path on sysfs(5) +fn queue_parallelism(dir: &Path) -> Queue { + let queue_id = dir.file_name(); + + let nr_tags_path = dir.join("nr_tags"); + + let cpu_list_path = dir.join("cpu_list"); + + Queue { + id: queue_id + .and_then(OsStr::to_str) + .map(str::parse) + .flat_ok() + .expect("queue has some numerical identifier"), + + nr_tags: read_to_string(&nr_tags_path) + .ok() + .as_deref() + .map(str::trim) + .map(str::parse) + .flat_ok(), + + cpu_list: read_to_string(&cpu_list_path) + .iter() + .flat_map(|list| list.trim().split(',')) + .map(str::trim) + .map(str::parse) + .filter_map(Result::ok) + .collect(), + } +} + +/// Get the name of the device on which Path is mounted. +#[must_use] +pub fn name_from_path(path: &Path) -> Option { + sysinfo::Disks::new_with_refreshed_list() + .into_iter() + .filter(|disk| path.starts_with(disk.mount_point())) + .max_by(|a, b| { + let a = a.mount_point().ancestors().count(); + let b = b.mount_point().ancestors().count(); + a.cmp(&b) + }) + .map(|disk| Path::new(disk.name())) + .and_then(|path| path.file_name().map(ToOwned::to_owned)) +} diff --git a/src/main/Cargo.toml b/src/main/Cargo.toml index fe24d4c1..eeb6f2bc 100644 --- a/src/main/Cargo.toml +++ b/src/main/Cargo.toml @@ -129,9 +129,6 @@ tokio_console = [ "dep:console-subscriber", "tokio/tracing", ] -worker_affinity = [ - "dep:core_affinity", -] zstd_compression = [ "conduwuit-api/zstd_compression", "conduwuit-core/zstd_compression", @@ -151,8 +148,6 @@ clap.workspace = true console-subscriber.optional = true console-subscriber.workspace = true const-str.workspace = true -core_affinity.optional = true -core_affinity.workspace = true log.workspace = true opentelemetry-jaeger.optional = true opentelemetry-jaeger.workspace = true diff --git a/src/main/runtime.rs b/src/main/runtime.rs index e98e73d6..cee093ea 100644 --- a/src/main/runtime.rs +++ b/src/main/runtime.rs @@ -1,6 +1,17 @@ -use std::{sync::OnceLock, thread, time::Duration}; +use std::{ + iter::once, + sync::{ + atomic::{AtomicUsize, Ordering}, + OnceLock, + }, + thread, + time::Duration, +}; -use conduwuit::Result; +use conduwuit::{ + utils::sys::compute::{get_core_available, set_affinity}, + Result, +}; use tokio::runtime::Builder; use crate::clap::Args; @@ -66,7 +77,6 @@ fn enable_histogram(builder: &mut Builder, args: &Args) { ), )] fn thread_start() { - #[cfg(feature = "worker_affinity")] if WORKER_AFFINITY .get() .copied() @@ -76,24 +86,8 @@ fn thread_start() { } } -#[cfg(feature = "worker_affinity")] fn set_worker_affinity() { - use std::sync::{ - atomic::{AtomicUsize, Ordering}, - LazyLock, - }; - static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0); - static CORES_AVAILABLE: LazyLock>> = LazyLock::new(|| { - core_affinity::get_core_ids().map(|mut cores| { - cores.sort_unstable(); - cores - }) - }); - - let Some(cores) = CORES_AVAILABLE.as_ref() else { - return; - }; if thread::current().name() != Some(WORKER_NAME) { return; @@ -106,11 +100,11 @@ fn set_worker_affinity() { return; } - let Some(id) = cores.get(i) else { + let Some(id) = get_core_available(i) else { return; }; - let _set = core_affinity::set_for_current(*id); + set_affinity(once(id)); } #[tracing::instrument(