split get_batch from get.rs; add aqry_batch
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
c01b049910
commit
48703173bc
3 changed files with 65 additions and 52 deletions
|
@ -1,6 +1,7 @@
|
||||||
mod contains;
|
mod contains;
|
||||||
mod count;
|
mod count;
|
||||||
mod get;
|
mod get;
|
||||||
|
mod get_batch;
|
||||||
mod insert;
|
mod insert;
|
||||||
mod keys;
|
mod keys;
|
||||||
mod keys_from;
|
mod keys_from;
|
||||||
|
|
|
@ -1,13 +1,8 @@
|
||||||
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
|
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
|
||||||
|
|
||||||
use arrayvec::ArrayVec;
|
use arrayvec::ArrayVec;
|
||||||
use conduit::{
|
use conduit::{err, implement, utils::result::MapExpect, Err, Result};
|
||||||
err, implement,
|
use futures::{future, Future, FutureExt};
|
||||||
utils::{result::MapExpect, IterStream},
|
|
||||||
Err, Result,
|
|
||||||
};
|
|
||||||
use futures::{future, Future, FutureExt, Stream, StreamExt};
|
|
||||||
use rocksdb::DBPinnableSlice;
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -17,8 +12,6 @@ use crate::{
|
||||||
Handle,
|
Handle,
|
||||||
};
|
};
|
||||||
|
|
||||||
type RocksdbResult<'a> = Result<Option<DBPinnableSlice<'a>>, rocksdb::Error>;
|
|
||||||
|
|
||||||
/// Fetch a value from the database into cache, returning a reference-handle
|
/// Fetch a value from the database into cache, returning a reference-handle
|
||||||
/// asynchronously. The key is serialized into an allocated buffer to perform
|
/// asynchronously. The key is serialized into an allocated buffer to perform
|
||||||
/// the query.
|
/// the query.
|
||||||
|
@ -58,18 +51,6 @@ where
|
||||||
self.get(key)
|
self.get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(super::Map)]
|
|
||||||
#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
|
|
||||||
pub fn get_batch<'a, I, K>(self: &'a Arc<Self>, keys: I) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
|
|
||||||
where
|
|
||||||
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send + 'a,
|
|
||||||
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
|
|
||||||
{
|
|
||||||
keys.stream()
|
|
||||||
.map(move |key| self.get(key))
|
|
||||||
.buffered(self.db.server.config.db_pool_workers.saturating_mul(2))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetch a value from the database into cache, returning a reference-handle
|
/// Fetch a value from the database into cache, returning a reference-handle
|
||||||
/// asynchronously. The key is referenced directly to perform the query.
|
/// asynchronously. The key is referenced directly to perform the query.
|
||||||
#[implement(super::Map)]
|
#[implement(super::Map)]
|
||||||
|
@ -95,25 +76,6 @@ where
|
||||||
self.db.pool.execute_get(cmd).boxed()
|
self.db.pool.execute_get(cmd).boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(super::Map)]
|
|
||||||
#[tracing::instrument(skip(self, keys), name = "batch_blocking", level = "trace")]
|
|
||||||
pub(crate) fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator<Item = Result<Handle<'_>>> + Send
|
|
||||||
where
|
|
||||||
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send,
|
|
||||||
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
|
|
||||||
{
|
|
||||||
// Optimization can be `true` if key vector is pre-sorted **by the column
|
|
||||||
// comparator**.
|
|
||||||
const SORTED: bool = false;
|
|
||||||
|
|
||||||
let read_options = &self.read_options;
|
|
||||||
self.db
|
|
||||||
.db
|
|
||||||
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
|
|
||||||
.into_iter()
|
|
||||||
.map(into_result_handle)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetch a value from the database into cache, returning a reference-handle.
|
/// Fetch a value from the database into cache, returning a reference-handle.
|
||||||
/// The key is referenced directly to perform the query. This is a thread-
|
/// The key is referenced directly to perform the query. This is a thread-
|
||||||
/// blocking call.
|
/// blocking call.
|
||||||
|
@ -123,12 +85,12 @@ pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
|
||||||
where
|
where
|
||||||
K: AsRef<[u8]> + ?Sized,
|
K: AsRef<[u8]> + ?Sized,
|
||||||
{
|
{
|
||||||
let res = self
|
self.db
|
||||||
.db
|
.db
|
||||||
.db
|
.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
|
||||||
.get_pinned_cf_opt(&self.cf(), key, &self.read_options);
|
.map_err(map_err)?
|
||||||
|
.map(Handle::from)
|
||||||
into_result_handle(res)
|
.ok_or(err!(Request(NotFound("Not found in database"))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch a value from the cache without I/O.
|
/// Fetch a value from the cache without I/O.
|
||||||
|
@ -157,10 +119,3 @@ where
|
||||||
Err(e) => or_else(e),
|
Err(e) => or_else(e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn into_result_handle(result: RocksdbResult<'_>) -> Result<Handle<'_>> {
|
|
||||||
result
|
|
||||||
.map_err(map_err)?
|
|
||||||
.map(Handle::from)
|
|
||||||
.ok_or(err!(Request(NotFound("Not found in database"))))
|
|
||||||
}
|
|
||||||
|
|
57
src/database/map/get_batch.rs
Normal file
57
src/database/map/get_batch.rs
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
use std::{convert::AsRef, fmt::Debug, sync::Arc};
|
||||||
|
|
||||||
|
use conduit::{err, implement, utils::IterStream, Result};
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use crate::{util::map_err, Handle};
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
#[tracing::instrument(skip(self, keys), level = "trace")]
|
||||||
|
pub fn aqry_batch<'b, 'a: 'b, const MAX: usize, I, K>(
|
||||||
|
self: &'a Arc<Self>, keys: I,
|
||||||
|
) -> impl Stream<Item = Result<Handle<'b>>> + Send + 'a
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'b K> + Send + 'a,
|
||||||
|
K: Serialize + ?Sized + Debug + 'b,
|
||||||
|
{
|
||||||
|
keys.stream()
|
||||||
|
.map(move |key| self.aqry::<MAX, _>(&key))
|
||||||
|
.buffered(self.db.server.config.db_pool_workers.saturating_mul(2))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
#[tracing::instrument(skip(self, keys), level = "trace")]
|
||||||
|
pub fn get_batch<'a, I, K>(self: &'a Arc<Self>, keys: I) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'a K> + Debug + Send + 'a,
|
||||||
|
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
|
||||||
|
{
|
||||||
|
keys.stream()
|
||||||
|
.map(move |key| self.get(key))
|
||||||
|
.buffered(self.db.server.config.db_pool_workers.saturating_mul(2))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
#[tracing::instrument(skip(self, keys), name = "batch_blocking", level = "trace")]
|
||||||
|
pub(crate) fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator<Item = Result<Handle<'_>>> + Send
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'a K> + ExactSizeIterator + Debug + Send,
|
||||||
|
K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a,
|
||||||
|
{
|
||||||
|
// Optimization can be `true` if key vector is pre-sorted **by the column
|
||||||
|
// comparator**.
|
||||||
|
const SORTED: bool = false;
|
||||||
|
|
||||||
|
let read_options = &self.read_options;
|
||||||
|
self.db
|
||||||
|
.db
|
||||||
|
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
|
||||||
|
.into_iter()
|
||||||
|
.map(|result| {
|
||||||
|
result
|
||||||
|
.map_err(map_err)?
|
||||||
|
.map(Handle::from)
|
||||||
|
.ok_or(err!(Request(NotFound("Not found in database"))))
|
||||||
|
})
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue