From 473b29d5244d55f0b279b4d1bd6fa75c171faca3 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Tue, 2 Jul 2024 22:40:58 +0000
Subject: [PATCH] move remaining runtime caches into their respective service

Signed-off-by: Jason Volk <jason@zemos.net>
---
 Cargo.lock                            |  1 -
 src/database/Cargo.toml               |  1 -
 src/database/database.rs              | 24 ++--------------
 src/service/globals/data.rs           | 40 +++++++++++++--------------
 src/service/globals/migrations.rs     |  6 ++--
 src/service/rooms/auth_chain/data.rs  | 29 +++++++++++--------
 src/service/rooms/auth_chain/mod.rs   | 11 ++++++--
 src/service/rooms/state_cache/data.rs | 17 ++++++------
 src/service/rooms/state_cache/mod.rs  | 13 +++++++++
 src/service/rooms/timeline/data.rs    | 21 ++++++++------
 src/service/rooms/timeline/mod.rs     | 13 +++++++++
 src/service/uiaa/data.rs              | 17 ++++++------
 12 files changed, 108 insertions(+), 85 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index edc41835..05547769 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -696,7 +696,6 @@ dependencies = [
  "chrono",
  "conduit_core",
  "log",
- "lru-cache",
  "ruma",
  "rust-rocksdb-uwu",
  "tokio",
diff --git a/src/database/Cargo.toml b/src/database/Cargo.toml
index 104ef93d..f7f3c773 100644
--- a/src/database/Cargo.toml
+++ b/src/database/Cargo.toml
@@ -38,7 +38,6 @@ zstd_compression = [
 chrono.workspace = true
 conduit-core.workspace = true
 log.workspace = true
-lru-cache.workspace = true
 ruma.workspace = true
 rust-rocksdb.workspace = true
 tokio.workspace = true
diff --git a/src/database/database.rs b/src/database/database.rs
index 833b0047..2c6c6808 100644
--- a/src/database/database.rs
+++ b/src/database/database.rs
@@ -1,41 +1,21 @@
-use std::{
-	collections::{BTreeMap, HashMap},
-	ops::Index,
-	sync::{Arc, Mutex, RwLock},
-};
+use std::{ops::Index, sync::Arc};
 
-use conduit::{PduCount, Result, Server};
-use lru_cache::LruCache;
-use ruma::{CanonicalJsonValue, OwnedDeviceId, OwnedRoomId, OwnedUserId};
+use conduit::{Result, Server};
 
 use crate::{cork::Cork, maps, maps::Maps, Engine, Map};
 
 pub struct Database {
 	pub db: Arc<Engine>,
 	pub map: Maps,
-
-	//TODO: not a database
-	pub userdevicesessionid_uiaarequest: RwLock<BTreeMap<(OwnedUserId, OwnedDeviceId, String), CanonicalJsonValue>>,
-	pub auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
-	pub appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
-	pub lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
 }
 
 impl Database {
 	/// Load an existing database or create a new one.
 	pub async fn open(server: &Arc<Server>) -> Result<Self> {
-		let config = &server.config;
 		let db = Engine::open(server)?;
 		Ok(Self {
 			db: db.clone(),
 			map: maps::open(&db)?,
-
-			userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
-			appservice_in_room_cache: RwLock::new(HashMap::new()),
-			lasttimelinecount_cache: Mutex::new(HashMap::new()),
-			auth_chain_cache: Mutex::new(LruCache::new(
-				(f64::from(config.auth_chain_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
-			)),
 		})
 	}
diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs
index 823adb46..022762f4 100644
--- a/src/service/globals/data.rs
+++ b/src/service/globals/data.rs
@@ -1,12 +1,11 @@
 use std::{
-	collections::{BTreeMap, HashMap},
+	collections::BTreeMap,
 	sync::{Arc, RwLock},
 };
 
 use conduit::{trace, utils, Error, Result};
 use database::{Database, Map};
 use futures_util::{stream::FuturesUnordered, StreamExt};
-use lru_cache::LruCache;
 use ruma::{
 	api::federation::discovery::{ServerSigningKeys, VerifyKey},
 	signatures::Ed25519KeyPair,
@@ -210,36 +209,37 @@ impl Data {
 	pub fn cleanup(&self) -> Result<()> { self.db.db.cleanup() }
 
 	pub fn memory_usage(&self) -> String {
-		let auth_chain_cache = self.db.auth_chain_cache.lock().unwrap().len();
-		let appservice_in_room_cache = self.db.appservice_in_room_cache.read().unwrap().len();
-		let lasttimelinecount_cache = self.db.lasttimelinecount_cache.lock().unwrap().len();
-
-		let max_auth_chain_cache = self.db.auth_chain_cache.lock().unwrap().capacity();
-		let max_appservice_in_room_cache = self.db.appservice_in_room_cache.read().unwrap().capacity();
-		let max_lasttimelinecount_cache = self.db.lasttimelinecount_cache.lock().unwrap().capacity();
+		let (auth_chain_cache, max_auth_chain_cache) = services().rooms.auth_chain.get_cache_usage();
+		let (appservice_in_room_cache, max_appservice_in_room_cache) = services()
+			.rooms
+			.state_cache
+			.get_appservice_in_room_cache_usage();
+		let (lasttimelinecount_cache, max_lasttimelinecount_cache) = services()
+			.rooms
+			.timeline
+			.get_lasttimelinecount_cache_usage();
 
 		format!(
-			"\
-auth_chain_cache: {auth_chain_cache} / {max_auth_chain_cache}
-appservice_in_room_cache: {appservice_in_room_cache} / {max_appservice_in_room_cache}
-lasttimelinecount_cache: {lasttimelinecount_cache} / {max_lasttimelinecount_cache}\n\n
-{}",
+			"auth_chain_cache: {auth_chain_cache} / {max_auth_chain_cache}\nappservice_in_room_cache: \
+			 {appservice_in_room_cache} / {max_appservice_in_room_cache}\nlasttimelinecount_cache: \
+			 {lasttimelinecount_cache} / {max_lasttimelinecount_cache}\n\n{}",
 			self.db.db.memory_usage().unwrap_or_default()
 		)
 	}
 
+	#[allow(clippy::unused_self)]
 	pub fn clear_caches(&self, amount: u32) {
 		if amount > 1 {
-			let c = &mut *self.db.auth_chain_cache.lock().unwrap();
-			*c = LruCache::new(c.capacity());
+			services().rooms.auth_chain.clear_cache();
 		}
 		if amount > 2 {
-			let c = &mut *self.db.appservice_in_room_cache.write().unwrap();
-			*c = HashMap::new();
+			services()
+				.rooms
+				.state_cache
+				.clear_appservice_in_room_cache();
 		}
 		if amount > 3 {
-			let c = &mut *self.db.lasttimelinecount_cache.lock().unwrap();
-			*c = HashMap::new();
+			services().rooms.timeline.clear_lasttimelinecount_cache();
 		}
 	}
 
diff --git a/src/service/globals/migrations.rs b/src/service/globals/migrations.rs
index a98eec4e..5cc25a1d 100644
--- a/src/service/globals/migrations.rs
+++ b/src/service/globals/migrations.rs
@@ -595,8 +595,10 @@ async fn db_lt_10(db: &Arc<Database>, _config: &Config) -> Result<()> {
 	Ok(())
 }
 
-async fn db_lt_11(db: &Arc<Database>, _config: &Config) -> Result<()> {
-	let _userdevicesessionid_uiaarequest = &db["userdevicesessionid_uiaarequest"];
+#[allow(unreachable_code)]
+async fn db_lt_11(_db: &Arc<Database>, _config: &Config) -> Result<()> {
+	todo!("Dropping a column to clear data is not implemented yet.");
+	//let userdevicesessionid_uiaarequest = &db["userdevicesessionid_uiaarequest"];
 	//userdevicesessionid_uiaarequest.clear()?;
 
 	services().globals.bump_database_version(11)?;
diff --git a/src/service/rooms/auth_chain/data.rs b/src/service/rooms/auth_chain/data.rs
index 4e844d6c..b2e80b7b 100644
--- a/src/service/rooms/auth_chain/data.rs
+++ b/src/service/rooms/auth_chain/data.rs
@@ -1,24 +1,31 @@
-use std::{mem::size_of, sync::Arc};
+use std::{
+	mem::size_of,
+	sync::{Arc, Mutex},
+};
 
-use conduit::{utils, Result};
+use conduit::{utils, Result, Server};
 use database::{Database, Map};
+use lru_cache::LruCache;
 
 pub(super) struct Data {
 	shorteventid_authchain: Arc<Map>,
-	db: Arc<Database>,
+	pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
 }
 
 impl Data {
-	pub(super) fn new(db: &Arc<Database>) -> Self {
+	pub(super) fn new(server: &Arc<Server>, db: &Arc<Database>) -> Self {
+		let config = &server.config;
+		let cache_size = f64::from(config.auth_chain_cache_capacity);
+		let cache_size = (cache_size * config.conduit_cache_capacity_modifier) as usize;
 		Self {
 			shorteventid_authchain: db["shorteventid_authchain"].clone(),
-			db: db.clone(),
+			auth_chain_cache: Mutex::new(LruCache::new(cache_size)),
 		}
 	}
 
 	pub(super) fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<[u64]>>> {
 		// Check RAM cache
-		if let Some(result) = self.db.auth_chain_cache.lock().unwrap().get_mut(key) {
+		if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
 			return Ok(Some(Arc::clone(result)));
 		}
 
@@ -37,10 +44,9 @@ impl Data {
 
 		if let Some(chain) = chain {
 			// Cache in RAM
-			self.db
-				.auth_chain_cache
+			self.auth_chain_cache
 				.lock()
-				.unwrap()
+				.expect("locked")
 				.insert(vec![key[0]], Arc::clone(&chain));
 
 			return Ok(Some(chain));
@@ -63,10 +69,9 @@ impl Data {
 		}
 
 		// Cache in RAM
-		self.db
-			.auth_chain_cache
+		self.auth_chain_cache
 			.lock()
-			.unwrap()
+			.expect("locked")
 			.insert(key, auth_chain);
 
 		Ok(())
diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs
index ca9cf5f2..6b14fda0 100644
--- a/src/service/rooms/auth_chain/mod.rs
+++ b/src/service/rooms/auth_chain/mod.rs
@@ -17,9 +17,9 @@ pub struct Service {
 }
 
 impl Service {
-	pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
+	pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
 		Ok(Self {
-			db: Data::new(db),
+			db: Data::new(server, db),
 		})
 	}
 
@@ -181,4 +181,11 @@ impl Service {
 		self.db
 			.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
 	}
+
+	pub fn get_cache_usage(&self) -> (usize, usize) {
+		let cache = self.db.auth_chain_cache.lock().expect("locked");
+		(cache.len(), cache.capacity())
+	}
+
+	pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().expect("locked").clear(); }
 }
diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs
index c8d05ab0..f79ee678 100644
--- a/src/service/rooms/state_cache/data.rs
+++ b/src/service/rooms/state_cache/data.rs
@@ -1,4 +1,7 @@
-use std::{collections::HashSet, sync::Arc};
+use std::{
+	collections::{HashMap, HashSet},
+	sync::{Arc, RwLock},
+};
 
 use conduit::{utils, Error, Result};
 use database::{Database, Map};
@@ -13,6 +16,7 @@ use crate::{appservice::RegistrationInfo, services, user_is_local};
 
 type StrippedStateEventIter<'a> = Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>;
 type AnySyncStateEventIter<'a> = Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>;
+type AppServiceInRoomCache = RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
 
 pub(super) struct Data {
 	userroomid_joined: Arc<Map>,
@@ -27,7 +31,7 @@ pub(super) struct Data {
 	roomid_invitedcount: Arc<Map>,
 	roomserverids: Arc<Map>,
 	serverroomids: Arc<Map>,
-	db: Arc<Database>,
+	pub(super) appservice_in_room_cache: AppServiceInRoomCache,
 }
 
 impl Data {
@@ -45,7 +49,7 @@ impl Data {
 			roomid_invitedcount: db["roomid_invitedcount"].clone(),
 			roomserverids: db["roomserverids"].clone(),
 			serverroomids: db["serverroomids"].clone(),
-			db: db.clone(),
+			appservice_in_room_cache: RwLock::new(HashMap::new()),
 		}
 	}
 
@@ -201,8 +205,7 @@ impl Data {
 			self.serverroomids.insert(&serverroom_id, &[])?;
 		}
 
-		self.db
-			.appservice_in_room_cache
+		self.appservice_in_room_cache
 			.write()
 			.unwrap()
 			.remove(room_id);
@@ -213,7 +216,6 @@ impl Data {
 	#[tracing::instrument(skip(self, room_id, appservice))]
 	pub(super) fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> {
 		let maybe = self
-			.db
 			.appservice_in_room_cache
 			.read()
 			.unwrap()
@@ -235,8 +237,7 @@ impl Data {
 				.room_members(room_id)
 				.any(|userid| userid.map_or(false, |userid| appservice.users.is_match(userid.as_str())));
 
-			self.db
-				.appservice_in_room_cache
+			self.appservice_in_room_cache
 				.write()
 				.unwrap()
 				.entry(room_id.to_owned())
diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs
index 6572fcba..5038ef1c 100644
--- a/src/service/rooms/state_cache/mod.rs
+++ b/src/service/rooms/state_cache/mod.rs
@@ -441,4 +441,17 @@ impl Service {
 
 		Ok(servers)
 	}
+
+	pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) {
+		let cache = self.db.appservice_in_room_cache.read().expect("locked");
+		(cache.len(), cache.capacity())
+	}
+
+	pub fn clear_appservice_in_room_cache(&self) {
+		self.db
+			.appservice_in_room_cache
+			.write()
+			.expect("locked")
+			.clear();
+	}
 }
diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs
index 023457fa..2054caf7 100644
--- a/src/service/rooms/timeline/data.rs
+++ b/src/service/rooms/timeline/data.rs
@@ -1,8 +1,12 @@
-use std::{collections::hash_map, mem::size_of, sync::Arc};
+use std::{
+	collections::{hash_map, HashMap},
+	mem::size_of,
+	sync::{Arc, Mutex},
+};
 
 use conduit::{error, utils, Error, Result};
 use database::{Database, Map};
-use ruma::{api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
+use ruma::{api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId};
 
 use crate::{services, PduCount, PduEvent};
 
@@ -12,11 +16,12 @@ pub(super) struct Data {
 	eventid_outlierpdu: Arc<Map>,
 	userroomid_notificationcount: Arc<Map>,
 	userroomid_highlightcount: Arc<Map>,
-	db: Arc<Database>,
+	pub(super) lasttimelinecount_cache: LastTimelineCountCache,
 }
 
 type PdusIterItem = Result<(PduCount, PduEvent)>;
 type PdusIterator<'a> = Box<dyn Iterator<Item = PdusIterItem> + 'a>;
+type LastTimelineCountCache = Mutex<HashMap<OwnedRoomId, PduCount>>;
 
 impl Data {
 	pub(super) fn new(db: &Arc<Database>) -> Self {
@@ -26,16 +31,15 @@ impl Data {
 			eventid_outlierpdu: db["eventid_outlierpdu"].clone(),
 			userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
 			userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
-			db: db.clone(),
+			lasttimelinecount_cache: Mutex::new(HashMap::new()),
 		}
 	}
 
 	pub(super) fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
 		match self
-			.db
 			.lasttimelinecount_cache
 			.lock()
-			.unwrap()
+			.expect("locked")
 			.entry(room_id.to_owned())
 		{
 			hash_map::Entry::Vacant(v) => {
@@ -162,10 +166,9 @@ impl Data {
 			&serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"),
 		)?;
 
-		self.db
-			.lasttimelinecount_cache
+		self.lasttimelinecount_cache
 			.lock()
-			.unwrap()
+			.expect("locked")
 			.insert(pdu.room_id.clone(), PduCount::Normal(count));
 
 		self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?;
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index 7aed2c0d..b9c0d7ba 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -1241,6 +1241,19 @@ impl Service {
 		debug!("Prepended backfill pdu");
 		Ok(())
 	}
+
+	pub fn get_lasttimelinecount_cache_usage(&self) -> (usize, usize) {
+		let cache = self.db.lasttimelinecount_cache.lock().expect("locked");
+		(cache.len(), cache.capacity())
+	}
+
+	pub fn clear_lasttimelinecount_cache(&self) {
+		self.db
+			.lasttimelinecount_cache
+			.lock()
+			.expect("locked")
+			.clear();
+	}
 }
 
 #[cfg(test)]
diff --git a/src/service/uiaa/data.rs b/src/service/uiaa/data.rs
index 71d43dc2..ce071da0 100644
--- a/src/service/uiaa/data.rs
+++ b/src/service/uiaa/data.rs
@@ -1,30 +1,32 @@
-use std::sync::Arc;
+use std::{
+	collections::BTreeMap,
+	sync::{Arc, RwLock},
+};
 
 use conduit::{Error, Result};
 use database::{Database, Map};
 use ruma::{
 	api::client::{error::ErrorKind, uiaa::UiaaInfo},
-	CanonicalJsonValue, DeviceId, UserId,
+	CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedUserId, UserId,
 };
 
 pub struct Data {
+	userdevicesessionid_uiaarequest: RwLock<BTreeMap<(OwnedUserId, OwnedDeviceId, String), CanonicalJsonValue>>,
 	userdevicesessionid_uiaainfo: Arc<Map>,
-	db: Arc<Database>,
 }
 
 impl Data {
 	pub(super) fn new(db: &Arc<Database>) -> Self {
 		Self {
+			userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
 			userdevicesessionid_uiaainfo: db["userdevicesessionid_uiaainfo"].clone(),
-			db: db.clone(),
 		}
 	}
 
 	pub(super) fn set_uiaa_request(
 		&self, user_id: &UserId, device_id: &DeviceId, session: &str, request: &CanonicalJsonValue,
 	) -> Result<()> {
-		self.db
-			.userdevicesessionid_uiaarequest
+		self.userdevicesessionid_uiaarequest
 			.write()
 			.unwrap()
 			.insert(
@@ -38,8 +40,7 @@ impl Data {
 	pub(super) fn get_uiaa_request(
 		&self, user_id: &UserId, device_id: &DeviceId, session: &str,
 	) -> Option<CanonicalJsonValue> {
-		self.db
-			.userdevicesessionid_uiaarequest
+		self.userdevicesessionid_uiaarequest
 			.read()
 			.unwrap()
 			.get(&(user_id.to_owned(), device_id.to_owned(), session.to_owned()))
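
The shape repeated across these hunks: each service's `Data` owns its own cache (an `LruCache` sized from config, or a plain `HashMap`/`BTreeMap` behind a lock), and the service exposes `get_*_usage()` / `clear_*()` accessors, so the globals service reports and flushes memory through service APIs instead of reaching into `Database`. A minimal standalone sketch of that pattern, assuming the same `lru-cache` crate that this patch removes from the database layer; `AuthChainData`, `capacity`, and `modifier` are illustrative names, not identifiers from this patch:

use std::sync::{Arc, Mutex};

use lru_cache::LruCache;

/// Hypothetical service-owned cache, mirroring the shape the patch
/// gives `rooms::auth_chain::Data`.
struct AuthChainData {
	auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
}

impl AuthChainData {
	/// `capacity` and `modifier` stand in for the config values
	/// `auth_chain_cache_capacity` and `conduit_cache_capacity_modifier`.
	fn new(capacity: u32, modifier: f64) -> Self {
		let cache_size = (f64::from(capacity) * modifier) as usize;
		Self {
			auth_chain_cache: Mutex::new(LruCache::new(cache_size)),
		}
	}

	/// Returns (len, capacity), the pair consumed by the memory_usage report.
	fn get_cache_usage(&self) -> (usize, usize) {
		let cache = self.auth_chain_cache.lock().expect("locked");
		(cache.len(), cache.capacity())
	}

	/// Drops all entries while keeping the configured capacity.
	fn clear_cache(&self) {
		self.auth_chain_cache.lock().expect("locked").clear();
	}
}

fn main() {
	let data = AuthChainData::new(100_000, 1.0);
	data.auth_chain_cache
		.lock()
		.expect("locked")
		.insert(vec![1], vec![1, 2, 3].into());
	assert_eq!(data.get_cache_usage(), (1, 100_000));
	data.clear_cache();
	assert_eq!(data.get_cache_usage().0, 0);
}

Since `LruCache::clear()` retains the configured capacity, `clear_caches` no longer needs to rebuild the cache with `LruCache::new(c.capacity())` as the removed code did.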