refactor fed membership endpoints, add missing checks, some cleanup, reduce line width

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-12-07 01:07:01 -05:00
parent 61670370ed
commit 9d59f777d2
12 changed files with 474 additions and 498 deletions

View file

@ -7,12 +7,12 @@ use std::{
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduit::{
debug, debug_info, debug_warn, err, error, info, pdu,
pdu::{gen_event_id_canonical_json, PduBuilder},
debug, debug_info, debug_warn, err, error, info,
pdu::{self, gen_event_id_canonical_json, PduBuilder},
result::FlatOk,
trace, utils,
utils::{shuffle, IterStream, ReadyExt},
warn, Err, Error, PduEvent, Result,
trace,
utils::{self, shuffle, IterStream, ReadyExt},
warn, Err, PduEvent, Result,
};
use futures::{join, FutureExt, StreamExt};
use ruma::{
@ -153,21 +153,14 @@ async fn banned_room_check(
/// rules locally
/// - If the server does not know about the room: asks other servers over
/// federation
#[tracing::instrument(skip_all, fields(%client_ip), name = "join")]
#[tracing::instrument(skip_all, fields(%client), name = "join")]
pub(crate) async fn join_room_by_id_route(
State(services): State<crate::State>, InsecureClientIp(client_ip): InsecureClientIp,
State(services): State<crate::State>, InsecureClientIp(client): InsecureClientIp,
body: Ruma<join_room_by_id::v3::Request>,
) -> Result<join_room_by_id::v3::Response> {
let sender_user = body.sender_user();
banned_room_check(
&services,
sender_user,
Some(&body.room_id),
body.room_id.server_name(),
client_ip,
)
.await?;
banned_room_check(&services, sender_user, Some(&body.room_id), body.room_id.server_name(), client).await?;
// There is no body.server_name for /roomId/join
let mut servers: Vec<_> = services
@ -354,10 +347,7 @@ pub(crate) async fn invite_user_route(
"User {sender_user} is not an admin and attempted to send an invite to room {}",
&body.room_id
);
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Invites are not allowed on this server.",
));
return Err!(Request(Forbidden("Invites are not allowed on this server.")));
}
banned_room_check(&services, sender_user, Some(&body.room_id), body.room_id.server_name(), client).await?;
@ -388,7 +378,7 @@ pub(crate) async fn invite_user_route(
Ok(invite_user::v3::Response {})
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "User not found."))
Err!(Request(NotFound("User not found.")))
}
}
@ -686,6 +676,18 @@ pub async fn join_room_by_id_helper(
});
}
if let Ok(membership) = services
.rooms
.state_accessor
.get_member(room_id, sender_user)
.await
{
if membership.membership == MembershipState::Ban {
debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
return Err!(Request(Forbidden("You are banned from the room.")));
}
}
let server_in_room = services
.rooms
.state_cache
@ -730,19 +732,29 @@ async fn join_room_by_id_helper_remote(
));
}
let mut join_event_stub: CanonicalJsonObject = serde_json::from_str(make_join_response.event.get())
.map_err(|e| err!(BadServerResponse("Invalid make_join event json received from server: {e:?}")))?;
let mut join_event_stub: CanonicalJsonObject =
serde_json::from_str(make_join_response.event.get()).map_err(|e| {
err!(BadServerResponse(warn!(
"Invalid make_join event json received from server: {e:?}"
)))
})?;
let join_authorized_via_users_server = join_event_stub
.get("content")
.map(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
let join_authorized_via_users_server = {
use RoomVersionId::*;
if !matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
join_event_stub
.get("content")
.map(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok())
} else {
None
}
};
// TODO: Is origin needed?
join_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()),
@ -811,65 +823,46 @@ async fn join_room_by_id_helper_remote(
info!("send_join finished");
if join_authorized_via_users_server.is_some() {
use RoomVersionId::*;
match &room_version_id {
V1 | V2 | V3 | V4 | V5 | V6 | V7 => {
warn!(
"Found `join_authorised_via_users_server` but room {} is version {}. Ignoring.",
room_id, &room_version_id
);
},
// only room versions 8 and above using `join_authorized_via_users_server` (restricted joins) need to
// validate and send signatures
_ => {
if let Some(signed_raw) = &send_join_response.room_state.event {
debug_info!(
"There is a signed event. This room is probably using restricted joins. Adding signature to \
our event"
if let Some(signed_raw) = &send_join_response.room_state.event {
debug_info!(
"There is a signed event with join_authorized_via_users_server. This room is probably using \
restricted joins. Adding signature to our event"
);
let (signed_event_id, signed_value) = gen_event_id_canonical_json(signed_raw, &room_version_id)
.map_err(|e| err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))))?;
if signed_event_id != event_id {
return Err!(Request(BadJson(
warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID")
)));
}
match signed_value["signatures"]
.as_object()
.ok_or_else(|| err!(BadServerResponse(warn!("Server {remote_server} sent invalid signatures type"))))
.and_then(|e| {
e.get(remote_server.as_str()).ok_or_else(|| {
err!(BadServerResponse(warn!(
"Server {remote_server} did not send its signature for a restricted room"
)))
})
}) {
Ok(signature) => {
join_event
.get_mut("signatures")
.expect("we created a valid pdu")
.as_object_mut()
.expect("we created a valid pdu")
.insert(remote_server.to_string(), signature.clone());
},
Err(e) => {
warn!(
"Server {remote_server} sent invalid signature in send_join signatures for event \
{signed_value:?}: {e:?}",
);
let Ok((signed_event_id, signed_value)) = gen_event_id_canonical_json(signed_raw, &room_version_id)
else {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
};
if signed_event_id != event_id {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Server sent event with wrong event id",
));
}
match signed_value["signatures"]
.as_object()
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Server sent invalid signatures type",
))
.and_then(|e| {
e.get(remote_server.as_str())
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Server did not send its signature"))
}) {
Ok(signature) => {
join_event
.get_mut("signatures")
.expect("we created a valid pdu")
.as_object_mut()
.expect("we created a valid pdu")
.insert(remote_server.to_string(), signature.clone());
},
Err(e) => {
warn!(
"Server {remote_server} sent invalid signature in sendjoin signatures for event \
{signed_value:?}: {e:?}",
);
},
}
}
},
},
}
}
}
@ -1041,14 +1034,13 @@ async fn join_room_by_id_helper_local(
services: &Services, sender_user: &UserId, room_id: &RoomId, reason: Option<String>, servers: &[OwnedServerName],
_third_party_signed: Option<&ThirdPartySigned>, state_lock: RoomMutexGuard,
) -> Result {
debug!("We can join locally");
debug_info!("We can join locally");
let join_rules_event_content = services
.rooms
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomJoinRules, "")
.await
.map(|content: RoomJoinRulesEventContent| content);
.room_state_get_content::<RoomJoinRulesEventContent>(room_id, &StateEventType::RoomJoinRules, "")
.await;
let restriction_rooms = match join_rules_event_content {
Ok(RoomJoinRulesEventContent {
@ -1064,40 +1056,36 @@ async fn join_room_by_id_helper_local(
_ => Vec::new(),
};
let local_members: Vec<_> = services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|user| services.globals.user_is_local(user))
.map(ToOwned::to_owned)
.collect()
.await;
let mut join_authorized_via_users_server: Option<OwnedUserId> = None;
if restriction_rooms
.iter()
.stream()
.any(|restriction_room_id| {
let join_authorized_via_users_server: Option<OwnedUserId> = {
if restriction_rooms
.iter()
.stream()
.any(|restriction_room_id| {
services
.rooms
.state_cache
.is_joined(sender_user, restriction_room_id)
})
.await
{
services
.rooms
.state_cache
.is_joined(sender_user, restriction_room_id)
})
.await
{
for user in local_members {
if services
.rooms
.state_accessor
.user_can_invite(room_id, &user, sender_user, &state_lock)
.local_users_in_room(room_id)
.filter(|user| {
services
.rooms
.state_accessor
.user_can_invite(room_id, user, sender_user, &state_lock)
})
.boxed()
.next()
.await
{
join_authorized_via_users_server = Some(user);
break;
}
.map(ToOwned::to_owned)
} else {
None
}
}
};
let content = RoomMemberEventContent {
displayname: services.users.displayname(sender_user).await.ok(),
@ -1109,7 +1097,7 @@ async fn join_room_by_id_helper_local(
};
// Try normal join first
let error = match services
let Err(error) = services
.rooms
.timeline
.build_and_append_pdu(
@ -1119,130 +1107,125 @@ async fn join_room_by_id_helper_local(
&state_lock,
)
.await
{
Ok(_) => return Ok(()),
Err(e) => e,
else {
return Ok(());
};
if !restriction_rooms.is_empty()
&& servers
.iter()
.any(|server_name| !services.globals.server_is_ours(server_name))
if restriction_rooms.is_empty()
&& (servers.is_empty() || servers.len() == 1 && services.globals.server_is_ours(&servers[0]))
{
warn!("We couldn't do the join locally, maybe federation can help to satisfy the restricted join requirements");
let (make_join_response, remote_server) = make_join_request(services, sender_user, room_id, servers).await?;
return Err(error);
}
let Some(room_version_id) = make_join_response.room_version else {
return Err!(BadServerResponse("Remote room version is not supported by conduwuit"));
};
warn!("We couldn't do the join locally, maybe federation can help to satisfy the restricted join requirements");
let Ok((make_join_response, remote_server)) = make_join_request(services, sender_user, room_id, servers).await
else {
return Err(error);
};
if !services.server.supported_room_version(&room_version_id) {
return Err!(BadServerResponse(
"Remote room version {room_version_id} is not supported by conduwuit"
));
}
let Some(room_version_id) = make_join_response.room_version else {
return Err!(BadServerResponse("Remote room version is not supported by conduwuit"));
};
let mut join_event_stub: CanonicalJsonObject = serde_json::from_str(make_join_response.event.get())
.map_err(|e| err!(BadServerResponse("Invalid make_join event json received from server: {e:?}")))?;
let join_authorized_via_users_server = join_event_stub
.get("content")
.map(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
// TODO: Is origin needed?
join_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()),
);
join_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
);
join_event_stub.insert(
"content".to_owned(),
to_canonical_value(RoomMemberEventContent {
displayname: services.users.displayname(sender_user).await.ok(),
avatar_url: services.users.avatar_url(sender_user).await.ok(),
blurhash: services.users.blurhash(sender_user).await.ok(),
reason,
join_authorized_via_users_server,
..RoomMemberEventContent::new(MembershipState::Join)
})
.expect("event is valid, we just created it"),
);
if !services.server.supported_room_version(&room_version_id) {
return Err!(BadServerResponse(
"Remote room version {room_version_id} is not supported by conduwuit"
));
}
// We keep the "event_id" in the pdu only in v1 or
// v2 rooms
match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => {
join_event_stub.remove("event_id");
let mut join_event_stub: CanonicalJsonObject = serde_json::from_str(make_join_response.event.get())
.map_err(|e| err!(BadServerResponse("Invalid make_join event json received from server: {e:?}")))?;
let join_authorized_via_users_server = join_event_stub
.get("content")
.map(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
join_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()),
);
join_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
);
join_event_stub.insert(
"content".to_owned(),
to_canonical_value(RoomMemberEventContent {
displayname: services.users.displayname(sender_user).await.ok(),
avatar_url: services.users.avatar_url(sender_user).await.ok(),
blurhash: services.users.blurhash(sender_user).await.ok(),
reason,
join_authorized_via_users_server,
..RoomMemberEventContent::new(MembershipState::Join)
})
.expect("event is valid, we just created it"),
);
// We keep the "event_id" in the pdu only in v1 or
// v2 rooms
match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => {
join_event_stub.remove("event_id");
},
};
// In order to create a compatible ref hash (EventID) the `hashes` field needs
// to be present
services
.server_keys
.hash_and_sign_event(&mut join_event_stub, &room_version_id)?;
// Generate event id
let event_id = pdu::gen_event_id(&join_event_stub, &room_version_id)?;
// Add event_id back
join_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));
// It has enough fields to be called a proper event now
let join_event = join_event_stub;
let send_join_response = services
.sending
.send_synapse_request(
&remote_server,
federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(),
event_id: event_id.clone(),
omit_members: false,
pdu: services
.sending
.convert_to_outgoing_federation_event(join_event.clone())
.await,
},
};
)
.await?;
// In order to create a compatible ref hash (EventID) the `hashes` field needs
// to be present
services
.server_keys
.hash_and_sign_event(&mut join_event_stub, &room_version_id)?;
if let Some(signed_raw) = send_join_response.room_state.event {
let (signed_event_id, signed_value) = gen_event_id_canonical_json(&signed_raw, &room_version_id)
.map_err(|e| err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))))?;
// Generate event id
let event_id = pdu::gen_event_id(&join_event_stub, &room_version_id)?;
// Add event_id back
join_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));
// It has enough fields to be called a proper event now
let join_event = join_event_stub;
let send_join_response = services
.sending
.send_synapse_request(
&remote_server,
federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(),
event_id: event_id.clone(),
omit_members: false,
pdu: services
.sending
.convert_to_outgoing_federation_event(join_event.clone())
.await,
},
)
.await?;
if let Some(signed_raw) = send_join_response.room_state.event {
let Ok((signed_event_id, signed_value)) = gen_event_id_canonical_json(&signed_raw, &room_version_id) else {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
};
if signed_event_id != event_id {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Server sent event with wrong event id",
));
}
drop(state_lock);
services
.rooms
.event_handler
.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
.await?;
} else {
return Err(error);
if signed_event_id != event_id {
return Err!(Request(BadJson(
warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID")
)));
}
drop(state_lock);
services
.rooms
.event_handler
.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
.await?;
} else {
return Err(error);
}
@ -1317,13 +1300,10 @@ async fn make_join_request(
pub(crate) async fn invite_helper(
services: &Services, sender_user: &UserId, user_id: &UserId, room_id: &RoomId, reason: Option<String>,
is_direct: bool,
) -> Result<()> {
) -> Result {
if !services.users.is_admin(sender_user).await && services.globals.block_non_admin_invites() {
info!("User {sender_user} is not an admin and attempted to send an invite to room {room_id}");
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Invites are not allowed on this server.",
));
return Err!(Request(Forbidden("Invites are not allowed on this server.")));
}
if !services.globals.user_is_local(user_id) {
@ -1382,30 +1362,24 @@ pub(crate) async fn invite_helper(
// We do not add the event_id field to the pdu here because of signature and
// hashes checks
let Ok((event_id, value)) = gen_event_id_canonical_json(&response.event, &room_version_id) else {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
};
let (event_id, value) = gen_event_id_canonical_json(&response.event, &room_version_id)
.map_err(|e| err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))))?;
if *pdu.event_id != *event_id {
warn!(
"Server {} changed invite event, that's not allowed in the spec: ours: {pdu_json:?}, theirs: {value:?}",
user_id.server_name(),
);
if pdu.event_id != event_id {
return Err!(Request(BadJson(
warn!(%pdu.event_id, %event_id, "Server {} sent event with wrong event ID", user_id.server_name())
)));
}
let origin: OwnedServerName = serde_json::from_value(
serde_json::to_value(
value
.get("origin")
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Event needs an origin field."))?,
.ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?,
)
.expect("CanonicalJson is valid json value"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
.map_err(|e| err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}")))))?;
let pdu_id = services
.rooms
@ -1414,8 +1388,7 @@ pub(crate) async fn invite_helper(
.await?
.ok_or_else(|| err!(Request(InvalidParam("Could not accept incoming PDU as timeline event."))))?;
services.sending.send_pdu_room(room_id, &pdu_id).await?;
return Ok(());
return services.sending.send_pdu_room(room_id, &pdu_id).await;
}
if !services
@ -1424,10 +1397,9 @@ pub(crate) async fn invite_helper(
.is_joined(sender_user, room_id)
.await
{
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"You don't have permission to view this room.",
));
return Err!(Request(Forbidden(
"You must be joined in the room you are trying to invite from."
)));
}
let state_lock = services.rooms.state.mutex.lock(room_id).await;
@ -1599,7 +1571,11 @@ async fn remote_leave_room(services: &Services, user_id: &UserId, room_id: &Room
.map(|user| user.server_name().to_owned()),
);
debug!("servers in remote_leave_room: {servers:?}");
if let Some(room_id_server_name) = room_id.server_name() {
servers.insert(room_id_server_name.to_owned());
}
debug_info!("servers in remote_leave_room: {servers:?}");
for remote_server in servers {
let make_leave_response = services