process rooms and edus concurrently
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
677316631a
commit
94d786ac12
5 changed files with 142 additions and 112 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -685,6 +685,7 @@ dependencies = [
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"ipaddress",
|
"ipaddress",
|
||||||
|
"itertools 0.13.0",
|
||||||
"log",
|
"log",
|
||||||
"rand",
|
"rand",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
|
|
@ -50,6 +50,7 @@ http.workspace = true
|
||||||
http-body-util.workspace = true
|
http-body-util.workspace = true
|
||||||
hyper.workspace = true
|
hyper.workspace = true
|
||||||
ipaddress.workspace = true
|
ipaddress.workspace = true
|
||||||
|
itertools.workspace = true
|
||||||
log.workspace = true
|
log.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
reqwest.workspace = true
|
reqwest.workspace = true
|
||||||
|
|
|
@ -3,10 +3,17 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant};
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
debug, debug_warn, err, error, result::LogErr, trace, utils::ReadyExt, warn, Err, Error,
|
debug, debug_warn, err, error,
|
||||||
Result,
|
result::LogErr,
|
||||||
|
trace,
|
||||||
|
utils::{
|
||||||
|
stream::{automatic_width, BroadbandExt, TryBroadbandExt},
|
||||||
|
IterStream, ReadyExt,
|
||||||
|
},
|
||||||
|
warn, Err, Error, Result,
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
|
use itertools::Itertools;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{
|
api::{
|
||||||
client::error::ErrorKind,
|
client::error::ErrorKind,
|
||||||
|
@ -19,11 +26,9 @@ use ruma::{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
||||||
serde::Raw,
|
|
||||||
to_device::DeviceIdOrAllDevices,
|
to_device::DeviceIdOrAllDevices,
|
||||||
OwnedEventId, ServerName,
|
CanonicalJsonObject, OwnedEventId, OwnedRoomId, ServerName,
|
||||||
};
|
};
|
||||||
use serde_json::value::RawValue as RawJsonValue;
|
|
||||||
use service::{
|
use service::{
|
||||||
sending::{EDU_LIMIT, PDU_LIMIT},
|
sending::{EDU_LIMIT, PDU_LIMIT},
|
||||||
Services,
|
Services,
|
||||||
|
@ -34,7 +39,8 @@ use crate::{
|
||||||
Ruma,
|
Ruma,
|
||||||
};
|
};
|
||||||
|
|
||||||
type ResolvedMap = BTreeMap<OwnedEventId, Result<()>>;
|
type ResolvedMap = BTreeMap<OwnedEventId, Result>;
|
||||||
|
type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
|
||||||
|
|
||||||
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
||||||
///
|
///
|
||||||
|
@ -73,91 +79,41 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
|
|
||||||
let txn_start_time = Instant::now();
|
let txn_start_time = Instant::now();
|
||||||
trace!(
|
trace!(
|
||||||
pdus = ?body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = ?body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
id = ?body.transaction_id,
|
id = ?body.transaction_id,
|
||||||
origin =?body.origin(),
|
origin =?body.origin(),
|
||||||
"Starting txn",
|
"Starting txn",
|
||||||
);
|
);
|
||||||
|
|
||||||
let resolved_map =
|
let pdus = body
|
||||||
handle_pdus(&services, &client, &body.pdus, body.origin(), &txn_start_time)
|
.pdus
|
||||||
.boxed()
|
.iter()
|
||||||
.await?;
|
.stream()
|
||||||
|
.broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu))
|
||||||
|
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
|
||||||
|
.ready_filter_map(Result::ok);
|
||||||
|
|
||||||
handle_edus(&services, &client, &body.edus, body.origin())
|
let edus = body
|
||||||
.boxed()
|
.edus
|
||||||
.await;
|
.iter()
|
||||||
|
.map(|edu| edu.json().get())
|
||||||
|
.map(serde_json::from_str)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.stream();
|
||||||
|
|
||||||
|
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
pdus = ?body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = ?body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
id = ?body.transaction_id,
|
id = ?body.transaction_id,
|
||||||
origin =?body.origin(),
|
origin =?body.origin(),
|
||||||
"Finished txn",
|
"Finished txn",
|
||||||
);
|
);
|
||||||
|
for (id, result) in &results {
|
||||||
Ok(send_transaction_message::v1::Response {
|
|
||||||
pdus: resolved_map
|
|
||||||
.into_iter()
|
|
||||||
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
|
|
||||||
.collect(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_pdus(
|
|
||||||
services: &Services,
|
|
||||||
_client: &IpAddr,
|
|
||||||
pdus: &[Box<RawJsonValue>],
|
|
||||||
origin: &ServerName,
|
|
||||||
txn_start_time: &Instant,
|
|
||||||
) -> Result<ResolvedMap> {
|
|
||||||
let mut parsed_pdus = Vec::with_capacity(pdus.len());
|
|
||||||
for pdu in pdus {
|
|
||||||
parsed_pdus.push(match services.rooms.event_handler.parse_incoming_pdu(pdu).await {
|
|
||||||
| Ok(t) => t,
|
|
||||||
| Err(e) => {
|
|
||||||
debug_warn!("Could not parse PDU: {e}");
|
|
||||||
continue;
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
// We do not add the event_id field to the pdu here because of signature
|
|
||||||
// and hashes checks
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut resolved_map = BTreeMap::new();
|
|
||||||
for (event_id, value, room_id) in parsed_pdus {
|
|
||||||
services.server.check_running()?;
|
|
||||||
let pdu_start_time = Instant::now();
|
|
||||||
let mutex_lock = services
|
|
||||||
.rooms
|
|
||||||
.event_handler
|
|
||||||
.mutex_federation
|
|
||||||
.lock(&room_id)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let result = services
|
|
||||||
.rooms
|
|
||||||
.event_handler
|
|
||||||
.handle_incoming_pdu(origin, &room_id, &event_id, value, true)
|
|
||||||
.boxed()
|
|
||||||
.await
|
|
||||||
.map(|_| ());
|
|
||||||
|
|
||||||
drop(mutex_lock);
|
|
||||||
debug!(
|
|
||||||
pdu_elapsed = ?pdu_start_time.elapsed(),
|
|
||||||
txn_elapsed = ?txn_start_time.elapsed(),
|
|
||||||
"Finished PDU {event_id}",
|
|
||||||
);
|
|
||||||
|
|
||||||
resolved_map.insert(event_id, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (id, result) in &resolved_map {
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
||||||
warn!("Incoming PDU failed {id}: {e:?}");
|
warn!("Incoming PDU failed {id}: {e:?}");
|
||||||
|
@ -165,39 +121,112 @@ async fn handle_pdus(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(resolved_map)
|
Ok(send_transaction_message::v1::Response {
|
||||||
|
pdus: results
|
||||||
|
.into_iter()
|
||||||
|
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
|
||||||
|
.collect(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_edus(
|
async fn handle(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
client: &IpAddr,
|
client: &IpAddr,
|
||||||
edus: &[Raw<Edu>],
|
|
||||||
origin: &ServerName,
|
origin: &ServerName,
|
||||||
) {
|
started: Instant,
|
||||||
for edu in edus
|
pdus: impl Stream<Item = Pdu> + Send,
|
||||||
.iter()
|
edus: impl Stream<Item = Edu> + Send,
|
||||||
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
|
) -> Result<ResolvedMap> {
|
||||||
{
|
// group pdus by room
|
||||||
match edu {
|
let pdus = pdus
|
||||||
| Edu::Presence(presence) => {
|
.collect()
|
||||||
handle_edu_presence(services, client, origin, presence).await;
|
.map(|mut pdus: Vec<_>| {
|
||||||
},
|
pdus.sort_by(|(room_a, ..), (room_b, ..)| room_a.cmp(room_b));
|
||||||
| Edu::Receipt(receipt) =>
|
pdus.into_iter()
|
||||||
handle_edu_receipt(services, client, origin, receipt).await,
|
.into_grouping_map_by(|(room_id, ..)| room_id.clone())
|
||||||
| Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await,
|
.collect()
|
||||||
| Edu::DeviceListUpdate(content) => {
|
})
|
||||||
handle_edu_device_list_update(services, client, origin, content).await;
|
.await;
|
||||||
},
|
|
||||||
| Edu::DirectToDevice(content) => {
|
// we can evaluate rooms concurrently
|
||||||
handle_edu_direct_to_device(services, client, origin, content).await;
|
let results: ResolvedMap = pdus
|
||||||
},
|
.into_iter()
|
||||||
| Edu::SigningKeyUpdate(content) => {
|
.try_stream()
|
||||||
handle_edu_signing_key_update(services, client, origin, content).await;
|
.broad_and_then(|(room_id, pdus)| {
|
||||||
},
|
handle_room(services, client, origin, started, room_id, pdus)
|
||||||
| Edu::_Custom(ref _custom) => {
|
.map_ok(Vec::into_iter)
|
||||||
debug_warn!(?edus, "received custom/unknown EDU");
|
.map_ok(IterStream::try_stream)
|
||||||
},
|
})
|
||||||
}
|
.try_flatten()
|
||||||
|
.try_collect()
|
||||||
|
.boxed()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// evaluate edus after pdus, at least for now.
|
||||||
|
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
|
||||||
|
.boxed()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_room(
|
||||||
|
services: &Services,
|
||||||
|
_client: &IpAddr,
|
||||||
|
origin: &ServerName,
|
||||||
|
txn_start_time: Instant,
|
||||||
|
room_id: OwnedRoomId,
|
||||||
|
pdus: Vec<Pdu>,
|
||||||
|
) -> Result<Vec<(OwnedEventId, Result)>> {
|
||||||
|
let _room_lock = services
|
||||||
|
.rooms
|
||||||
|
.event_handler
|
||||||
|
.mutex_federation
|
||||||
|
.lock(&room_id)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let mut results = Vec::with_capacity(pdus.len());
|
||||||
|
for (_, event_id, value) in pdus {
|
||||||
|
services.server.check_running()?;
|
||||||
|
let pdu_start_time = Instant::now();
|
||||||
|
let result = services
|
||||||
|
.rooms
|
||||||
|
.event_handler
|
||||||
|
.handle_incoming_pdu(origin, &room_id, &event_id, value, true)
|
||||||
|
.await
|
||||||
|
.map(|_| ());
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||||
|
txn_elapsed = ?txn_start_time.elapsed(),
|
||||||
|
"Finished PDU {event_id}",
|
||||||
|
);
|
||||||
|
|
||||||
|
results.push((event_id, result));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
|
||||||
|
match edu {
|
||||||
|
| Edu::Presence(presence) => {
|
||||||
|
handle_edu_presence(services, client, origin, presence).await;
|
||||||
|
},
|
||||||
|
| Edu::Receipt(receipt) => handle_edu_receipt(services, client, origin, receipt).await,
|
||||||
|
| Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await,
|
||||||
|
| Edu::DeviceListUpdate(content) => {
|
||||||
|
handle_edu_device_list_update(services, client, origin, content).await;
|
||||||
|
},
|
||||||
|
| Edu::DirectToDevice(content) => {
|
||||||
|
handle_edu_direct_to_device(services, client, origin, content).await;
|
||||||
|
},
|
||||||
|
| Edu::SigningKeyUpdate(content) => {
|
||||||
|
handle_edu_signing_key_update(services, client, origin, content).await;
|
||||||
|
},
|
||||||
|
| Edu::_Custom(ref _custom) => {
|
||||||
|
debug_warn!(?edu, "received custom/unknown EDU");
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,10 @@ use conduwuit::{err, implement, pdu::gen_event_id_canonical_json, result::FlatOk
|
||||||
use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId};
|
use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId};
|
||||||
use serde_json::value::RawValue as RawJsonValue;
|
use serde_json::value::RawValue as RawJsonValue;
|
||||||
|
|
||||||
|
type Parsed = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
pub async fn parse_incoming_pdu(
|
pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
|
||||||
&self,
|
|
||||||
pdu: &RawJsonValue,
|
|
||||||
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
|
|
||||||
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
|
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
|
||||||
err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}")))
|
err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}")))
|
||||||
})?;
|
})?;
|
||||||
|
@ -28,5 +27,5 @@ pub async fn parse_incoming_pdu(
|
||||||
err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
|
err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok((event_id, value, room_id))
|
Ok((room_id, event_id, value))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1166,7 +1166,7 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, pdu), level = "debug")]
|
#[tracing::instrument(skip(self, pdu), level = "debug")]
|
||||||
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {
|
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {
|
||||||
let (event_id, value, room_id) =
|
let (room_id, event_id, value) =
|
||||||
self.services.event_handler.parse_incoming_pdu(&pdu).await?;
|
self.services.event_handler.parse_incoming_pdu(&pdu).await?;
|
||||||
|
|
||||||
// Lock so we cannot backfill the same pdu twice at the same time
|
// Lock so we cannot backfill the same pdu twice at the same time
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue