feat: reject invites over federation

This commit is contained in:
Timo Kösters 2021-04-13 15:00:45 +02:00
parent 662a0cf1df
commit b4f79b77ba
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
10 changed files with 391 additions and 241 deletions

View file

@ -1,7 +1,7 @@
use crate::{utils, Error, Result};
use http::header::{HeaderValue, CONTENT_TYPE};
use log::warn;
use ruma::api::OutgoingRequest;
use ruma::api::{IncomingResponse, OutgoingRequest};
use std::{
convert::{TryFrom, TryInto},
fmt::Debug,
@ -66,15 +66,10 @@ where
let status = reqwest_response.status();
let body = reqwest_response
.bytes()
.await
.unwrap_or_else(|e| {
warn!("server error: {}", e);
Vec::new().into()
}) // TODO: handle timeout
.into_iter()
.collect::<Vec<_>>();
let body = reqwest_response.bytes().await.unwrap_or_else(|e| {
warn!("server error: {}", e);
Vec::new().into()
}); // TODO: handle timeout
if status != 200 {
warn!(
@ -86,7 +81,7 @@ where
);
}
let response = T::IncomingResponse::try_from(
let response = T::IncomingResponse::try_from_http_response(
http_response
.body(body)
.expect("reqwest body is valid http body"),

View file

@ -91,37 +91,7 @@ pub async fn leave_room_route(
) -> ConduitResult<leave_room::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>(
db.rooms
.room_state_get(
&body.room_id,
&EventType::RoomMember,
&sender_user.to_string(),
)?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
))?
.content,
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = member::MembershipState::Leave;
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_user.to_string()),
redacts: None,
},
&sender_user,
&body.room_id,
&db,
)?;
db.rooms.leave_room(sender_user, &body.room_id, &db).await?;
db.flush().await?;
@ -480,6 +450,7 @@ async fn join_room_by_id_helper(
Error::BadServerResponse("Invalid make_join event json received from server.")
})?;
// TODO: Is origin needed?
join_event_stub.insert(
"origin".to_owned(),
to_canonical_value(db.globals.server_name())
@ -699,5 +670,7 @@ async fn join_room_by_id_helper(
)?;
}
db.flush().await?;
Ok(join_room_by_id::Response::new(room_id.clone()).into())
}

View file

@ -1,6 +1,5 @@
use super::State;
use crate::{ConduitResult, Database, Error, Ruma};
use log::error;
use ruma::{
api::client::r0::sync::sync_events,
events::{room::member::MembershipState, AnySyncEphemeralRoomEvent, EventType},
@ -494,83 +493,17 @@ pub async fn sync_events_route(
}
let mut left_rooms = BTreeMap::new();
for room_id in db.rooms.rooms_left(&sender_user) {
let room_id = room_id?;
for result in db.rooms.rooms_left(&sender_user) {
let (room_id, left_state_events) = result?;
let left_count = db.rooms.get_left_count(&room_id, &sender_user)?;
let since_member = if let Some(since_member) = db
.rooms
.pdus_after(sender_user, &room_id, since)
.next()
.and_then(|pdu| pdu.ok())
.and_then(|pdu| {
db.rooms
.pdu_shortstatehash(&pdu.1.event_id)
.ok()?
.ok_or_else(|| {
error!("{:?}", pdu.1);
Error::bad_database("Pdu in db doesn't have a state hash.")
})
.ok()
})
.and_then(|shortstatehash| {
db.rooms
.state_get(shortstatehash, &EventType::RoomMember, sender_user.as_str())
.ok()?
.ok_or_else(|| Error::bad_database("State hash in db doesn't have a state."))
.ok()
})
.and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
pdu.content.clone(),
)
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.map(|content| (pdu, content))
.ok()
}) {
since_member
} else {
// We couldn't find the since_member event. This is very weird - we better abort
// Left before last sync
if Some(since) >= left_count {
continue;
};
}
let left_since_last_sync = since_member.1.membership == MembershipState::Join;
let left_room = if left_since_last_sync {
device_list_left.extend(
db.rooms
.room_members(&room_id)
.filter_map(|user_id| Some(user_id.ok()?))
.filter(|user_id| {
// Don't send key updates from the sender to the sender
sender_user != user_id
})
.filter(|user_id| {
// Only send if the sender doesn't share any encrypted room with the target
// anymore
!share_encrypted_room(&db, sender_user, user_id, &room_id)
}),
);
let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?;
let mut room_events = pdus
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
.take_while(|(_, pdu)| &since_member.0 != pdu)
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect::<Vec<_>>();
room_events.push(since_member.0.to_sync_room_event());
sync_events::LeftRoom {
account_data: sync_events::AccountData { events: Vec::new() },
timeline: sync_events::Timeline {
limited: false,
prev_batch: Some(next_batch.clone()),
events: room_events,
},
state: sync_events::State { events: Vec::new() },
}
} else {
left_rooms.insert(
room_id.clone(),
sync_events::LeftRoom {
account_data: sync_events::AccountData { events: Vec::new() },
timeline: sync_events::Timeline {
@ -578,13 +511,11 @@ pub async fn sync_events_route(
prev_batch: Some(next_batch.clone()),
events: Vec::new(),
},
state: sync_events::State { events: Vec::new() },
}
};
if !left_room.is_empty() {
left_rooms.insert(room_id.clone(), left_room);
}
state: sync_events::State {
events: left_state_events,
},
},
);
}
let mut invited_rooms = BTreeMap::new();

