fix missing iteration-optimized read options on several stream types

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-24 23:45:35 +00:00
parent d59f68a51a
commit 94f2384fb0
6 changed files with 28 additions and 18 deletions

View file

@ -34,7 +34,8 @@ use conduwuit::Result;
use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions}; use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions};
pub(crate) use self::options::{ pub(crate) use self::options::{
cache_read_options_default, iter_options_default, read_options_default, write_options_default, cache_iter_options_default, cache_read_options_default, iter_options_default,
read_options_default, write_options_default,
}; };
use crate::{watchers::Watchers, Engine}; use crate::{watchers::Watchers, Engine};

View file

@ -2,24 +2,33 @@ use rocksdb::{ReadOptions, ReadTier, WriteOptions};
#[inline] #[inline]
pub(crate) fn iter_options_default() -> ReadOptions { pub(crate) fn iter_options_default() -> ReadOptions {
let mut read_options = read_options_default(); let mut options = read_options_default();
read_options.set_background_purge_on_iterator_cleanup(true); options.set_background_purge_on_iterator_cleanup(true);
//read_options.set_pin_data(true); //options.set_pin_data(true);
read_options options
}
#[inline]
pub(crate) fn cache_iter_options_default() -> ReadOptions {
let mut options = cache_read_options_default();
options.set_background_purge_on_iterator_cleanup(true);
//options.set_pin_data(true);
options
} }
#[inline] #[inline]
pub(crate) fn cache_read_options_default() -> ReadOptions { pub(crate) fn cache_read_options_default() -> ReadOptions {
let mut read_options = read_options_default(); let mut options = read_options_default();
read_options.set_read_tier(ReadTier::BlockCache); options.set_read_tier(ReadTier::BlockCache);
read_options options.fill_cache(false);
options
} }
#[inline] #[inline]
pub(crate) fn read_options_default() -> ReadOptions { pub(crate) fn read_options_default() -> ReadOptions {
let mut read_options = ReadOptions::default(); let mut options = ReadOptions::default();
read_options.set_total_order_seek(true); options.set_total_order_seek(true);
read_options options
} }
#[inline] #[inline]

View file

@ -31,7 +31,7 @@ where
pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send { pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
use crate::pool::Seek; use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::iter_options_default();
let state = stream::State::new(self, opts); let state = stream::State::new(self, opts);
if is_cached(self) { if is_cached(self) {
let state = state.init_rev(None); let state = state.init_rev(None);
@ -66,7 +66,7 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
fields(%map), fields(%map),
)] )]
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool { pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
let opts = super::cache_read_options_default(); let opts = super::cache_iter_options_default();
let state = stream::State::new(map, opts).init_rev(None); let state = stream::State::new(map, opts).init_rev(None);
!state.is_incomplete() !state.is_incomplete()

View file

@ -118,7 +118,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where where
P: AsRef<[u8]> + ?Sized, P: AsRef<[u8]> + ?Sized,
{ {
let cache_opts = super::cache_read_options_default(); let cache_opts = super::cache_iter_options_default();
let cache_status = stream::State::new(map, cache_opts) let cache_status = stream::State::new(map, cache_opts)
.init_rev(from.as_ref().into()) .init_rev(from.as_ref().into())
.status(); .status();

View file

@ -30,7 +30,7 @@ where
pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send { pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
use crate::pool::Seek; use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::iter_options_default();
let state = stream::State::new(self, opts); let state = stream::State::new(self, opts);
if is_cached(self) { if is_cached(self) {
let state = state.init_fwd(None); let state = state.init_fwd(None);
@ -65,7 +65,7 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
fields(%map), fields(%map),
)] )]
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool { pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
let opts = super::cache_read_options_default(); let opts = super::cache_iter_options_default();
let state = stream::State::new(map, opts).init_fwd(None); let state = stream::State::new(map, opts).init_fwd(None);
!state.is_incomplete() !state.is_incomplete()

View file

@ -77,7 +77,7 @@ where
{ {
use crate::pool::Seek; use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::iter_options_default();
let state = stream::State::new(self, opts); let state = stream::State::new(self, opts);
if is_cached(self, from) { if is_cached(self, from) {
let state = state.init_fwd(from.as_ref().into()); let state = state.init_fwd(from.as_ref().into());
@ -115,7 +115,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where where
P: AsRef<[u8]> + ?Sized, P: AsRef<[u8]> + ?Sized,
{ {
let opts = super::cache_read_options_default(); let opts = super::cache_iter_options_default();
let state = stream::State::new(map, opts).init_fwd(from.as_ref().into()); let state = stream::State::new(map, opts).init_fwd(from.as_ref().into());
!state.is_incomplete() !state.is_incomplete()