diff --git a/src/database/map/count.rs b/src/database/map/count.rs index 3e92279c..894fe12e 100644 --- a/src/database/map/count.rs +++ b/src/database/map/count.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, future::Future}; +use std::{fmt::Debug, future::Future, sync::Arc}; use conduit::implement; use futures::stream::StreamExt; @@ -14,7 +14,7 @@ pub fn count(&self) -> impl Future + Send + '_ { self.raw_keys() /// - From is a structured key #[implement(super::Map)] #[inline] -pub fn count_from<'a, P>(&'a self, from: &P) -> impl Future + Send + 'a +pub fn count_from<'a, P>(self: &'a Arc, from: &P) -> impl Future + Send + 'a where P: Serialize + ?Sized + Debug + 'a, { @@ -26,7 +26,7 @@ where /// - From is a raw #[implement(super::Map)] #[inline] -pub fn raw_count_from<'a, P>(&'a self, from: &'a P) -> impl Future + Send + 'a +pub fn raw_count_from<'a, P>(self: &'a Arc, from: &'a P) -> impl Future + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, { @@ -38,7 +38,7 @@ where /// - Prefix is structured key #[implement(super::Map)] #[inline] -pub fn count_prefix<'a, P>(&'a self, prefix: &P) -> impl Future + Send + 'a +pub fn count_prefix<'a, P>(self: &'a Arc, prefix: &P) -> impl Future + Send + 'a where P: Serialize + ?Sized + Debug + 'a, { @@ -50,7 +50,7 @@ where /// - Prefix is raw #[implement(super::Map)] #[inline] -pub fn raw_count_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Future + Send + 'a +pub fn raw_count_prefix<'a, P>(self: &'a Arc, prefix: &'a P) -> impl Future + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, { diff --git a/src/database/map/get.rs b/src/database/map/get.rs index 4699fec4..ef552177 100644 --- a/src/database/map/get.rs +++ b/src/database/map/get.rs @@ -1,7 +1,11 @@ use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc}; use arrayvec::ArrayVec; -use conduit::{err, implement, utils::IterStream, Err, Result}; +use conduit::{ + err, implement, + utils::{result::MapExpect, IterStream}, + Err, Result, +}; use futures::{future, Future, FutureExt, Stream, StreamExt}; use rocksdb::DBPinnableSlice; use serde::Serialize; @@ -74,21 +78,21 @@ pub fn get(self: &Arc, key: &K) -> impl Future + Debug + ?Sized, { - use crate::pool::{Cmd, Get}; + use crate::pool::Get; let cached = self.get_cached(key); if matches!(cached, Err(_) | Ok(Some(_))) { - return future::ready(cached.map(|res| res.expect("Option is Some"))).boxed(); + return future::ready(cached.map_expect("data found in cache")).boxed(); } debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete"); - let cmd = Cmd::Get(Get { + let cmd = Get { map: self.clone(), key: key.as_ref().into(), res: None, - }); + }; - self.db.pool.execute(cmd).boxed() + self.db.pool.execute_get(cmd).boxed() } #[implement(super::Map)] diff --git a/src/database/map/keys.rs b/src/database/map/keys.rs index 9c4d66e4..80cf1e15 100644 --- a/src/database/map/keys.rs +++ b/src/database/map/keys.rs @@ -2,7 +2,7 @@ use conduit::{implement, Result}; use futures::{Stream, StreamExt}; use serde::Deserialize; -use crate::{keyval, keyval::Key, stream}; +use crate::{keyval, keyval::Key, stream, stream::Cursor}; #[implement(super::Map)] pub fn keys<'a, K>(&'a self) -> impl Stream>> + Send @@ -16,5 +16,5 @@ where #[tracing::instrument(skip(self), fields(%self), level = "trace")] pub fn raw_keys(&self) -> impl Stream>> + Send { let opts = super::read_options_default(); - stream::Keys::new(&self.db, &self.cf, opts, None) + stream::Keys::new(&self.db, &self.cf, opts).init(None) } diff --git a/src/database/map/keys_from.rs b/src/database/map/keys_from.rs index 093f7fd6..7be3dd1d 100644 --- a/src/database/map/keys_from.rs +++ b/src/database/map/keys_from.rs @@ -1,7 +1,8 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; -use futures::{Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use rocksdb::Direction; use serde::{Deserialize, Serialize}; use crate::{ @@ -10,7 +11,7 @@ use crate::{ }; #[implement(super::Map)] -pub fn keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn keys_from<'a, K, P>(self: &'a Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -20,7 +21,7 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn keys_from_raw

(&self, from: &P) -> impl Stream>> + Send +pub fn keys_from_raw

(self: &Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -29,7 +30,7 @@ where } #[implement(super::Map)] -pub fn keys_raw_from<'a, K, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn keys_raw_from<'a, K, P>(self: &'a Arc, from: &P) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug + Sync, K: Deserialize<'a> + Send, @@ -39,10 +40,27 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")] -pub fn raw_keys_from

(&self, from: &P) -> impl Stream>> + Send +pub fn raw_keys_from

(self: &Arc, from: &P) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug, { + use crate::pool::Seek; + let opts = super::read_options_default(); - stream::Keys::new(&self.db, &self.cf, opts, Some(from.as_ref())) + let state = stream::State::new(&self.db, &self.cf, opts); + let seek = Seek { + map: self.clone(), + dir: Direction::Forward, + key: Some(from.as_ref().into()), + state: crate::pool::into_send_seek(state), + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } diff --git a/src/database/map/keys_prefix.rs b/src/database/map/keys_prefix.rs index 8963f002..9122d78e 100644 --- a/src/database/map/keys_prefix.rs +++ b/src/database/map/keys_prefix.rs @@ -1,4 +1,4 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; use futures::{ @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use crate::keyval::{result_deserialize_key, serialize_key, Key}; #[implement(super::Map)] -pub fn keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream>> + Send +pub fn keys_prefix<'a, K, P>(self: &'a Arc, prefix: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -22,7 +22,7 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn keys_prefix_raw

(&self, prefix: &P) -> impl Stream>> + Send +pub fn keys_prefix_raw

(self: &Arc, prefix: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -32,7 +32,9 @@ where } #[implement(super::Map)] -pub fn keys_raw_prefix<'a, K, P>(&'a self, prefix: &'a P) -> impl Stream>> + Send + 'a +pub fn keys_raw_prefix<'a, K, P>( + self: &'a Arc, prefix: &'a P, +) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, K: Deserialize<'a> + Send + 'a, @@ -42,7 +44,7 @@ where } #[implement(super::Map)] -pub fn raw_keys_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream>> + Send + 'a +pub fn raw_keys_prefix<'a, P>(self: &'a Arc, prefix: &'a P) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, { diff --git a/src/database/map/rev_keys.rs b/src/database/map/rev_keys.rs index e10a199c..45a0203f 100644 --- a/src/database/map/rev_keys.rs +++ b/src/database/map/rev_keys.rs @@ -2,7 +2,7 @@ use conduit::{implement, Result}; use futures::{Stream, StreamExt}; use serde::Deserialize; -use crate::{keyval, keyval::Key, stream}; +use crate::{keyval, keyval::Key, stream, stream::Cursor}; #[implement(super::Map)] pub fn rev_keys<'a, K>(&'a self) -> impl Stream>> + Send @@ -16,5 +16,5 @@ where #[tracing::instrument(skip(self), fields(%self), level = "trace")] pub fn rev_raw_keys(&self) -> impl Stream>> + Send { let opts = super::read_options_default(); - stream::KeysRev::new(&self.db, &self.cf, opts, None) + stream::KeysRev::new(&self.db, &self.cf, opts).init(None) } diff --git a/src/database/map/rev_keys_from.rs b/src/database/map/rev_keys_from.rs index 75d062b5..2b59a5d7 100644 --- a/src/database/map/rev_keys_from.rs +++ b/src/database/map/rev_keys_from.rs @@ -1,7 +1,8 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; -use futures::{Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use rocksdb::Direction; use serde::{Deserialize, Serialize}; use crate::{ @@ -10,7 +11,7 @@ use crate::{ }; #[implement(super::Map)] -pub fn rev_keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn rev_keys_from<'a, K, P>(self: &'a Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -21,7 +22,7 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn rev_keys_from_raw

(&self, from: &P) -> impl Stream>> + Send +pub fn rev_keys_from_raw

(self: &Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -30,7 +31,7 @@ where } #[implement(super::Map)] -pub fn rev_keys_raw_from<'a, K, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn rev_keys_raw_from<'a, K, P>(self: &'a Arc, from: &P) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug + Sync, K: Deserialize<'a> + Send, @@ -41,10 +42,27 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")] -pub fn rev_raw_keys_from

(&self, from: &P) -> impl Stream>> + Send +pub fn rev_raw_keys_from

(self: &Arc, from: &P) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug, { + use crate::pool::Seek; + let opts = super::read_options_default(); - stream::KeysRev::new(&self.db, &self.cf, opts, Some(from.as_ref())) + let state = stream::State::new(&self.db, &self.cf, opts); + let seek = Seek { + map: self.clone(), + dir: Direction::Reverse, + key: Some(from.as_ref().into()), + state: crate::pool::into_send_seek(state), + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } diff --git a/src/database/map/rev_keys_prefix.rs b/src/database/map/rev_keys_prefix.rs index c14909d4..69dc54f2 100644 --- a/src/database/map/rev_keys_prefix.rs +++ b/src/database/map/rev_keys_prefix.rs @@ -1,4 +1,4 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; use futures::{ @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use crate::keyval::{result_deserialize_key, serialize_key, Key}; #[implement(super::Map)] -pub fn rev_keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream>> + Send +pub fn rev_keys_prefix<'a, K, P>(self: &'a Arc, prefix: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -22,7 +22,7 @@ where #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn rev_keys_prefix_raw

(&self, prefix: &P) -> impl Stream>> + Send +pub fn rev_keys_prefix_raw

(self: &Arc, prefix: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -32,7 +32,9 @@ where } #[implement(super::Map)] -pub fn rev_keys_raw_prefix<'a, K, P>(&'a self, prefix: &'a P) -> impl Stream>> + Send + 'a +pub fn rev_keys_raw_prefix<'a, K, P>( + self: &'a Arc, prefix: &'a P, +) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, K: Deserialize<'a> + Send + 'a, @@ -42,7 +44,7 @@ where } #[implement(super::Map)] -pub fn rev_raw_keys_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream>> + Send + 'a +pub fn rev_raw_keys_prefix<'a, P>(self: &'a Arc, prefix: &'a P) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, { diff --git a/src/database/map/rev_stream.rs b/src/database/map/rev_stream.rs index f4be69fd..37b0d3b3 100644 --- a/src/database/map/rev_stream.rs +++ b/src/database/map/rev_stream.rs @@ -2,7 +2,7 @@ use conduit::{implement, Result}; use futures::stream::{Stream, StreamExt}; use serde::Deserialize; -use crate::{keyval, keyval::KeyVal, stream}; +use crate::{keyval, keyval::KeyVal, stream, stream::Cursor}; /// Iterate key-value entries in the map from the end. /// @@ -24,5 +24,5 @@ where #[tracing::instrument(skip(self), fields(%self), level = "trace")] pub fn rev_raw_stream(&self) -> impl Stream>> + Send { let opts = super::read_options_default(); - stream::ItemsRev::new(&self.db, &self.cf, opts, None) + stream::ItemsRev::new(&self.db, &self.cf, opts).init(None) } diff --git a/src/database/map/rev_stream_from.rs b/src/database/map/rev_stream_from.rs index 6ac1cd1a..9811d106 100644 --- a/src/database/map/rev_stream_from.rs +++ b/src/database/map/rev_stream_from.rs @@ -1,7 +1,11 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; -use futures::stream::{Stream, StreamExt}; +use futures::{ + stream::{Stream, StreamExt}, + FutureExt, TryFutureExt, TryStreamExt, +}; +use rocksdb::Direction; use serde::{Deserialize, Serialize}; use crate::{ @@ -14,7 +18,9 @@ use crate::{ /// - Query is serialized /// - Result is deserialized #[implement(super::Map)] -pub fn rev_stream_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn rev_stream_from<'a, K, V, P>( + self: &'a Arc, from: &P, +) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -30,7 +36,7 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn rev_stream_from_raw

(&self, from: &P) -> impl Stream>> + Send +pub fn rev_stream_from_raw

(self: &Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -43,7 +49,9 @@ where /// - Query is raw /// - Result is deserialized #[implement(super::Map)] -pub fn rev_stream_raw_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn rev_stream_raw_from<'a, K, V, P>( + self: &'a Arc, from: &P, +) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug + Sync, K: Deserialize<'a> + Send, @@ -59,10 +67,27 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")] -pub fn rev_raw_stream_from

(&self, from: &P) -> impl Stream>> + Send +pub fn rev_raw_stream_from

(self: &Arc, from: &P) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug, { + use crate::pool::Seek; + let opts = super::read_options_default(); - stream::ItemsRev::new(&self.db, &self.cf, opts, Some(from.as_ref())) + let state = stream::State::new(&self.db, &self.cf, opts); + let seek = Seek { + map: self.clone(), + dir: Direction::Reverse, + key: Some(from.as_ref().into()), + state: crate::pool::into_send_seek(state), + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } diff --git a/src/database/map/rev_stream_prefix.rs b/src/database/map/rev_stream_prefix.rs index fd0d93ff..e5c2fbea 100644 --- a/src/database/map/rev_stream_prefix.rs +++ b/src/database/map/rev_stream_prefix.rs @@ -1,4 +1,4 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; use futures::{ @@ -15,7 +15,9 @@ use crate::keyval::{result_deserialize, serialize_key, KeyVal}; /// - Query is serialized /// - Result is deserialized #[implement(super::Map)] -pub fn rev_stream_prefix<'a, K, V, P>(&'a self, prefix: &P) -> impl Stream>> + Send +pub fn rev_stream_prefix<'a, K, V, P>( + self: &'a Arc, prefix: &P, +) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -31,7 +33,7 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn rev_stream_prefix_raw

(&self, prefix: &P) -> impl Stream>> + Send +pub fn rev_stream_prefix_raw

(self: &Arc, prefix: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -46,7 +48,7 @@ where /// - Result is deserialized #[implement(super::Map)] pub fn rev_stream_raw_prefix<'a, K, V, P>( - &'a self, prefix: &'a P, + self: &'a Arc, prefix: &'a P, ) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, @@ -62,7 +64,9 @@ where /// - Query is raw /// - Result is raw #[implement(super::Map)] -pub fn rev_raw_stream_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream>> + Send + 'a +pub fn rev_raw_stream_prefix<'a, P>( + self: &'a Arc, prefix: &'a P, +) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, { diff --git a/src/database/map/stream.rs b/src/database/map/stream.rs index 143b0d0c..4f4fbd08 100644 --- a/src/database/map/stream.rs +++ b/src/database/map/stream.rs @@ -2,7 +2,7 @@ use conduit::{implement, Result}; use futures::stream::{Stream, StreamExt}; use serde::Deserialize; -use crate::{keyval, keyval::KeyVal, stream}; +use crate::{keyval, keyval::KeyVal, stream, stream::Cursor}; /// Iterate key-value entries in the map from the beginning. /// @@ -23,5 +23,5 @@ where #[tracing::instrument(skip(self), fields(%self), level = "trace")] pub fn raw_stream(&self) -> impl Stream>> + Send { let opts = super::read_options_default(); - stream::Items::new(&self.db, &self.cf, opts, None) + stream::Items::new(&self.db, &self.cf, opts).init(None) } diff --git a/src/database/map/stream_from.rs b/src/database/map/stream_from.rs index 052a2e74..6468846f 100644 --- a/src/database/map/stream_from.rs +++ b/src/database/map/stream_from.rs @@ -1,7 +1,11 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; -use futures::stream::{Stream, StreamExt}; +use futures::{ + stream::{Stream, StreamExt}, + FutureExt, TryFutureExt, TryStreamExt, +}; +use rocksdb::Direction; use serde::{Deserialize, Serialize}; use crate::{ @@ -14,7 +18,7 @@ use crate::{ /// - Query is serialized /// - Result is deserialized #[implement(super::Map)] -pub fn stream_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn stream_from<'a, K, V, P>(self: &'a Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -29,7 +33,7 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn stream_from_raw

(&self, from: &P) -> impl Stream>> + Send +pub fn stream_from_raw

(self: &Arc, from: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -42,7 +46,9 @@ where /// - Query is raw /// - Result is deserialized #[implement(super::Map)] -pub fn stream_raw_from<'a, K, V, P>(&'a self, from: &P) -> impl Stream>> + Send +pub fn stream_raw_from<'a, K, V, P>( + self: &'a Arc, from: &P, +) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug + Sync, K: Deserialize<'a> + Send, @@ -57,10 +63,27 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self, from), fields(%self), level = "trace")] -pub fn raw_stream_from

(&self, from: &P) -> impl Stream>> + Send +pub fn raw_stream_from

(self: &Arc, from: &P) -> impl Stream>> + Send where P: AsRef<[u8]> + ?Sized + Debug, { + use crate::pool::Seek; + let opts = super::read_options_default(); - stream::Items::new(&self.db, &self.cf, opts, Some(from.as_ref())) + let state = stream::State::new(&self.db, &self.cf, opts); + let seek = Seek { + map: self.clone(), + dir: Direction::Forward, + key: Some(from.as_ref().into()), + state: crate::pool::into_send_seek(state), + res: None, + }; + + self.db + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten() + .boxed() } diff --git a/src/database/map/stream_prefix.rs b/src/database/map/stream_prefix.rs index a08b1e2a..3c7bce2e 100644 --- a/src/database/map/stream_prefix.rs +++ b/src/database/map/stream_prefix.rs @@ -1,4 +1,4 @@ -use std::{convert::AsRef, fmt::Debug}; +use std::{convert::AsRef, fmt::Debug, sync::Arc}; use conduit::{implement, Result}; use futures::{ @@ -15,7 +15,9 @@ use crate::keyval::{result_deserialize, serialize_key, KeyVal}; /// - Query is serialized /// - Result is deserialized #[implement(super::Map)] -pub fn stream_prefix<'a, K, V, P>(&'a self, prefix: &P) -> impl Stream>> + Send +pub fn stream_prefix<'a, K, V, P>( + self: &'a Arc, prefix: &P, +) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, K: Deserialize<'a> + Send, @@ -31,7 +33,7 @@ where /// - Result is raw #[implement(super::Map)] #[tracing::instrument(skip(self), level = "trace")] -pub fn stream_prefix_raw

(&self, prefix: &P) -> impl Stream>> + Send +pub fn stream_prefix_raw

(self: &Arc, prefix: &P) -> impl Stream>> + Send where P: Serialize + ?Sized + Debug, { @@ -46,7 +48,7 @@ where /// - Result is deserialized #[implement(super::Map)] pub fn stream_raw_prefix<'a, K, V, P>( - &'a self, prefix: &'a P, + self: &'a Arc, prefix: &'a P, ) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, @@ -62,7 +64,9 @@ where /// - Query is raw /// - Result is raw #[implement(super::Map)] -pub fn raw_stream_prefix<'a, P>(&'a self, prefix: &'a P) -> impl Stream>> + Send + 'a +pub fn raw_stream_prefix<'a, P>( + self: &'a Arc, prefix: &'a P, +) -> impl Stream>> + Send + 'a where P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a, { diff --git a/src/database/pool.rs b/src/database/pool.rs index a9697625..136de67d 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -8,17 +8,18 @@ use std::{ use async_channel::{bounded, Receiver, RecvError, Sender}; use conduit::{debug, debug_warn, defer, err, implement, result::DebugInspect, Result, Server}; -use futures::channel::oneshot; +use futures::{channel::oneshot, TryFutureExt}; +use oneshot::Sender as ResultSender; +use rocksdb::Direction; use tokio::{sync::Mutex, task::JoinSet}; -use crate::{keyval::KeyBuf, Handle, Map}; +use crate::{keyval::KeyBuf, stream, Handle, Map}; pub(crate) struct Pool { server: Arc, workers: Mutex>, queue: Sender, busy: AtomicUsize, - busy_max: AtomicUsize, queued_max: AtomicUsize, } @@ -27,19 +28,24 @@ pub(crate) struct Opts { pub(crate) worker_num: usize, } -#[derive(Debug)] pub(crate) enum Cmd { Get(Get), + Iter(Seek), } -#[derive(Debug)] pub(crate) struct Get { pub(crate) map: Arc, pub(crate) key: KeyBuf, - pub(crate) res: Option, + pub(crate) res: Option>>>, } -type ResultSender = oneshot::Sender>>; +pub(crate) struct Seek { + pub(crate) map: Arc, + pub(crate) state: stream::State<'static>, + pub(crate) dir: Direction, + pub(crate) key: Option, + pub(crate) res: Option>>, +} const QUEUE_LIMIT: (usize, usize) = (1, 3072); const WORKER_LIMIT: (usize, usize) = (1, 512); @@ -60,7 +66,6 @@ pub(crate) async fn new(server: &Arc, opts: &Opts) -> Result> workers: JoinSet::new().into(), queue: send, busy: AtomicUsize::default(), - busy_max: AtomicUsize::default(), queued_max: AtomicUsize::default(), }); @@ -94,6 +99,8 @@ pub(crate) fn close(&self) { let closing = self.queue.close(); debug_assert!(closing, "channel is not closing"); + + std::thread::yield_now(); } #[implement(Pool)] @@ -117,22 +124,45 @@ fn spawn_one(self: &Arc, workers: &mut JoinSet<()>, recv: Receiver) - Ok(()) } +#[implement(Pool)] +#[tracing::instrument(level = "trace", name = "get", skip(self, cmd))] +pub(crate) async fn execute_get(&self, mut cmd: Get) -> Result> { + let (send, recv) = oneshot::channel(); + _ = cmd.res.insert(send); + self.execute(Cmd::Get(cmd)) + .and_then(|()| { + recv.map_ok(into_recv_get_result) + .map_err(|e| err!(error!("recv failed {e:?}"))) + }) + .await? +} + +#[implement(Pool)] +#[tracing::instrument(level = "trace", name = "iter", skip(self, cmd))] +pub(crate) async fn execute_iter(&self, mut cmd: Seek) -> Result> { + let (send, recv) = oneshot::channel(); + _ = cmd.res.insert(send); + self.execute(Cmd::Iter(cmd)) + .and_then(|()| { + recv.map_ok(into_recv_seek) + .map_err(|e| err!(error!("recv failed {e:?}"))) + }) + .await +} + #[implement(Pool)] #[tracing::instrument( - level = "trace" + level = "trace", + name = "execute", skip(self, cmd), fields( task = ?tokio::task::try_id(), receivers = self.queue.receiver_count(), - senders = self.queue.sender_count(), queued = self.queue.len(), queued_max = self.queued_max.load(Ordering::Relaxed), ), )] -pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result> { - let (send, recv) = oneshot::channel(); - Self::prepare(&mut cmd, send); - +async fn execute(&self, cmd: Cmd) -> Result { if cfg!(debug_assertions) { self.queued_max .fetch_max(self.queue.len(), Ordering::Relaxed); @@ -148,20 +178,7 @@ pub(crate) async fn execute(&self, mut cmd: Cmd) -> Result> { self.queue .send(cmd) .await - .map_err(|e| err!(error!("send failed {e:?}")))?; - - recv.await - .map(into_recv_result) - .map_err(|e| err!(error!("recv failed {e:?}")))? -} - -#[implement(Pool)] -fn prepare(cmd: &mut Cmd, send: ResultSender) { - match cmd { - Cmd::Get(ref mut cmd) => { - _ = cmd.res.insert(send); - }, - }; + .map_err(|e| err!(error!("send failed {e:?}"))) } #[implement(Pool)] @@ -178,8 +195,8 @@ fn worker_loop(&self, recv: &Receiver) { // initial +1 needed prior to entering wait self.busy.fetch_add(1, Ordering::Relaxed); - while let Ok(mut cmd) = self.worker_wait(recv) { - self.worker_handle(&mut cmd); + while let Ok(cmd) = self.worker_wait(recv) { + self.worker_handle(cmd); } } @@ -190,13 +207,8 @@ fn worker_loop(&self, recv: &Receiver) { skip_all, fields( receivers = recv.receiver_count(), - senders = recv.sender_count(), queued = recv.len(), - busy = self.busy.load(Ordering::Relaxed), - busy_max = self.busy_max.fetch_max( - self.busy.fetch_sub(1, Ordering::Relaxed), - Ordering::Relaxed - ), + busy = self.busy.fetch_sub(1, Ordering::Relaxed) - 1, ), )] fn worker_wait(&self, recv: &Receiver) -> Result { @@ -206,12 +218,60 @@ fn worker_wait(&self, recv: &Receiver) -> Result { } #[implement(Pool)] -fn worker_handle(&self, cmd: &mut Cmd) { +fn worker_handle(&self, cmd: Cmd) { match cmd { Cmd::Get(cmd) => self.handle_get(cmd), + Cmd::Iter(cmd) => self.handle_iter(cmd), } } +#[implement(Pool)] +#[tracing::instrument( + name = "iter", + level = "trace", + skip_all, + fields(%cmd.map), +)] +fn handle_iter(&self, mut cmd: Seek) { + let chan = cmd.res.take().expect("missing result channel"); + + if chan.is_canceled() { + return; + } + + let from = cmd.key.as_deref().map(Into::into); + let result = match cmd.dir { + Direction::Forward => cmd.state.init_fwd(from), + Direction::Reverse => cmd.state.init_rev(from), + }; + + let chan_result = chan.send(into_send_seek(result)); + let _chan_sent = chan_result.is_ok(); +} + +#[implement(Pool)] +#[tracing::instrument( + name = "seek", + level = "trace", + skip_all, + fields(%cmd.map), +)] +fn _handle_seek(&self, mut cmd: Seek) { + let chan = cmd.res.take().expect("missing result channel"); + + if chan.is_canceled() { + return; + } + + match cmd.dir { + Direction::Forward => cmd.state.seek_fwd(), + Direction::Reverse => cmd.state.seek_rev(), + }; + + let chan_result = chan.send(into_send_seek(cmd.state)); + let _chan_sent = chan_result.is_ok(); +} + #[implement(Pool)] #[tracing::instrument( name = "get", @@ -219,7 +279,7 @@ fn worker_handle(&self, cmd: &mut Cmd) { skip_all, fields(%cmd.map), )] -fn handle_get(&self, cmd: &mut Get) { +fn handle_get(&self, mut cmd: Get) { debug_assert!(!cmd.key.is_empty(), "querying for empty key"); // Obtain the result channel. @@ -237,23 +297,31 @@ fn handle_get(&self, cmd: &mut Get) { let result = cmd.map.get_blocking(&cmd.key); // Send the result back to the submitter. - let chan_result = chan.send(into_send_result(result)); + let chan_result = chan.send(into_send_get_result(result)); // If the future was dropped during the query this will fail acceptably. let _chan_sent = chan_result.is_ok(); } -fn into_send_result(result: Result>) -> Result> { +fn into_send_get_result(result: Result>) -> Result> { // SAFETY: Necessary to send the Handle (rust_rocksdb::PinnableSlice) through // the channel. The lifetime on the handle is a device by rust-rocksdb to // associate a database lifetime with its assets. The Handle must be dropped - // before the database is dropped. The handle must pass through recv_handle() on - // the other end of the channel. + // before the database is dropped. unsafe { std::mem::transmute(result) } } -fn into_recv_result(result: Result>) -> Result> { - // SAFETY: This is to receive the Handle from the channel. Previously it had - // passed through send_handle(). +fn into_recv_get_result(result: Result>) -> Result> { + // SAFETY: This is to receive the Handle from the channel. + unsafe { std::mem::transmute(result) } +} + +pub(crate) fn into_send_seek(result: stream::State<'_>) -> stream::State<'static> { + // SAFETY: Necessary to send the State through the channel; see above. + unsafe { std::mem::transmute(result) } +} + +fn into_recv_seek(result: stream::State<'static>) -> stream::State<'_> { + // SAFETY: This is to receive the State from the channel; see above. unsafe { std::mem::transmute(result) } } diff --git a/src/database/stream.rs b/src/database/stream.rs index a2a72e44..38c46596 100644 --- a/src/database/stream.rs +++ b/src/database/stream.rs @@ -16,19 +16,21 @@ use crate::{ Engine, Slice, }; -struct State<'a> { +pub(crate) struct State<'a> { inner: Inner<'a>, seek: bool, init: bool, } -trait Cursor<'a, T> { +pub(crate) trait Cursor<'a, T> { fn state(&self) -> &State<'a>; fn fetch(&self) -> Option; fn seek(&mut self); + fn init(self, from: From<'a>) -> Self; + fn get(&self) -> Option> { self.fetch() .map(Ok) @@ -45,7 +47,7 @@ type Inner<'a> = DBRawIteratorWithThreadMode<'a, Db>; type From<'a> = Option>; impl<'a> State<'a> { - fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { + pub(super) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { Self { inner: db.db.raw_iterator_cf_opt(&**cf, opts), init: true, @@ -53,7 +55,7 @@ impl<'a> State<'a> { } } - fn init_fwd(mut self, from: From<'_>) -> Self { + pub(super) fn init_fwd(mut self, from: From<'_>) -> Self { if let Some(key) = from { self.inner.seek(key); self.seek = true; @@ -62,7 +64,7 @@ impl<'a> State<'a> { self } - fn init_rev(mut self, from: From<'_>) -> Self { + pub(super) fn init_rev(mut self, from: From<'_>) -> Self { if let Some(key) = from { self.inner.seek_for_prev(key); self.seek = true; @@ -72,7 +74,7 @@ impl<'a> State<'a> { } #[inline] - fn seek_fwd(&mut self) { + pub(super) fn seek_fwd(&mut self) { if !exchange(&mut self.init, false) { self.inner.next(); } else if !self.seek { @@ -81,7 +83,7 @@ impl<'a> State<'a> { } #[inline] - fn seek_rev(&mut self) { + pub(super) fn seek_rev(&mut self) { if !exchange(&mut self.init, false) { self.inner.prev(); } else if !self.seek { diff --git a/src/database/stream/items.rs b/src/database/stream/items.rs index 54f8bc5c..77b08a0b 100644 --- a/src/database/stream/items.rs +++ b/src/database/stream/items.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{convert, pin::Pin, sync::Arc}; use conduit::Result; use futures::{ @@ -16,9 +16,17 @@ pub(crate) struct Items<'a> { } impl<'a> Items<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions, from: From<'_>) -> Self { + pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { Self { - state: State::new(db, cf, opts).init_fwd(from), + state: State::new(db, cf, opts), + } + } +} + +impl<'a> convert::From> for Items<'a> { + fn from(state: State<'a>) -> Self { + Self { + state, } } } @@ -30,6 +38,13 @@ impl<'a> Cursor<'a, KeyVal<'a>> for Items<'a> { #[inline] fn seek(&mut self) { self.state.seek_fwd(); } + + #[inline] + fn init(self, from: From<'a>) -> Self { + Self { + state: self.state.init_fwd(from), + } + } } impl<'a> Stream for Items<'a> { diff --git a/src/database/stream/items_rev.rs b/src/database/stream/items_rev.rs index 26492db8..dfd3a107 100644 --- a/src/database/stream/items_rev.rs +++ b/src/database/stream/items_rev.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{convert, pin::Pin, sync::Arc}; use conduit::Result; use futures::{ @@ -16,9 +16,17 @@ pub(crate) struct ItemsRev<'a> { } impl<'a> ItemsRev<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions, from: From<'_>) -> Self { + pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { Self { - state: State::new(db, cf, opts).init_rev(from), + state: State::new(db, cf, opts), + } + } +} + +impl<'a> convert::From> for ItemsRev<'a> { + fn from(state: State<'a>) -> Self { + Self { + state, } } } @@ -30,6 +38,13 @@ impl<'a> Cursor<'a, KeyVal<'a>> for ItemsRev<'a> { #[inline] fn seek(&mut self) { self.state.seek_rev(); } + + #[inline] + fn init(self, from: From<'a>) -> Self { + Self { + state: self.state.init_rev(from), + } + } } impl<'a> Stream for ItemsRev<'a> { diff --git a/src/database/stream/keys.rs b/src/database/stream/keys.rs index 91884c8d..2ce88959 100644 --- a/src/database/stream/keys.rs +++ b/src/database/stream/keys.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{convert, pin::Pin, sync::Arc}; use conduit::Result; use futures::{ @@ -16,9 +16,17 @@ pub(crate) struct Keys<'a> { } impl<'a> Keys<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions, from: From<'_>) -> Self { + pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { Self { - state: State::new(db, cf, opts).init_fwd(from), + state: State::new(db, cf, opts), + } + } +} + +impl<'a> convert::From> for Keys<'a> { + fn from(state: State<'a>) -> Self { + Self { + state, } } } @@ -31,6 +39,13 @@ impl<'a> Cursor<'a, Key<'a>> for Keys<'a> { #[inline] fn seek(&mut self) { self.state.seek_fwd(); } + + #[inline] + fn init(self, from: From<'a>) -> Self { + Self { + state: self.state.init_fwd(from), + } + } } impl<'a> Stream for Keys<'a> { diff --git a/src/database/stream/keys_rev.rs b/src/database/stream/keys_rev.rs index 59f66c2e..12dae759 100644 --- a/src/database/stream/keys_rev.rs +++ b/src/database/stream/keys_rev.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, sync::Arc}; +use std::{convert, pin::Pin, sync::Arc}; use conduit::Result; use futures::{ @@ -16,9 +16,17 @@ pub(crate) struct KeysRev<'a> { } impl<'a> KeysRev<'a> { - pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions, from: From<'_>) -> Self { + pub(crate) fn new(db: &'a Arc, cf: &'a Arc, opts: ReadOptions) -> Self { Self { - state: State::new(db, cf, opts).init_rev(from), + state: State::new(db, cf, opts), + } + } +} + +impl<'a> convert::From> for KeysRev<'a> { + fn from(state: State<'a>) -> Self { + Self { + state, } } } @@ -31,6 +39,13 @@ impl<'a> Cursor<'a, Key<'a>> for KeysRev<'a> { #[inline] fn seek(&mut self) { self.state.seek_rev(); } + + #[inline] + fn init(self, from: From<'a>) -> Self { + Self { + state: self.state.init_rev(from), + } + } } impl<'a> Stream for KeysRev<'a> {