diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 016f5194..f4903447 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -20,19 +20,22 @@ use ruma::{ federation::transactions::{ edu::{ DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, - ReceiptContent, SigningKeyUpdateContent, TypingContent, + PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent, + TypingContent, }, send_transaction_message, }, }, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, + serde::Raw, to_device::DeviceIdOrAllDevices, - CanonicalJsonObject, OwnedEventId, OwnedRoomId, ServerName, + CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId, }; use service::{ sending::{EDU_LIMIT, PDU_LIMIT}, Services, }; +use utils::millis_since_unix_epoch; use crate::{ utils::{self}, @@ -152,8 +155,8 @@ async fn handle( let results: ResolvedMap = pdus .into_iter() .try_stream() - .broad_and_then(|(room_id, pdus)| { - handle_room(services, client, origin, started, room_id, pdus) + .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { + handle_room(services, client, origin, started, room_id, pdus.into_iter()) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -176,7 +179,7 @@ async fn handle_room( origin: &ServerName, txn_start_time: Instant, room_id: OwnedRoomId, - pdus: Vec, + pdus: impl Iterator + Send, ) -> Result> { let _room_lock = services .rooms @@ -185,48 +188,53 @@ async fn handle_room( .lock(&room_id) .await; - let mut results = Vec::with_capacity(pdus.len()); - for (_, event_id, value) in pdus { - services.server.check_running()?; - let pdu_start_time = Instant::now(); - let result = services - .rooms - .event_handler - .handle_incoming_pdu(origin, &room_id, &event_id, value, true) - .await - .map(|_| ()); + let room_id = &room_id; + pdus.try_stream() + .and_then(|(_, event_id, value)| async move { + services.server.check_running()?; + let pdu_start_time = Instant::now(); + let result = services + .rooms + .event_handler + .handle_incoming_pdu(origin, room_id, &event_id, value, true) + .await + .map(|_| ()); - debug!( - pdu_elapsed = ?pdu_start_time.elapsed(), - txn_elapsed = ?txn_start_time.elapsed(), - "Finished PDU {event_id}", - ); + debug!( + pdu_elapsed = ?pdu_start_time.elapsed(), + txn_elapsed = ?txn_start_time.elapsed(), + "Finished PDU {event_id}", + ); - results.push((event_id, result)); - } - - Ok(results) + Ok((event_id, result)) + }) + .try_collect() + .await } async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) { match edu { - | Edu::Presence(presence) => { - handle_edu_presence(services, client, origin, presence).await; - }, - | Edu::Receipt(receipt) => handle_edu_receipt(services, client, origin, receipt).await, - | Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await, - | Edu::DeviceListUpdate(content) => { - handle_edu_device_list_update(services, client, origin, content).await; - }, - | Edu::DirectToDevice(content) => { - handle_edu_direct_to_device(services, client, origin, content).await; - }, - | Edu::SigningKeyUpdate(content) => { - handle_edu_signing_key_update(services, client, origin, content).await; - }, - | Edu::_Custom(ref _custom) => { - debug_warn!(?edu, "received custom/unknown EDU"); - }, + | Edu::Presence(presence) if services.server.config.allow_incoming_presence => + handle_edu_presence(services, client, origin, presence).await, + + | Edu::Receipt(receipt) if services.server.config.allow_incoming_read_receipts => + handle_edu_receipt(services, client, origin, receipt).await, + + | Edu::Typing(typing) if services.server.config.allow_incoming_typing => + handle_edu_typing(services, client, origin, typing).await, + + | Edu::DeviceListUpdate(content) => + handle_edu_device_list_update(services, client, origin, content).await, + + | Edu::DirectToDevice(content) => + handle_edu_direct_to_device(services, client, origin, content).await, + + | Edu::SigningKeyUpdate(content) => + handle_edu_signing_key_update(services, client, origin, content).await, + + | Edu::_Custom(ref _custom) => debug_warn!(?edu, "received custom/unknown EDU"), + + | _ => trace!(?edu, "skipped"), } } @@ -236,32 +244,41 @@ async fn handle_edu_presence( origin: &ServerName, presence: PresenceContent, ) { - if !services.globals.allow_incoming_presence() { + presence + .push + .into_iter() + .stream() + .for_each_concurrent(automatic_width(), |update| { + handle_edu_presence_update(services, origin, update) + }) + .await; +} + +async fn handle_edu_presence_update( + services: &Services, + origin: &ServerName, + update: PresenceUpdate, +) { + if update.user_id.server_name() != origin { + debug_warn!( + %update.user_id, %origin, + "received presence EDU for user not belonging to origin" + ); return; } - for update in presence.push { - if update.user_id.server_name() != origin { - debug_warn!( - %update.user_id, %origin, - "received presence EDU for user not belonging to origin" - ); - continue; - } - - services - .presence - .set_presence( - &update.user_id, - &update.presence, - Some(update.currently_active), - Some(update.last_active_ago), - update.status_msg.clone(), - ) - .await - .log_err() - .ok(); - } + services + .presence + .set_presence( + &update.user_id, + &update.presence, + Some(update.currently_active), + Some(update.last_active_ago), + update.status_msg.clone(), + ) + .await + .log_err() + .ok(); } async fn handle_edu_receipt( @@ -270,66 +287,94 @@ async fn handle_edu_receipt( origin: &ServerName, receipt: ReceiptContent, ) { - if !services.globals.allow_incoming_read_receipts() { + receipt + .receipts + .into_iter() + .stream() + .for_each_concurrent(automatic_width(), |(room_id, room_updates)| { + handle_edu_receipt_room(services, origin, room_id, room_updates) + }) + .await; +} + +async fn handle_edu_receipt_room( + services: &Services, + origin: &ServerName, + room_id: OwnedRoomId, + room_updates: ReceiptMap, +) { + if services + .rooms + .event_handler + .acl_check(origin, &room_id) + .await + .is_err() + { + debug_warn!( + %origin, %room_id, + "received read receipt EDU from ACL'd server" + ); return; } - for (room_id, room_updates) in receipt.receipts { - if services - .rooms - .event_handler - .acl_check(origin, &room_id) - .await - .is_err() - { - debug_warn!( - %origin, %room_id, - "received read receipt EDU from ACL'd server" - ); - continue; - } + let room_id = &room_id; + room_updates + .read + .into_iter() + .stream() + .for_each_concurrent(automatic_width(), |(user_id, user_updates)| async move { + handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await; + }) + .await; +} - for (user_id, user_updates) in room_updates.read { - if user_id.server_name() != origin { - debug_warn!( - %user_id, %origin, - "received read receipt EDU for user not belonging to origin" - ); - continue; - } - - if services - .rooms - .state_cache - .room_members(&room_id) - .ready_any(|member| member.server_name() == user_id.server_name()) - .await - { - for event_id in &user_updates.event_ids { - let user_receipts = - BTreeMap::from([(user_id.clone(), user_updates.data.clone())]); - let receipts = BTreeMap::from([(ReceiptType::Read, user_receipts)]); - let receipt_content = BTreeMap::from([(event_id.to_owned(), receipts)]); - let event = ReceiptEvent { - content: ReceiptEventContent(receipt_content), - room_id: room_id.clone(), - }; - - services - .rooms - .read_receipt - .readreceipt_update(&user_id, &room_id, &event) - .await; - } - } else { - debug_warn!( - %user_id, %room_id, %origin, - "received read receipt EDU from server who does not have a member in the room", - ); - continue; - } - } +async fn handle_edu_receipt_room_user( + services: &Services, + origin: &ServerName, + room_id: &RoomId, + user_id: &UserId, + user_updates: ReceiptData, +) { + if user_id.server_name() != origin { + debug_warn!( + %user_id, %origin, + "received read receipt EDU for user not belonging to origin" + ); + return; } + + if !services + .rooms + .state_cache + .server_in_room(origin, room_id) + .await + { + debug_warn!( + %user_id, %room_id, %origin, + "received read receipt EDU from server who does not have a member in the room", + ); + return; + } + + let data = &user_updates.data; + user_updates + .event_ids + .into_iter() + .stream() + .for_each_concurrent(automatic_width(), |event_id| async move { + let user_data = [(user_id.to_owned(), data.clone())]; + let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))]; + let content = [(event_id.clone(), BTreeMap::from(receipts))]; + services + .rooms + .read_receipt + .readreceipt_update(user_id, room_id, &ReceiptEvent { + content: ReceiptEventContent(content.into()), + room_id: room_id.to_owned(), + }) + .await; + }) + .await; } async fn handle_edu_typing( @@ -338,10 +383,6 @@ async fn handle_edu_typing( origin: &ServerName, typing: TypingContent, ) { - if !services.server.config.allow_incoming_typing { - return; - } - if typing.user_id.server_name() != origin { debug_warn!( %typing.user_id, %origin, @@ -364,41 +405,38 @@ async fn handle_edu_typing( return; } - if services + if !services .rooms .state_cache .is_joined(&typing.user_id, &typing.room_id) .await { - if typing.typing { - let timeout = utils::millis_since_unix_epoch().saturating_add( - services - .server - .config - .typing_federation_timeout_s - .saturating_mul(1000), - ); - services - .rooms - .typing - .typing_add(&typing.user_id, &typing.room_id, timeout) - .await - .log_err() - .ok(); - } else { - services - .rooms - .typing - .typing_remove(&typing.user_id, &typing.room_id) - .await - .log_err() - .ok(); - } - } else { debug_warn!( %typing.user_id, %typing.room_id, %origin, "received typing EDU for user not in room" ); + return; + } + + if typing.typing { + let secs = services.server.config.typing_federation_timeout_s; + let timeout = millis_since_unix_epoch().saturating_add(secs.saturating_mul(1000)); + + services + .rooms + .typing + .typing_add(&typing.user_id, &typing.room_id, timeout) + .await + .log_err() + .ok(); + } else { + services + .rooms + .typing + .typing_remove(&typing.user_id, &typing.room_id) + .await + .log_err() + .ok(); } } @@ -427,7 +465,12 @@ async fn handle_edu_direct_to_device( origin: &ServerName, content: DirectDeviceContent, ) { - let DirectDeviceContent { sender, ev_type, message_id, messages } = content; + let DirectDeviceContent { + ref sender, + ref ev_type, + ref message_id, + messages, + } = content; if sender.server_name() != origin { debug_warn!( @@ -440,60 +483,88 @@ async fn handle_edu_direct_to_device( // Check if this is a new transaction id if services .transaction_ids - .existing_txnid(&sender, None, &message_id) + .existing_txnid(sender, None, message_id) .await .is_ok() { return; } - for (target_user_id, map) in &messages { - for (target_device_id_maybe, event) in map { - let Ok(event) = event.deserialize_as().map_err(|e| { - err!(Request(InvalidParam(error!("To-Device event is invalid: {e}")))) - }) else { - continue; - }; - - let ev_type = ev_type.to_string(); - match target_device_id_maybe { - | DeviceIdOrAllDevices::DeviceId(target_device_id) => { - services - .users - .add_to_device_event( - &sender, - target_user_id, - target_device_id, - &ev_type, - event, - ) - .await; - }, - - | DeviceIdOrAllDevices::AllDevices => { - let (sender, ev_type, event) = (&sender, &ev_type, &event); - services - .users - .all_device_ids(target_user_id) - .for_each(|target_device_id| { - services.users.add_to_device_event( - sender, - target_user_id, - target_device_id, - ev_type, - event.clone(), - ) - }) - .await; - }, - } - } - } + // process messages concurrently for different users + let ev_type = ev_type.to_string(); + messages + .into_iter() + .stream() + .for_each_concurrent(automatic_width(), |(target_user_id, map)| { + handle_edu_direct_to_device_user(services, target_user_id, sender, &ev_type, map) + }) + .await; // Save transaction id with empty data services .transaction_ids - .add_txnid(&sender, None, &message_id, &[]); + .add_txnid(sender, None, message_id, &[]); +} + +async fn handle_edu_direct_to_device_user( + services: &Services, + target_user_id: OwnedUserId, + sender: &UserId, + ev_type: &str, + map: BTreeMap>, +) { + for (target_device_id_maybe, event) in map { + let Ok(event) = event + .deserialize_as() + .map_err(|e| err!(Request(InvalidParam(error!("To-Device event is invalid: {e}"))))) + else { + continue; + }; + + handle_edu_direct_to_device_event( + services, + &target_user_id, + sender, + target_device_id_maybe, + ev_type, + event, + ) + .await; + } +} + +async fn handle_edu_direct_to_device_event( + services: &Services, + target_user_id: &UserId, + sender: &UserId, + target_device_id_maybe: DeviceIdOrAllDevices, + ev_type: &str, + event: serde_json::Value, +) { + match target_device_id_maybe { + | DeviceIdOrAllDevices::DeviceId(ref target_device_id) => { + services + .users + .add_to_device_event(sender, target_user_id, target_device_id, ev_type, event) + .await; + }, + + | DeviceIdOrAllDevices::AllDevices => { + services + .users + .all_device_ids(target_user_id) + .for_each(|target_device_id| { + services.users.add_to_device_event( + sender, + target_user_id, + target_device_id, + ev_type, + event.clone(), + ) + }) + .await; + }, + } } async fn handle_edu_signing_key_update(