diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs index c7bfa021..23455322 100644 --- a/src/core/utils/stream/mod.rs +++ b/src/core/utils/stream/mod.rs @@ -7,6 +7,7 @@ mod iter_stream; mod ready; mod tools; mod try_broadband; +mod try_parallel; mod try_ready; mod try_tools; mod try_wideband; @@ -24,6 +25,7 @@ pub use iter_stream::IterStream; pub use ready::ReadyExt; pub use tools::Tools; pub use try_broadband::TryBroadbandExt; +pub use try_parallel::TryParallelExt; pub use try_ready::TryReadyExt; pub use try_tools::TryTools; pub use try_wideband::TryWidebandExt; diff --git a/src/core/utils/stream/try_broadband.rs b/src/core/utils/stream/try_broadband.rs index c72fcc2c..361b4a92 100644 --- a/src/core/utils/stream/try_broadband.rs +++ b/src/core/utils/stream/try_broadband.rs @@ -18,7 +18,7 @@ where ) -> impl TryStream> + Send where N: Into>, - F: Fn(Self::Ok) -> Fut + Send + Sync, + F: Fn(Self::Ok) -> Fut + Send, Fut: TryFuture> + Send; fn broad_and_then( @@ -26,7 +26,7 @@ where f: F, ) -> impl TryStream> + Send where - F: Fn(Self::Ok) -> Fut + Send + Sync, + F: Fn(Self::Ok) -> Fut + Send, Fut: TryFuture> + Send, { self.broadn_and_then(None, f) @@ -44,7 +44,7 @@ where ) -> impl TryStream> + Send where N: Into>, - F: Fn(Self::Ok) -> Fut + Send + Sync, + F: Fn(Self::Ok) -> Fut + Send, Fut: TryFuture> + Send, { self.map_ok(f) diff --git a/src/core/utils/stream/try_parallel.rs b/src/core/utils/stream/try_parallel.rs new file mode 100644 index 00000000..7f8a63b1 --- /dev/null +++ b/src/core/utils/stream/try_parallel.rs @@ -0,0 +1,71 @@ +//! Parallelism stream combinator extensions to futures::Stream + +use futures::{stream::TryStream, TryFutureExt}; +use tokio::{runtime, task::JoinError}; + +use super::TryBroadbandExt; +use crate::{utils::sys::available_parallelism, Error, Result}; + +/// Parallelism extensions to augment futures::StreamExt. These combinators are +/// for computation-oriented workloads, unlike -band combinators for I/O +/// workloads; these default to the available compute parallelism for the +/// system. Threads are currently drawn from the tokio-spawn pool. Results are +/// unordered. +pub trait TryParallelExt +where + Self: TryStream> + Send + Sized, + E: From + From + Send + 'static, + T: Send + 'static, +{ + fn paralleln_and_then( + self, + h: H, + n: N, + f: F, + ) -> impl TryStream> + Send + where + N: Into>, + H: Into>, + F: Fn(Self::Ok) -> Result + Clone + Send + 'static, + U: Send + 'static; + + fn parallel_and_then( + self, + h: H, + f: F, + ) -> impl TryStream> + Send + where + H: Into>, + F: Fn(Self::Ok) -> Result + Clone + Send + 'static, + U: Send + 'static, + { + self.paralleln_and_then(h, None, f) + } +} + +impl TryParallelExt for S +where + S: TryStream> + Send + Sized, + E: From + From + Send + 'static, + T: Send + 'static, +{ + fn paralleln_and_then( + self, + h: H, + n: N, + f: F, + ) -> impl TryStream> + Send + where + N: Into>, + H: Into>, + F: Fn(Self::Ok) -> Result + Clone + Send + 'static, + U: Send + 'static, + { + let n = n.into().unwrap_or_else(available_parallelism); + let h = h.into().unwrap_or_else(runtime::Handle::current); + self.broadn_and_then(n, move |val| { + let (h, f) = (h.clone(), f.clone()); + async move { h.spawn_blocking(move || f(val)).map_err(E::from).await? } + }) + } +}