From 7d6710c03346f7157c74538b61e407c1b8536f64 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 30 Nov 2024 08:27:51 +0000 Subject: [PATCH] add broadband stream extensions Signed-off-by: Jason Volk --- src/core/utils/stream/broadband.rs | 84 ++++++++++++++++++++++++++++++ src/core/utils/stream/mod.rs | 4 ++ src/core/utils/stream/wideband.rs | 84 ++++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 src/core/utils/stream/broadband.rs create mode 100644 src/core/utils/stream/wideband.rs diff --git a/src/core/utils/stream/broadband.rs b/src/core/utils/stream/broadband.rs new file mode 100644 index 00000000..ce17830c --- /dev/null +++ b/src/core/utils/stream/broadband.rs @@ -0,0 +1,84 @@ +//! Broadband stream combinator extensions to futures::Stream +#![allow(clippy::type_complexity)] + +use std::convert::identity; + +use futures::{ + stream::{Stream, StreamExt}, + Future, +}; + +use super::ReadyExt; + +const WIDTH: usize = 32; + +/// Concurrency extensions to augment futures::StreamExt. broad_ combinators +/// produce out-of-order +pub trait BroadbandExt +where + Self: Stream + Send + Sized, +{ + /// Concurrent filter_map(); unordered results + fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future> + Send, + U: Send; + + fn broadn_then(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + U: Send; + + #[inline] + fn broad_filter_map(self, f: F) -> impl Stream + Send + where + F: Fn(Item) -> Fut + Send, + Fut: Future> + Send, + U: Send, + { + self.broadn_filter_map(None, f) + } + + #[inline] + fn broad_then(self, f: F) -> impl Stream + Send + where + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + U: Send, + { + self.broadn_then(None, f) + } +} + +impl BroadbandExt for S +where + S: Stream + Send + Sized, +{ + #[inline] + fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future> + Send, + U: Send, + { + self.map(f) + .buffer_unordered(n.into().unwrap_or(WIDTH)) + .ready_filter_map(identity) + } + + #[inline] + fn broadn_then(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + U: Send, + { + self.map(f).buffer_unordered(n.into().unwrap_or(WIDTH)) + } +} diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs index 1111915b..45c2110d 100644 --- a/src/core/utils/stream/mod.rs +++ b/src/core/utils/stream/mod.rs @@ -1,3 +1,4 @@ +mod broadband; mod cloned; mod expect; mod ignore; @@ -5,7 +6,9 @@ mod iter_stream; mod ready; mod tools; mod try_ready; +mod wideband; +pub use broadband::BroadbandExt; pub use cloned::Cloned; pub use expect::TryExpect; pub use ignore::TryIgnore; @@ -13,3 +16,4 @@ pub use iter_stream::IterStream; pub use ready::ReadyExt; pub use tools::Tools; pub use try_ready::TryReadyExt; +pub use wideband::WidebandExt; diff --git a/src/core/utils/stream/wideband.rs b/src/core/utils/stream/wideband.rs new file mode 100644 index 00000000..100990b8 --- /dev/null +++ b/src/core/utils/stream/wideband.rs @@ -0,0 +1,84 @@ +//! Wideband stream combinator extensions to futures::Stream +#![allow(clippy::type_complexity)] + +use std::convert::identity; + +use futures::{ + stream::{Stream, StreamExt}, + Future, +}; + +use super::ReadyExt; + +const WIDTH: usize = 32; + +/// Concurrency extensions to augment futures::StreamExt. wideband_ combinators +/// produce in-order. +pub trait WidebandExt +where + Self: Stream + Send + Sized, +{ + /// Concurrent filter_map(); ordered results + fn widen_filter_map(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future> + Send, + U: Send; + + fn widen_then(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + U: Send; + + #[inline] + fn wide_filter_map(self, f: F) -> impl Stream + Send + where + F: Fn(Item) -> Fut + Send, + Fut: Future> + Send, + U: Send, + { + self.widen_filter_map(None, f) + } + + #[inline] + fn wide_then(self, f: F) -> impl Stream + Send + where + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + U: Send, + { + self.widen_then(None, f) + } +} + +impl WidebandExt for S +where + S: Stream + Send + Sized, +{ + #[inline] + fn widen_filter_map(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future> + Send, + U: Send, + { + self.map(f) + .buffered(n.into().unwrap_or(WIDTH)) + .ready_filter_map(identity) + } + + #[inline] + fn widen_then(self, n: N, f: F) -> impl Stream + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send, + Fut: Future + Send, + U: Send, + { + self.map(f).buffered(n.into().unwrap_or(WIDTH)) + } +}