offload initial iterator seeks to threadpool

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-02 08:51:59 +00:00
parent 320b0680bd
commit b5006a4c41
20 changed files with 361 additions and 131 deletions

View file

@ -1,4 +1,4 @@
use std::{fmt::Debug, future::Future}; use std::{fmt::Debug, future::Future, sync::Arc};
use conduit::implement; use conduit::implement;
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -14,7 +14,7 @@ pub fn count(&self) -> impl Future<Output = usize> + Send + '_ { self.raw_keys()
/// - From is a structured key /// - From is a structured key
#[implement(super::Map)] #[implement(super::Map)]
#[inline] #[inline]
pub fn count_from<'a, P>(&'a self, from: &P) -> impl Future<Output = usize> + Send + 'a pub fn count_from<'a, P>(self: &'a Arc<Self>, from: &P) -> impl Future<Output = usize> + Send + 'a
where where
P: Serialize + ?Sized + Debug + 'a, P: Serialize + ?Sized + Debug + 'a,
{ {
@ -26,7 +26,7 @@ where
/// - From is a raw /// - From is a raw
#[implement(super::Map)] #[implement(super::Map)]
#[inline] #[inline]
pub fn raw_count_from<'a, P>(&'a self, from: &'a P) -> impl Future<Output = usize> + Send + 'a pub fn raw_count_from<'a, P>(self: &'a Arc<Self>, from: &'a P) -> impl Future<Output = usize> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
{ {
@ -38,7 +38,7 @@ where
/// - Prefix is structured key /// - Prefix is structured key
#[implement(super::Map)] #[implement(super::Map)]
#[inline] #[inline]
pub fn count_prefix<'a, P>(&'a self, prefix: &P) -> impl Future<Output = usize> + Send + 'a pub fn count_prefix<'a, P>(self: &'a Arc<Self>, prefix: &P) -> impl Future<Output = usize> + Send + 'a
where where
P: Serialize + ?Sized + Debug + 'a, P: Serialize + ?Sized + Debug + 'a,
{ {
@ -50,7 +50,7 @@ where
/// - Prefix is raw /// - Prefix is raw
#[implement(super::Map)] #[implement(super::Map)]
#[inline] #[inline]
pub fn raw_count_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Future<Output = usize> + Send + 'a pub fn raw_count_prefix<'a, P>(self: &'a Arc<Self>, prefix: &'a P) -> impl Future<Output = usize> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
{ {

View file

@ -1,7 +1,11 @@
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc}; use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use conduit::{err, implement, utils::IterStream, Err, Result}; use conduit::{
err, implement,
utils::{result::MapExpect, IterStream},
Err, Result,
};
use futures::{future, Future, FutureExt, Stream, StreamExt}; use futures::{future, Future, FutureExt, Stream, StreamExt};
use rocksdb::DBPinnableSlice; use rocksdb::DBPinnableSlice;
use serde::Serialize; use serde::Serialize;
@ -74,21 +78,21 @@ pub fn get<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'
where where
K: AsRef<[u8]> + Debug + ?Sized, K: AsRef<[u8]> + Debug + ?Sized,
{ {
use crate::pool::{Cmd, Get}; use crate::pool::Get;
let cached = self.get_cached(key); let cached = self.get_cached(key);
if matches!(cached, Err(_) | Ok(Some(_))) { if matches!(cached, Err(_) | Ok(Some(_))) {
return future::ready(cached.map(|res| res.expect("Option is Some"))).boxed(); return future::ready(cached.map_expect("data found in cache")).boxed();
} }
debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete"); debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
let cmd = Cmd::Get(Get { let cmd = Get {
map: self.clone(), map: self.clone(),
key: key.as_ref().into(), key: key.as_ref().into(),
res: None, res: None,
}); };
self.db.pool.execute(cmd).boxed() self.db.pool.execute_get(cmd).boxed()
} }
#[implement(super::Map)] #[implement(super::Map)]

View file

@ -2,7 +2,7 @@ use conduit::{implement, Result};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use serde::Deserialize; use serde::Deserialize;
use crate::{keyval, keyval::Key, stream}; use crate::{keyval, keyval::Key, stream, stream::Cursor};
#[implement(super::Map)] #[implement(super::Map)]
pub fn keys<'a, K>(&'a self) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn keys<'a, K>(&'a self) -> impl Stream<Item = Result<Key<'_, K>>> + Send
@ -16,5 +16,5 @@ where
#[tracing::instrument(skip(self), fields(%self), level = "trace")] #[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn raw_keys(&self) -> impl Stream<Item = Result<Key<'_>>> + Send { pub fn raw_keys(&self) -> impl Stream<Item = Result<Key<'_>>> + Send {
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::Keys::new(&self.db, &self.cf, opts, None) stream::Keys::new(&self.db, &self.cf, opts).init(None)
} }

View file

@ -1,7 +1,8 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::{Stream, StreamExt}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use rocksdb::Direction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
@ -10,7 +11,7 @@ use crate::{
}; };
#[implement(super::Map)] #[implement(super::Map)]
pub fn keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn keys_from<'a, K, P>(self: &'a Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -20,7 +21,7 @@ where
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn keys_from_raw<P>(&self, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send pub fn keys_from_raw<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -29,7 +30,7 @@ where
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn keys_raw_from<'a, K, P>(&'a self, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn keys_raw_from<'a, K, P>(self: &'a Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync, P: AsRef<[u8]> + ?Sized + Debug + Sync,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -39,10 +40,27 @@ where
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self, from), fields(%self), level = "trace")] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
pub fn raw_keys_from<P>(&self, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send pub fn raw_keys_from<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug, P: AsRef<[u8]> + ?Sized + Debug,
{ {
use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::Keys::new(&self.db, &self.cf, opts, Some(from.as_ref())) let state = stream::State::new(&self.db, &self.cf, opts);
let seek = Seek {
map: self.clone(),
dir: Direction::Forward,
key: Some(from.as_ref().into()),
state: crate::pool::into_send_seek(state),
res: None,
};
self.db
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()
.into_stream()
.try_flatten()
.boxed()
} }

View file

@ -1,4 +1,4 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::{ use futures::{
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use crate::keyval::{result_deserialize_key, serialize_key, Key}; use crate::keyval::{result_deserialize_key, serialize_key, Key};
#[implement(super::Map)] #[implement(super::Map)]
pub fn keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn keys_prefix<'a, K, P>(self: &'a Arc<Self>, prefix: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -22,7 +22,7 @@ where
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn keys_prefix_raw<P>(&self, prefix: &P) -> impl Stream<Item = Result<Key<'_>>> + Send pub fn keys_prefix_raw<P>(self: &Arc<Self>, prefix: &P) -> impl Stream<Item = Result<Key<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -32,7 +32,9 @@ where
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn keys_raw_prefix<'a, K, P>(&'a self, prefix: &'a P) -> impl Stream<Item = Result<Key<'_, K>>> + Send + 'a pub fn keys_raw_prefix<'a, K, P>(
self: &'a Arc<Self>, prefix: &'a P,
) -> impl Stream<Item = Result<Key<'_, K>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
K: Deserialize<'a> + Send + 'a, K: Deserialize<'a> + Send + 'a,
@ -42,7 +44,7 @@ where
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn raw_keys_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream<Item = Result<Key<'_>>> + Send + 'a pub fn raw_keys_prefix<'a, P>(self: &'a Arc<Self>, prefix: &'a P) -> impl Stream<Item = Result<Key<'_>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
{ {

View file

@ -2,7 +2,7 @@ use conduit::{implement, Result};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use serde::Deserialize; use serde::Deserialize;
use crate::{keyval, keyval::Key, stream}; use crate::{keyval, keyval::Key, stream, stream::Cursor};
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_keys<'a, K>(&'a self) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn rev_keys<'a, K>(&'a self) -> impl Stream<Item = Result<Key<'_, K>>> + Send
@ -16,5 +16,5 @@ where
#[tracing::instrument(skip(self), fields(%self), level = "trace")] #[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn rev_raw_keys(&self) -> impl Stream<Item = Result<Key<'_>>> + Send { pub fn rev_raw_keys(&self) -> impl Stream<Item = Result<Key<'_>>> + Send {
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::KeysRev::new(&self.db, &self.cf, opts, None) stream::KeysRev::new(&self.db, &self.cf, opts).init(None)
} }

View file

@ -1,7 +1,8 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::{Stream, StreamExt}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use rocksdb::Direction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
@ -10,7 +11,7 @@ use crate::{
}; };
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn rev_keys_from<'a, K, P>(self: &'a Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -21,7 +22,7 @@ where
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn rev_keys_from_raw<P>(&self, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send pub fn rev_keys_from_raw<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -30,7 +31,7 @@ where
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_keys_raw_from<'a, K, P>(&'a self, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn rev_keys_raw_from<'a, K, P>(self: &'a Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync, P: AsRef<[u8]> + ?Sized + Debug + Sync,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -41,10 +42,27 @@ where
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self, from), fields(%self), level = "trace")] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
pub fn rev_raw_keys_from<P>(&self, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send pub fn rev_raw_keys_from<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<Key<'_>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug, P: AsRef<[u8]> + ?Sized + Debug,
{ {
use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::KeysRev::new(&self.db, &self.cf, opts, Some(from.as_ref())) let state = stream::State::new(&self.db, &self.cf, opts);
let seek = Seek {
map: self.clone(),
dir: Direction::Reverse,
key: Some(from.as_ref().into()),
state: crate::pool::into_send_seek(state),
res: None,
};
self.db
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()
.into_stream()
.try_flatten()
.boxed()
} }

View file

@ -1,4 +1,4 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::{ use futures::{
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use crate::keyval::{result_deserialize_key, serialize_key, Key}; use crate::keyval::{result_deserialize_key, serialize_key, Key};
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send pub fn rev_keys_prefix<'a, K, P>(self: &'a Arc<Self>, prefix: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -22,7 +22,7 @@ where
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn rev_keys_prefix_raw<P>(&self, prefix: &P) -> impl Stream<Item = Result<Key<'_>>> + Send pub fn rev_keys_prefix_raw<P>(self: &Arc<Self>, prefix: &P) -> impl Stream<Item = Result<Key<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -32,7 +32,9 @@ where
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_keys_raw_prefix<'a, K, P>(&'a self, prefix: &'a P) -> impl Stream<Item = Result<Key<'_, K>>> + Send + 'a pub fn rev_keys_raw_prefix<'a, K, P>(
self: &'a Arc<Self>, prefix: &'a P,
) -> impl Stream<Item = Result<Key<'_, K>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
K: Deserialize<'a> + Send + 'a, K: Deserialize<'a> + Send + 'a,
@ -42,7 +44,7 @@ where
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_raw_keys_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream<Item = Result<Key<'_>>> + Send + 'a pub fn rev_raw_keys_prefix<'a, P>(self: &'a Arc<Self>, prefix: &'a P) -> impl Stream<Item = Result<Key<'_>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
{ {

View file

@ -2,7 +2,7 @@ use conduit::{implement, Result};
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use serde::Deserialize; use serde::Deserialize;
use crate::{keyval, keyval::KeyVal, stream}; use crate::{keyval, keyval::KeyVal, stream, stream::Cursor};
/// Iterate key-value entries in the map from the end. /// Iterate key-value entries in the map from the end.
/// ///
@ -24,5 +24,5 @@ where
#[tracing::instrument(skip(self), fields(%self), level = "trace")] #[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn rev_raw_stream(&self) -> impl Stream<Item = Result<KeyVal<'_>>> + Send { pub fn rev_raw_stream(&self) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::ItemsRev::new(&self.db, &self.cf, opts, None) stream::ItemsRev::new(&self.db, &self.cf, opts).init(None)
} }

View file

@ -1,7 +1,11 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::stream::{Stream, StreamExt}; use futures::{
stream::{Stream, StreamExt},
FutureExt, TryFutureExt, TryStreamExt,
};
use rocksdb::Direction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
@ -14,7 +18,9 @@ use crate::{
/// - Query is serialized /// - Query is serialized
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_stream_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send pub fn rev_stream_from<'a, K, V, P>(
self: &'a Arc<Self>, from: &P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -30,7 +36,7 @@ where
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn rev_stream_from_raw<P>(&self, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send pub fn rev_stream_from_raw<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -43,7 +49,9 @@ where
/// - Query is raw /// - Query is raw
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_stream_raw_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send pub fn rev_stream_raw_from<'a, K, V, P>(
self: &'a Arc<Self>, from: &P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync, P: AsRef<[u8]> + ?Sized + Debug + Sync,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -59,10 +67,27 @@ where
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self, from), fields(%self), level = "trace")] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
pub fn rev_raw_stream_from<P>(&self, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send pub fn rev_raw_stream_from<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug, P: AsRef<[u8]> + ?Sized + Debug,
{ {
use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::ItemsRev::new(&self.db, &self.cf, opts, Some(from.as_ref())) let state = stream::State::new(&self.db, &self.cf, opts);
let seek = Seek {
map: self.clone(),
dir: Direction::Reverse,
key: Some(from.as_ref().into()),
state: crate::pool::into_send_seek(state),
res: None,
};
self.db
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
.into_stream()
.try_flatten()
.boxed()
} }

View file

@ -1,4 +1,4 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::{ use futures::{
@ -15,7 +15,9 @@ use crate::keyval::{result_deserialize, serialize_key, KeyVal};
/// - Query is serialized /// - Query is serialized
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_stream_prefix<'a, K, V, P>(&'a self, prefix: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send pub fn rev_stream_prefix<'a, K, V, P>(
self: &'a Arc<Self>, prefix: &P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -31,7 +33,7 @@ where
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn rev_stream_prefix_raw<P>(&self, prefix: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send pub fn rev_stream_prefix_raw<P>(self: &Arc<Self>, prefix: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -46,7 +48,7 @@ where
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_stream_raw_prefix<'a, K, V, P>( pub fn rev_stream_raw_prefix<'a, K, V, P>(
&'a self, prefix: &'a P, self: &'a Arc<Self>, prefix: &'a P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + 'a ) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
@ -62,7 +64,9 @@ where
/// - Query is raw /// - Query is raw
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
pub fn rev_raw_stream_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + 'a pub fn rev_raw_stream_prefix<'a, P>(
self: &'a Arc<Self>, prefix: &'a P,
) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
{ {

View file

@ -2,7 +2,7 @@ use conduit::{implement, Result};
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use serde::Deserialize; use serde::Deserialize;
use crate::{keyval, keyval::KeyVal, stream}; use crate::{keyval, keyval::KeyVal, stream, stream::Cursor};
/// Iterate key-value entries in the map from the beginning. /// Iterate key-value entries in the map from the beginning.
/// ///
@ -23,5 +23,5 @@ where
#[tracing::instrument(skip(self), fields(%self), level = "trace")] #[tracing::instrument(skip(self), fields(%self), level = "trace")]
pub fn raw_stream(&self) -> impl Stream<Item = Result<KeyVal<'_>>> + Send { pub fn raw_stream(&self) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::Items::new(&self.db, &self.cf, opts, None) stream::Items::new(&self.db, &self.cf, opts).init(None)
} }

View file

@ -1,7 +1,11 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::stream::{Stream, StreamExt}; use futures::{
stream::{Stream, StreamExt},
FutureExt, TryFutureExt, TryStreamExt,
};
use rocksdb::Direction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
@ -14,7 +18,7 @@ use crate::{
/// - Query is serialized /// - Query is serialized
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn stream_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send pub fn stream_from<'a, K, V, P>(self: &'a Arc<Self>, from: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -29,7 +33,7 @@ where
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn stream_from_raw<P>(&self, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send pub fn stream_from_raw<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -42,7 +46,9 @@ where
/// - Query is raw /// - Query is raw
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn stream_raw_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send pub fn stream_raw_from<'a, K, V, P>(
self: &'a Arc<Self>, from: &P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync, P: AsRef<[u8]> + ?Sized + Debug + Sync,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -57,10 +63,27 @@ where
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self, from), fields(%self), level = "trace")] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
pub fn raw_stream_from<P>(&self, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send pub fn raw_stream_from<P>(self: &Arc<Self>, from: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send
where where
P: AsRef<[u8]> + ?Sized + Debug, P: AsRef<[u8]> + ?Sized + Debug,
{ {
use crate::pool::Seek;
let opts = super::read_options_default(); let opts = super::read_options_default();
stream::Items::new(&self.db, &self.cf, opts, Some(from.as_ref())) let state = stream::State::new(&self.db, &self.cf, opts);
let seek = Seek {
map: self.clone(),
dir: Direction::Forward,
key: Some(from.as_ref().into()),
state: crate::pool::into_send_seek(state),
res: None,
};
self.db
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
.into_stream()
.try_flatten()
.boxed()
} }

View file

@ -1,4 +1,4 @@
use std::{convert::AsRef, fmt::Debug}; use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduit::{implement, Result}; use conduit::{implement, Result};
use futures::{ use futures::{
@ -15,7 +15,9 @@ use crate::keyval::{result_deserialize, serialize_key, KeyVal};
/// - Query is serialized /// - Query is serialized
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn stream_prefix<'a, K, V, P>(&'a self, prefix: &P) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send pub fn stream_prefix<'a, K, V, P>(
self: &'a Arc<Self>, prefix: &P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send, K: Deserialize<'a> + Send,
@ -31,7 +33,7 @@ where
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]
pub fn stream_prefix_raw<P>(&self, prefix: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send pub fn stream_prefix_raw<P>(self: &Arc<Self>, prefix: &P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send
where where
P: Serialize + ?Sized + Debug, P: Serialize + ?Sized + Debug,
{ {
@ -46,7 +48,7 @@ where
/// - Result is deserialized /// - Result is deserialized
#[implement(super::Map)] #[implement(super::Map)]
pub fn stream_raw_prefix<'a, K, V, P>( pub fn stream_raw_prefix<'a, K, V, P>(
&'a self, prefix: &'a P, self: &'a Arc<Self>, prefix: &'a P,
) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + 'a ) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
@ -62,7 +64,9 @@ where
/// - Query is raw /// - Query is raw
/// - Result is raw /// - Result is raw
#[implement(super::Map)] #[implement(super::Map)]
pub fn raw_stream_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + 'a pub fn raw_stream_prefix<'a, P>(
self: &'a Arc<Self>, prefix: &'a P,
) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + 'a
where where
P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
{ {

View file

@ -8,17 +8,18 @@ use std::{
use async_channel::{bounded, Receiver, RecvError, Sender}; use async_channel::{bounded, Receiver, RecvError, Sender};
use conduit::{debug, debug_warn, defer, err, implement, result::DebugInspect, Result, Server}; use conduit::{debug, debug_warn, defer, err, implement, result::DebugInspect, Result, Server};
use futures::channel::oneshot; use futures::{channel::oneshot, TryFutureExt};
use oneshot::Sender as ResultSender;
use rocksdb::Direction;
use tokio::{sync::Mutex, task::JoinSet}; use tokio::{sync::Mutex, task::JoinSet};
use crate::{keyval::KeyBuf, Handle, Map}; use crate::{keyval::KeyBuf, stream, Handle, Map};
pub(crate) struct Pool { pub(crate) struct Pool {
server: Arc<Server>, server: Arc<Server>,
workers: Mutex<JoinSet<()>>, workers: Mutex<JoinSet<()>>,
queue: Sender<Cmd>, queue: Sender<Cmd>,
busy: AtomicUsize, busy: AtomicUsize,
busy_max: AtomicUsize,
queued_max: AtomicUsize, queued_max: AtomicUsize,
} }
@ -27,19 +28,24 @@ pub(crate) struct Opts {
pub(crate) worker_num: usize, pub(crate) worker_num: usize,
} }
#[derive(Debug)]
pub(crate) enum Cmd { pub(crate) enum Cmd {
Get(Get), Get(Get),
Iter(Seek),
} }
#[derive(Debug)]
pub(crate) struct Get { pub(crate) struct Get {
pub(crate) map: Arc<Map>, pub(crate) map: Arc<Map>,
pub(crate) key: KeyBuf, pub(crate) key: KeyBuf,
pub(crate) res: Option<ResultSender>, pub(crate) res: Option<ResultSender<Result<Handle<'static>>>>,
} }
type ResultSender = oneshot::Sender<Result<Handle<'static>>>; pub(crate) struct Seek {
pub(crate) map: Arc<Map>,
pub(crate) state: stream::State<'static>,
pub(crate) dir: Direction,
pub(crate) key: Option<KeyBuf>,
pub(crate) res: Option<ResultSender<stream::State<'static>>>,
}
const QUEUE_LIMIT: (usize, usize) = (1, 3072); const QUEUE_LIMIT: (usize, usize) = (1, 3072);
const WORKER_LIMIT: (usize, usize) = (1, 512); const WORKER_LIMIT: (usize, usize) = (1, 512);
@ -60,7 +66,6 @@ pub(crate) async fn new(server: &Arc<Server>, opts: &Opts) -> Result<Arc<Self>>
workers: JoinSet::new().into(), workers: JoinSet::new().into(),
queue: send, queue: send,
busy: AtomicUsize::default(), busy: AtomicUsize::default(),
busy_max: AtomicUsize::default(),
queued_max: AtomicUsize::default(), queued_max: AtomicUsize::default(),
}); });
@ -94,6 +99,8 @@ pub(crate) fn close(&self) {
let closing = self.queue.close(); let closing = self.queue.close();
debug_assert!(closing, "channel is not closing"); debug_assert!(closing, "channel is not closing");
std::thread::yield_now();
} }
#[implement(Pool)] #[implement(Pool)]
@ -117,22 +124,45 @@ fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: Receiver<Cmd>) -
Ok(()) Ok(())
} }
#[implement(Pool)]
#[tracing::instrument(level = "trace", name = "get", skip(self, cmd))]
pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result<Handle<'_>> {
let (send, recv) = oneshot::channel();
_ = cmd.res.insert(send);
self.execute(Cmd::Get(cmd))
.and_then(|()| {
recv.map_ok(into_recv_get_result)
.map_err(|e| err!(error!("recv failed {e:?}")))
})
.await?
}
#[implement(Pool)]
#[tracing::instrument(level = "trace", name = "iter", skip(self, cmd))]
pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result<stream::State<'_>> {
let (send, recv) = oneshot::channel();
_ = cmd.res.insert(send);
self.execute(Cmd::Iter(cmd))
.and_then(|()| {
recv.map_ok(into_recv_seek)
.map_err(|e| err!(error!("recv failed {e:?}")))
})
.await
}
#[implement(Pool)] #[implement(Pool)]
#[tracing::instrument( #[tracing::instrument(
level = "trace" level = "trace",
name = "execute",
skip(self, cmd), skip(self, cmd),
fields( fields(
task = ?tokio::task::try_id(), task = ?tokio::task::try_id(),
receivers = self.queue.receiver_count(), receivers = self.queue.receiver_count(),
senders = self.queue.sender_count(),
queued = self.queue.len(), queued = self.queue.len(),
queued_max = self.queued_max.load(Ordering::Relaxed), queued_max = self.queued_max.load(Ordering::Relaxed),
), ),
)] )]
pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result<Handle<'_>> { async fn execute(&self, cmd: Cmd) -> Result {
let (send, recv) = oneshot::channel();
Self::prepare(&mut cmd, send);
if cfg!(debug_assertions) { if cfg!(debug_assertions) {
self.queued_max self.queued_max
.fetch_max(self.queue.len(), Ordering::Relaxed); .fetch_max(self.queue.len(), Ordering::Relaxed);
@ -148,20 +178,7 @@ pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result<Handle<'_>> {
self.queue self.queue
.send(cmd) .send(cmd)
.await .await
.map_err(|e| err!(error!("send failed {e:?}")))?; .map_err(|e| err!(error!("send failed {e:?}")))
recv.await
.map(into_recv_result)
.map_err(|e| err!(error!("recv failed {e:?}")))?
}
#[implement(Pool)]
fn prepare(cmd: &mut Cmd, send: ResultSender) {
match cmd {
Cmd::Get(ref mut cmd) => {
_ = cmd.res.insert(send);
},
};
} }
#[implement(Pool)] #[implement(Pool)]
@ -178,8 +195,8 @@ fn worker_loop(&self, recv: &Receiver<Cmd>) {
// initial +1 needed prior to entering wait // initial +1 needed prior to entering wait
self.busy.fetch_add(1, Ordering::Relaxed); self.busy.fetch_add(1, Ordering::Relaxed);
while let Ok(mut cmd) = self.worker_wait(recv) { while let Ok(cmd) = self.worker_wait(recv) {
self.worker_handle(&mut cmd); self.worker_handle(cmd);
} }
} }
@ -190,13 +207,8 @@ fn worker_loop(&self, recv: &Receiver<Cmd>) {
skip_all, skip_all,
fields( fields(
receivers = recv.receiver_count(), receivers = recv.receiver_count(),
senders = recv.sender_count(),
queued = recv.len(), queued = recv.len(),
busy = self.busy.load(Ordering::Relaxed), busy = self.busy.fetch_sub(1, Ordering::Relaxed) - 1,
busy_max = self.busy_max.fetch_max(
self.busy.fetch_sub(1, Ordering::Relaxed),
Ordering::Relaxed
),
), ),
)] )]
fn worker_wait(&self, recv: &Receiver<Cmd>) -> Result<Cmd, RecvError> { fn worker_wait(&self, recv: &Receiver<Cmd>) -> Result<Cmd, RecvError> {
@ -206,12 +218,60 @@ fn worker_wait(&self, recv: &Receiver<Cmd>) -> Result<Cmd, RecvError> {
} }
#[implement(Pool)] #[implement(Pool)]
fn worker_handle(&self, cmd: &mut Cmd) { fn worker_handle(&self, cmd: Cmd) {
match cmd { match cmd {
Cmd::Get(cmd) => self.handle_get(cmd), Cmd::Get(cmd) => self.handle_get(cmd),
Cmd::Iter(cmd) => self.handle_iter(cmd),
} }
} }
#[implement(Pool)]
#[tracing::instrument(
name = "iter",
level = "trace",
skip_all,
fields(%cmd.map),
)]
fn handle_iter(&self, mut cmd: Seek) {
let chan = cmd.res.take().expect("missing result channel");
if chan.is_canceled() {
return;
}
let from = cmd.key.as_deref().map(Into::into);
let result = match cmd.dir {
Direction::Forward => cmd.state.init_fwd(from),
Direction::Reverse => cmd.state.init_rev(from),
};
let chan_result = chan.send(into_send_seek(result));
let _chan_sent = chan_result.is_ok();
}
#[implement(Pool)]
#[tracing::instrument(
name = "seek",
level = "trace",
skip_all,
fields(%cmd.map),
)]
fn _handle_seek(&self, mut cmd: Seek) {
let chan = cmd.res.take().expect("missing result channel");
if chan.is_canceled() {
return;
}
match cmd.dir {
Direction::Forward => cmd.state.seek_fwd(),
Direction::Reverse => cmd.state.seek_rev(),
};
let chan_result = chan.send(into_send_seek(cmd.state));
let _chan_sent = chan_result.is_ok();
}
#[implement(Pool)] #[implement(Pool)]
#[tracing::instrument( #[tracing::instrument(
name = "get", name = "get",
@ -219,7 +279,7 @@ fn worker_handle(&self, cmd: &mut Cmd) {
skip_all, skip_all,
fields(%cmd.map), fields(%cmd.map),
)] )]
fn handle_get(&self, cmd: &mut Get) { fn handle_get(&self, mut cmd: Get) {
debug_assert!(!cmd.key.is_empty(), "querying for empty key"); debug_assert!(!cmd.key.is_empty(), "querying for empty key");
// Obtain the result channel. // Obtain the result channel.
@ -237,23 +297,31 @@ fn handle_get(&self, cmd: &mut Get) {
let result = cmd.map.get_blocking(&cmd.key); let result = cmd.map.get_blocking(&cmd.key);
// Send the result back to the submitter. // Send the result back to the submitter.
let chan_result = chan.send(into_send_result(result)); let chan_result = chan.send(into_send_get_result(result));
// If the future was dropped during the query this will fail acceptably. // If the future was dropped during the query this will fail acceptably.
let _chan_sent = chan_result.is_ok(); let _chan_sent = chan_result.is_ok();
} }
fn into_send_result(result: Result<Handle<'_>>) -> Result<Handle<'static>> { fn into_send_get_result(result: Result<Handle<'_>>) -> Result<Handle<'static>> {
// SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through // SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through
// the channel. The lifetime on the handle is a device by rust-rocksdb to // the channel. The lifetime on the handle is a device by rust-rocksdb to
// associate a database lifetime with its assets. The Handle must be dropped // associate a database lifetime with its assets. The Handle must be dropped
// before the database is dropped. The handle must pass through recv_handle() on // before the database is dropped.
// the other end of the channel.
unsafe { std::mem::transmute(result) } unsafe { std::mem::transmute(result) }
} }
fn into_recv_result(result: Result<Handle<'static>>) -> Result<Handle<'_>> { fn into_recv_get_result(result: Result<Handle<'static>>) -> Result<Handle<'_>> {
// SAFETY: This is to receive the Handle from the channel. Previously it had // SAFETY: This is to receive the Handle from the channel.
// passed through send_handle(). unsafe { std::mem::transmute(result) }
}
pub(crate) fn into_send_seek(result: stream::State<'_>) -> stream::State<'static> {
// SAFETY: Necessary to send the State through the channel; see above.
unsafe { std::mem::transmute(result) }
}
fn into_recv_seek(result: stream::State<'static>) -> stream::State<'_> {
// SAFETY: This is to receive the State from the channel; see above.
unsafe { std::mem::transmute(result) } unsafe { std::mem::transmute(result) }
} }

View file

@ -16,19 +16,21 @@ use crate::{
Engine, Slice, Engine, Slice,
}; };
struct State<'a> { pub(crate) struct State<'a> {
inner: Inner<'a>, inner: Inner<'a>,
seek: bool, seek: bool,
init: bool, init: bool,
} }
trait Cursor<'a, T> { pub(crate) trait Cursor<'a, T> {
fn state(&self) -> &State<'a>; fn state(&self) -> &State<'a>;
fn fetch(&self) -> Option<T>; fn fetch(&self) -> Option<T>;
fn seek(&mut self); fn seek(&mut self);
fn init(self, from: From<'a>) -> Self;
fn get(&self) -> Option<Result<T>> { fn get(&self) -> Option<Result<T>> {
self.fetch() self.fetch()
.map(Ok) .map(Ok)
@ -45,7 +47,7 @@ type Inner<'a> = DBRawIteratorWithThreadMode<'a, Db>;
type From<'a> = Option<Key<'a>>; type From<'a> = Option<Key<'a>>;
impl<'a> State<'a> { impl<'a> State<'a> {
fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions) -> Self { pub(super) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions) -> Self {
Self { Self {
inner: db.db.raw_iterator_cf_opt(&**cf, opts), inner: db.db.raw_iterator_cf_opt(&**cf, opts),
init: true, init: true,
@ -53,7 +55,7 @@ impl<'a> State<'a> {
} }
} }
fn init_fwd(mut self, from: From<'_>) -> Self { pub(super) fn init_fwd(mut self, from: From<'_>) -> Self {
if let Some(key) = from { if let Some(key) = from {
self.inner.seek(key); self.inner.seek(key);
self.seek = true; self.seek = true;
@ -62,7 +64,7 @@ impl<'a> State<'a> {
self self
} }
fn init_rev(mut self, from: From<'_>) -> Self { pub(super) fn init_rev(mut self, from: From<'_>) -> Self {
if let Some(key) = from { if let Some(key) = from {
self.inner.seek_for_prev(key); self.inner.seek_for_prev(key);
self.seek = true; self.seek = true;
@ -72,7 +74,7 @@ impl<'a> State<'a> {
} }
#[inline] #[inline]
fn seek_fwd(&mut self) { pub(super) fn seek_fwd(&mut self) {
if !exchange(&mut self.init, false) { if !exchange(&mut self.init, false) {
self.inner.next(); self.inner.next();
} else if !self.seek { } else if !self.seek {
@ -81,7 +83,7 @@ impl<'a> State<'a> {
} }
#[inline] #[inline]
fn seek_rev(&mut self) { pub(super) fn seek_rev(&mut self) {
if !exchange(&mut self.init, false) { if !exchange(&mut self.init, false) {
self.inner.prev(); self.inner.prev();
} else if !self.seek { } else if !self.seek {

View file

@ -1,4 +1,4 @@
use std::{pin::Pin, sync::Arc}; use std::{convert, pin::Pin, sync::Arc};
use conduit::Result; use conduit::Result;
use futures::{ use futures::{
@ -16,9 +16,17 @@ pub(crate) struct Items<'a> {
} }
impl<'a> Items<'a> { impl<'a> Items<'a> {
pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions, from: From<'_>) -> Self { pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions) -> Self {
Self { Self {
state: State::new(db, cf, opts).init_fwd(from), state: State::new(db, cf, opts),
}
}
}
impl<'a> convert::From<State<'a>> for Items<'a> {
fn from(state: State<'a>) -> Self {
Self {
state,
} }
} }
} }
@ -30,6 +38,13 @@ impl<'a> Cursor<'a, KeyVal<'a>> for Items<'a> {
#[inline] #[inline]
fn seek(&mut self) { self.state.seek_fwd(); } fn seek(&mut self) { self.state.seek_fwd(); }
#[inline]
fn init(self, from: From<'a>) -> Self {
Self {
state: self.state.init_fwd(from),
}
}
} }
impl<'a> Stream for Items<'a> { impl<'a> Stream for Items<'a> {

View file

@ -1,4 +1,4 @@
use std::{pin::Pin, sync::Arc}; use std::{convert, pin::Pin, sync::Arc};
use conduit::Result; use conduit::Result;
use futures::{ use futures::{
@ -16,9 +16,17 @@ pub(crate) struct ItemsRev<'a> {
} }
impl<'a> ItemsRev<'a> { impl<'a> ItemsRev<'a> {
pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions, from: From<'_>) -> Self { pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions) -> Self {
Self { Self {
state: State::new(db, cf, opts).init_rev(from), state: State::new(db, cf, opts),
}
}
}
impl<'a> convert::From<State<'a>> for ItemsRev<'a> {
fn from(state: State<'a>) -> Self {
Self {
state,
} }
} }
} }
@ -30,6 +38,13 @@ impl<'a> Cursor<'a, KeyVal<'a>> for ItemsRev<'a> {
#[inline] #[inline]
fn seek(&mut self) { self.state.seek_rev(); } fn seek(&mut self) { self.state.seek_rev(); }
#[inline]
fn init(self, from: From<'a>) -> Self {
Self {
state: self.state.init_rev(from),
}
}
} }
impl<'a> Stream for ItemsRev<'a> { impl<'a> Stream for ItemsRev<'a> {

View file

@ -1,4 +1,4 @@
use std::{pin::Pin, sync::Arc}; use std::{convert, pin::Pin, sync::Arc};
use conduit::Result; use conduit::Result;
use futures::{ use futures::{
@ -16,9 +16,17 @@ pub(crate) struct Keys<'a> {
} }
impl<'a> Keys<'a> { impl<'a> Keys<'a> {
pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions, from: From<'_>) -> Self { pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions) -> Self {
Self { Self {
state: State::new(db, cf, opts).init_fwd(from), state: State::new(db, cf, opts),
}
}
}
impl<'a> convert::From<State<'a>> for Keys<'a> {
fn from(state: State<'a>) -> Self {
Self {
state,
} }
} }
} }
@ -31,6 +39,13 @@ impl<'a> Cursor<'a, Key<'a>> for Keys<'a> {
#[inline] #[inline]
fn seek(&mut self) { self.state.seek_fwd(); } fn seek(&mut self) { self.state.seek_fwd(); }
#[inline]
fn init(self, from: From<'a>) -> Self {
Self {
state: self.state.init_fwd(from),
}
}
} }
impl<'a> Stream for Keys<'a> { impl<'a> Stream for Keys<'a> {

View file

@ -1,4 +1,4 @@
use std::{pin::Pin, sync::Arc}; use std::{convert, pin::Pin, sync::Arc};
use conduit::Result; use conduit::Result;
use futures::{ use futures::{
@ -16,9 +16,17 @@ pub(crate) struct KeysRev<'a> {
} }
impl<'a> KeysRev<'a> { impl<'a> KeysRev<'a> {
pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions, from: From<'_>) -> Self { pub(crate) fn new(db: &'a Arc<Engine>, cf: &'a Arc<ColumnFamily>, opts: ReadOptions) -> Self {
Self { Self {
state: State::new(db, cf, opts).init_rev(from), state: State::new(db, cf, opts),
}
}
}
impl<'a> convert::From<State<'a>> for KeysRev<'a> {
fn from(state: State<'a>) -> Self {
Self {
state,
} }
} }
} }
@ -31,6 +39,13 @@ impl<'a> Cursor<'a, Key<'a>> for KeysRev<'a> {
#[inline] #[inline]
fn seek(&mut self) { self.state.seek_rev(); } fn seek(&mut self) { self.state.seek_rev(); }
#[inline]
fn init(self, from: From<'a>) -> Self {
Self {
state: self.state.init_rev(from),
}
}
} }
impl<'a> Stream for KeysRev<'a> { impl<'a> Stream for KeysRev<'a> {