View file

@ -163,7 +163,8 @@ impl Database {
roomuseroncejoinedids: db.open_tree("roomuseroncejoinedids")?,
userroomid_invitestate: db.open_tree("userroomid_invitestate")?,
roomuserid_invitecount: db.open_tree("roomuserid_invitecount")?,
userroomid_left: db.open_tree("userroomid_left")?,
userroomid_leftstate: db.open_tree("userroomid_leftstate")?,
roomuserid_leftcount: db.open_tree("roomuserid_leftcount")?,
userroomid_notificationcount: db.open_tree("userroomid_notificationcount")?,
userroomid_highlightcount: db.open_tree("userroomid_highlightcount")?,
@ -244,7 +245,7 @@ impl Database {
.userroomid_invitestate
.watch_prefix(&userid_prefix),
);
futures.push(self.rooms.userroomid_left.watch_prefix(&userid_prefix));
futures.push(self.rooms.userroomid_leftstate.watch_prefix(&userid_prefix));
// Events for rooms we are in
for room_id in self.rooms.rooms_joined(user_id).filter_map(|r| r.ok()) {

View file

@ -7,7 +7,7 @@ use ruma::{
self,
v1::{Device, Notification, NotificationCounts, NotificationPriority},
},
OutgoingRequest,
IncomingResponse, OutgoingRequest,
},
events::{room::power_levels::PowerLevelsEventContent, EventType},
push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
@ -129,15 +129,10 @@ where
let status = reqwest_response.status();
let body = reqwest_response
.bytes()
.await
.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}) // TODO: handle timeout
.into_iter()
.collect::<Vec<_>>();
let body = reqwest_response.bytes().await.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
if status != 200 {
info!(
@ -149,7 +144,7 @@ where
);
}
let response = T::IncomingResponse::try_from(
let response = T::IncomingResponse::try_from_http_response(
http_response
.body(body)
.expect("reqwest body is valid http body"),

View file

@ -1,17 +1,18 @@
mod edus;
pub use edus::RoomEdus;
use member::MembershipState;
use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use log::{debug, error, warn};
use regex::Regex;
use ring::digest;
use ruma::{
api::client::error::ErrorKind,
api::{client::error::ErrorKind, federation},
events::{
ignored_user_list, push_rules,
room::{create::CreateEventContent, member, message},
AnyStrippedStateEvent, EventType,
AnyStrippedStateEvent, AnySyncStateEvent, EventType,
},
push::{self, Action, Tweak},
serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw},
@ -54,7 +55,8 @@ pub struct Rooms {
pub(super) roomuseroncejoinedids: sled::Tree,
pub(super) userroomid_invitestate: sled::Tree, // InviteState = Vec<Raw<Pdu>>
pub(super) roomuserid_invitecount: sled::Tree, // InviteCount = Count
pub(super) userroomid_left: sled::Tree,
pub(super) userroomid_leftstate: sled::Tree,
pub(super) roomuserid_leftcount: sled::Tree,
pub(super) userroomid_notificationcount: sled::Tree, // NotifyCount = u64
pub(super) userroomid_highlightcount: sled::Tree, // HightlightCount = u64
@ -671,7 +673,7 @@ impl Rooms {
.users
.iter()
.filter_map(|r| r.ok())
.filter(|user_id| db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false))
.filter(|user_id| self.is_joined(&user_id, &pdu.room_id).unwrap_or(false))
{
// Don't notify the user of their own events
if user == pdu.sender {
@ -782,9 +784,11 @@ impl Rooms {
{
state.push(e.to_stripped_state_event());
}
if let Some(e) =
self.room_state_get(&pdu.room_id, &EventType::RoomMember, pdu.sender.as_str())?
{
if let Some(e) = self.room_state_get(
&pdu.room_id,
&EventType::RoomMember,
pdu.sender.as_str(),
)? {
state.push(e.to_stripped_state_event());
}
@ -1380,7 +1384,7 @@ impl Rooms {
.state_key
.as_ref()
.map_or(false, |state_key| users.is_match(&state_key))
|| db.rooms.room_members(&room_id).any(|userid| {
|| self.room_members(&room_id).any(|userid| {
userid.map_or(false, |userid| users.is_match(userid.as_str()))
})
};
@ -1537,7 +1541,7 @@ impl Rooms {
user_id: &UserId,
membership: member::MembershipState,
sender: &UserId,
invite_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
account_data: &super::account_data::AccountData,
globals: &super::globals::Globals,
) -> Result<()> {
@ -1643,7 +1647,8 @@ impl Rooms {
self.roomuserid_joined.insert(&roomuser_id, &[])?;
self.userroomid_invitestate.remove(&userroom_id)?;
self.roomuserid_invitecount.remove(&roomuser_id)?;
self.userroomid_left.remove(&userroom_id)?;
self.userroomid_leftstate.remove(&userroom_id)?;
self.roomuserid_leftcount.remove(&roomuser_id)?;
}
member::MembershipState::Invite => {
// We want to know if the sender is ignored by the receiver
@ -1664,14 +1669,15 @@ impl Rooms {
self.roomserverids.insert(&roomserver_id, &[])?;
self.userroomid_invitestate.insert(
&userroom_id,
serde_json::to_vec(&invite_state.unwrap_or_default())
serde_json::to_vec(&last_state.unwrap_or_default())
.expect("state to bytes always works"),
)?;
self.roomuserid_invitecount
.insert(&roomuser_id, &globals.next_count()?.to_be_bytes())?;
self.userroomid_joined.remove(&userroom_id)?;
self.roomuserid_joined.remove(&roomuser_id)?;
self.userroomid_left.remove(&userroom_id)?;
self.userroomid_leftstate.remove(&userroom_id)?;
self.roomuserid_leftcount.remove(&roomuser_id)?;
}
member::MembershipState::Leave | member::MembershipState::Ban => {
if self
@ -1682,7 +1688,12 @@ impl Rooms {
{
self.roomserverids.remove(&roomserver_id)?;
}
self.userroomid_left.insert(&userroom_id, &[])?;
self.userroomid_leftstate.insert(
&userroom_id,
serde_json::to_vec(&Vec::<Raw<AnySyncStateEvent>>::new()).unwrap(),
)?; // TODO
self.roomuserid_leftcount
.insert(&roomuser_id, &globals.next_count()?.to_be_bytes())?;
self.userroomid_joined.remove(&userroom_id)?;
self.roomuserid_joined.remove(&roomuser_id)?;
self.userroomid_invitestate.remove(&userroom_id)?;
@ -1694,13 +1705,191 @@ impl Rooms {
Ok(())
}
pub async fn leave_room(
&self,
user_id: &UserId,
room_id: &RoomId,
db: &Database,
) -> Result<()> {
// Ask a remote server if we don't have this room
if !self.exists(room_id)? && room_id.server_name() != db.globals.server_name() {
if let Err(e) = self.remote_leave_room(user_id, room_id, db).await {
warn!("Failed to leave room {} remotely: {}", user_id, e);
// Don't tell the client about this error
}
let last_state = self
.invite_state(user_id, room_id)?
.map_or_else(|| self.left_state(user_id, room_id), |s| Ok(Some(s)))?;
// We always drop the invite, we can't rely on other servers
self.update_membership(
room_id,
user_id,
MembershipState::Leave,
user_id,
last_state,
&db.account_data,
&db.globals,
)?;
} else {
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>(
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
))?
.content,
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = member::MembershipState::Leave;
self.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(event)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
user_id,
room_id,
db,
)?;
}
Ok(())
}
async fn remote_leave_room(
&self,
user_id: &UserId,
room_id: &RoomId,
db: &Database,
) -> Result<()> {
let mut make_leave_response_and_server = Err(Error::BadServerResponse(
"No server available to assist in leaving.",
));
let invite_state = db
.rooms
.invite_state(user_id, room_id)?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"User is not invited.",
))?;
let servers = invite_state
.iter()
.filter_map(|event| {
serde_json::from_str::<serde_json::Value>(&event.json().to_string()).ok()
})
.filter_map(|event| event.get("sender").cloned())
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
.filter_map(|sender| UserId::try_from(sender).ok())
.map(|user| user.server_name().to_owned());
for remote_server in servers {
let make_leave_response = db
.sending
.send_federation_request(
&db.globals,
&remote_server,
federation::membership::get_leave_event::v1::Request { room_id, user_id },
)
.await;
make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server));
if make_leave_response_and_server.is_ok() {
break;
}
}
let (make_leave_response, remote_server) = make_leave_response_and_server?;
let room_version = match make_leave_response.room_version {
Some(room_version) if room_version == RoomVersionId::Version6 => room_version,
_ => return Err(Error::BadServerResponse("Room version is not supported")),
};
let mut leave_event_stub =
serde_json::from_str::<CanonicalJsonObject>(make_leave_response.event.json().get())
.map_err(|_| {
Error::BadServerResponse("Invalid make_leave event json received from server.")
})?;
// TODO: Is origin needed?
leave_event_stub.insert(
"origin".to_owned(),
to_canonical_value(db.globals.server_name())
.map_err(|_| Error::bad_database("Invalid server name found"))?,
);
leave_event_stub.insert(
"origin_server_ts".to_owned(),
to_canonical_value(utils::millis_since_unix_epoch())
.expect("Timestamp is valid js_int value"),
);
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
leave_event_stub.remove("event_id");
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
ruma::signatures::hash_and_sign_event(
db.globals.server_name().as_str(),
db.globals.keypair(),
&mut leave_event_stub,
&room_version,
)
.expect("event is valid, we just created it");
// Generate event id
let event_id = EventId::try_from(&*format!(
"${}",
ruma::signatures::reference_hash(&leave_event_stub, &room_version)
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
// Add event_id back
leave_event_stub.insert(
"event_id".to_owned(),
to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
);
// It has enough fields to be called a proper event now
let leave_event = leave_event_stub;
db.sending
.send_federation_request(
&db.globals,
&remote_server,
federation::membership::create_leave_event::v2::Request {
room_id,
event_id: &event_id,
pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()),
},
)
.await?;
Ok(())
}
/// Makes a user forget a room.
pub fn forget(&self, room_id: &RoomId, user_id: &UserId) -> Result<()> {
let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
self.userroomid_left.remove(userroom_id)?;
let mut roomuser_id = room_id.as_bytes().to_vec();
roomuser_id.push(0xff);
roomuser_id.extend_from_slice(user_id.as_bytes());
self.userroomid_leftstate.remove(userroom_id)?;
self.roomuserid_leftcount.remove(roomuser_id)?;
Ok(())
}
@ -1977,7 +2166,6 @@ impl Rooms {
})
}
/// Returns an iterator over all invited members of a room.
#[tracing::instrument(skip(self))]
pub fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.as_bytes().to_vec();
@ -1993,6 +2181,21 @@ impl Rooms {
})
}
#[tracing::instrument(skip(self))]
pub fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
self.roomuserid_leftcount
.get(key)?
.map_or(Ok(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid leftcount in db.")
})?))
})
}
/// Returns an iterator over all rooms this user joined.
#[tracing::instrument(skip(self))]
pub fn rooms_joined(&self, user_id: &UserId) -> impl Iterator<Item = Result<RoomId>> {
@ -2045,25 +2248,75 @@ impl Rooms {
})
}
#[tracing::instrument(skip(self))]
pub fn invite_state(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&room_id.as_bytes());
self.userroomid_invitestate
.get(key)?
.map(|state| {
let state = serde_json::from_slice(&state)
.map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?;
Ok(state)
})
.transpose()
}
#[tracing::instrument(skip(self))]
pub fn left_state(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&room_id.as_bytes());
self.userroomid_leftstate
.get(key)?
.map(|state| {
let state = serde_json::from_slice(&state)
.map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?;
Ok(state)
})
.transpose()
}
/// Returns an iterator over all rooms a user left.
#[tracing::instrument(skip(self))]
pub fn rooms_left(&self, user_id: &UserId) -> impl Iterator<Item = Result<RoomId>> {
pub fn rooms_left(
&self,
user_id: &UserId,
) -> impl Iterator<Item = Result<(RoomId, Vec<Raw<AnySyncStateEvent>>)>> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
self.userroomid_left.scan_prefix(prefix).keys().map(|key| {
Ok(RoomId::try_from(
self.userroomid_leftstate.scan_prefix(prefix).map(|r| {
let (key, state) = r?;
let room_id = RoomId::try_from(
utils::string_from_bytes(
&key?
.rsplit(|&b| b == 0xff)
&key.rsplit(|&b| b == 0xff)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| {
Error::bad_database("Room ID in userroomid_left is invalid unicode.")
Error::bad_database("Room ID in userroomid_invited is invalid unicode.")
})?,
)
.map_err(|_| Error::bad_database("Room ID in userroomid_left is invalid."))?)
.map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?;
let state = serde_json::from_slice(&state)
.map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?;
Ok((room_id, state))
})
}
@ -2096,6 +2349,6 @@ impl Rooms {
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
Ok(self.userroomid_left.get(userroom_id)?.is_some())
Ok(self.userroomid_leftstate.get(userroom_id)?.is_some())
}
}

View file

@ -1,9 +1,10 @@
use crate::Error;
use ruma::{
api::OutgoingResponse,
identifiers::{DeviceId, UserId},
Outgoing,
};
use std::{convert::TryInto, ops::Deref};
use std::ops::Deref;
#[cfg(feature = "conduit_bin")]
use {
@ -145,7 +146,7 @@ where
let mut body = Vec::new();
handle.read_to_end(&mut body).await.unwrap();
let http_request = http_request.body(body.clone()).unwrap();
let http_request = http_request.body(&*body).unwrap();
debug!("{:?}", http_request);
match <T::Incoming as IncomingRequest>::try_from_http_request(http_request) {
Ok(t) => Success(Ruma {
@ -178,9 +179,9 @@ impl<T: Outgoing> Deref for Ruma<T> {
/// This struct converts ruma responses into rocket http responses.
pub type ConduitResult<T> = std::result::Result<RumaResponse<T>, Error>;
pub struct RumaResponse<T: TryInto<http::Response<Vec<u8>>>>(pub T);
pub struct RumaResponse<T: OutgoingResponse>(pub T);
impl<T: TryInto<http::Response<Vec<u8>>>> From<T> for RumaResponse<T> {
impl<T: OutgoingResponse> From<T> for RumaResponse<T> {
fn from(t: T) -> Self {
Self(t)
}
@ -189,12 +190,11 @@ impl<T: TryInto<http::Response<Vec<u8>>>> From<T> for RumaResponse<T> {
#[cfg(feature = "conduit_bin")]
impl<'r, 'o, T> Responder<'r, 'o> for RumaResponse<T>
where
T: Send + TryInto<http::Response<Vec<u8>>>,
T::Error: Send,
T: Send + OutgoingResponse,
'o: 'r,
{
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'o> {
let http_response: Result<http::Response<_>, _> = self.0.try_into();
let http_response: Result<http::Response<_>, _> = self.0.try_into_http_response();
match http_response {
Ok(http_response) => {
let mut response = rocket::response::Response::build();

View file

@ -18,7 +18,7 @@ use ruma::{
query::get_profile_information,
transactions::send_transaction_message,
},
OutgoingRequest,
IncomingResponse, OutgoingRequest, OutgoingResponse,
},
directory::{IncomingFilter, IncomingRoomNetwork},
events::{
@ -173,15 +173,10 @@ where
let status = reqwest_response.status();
let body = reqwest_response
.bytes()
.await
.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}) // TODO: handle timeout
.into_iter()
.collect::<Vec<_>>();
let body = reqwest_response.bytes().await.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
if status != 200 {
info!(
@ -195,7 +190,7 @@ where
);
}
let response = T::IncomingResponse::try_from(
let response = T::IncomingResponse::try_from_http_response(
http_response
.body(body)
.expect("reqwest body is valid http body"),
@ -350,6 +345,7 @@ pub fn get_server_version_route(
.into())
}
// Response type for this endpoint is Json because we need to calculate a signature for the response
#[cfg_attr(feature = "conduit_bin", get("/_matrix/key/v2/server"))]
#[tracing::instrument(skip(db))]
pub fn get_server_keys_route(db: State<'_, Database>) -> Json<String> {
@ -369,7 +365,7 @@ pub fn get_server_keys_route(db: State<'_, Database>) -> Json<String> {
},
);
let mut response = serde_json::from_slice(
http::Response::try_from(get_server_keys::v2::Response {
get_server_keys::v2::Response {
server_key: ServerSigningKeys {
server_name: db.globals.server_name().to_owned(),
verify_keys,
@ -377,7 +373,8 @@ pub fn get_server_keys_route(db: State<'_, Database>) -> Json<String> {
signatures: BTreeMap::new(),
valid_until_ts: SystemTime::now() + Duration::from_secs(60 * 2),
},
})
}
.try_into_http_response()
.unwrap()
.body(),
)
@ -745,7 +742,7 @@ fn handle_incoming_pdu<'a>(
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
debug!("Fetching auth events.");
debug!("Fetching auth events for {}", incoming_pdu.event_id);
fetch_and_handle_events(
db,
origin,
@ -757,7 +754,10 @@ fn handle_incoming_pdu<'a>(
.map_err(|e| e.to_string())?;
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
debug!("Checking auth.");
debug!(
"Auth check for {} based on auth events",
incoming_pdu.event_id
);
// Build map of auth events
let mut auth_events = BTreeMap::new();
@ -1151,7 +1151,7 @@ pub(crate) async fn fetch_and_handle_events(
// a. Look at auth cache
let pdu = match auth_cache.get(id) {
Some(pdu) => {
debug!("Event found in cache");
debug!("Found {} in cache", id);
pdu.clone()
}
// b. Look in the main timeline (pduid_pdu tree)
@ -1159,12 +1159,12 @@ pub(crate) async fn fetch_and_handle_events(
// (get_pdu checks both)
None => match db.rooms.get_pdu(&id)? {
Some(pdu) => {
debug!("Event found in outliers");
debug!("Found {} in outliers", id);
Arc::new(pdu)
}
None => {
// d. Ask origin server over federation
debug!("Fetching event over federation: {:?}", id);
debug!("Fetching {} over federation.", id);
match db
.sending
.send_federation_request(
@ -1175,7 +1175,7 @@ pub(crate) async fn fetch_and_handle_events(
.await
{
Ok(res) => {
debug!("Got event over federation: {:?}", res);
debug!("Got {} over federation: {:?}", id, res);
let (event_id, value) =
crate::pdu::gen_event_id_canonical_json(&res.pdu)?;
let pdu = handle_incoming_pdu(