From c0939c3e9a9d7c193e8092333cd9289499540463 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 11 Oct 2024 18:57:59 +0000 Subject: [PATCH] Refactor server_keys service/interface and related callsites Signed-off-by: Jason Volk Signed-off-by: strawberry --- Cargo.lock | 26 +- Cargo.toml | 2 +- src/admin/debug/commands.rs | 173 +++---- src/admin/debug/mod.rs | 17 +- src/admin/query/globals.rs | 13 +- src/api/client/membership.rs | 252 +++------ src/api/client/mod.rs | 2 +- src/api/router/args.rs | 2 +- src/api/router/auth.rs | 222 ++++---- src/api/server/invite.rs | 11 +- src/api/server/key.rs | 70 +-- src/api/server/send.rs | 22 +- src/api/server/send_join.rs | 24 +- src/api/server/send_leave.rs | 13 +- src/core/config/mod.rs | 28 - src/core/error/mod.rs | 4 + src/core/pdu/mod.rs | 25 +- src/service/globals/data.rs | 118 +---- src/service/globals/mod.rs | 50 +- src/service/rooms/event_handler/mod.rs | 105 ++-- src/service/rooms/timeline/mod.rs | 45 +- src/service/sending/mod.rs | 4 +- src/service/sending/send.rs | 24 +- src/service/server_keys/acquire.rs | 175 +++++++ src/service/server_keys/get.rs | 86 ++++ src/service/server_keys/keypair.rs | 64 +++ src/service/server_keys/mod.rs | 678 +++++-------------------- src/service/server_keys/request.rs | 97 ++++ src/service/server_keys/sign.rs | 18 + src/service/server_keys/verify.rs | 33 ++ 30 files changed, 1025 insertions(+), 1378 deletions(-) create mode 100644 src/service/server_keys/acquire.rs create mode 100644 src/service/server_keys/get.rs create mode 100644 src/service/server_keys/keypair.rs create mode 100644 src/service/server_keys/request.rs create mode 100644 src/service/server_keys/sign.rs create mode 100644 src/service/server_keys/verify.rs diff --git a/Cargo.lock b/Cargo.lock index db1394ce..4ac7cc35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2976,7 +2976,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.10.1" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "assign", "js_int", @@ -2998,7 +2998,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.10.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "js_int", "ruma-common", @@ -3010,7 +3010,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.18.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "as_variant", "assign", @@ -3033,7 +3033,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.13.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "as_variant", "base64 0.22.1", @@ -3063,7 +3063,7 @@ dependencies = [ [[package]] name = 
"ruma-events" version = "0.28.1" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "as_variant", "indexmap 2.6.0", @@ -3087,7 +3087,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "bytes", "http", @@ -3105,7 +3105,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.5" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "js_int", "thiserror", @@ -3114,7 +3114,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "js_int", "ruma-common", @@ -3124,7 +3124,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.13.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "cfg-if", "once_cell", @@ -3140,7 +3140,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.9.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "js_int", "ruma-common", @@ -3152,7 +3152,7 @@ dependencies = [ [[package]] name = "ruma-server-util" version = "0.3.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "headers", "http", @@ -3165,7 +3165,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.15.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = "git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -3181,7 +3181,7 @@ dependencies = [ [[package]] name = "ruma-state-res" version = "0.11.0" -source = "git+https://github.com/girlbossceo/ruwuma?rev=3109496a1f91357c89cbb57cf86f179e2cb013e7#3109496a1f91357c89cbb57cf86f179e2cb013e7" +source = 
"git+https://github.com/girlbossceo/ruwuma?rev=d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73#d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" dependencies = [ "futures-util", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 0a98befd..966c2818 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -315,7 +315,7 @@ version = "0.1.2" [workspace.dependencies.ruma] git = "https://github.com/girlbossceo/ruwuma" #branch = "conduwuit-changes" -rev = "3109496a1f91357c89cbb57cf86f179e2cb013e7" +rev = "d7baeb7e5c3ae28e79ad3fe81c5e8b207a26cc73" features = [ "compat", "rand", diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index fd8c39f7..7fe8addf 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -1,19 +1,17 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, fmt::Write, sync::Arc, time::{Instant, SystemTime}, }; -use api::client::validate_and_add_event_id; -use conduit::{debug, debug_error, err, info, trace, utils, utils::string::EMPTY, warn, Error, PduEvent, Result}; +use conduit::{debug_error, err, info, trace, utils, utils::string::EMPTY, warn, Error, PduEvent, Result}; use futures::StreamExt; use ruma::{ api::{client::error::ErrorKind, federation::event::get_room_state}, events::room::message::RoomMessageEventContent, CanonicalJsonObject, EventId, OwnedRoomOrAliasId, RoomId, RoomVersionId, ServerName, }; -use tokio::sync::RwLock; use tracing_subscriber::EnvFilter; use crate::admin_command; @@ -219,7 +217,7 @@ pub(super) async fn get_remote_pdu( })?; trace!("Attempting to parse PDU: {:?}", &response.pdu); - let parsed_pdu = { + let _parsed_pdu = { let parsed_result = self .services .rooms @@ -241,22 +239,11 @@ pub(super) async fn get_remote_pdu( vec![(event_id, value, room_id)] }; - let pub_key_map = RwLock::new(BTreeMap::new()); - - debug!("Attempting to fetch homeserver signing keys for {server}"); - self.services - .server_keys - .fetch_required_signing_keys(parsed_pdu.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map) - .await - .unwrap_or_else(|e| { - warn!("Could not fetch all signatures for PDUs from {server}: {e:?}"); - }); - info!("Attempting to handle event ID {event_id} as backfilled PDU"); self.services .rooms .timeline - .backfill_pdu(&server, response.pdu, &pub_key_map) + .backfill_pdu(&server, response.pdu) .await?; let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json"); @@ -433,12 +420,10 @@ pub(super) async fn sign_json(&self) -> Result { let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n"); match serde_json::from_str(&string) { Ok(mut value) => { - ruma::signatures::sign_json( - self.services.globals.server_name().as_str(), - self.services.globals.keypair(), - &mut value, - ) - .expect("our request json is what ruma expects"); + self.services + .server_keys + .sign_json(&mut value) + .expect("our request json is what ruma expects"); let json_text = serde_json::to_string_pretty(&value).expect("canonical json is valid json"); Ok(RoomMessageEventContent::text_plain(json_text)) }, @@ -456,27 +441,31 @@ pub(super) async fn verify_json(&self) -> Result { } let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n"); - match serde_json::from_str(&string) { - Ok(value) => { - let pub_key_map = RwLock::new(BTreeMap::new()); - - self.services - .server_keys - .fetch_required_signing_keys([&value], &pub_key_map) - .await?; - - let pub_key_map = pub_key_map.read().await; - match ruma::signatures::verify_json(&pub_key_map, &value) { - 
Ok(()) => Ok(RoomMessageEventContent::text_plain("Signature correct")), - Err(e) => Ok(RoomMessageEventContent::text_plain(format!( - "Signature verification failed: {e}" - ))), - } + match serde_json::from_str::<CanonicalJsonObject>(&string) { + Ok(value) => match self.services.server_keys.verify_json(&value, None).await { + Ok(()) => Ok(RoomMessageEventContent::text_plain("Signature correct")), + Err(e) => Ok(RoomMessageEventContent::text_plain(format!( + "Signature verification failed: {e}" + ))), }, Err(e) => Ok(RoomMessageEventContent::text_plain(format!("Invalid json: {e}"))), } } +#[admin_command] +pub(super) async fn verify_pdu(&self, event_id: Box<EventId>) -> Result<RoomMessageEventContent> { + let mut event = self.services.rooms.timeline.get_pdu_json(&event_id).await?; + + event.remove("event_id"); + let msg = match self.services.server_keys.verify_event(&event, None).await { + Ok(ruma::signatures::Verified::Signatures) => "signatures OK, but content hash failed (redaction).", + Ok(ruma::signatures::Verified::All) => "signatures and hashes OK.", + Err(e) => return Err(e), + }; + + Ok(RoomMessageEventContent::notice_plain(msg)) +} + #[admin_command] #[tracing::instrument(skip(self))] pub(super) async fn first_pdu_in_room(&self, room_id: Box<RoomId>) -> Result<RoomMessageEventContent> { @@ -557,7 +546,6 @@ pub(super) async fn force_set_room_state_from_server( let room_version = self.services.rooms.state.get_room_version(&room_id).await?; let mut state: HashMap<u64, Arc<EventId>> = HashMap::new(); - let pub_key_map = RwLock::new(BTreeMap::new()); let remote_state_response = self .services @@ -571,38 +559,28 @@ pub(super) async fn force_set_room_state_from_server( ) .await?; - let mut events = Vec::with_capacity(remote_state_response.pdus.len()); - for pdu in remote_state_response.pdus.clone() { - events.push( - match self - .services - .rooms - .event_handler - .parse_incoming_pdu(&pdu) - .await - { - Ok(t) => t, - Err(e) => { - warn!("Could not parse PDU, ignoring: {e}"); - continue; - }, + match self + .services + .rooms + .event_handler + .parse_incoming_pdu(&pdu) + .await + { + Ok(t) => t, + Err(e) => { + warn!("Could not parse PDU, ignoring: {e}"); + continue; }, - ); + }; } - info!("Fetching required signing keys for all the state events we got"); - self.services - .server_keys - .fetch_required_signing_keys(events.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map) - .await?; - info!("Going through room_state response PDUs"); - for result in remote_state_response - .pdus - .iter() - .map(|pdu| validate_and_add_event_id(self.services, pdu, &room_version, &pub_key_map)) - { + for result in remote_state_response.pdus.iter().map(|pdu| { + self.services + .server_keys + .validate_and_add_event_id(pdu, &room_version) + }) { let Ok((event_id, value)) = result.await else { continue; }; @@ -630,11 +608,11 @@ pub(super) async fn force_set_room_state_from_server( } info!("Going through auth_chain response"); - for result in remote_state_response - .auth_chain - .iter() - .map(|pdu| validate_and_add_event_id(self.services, pdu, &room_version, &pub_key_map)) - { + for result in remote_state_response.auth_chain.iter().map(|pdu| { + self.services + .server_keys + .validate_and_add_event_id(pdu, &room_version) + }) { let Ok((event_id, value)) = result.await else { continue; }; @@ -686,10 +664,33 @@ pub(super) async fn force_set_room_state_from_server( #[admin_command] pub(super) async fn get_signing_keys( - &self, server_name: Option<Box<ServerName>>, _cached: bool, + &self, server_name: Option<Box<ServerName>>, notary: Option<Box<ServerName>>, query: bool, ) -> Result<RoomMessageEventContent> { let server_name = server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into()); - let signing_keys = self.services.globals.signing_keys_for(&server_name).await?; + + if let Some(notary) = notary { + let signing_keys = self + .services + .server_keys + .notary_request(&notary, &server_name) + .await?; + + return Ok(RoomMessageEventContent::notice_markdown(format!( + "```rs\n{signing_keys:#?}\n```" + ))); + } + + let signing_keys = if query { + self.services + .server_keys + .server_request(&server_name) + .await? + } else { + self.services + .server_keys + .signing_keys_for(&server_name) + .await? + }; Ok(RoomMessageEventContent::notice_markdown(format!( "```rs\n{signing_keys:#?}\n```" @@ -697,34 +698,20 @@ pub(super) async fn get_signing_keys( } #[admin_command] -#[allow(dead_code)] -pub(super) async fn get_verify_keys( - &self, server_name: Option<Box<ServerName>>, cached: bool, -) -> Result<RoomMessageEventContent> { +pub(super) async fn get_verify_keys(&self, server_name: Option<Box<ServerName>>) -> Result<RoomMessageEventContent> { let server_name = server_name.unwrap_or_else(|| self.services.server.config.server_name.clone().into()); - let mut out = String::new(); - if cached { - writeln!(out, "| Key ID | VerifyKey |")?; - writeln!(out, "| --- | --- |")?; - for (key_id, verify_key) in self.services.globals.verify_keys_for(&server_name).await? { - writeln!(out, "| {key_id} | {verify_key:?} |")?; - } - - return Ok(RoomMessageEventContent::notice_markdown(out)); - } - - let signature_ids: Vec<String> = Vec::new(); let keys = self .services .server_keys - .fetch_signing_keys_for_server(&server_name, signature_ids) - .await?; + .verify_keys_for(&server_name) + .await; + let mut out = String::new(); writeln!(out, "| Key ID | Public Key |")?; writeln!(out, "| --- | --- |")?; for (key_id, key) in keys { - writeln!(out, "| {key_id} | {key} |")?; + writeln!(out, "| {key_id} | {key:?} |")?; } Ok(RoomMessageEventContent::notice_markdown(out)) diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index 20ddbf2f..b74e9c36 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -80,8 +80,16 @@ pub(super) enum DebugCommand { GetSigningKeys { server_name: Option<Box<ServerName>>, + #[arg(long)] + notary: Option<Box<ServerName>>, + #[arg(short, long)] - cached: bool, + query: bool, + }, + + /// - Get and display signing keys from local cache or remote server. + GetVerifyKeys { + server_name: Option<Box<ServerName>>, }, /// - Sends a federation request to the remote server's @@ -119,6 +127,13 @@ pub(super) enum DebugCommand { /// the command. VerifyJson, + /// - Verify PDU + /// + /// This re-verifies a PDU existing in the database found by ID. + VerifyPdu { + event_id: Box<EventId>, + }, + /// - Prints the very first PDU in the specified room (typically /// m.room.create) FirstPduInRoom { diff --git a/src/admin/query/globals.rs b/src/admin/query/globals.rs index 150a213c..837d34e6 100644 --- a/src/admin/query/globals.rs +++ b/src/admin/query/globals.rs @@ -13,8 +13,6 @@ pub(crate) enum GlobalsCommand { LastCheckForUpdatesId, - LoadKeypair, - /// - This returns an empty `Ok(BTreeMap<..>)` when there are no keys found /// for the server.
SigningKeysFor { @@ -54,20 +52,11 @@ pub(super) async fn process(subcommand: GlobalsCommand, context: &Command<'_>) - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, - GlobalsCommand::LoadKeypair => { - let timer = tokio::time::Instant::now(); - let results = services.globals.db.load_keypair(); - let query_time = timer.elapsed(); - - Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" - ))) - }, GlobalsCommand::SigningKeysFor { origin, } => { let timer = tokio::time::Instant::now(); - let results = services.globals.db.verify_keys_for(&origin).await; + let results = services.server_keys.verify_keys_for(&origin).await; let query_time = timer.elapsed(); Ok(RoomMessageEventContent::notice_markdown(format!( diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index a7a5b166..2fa34ff7 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -1,17 +1,16 @@ use std::{ - collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, net::IpAddr, sync::Arc, - time::Instant, }; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduit::{ - debug, debug_error, debug_warn, err, error, info, + debug, debug_info, debug_warn, err, error, info, pdu, pdu::{gen_event_id_canonical_json, PduBuilder}, trace, utils, - utils::{math::continue_exponential_backoff_secs, IterStream, ReadyExt}, + utils::{IterStream, ReadyExt}, warn, Err, Error, PduEvent, Result, }; use futures::{FutureExt, StreamExt}; @@ -36,13 +35,10 @@ use ruma::{ }, StateEventType, }, - serde::Base64, - state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, - OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, + state_res, CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, + RoomVersionId, ServerName, UserId, }; -use serde_json::value::RawValue as RawJsonValue; use service::{appservice::RegistrationInfo, rooms::state::RoomMutexGuard, Services}; -use tokio::sync::RwLock; use crate::{client::full_user_deactivate, Ruma}; @@ -670,20 +666,22 @@ pub async fn join_room_by_id_helper( if local_join { join_room_by_id_helper_local(services, sender_user, room_id, reason, servers, third_party_signed, state_lock) .boxed() - .await + .await?; } else { // Ask a remote server if we are not participating in this room join_room_by_id_helper_remote(services, sender_user, room_id, reason, servers, third_party_signed, state_lock) .boxed() - .await + .await?; } + + Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) } #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote")] async fn join_room_by_id_helper_remote( services: &Services, sender_user: &UserId, room_id: &RoomId, reason: Option, servers: &[OwnedServerName], _third_party_signed: Option<&ThirdPartySigned>, state_lock: RoomMutexGuard, -) -> Result { +) -> Result { info!("Joining {room_id} over federation."); let (make_join_response, remote_server) = make_join_request(services, sender_user, room_id, servers).await?; @@ -751,43 +749,33 @@ async fn join_room_by_id_helper_remote( // In order to create a compatible ref hash (EventID) the `hashes` field needs // to be present - ruma::signatures::hash_and_sign_event( - services.globals.server_name().as_str(), - services.globals.keypair(), - &mut join_event_stub, - &room_version_id, - ) - .expect("event is valid, we just created it"); + services + 
.server_keys + .hash_and_sign_event(&mut join_event_stub, &room_version_id)?; // Generate event id - let event_id = format!( - "${}", - ruma::signatures::reference_hash(&join_event_stub, &room_version_id) - .expect("ruma can calculate reference hashes") - ); - let event_id = <&EventId>::try_from(event_id.as_str()).expect("ruma's reference hashes are valid event ids"); + let event_id = pdu::gen_event_id(&join_event_stub, &room_version_id)?; // Add event_id back - join_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); + join_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); // It has enough fields to be called a proper event now let mut join_event = join_event_stub; info!("Asking {remote_server} for send_join in room {room_id}"); + let send_join_request = federation::membership::create_join_event::v2::Request { + room_id: room_id.to_owned(), + event_id: event_id.clone(), + omit_members: false, + pdu: services + .sending + .convert_to_outgoing_federation_event(join_event.clone()) + .await, + }; + let send_join_response = services .sending - .send_federation_request( - &remote_server, - federation::membership::create_join_event::v2::Request { - room_id: room_id.to_owned(), - event_id: event_id.to_owned(), - omit_members: false, - pdu: services - .sending - .convert_to_outgoing_federation_event(join_event.clone()) - .await, - }, - ) + .send_federation_request(&remote_server, send_join_request) .await?; info!("send_join finished"); @@ -805,7 +793,7 @@ async fn join_room_by_id_helper_remote( // validate and send signatures _ => { if let Some(signed_raw) = &send_join_response.room_state.event { - info!( + debug_info!( "There is a signed event. This room is probably using restricted joins. 
Adding signature to \ our event" ); @@ -862,25 +850,25 @@ async fn join_room_by_id_helper_remote( .await; info!("Parsing join event"); - let parsed_join_pdu = PduEvent::from_id_val(event_id, join_event.clone()) + let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone()) .map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?; - let mut state = HashMap::new(); - let pub_key_map = RwLock::new(BTreeMap::new()); - - info!("Fetching join signing keys"); + info!("Acquiring server signing keys for response events"); + let resp_events = &send_join_response.room_state; + let resp_state = &resp_events.state; + let resp_auth = &resp_events.auth_chain; services .server_keys - .fetch_join_signing_keys(&send_join_response, &room_version_id, &pub_key_map) - .await?; + .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter())) + .await; info!("Going through send_join response room_state"); - for result in send_join_response - .room_state - .state - .iter() - .map(|pdu| validate_and_add_event_id(services, pdu, &room_version_id, &pub_key_map)) - { + let mut state = HashMap::new(); + for result in send_join_response.room_state.state.iter().map(|pdu| { + services + .server_keys + .validate_and_add_event_id(pdu, &room_version_id) + }) { let Ok((event_id, value)) = result.await else { continue; }; @@ -902,12 +890,11 @@ async fn join_room_by_id_helper_remote( } info!("Going through send_join response auth_chain"); - for result in send_join_response - .room_state - .auth_chain - .iter() - .map(|pdu| validate_and_add_event_id(services, pdu, &room_version_id, &pub_key_map)) - { + for result in send_join_response.room_state.auth_chain.iter().map(|pdu| { + services + .server_keys + .validate_and_add_event_id(pdu, &room_version_id) + }) { let Ok((event_id, value)) = result.await else { continue; }; @@ -937,29 +924,22 @@ async fn join_room_by_id_helper_remote( return Err!(Request(Forbidden("Auth check failed"))); } - info!("Saving state from send_join"); + info!("Compressing state from send_join"); + let compressed = state + .iter() + .stream() + .then(|(&k, id)| services.rooms.state_compressor.compress_state_event(k, id)) + .collect() + .await; + + debug!("Saving compressed state"); let (statehash_before_join, new, removed) = services .rooms .state_compressor - .save_state( - room_id, - Arc::new( - state - .into_iter() - .stream() - .then(|(k, id)| async move { - services - .rooms - .state_compressor - .compress_state_event(k, &id) - .await - }) - .collect() - .await, - ), - ) + .save_state(room_id, Arc::new(compressed)) .await?; + debug!("Forcing state for new room"); services .rooms .state @@ -1002,14 +982,14 @@ async fn join_room_by_id_helper_remote( .state .set_room_state(room_id, statehash_after_join, &state_lock); - Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) + Ok(()) } #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")] async fn join_room_by_id_helper_local( services: &Services, sender_user: &UserId, room_id: &RoomId, reason: Option, servers: &[OwnedServerName], _third_party_signed: Option<&ThirdPartySigned>, state_lock: RoomMutexGuard, -) -> Result { +) -> Result { debug!("We can join locally"); let join_rules_event_content = services @@ -1089,7 +1069,7 @@ async fn join_room_by_id_helper_local( ) .await { - Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())), + Ok(_) => return Ok(()), Err(e) => e, }; @@ -1159,24 +1139,15 @@ async fn join_room_by_id_helper_local( // In order to create a 
compatible ref hash (EventID) the `hashes` field needs // to be present - ruma::signatures::hash_and_sign_event( - services.globals.server_name().as_str(), - services.globals.keypair(), - &mut join_event_stub, - &room_version_id, - ) - .expect("event is valid, we just created it"); + services + .server_keys + .hash_and_sign_event(&mut join_event_stub, &room_version_id)?; // Generate event id - let event_id = format!( - "${}", - ruma::signatures::reference_hash(&join_event_stub, &room_version_id) - .expect("ruma can calculate reference hashes") - ); - let event_id = <&EventId>::try_from(event_id.as_str()).expect("ruma's reference hashes are valid event ids"); + let event_id = pdu::gen_event_id(&join_event_stub, &room_version_id)?; // Add event_id back - join_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); + join_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); // It has enough fields to be called a proper event now let join_event = join_event_stub; @@ -1187,7 +1158,7 @@ async fn join_room_by_id_helper_local( &remote_server, federation::membership::create_join_event::v2::Request { room_id: room_id.to_owned(), - event_id: event_id.to_owned(), + event_id: event_id.clone(), omit_members: false, pdu: services .sending @@ -1214,15 +1185,10 @@ async fn join_room_by_id_helper_local( } drop(state_lock); - let pub_key_map = RwLock::new(BTreeMap::new()); - services - .server_keys - .fetch_required_signing_keys([&signed_value], &pub_key_map) - .await?; services .rooms .event_handler - .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true, &pub_key_map) + .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) .await?; } else { return Err(error); @@ -1231,7 +1197,7 @@ async fn join_room_by_id_helper_local( return Err(error); } - Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) + Ok(()) } async fn make_join_request( @@ -1301,62 +1267,6 @@ async fn make_join_request( make_join_response_and_server } -pub async fn validate_and_add_event_id( - services: &Services, pdu: &RawJsonValue, room_version: &RoomVersionId, - pub_key_map: &RwLock>>, -) -> Result<(OwnedEventId, CanonicalJsonObject)> { - let mut value: CanonicalJsonObject = serde_json::from_str(pdu.get()) - .map_err(|e| err!(BadServerResponse(debug_error!("Invalid PDU in server response: {e:?}"))))?; - let event_id = EventId::parse(format!( - "${}", - ruma::signatures::reference_hash(&value, room_version).expect("ruma can calculate reference hashes") - )) - .expect("ruma's reference hashes are valid event ids"); - - let back_off = |id| async { - match services - .globals - .bad_event_ratelimiter - .write() - .expect("locked") - .entry(id) - { - Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - }, - Entry::Occupied(mut e) => { - *e.get_mut() = (Instant::now(), e.get().1.saturating_add(1)); - }, - } - }; - - if let Some((time, tries)) = services - .globals - .bad_event_ratelimiter - .read() - .expect("locked") - .get(&event_id) - { - // Exponential backoff - const MIN: u64 = 60 * 5; - const MAX: u64 = 60 * 60 * 24; - if continue_exponential_backoff_secs(MIN, MAX, time.elapsed(), *tries) { - return Err!(BadServerResponse("bad event {event_id:?}, still backing off")); - } - } - - if let Err(e) = ruma::signatures::verify_event(&*pub_key_map.read().await, &value, room_version) { - debug_error!("Event {event_id} failed verification {pdu:#?}"); - let e = 
Err!(BadServerResponse(debug_error!("Event {event_id} failed verification: {e:?}"))); - back_off(event_id).await; - return e; - } - - value.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); - - Ok((event_id, value)) -} - pub(crate) async fn invite_helper( services: &Services, sender_user: &UserId, user_id: &UserId, room_id: &RoomId, reason: Option, is_direct: bool, @@ -1423,8 +1333,6 @@ pub(crate) async fn invite_helper( ) .await?; - let pub_key_map = RwLock::new(BTreeMap::new()); - // We do not add the event_id field to the pdu here because of signature and // hashes checks let Ok((event_id, value)) = gen_event_id_canonical_json(&response.event, &room_version_id) else { @@ -1452,15 +1360,10 @@ pub(crate) async fn invite_helper( ) .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; - services - .server_keys - .fetch_required_signing_keys([&value], &pub_key_map) - .await?; - let pdu_id: Vec = services .rooms .event_handler - .handle_incoming_pdu(&origin, room_id, &event_id, value, true, &pub_key_map) + .handle_incoming_pdu(&origin, room_id, &event_id, value, true) .await? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, @@ -1714,24 +1617,15 @@ async fn remote_leave_room(services: &Services, user_id: &UserId, room_id: &Room // In order to create a compatible ref hash (EventID) the `hashes` field needs // to be present - ruma::signatures::hash_and_sign_event( - services.globals.server_name().as_str(), - services.globals.keypair(), - &mut leave_event_stub, - &room_version_id, - ) - .expect("event is valid, we just created it"); + services + .server_keys + .hash_and_sign_event(&mut leave_event_stub, &room_version_id)?; // Generate event id - let event_id = EventId::parse(format!( - "${}", - ruma::signatures::reference_hash(&leave_event_stub, &room_version_id) - .expect("ruma can calculate reference hashes") - )) - .expect("ruma's reference hashes are valid event ids"); + let event_id = pdu::gen_event_id(&leave_event_stub, &room_version_id)?; // Add event_id back - leave_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); + leave_event_stub.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); // It has enough fields to be called a proper event now let leave_event = leave_event_stub; diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 4b7b64b9..2928be87 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -52,7 +52,7 @@ pub(super) use keys::*; pub(super) use media::*; pub(super) use media_legacy::*; pub(super) use membership::*; -pub use membership::{join_room_by_id_helper, leave_all_rooms, leave_room, validate_and_add_event_id}; +pub use membership::{join_room_by_id_helper, leave_all_rooms, leave_room}; pub(super) use message::*; pub(super) use openid::*; pub(super) use presence::*; diff --git a/src/api/router/args.rs b/src/api/router/args.rs index 7381a55f..746e1cfc 100644 --- a/src/api/router/args.rs +++ b/src/api/router/args.rs @@ -48,7 +48,7 @@ where async fn from_request(request: hyper::Request, services: &State) -> Result { let mut request = request::from(services, request).await?; let mut json_body = serde_json::from_slice::(&request.body).ok(); - let auth = auth::auth(services, &mut request, &json_body, &T::METADATA).await?; + let auth = auth::auth(services, &mut request, json_body.as_ref(), &T::METADATA).await?; Ok(Self { body: make_body::(services, &mut request, &mut json_body, &auth)?, origin: auth.origin, 
diff --git a/src/api/router/auth.rs b/src/api/router/auth.rs index 8d76b4be..6b90c5ff 100644 --- a/src/api/router/auth.rs +++ b/src/api/router/auth.rs @@ -1,19 +1,20 @@ -use std::collections::BTreeMap; - use axum::RequestPartsExt; use axum_extra::{ headers::{authorization::Bearer, Authorization}, typed_header::TypedHeaderRejectionReason, TypedHeader, }; -use conduit::{debug_info, warn, Err, Error, Result}; +use conduit::{debug_error, err, warn, Err, Error, Result}; use http::uri::PathAndQuery; use ruma::{ api::{client::error::ErrorKind, AuthScheme, Metadata}, server_util::authorization::XMatrix, - CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId, + CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId, +}; +use service::{ + server_keys::{PubKeyMap, PubKeys}, + Services, }; -use service::Services; use super::request::Request; use crate::service::appservice::RegistrationInfo; @@ -33,7 +34,7 @@ pub(super) struct Auth { } pub(super) async fn auth( - services: &Services, request: &mut Request, json_body: &Option<CanonicalJsonValue>, metadata: &Metadata, + services: &Services, request: &mut Request, json_body: Option<&CanonicalJsonValue>, metadata: &Metadata, ) -> Result<Auth> { let bearer: Option<TypedHeader<Authorization<Bearer>>> = request.parts.extract().await?; let token = match &bearer { @@ -151,27 +152,24 @@ pub(super) async fn auth( } async fn auth_appservice(services: &Services, request: &Request, info: Box<RegistrationInfo>) -> Result<Auth> { - let user_id = request + let user_id_default = + || UserId::parse_with_server_name(info.registration.sender_localpart.as_str(), services.globals.server_name()); + + let Ok(user_id) = request .query .user_id .clone() - .map_or_else( - || { - UserId::parse_with_server_name( - info.registration.sender_localpart.as_str(), - services.globals.server_name(), - ) - }, - UserId::parse, - ) - .map_err(|_| Error::BadRequest(ErrorKind::InvalidUsername, "Username is invalid."))?; + .map_or_else(user_id_default, UserId::parse) + else { + return Err!(Request(InvalidUsername("Username is invalid."))); + }; if !info.is_user_match(&user_id) { - return Err(Error::BadRequest(ErrorKind::Exclusive, "User is not in namespace.")); + return Err!(Request(Exclusive("User is not in namespace."))); } if !services.users.exists(&user_id).await { - return Err(Error::BadRequest(ErrorKind::forbidden(), "User does not exist.")); + return Err!(Request(Forbidden("User does not exist."))); } Ok(Auth { @@ -182,118 +180,104 @@ async fn auth_appservice(services: &Services, request: &Request, info: Box<RegistrationInfo>, -) -> Result<Auth> { +async fn auth_server(services: &Services, request: &mut Request, body: Option<&CanonicalJsonValue>) -> Result<Auth> { + type Member = (String, CanonicalJsonValue); + type Object = CanonicalJsonObject; + type Value = CanonicalJsonValue; + + let x_matrix = parse_x_matrix(request).await?; + auth_server_checks(services, &x_matrix)?; + + let destination = services.globals.server_name(); + let origin = &x_matrix.origin; + #[allow(clippy::or_fun_call)] + let signature_uri = request .parts .uri .path_and_query() .unwrap_or(&PathAndQuery::from_static("/")) .to_string(); + + let signature: [Member; 1] = [(x_matrix.key.to_string(), Value::String(x_matrix.sig.to_string()))]; + let signatures: [Member; 1] = [(origin.to_string(), Value::Object(signature.into()))]; + let authorization: [Member; 5] = [ + ("destination".into(), Value::String(destination.into())), + ("method".into(), Value::String(request.parts.method.to_string())), + ("origin".into(), Value::String(origin.to_string())), + ("signatures".into(), Value::Object(signatures.into())), + ("uri".into(), Value::String(signature_uri)), + ]; + + let mut authorization: Object = authorization.into(); + if let Some(body) = body { + authorization.insert("content".to_owned(), body.clone()); + } + + let key = services + .server_keys + .get_verify_key(origin, &x_matrix.key) + .await + .map_err(|e| err!(Request(Forbidden(warn!("Failed to fetch signing keys: {e}")))))?; + + let keys: PubKeys = [(x_matrix.key.to_string(), key.key)].into(); + let keys: PubKeyMap = [(origin.to_string(), keys)].into(); + if let Err(e) = ruma::signatures::verify_json(&keys, authorization) { + debug_error!("Failed to verify federation request from {origin}: {e}"); + if request.parts.uri.to_string().contains('@') { + warn!( + "Request uri contained '@' character. Make sure your reverse proxy gives Conduit the raw uri (apache: \ + use nocanon)" + ); + } + + return Err!(Request(Forbidden("Failed to verify X-Matrix signatures."))); + } + + Ok(Auth { + origin: origin.to_owned().into(), + sender_user: None, + sender_device: None, + appservice_info: None, + }) +} + +fn auth_server_checks(services: &Services, x_matrix: &XMatrix) -> Result<()> { if !services.server.config.allow_federation { return Err!(Config("allow_federation", "Federation is disabled.")); } - let TypedHeader(Authorization(x_matrix)) = request - .parts - .extract::<TypedHeader<Authorization<XMatrix>>>() - .await - .map_err(|e| { - warn!("Missing or invalid Authorization header: {e}"); - - let msg = match e.reason() { - TypedHeaderRejectionReason::Missing => "Missing Authorization header.", - TypedHeaderRejectionReason::Error(_) => "Invalid X-Matrix signatures.", - _ => "Unknown header-related error", - }; - - Error::BadRequest(ErrorKind::forbidden(), msg) - })?; + let destination = services.globals.server_name(); + if x_matrix.destination.as_deref() != Some(destination) { + return Err!(Request(Forbidden("Invalid destination."))); + } let origin = &x_matrix.origin; - if services .server .config .forbidden_remote_server_names .contains(origin) { - debug_info!("Refusing to accept inbound federation request to {origin}"); - return Err!(Request(Forbidden("Federation with this homeserver is not allowed."))); + return Err!(Request(Forbidden(debug_warn!("Federation requests from {origin} denied.")))); } - let signatures = - BTreeMap::from_iter([(x_matrix.key.clone(), CanonicalJsonValue::String(x_matrix.sig.to_string()))]); - let signatures = BTreeMap::from_iter([( - origin.as_str().to_owned(), - CanonicalJsonValue::Object( - signatures - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(), - ), - )]); - - let server_destination = services.globals.server_name().as_str().to_owned(); - if let Some(destination) = x_matrix.destination.as_ref() { - if destination != &server_destination { - return Err(Error::BadRequest(ErrorKind::forbidden(), "Invalid authorization.")); - } - } - - #[allow(clippy::or_fun_call)] - let signature_uri = CanonicalJsonValue::String( - request - .parts - .uri - .path_and_query() - .unwrap_or(&PathAndQuery::from_static("/")) - .to_string(), - ); - - let mut request_map = BTreeMap::from_iter([ - ( - "method".to_owned(), - CanonicalJsonValue::String(request.parts.method.to_string()), - ), - ("uri".to_owned(), signature_uri), - ("origin".to_owned(), CanonicalJsonValue::String(origin.as_str().to_owned())), - ("destination".to_owned(), CanonicalJsonValue::String(server_destination)), - ("signatures".to_owned(), CanonicalJsonValue::Object(signatures)), - ]); - - if let Some(json_body) = json_body { - request_map.insert("content".to_owned(), json_body.clone()); }; - let keys_result = services - .server_keys - .fetch_signing_keys_for_server(origin, vec![x_matrix.key.to_string()]) - .await; - - let keys = keys_result.map_err(|e| { - warn!("Failed to fetch signing keys: {e}"); - Error::BadRequest(ErrorKind::forbidden(), "Failed to fetch signing keys.") - })?; - - let pub_key_map = BTreeMap::from_iter([(origin.as_str().to_owned(), keys)]); - - match ruma::signatures::verify_json(&pub_key_map, &request_map) { - Ok(()) => Ok(Auth { - origin: Some(origin.clone()), - sender_user: None, - sender_device: None, - appservice_info: None, - }), - Err(e) => { - warn!("Failed to verify json request from {origin}: {e}\n{request_map:?}"); - - if request.parts.uri.to_string().contains('@') { - warn!( - "Request uri contained '@' character. Make sure your reverse proxy gives Conduit the raw uri \ - (apache: use nocanon)" - ); - } - - Err(Error::BadRequest( - ErrorKind::forbidden(), - "Failed to verify X-Matrix signatures.", - )) - }, - } + Ok(()) +} + +async fn parse_x_matrix(request: &mut Request) -> Result<XMatrix> { + let TypedHeader(Authorization(x_matrix)) = request + .parts + .extract::<TypedHeader<Authorization<XMatrix>>>() + .await + .map_err(|e| { + let msg = match e.reason() { + TypedHeaderRejectionReason::Missing => "Missing Authorization header.", + TypedHeaderRejectionReason::Error(_) => "Invalid X-Matrix signatures.", + _ => "Unknown header-related error", + }; + + err!(Request(Forbidden(warn!("{msg}: {e}")))) + })?; + + Ok(x_matrix) } diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index f02655e6..a9e404c5 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -85,13 +85,10 @@ pub(crate) async fn create_invite_route( .acl_check(invited_user.server_name(), &body.room_id) .await?; - ruma::signatures::hash_and_sign_event( - services.globals.server_name().as_str(), - services.globals.keypair(), - &mut signed_event, - &body.room_version, - ) - .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?; + services + .server_keys + .hash_and_sign_event(&mut signed_event, &body.room_version) + .map_err(|e| err!(Request(InvalidParam("Failed to sign event: {e}"))))?; // Generate event id let event_id = EventId::parse(format!( diff --git a/src/api/server/key.rs b/src/api/server/key.rs index 686e4424..3913ce43 100644 --- a/src/api/server/key.rs +++ b/src/api/server/key.rs @@ -1,20 +1,16 @@ -use std::{ - collections::BTreeMap, - time::{Duration, SystemTime}, -}; +use std::{collections::BTreeMap, time::Duration}; use axum::{extract::State, response::IntoResponse, Json}; +use conduit::{utils::timepoint_from_now, Result}; use ruma::{ api::{ - federation::discovery::{get_server_keys, ServerSigningKeys, VerifyKey}, + federation::discovery::{get_server_keys, ServerSigningKeys}, OutgoingResponse, }, - serde::{Base64, Raw}, - MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, + serde::Raw, + MilliSecondsSinceUnixEpoch, }; -use crate::Result; /// # `GET /_matrix/key/v2/server` /// /// Gets the public signing keys of this server.
@@ -24,47 +20,33 @@ use crate::Result; // Response type for this endpoint is Json because we need to calculate a // signature for the response pub(crate) async fn get_server_keys_route(State(services): State) -> Result { - let verify_keys: BTreeMap = BTreeMap::from([( - format!("ed25519:{}", services.globals.keypair().version()) - .try_into() - .expect("found invalid server signing keys in DB"), - VerifyKey { - key: Base64::new(services.globals.keypair().public_key().to_vec()), - }, - )]); + let server_name = services.globals.server_name(); + let verify_keys = services.server_keys.verify_keys_for(server_name).await; + let server_key = ServerSigningKeys { + verify_keys, + server_name: server_name.to_owned(), + valid_until_ts: valid_until_ts(), + old_verify_keys: BTreeMap::new(), + signatures: BTreeMap::new(), + }; - let mut response = serde_json::from_slice( - get_server_keys::v2::Response { - server_key: Raw::new(&ServerSigningKeys { - server_name: services.globals.server_name().to_owned(), - verify_keys, - old_verify_keys: BTreeMap::new(), - signatures: BTreeMap::new(), - valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time( - SystemTime::now() - .checked_add(Duration::from_secs(86400 * 7)) - .expect("valid_until_ts should not get this high"), - ) - .expect("time is valid"), - }) - .expect("static conversion, no errors"), - } - .try_into_http_response::>() - .unwrap() - .body(), - ) - .unwrap(); + let response = get_server_keys::v2::Response { + server_key: Raw::new(&server_key)?, + } + .try_into_http_response::>()?; - ruma::signatures::sign_json( - services.globals.server_name().as_str(), - services.globals.keypair(), - &mut response, - ) - .unwrap(); + let mut response = serde_json::from_slice(response.body())?; + services.server_keys.sign_json(&mut response)?; Ok(Json(response)) } +fn valid_until_ts() -> MilliSecondsSinceUnixEpoch { + let dur = Duration::from_secs(86400 * 7); + let timepoint = timepoint_from_now(dur).expect("SystemTime should not overflow"); + MilliSecondsSinceUnixEpoch::from_system_time(timepoint).expect("UInt should not overflow") +} + /// # `GET /_matrix/key/v2/server/{keyId}` /// /// Gets the public signing keys of this server. 
diff --git a/src/api/server/send.rs b/src/api/server/send.rs index f6916ccf..40f9403b 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -21,7 +21,6 @@ use ruma::{ OwnedEventId, ServerName, }; use serde_json::value::RawValue as RawJsonValue; -use tokio::sync::RwLock; use crate::{ services::Services, @@ -109,22 +108,6 @@ async fn handle_pdus( // and hashes checks } - // We go through all the signatures we see on the PDUs and fetch the - // corresponding signing keys - let pub_key_map = RwLock::new(BTreeMap::new()); - if !parsed_pdus.is_empty() { - services - .server_keys - .fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map) - .await - .unwrap_or_else(|e| warn!("Could not fetch all signatures for PDUs from {origin}: {e:?}")); - - debug!( - elapsed = ?txn_start_time.elapsed(), - "Fetched signing keys" - ); - } - let mut resolved_map = BTreeMap::new(); for (event_id, value, room_id) in parsed_pdus { let pdu_start_time = Instant::now(); @@ -134,17 +117,18 @@ async fn handle_pdus( .mutex_federation .lock(&room_id) .await; + resolved_map.insert( event_id.clone(), services .rooms .event_handler - .handle_incoming_pdu(origin, &room_id, &event_id, value, true, &pub_key_map) + .handle_incoming_pdu(origin, &room_id, &event_id, value, true) .await .map(|_| ()), ); - drop(mutex_lock); + drop(mutex_lock); debug!( pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index f9257690..d888d75e 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,6 +1,6 @@ #![allow(deprecated)] -use std::{borrow::Borrow, collections::BTreeMap}; +use std::borrow::Borrow; use axum::extract::State; use conduit::{err, pdu::gen_event_id_canonical_json, utils::IterStream, warn, Error, Result}; @@ -15,7 +15,6 @@ use ruma::{ }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use service::Services; -use tokio::sync::RwLock; use crate::Ruma; @@ -43,9 +42,6 @@ async fn create_join_event( .await .map_err(|_| err!(Request(NotFound("Event state not found."))))?; - let pub_key_map = RwLock::new(BTreeMap::new()); - // let mut auth_cache = EventMap::new(); - // We do not add the event_id field to the pdu here because of signature and // hashes checks let room_version_id = services.rooms.state.get_room_version(room_id).await?; @@ -137,20 +133,12 @@ async fn create_join_event( .await .unwrap_or_default() { - ruma::signatures::hash_and_sign_event( - services.globals.server_name().as_str(), - services.globals.keypair(), - &mut value, - &room_version_id, - ) - .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?; + services + .server_keys + .hash_and_sign_event(&mut value, &room_version_id) + .map_err(|e| err!(Request(InvalidParam("Failed to sign event: {e}"))))?; } - services - .server_keys - .fetch_required_signing_keys([&value], &pub_key_map) - .await?; - let origin: OwnedServerName = serde_json::from_value( serde_json::to_value( value @@ -171,7 +159,7 @@ async fn create_join_event( let pdu_id: Vec = services .rooms .event_handler - .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true, &pub_key_map) + .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true) .await? 
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Could not accept as timeline event."))?; diff --git a/src/api/server/send_leave.rs b/src/api/server/send_leave.rs index 81f41af0..0530f9dd 100644 --- a/src/api/server/send_leave.rs +++ b/src/api/server/send_leave.rs @@ -1,7 +1,5 @@ #![allow(deprecated)] -use std::collections::BTreeMap; - use axum::extract::State; use conduit::{utils::ReadyExt, Error, Result}; use ruma::{ @@ -13,7 +11,6 @@ use ruma::{ OwnedServerName, OwnedUserId, RoomId, ServerName, }; use serde_json::value::RawValue as RawJsonValue; -use tokio::sync::RwLock; use crate::{ service::{pdu::gen_event_id_canonical_json, Services}, @@ -60,8 +57,6 @@ async fn create_leave_event( .acl_check(origin, room_id) .await?; - let pub_key_map = RwLock::new(BTreeMap::new()); - // We do not add the event_id field to the pdu here because of signature and // hashes checks let room_version_id = services.rooms.state.get_room_version(room_id).await?; @@ -154,21 +149,17 @@ async fn create_leave_event( ) .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "origin is not a server name."))?; - services - .server_keys - .fetch_required_signing_keys([&value], &pub_key_map) - .await?; - let mutex_lock = services .rooms .event_handler .mutex_federation .lock(room_id) .await; + let pdu_id: Vec = services .rooms .event_handler - .handle_incoming_pdu(&origin, room_id, &event_id, value, true, &pub_key_map) + .handle_incoming_pdu(&origin, room_id, &event_id, value, true) .await? .ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Could not accept as timeline event."))?; diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index b5e07da2..114c6e76 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -490,30 +490,6 @@ pub struct Config { #[serde(default = "default_trusted_servers")] pub trusted_servers: Vec, - /// Option to control whether conduwuit will query your list of trusted - /// notary key servers (`trusted_servers`) for remote homeserver signing - /// keys it doesn't know *first*, or query the individual servers first - /// before falling back to the trusted key servers. - /// - /// The former/default behaviour makes federated/remote rooms joins - /// generally faster because we're querying a single (or list of) server - /// that we know works, is reasonably fast, and is reliable for just about - /// all the homeserver signing keys in the room. Querying individual - /// servers may take longer depending on the general infrastructure of - /// everyone in there, how many dead servers there are, etc. - /// - /// However, this does create an increased reliance on one single or - /// multiple large entities as `trusted_servers` should generally - /// contain long-term and large servers who know a very large number of - /// homeservers. - /// - /// If you don't know what any of this means, leave this and - /// `trusted_servers` alone to their defaults. - /// - /// Defaults to true as this is the fastest option for federation. - #[serde(default = "true_fn")] - pub query_trusted_key_servers_first: bool, - /// max log level for conduwuit. 
allows debug, info, warn, or error /// see also: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives /// **Caveat**: @@ -1518,10 +1494,6 @@ impl fmt::Display for Config { .map(|server| server.host()) .join(", "), ); - line( - "Query Trusted Key Servers First", - &self.query_trusted_key_servers_first.to_string(), - ); line("OpenID Token TTL", &self.openid_token_ttl.to_string()); line( "TURN username", diff --git a/src/core/error/mod.rs b/src/core/error/mod.rs index 39fa4340..42250a0c 100644 --- a/src/core/error/mod.rs +++ b/src/core/error/mod.rs @@ -85,6 +85,8 @@ pub enum Error { BadRequest(ruma::api::client::error::ErrorKind, &'static str), //TODO: remove #[error("{0}")] BadServerResponse(Cow<'static, str>), + #[error(transparent)] + CanonicalJson(#[from] ruma::CanonicalJsonError), #[error("There was a problem with the '{0}' directive in your configuration: {1}")] Config(&'static str, Cow<'static, str>), #[error("{0}")] @@ -110,6 +112,8 @@ pub enum Error { #[error(transparent)] Ruma(#[from] ruma::api::client::error::Error), #[error(transparent)] + Signatures(#[from] ruma::signatures::Error), + #[error(transparent)] StateRes(#[from] ruma::state_res::Error), #[error("uiaa")] Uiaa(ruma::api::client::uiaa::UiaaInfo), diff --git a/src/core/pdu/mod.rs b/src/core/pdu/mod.rs index 5f50fe5b..274b96bd 100644 --- a/src/core/pdu/mod.rs +++ b/src/core/pdu/mod.rs @@ -408,10 +408,13 @@ impl PduEvent { serde_json::from_value(json).expect("Raw::from_value always works") } - pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result<Self, serde_json::Error> { - json.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); + pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result<Self> { + json.insert("event_id".into(), CanonicalJsonValue::String(event_id.into())); - serde_json::from_value(serde_json::to_value(json).expect("valid JSON")) + let value = serde_json::to_value(json)?; + let pdu = serde_json::from_value(value)?; + + Ok(pdu) } } @@ -462,13 +465,15 @@ pub fn gen_event_id_canonical_json( let value: CanonicalJsonObject = serde_json::from_str(pdu.get()) .map_err(|e| err!(BadServerResponse(warn!("Error parsing incoming event: {e:?}"))))?; - let event_id = format!( - "${}", - // Anything higher than version3 behaves the same - ruma::signatures::reference_hash(&value, room_version_id).expect("ruma can calculate reference hashes") - ) - .try_into() - .expect("ruma's reference hashes are valid event ids"); + let event_id = gen_event_id(&value, room_version_id)?; Ok((event_id, value)) } + +/// Generates a correct eventId for the incoming pdu. +pub fn gen_event_id(value: &CanonicalJsonObject, room_version_id: &RoomVersionId) -> Result<OwnedEventId> { + let reference_hash = ruma::signatures::reference_hash(value, room_version_id)?; + let event_id: OwnedEventId = format!("${reference_hash}").try_into()?; + + Ok(event_id) +} diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 3638cb56..eea7597a 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -1,16 +1,9 @@ -use std::{ - collections::BTreeMap, - sync::{Arc, RwLock}, -}; +use std::sync::{Arc, RwLock}; -use conduit::{trace, utils, utils::rand, Error, Result, Server}; -use database::{Database, Deserialized, Json, Map}; +use conduit::{trace, utils, Result, Server}; +use database::{Database, Deserialized, Map}; use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; -use ruma::{ - api::federation::discovery::{ServerSigningKeys, VerifyKey}, - signatures::Ed25519KeyPair, - DeviceId, MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, ServerName, UserId, -}; +use ruma::{DeviceId, UserId}; use crate::{rooms, Dep}; @@ -25,7 +18,6 @@ pub struct Data { pduid_pdu: Arc<Map>, keychangeid_userid: Arc<Map>, roomusertype_roomuserdataid: Arc<Map>, - server_signingkeys: Arc<Map>, readreceiptid_readreceipt: Arc<Map>, userid_lastonetimekeyupdate: Arc<Map>, counter: RwLock<u64>, @@ -56,7 +48,6 @@ impl Data { pduid_pdu: db["pduid_pdu"].clone(), keychangeid_userid: db["keychangeid_userid"].clone(), roomusertype_roomuserdataid: db["roomusertype_roomuserdataid"].clone(), - server_signingkeys: db["server_signingkeys"].clone(), readreceiptid_readreceipt: db["readreceiptid_readreceipt"].clone(), userid_lastonetimekeyupdate: db["userid_lastonetimekeyupdate"].clone(), counter: RwLock::new(Self::stored_count(&db["global"]).expect("initialized global counter")), @@ -205,107 +196,6 @@ impl Data { Ok(()) } - pub fn load_keypair(&self) -> Result<Ed25519KeyPair> { - let generate = |_| { - let keypair = Ed25519KeyPair::generate().expect("Ed25519KeyPair generation always works (?)"); - - let mut value = rand::string(8).as_bytes().to_vec(); - value.push(0xFF); - value.extend_from_slice(&keypair); - - self.global.insert(b"keypair", &value); - value - }; - - let keypair_bytes: Vec<u8> = self - .global - .get_blocking(b"keypair") - .map_or_else(generate, Into::into); - - let mut parts = keypair_bytes.splitn(2, |&b| b == 0xFF); - utils::string_from_bytes( - // 1. version - parts - .next() - .expect("splitn always returns at least one element"), - ) - .map_err(|_| Error::bad_database("Invalid version bytes in keypair.")) - .and_then(|version| { - // 2. key - parts - .next() - .ok_or_else(|| Error::bad_database("Invalid keypair format in database.")) - .map(|key| (version, key)) - }) - .and_then(|(version, key)| { - Ed25519KeyPair::from_der(key, version) - .map_err(|_| Error::bad_database("Private or public keys are invalid.")) - }) - } - - #[inline] - pub fn remove_keypair(&self) -> Result<()> { - self.global.remove(b"keypair"); - Ok(()) - } - - /// TODO: the key valid until timestamp (`valid_until_ts`) is only honored - /// in room version > 4 - /// - /// Remove the outdated keys and insert the new ones. - /// - /// This doesn't actually check that the keys provided are newer than the - /// old set.
- pub async fn add_signing_key( - &self, origin: &ServerName, new_keys: ServerSigningKeys, - ) -> BTreeMap { - // (timo) Not atomic, but this is not critical - let mut keys: ServerSigningKeys = self - .server_signingkeys - .get(origin) - .await - .deserialized() - .unwrap_or_else(|_| { - // Just insert "now", it doesn't matter - ServerSigningKeys::new(origin.to_owned(), MilliSecondsSinceUnixEpoch::now()) - }); - - keys.verify_keys.extend(new_keys.verify_keys); - keys.old_verify_keys.extend(new_keys.old_verify_keys); - - self.server_signingkeys.raw_put(origin, Json(&keys)); - - let mut tree = keys.verify_keys; - tree.extend( - keys.old_verify_keys - .into_iter() - .map(|old| (old.0, VerifyKey::new(old.1.key))), - ); - - tree - } - - /// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found - /// for the server. - pub async fn verify_keys_for(&self, origin: &ServerName) -> Result> { - self.signing_keys_for(origin).await.map_or_else( - |_| Ok(BTreeMap::new()), - |keys: ServerSigningKeys| { - let mut tree = keys.verify_keys; - tree.extend( - keys.old_verify_keys - .into_iter() - .map(|old| (old.0, VerifyKey::new(old.1.key))), - ); - Ok(tree) - }, - ) - } - - pub async fn signing_keys_for(&self, origin: &ServerName) -> Result { - self.server_signingkeys.get(origin).await.deserialized() - } - pub async fn database_version(&self) -> u64 { self.global .get(b"version") diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index fb970f07..7680007d 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -2,7 +2,7 @@ mod data; pub(super) mod migrations; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, fmt::Write, sync::{Arc, RwLock}, time::Instant, @@ -13,13 +13,8 @@ use data::Data; use ipaddress::IPAddress; use regex::RegexSet; use ruma::{ - api::{ - client::discovery::discover_support::ContactRole, - federation::discovery::{ServerSigningKeys, VerifyKey}, - }, - serde::Base64, - DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomAliasId, - RoomVersionId, ServerName, UserId, + api::client::discovery::discover_support::ContactRole, DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedServerName, + OwnedUserId, RoomAliasId, RoomVersionId, ServerName, UserId, }; use tokio::sync::Mutex; use url::Url; @@ -31,7 +26,6 @@ pub struct Service { pub config: Config, pub cidr_range_denylist: Vec, - keypair: Arc, jwt_decoding_key: Option, pub stable_room_versions: Vec, pub unstable_room_versions: Vec, @@ -50,16 +44,6 @@ impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let db = Data::new(&args); let config = &args.server.config; - let keypair = db.load_keypair(); - - let keypair = match keypair { - Ok(k) => k, - Err(e) => { - error!("Keypair invalid. Deleting..."); - db.remove_keypair()?; - return Err(e); - }, - }; let jwt_decoding_key = config .jwt_secret @@ -115,7 +99,6 @@ impl crate::Service for Service { db, config: config.clone(), cidr_range_denylist, - keypair: Arc::new(keypair), jwt_decoding_key, stable_room_versions, unstable_room_versions, @@ -175,9 +158,6 @@ impl crate::Service for Service { } impl Service { - /// Returns this server's keypair. 
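Both removed lookups above collapse a server's `ServerSigningKeys` into one flat map before use, demoting `old_verify_keys` entries to plain `VerifyKey`s; the new module keeps that shape as `merge_old_keys`. As a standalone sketch:

    use std::collections::BTreeMap;
    use ruma::{
        api::federation::discovery::{ServerSigningKeys, VerifyKey},
        OwnedServerSigningKeyId,
    };

    // Flatten current and old keys into the map signature verification wants.
    fn flatten_keys(keys: ServerSigningKeys) -> BTreeMap<OwnedServerSigningKeyId, VerifyKey> {
        let mut tree = keys.verify_keys;
        tree.extend(
            keys.old_verify_keys
                .into_iter()
                .map(|(id, old)| (id, VerifyKey::new(old.key))),
        );
        tree
    }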
- pub fn keypair(&self) -> &ruma::signatures::Ed25519KeyPair { &self.keypair } - #[inline] pub fn next_count(&self) -> Result { self.db.next_count() } @@ -224,8 +204,6 @@ impl Service { pub fn trusted_servers(&self) -> &[OwnedServerName] { &self.config.trusted_servers } - pub fn query_trusted_key_servers_first(&self) -> bool { self.config.query_trusted_key_servers_first } - pub fn jwt_decoding_key(&self) -> Option<&jsonwebtoken::DecodingKey> { self.jwt_decoding_key.as_ref() } pub fn turn_password(&self) -> &String { &self.config.turn_password } @@ -302,28 +280,6 @@ impl Service { } } - /// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found - /// for the server. - pub async fn verify_keys_for(&self, origin: &ServerName) -> Result> { - let mut keys = self.db.verify_keys_for(origin).await?; - if origin == self.server_name() { - keys.insert( - format!("ed25519:{}", self.keypair().version()) - .try_into() - .expect("found invalid server signing keys in DB"), - VerifyKey { - key: Base64::new(self.keypair.public_key().to_vec()), - }, - ); - } - - Ok(keys) - } - - pub async fn signing_keys_for(&self, origin: &ServerName) -> Result { - self.db.signing_keys_for(origin).await - } - pub fn well_known_client(&self) -> &Option { &self.config.well_known.client } pub fn well_known_server(&self) -> &Option { &self.config.well_known.server } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index f8042b67..8448404b 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -28,12 +28,10 @@ use ruma::{ StateEventType, TimelineEventType, }, int, - serde::Base64, state_res::{self, EventTypeExt, RoomVersion, StateMap}, - uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, - ServerName, UserId, + uint, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, + RoomId, RoomVersionId, ServerName, UserId, }; -use tokio::sync::RwLock; use super::state_compressor::CompressedStateEvent; use crate::{globals, rooms, sending, server_keys, Dep}; @@ -129,11 +127,10 @@ impl Service { /// 13. Use state resolution to find new room state /// 14. Check if the event passes auth based on the "current state" of the /// room, if not soft fail it - #[tracing::instrument(skip(self, origin, value, is_timeline_event, pub_key_map), name = "pdu")] + #[tracing::instrument(skip(self, origin, value, is_timeline_event), name = "pdu")] pub async fn handle_incoming_pdu<'a>( &self, origin: &'a ServerName, room_id: &'a RoomId, event_id: &'a EventId, value: BTreeMap, is_timeline_event: bool, - pub_key_map: &'a RwLock>>, ) -> Result>> { // 1. 
Skip the PDU if we already have it as a timeline event if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await { @@ -177,7 +174,7 @@ impl Service { let first_pdu_in_room = self.services.timeline.first_pdu_in_room(room_id).await?; let (incoming_pdu, val) = self - .handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false, pub_key_map) + .handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false) .boxed() .await?; @@ -200,7 +197,6 @@ impl Service { &create_event, room_id, &room_version_id, - pub_key_map, incoming_pdu.prev_events.clone(), ) .await?; @@ -212,7 +208,6 @@ impl Service { origin, event_id, room_id, - pub_key_map, &mut eventid_info, &create_event, &first_pdu_in_room, @@ -250,7 +245,7 @@ impl Service { .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); let r = self - .upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id, pub_key_map) + .upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id) .await; self.federation_handletime @@ -264,12 +259,11 @@ impl Service { #[allow(clippy::type_complexity)] #[allow(clippy::too_many_arguments)] #[tracing::instrument( - skip(self, origin, event_id, room_id, pub_key_map, eventid_info, create_event, first_pdu_in_room), + skip(self, origin, event_id, room_id, eventid_info, create_event, first_pdu_in_room), name = "prev" )] pub async fn handle_prev_pdu<'a>( &self, origin: &'a ServerName, event_id: &'a EventId, room_id: &'a RoomId, - pub_key_map: &'a RwLock>>, eventid_info: &mut HashMap, (Arc, BTreeMap)>, create_event: &Arc, first_pdu_in_room: &Arc, prev_id: &EventId, ) -> Result<()> { @@ -318,7 +312,7 @@ impl Service { .expect("locked") .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); - self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id, pub_key_map) + self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id) .await?; self.federation_handletime @@ -338,8 +332,7 @@ impl Service { #[allow(clippy::too_many_arguments)] async fn handle_outlier_pdu<'a>( &self, origin: &'a ServerName, create_event: &'a PduEvent, event_id: &'a EventId, room_id: &'a RoomId, - mut value: BTreeMap, auth_events_known: bool, - pub_key_map: &'a RwLock>>, + mut value: CanonicalJsonObject, auth_events_known: bool, ) -> Result<(Arc, BTreeMap)> { // 1. Remove unsigned field value.remove("unsigned"); @@ -349,14 +342,13 @@ impl Service { // 2. Check signatures, otherwise drop // 3. 
check content hash, redact if doesn't match let room_version_id = Self::get_room_version_id(create_event)?; - - let guard = pub_key_map.read().await; - let mut val = match ruma::signatures::verify_event(&guard, &value, &room_version_id) { - Err(e) => { - // Drop - warn!("Dropping bad event {event_id}: {e}"); - return Err!(Request(InvalidParam("Signature verification failed"))); - }, + let mut val = match self + .services + .server_keys + .verify_event(&value, Some(&room_version_id)) + .await + { + Ok(ruma::signatures::Verified::All) => value, Ok(ruma::signatures::Verified::Signatures) => { // Redact debug_info!("Calculated hash does not match (redaction): {event_id}"); @@ -371,11 +363,13 @@ impl Service { obj }, - Ok(ruma::signatures::Verified::All) => value, + Err(e) => { + return Err!(Request(InvalidParam(debug_error!( + "Signature verification failed for {event_id}: {e}" + )))) + }, }; - drop(guard); - // Now that we have checked the signature and hashes we can add the eventID and // convert to our PduEvent type val.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); @@ -404,7 +398,6 @@ impl Service { create_event, room_id, &room_version_id, - pub_key_map, ), ) .await; @@ -487,7 +480,7 @@ impl Service { pub async fn upgrade_outlier_to_timeline_pdu( &self, incoming_pdu: Arc, val: BTreeMap, create_event: &PduEvent, - origin: &ServerName, room_id: &RoomId, pub_key_map: &RwLock>>, + origin: &ServerName, room_id: &RoomId, ) -> Result>> { // Skip the PDU if we already have it as a timeline event if let Ok(pduid) = self @@ -526,14 +519,7 @@ impl Service { if state_at_incoming_event.is_none() { state_at_incoming_event = self - .fetch_state( - origin, - create_event, - room_id, - &room_version_id, - pub_key_map, - &incoming_pdu.event_id, - ) + .fetch_state(origin, create_event, room_id, &room_version_id, &incoming_pdu.event_id) .await?; } @@ -1021,10 +1007,10 @@ impl Service { /// Call /state_ids to find out what the state at this pdu is. We trust the /// server's response to some extend (sic), but we still do a lot of checks /// on the events - #[tracing::instrument(skip(self, pub_key_map, create_event, room_version_id))] + #[tracing::instrument(skip(self, create_event, room_version_id))] async fn fetch_state( &self, origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, room_version_id: &RoomVersionId, - pub_key_map: &RwLock>>, event_id: &EventId, + event_id: &EventId, ) -> Result>>> { debug!("Fetching state ids"); let res = self @@ -1048,7 +1034,7 @@ impl Service { .collect::>(); let state_vec = self - .fetch_and_handle_outliers(origin, &collect, create_event, room_id, room_version_id, pub_key_map) + .fetch_and_handle_outliers(origin, &collect, create_event, room_id, room_version_id) .boxed() .await; @@ -1102,7 +1088,7 @@ impl Service { /// d. TODO: Ask other servers over federation? 
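One behavioral subtlety preserved by the verification rewrite above: `verify_event` can succeed while reporting that only the signatures matched. A sketch of how that outcome maps onto what gets stored, using ruma's redaction (plain function for illustration; the service does this inline):

    use ruma::{canonical_json::redact, signatures::Verified, CanonicalJsonObject, RoomVersionId};

    // Verified::All        -> hashes and signatures both check out; keep as-is.
    // Verified::Signatures -> signatures are valid but the content hash is
    //                         not; only the redacted form may be kept.
    fn keep_or_redact(
        value: CanonicalJsonObject,
        verified: Verified,
        room_version: &RoomVersionId,
    ) -> Result<CanonicalJsonObject, ruma::canonical_json::RedactionError> {
        if matches!(verified, Verified::Signatures) {
            redact(value, room_version, None)
        } else {
            Ok(value)
        }
    }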
pub async fn fetch_and_handle_outliers<'a>( &self, origin: &'a ServerName, events: &'a [Arc], create_event: &'a PduEvent, room_id: &'a RoomId, - room_version_id: &'a RoomVersionId, pub_key_map: &'a RwLock>>, + room_version_id: &'a RoomVersionId, ) -> Vec<(Arc, Option>)> { let back_off = |id| match self .services @@ -1222,22 +1208,6 @@ impl Service { events_with_auth_events.push((id, None, events_in_reverse_order)); } - // We go through all the signatures we see on the PDUs and their unresolved - // dependencies and fetch the corresponding signing keys - self.services - .server_keys - .fetch_required_signing_keys( - events_with_auth_events - .iter() - .flat_map(|(_id, _local_pdu, events)| events) - .map(|(_event_id, event)| event), - pub_key_map, - ) - .await - .unwrap_or_else(|e| { - warn!("Could not fetch all signatures for PDUs from {origin}: {e:?}"); - }); - let mut pdus = Vec::with_capacity(events_with_auth_events.len()); for (id, local_pdu, events_in_reverse_order) in events_with_auth_events { // a. Look in the main timeline (pduid_pdu tree) @@ -1266,16 +1236,8 @@ impl Service { } } - match Box::pin(self.handle_outlier_pdu( - origin, - create_event, - &next_id, - room_id, - value.clone(), - true, - pub_key_map, - )) - .await + match Box::pin(self.handle_outlier_pdu(origin, create_event, &next_id, room_id, value.clone(), true)) + .await { Ok((pdu, json)) => { if next_id == *id { @@ -1296,7 +1258,7 @@ impl Service { #[tracing::instrument(skip_all)] async fn fetch_prev( &self, origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, room_version_id: &RoomVersionId, - pub_key_map: &RwLock>>, initial_set: Vec>, + initial_set: Vec>, ) -> Result<( Vec>, HashMap, (Arc, BTreeMap)>, @@ -1311,14 +1273,7 @@ impl Service { while let Some(prev_event_id) = todo_outlier_stack.pop() { if let Some((pdu, mut json_opt)) = self - .fetch_and_handle_outliers( - origin, - &[prev_event_id.clone()], - create_event, - room_id, - room_version_id, - pub_key_map, - ) + .fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id, room_version_id) .boxed() .await .pop() diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 21e5395d..902e50ff 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -16,7 +16,7 @@ use conduit::{ }; use futures::{future, future::ready, Future, FutureExt, Stream, StreamExt, TryStreamExt}; use ruma::{ - api::{client::error::ErrorKind, federation}, + api::federation, canonical_json::to_canonical_value, events::{ push_rules::PushRulesEvent, @@ -30,14 +30,12 @@ use ruma::{ GlobalAccountDataEventType, StateEventType, TimelineEventType, }, push::{Action, Ruleset, Tweak}, - serde::Base64, state_res::{self, Event, RoomVersion}, uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, RoomVersionId, ServerName, UserId, }; use serde::Deserialize; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::sync::RwLock; use self::data::Data; pub use self::data::PdusIterItem; @@ -784,21 +782,15 @@ impl Service { to_canonical_value(self.services.globals.server_name()).expect("server name is a valid CanonicalJsonValue"), ); - match ruma::signatures::hash_and_sign_event( - self.services.globals.server_name().as_str(), - self.services.globals.keypair(), - &mut pdu_json, - &room_version_id, - ) { - Ok(()) => {}, - Err(e) => { - return match e { - ruma::signatures::Error::PduSize => { - Err(Error::BadRequest(ErrorKind::TooLarge, 
"Message is too long")) - }, - _ => Err(Error::BadRequest(ErrorKind::Unknown, "Signing event failed")), - } - }, + if let Err(e) = self + .services + .server_keys + .hash_and_sign_event(&mut pdu_json, &room_version_id) + { + return match e { + Error::Signatures(ruma::signatures::Error::PduSize) => Err!(Request(TooLarge("Message is too long"))), + _ => Err!(Request(Unknown("Signing event failed"))), + }; } // Generate event id @@ -1106,9 +1098,8 @@ impl Service { .await; match response { Ok(response) => { - let pub_key_map = RwLock::new(BTreeMap::new()); for pdu in response.pdus { - if let Err(e) = self.backfill_pdu(backfill_server, pdu, &pub_key_map).await { + if let Err(e) = self.backfill_pdu(backfill_server, pdu).await { warn!("Failed to add backfilled pdu in room {room_id}: {e}"); } } @@ -1124,11 +1115,8 @@ impl Service { Ok(()) } - #[tracing::instrument(skip(self, pdu, pub_key_map))] - pub async fn backfill_pdu( - &self, origin: &ServerName, pdu: Box, - pub_key_map: &RwLock>>, - ) -> Result<()> { + #[tracing::instrument(skip(self, pdu))] + pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { let (event_id, value, room_id) = self.services.event_handler.parse_incoming_pdu(&pdu).await?; // Lock so we cannot backfill the same pdu twice at the same time @@ -1146,14 +1134,9 @@ impl Service { return Ok(()); } - self.services - .server_keys - .fetch_required_signing_keys([&value], pub_key_map) - .await?; - self.services .event_handler - .handle_incoming_pdu(origin, &room_id, &event_id, value, false, pub_key_map) + .handle_incoming_pdu(origin, &room_id, &event_id, value, false) .await?; let value = self diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index e3582f2e..5970c383 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -17,7 +17,7 @@ use tokio::sync::Mutex; use self::data::Data; pub use self::dest::Destination; -use crate::{account_data, client, globals, presence, pusher, resolver, rooms, users, Dep}; +use crate::{account_data, client, globals, presence, pusher, resolver, rooms, server_keys, users, Dep}; pub struct Service { server: Arc, @@ -41,6 +41,7 @@ struct Services { account_data: Dep, appservice: Dep, pusher: Dep, + server_keys: Dep, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -78,6 +79,7 @@ impl crate::Service for Service { account_data: args.depend::("account_data"), appservice: args.depend::("appservice"), pusher: args.depend::("pusher"), + server_keys: args.depend::("server_keys"), }, db: Data::new(&args), sender, diff --git a/src/service/sending/send.rs b/src/service/sending/send.rs index 9a8f408b..73b6a468 100644 --- a/src/service/sending/send.rs +++ b/src/service/sending/send.rs @@ -1,8 +1,8 @@ use std::{fmt::Debug, mem}; use conduit::{ - debug, debug_error, debug_info, debug_warn, err, error::inspect_debug_log, trace, utils::string::EMPTY, Err, Error, - Result, + debug, debug_error, debug_info, debug_warn, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, + Err, Error, Result, }; use http::{header::AUTHORIZATION, HeaderValue}; use ipaddress::IPAddress; @@ -18,7 +18,7 @@ use ruma::{ }; use crate::{ - globals, resolver, + resolver, resolver::{actual::ActualDest, cache::CachedDest}, }; @@ -75,7 +75,7 @@ impl super::Service { .try_into_http_request::>(&actual.string, SATIR, &VERSIONS) .map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?; - sign_request::(&self.services.globals, dest, &mut http_request); + self.sign_request::(dest, &mut http_request); let request = 
Request::try_from(http_request)?; self.validate_url(request.url())?; @@ -178,7 +178,8 @@ where Err(e.into()) } -fn sign_request(globals: &globals::Service, dest: &ServerName, http_request: &mut http::Request>) +#[implement(super::Service)] +fn sign_request(&self, dest: &ServerName, http_request: &mut http::Request>) where T: OutgoingRequest + Debug + Send, { @@ -200,11 +201,13 @@ where .to_string() .into(), ); - req_map.insert("origin".to_owned(), globals.server_name().as_str().into()); + req_map.insert("origin".to_owned(), self.services.globals.server_name().to_string().into()); req_map.insert("destination".to_owned(), dest.as_str().into()); let mut req_json = serde_json::from_value(req_map.into()).expect("valid JSON is valid BTreeMap"); - ruma::signatures::sign_json(globals.server_name().as_str(), globals.keypair(), &mut req_json) + self.services + .server_keys + .sign_json(&mut req_json) .expect("our request json is what ruma expects"); let req_json: serde_json::Map = @@ -231,7 +234,12 @@ where http_request.headers_mut().insert( AUTHORIZATION, - HeaderValue::from(&XMatrix::new(globals.config.server_name.clone(), dest.to_owned(), key, sig)), + HeaderValue::from(&XMatrix::new( + self.services.globals.server_name().to_owned(), + dest.to_owned(), + key, + sig, + )), ); } } diff --git a/src/service/server_keys/acquire.rs b/src/service/server_keys/acquire.rs new file mode 100644 index 00000000..2b170040 --- /dev/null +++ b/src/service/server_keys/acquire.rs @@ -0,0 +1,175 @@ +use std::{ + borrow::Borrow, + collections::{BTreeMap, BTreeSet}, +}; + +use conduit::{debug, debug_warn, error, implement, result::FlatOk, warn}; +use futures::{stream::FuturesUnordered, StreamExt}; +use ruma::{ + api::federation::discovery::ServerSigningKeys, serde::Raw, CanonicalJsonObject, OwnedServerName, + OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, +}; +use serde_json::value::RawValue as RawJsonValue; + +use super::key_exists; + +type Batch = BTreeMap>; + +#[implement(super::Service)] +pub async fn acquire_events_pubkeys<'a, I>(&self, events: I) +where + I: Iterator> + Send, +{ + type Batch = BTreeMap>; + type Signatures = BTreeMap>; + + let mut batch = Batch::new(); + events + .cloned() + .map(Raw::::from_json) + .map(|event| event.get_field::("signatures")) + .filter_map(FlatOk::flat_ok) + .flat_map(IntoIterator::into_iter) + .for_each(|(server, sigs)| { + batch.entry(server).or_default().extend(sigs.into_keys()); + }); + + let batch = batch + .iter() + .map(|(server, keys)| (server.borrow(), keys.iter().map(Borrow::borrow))); + + self.acquire_pubkeys(batch).await; +} + +#[implement(super::Service)] +pub async fn acquire_pubkeys<'a, S, K>(&self, batch: S) +where + S: Iterator + Send + Clone, + K: Iterator + Send + Clone, +{ + let requested_servers = batch.clone().count(); + let requested_keys = batch.clone().flat_map(|(_, key_ids)| key_ids).count(); + + debug!("acquire {requested_keys} keys from {requested_servers}"); + + let missing = self.acquire_locals(batch).await; + let missing_keys = keys_count(&missing); + let missing_servers = missing.len(); + if missing_servers == 0 { + return; + } + + debug!("missing {missing_keys} keys for {missing_servers} servers locally"); + + let missing = self.acquire_origins(missing.into_iter()).await; + let missing_keys = keys_count(&missing); + let missing_servers = missing.len(); + if missing_servers == 0 { + return; + } + + debug_warn!("missing {missing_keys} keys for {missing_servers} servers unreachable"); + + let missing = 
self.acquire_notary(missing.into_iter()).await; + let missing_keys = keys_count(&missing); + let missing_servers = missing.len(); + if missing_keys > 0 { + debug_warn!("still missing {missing_keys} keys for {missing_servers} servers from all notaries"); + warn!("did not obtain {missing_keys} of {requested_keys} keys; some events may not be accepted"); + } +} + +#[implement(super::Service)] +async fn acquire_locals<'a, S, K>(&self, batch: S) -> Batch +where + S: Iterator + Send, + K: Iterator + Send, +{ + let mut missing = Batch::new(); + for (server, key_ids) in batch { + for key_id in key_ids { + if !self.verify_key_exists(server, key_id).await { + missing + .entry(server.into()) + .or_default() + .push(key_id.into()); + } + } + } + + missing +} + +#[implement(super::Service)] +async fn acquire_origins(&self, batch: I) -> Batch +where + I: Iterator)> + Send, +{ + let mut requests: FuturesUnordered<_> = batch + .map(|(origin, key_ids)| self.acquire_origin(origin, key_ids)) + .collect(); + + let mut missing = Batch::new(); + while let Some((origin, key_ids)) = requests.next().await { + if !key_ids.is_empty() { + missing.insert(origin, key_ids); + } + } + + missing +} + +#[implement(super::Service)] +async fn acquire_origin( + &self, origin: OwnedServerName, mut key_ids: Vec, +) -> (OwnedServerName, Vec) { + if let Ok(server_keys) = self.server_request(&origin).await { + self.add_signing_keys(server_keys.clone()).await; + key_ids.retain(|key_id| !key_exists(&server_keys, key_id)); + } + + (origin, key_ids) +} + +#[implement(super::Service)] +async fn acquire_notary(&self, batch: I) -> Batch +where + I: Iterator)> + Send, +{ + let mut missing: Batch = batch.collect(); + for notary in self.services.globals.trusted_servers() { + let missing_keys = keys_count(&missing); + let missing_servers = missing.len(); + debug!("Asking notary {notary} for {missing_keys} missing keys from {missing_servers} servers"); + + let batch = missing + .iter() + .map(|(server, keys)| (server.borrow(), keys.iter().map(Borrow::borrow))); + + match self.batch_notary_request(notary, batch).await { + Err(e) => error!("Failed to contact notary {notary:?}: {e}"), + Ok(results) => { + for server_keys in results { + self.acquire_notary_result(&mut missing, server_keys).await; + } + }, + } + } + + missing +} + +#[implement(super::Service)] +async fn acquire_notary_result(&self, missing: &mut Batch, server_keys: ServerSigningKeys) { + let server = &server_keys.server_name; + self.add_signing_keys(server_keys.clone()).await; + + if let Some(key_ids) = missing.get_mut(server) { + key_ids.retain(|key_id| key_exists(&server_keys, key_id)); + if key_ids.is_empty() { + missing.remove(server); + } + } +} + +fn keys_count(batch: &Batch) -> usize { batch.iter().flat_map(|(_, key_ids)| key_ids.iter()).count() } diff --git a/src/service/server_keys/get.rs b/src/service/server_keys/get.rs new file mode 100644 index 00000000..0f449b46 --- /dev/null +++ b/src/service/server_keys/get.rs @@ -0,0 +1,86 @@ +use std::borrow::Borrow; + +use conduit::{implement, Err, Result}; +use ruma::{api::federation::discovery::VerifyKey, CanonicalJsonObject, RoomVersionId, ServerName, ServerSigningKeyId}; + +use super::{extract_key, PubKeyMap, PubKeys}; + +#[implement(super::Service)] +pub async fn get_event_keys(&self, object: &CanonicalJsonObject, version: &RoomVersionId) -> Result { + use ruma::signatures::required_keys; + + let required = match required_keys(object, version) { + Ok(required) => required, + Err(e) => return Err!(BadServerResponse("Failed to 
determine keys required to verify: {e}")), + }; + + let batch = required + .iter() + .map(|(s, ids)| (s.borrow(), ids.iter().map(Borrow::borrow))); + + Ok(self.get_pubkeys(batch).await) +} + +#[implement(super::Service)] +pub async fn get_pubkeys<'a, S, K>(&self, batch: S) -> PubKeyMap +where + S: Iterator + Send, + K: Iterator + Send, +{ + let mut keys = PubKeyMap::new(); + for (server, key_ids) in batch { + let pubkeys = self.get_pubkeys_for(server, key_ids).await; + keys.insert(server.into(), pubkeys); + } + + keys +} + +#[implement(super::Service)] +pub async fn get_pubkeys_for<'a, I>(&self, origin: &ServerName, key_ids: I) -> PubKeys +where + I: Iterator + Send, +{ + let mut keys = PubKeys::new(); + for key_id in key_ids { + if let Ok(verify_key) = self.get_verify_key(origin, key_id).await { + keys.insert(key_id.into(), verify_key.key); + } + } + + keys +} + +#[implement(super::Service)] +pub async fn get_verify_key(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> Result { + if let Some(result) = self.verify_keys_for(origin).await.remove(key_id) { + return Ok(result); + } + + if let Ok(server_key) = self.server_request(origin).await { + self.add_signing_keys(server_key.clone()).await; + if let Some(result) = extract_key(server_key, key_id) { + return Ok(result); + } + } + + for notary in self.services.globals.trusted_servers() { + if let Ok(server_keys) = self.notary_request(notary, origin).await { + for server_key in &server_keys { + self.add_signing_keys(server_key.clone()).await; + } + + for server_key in server_keys { + if let Some(result) = extract_key(server_key, key_id) { + return Ok(result); + } + } + } + } + + Err!(BadServerResponse(debug_error!( + ?key_id, + ?origin, + "Failed to fetch federation signing-key" + ))) +} diff --git a/src/service/server_keys/keypair.rs b/src/service/server_keys/keypair.rs new file mode 100644 index 00000000..31a24cdf --- /dev/null +++ b/src/service/server_keys/keypair.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use conduit::{debug, debug_info, err, error, utils, utils::string_from_bytes, Result}; +use database::Database; +use ruma::{api::federation::discovery::VerifyKey, serde::Base64, signatures::Ed25519KeyPair}; + +use super::VerifyKeys; + +pub(super) fn init(db: &Arc) -> Result<(Box, VerifyKeys)> { + let keypair = load(db).inspect_err(|_e| { + error!("Keypair invalid. 
Deleting..."); + remove(db); + })?; + + let verify_key = VerifyKey { + key: Base64::new(keypair.public_key().to_vec()), + }; + + let id = format!("ed25519:{}", keypair.version()); + let verify_keys: VerifyKeys = [(id.try_into()?, verify_key)].into(); + + Ok((keypair, verify_keys)) +} + +fn load(db: &Arc) -> Result> { + let (version, key) = db["global"] + .get_blocking(b"keypair") + .map(|ref val| { + // database deserializer is having trouble with this so it's manual for now + let mut elems = val.split(|&b| b == b'\xFF'); + let vlen = elems.next().expect("invalid keypair entry").len(); + let ver = string_from_bytes(&val[..vlen]).expect("invalid keypair version"); + let der = val[vlen.saturating_add(1)..].to_vec(); + debug!("Found existing Ed25519 keypair: {ver:?}"); + (ver, der) + }) + .or_else(|e| { + assert!(e.is_not_found(), "unexpected error fetching keypair"); + create(db) + })?; + + let key = + Ed25519KeyPair::from_der(&key, version).map_err(|e| err!("Failed to load ed25519 keypair from der: {e:?}"))?; + + Ok(Box::new(key)) +} + +fn create(db: &Arc) -> Result<(String, Vec)> { + let keypair = Ed25519KeyPair::generate().map_err(|e| err!("Failed to generate new ed25519 keypair: {e:?}"))?; + + let id = utils::rand::string(8); + debug_info!("Generated new Ed25519 keypair: {id:?}"); + + let value: (String, Vec) = (id, keypair.to_vec()); + db["global"].raw_put(b"keypair", &value); + + Ok(value) +} + +#[inline] +fn remove(db: &Arc) { + let global = &db["global"]; + global.remove(b"keypair"); +} diff --git a/src/service/server_keys/mod.rs b/src/service/server_keys/mod.rs index ae2b8c3c..c3b84cb3 100644 --- a/src/service/server_keys/mod.rs +++ b/src/service/server_keys/mod.rs @@ -1,31 +1,30 @@ -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - sync::Arc, - time::{Duration, SystemTime}, -}; +mod acquire; +mod get; +mod keypair; +mod request; +mod sign; +mod verify; -use conduit::{debug, debug_error, debug_warn, err, error, info, trace, warn, Err, Result}; -use futures::{stream::FuturesUnordered, StreamExt}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use conduit::{implement, utils::time::timepoint_from_now, Result}; +use database::{Deserialized, Json, Map}; use ruma::{ - api::federation::{ - discovery::{ - get_remote_server_keys, - get_remote_server_keys_batch::{self, v2::QueryCriteria}, - get_server_keys, - }, - membership::create_join_event, - }, - serde::Base64, - CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedServerSigningKeyId, - RoomVersionId, ServerName, + api::federation::discovery::{ServerSigningKeys, VerifyKey}, + serde::Raw, + signatures::{Ed25519KeyPair, PublicKeyMap, PublicKeySet}, + MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, }; use serde_json::value::RawValue as RawJsonValue; -use tokio::sync::{RwLock, RwLockWriteGuard}; use crate::{globals, sending, Dep}; pub struct Service { + keypair: Box, + verify_keys: VerifyKeys, + minimum_valid: Duration, services: Services, + db: Data, } struct Services { @@ -33,546 +32,135 @@ struct Services { sending: Dep, } +struct Data { + server_signingkeys: Arc, +} + +pub type VerifyKeys = BTreeMap; +pub type PubKeyMap = PublicKeyMap; +pub type PubKeys = PublicKeySet; + impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { + let minimum_valid = Duration::from_secs(3600); + let (keypair, verify_keys) = keypair::init(args.db)?; + Ok(Arc::new(Self { + keypair, + verify_keys, + minimum_valid, services: Services { 
globals: args.depend::("globals"), sending: args.depend::("sending"), }, + db: Data { + server_signingkeys: args.db["server_signingkeys"].clone(), + }, })) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - pub async fn fetch_required_signing_keys<'a, E>( - &'a self, events: E, pub_key_map: &RwLock>>, - ) -> Result<()> - where - E: IntoIterator> + Send, - { - let mut server_key_ids = HashMap::new(); - for event in events { - for (signature_server, signature) in event - .get("signatures") - .ok_or(err!(BadServerResponse("No signatures in server response pdu.")))? - .as_object() - .ok_or(err!(BadServerResponse("Invalid signatures object in server response pdu.")))? - { - let signature_object = signature.as_object().ok_or(err!(BadServerResponse( - "Invalid signatures content object in server response pdu.", - )))?; +#[implement(Service)] +#[inline] +pub fn keypair(&self) -> &Ed25519KeyPair { &self.keypair } - for signature_id in signature_object.keys() { - server_key_ids - .entry(signature_server.clone()) - .or_insert_with(HashSet::new) - .insert(signature_id.clone()); - } - } - } +#[implement(Service)] +async fn add_signing_keys(&self, new_keys: ServerSigningKeys) { + let origin = &new_keys.server_name; - if server_key_ids.is_empty() { - // Nothing to do, can exit early - trace!("server_key_ids is empty, not fetching any keys"); - return Ok(()); - } + // (timo) Not atomic, but this is not critical + let mut keys: ServerSigningKeys = self + .db + .server_signingkeys + .get(origin) + .await + .deserialized() + .unwrap_or_else(|_| { + // Just insert "now", it doesn't matter + ServerSigningKeys::new(origin.to_owned(), MilliSecondsSinceUnixEpoch::now()) + }); - trace!( - "Fetch keys for {}", - server_key_ids - .keys() - .cloned() - .collect::>() - .join(", ") - ); - - let mut server_keys: FuturesUnordered<_> = server_key_ids - .into_iter() - .map(|(signature_server, signature_ids)| async { - let fetch_res = self - .fetch_signing_keys_for_server( - signature_server.as_str().try_into().map_err(|e| { - ( - signature_server.clone(), - err!(BadServerResponse( - "Invalid servername in signatures of server response pdu: {e:?}" - )), - ) - })?, - signature_ids.into_iter().collect(), // HashSet to Vec - ) - .await; - - match fetch_res { - Ok(keys) => Ok((signature_server, keys)), - Err(e) => { - debug_error!( - "Signature verification failed: Could not fetch signing key for {signature_server}: {e}", - ); - Err((signature_server, e)) - }, - } - }) - .collect(); - - while let Some(fetch_res) = server_keys.next().await { - match fetch_res { - Ok((signature_server, keys)) => { - pub_key_map - .write() - .await - .insert(signature_server.clone(), keys); - }, - Err((signature_server, e)) => { - debug_warn!("Failed to fetch keys for {signature_server}: {e:?}"); - }, - } - } - - Ok(()) - } - - // Gets a list of servers for which we don't have the signing key yet. We go - // over the PDUs and either cache the key or add it to the list that needs to be - // retrieved. 
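Both the removed cache walk below and the new `acquire_events_pubkeys` start from the same place: the `signatures` object of each PDU names every server and key ID needed to verify it. A minimal sketch of that extraction over canonical JSON:

    use std::collections::{BTreeMap, BTreeSet};
    use ruma::{CanonicalJsonObject, CanonicalJsonValue};

    // "signatures": { "example.org": { "ed25519:abc1de": "<sig>" } }
    // becomes { "example.org" => { "ed25519:abc1de" } }.
    fn signature_ids(pdu: &CanonicalJsonObject) -> BTreeMap<String, BTreeSet<String>> {
        let mut out: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
        if let Some(CanonicalJsonValue::Object(sigs)) = pdu.get("signatures") {
            for (server, keys) in sigs {
                if let CanonicalJsonValue::Object(keys) = keys {
                    out.entry(server.clone())
                        .or_default()
                        .extend(keys.keys().cloned());
                }
            }
        }
        out
    }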
- async fn get_server_keys_from_cache( - &self, pdu: &RawJsonValue, - servers: &mut BTreeMap>, - _room_version: &RoomVersionId, - pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap>>, - ) -> Result<()> { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { - debug_error!("Invalid PDU in server response: {pdu:#?}"); - err!(BadServerResponse(error!("Invalid PDU in server response: {e:?}"))) - })?; - - let signatures = value - .get("signatures") - .ok_or(err!(BadServerResponse("No signatures in server response pdu.")))? - .as_object() - .ok_or(err!(BadServerResponse("Invalid signatures object in server response pdu.")))?; - - for (signature_server, signature) in signatures { - let signature_object = signature.as_object().ok_or(err!(BadServerResponse( - "Invalid signatures content object in server response pdu.", - )))?; - - let signature_ids = signature_object.keys().cloned().collect::>(); - - let contains_all_ids = - |keys: &BTreeMap| signature_ids.iter().all(|id| keys.contains_key(id)); - - let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|e| { - err!(BadServerResponse( - "Invalid servername in signatures of server response pdu: {e:?}" - )) - })?; - - if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) { - continue; - } - - debug!("Loading signing keys for {origin}"); - let result: BTreeMap<_, _> = self - .services - .globals - .verify_keys_for(origin) - .await? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - - if !contains_all_ids(&result) { - debug_warn!("Signing key not loaded for {origin}"); - servers.insert(origin.to_owned(), BTreeMap::new()); - } - - pub_key_map.insert(origin.to_string(), result); - } - - Ok(()) - } - - /// Batch requests homeserver signing keys from trusted notary key servers - /// (`trusted_servers` config option) - async fn batch_request_signing_keys( - &self, mut servers: BTreeMap>, - pub_key_map: &RwLock>>, - ) -> Result<()> { - for server in self.services.globals.trusted_servers() { - debug!("Asking batch signing keys from trusted server {server}"); - match self - .services - .sending - .send_federation_request( - server, - get_remote_server_keys_batch::v2::Request { - server_keys: servers.clone(), - }, - ) - .await - { - Ok(keys) => { - debug!("Got signing keys: {keys:?}"); - let mut pkm = pub_key_map.write().await; - for k in keys.server_keys { - let k = match k.deserialize() { - Ok(key) => key, - Err(e) => { - warn!( - "Received error {e} while fetching keys from trusted server {server}: {:#?}", - k.into_json() - ); - continue; - }, - }; - - // TODO: Check signature from trusted server? 
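For context on the notary flow being removed here and reintroduced in `request.rs` below: the batch endpoint (`POST /_matrix/key/v2/query`) takes a map of server names to key IDs, each carrying a `QueryCriteria` whose only field is `minimum_valid_until_ts`. A sketch of building that request with ruma (hypothetical free function; the service method wires in its own timestamp):

    use std::collections::BTreeMap;
    use ruma::{
        api::federation::discovery::get_remote_server_keys_batch::v2::{QueryCriteria, Request},
        MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedServerSigningKeyId,
    };

    fn batch_request(
        wanted: BTreeMap<OwnedServerName, Vec<OwnedServerSigningKeyId>>,
        valid_until: MilliSecondsSinceUnixEpoch,
    ) -> Request {
        // Every requested key carries the same criterion: it must still be
        // valid at `valid_until`.
        let criteria = QueryCriteria {
            minimum_valid_until_ts: Some(valid_until),
        };

        Request {
            server_keys: wanted
                .into_iter()
                .map(|(server, ids)| {
                    (server, ids.into_iter().map(|id| (id, criteria.clone())).collect())
                })
                .collect(),
        }
    }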
- servers.remove(&k.server_name); - - let result = self - .services - .globals - .db - .add_signing_key(&k.server_name, k.clone()) - .await - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect::>(); - - pkm.insert(k.server_name.to_string(), result); - } - }, - Err(e) => error!( - "Failed sending batched key request to trusted key server {server} for the remote servers \ - {servers:?}: {e}" - ), - } - } - - Ok(()) - } - - /// Requests multiple homeserver signing keys from individual servers (not - /// trused notary servers) - async fn request_signing_keys( - &self, servers: BTreeMap>, - pub_key_map: &RwLock>>, - ) -> Result<()> { - debug!("Asking individual servers for signing keys: {servers:?}"); - let mut futures: FuturesUnordered<_> = servers - .into_keys() - .map(|server| async move { - ( - self.services - .sending - .send_federation_request(&server, get_server_keys::v2::Request::new()) - .await, - server, - ) - }) - .collect(); - - while let Some(result) = futures.next().await { - debug!("Received new Future result"); - if let (Ok(get_keys_response), origin) = result { - debug!("Result is from {origin}"); - if let Ok(key) = get_keys_response.server_key.deserialize() { - let result: BTreeMap<_, _> = self - .services - .globals - .db - .add_signing_key(&origin, key) - .await - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - pub_key_map.write().await.insert(origin.to_string(), result); - } - } - debug!("Done handling Future result"); - } - - Ok(()) - } - - pub async fn fetch_join_signing_keys( - &self, event: &create_join_event::v2::Response, room_version: &RoomVersionId, - pub_key_map: &RwLock>>, - ) -> Result<()> { - let mut servers: BTreeMap> = BTreeMap::new(); - - { - let mut pkm = pub_key_map.write().await; - - // Try to fetch keys, failure is okay. Servers we couldn't find in the cache - // will be added to `servers` - for pdu in event - .room_state - .state - .iter() - .chain(&event.room_state.auth_chain) - { - if let Err(error) = self - .get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm) - .await - { - debug!(%error, "failed to get server keys from cache"); - }; - } - - drop(pkm); - }; - - if servers.is_empty() { - trace!("We had all keys cached locally, not fetching any keys from remote servers"); - return Ok(()); - } - - if self.services.globals.query_trusted_key_servers_first() { - info!( - "query_trusted_key_servers_first is set to true, querying notary trusted key servers first for \ - homeserver signing keys." 
- ); - - self.batch_request_signing_keys(servers.clone(), pub_key_map) - .await?; - - if servers.is_empty() { - debug!("Trusted server supplied all signing keys, no more keys to fetch"); - return Ok(()); - } - - debug!("Remaining servers left that the notary/trusted servers did not provide: {servers:?}"); - - self.request_signing_keys(servers.clone(), pub_key_map) - .await?; - } else { - debug!("query_trusted_key_servers_first is set to false, querying individual homeservers first"); - - self.request_signing_keys(servers.clone(), pub_key_map) - .await?; - - if servers.is_empty() { - debug!("Individual homeservers supplied all signing keys, no more keys to fetch"); - return Ok(()); - } - - debug!("Remaining servers left the individual homeservers did not provide: {servers:?}"); - - self.batch_request_signing_keys(servers.clone(), pub_key_map) - .await?; - } - - debug!("Search for signing keys done"); - - /*if servers.is_empty() { - warn!("Failed to find homeserver signing keys for the remaining servers: {servers:?}"); - }*/ - - Ok(()) - } - - /// Search the DB for the signing keys of the given server, if we don't have - /// them fetch them from the server and save to our DB. - #[tracing::instrument(skip_all)] - pub async fn fetch_signing_keys_for_server( - &self, origin: &ServerName, signature_ids: Vec, - ) -> Result> { - let contains_all_ids = |keys: &BTreeMap| signature_ids.iter().all(|id| keys.contains_key(id)); - - let mut result: BTreeMap<_, _> = self - .services - .globals - .verify_keys_for(origin) - .await? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - - if contains_all_ids(&result) { - trace!("We have all homeserver signing keys locally for {origin}, not fetching any remotely"); - return Ok(result); - } - - // i didnt split this out into their own functions because it's relatively small - if self.services.globals.query_trusted_key_servers_first() { - info!( - "query_trusted_key_servers_first is set to true, querying notary trusted servers first for {origin} \ - keys" - ); - - for server in self.services.globals.trusted_servers() { - debug!("Asking notary server {server} for {origin}'s signing key"); - if let Some(server_keys) = self - .services - .sending - .send_federation_request( - server, - get_remote_server_keys::v2::Request::new( - origin.to_owned(), - MilliSecondsSinceUnixEpoch::from_system_time( - SystemTime::now() - .checked_add(Duration::from_secs(3600)) - .expect("SystemTime too large"), - ) - .expect("time is valid"), - ), - ) - .await - .ok() - .map(|resp| { - resp.server_keys - .into_iter() - .filter_map(|e| e.deserialize().ok()) - .collect::>() - }) { - debug!("Got signing keys: {:?}", server_keys); - for k in server_keys { - self.services - .globals - .db - .add_signing_key(origin, k.clone()) - .await; - result.extend( - k.verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - k.old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - } - - if contains_all_ids(&result) { - return Ok(result); - } - } - } - - debug!("Asking {origin} for their signing keys over federation"); - if let Some(server_key) = self - .services - .sending - .send_federation_request(origin, get_server_keys::v2::Request::new()) - .await - .ok() - .and_then(|resp| resp.server_key.deserialize().ok()) - { - self.services - .globals - .db - .add_signing_key(origin, server_key.clone()) - .await; - - result.extend( - server_key - .verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( 
- server_key - .old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - - if contains_all_ids(&result) { - return Ok(result); - } - } - } else { - info!("query_trusted_key_servers_first is set to false, querying {origin} first"); - debug!("Asking {origin} for their signing keys over federation"); - if let Some(server_key) = self - .services - .sending - .send_federation_request(origin, get_server_keys::v2::Request::new()) - .await - .ok() - .and_then(|resp| resp.server_key.deserialize().ok()) - { - self.services - .globals - .db - .add_signing_key(origin, server_key.clone()) - .await; - - result.extend( - server_key - .verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - server_key - .old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - - if contains_all_ids(&result) { - return Ok(result); - } - } - - for server in self.services.globals.trusted_servers() { - debug!("Asking notary server {server} for {origin}'s signing key"); - if let Some(server_keys) = self - .services - .sending - .send_federation_request( - server, - get_remote_server_keys::v2::Request::new( - origin.to_owned(), - MilliSecondsSinceUnixEpoch::from_system_time( - SystemTime::now() - .checked_add(Duration::from_secs(3600)) - .expect("SystemTime too large"), - ) - .expect("time is valid"), - ), - ) - .await - .ok() - .map(|resp| { - resp.server_keys - .into_iter() - .filter_map(|e| e.deserialize().ok()) - .collect::>() - }) { - debug!("Got signing keys: {server_keys:?}"); - for k in server_keys { - self.services - .globals - .db - .add_signing_key(origin, k.clone()) - .await; - result.extend( - k.verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - k.old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - } - - if contains_all_ids(&result) { - return Ok(result); - } - } - } - } - - Err!(BadServerResponse(warn!("Failed to find public key for server {origin:?}"))) - } + keys.verify_keys.extend(new_keys.verify_keys); + keys.old_verify_keys.extend(new_keys.old_verify_keys); + self.db.server_signingkeys.raw_put(origin, Json(&keys)); +} + +#[implement(Service)] +async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool { + type KeysMap<'a> = BTreeMap<&'a ServerSigningKeyId, &'a RawJsonValue>; + + let Ok(keys) = self + .db + .server_signingkeys + .get(origin) + .await + .deserialized::>() + else { + return false; + }; + + if let Ok(Some(verify_keys)) = keys.get_field::>("verify_keys") { + if verify_keys.contains_key(key_id) { + return true; + } + } + + if let Ok(Some(old_verify_keys)) = keys.get_field::>("old_verify_keys") { + if old_verify_keys.contains_key(key_id) { + return true; + } + } + + false +} + +#[implement(Service)] +pub async fn verify_keys_for(&self, origin: &ServerName) -> VerifyKeys { + let mut keys = self + .signing_keys_for(origin) + .await + .map(|keys| merge_old_keys(keys).verify_keys) + .unwrap_or(BTreeMap::new()); + + if self.services.globals.server_is_ours(origin) { + keys.extend(self.verify_keys.clone().into_iter()); + } + + keys +} + +#[implement(Service)] +pub async fn signing_keys_for(&self, origin: &ServerName) -> Result { + self.db.server_signingkeys.get(origin).await.deserialized() +} + +#[implement(Service)] +fn minimum_valid_ts(&self) -> MilliSecondsSinceUnixEpoch { + let timepoint = timepoint_from_now(self.minimum_valid).expect("SystemTime should not overflow"); + 
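// `minimum_valid` is the one-hour window set in build() above; notary
// queries ask only for keys still valid that far into the future, matching
// the removed call sites' SystemTime::now() + Duration::from_secs(3600).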
MilliSecondsSinceUnixEpoch::from_system_time(timepoint).expect("UInt should not overflow") +} + +fn merge_old_keys(mut keys: ServerSigningKeys) -> ServerSigningKeys { + keys.verify_keys.extend( + keys.old_verify_keys + .clone() + .into_iter() + .map(|(key_id, old)| (key_id, VerifyKey::new(old.key))), + ); + + keys +} + +fn extract_key(mut keys: ServerSigningKeys, key_id: &ServerSigningKeyId) -> Option { + keys.verify_keys.remove(key_id).or_else(|| { + keys.old_verify_keys + .remove(key_id) + .map(|old| VerifyKey::new(old.key)) + }) +} + +fn key_exists(keys: &ServerSigningKeys, key_id: &ServerSigningKeyId) -> bool { + keys.verify_keys.contains_key(key_id) || keys.old_verify_keys.contains_key(key_id) } diff --git a/src/service/server_keys/request.rs b/src/service/server_keys/request.rs new file mode 100644 index 00000000..84dd2871 --- /dev/null +++ b/src/service/server_keys/request.rs @@ -0,0 +1,97 @@ +use std::collections::BTreeMap; + +use conduit::{implement, Err, Result}; +use ruma::{ + api::federation::discovery::{ + get_remote_server_keys, + get_remote_server_keys_batch::{self, v2::QueryCriteria}, + get_server_keys, ServerSigningKeys, + }, + OwnedServerName, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, +}; + +#[implement(super::Service)] +pub(super) async fn batch_notary_request<'a, S, K>( + &self, notary: &ServerName, batch: S, +) -> Result> +where + S: Iterator + Send, + K: Iterator + Send, +{ + use get_remote_server_keys_batch::v2::Request; + type RumaBatch = BTreeMap>; + + let criteria = QueryCriteria { + minimum_valid_until_ts: Some(self.minimum_valid_ts()), + }; + + let mut server_keys = RumaBatch::new(); + for (server, key_ids) in batch { + let entry = server_keys.entry(server.into()).or_default(); + for key_id in key_ids { + entry.insert(key_id.into(), criteria.clone()); + } + } + + debug_assert!(!server_keys.is_empty(), "empty batch request to notary"); + let request = Request { + server_keys, + }; + + self.services + .sending + .send_federation_request(notary, request) + .await + .map(|response| response.server_keys) + .map(|keys| { + keys.into_iter() + .map(|key| key.deserialize()) + .filter_map(Result::ok) + .collect() + }) +} + +#[implement(super::Service)] +pub async fn notary_request(&self, notary: &ServerName, target: &ServerName) -> Result> { + use get_remote_server_keys::v2::Request; + + let request = Request { + server_name: target.into(), + minimum_valid_until_ts: self.minimum_valid_ts(), + }; + + self.services + .sending + .send_federation_request(notary, request) + .await + .map(|response| response.server_keys) + .map(|keys| { + keys.into_iter() + .map(|key| key.deserialize()) + .filter_map(Result::ok) + .collect() + }) +} + +#[implement(super::Service)] +pub async fn server_request(&self, target: &ServerName) -> Result { + use get_server_keys::v2::Request; + + let server_signing_key = self + .services + .sending + .send_federation_request(target, Request::new()) + .await + .map(|response| response.server_key) + .and_then(|key| key.deserialize().map_err(Into::into))?; + + if server_signing_key.server_name != target { + return Err!(BadServerResponse(debug_warn!( + requested = ?target, + response = ?server_signing_key.server_name, + "Server responded with bogus server_name" + ))); + } + + Ok(server_signing_key) +} diff --git a/src/service/server_keys/sign.rs b/src/service/server_keys/sign.rs new file mode 100644 index 00000000..28fd7e80 --- /dev/null +++ b/src/service/server_keys/sign.rs @@ -0,0 +1,18 @@ +use conduit::{implement, Result}; +use 
ruma::{CanonicalJsonObject, RoomVersionId}; + +#[implement(super::Service)] +pub fn sign_json(&self, object: &mut CanonicalJsonObject) -> Result { + use ruma::signatures::sign_json; + + let server_name = self.services.globals.server_name().as_str(); + sign_json(server_name, self.keypair(), object).map_err(Into::into) +} + +#[implement(super::Service)] +pub fn hash_and_sign_event(&self, object: &mut CanonicalJsonObject, room_version: &RoomVersionId) -> Result { + use ruma::signatures::hash_and_sign_event; + + let server_name = self.services.globals.server_name().as_str(); + hash_and_sign_event(server_name, self.keypair(), object, room_version).map_err(Into::into) +} diff --git a/src/service/server_keys/verify.rs b/src/service/server_keys/verify.rs new file mode 100644 index 00000000..ad20fec7 --- /dev/null +++ b/src/service/server_keys/verify.rs @@ -0,0 +1,33 @@ +use conduit::{implement, pdu::gen_event_id_canonical_json, Err, Result}; +use ruma::{signatures::Verified, CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, RoomVersionId}; +use serde_json::value::RawValue as RawJsonValue; + +#[implement(super::Service)] +pub async fn validate_and_add_event_id( + &self, pdu: &RawJsonValue, room_version: &RoomVersionId, +) -> Result<(OwnedEventId, CanonicalJsonObject)> { + let (event_id, mut value) = gen_event_id_canonical_json(pdu, room_version)?; + if let Err(e) = self.verify_event(&value, Some(room_version)).await { + return Err!(BadServerResponse(debug_error!("Event {event_id} failed verification: {e:?}"))); + } + + value.insert("event_id".into(), CanonicalJsonValue::String(event_id.as_str().into())); + + Ok((event_id, value)) +} + +#[implement(super::Service)] +pub async fn verify_event( + &self, event: &CanonicalJsonObject, room_version: Option<&RoomVersionId>, +) -> Result { + let room_version = room_version.unwrap_or(&RoomVersionId::V11); + let keys = self.get_event_keys(event, room_version).await?; + ruma::signatures::verify_event(&keys, event, room_version).map_err(Into::into) +} + +#[implement(super::Service)] +pub async fn verify_json(&self, event: &CanonicalJsonObject, room_version: Option<&RoomVersionId>) -> Result { + let room_version = room_version.unwrap_or(&RoomVersionId::V11); + let keys = self.get_event_keys(event, room_version).await?; + ruma::signatures::verify_json(&keys, event.clone()).map_err(Into::into) +}
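Taken together, a federation callsite after this refactor boils down to two steps: batch-acquire whatever public keys the incoming events reference, then verify each event. A hedged sketch of that flow, with the service handle, surrounding async context, and input types assumed:

    // `server_keys` is the service defined above; `pdus: Vec<Box<RawJsonValue>>`
    // are raw events off the wire for a room of version `room_version`.
    server_keys.acquire_events_pubkeys(pdus.iter()).await;

    for pdu in &pdus {
        // Signature check first; on success the derived `event_id` is
        // stamped into the returned canonical JSON object.
        let (event_id, value) = server_keys
            .validate_and_add_event_id(pdu, &room_version)
            .await?;
    }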