eliminate future wrapping stream for all_pdus()

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-06 03:09:08 +00:00
parent 8e8c6bfe07
commit de3b137df8
3 changed files with 11 additions and 9 deletions

View file

@ -609,7 +609,6 @@ async fn load_joined_room(
.rooms .rooms
.timeline .timeline
.all_pdus(sender_user, room_id) .all_pdus(sender_user, room_id)
.await?
.ready_filter(|(_, pdu)| pdu.kind == RoomMember) .ready_filter(|(_, pdu)| pdu.kind == RoomMember)
.filter_map(|(_, pdu)| async move { .filter_map(|(_, pdu)| async move {
let content: RoomMemberEventContent = pdu.get_content().ok()?; let content: RoomMemberEventContent = pdu.get_content().ok()?;

View file

@ -220,7 +220,7 @@ impl Data {
pub(super) async fn pdus<'a>( pub(super) async fn pdus<'a>(
&'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, &'a self, user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> { ) -> Result<impl Stream<Item = PdusIterItem> + Send + Unpin + 'a> {
let current = self.count_to_id(room_id, from, Direction::Forward).await?; let current = self.count_to_id(room_id, from, Direction::Forward).await?;
let prefix = current.shortroomid(); let prefix = current.shortroomid();
let stream = self let stream = self

View file

@ -15,7 +15,7 @@ use conduit::{
validated, warn, Err, Error, Result, Server, validated, warn, Err, Error, Result, Server,
}; };
pub use conduit::{PduId, RawPduId}; pub use conduit::{PduId, RawPduId};
use futures::{future, future::ready, Future, FutureExt, Stream, StreamExt, TryStreamExt}; use futures::{future, future::ready, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{ use ruma::{
api::federation, api::federation,
canonical_json::to_canonical_value, canonical_json::to_canonical_value,
@ -168,7 +168,6 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug")]
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Arc<PduEvent>> { pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Arc<PduEvent>> {
self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id) self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)
.await?
.next() .next()
.await .await
.map(|(_, p)| Arc::new(p)) .map(|(_, p)| Arc::new(p))
@ -968,12 +967,17 @@ impl Service {
Ok(Some(pdu_id)) Ok(Some(pdu_id))
} }
/// Returns an iterator over all PDUs in a room. /// Returns an iterator over all PDUs in a room. Unknown rooms produce no
/// items.
#[inline] #[inline]
pub async fn all_pdus<'a>( pub fn all_pdus<'a>(
&'a self, user_id: &'a UserId, room_id: &'a RoomId, &'a self, user_id: &'a UserId, room_id: &'a RoomId,
) -> Result<impl Stream<Item = PdusIterItem> + Send + 'a> { ) -> impl Stream<Item = PdusIterItem> + Send + Unpin + 'a {
self.pdus(Some(user_id), room_id, None).await self.pdus(Some(user_id), room_id, None)
.map_ok(|stream| stream.map(Ok))
.try_flatten_stream()
.ignore_err()
.boxed()
} }
/// Reverse iteration starting at from. /// Reverse iteration starting at from.
@ -1048,7 +1052,6 @@ impl Service {
let first_pdu = self let first_pdu = self
.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id) .all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)
.await?
.next() .next()
.await .await
.expect("Room is not empty"); .expect("Room is not empty");