From b5266ad9f5b77f5fa04b305ebbb0ed29e862396f Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 29 Nov 2024 03:25:29 +0000 Subject: [PATCH] parallelize sender edu selection Signed-off-by: Jason Volk --- src/service/sending/sender.rs | 109 +++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 41 deletions(-) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 3c544725..b1e909c9 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -1,7 +1,7 @@ use std::{ - cmp, collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, + sync::atomic::{AtomicU64, AtomicUsize, Ordering}, time::{Duration, Instant}, }; @@ -13,7 +13,12 @@ use conduit::{ utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt}, warn, Error, Result, }; -use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{ + future::{BoxFuture, OptionFuture}, + join, pin_mut, + stream::FuturesUnordered, + FutureExt, StreamExt, +}; use ruma::{ api::{ appservice::event::push_events::v1::Edu as RumaEdu, @@ -261,33 +266,45 @@ impl Service { #[tracing::instrument(skip_all, level = "debug")] async fn select_edus(&self, server_name: &ServerName) -> Result<(Vec>, u64)> { - // u64: count of last edu + // selection window let since = self.db.get_latest_educount(server_name).await; - let mut max_edu_count = since; - let mut events = Vec::new(); + let since_upper = self.services.globals.current_count()?; + let batch = (since, since_upper); + debug_assert!(batch.0 <= batch.1, "since range must not be negative"); - self.select_edus_device_changes(server_name, since, &mut max_edu_count, &mut events) - .await; + let events_len = AtomicUsize::default(); + let max_edu_count = AtomicU64::new(since); - if self.server.config.allow_outgoing_read_receipts { - self.select_edus_receipts(server_name, since, &mut max_edu_count, &mut events) - .await; - } + let device_changes = self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len); - if self.server.config.allow_outgoing_presence { - self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events) - .await; - } + let receipts: OptionFuture<_> = self + .server + .config + .allow_outgoing_read_receipts + .then(|| self.select_edus_receipts(server_name, batch, &max_edu_count)) + .into(); - Ok((events, max_edu_count)) + let presence: OptionFuture<_> = self + .server + .config + .allow_outgoing_presence + .then(|| self.select_edus_presence(server_name, batch, &max_edu_count)) + .into(); + + let (device_changes, receipts, presence) = join!(device_changes, receipts, presence); + + let mut events = device_changes; + events.extend(presence.into_iter().flatten()); + events.extend(receipts.into_iter().flatten()); + + Ok((events, max_edu_count.load(Ordering::Acquire))) } /// Look for presence async fn select_edus_device_changes( - &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, - ) { - debug_assert!(events.len() < SELECT_EDU_LIMIT, "called when edu limit reached"); - + &self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64, events_len: &AtomicUsize, + ) -> Vec> { + let mut events = Vec::new(); let server_rooms = self.services.state_cache.server_rooms(server_name); pin_mut!(server_rooms); @@ -296,12 +313,16 @@ impl Service { let keys_changed = self .services .users - .room_keys_changed(room_id, since, None) + .room_keys_changed(room_id, since.0, None) .ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id)); pin_mut!(keys_changed); while let Some((user_id, count)) = keys_changed.next().await { - *max_edu_count = cmp::max(count, *max_edu_count); + if count > since.1 { + break; + } + + max_edu_count.fetch_max(count, Ordering::Relaxed); if !device_list_changes.insert(user_id.into()) { continue; } @@ -321,19 +342,19 @@ impl Service { let edu = serde_json::to_vec(&edu).expect("failed to serialize device list update to JSON"); events.push(edu); - if events.len() >= SELECT_EDU_LIMIT { - return; + if events_len.fetch_add(1, Ordering::Relaxed) >= SELECT_EDU_LIMIT - 1 { + return events; } } } + + events } /// Look for read receipts in this room async fn select_edus_receipts( - &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, - ) { - debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached"); - + &self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64, + ) -> Option> { let server_rooms = self.services.state_cache.server_rooms(server_name); pin_mut!(server_rooms); @@ -350,7 +371,7 @@ impl Service { } if receipts.is_empty() { - return; + return None; } let receipt_content = Edu::Receipt(ReceiptContent { @@ -360,22 +381,26 @@ impl Service { let receipt_content = serde_json::to_vec(&receipt_content).expect("Failed to serialize Receipt EDU to JSON vec"); - events.push(receipt_content); + Some(receipt_content) } /// Look for read receipts in this room async fn select_edus_receipts_room( - &self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, num: &mut usize, + &self, room_id: &RoomId, since: (u64, u64), max_edu_count: &AtomicU64, num: &mut usize, ) -> ReceiptMap { let receipts = self .services .read_receipt - .readreceipts_since(room_id, since); + .readreceipts_since(room_id, since.0); pin_mut!(receipts); let mut read = BTreeMap::::new(); while let Some((user_id, count, read_receipt)) = receipts.next().await { - *max_edu_count = cmp::max(count, *max_edu_count); + if count > since.1 { + break; + } + + max_edu_count.fetch_max(count, Ordering::Relaxed); if !self.services.globals.user_is_local(user_id) { continue; } @@ -423,16 +448,18 @@ impl Service { /// Look for presence async fn select_edus_presence( - &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, - ) { - debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached"); - - let presence_since = self.services.presence.presence_since(since); + &self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64, + ) -> Option> { + let presence_since = self.services.presence.presence_since(since.0); pin_mut!(presence_since); let mut presence_updates = HashMap::::new(); while let Some((user_id, count, presence_bytes)) = presence_since.next().await { - *max_edu_count = cmp::max(count, *max_edu_count); + if count > since.1 { + break; + } + + max_edu_count.fetch_max(count, Ordering::Relaxed); if !self.services.globals.user_is_local(user_id) { continue; } @@ -474,7 +501,7 @@ impl Service { } if presence_updates.is_empty() { - return; + return None; } let presence_content = Edu::Presence(PresenceContent { @@ -483,7 +510,7 @@ impl Service { let presence_content = serde_json::to_vec(&presence_content).expect("failed to serialize Presence EDU to JSON"); - events.push(presence_content); + Some(presence_content) } async fn send_events(&self, dest: Destination, events: Vec) -> SendingResult {