From 7a6d6575585cd001f0c889aa41c1a2663b2435d0 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Mon, 23 Dec 2024 04:32:28 +0000
Subject: [PATCH] configurable dynamic stream concurrency scalar

Signed-off-by: Jason Volk <jason@zemos.net>
---
 conduwuit-example.toml                 | 23 +++++++++++++++++
 src/core/config/mod.rs                 | 31 +++++++++++++++++++++++
 src/core/utils/stream/broadband.rs     | 13 +++++-----
 src/core/utils/stream/mod.rs           | 29 ++++++++++++++++++++++
 src/core/utils/stream/try_broadband.rs |  5 ++--
 src/core/utils/stream/wideband.rs      |  9 +++----
 src/database/map/get_batch.rs          | 10 +++++---
 src/database/pool/configure.rs         | 34 +++++++++++++++++++++++++-
 src/main/server.rs                     | 10 +++++++-
 9 files changed, 144 insertions(+), 20 deletions(-)

diff --git a/conduwuit-example.toml b/conduwuit-example.toml
index 111acb05..c64b18e8 100644
--- a/conduwuit-example.toml
+++ b/conduwuit-example.toml
@@ -1434,6 +1434,29 @@
 #
 #db_pool_queue_mult = 4
 
+# Sets the initial value for the concurrency of streams. This simply
+# overrides the default width hard-coded at 32. Note this value is
+# itself overridden by the width computed from stream_width_scale,
+# unless that scaling is disabled; in that case this value serves as a
+# fixed stream width instead.
+#
+#stream_width_default = 32
+
+# Scales the stream width starting from a base value detected for the
+# specific system. The base value is the database pool worker count,
+# determined from the hardware queue size (e.g. 32 for SSD, or 64 to
+# 128+ for NVMe). This float scales the width up or down by multiplying
+# it (e.g. 1.5, 2.0, etc). The result should not exceed the size of the
+# pool queue (see: db_pool_queue_mult), as any larger value will stall
+# the tokio task. The value can also be scaled down (e.g. 0.5) to
+# improve responsiveness for many users at the cost of per-user throughput.
+#
+# Setting this value to 0.0 fixes the stream width at the value of
+# stream_width_default. The default is 1.0 to match the capabilities
+# detected for the system.
+#
+#stream_width_scale = 1.0
+
 # Number of sender task workers; determines sender parallelism. Default is
 # '0' which means the value is determined internally, likely matching the
 # number of tokio worker-threads or number of cores, etc. Override by
diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs
index 3772aa16..e1f578c8 100644
--- a/src/core/config/mod.rs
+++ b/src/core/config/mod.rs
@@ -1626,6 +1626,33 @@ pub struct Config {
 	#[serde(default = "default_db_pool_queue_mult")]
 	pub db_pool_queue_mult: usize,
 
+	/// Sets the initial value for the concurrency of streams. This simply
+	/// overrides the default width hard-coded at 32. Note this value is
+	/// itself overridden by the width computed from stream_width_scale,
+	/// unless that scaling is disabled; in that case this value serves as
+	/// a fixed stream width instead.
+	///
+	/// default: 32
+	#[serde(default = "default_stream_width_default")]
+	pub stream_width_default: usize,
+
+	/// Scales the stream width starting from a base value detected for the
+	/// specific system. The base value is the database pool worker count,
+	/// determined from the hardware queue size (e.g. 32 for SSD, or 64 to
+	/// 128+ for NVMe). This float scales the width up or down by multiplying
+	/// it (e.g. 1.5, 2.0, etc). The result should not exceed the size of the
+	/// pool queue (see: db_pool_queue_mult), as any larger value will stall
+	/// the tokio task. The value can also be scaled down (e.g. 0.5) to
+	/// improve responsiveness for many users at the cost of per-user throughput.
+	///
+	/// Setting this value to 0.0 fixes the stream width at the value of
+	/// stream_width_default. The default is 1.0 to match the capabilities
+	/// detected for the system.
+	///
+	/// default: 1.0
+	#[serde(default = "default_stream_width_scale")]
+	pub stream_width_scale: f32,
+
 	/// Number of sender task workers; determines sender parallelism. Default is
 	/// '0' which means the value is determined internally, likely matching the
 	/// number of tokio worker-threads or number of cores, etc. Override by
@@ -2436,3 +2463,7 @@ fn default_db_pool_workers() -> usize {
 fn default_db_pool_workers_limit() -> usize { 64 }
 
 fn default_db_pool_queue_mult() -> usize { 4 }
+
+fn default_stream_width_default() -> usize { 32 }
+
+fn default_stream_width_scale() -> f32 { 1.0 }
diff --git a/src/core/utils/stream/broadband.rs b/src/core/utils/stream/broadband.rs
index 37416d63..6d1ff6fe 100644
--- a/src/core/utils/stream/broadband.rs
+++ b/src/core/utils/stream/broadband.rs
@@ -7,9 +7,7 @@ use futures::{
 	Future,
 };
 
-use super::ReadyExt;
-
-const WIDTH: usize = 32;
+use super::{automatic_width, ReadyExt};
 
 /// Concurrency extensions to augment futures::StreamExt. broad_ combinators
 /// produce out-of-order
@@ -95,7 +93,7 @@ where
 		Fut: Future<Output = bool> + Send,
 	{
 		self.map(f)
-			.buffer_unordered(n.into().unwrap_or(WIDTH))
+			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
 			.ready_all(identity)
 	}
 
@@ -107,7 +105,7 @@ where
 		Fut: Future<Output = bool> + Send,
 	{
 		self.map(f)
-			.buffer_unordered(n.into().unwrap_or(WIDTH))
+			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
 			.ready_any(identity)
 	}
 
@@ -120,7 +118,7 @@ where
 		U: Send,
 	{
 		self.map(f)
-			.buffer_unordered(n.into().unwrap_or(WIDTH))
+			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
 			.ready_filter_map(identity)
 	}
 
@@ -132,6 +130,7 @@ where
 		Fut: Future<Output = U> + Send,
 		U: Send,
 	{
-		self.map(f).buffer_unordered(n.into().unwrap_or(WIDTH))
+		self.map(f)
+			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
 	}
 }
diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs
index c9138116..4456784f 100644
--- a/src/core/utils/stream/mod.rs
+++ b/src/core/utils/stream/mod.rs
@@ -19,3 +19,32 @@ pub use tools::Tools;
 pub use try_broadband::TryBroadbandExt;
 pub use try_ready::TryReadyExt;
 pub use wideband::WidebandExt;
+
+/// Stream concurrency factor; this is a live value.
+static WIDTH: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(32);
+
+/// Practicable limits on the stream width.
+pub const WIDTH_LIMIT: (usize, usize) = (1, 1024);
+
+/// Sets the live concurrency factor. The first return value is the previous
+/// width which was replaced; the second is the width actually set after any
+/// limits were applied.
+pub fn set_width(width: usize) -> (usize, usize) {
+	use std::sync::atomic::Ordering;
+
+	let width = width.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
+	(WIDTH.swap(width, Ordering::Relaxed), width)
+}
+
+/// Used by stream operations when the caller has not manually supplied a
+/// concurrency factor (the common case). We provide a default value which is
+/// tuned at startup for the specific system and may be adjusted dynamically.
+#[inline]
+pub fn automatic_width() -> usize {
+	use std::sync::atomic::Ordering;
+
+	let width = WIDTH.load(Ordering::Relaxed);
+	debug_assert!(width >= WIDTH_LIMIT.0, "WIDTH should not be zero");
+	debug_assert!(width <= WIDTH_LIMIT.1, "WIDTH is probably too large");
+	width
+}
diff --git a/src/core/utils/stream/try_broadband.rs b/src/core/utils/stream/try_broadband.rs
index d1213174..c72fcc2c 100644
--- a/src/core/utils/stream/try_broadband.rs
+++ b/src/core/utils/stream/try_broadband.rs
@@ -2,10 +2,9 @@
 use futures::{TryFuture, TryStream, TryStreamExt};
 
+use super::automatic_width;
 use crate::Result;
 
-const WIDTH: usize = 32;
-
 /// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
 /// produce out-of-order
 pub trait TryBroadbandExt<T, E>
 where
@@ -49,6 +48,6 @@ where
 		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
 	{
 		self.map_ok(f)
-			.try_buffer_unordered(n.into().unwrap_or(WIDTH))
+			.try_buffer_unordered(n.into().unwrap_or_else(automatic_width))
 	}
 }
diff --git a/src/core/utils/stream/wideband.rs b/src/core/utils/stream/wideband.rs
index 053a351f..a8560bb4 100644
--- a/src/core/utils/stream/wideband.rs
+++ b/src/core/utils/stream/wideband.rs
@@ -7,9 +7,7 @@ use futures::{
 	Future,
 };
 
-use super::ReadyExt;
-
-const WIDTH: usize = 32;
+use super::{automatic_width, ReadyExt};
 
 /// Concurrency extensions to augment futures::StreamExt. wideband_ combinators
 /// produce in-order.
@@ -66,7 +64,7 @@ where
 		U: Send,
 	{
 		self.map(f)
-			.buffered(n.into().unwrap_or(WIDTH))
+			.buffered(n.into().unwrap_or_else(automatic_width))
 			.ready_filter_map(identity)
 	}
 
@@ -78,6 +76,7 @@ where
 		Fut: Future<Output = U> + Send,
 		U: Send,
 	{
-		self.map(f).buffered(n.into().unwrap_or(WIDTH))
+		self.map(f)
+			.buffered(n.into().unwrap_or_else(automatic_width))
 	}
 }
diff --git a/src/database/map/get_batch.rs b/src/database/map/get_batch.rs
index 49cd5920..631692fe 100644
--- a/src/database/map/get_batch.rs
+++ b/src/database/map/get_batch.rs
@@ -1,6 +1,10 @@
 use std::{convert::AsRef, fmt::Debug, sync::Arc};
 
-use conduwuit::{err, implement, utils::IterStream, Result};
+use conduwuit::{
+	err, implement,
+	utils::{stream::automatic_width, IterStream},
+	Result,
+};
 use futures::{Stream, StreamExt};
 use serde::Serialize;
 
@@ -18,7 +22,7 @@ where
 {
 	keys.stream()
 		.map(move |key| self.aqry::<MAX, _>(&key))
-		.buffered(self.db.server.config.db_pool_workers.saturating_mul(2))
+		.buffered(automatic_width())
 }
 
 #[implement(super::Map)]
@@ -33,7 +37,7 @@ where
 {
 	keys.stream()
 		.map(move |key| self.get(key))
-		.buffered(self.db.server.config.db_pool_workers.saturating_mul(2))
+		.buffered(automatic_width())
 }
 
 #[implement(super::Map)]
diff --git a/src/database/pool/configure.rs b/src/database/pool/configure.rs
index 8353a265..9361a534 100644
--- a/src/database/pool/configure.rs
+++ b/src/database/pool/configure.rs
@@ -1,8 +1,11 @@
 use std::{ffi::OsStr, sync::Arc};
 
 use conduwuit::{
-	debug_info,
+	debug, debug_info, expected,
 	utils::{
+		math::usize_from_f64,
+		stream,
+		stream::WIDTH_LIMIT,
 		sys::{compute::is_core_available, storage},
 		BoolExt,
 	},
@@ -91,6 +94,13 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>)
 		.sum::<usize>()
 		.clamp(WORKER_LIMIT.0, max_workers);
 
+	// After computing all of the above we can update the global automatic stream
+	// width, hopefully with a better value tailored to this system.
+	if config.stream_width_scale > 0.0 {
+		let num_queues = queue_sizes.len();
+		update_stream_width(server, num_queues, total_workers);
+	}
+
 	debug_info!(
 		device_name = ?device_name
 			.as_deref()
@@ -99,8 +109,30 @@
 		?worker_counts,
 		?queue_sizes,
 		?total_workers,
+		stream_width = ?stream::automatic_width(),
 		"Frontend topology",
 	);
 
 	(total_workers, queue_sizes, topology)
 }
+
+#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
+fn update_stream_width(server: &Arc<Server>, num_queues: usize, total_workers: usize) {
+	let config = &server.config;
+	let scale: f64 = config.stream_width_scale.min(100.0).into();
+	let req_width = expected!(total_workers / num_queues).next_multiple_of(2);
+	let req_width = req_width as f64;
+	let req_width = usize_from_f64(req_width * scale)
+		.expect("failed to convert f64 to usize")
+		.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
+
+	let (old_width, new_width) = stream::set_width(req_width);
+	debug!(
+		scale = ?config.stream_width_scale,
+		?num_queues,
+		?req_width,
+		?old_width,
+		?new_width,
+		"Updated global stream width"
+	);
+}
diff --git a/src/main/server.rs b/src/main/server.rs
index bc2cff85..e1389f6d 100644
--- a/src/main/server.rs
+++ b/src/main/server.rs
@@ -1,6 +1,12 @@
 use std::sync::Arc;
 
-use conduwuit::{config::Config, info, log::Log, utils::sys, Error, Result};
+use conduwuit::{
+	config::Config,
+	info,
+	log::Log,
+	utils::{stream, sys},
+	Error, Result,
+};
 use tokio::{runtime, sync::Mutex};
 
 use crate::{clap::Args, logging::TracingFlameGuard};
@@ -45,6 +51,8 @@
 		sys::maximize_fd_limit()
 			.expect("Unable to increase maximum soft and hard file descriptor limit");
 
+		let (_old_width, _new_width) = stream::set_width(config.stream_width_default);
+
 		info!(
 			server_name = %config.server_name,
 			database_path = ?config.database_path,
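
Reviewer note (not part of the patch): the width mechanism is small enough to
sketch standalone. Below is a minimal, compilable approximation of the pattern
the patch introduces: a process-global atomic width, clamped on write, read
lock-free by combinators at their point of use. Names mirror the patch; the
tokio/futures driver, the queue/worker numbers, and the 1.5 scale are
illustrative assumptions only, not taken from conduwuit.

use std::sync::atomic::{AtomicUsize, Ordering};

use futures::{stream, StreamExt};

/// Live stream concurrency factor, seeded with the patch's default of 32.
static WIDTH: AtomicUsize = AtomicUsize::new(32);

/// Practicable limits on the stream width, as in the patch.
const WIDTH_LIMIT: (usize, usize) = (1, 1024);

/// Clamps and publishes a new width; returns (previous, applied).
fn set_width(width: usize) -> (usize, usize) {
	let width = width.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
	(WIDTH.swap(width, Ordering::Relaxed), width)
}

/// Lock-free read used whenever a caller supplies no explicit width.
fn automatic_width() -> usize { WIDTH.load(Ordering::Relaxed) }

#[tokio::main]
async fn main() {
	// Startup: seed from stream_width_default, then pretend pool detection
	// found one queue of 64 workers and stream_width_scale = 1.5 -> 96.
	set_width(32);
	let (old, new) = set_width((64.0_f64 * 1.5) as usize);
	assert_eq!((old, new), (32, 96));

	// Combinators read the live width when they are constructed.
	let doubled: Vec<u64> = stream::iter(0..256u64)
		.map(|n| async move { n * 2 })
		.buffer_unordered(automatic_width())
		.collect()
		.await;
	assert_eq!(doubled.len(), 256);
}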
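
The arithmetic in update_stream_width() then reduces to clamping
scale * round_to_even(total_workers / num_queues) into WIDTH_LIMIT. A worked
instance of that computation, with hypothetical hardware numbers (two queues
of 64 workers each), minus the patch's logging and checked-math macro:

/// Mirrors the patch's update_stream_width() math, simplified.
fn requested_width(scale: f64, num_queues: usize, total_workers: usize) -> usize {
	let base = (total_workers / num_queues).next_multiple_of(2);
	((base as f64 * scale.min(100.0)) as usize).clamp(1, 1024)
}

fn main() {
	// Two queues of 64 workers -> total 128, base width 128 / 2 = 64.
	assert_eq!(requested_width(1.0, 2, 128), 64);
	// stream_width_scale = 1.5 widens to 96; 0.5 narrows to 32.
	assert_eq!(requested_width(1.5, 2, 128), 96);
	assert_eq!(requested_width(0.5, 2, 128), 32);
}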