abstract and encapsulate the awkward OptionFuture into Stream pattern

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-04 02:39:40 +00:00
parent 532dfd004d
commit bb8320a691
4 changed files with 35 additions and 40 deletions

View file

@ -15,6 +15,7 @@ use conduwuit::{
result::FlatOk, result::FlatOk,
utils::{ utils::{
self, BoolExt, IterStream, ReadyExt, TryFutureExtExt, self, BoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::OptionStream,
math::ruma_from_u64, math::ruma_from_u64,
stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
}, },
@ -1036,7 +1037,7 @@ async fn calculate_state_incremental<'a>(
}) })
.into(); .into();
let state_diff: OptionFuture<_> = (!full_state && state_changed) let state_diff_ids: OptionFuture<_> = (!full_state && state_changed)
.then(|| { .then(|| {
StreamExt::into_future( StreamExt::into_future(
services services
@ -1061,45 +1062,9 @@ async fn calculate_state_incremental<'a>(
}) })
.into(); .into();
let lazy_state_ids = lazy_state_ids
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten();
let state_diff_ids = state_diff
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten();
let state_events = current_state_ids let state_events = current_state_ids
.map(|opt| { .stream()
opt.map(|(curr, next)| { .chain(state_diff_ids.stream())
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten()
.chain(state_diff_ids)
.broad_filter_map(|(shortstatekey, shorteventid)| async move { .broad_filter_map(|(shortstatekey, shorteventid)| async move {
if witness.is_none() || encrypted_room { if witness.is_none() || encrypted_room {
return Some(shorteventid); return Some(shorteventid);
@ -1107,7 +1072,7 @@ async fn calculate_state_incremental<'a>(
lazy_filter(services, sender_user, shortstatekey, shorteventid).await lazy_filter(services, sender_user, shortstatekey, shorteventid).await
}) })
.chain(lazy_state_ids) .chain(lazy_state_ids.stream())
.broad_filter_map(|shorteventid| { .broad_filter_map(|shorteventid| {
services services
.rooms .rooms

View file

@ -1,9 +1,11 @@
mod bool_ext; mod bool_ext;
mod ext_ext; mod ext_ext;
mod option_ext; mod option_ext;
mod option_stream;
mod try_ext_ext; mod try_ext_ext;
pub use bool_ext::{BoolExt, and, or}; pub use bool_ext::{BoolExt, and, or};
pub use ext_ext::ExtExt; pub use ext_ext::ExtExt;
pub use option_ext::OptionExt; pub use option_ext::OptionExt;
pub use option_stream::OptionStream;
pub use try_ext_ext::TryExtExt; pub use try_ext_ext::TryExtExt;

View file

@ -11,11 +11,14 @@ pub trait OptionExt<T> {
impl<T, Fut> OptionExt<T> for OptionFuture<Fut> impl<T, Fut> OptionExt<T> for OptionFuture<Fut>
where where
Fut: Future<Output = T> + Send, Fut: Future<Output = T> + Send,
T: Send,
{ {
#[inline]
fn is_none_or(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send { fn is_none_or(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send {
self.map(|o| o.as_ref().is_none_or(f)) self.map(|o| o.as_ref().is_none_or(f))
} }
#[inline]
fn is_some_and(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send { fn is_some_and(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send {
self.map(|o| o.as_ref().is_some_and(f)) self.map(|o| o.as_ref().is_some_and(f))
} }

View file

@ -0,0 +1,25 @@
use futures::{Future, FutureExt, Stream, StreamExt, future::OptionFuture};
use super::super::IterStream;
pub trait OptionStream<T> {
fn stream(self) -> impl Stream<Item = T> + Send;
}
impl<T, O, S, Fut> OptionStream<T> for OptionFuture<Fut>
where
Fut: Future<Output = (O, S)> + Send,
S: Stream<Item = T> + Send,
O: IntoIterator<Item = T> + Send,
<O as IntoIterator>::IntoIter: Send,
T: Send,
{
#[inline]
fn stream(self) -> impl Stream<Item = T> + Send {
self.map(|opt| opt.map(|(curr, next)| curr.into_iter().stream().chain(next)))
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten()
}
}