From 5192927a5342cffd9a7284bad3eb2c4b4819c674 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Sun, 29 Sep 2024 07:37:43 +0000
Subject: [PATCH] split remaining map suites

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/core/utils/mod.rs              |  10 --
 src/database/map.rs                | 163 ++---------------------------
 src/database/map/get.rs            |  82 +++++++++++++++
 src/database/map/insert.rs         |  52 +++++++++
 src/database/map/remove.rs         |  44 ++++++++
 src/service/globals/data.rs        |  28 +++--
 src/service/rooms/short/data.rs    |   2 +-
 src/service/rooms/timeline/data.rs |   2 +-
 src/service/users/mod.rs           |   2 +-
 9 files changed, 205 insertions(+), 180 deletions(-)
 create mode 100644 src/database/map/get.rs
 create mode 100644 src/database/map/insert.rs
 create mode 100644 src/database/map/remove.rs

diff --git a/src/core/utils/mod.rs b/src/core/utils/mod.rs
index b1ea3709..fef83395 100644
--- a/src/core/utils/mod.rs
+++ b/src/core/utils/mod.rs
@@ -35,13 +35,3 @@ pub use self::{
 
 #[inline]
 pub fn exchange<T>(state: &mut T, source: T) -> T { std::mem::replace(state, source) }
-
-#[must_use]
-pub fn generate_keypair() -> Vec<u8> {
-	let mut value = rand::string(8).as_bytes().to_vec();
-	value.push(0xFF);
-	value.extend_from_slice(
-		&ruma::signatures::Ed25519KeyPair::generate().expect("Ed25519KeyPair generation always works (?)"),
-	);
-	value
-}
diff --git a/src/database/map.rs b/src/database/map.rs
index a3cf32d4..cac20d6a 100644
--- a/src/database/map.rs
+++ b/src/database/map.rs
@@ -1,7 +1,10 @@
 mod count;
+mod get;
+mod insert;
 mod keys;
 mod keys_from;
 mod keys_prefix;
+mod remove;
 mod rev_keys;
 mod rev_keys_from;
 mod rev_keys_prefix;
@@ -18,23 +21,14 @@ use std::{
 	fmt,
 	fmt::{Debug, Display},
 	future::Future,
-	io::Write,
 	pin::Pin,
 	sync::Arc,
 };
 
-use conduit::{err, Result};
-use futures::future;
-use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteBatchWithTransaction, WriteOptions};
-use serde::Serialize;
+use conduit::Result;
+use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions};
 
-use crate::{
-	keyval::{OwnedKey, OwnedVal},
-	ser,
-	util::{map_err, or_else},
-	watchers::Watchers,
-	Engine, Handle,
-};
+use crate::{watchers::Watchers, Engine};
 
 pub struct Map {
 	name: String,
@@ -57,146 +51,6 @@ impl Map {
 		}))
 	}
 
-	#[tracing::instrument(skip(self), fields(%self), level = "trace")]
-	pub fn del<K>(&self, key: &K)
-	where
-		K: Serialize + ?Sized + Debug,
-	{
-		let mut buf = Vec::<u8>::with_capacity(64);
-		self.bdel(key, &mut buf);
-	}
-
-	#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
-	pub fn bdel<K, B>(&self, key: &K, buf: &mut B)
-	where
-		K: Serialize + ?Sized + Debug,
-		B: Write + AsRef<[u8]>,
-	{
-		let key = ser::serialize(buf, key).expect("failed to serialize deletion key");
-		self.remove(&key);
-	}
-
-	#[tracing::instrument(level = "trace")]
-	pub fn remove<K>(&self, key: &K)
-	where
-		K: AsRef<[u8]> + ?Sized + Debug,
-	{
-		let write_options = &self.write_options;
-		self.db
-			.db
-			.delete_cf_opt(&self.cf(), key, write_options)
-			.or_else(or_else)
-			.expect("database remove error");
-
-		if !self.db.corked() {
-			self.db.flush().expect("database flush error");
-		}
-	}
-
-	#[tracing::instrument(skip(self, value), fields(%self), level = "trace")]
-	pub fn insert<K, V>(&self, key: &K, value: &V)
-	where
-		K: AsRef<[u8]> + ?Sized + Debug,
-		V: AsRef<[u8]> + ?Sized,
-	{
-		let write_options = &self.write_options;
-		self.db
-			.db
-			.put_cf_opt(&self.cf(), key, value, write_options)
-			.or_else(or_else)
-			.expect("database insert error");
-
-		if !self.db.corked() {
-			self.db.flush().expect("database flush error");
-		}
-
-		self.watchers.wake(key.as_ref());
-	}
-
-	#[tracing::instrument(skip(self), fields(%self), level = "trace")]
-	pub fn insert_batch<'a, I, K, V>(&'a self, iter: I)
-	where
-		I: Iterator<Item = &'a (K, V)> + Send + Debug,
-		K: AsRef<[u8]> + Sized + Debug + 'a,
-		V: AsRef<[u8]> + Sized + 'a,
-	{
-		let mut batch = WriteBatchWithTransaction::<false>::default();
-		for (key, val) in iter {
-			batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
-		}
-
-		let write_options = &self.write_options;
-		self.db
-			.db
-			.write_opt(batch, write_options)
-			.or_else(or_else)
-			.expect("database insert batch error");
-
-		if !self.db.corked() {
-			self.db.flush().expect("database flush error");
-		}
-	}
-
-	#[tracing::instrument(skip(self), fields(%self), level = "trace")]
-	pub fn qry<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
-	where
-		K: Serialize + ?Sized + Debug,
-	{
-		let mut buf = Vec::<u8>::with_capacity(64);
-		self.bqry(key, &mut buf)
-	}
-
-	#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
-	pub fn bqry<K, B>(&self, key: &K, buf: &mut B) -> impl Future<Output = Result<Handle<'_>>> + Send
-	where
-		K: Serialize + ?Sized + Debug,
-		B: Write + AsRef<[u8]>,
-	{
-		let key = ser::serialize(buf, key).expect("failed to serialize query key");
-		let val = self.get(key);
-		future::ready(val)
-	}
-
-	#[tracing::instrument(skip(self), fields(%self), level = "trace")]
-	pub fn get<K>(&self, key: &K) -> Result<Handle<'_>>
-	where
-		K: AsRef<[u8]> + ?Sized + Debug,
-	{
-		self.db
-			.db
-			.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
-			.map_err(map_err)?
-			.map(Handle::from)
-			.ok_or(err!(Request(NotFound("Not found in database"))))
-	}
-
-	#[tracing::instrument(skip(self), fields(%self), level = "trace")]
-	pub fn multi_get<'a, I, K>(&self, keys: I) -> Vec<Option<OwnedVal>>
-	where
-		I: Iterator<Item = &'a K> + ExactSizeIterator + Send + Debug,
-		K: AsRef<[u8]> + Sized + Debug + 'a,
-	{
-		// Optimization can be `true` if key vector is pre-sorted **by the column
-		// comparator**.
-		const SORTED: bool = false;
-
-		let mut ret: Vec<Option<OwnedVal>> = Vec::with_capacity(keys.len());
-		let read_options = &self.read_options;
-		for res in self
-			.db
-			.db
-			.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
-		{
-			match res {
-				Ok(Some(res)) => ret.push(Some((*res).to_vec())),
-				Ok(None) => ret.push(None),
-				Err(e) => or_else(e).expect("database multiget error"),
-			}
-		}
-
-		ret
-	}
-
 	#[inline]
 	pub fn watch_prefix<'a, K>(&'a self, prefix: &K) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
 	where
@@ -230,10 +84,7 @@ fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<ColumnFamily>> {
 	let bounded_ptr = Arc::into_raw(bounded_arc);
 	let cf_ptr = bounded_ptr.cast::<ColumnFamily>();
 
-	// SAFETY: After thorough contemplation this appears to be the best solution,
-	// even by a significant margin.
-	//
-	// BACKGROUND: Column family handles out of RocksDB are basic pointers and can
+	// SAFETY: Column family handles out of RocksDB are basic pointers and can
 	// be invalidated: 1. when the database closes. 2. when the column is dropped or
 	// closed. rust_rocksdb wraps this for us by storing handles in their own
 	// `RwLock` map and returning an `Arc<BoundColumnFamily<'_>>` to
diff --git a/src/database/map/get.rs b/src/database/map/get.rs
new file mode 100644
index 00000000..b4d6a6ea
--- /dev/null
+++ b/src/database/map/get.rs
@@ -0,0 +1,82 @@
+use std::{convert::AsRef, fmt::Debug, future::Future, io::Write};
+
+use conduit::{err, implement, Result};
+use futures::future::ready;
+use serde::Serialize;
+
+use crate::{
+	keyval::{OwnedKey, OwnedVal},
+	ser,
+	util::{map_err, or_else},
+	Handle,
+};
+
+#[implement(super::Map)]
+pub fn qry<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
+where
+	K: Serialize + ?Sized + Debug,
+{
+	let mut buf = Vec::<u8>::with_capacity(64);
+	self.bqry(key, &mut buf)
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
+pub fn bqry<K, B>(&self, key: &K, buf: &mut B) -> impl Future<Output = Result<Handle<'_>>> + Send
+where
+	K: Serialize + ?Sized + Debug,
+	B: Write + AsRef<[u8]>,
+{
+	let key = ser::serialize(buf, key).expect("failed to serialize query key");
+	self.get(key)
+}
+
+#[implement(super::Map)]
+pub fn get<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
+where
+	K: AsRef<[u8]> + ?Sized + Debug,
+{
+	ready(self.get_blocking(key))
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
+pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
+where
+	K: AsRef<[u8]> + ?Sized + Debug,
+{
+	self.db
+		.db
+		.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
+		.map_err(map_err)?
+		.map(Handle::from)
+		.ok_or(err!(Request(NotFound("Not found in database"))))
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
+pub fn get_batch_blocking<'a, I, K>(&self, keys: I) -> Vec<Option<OwnedVal>>
+where
+	I: Iterator<Item = &'a K> + ExactSizeIterator + Send + Debug,
+	K: AsRef<[u8]> + Sized + Debug + 'a,
+{
+	// Optimization can be `true` if key vector is pre-sorted **by the column
+	// comparator**.
+	const SORTED: bool = false;
+
+	let mut ret: Vec<Option<OwnedVal>> = Vec::with_capacity(keys.len());
+	let read_options = &self.read_options;
+	for res in self
+		.db
+		.db
+		.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
+	{
+		match res {
+			Ok(Some(res)) => ret.push(Some((*res).to_vec())),
+			Ok(None) => ret.push(None),
+			Err(e) => or_else(e).expect("database multiget error"),
+		}
+	}
+
+	ret
+}
diff --git a/src/database/map/insert.rs b/src/database/map/insert.rs
new file mode 100644
index 00000000..953c9c94
--- /dev/null
+++ b/src/database/map/insert.rs
@@ -0,0 +1,52 @@
+use std::{convert::AsRef, fmt::Debug};
+
+use conduit::implement;
+use rocksdb::WriteBatchWithTransaction;
+
+use crate::util::or_else;
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, value), fields(%self), level = "trace")]
+pub fn insert<K, V>(&self, key: &K, value: &V)
+where
+	K: AsRef<[u8]> + ?Sized + Debug,
+	V: AsRef<[u8]> + ?Sized,
+{
+	let write_options = &self.write_options;
+	self.db
+		.db
+		.put_cf_opt(&self.cf(), key, value, write_options)
+		.or_else(or_else)
+		.expect("database insert error");
+
+	if !self.db.corked() {
+		self.db.flush().expect("database flush error");
+	}
+
+	self.watchers.wake(key.as_ref());
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, iter), fields(%self), level = "trace")]
+pub fn insert_batch<'a, I, K, V>(&'a self, iter: I)
+where
+	I: Iterator<Item = &'a (K, V)> + Send + Debug,
+	K: AsRef<[u8]> + Sized + Debug + 'a,
+	V: AsRef<[u8]> + Sized + 'a,
+{
+	let mut batch = WriteBatchWithTransaction::<false>::default();
+	for (key, val) in iter {
+		batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
+	}
+
+	let write_options = &self.write_options;
+	self.db
+		.db
+		.write_opt(batch, write_options)
+		.or_else(or_else)
+		.expect("database insert batch error");
+
+	if !self.db.corked() {
+		self.db.flush().expect("database flush error");
+	}
+}
diff --git a/src/database/map/remove.rs b/src/database/map/remove.rs
new file mode 100644
index 00000000..fcf7587e
--- /dev/null
+++ b/src/database/map/remove.rs
@@ -0,0 +1,44 @@
+use std::{convert::AsRef, fmt::Debug, io::Write};
+
+use conduit::implement;
+use serde::Serialize;
+
+use crate::{ser, util::or_else};
+
+#[implement(super::Map)]
+pub fn del<K>(&self, key: &K)
+where
+	K: Serialize + ?Sized + Debug,
+{
+	let mut buf = Vec::<u8>::with_capacity(64);
+	self.bdel(key, &mut buf);
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
+pub fn bdel<K, B>(&self, key: &K, buf: &mut B)
+where
+	K: Serialize + ?Sized + Debug,
+	B: Write + AsRef<[u8]>,
+{
+	let key = ser::serialize(buf, key).expect("failed to serialize deletion key");
+	self.remove(key);
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
+pub fn remove<K>(&self, key: &K)
+where
+	K: AsRef<[u8]> + ?Sized + Debug,
+{
+	let write_options = &self.write_options;
+	self.db
+		.db
+		.delete_cf_opt(&self.cf(), key, write_options)
+		.or_else(or_else)
+		.expect("database remove error");
+
+	if !self.db.corked() {
+		self.db.flush().expect("database flush error");
+	}
+}
diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs
index 76f97944..5332f07d 100644
--- a/src/service/globals/data.rs
+++ b/src/service/globals/data.rs
@@ -3,7 +3,7 @@ use std::{
 	sync::{Arc, RwLock},
 };
 
-use conduit::{trace, utils, Error, Result, Server};
+use conduit::{trace, utils, utils::rand, Error, Result, Server};
 use database::{Database, Deserialized, Map};
 use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
 use ruma::{
@@ -102,7 +102,7 @@ impl Data {
 
 	fn stored_count(global: &Arc<Map>) -> Result<u64> {
 		global
-			.get(COUNTER)
+			.get_blocking(COUNTER)
 			.as_deref()
 			.map_or(Ok(0_u64), utils::u64_from_bytes)
 	}
@@ -206,17 +206,23 @@ impl Data {
 	}
 
 	pub fn load_keypair(&self) -> Result<Ed25519KeyPair> {
-		let keypair_bytes = self.global.get(b"keypair").map_or_else(
-			|_| {
-				let keypair = utils::generate_keypair();
-				self.global.insert(b"keypair", &keypair);
-				Ok::<_, Error>(keypair)
-			},
-			|val| Ok(val.to_vec()),
-		)?;
+		let generate = |_| {
+			let keypair = Ed25519KeyPair::generate().expect("Ed25519KeyPair generation always works (?)");
+
+			let mut value = rand::string(8).as_bytes().to_vec();
+			value.push(0xFF);
+			value.extend_from_slice(&keypair);
+
+			self.global.insert(b"keypair", &value);
+			value
+		};
+
+		let keypair_bytes: Vec<u8> = self
+			.global
+			.get_blocking(b"keypair")
+			.map_or_else(generate, Into::into);
 
 		let mut parts = keypair_bytes.splitn(2, |&b| b == 0xFF);
-
 		utils::string_from_bytes(
 			// 1. version
 			parts
diff --git a/src/service/rooms/short/data.rs b/src/service/rooms/short/data.rs
index f6a82488..fff3f2d6 100644
--- a/src/service/rooms/short/data.rs
+++ b/src/service/rooms/short/data.rs
@@ -59,7 +59,7 @@ impl Data {
 
 		for (i, short) in self
 			.eventid_shorteventid
-			.multi_get(keys.iter())
+			.get_batch_blocking(keys.iter())
 			.iter()
 			.enumerate()
 		{
diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs
index 314dcb9f..1f9dad1d 100644
--- a/src/service/rooms/timeline/data.rs
+++ b/src/service/rooms/timeline/data.rs
@@ -326,7 +326,7 @@ pub(super) fn pdu_count(pdu_id: &[u8]) -> PduCount {
 
 //TODO: this is an ABA
 fn increment(db: &Arc<Map>, key: &[u8]) {
-	let old = db.get(key);
+	let old = db.get_blocking(key);
 	let new = utils::increment(old.ok().as_deref());
 	db.insert(key, &new);
 }
diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs
index ca37ed9d..fa8c41b6 100644
--- a/src/service/users/mod.rs
+++ b/src/service/users/mod.rs
@@ -1000,7 +1000,7 @@ where
 
 //TODO: this is an ABA
 fn increment(db: &Arc<Map>, key: &[u8]) {
-	let old = db.get(key);
+	let old = db.get_blocking(key);
 	let new = utils::increment(old.ok().as_deref());
 	db.insert(key, &new);
 }
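
A minimal sketch of how call sites adapt to this split, assuming
`database::{Map, Handle}` and `conduit::{utils, Result}` are in scope; the
helper names below are illustrative, not part of this patch:

	// Async path: the new `get` wraps `get_blocking` in a ready future,
	// so async service code can await it directly. `b"some_key"` is a
	// placeholder key.
	async fn fetch(map: &Map) -> Result<Handle<'_>> {
		map.get(b"some_key").await
	}

	// Sync path: hot helpers such as `increment` stay on `get_blocking`,
	// mirroring the timeline and users hunks above.
	fn bump(map: &Map, key: &[u8]) {
		let old = map.get_blocking(key);
		let new = utils::increment(old.ok().as_deref());
		map.insert(key, &new);
	}

Note that `get` does not move the read off the calling thread; wrapping the
blocking call in `futures::future::ready` only makes the signature awaitable,
while `get_blocking` and `get_batch_blocking` keep the synchronous behavior
for callers that want it.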