From c7ae9516767dff6767a725be3d144164c0d1ad56 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 27 Nov 2024 06:28:32 +0000 Subject: [PATCH] add frontend threadpool to database Signed-off-by: Jason Volk --- Cargo.lock | 51 +++++++++- Cargo.toml | 6 +- src/database/Cargo.toml | 1 + src/database/engine.rs | 9 +- src/database/map.rs | 11 ++- src/database/map/contains.rs | 49 +++++---- src/database/map/get.rs | 100 ++++++++++++++----- src/database/mod.rs | 1 + src/database/pool.rs | 186 +++++++++++++++++++++++++++++++++++ src/database/util.rs | 5 +- 10 files changed, 362 insertions(+), 57 deletions(-) create mode 100644 src/database/pool.rs diff --git a/Cargo.lock b/Cargo.lock index 28f0edad..5c79b4b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -92,6 +92,18 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f093eed78becd229346bf859eec0aa4dd7ddde0757287b2b4107a1f09c80002" +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.18" @@ -597,6 +609,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "conduit" version = "0.5.0" @@ -736,6 +757,7 @@ name = "conduit_database" version = "0.5.0" dependencies = [ "arrayvec", + "async-channel", "conduit_core", "const-str", "futures", @@ -1219,6 +1241,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fdeflate" version = "0.3.6" @@ -2151,7 +2194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -2589,6 +2632,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" diff --git a/Cargo.toml b/Cargo.toml index 12163f6a..283c5a95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -307,10 +307,14 @@ version = "0.13.0" [workspace.dependencies.cyborgtime] version = "2.1.1" -# used to replace the channels of the tokio runtime +# used for MPSC channels [workspace.dependencies.loole] version = "0.4.0" +# used for MPMC channels +[workspace.dependencies.async-channel] +version = "2.3.1" + [workspace.dependencies.async-trait] version = "0.1.83" diff --git a/src/database/Cargo.toml b/src/database/Cargo.toml index 0e718aa7..c45931a5 100644 --- a/src/database/Cargo.toml +++ b/src/database/Cargo.toml @@ -36,6 +36,7 @@ zstd_compression = [ [dependencies] arrayvec.workspace = true +async-channel.workspace = true conduit-core.workspace = true const-str.workspace = true futures.workspace = true diff --git a/src/database/engine.rs b/src/database/engine.rs index 9da20803..837c7259 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -16,7 +16,9 @@ use rocksdb::{ use crate::{ opts::{cf_options, db_options}, - or_else, result, + or_else, pool, + pool::Pool, + result, util::map_err, }; @@ -31,6 +33,7 @@ pub struct Engine { corks: AtomicU32, pub(super) read_only: bool, pub(super) secondary: bool, + pub(crate) pool: Arc, } pub(crate) type Db = DBWithThreadMode; @@ -111,6 +114,7 @@ impl Engine { corks: AtomicU32::new(0), read_only: config.rocksdb_read_only, secondary: config.rocksdb_secondary, + pool: Pool::new(&pool::Opts::default())?, })) } @@ -316,6 +320,9 @@ impl Drop for Engine { fn drop(&mut self) { const BLOCKING: bool = true; + debug!("Joining request threads..."); + self.pool.close(); + debug!("Waiting for background tasks to finish..."); self.db.cancel_all_background_work(BLOCKING); diff --git a/src/database/map.rs b/src/database/map.rs index d6b8bf38..4b55fa54 100644 --- a/src/database/map.rs +++ b/src/database/map.rs @@ -27,7 +27,7 @@ use std::{ }; use conduit::Result; -use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions}; +use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, ReadTier, WriteOptions}; use crate::{watchers::Watchers, Engine}; @@ -38,6 +38,7 @@ pub struct Map { watchers: Watchers, write_options: WriteOptions, read_options: ReadOptions, + cache_read_options: ReadOptions, } impl Map { @@ -49,6 +50,7 @@ impl Map { watchers: Watchers::default(), write_options: write_options_default(), read_options: read_options_default(), + cache_read_options: cache_read_options_default(), })) } @@ -112,6 +114,13 @@ fn open(db: &Arc, name: &str) -> Result> { }) } +#[inline] +fn cache_read_options_default() -> ReadOptions { + let mut read_options = read_options_default(); + read_options.set_read_tier(ReadTier::BlockCache); + read_options +} + #[inline] fn read_options_default() -> ReadOptions { let mut read_options = ReadOptions::default(); diff --git a/src/database/map/contains.rs b/src/database/map/contains.rs index a98fe7c5..7acef2f6 100644 --- a/src/database/map/contains.rs +++ b/src/database/map/contains.rs @@ -1,17 +1,21 @@ -use std::{convert::AsRef, fmt::Debug, future::Future, io::Write}; +use std::{convert::AsRef, fmt::Debug, future::Future, io::Write, sync::Arc}; use arrayvec::ArrayVec; -use conduit::{implement, utils::TryFutureExtExt, Err, Result}; -use futures::future::ready; +use conduit::{ + err, implement, + utils::{future::TryExtExt, result::FlatOk}, + Result, +}; +use futures::FutureExt; use serde::Serialize; -use crate::{ser, util}; +use crate::ser; /// Returns true if the map contains the key. /// - key is serialized into allocated buffer /// - harder errors may not be reported #[implement(super::Map)] -pub fn contains(&self, key: &K) -> impl Future + Send +pub fn contains(self: &Arc, key: &K) -> impl Future + Send + '_ where K: Serialize + ?Sized + Debug, { @@ -23,7 +27,7 @@ where /// - key is serialized into stack-buffer /// - harder errors will panic #[implement(super::Map)] -pub fn acontains(&self, key: &K) -> impl Future + Send +pub fn acontains(self: &Arc, key: &K) -> impl Future + Send + '_ where K: Serialize + ?Sized + Debug, { @@ -36,7 +40,7 @@ where /// - harder errors will panic #[implement(super::Map)] #[tracing::instrument(skip(self, buf), fields(%self), level = "trace")] -pub fn bcontains(&self, key: &K, buf: &mut B) -> impl Future + Send +pub fn bcontains(self: &Arc, key: &K, buf: &mut B) -> impl Future + Send + '_ where K: Serialize + ?Sized + Debug, B: Write + AsRef<[u8]>, @@ -48,41 +52,36 @@ where /// Returns Ok if the map contains the key. /// - key is raw #[implement(super::Map)] -pub fn exists(&self, key: &K) -> impl Future> + Send +pub fn exists<'a, K>(self: &'a Arc, key: &K) -> impl Future + Send + 'a where - K: AsRef<[u8]> + ?Sized + Debug, + K: AsRef<[u8]> + ?Sized + Debug + 'a, { - ready(self.exists_blocking(key)) + self.get(key).map(|res| res.map(|_| ())) } /// Returns Ok if the map contains the key; NotFound otherwise. Harder errors /// may not always be reported properly. #[implement(super::Map)] #[tracing::instrument(skip(self, key), fields(%self), level = "trace")] -pub fn exists_blocking(&self, key: &K) -> Result<()> +pub fn exists_blocking(&self, key: &K) -> Result where K: AsRef<[u8]> + ?Sized + Debug, { - if self.maybe_exists_blocking(key) - && self - .db - .db - .get_pinned_cf_opt(&self.cf(), key, &self.read_options) - .map_err(util::map_err)? - .is_some() - { - Ok(()) - } else { - Err!(Request(NotFound("Not found in database"))) - } + self.maybe_exists(key) + .then(|| self.get_blocking(key)) + .flat_ok() + .map(|_| ()) + .ok_or_else(|| err!(Request(NotFound("Not found in database")))) } +/// Rocksdb limits this to kBlockCacheTier internally so this is not actually a +/// blocking call; in case that changes we set this as well in our read_options. #[implement(super::Map)] -fn maybe_exists_blocking(&self, key: &K) -> bool +pub(crate) fn maybe_exists(&self, key: &K) -> bool where K: AsRef<[u8]> + ?Sized, { self.db .db - .key_may_exist_cf_opt(&self.cf(), key, &self.read_options) + .key_may_exist_cf_opt(&self.cf(), key, &self.cache_read_options) } diff --git a/src/database/map/get.rs b/src/database/map/get.rs index 24649175..a00eecfa 100644 --- a/src/database/map/get.rs +++ b/src/database/map/get.rs @@ -1,13 +1,16 @@ -use std::{convert::AsRef, fmt::Debug, future::Future, io::Write}; +use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc}; use arrayvec::ArrayVec; -use conduit::{err, implement, utils::IterStream, Result}; -use futures::{FutureExt, Stream}; +use conduit::{err, implement, utils::IterStream, Err, Result}; +use futures::{future, Future, FutureExt, Stream}; use rocksdb::DBPinnableSlice; use serde::Serialize; -use tokio::task; -use crate::{ser, util, Handle}; +use crate::{ + ser, + util::{is_incomplete, map_err, or_else}, + Handle, +}; type RocksdbResult<'a> = Result>, rocksdb::Error>; @@ -15,7 +18,7 @@ type RocksdbResult<'a> = Result>, rocksdb::Error>; /// asynchronously. The key is serialized into an allocated buffer to perform /// the query. #[implement(super::Map)] -pub fn qry(&self, key: &K) -> impl Future>> + Send +pub fn qry(self: &Arc, key: &K) -> impl Future>> + Send where K: Serialize + ?Sized + Debug, { @@ -27,7 +30,7 @@ where /// asynchronously. The key is serialized into a fixed-sized buffer to perform /// the query. The maximum size is supplied as const generic parameter. #[implement(super::Map)] -pub fn aqry(&self, key: &K) -> impl Future>> + Send +pub fn aqry(self: &Arc, key: &K) -> impl Future>> + Send where K: Serialize + ?Sized + Debug, { @@ -39,7 +42,7 @@ where /// asynchronously. The key is serialized into a user-supplied Writer. #[implement(super::Map)] #[tracing::instrument(skip(self, buf), level = "trace")] -pub fn bqry(&self, key: &K, buf: &mut B) -> impl Future>> + Send +pub fn bqry(self: &Arc, key: &K, buf: &mut B) -> impl Future>> + Send where K: Serialize + ?Sized + Debug, B: Write + AsRef<[u8]>, @@ -52,28 +55,28 @@ where /// asynchronously. The key is referenced directly to perform the query. #[implement(super::Map)] #[tracing::instrument(skip(self, key), fields(%self), level = "trace")] -pub fn get(&self, key: &K) -> impl Future>> + Send +pub fn get(self: &Arc, key: &K) -> impl Future>> + Send where - K: AsRef<[u8]> + ?Sized + Debug, + K: AsRef<[u8]> + Debug + ?Sized, { - let result = self.get_blocking(key); - task::consume_budget().map(move |()| result) -} + use crate::pool::{Cmd, Get}; -/// Fetch a value from the database into cache, returning a reference-handle. -/// The key is referenced directly to perform the query. This is a thread- -/// blocking call. -#[implement(super::Map)] -pub fn get_blocking(&self, key: &K) -> Result> -where - K: AsRef<[u8]> + ?Sized, -{ - let res = self - .db - .db - .get_pinned_cf_opt(&self.cf(), key, &self.read_options); + let cached = self.get_cached(key); + if matches!(cached, Err(_) | Ok(Some(_))) { + return future::ready(cached.map(|res| res.expect("Option is Some"))).boxed(); + } - into_result_handle(res) + debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete"); + let cmd = Cmd::Get(Get { + map: self.clone(), + res: None, + key: key + .as_ref() + .try_into() + .expect("failed to copy key into buffer"), + }); + + self.db.pool.execute(cmd).boxed() } #[implement(super::Map)] @@ -104,9 +107,52 @@ where .map(into_result_handle) } +/// Fetch a value from the database into cache, returning a reference-handle. +/// The key is referenced directly to perform the query. This is a thread- +/// blocking call. +#[implement(super::Map)] +pub fn get_blocking(&self, key: &K) -> Result> +where + K: AsRef<[u8]> + ?Sized, +{ + let res = self + .db + .db + .get_pinned_cf_opt(&self.cf(), key, &self.read_options); + + into_result_handle(res) +} + +/// Fetch a value from the cache without I/O. +#[implement(super::Map)] +#[tracing::instrument(skip(self, key), fields(%self), level = "trace")] +pub(crate) fn get_cached(&self, key: &K) -> Result>> +where + K: AsRef<[u8]> + Debug + ?Sized, +{ + let res = self + .db + .db + .get_pinned_cf_opt(&self.cf(), key, &self.cache_read_options); + + match res { + // cache hit; not found + Ok(None) => Err!(Request(NotFound("Not found in database"))), + + // cache hit; value found + Ok(Some(res)) => Ok(Some(Handle::from(res))), + + // cache miss; unknown + Err(e) if is_incomplete(&e) => Ok(None), + + // some other error occurred + Err(e) => or_else(e), + } +} + fn into_result_handle(result: RocksdbResult<'_>) -> Result> { result - .map_err(util::map_err)? + .map_err(map_err)? .map(Handle::from) .ok_or(err!(Request(NotFound("Not found in database")))) } diff --git a/src/database/mod.rs b/src/database/mod.rs index f09c4a71..cd91fba2 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -8,6 +8,7 @@ pub mod keyval; mod map; pub mod maps; mod opts; +mod pool; mod ser; mod stream; mod tests; diff --git a/src/database/pool.rs b/src/database/pool.rs new file mode 100644 index 00000000..e2cb2605 --- /dev/null +++ b/src/database/pool.rs @@ -0,0 +1,186 @@ +use std::{ + convert::identity, + mem::take, + sync::{Arc, Mutex}, + thread::JoinHandle, +}; + +use arrayvec::ArrayVec; +use async_channel::{bounded, Receiver, Sender}; +use conduit::{debug, defer, err, implement, Result}; +use futures::channel::oneshot; + +use crate::{Handle, Map}; + +pub(crate) struct Pool { + workers: Mutex>>, + recv: Receiver, + send: Sender, +} + +#[derive(Default)] +pub(crate) struct Opts { + queue_size: Option, + worker_num: Option, +} + +const WORKER_THREAD_NAME: &str = "conduwuit:db"; +const DEFAULT_QUEUE_SIZE: usize = 1024; +const DEFAULT_WORKER_NUM: usize = 32; +const KEY_MAX_BYTES: usize = 384; + +#[derive(Debug)] +pub(crate) enum Cmd { + Get(Get), +} + +#[derive(Debug)] +pub(crate) struct Get { + pub(crate) map: Arc, + pub(crate) key: ArrayVec, + pub(crate) res: Option>>>, +} + +#[implement(Pool)] +pub(crate) fn new(opts: &Opts) -> Result> { + let queue_size = opts.queue_size.unwrap_or(DEFAULT_QUEUE_SIZE); + + let (send, recv) = bounded(queue_size); + let pool = Arc::new(Self { + workers: Vec::new().into(), + recv, + send, + }); + + let worker_num = opts.worker_num.unwrap_or(DEFAULT_WORKER_NUM); + pool.spawn_until(worker_num)?; + + Ok(pool) +} + +#[implement(Pool)] +fn spawn_until(self: &Arc, max: usize) -> Result { + let mut workers = self.workers.lock()?; + + while workers.len() < max { + self.clone().spawn_one(&mut workers)?; + } + + Ok(()) +} + +#[implement(Pool)] +fn spawn_one(self: Arc, workers: &mut Vec>) -> Result { + use std::thread::Builder; + + let id = workers.len(); + + debug!(?id, "spawning {WORKER_THREAD_NAME}..."); + let thread = Builder::new() + .name(WORKER_THREAD_NAME.into()) + .spawn(move || self.worker(id))?; + + workers.push(thread); + + Ok(id) +} + +#[implement(Pool)] +pub(crate) fn close(self: &Arc) { + debug!( + senders = %self.send.sender_count(), + receivers = %self.send.receiver_count(), + "Closing pool channel" + ); + + let closing = self.send.close(); + debug_assert!(closing, "channel is not closing"); + + debug!("Shutting down pool..."); + let mut workers = self.workers.lock().expect("locked"); + + debug!( + workers = %workers.len(), + "Waiting for workers to join..." + ); + take(&mut *workers) + .into_iter() + .map(JoinHandle::join) + .try_for_each(identity) + .expect("failed to join worker threads"); + + debug_assert!(self.send.is_empty(), "channel is not empty"); +} + +#[implement(Pool)] +#[tracing::instrument(skip(self, cmd), level = "trace")] +pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result> { + let (send, recv) = oneshot::channel(); + match &mut cmd { + Cmd::Get(ref mut cmd) => { + _ = cmd.res.insert(send); + }, + }; + + self.send + .send(cmd) + .await + .map_err(|e| err!(error!("send failed {e:?}")))?; + + recv.await + .map(into_recv_result) + .map_err(|e| err!(error!("recv failed {e:?}")))? +} + +#[implement(Pool)] +#[tracing::instrument(skip(self))] +fn worker(self: Arc, id: usize) { + debug!(?id, "worker spawned"); + defer! {{ debug!(?id, "worker finished"); }} + self.worker_loop(id); +} + +#[implement(Pool)] +fn worker_loop(&self, id: usize) { + while let Ok(mut cmd) = self.recv.recv_blocking() { + self.handle(id, &mut cmd); + } +} + +#[implement(Pool)] +fn handle(&self, id: usize, cmd: &mut Cmd) { + match cmd { + Cmd::Get(get) => self.handle_get(id, get), + } +} + +#[implement(Pool)] +#[tracing::instrument(skip(self, cmd), fields(%cmd.map), level = "trace")] +fn handle_get(&self, id: usize, cmd: &mut Get) { + let chan = cmd.res.take().expect("missing result channel"); + + // If the future was dropped while the command was queued then we can bail + // without any query. This makes it more efficient to use select() variants and + // pessimistic parallel queries. + if chan.is_canceled() { + return; + } + + let result = cmd.map.get_blocking(&cmd.key); + let _sent = chan.send(into_send_result(result)).is_ok(); +} + +fn into_send_result(result: Result>) -> Result> { + // SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through + // the channel. The lifetime on the handle is a device by rust-rocksdb to + // associate a database lifetime with its assets, not a function of rocksdb or + // the asset. The Handle must be dropped before the database is dropped. The + // handle must pass through recv_handle() on the other end of the channel. + unsafe { std::mem::transmute(result) } +} + +fn into_recv_result(result: Result>) -> Result> { + // SAFETY: This is to receive the Handle from the channel. Previously it had + // passed through send_handle(). + unsafe { std::mem::transmute(result) } +} diff --git a/src/database/util.rs b/src/database/util.rs index ae076381..21764361 100644 --- a/src/database/util.rs +++ b/src/database/util.rs @@ -1,5 +1,5 @@ use conduit::{err, Result}; -use rocksdb::{Direction, IteratorMode}; +use rocksdb::{Direction, ErrorKind, IteratorMode}; //#[cfg(debug_assertions)] macro_rules! unhandled { @@ -45,6 +45,9 @@ pub(crate) fn and_then(t: T) -> Result { Ok(t) } pub(crate) fn or_else(e: rocksdb::Error) -> Result { Err(map_err(e)) } +#[inline] +pub(crate) fn is_incomplete(e: &rocksdb::Error) -> bool { e.kind() == ErrorKind::Incomplete } + pub(crate) fn map_err(e: rocksdb::Error) -> conduit::Error { let string = e.into_string(); err!(Database(error!("{string}")))