delete unnecessary real_users_cache, fix overwriting of the push_target collection, add a proper function for getting local active users in a room

This `real_users_cache` cache seems weird, and I have no idea what
prompted its creation upstream. Perhaps they did this because
sqlite was very slow and their rocksdb setup was very poor, so
a "solution" was to stick member counts in memory.
Slow iterators, scanning, etc. do not apply to conduwuit, where
our rocksdb is extremely tuned, and I seriously doubt something
like this would have any real-world net-positive performance impact.

Also, for some reason, there is suspicious logic where we
overwrite the entire push target collection.

Both of these things could be potential causes of receiving
notifications in rooms we've left.

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-06-04 01:13:43 -04:00
parent c1227340b3
commit c738c119f8
6 changed files with 30 additions and 58 deletions

View file

@ -1,5 +1,5 @@
use std::{ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap},
path::Path, path::Path,
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
}; };
@ -150,7 +150,6 @@ pub struct KeyValueDatabase {
pub senderkey_pusher: Arc<dyn KvTree>, pub senderkey_pusher: Arc<dyn KvTree>,
pub auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>, pub auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
pub our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>, pub appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>, pub lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
} }
@ -265,7 +264,6 @@ impl KeyValueDatabase {
auth_chain_cache: Mutex::new(LruCache::new( auth_chain_cache: Mutex::new(LruCache::new(
(f64::from(config.auth_chain_cache_capacity) * config.conduit_cache_capacity_modifier) as usize, (f64::from(config.auth_chain_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
)), )),
our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()),
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
}) })

View file

