diff --git a/src/database/map/count.rs b/src/database/map/count.rs index b9b34613..22b298b9 100644 --- a/src/database/map/count.rs +++ b/src/database/map/count.rs @@ -7,7 +7,9 @@ use serde::Serialize; /// Count the total number of entries in the map. #[implement(super::Map)] #[inline] -pub fn count(&self) -> impl Future + Send + '_ { self.raw_keys().count() } +pub fn count(self: &Arc) -> impl Future + Send + '_ { + self.raw_keys().count() +} /// Count the number of entries in the map starting from a lower-bound. /// diff --git a/src/database/map/keys.rs b/src/database/map/keys.rs index 3ab5bacc..7d09f3da 100644 --- a/src/database/map/keys.rs +++ b/src/database/map/keys.rs @@ -1,11 +1,16 @@ -use conduwuit::{implement, Result}; -use futures::{Stream, StreamExt}; -use serde::Deserialize; +use std::sync::Arc; -use crate::{keyval, keyval::Key, stream, stream::Cursor}; +use conduwuit::{implement, Result}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use rocksdb::Direction; +use serde::Deserialize; +use tokio::task; + +use super::stream::is_cached; +use crate::{keyval, keyval::Key, stream}; #[implement(super::Map)] -pub fn keys<'a, K>(&'a self) -> impl Stream>> + Send +pub fn keys<'a, K>(self: &'a Arc) -> impl Stream>> + Send where K: Deserialize<'a> + Send, { @@ -14,7 +19,33 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self), fields(%self), level = "trace")] -pub fn raw_keys(&self) -> impl Stream>> + Send { +pub fn raw_keys(self: &Arc) -> impl Stream>> + Send { + use crate::pool::Seek; + let opts = super::iter_options_default(); - stream::Keys::new(&self.db, &self.cf, opts).init(None) + let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self) { + let state = state.init_fwd(None); + return task::consume_budget() + .map(move |()| stream::Keys::<'_>::from(state)) + .into_stream() + .flatten() + .boxed(); + } + + let seek = Seek { + map: self.clone(), + dir: Direction::Forward, + state: crate::pool::into_send_seek(state), + key: None, + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } diff --git a/src/database/map/keys_prefix.rs b/src/database/map/keys_prefix.rs index 32a1f04c..28bc7ccd 100644 --- a/src/database/map/keys_prefix.rs +++ b/src/database/map/keys_prefix.rs @@ -1,11 +1,7 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduwuit::{implement, Result}; -use futures::{ - future, - stream::{Stream, StreamExt}, - TryStreamExt, -}; +use futures::{future, Stream, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::keyval::{result_deserialize_key, serialize_key, Key}; diff --git a/src/database/map/rev_keys.rs b/src/database/map/rev_keys.rs index 7eb4ce63..0ca6ad0f 100644 --- a/src/database/map/rev_keys.rs +++ b/src/database/map/rev_keys.rs @@ -1,11 +1,16 @@ -use conduwuit::{implement, Result}; -use futures::{Stream, StreamExt}; -use serde::Deserialize; +use std::sync::Arc; -use crate::{keyval, keyval::Key, stream, stream::Cursor}; +use conduwuit::{implement, Result}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use rocksdb::Direction; +use serde::Deserialize; +use tokio::task; + +use super::rev_stream::is_cached; +use crate::{keyval, keyval::Key, stream}; #[implement(super::Map)] -pub fn rev_keys<'a, K>(&'a self) -> impl Stream>> + Send +pub fn rev_keys<'a, K>(self: &'a Arc) -> impl Stream>> + Send where K: Deserialize<'a> + Send, { @@ -14,7 +19,33 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self), fields(%self), level = "trace")] -pub fn rev_raw_keys(&self) -> impl Stream>> + Send { +pub fn rev_raw_keys(self: &Arc) -> impl Stream>> + Send { + use crate::pool::Seek; + let opts = super::iter_options_default(); - stream::KeysRev::new(&self.db, &self.cf, opts).init(None) + let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self) { + let state = state.init_rev(None); + return task::consume_budget() + .map(move |()| stream::KeysRev::<'_>::from(state)) + .into_stream() + .flatten() + .boxed(); + } + + let seek = Seek { + map: self.clone(), + dir: Direction::Reverse, + state: crate::pool::into_send_seek(state), + key: None, + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } diff --git a/src/database/map/rev_keys_prefix.rs b/src/database/map/rev_keys_prefix.rs index 9fda49a0..fb29acaf 100644 --- a/src/database/map/rev_keys_prefix.rs +++ b/src/database/map/rev_keys_prefix.rs @@ -1,11 +1,7 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduwuit::{implement, Result}; -use futures::{ - future, - stream::{Stream, StreamExt}, - TryStreamExt, -}; +use futures::{future, Stream, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::keyval::{result_deserialize_key, serialize_key, Key}; diff --git a/src/database/map/rev_stream.rs b/src/database/map/rev_stream.rs index 7f58582f..d882dd91 100644 --- a/src/database/map/rev_stream.rs +++ b/src/database/map/rev_stream.rs @@ -1,14 +1,20 @@ -use conduwuit::{implement, Result}; -use futures::stream::{Stream, StreamExt}; -use serde::Deserialize; +use std::sync::Arc; -use crate::{keyval, keyval::KeyVal, stream, stream::Cursor}; +use conduwuit::{implement, Result}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use rocksdb::Direction; +use serde::Deserialize; +use tokio::task; + +use crate::{keyval, keyval::KeyVal, stream}; /// Iterate key-value entries in the map from the end. /// /// - Result is deserialized #[implement(super::Map)] -pub fn rev_stream<'a, K, V>(&'a self) -> impl Stream>> + Send +pub fn rev_stream<'a, K, V>( + self: &'a Arc, +) -> impl Stream>> + Send where K: Deserialize<'a> + Send, V: Deserialize<'a> + Send, @@ -22,9 +28,35 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self), fields(%self), level = "trace")] -pub fn rev_raw_stream(&self) -> impl Stream>> + Send { - let opts = super::iter_options_default(); - stream::ItemsRev::new(&self.db, &self.cf, opts).init(None) +pub fn rev_raw_stream(self: &Arc) -> impl Stream>> + Send { + use crate::pool::Seek; + + let opts = super::read_options_default(); + let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self) { + let state = state.init_rev(None); + return task::consume_budget() + .map(move |()| stream::ItemsRev::<'_>::from(state)) + .into_stream() + .flatten() + .boxed(); + }; + + let seek = Seek { + map: self.clone(), + dir: Direction::Reverse, + state: crate::pool::into_send_seek(state), + key: None, + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } #[tracing::instrument( @@ -33,13 +65,9 @@ pub fn rev_raw_stream(&self) -> impl Stream>> + Send { skip_all, fields(%map), )] -pub(super) fn _is_cached

(map: &super::Map) -> bool -where - P: AsRef<[u8]> + ?Sized, -{ +pub(super) fn is_cached(map: &super::Map) -> bool { let opts = super::cache_read_options_default(); - let mut state = stream::State::new(&map.db, &map.cf, opts); + let state = stream::State::new(&map.db, &map.cf, opts).init_rev(None); - state.seek_rev(); !state.is_incomplete() } diff --git a/src/database/map/rev_stream_from.rs b/src/database/map/rev_stream_from.rs index d166aa0f..72fc739c 100644 --- a/src/database/map/rev_stream_from.rs +++ b/src/database/map/rev_stream_from.rs @@ -1,12 +1,10 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduwuit::{implement, Result}; -use futures::{ - stream::{Stream, StreamExt}, - FutureExt, TryFutureExt, TryStreamExt, -}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; +use tokio::task; use crate::{ keyval::{result_deserialize, serialize_key, KeyVal}, @@ -85,7 +83,12 @@ where let opts = super::iter_options_default(); let state = stream::State::new(&self.db, &self.cf, opts); if is_cached(self, from) { - return stream::ItemsRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed(); + let state = state.init_rev(from.as_ref().into()); + return task::consume_budget() + .map(move |()| stream::ItemsRev::<'_>::from(state)) + .into_stream() + .flatten() + .boxed(); }; let seek = Seek { diff --git a/src/database/map/rev_stream_prefix.rs b/src/database/map/rev_stream_prefix.rs index 857aa3a5..22a2ce53 100644 --- a/src/database/map/rev_stream_prefix.rs +++ b/src/database/map/rev_stream_prefix.rs @@ -1,11 +1,7 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduwuit::{implement, Result}; -use futures::{ - future, - stream::{Stream, StreamExt}, - TryStreamExt, -}; +use futures::{future, Stream, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::keyval::{result_deserialize, serialize_key, KeyVal}; diff --git a/src/database/map/stream.rs b/src/database/map/stream.rs index 1a90b8fb..11b0676c 100644 --- a/src/database/map/stream.rs +++ b/src/database/map/stream.rs @@ -1,14 +1,20 @@ -use conduwuit::{implement, Result}; -use futures::stream::{Stream, StreamExt}; -use serde::Deserialize; +use std::sync::Arc; -use crate::{keyval, keyval::KeyVal, stream, stream::Cursor}; +use conduwuit::{implement, Result}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use rocksdb::Direction; +use serde::Deserialize; +use tokio::task; + +use crate::{keyval, keyval::KeyVal, stream}; /// Iterate key-value entries in the map from the beginning. /// /// - Result is deserialized #[implement(super::Map)] -pub fn stream<'a, K, V>(&'a self) -> impl Stream>> + Send +pub fn stream<'a, K, V>( + self: &'a Arc, +) -> impl Stream>> + Send where K: Deserialize<'a> + Send, V: Deserialize<'a> + Send, @@ -21,9 +27,35 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self), fields(%self), level = "trace")] -pub fn raw_stream(&self) -> impl Stream>> + Send { - let opts = super::iter_options_default(); - stream::Items::new(&self.db, &self.cf, opts).init(None) +pub fn raw_stream(self: &Arc) -> impl Stream>> + Send { + use crate::pool::Seek; + + let opts = super::read_options_default(); + let state = stream::State::new(&self.db, &self.cf, opts); + if is_cached(self) { + let state = state.init_fwd(None); + return task::consume_budget() + .map(move |()| stream::Items::<'_>::from(state)) + .into_stream() + .flatten() + .boxed(); + }; + + let seek = Seek { + map: self.clone(), + dir: Direction::Forward, + state: crate::pool::into_send_seek(state), + key: None, + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } #[tracing::instrument( @@ -32,13 +64,9 @@ pub fn raw_stream(&self) -> impl Stream>> + Send { skip_all, fields(%map), )] -pub(super) fn _is_cached

(map: &super::Map) -> bool -where - P: AsRef<[u8]> + ?Sized, -{ +pub(super) fn is_cached(map: &super::Map) -> bool { let opts = super::cache_read_options_default(); - let mut state = stream::State::new(&map.db, &map.cf, opts); + let state = stream::State::new(&map.db, &map.cf, opts).init_fwd(None); - state.seek_fwd(); !state.is_incomplete() } diff --git a/src/database/map/stream_from.rs b/src/database/map/stream_from.rs index 107ce4b1..79ea8f51 100644 --- a/src/database/map/stream_from.rs +++ b/src/database/map/stream_from.rs @@ -1,12 +1,10 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduwuit::{implement, Result}; -use futures::{ - stream::{Stream, StreamExt}, - FutureExt, TryFutureExt, TryStreamExt, -}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; +use tokio::task; use crate::{ keyval::{result_deserialize, serialize_key, KeyVal}, @@ -82,7 +80,12 @@ where let opts = super::read_options_default(); let state = stream::State::new(&self.db, &self.cf, opts); if is_cached(self, from) { - return stream::Items::<'_>::from(state.init_fwd(from.as_ref().into())).boxed(); + let state = state.init_fwd(from.as_ref().into()); + return task::consume_budget() + .map(move |()| stream::Items::<'_>::from(state)) + .into_stream() + .flatten() + .boxed(); }; let seek = Seek { diff --git a/src/database/map/stream_prefix.rs b/src/database/map/stream_prefix.rs index a05e2fc5..adacfc81 100644 --- a/src/database/map/stream_prefix.rs +++ b/src/database/map/stream_prefix.rs @@ -1,11 +1,7 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduwuit::{implement, Result}; -use futures::{ - future, - stream::{Stream, StreamExt}, - TryStreamExt, -}; +use futures::{future, Stream, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::keyval::{result_deserialize, serialize_key, KeyVal}; diff --git a/src/database/stream.rs b/src/database/stream.rs index f8e6733d..d7cb16c6 100644 --- a/src/database/stream.rs +++ b/src/database/stream.rs @@ -29,8 +29,6 @@ pub(crate) trait Cursor<'a, T> { fn seek(&mut self); - fn init(self, from: From<'a>) -> Self; - fn get(&self) -> Option> { self.fetch() .map(Ok) diff --git a/src/database/stream/items.rs b/src/database/stream/items.rs index 2a38d97e..cd81b4a0 100644 --- a/src/database/stream/items.rs +++ b/src/database/stream/items.rs @@ -1,4 +1,4 @@ -use std::{convert, pin::Pin, sync::Arc}; +use std::pin::Pin; use conduwuit::Result; use futures::{ @@ -6,22 +6,15 @@ use futures::{ task::{Context, Poll}, Stream, }; -use rocksdb::{ColumnFamily, ReadOptions}; -use super::{keyval_longevity, Cursor, From, State}; -use crate::{keyval::KeyVal, Engine}; +use super::{keyval_longevity, Cursor, State}; +use crate::keyval::KeyVal; pub(crate) struct Items<'a> { state: State<'a>, } -impl<'a> Items<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { - Self { state: State::new(db, cf, opts) } - } -} - -impl<'a> convert::From> for Items<'a> { +impl<'a> From> for Items<'a> { fn from(state: State<'a>) -> Self { Self { state } } } @@ -32,9 +25,6 @@ impl<'a> Cursor<'a, KeyVal<'a>> for Items<'a> { #[inline] fn seek(&mut self) { self.state.seek_fwd(); } - - #[inline] - fn init(self, from: From<'a>) -> Self { Self { state: self.state.init_fwd(from) } } } impl<'a> Stream for Items<'a> { diff --git a/src/database/stream/items_rev.rs b/src/database/stream/items_rev.rs index c3a6cc7f..c6cf9b53 100644 --- a/src/database/stream/items_rev.rs +++ b/src/database/stream/items_rev.rs @@ -1,4 +1,4 @@ -use std::{convert, pin::Pin, sync::Arc}; +use std::pin::Pin; use conduwuit::Result; use futures::{ @@ -6,22 +6,15 @@ use futures::{ task::{Context, Poll}, Stream, }; -use rocksdb::{ColumnFamily, ReadOptions}; -use super::{keyval_longevity, Cursor, From, State}; -use crate::{keyval::KeyVal, Engine}; +use super::{keyval_longevity, Cursor, State}; +use crate::keyval::KeyVal; pub(crate) struct ItemsRev<'a> { state: State<'a>, } -impl<'a> ItemsRev<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { - Self { state: State::new(db, cf, opts) } - } -} - -impl<'a> convert::From> for ItemsRev<'a> { +impl<'a> From> for ItemsRev<'a> { fn from(state: State<'a>) -> Self { Self { state } } } @@ -32,9 +25,6 @@ impl<'a> Cursor<'a, KeyVal<'a>> for ItemsRev<'a> { #[inline] fn seek(&mut self) { self.state.seek_rev(); } - - #[inline] - fn init(self, from: From<'a>) -> Self { Self { state: self.state.init_rev(from) } } } impl<'a> Stream for ItemsRev<'a> { diff --git a/src/database/stream/keys.rs b/src/database/stream/keys.rs index 0696781d..9bf27507 100644 --- a/src/database/stream/keys.rs +++ b/src/database/stream/keys.rs @@ -1,4 +1,4 @@ -use std::{convert, pin::Pin, sync::Arc}; +use std::pin::Pin; use conduwuit::Result; use futures::{ @@ -6,22 +6,15 @@ use futures::{ task::{Context, Poll}, Stream, }; -use rocksdb::{ColumnFamily, ReadOptions}; -use super::{slice_longevity, Cursor, From, State}; -use crate::{keyval::Key, Engine}; +use super::{slice_longevity, Cursor, State}; +use crate::keyval::Key; pub(crate) struct Keys<'a> { state: State<'a>, } -impl<'a> Keys<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { - Self { state: State::new(db, cf, opts) } - } -} - -impl<'a> convert::From> for Keys<'a> { +impl<'a> From> for Keys<'a> { fn from(state: State<'a>) -> Self { Self { state } } } @@ -33,9 +26,6 @@ impl<'a> Cursor<'a, Key<'a>> for Keys<'a> { #[inline] fn seek(&mut self) { self.state.seek_fwd(); } - - #[inline] - fn init(self, from: From<'a>) -> Self { Self { state: self.state.init_fwd(from) } } } impl<'a> Stream for Keys<'a> { diff --git a/src/database/stream/keys_rev.rs b/src/database/stream/keys_rev.rs index 42706d9f..8657df0f 100644 --- a/src/database/stream/keys_rev.rs +++ b/src/database/stream/keys_rev.rs @@ -1,4 +1,4 @@ -use std::{convert, pin::Pin, sync::Arc}; +use std::pin::Pin; use conduwuit::Result; use futures::{ @@ -6,22 +6,15 @@ use futures::{ task::{Context, Poll}, Stream, }; -use rocksdb::{ColumnFamily, ReadOptions}; -use super::{slice_longevity, Cursor, From, State}; -use crate::{keyval::Key, Engine}; +use super::{slice_longevity, Cursor, State}; +use crate::keyval::Key; pub(crate) struct KeysRev<'a> { state: State<'a>, } -impl<'a> KeysRev<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { - Self { state: State::new(db, cf, opts) } - } -} - -impl<'a> convert::From> for KeysRev<'a> { +impl<'a> From> for KeysRev<'a> { fn from(state: State<'a>) -> Self { Self { state } } } @@ -33,9 +26,6 @@ impl<'a> Cursor<'a, Key<'a>> for KeysRev<'a> { #[inline] fn seek(&mut self) { self.state.seek_rev(); } - - #[inline] - fn init(self, from: From<'a>) -> Self { Self { state: self.state.init_rev(from) } } } impl<'a> Stream for KeysRev<'a> {