From 3ad6aa59f9032b6f43a2e6fb2a7265d8bf3a93a1 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 28 Nov 2024 05:54:34 +0000 Subject: [PATCH] use smallvec for db query buffering Signed-off-by: Jason Volk --- Cargo.lock | 4 +++ Cargo.toml | 11 +++++- src/database/Cargo.toml | 1 + src/database/handle.rs | 1 + src/database/keyval.rs | 35 +++++++++++++++++-- src/database/map/contains.rs | 7 ++-- src/database/map/get.rs | 10 +++--- src/database/map/insert.rs | 26 ++++++++++---- src/database/map/keys_from.rs | 13 +++---- src/database/map/keys_prefix.rs | 8 ++--- src/database/map/remove.rs | 6 ++-- src/database/map/rev_keys_from.rs | 11 +++--- src/database/map/rev_keys_prefix.rs | 8 ++--- src/database/map/rev_stream_from.rs | 11 +++--- src/database/map/rev_stream_prefix.rs | 8 ++--- src/database/map/stream_from.rs | 13 +++---- src/database/map/stream_prefix.rs | 8 ++--- src/database/mod.rs | 4 +-- src/database/pool.rs | 50 +++++++++++++++++---------- src/database/ser.rs | 16 +++------ src/service/account_data/mod.rs | 2 +- src/service/media/data.rs | 4 +-- src/service/rooms/state_cache/mod.rs | 14 ++++---- 23 files changed, 173 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94a2b2bc..49c4127e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -765,6 +765,7 @@ dependencies = [ "rust-rocksdb-uwu", "serde", "serde_json", + "smallvec", "tokio", "tracing", ] @@ -3956,6 +3957,9 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +dependencies = [ + "serde", +] [[package]] name = "socket2" diff --git a/Cargo.toml b/Cargo.toml index 9557ae08..0e8596f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,16 @@ name = "conduit" [workspace.dependencies.arrayvec] version = "0.7.4" -features = ["std", "serde"] +features = ["serde"] + +[workspace.dependencies.smallvec] +version = "1.13.2" +features = [ + "const_generics", + "const_new", + "serde", + "write", +] [workspace.dependencies.const-str] version = "0.5.7" diff --git a/src/database/Cargo.toml b/src/database/Cargo.toml index c45931a5..1deaf980 100644 --- a/src/database/Cargo.toml +++ b/src/database/Cargo.toml @@ -44,6 +44,7 @@ log.workspace = true rust-rocksdb.workspace = true serde.workspace = true serde_json.workspace = true +smallvec.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/src/database/handle.rs b/src/database/handle.rs index daee224d..356bd859 100644 --- a/src/database/handle.rs +++ b/src/database/handle.rs @@ -58,6 +58,7 @@ impl<'a> Deserialized for Result<&'a Handle<'a>> { } impl<'a> Deserialized for &'a Handle<'a> { + #[inline] fn map_de(self, f: F) -> Result where F: FnOnce(T) -> U, diff --git a/src/database/keyval.rs b/src/database/keyval.rs index a288f184..d4568600 100644 --- a/src/database/keyval.rs +++ b/src/database/keyval.rs @@ -1,13 +1,42 @@ use conduit::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; -use crate::de; +use crate::{de, ser}; pub type KeyVal<'a, K = &'a Slice, V = &'a Slice> = (Key<'a, K>, Val<'a, V>); pub type Key<'a, T = &'a Slice> = T; pub type Val<'a, T = &'a Slice> = T; -pub type Slice = [u8]; +pub type KeyBuf = KeyBuffer; +pub type ValBuf = ValBuffer; + +pub type KeyBuffer = Buffer; +pub type ValBuffer = Buffer; +pub type Buffer = SmallVec<[Byte; CAP]>; + +pub type Slice = [Byte]; +pub type Byte = u8; + +pub const KEY_STACK_CAP: usize = 128; +pub const VAL_STACK_CAP: usize = 512; +pub const DEF_STACK_CAP: usize = KEY_STACK_CAP; + +#[inline] +pub fn serialize_key(val: T) -> Result +where + T: Serialize, +{ + ser::serialize_to::(val) +} + +#[inline] +pub fn serialize_val(val: T) -> Result +where + T: Serialize, +{ + ser::serialize_to::(val) +} #[inline] pub(crate) fn _expect_deserialize<'a, K, V>(kv: Result>) -> KeyVal<'a, K, V> diff --git a/src/database/map/contains.rs b/src/database/map/contains.rs index 7acef2f6..7016b744 100644 --- a/src/database/map/contains.rs +++ b/src/database/map/contains.rs @@ -9,23 +9,25 @@ use conduit::{ use futures::FutureExt; use serde::Serialize; -use crate::ser; +use crate::{keyval::KeyBuf, ser}; /// Returns true if the map contains the key. /// - key is serialized into allocated buffer /// - harder errors may not be reported +#[inline] #[implement(super::Map)] pub fn contains(self: &Arc, key: &K) -> impl Future + Send + '_ where K: Serialize + ?Sized + Debug, { - let mut buf = Vec::::with_capacity(64); + let mut buf = KeyBuf::new(); self.bcontains(key, &mut buf) } /// Returns true if the map contains the key. /// - key is serialized into stack-buffer /// - harder errors will panic +#[inline] #[implement(super::Map)] pub fn acontains(self: &Arc, key: &K) -> impl Future + Send + '_ where @@ -51,6 +53,7 @@ where /// Returns Ok if the map contains the key. /// - key is raw +#[inline] #[implement(super::Map)] pub fn exists<'a, K>(self: &'a Arc, key: &K) -> impl Future + Send + 'a where diff --git a/src/database/map/get.rs b/src/database/map/get.rs index a00eecfa..befc0b24 100644 --- a/src/database/map/get.rs +++ b/src/database/map/get.rs @@ -7,6 +7,7 @@ use rocksdb::DBPinnableSlice; use serde::Serialize; use crate::{ + keyval::KeyBuf, ser, util::{is_incomplete, map_err, or_else}, Handle, @@ -18,11 +19,12 @@ type RocksdbResult<'a> = Result>, rocksdb::Error>; /// asynchronously. The key is serialized into an allocated buffer to perform /// the query. #[implement(super::Map)] +#[inline] pub fn qry(self: &Arc, key: &K) -> impl Future>> + Send where K: Serialize + ?Sized + Debug, { - let mut buf = Vec::::with_capacity(64); + let mut buf = KeyBuf::new(); self.bqry(key, &mut buf) } @@ -30,6 +32,7 @@ where /// asynchronously. The key is serialized into a fixed-sized buffer to perform /// the query. The maximum size is supplied as const generic parameter. #[implement(super::Map)] +#[inline] pub fn aqry(self: &Arc, key: &K) -> impl Future>> + Send where K: Serialize + ?Sized + Debug, @@ -69,11 +72,8 @@ where debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete"); let cmd = Cmd::Get(Get { map: self.clone(), + key: key.as_ref().into(), res: None, - key: key - .as_ref() - .try_into() - .expect("failed to copy key into buffer"), }); self.db.pool.execute(cmd).boxed() diff --git a/src/database/map/insert.rs b/src/database/map/insert.rs index 9bebe7cf..5b2e816c 100644 --- a/src/database/map/insert.rs +++ b/src/database/map/insert.rs @@ -10,20 +10,25 @@ use conduit::implement; use rocksdb::WriteBatchWithTransaction; use serde::Serialize; -use crate::{ser, util::or_else}; +use crate::{ + keyval::{KeyBuf, ValBuf}, + ser, + util::or_else, +}; /// Insert Key/Value /// /// - Key is serialized /// - Val is serialized #[implement(super::Map)] +#[inline] pub fn put(&self, key: K, val: V) where K: Serialize + Debug, V: Serialize, { - let mut key_buf = Vec::new(); - let mut val_buf = Vec::new(); + let mut key_buf = KeyBuf::new(); + let mut val_buf = ValBuf::new(); self.bput(key, val, (&mut key_buf, &mut val_buf)); } @@ -32,12 +37,13 @@ where /// - Key is serialized /// - Val is raw #[implement(super::Map)] +#[inline] pub fn put_raw(&self, key: K, val: V) where K: Serialize + Debug, V: AsRef<[u8]>, { - let mut key_buf = Vec::new(); + let mut key_buf = KeyBuf::new(); self.bput_raw(key, val, &mut key_buf); } @@ -46,12 +52,13 @@ where /// - Key is raw /// - Val is serialized #[implement(super::Map)] +#[inline] pub fn raw_put(&self, key: K, val: V) where K: AsRef<[u8]>, V: Serialize, { - let mut val_buf = Vec::new(); + let mut val_buf = ValBuf::new(); self.raw_bput(key, val, &mut val_buf); } @@ -60,12 +67,13 @@ where /// - Key is serialized /// - Val is serialized to stack-buffer #[implement(super::Map)] +#[inline] pub fn put_aput(&self, key: K, val: V) where K: Serialize + Debug, V: Serialize, { - let mut key_buf = Vec::new(); + let mut key_buf = KeyBuf::new(); let mut val_buf = ArrayVec::::new(); self.bput(key, val, (&mut key_buf, &mut val_buf)); } @@ -75,13 +83,14 @@ where /// - Key is serialized to stack-buffer /// - Val is serialized #[implement(super::Map)] +#[inline] pub fn aput_put(&self, key: K, val: V) where K: Serialize + Debug, V: Serialize, { let mut key_buf = ArrayVec::::new(); - let mut val_buf = Vec::new(); + let mut val_buf = ValBuf::new(); self.bput(key, val, (&mut key_buf, &mut val_buf)); } @@ -90,6 +99,7 @@ where /// - Key is serialized to stack-buffer /// - Val is serialized to stack-buffer #[implement(super::Map)] +#[inline] pub fn aput(&self, key: K, val: V) where K: Serialize + Debug, @@ -105,6 +115,7 @@ where /// - Key is serialized to stack-buffer /// - Val is raw #[implement(super::Map)] +#[inline] pub fn aput_raw(&self, key: K, val: V) where K: Serialize + Debug, @@ -119,6 +130,7 @@ where /// - Key is raw /// - Val is serialized to stack-buffer #[implement(super::Map)] +#[inline] pub fn raw_aput(&self, key: K, val: V) where K: AsRef<[u8]>, diff --git a/src/database/map/keys_from.rs b/src/database/map/keys_from.rs index 630bf3fb..093f7fd6 100644 --- a/src/database/map/keys_from.rs +++ b/src/database/map/keys_from.rs @@ -4,7 +4,10 @@ use conduit::{implement, Result}; use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::Key, ser, stream}; +use crate::{ + keyval::{result_deserialize_key, serialize_key, Key}, + stream, +}; #[implement(super::Map)] pub fn keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream>> + Send @@ -12,8 +15,7 @@ where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, { - self.keys_from_raw(from) - .map(keyval::result_deserialize_key::) + self.keys_from_raw(from).map(result_deserialize_key::) } #[implement(super::Map)] @@ -22,7 +24,7 @@ pub fn keys_from_raw

(&self, from: &P) -> impl Stream>> where P: Serialize + ?Sized + Debug, { - let key = ser::serialize_to_vec(from).expect("failed to serialize query key"); + let key = serialize_key(from).expect("failed to serialize query key"); self.raw_keys_from(&key) } @@ -32,8 +34,7 @@ where P: AsRef<[u8]> + ?Sized + Debug + Sync, K: Deserialize<'a> + Send, { - self.raw_keys_from(from) - .map(keyval::result_deserialize_key::) + self.raw_keys_from(from).map(result_deserialize_key::) } #[implement(super::Map)] diff --git a/src/database/map/keys_prefix.rs b/src/database/map/keys_prefix.rs index df214af4..8963f002 100644 --- a/src/database/map/keys_prefix.rs +++ b/src/database/map/keys_prefix.rs @@ -8,7 +8,7 @@ use futures::{ }; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::Key, ser}; +use crate::keyval::{result_deserialize_key, serialize_key, Key}; #[implement(super::Map)] pub fn keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream>> + Send @@ -17,7 +17,7 @@ where K: Deserialize<'a> + Send, { self.keys_prefix_raw(prefix) - .map(keyval::result_deserialize_key::) + .map(result_deserialize_key::) } #[implement(super::Map)] @@ -26,7 +26,7 @@ pub fn keys_prefix_raw

(&self, prefix: &P) -> impl Stream| future::ok(k.starts_with(&key))) } @@ -38,7 +38,7 @@ where K: Deserialize<'a> + Send + 'a, { self.raw_keys_prefix(prefix) - .map(keyval::result_deserialize_key::) + .map(result_deserialize_key::) } #[implement(super::Map)] diff --git a/src/database/map/remove.rs b/src/database/map/remove.rs index 949817a0..18080c64 100644 --- a/src/database/map/remove.rs +++ b/src/database/map/remove.rs @@ -4,18 +4,20 @@ use arrayvec::ArrayVec; use conduit::implement; use serde::Serialize; -use crate::{ser, util::or_else}; +use crate::{keyval::KeyBuf, ser, util::or_else}; #[implement(super::Map)] +#[inline] pub fn del(&self, key: K) where K: Serialize + Debug, { - let mut buf = Vec::::with_capacity(64); + let mut buf = KeyBuf::new(); self.bdel(key, &mut buf); } #[implement(super::Map)] +#[inline] pub fn adel(&self, key: K) where K: Serialize + Debug, diff --git a/src/database/map/rev_keys_from.rs b/src/database/map/rev_keys_from.rs index c1c6f3da..75d062b5 100644 --- a/src/database/map/rev_keys_from.rs +++ b/src/database/map/rev_keys_from.rs @@ -4,7 +4,10 @@ use conduit::{implement, Result}; use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::Key, ser, stream}; +use crate::{ + keyval::{result_deserialize_key, serialize_key, Key}, + stream, +}; #[implement(super::Map)] pub fn rev_keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream>> + Send @@ -13,7 +16,7 @@ where K: Deserialize<'a> + Send, { self.rev_keys_from_raw(from) - .map(keyval::result_deserialize_key::) + .map(result_deserialize_key::) } #[implement(super::Map)] @@ -22,7 +25,7 @@ pub fn rev_keys_from_raw

