From ad8cbcaac1ab77b0b074132c477c7d1943c93919 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 17 Dec 2024 10:43:14 +0000 Subject: [PATCH] check cache prior to offloading iterator seek Signed-off-by: Jason Volk --- src/database/map/keys_from.rs | 5 +++++ src/database/map/rev_keys_from.rs | 5 +++++ src/database/map/rev_stream.rs | 17 +++++++++++++++++ src/database/map/rev_stream_from.rs | 23 +++++++++++++++++++++++ src/database/map/stream.rs | 17 +++++++++++++++++ src/database/map/stream_from.rs | 20 ++++++++++++++++++++ src/database/stream.rs | 14 +++++++++----- 7 files changed, 96 insertions(+), 5 deletions(-) diff --git a/src/database/map/keys_from.rs b/src/database/map/keys_from.rs index 2ffc68df..95c6611b 100644 --- a/src/database/map/keys_from.rs +++ b/src/database/map/keys_from.rs @@ -5,6 +5,7 @@ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; +use super::stream_from::is_cached; use crate::{ keyval::{result_deserialize_key, serialize_key, Key}, stream, @@ -54,6 +55,10 @@ where let opts = super::iter_options_default(); let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self, from) { + return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed(); + } + let seek = Seek { map: self.clone(), dir: Direction::Forward, diff --git a/src/database/map/rev_keys_from.rs b/src/database/map/rev_keys_from.rs index a398f315..e208c505 100644 --- a/src/database/map/rev_keys_from.rs +++ b/src/database/map/rev_keys_from.rs @@ -5,6 +5,7 @@ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; +use super::rev_stream_from::is_cached; use crate::{ keyval::{result_deserialize_key, serialize_key, Key}, stream, @@ -62,6 +63,10 @@ where let opts = super::iter_options_default(); let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self, from) { + return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed(); + } + let seek = Seek { map: self.clone(), dir: Direction::Reverse, diff --git a/src/database/map/rev_stream.rs b/src/database/map/rev_stream.rs index 81359800..7f58582f 100644 --- a/src/database/map/rev_stream.rs +++ b/src/database/map/rev_stream.rs @@ -26,3 +26,20 @@ pub fn rev_raw_stream(&self) -> impl Stream>> + Send { let opts = super::iter_options_default(); stream::ItemsRev::new(&self.db, &self.cf, opts).init(None) } + +#[tracing::instrument( + name = "cached", + level = "trace", + skip_all, + fields(%map), +)] +pub(super) fn _is_cached

(map: &super::Map) -> bool +where + P: AsRef<[u8]> + ?Sized, +{ + let opts = super::cache_read_options_default(); + let mut state = stream::State::new(&map.db, &map.cf, opts); + + state.seek_rev(); + !state.is_incomplete() +} diff --git a/src/database/map/rev_stream_from.rs b/src/database/map/rev_stream_from.rs index 6ddb9bc7..d166aa0f 100644 --- a/src/database/map/rev_stream_from.rs +++ b/src/database/map/rev_stream_from.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use crate::{ keyval::{result_deserialize, serialize_key, KeyVal}, stream, + util::is_incomplete, }; /// Iterate key-value entries in the map starting from upper-bound. @@ -83,6 +84,10 @@ where let opts = super::iter_options_default(); let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self, from) { + return stream::ItemsRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed(); + }; + let seek = Seek { map: self.clone(), dir: Direction::Reverse, @@ -99,3 +104,21 @@ where .try_flatten() .boxed() } + +#[tracing::instrument( + name = "cached", + level = "trace", + skip(map, from), + fields(%map), +)] +pub(super) fn is_cached

(map: &Arc, from: &P) -> bool +where + P: AsRef<[u8]> + ?Sized, +{ + let cache_opts = super::cache_read_options_default(); + let cache_status = stream::State::new(&map.db, &map.cf, cache_opts) + .init_rev(from.as_ref().into()) + .status(); + + !matches!(cache_status, Some(e) if is_incomplete(&e)) +} diff --git a/src/database/map/stream.rs b/src/database/map/stream.rs index c2d9b6b8..1a90b8fb 100644 --- a/src/database/map/stream.rs +++ b/src/database/map/stream.rs @@ -25,3 +25,20 @@ pub fn raw_stream(&self) -> impl Stream>> + Send { let opts = super::iter_options_default(); stream::Items::new(&self.db, &self.cf, opts).init(None) } + +#[tracing::instrument( + name = "cached", + level = "trace", + skip_all, + fields(%map), +)] +pub(super) fn _is_cached

(map: &super::Map) -> bool +where + P: AsRef<[u8]> + ?Sized, +{ + let opts = super::cache_read_options_default(); + let mut state = stream::State::new(&map.db, &map.cf, opts); + + state.seek_fwd(); + !state.is_incomplete() +} diff --git a/src/database/map/stream_from.rs b/src/database/map/stream_from.rs index 1dae9d78..107ce4b1 100644 --- a/src/database/map/stream_from.rs +++ b/src/database/map/stream_from.rs @@ -81,6 +81,10 @@ where let opts = super::read_options_default(); let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self, from) { + return stream::Items::<'_>::from(state.init_fwd(from.as_ref().into())).boxed(); + }; + let seek = Seek { map: self.clone(), dir: Direction::Forward, @@ -97,3 +101,19 @@ where .try_flatten() .boxed() } + +#[tracing::instrument( + name = "cached", + level = "trace", + skip(map, from), + fields(%map), +)] +pub(super) fn is_cached

(map: &Arc, from: &P) -> bool +where + P: AsRef<[u8]> + ?Sized, +{ + let opts = super::cache_read_options_default(); + let state = stream::State::new(&map.db, &map.cf, opts).init_fwd(from.as_ref().into()); + + !state.is_incomplete() +} diff --git a/src/database/stream.rs b/src/database/stream.rs index 775fb930..f849d08f 100644 --- a/src/database/stream.rs +++ b/src/database/stream.rs @@ -5,14 +5,14 @@ mod keys_rev; use std::sync::Arc; -use conduwuit::{utils::exchange, Error, Result}; +use conduwuit::{utils::exchange, Result}; use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, ReadOptions}; pub(crate) use self::{items::Items, items_rev::ItemsRev, keys::Keys, keys_rev::KeysRev}; use crate::{ engine::Db, keyval::{Key, KeyVal, Val}, - util::map_err, + util::{is_incomplete, map_err}, Engine, Slice, }; @@ -34,7 +34,7 @@ pub(crate) trait Cursor<'a, T> { fn get(&self) -> Option> { self.fetch() .map(Ok) - .or_else(|| self.state().status().map(Err)) + .or_else(|| self.state().status().map(map_err).map(Err)) } fn seek_and_get(&mut self) -> Option> { @@ -91,16 +91,20 @@ impl<'a> State<'a> { } } + pub(super) fn is_incomplete(&self) -> bool { + matches!(self.status(), Some(e) if is_incomplete(&e)) + } + fn fetch_key(&self) -> Option> { self.inner.key().map(Key::from) } fn _fetch_val(&self) -> Option> { self.inner.value().map(Val::from) } fn fetch(&self) -> Option> { self.inner.item().map(KeyVal::from) } - fn status(&self) -> Option { self.inner.status().map_err(map_err).err() } + pub(super) fn status(&self) -> Option { self.inner.status().err() } #[inline] - fn valid(&self) -> bool { self.inner.valid() } + pub(super) fn valid(&self) -> bool { self.inner.valid() } } fn keyval_longevity<'a, 'b: 'a>(item: KeyVal<'a>) -> KeyVal<'b> {