improve db pool topology configuration
Signed-off-by: Jason Volk <jason@zemos.net>
parent d36167ab64
commit 94c8683836

4 changed files with 39 additions and 37 deletions
@@ -1452,7 +1452,7 @@
 # responsiveness for many users at the cost of throughput for each.
 #
 # Setting this value to 0.0 causes the stream width to be fixed at the
-# value of stream_width_default. The default is 1.0 to match the
+# value of stream_width_default. The default scale is 1.0 to match the
 # capabilities detected for the system.
 #
 #stream_width_scale = 1.0

@@ -1646,7 +1646,7 @@ pub struct Config {
     /// responsiveness for many users at the cost of throughput for each.
     ///
     /// Setting this value to 0.0 causes the stream width to be fixed at the
-    /// value of stream_width_default. The default is 1.0 to match the
+    /// value of stream_width_default. The default scale is 1.0 to match the
     /// capabilities detected for the system.
     ///
     /// default: 1.0

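The scale only biases the automatically detected width. As a rough sketch of how a detected width might be scaled (the helper name and formula below are assumptions for illustration, not the crate's actual update_stream_width() body):

```rust
// Hypothetical sketch of applying stream_width_scale to a detected width.
// The real update_stream_width() may compute this differently.
fn scaled_stream_width(total_workers: usize, num_queues: usize, scale: f64) -> usize {
    // Baseline: workers available per hardware queue, never below one.
    let detected = (total_workers / num_queues.max(1)).max(1);

    // A scale of 1.0 keeps the detected value; other values bias it up or down.
    ((detected as f64) * scale).round() as usize
}

fn main() {
    // e.g. 64 workers spread over 4 queues at the default scale of 1.0 -> width 16
    assert_eq!(scaled_stream_width(64, 4, 1.0), 16);
}
```

As the configure() hunk further down shows, a scale of 0.0 simply skips the automatic update, so stream_width_default remains in force.
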
@@ -30,7 +30,7 @@ use crate::{keyval::KeyBuf, stream, Handle, Map};
 /// requests which are not cached. These thread-blocking requests are offloaded
 /// from the tokio async workers and executed on this threadpool.
 pub(crate) struct Pool {
-    _server: Arc<Server>,
+    server: Arc<Server>,
     queues: Vec<Sender<Cmd>>,
     workers: Mutex<Vec<JoinHandle<()>>>,
     topology: Vec<usize>,

@@ -67,7 +67,7 @@ pub(crate) type BatchResult<'a> = SmallVec<[ResultHandle<'a>; BATCH_INLINE]>;
 pub(crate) type ResultHandle<'a> = Result<Handle<'a>>;
 
 const WORKER_LIMIT: (usize, usize) = (1, 1024);
-const QUEUE_LIMIT: (usize, usize) = (1, 2048);
+const QUEUE_LIMIT: (usize, usize) = (1, 4096);
 const BATCH_INLINE: usize = 1;
 
 const WORKER_STACK_SIZE: usize = 1_048_576;

@@ -85,7 +85,7 @@ pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
         .unzip();
 
     let pool = Arc::new(Self {
-        _server: server.clone(),
+        server: server.clone(),
         queues: senders,
         workers: Vec::new().into(),
         topology,

@@ -288,6 +288,7 @@ fn worker_init(&self, id: usize) {
         .iter()
         .enumerate()
         .filter(|_| self.queues.len() > 1)
+        .filter(|_| self.server.config.db_pool_affinity)
         .filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id))
         .filter_map(nth_core_available);

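The added `.filter(|_| self.server.config.db_pool_affinity)` makes CPU pinning opt-in: with the flag off, the candidate-core iterator is empty and the worker keeps the scheduler's default affinity. A minimal standalone sketch of the same gating pattern (the real worker_init() fields and availability checks are simplified away here):

```rust
// Standalone illustration of gating an affinity selection behind a config flag.
// `topology[core] == queue` mirrors the pool's core-to-queue mapping; everything
// else is a simplified stand-in for the real worker_init() chain.
fn candidate_cores(topology: &[usize], group: usize, affinity_enabled: bool) -> Vec<usize> {
    topology
        .iter()
        .enumerate()
        // With affinity disabled every candidate is filtered out, so no pinning happens.
        .filter(|_| affinity_enabled)
        .filter_map(|(core_id, &queue_id)| (queue_id == group).then_some(core_id))
        .collect()
}

fn main() {
    // Cores 0,1 serve queue 0; cores 2,3 serve queue 1.
    let topology = [0, 0, 1, 1];
    assert_eq!(candidate_cores(&topology, 1, true), vec![2, 3]);
    assert!(candidate_cores(&topology, 1, false).is_empty());
}
```

This is also why `_server` becomes `server` in the struct and constructor hunks above: worker_init() now needs to read the config at runtime.
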
@@ -1,14 +1,13 @@
 use std::{path::PathBuf, sync::Arc};
 
 use conduwuit::{
-    debug, debug_info, expected,
+    debug, debug_info, expected, is_equal_to,
     utils::{
         math::usize_from_f64,
         result::LogDebugErr,
         stream,
         stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
         sys::{compute::is_core_available, storage},
-        BoolExt,
     },
     Server,
 };

@@ -19,39 +18,32 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
     let config = &server.config;
 
     // This finds the block device and gathers all the properties we need.
-    let (device_name, device_prop) = config
-        .db_pool_affinity
-        .and_then(|| {
-            let path: PathBuf = config.database_path.clone();
-            let name = storage::name_from_path(&path).log_debug_err().ok();
-            let prop = storage::parallelism(&path);
-            name.map(|name| (name, prop))
-        })
-        .unzip();
+    let path: PathBuf = config.database_path.clone();
+    let device_name = storage::name_from_path(&path).log_debug_err().ok();
+    let device_prop = storage::parallelism(&path);
 
     // The default worker count is masked-on if we didn't find better information.
-    let default_worker_count = device_prop
-        .as_ref()
-        .is_none_or(|prop| prop.mq.is_empty())
-        .then_some(config.db_pool_workers);
+    let default_worker_count = device_prop.mq.is_empty().then_some(config.db_pool_workers);
 
     // Determine the worker groupings. Each indice represents a hardware queue and
     // contains the number of workers which will service it.
     let worker_counts: Vec<_> = device_prop
+        .mq
         .iter()
-        .map(|dev| &dev.mq)
-        .flat_map(|mq| mq.iter())
         .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
         .map(|mq| {
-            mq.nr_tags.unwrap_or_default().min(
-                config.db_pool_workers_limit.saturating_mul(
-                    mq.cpu_list
-                        .iter()
-                        .filter(|&&id| is_core_available(id))
-                        .count()
-                        .max(1),
-                ),
-            )
+            let shares = mq
+                .cpu_list
+                .iter()
+                .filter(|&&id| is_core_available(id))
+                .count()
+                .max(1);
+
+            let limit = config.db_pool_workers_limit.saturating_mul(shares);
+
+            let limit = device_prop.nr_requests.map_or(limit, |nr| nr.min(limit));
+
+            mq.nr_tags.unwrap_or(WORKER_LIMIT.0).min(limit)
         })
        .chain(default_worker_count)
        .collect();

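Per hardware queue, the rewritten closure computes the worker count as: the CPU shares serving the queue (at least one), multiplied by db_pool_workers_limit, capped by the device's nr_requests when reported, and finally capped by the queue's nr_tags (falling back to the lower WORKER_LIMIT bound). A worked example with made-up numbers, using a hypothetical free function in place of the closure:

```rust
// Worked example of the per-queue worker count from the rewritten configure();
// the function and the numbers are illustrative only.
fn per_queue_workers(
    nr_tags: Option<usize>,       // hardware queue depth, if the kernel reports it
    available_cpus: usize,        // CPUs in this queue's cpu_list usable by the process
    db_pool_workers_limit: usize, // configured workers allowed per CPU share
    nr_requests: Option<usize>,   // device-wide request limit, if reported
) -> usize {
    const WORKER_LIMIT: (usize, usize) = (1, 1024);

    let shares = available_cpus.max(1);
    let limit = db_pool_workers_limit.saturating_mul(shares);
    let limit = nr_requests.map_or(limit, |nr| nr.min(limit));
    nr_tags.unwrap_or(WORKER_LIMIT.0).min(limit)
}

fn main() {
    // A queue with 128 tags served by 4 usable CPUs, 64 workers per CPU allowed,
    // and nr_requests = 256: limit = min(256, 64 * 4) = 256, result = min(128, 256) = 128.
    assert_eq!(per_queue_workers(Some(128), 4, 64, Some(256)), 128);

    // An unknown tag count falls back to the lower worker bound (1).
    assert_eq!(per_queue_workers(None, 4, 64, Some(256)), 1);
}
```
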
@@ -72,9 +64,8 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
     // going on because cpu's which are not available to the process are filtered
     // out, similar to the worker_counts.
     let topology = device_prop
+        .mq
         .iter()
-        .map(|dev| &dev.mq)
-        .flat_map(|mq| mq.iter())
         .fold(vec![0; 128], |mut topology, mq| {
             mq.cpu_list
                 .iter()

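The topology vector is indexed by CPU id and holds the hardware queue that services that CPU; the fold writes each queue's id into the slots named by its cpu_list. The hunk cuts off before the write, so the body below is a plausible simplified model with stand-in types (the real structures come from storage::parallelism()):

```rust
// Simplified model of the topology vector: index = CPU id, value = hardware
// queue that services it. `Mq` is a stand-in type, and the queue id is taken
// from enumeration order here.
struct Mq {
    cpu_list: Vec<usize>,
}

fn build_topology(mqs: &[Mq]) -> Vec<usize> {
    mqs.iter()
        .enumerate()
        .fold(vec![0; 128], |mut topology, (queue_id, mq)| {
            for &cpu in &mq.cpu_list {
                topology[cpu] = queue_id;
            }
            topology
        })
}

fn main() {
    // CPUs 0-1 are served by queue 0, CPUs 2-3 by queue 1.
    let mqs = [Mq { cpu_list: vec![0, 1] }, Mq { cpu_list: vec![2, 3] }];
    assert_eq!(&build_topology(&mqs)[..4], &[0, 0, 1, 1]);
}
```

Queue ids in this vector are what worker_init() compares against its own group when deciding which cores it may pin to.
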
@@ -89,9 +80,12 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
     // Regardless of the capacity of all queues we establish some limit on the total
     // number of workers; this is hopefully hinted by nr_requests.
     let max_workers = device_prop
-        .as_ref()
-        .and_then(|prop| prop.nr_requests)
-        .unwrap_or(WORKER_LIMIT.1);
+        .mq
+        .iter()
+        .filter_map(|mq| mq.nr_tags)
+        .chain(default_worker_count)
+        .fold(0_usize, usize::saturating_add)
+        .clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
 
     // Determine the final worker count which we'll be spawning.
     let total_workers = worker_counts

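Previously the total cap came from the device-wide nr_requests alone (or the upper WORKER_LIMIT when unknown); now it is the saturating sum of each queue's nr_tags plus any default count, clamped into WORKER_LIMIT. A small worked example with made-up tag counts:

```rust
// Worked example of the new max_workers computation: sum per-queue tag counts
// plus any default count, saturating, then clamp into WORKER_LIMIT.
fn main() {
    const WORKER_LIMIT: (usize, usize) = (1, 1024);

    let nr_tags = [Some(128_usize), Some(64), None];
    let default_worker_count: Option<usize> = None;

    let max_workers = nr_tags
        .iter()
        .copied()
        .flatten() // drop queues with no reported tag count
        .chain(default_worker_count)
        .fold(0_usize, usize::saturating_add)
        .clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);

    assert_eq!(max_workers, 192); // 128 + 64, well under the upper bound of 1024
}
```
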
@@ -102,7 +96,7 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
     // After computing all of the above we can update the global automatic stream
     // width, hopefully with a better value tailored to this system.
     if config.stream_width_scale > 0.0 {
-        let num_queues = queue_sizes.len();
+        let num_queues = queue_sizes.len().max(1);
         update_stream_width(server, num_queues, total_workers);
     }
 
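The `.max(1)` guards the degenerate case where no hardware queue was detected: update_stream_width() receives the queue count, and a later per-queue division or sizing by zero would otherwise be possible. The division below is only illustrative of that failure mode; the real function body is not part of this commit.

```rust
// Illustration of the failure mode the .max(1) guard avoids; the actual
// update_stream_width() logic is not shown in this diff.
fn main() {
    let queue_sizes: Vec<usize> = Vec::new(); // no usable hardware queues detected
    let total_workers = 8_usize;

    let num_queues = queue_sizes.len().max(1);
    let width_per_queue = total_workers / num_queues; // 8 instead of a divide-by-zero panic

    assert_eq!(width_per_queue, 8);
}
```
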
@@ -117,6 +111,13 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
         "Frontend topology",
     );
 
+    assert!(total_workers > 0, "some workers expected");
+    assert!(!queue_sizes.is_empty(), "some queues expected");
+    assert!(
+        !queue_sizes.iter().copied().any(is_equal_to!(0)),
+        "positive queue sizes expected"
+    );
+
     (total_workers, queue_sizes, topology)
 }