From 6b80361c31fc8b2eeeafbcfbf14a463c3423ee7c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 30 Sep 2024 06:46:54 +0000 Subject: [PATCH] additional stream tools Signed-off-by: Jason Volk --- src/core/result.rs | 5 +- src/core/result/into_is_ok.rs | 10 +++ src/core/utils/bool.rs | 16 +++++ src/core/utils/future/mod.rs | 3 + src/core/utils/future/try_ext_ext.rs | 48 +++++++++++++ src/core/utils/mod.rs | 6 +- src/core/utils/stream/mod.rs | 2 + src/core/utils/stream/ready.rs | 102 ++++++++++++++++++--------- src/core/utils/stream/tools.rs | 80 +++++++++++++++++++++ src/service/rooms/state_cache/mod.rs | 16 ++--- 10 files changed, 242 insertions(+), 46 deletions(-) create mode 100644 src/core/result/into_is_ok.rs create mode 100644 src/core/utils/bool.rs create mode 100644 src/core/utils/future/mod.rs create mode 100644 src/core/utils/future/try_ext_ext.rs create mode 100644 src/core/utils/stream/tools.rs diff --git a/src/core/result.rs b/src/core/result.rs index 96a34b8a..82d67a9c 100644 --- a/src/core/result.rs +++ b/src/core/result.rs @@ -1,4 +1,5 @@ mod debug_inspect; +mod into_is_ok; mod log_debug_err; mod log_err; mod map_expect; @@ -6,8 +7,8 @@ mod not_found; mod unwrap_infallible; pub use self::{ - debug_inspect::DebugInspect, log_debug_err::LogDebugErr, log_err::LogErr, map_expect::MapExpect, - not_found::NotFound, unwrap_infallible::UnwrapInfallible, + debug_inspect::DebugInspect, into_is_ok::IntoIsOk, log_debug_err::LogDebugErr, log_err::LogErr, + map_expect::MapExpect, not_found::NotFound, unwrap_infallible::UnwrapInfallible, }; pub type Result = std::result::Result; diff --git a/src/core/result/into_is_ok.rs b/src/core/result/into_is_ok.rs new file mode 100644 index 00000000..220ce010 --- /dev/null +++ b/src/core/result/into_is_ok.rs @@ -0,0 +1,10 @@ +use super::Result; + +pub trait IntoIsOk { + fn into_is_ok(self) -> bool; +} + +impl IntoIsOk for Result { + #[inline] + fn into_is_ok(self) -> bool { self.is_ok() } +} diff --git a/src/core/utils/bool.rs b/src/core/utils/bool.rs new file mode 100644 index 00000000..d7ce78fe --- /dev/null +++ b/src/core/utils/bool.rs @@ -0,0 +1,16 @@ +//! Trait BoolExt + +/// Boolean extensions and chain.starters +pub trait BoolExt { + fn or T>(self, f: F) -> Option; + + fn or_some(self, t: T) -> Option; +} + +impl BoolExt for bool { + #[inline] + fn or T>(self, f: F) -> Option { (!self).then(f) } + + #[inline] + fn or_some(self, t: T) -> Option { (!self).then_some(t) } +} diff --git a/src/core/utils/future/mod.rs b/src/core/utils/future/mod.rs new file mode 100644 index 00000000..6d45b656 --- /dev/null +++ b/src/core/utils/future/mod.rs @@ -0,0 +1,3 @@ +mod try_ext_ext; + +pub use try_ext_ext::TryExtExt; diff --git a/src/core/utils/future/try_ext_ext.rs b/src/core/utils/future/try_ext_ext.rs new file mode 100644 index 00000000..e444ad94 --- /dev/null +++ b/src/core/utils/future/try_ext_ext.rs @@ -0,0 +1,48 @@ +//! Extended external extensions to futures::TryFutureExt + +use futures::{future::MapOkOrElse, TryFuture, TryFutureExt}; + +/// This interface is not necessarily complete; feel free to add as-needed. +pub trait TryExtExt +where + Self: TryFuture + Send, +{ + fn map_ok_or( + self, default: U, f: F, + ) -> MapOkOrElse U, impl FnOnce(Self::Error) -> U> + where + F: FnOnce(Self::Ok) -> U, + Self: Send + Sized; + + fn ok( + self, + ) -> MapOkOrElse Option, impl FnOnce(Self::Error) -> Option> + where + Self: Sized; +} + +impl TryExtExt for Fut +where + Fut: TryFuture + Send, +{ + #[inline] + fn map_ok_or( + self, default: U, f: F, + ) -> MapOkOrElse U, impl FnOnce(Self::Error) -> U> + where + F: FnOnce(Self::Ok) -> U, + Self: Send + Sized, + { + self.map_ok_or_else(|_| default, f) + } + + #[inline] + fn ok( + self, + ) -> MapOkOrElse Option, impl FnOnce(Self::Error) -> Option> + where + Self: Sized, + { + self.map_ok_or(None, Some) + } +} diff --git a/src/core/utils/mod.rs b/src/core/utils/mod.rs index fef83395..c34691d2 100644 --- a/src/core/utils/mod.rs +++ b/src/core/utils/mod.rs @@ -1,7 +1,9 @@ +pub mod bool; pub mod bytes; pub mod content_disposition; pub mod debug; pub mod defer; +pub mod future; pub mod hash; pub mod html; pub mod json; @@ -19,15 +21,17 @@ pub use ::conduit_macros::implement; pub use ::ctor::{ctor, dtor}; pub use self::{ + bool::BoolExt, bytes::{increment, u64_from_bytes, u64_from_u8, u64_from_u8x8}, debug::slice_truncated as debug_slice_truncated, + future::TryExtExt as TryFutureExtExt, hash::calculate_hash, html::Escape as HtmlEscape, json::{deserialize_from_str, to_canonical_object}, math::clamp, mutex_map::{Guard as MutexMapGuard, MutexMap}, rand::string as random_string, - stream::{IterStream, ReadyExt, TryReadyExt}, + stream::{IterStream, ReadyExt, Tools as StreamTools, TryReadyExt}, string::{str_from_bytes, string_from_bytes}, sys::available_parallelism, time::now_millis as millis_since_unix_epoch, diff --git a/src/core/utils/stream/mod.rs b/src/core/utils/stream/mod.rs index 781bd522..1111915b 100644 --- a/src/core/utils/stream/mod.rs +++ b/src/core/utils/stream/mod.rs @@ -3,6 +3,7 @@ mod expect; mod ignore; mod iter_stream; mod ready; +mod tools; mod try_ready; pub use cloned::Cloned; @@ -10,4 +11,5 @@ pub use expect::TryExpect; pub use ignore::TryIgnore; pub use iter_stream::IterStream; pub use ready::ReadyExt; +pub use tools::Tools; pub use try_ready::TryReadyExt; diff --git a/src/core/utils/stream/ready.rs b/src/core/utils/stream/ready.rs index 13f730a7..da5aec5a 100644 --- a/src/core/utils/stream/ready.rs +++ b/src/core/utils/stream/ready.rs @@ -2,7 +2,7 @@ use futures::{ future::{ready, Ready}, - stream::{Any, Filter, FilterMap, Fold, ForEach, SkipWhile, Stream, StreamExt, TakeWhile}, + stream::{Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile}, }; /// Synchronous combinators to augment futures::StreamExt. Most Stream @@ -11,98 +11,130 @@ use futures::{ /// convenience to reduce boilerplate by de-cluttering non-async predicates. /// /// This interface is not necessarily complete; feel free to add as-needed. -pub trait ReadyExt +pub trait ReadyExt where - S: Stream + Send + ?Sized, - Self: Stream + Send + Sized, + Self: Stream + Send + Sized, { - fn ready_any(self, f: F) -> Any, impl FnMut(S::Item) -> Ready> + fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where - F: Fn(S::Item) -> bool; + F: Fn(Item) -> bool; - fn ready_filter<'a, F>(self, f: F) -> Filter, impl FnMut(&S::Item) -> Ready + 'a> + fn ready_filter<'a, F>(self, f: F) -> Filter, impl FnMut(&Item) -> Ready + 'a> where - F: Fn(&S::Item) -> bool + 'a; + F: Fn(&Item) -> bool + 'a; - fn ready_filter_map(self, f: F) -> FilterMap>, impl FnMut(S::Item) -> Ready>> + fn ready_filter_map(self, f: F) -> FilterMap>, impl FnMut(Item) -> Ready>> where - F: Fn(S::Item) -> Option; + F: Fn(Item) -> Option; - fn ready_fold(self, init: T, f: F) -> Fold, T, impl FnMut(T, S::Item) -> Ready> + fn ready_fold(self, init: T, f: F) -> Fold, T, impl FnMut(T, Item) -> Ready> where - F: Fn(T, S::Item) -> T; + F: Fn(T, Item) -> T; - fn ready_for_each(self, f: F) -> ForEach, impl FnMut(S::Item) -> Ready<()>> + fn ready_for_each(self, f: F) -> ForEach, impl FnMut(Item) -> Ready<()>> where - F: FnMut(S::Item); + F: FnMut(Item); - fn ready_take_while<'a, F>(self, f: F) -> TakeWhile, impl FnMut(&S::Item) -> Ready + 'a> + fn ready_take_while<'a, F>(self, f: F) -> TakeWhile, impl FnMut(&Item) -> Ready + 'a> where - F: Fn(&S::Item) -> bool + 'a; + F: Fn(&Item) -> bool + 'a; - fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile, impl FnMut(&S::Item) -> Ready + 'a> + fn ready_scan( + self, init: T, f: F, + ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where - F: Fn(&S::Item) -> bool + 'a; + F: Fn(&mut T, Item) -> Option; + + fn ready_scan_each( + self, init: T, f: F, + ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> + where + F: Fn(&mut T, &Item); + + fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile, impl FnMut(&Item) -> Ready + 'a> + where + F: Fn(&Item) -> bool + 'a; } -impl ReadyExt for S +impl ReadyExt for S where - S: Stream + Send + ?Sized, - Self: Stream + Send + Sized, + S: Stream + Send + Sized, { #[inline] - fn ready_any(self, f: F) -> Any, impl FnMut(S::Item) -> Ready> + fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where - F: Fn(S::Item) -> bool, + F: Fn(Item) -> bool, { self.any(move |t| ready(f(t))) } #[inline] - fn ready_filter<'a, F>(self, f: F) -> Filter, impl FnMut(&S::Item) -> Ready + 'a> + fn ready_filter<'a, F>(self, f: F) -> Filter, impl FnMut(&Item) -> Ready + 'a> where - F: Fn(&S::Item) -> bool + 'a, + F: Fn(&Item) -> bool + 'a, { self.filter(move |t| ready(f(t))) } #[inline] - fn ready_filter_map(self, f: F) -> FilterMap>, impl FnMut(S::Item) -> Ready>> + fn ready_filter_map(self, f: F) -> FilterMap>, impl FnMut(Item) -> Ready>> where - F: Fn(S::Item) -> Option, + F: Fn(Item) -> Option, { self.filter_map(move |t| ready(f(t))) } #[inline] - fn ready_fold(self, init: T, f: F) -> Fold, T, impl FnMut(T, S::Item) -> Ready> + fn ready_fold(self, init: T, f: F) -> Fold, T, impl FnMut(T, Item) -> Ready> where - F: Fn(T, S::Item) -> T, + F: Fn(T, Item) -> T, { self.fold(init, move |a, t| ready(f(a, t))) } #[inline] #[allow(clippy::unit_arg)] - fn ready_for_each(self, mut f: F) -> ForEach, impl FnMut(S::Item) -> Ready<()>> + fn ready_for_each(self, mut f: F) -> ForEach, impl FnMut(Item) -> Ready<()>> where - F: FnMut(S::Item), + F: FnMut(Item), { self.for_each(move |t| ready(f(t))) } #[inline] - fn ready_take_while<'a, F>(self, f: F) -> TakeWhile, impl FnMut(&S::Item) -> Ready + 'a> + fn ready_take_while<'a, F>(self, f: F) -> TakeWhile, impl FnMut(&Item) -> Ready + 'a> where - F: Fn(&S::Item) -> bool + 'a, + F: Fn(&Item) -> bool + 'a, { self.take_while(move |t| ready(f(t))) } #[inline] - fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile, impl FnMut(&S::Item) -> Ready + 'a> + fn ready_scan( + self, init: T, f: F, + ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where - F: Fn(&S::Item) -> bool + 'a, + F: Fn(&mut T, Item) -> Option, + { + self.scan(init, move |s, t| ready(f(s, t))) + } + + fn ready_scan_each( + self, init: T, f: F, + ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> + where + F: Fn(&mut T, &Item), + { + self.ready_scan(init, move |s, t| { + f(s, &t); + Some(t) + }) + } + + #[inline] + fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile, impl FnMut(&Item) -> Ready + 'a> + where + F: Fn(&Item) -> bool + 'a, { self.skip_while(move |t| ready(f(t))) } diff --git a/src/core/utils/stream/tools.rs b/src/core/utils/stream/tools.rs new file mode 100644 index 00000000..cc6b7ca9 --- /dev/null +++ b/src/core/utils/stream/tools.rs @@ -0,0 +1,80 @@ +//! StreamTools for futures::Stream + +use std::{collections::HashMap, hash::Hash}; + +use futures::{Future, Stream, StreamExt}; + +use super::ReadyExt; +use crate::expected; + +/// StreamTools +/// +/// This interface is not necessarily complete; feel free to add as-needed. +pub trait Tools +where + Self: Stream + Send + Sized, + ::Item: Send, +{ + fn counts(self) -> impl Future> + Send + where + ::Item: Eq + Hash; + + fn counts_by(self, f: F) -> impl Future> + Send + where + F: Fn(Item) -> K + Send, + K: Eq + Hash + Send; + + fn counts_by_with_cap(self, f: F) -> impl Future> + Send + where + F: Fn(Item) -> K + Send, + K: Eq + Hash + Send; + + fn counts_with_cap(self) -> impl Future> + Send + where + ::Item: Eq + Hash; +} + +impl Tools for S +where + S: Stream + Send + Sized, + ::Item: Send, +{ + #[inline] + fn counts(self) -> impl Future> + Send + where + ::Item: Eq + Hash, + { + self.counts_with_cap::<0>() + } + + #[inline] + fn counts_by(self, f: F) -> impl Future> + Send + where + F: Fn(Item) -> K + Send, + K: Eq + Hash + Send, + { + self.counts_by_with_cap::<0, K, F>(f) + } + + #[inline] + fn counts_by_with_cap(self, f: F) -> impl Future> + Send + where + F: Fn(Item) -> K + Send, + K: Eq + Hash + Send, + { + self.map(f).counts_with_cap::() + } + + #[inline] + fn counts_with_cap(self) -> impl Future> + Send + where + ::Item: Eq + Hash, + { + self.ready_fold(HashMap::with_capacity(CAP), |mut counts, item| { + let entry = counts.entry(item).or_default(); + let value = *entry; + *entry = expected!(value + 1); + counts + }) + } +} diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index eedff861..25388084 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -4,7 +4,7 @@ use std::{collections::HashSet, sync::Arc}; use conduit::{ err, - utils::{stream::TryIgnore, ReadyExt}, + utils::{stream::TryIgnore, ReadyExt, StreamTools}, warn, Result, }; use data::Data; @@ -495,11 +495,13 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub fn servers_invite_via<'a>(&'a self, room_id: &'a RoomId) -> impl Stream + Send + 'a { + type KeyVal<'a> = (Ignore, Vec<&'a ServerName>); + self.db .roomid_inviteviaservers .stream_prefix_raw(room_id) .ignore_err() - .map(|(_, servers): (Ignore, Vec<&ServerName>)| &**(servers.last().expect("at least one servername"))) + .map(|(_, servers): KeyVal<'_>| *servers.last().expect("at least one server")) } /// Gets up to three servers that are likely to be in the room in the @@ -525,16 +527,14 @@ impl Service { let mut servers: Vec = self .room_members(room_id) - .collect::>() - .await - .iter() .counts_by(|user| user.server_name().to_owned()) - .iter() + .await + .into_iter() .sorted_by_key(|(_, users)| *users) - .map(|(server, _)| server.to_owned()) + .map(|(server, _)| server) .rev() .take(3) - .collect_vec(); + .collect(); if let Some(server) = most_powerful_user_server { servers.insert(0, server);