diff --git a/src/core/utils/future/ext_ext.rs b/src/core/utils/future/ext_ext.rs new file mode 100644 index 00000000..38decaae --- /dev/null +++ b/src/core/utils/future/ext_ext.rs @@ -0,0 +1,34 @@ +//! Extended external extensions to futures::FutureExt + +use std::marker::Unpin; + +use futures::{future, future::Select, Future}; + +/// This interface is not necessarily complete; feel free to add as-needed. +pub trait ExtExt +where + Self: Future + Send, +{ + fn until(self, f: F) -> Select + where + Self: Sized, + F: FnOnce() -> B, + A: Future + From + Send + Unpin, + B: Future + Send + Unpin; +} + +impl ExtExt for Fut +where + Fut: Future + Send, +{ + #[inline] + fn until(self, f: F) -> Select + where + Self: Sized, + F: FnOnce() -> B, + A: Future + From + Send + Unpin, + B: Future + Send + Unpin, + { + future::select(self.into(), f()) + } +} diff --git a/src/core/utils/future/mod.rs b/src/core/utils/future/mod.rs index 3d8ec8f4..153dcfe1 100644 --- a/src/core/utils/future/mod.rs +++ b/src/core/utils/future/mod.rs @@ -1,5 +1,7 @@ +mod ext_ext; mod option_ext; mod try_ext_ext; +pub use ext_ext::ExtExt; pub use option_ext::OptionExt; pub use try_ext_ext::TryExtExt; diff --git a/src/core/utils/future/try_ext_ext.rs b/src/core/utils/future/try_ext_ext.rs index 19761309..aa3d72e4 100644 --- a/src/core/utils/future/try_ext_ext.rs +++ b/src/core/utils/future/try_ext_ext.rs @@ -4,8 +4,11 @@ // caller only ever caring about result status while discarding all contents. #![allow(clippy::wrong_self_convention)] +use std::marker::Unpin; + use futures::{ - future::{MapOkOrElse, UnwrapOrElse}, + future, + future::{MapOkOrElse, TrySelect, UnwrapOrElse}, TryFuture, TryFutureExt, }; @@ -46,6 +49,13 @@ where where Self: Sized; + fn try_until(self, f: F) -> TrySelect + where + Self: Sized, + F: FnOnce() -> B, + A: TryFuture + From + Send + Unpin, + B: TryFuture + Send + Unpin; + fn unwrap_or( self, default: Self::Ok, @@ -110,6 +120,17 @@ where self.map_ok_or(None, Some) } + #[inline] + fn try_until(self, f: F) -> TrySelect + where + Self: Sized, + F: FnOnce() -> B, + A: TryFuture + From + Send + Unpin, + B: TryFuture + Send + Unpin, + { + future::try_select(self.into(), f()) + } + #[inline] fn unwrap_or( self, diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs index 61ae993d..0fee0a3a 100644 --- a/src/core/utils/stream/mod.rs +++ b/src/core/utils/stream/mod.rs @@ -8,6 +8,7 @@ mod ready; mod tools; mod try_broadband; mod try_ready; +mod try_tools; mod wideband; pub use band::{ @@ -23,4 +24,5 @@ pub use ready::ReadyExt; pub use tools::Tools; pub use try_broadband::TryBroadbandExt; pub use try_ready::TryReadyExt; +pub use try_tools::TryTools; pub use wideband::WidebandExt; diff --git a/src/core/utils/stream/try_ready.rs b/src/core/utils/stream/try_ready.rs index d8da04ec..3261acb6 100644 --- a/src/core/utils/stream/try_ready.rs +++ b/src/core/utils/stream/try_ready.rs @@ -3,7 +3,7 @@ use futures::{ future::{ready, Ready}, - stream::{AndThen, TryFilterMap, TryFold, TryForEach, TryStream, TryStreamExt}, + stream::{AndThen, TryFilterMap, TryFold, TryForEach, TryStream, TryStreamExt, TryTakeWhile}, }; use crate::Result; @@ -56,6 +56,13 @@ where ) -> TryForEach>, impl FnMut(S::Ok) -> Ready>> where F: FnMut(S::Ok) -> Result<(), E>; + + fn ready_try_take_while( + self, + f: F, + ) -> TryTakeWhile>, impl FnMut(&S::Ok) -> Ready>> + where + F: Fn(&S::Ok) -> Result; } impl TryReadyExt for S @@ -122,4 +129,15 @@ where { self.try_for_each(move |t| ready(f(t))) } + + #[inline] + fn ready_try_take_while( + self, + f: F, + ) -> TryTakeWhile>, impl FnMut(&S::Ok) -> Ready>> + where + F: Fn(&S::Ok) -> Result, + { + self.try_take_while(move |t| ready(f(t))) + } } diff --git a/src/core/utils/stream/try_tools.rs b/src/core/utils/stream/try_tools.rs new file mode 100644 index 00000000..3ddce6ad --- /dev/null +++ b/src/core/utils/stream/try_tools.rs @@ -0,0 +1,44 @@ +//! TryStreamTools for futures::TryStream +#![allow(clippy::type_complexity)] + +use futures::{future, future::Ready, stream::TryTakeWhile, TryStream, TryStreamExt}; + +use crate::Result; + +/// TryStreamTools +pub trait TryTools +where + S: TryStream> + Send + ?Sized, + Self: TryStream + Send + Sized, +{ + fn try_take( + self, + n: usize, + ) -> TryTakeWhile< + Self, + Ready>, + impl FnMut(&S::Ok) -> Ready>, + >; +} + +impl TryTools for S +where + S: TryStream> + Send + ?Sized, + Self: TryStream + Send + Sized, +{ + #[inline] + fn try_take( + self, + mut n: usize, + ) -> TryTakeWhile< + Self, + Ready>, + impl FnMut(&S::Ok) -> Ready>, + > { + self.try_take_while(move |_| { + let res = future::ok(n > 0); + n = n.saturating_sub(1); + res + }) + } +}