From b1951070535674b4bcae429c8dee740e72fe104f Mon Sep 17 00:00:00 2001
From: Jason Volk
Date: Sun, 22 Dec 2024 15:09:30 +0000
Subject: [PATCH] optimize for multi-queue storage topologies with affinity

Signed-off-by: Jason Volk
---
 conduwuit-example.toml         |  32 ++++++-
 src/core/config/mod.rs         |  52 ++++++++--
 src/database/engine.rs         |  12 +--
 src/database/pool.rs           | 170 ++++++++++++++++++++++-----------
 src/database/pool/configure.rs | 106 ++++++++++++++++++++
 src/main/runtime.rs            |   2 +-
 6 files changed, 297 insertions(+), 77 deletions(-)
 create mode 100644 src/database/pool/configure.rs

diff --git a/conduwuit-example.toml b/conduwuit-example.toml
index 3669961a..111acb05 100644
--- a/conduwuit-example.toml
+++ b/conduwuit-example.toml
@@ -1397,18 +1397,42 @@
 #
 #admin_room_notices = true
 
+# Enable database pool affinity support. On supporting systems, block
+# device queue topologies are detected and the request pool is optimized
+# for the hardware; db_pool_workers is determined automatically.
+#
+#db_pool_affinity = true
+
 # Sets the number of worker threads in the frontend-pool of the database.
 # This number should reflect the I/O capabilities of the system,
-# specifically the queue-depth or the number of simultaneous requests in
+# such as the queue-depth or the number of simultaneous requests in
 # flight. Defaults to 32 or four times the number of CPU cores, whichever
 # is greater.
 #
+# Note: This value is only used if db_pool_affinity is disabled or not
+# detected on the system; otherwise it is determined automatically.
+#
 #db_pool_workers = 32
 
-# Size of the queue feeding the database's frontend-pool. Defaults to 256
-# or eight times the number of CPU cores, whichever is greater.
+# When db_pool_affinity is enabled and detected, the size of any worker
+# group will not exceed the determined value. This is necessary when the
+# thread-pooling approach does not scale to the full capabilities of
+# high-end hardware; using detected values without limitation could
+# degrade performance.
 #
-#db_pool_queue_size = 256
+# The value is multiplied by the number of cores which share a device
+# queue, since group workers can be scheduled on any of those cores.
+#
+#db_pool_workers_limit = 64
+
+# Determines the size of the queues feeding the database's frontend-pool.
+# The size of each queue is determined by multiplying this value by the
+# number of pool workers. When a queue is full, tokio tasks conducting
+# requests will yield until space is available; this is good for
+# flow control by avoiding buffer bloat, but can inhibit throughput if
+# set too low.
+#
+#db_pool_queue_mult = 4
 
 # Number of sender task workers; determines sender parallelism. Default is
 # '0' which means the value is determined internally, likely matching the
diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs
index 23feb0ca..3772aa16 100644
--- a/src/core/config/mod.rs
+++ b/src/core/config/mod.rs
@@ -1581,22 +1581,50 @@ pub struct Config {
 	#[serde(default = "true_fn")]
 	pub admin_room_notices: bool,
 
+	/// Enable database pool affinity support. On supporting systems, block
+	/// device queue topologies are detected and the request pool is optimized
+	/// for the hardware; db_pool_workers is determined automatically.
+	///
+	/// default: true
+	#[serde(default = "true_fn")]
+	pub db_pool_affinity: bool,
+
 	/// Sets the number of worker threads in the frontend-pool of the database.
 	/// This number should reflect the I/O capabilities of the system,
-	/// specifically the queue-depth or the number of simultaneous requests in
+	/// such as the queue-depth or the number of simultaneous requests in
 	/// flight. Defaults to 32 or four times the number of CPU cores, whichever
 	/// is greater.
 	///
+	/// Note: This value is only used if db_pool_affinity is disabled or not
+	/// detected on the system; otherwise it is determined automatically.
+	///
 	/// default: 32
 	#[serde(default = "default_db_pool_workers")]
 	pub db_pool_workers: usize,
 
-	/// Size of the queue feeding the database's frontend-pool. Defaults to 256
-	/// or eight times the number of CPU cores, whichever is greater.
+	/// When db_pool_affinity is enabled and detected, the size of any worker
+	/// group will not exceed the determined value. This is necessary when the
+	/// thread-pooling approach does not scale to the full capabilities of
+	/// high-end hardware; using detected values without limitation could
+	/// degrade performance.
 	///
-	/// default: 256
-	#[serde(default = "default_db_pool_queue_size")]
-	pub db_pool_queue_size: usize,
+	/// The value is multiplied by the number of cores which share a device
+	/// queue, since group workers can be scheduled on any of those cores.
+	///
+	/// default: 64
+	#[serde(default = "default_db_pool_workers_limit")]
+	pub db_pool_workers_limit: usize,
+
+	/// Determines the size of the queues feeding the database's frontend-pool.
+	/// The size of each queue is determined by multiplying this value by the
+	/// number of pool workers. When a queue is full, tokio tasks conducting
+	/// requests will yield until space is available; this is good for
+	/// flow control by avoiding buffer bloat, but can inhibit throughput if
+	/// set too low.
+	///
+	/// default: 4
+	#[serde(default = "default_db_pool_queue_mult")]
+	pub db_pool_queue_mult: usize,
 
 	/// Number of sender task workers; determines sender parallelism. Default is
 	/// '0' which means the value is determined internally, likely matching the
@@ -2399,8 +2427,12 @@ fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_p
 
 fn default_trusted_server_batch_size() -> usize { 256 }
 
-fn default_db_pool_workers() -> usize { sys::available_parallelism().saturating_mul(4).max(32) }
-
-fn default_db_pool_queue_size() -> usize {
-	sys::available_parallelism().saturating_mul(8).max(256)
+fn default_db_pool_workers() -> usize {
+	sys::available_parallelism()
+		.saturating_mul(4)
+		.clamp(32, 1024)
 }
+
+fn default_db_pool_workers_limit() -> usize { 64 }
+
+fn default_db_pool_queue_mult() -> usize { 4 }
diff --git a/src/database/engine.rs b/src/database/engine.rs
index 73ea559d..faf5b131 100644
--- a/src/database/engine.rs
+++ b/src/database/engine.rs
@@ -18,7 +18,7 @@ use rocksdb::{
 
 use crate::{
 	opts::{cf_options, db_options},
-	or_else, pool,
+	or_else, pool::Pool,
 	result,
 	util::map_err,
@@ -87,8 +87,9 @@ impl Engine {
 		.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
 		.collect::<Vec<_>>();
 
-	debug!("Opening database...");
 	let path = &config.database_path;
+
+	debug!("Opening database...");
 	let res = if config.rocksdb_read_only {
 		Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
 	} else if config.rocksdb_secondary {
@@ -105,11 +106,6 @@ impl Engine {
 		"Opened database."
 	);
 
-	let pool_opts = pool::Opts {
-		queue_size: config.db_pool_queue_size,
-		worker_num: config.db_pool_workers,
-	};
-
 	Ok(Arc::new(Self {
 		server: server.clone(),
 		row_cache,
@@ -121,7 +117,7 @@
 		corks: AtomicU32::new(0),
 		read_only: config.rocksdb_read_only,
 		secondary: config.rocksdb_secondary,
-		pool: Pool::new(server, &pool_opts).await?,
+		pool: Pool::new(server).await?,
 	}))
 }
 
diff --git a/src/database/pool.rs b/src/database/pool.rs
index 4f018a38..51e705ce 100644
--- a/src/database/pool.rs
+++ b/src/database/pool.rs
@@ -1,3 +1,5 @@
+mod configure;
+
 use std::{
 	mem::take,
 	sync::{
@@ -6,39 +8,50 @@ use std::{
 	},
 };
 
-use async_channel::{bounded, Receiver, RecvError, Sender};
-use conduwuit::{debug, debug_warn, defer, err, implement, result::DebugInspect, Result, Server};
+use async_channel::{Receiver, RecvError, Sender};
+use conduwuit::{
+	debug, debug_warn, defer, err, implement,
+	result::DebugInspect,
+	trace,
+	utils::sys::compute::{get_affinity, get_core_available, set_affinity},
+	Result, Server,
+};
 use futures::{channel::oneshot, TryFutureExt};
 use oneshot::Sender as ResultSender;
 use rocksdb::Direction;
 use tokio::task::JoinSet;
 
+use self::configure::configure;
 use crate::{keyval::KeyBuf, stream, Handle, Map};
 
+/// Frontend thread-pool. Operating system threads are used to make database
+/// requests which are not cached. These thread-blocking requests are offloaded
+/// from the tokio async workers and executed on this threadpool.
 pub(crate) struct Pool {
 	server: Arc<Server>,
+	queues: Vec<Sender<Cmd>>,
 	workers: Mutex<JoinSet<()>>,
-	queue: Sender<Cmd>,
+	topology: Vec<usize>,
 	busy: AtomicUsize,
 	queued_max: AtomicUsize,
 }
 
-pub(crate) struct Opts {
-	pub(crate) queue_size: usize,
-	pub(crate) worker_num: usize,
-}
-
+/// Operations which can be submitted to the pool.
 pub(crate) enum Cmd {
 	Get(Get),
 	Iter(Seek),
 }
 
+/// Point-query
 pub(crate) struct Get {
 	pub(crate) map: Arc<Map>,
 	pub(crate) key: KeyBuf,
 	pub(crate) res: Option<ResultSender<Result<Handle<'static>>>>,
 }
 
+/// Iterator-seek.
+/// Note: only the initial seek is supported at this time, on the assumption
+/// that rocksdb prefetching prevents mid-iteration polls from blocking on I/O.
 pub(crate) struct Seek {
 	pub(crate) map: Arc<Map>,
 	pub(crate) state: stream::State<'static>,
@@ -47,34 +60,44 @@ pub(crate) struct Seek {
 	pub(crate) res: Option<ResultSender<stream::State<'static>>>,
 }
 
-const QUEUE_LIMIT: (usize, usize) = (1, 3072);
-const WORKER_LIMIT: (usize, usize) = (1, 512);
-
-impl Drop for Pool {
-	fn drop(&mut self) {
-		debug_assert!(self.queue.is_empty(), "channel must be empty on drop");
-		debug_assert!(self.queue.is_closed(), "channel should be closed on drop");
-	}
-}
+const WORKER_LIMIT: (usize, usize) = (1, 1024);
+const QUEUE_LIMIT: (usize, usize) = (1, 2048);
 
 #[implement(Pool)]
-pub(crate) async fn new(server: &Arc<Server>, opts: &Opts) -> Result<Arc<Self>> {
-	let queue_size = opts.queue_size.clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1);
-	let (send, recv) = bounded(queue_size);
+pub(crate) async fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
+	let (total_workers, queue_sizes, topology) = configure(server);
+
+	let (senders, receivers) = queue_sizes.into_iter().map(async_channel::bounded).unzip();
+
 	let pool = Arc::new(Self {
 		server: server.clone(),
+
+		queues: senders,
+
 		workers: JoinSet::new().into(),
-		queue: send,
+
+		topology,
+
 		busy: AtomicUsize::default(),
+
 		queued_max: AtomicUsize::default(),
 	});
 
-	let worker_num = opts.worker_num.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
-	pool.spawn_until(recv, worker_num).await?;
+	pool.spawn_until(receivers, total_workers).await?;
 
 	Ok(pool)
 }
 
+impl Drop for Pool {
+	fn drop(&mut self) {
+		debug_assert!(self.queues.iter().all(Sender::is_empty), "channel must be empty on drop");
+		debug_assert!(
+			self.queues.iter().all(Sender::is_closed),
+			"channel should be closed on drop"
+		);
+	}
+}
+
 #[implement(Pool)]
 pub(crate) async fn shutdown(self: &Arc<Self>) {
 	self.close();
@@ -83,36 +106,39 @@ pub(crate) async fn shutdown(self: &Arc<Self>) {
 	debug!(workers = workers.len(), "Waiting for workers to join...");
 	workers.join_all().await;
 
-	debug_assert!(self.queue.is_empty(), "channel is not empty");
 }
 
 #[implement(Pool)]
-pub(crate) fn close(&self) -> bool {
-	if !self.queue.close() {
-		return false;
-	}
+pub(crate) fn close(&self) {
+	let senders = self.queues.iter().map(Sender::sender_count).sum::<usize>();
 
-	let mut workers = take(&mut *self.workers.lock().expect("locked"));
-	debug!(workers = workers.len(), "Waiting for workers to join...");
-	workers.abort_all();
-	drop(workers);
+	let receivers = self
+		.queues
+		.iter()
+		.map(Sender::receiver_count)
+		.sum::<usize>();
 
-	std::thread::yield_now();
-	debug_assert!(self.queue.is_empty(), "channel is not empty");
 	debug!(
-		senders = self.queue.sender_count(),
-		receivers = self.queue.receiver_count(),
-		"Closed pool channel"
+		queues = self.queues.len(),
+		workers = self.workers.lock().expect("locked").len(),
+		?senders,
+		?receivers,
+		"Closing pool..."
 	);
 
-	true
+	for queue in &self.queues {
+		queue.close();
+	}
+
+	self.workers.lock().expect("locked").abort_all();
+	std::thread::yield_now();
 }
 
 #[implement(Pool)]
-async fn spawn_until(self: &Arc<Self>, recv: Receiver<Cmd>, max: usize) -> Result {
+async fn spawn_until(self: &Arc<Self>, recv: Vec<Receiver<Cmd>>, count: usize) -> Result {
 	let mut workers = self.workers.lock().expect("locked");
-	while workers.len() < max {
-		self.spawn_one(&mut workers, recv.clone())?;
+	while workers.len() < count {
+		self.spawn_one(&mut workers, &recv)?;
 	}
 
 	Ok(())
@@ -125,8 +151,13 @@ async fn spawn_until(self: &Arc<Self>, recv: Receiver<Cmd>, max: usize) -> Resul
 	skip_all,
 	fields(id = %workers.len())
 )]
-fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: Receiver<Cmd>) -> Result {
+fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: &[Receiver<Cmd>]) -> Result {
+	debug_assert!(!self.queues.is_empty(), "Must have at least one queue");
+	debug_assert!(!recv.is_empty(), "Must have at least one receiver");
+
 	let id = workers.len();
+	let group = id.overflowing_rem(self.queues.len()).0;
+	let recv = recv[group].clone();
 	let self_ = self.clone();
 
 	#[cfg(not(tokio_unstable))]
@@ -146,7 +177,9 @@ pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result<Handle<'_>> {
 	let (send, recv) = oneshot::channel();
 	_ = cmd.res.insert(send);
-	self.execute(Cmd::Get(cmd))
+
+	let queue = self.select_queue();
+	self.execute(queue, Cmd::Get(cmd))
 		.and_then(|()| {
 			recv.map_ok(into_recv_get_result)
 				.map_err(|e| err!(error!("recv failed {e:?}")))
@@ -159,7 +192,9 @@ pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result<Handle<'_>> {
 pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<'_>> {
 	let (send, recv) = oneshot::channel();
 	_ = cmd.res.insert(send);
-	self.execute(Cmd::Iter(cmd))
+
+	let queue = self.select_queue();
+	self.execute(queue, Cmd::Iter(cmd))
 		.and_then(|()| {
 			recv.map_ok(into_recv_seek)
 				.map_err(|e| err!(error!("recv failed {e:?}")))
@@ -167,6 +202,13 @@ pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<
+#[implement(Pool)]
+fn select_queue(&self) -> &Sender<Cmd> {
+	let core_id = get_affinity().next().unwrap_or(0);
+	let chan_id = self.topology[core_id];
+	self.queues.get(chan_id).unwrap_or_else(|| &self.queues[0])
+}
+
 #[implement(Pool)]
 #[tracing::instrument(
 	level = "trace",
@@ -174,25 +216,24 @@
 	),
 )]
-async fn execute(&self, cmd: Cmd) -> Result {
+async fn execute(&self, queue: &Sender<Cmd>, cmd: Cmd) -> Result {
 	if cfg!(debug_assertions) {
-		self.queued_max
-			.fetch_max(self.queue.len(), Ordering::Relaxed);
+		self.queued_max.fetch_max(queue.len(), Ordering::Relaxed);
 	}
 
-	if self.queue.is_full() {
+	if queue.is_full() {
 		debug_warn!(
-			capacity = ?self.queue.capacity(),
+			capacity = ?queue.capacity(),
 			"pool queue is full"
 		);
 	}
 
-	self.queue
+	queue
 		.send(cmd)
 		.await
 		.map_err(|e| err!(error!("send failed {e:?}")))
@@ -208,12 +249,33 @@ async fn execute(&self, cmd: Cmd) -> Result {
 	),
 )]
 fn worker(self: Arc<Self>, id: usize, recv: Receiver<Cmd>) {
-	debug!("worker spawned");
-	defer! {{ debug!("worker finished"); }}
+	defer! {{ trace!("worker finished"); }}
+	trace!("worker spawned");
 
+	self.worker_init(id);
 	self.worker_loop(&recv);
 }
 
+#[implement(Pool)]
+fn worker_init(&self, id: usize) {
+	let group = id.overflowing_rem(self.queues.len()).0;
+	let affinity = self
+		.topology
+		.iter()
+		.enumerate()
+		.filter(|_| self.queues.len() > 1)
+		.filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id))
+		.filter_map(get_core_available);
+
+	// affinity is empty (no-op) if there's only one queue
+	set_affinity(affinity.clone());
+	debug!(
+		?group,
+		affinity = ?affinity.collect::<Vec<_>>(),
+		"worker ready"
+	);
+}
+
 #[implement(Pool)]
 fn worker_loop(&self, recv: &Receiver<Cmd>) {
 	// initial +1 needed prior to entering wait
diff --git a/src/database/pool/configure.rs b/src/database/pool/configure.rs
new file mode 100644
index 00000000..8353a265
--- /dev/null
+++ b/src/database/pool/configure.rs
@@ -0,0 +1,106 @@
+use std::{ffi::OsStr, sync::Arc};
+
+use conduwuit::{
+	debug_info,
+	utils::{
+		sys::{compute::is_core_available, storage},
+		BoolExt,
+	},
+	Server,
+};
+
+use super::{QUEUE_LIMIT, WORKER_LIMIT};
+
+pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>) {
+	let config = &server.config;
+
+	// This finds the block device and gathers all the properties we need.
+	let (device_name, device_prop) = config
+		.db_pool_affinity
+		.and_then(|| storage::name_from_path(&config.database_path))
+		.map(|device_name| (device_name.clone(), storage::parallelism(&device_name)))
+		.unzip();
+
+	// The default worker count is only used if we didn't find better information.
+	let default_worker_count = device_prop
+		.as_ref()
+		.is_none_or(|prop| prop.mq.is_empty())
+		.then_some(config.db_pool_workers);
+
+	// Determine the worker groupings. Each index represents a hardware queue and
+	// contains the number of workers which will service it.
+	let worker_counts: Vec<_> = device_prop
+		.iter()
+		.map(|dev| &dev.mq)
+		.flat_map(|mq| mq.iter())
+		.filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
+		.map(|mq| {
+			mq.nr_tags.unwrap_or_default().min(
+				config.db_pool_workers_limit.saturating_mul(
+					mq.cpu_list
+						.iter()
+						.filter(|&&id| is_core_available(id))
+						.count()
+						.max(1),
+				),
+			)
+		})
+		.chain(default_worker_count)
+		.collect();
+
+	// Determine our software queue size for each hardware queue. This is the
+	// mpmc channel between the tokio workers and the pool workers.
+	let queue_sizes: Vec<_> = worker_counts
+		.iter()
+		.map(|worker_count| {
+			worker_count
+				.saturating_mul(config.db_pool_queue_mult)
+				.clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1)
+		})
+		.collect();
+
+	// Determine the CPU affinities of each hardware queue. Each index is a CPU
+	// and each value is the associated hardware queue. There is a little
+	// shiftiness going on because CPUs which are not available to the process
+	// are filtered out, similar to the worker_counts.
+	let topology = device_prop
+		.iter()
+		.map(|dev| &dev.mq)
+		.flat_map(|mq| mq.iter())
+		.fold(vec![0; 128], |mut topology, mq| {
+			mq.cpu_list
+				.iter()
+				.filter(|&&id| is_core_available(id))
+				.for_each(|&id| {
+					topology[id] = mq.id;
+				});
+
+			topology
+		});
+
+	// Regardless of the capacity of all queues, we establish some limit on the
+	// total number of workers; this is hopefully hinted by nr_requests.
+	let max_workers = device_prop
+		.as_ref()
+		.and_then(|prop| prop.nr_requests)
+		.unwrap_or(WORKER_LIMIT.1);
+
+	// Determine the final worker count which we'll be spawning.
+	let total_workers = worker_counts
+		.iter()
+		.sum::<usize>()
+		.clamp(WORKER_LIMIT.0, max_workers);
+
+	debug_info!(
+		device_name = ?device_name
+			.as_deref()
+			.and_then(OsStr::to_str)
+			.unwrap_or("None"),
+		?worker_counts,
+		?queue_sizes,
+		?total_workers,
+		"Frontend topology",
+	);
+
+	(total_workers, queue_sizes, topology)
+}
diff --git a/src/main/runtime.rs b/src/main/runtime.rs
index cee093ea..bfd2ef74 100644
--- a/src/main/runtime.rs
+++ b/src/main/runtime.rs
@@ -19,7 +19,7 @@ use crate::clap::Args;
 const WORKER_NAME: &str = "conduwuit:worker";
 const WORKER_MIN: usize = 2;
 const WORKER_KEEPALIVE: u64 = 36;
-const MAX_BLOCKING_THREADS: usize = 1024;
+const MAX_BLOCKING_THREADS: usize = 2048;
 
 static WORKER_AFFINITY: OnceLock = OnceLock::new();
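
Note (illustration, not part of the patch): the grouping arithmetic used by the pool above is small enough to sketch in isolation. The following standalone Rust program mirrors the round-robin worker grouping of spawn_one() and the core-to-queue lookup of select_queue(); the topology table and core counts are invented example values rather than detected hardware.

// Standalone sketch of the pool's grouping scheme; all values are examples.
fn main() {
    // `topology[core_id]` names the hardware queue serving that core,
    // mirroring the vector built by configure().
    let topology: Vec<usize> = vec![0, 0, 1, 1, 2, 2, 3, 3];
    let num_queues = 4;

    // Workers are distributed over queue groups round-robin by id,
    // as in spawn_one(): group = id % queues.len().
    for worker_id in 0..8 {
        let group = worker_id % num_queues;
        println!("worker {worker_id} services queue {group}");
    }

    // A submitting task picks the queue serving the core it is currently
    // affine to, falling back to queue 0, as in select_queue().
    let current_core = 5;
    let queue_id = topology.get(current_core).copied().unwrap_or(0);
    println!("task on core {current_core} submits to queue {queue_id}");
}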
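
Note (worked example, not part of the patch): the sizing rules in configure() can be followed with concrete numbers. This sketch assumes a hypothetical device exposing two hardware queues with 128 tags each, four usable cores per queue, and an nr_requests hint of 256; it reproduces only the arithmetic, not the sysfs detection.

// Worked example of the sizing arithmetic; all hardware numbers are hypothetical.
const WORKER_LIMIT: (usize, usize) = (1, 1024);
const QUEUE_LIMIT: (usize, usize) = (1, 2048);

fn main() {
    let db_pool_workers_limit = 64;
    let db_pool_queue_mult = 4;

    // (nr_tags, usable cores) per hardware queue.
    let queues = [(128_usize, 4_usize), (128, 4)];
    let nr_requests = 256; // upper bound hinted by the device

    // Per-queue workers: the queue's tag count, capped by the configured
    // limit multiplied by the number of cores sharing that queue.
    let worker_counts: Vec<usize> = queues
        .iter()
        .map(|&(nr_tags, cores)| nr_tags.min(db_pool_workers_limit * cores.max(1)))
        .collect();

    // Software queue size feeding each worker group: workers * queue_mult, clamped.
    let queue_sizes: Vec<usize> = worker_counts
        .iter()
        .map(|w| (w * db_pool_queue_mult).clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1))
        .collect();

    // Total spawned workers are bounded by the hard minimum and the nr_requests hint.
    let total_workers: usize = worker_counts
        .iter()
        .sum::<usize>()
        .clamp(WORKER_LIMIT.0, nr_requests);

    // Yields: worker_counts = [128, 128], queue_sizes = [512, 512], total_workers = 256.
    println!("{worker_counts:?} {queue_sizes:?} {total_workers}");
}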