(&self, from: &P) -> impl Stream + Send, { self.rev_raw_keys_from(from) - .map(keyval::result_deserialize_key::) + .map(result_deserialize_key::) } #[implement(super::Map)] diff --git a/src/database/map/rev_keys_prefix.rs b/src/database/map/rev_keys_prefix.rs index 957b974e..c14909d4 100644 --- a/src/database/map/rev_keys_prefix.rs +++ b/src/database/map/rev_keys_prefix.rs @@ -8,7 +8,7 @@ use futures::{ }; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::Key, ser}; +use crate::keyval::{result_deserialize_key, serialize_key, Key}; #[implement(super::Map)] pub fn rev_keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream>> + Send @@ -17,7 +17,7 @@ where K: Deserialize<'a> + Send, { self.rev_keys_prefix_raw(prefix) - .map(keyval::result_deserialize_key::) + .map(result_deserialize_key::) } #[implement(super::Map)] @@ -26,7 +26,7 @@ pub fn rev_keys_prefix_raw

(&self, prefix: &P) -> impl Stream| future::ok(k.starts_with(&key))) } @@ -38,7 +38,7 @@ where K: Deserialize<'a> + Send + 'a, { self.rev_raw_keys_prefix(prefix) - .map(keyval::result_deserialize_key::) + .map(result_deserialize_key::) } #[implement(super::Map)] diff --git a/src/database/map/rev_stream_from.rs b/src/database/map/rev_stream_from.rs index 7ef25ee0..6ac1cd1a 100644 --- a/src/database/map/rev_stream_from.rs +++ b/src/database/map/rev_stream_from.rs @@ -4,7 +4,10 @@ use conduit::{implement, Result}; use futures::stream::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::KeyVal, ser, stream}; +use crate::{ + keyval::{result_deserialize, serialize_key, KeyVal}, + stream, +}; /// Iterate key-value entries in the map starting from upper-bound. /// @@ -18,7 +21,7 @@ where V: Deserialize<'a> + Send, { self.rev_stream_from_raw(from) - .map(keyval::result_deserialize::) + .map(result_deserialize::) } /// Iterate key-value entries in the map starting from upper-bound. @@ -31,7 +34,7 @@ pub fn rev_stream_from_raw

