From de3b137df801f4e362957235884b11e34a7c6626 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 6 Dec 2024 03:09:08 +0000 Subject: [PATCH] eliminate future wrapping stream for all_pdus() Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 1 - src/service/rooms/timeline/data.rs | 2 +- src/service/rooms/timeline/mod.rs | 17 ++++++++++------- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 5578077f..3a78c9ad 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -609,7 +609,6 @@ async fn load_joined_room( .rooms .timeline .all_pdus(sender_user, room_id) - .await? .ready_filter(|(_, pdu)| pdu.kind == RoomMember) .filter_map(|(_, pdu)| async move { let content: RoomMemberEventContent = pdu.get_content().ok()?; diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index c394dc3b..94621385 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -220,7 +220,7 @@ impl Data { pub(super) async fn pdus<'a>( &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, - ) -> Result + Send + 'a> { + ) -> Result + Send + Unpin + 'a> { let current = self.count_to_id(room_id, from, Direction::Forward).await?; let prefix = current.shortroomid(); let stream = self diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 07b406c4..0a96322b 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -15,7 +15,7 @@ use conduit::{ validated, warn, Err, Error, Result, Server, }; pub use conduit::{PduId, RawPduId}; -use futures::{future, future::ready, Future, FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{future, future::ready, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use ruma::{ api::federation, canonical_json::to_canonical_value, @@ -168,7 +168,6 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result> { self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id) - .await? .next() .await .map(|(_, p)| Arc::new(p)) @@ -968,12 +967,17 @@ impl Service { Ok(Some(pdu_id)) } - /// Returns an iterator over all PDUs in a room. + /// Returns an iterator over all PDUs in a room. Unknown rooms produce no + /// items. #[inline] - pub async fn all_pdus<'a>( + pub fn all_pdus<'a>( &'a self, user_id: &'a UserId, room_id: &'a RoomId, - ) -> Result + Send + 'a> { - self.pdus(Some(user_id), room_id, None).await + ) -> impl Stream + Send + Unpin + 'a { + self.pdus(Some(user_id), room_id, None) + .map_ok(|stream| stream.map(Ok)) + .try_flatten_stream() + .ignore_err() + .boxed() } /// Reverse iteration starting at from. @@ -1048,7 +1052,6 @@ impl Service { let first_pdu = self .all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id) - .await? .next() .await .expect("Room is not empty");