use cache builder for row and table cache options
add cache check using multi-get path Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
fc1170e12a
commit
96e85adc32
5 changed files with 90 additions and 34 deletions
|
@ -178,7 +178,7 @@ fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("u32 to i32 conversion");
|
.expect("u32 to i32 conversion");
|
||||||
|
|
||||||
debug_assert!(shard_bits <= 6, "cache shards limited to 64");
|
debug_assert!(shard_bits <= 10, "cache shards probably too large");
|
||||||
let mut cache_opts = LruCacheOptions::default();
|
let mut cache_opts = LruCacheOptions::default();
|
||||||
cache_opts.set_num_shard_bits(shard_bits);
|
cache_opts.set_num_shard_bits(shard_bits);
|
||||||
cache_opts.set_capacity(size);
|
cache_opts.set_capacity(size);
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduwuit::{debug, utils::math::usize_from_f64, Result, Server};
|
use conduwuit::{debug, utils::math::usize_from_f64, Result, Server};
|
||||||
use rocksdb::{Cache, Env};
|
use rocksdb::{Cache, Env, LruCacheOptions};
|
||||||
|
|
||||||
use crate::{or_else, pool::Pool};
|
use crate::{or_else, pool::Pool};
|
||||||
|
|
||||||
|
@ -25,12 +25,21 @@ impl Context {
|
||||||
let config = &server.config;
|
let config = &server.config;
|
||||||
let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
|
let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
|
||||||
|
|
||||||
let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
|
let col_shard_bits = 7;
|
||||||
let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
|
|
||||||
|
|
||||||
let col_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
|
let col_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
|
||||||
let col_cache = Cache::new_lru_cache(col_cache_capacity_bytes);
|
|
||||||
|
|
||||||
|
let row_shard_bits = 7;
|
||||||
|
let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
|
||||||
|
|
||||||
|
let mut row_cache_opts = LruCacheOptions::default();
|
||||||
|
row_cache_opts.set_num_shard_bits(row_shard_bits);
|
||||||
|
row_cache_opts.set_capacity(row_cache_capacity_bytes);
|
||||||
|
let row_cache = Cache::new_lru_cache_opts(&row_cache_opts);
|
||||||
|
|
||||||
|
let mut col_cache_opts = LruCacheOptions::default();
|
||||||
|
col_cache_opts.set_num_shard_bits(col_shard_bits);
|
||||||
|
col_cache_opts.set_capacity(col_cache_capacity_bytes);
|
||||||
|
let col_cache = Cache::new_lru_cache_opts(&col_cache_opts);
|
||||||
let col_cache: BTreeMap<_, _> = [("Shared".to_owned(), col_cache)].into();
|
let col_cache: BTreeMap<_, _> = [("Shared".to_owned(), col_cache)].into();
|
||||||
|
|
||||||
let mut env = Env::new().or_else(or_else)?;
|
let mut env = Env::new().or_else(or_else)?;
|
||||||
|
|
|
@ -67,6 +67,7 @@ pub(crate) static BASE: Descriptor = Descriptor {
|
||||||
pub(crate) static RANDOM: Descriptor = Descriptor {
|
pub(crate) static RANDOM: Descriptor = Descriptor {
|
||||||
compaction_pri: CompactionPri::OldestSmallestSeqFirst,
|
compaction_pri: CompactionPri::OldestSmallestSeqFirst,
|
||||||
write_size: 1024 * 1024 * 32,
|
write_size: 1024 * 1024 * 32,
|
||||||
|
cache_shards: 128,
|
||||||
..BASE
|
..BASE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -75,6 +76,7 @@ pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
|
||||||
write_size: 1024 * 1024 * 64,
|
write_size: 1024 * 1024 * 64,
|
||||||
level_size: 1024 * 1024 * 32,
|
level_size: 1024 * 1024 * 32,
|
||||||
file_size: 1024 * 1024 * 2,
|
file_size: 1024 * 1024 * 2,
|
||||||
|
cache_shards: 128,
|
||||||
..BASE
|
..BASE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
|
||||||
use arrayvec::ArrayVec;
|
use arrayvec::ArrayVec;
|
||||||
use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
|
use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
|
||||||
use futures::{future::ready, Future, FutureExt, TryFutureExt};
|
use futures::{future::ready, Future, FutureExt, TryFutureExt};
|
||||||
|
use rocksdb::{DBPinnableSlice, ReadOptions};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
|
@ -90,6 +91,17 @@ where
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetch a value from the cache without I/O.
|
||||||
|
#[implement(super::Map)]
|
||||||
|
#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
|
||||||
|
pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
|
||||||
|
where
|
||||||
|
K: AsRef<[u8]> + Debug + ?Sized,
|
||||||
|
{
|
||||||
|
let res = self.get_blocking_opts(key, &self.cache_read_options);
|
||||||
|
cached_handle_from(res)
|
||||||
|
}
|
||||||
|
|
||||||
/// Fetch a value from the database into cache, returning a reference-handle.
|
/// Fetch a value from the database into cache, returning a reference-handle.
|
||||||
/// The key is referenced directly to perform the query. This is a thread-
|
/// The key is referenced directly to perform the query. This is a thread-
|
||||||
/// blocking call.
|
/// blocking call.
|
||||||
|
@ -99,37 +111,47 @@ pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
|
||||||
where
|
where
|
||||||
K: AsRef<[u8]> + ?Sized,
|
K: AsRef<[u8]> + ?Sized,
|
||||||
{
|
{
|
||||||
self.db
|
let res = self.get_blocking_opts(key, &self.read_options);
|
||||||
.db
|
handle_from(res)
|
||||||
.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
|
}
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
fn get_blocking_opts<K>(
|
||||||
|
&self,
|
||||||
|
key: &K,
|
||||||
|
read_options: &ReadOptions,
|
||||||
|
) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>
|
||||||
|
where
|
||||||
|
K: AsRef<[u8]> + ?Sized,
|
||||||
|
{
|
||||||
|
self.db.db.get_pinned_cf_opt(&self.cf(), key, read_options)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub(super) fn handle_from(
|
||||||
|
result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
|
||||||
|
) -> Result<Handle<'_>> {
|
||||||
|
result
|
||||||
.map_err(map_err)?
|
.map_err(map_err)?
|
||||||
.map(Handle::from)
|
.map(Handle::from)
|
||||||
.ok_or(err!(Request(NotFound("Not found in database"))))
|
.ok_or(err!(Request(NotFound("Not found in database"))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch a value from the cache without I/O.
|
#[inline]
|
||||||
#[implement(super::Map)]
|
pub(super) fn cached_handle_from(
|
||||||
#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
|
result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
|
||||||
pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
|
) -> Result<Option<Handle<'_>>> {
|
||||||
where
|
match result {
|
||||||
K: AsRef<[u8]> + Debug + ?Sized,
|
|
||||||
{
|
|
||||||
let res = self
|
|
||||||
.db
|
|
||||||
.db
|
|
||||||
.get_pinned_cf_opt(&self.cf(), key, &self.cache_read_options);
|
|
||||||
|
|
||||||
match res {
|
|
||||||
// cache hit; not found
|
// cache hit; not found
|
||||||
| Ok(None) => Err!(Request(NotFound("Not found in database"))),
|
| Ok(None) => Err!(Request(NotFound("Not found in database"))),
|
||||||
|
|
||||||
// cache hit; value found
|
// cache hit; value found
|
||||||
| Ok(Some(res)) => Ok(Some(Handle::from(res))),
|
| Ok(Some(result)) => Ok(Some(Handle::from(result))),
|
||||||
|
|
||||||
// cache miss; unknown
|
// cache miss; unknown
|
||||||
| Err(e) if is_incomplete(&e) => Ok(None),
|
| Err(error) if is_incomplete(&error) => Ok(None),
|
||||||
|
|
||||||
// some other error occurred
|
// some other error occurred
|
||||||
| Err(e) => or_else(e),
|
| Err(error) => or_else(error),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::{convert::AsRef, fmt::Debug, sync::Arc};
|
use std::{convert::AsRef, fmt::Debug, sync::Arc};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
err, implement,
|
implement,
|
||||||
utils::{
|
utils::{
|
||||||
stream::{automatic_amplification, automatic_width, WidebandExt},
|
stream::{automatic_amplification, automatic_width, WidebandExt},
|
||||||
IterStream,
|
IterStream,
|
||||||
|
@ -9,9 +9,11 @@ use conduwuit::{
|
||||||
Result,
|
Result,
|
||||||
};
|
};
|
||||||
use futures::{Stream, StreamExt, TryStreamExt};
|
use futures::{Stream, StreamExt, TryStreamExt};
|
||||||
|
use rocksdb::{DBPinnableSlice, ReadOptions};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
use crate::{keyval::KeyBuf, ser, util::map_err, Handle};
|
use super::get::{cached_handle_from, handle_from};
|
||||||
|
use crate::{keyval::KeyBuf, ser, Handle};
|
||||||
|
|
||||||
#[implement(super::Map)]
|
#[implement(super::Map)]
|
||||||
#[tracing::instrument(skip(self, keys), level = "trace")]
|
#[tracing::instrument(skip(self, keys), level = "trace")]
|
||||||
|
@ -66,12 +68,40 @@ where
|
||||||
.try_flatten()
|
.try_flatten()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
#[tracing::instrument(name = "batch_cached", level = "trace", skip_all)]
|
||||||
|
pub(crate) fn get_batch_cached<'a, I, K>(
|
||||||
|
&self,
|
||||||
|
keys: I,
|
||||||
|
) -> impl Iterator<Item = Result<Option<Handle<'_>>>> + Send
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
|
||||||
|
K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
|
||||||
|
{
|
||||||
|
self.get_batch_blocking_opts(keys, &self.cache_read_options)
|
||||||
|
.map(cached_handle_from)
|
||||||
|
}
|
||||||
|
|
||||||
#[implement(super::Map)]
|
#[implement(super::Map)]
|
||||||
#[tracing::instrument(name = "batch_blocking", level = "trace", skip_all)]
|
#[tracing::instrument(name = "batch_blocking", level = "trace", skip_all)]
|
||||||
pub(crate) fn get_batch_blocking<'a, I, K>(
|
pub(crate) fn get_batch_blocking<'a, I, K>(
|
||||||
&self,
|
&self,
|
||||||
keys: I,
|
keys: I,
|
||||||
) -> impl Iterator<Item = Result<Handle<'_>>> + Send
|
) -> impl Iterator<Item = Result<Handle<'_>>> + Send
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
|
||||||
|
K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
|
||||||
|
{
|
||||||
|
self.get_batch_blocking_opts(keys, &self.read_options)
|
||||||
|
.map(handle_from)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
fn get_batch_blocking_opts<'a, I, K>(
|
||||||
|
&self,
|
||||||
|
keys: I,
|
||||||
|
read_options: &ReadOptions,
|
||||||
|
) -> impl Iterator<Item = Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>> + Send
|
||||||
where
|
where
|
||||||
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
|
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
|
||||||
K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
|
K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
|
||||||
|
@ -80,15 +110,8 @@ where
|
||||||
// comparator**.
|
// comparator**.
|
||||||
const SORTED: bool = false;
|
const SORTED: bool = false;
|
||||||
|
|
||||||
let read_options = &self.read_options;
|
|
||||||
self.db
|
self.db
|
||||||
.db
|
.db
|
||||||
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
|
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|result| {
|
|
||||||
result
|
|
||||||
.map_err(map_err)?
|
|
||||||
.map(Handle::from)
|
|
||||||
.ok_or(err!(Request(NotFound("Not found in database"))))
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue