diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs
index 158fb3c8..7b3a1d49 100644
--- a/src/database/engine/cf_opts.rs
+++ b/src/database/engine/cf_opts.rs
@@ -178,7 +178,7 @@ fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
 		.try_into()
 		.expect("u32 to i32 conversion");
 
-	debug_assert!(shard_bits <= 6, "cache shards limited to 64");
+	debug_assert!(shard_bits <= 10, "cache shards probably too large");
 	let mut cache_opts = LruCacheOptions::default();
 	cache_opts.set_num_shard_bits(shard_bits);
 	cache_opts.set_capacity(size);
diff --git a/src/database/engine/context.rs b/src/database/engine/context.rs
index 76238f7d..04e08854 100644
--- a/src/database/engine/context.rs
+++ b/src/database/engine/context.rs
@@ -4,7 +4,7 @@ use std::{
 };
 
 use conduwuit::{debug, utils::math::usize_from_f64, Result, Server};
-use rocksdb::{Cache, Env};
+use rocksdb::{Cache, Env, LruCacheOptions};
 
 use crate::{or_else, pool::Pool};
 
@@ -25,12 +25,21 @@ impl Context {
 		let config = &server.config;
 		let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
 
-		let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
-		let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
-
+		let col_shard_bits = 7;
 		let col_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
-		let col_cache = Cache::new_lru_cache(col_cache_capacity_bytes);
+		let row_shard_bits = 7;
+		let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
+
+		let mut row_cache_opts = LruCacheOptions::default();
+		row_cache_opts.set_num_shard_bits(row_shard_bits);
+		row_cache_opts.set_capacity(row_cache_capacity_bytes);
+		let row_cache = Cache::new_lru_cache_opts(&row_cache_opts);
+
+		let mut col_cache_opts = LruCacheOptions::default();
+		col_cache_opts.set_num_shard_bits(col_shard_bits);
+		col_cache_opts.set_capacity(col_cache_capacity_bytes);
+		let col_cache = Cache::new_lru_cache_opts(&col_cache_opts);
 
 		let col_cache: BTreeMap<_, _> = [("Shared".to_owned(), col_cache)].into();
 
 		let mut env = Env::new().or_else(or_else)?;
diff --git a/src/database/engine/descriptor.rs b/src/database/engine/descriptor.rs
index d668862b..234ca2bf 100644
--- a/src/database/engine/descriptor.rs
+++ b/src/database/engine/descriptor.rs
@@ -67,6 +67,7 @@ pub(crate) static BASE: Descriptor = Descriptor {
 pub(crate) static RANDOM: Descriptor = Descriptor {
 	compaction_pri: CompactionPri::OldestSmallestSeqFirst,
 	write_size: 1024 * 1024 * 32,
+	cache_shards: 128,
 	..BASE
 };
 
@@ -75,6 +76,7 @@ pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
 	write_size: 1024 * 1024 * 64,
 	level_size: 1024 * 1024 * 32,
 	file_size: 1024 * 1024 * 2,
+	cache_shards: 128,
 	..BASE
 };
 
diff --git a/src/database/map/get.rs b/src/database/map/get.rs
index e64ef2ec..73182042 100644
--- a/src/database/map/get.rs
+++ b/src/database/map/get.rs
@@ -3,6 +3,7 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
 use arrayvec::ArrayVec;
 use conduwuit::{err, implement, utils::result::MapExpect, Err, Result};
 use futures::{future::ready, Future, FutureExt, TryFutureExt};
+use rocksdb::{DBPinnableSlice, ReadOptions};
 use serde::Serialize;
 use tokio::task;
 
@@ -90,6 +91,17 @@
 		.boxed()
 }
 
+/// Fetch a value from the cache without I/O.
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
+pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
+where
+	K: AsRef<[u8]> + Debug + ?Sized,
+{
+	let res = self.get_blocking_opts(key, &self.cache_read_options);
+	cached_handle_from(res)
+}
+
 /// Fetch a value from the database into cache, returning a reference-handle.
 /// The key is referenced directly to perform the query. This is a thread-
 /// blocking call.
