diff --git a/src/core/utils/stream/try_ready.rs b/src/core/utils/stream/try_ready.rs index ab37d9b3..df356456 100644 --- a/src/core/utils/stream/try_ready.rs +++ b/src/core/utils/stream/try_ready.rs @@ -2,7 +2,7 @@ use futures::{ future::{ready, Ready}, - stream::{AndThen, TryStream, TryStreamExt}, + stream::{AndThen, TryForEach, TryStream, TryStreamExt}, }; use crate::Result; @@ -18,6 +18,12 @@ where fn ready_and_then(self, f: F) -> AndThen>, impl FnMut(S::Ok) -> Ready>> where F: Fn(S::Ok) -> Result; + + fn ready_try_for_each( + self, f: F, + ) -> TryForEach>, impl FnMut(S::Ok) -> Ready>> + where + F: Fn(S::Ok) -> Result<(), E>; } impl TryReadyExt for S @@ -32,4 +38,14 @@ where { self.and_then(move |t| ready(f(t))) } + + #[inline] + fn ready_try_for_each( + self, f: F, + ) -> TryForEach>, impl FnMut(S::Ok) -> Ready>> + where + F: Fn(S::Ok) -> Result<(), E>, + { + self.try_for_each(move |t| ready(f(t))) + } } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 63c5e655..a1d5f692 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -7,8 +7,12 @@ mod sender; use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use conduit::{err, utils::ReadyExt, warn, Result, Server}; -use futures::{future::ready, Stream, StreamExt, TryStreamExt}; +use conduit::{ + err, + utils::{ReadyExt, TryReadyExt}, + warn, Result, Server, +}; +use futures::{Stream, StreamExt}; use ruma::{ api::{appservice::Registration, OutgoingRequest}, RoomId, ServerName, UserId, @@ -235,12 +239,12 @@ impl Service { .map(ToOwned::to_owned) .map(Destination::Normal) .map(Ok) - .try_for_each(|dest| { - ready(self.dispatch(Msg { + .ready_try_for_each(|dest| { + self.dispatch(Msg { dest, event: SendingEvent::Flush, queue_id: Vec::::new(), - })) + }) }) .await }