misc async optimizations; macro reformatting

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-09 01:50:13 +00:00
parent cd4e6b61a9
commit d82f00c31c

View file

@ -1,6 +1,6 @@
use std::{ use std::{
borrow::Borrow, borrow::Borrow,
collections::{BTreeMap, HashMap, HashSet}, collections::{HashMap, HashSet},
iter::once, iter::once,
net::IpAddr, net::IpAddr,
sync::Arc, sync::Arc,
@ -9,7 +9,7 @@ use std::{
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduwuit::{ use conduwuit::{
Err, Result, at, debug, debug_info, debug_warn, err, error, info, Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, info, is_matching,
matrix::{ matrix::{
StateKey, StateKey,
pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json}, pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json},
@ -17,7 +17,12 @@ use conduwuit::{
}, },
result::{FlatOk, NotFound}, result::{FlatOk, NotFound},
trace, trace,
utils::{self, IterStream, ReadyExt, shuffle}, utils::{
self, FutureBoolExt,
future::ReadyEqExt,
shuffle,
stream::{BroadbandExt, IterStream, ReadyExt},
},
warn, warn,
}; };
use conduwuit_service::{ use conduwuit_service::{
@ -28,7 +33,7 @@ use conduwuit_service::{
state_compressor::{CompressedState, HashSetCompressStateEvent}, state_compressor::{CompressedState, HashSetCompressStateEvent},
}, },
}; };
use futures::{FutureExt, StreamExt, TryFutureExt, future::join4, join}; use futures::{FutureExt, StreamExt, TryFutureExt, join, pin_mut};
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName, CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName,
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
@ -52,7 +57,6 @@ use ruma::{
room::{ room::{
join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent}, join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent}, member::{MembershipState, RoomMemberEventContent},
message::RoomMessageEventContent,
}, },
}, },
}; };
@ -81,7 +85,7 @@ async fn banned_room_check(
|| services || services
.config .config
.forbidden_remote_server_names .forbidden_remote_server_names
.is_match(room_id.server_name().unwrap().host()) .is_match(room_id.server_name().expect("legacy room mxid").host())
{ {
warn!( warn!(
"User {user_id} who is not an admin attempted to send an invite for or \ "User {user_id} who is not an admin attempted to send an invite for or \
@ -96,12 +100,11 @@ async fn banned_room_check(
if services.server.config.admin_room_notices { if services.server.config.admin_room_notices {
services services
.admin .admin
.send_message(RoomMessageEventContent::text_plain(format!( .send_text(&format!(
"Automatically deactivating user {user_id} due to attempted banned \ "Automatically deactivating user {user_id} due to attempted banned \
room join from IP {client_ip}" room join from IP {client_ip}"
))) ))
.await .await;
.ok();
} }
let all_joined_rooms: Vec<OwnedRoomId> = services let all_joined_rooms: Vec<OwnedRoomId> = services
@ -136,12 +139,11 @@ async fn banned_room_check(
if services.server.config.admin_room_notices { if services.server.config.admin_room_notices {
services services
.admin .admin
.send_message(RoomMessageEventContent::text_plain(format!( .send_text(&format!(
"Automatically deactivating user {user_id} due to attempted banned \ "Automatically deactivating user {user_id} due to attempted banned \
room join from IP {client_ip}" room join from IP {client_ip}"
))) ))
.await .await;
.ok();
} }
let all_joined_rooms: Vec<OwnedRoomId> = services let all_joined_rooms: Vec<OwnedRoomId> = services
@ -366,10 +368,10 @@ pub(crate) async fn knock_room_route(
InsecureClientIp(client): InsecureClientIp, InsecureClientIp(client): InsecureClientIp,
body: Ruma<knock_room::v3::Request>, body: Ruma<knock_room::v3::Request>,
) -> Result<knock_room::v3::Response> { ) -> Result<knock_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user();
let body = body.body; let body = &body.body;
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias) { let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias.clone()) {
| Ok(room_id) => { | Ok(room_id) => {
banned_room_check( banned_room_check(
&services, &services,
@ -493,7 +495,7 @@ pub(crate) async fn invite_user_route(
let sender_user = body.sender_user(); let sender_user = body.sender_user();
if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites { if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites {
info!( debug_error!(
"User {sender_user} is not an admin and attempted to send an invite to room {}", "User {sender_user} is not an admin and attempted to send an invite to room {}",
&body.room_id &body.room_id
); );
@ -722,12 +724,10 @@ pub(crate) async fn forget_room_route(
let joined = services.rooms.state_cache.is_joined(user_id, room_id); let joined = services.rooms.state_cache.is_joined(user_id, room_id);
let knocked = services.rooms.state_cache.is_knocked(user_id, room_id); let knocked = services.rooms.state_cache.is_knocked(user_id, room_id);
let left = services.rooms.state_cache.is_left(user_id, room_id);
let invited = services.rooms.state_cache.is_invited(user_id, room_id); let invited = services.rooms.state_cache.is_invited(user_id, room_id);
let (joined, knocked, left, invited) = join4(joined, knocked, left, invited).await; pin_mut!(joined, knocked, invited);
if joined.or(knocked).or(invited).await {
if joined || knocked || invited {
return Err!(Request(Unknown("You must leave the room before forgetting it"))); return Err!(Request(Unknown("You must leave the room before forgetting it")));
} }
@ -741,11 +741,11 @@ pub(crate) async fn forget_room_route(
return Err!(Request(Unknown("No membership event was found, room was never joined"))); return Err!(Request(Unknown("No membership event was found, room was never joined")));
} }
if left let non_membership = membership
|| membership.is_ok_and(|member| { .map(|member| member.membership)
member.membership == MembershipState::Leave .is_ok_and(is_matching!(MembershipState::Leave | MembershipState::Ban));
|| member.membership == MembershipState::Ban
}) { if non_membership || services.rooms.state_cache.is_left(user_id, room_id).await {
services.rooms.state_cache.forget(room_id, user_id); services.rooms.state_cache.forget(room_id, user_id);
} }
@ -866,32 +866,32 @@ pub(crate) async fn joined_members_route(
State(services): State<crate::State>, State(services): State<crate::State>,
body: Ruma<joined_members::v3::Request>, body: Ruma<joined_members::v3::Request>,
) -> Result<joined_members::v3::Response> { ) -> Result<joined_members::v3::Response> {
let sender_user = body.sender_user();
if !services if !services
.rooms .rooms
.state_accessor .state_accessor
.user_can_see_state_events(sender_user, &body.room_id) .user_can_see_state_events(body.sender_user(), &body.room_id)
.await .await
{ {
return Err!(Request(Forbidden("You don't have permission to view this room."))); return Err!(Request(Forbidden("You don't have permission to view this room.")));
} }
let joined: BTreeMap<OwnedUserId, RoomMember> = services Ok(joined_members::v3::Response {
joined: services
.rooms .rooms
.state_cache .state_cache
.room_members(&body.room_id) .room_members(&body.room_id)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.then(|user| async move { .broad_then(|user_id| async move {
(user.clone(), RoomMember { let member = RoomMember {
display_name: services.users.displayname(&user).await.ok(), display_name: services.users.displayname(&user_id).await.ok(),
avatar_url: services.users.avatar_url(&user).await.ok(), avatar_url: services.users.avatar_url(&user_id).await.ok(),
}) };
(user_id, member)
}) })
.collect() .collect()
.await; .await,
})
Ok(joined_members::v3::Response { joined })
} }
pub async fn join_room_by_id_helper( pub async fn join_room_by_id_helper(
@ -1118,9 +1118,10 @@ async fn join_room_by_id_helper_remote(
})?; })?;
if signed_event_id != event_id { if signed_event_id != event_id {
return Err!(Request(BadJson( return Err!(Request(BadJson(warn!(
warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID") %signed_event_id, %event_id,
))); "Server {remote_server} sent event with wrong event ID"
))));
} }
match signed_value["signatures"] match signed_value["signatures"]
@ -1696,19 +1697,18 @@ pub(crate) async fn invite_helper(
})?; })?;
if pdu.event_id != event_id { if pdu.event_id != event_id {
return Err!(Request(BadJson( return Err!(Request(BadJson(warn!(
warn!(%pdu.event_id, %event_id, "Server {} sent event with wrong event ID", user_id.server_name()) %pdu.event_id, %event_id,
))); "Server {} sent event with wrong event ID",
user_id.server_name()
))));
} }
let origin: OwnedServerName = serde_json::from_value( let origin: OwnedServerName = serde_json::from_value(serde_json::to_value(
serde_json::to_value(
value value
.get("origin") .get("origin")
.ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?, .ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?,
) )?)
.expect("CanonicalJson is valid json value"),
)
.map_err(|e| { .map_err(|e| {
err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}")))) err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}"))))
})?; })?;
@ -1818,9 +1818,11 @@ pub async fn leave_room(
blurhash: None, blurhash: None,
}; };
if services.rooms.metadata.is_banned(room_id).await let is_banned = services.rooms.metadata.is_banned(room_id);
|| services.rooms.metadata.is_disabled(room_id).await let is_disabled = services.rooms.metadata.is_disabled(room_id);
{
pin_mut!(is_banned, is_disabled);
if is_banned.or(is_disabled).await {
// the room is banned/disabled, the room must be rejected locally since we // the room is banned/disabled, the room must be rejected locally since we
// cant/dont want to federate with this server // cant/dont want to federate with this server
services services
@ -1840,18 +1842,21 @@ pub async fn leave_room(
return Ok(()); return Ok(());
} }
// Ask a remote server if we don't have this room and are not knocking on it let dont_have_room = services
if !services
.rooms .rooms
.state_cache .state_cache
.server_in_room(services.globals.server_name(), room_id) .server_in_room(services.globals.server_name(), room_id)
.await && !services .eq(&false);
let not_knocked = services
.rooms .rooms
.state_cache .state_cache
.is_knocked(user_id, room_id) .is_knocked(user_id, room_id)
.await .eq(&false);
{
if let Err(e) = remote_leave_room(services, user_id, room_id).await { // Ask a remote server if we don't have this room and are not knocking on it
if dont_have_room.and(not_knocked).await {
if let Err(e) = remote_leave_room(services, user_id, room_id).boxed().await {
warn!(%user_id, "Failed to leave room {room_id} remotely: {e}"); warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
// Don't tell the client about this error // Don't tell the client about this error
} }