check cache prior to offloading iterator seek

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-17 10:43:14 +00:00 committed by strawberry
parent f54a62dda0
commit ad8cbcaac1
7 changed files with 96 additions and 5 deletions

View file

@ -5,6 +5,7 @@ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use rocksdb::Direction; use rocksdb::Direction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::stream_from::is_cached;
use crate::{ use crate::{
keyval::{result_deserialize_key, serialize_key, Key}, keyval::{result_deserialize_key, serialize_key, Key},
stream, stream,
@ -54,6 +55,10 @@ where
let opts = super::iter_options_default(); let opts = super::iter_options_default();
let state = stream::State::new(&self.db, &self.cf, opts); let state = stream::State::new(&self.db, &self.cf, opts);
if is_cached(self, from) {
return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed();
}
let seek = Seek { let seek = Seek {
map: self.clone(), map: self.clone(),
dir: Direction::Forward, dir: Direction::Forward,

View file

@ -5,6 +5,7 @@ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use rocksdb::Direction; use rocksdb::Direction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::rev_stream_from::is_cached;
use crate::{ use crate::{
keyval::{result_deserialize_key, serialize_key, Key}, keyval::{result_deserialize_key, serialize_key, Key},
stream, stream,
@ -62,6 +63,10 @@ where
let opts = super::iter_options_default(); let opts = super::iter_options_default();
let state = stream::State::new(&self.db, &self.cf, opts); let state = stream::State::new(&self.db, &self.cf, opts);
if is_cached(self, from) {
return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
}
let seek = Seek { let seek = Seek {
map: self.clone(), map: self.clone(),
dir: Direction::Reverse, dir: Direction::Reverse,

View file

@ -26,3 +26,20 @@ pub fn rev_raw_stream(&self) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
let opts = super::iter_options_default(); let opts = super::iter_options_default();
stream::ItemsRev::new(&self.db, &self.cf, opts).init(None) stream::ItemsRev::new(&self.db, &self.cf, opts).init(None)
} }
#[tracing::instrument(
name = "cached",
level = "trace",
skip_all,
fields(%map),
)]
pub(super) fn _is_cached<P>(map: &super::Map) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let opts = super::cache_read_options_default();
let mut state = stream::State::new(&map.db, &map.cf, opts);
state.seek_rev();
!state.is_incomplete()
}

View file

@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
use crate::{ use crate::{
keyval::{result_deserialize, serialize_key, KeyVal}, keyval::{result_deserialize, serialize_key, KeyVal},
stream, stream,
util::is_incomplete,
}; };
/// Iterate key-value entries in the map starting from upper-bound. /// Iterate key-value entries in the map starting from upper-bound.
@ -83,6 +84,10 @@ where
let opts = super::iter_options_default(); let opts = super::iter_options_default();
let state = stream::State::new(&self.db, &self.cf, opts); let state = stream::State::new(&self.db, &self.cf, opts);
if is_cached(self, from) {
return stream::ItemsRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
};
let seek = Seek { let seek = Seek {
map: self.clone(), map: self.clone(),
dir: Direction::Reverse, dir: Direction::Reverse,
@ -99,3 +104,21 @@ where
.try_flatten() .try_flatten()
.boxed() .boxed()
} }
#[tracing::instrument(
name = "cached",
level = "trace",
skip(map, from),
fields(%map),
)]
pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let cache_opts = super::cache_read_options_default();
let cache_status = stream::State::new(&map.db, &map.cf, cache_opts)
.init_rev(from.as_ref().into())
.status();
!matches!(cache_status, Some(e) if is_incomplete(&e))
}

View file

@ -25,3 +25,20 @@ pub fn raw_stream(&self) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
let opts = super::iter_options_default(); let opts = super::iter_options_default();
stream::Items::new(&self.db, &self.cf, opts).init(None) stream::Items::new(&self.db, &self.cf, opts).init(None)
} }
#[tracing::instrument(
name = "cached",
level = "trace",
skip_all,
fields(%map),
)]
pub(super) fn _is_cached<P>(map: &super::Map) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let opts = super::cache_read_options_default();
let mut state = stream::State::new(&map.db, &map.cf, opts);
state.seek_fwd();
!state.is_incomplete()
}

View file

@ -81,6 +81,10 @@ where
let opts = super::read_options_default(); let opts = super::read_options_default();
let state = stream::State::new(&self.db, &self.cf, opts); let state = stream::State::new(&self.db, &self.cf, opts);
if is_cached(self, from) {
return stream::Items::<'_>::from(state.init_fwd(from.as_ref().into())).boxed();
};
let seek = Seek { let seek = Seek {
map: self.clone(), map: self.clone(),
dir: Direction::Forward, dir: Direction::Forward,
@ -97,3 +101,19 @@ where
.try_flatten() .try_flatten()
.boxed() .boxed()
} }
#[tracing::instrument(
name = "cached",
level = "trace",
skip(map, from),
fields(%map),
)]
pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let opts = super::cache_read_options_default();
let state = stream::State::new(&map.db, &map.cf, opts).init_fwd(from.as_ref().into());
!state.is_incomplete()
}

View file

@ -5,14 +5,14 @@ mod keys_rev;
use std::sync::Arc; use std::sync::Arc;
use conduwuit::{utils::exchange, Error, Result}; use conduwuit::{utils::exchange, Result};
use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, ReadOptions}; use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, ReadOptions};
pub(crate) use self::{items::Items, items_rev::ItemsRev, keys::Keys, keys_rev::KeysRev}; pub(crate) use self::{items::Items, items_rev::ItemsRev, keys::Keys, keys_rev::KeysRev};
use crate::{ use crate::{
engine::Db, engine::Db,
keyval::{Key, KeyVal, Val}, keyval::{Key, KeyVal, Val},
util::map_err, util::{is_incomplete, map_err},
Engine, Slice, Engine, Slice,
}; };
@ -34,7 +34,7 @@ pub(crate) trait Cursor<'a, T> {
fn get(&self) -> Option<Result<T>> { fn get(&self) -> Option<Result<T>> {
self.fetch() self.fetch()
.map(Ok) .map(Ok)
.or_else(|| self.state().status().map(Err)) .or_else(|| self.state().status().map(map_err).map(Err))
} }
fn seek_and_get(&mut self) -> Option<Result<T>> { fn seek_and_get(&mut self) -> Option<Result<T>> {
@ -91,16 +91,20 @@ impl<'a> State<'a> {
} }
} }
pub(super) fn is_incomplete(&self) -> bool {
matches!(self.status(), Some(e) if is_incomplete(&e))
}
fn fetch_key(&self) -> Option<Key<'_>> { self.inner.key().map(Key::from) } fn fetch_key(&self) -> Option<Key<'_>> { self.inner.key().map(Key::from) }
fn _fetch_val(&self) -> Option<Val<'_>> { self.inner.value().map(Val::from) } fn _fetch_val(&self) -> Option<Val<'_>> { self.inner.value().map(Val::from) }
fn fetch(&self) -> Option<KeyVal<'_>> { self.inner.item().map(KeyVal::from) } fn fetch(&self) -> Option<KeyVal<'_>> { self.inner.item().map(KeyVal::from) }
fn status(&self) -> Option<Error> { self.inner.status().map_err(map_err).err() } pub(super) fn status(&self) -> Option<rocksdb::Error> { self.inner.status().err() }
#[inline] #[inline]
fn valid(&self) -> bool { self.inner.valid() } pub(super) fn valid(&self) -> bool { self.inner.valid() }
} }
fn keyval_longevity<'a, 'b: 'a>(item: KeyVal<'a>) -> KeyVal<'b> { fn keyval_longevity<'a, 'b: 'a>(item: KeyVal<'a>) -> KeyVal<'b> {