aggregate receipts into single edu; dedup presence; refactor selection limits etc

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-22 05:30:28 +00:00 committed by strawberry
parent a74461fc9a
commit d35376a90c
3 changed files with 190 additions and 130 deletions

View file

@ -21,16 +21,16 @@ use ruma::{
OwnedEventId, ServerName, OwnedEventId, ServerName,
}; };
use serde_json::value::RawValue as RawJsonValue; use serde_json::value::RawValue as RawJsonValue;
use service::{
sending::{EDU_LIMIT, PDU_LIMIT},
Services,
};
use crate::{ use crate::{
services::Services,
utils::{self}, utils::{self},
Ruma, Ruma,
}; };
const PDU_LIMIT: usize = 50;
const EDU_LIMIT: usize = 100;
type ResolvedMap = BTreeMap<OwnedEventId, Result<()>>; type ResolvedMap = BTreeMap<OwnedEventId, Result<()>>;
/// # `PUT /_matrix/federation/v1/send/{txnId}` /// # `PUT /_matrix/federation/v1/send/{txnId}`

View file

@ -20,7 +20,10 @@ use ruma::{
use tokio::sync::Mutex; use tokio::sync::Mutex;
use self::data::Data; use self::data::Data;
pub use self::dest::Destination; pub use self::{
dest::Destination,
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{account_data, client, globals, presence, pusher, resolver, rooms, server_keys, users, Dep}; use crate::{account_data, client, globals, presence, pusher, resolver, rooms, server_keys, users, Dep};
pub struct Service { pub struct Service {

View file

@ -7,7 +7,7 @@ use std::{
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use conduit::{ use conduit::{
debug, debug_warn, err, debug, debug_warn, err, error,
result::LogErr, result::LogErr,
trace, trace,
utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt}, utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt},
@ -26,8 +26,8 @@ use ruma::{
}, },
device_id, device_id,
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
ServerName, UInt, RoomVersionId, ServerName, UInt,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::time::sleep_until; use tokio::time::sleep_until;
@ -47,10 +47,16 @@ type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>; type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
type CurTransactionStatus = HashMap<Destination, TransactionStatus>; type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
const DEQUEUE_LIMIT: usize = 48;
const SELECT_EDU_LIMIT: usize = 16;
const CLEANUP_TIMEOUT_MS: u64 = 3500; const CLEANUP_TIMEOUT_MS: u64 = 3500;
const SELECT_PRESENCE_LIMIT: usize = 256;
const SELECT_RECEIPT_LIMIT: usize = 256;
const SELECT_EDU_LIMIT: usize = EDU_LIMIT - 2;
const DEQUEUE_LIMIT: usize = 48;
pub const PDU_LIMIT: usize = 50;
pub const EDU_LIMIT: usize = 100;
impl Service { impl Service {
#[tracing::instrument(skip_all, name = "sender")] #[tracing::instrument(skip_all, name = "sender")]
pub(super) async fn sender(&self) -> Result<()> { pub(super) async fn sender(&self) -> Result<()> {
@ -216,6 +222,7 @@ impl Service {
// Add EDU's into the transaction // Add EDU's into the transaction
if let Destination::Normal(server_name) = dest { if let Destination::Normal(server_name) = dest {
if let Ok((select_edus, last_count)) = self.select_edus(server_name).await { if let Ok((select_edus, last_count)) = self.select_edus(server_name).await {
debug_assert!(select_edus.len() <= EDU_LIMIT, "exceeded edus limit");
events.extend(select_edus.into_iter().map(SendingEvent::Edu)); events.extend(select_edus.into_iter().map(SendingEvent::Edu));
self.db.set_latest_educount(server_name, last_count); self.db.set_latest_educount(server_name, last_count);
} }
@ -254,39 +261,53 @@ impl Service {
async fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> { async fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
// u64: count of last edu // u64: count of last edu
let since = self.db.get_latest_educount(server_name).await; let since = self.db.get_latest_educount(server_name).await;
let mut events = Vec::new();
let mut max_edu_count = since; let mut max_edu_count = since;
let mut device_list_changes = HashSet::new(); let mut events = Vec::new();
self.select_edus_device_changes(server_name, since, &mut max_edu_count, &mut events)
.await;
if self.server.config.allow_outgoing_read_receipts {
self.select_edus_receipts(server_name, since, &mut max_edu_count, &mut events)
.await;
}
if self.server.config.allow_outgoing_presence {
self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events)
.await;
}
Ok((events, max_edu_count))
}
/// Look for presence
async fn select_edus_device_changes(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) {
debug_assert!(events.len() < SELECT_EDU_LIMIT, "called when edu limit reached");
let server_rooms = self.services.state_cache.server_rooms(server_name); let server_rooms = self.services.state_cache.server_rooms(server_name);
pin_mut!(server_rooms); pin_mut!(server_rooms);
let mut device_list_changes = HashSet::<OwnedUserId>::new();
while let Some(room_id) = server_rooms.next().await { while let Some(room_id) = server_rooms.next().await {
// Look for device list updates in this room let keys_changed = self
device_list_changes.extend( .services
self.services
.users .users
.keys_changed(room_id.as_str(), since, None) .room_keys_changed(room_id, since, None)
.ready_filter(|user_id| self.services.globals.user_is_local(user_id)) .ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
);
if self.server.config.allow_outgoing_read_receipts pin_mut!(keys_changed);
&& !self while let Some((user_id, count)) = keys_changed.next().await {
.select_edus_receipts(room_id, since, &mut max_edu_count, &mut events) *max_edu_count = cmp::max(count, *max_edu_count);
.await? if !device_list_changes.insert(user_id.into()) {
{ continue;
break;
}
} }
for user_id in device_list_changes {
// Empty prev id forces synapse to resync; because synapse resyncs, // Empty prev id forces synapse to resync; because synapse resyncs,
// we can just insert placeholder data // we can just insert placeholder data
let edu = Edu::DeviceListUpdate(DeviceListUpdateContent { let edu = Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id, user_id: user_id.into(),
device_id: device_id!("placeholder").to_owned(), device_id: device_id!("placeholder").to_owned(),
device_display_name: Some("Placeholder".to_owned()), device_display_name: Some("Placeholder".to_owned()),
stream_id: uint!(1), stream_id: uint!(1),
@ -295,92 +316,78 @@ impl Service {
keys: None, keys: None,
}); });
events.push(serde_json::to_vec(&edu).expect("json can be serialized")); let edu = serde_json::to_vec(&edu).expect("failed to serialize device list update to JSON");
}
if self.server.config.allow_outgoing_presence { events.push(edu);
self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events) if events.len() >= SELECT_EDU_LIMIT {
.await?; return;
}
Ok((events, max_edu_count))
}
/// Look for presence
async fn select_edus_presence(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> {
let presence_since = self.services.presence.presence_since(since);
pin_mut!(presence_since);
let mut presence_updates = Vec::new();
while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
*max_edu_count = cmp::max(count, *max_edu_count);
if !self.services.globals.user_is_local(user_id) {
continue;
}
if !self
.services
.state_cache
.server_sees_user(server_name, user_id)
.await
{
continue;
}
let presence_event = self
.services
.presence
.from_json_bytes_to_event(&presence_bytes, &user_id)
.await?;
presence_updates.push(PresenceUpdate {
user_id,
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
last_active_ago: presence_event
.content
.last_active_ago
.unwrap_or_else(|| uint!(0)),
status_msg: presence_event.content.status_msg,
});
if presence_updates.len() >= SELECT_EDU_LIMIT {
break;
} }
} }
if !presence_updates.is_empty() {
let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"));
} }
Ok(true)
} }
/// Look for read receipts in this room /// Look for read receipts in this room
async fn select_edus_receipts( async fn select_edus_receipts(
&self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>, &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> { ) {
debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached");
let server_rooms = self.services.state_cache.server_rooms(server_name);
pin_mut!(server_rooms);
let mut num = 0;
let mut receipts = BTreeMap::<OwnedRoomId, ReceiptMap>::new();
while let Some(room_id) = server_rooms.next().await {
let receipt_map = self
.select_edus_receipts_room(room_id, since, max_edu_count, &mut num)
.await;
if !receipt_map.read.is_empty() {
receipts.insert(room_id.into(), receipt_map);
}
}
if receipts.is_empty() {
return;
}
let receipt_content = Edu::Receipt(ReceiptContent {
receipts,
});
let receipt_content =
serde_json::to_vec(&receipt_content).expect("Failed to serialize Receipt EDU to JSON vec");
events.push(receipt_content);
}
/// Look for read receipts in this room
async fn select_edus_receipts_room(
&self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, num: &mut usize,
) -> ReceiptMap {
let receipts = self let receipts = self
.services .services
.read_receipt .read_receipt
.readreceipts_since(room_id, since); .readreceipts_since(room_id, since);
pin_mut!(receipts); pin_mut!(receipts);
let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();
while let Some((user_id, count, read_receipt)) = receipts.next().await { while let Some((user_id, count, read_receipt)) = receipts.next().await {
*max_edu_count = cmp::max(count, *max_edu_count); *max_edu_count = cmp::max(count, *max_edu_count);
if !self.services.globals.user_is_local(&user_id) { if !self.services.globals.user_is_local(&user_id) {
continue; continue;
} }
let event = serde_json::from_str(read_receipt.json().get()) let Ok(event) = serde_json::from_str(read_receipt.json().get()) else {
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?; error!(?user_id, ?count, ?read_receipt, "Invalid edu event in read_receipts.");
continue;
};
let AnySyncEphemeralRoomEvent::Receipt(r) = event else {
error!(?user_id, ?count, ?event, "Invalid event type in read_receipts");
continue;
};
let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
let mut read = BTreeMap::new();
let (event_id, mut receipt) = r let (event_id, mut receipt) = r
.content .content
.0 .0
@ -394,37 +401,87 @@ impl Service {
.remove(&user_id) .remove(&user_id)
.expect("our read receipts always have the user here"); .expect("our read receipts always have the user here");
read.insert( let receipt_data = ReceiptData {
user_id, data: receipt,
ReceiptData {
data: receipt.clone(),
event_ids: vec![event_id.clone()], event_ids: vec![event_id.clone()],
},
);
let receipt_map = ReceiptMap {
read,
}; };
let mut receipts = BTreeMap::new(); if read.insert(user_id, receipt_data).is_none() {
receipts.insert(room_id.to_owned(), receipt_map); *num = num.saturating_add(1);
if *num >= SELECT_RECEIPT_LIMIT {
break;
}
}
}
Edu::Receipt(ReceiptContent { ReceiptMap {
receipts, read,
}) }
} else { }
Error::bad_database("Invalid event type in read_receipts");
/// Look for presence
async fn select_edus_presence(
&self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) {
debug_assert!(events.len() < EDU_LIMIT, "called when edu limit reached");
let presence_since = self.services.presence.presence_since(since);
pin_mut!(presence_since);
let mut presence_updates = HashMap::<OwnedUserId, PresenceUpdate>::new();
while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
*max_edu_count = cmp::max(count, *max_edu_count);
if !self.services.globals.user_is_local(user_id) {
continue;
}
if !self
.services
.state_cache
.server_sees_user(server_name, user_id)
.await
{
continue;
}
let Ok(presence_event) = self
.services
.presence
.from_json_bytes_to_event(presence_bytes, user_id)
.await
.log_err()
else {
continue; continue;
}; };
events.push(serde_json::to_vec(&federation_event).expect("json can be serialized")); let update = PresenceUpdate {
user_id: user_id.into(),
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
status_msg: presence_event.content.status_msg,
last_active_ago: presence_event
.content
.last_active_ago
.unwrap_or_else(|| uint!(0)),
};
if events.len() >= SELECT_EDU_LIMIT { presence_updates.insert(user_id.into(), update);
return Ok(false); if presence_updates.len() >= SELECT_PRESENCE_LIMIT {
break;
} }
} }
Ok(true) if presence_updates.is_empty() {
return;
}
let presence_content = Edu::Presence(PresenceContent {
push: presence_updates.into_values().collect(),
});
let presence_content = serde_json::to_vec(&presence_content).expect("failed to serialize Presence EDU to JSON");
events.push(presence_content);
} }
async fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingResult { async fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingResult {