offload remaining db iterator initial seeks on cache miss

consume task budget on cache hit

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-18 22:56:53 +00:00 committed by strawberry
parent 14341bb906
commit 98e6c81e49
16 changed files with 199 additions and 131 deletions

View file

@ -1,12 +1,10 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduwuit::{implement, Result};
use futures::{
stream::{Stream, StreamExt},
FutureExt, TryFutureExt, TryStreamExt,
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use rocksdb::Direction;
use serde::{Deserialize, Serialize};
use tokio::task;
use crate::{
keyval::{result_deserialize, serialize_key, KeyVal},
@ -85,7 +83,12 @@ where
let opts = super::iter_options_default();
let state = stream::State::new(&self.db, &self.cf, opts);
if is_cached(self, from) {
return stream::ItemsRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
let state = state.init_rev(from.as_ref().into());
return task::consume_budget()
.map(move |()| stream::ItemsRev::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
};
let seek = Seek {