From 71673b2a886cd7ec1693f09bff77c3f12f7a7aa2 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 17 Dec 2024 14:32:54 +0000 Subject: [PATCH] add worker_affinity feature split runtime init from main.rs Signed-off-by: Jason Volk --- Cargo.lock | 12 ++++ Cargo.toml | 3 + src/main/Cargo.toml | 57 ++++++++-------- src/main/main.rs | 29 ++------ src/main/runtime.rs | 163 ++++++++++++++++++++++++++++++++++++++++++++ src/main/server.rs | 2 +- 6 files changed, 212 insertions(+), 54 deletions(-) create mode 100644 src/main/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index c86904d7..f05da4eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -631,6 +631,7 @@ dependencies = [ "conduwuit_service", "console-subscriber", "const-str", + "core_affinity", "hardened_malloc-rs", "log", "opentelemetry", @@ -933,6 +934,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622892f5635ce1fc38c8f16dfc938553ed64af482edb5e150bf4caedbfcb2304" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "cpufeatures" version = "0.2.16" diff --git a/Cargo.toml b/Cargo.toml index cffbebfa..05fc3bc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -494,6 +494,9 @@ version = "1.0.89" [workspace.dependencies.bytesize] version = "1.3.0" +[workspace.dependencies.core_affinity] +version = "0.8.1" + # # Patches # diff --git a/src/main/Cargo.toml b/src/main/Cargo.toml index 99d41614..fe24d4c1 100644 --- a/src/main/Cargo.toml +++ b/src/main/Cargo.toml @@ -129,6 +129,9 @@ tokio_console = [ "dep:console-subscriber", "tokio/tracing", ] +worker_affinity = [ + "dep:core_affinity", +] zstd_compression = [ "conduwuit-api/zstd_compression", "conduwuit-core/zstd_compression", @@ -144,36 +147,34 @@ conduwuit-database.workspace = true conduwuit-router.workspace = true conduwuit-service.workspace = true -tokio.workspace = true -log.workspace = true -tracing.workspace = true -tracing-subscriber.workspace = true clap.workspace = true -const-str.workspace = true - -opentelemetry.workspace = true -opentelemetry.optional = true -tracing-flame.workspace = true -tracing-flame.optional = true -tracing-opentelemetry.workspace = true -tracing-opentelemetry.optional = true -opentelemetry_sdk.workspace = true -opentelemetry_sdk.optional = true -opentelemetry-jaeger.workspace = true -opentelemetry-jaeger.optional = true - -sentry.workspace = true -sentry.optional = true -sentry-tracing.workspace = true -sentry-tracing.optional = true -sentry-tower.workspace = true -sentry-tower.optional = true - -tokio-metrics.workspace = true -tokio-metrics.optional = true - -console-subscriber.workspace = true console-subscriber.optional = true +console-subscriber.workspace = true +const-str.workspace = true +core_affinity.optional = true +core_affinity.workspace = true +log.workspace = true +opentelemetry-jaeger.optional = true +opentelemetry-jaeger.workspace = true +opentelemetry.optional = true +opentelemetry.workspace = true +opentelemetry_sdk.optional = true +opentelemetry_sdk.workspace = true +sentry-tower.optional = true +sentry-tower.workspace = true +sentry-tracing.optional = true +sentry-tracing.workspace = true +sentry.optional = true +sentry.workspace = true +tokio-metrics.optional = true +tokio-metrics.workspace = true +tokio.workspace = true +tracing-flame.optional = true +tracing-flame.workspace = true +tracing-opentelemetry.optional = true +tracing-opentelemetry.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true [target.'cfg(all(not(target_env = "msvc"), target_os = "linux"))'.dependencies] hardened_malloc-rs.workspace = true diff --git a/src/main/main.rs b/src/main/main.rs index 0946e835..e7aaf3fc 100644 --- a/src/main/main.rs +++ b/src/main/main.rs @@ -2,45 +2,24 @@ pub(crate) mod clap; mod logging; mod mods; mod restart; +mod runtime; mod sentry; mod server; mod signal; extern crate conduwuit_core as conduwuit; -use std::{ - sync::{atomic::Ordering, Arc}, - time::Duration, -}; +use std::sync::{atomic::Ordering, Arc}; use conduwuit::{debug_info, error, rustc_flags_capture, Error, Result}; use server::Server; -use tokio::runtime; - -const WORKER_NAME: &str = "conduwuit:worker"; -const WORKER_MIN: usize = 2; -const WORKER_KEEPALIVE: u64 = 36; -const GLOBAL_QUEUE_INTERVAL: u32 = 192; -const SYSTEM_QUEUE_INTERVAL: u32 = 256; -const SYSTEM_EVENTS_PER_TICK: usize = 512; rustc_flags_capture! {} fn main() -> Result<(), Error> { let args = clap::parse(); - let runtime = runtime::Builder::new_multi_thread() - .enable_io() - .enable_time() - .thread_name(WORKER_NAME) - .worker_threads(args.worker_threads.max(WORKER_MIN)) - .thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE)) - .global_queue_interval(GLOBAL_QUEUE_INTERVAL) - .event_interval(SYSTEM_QUEUE_INTERVAL) - .max_io_events_per_tick(SYSTEM_EVENTS_PER_TICK) - .build() - .expect("built runtime"); - - let server: Arc = Server::build(&args, Some(runtime.handle()))?; + let runtime = runtime::new(&args)?; + let server = Server::new(&args, Some(runtime.handle()))?; runtime.spawn(signal::signal(server.clone())); runtime.block_on(async_main(&server))?; diff --git a/src/main/runtime.rs b/src/main/runtime.rs new file mode 100644 index 00000000..ad0c3cde --- /dev/null +++ b/src/main/runtime.rs @@ -0,0 +1,163 @@ +use std::{thread, time::Duration}; + +use conduwuit::Result; +use tokio::runtime::Builder; + +use crate::clap::Args; + +const WORKER_NAME: &str = "conduwuit:worker"; +const WORKER_MIN: usize = 2; +const WORKER_KEEPALIVE: u64 = 36; +const GLOBAL_QUEUE_INTERVAL: u32 = 192; +const KERNEL_QUEUE_INTERVAL: u32 = 256; +const KERNEL_EVENTS_PER_TICK: usize = 512; + +pub(super) fn new(args: &Args) -> Result { + let mut builder = Builder::new_multi_thread(); + + builder + .enable_io() + .enable_time() + .thread_name(WORKER_NAME) + .worker_threads(args.worker_threads.max(WORKER_MIN)) + .thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE)) + .max_io_events_per_tick(KERNEL_EVENTS_PER_TICK) + .event_interval(KERNEL_QUEUE_INTERVAL) + .global_queue_interval(GLOBAL_QUEUE_INTERVAL) + .on_thread_start(thread_start) + .on_thread_stop(thread_stop) + .on_thread_unpark(thread_unpark) + .on_thread_park(thread_park); + + #[cfg(tokio_unstable)] + builder + .on_task_spawn(task_spawn) + .on_task_terminate(task_terminate); + + #[cfg(tokio_unstable)] + enable_histogram(&mut builder); + + builder.build().map_err(Into::into) +} + +#[cfg(tokio_unstable)] +fn enable_histogram(builder: &mut Builder) { + use tokio::runtime::{HistogramConfiguration, LogHistogram}; + + let config = LogHistogram::builder() + .min_value(Duration::from_micros(10)) + .max_value(Duration::from_millis(1)) + .max_error(0.5) + .max_buckets(32) + .expect("erroneous histogram configuration"); + + builder + .enable_metrics_poll_time_histogram() + .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(config)); +} + +#[tracing::instrument( + name = "fork", + level = "debug", + skip_all, + fields( + id = ?thread::current().id(), + name = %thread::current().name().unwrap_or("None"), + ), +)] +fn thread_start() { + #[cfg(feature = "worker_affinity")] + set_worker_affinity(); +} + +#[cfg(feature = "worker_affinity")] +fn set_worker_affinity() { + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + LazyLock, + }; + + static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0); + static CORES_AVAILABLE: LazyLock>> = LazyLock::new(|| { + core_affinity::get_core_ids().map(|mut cores| { + cores.sort_unstable(); + cores + }) + }); + + let Some(cores) = CORES_AVAILABLE.as_ref() else { + return; + }; + + if thread::current().name() != Some(WORKER_NAME) { + return; + } + + let handle = tokio::runtime::Handle::current(); + let num_workers = handle.metrics().num_workers(); + let i = CORES_OCCUPIED.fetch_add(1, Ordering::Relaxed); + if i >= num_workers { + return; + } + + let Some(id) = cores.get(i) else { + return; + }; + + let _set = core_affinity::set_for_current(*id); +} + +#[tracing::instrument( + name = "join", + level = "debug", + skip_all, + fields( + id = ?thread::current().id(), + name = %thread::current().name().unwrap_or("None"), + ), +)] +fn thread_stop() {} + +#[tracing::instrument( + name = "work", + level = "trace", + skip_all, + fields( + id = ?thread::current().id(), + name = %thread::current().name().unwrap_or("None"), + ), +)] +fn thread_unpark() {} + +#[tracing::instrument( + name = "park", + level = "trace", + skip_all, + fields( + id = ?thread::current().id(), + name = %thread::current().name().unwrap_or("None"), + ), +)] +fn thread_park() {} + +#[cfg(tokio_unstable)] +#[tracing::instrument( + name = "spawn", + level = "trace", + skip_all, + fields( + id = %meta.id(), + ), +)] +fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {} + +#[cfg(tokio_unstable)] +#[tracing::instrument( + name = "finish", + level = "trace", + skip_all, + fields( + id = %meta.id() + ), +)] +fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {} diff --git a/src/main/server.rs b/src/main/server.rs index 00c7a6cc..bc2cff85 100644 --- a/src/main/server.rs +++ b/src/main/server.rs @@ -23,7 +23,7 @@ pub(crate) struct Server { } impl Server { - pub(crate) fn build( + pub(crate) fn new( args: &Args, runtime: Option<&runtime::Handle>, ) -> Result, Error> {