misc cleanup
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
57e0a5f65d
commit
f503ed918c
7 changed files with 49 additions and 71 deletions
|
@ -1333,10 +1333,8 @@ pub async fn validate_and_add_event_id(
|
||||||
services: &Services, pdu: &RawJsonValue, room_version: &RoomVersionId,
|
services: &Services, pdu: &RawJsonValue, room_version: &RoomVersionId,
|
||||||
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
|
||||||
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
|
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
|
||||||
let mut value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
let mut value: CanonicalJsonObject = serde_json::from_str(pdu.get())
|
||||||
debug_error!("Invalid PDU in server response: {pdu:#?}");
|
.map_err(|e| err!(BadServerResponse(debug_error!("Invalid PDU in server response: {e:?}"))))?;
|
||||||
err!(BadServerResponse("Invalid PDU in server response: {e:?}"))
|
|
||||||
})?;
|
|
||||||
let event_id = EventId::parse(format!(
|
let event_id = EventId::parse(format!(
|
||||||
"${}",
|
"${}",
|
||||||
ruma::signatures::reference_hash(&value, room_version).expect("ruma can calculate reference hashes")
|
ruma::signatures::reference_hash(&value, room_version).expect("ruma can calculate reference hashes")
|
||||||
|
@ -1478,10 +1476,8 @@ pub(crate) async fn invite_helper(
|
||||||
|
|
||||||
if *pdu.event_id != *event_id {
|
if *pdu.event_id != *event_id {
|
||||||
warn!(
|
warn!(
|
||||||
"Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}",
|
"Server {} changed invite event, that's not allowed in the spec: ours: {pdu_json:?}, theirs: {value:?}",
|
||||||
user_id.server_name(),
|
user_id.server_name(),
|
||||||
pdu_json,
|
|
||||||
value
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1564,20 +1560,19 @@ pub(crate) async fn invite_helper(
|
||||||
// Make a user leave all their joined rooms, forgets all rooms, and ignores
|
// Make a user leave all their joined rooms, forgets all rooms, and ignores
|
||||||
// errors
|
// errors
|
||||||
pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
|
pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
|
||||||
let all_rooms: Vec<_> = services
|
let rooms_joined = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_joined(user_id)
|
.rooms_joined(user_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned);
|
||||||
.chain(
|
|
||||||
services
|
let rooms_invited = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_invited(user_id)
|
.rooms_invited(user_id)
|
||||||
.map(|(r, _)| r),
|
.map(|(r, _)| r);
|
||||||
)
|
|
||||||
.collect()
|
let all_rooms: Vec<_> = rooms_joined.chain(rooms_invited).collect().await;
|
||||||
.await;
|
|
||||||
|
|
||||||
for room_id in all_rooms {
|
for room_id in all_rooms {
|
||||||
// ignore errors
|
// ignore errors
|
||||||
|
@ -1601,7 +1596,7 @@ pub async fn leave_room(services: &Services, user_id: &UserId, room_id: &RoomId,
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
if let Err(e) = remote_leave_room(services, user_id, room_id).await {
|
if let Err(e) = remote_leave_room(services, user_id, room_id).await {
|
||||||
warn!("Failed to leave room {} remotely: {}", user_id, e);
|
warn!("Failed to leave room {user_id} remotely: {e}");
|
||||||
// Don't tell the client about this error
|
// Don't tell the client about this error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -176,7 +176,7 @@ async fn send_state_event_for_key_helper(
|
||||||
.build_and_append_pdu(
|
.build_and_append_pdu(
|
||||||
PduBuilder {
|
PduBuilder {
|
||||||
event_type: event_type.to_string().into(),
|
event_type: event_type.to_string().into(),
|
||||||
content: serde_json::from_str(json.json().get()).expect("content is valid json"),
|
content: serde_json::from_str(json.json().get())?,
|
||||||
unsigned: None,
|
unsigned: None,
|
||||||
state_key: Some(state_key),
|
state_key: Some(state_key),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
|
|
|
@ -47,10 +47,7 @@ pub(crate) async fn create_invite_route(
|
||||||
.forbidden_remote_server_names
|
.forbidden_remote_server_names
|
||||||
.contains(&server.to_owned())
|
.contains(&server.to_owned())
|
||||||
{
|
{
|
||||||
return Err(Error::BadRequest(
|
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||||
ErrorKind::forbidden(),
|
|
||||||
"Server is banned on this homeserver.",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,15 +61,13 @@ pub(crate) async fn create_invite_route(
|
||||||
"Received federated/remote invite from banned server {origin} for room ID {}. Rejecting.",
|
"Received federated/remote invite from banned server {origin} for room ID {}. Rejecting.",
|
||||||
body.room_id
|
body.room_id
|
||||||
);
|
);
|
||||||
return Err(Error::BadRequest(
|
|
||||||
ErrorKind::forbidden(),
|
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||||
"Server is banned on this homeserver.",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(via) = &body.via {
|
if let Some(via) = &body.via {
|
||||||
if via.is_empty() {
|
if via.is_empty() {
|
||||||
return Err(Error::BadRequest(ErrorKind::InvalidParam, "via field must not be empty."));
|
return Err!(Request(InvalidParam("via field must not be empty.")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,10 +81,7 @@ pub(crate) async fn create_invite_route(
|
||||||
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
|
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
|
||||||
|
|
||||||
if !services.globals.server_is_ours(invited_user.server_name()) {
|
if !services.globals.server_is_ours(invited_user.server_name()) {
|
||||||
return Err(Error::BadRequest(
|
return Err!(Request(InvalidParam("User does not belong to this homeserver.")));
|
||||||
ErrorKind::InvalidParam,
|
|
||||||
"User does not belong to this homeserver.",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure we're not ACL'ed from their room.
|
// Make sure we're not ACL'ed from their room.
|
||||||
|
@ -124,17 +116,11 @@ pub(crate) async fn create_invite_route(
|
||||||
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
|
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
|
||||||
|
|
||||||
if services.rooms.metadata.is_banned(&body.room_id).await && !services.users.is_admin(&invited_user).await {
|
if services.rooms.metadata.is_banned(&body.room_id).await && !services.users.is_admin(&invited_user).await {
|
||||||
return Err(Error::BadRequest(
|
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
|
||||||
ErrorKind::forbidden(),
|
|
||||||
"This room is banned on this homeserver.",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if services.globals.block_non_admin_invites() && !services.users.is_admin(&invited_user).await {
|
if services.globals.block_non_admin_invites() && !services.users.is_admin(&invited_user).await {
|
||||||
return Err(Error::BadRequest(
|
return Err!(Request(Forbidden("This server does not allow room invites.")));
|
||||||
ErrorKind::forbidden(),
|
|
||||||
"This server does not allow room invites.",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut invite_state = body.invite_room_state.clone();
|
let mut invite_state = body.invite_room_state.clone();
|
||||||
|
|
|
@ -16,9 +16,11 @@ use ruma::{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
||||||
|
serde::Raw,
|
||||||
to_device::DeviceIdOrAllDevices,
|
to_device::DeviceIdOrAllDevices,
|
||||||
OwnedEventId, ServerName,
|
OwnedEventId, ServerName,
|
||||||
};
|
};
|
||||||
|
use serde_json::value::RawValue as RawJsonValue;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -70,8 +72,8 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
"Starting txn",
|
"Starting txn",
|
||||||
);
|
);
|
||||||
|
|
||||||
let resolved_map = handle_pdus(&services, &client, &body, origin, &txn_start_time).await;
|
let resolved_map = handle_pdus(&services, &client, &body.pdus, origin, &txn_start_time).await;
|
||||||
handle_edus(&services, &client, &body, origin).await;
|
handle_edus(&services, &client, &body.edus, origin).await;
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
pdus = ?body.pdus.len(),
|
pdus = ?body.pdus.len(),
|
||||||
|
@ -91,11 +93,10 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_pdus(
|
async fn handle_pdus(
|
||||||
services: &Services, _client: &IpAddr, body: &Ruma<send_transaction_message::v1::Request>, origin: &ServerName,
|
services: &Services, _client: &IpAddr, pdus: &[Box<RawJsonValue>], origin: &ServerName, txn_start_time: &Instant,
|
||||||
txn_start_time: &Instant,
|
|
||||||
) -> ResolvedMap {
|
) -> ResolvedMap {
|
||||||
let mut parsed_pdus = Vec::with_capacity(body.pdus.len());
|
let mut parsed_pdus = Vec::with_capacity(pdus.len());
|
||||||
for pdu in &body.pdus {
|
for pdu in pdus {
|
||||||
parsed_pdus.push(match services.rooms.event_handler.parse_incoming_pdu(pdu).await {
|
parsed_pdus.push(match services.rooms.event_handler.parse_incoming_pdu(pdu).await {
|
||||||
Ok(t) => t,
|
Ok(t) => t,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -162,11 +163,8 @@ async fn handle_pdus(
|
||||||
resolved_map
|
resolved_map
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_edus(
|
async fn handle_edus(services: &Services, client: &IpAddr, edus: &[Raw<Edu>], origin: &ServerName) {
|
||||||
services: &Services, client: &IpAddr, body: &Ruma<send_transaction_message::v1::Request>, origin: &ServerName,
|
for edu in edus
|
||||||
) {
|
|
||||||
for edu in body
|
|
||||||
.edus
|
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
|
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
|
||||||
{
|
{
|
||||||
|
@ -178,7 +176,7 @@ async fn handle_edus(
|
||||||
Edu::DirectToDevice(content) => handle_edu_direct_to_device(services, client, origin, content).await,
|
Edu::DirectToDevice(content) => handle_edu_direct_to_device(services, client, origin, content).await,
|
||||||
Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(services, client, origin, content).await,
|
Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(services, client, origin, content).await,
|
||||||
Edu::_Custom(ref _custom) => {
|
Edu::_Custom(ref _custom) => {
|
||||||
debug_warn!(?body.edus, "received custom/unknown EDU");
|
debug_warn!(?edus, "received custom/unknown EDU");
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -198,7 +198,6 @@ impl Service {
|
||||||
Ok(None) => debug!("Command successful with no response"),
|
Ok(None) => debug!("Command successful with no response"),
|
||||||
Ok(Some(output)) | Err(output) => self
|
Ok(Some(output)) | Err(output) => self
|
||||||
.handle_response(output)
|
.handle_response(output)
|
||||||
.boxed()
|
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(default_log),
|
.unwrap_or_else(default_log),
|
||||||
}
|
}
|
||||||
|
@ -277,6 +276,7 @@ impl Service {
|
||||||
};
|
};
|
||||||
|
|
||||||
self.respond_to_room(content, &pdu.room_id, response_sender)
|
self.respond_to_room(content, &pdu.room_id, response_sender)
|
||||||
|
.boxed()
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,29 +1,29 @@
|
||||||
use conduit::{debug_warn, err, pdu::gen_event_id_canonical_json, Err, Result};
|
use conduit::{err, pdu::gen_event_id_canonical_json, result::FlatOk, Result};
|
||||||
use ruma::{CanonicalJsonObject, OwnedEventId, OwnedRoomId, RoomId};
|
use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, RoomId};
|
||||||
use serde_json::value::RawValue as RawJsonValue;
|
use serde_json::value::RawValue as RawJsonValue;
|
||||||
|
|
||||||
impl super::Service {
|
impl super::Service {
|
||||||
pub async fn parse_incoming_pdu(
|
pub async fn parse_incoming_pdu(
|
||||||
&self, pdu: &RawJsonValue,
|
&self, pdu: &RawJsonValue,
|
||||||
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
|
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
|
||||||
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get())
|
||||||
debug_warn!("Error parsing incoming event {pdu:#?}");
|
.map_err(|e| err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}"))))?;
|
||||||
err!(BadServerResponse("Error parsing incoming event {e:?}"))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let room_id: OwnedRoomId = value
|
let room_id: OwnedRoomId = value
|
||||||
.get("room_id")
|
.get("room_id")
|
||||||
.and_then(|id| RoomId::parse(id.as_str()?).ok())
|
.and_then(CanonicalJsonValue::as_str)
|
||||||
.ok_or_else(|| err!(Request(InvalidParam("Invalid room id in pdu"))))?;
|
.map(RoomId::parse)
|
||||||
|
.flat_ok_or(err!(Request(InvalidParam("Invalid room_id in pdu"))))?;
|
||||||
|
|
||||||
let Ok(room_version_id) = self.services.state.get_room_version(&room_id).await else {
|
let room_version_id = self
|
||||||
return Err!("Server is not in room {room_id}");
|
.services
|
||||||
};
|
.state
|
||||||
|
.get_room_version(&room_id)
|
||||||
|
.await
|
||||||
|
.map_err(|_| err!("Server is not in room {room_id}"))?;
|
||||||
|
|
||||||
let Ok((event_id, value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
|
let (event_id, value) = gen_event_id_canonical_json(pdu, &room_version_id)
|
||||||
// Event could not be converted to canonical json
|
.map_err(|e| err!(Request(InvalidParam("Could not convert event to canonical json: {e}"))))?;
|
||||||
return Err!(Request(InvalidParam("Could not convert event to canonical json.")));
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((event_id, value, room_id))
|
Ok((event_id, value, room_id))
|
||||||
}
|
}
|
||||||
|
|
|
@ -661,8 +661,7 @@ impl Service {
|
||||||
.await
|
.await
|
||||||
.or_else(|_| {
|
.or_else(|_| {
|
||||||
if event_type == TimelineEventType::RoomCreate {
|
if event_type == TimelineEventType::RoomCreate {
|
||||||
let content = serde_json::from_str::<RoomCreateEventContent>(content.get())
|
let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
|
||||||
.expect("Invalid content in RoomCreate pdu.");
|
|
||||||
Ok(content.room_version)
|
Ok(content.room_version)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::InconsistentRoomState(
|
Err(Error::InconsistentRoomState(
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue