From ac944496c15bc476bc9964e034b7fcea737cc733 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 14 Jan 2025 19:17:45 +0000 Subject: [PATCH] optimize statekey-from-short loopsite Signed-off-by: Jason Volk --- .../rooms/event_handler/resolve_state.rs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/service/rooms/event_handler/resolve_state.rs b/src/service/rooms/event_handler/resolve_state.rs index 8640c582..edce880d 100644 --- a/src/service/rooms/event_handler/resolve_state.rs +++ b/src/service/rooms/event_handler/resolve_state.rs @@ -6,10 +6,10 @@ use std::{ use conduwuit::{ debug, err, implement, - utils::stream::{automatic_width, IterStream, WidebandExt}, + utils::stream::{automatic_width, IterStream, ReadyExt, WidebandExt}, Result, }; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{FutureExt, StreamExt}; use ruma::{ state_res::{self, StateMap}, OwnedEventId, RoomId, RoomVersionId, @@ -59,17 +59,18 @@ pub async fn resolve_state( let fork_states: Vec> = fork_states .into_iter() .stream() - .wide_then(|fork_state| { - fork_state - .into_iter() - .stream() - .wide_filter_map(|(k, id)| { - self.services - .short - .get_statekey_from_short(k) - .map_ok_or_else(|_| None, move |(ty, st_key)| Some(((ty, st_key), id))) - }) + .wide_then(|fork_state| async move { + let shortstatekeys = fork_state.keys().copied().stream(); + + let event_ids = fork_state.values().cloned().stream().boxed(); + + self.services + .short + .multi_get_statekey_from_short(shortstatekeys) + .zip(event_ids) + .ready_filter_map(|(ty_sk, id)| Some((ty_sk.ok()?, id))) .collect() + .await }) .collect() .await;