split remaining map suites

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-09-29 07:37:43 +00:00 committed by strawberry
parent 4496cf2d5b
commit 5192927a53
9 changed files with 205 additions and 180 deletions

View file

@ -35,13 +35,3 @@ pub use self::{
#[inline] #[inline]
pub fn exchange<T>(state: &mut T, source: T) -> T { std::mem::replace(state, source) } pub fn exchange<T>(state: &mut T, source: T) -> T { std::mem::replace(state, source) }
#[must_use]
pub fn generate_keypair() -> Vec<u8> {
let mut value = rand::string(8).as_bytes().to_vec();
value.push(0xFF);
value.extend_from_slice(
&ruma::signatures::Ed25519KeyPair::generate().expect("Ed25519KeyPair generation always works (?)"),
);
value
}

View file

@ -1,7 +1,10 @@
mod count; mod count;
mod get;
mod insert;
mod keys; mod keys;
mod keys_from; mod keys_from;
mod keys_prefix; mod keys_prefix;
mod remove;
mod rev_keys; mod rev_keys;
mod rev_keys_from; mod rev_keys_from;
mod rev_keys_prefix; mod rev_keys_prefix;
@ -18,23 +21,14 @@ use std::{
fmt, fmt,
fmt::{Debug, Display}, fmt::{Debug, Display},
future::Future, future::Future,
io::Write,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
}; };
use conduit::{err, Result}; use conduit::Result;
use futures::future; use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions};
use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteBatchWithTransaction, WriteOptions};
use serde::Serialize;
use crate::{ use crate::{watchers::Watchers, Engine};
keyval::{OwnedKey, OwnedVal},
ser,
util::{map_err, or_else},
watchers::Watchers,
Engine, Handle,
};
pub struct Map { pub struct Map {
name: String, name: String,
@ -57,146 +51,6 @@ impl Map {
})) }))
} }
#[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn del<K>(&self, key: &K)
where
K: Serialize + ?Sized + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
self.bdel(key, &mut buf);
}
#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
pub fn bdel<K, B>(&self, key: &K, buf: &mut B)
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
{
let key = ser::serialize(buf, key).expect("failed to serialize deletion key");
self.remove(&key);
}
#[tracing::instrument(level = "trace")]
pub fn remove<K>(&self, key: &K)
where
K: AsRef<[u8]> + ?Sized + Debug,
{
let write_options = &self.write_options;
self.db
.db
.delete_cf_opt(&self.cf(), key, write_options)
.or_else(or_else)
.expect("database remove error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
}
}
#[tracing::instrument(skip(self, value), fields(%self), level = "trace")]
pub fn insert<K, V>(&self, key: &K, value: &V)
where
K: AsRef<[u8]> + ?Sized + Debug,
V: AsRef<[u8]> + ?Sized,
{
let write_options = &self.write_options;
self.db
.db
.put_cf_opt(&self.cf(), key, value, write_options)
.or_else(or_else)
.expect("database insert error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
}
self.watchers.wake(key.as_ref());
}
#[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn insert_batch<'a, I, K, V>(&'a self, iter: I)
where
I: Iterator<Item = &'a (K, V)> + Send + Debug,
K: AsRef<[u8]> + Sized + Debug + 'a,
V: AsRef<[u8]> + Sized + 'a,
{
let mut batch = WriteBatchWithTransaction::<false>::default();
for (key, val) in iter {
batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
}
let write_options = &self.write_options;
self.db
.db
.write_opt(batch, write_options)
.or_else(or_else)
.expect("database insert batch error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
}
}
#[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn qry<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
self.bqry(key, &mut buf)
}
#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
pub fn bqry<K, B>(&self, key: &K, buf: &mut B) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
{
let key = ser::serialize(buf, key).expect("failed to serialize query key");
let val = self.get(key);
future::ready(val)
}
#[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn get<K>(&self, key: &K) -> Result<Handle<'_>>
where
K: AsRef<[u8]> + ?Sized + Debug,
{
self.db
.db
.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
.map_err(map_err)?
.map(Handle::from)
.ok_or(err!(Request(NotFound("Not found in database"))))
}
#[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn multi_get<'a, I, K>(&self, keys: I) -> Vec<Option<OwnedVal>>
where
I: Iterator<Item = &'a K> + ExactSizeIterator + Send + Debug,
K: AsRef<[u8]> + Sized + Debug + 'a,
{
// Optimization can be `true` if key vector is pre-sorted **by the column
// comparator**.
const SORTED: bool = false;
let mut ret: Vec<Option<OwnedKey>> = Vec::with_capacity(keys.len());
let read_options = &self.read_options;
for res in self
.db
.db
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
{
match res {
Ok(Some(res)) => ret.push(Some((*res).to_vec())),
Ok(None) => ret.push(None),
Err(e) => or_else(e).expect("database multiget error"),
}
}
ret
}
#[inline] #[inline]
pub fn watch_prefix<'a, K>(&'a self, prefix: &K) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> pub fn watch_prefix<'a, K>(&'a self, prefix: &K) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
where where
@ -230,10 +84,7 @@ fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<ColumnFamily>> {
let bounded_ptr = Arc::into_raw(bounded_arc); let bounded_ptr = Arc::into_raw(bounded_arc);
let cf_ptr = bounded_ptr.cast::<ColumnFamily>(); let cf_ptr = bounded_ptr.cast::<ColumnFamily>();
// SAFETY: After thorough contemplation this appears to be the best solution, // SAFETY: Column family handles out of RocksDB are basic pointers and can
// even by a significant margin.
//
// BACKGROUND: Column family handles out of RocksDB are basic pointers and can
// be invalidated: 1. when the database closes. 2. when the column is dropped or // be invalidated: 1. when the database closes. 2. when the column is dropped or
// closed. rust_rocksdb wraps this for us by storing handles in their own // closed. rust_rocksdb wraps this for us by storing handles in their own
// `RwLock<BTreeMap>` map and returning an Arc<BoundColumnFamily<'_>>` to // `RwLock<BTreeMap>` map and returning an Arc<BoundColumnFamily<'_>>` to

82
src/database/map/get.rs Normal file
View file

@ -0,0 +1,82 @@
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write};
use conduit::{err, implement, Result};
use futures::future::ready;
use serde::Serialize;
use crate::{
keyval::{OwnedKey, OwnedVal},
ser,
util::{map_err, or_else},
Handle,
};
#[implement(super::Map)]
pub fn qry<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
self.bqry(key, &mut buf)
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
pub fn bqry<K, B>(&self, key: &K, buf: &mut B) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
{
let key = ser::serialize(buf, key).expect("failed to serialize query key");
self.get(key)
}
#[implement(super::Map)]
pub fn get<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: AsRef<[u8]> + ?Sized + Debug,
{
ready(self.get_blocking(key))
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
where
K: AsRef<[u8]> + ?Sized + Debug,
{
self.db
.db
.get_pinned_cf_opt(&self.cf(), key, &self.read_options)
.map_err(map_err)?
.map(Handle::from)
.ok_or(err!(Request(NotFound("Not found in database"))))
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
pub fn get_batch_blocking<'a, I, K>(&self, keys: I) -> Vec<Option<OwnedVal>>
where
I: Iterator<Item = &'a K> + ExactSizeIterator + Send + Debug,
K: AsRef<[u8]> + Sized + Debug + 'a,
{
// Optimization can be `true` if key vector is pre-sorted **by the column
// comparator**.
const SORTED: bool = false;
let mut ret: Vec<Option<OwnedKey>> = Vec::with_capacity(keys.len());
let read_options = &self.read_options;
for res in self
.db
.db
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
{
match res {
Ok(Some(res)) => ret.push(Some((*res).to_vec())),
Ok(None) => ret.push(None),
Err(e) => or_else(e).expect("database multiget error"),
}
}
ret
}

View file

@ -0,0 +1,52 @@
use std::{convert::AsRef, fmt::Debug};
use conduit::implement;
use rocksdb::WriteBatchWithTransaction;
use crate::util::or_else;
#[implement(super::Map)]
#[tracing::instrument(skip(self, value), fields(%self), level = "trace")]
pub fn insert<K, V>(&self, key: &K, value: &V)
where
K: AsRef<[u8]> + ?Sized + Debug,
V: AsRef<[u8]> + ?Sized,
{
let write_options = &self.write_options;
self.db
.db
.put_cf_opt(&self.cf(), key, value, write_options)
.or_else(or_else)
.expect("database insert error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
}
self.watchers.wake(key.as_ref());
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, iter), fields(%self), level = "trace")]
pub fn insert_batch<'a, I, K, V>(&'a self, iter: I)
where
I: Iterator<Item = &'a (K, V)> + Send + Debug,
K: AsRef<[u8]> + Sized + Debug + 'a,
V: AsRef<[u8]> + Sized + 'a,
{
let mut batch = WriteBatchWithTransaction::<false>::default();
for (key, val) in iter {
batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
}
let write_options = &self.write_options;
self.db
.db
.write_opt(batch, write_options)
.or_else(or_else)
.expect("database insert batch error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
}
}

