add ready_try_for_each to TryReadyExt extension utils

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-21 23:54:54 +00:00 committed by strawberry
parent b505f0d0d7
commit 0e55fa2de2
2 changed files with 26 additions and 6 deletions

View file

@ -2,7 +2,7 @@
use futures::{
future::{ready, Ready},
stream::{AndThen, TryStream, TryStreamExt},
stream::{AndThen, TryForEach, TryStream, TryStreamExt},
};
use crate::Result;
@ -18,6 +18,12 @@ where
fn ready_and_then<U, F>(self, f: F) -> AndThen<Self, Ready<Result<U, E>>, impl FnMut(S::Ok) -> Ready<Result<U, E>>>
where
F: Fn(S::Ok) -> Result<U, E>;
fn ready_try_for_each<F>(
self, f: F,
) -> TryForEach<Self, Ready<Result<(), E>>, impl FnMut(S::Ok) -> Ready<Result<(), E>>>
where
F: Fn(S::Ok) -> Result<(), E>;
}
impl<T, E, S> TryReadyExt<T, E, S> for S
@ -32,4 +38,14 @@ where
{
self.and_then(move |t| ready(f(t)))
}
#[inline]
fn ready_try_for_each<F>(
self, f: F,
) -> TryForEach<Self, Ready<Result<(), E>>, impl FnMut(S::Ok) -> Ready<Result<(), E>>>
where
F: Fn(S::Ok) -> Result<(), E>,
{
self.try_for_each(move |t| ready(f(t)))
}
}

View file

@ -7,8 +7,12 @@ mod sender;
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use conduit::{err, utils::ReadyExt, warn, Result, Server};
use futures::{future::ready, Stream, StreamExt, TryStreamExt};
use conduit::{
err,
utils::{ReadyExt, TryReadyExt},
warn, Result, Server,
};
use futures::{Stream, StreamExt};
use ruma::{
api::{appservice::Registration, OutgoingRequest},
RoomId, ServerName, UserId,
@ -235,12 +239,12 @@ impl Service {
.map(ToOwned::to_owned)
.map(Destination::Normal)
.map(Ok)
.try_for_each(|dest| {
ready(self.dispatch(Msg {
.ready_try_for_each(|dest| {
self.dispatch(Msg {
dest,
event: SendingEvent::Flush,
queue_id: Vec::<u8>::new(),
}))
})
})
.await
}