From 2a9bb1ce11f931b1ec2e02865d4bf30ceec11175 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 28 Nov 2024 06:52:23 +0000 Subject: [PATCH] add configurables for frontend pool options Signed-off-by: Jason Volk --- conduwuit-example.toml | 12 ++++++++++++ src/core/config/mod.rs | 18 ++++++++++++++++++ src/database/engine.rs | 7 ++++++- src/database/pool.rs | 17 ++++++++--------- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 30da80e6..a0f05ebb 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1338,6 +1338,18 @@ # #admin_room_notices = true +# Sets the number of worker threads in the frontend-pool of the database. +# This number should reflect the I/O capabilities of the system, +# specifically the queue-depth or the number of simultaneous requests in +# flight. Defaults to 32 or number of CPU cores, whichever is greater. +# +#db_pool_workers = 32 + +# Size of the queue feeding the database's frontend-pool. Defaults to 256 +# or eight times the number of CPU cores, whichever is greater. +# +#db_pool_queue_size = 256 + [global.tls] # Path to a valid TLS certificate file. diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index fafd3396..edbb7c26 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1500,6 +1500,20 @@ pub struct Config { #[serde(default = "true_fn")] pub admin_room_notices: bool, + /// Sets the number of worker threads in the frontend-pool of the database. + /// This number should reflect the I/O capabilities of the system, + /// specifically the queue-depth or the number of simultaneous requests in + /// flight. Defaults to 32 or number of CPU cores, whichever is greater. + /// default: 32 + #[serde(default = "default_db_pool_workers")] + pub db_pool_workers: usize, + + /// Size of the queue feeding the database's frontend-pool. Defaults to 256 + /// or eight times the number of CPU cores, whichever is greater. + /// default: 256 + #[serde(default = "default_db_pool_queue_size")] + pub db_pool_queue_size: usize, + #[serde(flatten)] #[allow(clippy::zero_sized_map_values)] // this is a catchall, the map shouldn't be zero at runtime catchall: BTreeMap, @@ -2265,3 +2279,7 @@ fn parallelism_scaled_u32(val: u32) -> u32 { fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn default_trusted_server_batch_size() -> usize { 256 } + +fn default_db_pool_workers() -> usize { sys::available_parallelism().max(32) } + +fn default_db_pool_queue_size() -> usize { sys::available_parallelism().saturating_mul(8).max(256) } diff --git a/src/database/engine.rs b/src/database/engine.rs index 837c7259..e700be62 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -103,6 +103,11 @@ impl Engine { "Opened database." ); + let pool_opts = pool::Opts { + queue_size: config.db_pool_queue_size, + worker_num: config.db_pool_workers, + }; + Ok(Arc::new(Self { server: server.clone(), row_cache, @@ -114,7 +119,7 @@ impl Engine { corks: AtomicU32::new(0), read_only: config.rocksdb_read_only, secondary: config.rocksdb_secondary, - pool: Pool::new(&pool::Opts::default())?, + pool: Pool::new(&pool_opts)?, })) } diff --git a/src/database/pool.rs b/src/database/pool.rs index 6e7a1e29..ee3e67dd 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -17,15 +17,14 @@ pub(crate) struct Pool { send: Sender, } -#[derive(Default)] pub(crate) struct Opts { - queue_size: Option, - worker_num: Option, + pub(crate) queue_size: usize, + pub(crate) worker_num: usize, } +const QUEUE_LIMIT: (usize, usize) = (1, 8192); +const WORKER_LIMIT: (usize, usize) = (1, 512); const WORKER_THREAD_NAME: &str = "conduwuit:db"; -const DEFAULT_QUEUE_SIZE: usize = 1024; -const DEFAULT_WORKER_NUM: usize = 32; #[derive(Debug)] pub(crate) enum Cmd { @@ -43,7 +42,7 @@ type ResultSender = oneshot::Sender>>; #[implement(Pool)] pub(crate) fn new(opts: &Opts) -> Result> { - let queue_size = opts.queue_size.unwrap_or(DEFAULT_QUEUE_SIZE); + let queue_size = opts.queue_size.clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1); let (send, recv) = bounded(queue_size); let pool = Arc::new(Self { @@ -52,7 +51,7 @@ pub(crate) fn new(opts: &Opts) -> Result> { send, }); - let worker_num = opts.worker_num.unwrap_or(DEFAULT_WORKER_NUM); + let worker_num = opts.worker_num.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1); pool.spawn_until(worker_num)?; Ok(pool) @@ -147,12 +146,12 @@ fn worker(self: Arc, id: usize) { #[implement(Pool)] fn worker_loop(&self, id: usize) { while let Ok(mut cmd) = self.recv.recv_blocking() { - self.handle(id, &mut cmd); + self.worker_handle(id, &mut cmd); } } #[implement(Pool)] -fn handle(&self, id: usize, cmd: &mut Cmd) { +fn worker_handle(&self, id: usize, cmd: &mut Cmd) { match cmd { Cmd::Get(get) => self.handle_get(id, get), }