misc async optimizations; macro reformatting
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
460cf27a03
commit
f86d7236ac
1 changed files with 72 additions and 67 deletions
|
@ -1,6 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Borrow,
|
borrow::Borrow,
|
||||||
collections::{BTreeMap, HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
iter::once,
|
iter::once,
|
||||||
net::IpAddr,
|
net::IpAddr,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
@ -9,7 +9,7 @@ use std::{
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, at, debug, debug_info, debug_warn, err, error, info,
|
Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, info, is_matching,
|
||||||
matrix::{
|
matrix::{
|
||||||
StateKey,
|
StateKey,
|
||||||
pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json},
|
pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json},
|
||||||
|
@ -17,7 +17,12 @@ use conduwuit::{
|
||||||
},
|
},
|
||||||
result::{FlatOk, NotFound},
|
result::{FlatOk, NotFound},
|
||||||
trace,
|
trace,
|
||||||
utils::{self, IterStream, ReadyExt, shuffle},
|
utils::{
|
||||||
|
self, FutureBoolExt,
|
||||||
|
future::ReadyEqExt,
|
||||||
|
shuffle,
|
||||||
|
stream::{BroadbandExt, IterStream, ReadyExt},
|
||||||
|
},
|
||||||
warn,
|
warn,
|
||||||
};
|
};
|
||||||
use conduwuit_service::{
|
use conduwuit_service::{
|
||||||
|
@ -28,7 +33,7 @@ use conduwuit_service::{
|
||||||
state_compressor::{CompressedState, HashSetCompressStateEvent},
|
state_compressor::{CompressedState, HashSetCompressStateEvent},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt, TryFutureExt, future::join4, join};
|
use futures::{FutureExt, StreamExt, TryFutureExt, join, pin_mut};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName,
|
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName,
|
||||||
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
|
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
|
@ -52,7 +57,6 @@ use ruma::{
|
||||||
room::{
|
room::{
|
||||||
join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent},
|
join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent},
|
||||||
member::{MembershipState, RoomMemberEventContent},
|
member::{MembershipState, RoomMemberEventContent},
|
||||||
message::RoomMessageEventContent,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -81,7 +85,7 @@ async fn banned_room_check(
|
||||||
|| services
|
|| services
|
||||||
.config
|
.config
|
||||||
.forbidden_remote_server_names
|
.forbidden_remote_server_names
|
||||||
.is_match(room_id.server_name().unwrap().host())
|
.is_match(room_id.server_name().expect("legacy room mxid").host())
|
||||||
{
|
{
|
||||||
warn!(
|
warn!(
|
||||||
"User {user_id} who is not an admin attempted to send an invite for or \
|
"User {user_id} who is not an admin attempted to send an invite for or \
|
||||||
|
@ -96,12 +100,11 @@ async fn banned_room_check(
|
||||||
if services.server.config.admin_room_notices {
|
if services.server.config.admin_room_notices {
|
||||||
services
|
services
|
||||||
.admin
|
.admin
|
||||||
.send_message(RoomMessageEventContent::text_plain(format!(
|
.send_text(&format!(
|
||||||
"Automatically deactivating user {user_id} due to attempted banned \
|
"Automatically deactivating user {user_id} due to attempted banned \
|
||||||
room join from IP {client_ip}"
|
room join from IP {client_ip}"
|
||||||
)))
|
))
|
||||||
.await
|
.await;
|
||||||
.ok();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let all_joined_rooms: Vec<OwnedRoomId> = services
|
let all_joined_rooms: Vec<OwnedRoomId> = services
|
||||||
|
@ -136,12 +139,11 @@ async fn banned_room_check(
|
||||||
if services.server.config.admin_room_notices {
|
if services.server.config.admin_room_notices {
|
||||||
services
|
services
|
||||||
.admin
|
.admin
|
||||||
.send_message(RoomMessageEventContent::text_plain(format!(
|
.send_text(&format!(
|
||||||
"Automatically deactivating user {user_id} due to attempted banned \
|
"Automatically deactivating user {user_id} due to attempted banned \
|
||||||
room join from IP {client_ip}"
|
room join from IP {client_ip}"
|
||||||
)))
|
))
|
||||||
.await
|
.await;
|
||||||
.ok();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let all_joined_rooms: Vec<OwnedRoomId> = services
|
let all_joined_rooms: Vec<OwnedRoomId> = services
|
||||||
|
@ -366,10 +368,10 @@ pub(crate) async fn knock_room_route(
|
||||||
InsecureClientIp(client): InsecureClientIp,
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
body: Ruma<knock_room::v3::Request>,
|
body: Ruma<knock_room::v3::Request>,
|
||||||
) -> Result<knock_room::v3::Response> {
|
) -> Result<knock_room::v3::Response> {
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user();
|
||||||
let body = body.body;
|
let body = &body.body;
|
||||||
|
|
||||||
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias) {
|
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias.clone()) {
|
||||||
| Ok(room_id) => {
|
| Ok(room_id) => {
|
||||||
banned_room_check(
|
banned_room_check(
|
||||||
&services,
|
&services,
|
||||||
|
@ -493,7 +495,7 @@ pub(crate) async fn invite_user_route(
|
||||||
let sender_user = body.sender_user();
|
let sender_user = body.sender_user();
|
||||||
|
|
||||||
if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites {
|
if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites {
|
||||||
info!(
|
debug_error!(
|
||||||
"User {sender_user} is not an admin and attempted to send an invite to room {}",
|
"User {sender_user} is not an admin and attempted to send an invite to room {}",
|
||||||
&body.room_id
|
&body.room_id
|
||||||
);
|
);
|
||||||
|
@ -722,12 +724,10 @@ pub(crate) async fn forget_room_route(
|
||||||
|
|
||||||
let joined = services.rooms.state_cache.is_joined(user_id, room_id);
|
let joined = services.rooms.state_cache.is_joined(user_id, room_id);
|
||||||
let knocked = services.rooms.state_cache.is_knocked(user_id, room_id);
|
let knocked = services.rooms.state_cache.is_knocked(user_id, room_id);
|
||||||
let left = services.rooms.state_cache.is_left(user_id, room_id);
|
|
||||||
let invited = services.rooms.state_cache.is_invited(user_id, room_id);
|
let invited = services.rooms.state_cache.is_invited(user_id, room_id);
|
||||||
|
|
||||||
let (joined, knocked, left, invited) = join4(joined, knocked, left, invited).await;
|
pin_mut!(joined, knocked, invited);
|
||||||
|
if joined.or(knocked).or(invited).await {
|
||||||
if joined || knocked || invited {
|
|
||||||
return Err!(Request(Unknown("You must leave the room before forgetting it")));
|
return Err!(Request(Unknown("You must leave the room before forgetting it")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -741,11 +741,11 @@ pub(crate) async fn forget_room_route(
|
||||||
return Err!(Request(Unknown("No membership event was found, room was never joined")));
|
return Err!(Request(Unknown("No membership event was found, room was never joined")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if left
|
let non_membership = membership
|
||||||
|| membership.is_ok_and(|member| {
|
.map(|member| member.membership)
|
||||||
member.membership == MembershipState::Leave
|
.is_ok_and(is_matching!(MembershipState::Leave | MembershipState::Ban));
|
||||||
|| member.membership == MembershipState::Ban
|
|
||||||
}) {
|
if non_membership || services.rooms.state_cache.is_left(user_id, room_id).await {
|
||||||
services.rooms.state_cache.forget(room_id, user_id);
|
services.rooms.state_cache.forget(room_id, user_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -866,32 +866,32 @@ pub(crate) async fn joined_members_route(
|
||||||
State(services): State<crate::State>,
|
State(services): State<crate::State>,
|
||||||
body: Ruma<joined_members::v3::Request>,
|
body: Ruma<joined_members::v3::Request>,
|
||||||
) -> Result<joined_members::v3::Response> {
|
) -> Result<joined_members::v3::Response> {
|
||||||
let sender_user = body.sender_user();
|
|
||||||
|
|
||||||
if !services
|
if !services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.user_can_see_state_events(sender_user, &body.room_id)
|
.user_can_see_state_events(body.sender_user(), &body.room_id)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
return Err!(Request(Forbidden("You don't have permission to view this room.")));
|
return Err!(Request(Forbidden("You don't have permission to view this room.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
let joined: BTreeMap<OwnedUserId, RoomMember> = services
|
Ok(joined_members::v3::Response {
|
||||||
.rooms
|
joined: services
|
||||||
.state_cache
|
.rooms
|
||||||
.room_members(&body.room_id)
|
.state_cache
|
||||||
.map(ToOwned::to_owned)
|
.room_members(&body.room_id)
|
||||||
.then(|user| async move {
|
.map(ToOwned::to_owned)
|
||||||
(user.clone(), RoomMember {
|
.broad_then(|user_id| async move {
|
||||||
display_name: services.users.displayname(&user).await.ok(),
|
let member = RoomMember {
|
||||||
avatar_url: services.users.avatar_url(&user).await.ok(),
|
display_name: services.users.displayname(&user_id).await.ok(),
|
||||||
})
|
avatar_url: services.users.avatar_url(&user_id).await.ok(),
|
||||||
})
|
};
|
||||||
.collect()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(joined_members::v3::Response { joined })
|
(user_id, member)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
.await,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn join_room_by_id_helper(
|
pub async fn join_room_by_id_helper(
|
||||||
|
@ -1118,9 +1118,10 @@ async fn join_room_by_id_helper_remote(
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if signed_event_id != event_id {
|
if signed_event_id != event_id {
|
||||||
return Err!(Request(BadJson(
|
return Err!(Request(BadJson(warn!(
|
||||||
warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID")
|
%signed_event_id, %event_id,
|
||||||
)));
|
"Server {remote_server} sent event with wrong event ID"
|
||||||
|
))));
|
||||||
}
|
}
|
||||||
|
|
||||||
match signed_value["signatures"]
|
match signed_value["signatures"]
|
||||||
|
@ -1696,19 +1697,18 @@ pub(crate) async fn invite_helper(
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if pdu.event_id != event_id {
|
if pdu.event_id != event_id {
|
||||||
return Err!(Request(BadJson(
|
return Err!(Request(BadJson(warn!(
|
||||||
warn!(%pdu.event_id, %event_id, "Server {} sent event with wrong event ID", user_id.server_name())
|
%pdu.event_id, %event_id,
|
||||||
)));
|
"Server {} sent event with wrong event ID",
|
||||||
|
user_id.server_name()
|
||||||
|
))));
|
||||||
}
|
}
|
||||||
|
|
||||||
let origin: OwnedServerName = serde_json::from_value(
|
let origin: OwnedServerName = serde_json::from_value(serde_json::to_value(
|
||||||
serde_json::to_value(
|
value
|
||||||
value
|
.get("origin")
|
||||||
.get("origin")
|
.ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?,
|
||||||
.ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?,
|
)?)
|
||||||
)
|
|
||||||
.expect("CanonicalJson is valid json value"),
|
|
||||||
)
|
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}"))))
|
err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}"))))
|
||||||
})?;
|
})?;
|
||||||
|
@ -1818,9 +1818,11 @@ pub async fn leave_room(
|
||||||
blurhash: None,
|
blurhash: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
if services.rooms.metadata.is_banned(room_id).await
|
let is_banned = services.rooms.metadata.is_banned(room_id);
|
||||||
|| services.rooms.metadata.is_disabled(room_id).await
|
let is_disabled = services.rooms.metadata.is_disabled(room_id);
|
||||||
{
|
|
||||||
|
pin_mut!(is_banned, is_disabled);
|
||||||
|
if is_banned.or(is_disabled).await {
|
||||||
// the room is banned/disabled, the room must be rejected locally since we
|
// the room is banned/disabled, the room must be rejected locally since we
|
||||||
// cant/dont want to federate with this server
|
// cant/dont want to federate with this server
|
||||||
services
|
services
|
||||||
|
@ -1840,18 +1842,21 @@ pub async fn leave_room(
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ask a remote server if we don't have this room and are not knocking on it
|
let dont_have_room = services
|
||||||
if !services
|
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.server_in_room(services.globals.server_name(), room_id)
|
.server_in_room(services.globals.server_name(), room_id)
|
||||||
.await && !services
|
.eq(&false);
|
||||||
|
|
||||||
|
let not_knocked = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.is_knocked(user_id, room_id)
|
.is_knocked(user_id, room_id)
|
||||||
.await
|
.eq(&false);
|
||||||
{
|
|
||||||
if let Err(e) = remote_leave_room(services, user_id, room_id).await {
|
// Ask a remote server if we don't have this room and are not knocking on it
|
||||||
|
if dont_have_room.and(not_knocked).await {
|
||||||
|
if let Err(e) = remote_leave_room(services, user_id, room_id).boxed().await {
|
||||||
warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
|
warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
|
||||||
// Don't tell the client about this error
|
// Don't tell the client about this error
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue