additional stream tools

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-09-30 06:46:54 +00:00 committed by strawberry
parent a8d5cf9651
commit 6b80361c31
10 changed files with 242 additions and 46 deletions

View file

@ -1,4 +1,5 @@
mod debug_inspect;
mod into_is_ok;
mod log_debug_err;
mod log_err;
mod map_expect;
@ -6,8 +7,8 @@ mod not_found;
mod unwrap_infallible;
pub use self::{
debug_inspect::DebugInspect, log_debug_err::LogDebugErr, log_err::LogErr, map_expect::MapExpect,
not_found::NotFound, unwrap_infallible::UnwrapInfallible,
debug_inspect::DebugInspect, into_is_ok::IntoIsOk, log_debug_err::LogDebugErr, log_err::LogErr,
map_expect::MapExpect, not_found::NotFound, unwrap_infallible::UnwrapInfallible,
};
pub type Result<T = (), E = crate::Error> = std::result::Result<T, E>;

View file

@ -0,0 +1,10 @@
use super::Result;
pub trait IntoIsOk<T, E> {
fn into_is_ok(self) -> bool;
}
impl<T, E> IntoIsOk<T, E> for Result<T, E> {
#[inline]
fn into_is_ok(self) -> bool { self.is_ok() }
}

16
src/core/utils/bool.rs Normal file
View file

@ -0,0 +1,16 @@
//! Trait BoolExt
/// Boolean extensions and chain.starters
pub trait BoolExt {
fn or<T, F: FnOnce() -> T>(self, f: F) -> Option<T>;
fn or_some<T>(self, t: T) -> Option<T>;
}
impl BoolExt for bool {
#[inline]
fn or<T, F: FnOnce() -> T>(self, f: F) -> Option<T> { (!self).then(f) }
#[inline]
fn or_some<T>(self, t: T) -> Option<T> { (!self).then_some(t) }
}

View file

@ -0,0 +1,3 @@
mod try_ext_ext;
pub use try_ext_ext::TryExtExt;

View file

