parallelize current and incoming fork-state fetch

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-20 07:38:32 +00:00
parent 388730d6dd
commit ea25dc04b2

View file

@ -6,10 +6,10 @@ use std::{
use conduwuit::{ use conduwuit::{
debug, err, implement, debug, err, implement,
utils::stream::{automatic_width, IterStream, ReadyExt, WidebandExt}, utils::stream::{automatic_width, IterStream, ReadyExt, TryWidebandExt, WidebandExt},
Result, Result,
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{ use ruma::{
state_res::{self, StateMap}, state_res::{self, StateMap},
OwnedEventId, RoomId, RoomVersionId, OwnedEventId, RoomId, RoomVersionId,
@ -40,20 +40,24 @@ pub async fn resolve_state(
.await?; .await?;
let fork_states = [current_state_ids, incoming_state]; let fork_states = [current_state_ids, incoming_state];
let mut auth_chain_sets = Vec::with_capacity(fork_states.len()); let auth_chain_sets: Vec<HashSet<OwnedEventId>> = fork_states
for state in &fork_states { .iter()
let starting_events = state.values().map(Borrow::borrow); .try_stream()
.wide_and_then(|state| async move {
let starting_events = state.values().map(Borrow::borrow);
let auth_chain: HashSet<OwnedEventId> = self let auth_chain = self
.services .services
.auth_chain .auth_chain
.get_event_ids(room_id, starting_events) .get_event_ids(room_id, starting_events)
.await? .await?
.into_iter() .into_iter()
.collect(); .collect();
auth_chain_sets.push(auth_chain); Ok(auth_chain)
} })
.try_collect()
.await?;
debug!("Loading fork states"); debug!("Loading fork states");
let fork_states: Vec<StateMap<OwnedEventId>> = fork_states let fork_states: Vec<StateMap<OwnedEventId>> = fork_states