parallelize IO for PublicRoomsChunk vector

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-03-12 23:10:38 +00:00
parent aa4d2e2363
commit 7294368015

View file

@ -1,7 +1,17 @@
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Error, Result, info, warn}; use conduwuit::{
use futures::{StreamExt, TryFutureExt}; Err, Error, Result, info,
utils::{
TryFutureExtExt,
stream::{ReadyExt, WidebandExt},
},
warn,
};
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{join, join4, join5},
};
use ruma::{ use ruma::{
OwnedRoomId, RoomId, ServerName, UInt, UserId, OwnedRoomId, RoomId, ServerName, UInt, UserId,
api::{ api::{
@ -287,8 +297,8 @@ pub(crate) async fn get_public_rooms_filtered_helper(
.directory .directory
.public_rooms() .public_rooms()
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.then(|room_id| public_rooms_chunk(services, room_id)) .wide_then(|room_id| public_rooms_chunk(services, room_id))
.filter_map(|chunk| async move { .ready_filter_map(|chunk| {
if !filter.room_types.is_empty() && !filter.room_types.contains(&RoomTypeFilter::from(chunk.room_type.clone())) { if !filter.room_types.is_empty() && !filter.room_types.contains(&RoomTypeFilter::from(chunk.room_type.clone())) {
return None; return None;
} }
@ -394,60 +404,60 @@ async fn user_can_publish_room(
} }
async fn public_rooms_chunk(services: &Services, room_id: OwnedRoomId) -> PublicRoomsChunk { async fn public_rooms_chunk(services: &Services, room_id: OwnedRoomId) -> PublicRoomsChunk {
let name = services.rooms.state_accessor.get_name(&room_id).ok();
let room_type = services.rooms.state_accessor.get_room_type(&room_id).ok();
let canonical_alias = services
.rooms
.state_accessor
.get_canonical_alias(&room_id)
.ok();
let avatar_url = services.rooms.state_accessor.get_avatar(&room_id);
let topic = services.rooms.state_accessor.get_room_topic(&room_id).ok();
let world_readable = services.rooms.state_accessor.is_world_readable(&room_id);
let join_rule = services
.rooms
.state_accessor
.room_state_get_content(&room_id, &StateEventType::RoomJoinRules, "")
.map_ok(|c: RoomJoinRulesEventContent| match c.join_rule {
| JoinRule::Public => PublicRoomJoinRule::Public,
| JoinRule::Knock => "knock".into(),
| JoinRule::KnockRestricted(_) => "knock_restricted".into(),
| _ => "invite".into(),
});
let guest_can_join = services.rooms.state_accessor.guest_can_join(&room_id);
let num_joined_members = services.rooms.state_cache.room_joined_count(&room_id);
let (
(avatar_url, canonical_alias, guest_can_join, join_rule, name),
(num_joined_members, room_type, topic, world_readable),
) = join(
join5(avatar_url, canonical_alias, guest_can_join, join_rule, name),
join4(num_joined_members, room_type, topic, world_readable),
)
.boxed()
.await;
PublicRoomsChunk { PublicRoomsChunk {
canonical_alias: services avatar_url: avatar_url.into_option().unwrap_or_default().url,
.rooms canonical_alias,
.state_accessor guest_can_join,
.get_canonical_alias(&room_id) join_rule: join_rule.unwrap_or_default(),
.await name,
.ok(), num_joined_members: num_joined_members
name: services.rooms.state_accessor.get_name(&room_id).await.ok(),
num_joined_members: services
.rooms
.state_cache
.room_joined_count(&room_id)
.await
.unwrap_or(0) .unwrap_or(0)
.try_into() .try_into()
.expect("joined count overflows ruma UInt"), .expect("joined count overflows ruma UInt"),
topic: services
.rooms
.state_accessor
.get_room_topic(&room_id)
.await
.ok(),
world_readable: services
.rooms
.state_accessor
.is_world_readable(&room_id)
.await,
guest_can_join: services.rooms.state_accessor.guest_can_join(&room_id).await,
avatar_url: services
.rooms
.state_accessor
.get_avatar(&room_id)
.await
.into_option()
.unwrap_or_default()
.url,
join_rule: services
.rooms
.state_accessor
.room_state_get_content(&room_id, &StateEventType::RoomJoinRules, "")
.map_ok(|c: RoomJoinRulesEventContent| match c.join_rule {
| JoinRule::Public => PublicRoomJoinRule::Public,
| JoinRule::Knock => "knock".into(),
| JoinRule::KnockRestricted(_) => "knock_restricted".into(),
| _ => "invite".into(),
})
.await
.unwrap_or_default(),
room_type: services
.rooms
.state_accessor
.get_room_type(&room_id)
.await
.ok(),
room_id, room_id,
room_type,
topic,
world_readable,
} }
} }