(&self, from: &P) -> impl Stream + Send, { self.rev_raw_stream_from(from) - .map(keyval::result_deserialize::) + .map(result_deserialize::) } /// Iterate key-value entries in the map starting from upper-bound. diff --git a/src/database/map/rev_stream_prefix.rs b/src/database/map/rev_stream_prefix.rs index 286cedca..fd0d93ff 100644 --- a/src/database/map/rev_stream_prefix.rs +++ b/src/database/map/rev_stream_prefix.rs @@ -8,7 +8,7 @@ use futures::{ }; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::KeyVal, ser}; +use crate::keyval::{result_deserialize, serialize_key, KeyVal}; /// Iterate key-value entries in the map where the key matches a prefix. /// @@ -22,7 +22,7 @@ where V: Deserialize<'a> + Send, { self.rev_stream_prefix_raw(prefix) - .map(keyval::result_deserialize::) + .map(result_deserialize::) } /// Iterate key-value entries in the map where the key matches a prefix. @@ -35,7 +35,7 @@ pub fn rev_stream_prefix_raw

(&self, prefix: &P) -> impl Stream| future::ok(k.starts_with(&key))) } @@ -54,7 +54,7 @@ where V: Deserialize<'a> + Send + 'a, { self.rev_raw_stream_prefix(prefix) - .map(keyval::result_deserialize::) + .map(result_deserialize::) } /// Iterate key-value entries in the map where the key matches a prefix. diff --git a/src/database/map/stream_from.rs b/src/database/map/stream_from.rs index fe89afe1..052a2e74 100644 --- a/src/database/map/stream_from.rs +++ b/src/database/map/stream_from.rs @@ -4,7 +4,10 @@ use conduit::{implement, Result}; use futures::stream::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::KeyVal, ser, stream}; +use crate::{ + keyval::{result_deserialize, serialize_key, KeyVal}, + stream, +}; /// Iterate key-value entries in the map starting from lower-bound. /// @@ -17,8 +20,7 @@ where K: Deserialize<'a> + Send, V: Deserialize<'a> + Send, { - self.stream_from_raw(from) - .map(keyval::result_deserialize::) + self.stream_from_raw(from).map(result_deserialize::) } /// Iterate key-value entries in the map starting from lower-bound. @@ -31,7 +33,7 @@ pub fn stream_from_raw