View file

@ -0,0 +1,44 @@
use std::{convert::AsRef, fmt::Debug, io::Write};
use conduit::implement;
use serde::Serialize;
use crate::{ser, util::or_else};
#[implement(super::Map)]
pub fn del<K>(&self, key: &K)
where
K: Serialize + ?Sized + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
self.bdel(key, &mut buf);
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, buf), fields(%self), level = "trace")]
pub fn bdel<K, B>(&self, key: &K, buf: &mut B)
where
K: Serialize + ?Sized + Debug,
B: Write + AsRef<[u8]>,
{
let key = ser::serialize(buf, key).expect("failed to serialize deletion key");
self.remove(key);
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
pub fn remove<K>(&self, key: &K)
where
K: AsRef<[u8]> + ?Sized + Debug,
{
let write_options = &self.write_options;
self.db
.db
.delete_cf_opt(&self.cf(), key, write_options)
.or_else(or_else)
.expect("database remove error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
}
}

View file

@ -3,7 +3,7 @@ use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use conduit::{trace, utils, Error, Result, Server}; use conduit::{trace, utils, utils::rand, Error, Result, Server};
use database::{Database, Deserialized, Map}; use database::{Database, Deserialized, Map};
use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
use ruma::{ use ruma::{
@ -102,7 +102,7 @@ impl Data {
fn stored_count(global: &Arc<Map>) -> Result<u64> { fn stored_count(global: &Arc<Map>) -> Result<u64> {
global global
.get(COUNTER) .get_blocking(COUNTER)
.as_deref() .as_deref()
.map_or(Ok(0_u64), utils::u64_from_bytes) .map_or(Ok(0_u64), utils::u64_from_bytes)
} }
@ -206,17 +206,23 @@ impl Data {
} }
pub fn load_keypair(&self) -> Result<Ed25519KeyPair> { pub fn load_keypair(&self) -> Result<Ed25519KeyPair> {
let keypair_bytes = self.global.get(b"keypair").map_or_else( let generate = |_| {
|_| { let keypair = Ed25519KeyPair::generate().expect("Ed25519KeyPair generation always works (?)");
let keypair = utils::generate_keypair();
self.global.insert(b"keypair", &keypair); let mut value = rand::string(8).as_bytes().to_vec();
Ok::<_, Error>(keypair) value.push(0xFF);
}, value.extend_from_slice(&keypair);
|val| Ok(val.to_vec()),
)?; self.global.insert(b"keypair", &value);
value
};
let keypair_bytes: Vec<u8> = self
.global
.get_blocking(b"keypair")
.map_or_else(generate, Into::into);
let mut parts = keypair_bytes.splitn(2, |&b| b == 0xFF); let mut parts = keypair_bytes.splitn(2, |&b| b == 0xFF);
utils::string_from_bytes( utils::string_from_bytes(
// 1. version // 1. version
parts parts

View file

@ -59,7 +59,7 @@ impl Data {
for (i, short) in self for (i, short) in self
.eventid_shorteventid .eventid_shorteventid
.multi_get(keys.iter()) .get_batch_blocking(keys.iter())
.iter() .iter()
.enumerate() .enumerate()
{ {

View file

@ -326,7 +326,7 @@ pub(super) fn pdu_count(pdu_id: &[u8]) -> PduCount {
//TODO: this is an ABA //TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) { fn increment(db: &Arc<Map>, key: &[u8]) {
let old = db.get(key); let old = db.get_blocking(key);
let new = utils::increment(old.ok().as_deref()); let new = utils::increment(old.ok().as_deref());
db.insert(key, &new); db.insert(key, &new);
} }

View file

@ -1000,7 +1000,7 @@ where
//TODO: this is an ABA //TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) { fn increment(db: &Arc<Map>, key: &[u8]) {
let old = db.get(key); let old = db.get_blocking(key);
let new = utils::increment(old.ok().as_deref()); let new = utils::increment(old.ok().as_deref());
db.insert(key, &new); db.insert(key, &new);
} }