improvement: federation get_keys and optimize signingkey storage

- get encryption keys over federation
- optimize signing key storage
- rate limit parsing of bad events
- rate limit signature fetching
- dependency bumps
This commit is contained in:
Timo Kösters 2021-05-20 23:46:52 +02:00
parent ae41bc5067
commit 09157b2096
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
18 changed files with 566 additions and 371 deletions

View file

@ -1,7 +1,10 @@
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
use crate::{
client_server::{self, get_keys_helper},
utils, ConduitResult, Database, Error, PduEvent, Result, Ruma,
};
use get_profile_information::v1::ProfileField;
use http::header::{HeaderValue, AUTHORIZATION, HOST};
use log::{debug, error, info, warn};
use log::{debug, error, info, trace, warn};
use regex::Regex;
use rocket::{response::content::Json, State};
use ruma::{
@ -15,6 +18,7 @@ use ruma::{
VerifyKey,
},
event::{get_event, get_missing_events, get_room_state_ids},
keys::get_keys,
membership::{
create_invite,
create_join_event::{self, RoomState},
@ -32,12 +36,14 @@ use ruma::{
create::CreateEventContent,
member::{MemberEventContent, MembershipState},
},
AnyEphemeralRoomEvent, AnyEvent as EduEvent, EventType,
AnyEphemeralRoomEvent, EventType,
},
receipt::ReceiptType,
serde::Raw,
signatures::{CanonicalJsonObject, CanonicalJsonValue},
state_res::{self, Event, EventMap, RoomVersion, StateMap},
uint, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
ServerSigningKeyId, UserId,
};
use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet},
@ -49,8 +55,9 @@ use std::{
pin::Pin,
result::Result as StdResult,
sync::{Arc, RwLock},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};
use tokio::sync::Semaphore;
#[cfg(feature = "conduit_bin")]
use rocket::{get, post, put};
@ -452,7 +459,10 @@ pub fn get_server_keys_route(db: State<'_, Database>) -> Json<String> {
verify_keys,
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
valid_until_ts: SystemTime::now() + Duration::from_secs(60 * 2),
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(60 * 2),
)
.expect("time is valid"),
},
}
.try_into_http_response::<Vec<u8>>()
@ -608,6 +618,7 @@ pub async fn send_transaction_message_route<'a>(
}
};
let start_time = Instant::now();
if let Err(e) = handle_incoming_pdu(
&body.origin,
&event_id,
@ -619,7 +630,17 @@ pub async fn send_transaction_message_route<'a>(
)
.await
{
resolved_map.insert(event_id, Err(e));
resolved_map.insert(event_id.clone(), Err(e));
}
let elapsed = start_time.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
"Handling event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
}
}
@ -653,19 +674,16 @@ pub async fn send_transaction_message_route<'a>(
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(
event_id.to_owned(),
ruma::events::receipt::Receipts {
read: Some(user_receipts),
},
);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let event =
EduEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt(ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
}));
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = AnyEphemeralRoomEvent::Receipt(ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
});
db.rooms.edus.readreceipt_update(
&user_id,
&room_id,
@ -698,6 +716,8 @@ pub async fn send_transaction_message_route<'a>(
}
}
info!("/send/{} done", body.transaction_id);
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
}
@ -794,7 +814,7 @@ pub fn handle_incoming_pdu<'a>(
) {
Err(e) => {
// Drop
warn!("{:?}: {}", value, e);
warn!("Dropping bad event {}: {}", event_id, e);
return Err("Signature verification failed".to_string());
}
Ok(ruma::signatures::Verified::Signatures) => {
@ -821,6 +841,7 @@ pub fn handle_incoming_pdu<'a>(
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// EDIT: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events for {}", incoming_pdu.event_id);
fetch_and_handle_events(
db,
@ -1292,12 +1313,30 @@ pub(crate) fn fetch_and_handle_events<'a>(
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
) -> AsyncRecursiveResult<'a, Vec<Arc<PduEvent>>, Error> {
Box::pin(async move {
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
};
let mut pdus = vec![];
for id in events {
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&id) {
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", id);
continue;
}
}
// a. Look at auth cache
let pdu = match auth_cache.get(id) {
Some(pdu) => {
debug!("Found {} in cache", id);
// We already have the auth chain for events in cache
pdu.clone()
}
@ -1306,7 +1345,7 @@ pub(crate) fn fetch_and_handle_events<'a>(
// (get_pdu checks both)
None => match db.rooms.get_pdu(&id)? {
Some(pdu) => {
debug!("Found {} in db", id);
trace!("Found {} in db", id);
// We need to fetch the auth chain
let _ = fetch_and_handle_events(
db,
@ -1331,7 +1370,7 @@ pub(crate) fn fetch_and_handle_events<'a>(
.await
{
Ok(res) => {
debug!("Got {} over federation: {:?}", id, res);
debug!("Got {} over federation", id);
let (event_id, mut value) =
crate::pdu::gen_event_id_canonical_json(&res.pdu)?;
// This will also fetch the auth chain
@ -1358,12 +1397,14 @@ pub(crate) fn fetch_and_handle_events<'a>(
}
Err(e) => {
warn!("Authentication of event {} failed: {:?}", id, e);
back_off(id.clone());
continue;
}
}
}
Err(_) => {
warn!("Failed to fetch event: {}", id);
back_off(id.clone());
continue;
}
}
@ -1383,10 +1424,67 @@ pub(crate) fn fetch_and_handle_events<'a>(
pub(crate) async fn fetch_signing_keys(
db: &Database,
origin: &ServerName,
signature_ids: Vec<&String>,
signature_ids: Vec<String>,
) -> Result<BTreeMap<String, String>> {
let contains_all_ids =
|keys: &BTreeMap<String, String>| signature_ids.iter().all(|&id| keys.contains_key(id));
|keys: &BTreeMap<String, String>| signature_ids.iter().all(|id| keys.contains_key(id));
let permit = db
.globals
.servername_ratelimiter
.read()
.unwrap()
.get(origin)
.map(|s| Arc::clone(s).acquire_owned());
let permit = match permit {
Some(p) => p,
None => {
let mut write = db.globals.servername_ratelimiter.write().unwrap();
let s = Arc::clone(
write
.entry(origin.to_owned())
.or_insert_with(|| Arc::new(Semaphore::new(1))),
);
s.acquire_owned()
}
}
.await;
let back_off = |id| match db
.globals
.bad_signature_ratelimiter
.write()
.unwrap()
.entry(id)
{
Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
};
if let Some((time, tries)) = db
.globals
.bad_signature_ratelimiter
.read()
.unwrap()
.get(&signature_ids)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {:?}", signature_ids);
return Err(Error::BadServerResponse("bad signature, still backing off"));
}
}
debug!("Loading signing keys for {}", origin);
let mut result = db
.globals
@ -1399,6 +1497,8 @@ pub(crate) async fn fetch_signing_keys(
return Ok(result);
}
debug!("Fetching signing keys for {} over federation", origin);
if let Ok(get_keys_response) = db
.sending
.send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
@ -1436,14 +1536,17 @@ pub(crate) async fn fetch_signing_keys(
&server,
get_remote_server_keys::v2::Request::new(
origin,
SystemTime::now()
.checked_add(Duration::from_secs(3600))
.expect("SystemTime to large"),
MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now()
.checked_add(Duration::from_secs(3600))
.expect("SystemTime to large"),
)
.expect("time is valid"),
),
)
.await
{
debug!("Got signing keys: {:?}", keys);
trace!("Got signing keys: {:?}", keys);
for k in keys.server_keys {
db.globals.add_signing_key(origin, &k)?;
result.extend(
@ -1464,6 +1567,10 @@ pub(crate) async fn fetch_signing_keys(
}
}
drop(permit);
back_off(signature_ids);
warn!("Failed to find public key for server: {}", origin);
Err(Error::BadServerResponse(
"Failed to find public key for server",
@ -1581,7 +1688,7 @@ pub fn get_event_route<'a>(
Ok(get_event::v1::Response {
origin: db.globals.server_name().to_owned(),
origin_server_ts: SystemTime::now(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdu: PduEvent::convert_to_outgoing_federation_event(
db.rooms
.get_pdu_json(&body.event_id)?
@ -2186,6 +2293,34 @@ pub fn get_profile_information_route<'a>(
.into())
}
#[cfg_attr(
feature = "conduit_bin",
post("/_matrix/federation/v1/user/keys/query", data = "<body>")
)]
#[tracing::instrument(skip(db, body))]
pub fn get_keys_route<'a>(
db: State<'a, Database>,
body: Ruma<get_keys::v1::Request>,
) -> ConduitResult<get_keys::v1::Response> {
if !db.globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let result = get_keys_helper(
None,
&body.device_keys,
|u| Some(u.server_name()) == body.sender_servername.as_deref(),
&db,
)?;
Ok(get_keys::v1::Response {
device_keys: result.device_keys,
master_keys: result.master_keys,
self_signing_keys: result.self_signing_keys,
}
.into())
}
pub async fn fetch_required_signing_keys(
event: &BTreeMap<String, CanonicalJsonValue>,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
@ -2208,9 +2343,8 @@ pub async fn fetch_required_signing_keys(
"Invalid signatures content object in server response pdu.",
))?;
let signature_ids = signature_object.keys().collect::<Vec<_>>();
let signature_ids = signature_object.keys().cloned().collect::<Vec<_>>();
debug!("Fetching signing keys for {}", signature_server);
let fetch_res = fetch_signing_keys(
db,
&Box::<ServerName>::try_from(&**signature_server).map_err(|_| {