diff --git a/src/database/map.rs b/src/database/map.rs index 4b55fa54..a15d5e9d 100644 --- a/src/database/map.rs +++ b/src/database/map.rs @@ -1,6 +1,7 @@ mod contains; mod count; mod get; +mod get_batch; mod insert; mod keys; mod keys_from; diff --git a/src/database/map/get.rs b/src/database/map/get.rs index ef552177..04f5d0ae 100644 --- a/src/database/map/get.rs +++ b/src/database/map/get.rs @@ -1,13 +1,8 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc}; use arrayvec::ArrayVec; -use conduit::{ - err, implement, - utils::{result::MapExpect, IterStream}, - Err, Result, -}; -use futures::{future, Future, FutureExt, Stream, StreamExt}; -use rocksdb::DBPinnableSlice; +use conduit::{err, implement, utils::result::MapExpect, Err, Result}; +use futures::{future, Future, FutureExt}; use serde::Serialize; use crate::{ @@ -17,8 +12,6 @@ use crate::{ Handle, }; -type RocksdbResult<'a> = Result>, rocksdb::Error>; - /// Fetch a value from the database into cache, returning a reference-handle /// asynchronously. The key is serialized into an allocated buffer to perform /// the query. @@ -58,18 +51,6 @@ where self.get(key) } -#[implement(super::Map)] -#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")] -pub fn get_batch<'a, I, K>(self: &'a Arc, keys: I) -> impl Stream>> + Send + 'a -where - I: Iterator + ExactSizeIterator + Debug + Send + 'a, - K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a, -{ - keys.stream() - .map(move |key| self.get(key)) - .buffered(self.db.server.config.db_pool_workers.saturating_mul(2)) -} - /// Fetch a value from the database into cache, returning a reference-handle /// asynchronously. The key is referenced directly to perform the query. #[implement(super::Map)] @@ -95,25 +76,6 @@ where self.db.pool.execute_get(cmd).boxed() } -#[implement(super::Map)] -#[tracing::instrument(skip(self, keys), name = "batch_blocking", level = "trace")] -pub(crate) fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator>> + Send -where - I: Iterator + ExactSizeIterator + Debug + Send, - K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a, -{ - // Optimization can be `true` if key vector is pre-sorted **by the column - // comparator**. - const SORTED: bool = false; - - let read_options = &self.read_options; - self.db - .db - .batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options) - .into_iter() - .map(into_result_handle) -} - /// Fetch a value from the database into cache, returning a reference-handle. /// The key is referenced directly to perform the query. This is a thread- /// blocking call. @@ -123,12 +85,12 @@ pub fn get_blocking(&self, key: &K) -> Result> where K: AsRef<[u8]> + ?Sized, { - let res = self + self.db .db - .db - .get_pinned_cf_opt(&self.cf(), key, &self.read_options); - - into_result_handle(res) + .get_pinned_cf_opt(&self.cf(), key, &self.read_options) + .map_err(map_err)? + .map(Handle::from) + .ok_or(err!(Request(NotFound("Not found in database")))) } /// Fetch a value from the cache without I/O. @@ -157,10 +119,3 @@ where Err(e) => or_else(e), } } - -fn into_result_handle(result: RocksdbResult<'_>) -> Result> { - result - .map_err(map_err)? - .map(Handle::from) - .ok_or(err!(Request(NotFound("Not found in database")))) -} diff --git a/src/database/map/get_batch.rs b/src/database/map/get_batch.rs new file mode 100644 index 00000000..0f1fdea7 --- /dev/null +++ b/src/database/map/get_batch.rs @@ -0,0 +1,57 @@ +use std::{convert::AsRef, fmt::Debug, sync::Arc}; + +use conduit::{err, implement, utils::IterStream, Result}; +use futures::{Stream, StreamExt}; +use serde::Serialize; + +use crate::{util::map_err, Handle}; + +#[implement(super::Map)] +#[tracing::instrument(skip(self, keys), level = "trace")] +pub fn aqry_batch<'b, 'a: 'b, const MAX: usize, I, K>( + self: &'a Arc, keys: I, +) -> impl Stream>> + Send + 'a +where + I: Iterator + Send + 'a, + K: Serialize + ?Sized + Debug + 'b, +{ + keys.stream() + .map(move |key| self.aqry::(&key)) + .buffered(self.db.server.config.db_pool_workers.saturating_mul(2)) +} + +#[implement(super::Map)] +#[tracing::instrument(skip(self, keys), level = "trace")] +pub fn get_batch<'a, I, K>(self: &'a Arc, keys: I) -> impl Stream>> + Send + 'a +where + I: Iterator + Debug + Send + 'a, + K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a, +{ + keys.stream() + .map(move |key| self.get(key)) + .buffered(self.db.server.config.db_pool_workers.saturating_mul(2)) +} + +#[implement(super::Map)] +#[tracing::instrument(skip(self, keys), name = "batch_blocking", level = "trace")] +pub(crate) fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator>> + Send +where + I: Iterator + ExactSizeIterator + Debug + Send, + K: AsRef<[u8]> + Debug + Send + ?Sized + Sync + 'a, +{ + // Optimization can be `true` if key vector is pre-sorted **by the column + // comparator**. + const SORTED: bool = false; + + let read_options = &self.read_options; + self.db + .db + .batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options) + .into_iter() + .map(|result| { + result + .map_err(map_err)? + .map(Handle::from) + .ok_or(err!(Request(NotFound("Not found in database")))) + }) +}