@ -0,0 +1,48 @@
//! Extended external extensions to futures::TryFutureExt
use futures::{future::MapOkOrElse, TryFuture, TryFutureExt};
/// This interface is not necessarily complete; feel free to add as-needed.
pub trait TryExtExt<T, E>
where
Self: TryFuture<Ok = T, Error = E> + Send,
{
fn map_ok_or<U, F>(
self, default: U, f: F,
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
where
F: FnOnce(Self::Ok) -> U,
Self: Send + Sized;
fn ok(
self,
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> Option<Self::Ok>, impl FnOnce(Self::Error) -> Option<Self::Ok>>
where
Self: Sized;
}
impl<T, E, Fut> TryExtExt<T, E> for Fut
where
Fut: TryFuture<Ok = T, Error = E> + Send,
{
#[inline]
fn map_ok_or<U, F>(
self, default: U, f: F,
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
where
F: FnOnce(Self::Ok) -> U,
Self: Send + Sized,
{
self.map_ok_or_else(|_| default, f)
}
#[inline]
fn ok(
self,
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> Option<Self::Ok>, impl FnOnce(Self::Error) -> Option<Self::Ok>>
where
Self: Sized,
{
self.map_ok_or(None, Some)
}
}

View file

@ -1,7 +1,9 @@
pub mod bool;
pub mod bytes;
pub mod content_disposition;
pub mod debug;
pub mod defer;
pub mod future;
pub mod hash;
pub mod html;
pub mod json;
@ -19,15 +21,17 @@ pub use ::conduit_macros::implement;
pub use ::ctor::{ctor, dtor};
pub use self::{
bool::BoolExt,
bytes::{increment, u64_from_bytes, u64_from_u8, u64_from_u8x8},
debug::slice_truncated as debug_slice_truncated,
future::TryExtExt as TryFutureExtExt,
hash::calculate_hash,
html::Escape as HtmlEscape,
json::{deserialize_from_str, to_canonical_object},
math::clamp,
mutex_map::{Guard as MutexMapGuard, MutexMap},
rand::string as random_string,
stream::{IterStream, ReadyExt, TryReadyExt},
stream::{IterStream, ReadyExt, Tools as StreamTools, TryReadyExt},
string::{str_from_bytes, string_from_bytes},
sys::available_parallelism,
time::now_millis as millis_since_unix_epoch,

View file

@ -3,6 +3,7 @@ mod expect;
mod ignore;
mod iter_stream;
mod ready;
mod tools;
mod try_ready;
pub use cloned::Cloned;
@ -10,4 +11,5 @@ pub use expect::TryExpect;
pub use ignore::TryIgnore;
pub use iter_stream::IterStream;
pub use ready::ReadyExt;
pub use tools::Tools;
pub use try_ready::TryReadyExt;

View file

@ -2,7 +2,7 @@
use futures::{
future::{ready, Ready},
stream::{Any, Filter, FilterMap, Fold, ForEach, SkipWhile, Stream, StreamExt, TakeWhile},
stream::{Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile},
};
/// Synchronous combinators to augment futures::StreamExt. Most Stream
@ -11,98 +11,130 @@ use futures::{
/// convenience to reduce boilerplate by de-cluttering non-async predicates.
///
/// This interface is not necessarily complete; feel free to add as-needed.
pub trait ReadyExt<Item, S>
pub trait ReadyExt<Item>
where
S: Stream<Item = Item> + Send + ?Sized,
Self: Stream + Send + Sized,
Self: Stream<Item = Item> + Send + Sized,
{
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(S::Item) -> Ready<bool>>
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
where
F: Fn(S::Item) -> bool;
F: Fn(Item) -> bool;
fn ready_filter<'a, F>(self, f: F) -> Filter<Self, Ready<bool>, impl FnMut(&S::Item) -> Ready<bool> + 'a>
fn ready_filter<'a, F>(self, f: F) -> Filter<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
where
F: Fn(&S::Item) -> bool + 'a;
F: Fn(&Item) -> bool + 'a;
fn ready_filter_map<F, U>(self, f: F) -> FilterMap<Self, Ready<Option<U>>, impl FnMut(S::Item) -> Ready<Option<U>>>
fn ready_filter_map<F, U>(self, f: F) -> FilterMap<Self, Ready<Option<U>>, impl FnMut(Item) -> Ready<Option<U>>>
where
F: Fn(S::Item) -> Option<U>;
F: Fn(Item) -> Option<U>;
fn ready_fold<T, F>(self, init: T, f: F) -> Fold<Self, Ready<T>, T, impl FnMut(T, S::Item) -> Ready<T>>
fn ready_fold<T, F>(self, init: T, f: F) -> Fold<Self, Ready<T>, T, impl FnMut(T, Item) -> Ready<T>>
where
F: Fn(T, S::Item) -> T;
F: Fn(T, Item) -> T;
fn ready_for_each<F>(self, f: F) -> ForEach<Self, Ready<()>, impl FnMut(S::Item) -> Ready<()>>
fn ready_for_each<F>(self, f: F) -> ForEach<Self, Ready<()>, impl FnMut(Item) -> Ready<()>>
where
F: FnMut(S::Item);
F: FnMut(Item);
fn ready_take_while<'a, F>(self, f: F) -> TakeWhile<Self, Ready<bool>, impl FnMut(&S::Item) -> Ready<bool> + 'a>
fn ready_take_while<'a, F>(self, f: F) -> TakeWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
where
F: Fn(&S::Item) -> bool + 'a;
F: Fn(&Item) -> bool + 'a;
fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile<Self, Ready<bool>, impl FnMut(&S::Item) -> Ready<bool> + 'a>
fn ready_scan<B, T, F>(
self, init: T, f: F,
) -> Scan<Self, T, Ready<Option<B>>, impl FnMut(&mut T, Item) -> Ready<Option<B>>>
where
F: Fn(&S::Item) -> bool + 'a;
F: Fn(&mut T, Item) -> Option<B>;
fn ready_scan_each<T, F>(
self, init: T, f: F,
) -> Scan<Self, T, Ready<Option<Item>>, impl FnMut(&mut T, Item) -> Ready<Option<Item>>>
where
F: Fn(&mut T, &Item);
fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
where
F: Fn(&Item) -> bool + 'a;
}
impl<Item, S> ReadyExt<Item, S> for S
impl<Item, S> ReadyExt<Item> for S
where
S: Stream<Item = Item> + Send + ?Sized,
Self: Stream + Send + Sized,
S: Stream<Item = Item> + Send + Sized,
{
#[inline]
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(S::Item) -> Ready<bool>>
fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
where
F: Fn(S::Item) -> bool,
F: Fn(Item) -> bool,
{
self.any(move |t| ready(f(t)))
}
#[inline]
fn ready_filter<'a, F>(self, f: F) -> Filter<Self, Ready<bool>, impl FnMut(&S::Item) -> Ready<bool> + 'a>
fn ready_filter<'a, F>(self, f: F) -> Filter<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
where
F: Fn(&S::Item) -> bool + 'a,
F: Fn(&Item) -> bool + 'a,
{
self.filter(move |t| ready(f(t)))
}
#[inline]
fn ready_filter_map<F, U>(self, f: F) -> FilterMap<Self, Ready<Option<U>>, impl FnMut(S::Item) -> Ready<Option<U>>>
fn ready_filter_map<F, U>(self, f: F) -> FilterMap<Self, Ready<Option<U>>, impl FnMut(Item) -> Ready<Option<U>>>
where
F: Fn(S::Item) -> Option<U>,
F: Fn(Item) -> Option<U>,
{
self.filter_map(move |t| ready(f(t)))
}
#[inline]
fn ready_fold<T, F>(self, init: T, f: F) -> Fold<Self, Ready<T>, T, impl FnMut(T, S::Item) -> Ready<T>>
fn ready_fold<T, F>(self, init: T, f: F) -> Fold<Self, Ready<T>, T, impl FnMut(T, Item) -> Ready<T>>
where
F: Fn(T, S::Item) -> T,
F: Fn(T, Item) -> T,
{
self.fold(init, move |a, t| ready(f(a, t)))
}
#[inline]
#[allow(clippy::unit_arg)]
fn ready_for_each<F>(self, mut f: F) -> ForEach<Self, Ready<()>, impl FnMut(S::Item) -> Ready<()>>
fn ready_for_each<F>(self, mut f: F) -> ForEach<Self, Ready<()>, impl FnMut(Item) -> Ready<()>>
where
F: FnMut(S::Item),
F: FnMut(Item),
{
self.for_each(move |t| ready(f(t)))
}
#[inline]
fn ready_take_while<'a, F>(self, f: F) -> TakeWhile<Self, Ready<bool>, impl FnMut(&S::Item) -> Ready<bool> + 'a>
fn ready_take_while<'a, F>(self, f: F) -> TakeWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
where
F: Fn(&S::Item) -> bool + 'a,
F: Fn(&Item) -> bool + 'a,
{
self.take_while(move |t| ready(f(t)))
}
#[inline]
fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile<Self, Ready<bool>, impl FnMut(&S::Item) -> Ready<bool> + 'a>
fn ready_scan<B, T, F>(
self, init: T, f: F,
) -> Scan<Self, T, Ready<Option<B>>, impl FnMut(&mut T, Item) -> Ready<Option<B>>>
where
F: Fn(&S::Item) -> bool + 'a,
F: Fn(&mut T, Item) -> Option<B>,
{
self.scan(init, move |s, t| ready(f(s, t)))
}
fn ready_scan_each<T, F>(
self, init: T, f: F,
) -> Scan<Self, T, Ready<Option<Item>>, impl FnMut(&mut T, Item) -> Ready<Option<Item>>>
where
F: Fn(&mut T, &Item),
{
self.ready_scan(init, move |s, t| {
f(s, &t);
Some(t)
})
}
#[inline]
fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
where
F: Fn(&Item) -> bool + 'a,
{
self.skip_while(move |t| ready(f(t)))
}

View file

@ -0,0 +1,80 @@
//! StreamTools for futures::Stream
use std::{collections::HashMap, hash::Hash};
use futures::{Future, Stream, StreamExt};
use super::ReadyExt;
use crate::expected;
/// StreamTools
///
/// This interface is not necessarily complete; feel free to add as-needed.
pub trait Tools<Item>
where
Self: Stream<Item = Item> + Send + Sized,
<Self as Stream>::Item: Send,
{
fn counts(self) -> impl Future<Output = HashMap<Item, usize>> + Send
where
<Self as Stream>::Item: Eq + Hash;
fn counts_by<K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
where
F: Fn(Item) -> K + Send,
K: Eq + Hash + Send;
fn counts_by_with_cap<const CAP: usize, K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
where
F: Fn(Item) -> K + Send,
K: Eq + Hash + Send;
fn counts_with_cap<const CAP: usize>(self) -> impl Future<Output = HashMap<Item, usize>> + Send
where
<Self as Stream>::Item: Eq + Hash;
}
impl<Item, S> Tools<Item> for S
where
S: Stream<Item = Item> + Send + Sized,
<Self as Stream>::Item: Send,
{
#[inline]
fn counts(self) -> impl Future<Output = HashMap<Item, usize>> + Send
where
<Self as Stream>::Item: Eq + Hash,
{
self.counts_with_cap::<0>()
}
#[inline]
fn counts_by<K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
where
F: Fn(Item) -> K + Send,
K: Eq + Hash + Send,
{
self.counts_by_with_cap::<0, K, F>(f)
}
#[inline]
fn counts_by_with_cap<const CAP: usize, K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
where
F: Fn(Item) -> K + Send,
K: Eq + Hash + Send,
{
self.map(f).counts_with_cap::<CAP>()
}
#[inline]
fn counts_with_cap<const CAP: usize>(self) -> impl Future<Output = HashMap<Item, usize>> + Send
where
<Self as Stream>::Item: Eq + Hash,
{
self.ready_fold(HashMap::with_capacity(CAP), |mut counts, item| {
let entry = counts.entry(item).or_default();
let value = *entry;
*entry = expected!(value + 1);
counts
})
}
}

View file

@ -4,7 +4,7 @@ use std::{collections::HashSet, sync::Arc};
use conduit::{
err,
utils::{stream::TryIgnore, ReadyExt},
utils::{stream::TryIgnore, ReadyExt, StreamTools},
warn, Result,
};
use data::Data;
@ -495,11 +495,13 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub fn servers_invite_via<'a>(&'a self, room_id: &'a RoomId) -> impl Stream<Item = &ServerName> + Send + 'a {
type KeyVal<'a> = (Ignore, Vec<&'a ServerName>);
self.db
.roomid_inviteviaservers
.stream_prefix_raw(room_id)
.ignore_err()
.map(|(_, servers): (Ignore, Vec<&ServerName>)| &**(servers.last().expect("at least one servername")))
.map(|(_, servers): KeyVal<'_>| *servers.last().expect("at least one server"))
}
/// Gets up to three servers that are likely to be in the room in the
@ -525,16 +527,14 @@ impl Service {
let mut servers: Vec<OwnedServerName> = self
.room_members(room_id)
.collect::<Vec<_>>()
.await
.iter()
.counts_by(|user| user.server_name().to_owned())
.iter()
.await
.into_iter()
.sorted_by_key(|(_, users)| *users)
.map(|(server, _)| server.to_owned())
.map(|(server, _)| server)
.rev()
.take(3)
.collect_vec();
.collect();
if let Some(server) = most_powerful_user_server {
servers.insert(0, server);