diff --git a/src/main/clap.rs b/src/main/clap.rs index ad5c815a..d3d40491 100644 --- a/src/main/clap.rs +++ b/src/main/clap.rs @@ -80,6 +80,18 @@ pub(crate) struct Args { default_missing_value = "true", )] pub(crate) worker_affinity: bool, + + /// Toggles feature to promote memory reclamation by the operating system + /// when tokio worker runs out of work. + #[arg( + long, + hide(true), + env = "CONDUWUIT_RUNTIME_GC_ON_PARK", + action = ArgAction::Set, + num_args = 0..=1, + require_equals(false), + )] + pub(crate) gc_on_park: Option, } /// Parse commandline arguments into structured data diff --git a/src/main/runtime.rs b/src/main/runtime.rs index 3039ef1b..315336b0 100644 --- a/src/main/runtime.rs +++ b/src/main/runtime.rs @@ -9,6 +9,7 @@ use std::{ }; use conduwuit::{ + result::LogErr, utils::sys::compute::{nth_core_available, set_affinity}, Result, }; @@ -22,12 +23,17 @@ const WORKER_KEEPALIVE: u64 = 36; const MAX_BLOCKING_THREADS: usize = 1024; static WORKER_AFFINITY: OnceLock = OnceLock::new(); +static GC_ON_PARK: OnceLock> = OnceLock::new(); pub(super) fn new(args: &Args) -> Result { WORKER_AFFINITY .set(args.worker_affinity) .expect("set WORKER_AFFINITY from program argument"); + GC_ON_PARK + .set(args.gc_on_park) + .expect("set GC_ON_PARK from program argument"); + let mut builder = Builder::new_multi_thread(); builder .enable_io() @@ -138,7 +144,21 @@ fn thread_unpark() {} name = %thread::current().name().unwrap_or("None"), ), )] -fn thread_park() {} +fn thread_park() { + match GC_ON_PARK + .get() + .as_ref() + .expect("GC_ON_PARK initialized by runtime::new()") + { + | Some(true) | None if cfg!(feature = "jemalloc_conf") => gc_on_park(), + | _ => (), + } +} + +fn gc_on_park() { + #[cfg(feature = "jemalloc")] + conduwuit::alloc::je::this_thread::decay().log_err().ok(); +} #[cfg(tokio_unstable)] #[tracing::instrument(