diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs index 0fee0a3a..c7bfa021 100644 --- a/src/core/utils/stream/mod.rs +++ b/src/core/utils/stream/mod.rs @@ -9,6 +9,7 @@ mod tools; mod try_broadband; mod try_ready; mod try_tools; +mod try_wideband; mod wideband; pub use band::{ @@ -25,4 +26,5 @@ pub use tools::Tools; pub use try_broadband::TryBroadbandExt; pub use try_ready::TryReadyExt; pub use try_tools::TryTools; +pub use try_wideband::TryWidebandExt; pub use wideband::WidebandExt; diff --git a/src/core/utils/stream/try_wideband.rs b/src/core/utils/stream/try_wideband.rs new file mode 100644 index 00000000..0af3c8ec --- /dev/null +++ b/src/core/utils/stream/try_wideband.rs @@ -0,0 +1,57 @@ +//! Synchronous combinator extensions to futures::TryStream + +use futures::{TryFuture, TryStream, TryStreamExt}; + +use super::automatic_width; +use crate::Result; + +/// Concurrency extensions to augment futures::TryStreamExt. wide_ combinators +/// produce in-order results +pub trait TryWidebandExt +where + Self: TryStream> + Send + Sized, +{ + fn widen_and_then( + self, + n: N, + f: F, + ) -> impl TryStream> + Send + where + N: Into>, + F: Fn(Self::Ok) -> Fut + Send, + Fut: TryFuture> + Send, + U: Send; + + fn wide_and_then( + self, + f: F, + ) -> impl TryStream> + Send + where + F: Fn(Self::Ok) -> Fut + Send, + Fut: TryFuture> + Send, + U: Send, + { + self.widen_and_then(None, f) + } +} + +impl TryWidebandExt for S +where + S: TryStream> + Send + Sized, + E: Send, +{ + fn widen_and_then( + self, + n: N, + f: F, + ) -> impl TryStream> + Send + where + N: Into>, + F: Fn(Self::Ok) -> Fut + Send, + Fut: TryFuture> + Send, + U: Send, + { + self.map_ok(f) + .try_buffered(n.into().unwrap_or_else(automatic_width)) + } +}