diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 526e9fe2..e2ed5daa 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1452,7 +1452,7 @@ # responsiveness for many users at the cost of throughput for each. # # Setting this value to 0.0 causes the stream width to be fixed at the -# value of stream_width_default. The default is 1.0 to match the +# value of stream_width_default. The default scale is 1.0 to match the # capabilities detected for the system. # #stream_width_scale = 1.0 diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index b1ede844..bf6a4ba6 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1646,7 +1646,7 @@ pub struct Config { /// responsiveness for many users at the cost of throughput for each. /// /// Setting this value to 0.0 causes the stream width to be fixed at the - /// value of stream_width_default. The default is 1.0 to match the + /// value of stream_width_default. The default scale is 1.0 to match the /// capabilities detected for the system. /// /// default: 1.0 diff --git a/src/database/pool.rs b/src/database/pool.rs index 1560c8b0..11871ff6 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -30,7 +30,7 @@ use crate::{keyval::KeyBuf, stream, Handle, Map}; /// requests which are not cached. These thread-blocking requests are offloaded /// from the tokio async workers and executed on this threadpool. pub(crate) struct Pool { - _server: Arc, + server: Arc, queues: Vec>, workers: Mutex>>, topology: Vec, @@ -67,7 +67,7 @@ pub(crate) type BatchResult<'a> = SmallVec<[ResultHandle<'a>; BATCH_INLINE]>; pub(crate) type ResultHandle<'a> = Result>; const WORKER_LIMIT: (usize, usize) = (1, 1024); -const QUEUE_LIMIT: (usize, usize) = (1, 2048); +const QUEUE_LIMIT: (usize, usize) = (1, 4096); const BATCH_INLINE: usize = 1; const WORKER_STACK_SIZE: usize = 1_048_576; @@ -85,7 +85,7 @@ pub(crate) fn new(server: &Arc) -> Result> { .unzip(); let pool = Arc::new(Self { - _server: server.clone(), + server: server.clone(), queues: senders, workers: Vec::new().into(), topology, @@ -288,6 +288,7 @@ fn worker_init(&self, id: usize) { .iter() .enumerate() .filter(|_| self.queues.len() > 1) + .filter(|_| self.server.config.db_pool_affinity) .filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id)) .filter_map(nth_core_available); diff --git a/src/database/pool/configure.rs b/src/database/pool/configure.rs index 6cac58e7..ff42ef51 100644 --- a/src/database/pool/configure.rs +++ b/src/database/pool/configure.rs @@ -1,14 +1,13 @@ use std::{path::PathBuf, sync::Arc}; use conduwuit::{ - debug, debug_info, expected, + debug, debug_info, expected, is_equal_to, utils::{ math::usize_from_f64, result::LogDebugErr, stream, stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT}, sys::{compute::is_core_available, storage}, - BoolExt, }, Server, }; @@ -19,39 +18,32 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) let config = &server.config; // This finds the block device and gathers all the properties we need. - let (device_name, device_prop) = config - .db_pool_affinity - .and_then(|| { - let path: PathBuf = config.database_path.clone(); - let name = storage::name_from_path(&path).log_debug_err().ok(); - let prop = storage::parallelism(&path); - name.map(|name| (name, prop)) - }) - .unzip(); + let path: PathBuf = config.database_path.clone(); + let device_name = storage::name_from_path(&path).log_debug_err().ok(); + let device_prop = storage::parallelism(&path); // The default worker count is masked-on if we didn't find better information. - let default_worker_count = device_prop - .as_ref() - .is_none_or(|prop| prop.mq.is_empty()) - .then_some(config.db_pool_workers); + let default_worker_count = device_prop.mq.is_empty().then_some(config.db_pool_workers); // Determine the worker groupings. Each indice represents a hardware queue and // contains the number of workers which will service it. let worker_counts: Vec<_> = device_prop + .mq .iter() - .map(|dev| &dev.mq) - .flat_map(|mq| mq.iter()) .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available)) .map(|mq| { - mq.nr_tags.unwrap_or_default().min( - config.db_pool_workers_limit.saturating_mul( - mq.cpu_list - .iter() - .filter(|&&id| is_core_available(id)) - .count() - .max(1), - ), - ) + let shares = mq + .cpu_list + .iter() + .filter(|&&id| is_core_available(id)) + .count() + .max(1); + + let limit = config.db_pool_workers_limit.saturating_mul(shares); + + let limit = device_prop.nr_requests.map_or(limit, |nr| nr.min(limit)); + + mq.nr_tags.unwrap_or(WORKER_LIMIT.0).min(limit) }) .chain(default_worker_count) .collect(); @@ -72,9 +64,8 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) // going on because cpu's which are not available to the process are filtered // out, similar to the worker_counts. let topology = device_prop + .mq .iter() - .map(|dev| &dev.mq) - .flat_map(|mq| mq.iter()) .fold(vec![0; 128], |mut topology, mq| { mq.cpu_list .iter() @@ -89,9 +80,12 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) // Regardless of the capacity of all queues we establish some limit on the total // number of workers; this is hopefully hinted by nr_requests. let max_workers = device_prop - .as_ref() - .and_then(|prop| prop.nr_requests) - .unwrap_or(WORKER_LIMIT.1); + .mq + .iter() + .filter_map(|mq| mq.nr_tags) + .chain(default_worker_count) + .fold(0_usize, usize::saturating_add) + .clamp(WORKER_LIMIT.0, WORKER_LIMIT.1); // Determine the final worker count which we'll be spawning. let total_workers = worker_counts @@ -102,7 +96,7 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) // After computing all of the above we can update the global automatic stream // width, hopefully with a better value tailored to this system. if config.stream_width_scale > 0.0 { - let num_queues = queue_sizes.len(); + let num_queues = queue_sizes.len().max(1); update_stream_width(server, num_queues, total_workers); } @@ -117,6 +111,13 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) "Frontend topology", ); + assert!(total_workers > 0, "some workers expected"); + assert!(!queue_sizes.is_empty(), "some queues expected"); + assert!( + !queue_sizes.iter().copied().any(is_equal_to!(0)), + "positive queue sizes expected" + ); + (total_workers, queue_sizes, topology) }