additional stream extensions for any/all
additional stream extension TryBroadbandExt Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
c2d97aaa5e
commit
59d5e3ebf1
6 changed files with 121 additions and 5 deletions
|
@ -1,5 +1,4 @@
|
||||||
//! Broadband stream combinator extensions to futures::Stream
|
//! Broadband stream combinator extensions to futures::Stream
|
||||||
#![allow(clippy::type_complexity)]
|
|
||||||
|
|
||||||
use std::convert::identity;
|
use std::convert::identity;
|
||||||
|
|
||||||
|
@ -18,6 +17,18 @@ pub trait BroadbandExt<Item>
|
||||||
where
|
where
|
||||||
Self: Stream<Item = Item> + Send + Sized,
|
Self: Stream<Item = Item> + Send + Sized,
|
||||||
{
|
{
|
||||||
|
fn broadn_all<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
|
||||||
|
where
|
||||||
|
N: Into<Option<usize>>,
|
||||||
|
F: Fn(Item) -> Fut + Send,
|
||||||
|
Fut: Future<Output = bool> + Send;
|
||||||
|
|
||||||
|
fn broadn_any<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
|
||||||
|
where
|
||||||
|
N: Into<Option<usize>>,
|
||||||
|
F: Fn(Item) -> Fut + Send,
|
||||||
|
Fut: Future<Output = bool> + Send;
|
||||||
|
|
||||||
/// Concurrent filter_map(); unordered results
|
/// Concurrent filter_map(); unordered results
|
||||||
fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
||||||
where
|
where
|
||||||
|
@ -33,6 +44,24 @@ where
|
||||||
Fut: Future<Output = U> + Send,
|
Fut: Future<Output = U> + Send,
|
||||||
U: Send;
|
U: Send;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn broad_all<F, Fut>(self, f: F) -> impl Future<Output = bool> + Send
|
||||||
|
where
|
||||||
|
F: Fn(Item) -> Fut + Send,
|
||||||
|
Fut: Future<Output = bool> + Send,
|
||||||
|
{
|
||||||
|
self.broadn_all(None, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn broad_any<F, Fut>(self, f: F) -> impl Future<Output = bool> + Send
|
||||||
|
where
|
||||||
|
F: Fn(Item) -> Fut + Send,
|
||||||
|
Fut: Future<Output = bool> + Send,
|
||||||
|
{
|
||||||
|
self.broadn_any(None, f)
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn broad_filter_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
|
fn broad_filter_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
|
||||||
where
|
where
|
||||||
|
@ -58,6 +87,30 @@ impl<Item, S> BroadbandExt<Item> for S
|
||||||
where
|
where
|
||||||
S: Stream<Item = Item> + Send + Sized,
|
S: Stream<Item = Item> + Send + Sized,
|
||||||
{
|
{
|
||||||
|
#[inline]
|
||||||
|
fn broadn_all<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
|
||||||
|
where
|
||||||
|
N: Into<Option<usize>>,
|
||||||
|
F: Fn(Item) -> Fut + Send,
|
||||||
|
Fut: Future<Output = bool> + Send,
|
||||||
|
{
|
||||||
|
self.map(f)
|
||||||
|
.buffer_unordered(n.into().unwrap_or(WIDTH))
|
||||||
|
.ready_all(identity)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn broadn_any<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
|
||||||
|
where
|
||||||
|
N: Into<Option<usize>>,
|
||||||
|
F: Fn(Item) -> Fut + Send,
|
||||||
|
Fut: Future<Output = bool> + Send,
|
||||||
|
{
|
||||||
|
self.map(f)
|
||||||
|
.buffer_unordered(n.into().unwrap_or(WIDTH))
|
||||||
|
.ready_any(identity)
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
|
||||||
where
|
where
|
||||||
|
|
|
@ -4,12 +4,16 @@ use futures::{
|
||||||
StreamExt,
|
StreamExt,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::{Error, Result};
|
||||||
|
|
||||||
pub trait IterStream<I: IntoIterator + Send> {
|
pub trait IterStream<I: IntoIterator + Send> {
|
||||||
/// Convert an Iterator into a Stream
|
/// Convert an Iterator into a Stream
|
||||||
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
|
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
|
||||||
|
|
||||||
/// Convert an Iterator into a TryStream
|
/// Convert an Iterator into a TryStream
|
||||||
fn try_stream(self) -> impl TryStream<Ok = <I as IntoIterator>::Item, Error = crate::Error> + Send;
|
fn try_stream(
|
||||||
|
self,
|
||||||
|
) -> impl TryStream<Ok = <I as IntoIterator>::Item, Error = Error, Item = Result<<I as IntoIterator>::Item, Error>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I> IterStream<I> for I
|
impl<I> IterStream<I> for I
|
||||||
|
@ -21,7 +25,10 @@ where
|
||||||
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
|
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn try_stream(self) -> impl TryStream<Ok = <I as IntoIterator>::Item, Error = crate::Error> + Send {
|
fn try_stream(
|
||||||
|
self,
|
||||||
|
) -> impl TryStream<Ok = <I as IntoIterator>::Item, Error = Error, Item = Result<<I as IntoIterator>::Item, Error>> + Send
|
||||||
|
{
|
||||||
self.stream().map(Ok)
|
self.stream().map(Ok)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ mod ignore;
|
||||||
mod iter_stream;
|
mod iter_stream;
|
||||||
mod ready;
|
mod ready;
|
||||||
mod tools;
|
mod tools;
|
||||||
|
mod try_broadband;
|
||||||
mod try_ready;
|
mod try_ready;
|
||||||
mod wideband;
|
mod wideband;
|
||||||
|
|
||||||
|
@ -15,5 +16,6 @@ pub use ignore::TryIgnore;
|
||||||
pub use iter_stream::IterStream;
|
pub use iter_stream::IterStream;
|
||||||
pub use ready::ReadyExt;
|
pub use ready::ReadyExt;
|
||||||
pub use tools::Tools;
|
pub use tools::Tools;
|
||||||
|
pub use try_broadband::TryBroadbandExt;
|
||||||
pub use try_ready::TryReadyExt;
|
pub use try_ready::TryReadyExt;
|
||||||
pub use wideband::WidebandExt;
|
pub use wideband::WidebandExt;
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
future::{ready, Ready},
|
future::{ready, Ready},
|
||||||
stream::{Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile},
|
stream::{All, Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Synchronous combinators to augment futures::StreamExt. Most Stream
|
/// Synchronous combinators to augment futures::StreamExt. Most Stream
|
||||||
|
@ -16,6 +16,10 @@ pub trait ReadyExt<Item>
|
||||||
where
|
where
|
||||||
Self: Stream<Item = Item> + Send + Sized,
|
Self: Stream<Item = Item> + Send + Sized,
|
||||||
{
|
{
|
||||||
|
fn ready_all<F>(self, f: F) -> All<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
|
||||||
|
where
|
||||||
|
F: Fn(Item) -> bool;
|
||||||
|
|
||||||
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
|
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
|
||||||
where
|
where
|
||||||
F: Fn(Item) -> bool;
|
F: Fn(Item) -> bool;
|
||||||
|
@ -66,6 +70,14 @@ impl<Item, S> ReadyExt<Item> for S
|
||||||
where
|
where
|
||||||
S: Stream<Item = Item> + Send + Sized,
|
S: Stream<Item = Item> + Send + Sized,
|
||||||
{
|
{
|
||||||
|
#[inline]
|
||||||
|
fn ready_all<F>(self, f: F) -> All<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
|
||||||
|
where
|
||||||
|
F: Fn(Item) -> bool,
|
||||||
|
{
|
||||||
|
self.all(move |t| ready(f(t)))
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
|
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
|
||||||
where
|
where
|
||||||
|
|
43
src/core/utils/stream/try_broadband.rs
Normal file
43
src/core/utils/stream/try_broadband.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
//! Synchronous combinator extensions to futures::TryStream
|
||||||
|
|
||||||
|
use futures::{TryFuture, TryStream, TryStreamExt};
|
||||||
|
|
||||||
|
use crate::Result;
|
||||||
|
|
||||||
|
const WIDTH: usize = 32;
|
||||||
|
|
||||||
|
/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
|
||||||
|
/// produce out-of-order
|
||||||
|
pub trait TryBroadbandExt<T, E>
|
||||||
|
where
|
||||||
|
Self: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
|
||||||
|
{
|
||||||
|
fn broadn_and_then<U, F, Fut, N>(self, n: N, f: F) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
|
||||||
|
where
|
||||||
|
N: Into<Option<usize>>,
|
||||||
|
F: Fn(Self::Ok) -> Fut + Send + Sync,
|
||||||
|
Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send;
|
||||||
|
|
||||||
|
fn broad_and_then<U, F, Fut>(self, f: F) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
|
||||||
|
where
|
||||||
|
F: Fn(Self::Ok) -> Fut + Send + Sync,
|
||||||
|
Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
|
||||||
|
{
|
||||||
|
self.broadn_and_then(None, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E, S> TryBroadbandExt<T, E> for S
|
||||||
|
where
|
||||||
|
S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
|
||||||
|
{
|
||||||
|
fn broadn_and_then<U, F, Fut, N>(self, n: N, f: F) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
|
||||||
|
where
|
||||||
|
N: Into<Option<usize>>,
|
||||||
|
F: Fn(Self::Ok) -> Fut + Send + Sync,
|
||||||
|
Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
|
||||||
|
{
|
||||||
|
self.map_ok(f)
|
||||||
|
.try_buffer_unordered(n.into().unwrap_or(WIDTH))
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,4 @@
|
||||||
//! Wideband stream combinator extensions to futures::Stream
|
//! Wideband stream combinator extensions to futures::Stream
|
||||||
#![allow(clippy::type_complexity)]
|
|
||||||
|
|
||||||
use std::convert::identity;
|
use std::convert::identity;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue