revamp appservice registration to ruma's Registration type

squashed from https://gitlab.com/famedly/conduit/-/merge_requests/583

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
Matthias Ahouansou 2024-02-07 17:48:01 -05:00 committed by June
parent 0e9eb22ee7
commit 784d307425
13 changed files with 203 additions and 208 deletions

View file

@ -1,107 +1,114 @@
use crate::{services, utils, Error, Result}; use crate::{services, utils, Error, Result};
use bytes::BytesMut; use bytes::BytesMut;
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken}; use ruma::api::{
appservice::Registration, IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken,
};
use std::{fmt::Debug, mem, time::Duration}; use std::{fmt::Debug, mem, time::Duration};
use tracing::warn; use tracing::warn;
#[tracing::instrument(skip(request))] /// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice registration file
pub(crate) async fn send_request<T: OutgoingRequest>( pub(crate) async fn send_request<T: OutgoingRequest>(
registration: serde_yaml::Value, registration: Registration,
request: T, request: T,
) -> Result<T::IncomingResponse> ) -> Option<Result<T::IncomingResponse>>
where where
T: Debug, T: Debug,
{ {
let destination = registration.get("url").unwrap().as_str().unwrap(); if let Some(destination) = registration.url {
let hs_token = registration.get("hs_token").unwrap().as_str().unwrap(); let hs_token = registration.hs_token.as_str();
let mut http_request = request let mut http_request = request
.try_into_http_request::<BytesMut>( .try_into_http_request::<BytesMut>(
destination, &destination,
SendAccessToken::IfRequired(hs_token), SendAccessToken::IfRequired(hs_token),
&[MatrixVersion::V1_0], &[MatrixVersion::V1_0],
) )
.map_err(|e| { .map_err(|e| {
warn!("Failed to find destination {}: {}", destination, e); warn!("Failed to find destination {}: {}", destination, e);
Error::BadServerResponse("Invalid destination") Error::BadServerResponse("Invalid destination")
})? })
.map(|body| body.freeze()); .unwrap()
.map(|body| body.freeze());
let mut parts = http_request.uri().clone().into_parts(); let mut parts = http_request.uri().clone().into_parts();
let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned(); let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned();
let symbol = if old_path_and_query.contains('?') { let symbol = if old_path_and_query.contains('?') {
"&" "&"
} else { } else {
"?" "?"
}; };
parts.path_and_query = Some( parts.path_and_query = Some(
(old_path_and_query + symbol + "access_token=" + hs_token) (old_path_and_query + symbol + "access_token=" + hs_token)
.parse() .parse()
.unwrap(), .unwrap(),
); );
*http_request.uri_mut() = parts.try_into().expect("our manipulation is always valid"); *http_request.uri_mut() = parts.try_into().expect("our manipulation is always valid");
let mut reqwest_request = reqwest::Request::try_from(http_request)?; let mut reqwest_request = reqwest::Request::try_from(http_request)
.expect("all http requests are valid reqwest requests");
*reqwest_request.timeout_mut() = Some(Duration::from_secs(120)); *reqwest_request.timeout_mut() = Some(Duration::from_secs(120));
let url = reqwest_request.url().clone(); let url = reqwest_request.url().clone();
let mut response = match services() let mut response = match services()
.globals .globals
.default_client() .default_client()
.execute(reqwest_request) .execute(reqwest_request)
.await .await
{ {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
warn!(
"Could not send request to appservice {} at {}: {}",
registration.id, destination, e
);
return Some(Err(e.into()));
}
};
// reqwest::Response -> http::Response conversion
let status = response.status();
let mut http_response_builder = http::Response::builder()
.status(status)
.version(response.version());
mem::swap(
response.headers_mut(),
http_response_builder
.headers_mut()
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await.unwrap_or_else(|e| {
warn!("server error: {}", e);
Vec::new().into()
}); // TODO: handle timeout
if !status.is_success() {
warn!( warn!(
"Could not send request to appservice {:?} at {}: {}", "Appservice returned bad response {} {}\n{}\n{:?}",
registration.get("id"),
destination, destination,
e status,
url,
utils::string_from_bytes(&body)
); );
return Err(e.into());
} }
};
// reqwest::Response -> http::Response conversion let response = T::IncomingResponse::try_from_http_response(
let status = response.status(); http_response_builder
let mut http_response_builder = http::Response::builder() .body(body)
.status(status) .expect("reqwest body is valid http body"),
.version(response.version());
mem::swap(
response.headers_mut(),
http_response_builder
.headers_mut()
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await.unwrap_or_else(|e| {
warn!("server error: {}", e);
Vec::new().into()
}); // TODO: handle timeout
if !status.is_success() {
warn!(
"Appservice returned bad response {} {}\n{}\n{:?}",
destination,
status,
url,
utils::string_from_bytes(&body)
); );
Some(response.map_err(|_| {
warn!(
"Appservice returned invalid response bytes {}\n{}",
destination, url
);
Error::BadServerResponse("Server returned bad response.")
}))
} else {
None
} }
let response = T::IncomingResponse::try_from_http_response(
http_response_builder
.body(body)
.expect("reqwest body is valid http body"),
);
response.map_err(|_| {
warn!(
"Appservice returned invalid response bytes {}\n{}",
destination, url
);
Error::BadServerResponse("Server returned bad response.")
})
} }

