fanout edu processing

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-26 15:12:08 +00:00
parent 94d786ac12
commit c516a8df3e

View file

@ -20,19 +20,22 @@ use ruma::{
federation::transactions::{ federation::transactions::{
edu::{ edu::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
ReceiptContent, SigningKeyUpdateContent, TypingContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent,
TypingContent,
}, },
send_transaction_message, send_transaction_message,
}, },
}, },
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
serde::Raw,
to_device::DeviceIdOrAllDevices, to_device::DeviceIdOrAllDevices,
CanonicalJsonObject, OwnedEventId, OwnedRoomId, ServerName, CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
}; };
use service::{ use service::{
sending::{EDU_LIMIT, PDU_LIMIT}, sending::{EDU_LIMIT, PDU_LIMIT},
Services, Services,
}; };
use utils::millis_since_unix_epoch;
use crate::{ use crate::{
utils::{self}, utils::{self},
@ -152,8 +155,8 @@ async fn handle(
let results: ResolvedMap = pdus let results: ResolvedMap = pdus
.into_iter() .into_iter()
.try_stream() .try_stream()
.broad_and_then(|(room_id, pdus)| { .broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, started, room_id, pdus) handle_room(services, client, origin, started, room_id, pdus.into_iter())
.map_ok(Vec::into_iter) .map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream) .map_ok(IterStream::try_stream)
}) })
@ -176,7 +179,7 @@ async fn handle_room(
origin: &ServerName, origin: &ServerName,
txn_start_time: Instant, txn_start_time: Instant,
room_id: OwnedRoomId, room_id: OwnedRoomId,
pdus: Vec<Pdu>, pdus: impl Iterator<Item = Pdu> + Send,
) -> Result<Vec<(OwnedEventId, Result)>> { ) -> Result<Vec<(OwnedEventId, Result)>> {
let _room_lock = services let _room_lock = services
.rooms .rooms
@ -185,48 +188,53 @@ async fn handle_room(
.lock(&room_id) .lock(&room_id)
.await; .await;
let mut results = Vec::with_capacity(pdus.len()); let room_id = &room_id;
for (_, event_id, value) in pdus { pdus.try_stream()
services.server.check_running()?; .and_then(|(_, event_id, value)| async move {
let pdu_start_time = Instant::now(); services.server.check_running()?;
let result = services let pdu_start_time = Instant::now();
.rooms let result = services
.event_handler .rooms
.handle_incoming_pdu(origin, &room_id, &event_id, value, true) .event_handler
.await .handle_incoming_pdu(origin, room_id, &event_id, value, true)
.map(|_| ()); .await
.map(|_| ());
debug!( debug!(
pdu_elapsed = ?pdu_start_time.elapsed(), pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}", "Finished PDU {event_id}",
); );
results.push((event_id, result)); Ok((event_id, result))
} })
.try_collect()
Ok(results) .await
} }
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) { async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
match edu { match edu {
| Edu::Presence(presence) => { | Edu::Presence(presence) if services.server.config.allow_incoming_presence =>
handle_edu_presence(services, client, origin, presence).await; handle_edu_presence(services, client, origin, presence).await,
},
| Edu::Receipt(receipt) => handle_edu_receipt(services, client, origin, receipt).await, | Edu::Receipt(receipt) if services.server.config.allow_incoming_read_receipts =>
| Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await, handle_edu_receipt(services, client, origin, receipt).await,
| Edu::DeviceListUpdate(content) => {
handle_edu_device_list_update(services, client, origin, content).await; | Edu::Typing(typing) if services.server.config.allow_incoming_typing =>
}, handle_edu_typing(services, client, origin, typing).await,
| Edu::DirectToDevice(content) => {
handle_edu_direct_to_device(services, client, origin, content).await; | Edu::DeviceListUpdate(content) =>
}, handle_edu_device_list_update(services, client, origin, content).await,
| Edu::SigningKeyUpdate(content) => {
handle_edu_signing_key_update(services, client, origin, content).await; | Edu::DirectToDevice(content) =>
}, handle_edu_direct_to_device(services, client, origin, content).await,
| Edu::_Custom(ref _custom) => {
debug_warn!(?edu, "received custom/unknown EDU"); | Edu::SigningKeyUpdate(content) =>
}, handle_edu_signing_key_update(services, client, origin, content).await,
| Edu::_Custom(ref _custom) => debug_warn!(?edu, "received custom/unknown EDU"),
| _ => trace!(?edu, "skipped"),
} }
} }
@ -236,32 +244,41 @@ async fn handle_edu_presence(
origin: &ServerName, origin: &ServerName,
presence: PresenceContent, presence: PresenceContent,
) { ) {
if !services.globals.allow_incoming_presence() { presence
.push
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |update| {
handle_edu_presence_update(services, origin, update)
})
.await;
}
async fn handle_edu_presence_update(
services: &Services,
origin: &ServerName,
update: PresenceUpdate,
) {
if update.user_id.server_name() != origin {
debug_warn!(
%update.user_id, %origin,
"received presence EDU for user not belonging to origin"
);
return; return;
} }
for update in presence.push { services
if update.user_id.server_name() != origin { .presence
debug_warn!( .set_presence(
%update.user_id, %origin, &update.user_id,
"received presence EDU for user not belonging to origin" &update.presence,
); Some(update.currently_active),
continue; Some(update.last_active_ago),
} update.status_msg.clone(),
)
services .await
.presence .log_err()
.set_presence( .ok();
&update.user_id,
&update.presence,
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)
.await
.log_err()
.ok();
}
} }
async fn handle_edu_receipt( async fn handle_edu_receipt(
@ -270,66 +287,94 @@ async fn handle_edu_receipt(
origin: &ServerName, origin: &ServerName,
receipt: ReceiptContent, receipt: ReceiptContent,
) { ) {
if !services.globals.allow_incoming_read_receipts() { receipt
.receipts
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |(room_id, room_updates)| {
handle_edu_receipt_room(services, origin, room_id, room_updates)
})
.await;
}
async fn handle_edu_receipt_room(
services: &Services,
origin: &ServerName,
room_id: OwnedRoomId,
room_updates: ReceiptMap,
) {
if services
.rooms
.event_handler
.acl_check(origin, &room_id)
.await
.is_err()
{
debug_warn!(
%origin, %room_id,
"received read receipt EDU from ACL'd server"
);
return; return;
} }
for (room_id, room_updates) in receipt.receipts { let room_id = &room_id;
if services room_updates
.rooms .read
.event_handler .into_iter()
.acl_check(origin, &room_id) .stream()
.await .for_each_concurrent(automatic_width(), |(user_id, user_updates)| async move {
.is_err() handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
{ })
debug_warn!( .await;
%origin, %room_id, }
"received read receipt EDU from ACL'd server"
);
continue;
}
for (user_id, user_updates) in room_updates.read { async fn handle_edu_receipt_room_user(
if user_id.server_name() != origin { services: &Services,
debug_warn!( origin: &ServerName,
%user_id, %origin, room_id: &RoomId,
"received read receipt EDU for user not belonging to origin" user_id: &UserId,
); user_updates: ReceiptData,
continue; ) {
} if user_id.server_name() != origin {
debug_warn!(
if services %user_id, %origin,
.rooms "received read receipt EDU for user not belonging to origin"
.state_cache );
.room_members(&room_id) return;
.ready_any(|member| member.server_name() == user_id.server_name())
.await
{
for event_id in &user_updates.event_ids {
let user_receipts =
BTreeMap::from([(user_id.clone(), user_updates.data.clone())]);
let receipts = BTreeMap::from([(ReceiptType::Read, user_receipts)]);
let receipt_content = BTreeMap::from([(event_id.to_owned(), receipts)]);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services
.rooms
.read_receipt
.readreceipt_update(&user_id, &room_id, &event)
.await;
}
} else {
debug_warn!(
%user_id, %room_id, %origin,
"received read receipt EDU from server who does not have a member in the room",
);
continue;
}
}
} }
if !services
.rooms
.state_cache
.server_in_room(origin, room_id)
.await
{
debug_warn!(
%user_id, %room_id, %origin,
"received read receipt EDU from server who does not have a member in the room",
);
return;
}
let data = &user_updates.data;
user_updates
.event_ids
.into_iter()
.stream()
.for_each_concurrent(automatic_width(), |event_id| async move {
let user_data = [(user_id.to_owned(), data.clone())];
let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
let content = [(event_id.clone(), BTreeMap::from(receipts))];
services
.rooms
.read_receipt
.readreceipt_update(user_id, room_id, &ReceiptEvent {
content: ReceiptEventContent(content.into()),
room_id: room_id.to_owned(),
})
.await;
})
.await;
} }
async fn handle_edu_typing( async fn handle_edu_typing(
@ -338,10 +383,6 @@ async fn handle_edu_typing(
origin: &ServerName, origin: &ServerName,
typing: TypingContent, typing: TypingContent,
) { ) {
if !services.server.config.allow_incoming_typing {
return;
}
if typing.user_id.server_name() != origin { if typing.user_id.server_name() != origin {
debug_warn!( debug_warn!(
%typing.user_id, %origin, %typing.user_id, %origin,
@ -364,41 +405,38 @@ async fn handle_edu_typing(
return; return;
} }
if services if !services
.rooms .rooms
.state_cache .state_cache
.is_joined(&typing.user_id, &typing.room_id) .is_joined(&typing.user_id, &typing.room_id)
.await .await
{ {
if typing.typing {
let timeout = utils::millis_since_unix_epoch().saturating_add(
services
.server
.config
.typing_federation_timeout_s
.saturating_mul(1000),
);
services
.rooms
.typing
.typing_add(&typing.user_id, &typing.room_id, timeout)
.await
.log_err()
.ok();
} else {
services
.rooms
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await
.log_err()
.ok();
}
} else {
debug_warn!( debug_warn!(
%typing.user_id, %typing.room_id, %origin, %typing.user_id, %typing.room_id, %origin,
"received typing EDU for user not in room" "received typing EDU for user not in room"
); );
return;
}
if typing.typing {
let secs = services.server.config.typing_federation_timeout_s;
let timeout = millis_since_unix_epoch().saturating_add(secs.saturating_mul(1000));
services
.rooms
.typing
.typing_add(&typing.user_id, &typing.room_id, timeout)
.await
.log_err()
.ok();
} else {
services
.rooms
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await
.log_err()
.ok();
} }
} }
@ -427,7 +465,12 @@ async fn handle_edu_direct_to_device(
origin: &ServerName, origin: &ServerName,
content: DirectDeviceContent, content: DirectDeviceContent,
) { ) {
let DirectDeviceContent { sender, ev_type, message_id, messages } = content; let DirectDeviceContent {
ref sender,
ref ev_type,
ref message_id,
messages,
} = content;
if sender.server_name() != origin { if sender.server_name() != origin {
debug_warn!( debug_warn!(
@ -440,60 +483,88 @@ async fn handle_edu_direct_to_device(
// Check if this is a new transaction id // Check if this is a new transaction id
if services if services
.transaction_ids .transaction_ids
.existing_txnid(&sender, None, &message_id) .existing_txnid(sender, None, message_id)
.await .await
.is_ok() .is_ok()
{ {
return; return;
} }
for (target_user_id, map) in &messages { // process messages concurrently for different users
for (target_device_id_maybe, event) in map { let ev_type = ev_type.to_string();
let Ok(event) = event.deserialize_as().map_err(|e| { messages
err!(Request(InvalidParam(error!("To-Device event is invalid: {e}")))) .into_iter()
}) else { .stream()
continue; .for_each_concurrent(automatic_width(), |(target_user_id, map)| {
}; handle_edu_direct_to_device_user(services, target_user_id, sender, &ev_type, map)
})
let ev_type = ev_type.to_string(); .await;
match target_device_id_maybe {
| DeviceIdOrAllDevices::DeviceId(target_device_id) => {
services
.users
.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type,
event,
)
.await;
},
| DeviceIdOrAllDevices::AllDevices => {
let (sender, ev_type, event) = (&sender, &ev_type, &event);
services
.users
.all_device_ids(target_user_id)
.for_each(|target_device_id| {
services.users.add_to_device_event(
sender,
target_user_id,
target_device_id,
ev_type,
event.clone(),
)
})
.await;
},
}
}
}
// Save transaction id with empty data // Save transaction id with empty data
services services
.transaction_ids .transaction_ids
.add_txnid(&sender, None, &message_id, &[]); .add_txnid(sender, None, message_id, &[]);
}
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
services: &Services,
target_user_id: OwnedUserId,
sender: &UserId,
ev_type: &str,
map: BTreeMap<DeviceIdOrAllDevices, Raw<Event>>,
) {
for (target_device_id_maybe, event) in map {
let Ok(event) = event
.deserialize_as()
.map_err(|e| err!(Request(InvalidParam(error!("To-Device event is invalid: {e}")))))
else {
continue;
};
handle_edu_direct_to_device_event(
services,
&target_user_id,
sender,
target_device_id_maybe,
ev_type,
event,
)
.await;
}
}
async fn handle_edu_direct_to_device_event(
services: &Services,
target_user_id: &UserId,
sender: &UserId,
target_device_id_maybe: DeviceIdOrAllDevices,
ev_type: &str,
event: serde_json::Value,
) {
match target_device_id_maybe {
| DeviceIdOrAllDevices::DeviceId(ref target_device_id) => {
services
.users
.add_to_device_event(sender, target_user_id, target_device_id, ev_type, event)
.await;
},
| DeviceIdOrAllDevices::AllDevices => {
services
.users
.all_device_ids(target_user_id)
.for_each(|target_device_id| {
services.users.add_to_device_event(
sender,
target_user_id,
target_device_id,
ev_type,
event.clone(),
)
})
.await;
},
}
} }
async fn handle_edu_signing_key_update( async fn handle_edu_signing_key_update(