diff --git a/src/core/utils/stream/broadband.rs b/src/core/utils/stream/broadband.rs index ce17830c..37416d63 100644 --- a/src/core/utils/stream/broadband.rs +++ b/src/core/utils/stream/broadband.rs @@ -1,5 +1,4 @@ //! Broadband stream combinator extensions to futures::Stream -#![allow(clippy::type_complexity)] use std::convert::identity; @@ -18,6 +17,18 @@ pub trait BroadbandExt where Self: Stream + Send + Sized, { + fn broadn_all(self, n: N, f: F) -> impl Future + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send; + + fn broadn_any(self, n: N, f: F) -> impl Future + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send; + /// Concurrent filter_map(); unordered results fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send where @@ -33,6 +44,24 @@ where Fut: Future + Send, U: Send; + #[inline] + fn broad_all(self, f: F) -> impl Future + Send + where + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + { + self.broadn_all(None, f) + } + + #[inline] + fn broad_any(self, f: F) -> impl Future + Send + where + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + { + self.broadn_any(None, f) + } + #[inline] fn broad_filter_map(self, f: F) -> impl Stream + Send where @@ -58,6 +87,30 @@ impl BroadbandExt for S where S: Stream + Send + Sized, { + #[inline] + fn broadn_all(self, n: N, f: F) -> impl Future + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + { + self.map(f) + .buffer_unordered(n.into().unwrap_or(WIDTH)) + .ready_all(identity) + } + + #[inline] + fn broadn_any(self, n: N, f: F) -> impl Future + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + { + self.map(f) + .buffer_unordered(n.into().unwrap_or(WIDTH)) + .ready_any(identity) + } + #[inline] fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send where diff --git a/src/core/utils/stream/iter_stream.rs b/src/core/utils/stream/iter_stream.rs index 69edf64f..2face4b0 100644 --- a/src/core/utils/stream/iter_stream.rs +++ b/src/core/utils/stream/iter_stream.rs @@ -4,12 +4,16 @@ use futures::{ StreamExt, }; +use crate::{Error, Result}; + pub trait IterStream { /// Convert an Iterator into a Stream fn stream(self) -> impl Stream::Item> + Send; /// Convert an Iterator into a TryStream - fn try_stream(self) -> impl TryStream::Item, Error = crate::Error> + Send; + fn try_stream( + self, + ) -> impl TryStream::Item, Error = Error, Item = Result<::Item, Error>> + Send; } impl IterStream for I @@ -21,7 +25,10 @@ where fn stream(self) -> impl Stream::Item> + Send { stream::iter(self) } #[inline] - fn try_stream(self) -> impl TryStream::Item, Error = crate::Error> + Send { + fn try_stream( + self, + ) -> impl TryStream::Item, Error = Error, Item = Result<::Item, Error>> + Send + { self.stream().map(Ok) } } diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs index 45c2110d..c9138116 100644 --- a/src/core/utils/stream/mod.rs +++ b/src/core/utils/stream/mod.rs @@ -5,6 +5,7 @@ mod ignore; mod iter_stream; mod ready; mod tools; +mod try_broadband; mod try_ready; mod wideband; @@ -15,5 +16,6 @@ pub use ignore::TryIgnore; pub use iter_stream::IterStream; pub use ready::ReadyExt; pub use tools::Tools; +pub use try_broadband::TryBroadbandExt; pub use try_ready::TryReadyExt; pub use wideband::WidebandExt; diff --git a/src/core/utils/stream/ready.rs b/src/core/utils/stream/ready.rs index f4eec7d1..9bba589e 100644 --- a/src/core/utils/stream/ready.rs +++ b/src/core/utils/stream/ready.rs @@ -3,7 +3,7 @@ use futures::{ future::{ready, Ready}, - stream::{Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile}, + stream::{All, Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile}, }; /// Synchronous combinators to augment futures::StreamExt. Most Stream @@ -16,6 +16,10 @@ pub trait ReadyExt where Self: Stream + Send + Sized, { + fn ready_all(self, f: F) -> All, impl FnMut(Item) -> Ready> + where + F: Fn(Item) -> bool; + fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool; @@ -66,6 +70,14 @@ impl ReadyExt for S where S: Stream + Send + Sized, { + #[inline] + fn ready_all(self, f: F) -> All, impl FnMut(Item) -> Ready> + where + F: Fn(Item) -> bool, + { + self.all(move |t| ready(f(t))) + } + #[inline] fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where diff --git a/src/core/utils/stream/try_broadband.rs b/src/core/utils/stream/try_broadband.rs new file mode 100644 index 00000000..59c488e0 --- /dev/null +++ b/src/core/utils/stream/try_broadband.rs @@ -0,0 +1,43 @@ +//! Synchronous combinator extensions to futures::TryStream + +use futures::{TryFuture, TryStream, TryStreamExt}; + +use crate::Result; + +const WIDTH: usize = 32; + +/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators +/// produce out-of-order +pub trait TryBroadbandExt +where + Self: TryStream> + Send + Sized, +{ + fn broadn_and_then(self, n: N, f: F) -> impl TryStream> + Send + where + N: Into>, + F: Fn(Self::Ok) -> Fut + Send + Sync, + Fut: TryFuture> + Send; + + fn broad_and_then(self, f: F) -> impl TryStream> + Send + where + F: Fn(Self::Ok) -> Fut + Send + Sync, + Fut: TryFuture> + Send, + { + self.broadn_and_then(None, f) + } +} + +impl TryBroadbandExt for S +where + S: TryStream> + Send + Sized, +{ + fn broadn_and_then(self, n: N, f: F) -> impl TryStream> + Send + where + N: Into>, + F: Fn(Self::Ok) -> Fut + Send + Sync, + Fut: TryFuture> + Send, + { + self.map_ok(f) + .try_buffer_unordered(n.into().unwrap_or(WIDTH)) + } +} diff --git a/src/core/utils/stream/wideband.rs b/src/core/utils/stream/wideband.rs index 100990b8..053a351f 100644 --- a/src/core/utils/stream/wideband.rs +++ b/src/core/utils/stream/wideband.rs @@ -1,5 +1,4 @@ //! Wideband stream combinator extensions to futures::Stream -#![allow(clippy::type_complexity)] use std::convert::identity;