@@ -99,37 +111,47 @@ pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
 where
 	K: AsRef<[u8]> + ?Sized,
 {
-	self.db
-		.db
-		.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
+	let res = self.get_blocking_opts(key, &self.read_options);
+	handle_from(res)
+}
+
+#[implement(super::Map)]
+fn get_blocking_opts<K>(
+	&self,
+	key: &K,
+	read_options: &ReadOptions,
+) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>
+where
+	K: AsRef<[u8]> + ?Sized,
+{
+	self.db.db.get_pinned_cf_opt(&self.cf(), key, read_options)
+}
+
+#[inline]
+pub(super) fn handle_from(
+	result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
+) -> Result<Handle<'_>> {
+	result
 		.map_err(map_err)?
 		.map(Handle::from)
 		.ok_or(err!(Request(NotFound("Not found in database"))))
 }
 
-/// Fetch a value from the cache without I/O.
-#[implement(super::Map)]
-#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
-pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
-where
-	K: AsRef<[u8]> + Debug + ?Sized,
-{
-	let res = self
-		.db
-		.db
-		.get_pinned_cf_opt(&self.cf(), key, &self.cache_read_options);
-
-	match res {
+#[inline]
+pub(super) fn cached_handle_from(
+	result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
+) -> Result<Option<Handle<'_>>> {
+	match result {
 		// cache hit; not found
 		| Ok(None) => Err!(Request(NotFound("Not found in database"))),
 		// cache hit; value found
-		| Ok(Some(res)) => Ok(Some(Handle::from(res))),
+		| Ok(Some(result)) => Ok(Some(Handle::from(result))),
 		// cache miss; unknown
-		| Err(e) if is_incomplete(&e) => Ok(None),
+		| Err(error) if is_incomplete(&error) => Ok(None),
 		// some other error occurred
-		| Err(e) => or_else(e),
+		| Err(error) => or_else(error),
 	}
 }
diff --git a/src/database/map/get_batch.rs b/src/database/map/get_batch.rs
index 452697f1..ee9269e3 100644
--- a/src/database/map/get_batch.rs
+++ b/src/database/map/get_batch.rs
@@ -1,7 +1,7 @@
 use std::{convert::AsRef, fmt::Debug, sync::Arc};
 
 use conduwuit::{
-	err, implement,
+	implement,
 	utils::{
 		stream::{automatic_amplification, automatic_width, WidebandExt},
 		IterStream,
@@ -9,9 +9,11 @@ use conduwuit::{
 	Result,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
+use rocksdb::{DBPinnableSlice, ReadOptions};
 use serde::Serialize;
 
-use crate::{keyval::KeyBuf, ser, util::map_err, Handle};
+use super::get::{cached_handle_from, handle_from};
+use crate::{keyval::KeyBuf, ser, Handle};
 
 #[implement(super::Map)]
 #[tracing::instrument(skip(self, keys), level = "trace")]
@@ -66,12 +68,40 @@ where
 		.try_flatten()
 }
 
+#[implement(super::Map)]
+#[tracing::instrument(name = "batch_cached", level = "trace", skip_all)]
+pub(crate) fn get_batch_cached<'a, I, K>(
+	&self,
+	keys: I,
+) -> impl Iterator<Item = Result<Option<Handle<'_>>>> + Send
+where
+	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
+	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
+{
+	self.get_batch_blocking_opts(keys, &self.cache_read_options)
+		.map(cached_handle_from)
+}
+
 #[implement(super::Map)]
 #[tracing::instrument(name = "batch_blocking", level = "trace", skip_all)]
 pub(crate) fn get_batch_blocking<'a, I, K>(
 	&self,
 	keys: I,
 ) -> impl Iterator<Item = Result<Handle<'_>>> + Send
+where
+	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
+	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
+{
+	self.get_batch_blocking_opts(keys, &self.read_options)
+		.map(handle_from)
+}
+
+#[implement(super::Map)]
+fn get_batch_blocking_opts<'a, I, K>(
+	&self,
+	keys: I,
+	read_options: &ReadOptions,
+) -> impl Iterator<Item = Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>> + Send
 where
 	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
 	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
@@ -80,15 +110,8 @@ where
 	// comparator**.
 	const SORTED: bool = false;
 
-	let read_options = &self.read_options;
 	self.db
 		.db
 		.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
 		.into_iter()
-		.map(|result| {
-			result
-				.map_err(map_err)?
-				.map(Handle::from)
-				.ok_or(err!(Request(NotFound("Not found in database"))))
-		})
 }