//! Synchronous combinator extensions to futures::Stream #![allow(clippy::type_complexity)] use futures::{ future::{FutureExt, Ready, ready}, stream::{ All, Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile, }, }; /// Synchronous combinators to augment futures::StreamExt. Most Stream /// combinators take asynchronous arguments, but often only simple predicates /// are required to steer a Stream like an Iterator. This suite provides a /// convenience to reduce boilerplate by de-cluttering non-async predicates. /// /// This interface is not necessarily complete; feel free to add as-needed. pub trait ReadyExt where Self: Stream + Sized, { fn ready_all(self, f: F) -> All, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool; fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool; fn ready_find<'a, F>(self, f: F) -> impl Future> + Send where Self: Send + Unpin + 'a, F: Fn(&Item) -> bool + Send + 'a, Item: Send; fn ready_filter<'a, F>( self, f: F, ) -> Filter, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a; fn ready_filter_map( self, f: F, ) -> FilterMap>, impl FnMut(Item) -> Ready>> where F: Fn(Item) -> Option; fn ready_fold( self, init: T, f: F, ) -> Fold, T, impl FnMut(T, Item) -> Ready> where F: Fn(T, Item) -> T; fn ready_fold_default( self, f: F, ) -> Fold, T, impl FnMut(T, Item) -> Ready> where F: Fn(T, Item) -> T, T: Default; fn ready_for_each(self, f: F) -> ForEach, impl FnMut(Item) -> Ready<()>> where F: FnMut(Item); fn ready_take_while<'a, F>( self, f: F, ) -> TakeWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a; fn ready_scan( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, Item) -> Option; fn ready_scan_each( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, &Item); fn ready_skip_while<'a, F>( self, f: F, ) -> SkipWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a; } impl ReadyExt for S where S: Stream + Sized, { #[inline] fn ready_all(self, f: F) -> All, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool, { self.all(move |t| ready(f(t))) } #[inline] fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool, { self.any(move |t| ready(f(t))) } #[inline] fn ready_find<'a, F>(self, f: F) -> impl Future> + Send where Self: Send + Unpin + 'a, F: Fn(&Item) -> bool + Send + 'a, Item: Send, { self.ready_filter(f) .take(1) .into_future() .map(|(curr, _next)| curr) } #[inline] fn ready_filter<'a, F>( self, f: F, ) -> Filter, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a, { self.filter(move |t| ready(f(t))) } #[inline] fn ready_filter_map( self, f: F, ) -> FilterMap>, impl FnMut(Item) -> Ready>> where F: Fn(Item) -> Option, { self.filter_map(move |t| ready(f(t))) } #[inline] fn ready_fold( self, init: T, f: F, ) -> Fold, T, impl FnMut(T, Item) -> Ready> where F: Fn(T, Item) -> T, { self.fold(init, move |a, t| ready(f(a, t))) } #[inline] fn ready_fold_default( self, f: F, ) -> Fold, T, impl FnMut(T, Item) -> Ready> where F: Fn(T, Item) -> T, T: Default, { self.ready_fold(T::default(), f) } #[inline] #[allow(clippy::unit_arg)] fn ready_for_each( self, mut f: F, ) -> ForEach, impl FnMut(Item) -> Ready<()>> where F: FnMut(Item), { self.for_each(move |t| ready(f(t))) } #[inline] fn ready_take_while<'a, F>( self, f: F, ) -> TakeWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a, { self.take_while(move |t| ready(f(t))) } #[inline] fn ready_scan( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, Item) -> Option, { self.scan(init, move |s, t| ready(f(s, t))) } #[inline] fn ready_scan_each( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, &Item), { self.ready_scan(init, move |s, t| { f(s, &t); Some(t) }) } #[inline] fn ready_skip_while<'a, F>( self, f: F, ) -> SkipWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a, { self.skip_while(move |t| ready(f(t))) } }