parallelize calculate_invite_state

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-04 03:40:00 +00:00 committed by strawberry
parent 685eadb171
commit 2b2055fe8a
3 changed files with 27 additions and 60 deletions

View file

@ -1452,7 +1452,7 @@ pub(crate) async fn invite_helper(
) )
.await?; .await?;
let invite_room_state = services.rooms.state.calculate_invite_state(&pdu).await?; let invite_room_state = services.rooms.state.summary_stripped(&pdu).await;
drop(state_lock); drop(state_lock);

View file

@ -3,6 +3,7 @@ mod data;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fmt::Write, fmt::Write,
iter::once,
sync::Arc, sync::Arc,
}; };
@ -13,7 +14,7 @@ use conduit::{
}; };
use data::Data; use data::Data;
use database::{Ignore, Interfix}; use database::{Ignore, Interfix};
use futures::{pin_mut, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use futures::{future::join_all, pin_mut, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{ use ruma::{
events::{ events::{
room::{create::RoomCreateEventContent, member::RoomMemberEventContent}, room::{create::RoomCreateEventContent, member::RoomMemberEventContent},
@ -288,61 +289,30 @@ impl Service {
} }
} }
#[tracing::instrument(skip(self, invite_event), level = "debug")] #[tracing::instrument(skip_all, level = "debug")]
pub async fn calculate_invite_state(&self, invite_event: &PduEvent) -> Result<Vec<Raw<AnyStrippedStateEvent>>> { pub async fn summary_stripped(&self, invite: &PduEvent) -> Vec<Raw<AnyStrippedStateEvent>> {
let mut state = Vec::new(); let cells = [
// Add recommended events (&StateEventType::RoomCreate, ""),
if let Ok(e) = self (&StateEventType::RoomJoinRules, ""),
.services (&StateEventType::RoomCanonicalAlias, ""),
.state_accessor (&StateEventType::RoomName, ""),
.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "") (&StateEventType::RoomAvatar, ""),
.await (&StateEventType::RoomMember, invite.sender.as_str()), // Add recommended events
{ ];
state.push(e.to_stripped_state_event());
}
if let Ok(e) = self
.services
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")
.await
{
state.push(e.to_stripped_state_event());
}
if let Ok(e) = self
.services
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomCanonicalAlias, "")
.await
{
state.push(e.to_stripped_state_event());
}
if let Ok(e) = self
.services
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")
.await
{
state.push(e.to_stripped_state_event());
}
if let Ok(e) = self
.services
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")
.await
{
state.push(e.to_stripped_state_event());
}
if let Ok(e) = self
.services
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomMember, invite_event.sender.as_str())
.await
{
state.push(e.to_stripped_state_event());
}
state.push(invite_event.to_stripped_state_event()); let fetches = cells.iter().map(|(event_type, state_key)| {
Ok(state) self.services
.state_accessor
.room_state_get(&invite.room_id, event_type, state_key)
});
join_all(fetches)
.await
.into_iter()
.filter_map(Result::ok)
.map(|e| e.to_stripped_state_event())
.chain(once(invite.to_stripped_state_event()))
.collect()
} }
/// Set the state hash to a new version, but does not update state_cache. /// Set the state hash to a new version, but does not update state_cache.

View file

@ -513,10 +513,7 @@ impl Service {
})?; })?;
let invite_state = match content.membership { let invite_state = match content.membership {
MembershipState::Invite => { MembershipState::Invite => self.services.state.summary_stripped(pdu).await.into(),
let state = self.services.state.calculate_invite_state(pdu).await?;
Some(state)
},
_ => None, _ => None,
}; };