batch queries to maximize throughput

query-side streams for first level of callsites

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-01 06:08:20 +00:00 committed by strawberry
parent 1792711d09
commit 2259e2c82f
13 changed files with 191 additions and 56 deletions

View file

@ -2,7 +2,7 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
use futures::{Future, FutureExt};
use futures::{future::ready, Future, FutureExt, TryFutureExt};
use serde::Serialize;
use tokio::task;
@ -79,11 +79,15 @@ where
debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
let cmd = Get {
map: self.clone(),
key: key.as_ref().into(),
key: [key.as_ref().into()].into(),
res: None,
};
self.db.pool.execute_get(cmd).boxed()
self.db
.pool
.execute_get(cmd)
.and_then(|mut res| ready(res.remove(0)))
.boxed()
}
/// Fetch a value from the database into cache, returning a reference-handle.

View file

@ -2,42 +2,68 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduwuit::{
err, implement,
utils::{stream::automatic_width, IterStream},
utils::{
stream::{automatic_amplification, automatic_width, WidebandExt},
IterStream,
},
Result,
};
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use serde::Serialize;
use crate::{util::map_err, Handle};
use crate::{keyval::KeyBuf, ser, util::map_err, Handle};
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
pub fn aqry_batch<'b, 'a: 'b, const MAX: usize, I, K>(
pub fn qry_batch<'a, S, K>(
self: &'a Arc<Self>,
keys: I,
) -> impl Stream<Item = Result<Handle<'b>>> + Send + 'a
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
where
I: Iterator<Item = &'b K> + Send + 'a,
K: Serialize + ?Sized + Debug + 'b,
S: Stream<Item = K> + Send + 'a,
K: Serialize + Debug + 'a,
{
keys.stream()
.map(move |key| self.aqry::<MAX, _>(&key))
.buffered(automatic_width())
use crate::pool::Get;
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
let keys = chunk
.iter()
.map(ser::serialize_to::<KeyBuf, _>)
.map(|result| result.expect("failed to serialize query key"))
.map(Into::into)
.collect();
self.db
.pool
.execute_get(Get { map: self.clone(), key: keys, res: None })
})
.map_ok(|results| results.into_iter().stream())
.try_flatten()
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
pub fn get_batch<'a, I, K>(
pub fn get_batch<'a, S, K>(
self: &'a Arc<Self>,
keys: I,
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
where
I: Iterator<Item = &'a K> + Debug + Send + 'a,
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
S: Stream<Item = K> + Send + 'a,
K: AsRef<[u8]> + Send + Sync + 'a,
{
keys.stream()
.map(move |key| self.get(key))
.buffered(automatic_width())
use crate::pool::Get;
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
self.db.pool.execute_get(Get {
map: self.clone(),
key: chunk.iter().map(AsRef::as_ref).map(Into::into).collect(),
res: None,
})
})
.map_ok(|results| results.into_iter().stream())
.try_flatten()
}
#[implement(super::Map)]
@ -47,8 +73,8 @@ pub(crate) fn get_batch_blocking<'a, I, K>(
keys: I,
) -> impl Iterator<Item = Result<Handle<'_>>> + Send
where
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send,
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
{
// Optimization can be `true` if key vector is pre-sorted **by the column
// comparator**.