View file

@ -157,20 +157,16 @@ pub(crate) async fn get_alias_helper(
None => { None => {
for (_id, registration) in services().appservice.all()? { for (_id, registration) in services().appservice.all()? {
let aliases = registration let aliases = registration
.get("namespaces") .namespaces
.and_then(|ns| ns.get("aliases")) .aliases
.and_then(|aliases| aliases.as_sequence()) .iter()
.map_or_else(Vec::new, |aliases| { .filter_map(|alias| Regex::new(alias.regex.as_str()).ok())
aliases .collect::<Vec<_>>();
.iter()
.filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
if aliases if aliases
.iter() .iter()
.any(|aliases| aliases.is_match(room_alias.as_str())) .any(|aliases| aliases.is_match(room_alias.as_str()))
&& services() && if let Some(opt_result) = services()
.sending .sending
.send_appservice_request( .send_appservice_request(
registration, registration,
@ -179,7 +175,11 @@ pub(crate) async fn get_alias_helper(
}, },
) )
.await .await
.is_ok() {
opt_result.is_ok()
} else {
false
}
{ {
room_id = Some( room_id = Some(
services() services()

View file

@ -81,12 +81,9 @@ where
let mut json_body = serde_json::from_slice::<CanonicalJsonValue>(&body).ok(); let mut json_body = serde_json::from_slice::<CanonicalJsonValue>(&body).ok();
let appservices = services().appservice.all().unwrap(); let appservices = services().appservice.all().unwrap();
let appservice_registration = appservices.iter().find(|(_id, registration)| { let appservice_registration = appservices
registration .iter()
.get("as_token") .find(|(_id, registration)| Some(registration.as_token.as_str()) == token);
.and_then(|as_token| as_token.as_str())
.map_or(false, |as_token| token == Some(as_token))
});
let (sender_user, sender_device, sender_servername, from_appservice) = let (sender_user, sender_device, sender_servername, from_appservice) =
if let Some((_id, registration)) = appservice_registration { if let Some((_id, registration)) = appservice_registration {
@ -95,11 +92,7 @@ where
let user_id = query_params.user_id.map_or_else( let user_id = query_params.user_id.map_or_else(
|| { || {
UserId::parse_with_server_name( UserId::parse_with_server_name(
registration registration.sender_localpart.as_str(),
.get("sender_localpart")
.unwrap()
.as_str()
.unwrap(),
services().globals.server_name(), services().globals.server_name(),
) )
.unwrap() .unwrap()

View file

@ -1,10 +1,11 @@
use ruma::api::appservice::Registration;
use crate::{database::KeyValueDatabase, service, utils, Error, Result}; use crate::{database::KeyValueDatabase, service, utils, Error, Result};
impl service::appservice::Data for KeyValueDatabase { impl service::appservice::Data for KeyValueDatabase {
/// Registers an appservice and returns the ID to the caller /// Registers an appservice and returns the ID to the caller
fn register_appservice(&self, yaml: serde_yaml::Value) -> Result<String> { fn register_appservice(&self, yaml: Registration) -> Result<String> {
// TODO: Rumaify let id = yaml.id.as_str();
let id = yaml.get("id").unwrap().as_str().unwrap();
self.id_appserviceregistrations.insert( self.id_appserviceregistrations.insert(
id.as_bytes(), id.as_bytes(),
serde_yaml::to_string(&yaml).unwrap().as_bytes(), serde_yaml::to_string(&yaml).unwrap().as_bytes(),
@ -32,7 +33,7 @@ impl service::appservice::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn get_registration(&self, id: &str) -> Result<Option<serde_yaml::Value>> { fn get_registration(&self, id: &str) -> Result<Option<Registration>> {
self.cached_registrations self.cached_registrations
.read() .read()
.unwrap() .unwrap()
@ -64,7 +65,7 @@ impl service::appservice::Data for KeyValueDatabase {
))) )))
} }
fn all(&self) -> Result<Vec<(String, serde_yaml::Value)>> { fn all(&self) -> Result<Vec<(String, Registration)>> {
self.iter_ids()? self.iter_ids()?
.filter_map(|id| id.ok()) .filter_map(|id| id.ok())
.map(move |id| { .map(move |id| {

View file

@ -2,6 +2,7 @@ use std::{collections::HashSet, sync::Arc};
use regex::Regex; use regex::Regex;
use ruma::{ use ruma::{
api::appservice::Registration,
events::{AnyStrippedStateEvent, AnySyncStateEvent}, events::{AnyStrippedStateEvent, AnySyncStateEvent},
serde::Raw, serde::Raw,
OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
@ -193,7 +194,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn appservice_in_room( fn appservice_in_room(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
appservice: &(String, serde_yaml::Value), appservice: &(String, Registration),
) -> Result<bool> { ) -> Result<bool> {
let maybe = self let maybe = self
.appservice_in_room_cache .appservice_in_room_cache
@ -205,24 +206,19 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
if let Some(b) = maybe { if let Some(b) = maybe {
Ok(b) Ok(b)
} else if let Some(namespaces) = appservice.1.get("namespaces") { } else {
let namespaces = &appservice.1.namespaces;
let users = namespaces let users = namespaces
.get("users") .users
.and_then(|users| users.as_sequence()) .iter()
.map_or_else(Vec::new, |users| { .filter_map(|users| Regex::new(users.regex.as_str()).ok())
users .collect::<Vec<_>>();
.iter()
.filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let bridge_user_id = appservice let bridge_user_id = UserId::parse_with_server_name(
.1 appservice.1.sender_localpart.as_str(),
.get("sender_localpart") services().globals.server_name(),
.and_then(|string| string.as_str()) )
.and_then(|string| { .ok();
UserId::parse_with_server_name(string, services().globals.server_name()).ok()
});
let in_room = bridge_user_id let in_room = bridge_user_id
.map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false)) .map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false))
@ -240,8 +236,6 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
.insert(appservice.0.clone(), in_room); .insert(appservice.0.clone(), in_room);
Ok(in_room) Ok(in_room)
} else {
Ok(false)
} }
} }

View file

@ -11,6 +11,7 @@ use directories::ProjectDirs;
use lru_cache::LruCache; use lru_cache::LruCache;
use rand::thread_rng; use rand::thread_rng;
use ruma::{ use ruma::{
api::appservice::Registration,
events::{ events::{
push_rules::{PushRulesEvent, PushRulesEventContent}, push_rules::{PushRulesEvent, PushRulesEventContent},
room::message::RoomMessageEventContent, room::message::RoomMessageEventContent,
@ -163,7 +164,7 @@ pub struct KeyValueDatabase {
//pub pusher: pusher::PushData, //pub pusher: pusher::PushData,
pub(super) senderkey_pusher: Arc<dyn KvTree>, pub(super) senderkey_pusher: Arc<dyn KvTree>,
pub(super) cached_registrations: Arc<RwLock<HashMap<String, serde_yaml::Value>>>, pub(super) cached_registrations: Arc<RwLock<HashMap<String, Registration>>>,
pub(super) pdu_cache: Mutex<LruCache<OwnedEventId, Arc<PduEvent>>>, pub(super) pdu_cache: Mutex<LruCache<OwnedEventId, Arc<PduEvent>>>,
pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>, pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>,
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>, pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,

View file

@ -10,7 +10,7 @@ use std::fmt::Write;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use regex::Regex; use regex::Regex;
use ruma::{ use ruma::{
api::client::error::ErrorKind, api::{appservice::Registration, client::error::ErrorKind},
events::{ events::{
relation::InReplyTo, relation::InReplyTo,
room::{ room::{
@ -472,7 +472,7 @@ impl Service {
{ {
let appservice_config = body[1..body.len() - 1].join("\n"); let appservice_config = body[1..body.len() - 1].join("\n");
let parsed_config = let parsed_config =
serde_yaml::from_str::<serde_yaml::Value>(&appservice_config); serde_yaml::from_str::<Registration>(&appservice_config);
match parsed_config { match parsed_config {
Ok(yaml) => match services().appservice.register_appservice(yaml) { Ok(yaml) => match services().appservice.register_appservice(yaml) {
Ok(id) => RoomMessageEventContent::text_plain(format!( Ok(id) => RoomMessageEventContent::text_plain(format!(

View file

@ -1,8 +1,10 @@
use ruma::api::appservice::Registration;
use crate::Result; use crate::Result;
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
/// Registers an appservice and returns the ID to the caller /// Registers an appservice and returns the ID to the caller
fn register_appservice(&self, yaml: serde_yaml::Value) -> Result<String>; fn register_appservice(&self, yaml: Registration) -> Result<String>;
/// Remove an appservice registration /// Remove an appservice registration
/// ///
@ -11,9 +13,9 @@ pub trait Data: Send + Sync {
/// * `service_name` - the name you send to register the service previously /// * `service_name` - the name you send to register the service previously
fn unregister_appservice(&self, service_name: &str) -> Result<()>; fn unregister_appservice(&self, service_name: &str) -> Result<()>;
fn get_registration(&self, id: &str) -> Result<Option<serde_yaml::Value>>; fn get_registration(&self, id: &str) -> Result<Option<Registration>>;
fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>>; fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>>;
fn all(&self) -> Result<Vec<(String, serde_yaml::Value)>>; fn all(&self) -> Result<Vec<(String, Registration)>>;
} }

View file

@ -1,6 +1,7 @@
mod data; mod data;
pub(crate) use data::Data; pub(crate) use data::Data;
use ruma::api::appservice::Registration;
use crate::Result; use crate::Result;
@ -10,7 +11,7 @@ pub struct Service {
impl Service { impl Service {
/// Registers an appservice and returns the ID to the caller /// Registers an appservice and returns the ID to the caller
pub fn register_appservice(&self, yaml: serde_yaml::Value) -> Result<String> { pub fn register_appservice(&self, yaml: Registration) -> Result<String> {
self.db.register_appservice(yaml) self.db.register_appservice(yaml)
} }
@ -23,7 +24,7 @@ impl Service {
self.db.unregister_appservice(service_name) self.db.unregister_appservice(service_name)
} }
pub fn get_registration(&self, id: &str) -> Result<Option<serde_yaml::Value>> { pub fn get_registration(&self, id: &str) -> Result<Option<Registration>> {
self.db.get_registration(id) self.db.get_registration(id)
} }
@ -31,7 +32,7 @@ impl Service {
self.db.iter_ids() self.db.iter_ids()
} }
pub fn all(&self) -> Result<Vec<(String, serde_yaml::Value)>> { pub fn all(&self) -> Result<Vec<(String, Registration)>> {
self.db.all() self.db.all()
} }
} }

View file

@ -2,6 +2,7 @@ use std::{collections::HashSet, sync::Arc};
use crate::Result; use crate::Result;
use ruma::{ use ruma::{
api::appservice::Registration,
events::{AnyStrippedStateEvent, AnySyncStateEvent}, events::{AnyStrippedStateEvent, AnySyncStateEvent},
serde::Raw, serde::Raw,
OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
@ -31,7 +32,7 @@ pub trait Data: Send + Sync {
fn appservice_in_room( fn appservice_in_room(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
appservice: &(String, serde_yaml::Value), appservice: &(String, Registration),
) -> Result<bool>; ) -> Result<bool>;
/// Makes a user forget a room. /// Makes a user forget a room.

View file

@ -4,6 +4,7 @@ use std::{collections::HashSet, sync::Arc};
pub use data::Data; pub use data::Data;
use ruma::{ use ruma::{
api::appservice::Registration,
events::{ events::{
direct::DirectEvent, direct::DirectEvent,
ignored_user_list::IgnoredUserListEvent, ignored_user_list::IgnoredUserListEvent,
@ -240,7 +241,7 @@ impl Service {
pub fn appservice_in_room( pub fn appservice_in_room(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
appservice: &(String, serde_yaml::Value), appservice: &(String, Registration),
) -> Result<bool> { ) -> Result<bool> {
self.db.appservice_in_room(room_id, appservice) self.db.appservice_in_room(room_id, appservice)
} }

View file

@ -621,73 +621,61 @@ impl Service {
.as_ref() .as_ref()
.and_then(|state_key| UserId::parse(state_key.as_str()).ok()) .and_then(|state_key| UserId::parse(state_key.as_str()).ok())
{ {
if let Some(appservice_uid) = appservice let appservice_uid = appservice.1.sender_localpart.as_str();
.1 if state_key_uid == appservice_uid {
.get("sender_localpart") services()
.and_then(|string| string.as_str()) .sending
.and_then(|string| { .send_pdu_appservice(appservice.0, pdu_id.clone())?;
UserId::parse_with_server_name(string, services().globals.server_name()) continue;
.ok()
})
{
if state_key_uid == &appservice_uid {
services()
.sending
.send_pdu_appservice(appservice.0, pdu_id.clone())?;
continue;
}
} }
} }
} }
if let Some(namespaces) = appservice.1.get("namespaces") { let namespaces = appservice.1.namespaces;
let users = namespaces
.get("users")
.and_then(|users| users.as_sequence())
.map_or_else(Vec::new, |users| {
users
.iter()
.filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let aliases = namespaces
.get("aliases")
.and_then(|aliases| aliases.as_sequence())
.map_or_else(Vec::new, |aliases| {
aliases
.iter()
.filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let rooms = namespaces
.get("rooms")
.and_then(|rooms| rooms.as_sequence());
let matching_users = |users: &Regex| { // TODO: create some helper function to change from Strings to Regexes
users.is_match(pdu.sender.as_str()) let users = namespaces
|| pdu.kind == TimelineEventType::RoomMember .users
&& pdu .iter()
.state_key .filter_map(|user| Regex::new(user.regex.as_str()).ok())
.as_ref() .collect::<Vec<_>>();
.map_or(false, |state_key| users.is_match(state_key)) let aliases = namespaces
}; .aliases
let matching_aliases = |aliases: &Regex| { .iter()
services() .filter_map(|alias| Regex::new(alias.regex.as_str()).ok())
.rooms .collect::<Vec<_>>();
.alias let rooms = namespaces
.local_aliases_for_room(&pdu.room_id) .rooms
.filter_map(|r| r.ok()) .iter()
.any(|room_alias| aliases.is_match(room_alias.as_str())) .filter_map(|room| Regex::new(room.regex.as_str()).ok())
}; .collect::<Vec<_>>();
if aliases.iter().any(matching_aliases) let matching_users = |users: &Regex| {
|| rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) users.is_match(pdu.sender.as_str())
|| users.iter().any(matching_users) || pdu.kind == TimelineEventType::RoomMember
{ && pdu
services() .state_key
.sending .as_ref()
.send_pdu_appservice(appservice.0, pdu_id.clone())?; .map_or(false, |state_key| users.is_match(state_key))
} };
let matching_aliases = |aliases: &Regex| {
services()
.rooms
.alias
.local_aliases_for_room(&pdu.room_id)
.filter_map(|r| r.ok())
.any(|room_alias| aliases.is_match(room_alias.as_str()))
};
if aliases.iter().any(matching_aliases)
|| rooms
.iter()
.any(|namespace| namespace.is_match(pdu.room_id.as_str()))
|| users.iter().any(matching_users)
{
services()
.sending
.send_pdu_appservice(appservice.0, pdu_id.clone())?;
} }
} }

View file

@ -23,7 +23,7 @@ use base64::{engine::general_purpose, Engine as _};
use ruma::{ use ruma::{
api::{ api::{
appservice, appservice::{self, Registration},
federation::{ federation::{
self, self,
transactions::edu::{ transactions::edu::{
@ -520,7 +520,7 @@ impl Service {
let permit = services().sending.maximum_requests.acquire().await; let permit = services().sending.maximum_requests.acquire().await;
let response = appservice_server::send_request( let response = match appservice_server::send_request(
services() services()
.appservice .appservice
.get_registration(id) .get_registration(id)
@ -547,8 +547,12 @@ impl Service {
}, },
) )
.await .await
.map(|_response| kind.clone()) {
.map_err(|e| (kind, e)); None => Ok(kind.clone()),
Some(op_resp) => op_resp
.map(|_response| kind.clone())
.map_err(|e| (kind.clone(), e)),
};
drop(permit); drop(permit);
@ -764,12 +768,14 @@ impl Service {
response response
} }
#[tracing::instrument(skip(self, registration, request))] /// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice registration file
pub async fn send_appservice_request<T: OutgoingRequest>( pub async fn send_appservice_request<T: OutgoingRequest>(
&self, &self,
registration: serde_yaml::Value, registration: Registration,
request: T, request: T,
) -> Result<T::IncomingResponse> ) -> Option<Result<T::IncomingResponse>>
where where
T: Debug, T: Debug,
{ {