From 09bc71caaba40321ec0f987574a94e788175c4f9 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 31 Jan 2025 09:08:13 +0000 Subject: [PATCH] fix missed concurrent fetch opportunities in sender (ffd0fd42424a) Signed-off-by: Jason Volk --- src/service/sending/sender.rs | 41 +++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 363bb994..f19b69da 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -13,7 +13,12 @@ use conduwuit::{ debug, err, error, result::LogErr, trace, - utils::{calculate_hash, continue_exponential_backoff_secs, stream::IterStream, ReadyExt}, + utils::{ + calculate_hash, continue_exponential_backoff_secs, + future::TryExtExt, + stream::{BroadbandExt, IterStream, WidebandExt}, + ReadyExt, + }, warn, Error, Result, }; use futures::{ @@ -474,20 +479,25 @@ impl Service { since: (u64, u64), max_edu_count: &AtomicU64, ) -> Option { - let server_rooms = self.services.state_cache.server_rooms(server_name); - - pin_mut!(server_rooms); let mut num = 0; - let mut receipts = BTreeMap::::new(); - while let Some(room_id) = server_rooms.next().await { - let receipt_map = self - .select_edus_receipts_room(room_id, since, max_edu_count, &mut num) - .await; + let receipts: BTreeMap = self + .services + .state_cache + .server_rooms(server_name) + .map(ToOwned::to_owned) + .broad_filter_map(|room_id| async move { + let receipt_map = self + .select_edus_receipts_room(&room_id, since, max_edu_count, &mut num) + .await; - if !receipt_map.read.is_empty() { - receipts.insert(room_id.into(), receipt_map); - } - } + receipt_map + .read + .is_empty() + .eq(&false) + .then_some((room_id, receipt_map)) + }) + .collect() + .await; if receipts.is_empty() { return None; @@ -820,9 +830,8 @@ impl Service { | _ => None, }) .stream() - .then(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id)) - .ready_filter_map(Result::ok) - .then(|pdu| self.convert_to_outgoing_federation_event(pdu)) + .wide_filter_map(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id).ok()) + .wide_then(|pdu| self.convert_to_outgoing_federation_event(pdu)) .collect() .await;