From 2259e2c82f890415b84e5999242c3ee102973d23 Mon Sep 17 00:00:00 2001
From: Jason Volk
Date: Wed, 1 Jan 2025 06:08:20 +0000
Subject: [PATCH] batch queries to maximize throughput

query-side streams for first level of callsites

Signed-off-by: Jason Volk
---
 conduwuit-example.toml                   | 13 ++++
 src/core/config/mod.rs                   | 17 ++++++
 src/core/utils/stream/band.rs            | 26 +++++++-
 src/core/utils/stream/mod.rs             |  5 +-
 src/database/map/get.rs                  | 10 +++-
 src/database/map/get_batch.rs            | 66 +++++++++++++-------
 src/database/pool.rs                     | 76 ++++++++++++++++++------
 src/database/pool/configure.rs           | 11 +++-
 src/main/server.rs                       |  1 +
 src/service/rooms/auth_chain/mod.rs      |  2 +-
 src/service/rooms/short/mod.rs           | 12 ++--
 src/service/rooms/state/mod.rs           |  2 +-
 src/service/rooms/state_accessor/data.rs |  6 +-
 13 files changed, 191 insertions(+), 56 deletions(-)

diff --git a/conduwuit-example.toml b/conduwuit-example.toml
index c64b18e8..526e9fe2 100644
--- a/conduwuit-example.toml
+++ b/conduwuit-example.toml
@@ -1457,6 +1457,19 @@
 #
 #stream_width_scale = 1.0

+# Sets the initial amplification factor. This controls batch sizes of
+# requests made by each pool worker, multiplying the throughput of each
+# stream. This value is somewhat abstract from specific hardware
+# characteristics and can be significantly larger than any thread count or
+# queue size. This is because each database query may require several
+# index lookups, thus many database queries in a batch may make progress
+# independently while also sharing index and data blocks which may or may
+# not be cached. It is worthwhile to submit huge batches to reduce
+# complexity. The maximum value is 32768, though sufficient hardware is
+# still advised for that.
+#
+#stream_amplification = 1024
+
 # Number of sender task workers; determines sender parallelism. Default is
 # '0' which means the value is determined internally, likely matching the
 # number of tokio worker-threads or number of cores, etc. Override by
diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs
index e1f578c8..b1ede844 100644
--- a/src/core/config/mod.rs
+++ b/src/core/config/mod.rs
@@ -1653,6 +1653,21 @@ pub struct Config {
 	#[serde(default = "default_stream_width_scale")]
 	pub stream_width_scale: f32,

+	/// Sets the initial amplification factor. This controls batch sizes of
+	/// requests made by each pool worker, multiplying the throughput of each
+	/// stream. This value is somewhat abstract from specific hardware
+	/// characteristics and can be significantly larger than any thread count or
+	/// queue size. This is because each database query may require several
+	/// index lookups, thus many database queries in a batch may make progress
+	/// independently while also sharing index and data blocks which may or may
+	/// not be cached. It is worthwhile to submit huge batches to reduce
+	/// complexity. The maximum value is 32768, though sufficient hardware is
+	/// still advised for that.
+	///
+	/// default: 1024
+	#[serde(default = "default_stream_amplification")]
+	pub stream_amplification: usize,
+
 	/// Number of sender task workers; determines sender parallelism. Default is
 	/// '0' which means the value is determined internally, likely matching the
 	/// number of tokio worker-threads or number of cores, etc. Override by
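
The two settings above combine multiplicatively: each callsite stream submits
batches of `stream_amplification` keys and keeps several batches in flight at
the stream width. A hedged back-of-envelope sketch, using only the defaults
quoted in the comments above (the "keys outstanding" figure is an upper bound,
not a measurement):

    // Illustrative arithmetic only; not part of the patch.
    fn main() {
        let stream_width_default = 32_usize; // existing default concurrency
        let stream_amplification = 1024_usize; // new default added here
        // Upper bound on keys a single saturated stream can have outstanding.
        assert_eq!(stream_width_default * stream_amplification, 32_768);
    }
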
@@ -2467,3 +2482,5 @@ fn default_db_pool_queue_mult() -> usize { 4 }
 fn default_stream_width_default() -> usize { 32 }

 fn default_stream_width_scale() -> f32 { 1.0 }
+
+fn default_stream_amplification() -> usize { 1024 }
diff --git a/src/core/utils/stream/band.rs b/src/core/utils/stream/band.rs
index 76f2a85a..45ad7d94 100644
--- a/src/core/utils/stream/band.rs
+++ b/src/core/utils/stream/band.rs
@@ -3,9 +3,15 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 /// Stream concurrency factor; this is a live value.
 static WIDTH: AtomicUsize = AtomicUsize::new(32);

-/// Practicable limits on the stream width
+/// Stream throughput amplifier; this is a live value.
+static AMPLIFICATION: AtomicUsize = AtomicUsize::new(1024);
+
+/// Practicable limits on the stream width.
 pub const WIDTH_LIMIT: (usize, usize) = (1, 1024);

+/// Practicable limits on the stream amplifier.
+pub const AMPLIFICATION_LIMIT: (usize, usize) = (32, 32768);
+
 /// Sets the live concurrency factor. The first return value is the previous
 /// width which was replaced. The second return value is the value which was set
 /// after any applied limits.
@@ -14,6 +20,14 @@ pub fn set_width(width: usize) -> (usize, usize) {
 	(WIDTH.swap(width, Ordering::Relaxed), width)
 }

+/// Sets the live concurrency amplification. The first return value is the
+/// previous amplification which was replaced. The second return value is the
+/// value which was set after any applied limits.
+pub fn set_amplification(width: usize) -> (usize, usize) {
+	let width = width.clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
+	(AMPLIFICATION.swap(width, Ordering::Relaxed), width)
+}
+
 /// Used by stream operations where the concurrency factor hasn't been manually
 /// supplied by the caller (most uses). Instead we provide a default value which
 /// is adjusted at startup for the specific system and also dynamically.
@@ -24,3 +38,13 @@ pub fn automatic_width() -> usize {
 	debug_assert!(width <= WIDTH_LIMIT.1, "WIDTH is probably too large");
 	width
 }
+
+/// Used by stream operations where the amplification hasn't been manually
+/// supplied by the caller. Instead we provide a computed value.
+#[inline]
+pub fn automatic_amplification() -> usize {
+	let amplification = AMPLIFICATION.load(Ordering::Relaxed);
+	debug_assert!(amplification >= AMPLIFICATION_LIMIT.0, "amplification is too low");
+	debug_assert!(amplification <= AMPLIFICATION_LIMIT.1, "amplification is too high");
+	amplification
+}
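
A minimal sketch of how these controls are meant to be driven, using only the
items added above (and their re-exports from `conduwuit::utils::stream`, added
in the next file); `apply_config` is illustrative and assumes no concurrent
writer of the amplification atomic:

    use conduwuit::utils::stream::{
        automatic_amplification, set_amplification, AMPLIFICATION_LIMIT,
    };

    fn apply_config(requested: usize) {
        // Requests outside the practicable limits are clamped before the live
        // atomic is updated; the clamped value is what readers then observe.
        let (_previous, applied) = set_amplification(requested);
        assert!(applied >= AMPLIFICATION_LIMIT.0 && applied <= AMPLIFICATION_LIMIT.1);
        assert_eq!(automatic_amplification(), applied);
    }
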
diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs
index a5ef17c5..61ae993d 100644
--- a/src/core/utils/stream/mod.rs
+++ b/src/core/utils/stream/mod.rs
@@ -10,7 +10,10 @@ mod try_broadband;
 mod try_ready;
 mod wideband;

-pub use band::{automatic_width, set_width, WIDTH_LIMIT};
+pub use band::{
+	automatic_amplification, automatic_width, set_amplification, set_width, AMPLIFICATION_LIMIT,
+	WIDTH_LIMIT,
+};
 pub use broadband::BroadbandExt;
 pub use cloned::Cloned;
 pub use expect::TryExpect;
diff --git a/src/database/map/get.rs b/src/database/map/get.rs
index 94a6b727..e64ef2ec 100644
--- a/src/database/map/get.rs
+++ b/src/database/map/get.rs
@@ -2,7 +2,7 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};

 use arrayvec::ArrayVec;
 use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
-use futures::{Future, FutureExt};
+use futures::{future::ready, Future, FutureExt, TryFutureExt};
 use serde::Serialize;
 use tokio::task;

@@ -79,11 +79,15 @@ where
 	debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
 	let cmd = Get {
 		map: self.clone(),
-		key: key.as_ref().into(),
+		key: [key.as_ref().into()].into(),
 		res: None,
 	};

-	self.db.pool.execute_get(cmd).boxed()
+	self.db
+		.pool
+		.execute_get(cmd)
+		.and_then(|mut res| ready(res.remove(0)))
+		.boxed()
 }

 /// Fetch a value from the database into cache, returning a reference-handle.
diff --git a/src/database/map/get_batch.rs b/src/database/map/get_batch.rs
index 631692fe..452697f1 100644
--- a/src/database/map/get_batch.rs
+++ b/src/database/map/get_batch.rs
@@ -2,42 +2,68 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc};

 use conduwuit::{
 	err, implement,
-	utils::{stream::automatic_width, IterStream},
+	utils::{
+		stream::{automatic_amplification, automatic_width, WidebandExt},
+		IterStream,
+	},
 	Result,
 };
-use futures::{Stream, StreamExt};
+use futures::{Stream, StreamExt, TryStreamExt};
 use serde::Serialize;

-use crate::{util::map_err, Handle};
+use crate::{keyval::KeyBuf, ser, util::map_err, Handle};

 #[implement(super::Map)]
 #[tracing::instrument(skip(self, keys), level = "trace")]
-pub fn aqry_batch<'b, 'a: 'b, const MAX: usize, I, K>(
+pub fn qry_batch<'a, S, K>(
 	self: &'a Arc<Self>,
-	keys: I,
-) -> impl Stream<Item = Result<Handle<'b>>> + Send + 'a
+	keys: S,
+) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
 where
-	I: Iterator<Item = &'b K> + Send + 'a,
-	K: Serialize + ?Sized + Debug + 'b,
+	S: Stream<Item = K> + Send + 'a,
+	K: Serialize + Debug + 'a,
 {
-	keys.stream()
-		.map(move |key| self.aqry::<MAX, _>(&key))
-		.buffered(automatic_width())
+	use crate::pool::Get;
+
+	keys.ready_chunks(automatic_amplification())
+		.widen_then(automatic_width(), |chunk| {
+			let keys = chunk
+				.iter()
+				.map(ser::serialize_to::<KeyBuf>)
+				.map(|result| result.expect("failed to serialize query key"))
+				.map(Into::into)
+				.collect();
+
+			self.db
+				.pool
+				.execute_get(Get { map: self.clone(), key: keys, res: None })
+		})
+		.map_ok(|results| results.into_iter().stream())
+		.try_flatten()
 }

 #[implement(super::Map)]
 #[tracing::instrument(skip(self, keys), level = "trace")]
-pub fn get_batch<'a, I, K>(
+pub fn get_batch<'a, S, K>(
 	self: &'a Arc<Self>,
-	keys: I,
+	keys: S,
 ) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
 where
-	I: Iterator<Item = &'a K> + Debug + Send + 'a,
-	K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
+	S: Stream<Item = K> + Send + 'a,
+	K: AsRef<[u8]> + Send + Sync + 'a,
 {
-	keys.stream()
-		.map(move |key| self.get(key))
-		.buffered(automatic_width())
+	use crate::pool::Get;
+
+	keys.ready_chunks(automatic_amplification())
+		.widen_then(automatic_width(), |chunk| {
+			self.db.pool.execute_get(Get {
+				map: self.clone(),
+				key: chunk.iter().map(AsRef::as_ref).map(Into::into).collect(),
+				res: None,
+			})
+		})
+		.map_ok(|results| results.into_iter().stream())
+		.try_flatten()
 }

 #[implement(super::Map)]
@@ -47,8 +73,8 @@ pub(crate) fn get_batch_blocking<'a, I, K>(
 	keys: I,
 ) -> impl Iterator<Item = Result<Handle<'_>>> + Send
 where
-	I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send,
-	K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
+	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
+	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
 {
 	// Optimization can be `true` if key vector is pre-sorted **by the column
 	// comparator**.
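
The batching shape used by `qry_batch` and `get_batch` above, reduced to a
standalone sketch with plain `futures`: `ready_chunks` groups the query-side
stream into batches and a fixed number of batches are resolved concurrently.
Here `buffer_unordered` stands in for conduwuit's `widen_then`, the async block
stands in for one pooled `execute_get`, and a tokio runtime is assumed:

    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() {
        let width = 32; // stand-in for automatic_width()
        let amplification = 1024; // stand-in for automatic_amplification()

        let fetched: Vec<Vec<u64>> = stream::iter(0u64..10_000)
            .ready_chunks(amplification) // batch the query-side stream
            .map(|batch| async move { batch }) // one pooled request per batch
            .buffer_unordered(width) // resolve several batches concurrently
            .collect()
            .await;

        assert_eq!(fetched.iter().map(Vec::len).sum::<usize>(), 10_000);
    }

The flattening back to per-key results corresponds to the
`.map_ok(|results| results.into_iter().stream()).try_flatten()` step above.
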
diff --git a/src/database/pool.rs b/src/database/pool.rs
index 8182f217..bcf20de8 100644
--- a/src/database/pool.rs
+++ b/src/database/pool.rs
@@ -19,6 +19,7 @@ use conduwuit::{
 use futures::{channel::oneshot, TryFutureExt};
 use oneshot::Sender as ResultSender;
 use rocksdb::Direction;
+use smallvec::SmallVec;
 use tokio::task::JoinSet;

 use self::configure::configure;
@@ -42,11 +43,11 @@ pub(crate) enum Cmd {
 	Iter(Seek),
 }

-/// Point-query
+/// Multi-point-query
 pub(crate) struct Get {
 	pub(crate) map: Arc<Map>,
-	pub(crate) key: KeyBuf,
-	pub(crate) res: Option<ResultSender<Result<Handle<'static>>>>,
+	pub(crate) key: BatchQuery<'static>,
+	pub(crate) res: Option<ResultSender<BatchResult<'static>>>,
 }

 /// Iterator-seek.
@@ -60,8 +61,13 @@ pub(crate) struct Seek {
 	pub(crate) res: Option<ResultSender<stream::State<'static>>>,
 }

+pub(crate) type BatchQuery<'a> = SmallVec<[KeyBuf; BATCH_INLINE]>;
+pub(crate) type BatchResult<'a> = SmallVec<[ResultHandle<'a>; BATCH_INLINE]>;
+pub(crate) type ResultHandle<'a> = Result<Handle<'a>>;
+
 const WORKER_LIMIT: (usize, usize) = (1, 1024);
 const QUEUE_LIMIT: (usize, usize) = (1, 2048);
+const BATCH_INLINE: usize = 1;

 #[implement(Pool)]
 pub(crate) async fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
@@ -179,22 +185,24 @@ fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: &[Receiver<Cmd>])

 #[implement(Pool)]
 #[tracing::instrument(level = "trace", name = "get", skip(self, cmd))]
-pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result<Handle<'_>> {
+pub(crate) async fn execute_get(self: &Arc<Self>, mut cmd: Get) -> Result<BatchResult<'_>> {
 	let (send, recv) = oneshot::channel();
 	_ = cmd.res.insert(send);

 	let queue = self.select_queue();
 	self.execute(queue, Cmd::Get(cmd))
-		.and_then(|()| {
-			recv.map_ok(into_recv_get_result)
+		.and_then(move |()| {
+			recv.map_ok(into_recv_get)
 				.map_err(|e| err!(error!("recv failed {e:?}")))
 		})
-		.await?
+		.await
+		.map(Into::into)
+		.map_err(Into::into)
 }

 #[implement(Pool)]
 #[tracing::instrument(level = "trace", name = "iter", skip(self, cmd))]
-pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<'_>> {
+pub(crate) async fn execute_iter(self: &Arc<Self>, mut cmd: Seek) -> Result<stream::State<'_>> {
 	let (send, recv) = oneshot::channel();
 	_ = cmd.res.insert(send);

@@ -282,7 +290,7 @@ fn worker_init(&self, id: usize) {
 }

 #[implement(Pool)]
-fn worker_loop(&self, recv: &Receiver<Cmd>) {
+fn worker_loop(self: &Arc<Self>, recv: &Receiver<Cmd>) {
 	// initial +1 needed prior to entering wait
 	self.busy.fetch_add(1, Ordering::Relaxed);

@@ -302,18 +310,19 @@ fn worker_loop(&self, recv: &Receiver<Cmd>) {
 		busy = self.busy.fetch_sub(1, Ordering::Relaxed) - 1,
 	),
 )]
-fn worker_wait(&self, recv: &Receiver<Cmd>) -> Result<Cmd, RecvError> {
+fn worker_wait(self: &Arc<Self>, recv: &Receiver<Cmd>) -> Result<Cmd, RecvError> {
 	recv.recv_blocking().debug_inspect(|_| {
 		self.busy.fetch_add(1, Ordering::Relaxed);
 	})
 }

 #[implement(Pool)]
-fn worker_handle(&self, cmd: Cmd) {
+fn worker_handle(self: &Arc<Self>, cmd: Cmd) {
 	match cmd {
-		| Cmd::Get(cmd) => self.handle_get(cmd),
+		| Cmd::Get(cmd) if cmd.key.len() == 1 => self.handle_get(cmd),
+		| Cmd::Get(cmd) => self.handle_batch(cmd),
 		| Cmd::Iter(cmd) => self.handle_iter(cmd),
-	}
+	};
 }

 #[implement(Pool)]
@@ -331,12 +340,43 @@ fn handle_iter(&self, mut cmd: Seek) {
 	}

 	let from = cmd.key.as_deref().map(Into::into);
+
 	let result = match cmd.dir {
 		| Direction::Forward => cmd.state.init_fwd(from),
 		| Direction::Reverse => cmd.state.init_rev(from),
 	};

 	let chan_result = chan.send(into_send_seek(result));
+
+	let _chan_sent = chan_result.is_ok();
+}
+
+#[implement(Pool)]
+#[tracing::instrument(
+	name = "batch",
+	level = "trace",
+	skip_all,
+	fields(
+		%cmd.map,
+		keys = %cmd.key.len(),
+	),
+)]
+fn handle_batch(self: &Arc<Self>, mut cmd: Get) {
+	debug_assert!(cmd.key.len() > 1, "should have more than one key");
+	debug_assert!(!cmd.key.iter().any(SmallVec::is_empty), "querying for empty key");
+
+	let chan = cmd.res.take().expect("missing result channel");
+
+	if chan.is_canceled() {
+		return;
+	}
+
+	let keys = cmd.key.iter().map(Into::into);
+
+	let result: SmallVec<_> = cmd.map.get_batch_blocking(keys).collect();
+
+	let chan_result = chan.send(into_send_get(result));
+
 	let _chan_sent = chan_result.is_ok();
 }

@@ -348,7 +388,7 @@ fn handle_iter(&self, mut cmd: Seek) {
 	fields(%cmd.map),
 )]
 fn handle_get(&self, mut cmd: Get) {
-	debug_assert!(!cmd.key.is_empty(), "querying for empty key");
+	debug_assert!(!cmd.key[0].is_empty(), "querying for empty key");

 	// Obtain the result channel.
 	let chan = cmd.res.take().expect("missing result channel");
@@ -362,16 +402,16 @@ fn handle_get(&self, mut cmd: Get) {
 	// Perform the actual database query. We reuse our database::Map interface but
 	// limited to the blocking calls, rather than creating another surface directly
 	// with rocksdb here.
-	let result = cmd.map.get_blocking(&cmd.key);
+	let result = cmd.map.get_blocking(&cmd.key[0]);

 	// Send the result back to the submitter.
-	let chan_result = chan.send(into_send_get_result(result));
+	let chan_result = chan.send(into_send_get([result].into()));

 	// If the future was dropped during the query this will fail acceptably.
 	let _chan_sent = chan_result.is_ok();
 }

-fn into_send_get_result(result: Result<Handle<'_>>) -> Result<Handle<'static>> {
+fn into_send_get(result: BatchResult<'_>) -> BatchResult<'static> {
 	// SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through
 	// the channel. The lifetime on the handle is a device by rust-rocksdb to
 	// associate a database lifetime with its assets. The Handle must be dropped
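
The pool-side contract added above, reduced to a freestanding sketch: a request
now carries a batch of keys plus a oneshot reply slot, and a worker answers the
whole batch in one blocking pass, replying with a same-length vector of
results. Names here (`BatchGet`, `handle_batch_like`, `get_blocking`) are
illustrative; the real code uses `SmallVec`, `KeyBuf` and `Handle`:

    use futures::channel::oneshot;

    struct BatchGet {
        keys: Vec<Vec<u8>>, // the patch inlines the single-key case via SmallVec<[KeyBuf; 1]>
        res: Option<oneshot::Sender<Vec<Option<Vec<u8>>>>>,
    }

    fn handle_batch_like(mut cmd: BatchGet, get_blocking: impl Fn(&[u8]) -> Option<Vec<u8>>) {
        let chan = cmd.res.take().expect("missing result channel");
        if chan.is_canceled() {
            return; // requester gave up; skip the work
        }
        // One pass over the batch on the worker thread.
        let results: Vec<_> = cmd.keys.iter().map(|key| get_blocking(key)).collect();
        // If the requesting future was dropped meanwhile, this send fails acceptably.
        let _ = chan.send(results);
    }
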
 	// back through the channel before its database lifetime ends.
 	unsafe { std::mem::transmute(result) }
 }

-fn into_recv_get_result(result: Result<Handle<'static>>) -> Result<Handle<'_>> {
+fn into_recv_get<'a>(result: BatchResult<'static>) -> BatchResult<'a> {
 	// SAFETY: This is to receive the Handle from the channel.
 	unsafe { std::mem::transmute(result) }
 }
diff --git a/src/database/pool/configure.rs b/src/database/pool/configure.rs
index 2a192a9c..6cac58e7 100644
--- a/src/database/pool/configure.rs
+++ b/src/database/pool/configure.rs
@@ -6,7 +6,7 @@ use conduwuit::{
 		math::usize_from_f64,
 		result::LogDebugErr,
 		stream,
-		stream::WIDTH_LIMIT,
+		stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
 		sys::{compute::is_core_available, storage},
 		BoolExt,
 	},
@@ -124,19 +124,28 @@ pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>) {
 fn update_stream_width(server: &Arc<Server>, num_queues: usize, total_workers: usize) {
 	let config = &server.config;
 	let scale: f64 = config.stream_width_scale.min(100.0).into();
+
 	let req_width = expected!(total_workers / num_queues).next_multiple_of(2);
 	let req_width = req_width as f64;
 	let req_width = usize_from_f64(req_width * scale)
 		.expect("failed to convert f64 to usize")
 		.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);

+	let req_amp = config.stream_amplification as f64;
+	let req_amp = usize_from_f64(req_amp * scale)
+		.expect("failed to convert f64 to usize")
+		.clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
+
 	let (old_width, new_width) = stream::set_width(req_width);
+	let (old_amp, new_amp) = stream::set_amplification(req_amp);
 	debug!(
 		scale = ?config.stream_width_scale,
 		?num_queues,
 		?req_width,
 		?old_width,
 		?new_width,
+		?old_amp,
+		?new_amp,
 		"Updated global stream width"
 	);
 }
diff --git a/src/main/server.rs b/src/main/server.rs
index e1389f6d..a81b708d 100644
--- a/src/main/server.rs
+++ b/src/main/server.rs
@@ -52,6 +52,7 @@ impl Server {
 			.expect("Unable to increase maximum soft and hard file descriptor limit");

 		let (_old_width, _new_width) = stream::set_width(config.stream_width_default);
+		let (_old_amp, _new_amp) = stream::set_amplification(config.stream_amplification);

 		info!(
 			server_name = %config.server_name,
diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs
index 67883d01..f6534825 100644
--- a/src/service/rooms/auth_chain/mod.rs
+++ b/src/service/rooms/auth_chain/mod.rs
@@ -71,7 +71,7 @@ impl Service {
 		let event_ids = self
 			.services
 			.short
-			.multi_get_eventid_from_short(chain.iter())
+			.multi_get_eventid_from_short(chain.into_iter().stream())
 			.ready_filter_map(Result::ok)
 			.collect()
 			.await;
diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs
index 00c1d16c..b645f9f1 100644
--- a/src/service/rooms/short/mod.rs
+++ b/src/service/rooms/short/mod.rs
@@ -69,7 +69,7 @@ where
 {
 	self.db
 		.eventid_shorteventid
-		.get_batch(event_ids.clone())
+		.get_batch(event_ids.clone().stream())
 		.zip(event_ids.into_iter().stream())
 		.map(|(result, event_id)| match result {
 			| Ok(ref short) => utils::u64_from_u8(short),
@@ -162,20 +162,18 @@ where
 }

 #[implement(Service)]
-pub fn multi_get_eventid_from_short<'a, Id, I>(
+pub fn multi_get_eventid_from_short<'a, Id, S>(
 	&'a self,
-	shorteventid: I,
+	shorteventid: S,
 ) -> impl Stream<Item = Result<Id>> + Send + 'a
 where
-	I: Iterator<Item = &'a ShortEventId> + Send + 'a,
+	S: Stream<Item = ShortEventId> + Send + 'a,
 	Id: for<'de> Deserialize<'de> + Sized + ToOwned + 'a,
 	<Id as ToOwned>::Owned: Borrow<EventId>,
 {
-	const BUFSIZE: usize = size_of::<ShortEventId>();
-
 	self.db
 		.shorteventid_eventid
-		.aqry_batch::<BUFSIZE, _, _>(shorteventid)
+		.qry_batch(shorteventid)
 		.map(Deserialized::deserialized)
 }

diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs
index 2769beb8..fd303667 100644
--- a/src/service/rooms/state/mod.rs
+++ b/src/service/rooms/state/mod.rs
@@ -467,7 +467,7 @@ impl Service {
 		let auth_pdus = self
 			.services
 			.short
-			.multi_get_eventid_from_short(event_ids.iter())
+			.multi_get_eventid_from_short(event_ids.into_iter().stream())
 			.zip(state_keys.into_iter().stream())
 			.ready_filter_map(|(event_id, tsk)| Some((tsk, event_id.ok()?)))
 			.broad_filter_map(|(tsk, event_id): (_, OwnedEventId)| async move {
diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs
index d60e505e..29b27a05 100644
--- a/src/service/rooms/state_accessor/data.rs
+++ b/src/service/rooms/state_accessor/data.rs
@@ -1,7 +1,7 @@
 use std::{borrow::Borrow, collections::HashMap, sync::Arc};

 use conduwuit::{
-	at, err, ref_at,
+	at, err,
 	utils::stream::{BroadbandExt, IterStream, ReadyExt},
 	PduEvent, Result,
 };
@@ -69,7 +69,7 @@ impl Data {
 		let full_pdus = self
 			.services
 			.short
-			.multi_get_eventid_from_short(short_ids.iter().map(ref_at!(1)))
+			.multi_get_eventid_from_short(short_ids.into_iter().map(at!(1)).stream())
 			.ready_filter_map(Result::ok)
 			.broad_filter_map(|event_id: OwnedEventId| async move {
 				self.services.timeline.get_pdu(&event_id).await.ok()
@@ -93,7 +93,7 @@ impl Data {
 		let full_ids = self
 			.services
 			.short
-			.multi_get_eventid_from_short(short_ids.iter().map(ref_at!(1)))
+			.multi_get_eventid_from_short(short_ids.iter().map(at!(1)).stream())
 			.zip(short_ids.iter().stream().map(at!(0)))
 			.ready_filter_map(|(event_id, shortstatekey)| Some((shortstatekey, event_id.ok()?)))
 			.collect()
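
Taken together, the call sites in the last few files converge on one pattern:
pass a Stream of short ids (via `IterStream::stream`) rather than an Iterator,
so the service layer can batch the underlying database queries. A hedged
sketch of that pattern, with the surrounding `Services` type and async context
assumed rather than taken from the patch:

    use conduwuit::utils::{stream::ReadyExt, IterStream};
    use futures::StreamExt;
    use ruma::OwnedEventId;

    async fn shorts_to_event_ids(services: &Services, shorts: Vec<u64>) -> Vec<OwnedEventId> {
        services
            .short
            .multi_get_eventid_from_short(shorts.into_iter().stream()) // Stream, not Iterator
            .ready_filter_map(Result::ok) // drop shorts that did not resolve
            .collect()
            .await
    }
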