add preferred jemalloc config
add muzzy/dirty configuration mallctl interface add program argument for --gc-muzzy=false Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
3eed408b29
commit
3dae02b886
12 changed files with 289 additions and 82 deletions
|
@ -967,7 +967,7 @@ pub(super) async fn database_stats(
|
|||
|
||||
#[admin_command]
|
||||
pub(super) async fn trim_memory(&self) -> Result<RoomMessageEventContent> {
|
||||
conduwuit::alloc::trim()?;
|
||||
conduwuit::alloc::trim(None)?;
|
||||
|
||||
writeln!(self, "done").await?;
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ jemalloc_stats = [
|
|||
"tikv-jemalloc-ctl/stats",
|
||||
"tikv-jemallocator/stats",
|
||||
]
|
||||
jemalloc_conf = []
|
||||
hardened_malloc = [
|
||||
"dep:hardened_malloc-rs"
|
||||
]
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! Default allocator with no special features
|
||||
|
||||
/// Always returns Ok
|
||||
pub fn trim() -> crate::Result { Ok(()) }
|
||||
pub fn trim<I: Into<Option<usize>>>(_: I) -> crate::Result { Ok(()) }
|
||||
|
||||
/// Always returns None
|
||||
#[must_use]
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
#[global_allocator]
|
||||
static HMALLOC: hardened_malloc_rs::HardenedMalloc = hardened_malloc_rs::HardenedMalloc;
|
||||
|
||||
pub fn trim() -> crate::Result { Ok(()) }
|
||||
pub fn trim<I: Into<Option<usize>>>(_: I) -> crate::Result { Ok(()) }
|
||||
|
||||
#[must_use]
|
||||
//TODO: get usage
|
||||
|
|
|
@ -2,8 +2,9 @@
|
|||
|
||||
use std::{
|
||||
cell::OnceCell,
|
||||
ffi::{c_char, c_void},
|
||||
ffi::{c_char, c_void, CStr},
|
||||
fmt::Debug,
|
||||
sync::RwLock,
|
||||
};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
|
@ -11,10 +12,14 @@ use tikv_jemalloc_ctl as mallctl;
|
|||
use tikv_jemalloc_sys as ffi;
|
||||
use tikv_jemallocator as jemalloc;
|
||||
|
||||
use crate::{err, is_equal_to, utils::math::Tried, Result};
|
||||
use crate::{
|
||||
err, is_equal_to, is_nonzero,
|
||||
utils::{math, math::Tried},
|
||||
Result,
|
||||
};
|
||||
|
||||
#[cfg(feature = "jemalloc_conf")]
|
||||
#[no_mangle]
|
||||
#[unsafe(no_mangle)]
|
||||
pub static malloc_conf: &[u8] = b"\
|
||||
metadata_thp:always\
|
||||
,percpu_arena:percpu\
|
||||
|
@ -22,19 +27,26 @@ metadata_thp:always\
|
|||
,max_background_threads:-1\
|
||||
,lg_extent_max_active_fit:4\
|
||||
,oversize_threshold:33554432\
|
||||
,tcache_max:2097152\
|
||||
,tcache_max:1048576\
|
||||
,dirty_decay_ms:16000\
|
||||
,muzzy_decay_ms:144000\
|
||||
\0";
|
||||
|
||||
#[global_allocator]
|
||||
static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc;
|
||||
static CONTROL: RwLock<()> = RwLock::new(());
|
||||
|
||||
type Key = ArrayVec<usize, KEY_SEGS>;
|
||||
type Name = ArrayVec<u8, NAME_MAX>;
|
||||
type Key = ArrayVec<usize, KEY_SEGS>;
|
||||
|
||||
const KEY_SEGS: usize = 8;
|
||||
const NAME_MAX: usize = 128;
|
||||
const KEY_SEGS: usize = 8;
|
||||
|
||||
#[crate::ctor]
|
||||
fn _static_initialization() {
|
||||
acq_epoch().expect("pre-initialization of jemalloc failed");
|
||||
acq_epoch().expect("pre-initialization of jemalloc failed");
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[cfg(feature = "jemalloc_stats")]
|
||||
|
@ -49,6 +61,9 @@ pub fn memory_usage() -> Option<String> {
|
|||
kibs / 1024.0
|
||||
};
|
||||
|
||||
// Acquire the epoch; ensure latest stats are pulled in
|
||||
acq_epoch().ok()?;
|
||||
|
||||
let allocated = mibs(stats::allocated::read());
|
||||
let active = mibs(stats::active::read());
|
||||
let mapped = mibs(stats::mapped::read());
|
||||
|
@ -76,6 +91,9 @@ pub fn memory_stats(opts: &str) -> Option<String> {
|
|||
.into_raw()
|
||||
.cast_const();
|
||||
|
||||
// Acquire the epoch; ensure latest stats are pulled in
|
||||
acq_epoch().ok()?;
|
||||
|
||||
// SAFETY: calls malloc_stats_print() with our string instance which must remain
|
||||
// in this frame. https://docs.rs/tikv-jemalloc-sys/latest/tikv_jemalloc_sys/fn.malloc_stats_print.html
|
||||
unsafe { ffi::malloc_stats_print(Some(malloc_stats_cb), opaque, opts_p) };
|
||||
|
@ -95,7 +113,7 @@ unsafe extern "C" fn malloc_stats_cb(opaque: *mut c_void, msg: *const c_char) {
|
|||
};
|
||||
|
||||
// SAFETY: we have to trust the string is null terminated.
|
||||
let msg = unsafe { std::ffi::CStr::from_ptr(msg) };
|
||||
let msg = unsafe { CStr::from_ptr(msg) };
|
||||
|
||||
let msg = String::from_utf8_lossy(msg.to_bytes());
|
||||
res.push_str(msg.as_ref());
|
||||
|
@ -114,58 +132,168 @@ macro_rules! mallctl {
|
|||
}};
|
||||
}
|
||||
|
||||
pub fn trim() -> Result { set(&mallctl!("arena.4096.purge"), ()) }
|
||||
|
||||
pub fn decay() -> Result { set(&mallctl!("arena.4096.purge"), ()) }
|
||||
|
||||
pub fn set_by_name<T: Copy + Debug>(name: &str, val: T) -> Result { set(&key(name)?, val) }
|
||||
|
||||
pub fn get_by_name<T: Copy + Debug>(name: &str) -> Result<T> { get(&key(name)?) }
|
||||
|
||||
pub mod this_thread {
|
||||
use super::{get, key, set, Key, OnceCell, Result};
|
||||
use super::{is_nonzero, key, math, Debug, Key, OnceCell, Result};
|
||||
|
||||
pub fn trim() -> Result {
|
||||
let mut key = mallctl!("arena.0.purge");
|
||||
key[1] = arena_id()?.try_into()?;
|
||||
set(&key, ())
|
||||
pub fn trim() -> Result { notify(mallctl!("arena.0.purge")) }
|
||||
|
||||
pub fn decay() -> Result { notify(mallctl!("arena.0.decay")) }
|
||||
|
||||
pub fn flush() -> Result { super::notify(&mallctl!("thread.tcache.flush")) }
|
||||
|
||||
pub fn set_muzzy_decay(decay_ms: isize) -> Result<isize> {
|
||||
set(mallctl!("arena.0.muzzy_decay_ms"), decay_ms)
|
||||
}
|
||||
|
||||
pub fn decay() -> Result {
|
||||
let mut key = mallctl!("arena.0.decay");
|
||||
key[1] = arena_id()?.try_into()?;
|
||||
set(&key, ())
|
||||
pub fn get_muzzy_decay() -> Result<isize> { get(mallctl!("arena.0.muzzy_decay_ms")) }
|
||||
|
||||
pub fn set_dirty_decay(decay_ms: isize) -> Result<isize> {
|
||||
set(mallctl!("arena.0.dirty_decay_ms"), decay_ms)
|
||||
}
|
||||
|
||||
pub fn cache(enable: bool) -> Result {
|
||||
set(&mallctl!("thread.tcache.enabled"), u8::from(enable))
|
||||
pub fn get_dirty_decay() -> Result<isize> { get(mallctl!("arena.0.dirty_decay_ms")) }
|
||||
|
||||
pub fn enable_cache(enable: bool) -> Result<bool> {
|
||||
super::set::<u8>(&mallctl!("thread.tcache.enabled"), enable.into()).map(is_nonzero!())
|
||||
}
|
||||
|
||||
pub fn flush() -> Result { set(&mallctl!("thread.tcache.flush"), ()) }
|
||||
pub fn is_cache_enabled() -> Result<bool> {
|
||||
super::get::<u8>(&mallctl!("thread.tcache.enabled")).map(is_nonzero!())
|
||||
}
|
||||
|
||||
pub fn allocated() -> Result<u64> { get::<u64>(&mallctl!("thread.allocated")) }
|
||||
pub fn set_arena(id: usize) -> Result<usize> {
|
||||
super::set::<u32>(&mallctl!("thread.arena"), id.try_into()?).and_then(math::try_into)
|
||||
}
|
||||
|
||||
pub fn deallocated() -> Result<u64> { get::<u64>(&mallctl!("thread.deallocated")) }
|
||||
pub fn arena_id() -> Result<usize> {
|
||||
super::get::<u32>(&mallctl!("thread.arena")).and_then(math::try_into)
|
||||
}
|
||||
|
||||
pub fn arena_id() -> Result<u32> { get::<u32>(&mallctl!("thread.arena")) }
|
||||
pub fn allocated() -> Result<u64> { super::get(&mallctl!("thread.allocated")) }
|
||||
|
||||
pub fn deallocated() -> Result<u64> { super::get(&mallctl!("thread.deallocated")) }
|
||||
|
||||
fn notify(key: Key) -> Result { super::notify_by_arena(Some(arena_id()?), key) }
|
||||
|
||||
fn set<T>(key: Key, val: T) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
super::set_by_arena(Some(arena_id()?), key, val)
|
||||
}
|
||||
|
||||
fn get<T>(key: Key) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
super::get_by_arena(Some(arena_id()?), key)
|
||||
}
|
||||
}
|
||||
|
||||
fn set<T>(key: &Key, val: T) -> Result
|
||||
pub fn trim<I: Into<Option<usize>>>(arena: I) -> Result {
|
||||
notify_by_arena(arena.into(), mallctl!("arena.4096.purge"))
|
||||
}
|
||||
|
||||
pub fn decay<I: Into<Option<usize>>>(arena: I) -> Result {
|
||||
notify_by_arena(arena.into(), mallctl!("arena.4096.decay"))
|
||||
}
|
||||
|
||||
pub fn set_muzzy_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Result<isize> {
|
||||
if let Some(arena) = arena.into() {
|
||||
set_by_arena(Some(arena), mallctl!("arena.4096.muzzy_decay_ms"), decay_ms)
|
||||
} else {
|
||||
set(&mallctl!("arenas.muzzy_decay_ms"), decay_ms)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Result<isize> {
|
||||
if let Some(arena) = arena.into() {
|
||||
set_by_arena(Some(arena), mallctl!("arena.4096.dirty_decay_ms"), decay_ms)
|
||||
} else {
|
||||
set(&mallctl!("arenas.dirty_decay_ms"), decay_ms)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_percpu_arena() -> bool { percpu_arenas().is_ok_and(is_equal_to!("percpu")) }
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_phycpu_arena() -> bool { percpu_arenas().is_ok_and(is_equal_to!("phycpu")) }
|
||||
|
||||
pub fn percpu_arenas() -> Result<&'static str> {
|
||||
let ptr = get::<*const c_char>(&mallctl!("opt.percpu_arena"))?;
|
||||
//SAFETY: ptr points to a null-terminated string returned for opt.percpu_arena.
|
||||
let cstr = unsafe { CStr::from_ptr(ptr) };
|
||||
cstr.to_str().map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn arenas() -> Result<usize> {
|
||||
get::<u32>(&mallctl!("arenas.narenas")).and_then(math::try_into)
|
||||
}
|
||||
|
||||
pub fn inc_epoch() -> Result<u64> { xchg(&mallctl!("epoch"), 1_u64) }
|
||||
|
||||
pub fn acq_epoch() -> Result<u64> { xchg(&mallctl!("epoch"), 0_u64) }
|
||||
|
||||
fn notify_by_arena(id: Option<usize>, mut key: Key) -> Result {
|
||||
key[1] = id.unwrap_or(4096);
|
||||
notify(&key)
|
||||
}
|
||||
|
||||
fn set_by_arena<T>(id: Option<usize>, mut key: Key, val: T) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
// SAFETY: T must be the exact expected type.
|
||||
unsafe { mallctl::raw::write_mib(key.as_slice(), val) }.map_err(map_err)
|
||||
key[1] = id.unwrap_or(4096);
|
||||
set(&key, val)
|
||||
}
|
||||
|
||||
fn get_by_arena<T>(id: Option<usize>, mut key: Key) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
key[1] = id.unwrap_or(4096);
|
||||
get(&key)
|
||||
}
|
||||
|
||||
fn notify(key: &Key) -> Result { xchg(key, ()) }
|
||||
|
||||
fn set<T>(key: &Key, val: T) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
let _lock = CONTROL.write()?;
|
||||
let res = xchg(key, val)?;
|
||||
inc_epoch()?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn get<T>(key: &Key) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
acq_epoch()?;
|
||||
acq_epoch()?;
|
||||
|
||||
// SAFETY: T must be perfectly valid to receive value.
|
||||
unsafe { mallctl::raw::read_mib(key.as_slice()) }.map_err(map_err)
|
||||
}
|
||||
|
||||
fn xchg<T>(key: &Key, val: T) -> Result<T>
|
||||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
// SAFETY: T must be the exact expected type.
|
||||
unsafe { mallctl::raw::update_mib(key.as_slice(), val) }.map_err(map_err)
|
||||
}
|
||||
|
||||
fn key(name: &str) -> Result<Key> {
|
||||
// tikv asserts the output buffer length is tight to the number of required mibs
|
||||
// so we slice that down here.
|
||||
|
|
|
@ -13,7 +13,7 @@ use std::{
|
|||
use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
|
||||
use conduwuit::{
|
||||
debug, debug_warn, err, error, implement,
|
||||
result::DebugInspect,
|
||||
result::{DebugInspect, LogDebugErr},
|
||||
trace,
|
||||
utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
|
||||
Error, Result, Server,
|
||||
|
@ -289,6 +289,20 @@ fn worker_init(&self, id: usize) {
|
|||
|
||||
// affinity is empty (no-op) if there's only one queue
|
||||
set_affinity(affinity.clone());
|
||||
|
||||
#[cfg(feature = "jemalloc")]
|
||||
if affinity.clone().count() == 1 && conduwuit::alloc::je::is_affine_arena() {
|
||||
use conduwuit::alloc::je::this_thread::{arena_id, set_arena};
|
||||
|
||||
let id = affinity.clone().next().expect("at least one id");
|
||||
|
||||
if let Ok(arena) = arena_id() {
|
||||
if arena != id {
|
||||
set_arena(id).log_debug_err().ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
?group,
|
||||
affinity = ?affinity.collect::<Vec<_>>(),
|
||||
|
|
|
@ -41,6 +41,7 @@ default = [
|
|||
"gzip_compression",
|
||||
"io_uring",
|
||||
"jemalloc",
|
||||
"jemalloc_conf",
|
||||
"media_thumbnail",
|
||||
"release_max_log_level",
|
||||
"systemd",
|
||||
|
@ -85,6 +86,9 @@ jemalloc_prof = [
|
|||
jemalloc_stats = [
|
||||
"conduwuit-core/jemalloc_stats",
|
||||
]
|
||||
jemalloc_conf = [
|
||||
"conduwuit-core/jemalloc_conf",
|
||||
]
|
||||
media_thumbnail = [
|
||||
"conduwuit-service/media_thumbnail",
|
||||
]
|
||||
|
|
|
@ -92,6 +92,22 @@ pub(crate) struct Args {
|
|||
require_equals(false),
|
||||
)]
|
||||
pub(crate) gc_on_park: Option<bool>,
|
||||
|
||||
/// Toggles muzzy decay for jemalloc arenas associated with a tokio
|
||||
/// worker (when worker-affinity is enabled). Setting to false releases
|
||||
/// memory to the operating system using MADV_FREE without MADV_DONTNEED.
|
||||
/// Setting to false increases performance by reducing pagefaults, but
|
||||
/// resident memory usage appears high until there is memory pressure. The
|
||||
/// default is true unless the system has four or more cores.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "CONDUWUIT_RUNTIME_GC_MUZZY",
|
||||
action = ArgAction::Set,
|
||||
num_args = 0..=1,
|
||||
require_equals(false),
|
||||
)]
|
||||
pub(crate) gc_muzzy: Option<bool>,
|
||||
}
|
||||
|
||||
/// Parse commandline arguments into structured data
|
||||
|
|
|
@ -9,8 +9,12 @@ use std::{
|
|||
};
|
||||
|
||||
use conduwuit::{
|
||||
result::LogErr,
|
||||
utils::sys::compute::{nth_core_available, set_affinity},
|
||||
is_true,
|
||||
result::LogDebugErr,
|
||||
utils::{
|
||||
available_parallelism,
|
||||
sys::compute::{nth_core_available, set_affinity},
|
||||
},
|
||||
Result,
|
||||
};
|
||||
use tokio::runtime::Builder;
|
||||
|
@ -21,9 +25,11 @@ const WORKER_NAME: &str = "conduwuit:worker";
|
|||
const WORKER_MIN: usize = 2;
|
||||
const WORKER_KEEPALIVE: u64 = 36;
|
||||
const MAX_BLOCKING_THREADS: usize = 1024;
|
||||
const DISABLE_MUZZY_THRESHOLD: usize = 4;
|
||||
|
||||
static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();
|
||||
static GC_ON_PARK: OnceLock<Option<bool>> = OnceLock::new();
|
||||
static GC_MUZZY: OnceLock<Option<bool>> = OnceLock::new();
|
||||
|
||||
pub(super) fn new(args: &Args) -> Result<tokio::runtime::Runtime> {
|
||||
WORKER_AFFINITY
|
||||
|
@ -34,6 +40,10 @@ pub(super) fn new(args: &Args) -> Result<tokio::runtime::Runtime> {
|
|||
.set(args.gc_on_park)
|
||||
.expect("set GC_ON_PARK from program argument");
|
||||
|
||||
GC_MUZZY
|
||||
.set(args.gc_muzzy)
|
||||
.expect("set GC_MUZZY from program argument");
|
||||
|
||||
let mut builder = Builder::new_multi_thread();
|
||||
builder
|
||||
.enable_io()
|
||||
|
@ -83,11 +93,13 @@ fn enable_histogram(builder: &mut Builder, args: &Args) {
|
|||
),
|
||||
)]
|
||||
fn thread_start() {
|
||||
if WORKER_AFFINITY
|
||||
.get()
|
||||
.copied()
|
||||
.expect("WORKER_AFFINITY initialized by runtime::new()")
|
||||
{
|
||||
debug_assert_eq!(
|
||||
Some(WORKER_NAME),
|
||||
thread::current().name(),
|
||||
"tokio worker name mismatch at thread start"
|
||||
);
|
||||
|
||||
if WORKER_AFFINITY.get().is_some_and(is_true!()) {
|
||||
set_worker_affinity();
|
||||
}
|
||||
}
|
||||
|
@ -95,10 +107,6 @@ fn thread_start() {
|
|||
fn set_worker_affinity() {
|
||||
static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
if thread::current().name() != Some(WORKER_NAME) {
|
||||
return;
|
||||
}
|
||||
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
let num_workers = handle.metrics().num_workers();
|
||||
let i = CORES_OCCUPIED.fetch_add(1, Ordering::Relaxed);
|
||||
|
@ -111,8 +119,33 @@ fn set_worker_affinity() {
|
|||
};
|
||||
|
||||
set_affinity(once(id));
|
||||
set_worker_mallctl(id);
|
||||
}
|
||||
|
||||
#[cfg(feature = "jemalloc")]
|
||||
fn set_worker_mallctl(id: usize) {
|
||||
use conduwuit::alloc::je::{
|
||||
is_affine_arena,
|
||||
this_thread::{set_arena, set_muzzy_decay},
|
||||
};
|
||||
|
||||
if is_affine_arena() {
|
||||
set_arena(id).log_debug_err().ok();
|
||||
}
|
||||
|
||||
let muzzy_option = GC_MUZZY
|
||||
.get()
|
||||
.expect("GC_MUZZY initialized by runtime::new()");
|
||||
|
||||
let muzzy_auto_disable = available_parallelism() >= DISABLE_MUZZY_THRESHOLD;
|
||||
if matches!(muzzy_option, Some(false) | None if muzzy_auto_disable) {
|
||||
set_muzzy_decay(-1).log_debug_err().ok();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "jemalloc"))]
|
||||
fn set_worker_mallctl(_: usize) {}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "join",
|
||||
level = "debug",
|
||||
|
@ -157,7 +190,9 @@ fn thread_park() {
|
|||
|
||||
fn gc_on_park() {
|
||||
#[cfg(feature = "jemalloc")]
|
||||
conduwuit::alloc::je::this_thread::decay().log_err().ok();
|
||||
conduwuit::alloc::je::this_thread::decay()
|
||||
.log_debug_err()
|
||||
.ok();
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue