use tokio for threadpool mgmt

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-30 03:16:57 +00:00
parent 89a158ab0b
commit 9a9c071e82
4 changed files with 155 additions and 91 deletions

View file

@ -2,7 +2,7 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduit::{err, implement, utils::IterStream, Err, Result};
use futures::{future, Future, FutureExt, Stream};
use futures::{future, Future, FutureExt, Stream, StreamExt};
use rocksdb::DBPinnableSlice;
use serde::Serialize;
@ -54,6 +54,18 @@ where
self.get(key)
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
pub fn get_batch<'a, I, K>(self: &'a Arc<Self>, keys: I) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
where
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send + 'a,
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
{
keys.stream()
.map(move |key| self.get(key))
.buffered(self.db.server.config.db_pool_workers.saturating_mul(2))
}
/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is referenced directly to perform the query.
#[implement(super::Map)]
@ -80,17 +92,8 @@ where
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
pub fn get_batch<'a, I, K>(&self, keys: I) -> impl Stream<Item = Result<Handle<'_>>>
where
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send,
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
{
self.get_batch_blocking(keys).stream()
}
#[implement(super::Map)]
pub fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator<Item = Result<Handle<'_>>>
#[tracing::instrument(skip(self, keys), name = "batch_blocking", level = "trace")]
pub(crate) fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator<Item = Result<Handle<'_>>> + Send
where
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send,
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
@ -111,6 +114,7 @@ where
/// The key is referenced directly to perform the query. This is a thread-
/// blocking call.
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), name = "blocking", level = "trace")]
pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
where
K: AsRef<[u8]> + ?Sized,
@ -125,7 +129,7 @@ where
/// Fetch a value from the cache without I/O.
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
where
K: AsRef<[u8]> + Debug + ?Sized,