optimize for multi-queue storage topologies with affinity

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-12-22 15:09:30 +00:00 committed by strawberry
parent e5a1309583
commit b195107053
6 changed files with 297 additions and 77 deletions


@@ -1397,18 +1397,42 @@
#
#admin_room_notices = true
# Enable database pool affinity support. On supporting systems, block
# device queue topologies are detected and the request pool is optimized
# for the hardware; db_pool_workers is determined automatically.
#
#db_pool_affinity = true
# Sets the number of worker threads in the frontend-pool of the database.
# This number should reflect the I/O capabilities of the system,
# such as the queue-depth or the number of simultaneous requests in
# flight. Defaults to 32 or four times the number of CPU cores, whichever
# is greater.
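#
# For example, on a 16-core host this default works out to
# max(32, 4 * 16) = 64 worker threads.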
#
# Note: This value is only used if db_pool_affinity is disabled or not
# detected on the system, otherwise it is determined automatically.
#
#db_pool_workers = 32
# When db_pool_affinity is enabled and detected, the size of any worker
# group will not exceed the determined value. This is necessary when the
# thread-pooling approach does not scale to the full capabilities of
# high-end hardware; using detected values without limitation could
# degrade performance.
#
# The value is multiplied by the number of cores which share a device
# queue, since group workers can be scheduled on any of those cores.
#
#db_pool_workers_limit = 64
# Determines the size of the queues feeding the database's frontend-pool.
# The size of the queue is determined by multiplying this value with the
# number of pool workers. When this queue is full, tokio tasks conducting
# requests will yield until space is available; this is good for
# flow-control by avoiding buffer-bloat, but can inhibit throughput if
# too low.
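#
# As a worked example (illustrative numbers): with db_pool_queue_mult = 4 and
# a worker group of 16 threads, that group's queue holds 64 requests before
# sending tasks start to yield.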
#
#db_pool_queue_mult = 4
# Number of sender task workers; determines sender parallelism. Default is
# '0' which means the value is determined internally, likely matching the


@@ -1581,22 +1581,50 @@ pub struct Config {
#[serde(default = "true_fn")]
pub admin_room_notices: bool,
/// Enable database pool affinity support. On supporting systems, block
/// device queue topologies are detected and the request pool is optimized
/// for the hardware; db_pool_workers is determined automatically.
///
/// default: true
#[serde(default = "true_fn")]
pub db_pool_affinity: bool,
/// Sets the number of worker threads in the frontend-pool of the database.
/// This number should reflect the I/O capabilities of the system,
/// such as the queue-depth or the number of simultaneous requests in
/// flight. Defaults to 32 or four times the number of CPU cores, whichever
/// is greater.
///
/// Note: This value is only used if db_pool_affinity is disabled or not
/// detected on the system, otherwise it is determined automatically.
///
/// default: 32
#[serde(default = "default_db_pool_workers")]
pub db_pool_workers: usize,
/// When db_pool_affinity is enabled and detected, the size of any worker
/// group will not exceed the determined value. This is necessary when the
/// thread-pooling approach does not scale to the full capabilities of
/// high-end hardware; using detected values without limitation could
/// degrade performance.
///
/// The value is multiplied by the number of cores which share a device
/// queue, since group workers can be scheduled on any of those cores.
///
/// default: 64
#[serde(default = "default_db_pool_workers_limit")]
pub db_pool_workers_limit: usize,
/// Determines the size of the queues feeding the database's frontend-pool.
/// The size of the queue is determined by multiplying this value with the
/// number of pool workers. When this queue is full, tokio tasks conducting
/// requests will yield until space is available; this is good for
/// flow-control by avoiding buffer-bloat, but can inhibit throughput if
/// too low.
///
/// default: 4
#[serde(default = "default_db_pool_queue_mult")]
pub db_pool_queue_mult: usize,
/// Number of sender task workers; determines sender parallelism. Default is
/// '0' which means the value is determined internally, likely matching the
@@ -2399,8 +2427,12 @@ fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_p
fn default_trusted_server_batch_size() -> usize { 256 }
fn default_db_pool_workers() -> usize {
sys::available_parallelism()
.saturating_mul(4)
.clamp(32, 1024)
}
fn default_db_pool_workers_limit() -> usize { 64 }
fn default_db_pool_queue_mult() -> usize { 4 }
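To make the sizing arithmetic concrete, here is a small standalone sketch of the fallback path (not the crate's code; the function names are made up and `cores` stands in for sys::available_parallelism()):

// Standalone sketch: fallback sizing used when affinity is not detected.
// `cores` stands in for sys::available_parallelism(); all values illustrative.
fn fallback_db_pool_workers(cores: usize) -> usize {
    // four workers per core, clamped to the [32, 1024] range
    cores.saturating_mul(4).clamp(32, 1024)
}

fn group_queue_size(workers: usize, queue_mult: usize) -> usize {
    // each worker group's queue holds queue_mult entries per worker
    workers.saturating_mul(queue_mult)
}

fn main() {
    let cores = 16; // hypothetical host
    let workers = fallback_db_pool_workers(cores); // 64
    let queue = group_queue_size(workers, 4); // 256 with db_pool_queue_mult = 4
    println!("workers={workers} queue={queue}");
}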


@@ -18,7 +18,7 @@ use rocksdb::{
use crate::{
opts::{cf_options, db_options},
or_else,
pool::Pool,
result,
util::map_err,
@@ -87,8 +87,9 @@ impl Engine {
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
.collect::<Vec<_>>();
debug!("Opening database...");
let path = &config.database_path;
debug!("Opening database...");
let res = if config.rocksdb_read_only {
Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
} else if config.rocksdb_secondary {
@@ -105,11 +106,6 @@ impl Engine {
"Opened database."
);
Ok(Arc::new(Self {
server: server.clone(),
row_cache,
@@ -121,7 +117,7 @@ impl Engine {
corks: AtomicU32::new(0),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
pool: Pool::new(server).await?,
}))
}


@@ -1,3 +1,5 @@
mod configure;
use std::{
mem::take,
sync::{
@@ -6,39 +8,50 @@ use std::{
},
};
use async_channel::{Receiver, RecvError, Sender};
use conduwuit::{
debug, debug_warn, defer, err, implement,
result::DebugInspect,
trace,
utils::sys::compute::{get_affinity, get_core_available, set_affinity},
Result, Server,
};
use futures::{channel::oneshot, TryFutureExt};
use oneshot::Sender as ResultSender;
use rocksdb::Direction;
use tokio::task::JoinSet;
use self::configure::configure;
use crate::{keyval::KeyBuf, stream, Handle, Map};
/// Frontend thread-pool. Operating system threads are used to make database
/// requests which are not cached. These thread-blocking requests are offloaded
/// from the tokio async workers and executed on this threadpool.
pub(crate) struct Pool {
server: Arc<Server>,
queues: Vec<Sender<Cmd>>,
workers: Mutex<JoinSet<()>>,
topology: Vec<usize>,
busy: AtomicUsize,
queued_max: AtomicUsize,
}
/// Operations which can be submitted to the pool.
pub(crate) enum Cmd {
Get(Get),
Iter(Seek),
}
/// Point-query
pub(crate) struct Get {
pub(crate) map: Arc<Map>,
pub(crate) key: KeyBuf,
pub(crate) res: Option<ResultSender<Result<Handle<'static>>>>,
}
/// Iterator-seek.
/// Note: only initial seek is supported at this time on the assumption rocksdb
/// prefetching prevents mid-iteration polls from blocking on I/O.
pub(crate) struct Seek {
pub(crate) map: Arc<Map>,
pub(crate) state: stream::State<'static>,
@@ -47,34 +60,44 @@ pub(crate) struct Seek {
pub(crate) res: Option<ResultSender<stream::State<'static>>>,
}
const WORKER_LIMIT: (usize, usize) = (1, 1024);
const QUEUE_LIMIT: (usize, usize) = (1, 2048);
#[implement(Pool)]
pub(crate) async fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
let (total_workers, queue_sizes, topology) = configure(server);
let (senders, receivers) = queue_sizes.into_iter().map(async_channel::bounded).unzip();
let pool = Arc::new(Self {
server: server.clone(),
queues: senders,
workers: JoinSet::new().into(),
topology,
busy: AtomicUsize::default(),
queued_max: AtomicUsize::default(),
});
pool.spawn_until(receivers, total_workers).await?;
Ok(pool)
}
impl Drop for Pool {
fn drop(&mut self) {
debug_assert!(self.queues.iter().all(Sender::is_empty), "channel must be empty on drop");
debug_assert!(
self.queues.iter().all(Sender::is_closed),
"channel should be closed on drop"
);
}
}
#[implement(Pool)]
pub(crate) async fn shutdown(self: &Arc<Self>) {
self.close();
@@ -83,36 +106,39 @@ pub(crate) async fn shutdown(self: &Arc<Self>) {
debug!(workers = workers.len(), "Waiting for workers to join...");
workers.join_all().await;
debug_assert!(self.queue.is_empty(), "channel is not empty");
}
#[implement(Pool)]
pub(crate) fn close(&self) {
let senders = self.queues.iter().map(Sender::sender_count).sum::<usize>();
let receivers = self
.queues
.iter()
.map(Sender::receiver_count)
.sum::<usize>();
debug!(
queues = self.queues.len(),
workers = self.workers.lock().expect("locked").len(),
?senders,
?receivers,
"Closing pool..."
);
for queue in &self.queues {
queue.close();
}
self.workers.lock().expect("locked").abort_all();
std::thread::yield_now();
}
#[implement(Pool)]
async fn spawn_until(self: &Arc<Self>, recv: Vec<Receiver<Cmd>>, count: usize) -> Result {
let mut workers = self.workers.lock().expect("locked");
while workers.len() < count {
self.spawn_one(&mut workers, &recv)?;
}
Ok(())
@@ -125,8 +151,13 @@ async fn spawn_until(self: &Arc<Self>, recv: Receiver<Cmd>, max: usize) -> Resul
skip_all,
fields(id = %workers.len())
)]
fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: &[Receiver<Cmd>]) -> Result {
debug_assert!(!self.queues.is_empty(), "Must have at least one queue");
debug_assert!(!recv.is_empty(), "Must have at least one receiver");
let id = workers.len();
let group = id.overflowing_rem(self.queues.len()).0;
let recv = recv[group].clone();
let self_ = self.clone();
#[cfg(not(tokio_unstable))]
@@ -146,7 +177,9 @@ fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: Receiver<Cmd>) -
pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result<Handle<'_>> {
let (send, recv) = oneshot::channel();
_ = cmd.res.insert(send);
let queue = self.select_queue();
self.execute(queue, Cmd::Get(cmd))
.and_then(|()| {
recv.map_ok(into_recv_get_result)
.map_err(|e| err!(error!("recv failed {e:?}")))
@@ -159,7 +192,9 @@ pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result<Handle<'_>> {
pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<'_>> {
let (send, recv) = oneshot::channel();
_ = cmd.res.insert(send);
let queue = self.select_queue();
self.execute(queue, Cmd::Iter(cmd))
.and_then(|()| {
recv.map_ok(into_recv_seek)
.map_err(|e| err!(error!("recv failed {e:?}")))
@@ -167,6 +202,13 @@ pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<'
.await
}
#[implement(Pool)]
fn select_queue(&self) -> &Sender<Cmd> {
let core_id = get_affinity().next().unwrap_or(0);
let chan_id = self.topology[core_id];
self.queues.get(chan_id).unwrap_or_else(|| &self.queues[0])
}
#[implement(Pool)]
#[tracing::instrument(
level = "trace",
@@ -174,25 +216,24 @@ pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<'
skip(self, cmd),
fields(
task = ?tokio::task::try_id(),
receivers = queue.receiver_count(),
queued = queue.len(),
queued_max = self.queued_max.load(Ordering::Relaxed),
),
)]
async fn execute(&self, queue: &Sender<Cmd>, cmd: Cmd) -> Result {
if cfg!(debug_assertions) {
self.queued_max.fetch_max(queue.len(), Ordering::Relaxed);
}
if queue.is_full() {
debug_warn!(
capacity = ?queue.capacity(),
"pool queue is full"
);
}
queue
.send(cmd)
.await
.map_err(|e| err!(error!("send failed {e:?}")))
@@ -208,12 +249,33 @@ async fn execute(&self, cmd: Cmd) -> Result {
),
)]
fn worker(self: Arc<Self>, id: usize, recv: Receiver<Cmd>) {
debug!("worker spawned");
defer! {{ debug!("worker finished"); }}
defer! {{ trace!("worker finished"); }}
trace!("worker spawned");
self.worker_init(id);
self.worker_loop(&recv);
}
#[implement(Pool)]
fn worker_init(&self, id: usize) {
let group = id.overflowing_rem(self.queues.len()).0;
let affinity = self
.topology
.iter()
.enumerate()
.filter(|_| self.queues.len() > 1)
.filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id))
.filter_map(get_core_available);
// affinity is empty (no-op) if there's only one queue
set_affinity(affinity.clone());
debug!(
?group,
affinity = ?affinity.collect::<Vec<_>>(),
"worker ready"
);
}
#[implement(Pool)]
fn worker_loop(&self, recv: &Receiver<Cmd>) {
// initial +1 needed prior to entering wait


@@ -0,0 +1,106 @@
use std::{ffi::OsStr, sync::Arc};
use conduwuit::{
debug_info,
utils::{
sys::{compute::is_core_available, storage},
BoolExt,
},
Server,
};
use super::{QUEUE_LIMIT, WORKER_LIMIT};
pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>) {
let config = &server.config;
// This finds the block device and gathers all the properties we need.
let (device_name, device_prop) = config
.db_pool_affinity
.and_then(|| storage::name_from_path(&config.database_path))
.map(|device_name| (device_name.clone(), storage::parallelism(&device_name)))
.unzip();
// The default worker count is masked-on if we didn't find better information.
let default_worker_count = device_prop
.as_ref()
.is_none_or(|prop| prop.mq.is_empty())
.then_some(config.db_pool_workers);
// Determine the worker groupings. Each index represents a hardware queue and
// contains the number of workers which will service it.
let worker_counts: Vec<_> = device_prop
.iter()
.map(|dev| &dev.mq)
.flat_map(|mq| mq.iter())
.filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
.map(|mq| {
mq.nr_tags.unwrap_or_default().min(
config.db_pool_workers_limit.saturating_mul(
mq.cpu_list
.iter()
.filter(|&&id| is_core_available(id))
.count()
.max(1),
),
)
})
.chain(default_worker_count)
.collect();
// Determine our software queue size for each hardware queue. This is the mpmc
// between the tokio worker and the pool worker.
let queue_sizes: Vec<_> = worker_counts
.iter()
.map(|worker_count| {
worker_count
.saturating_mul(config.db_pool_queue_mult)
.clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1)
})
.collect();
// Determine the CPU affinities of each hardware queue. Each index is a CPU and
// each value is the associated hardware queue. There is a little shiftiness
// going on because CPUs which are not available to the process are filtered
// out, similar to the worker_counts.
let topology = device_prop
.iter()
.map(|dev| &dev.mq)
.flat_map(|mq| mq.iter())
.fold(vec![0; 128], |mut topology, mq| {
mq.cpu_list
.iter()
.filter(|&&id| is_core_available(id))
.for_each(|&id| {
topology[id] = mq.id;
});
topology
});
// Regardless of the capacity of all queues we establish some limit on the total
// number of workers; this is hopefully hinted by nr_requests.
let max_workers = device_prop
.as_ref()
.and_then(|prop| prop.nr_requests)
.unwrap_or(WORKER_LIMIT.1);
// Determine the final worker count which we'll be spawning.
let total_workers = worker_counts
.iter()
.sum::<usize>()
.clamp(WORKER_LIMIT.0, max_workers);
debug_info!(
device_name = ?device_name
.as_deref()
.and_then(OsStr::to_str)
.unwrap_or("None"),
?worker_counts,
?queue_sizes,
?total_workers,
"Frontend topology",
);
(total_workers, queue_sizes, topology)
}
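To ground the mapping above, here is a hedged, self-contained sketch of how the topology vector relates cores to hardware queues and how db_pool_workers_limit caps a worker group; the device layout and numbers are hypothetical rather than read from sysfs:

// Sketch of the topology mapping built by configure(): index = core id,
// value = hardware queue id. The device layout below is hypothetical.
fn main() {
    // suppose an NVMe device exposes two queues: queue 0 served by cores 0-3,
    // queue 1 served by cores 4-7
    let mq: Vec<(usize, Vec<usize>)> = vec![
        (0, vec![0, 1, 2, 3]),
        (1, vec![4, 5, 6, 7]),
    ];

    // topology: index = core id, value = hardware queue id
    let mut topology = vec![0usize; 8];
    for (queue_id, cpu_list) in &mq {
        for &core in cpu_list {
            topology[core] = *queue_id;
        }
    }

    // a request issued from a task running on core 5 is routed to the
    // channel group that feeds hardware queue 1
    let core_id = 5;
    let chan_id = topology[core_id];
    assert_eq!(chan_id, 1);

    // with db_pool_workers_limit = 64 and four available cores sharing the
    // queue, a group is capped at min(nr_tags, 64 * 4) workers
    let workers_limit: usize = 64;
    let nr_tags: usize = 128; // hypothetical hardware queue depth
    let group_workers = nr_tags.min(workers_limit.saturating_mul(4));

    println!("topology={topology:?} group_workers={group_workers}");
}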


@@ -19,7 +19,7 @@ use crate::clap::Args;
const WORKER_NAME: &str = "conduwuit:worker";
const WORKER_MIN: usize = 2;
const WORKER_KEEPALIVE: u64 = 36;
const MAX_BLOCKING_THREADS: usize = 1024;
const MAX_BLOCKING_THREADS: usize = 2048;
static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();