@ -177,19 +177,16 @@ impl Data for KeyValueDatabase {
fn memory_usage(&self) -> String { fn memory_usage(&self) -> String {
let auth_chain_cache = self.auth_chain_cache.lock().unwrap().len(); let auth_chain_cache = self.auth_chain_cache.lock().unwrap().len();
let our_real_users_cache = self.our_real_users_cache.read().unwrap().len();
let appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().len(); let appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().len();
let lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().len(); let lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().len();
let max_auth_chain_cache = self.auth_chain_cache.lock().unwrap().capacity(); let max_auth_chain_cache = self.auth_chain_cache.lock().unwrap().capacity();
let max_our_real_users_cache = self.our_real_users_cache.read().unwrap().capacity();
let max_appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().capacity(); let max_appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().capacity();
let max_lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().capacity(); let max_lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().capacity();
format!( format!(
"\ "\
auth_chain_cache: {auth_chain_cache} / {max_auth_chain_cache} auth_chain_cache: {auth_chain_cache} / {max_auth_chain_cache}
our_real_users_cache: {our_real_users_cache} / {max_our_real_users_cache}
appservice_in_room_cache: {appservice_in_room_cache} / {max_appservice_in_room_cache} appservice_in_room_cache: {appservice_in_room_cache} / {max_appservice_in_room_cache}
lasttimelinecount_cache: {lasttimelinecount_cache} / {max_lasttimelinecount_cache}\n\n lasttimelinecount_cache: {lasttimelinecount_cache} / {max_lasttimelinecount_cache}\n\n
{}", {}",
@ -203,14 +200,10 @@ lasttimelinecount_cache: {lasttimelinecount_cache} / {max_lasttimelinecount_cach
*c = LruCache::new(c.capacity()); *c = LruCache::new(c.capacity());
} }
if amount > 2 { if amount > 2 {
let c = &mut *self.our_real_users_cache.write().unwrap();
*c = HashMap::new();
}
if amount > 3 {
let c = &mut *self.appservice_in_room_cache.write().unwrap(); let c = &mut *self.appservice_in_room_cache.write().unwrap();
*c = HashMap::new(); *c = HashMap::new();
} }
if amount > 4 { if amount > 3 {
let c = &mut *self.lasttimelinecount_cache.lock().unwrap(); let c = &mut *self.lasttimelinecount_cache.lock().unwrap();
*c = HashMap::new(); *c = HashMap::new();
} }

View file

@ -1,4 +1,4 @@
use std::{collections::HashSet, sync::Arc}; use std::collections::HashSet;
use itertools::Itertools; use itertools::Itertools;
use ruma::{ use ruma::{
@ -29,8 +29,6 @@ pub trait Data: Send + Sync {
fn update_joined_count(&self, room_id: &RoomId) -> Result<()>; fn update_joined_count(&self, room_id: &RoomId) -> Result<()>;
fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>>;
fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool>; fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool>;
/// Makes a user forget a room. /// Makes a user forget a room.
@ -48,6 +46,10 @@ pub trait Data: Send + Sync {
/// Returns an iterator over all joined members of a room. /// Returns an iterator over all joined members of a room.
fn room_members<'a>(&'a self, room_id: &RoomId) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>; fn room_members<'a>(&'a self, room_id: &RoomId) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
/// Returns a vec of all the users joined in a room who are active
/// (not guests, not deactivated users)
fn active_local_users_in_room(&self, room_id: &RoomId) -> Vec<OwnedUserId>;
fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>>; fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>>;
fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>>; fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>>;
@ -225,13 +227,9 @@ impl Data for KeyValueDatabase {
let mut joinedcount = 0_u64; let mut joinedcount = 0_u64;
let mut invitedcount = 0_u64; let mut invitedcount = 0_u64;
let mut joined_servers = HashSet::new(); let mut joined_servers = HashSet::new();
let mut real_users = HashSet::new();
for joined in self.room_members(room_id).filter_map(Result::ok) { for joined in self.room_members(room_id).filter_map(Result::ok) {
joined_servers.insert(joined.server_name().to_owned()); joined_servers.insert(joined.server_name().to_owned());
if user_is_local(&joined) && !services().users.is_deactivated(&joined).unwrap_or(true) {
real_users.insert(joined);
}
joinedcount = joinedcount.saturating_add(1); joinedcount = joinedcount.saturating_add(1);
} }
@ -245,11 +243,6 @@ impl Data for KeyValueDatabase {
self.roomid_invitedcount self.roomid_invitedcount
.insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?; .insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?;
self.our_real_users_cache
.write()
.unwrap()
.insert(room_id.to_owned(), Arc::new(real_users));
for old_joined_server in self.room_servers(room_id).filter_map(Result::ok) { for old_joined_server in self.room_servers(room_id).filter_map(Result::ok) {
if !joined_servers.remove(&old_joined_server) { if !joined_servers.remove(&old_joined_server) {
// Server not in room anymore // Server not in room anymore
@ -288,28 +281,6 @@ impl Data for KeyValueDatabase {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, room_id))]
fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>> {
let maybe = self
.our_real_users_cache
.read()
.unwrap()
.get(room_id)
.cloned();
if let Some(users) = maybe {
Ok(users)
} else {
self.update_joined_count(room_id)?;
Ok(Arc::clone(
self.our_real_users_cache
.read()
.unwrap()
.get(room_id)
.unwrap(),
))
}
}
#[tracing::instrument(skip(self, room_id, appservice))] #[tracing::instrument(skip(self, room_id, appservice))]
fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> { fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> {
let maybe = self let maybe = self
@ -429,6 +400,16 @@ impl Data for KeyValueDatabase {
})) }))
} }
/// Returns all of our local users joined in a room who are active
/// (not guests / not deactivated users).
///
/// A user whose deactivation status cannot be read is treated as
/// deactivated (`unwrap_or(true)`) and excluded, erring on the side
/// of not pushing notifications to them.
#[tracing::instrument(skip(self))]
fn active_local_users_in_room(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
    self.room_members(room_id)
        .filter_map(Result::ok)
        .filter(|user| user_is_local(user) && !services().users.is_deactivated(user).unwrap_or(true))
        // std `collect()` suffices; the return type drives inference, so
        // itertools' `collect_vec()` adds nothing here.
        .collect()
}
/// Returns the number of users which are currently in a room /// Returns the number of users which are currently in a room
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> { fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> {

View file

@ -1,4 +1,4 @@
use std::{collections::HashSet, sync::Arc}; use std::sync::Arc;
use data::Data; use data::Data;
use itertools::Itertools; use itertools::Itertools;
@ -212,11 +212,6 @@ impl Service {
#[tracing::instrument(skip(self, room_id))] #[tracing::instrument(skip(self, room_id))]
pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { self.db.update_joined_count(room_id) } pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { self.db.update_joined_count(room_id) }
#[tracing::instrument(skip(self, room_id))]
pub fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>> {
self.db.get_our_real_users(room_id)
}
#[tracing::instrument(skip(self, room_id, appservice))] #[tracing::instrument(skip(self, room_id, appservice))]
pub fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> { pub fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> {
self.db.appservice_in_room(room_id, appservice) self.db.appservice_in_room(room_id, appservice)
@ -278,6 +273,13 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> { self.db.room_joined_count(room_id) } pub fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> { self.db.room_joined_count(room_id) }
/// Returns all of our local users joined in a room who are active
/// (not guests / not deactivated users).
///
/// Thin delegation to the state-cache database layer.
#[tracing::instrument(skip(self))]
pub fn active_local_users_in_room(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
    self.db.active_local_users_in_room(room_id)
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>> { self.db.room_invited_count(room_id) } pub fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>> { self.db.room_invited_count(room_id) }

View file

@ -309,21 +309,19 @@ impl Service {
let mut push_target = services() let mut push_target = services()
.rooms .rooms
.state_cache .state_cache
.get_our_real_users(&pdu.room_id)?; .active_local_users_in_room(&pdu.room_id);
if pdu.kind == TimelineEventType::RoomMember { if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key { if let Some(state_key) = &pdu.state_key {
let target_user_id = UserId::parse(state_key.clone()).expect("This state_key was previously validated"); let target_user_id = UserId::parse(state_key.clone()).expect("This state_key was previously validated");
if !push_target.contains(&target_user_id) { if !push_target.contains(&target_user_id) {
let mut target = push_target.as_ref().clone(); push_target.push(target_user_id);
target.insert(target_user_id);
push_target = Arc::new(target);
} }
} }
} }
for user in push_target.iter() { for user in &push_target {
// Don't notify the user of their own events // Don't notify the user of their own events
if user == &pdu.sender { if user == &pdu.sender {
continue; continue;

View file

@ -485,7 +485,7 @@ async fn send_events_dest_push(
.ok_or_else(|| { .ok_or_else(|| {
( (
dest.clone(), dest.clone(),
Error::bad_database("[Push] Event in servernamevent_datas not found in db."), Error::bad_database("[Push] Event in servernameevent_data not found in db."),
) )
})?, })?,
); );
@ -567,7 +567,7 @@ async fn send_events_dest_normal(
); );
( (
dest.clone(), dest.clone(),
Error::bad_database("[Normal] Event in servernamevent_datas not found in db."), Error::bad_database("[Normal] Event in servernameevent_data not found in db."),
) )
})?, })?,
); );