parallelize sender edu selection

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-29 03:25:29 +00:00
parent 6175e72f1c
commit b5266ad9f5

View file

@ -1,7 +1,7 @@
use std::{
cmp,
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
time::{Duration, Instant},
};
@ -13,7 +13,12 @@ use conduit::{
utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt},
warn, Error, Result,
};
use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{
future::{BoxFuture, OptionFuture},
join, pin_mut,
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use ruma::{
api::{
appservice::event::push_events::v1::Edu as RumaEdu,
@ -261,33 +266,45 @@ impl Service {
#[tracing::instrument(skip_all, level = "debug")]
async fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
// u64: count of last edu
// selection window
let since = self.db.get_latest_educount(server_name).await;
let mut max_edu_count = since;
let mut events = Vec::new();
let since_upper = self.services.globals.current_count()?;
let batch = (since, since_upper);
debug_assert!(batch.0 <= batch.1, "since range must not be negative");
self.select_edus_device_changes(server_name, since, &mut max_edu_count, &mut events)
.await;
let events_len = AtomicUsize::default();
let max_edu_count = AtomicU64::new(since);
if self.server.config.allow_outgoing_read_receipts {
self.select_edus_receipts(server_name, since, &mut max_edu_count, &mut events)
.await;
}
let device_changes = self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len);
if self.server.config.allow_outgoing_presence {
self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events)
.await;
}
let receipts: OptionFuture<_> = self
.server
.config
.allow_outgoing_read_receipts
.then(|| self.select_edus_receipts(server_name, batch, &max_edu_count))
.into();
Ok((events, max_edu_count))
let presence: OptionFuture<_> = self
.server
.config
.allow_outgoing_presence
.then(|| self.select_edus_presence(server_name, batch, &max_edu_count))
.into();
let (device_changes, receipts, presence) = join!(device_changes, receipts, presence);
let mut events = device_changes;
events.extend(presence.into_iter().flatten());
events.extend(receipts.into_iter().flatten());
Ok((events, max_edu_count.load(Ordering::Acquire)))
}
/// Look for presence
async fn select_edus_device_changes(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) {
debug_assert!(events.len() < SELECT_EDU_LIMIT, "called when edu limit reached");
&self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64, events_len: &AtomicUsize,
) -> Vec<Vec<u8>> {
let mut events = Vec::new();
let server_rooms = self.services.state_cache.server_rooms(server_name);
pin_mut!(server_rooms);
@ -296,12 +313,16 @@ impl Service {
let keys_changed = self
.services
.users
.room_keys_changed(room_id, since, None)
.room_keys_changed(room_id, since.0, None)
.ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
pin_mut!(keys_changed);
while let Some((user_id, count)) = keys_changed.next().await {
*max_edu_count = cmp::max(count, *max_edu_count);
if count > since.1 {
break;
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !device_list_changes.insert(user_id.into()) {
continue;
}
@ -321,19 +342,19 @@ impl Service {
let edu = serde_json::to_vec(&edu).expect("failed to serialize device list update to JSON");
events.push(edu);
if events.len() >= SELECT_EDU_LIMIT {
return;
if events_len.fetch_add(1, Ordering::Relaxed) >= SELECT_EDU_LIMIT - 1 {
return events;
}
}
}
events
}
/// Look for read receipts in this room
async fn select_edus_receipts(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) {
debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached");
&self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64,
) -> Option<Vec<u8>> {
let server_rooms = self.services.state_cache.server_rooms(server_name);
pin_mut!(server_rooms);
@ -350,7 +371,7 @@ impl Service {
}
if receipts.is_empty() {
return;
return None;
}
let receipt_content = Edu::Receipt(ReceiptContent {
@ -360,22 +381,26 @@ impl Service {
let receipt_content =
serde_json::to_vec(&receipt_content).expect("Failed to serialize Receipt EDU to JSON vec");
events.push(receipt_content);
Some(receipt_content)
}
/// Look for read receipts in this room
async fn select_edus_receipts_room(
&self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, num: &mut usize,
&self, room_id: &RoomId, since: (u64, u64), max_edu_count: &AtomicU64, num: &mut usize,
) -> ReceiptMap {
let receipts = self
.services
.read_receipt
.readreceipts_since(room_id, since);
.readreceipts_since(room_id, since.0);
pin_mut!(receipts);
let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();
while let Some((user_id, count, read_receipt)) = receipts.next().await {
*max_edu_count = cmp::max(count, *max_edu_count);
if count > since.1 {
break;
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !self.services.globals.user_is_local(user_id) {
continue;
}
@ -423,16 +448,18 @@ impl Service {
/// Look for presence
async fn select_edus_presence(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) {
debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached");
let presence_since = self.services.presence.presence_since(since);
&self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64,
) -> Option<Vec<u8>> {
let presence_since = self.services.presence.presence_since(since.0);
pin_mut!(presence_since);
let mut presence_updates = HashMap::<OwnedUserId, PresenceUpdate>::new();
while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
*max_edu_count = cmp::max(count, *max_edu_count);
if count > since.1 {
break;
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !self.services.globals.user_is_local(user_id) {
continue;
}
@ -474,7 +501,7 @@ impl Service {
}
if presence_updates.is_empty() {
return;
return None;
}
let presence_content = Edu::Presence(PresenceContent {
@ -483,7 +510,7 @@ impl Service {
let presence_content = serde_json::to_vec(&presence_content).expect("failed to serialize Presence EDU to JSON");
events.push(presence_content);
Some(presence_content)
}
async fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingResult {