de-global services for services

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-18 06:37:47 +00:00
parent 992c0a1e58
commit 010e4ee35a
85 changed files with 2480 additions and 1887 deletions

View file

@ -1,16 +1,17 @@
use std::{fmt::Debug, mem};
use bytes::BytesMut;
use conduit::{debug_error, trace, utils, warn, Error, Result};
use reqwest::Client;
use ruma::api::{appservice::Registration, IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken};
use tracing::{trace, warn};
use crate::{debug_error, services, utils, Error, Result};
/// Sends a request to an appservice
///
/// Only returns Ok(None) if there is no url specified in the appservice
/// registration file
pub(crate) async fn send_request<T>(registration: Registration, request: T) -> Result<Option<T::IncomingResponse>>
pub(crate) async fn send_request<T>(
client: &Client, registration: Registration, request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
{
@ -48,15 +49,10 @@ where
let reqwest_request = reqwest::Request::try_from(http_request)?;
let mut response = services()
.client
.appservice
.execute(reqwest_request)
.await
.map_err(|e| {
warn!("Could not send request to appservice \"{}\" at {dest}: {e}", registration.id);
e
})?;
let mut response = client.execute(reqwest_request).await.map_err(|e| {
warn!("Could not send request to appservice \"{}\" at {dest}: {e}", registration.id);
e
})?;
// reqwest::Response -> http::Response conversion
let status = response.status();

View file

@ -5,7 +5,7 @@ use database::{Database, Map};
use ruma::{ServerName, UserId};
use super::{Destination, SendingEvent};
use crate::services;
use crate::{globals, Dep};
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Destination, SendingEvent)>> + 'a>;
type SendingEventIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEvent)>> + 'a>;
@ -15,15 +15,24 @@ pub struct Data {
servernameevent_data: Arc<Map>,
servername_educount: Arc<Map>,
pub(super) db: Arc<Database>,
services: Services,
}
struct Services {
globals: Dep<globals::Service>,
}
impl Data {
pub(super) fn new(db: Arc<Database>) -> Self {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = &args.db;
Self {
servercurrentevent_data: db["servercurrentevent_data"].clone(),
servernameevent_data: db["servernameevent_data"].clone(),
servername_educount: db["servername_educount"].clone(),
db,
db: args.db.clone(),
services: Services {
globals: args.depend::<globals::Service>("globals"),
},
}
}
@ -78,7 +87,7 @@ impl Data {
if let SendingEvent::Pdu(value) = &event {
key.extend_from_slice(value);
} else {
key.extend_from_slice(&services().globals.next_count()?.to_be_bytes());
key.extend_from_slice(&self.services.globals.next_count()?.to_be_bytes());
}
let value = if let SendingEvent::Edu(value) = &event {
&**value

View file

@ -6,26 +6,39 @@ mod sender;
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use conduit::{err, Result, Server};
use conduit::{err, warn, Result, Server};
use ruma::{
api::{appservice::Registration, OutgoingRequest},
OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
};
pub use sender::convert_to_outgoing_federation_event;
use tokio::sync::Mutex;
use tracing::warn;
use crate::{server_is_ours, services};
use crate::{account_data, client, globals, presence, pusher, resolver, rooms, server_is_ours, users, Dep};
pub struct Service {
pub db: data::Data,
server: Arc<Server>,
/// The state for a given state hash.
services: Services,
pub db: data::Data,
sender: loole::Sender<Msg>,
receiver: Mutex<loole::Receiver<Msg>>,
}
struct Services {
client: Dep<client::Service>,
globals: Dep<globals::Service>,
resolver: Dep<resolver::Service>,
state: Dep<rooms::state::Service>,
state_cache: Dep<rooms::state_cache::Service>,
user: Dep<rooms::user::Service>,
users: Dep<users::Service>,
presence: Dep<presence::Service>,
read_receipt: Dep<rooms::read_receipt::Service>,
timeline: Dep<rooms::timeline::Service>,
account_data: Dep<account_data::Service>,
appservice: Dep<crate::appservice::Service>,
pusher: Dep<pusher::Service>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct Msg {
dest: Destination,
@ -53,8 +66,23 @@ impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let (sender, receiver) = loole::unbounded();
Ok(Arc::new(Self {
db: data::Data::new(args.db.clone()),
server: args.server.clone(),
services: Services {
client: args.depend::<client::Service>("client"),
globals: args.depend::<globals::Service>("globals"),
resolver: args.depend::<resolver::Service>("resolver"),
state: args.depend::<rooms::state::Service>("rooms::state"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
user: args.depend::<rooms::user::Service>("rooms::user"),
users: args.depend::<users::Service>("users"),
presence: args.depend::<presence::Service>("presence"),
read_receipt: args.depend::<rooms::read_receipt::Service>("rooms::read_receipt"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
account_data: args.depend::<account_data::Service>("account_data"),
appservice: args.depend::<crate::appservice::Service>("appservice"),
pusher: args.depend::<pusher::Service>("pusher"),
},
db: data::Data::new(&args),
sender,
receiver: Mutex::new(receiver),
}))
@ -103,8 +131,8 @@ impl Service {
#[tracing::instrument(skip(self, room_id, pdu_id), level = "debug")]
pub fn send_pdu_room(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result<()> {
let servers = services()
.rooms
let servers = self
.services
.state_cache
.room_servers(room_id)
.filter_map(Result::ok)
@ -152,8 +180,8 @@ impl Service {
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub fn send_edu_room(&self, room_id: &RoomId, serialized: Vec<u8>) -> Result<()> {
let servers = services()
.rooms
let servers = self
.services
.state_cache
.room_servers(room_id)
.filter_map(Result::ok)
@ -189,8 +217,8 @@ impl Service {
#[tracing::instrument(skip(self, room_id), level = "debug")]
pub fn flush_room(&self, room_id: &RoomId) -> Result<()> {
let servers = services()
.rooms
let servers = self
.services
.state_cache
.room_servers(room_id)
.filter_map(Result::ok)
@ -213,13 +241,13 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(self, request), name = "request")]
#[tracing::instrument(skip_all, name = "request")]
pub async fn send_federation_request<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let client = &services().client.federation;
send::send(client, dest, request).await
let client = &self.services.client.federation;
self.send(client, dest, request).await
}
/// Sends a request to an appservice
@ -232,7 +260,8 @@ impl Service {
where
T: OutgoingRequest + Debug + Send,
{
appservice::send_request(registration, request).await
let client = &self.services.client.appservice;
appservice::send_request(client, registration, request).await
}
/// Cleanup event data

View file

@ -1,6 +1,8 @@
use std::{fmt::Debug, mem};
use conduit::Err;
use conduit::{
debug, debug_error, debug_warn, error::inspect_debug_log, trace, utils::string::EMPTY, Err, Error, Result,
};
use http::{header::AUTHORIZATION, HeaderValue};
use ipaddress::IPAddress;
use reqwest::{Client, Method, Request, Response, Url};
@ -13,75 +15,91 @@ use ruma::{
server_util::authorization::XMatrix,
ServerName,
};
use tracing::{debug, trace};
use crate::{
debug_error, debug_warn, resolver,
globals, resolver,
resolver::{actual::ActualDest, cache::CachedDest},
services, Error, Result,
};
#[tracing::instrument(skip_all, name = "send")]
pub async fn send<T>(client: &Client, dest: &ServerName, req: T) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
if !services().globals.allow_federation() {
return Err!(Config("allow_federation", "Federation is disabled."));
impl super::Service {
#[tracing::instrument(skip(self, client, req), name = "send")]
pub async fn send<T>(&self, client: &Client, dest: &ServerName, req: T) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
if !self.server.config.allow_federation {
return Err!(Config("allow_federation", "Federation is disabled."));
}
let actual = self.services.resolver.get_actual_dest(dest).await?;
let request = self.prepare::<T>(dest, &actual, req).await?;
self.execute::<T>(dest, &actual, request, client).await
}
let actual = services().resolver.get_actual_dest(dest).await?;
let request = prepare::<T>(dest, &actual, req).await?;
execute::<T>(client, dest, &actual, request).await
}
async fn execute<T>(
&self, dest: &ServerName, actual: &ActualDest, request: Request, client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let url = request.url().clone();
let method = request.method().clone();
async fn execute<T>(
client: &Client, dest: &ServerName, actual: &ActualDest, request: Request,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let method = request.method().clone();
let url = request.url().clone();
debug!(
method = ?method,
url = ?url,
"Sending request",
);
match client.execute(request).await {
Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
Err(e) => handle_error::<T>(dest, actual, &method, &url, e),
debug!(?method, ?url, "Sending request");
match client.execute(request).await {
Ok(response) => handle_response::<T>(&self.services.resolver, dest, actual, &method, &url, response).await,
Err(error) => handle_error::<T>(dest, actual, &method, &url, error),
}
}
}
async fn prepare<T>(dest: &ServerName, actual: &ActualDest, req: T) -> Result<Request>
where
T: OutgoingRequest + Debug + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_5];
async fn prepare<T>(&self, dest: &ServerName, actual: &ActualDest, req: T) -> Result<Request>
where
T: OutgoingRequest + Debug + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_5];
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
trace!("Preparing request");
trace!("Preparing request");
let mut http_request = req
.try_into_http_request::<Vec<u8>>(&actual.string, SATIR, &VERSIONS)
.map_err(|_| Error::BadServerResponse("Invalid destination"))?;
let mut http_request = req
.try_into_http_request::<Vec<u8>>(&actual.string, SendAccessToken::IfRequired(""), &VERSIONS)
.map_err(|_e| Error::BadServerResponse("Invalid destination"))?;
sign_request::<T>(&self.services.globals, dest, &mut http_request);
sign_request::<T>(dest, &mut http_request);
let request = Request::try_from(http_request)?;
self.validate_url(request.url())?;
let request = Request::try_from(http_request)?;
validate_url(request.url())?;
Ok(request)
}
Ok(request)
fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {
if let Ok(ip) = IPAddress::parse(url_host) {
trace!("Checking request URL IP {ip:?}");
self.services.resolver.validate_ip(&ip)?;
}
}
Ok(())
}
}
async fn handle_response<T>(
dest: &ServerName, actual: &ActualDest, method: &Method, url: &Url, mut response: Response,
resolver: &resolver::Service, dest: &ServerName, actual: &ActualDest, method: &Method, url: &Url,
mut response: Response,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
trace!("Received response from {} for {} with {}", actual.string, url, response.url());
let status = response.status();
trace!(
?status, ?method,
request_url = ?url,
response_url = ?response.url(),
"Received response from {}",
actual.string,
);
let mut http_response_builder = http::Response::builder()
.status(status)
.version(response.version());
@ -92,11 +110,13 @@ where
.expect("http::response::Builder is usable"),
);
trace!("Waiting for response body");
let body = response.bytes().await.unwrap_or_else(|e| {
debug_error!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
// TODO: handle timeout
trace!("Waiting for response body...");
let body = response
.bytes()
.await
.inspect_err(inspect_debug_log)
.unwrap_or_else(|_| Vec::new().into());
let http_response = http_response_builder
.body(body)
@ -109,7 +129,7 @@ where
let response = T::IncomingResponse::try_from_http_response(http_response);
if response.is_ok() && !actual.cached {
services().resolver.set_cached_destination(
resolver.set_cached_destination(
dest.to_owned(),
CachedDest {
dest: actual.dest.clone(),
@ -120,7 +140,7 @@ where
}
match response {
Err(_e) => Err(Error::BadServerResponse("Server returned bad 200 response.")),
Err(_) => Err(Error::BadServerResponse("Server returned bad 200 response.")),
Ok(response) => Ok(response),
}
}
@ -150,7 +170,7 @@ where
Err(e.into())
}
fn sign_request<T>(dest: &ServerName, http_request: &mut http::Request<Vec<u8>>)
fn sign_request<T>(globals: &globals::Service, dest: &ServerName, http_request: &mut http::Request<Vec<u8>>)
where
T: OutgoingRequest + Debug + Send,
{
@ -172,16 +192,12 @@ where
.to_string()
.into(),
);
req_map.insert("origin".to_owned(), services().globals.server_name().as_str().into());
req_map.insert("origin".to_owned(), globals.server_name().as_str().into());
req_map.insert("destination".to_owned(), dest.as_str().into());
let mut req_json = serde_json::from_value(req_map.into()).expect("valid JSON is valid BTreeMap");
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut req_json,
)
.expect("our request json is what ruma expects");
ruma::signatures::sign_json(globals.server_name().as_str(), globals.keypair(), &mut req_json)
.expect("our request json is what ruma expects");
let req_json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&serde_json::to_vec(&req_json).unwrap()).unwrap();
@ -207,24 +223,8 @@ where
http_request.headers_mut().insert(
AUTHORIZATION,
HeaderValue::from(&XMatrix::new(
services().globals.config.server_name.clone(),
dest.to_owned(),
key,
sig,
)),
HeaderValue::from(&XMatrix::new(globals.config.server_name.clone(), dest.to_owned(), key, sig)),
);
}
}
}
fn validate_url(url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {
if let Ok(ip) = IPAddress::parse(url_host) {
trace!("Checking request URL IP {ip:?}");
resolver::actual::validate_ip(&ip)?;
}
}
Ok(())
}

View file

@ -6,7 +6,11 @@ use std::{
};
use base64::{engine::general_purpose, Engine as _};
use conduit::{debug, debug_warn, error, trace, utils::math::continue_exponential_backoff_secs, warn};
use conduit::{
debug, debug_warn, error, trace,
utils::{calculate_hash, math::continue_exponential_backoff_secs},
warn, Error, Result,
};
use federation::transactions::send_transaction_message;
use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use ruma::{
@ -24,8 +28,8 @@ use ruma::{
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::time::sleep_until;
use super::{appservice, send, Destination, Msg, SendingEvent, Service};
use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result};
use super::{appservice, Destination, Msg, SendingEvent, Service};
use crate::user_is_local;
#[derive(Debug)]
enum TransactionStatus {
@ -69,8 +73,8 @@ impl Service {
Ok(())
}
fn handle_response(
&self, response: SendingResult, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
fn handle_response<'a>(
&'a self, response: SendingResult, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus,
) {
match response {
Ok(dest) => self.handle_response_ok(&dest, futures, statuses),
@ -91,8 +95,8 @@ impl Service {
});
}
fn handle_response_ok(
&self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
fn handle_response_ok<'a>(
&'a self, dest: &Destination, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus,
) {
let _cork = self.db.db.cork();
self.db
@ -113,24 +117,24 @@ impl Service {
.mark_as_active(&new_events)
.expect("marked as active");
let new_events_vec = new_events.into_iter().map(|(event, _)| event).collect();
futures.push(Box::pin(send_events(dest.clone(), new_events_vec)));
futures.push(Box::pin(self.send_events(dest.clone(), new_events_vec)));
} else {
statuses.remove(dest);
}
}
fn handle_request(&self, msg: Msg, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
fn handle_request<'a>(&'a self, msg: Msg, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
let iv = vec![(msg.event, msg.queue_id)];
if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) {
if !events.is_empty() {
futures.push(Box::pin(send_events(msg.dest, events)));
futures.push(Box::pin(self.send_events(msg.dest, events)));
} else {
statuses.remove(&msg.dest);
}
}
}
async fn finish_responses(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
let now = Instant::now();
let timeout = Duration::from_millis(CLEANUP_TIMEOUT_MS);
let deadline = now.checked_add(timeout).unwrap_or(now);
@ -148,7 +152,7 @@ impl Service {
debug_warn!("Leaving with {} unfinished requests...", futures.len());
}
fn initial_requests(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
fn initial_requests<'a>(&'a self, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
let keep = usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) {
@ -166,12 +170,12 @@ impl Service {
for (dest, events) in txns {
if self.server.config.startup_netburst && !events.is_empty() {
statuses.insert(dest.clone(), TransactionStatus::Running);
futures.push(Box::pin(send_events(dest.clone(), events)));
futures.push(Box::pin(self.send_events(dest.clone(), events)));
}
}
}
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, level = "debug")]
fn select_events(
&self,
dest: &Destination,
@ -218,7 +222,7 @@ impl Service {
Ok(Some(events))
}
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, level = "debug")]
fn select_events_current(&self, dest: Destination, statuses: &mut CurTransactionStatus) -> Result<(bool, bool)> {
let (mut allow, mut retry) = (true, false);
statuses
@ -244,7 +248,7 @@ impl Service {
Ok((allow, retry))
}
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, level = "debug")]
fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
// u64: count of last edu
let since = self.db.get_latest_educount(server_name)?;
@ -252,11 +256,11 @@ impl Service {
let mut max_edu_count = since;
let mut device_list_changes = HashSet::new();
for room_id in services().rooms.state_cache.server_rooms(server_name) {
for room_id in self.services.state_cache.server_rooms(server_name) {
let room_id = room_id?;
// Look for device list updates in this room
device_list_changes.extend(
services()
self.services
.users
.keys_changed(room_id.as_ref(), since, None)
.filter_map(Result::ok)
@ -264,7 +268,7 @@ impl Service {
);
if self.server.config.allow_outgoing_read_receipts
&& !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)?
&& !self.select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)?
{
break;
}
@ -287,381 +291,390 @@ impl Service {
}
if self.server.config.allow_outgoing_presence {
select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?;
self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?;
}
Ok((events, max_edu_count))
}
}
/// Look for presence
fn select_edus_presence(
server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> {
// Look for presence updates for this server
let mut presence_updates = Vec::new();
for (user_id, count, presence_bytes) in services().presence.presence_since(since) {
*max_edu_count = cmp::max(count, *max_edu_count);
/// Look for presence
fn select_edus_presence(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> {
// Look for presence updates for this server
let mut presence_updates = Vec::new();
for (user_id, count, presence_bytes) in self.services.presence.presence_since(since) {
*max_edu_count = cmp::max(count, *max_edu_count);
if !user_is_local(&user_id) {
continue;
}
if !user_is_local(&user_id) {
continue;
}
if !services()
.rooms
.state_cache
.server_sees_user(server_name, &user_id)?
{
continue;
}
if !self
.services
.state_cache
.server_sees_user(server_name, &user_id)?
{
continue;
}
let presence_event = Presence::from_json_bytes_to_event(&presence_bytes, &user_id)?;
presence_updates.push(PresenceUpdate {
user_id,
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
last_active_ago: presence_event
.content
.last_active_ago
.unwrap_or_else(|| uint!(0)),
status_msg: presence_event.content.status_msg,
});
if presence_updates.len() >= SELECT_EDU_LIMIT {
break;
}
}
if !presence_updates.is_empty() {
let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"));
}
Ok(true)
}
/// Look for read receipts in this room
fn select_edus_receipts(
room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> {
for r in services()
.rooms
.read_receipt
.readreceipts_since(room_id, since)
{
let (user_id, count, read_receipt) = r?;
*max_edu_count = cmp::max(count, *max_edu_count);
if !user_is_local(&user_id) {
continue;
}
let event = serde_json::from_str(read_receipt.json().get())
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
let mut read = BTreeMap::new();
let (event_id, mut receipt) = r
.content
.0
.into_iter()
.next()
.expect("we only use one event per read receipt");
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(&user_id)
.expect("our read receipts always have the user here");
read.insert(
let presence_event = self
.services
.presence
.from_json_bytes_to_event(&presence_bytes, &user_id)?;
presence_updates.push(PresenceUpdate {
user_id,
ReceiptData {
data: receipt.clone(),
event_ids: vec![event_id.clone()],
},
);
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
last_active_ago: presence_event
.content
.last_active_ago
.unwrap_or_else(|| uint!(0)),
status_msg: presence_event.content.status_msg,
});
let receipt_map = ReceiptMap {
read,
};
let mut receipts = BTreeMap::new();
receipts.insert(room_id.to_owned(), receipt_map);
Edu::Receipt(ReceiptContent {
receipts,
})
} else {
Error::bad_database("Invalid event type in read_receipts");
continue;
};
events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
if events.len() >= SELECT_EDU_LIMIT {
return Ok(false);
}
}
Ok(true)
}
async fn send_events(dest: Destination, events: Vec<SendingEvent>) -> SendingResult {
//debug_assert!(!events.is_empty(), "sending empty transaction");
match dest {
Destination::Normal(ref server) => send_events_dest_normal(&dest, server, events).await,
Destination::Appservice(ref id) => send_events_dest_appservice(&dest, id, events).await,
Destination::Push(ref userid, ref pushkey) => send_events_dest_push(&dest, userid, pushkey, events).await,
}
}
#[tracing::instrument(skip(dest, events))]
async fn send_events_dest_appservice(dest: &Destination, id: &str, events: Vec<SendingEvent>) -> SendingResult {
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => {
pdu_jsons.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (dest.clone(), e))?
.ok_or_else(|| {
(
dest.clone(),
Error::bad_database("[Appservice] Event in servernameevent_data not found in db."),
)
})?
.to_room_event(),
);
},
SendingEvent::Edu(_) | SendingEvent::Flush => {
// Appservices don't need EDUs (?) and flush only;
// no new content
},
}
}
//debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction");
match appservice::send_request(
services()
.appservice
.get_registration(id)
.await
.ok_or_else(|| {
(
dest.clone(),
Error::bad_database("[Appservice] Could not load registration from db."),
)
})?,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b,
SendingEvent::Flush => &[],
})
.collect::<Vec<_>>(),
)))
.into(),
},
)
.await
{
Ok(_) => Ok(dest.clone()),
Err(e) => Err((dest.clone(), e)),
}
}
#[tracing::instrument(skip(dest, events))]
async fn send_events_dest_push(
dest: &Destination, userid: &OwnedUserId, pushkey: &str, events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdus = Vec::new();
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => {
pdus.push(
services()
.rooms
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (dest.clone(), e))?
.ok_or_else(|| {
(
dest.clone(),
Error::bad_database("[Push] Event in servernameevent_data not found in db."),
)
})?,
);
},
SendingEvent::Edu(_) | SendingEvent::Flush => {
// Push gateways don't need EDUs (?) and flush only;
// no new content
},
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) = serde_json::from_str::<serde_json::Value>(unsigned.get()) {
if unsigned.get("redacted_because").is_some() {
continue;
}
if presence_updates.len() >= SELECT_EDU_LIMIT {
break;
}
}
let Some(pusher) = services()
.pusher
.get_pusher(userid, pushkey)
.map_err(|e| (dest.clone(), e))?
else {
continue;
};
if !presence_updates.is_empty() {
let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"));
}
let rules_for_user = services()
.account_data
.get(None, userid, GlobalAccountDataEventType::PushRules.to_string().into())
.unwrap_or_default()
.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok())
.map_or_else(|| push::Ruleset::server_default(userid), |ev: PushRulesEvent| ev.content.global);
let unread: UInt = services()
.rooms
.user
.notification_count(userid, &pdu.room_id)
.map_err(|e| (dest.clone(), e))?
.try_into()
.expect("notification count can't go that high");
let _response = services()
.pusher
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
.await
.map(|_response| dest.clone())
.map_err(|e| (dest.clone(), e));
Ok(true)
}
Ok(dest.clone())
}
/// Look for read receipts in this room
fn select_edus_receipts(
&self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> {
for r in self
.services
.read_receipt
.readreceipts_since(room_id, since)
{
let (user_id, count, read_receipt) = r?;
*max_edu_count = cmp::max(count, *max_edu_count);
#[tracing::instrument(skip(dest, events), name = "")]
async fn send_events_dest_normal(
dest: &Destination, server: &OwnedServerName, events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
let mut edu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Edu(_)))
.count(),
);
if !user_is_local(&user_id) {
continue;
}
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => pdu_jsons.push(convert_to_outgoing_federation_event(
// TODO: check room version and remove event_id if needed
services()
.rooms
.timeline
.get_pdu_json_from_id(pdu_id)
.map_err(|e| (dest.clone(), e))?
.ok_or_else(|| {
error!(?dest, ?server, ?pdu_id, "event not found");
(
dest.clone(),
Error::bad_database("[Normal] Event in servernameevent_data not found in db."),
)
})?,
)),
SendingEvent::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
},
SendingEvent::Flush => {
// flush only; no new content
let event = serde_json::from_str(read_receipt.json().get())
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
let mut read = BTreeMap::new();
let (event_id, mut receipt) = r
.content
.0
.into_iter()
.next()
.expect("we only use one event per read receipt");
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(&user_id)
.expect("our read receipts always have the user here");
read.insert(
user_id,
ReceiptData {
data: receipt.clone(),
event_ids: vec![event_id.clone()],
},
);
let receipt_map = ReceiptMap {
read,
};
let mut receipts = BTreeMap::new();
receipts.insert(room_id.to_owned(), receipt_map);
Edu::Receipt(ReceiptContent {
receipts,
})
} else {
Error::bad_database("Invalid event type in read_receipts");
continue;
};
events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
if events.len() >= SELECT_EDU_LIMIT {
return Ok(false);
}
}
Ok(true)
}
async fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingResult {
//debug_assert!(!events.is_empty(), "sending empty transaction");
match dest {
Destination::Normal(ref server) => self.send_events_dest_normal(&dest, server, events).await,
Destination::Appservice(ref id) => self.send_events_dest_appservice(&dest, id, events).await,
Destination::Push(ref userid, ref pushkey) => {
self.send_events_dest_push(&dest, userid, pushkey, events)
.await
},
}
}
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
send::send(
&services().client.sender,
server,
send_transaction_message::v1::Request {
origin: services().server.config.server_name.clone(),
#[tracing::instrument(skip(self, dest, events), name = "appservice")]
async fn send_events_dest_appservice(
&self, dest: &Destination, id: &str, events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdu_jsons = Vec::new();
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => {
pdu_jsons.push(
self.services
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (dest.clone(), e))?
.ok_or_else(|| {
(
dest.clone(),
Error::bad_database("[Appservice] Event in servernameevent_data not found in db."),
)
})?
.to_room_event(),
);
},
SendingEvent::Edu(_) | SendingEvent::Flush => {
// Appservices don't need EDUs (?) and flush only;
// no new content
},
}
}
//debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction");
let client = &self.services.client.appservice;
match appservice::send_request(
client,
self.services
.appservice
.get_registration(id)
.await
.ok_or_else(|| {
(
dest.clone(),
Error::bad_database("[Appservice] Could not load registration from db."),
)
})?,
ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons,
txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b,
SendingEvent::Flush => &[],
})
.collect::<Vec<_>>(),
)))
.into(),
},
)
.await
{
Ok(_) => Ok(dest.clone()),
Err(e) => Err((dest.clone(), e)),
}
}
#[tracing::instrument(skip(self, dest, events), name = "push")]
async fn send_events_dest_push(
&self, dest: &Destination, userid: &OwnedUserId, pushkey: &str, events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdus = Vec::new();
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => {
pdus.push(
self.services
.timeline
.get_pdu_from_id(pdu_id)
.map_err(|e| (dest.clone(), e))?
.ok_or_else(|| {
(
dest.clone(),
Error::bad_database("[Push] Event in servernameevent_data not found in db."),
)
})?,
);
},
SendingEvent::Edu(_) | SendingEvent::Flush => {
// Push gateways don't need EDUs (?) and flush only;
// no new content
},
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) = serde_json::from_str::<serde_json::Value>(unsigned.get()) {
if unsigned.get("redacted_because").is_some() {
continue;
}
}
}
let Some(pusher) = self
.services
.pusher
.get_pusher(userid, pushkey)
.map_err(|e| (dest.clone(), e))?
else {
continue;
};
let rules_for_user = self
.services
.account_data
.get(None, userid, GlobalAccountDataEventType::PushRules.to_string().into())
.unwrap_or_default()
.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok())
.map_or_else(|| push::Ruleset::server_default(userid), |ev: PushRulesEvent| ev.content.global);
let unread: UInt = self
.services
.user
.notification_count(userid, &pdu.room_id)
.map_err(|e| (dest.clone(), e))?
.try_into()
.expect("notification count can't go that high");
let _response = self
.services
.pusher
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
.await
.map(|_response| dest.clone())
.map_err(|e| (dest.clone(), e));
}
Ok(dest.clone())
}
#[tracing::instrument(skip(self, dest, events), name = "", level = "debug")]
async fn send_events_dest_normal(
&self, dest: &Destination, server: &OwnedServerName, events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
let mut edu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Edu(_)))
.count(),
);
for event in &events {
match event {
// TODO: check room version and remove event_id if needed
SendingEvent::Pdu(pdu_id) => pdu_jsons.push(
self.convert_to_outgoing_federation_event(
self.services
.timeline
.get_pdu_json_from_id(pdu_id)
.map_err(|e| (dest.clone(), e))?
.ok_or_else(|| {
error!(?dest, ?server, ?pdu_id, "event not found");
(
dest.clone(),
Error::bad_database("[Normal] Event in servernameevent_data not found in db."),
)
})?,
),
),
SendingEvent::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
},
SendingEvent::Flush => {}, // flush only; no new content
}
}
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let transaction_id = &*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b,
SendingEvent::Flush => &[],
})
.collect::<Vec<_>>(),
));
let request = send_transaction_message::v1::Request {
origin: self.server.config.server_name.clone(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
&events
transaction_id: transaction_id.into(),
};
let client = &self.services.client.sender;
self.send(client, server, request)
.await
.inspect(|response| {
response
.pdus
.iter()
.map(|e| match e {
SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b,
SendingEvent::Flush => &[],
})
.collect::<Vec<_>>(),
)))
.into(),
},
)
.await
.map(|response| {
for pdu in response.pdus {
if pdu.1.is_err() {
warn!("error for {} from remote: {:?}", pdu.0, pdu.1);
.filter(|(_, res)| res.is_err())
.for_each(|(pdu_id, res)| warn!("error for {pdu_id} from remote: {res:?}"));
})
.map(|_| dest.clone())
.map_err(|e| (dest.clone(), e))
}
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
pub fn convert_to_outgoing_federation_event(&self, mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
if let Some(unsigned) = pdu_json
.get_mut("unsigned")
.and_then(|val| val.as_object_mut())
{
unsigned.remove("transaction_id");
}
// room v3 and above removed the "event_id" field from remote PDU format
if let Some(room_id) = pdu_json
.get("room_id")
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match self.services.state.get_room_version(&room_id) {
Ok(room_version_id) => match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => _ = pdu_json.remove("event_id"),
},
Err(_) => _ = pdu_json.remove("event_id"),
}
} else {
pdu_json.remove("event_id");
}
dest.clone()
})
.map_err(|e| (dest.clone(), e))
}
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
#[tracing::instrument]
pub fn convert_to_outgoing_federation_event(mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
if let Some(unsigned) = pdu_json
.get_mut("unsigned")
.and_then(|val| val.as_object_mut())
{
unsigned.remove("transaction_id");
// TODO: another option would be to convert it to a canonical string to validate
// size and return a Result<Raw<...>>
// serde_json::from_str::<Raw<_>>(
// ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is
// valid serde_json::Value"), )
// .expect("Raw::from_value always works")
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}
// room v3 and above removed the "event_id" field from remote PDU format
if let Some(room_id) = pdu_json
.get("room_id")
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match services().rooms.state.get_room_version(&room_id) {
Ok(room_version_id) => match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => _ = pdu_json.remove("event_id"),
},
Err(_) => _ = pdu_json.remove("event_id"),
}
} else {
pdu_json.remove("event_id");
}
// TODO: another option would be to convert it to a canonical string to validate
// size and return a Result<Raw<...>>
// serde_json::from_str::<Raw<_>>(
// ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is
// valid serde_json::Value"), )
// .expect("Raw::from_value always works")
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}