From d35376a90cb521b578aff75a1441699e10695bac Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 22 Oct 2024 05:30:28 +0000 Subject: [PATCH] aggregate receipts into single edu; dedup presence; refactor selection limits etc Signed-off-by: Jason Volk --- src/api/server/send.rs | 8 +- src/service/sending/mod.rs | 5 +- src/service/sending/sender.rs | 307 ++++++++++++++++++++-------------- 3 files changed, 190 insertions(+), 130 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 40f9403b..e2100a0f 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -21,16 +21,16 @@ use ruma::{ OwnedEventId, ServerName, }; use serde_json::value::RawValue as RawJsonValue; +use service::{ + sending::{EDU_LIMIT, PDU_LIMIT}, + Services, +}; use crate::{ - services::Services, utils::{self}, Ruma, }; -const PDU_LIMIT: usize = 50; -const EDU_LIMIT: usize = 100; - type ResolvedMap = BTreeMap>; /// # `PUT /_matrix/federation/v1/send/{txnId}` diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index a1d5f692..ea266883 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -20,7 +20,10 @@ use ruma::{ use tokio::sync::Mutex; use self::data::Data; -pub use self::dest::Destination; +pub use self::{ + dest::Destination, + sender::{EDU_LIMIT, PDU_LIMIT}, +}; use crate::{account_data, client, globals, presence, pusher, resolver, rooms, server_keys, users, Dep}; pub struct Service { diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a57d4aea..d9087d44 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -7,7 +7,7 @@ use std::{ use base64::{engine::general_purpose, Engine as _}; use conduit::{ - debug, debug_warn, err, + debug, debug_warn, err, error, result::LogErr, trace, utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt}, @@ -26,8 +26,8 @@ use ruma::{ }, device_id, events::{push_rules::PushRulesEvent, receipt::ReceiptType, 
AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, - push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, - ServerName, UInt, + push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, + RoomVersionId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use tokio::time::sleep_until; @@ -47,10 +47,16 @@ type SendingFuture<'a> = BoxFuture<'a, SendingResult>; type SendingFutures<'a> = FuturesUnordered>; type CurTransactionStatus = HashMap; -const DEQUEUE_LIMIT: usize = 48; -const SELECT_EDU_LIMIT: usize = 16; const CLEANUP_TIMEOUT_MS: u64 = 3500; +const SELECT_PRESENCE_LIMIT: usize = 256; +const SELECT_RECEIPT_LIMIT: usize = 256; +const SELECT_EDU_LIMIT: usize = EDU_LIMIT - 2; +const DEQUEUE_LIMIT: usize = 48; + +pub const PDU_LIMIT: usize = 50; +pub const EDU_LIMIT: usize = 100; + impl Service { #[tracing::instrument(skip_all, name = "sender")] pub(super) async fn sender(&self) -> Result<()> { @@ -216,6 +222,7 @@ impl Service { // Add EDU's into the transaction if let Destination::Normal(server_name) = dest { if let Ok((select_edus, last_count)) = self.select_edus(server_name).await { + debug_assert!(select_edus.len() <= EDU_LIMIT, "exceeded edus limit"); events.extend(select_edus.into_iter().map(SendingEvent::Edu)); self.db.set_latest_educount(server_name, last_count); } @@ -254,69 +261,176 @@ impl Service { async fn select_edus(&self, server_name: &ServerName) -> Result<(Vec>, u64)> { // u64: count of last edu let since = self.db.get_latest_educount(server_name).await; - let mut events = Vec::new(); let mut max_edu_count = since; - let mut device_list_changes = HashSet::new(); + let mut events = Vec::new(); - let server_rooms = self.services.state_cache.server_rooms(server_name); + self.select_edus_device_changes(server_name, since, &mut max_edu_count, &mut events) + .await; - pin_mut!(server_rooms); - while let 
Some(room_id) = server_rooms.next().await { - // Look for device list updates in this room - device_list_changes.extend( - self.services - .users - .keys_changed(room_id.as_str(), since, None) - .ready_filter(|user_id| self.services.globals.user_is_local(user_id)) - .map(ToOwned::to_owned) - .collect::>() - .await, - ); - - if self.server.config.allow_outgoing_read_receipts - && !self - .select_edus_receipts(room_id, since, &mut max_edu_count, &mut events) - .await? - { - break; - } - } - - for user_id in device_list_changes { - // Empty prev id forces synapse to resync; because synapse resyncs, - // we can just insert placeholder data - let edu = Edu::DeviceListUpdate(DeviceListUpdateContent { - user_id, - device_id: device_id!("placeholder").to_owned(), - device_display_name: Some("Placeholder".to_owned()), - stream_id: uint!(1), - prev_id: Vec::new(), - deleted: None, - keys: None, - }); - - events.push(serde_json::to_vec(&edu).expect("json can be serialized")); + if self.server.config.allow_outgoing_read_receipts { + self.select_edus_receipts(server_name, since, &mut max_edu_count, &mut events) + .await; } if self.server.config.allow_outgoing_presence { self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events) - .await?; + .await; } Ok((events, max_edu_count)) } + /// Look for device list changes + async fn select_edus_device_changes( + &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, + ) { + debug_assert!(events.len() < SELECT_EDU_LIMIT, "called when edu limit reached"); + + let server_rooms = self.services.state_cache.server_rooms(server_name); + + pin_mut!(server_rooms); + let mut device_list_changes = HashSet::::new(); + while let Some(room_id) = server_rooms.next().await { + let keys_changed = self + .services + .users + .room_keys_changed(room_id, since, None) + .ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id)); + + pin_mut!(keys_changed); + while let Some((user_id, count)) = 
keys_changed.next().await { + *max_edu_count = cmp::max(count, *max_edu_count); + if !device_list_changes.insert(user_id.into()) { + continue; + } + + // Empty prev id forces synapse to resync; because synapse resyncs, + // we can just insert placeholder data + let edu = Edu::DeviceListUpdate(DeviceListUpdateContent { + user_id: user_id.into(), + device_id: device_id!("placeholder").to_owned(), + device_display_name: Some("Placeholder".to_owned()), + stream_id: uint!(1), + prev_id: Vec::new(), + deleted: None, + keys: None, + }); + + let edu = serde_json::to_vec(&edu).expect("failed to serialize device list update to JSON"); + + events.push(edu); + if events.len() >= SELECT_EDU_LIMIT { + return; + } + } + } + } + + /// Look for read receipts in rooms shared with this server + async fn select_edus_receipts( + &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, + ) { + debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached"); + + let server_rooms = self.services.state_cache.server_rooms(server_name); + + pin_mut!(server_rooms); + let mut num = 0; + let mut receipts = BTreeMap::::new(); + while let Some(room_id) = server_rooms.next().await { + let receipt_map = self + .select_edus_receipts_room(room_id, since, max_edu_count, &mut num) + .await; + + if !receipt_map.read.is_empty() { + receipts.insert(room_id.into(), receipt_map); + } + } + + if receipts.is_empty() { + return; + } + + let receipt_content = Edu::Receipt(ReceiptContent { + receipts, + }); + + let receipt_content = + serde_json::to_vec(&receipt_content).expect("Failed to serialize Receipt EDU to JSON vec"); + + events.push(receipt_content); + } + + /// Look for read receipts in this room + async fn select_edus_receipts_room( + &self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, num: &mut usize, + ) -> ReceiptMap { + let receipts = self + .services + .read_receipt + .readreceipts_since(room_id, since); + + pin_mut!(receipts); + let mut read = 
BTreeMap::::new(); + 
while let Some((user_id, count, read_receipt)) = receipts.next().await { + *max_edu_count = cmp::max(count, *max_edu_count); + if !self.services.globals.user_is_local(&user_id) { + continue; + } + + let Ok(event) = serde_json::from_str(read_receipt.json().get()) else { + error!(?user_id, ?count, ?read_receipt, "Invalid edu event in read_receipts."); + continue; + }; + + let AnySyncEphemeralRoomEvent::Receipt(r) = event else { + error!(?user_id, ?count, ?event, "Invalid event type in read_receipts"); + continue; + }; + + let (event_id, mut receipt) = r + .content + .0 + .into_iter() + .next() + .expect("we only use one event per read receipt"); + + let receipt = receipt + .remove(&ReceiptType::Read) + .expect("our read receipts always set this") + .remove(&user_id) + .expect("our read receipts always have the user here"); + + let receipt_data = ReceiptData { + data: receipt, + event_ids: vec![event_id.clone()], + }; + + if read.insert(user_id, receipt_data).is_none() { + *num = num.saturating_add(1); + if *num >= SELECT_RECEIPT_LIMIT { + break; + } + } + } + + ReceiptMap { + read, + } + } + /// Look for presence async fn select_edus_presence( &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, - ) -> Result { + ) { + debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached"); + let presence_since = self.services.presence.presence_since(since); pin_mut!(presence_since); - let mut presence_updates = Vec::new(); + let mut presence_updates = HashMap::::new(); while let Some((user_id, count, presence_bytes)) = presence_since.next().await { *max_edu_count = cmp::max(count, *max_edu_count); - if !self.services.globals.user_is_local(user_id) { continue; } @@ -330,101 +444,44 @@ impl Service { continue; } - let presence_event = self + let Ok(presence_event) = self .services .presence - .from_json_bytes_to_event(&presence_bytes, &user_id) - .await?; + .from_json_bytes_to_event(presence_bytes, user_id) + .await + .log_err() + 
else { + continue; + }; - presence_updates.push(PresenceUpdate { - user_id, + let update = PresenceUpdate { + user_id: user_id.into(), presence: presence_event.content.presence, currently_active: presence_event.content.currently_active.unwrap_or(false), + status_msg: presence_event.content.status_msg, last_active_ago: presence_event .content .last_active_ago .unwrap_or_else(|| uint!(0)), - status_msg: presence_event.content.status_msg, - }); + }; - if presence_updates.len() >= SELECT_EDU_LIMIT { + presence_updates.insert(user_id.into(), update); + if presence_updates.len() >= SELECT_PRESENCE_LIMIT { break; } } - if !presence_updates.is_empty() { - let presence_content = Edu::Presence(PresenceContent::new(presence_updates)); - events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized")); + if presence_updates.is_empty() { + return; } - Ok(true) - } + let presence_content = Edu::Presence(PresenceContent { + push: presence_updates.into_values().collect(), + }); - /// Look for read receipts in this room - async fn select_edus_receipts( - &self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec>, - ) -> Result { - let receipts = self - .services - .read_receipt - .readreceipts_since(room_id, since); + let presence_content = serde_json::to_vec(&presence_content).expect("failed to serialize Presence EDU to JSON"); - pin_mut!(receipts); - while let Some((user_id, count, read_receipt)) = receipts.next().await { - *max_edu_count = cmp::max(count, *max_edu_count); - if !self.services.globals.user_is_local(&user_id) { - continue; - } - - let event = serde_json::from_str(read_receipt.json().get()) - .map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?; - - let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event { - let mut read = BTreeMap::new(); - let (event_id, mut receipt) = r - .content - .0 - .into_iter() - .next() - .expect("we only use one event per read receipt"); - - let 
receipt = receipt - .remove(&ReceiptType::Read) - .expect("our read receipts always set this") - .remove(&user_id) - .expect("our read receipts always have the user here"); - - read.insert( - user_id, - ReceiptData { - data: receipt.clone(), - event_ids: vec![event_id.clone()], - }, - ); - - let receipt_map = ReceiptMap { - read, - }; - - let mut receipts = BTreeMap::new(); - receipts.insert(room_id.to_owned(), receipt_map); - - Edu::Receipt(ReceiptContent { - receipts, - }) - } else { - Error::bad_database("Invalid event type in read_receipts"); - continue; - }; - - events.push(serde_json::to_vec(&federation_event).expect("json can be serialized")); - - if events.len() >= SELECT_EDU_LIMIT { - return Ok(false); - } - } - - Ok(true) + events.push(presence_content); } async fn send_events(&self, dest: Destination, events: Vec) -> SendingResult {