consolidate key/value types; consistent interface arguments

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-02 09:51:00 +00:00
parent 46423cab4f
commit a2d25215a3
8 changed files with 172 additions and 79 deletions

View file

@ -1,11 +1,16 @@
use std::{future::Future, pin::Pin, sync::Arc};
use std::{future::Future, mem::size_of, pin::Pin, sync::Arc};
use conduit::{utils, Result};
use rocksdb::{
AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions,
};
use crate::{or_else, result, watchers::Watchers, Engine, Handle, Iter};
use crate::{
or_else, result,
slice::{Byte, Key, KeyVal, OwnedKey, OwnedKeyValPair, OwnedVal, Val},
watchers::Watchers,
Engine, Handle, Iter,
};
pub struct Map {
name: String,
@ -16,9 +21,7 @@ pub struct Map {
read_options: ReadOptions,
}
pub(crate) type KeyVal = (Key, Val);
pub(crate) type Val = Vec<u8>;
pub(crate) type Key = Vec<u8>;
type OwnedKeyValPairIter<'a> = Box<dyn Iterator<Item = OwnedKeyValPair> + Send + 'a>;
impl Map {
pub(crate) fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<Self>> {
@ -32,19 +35,19 @@ impl Map {
}))
}
pub fn get(&self, key: &[u8]) -> Result<Option<Handle<'_>>> {
pub fn get(&self, key: &Key) -> Result<Option<Handle<'_>>> {
let read_options = &self.read_options;
let res = self.db.db.get_pinned_cf_opt(&self.cf(), key, read_options);
Ok(result(res)?.map(Handle::from))
}
pub fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
pub fn multi_get(&self, keys: &[&Key]) -> Result<Vec<Option<OwnedVal>>> {
// Optimization can be `true` if key vector is pre-sorted **by the column
// comparator**.
const SORTED: bool = false;
let mut ret: Vec<Option<Vec<u8>>> = Vec::with_capacity(keys.len());
let mut ret: Vec<Option<OwnedKey>> = Vec::with_capacity(keys.len());
let read_options = &self.read_options;
for res in self
.db
@ -61,7 +64,7 @@ impl Map {
Ok(ret)
}
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
pub fn insert(&self, key: &Key, value: &Val) -> Result<()> {
let write_options = &self.write_options;
self.db
.db
@ -77,9 +80,12 @@ impl Map {
Ok(())
}
pub fn insert_batch(&self, iter: &mut dyn Iterator<Item = KeyVal>) -> Result<()> {
pub fn insert_batch<'a, I>(&'a self, iter: I) -> Result<()>
where
I: Iterator<Item = KeyVal<'a>>,
{
let mut batch = WriteBatchWithTransaction::<false>::default();
for (key, value) in iter {
for KeyVal(key, value) in iter {
batch.put_cf(&self.cf(), key, value);
}
@ -93,7 +99,7 @@ impl Map {
result(res)
}
pub fn remove(&self, key: &[u8]) -> Result<()> {
pub fn remove(&self, key: &Key) -> Result<()> {
let write_options = &self.write_options;
let res = self.db.db.delete_cf_opt(&self.cf(), key, write_options);
@ -104,7 +110,10 @@ impl Map {
result(res)
}
pub fn remove_batch(&self, iter: &mut dyn Iterator<Item = Key>) -> Result<()> {
pub fn remove_batch<'a, I>(&'a self, iter: I) -> Result<()>
where
I: Iterator<Item = &'a Key>,
{
let mut batch = WriteBatchWithTransaction::<false>::default();
for key in iter {
batch.delete_cf(&self.cf(), key);
@ -120,14 +129,14 @@ impl Map {
result(res)
}
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
pub fn iter(&self) -> OwnedKeyValPairIter<'_> {
let mode = IteratorMode::Start;
let read_options = read_options_default();
Box::new(Iter::new(&self.db, &self.cf, read_options, &mode))
}
pub fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
let direction = if backwards {
pub fn iter_from(&self, from: &Key, reverse: bool) -> OwnedKeyValPairIter<'_> {
let direction = if reverse {
Direction::Reverse
} else {
Direction::Forward
@ -137,13 +146,13 @@ impl Map {
Box::new(Iter::new(&self.db, &self.cf, read_options, &mode))
}
pub fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
pub fn scan_prefix(&self, prefix: OwnedKey) -> OwnedKeyValPairIter<'_> {
let mode = IteratorMode::From(&prefix, Direction::Forward);
let read_options = read_options_default();
Box::new(Iter::new(&self.db, &self.cf, read_options, &mode).take_while(move |(k, _)| k.starts_with(&prefix)))
}
pub fn increment(&self, key: &[u8]) -> Result<[u8; 8]> {
pub fn increment(&self, key: &Key) -> Result<[Byte; size_of::<u64>()]> {
let old = self.get(key)?;
let new = utils::increment(old.as_deref());
self.insert(key, &new)?;
@ -155,10 +164,13 @@ impl Map {
Ok(new)
}
pub fn increment_batch(&self, iter: &mut dyn Iterator<Item = Val>) -> Result<()> {
pub fn increment_batch<'a, I>(&'a self, iter: I) -> Result<()>
where
I: Iterator<Item = &'a Key>,
{
let mut batch = WriteBatchWithTransaction::<false>::default();
for key in iter {
let old = self.get(&key)?;
let old = self.get(key)?;
let new = utils::increment(old.as_deref());
batch.put_cf(&self.cf(), key, new);
}
@ -173,7 +185,7 @@ impl Map {
result(res)
}
pub fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
pub fn watch_prefix<'a>(&'a self, prefix: &Key) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
self.watchers.watch(prefix)
}
@ -184,8 +196,8 @@ impl Map {
}
impl<'a> IntoIterator for &'a Map {
type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
type Item = KeyVal;
type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send + 'a>;
type Item = OwnedKeyValPair;
fn into_iter(self) -> Self::IntoIter { self.iter() }
}