(&self, from: &P) -> impl Stream + Send, V: Deserialize<'a> + Send, { - self.raw_stream_from(from) - .map(keyval::result_deserialize::) + self.raw_stream_from(from).map(result_deserialize::) } /// Iterate key-value entries in the map starting from lower-bound. diff --git a/src/database/map/stream_prefix.rs b/src/database/map/stream_prefix.rs index ca4cfeaa..a08b1e2a 100644 --- a/src/database/map/stream_prefix.rs +++ b/src/database/map/stream_prefix.rs @@ -8,7 +8,7 @@ use futures::{ }; use serde::{Deserialize, Serialize}; -use crate::{keyval, keyval::KeyVal, ser}; +use crate::keyval::{result_deserialize, serialize_key, KeyVal}; /// Iterate key-value entries in the map where the key matches a prefix. /// @@ -22,7 +22,7 @@ where V: Deserialize<'a> + Send, { self.stream_prefix_raw(prefix) - .map(keyval::result_deserialize::) + .map(result_deserialize::) } /// Iterate key-value entries in the map where the key matches a prefix. @@ -35,7 +35,7 @@ pub fn stream_prefix_raw

(&self, prefix: &P) -> impl Stream| future::ok(k.starts_with(&key))) } @@ -54,7 +54,7 @@ where V: Deserialize<'a> + Send + 'a, { self.raw_stream_prefix(prefix) - .map(keyval::result_deserialize::) + .map(result_deserialize::) } /// Iterate key-value entries in the map where the key matches a prefix. diff --git a/src/database/mod.rs b/src/database/mod.rs index cd91fba2..de060b3a 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -28,9 +28,9 @@ pub use self::{ de::{Ignore, IgnoreAll}, deserialized::Deserialized, handle::Handle, - keyval::{KeyVal, Slice}, + keyval::{serialize_key, serialize_val, KeyVal, Slice}, map::Map, - ser::{serialize, serialize_to_array, serialize_to_vec, Interfix, Json, Separator, SEP}, + ser::{serialize, serialize_to, serialize_to_vec, Interfix, Json, Separator, SEP}, }; conduit::mod_ctor! {} diff --git a/src/database/pool.rs b/src/database/pool.rs index e2cb2605..6e7a1e29 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -5,12 +5,11 @@ use std::{ thread::JoinHandle, }; -use arrayvec::ArrayVec; use async_channel::{bounded, Receiver, Sender}; use conduit::{debug, defer, err, implement, Result}; use futures::channel::oneshot; -use crate::{Handle, Map}; +use crate::{keyval::KeyBuf, Handle, Map}; pub(crate) struct Pool { workers: Mutex>>, @@ -27,7 +26,6 @@ pub(crate) struct Opts { const WORKER_THREAD_NAME: &str = "conduwuit:db"; const DEFAULT_QUEUE_SIZE: usize = 1024; const DEFAULT_WORKER_NUM: usize = 32; -const KEY_MAX_BYTES: usize = 384; #[derive(Debug)] pub(crate) enum Cmd { @@ -37,10 +35,12 @@ pub(crate) enum Cmd { #[derive(Debug)] pub(crate) struct Get { pub(crate) map: Arc, - pub(crate) key: ArrayVec, - pub(crate) res: Option>>>, + pub(crate) key: KeyBuf, + pub(crate) res: Option, } +type ResultSender = oneshot::Sender>>; + #[implement(Pool)] pub(crate) fn new(opts: &Opts) -> Result> { let queue_size = opts.queue_size.unwrap_or(DEFAULT_QUEUE_SIZE); @@ -92,7 +92,6 @@ pub(crate) fn close(self: &Arc) { receivers = %self.send.receiver_count(), "Closing pool channel" ); - let closing = self.send.close(); debug_assert!(closing, "channel is not closing"); @@ -116,11 +115,7 @@ pub(crate) fn close(self: &Arc) { #[tracing::instrument(skip(self, cmd), level = "trace")] pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result> { let (send, recv) = oneshot::channel(); - match &mut cmd { - Cmd::Get(ref mut cmd) => { - _ = cmd.res.insert(send); - }, - }; + Self::prepare(&mut cmd, send); self.send .send(cmd) @@ -132,6 +127,15 @@ pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result> { .map_err(|e| err!(error!("recv failed {e:?}")))? } +#[implement(Pool)] +fn prepare(cmd: &mut Cmd, send: ResultSender) { + match cmd { + Cmd::Get(ref mut cmd) => { + _ = cmd.res.insert(send); + }, + }; +} + #[implement(Pool)] #[tracing::instrument(skip(self))] fn worker(self: Arc, id: usize) { @@ -157,25 +161,35 @@ fn handle(&self, id: usize, cmd: &mut Cmd) { #[implement(Pool)] #[tracing::instrument(skip(self, cmd), fields(%cmd.map), level = "trace")] fn handle_get(&self, id: usize, cmd: &mut Get) { + debug_assert!(!cmd.key.is_empty(), "querying for empty key"); + + // Obtain the result channel. let chan = cmd.res.take().expect("missing result channel"); - // If the future was dropped while the command was queued then we can bail - // without any query. This makes it more efficient to use select() variants and - // pessimistic parallel queries. + // It is worth checking if the future was dropped while the command was queued + // so we can bail without paying for any query. if chan.is_canceled() { return; } + // Perform the actual database query. We reuse our database::Map interface but + // limited to the blocking calls, rather than creating another surface directly + // with rocksdb here. let result = cmd.map.get_blocking(&cmd.key); - let _sent = chan.send(into_send_result(result)).is_ok(); + + // Send the result back to the submitter. + let chan_result = chan.send(into_send_result(result)); + + // If the future was dropped during the query this will fail acceptably. + let _chan_sent = chan_result.is_ok(); } fn into_send_result(result: Result>) -> Result> { // SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through // the channel. The lifetime on the handle is a device by rust-rocksdb to - // associate a database lifetime with its assets, not a function of rocksdb or - // the asset. The Handle must be dropped before the database is dropped. The - // handle must pass through recv_handle() on the other end of the channel. + // associate a database lifetime with its assets. The Handle must be dropped + // before the database is dropped. The handle must pass through recv_handle() on + // the other end of the channel. unsafe { std::mem::transmute(result) } } diff --git a/src/database/ser.rs b/src/database/ser.rs index 961d2700..a60812aa 100644 --- a/src/database/ser.rs +++ b/src/database/ser.rs @@ -1,28 +1,20 @@ use std::io::Write; -use arrayvec::ArrayVec; use conduit::{debug::type_name, err, result::DebugInspect, utils::exchange, Error, Result}; use serde::{ser, Serialize}; use crate::util::unhandled; #[inline] -pub fn serialize_to_array(val: T) -> Result> -where - T: Serialize, -{ - let mut buf = ArrayVec::::new(); - serialize(&mut buf, val)?; - - Ok(buf) -} +pub fn serialize_to_vec(val: T) -> Result> { serialize_to::, T>(val) } #[inline] -pub fn serialize_to_vec(val: T) -> Result> +pub fn serialize_to(val: T) -> Result where + B: Default + Write + AsRef<[u8]>, T: Serialize, { - let mut buf = Vec::with_capacity(64); + let mut buf = B::default(); serialize(&mut buf, val)?; Ok(buf) diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index b752f9b8..5dc17640 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -116,7 +116,7 @@ pub fn changes_since<'a>( &'a self, room_id: Option<&'a RoomId>, user_id: &'a UserId, since: u64, ) -> impl Stream + Send + 'a { let prefix = (room_id, user_id, Interfix); - let prefix = database::serialize_to_vec(prefix).expect("failed to serialize prefix"); + let prefix = database::serialize_key(prefix).expect("failed to serialize prefix"); // Skip the data that's exactly at since, because we sent that last time let first_possible = (room_id, user_id, since.saturating_add(1)); diff --git a/src/service/media/data.rs b/src/service/media/data.rs index 9afbd708..3922dec9 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -39,14 +39,14 @@ impl Data { ) -> Result> { let dim: &[u32] = &[dim.width, dim.height]; let key = (mxc, dim, content_disposition, content_type); - let key = database::serialize_to_vec(key)?; + let key = database::serialize_key(key)?; self.mediaid_file.insert(&key, []); if let Some(user) = user { let key = (mxc, user); self.mediaid_user.put_raw(key, user); } - Ok(key) + Ok(key.to_vec()) } pub(super) async fn delete_file_mxc(&self, mxc: &Mxc<'_>) { diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 156345fe..4a33224e 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -9,7 +9,7 @@ use conduit::{ utils::{stream::TryIgnore, ReadyExt, StreamTools}, warn, Result, }; -use database::{serialize_to_vec, Deserialized, Ignore, Interfix, Json, Map}; +use database::{serialize_key, Deserialized, Ignore, Interfix, Json, Map}; use futures::{future::join4, pin_mut, stream::iter, Stream, StreamExt}; use itertools::Itertools; use ruma::{ @@ -289,10 +289,10 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { let userroom_id = (user_id, room_id); - let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); + let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id"); let roomuser_id = (room_id, user_id); - let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); + let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id"); self.db.userroomid_joined.insert(&userroom_id, []); self.db.roomuserid_joined.insert(&roomuser_id, []); @@ -312,10 +312,10 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { let userroom_id = (user_id, room_id); - let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); + let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id"); let roomuser_id = (room_id, user_id); - let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); + let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id"); // (timo) TODO let leftstate = Vec::>::new(); @@ -716,10 +716,10 @@ impl Service { invite_via: Option>, ) { let roomuser_id = (room_id, user_id); - let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); + let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id"); let userroom_id = (user_id, room_id); - let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); + let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id"); self.db .userroomid_invitestate