move remaining runtime caches into their respective service

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-02 22:40:58 +00:00
parent e1d1dac95e
commit 473b29d524
12 changed files with 108 additions and 85 deletions

View file

@ -1,24 +1,31 @@
use std::{mem::size_of, sync::Arc};
use std::{
mem::size_of,
sync::{Arc, Mutex},
};
use conduit::{utils, Result};
use conduit::{utils, Result, Server};
use database::{Database, Map};
use lru_cache::LruCache;
pub(super) struct Data {
shorteventid_authchain: Arc<Map>,
db: Arc<Database>,
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>,
}
impl Data {
pub(super) fn new(db: &Arc<Database>) -> Self {
pub(super) fn new(server: &Arc<Server>, db: &Arc<Database>) -> Self {
let config = &server.config;
let cache_size = f64::from(config.auth_chain_cache_capacity);
let cache_size = (cache_size * config.conduit_cache_capacity_modifier) as usize;
Self {
shorteventid_authchain: db["shorteventid_authchain"].clone(),
db: db.clone(),
auth_chain_cache: Mutex::new(LruCache::new(cache_size)),
}
}
pub(super) fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<[u64]>>> {
// Check RAM cache
if let Some(result) = self.db.auth_chain_cache.lock().unwrap().get_mut(key) {
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
return Ok(Some(Arc::clone(result)));
}
@ -37,10 +44,9 @@ impl Data {
if let Some(chain) = chain {
// Cache in RAM
self.db
.auth_chain_cache
self.auth_chain_cache
.lock()
.unwrap()
.expect("locked")
.insert(vec![key[0]], Arc::clone(&chain));
return Ok(Some(chain));
@ -63,10 +69,9 @@ impl Data {
}
// Cache in RAM
self.db
.auth_chain_cache
self.auth_chain_cache
.lock()
.unwrap()
.expect("locked")
.insert(key, auth_chain);
Ok(())

View file

@ -17,9 +17,9 @@ pub struct Service {
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
db: Data::new(server, db),
})
}
@ -181,4 +181,11 @@ impl Service {
self.db
.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
}
/// Report the auth-chain cache occupancy as `(entries, capacity)`.
pub fn get_cache_usage(&self) -> (usize, usize) {
	let guard = self.db.auth_chain_cache.lock().expect("locked");
	(guard.len(), guard.capacity())
}

/// Drop every entry from the auth-chain cache.
pub fn clear_cache(&self) {
	let mut guard = self.db.auth_chain_cache.lock().expect("locked");
	guard.clear();
}
}

View file

@ -1,4 +1,7 @@
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
};
use conduit::{utils, Error, Result};
use database::{Database, Map};
@ -13,6 +16,7 @@ use crate::{appservice::RegistrationInfo, services, user_is_local};
type StrippedStateEventIter<'a> = Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>;
type AnySyncStateEventIter<'a> = Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>;
type AppServiceInRoomCache = RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
pub(super) struct Data {
userroomid_joined: Arc<Map>,
@ -27,7 +31,7 @@ pub(super) struct Data {
roomid_invitedcount: Arc<Map>,
roomserverids: Arc<Map>,
serverroomids: Arc<Map>,
db: Arc<Database>,
pub(super) appservice_in_room_cache: AppServiceInRoomCache,
}
impl Data {
@ -45,7 +49,7 @@ impl Data {
roomid_invitedcount: db["roomid_invitedcount"].clone(),
roomserverids: db["roomserverids"].clone(),
serverroomids: db["serverroomids"].clone(),
db: db.clone(),
appservice_in_room_cache: RwLock::new(HashMap::new()),
}
}
@ -201,8 +205,7 @@ impl Data {
self.serverroomids.insert(&serverroom_id, &[])?;
}
self.db
.appservice_in_room_cache
self.appservice_in_room_cache
.write()
.unwrap()
.remove(room_id);
@ -213,7 +216,6 @@ impl Data {
#[tracing::instrument(skip(self, room_id, appservice))]
pub(super) fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> {
let maybe = self
.db
.appservice_in_room_cache
.read()
.unwrap()
@ -235,8 +237,7 @@ impl Data {
.room_members(room_id)
.any(|userid| userid.map_or(false, |userid| appservice.users.is_match(userid.as_str())));
self.db
.appservice_in_room_cache
self.appservice_in_room_cache
.write()
.unwrap()
.entry(room_id.to_owned())

View file

@ -441,4 +441,17 @@ impl Service {
Ok(servers)
}
/// Report the appservice-in-room cache occupancy as `(entries, capacity)`.
pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) {
	let guard = self.db.appservice_in_room_cache.read().expect("locked");
	(guard.len(), guard.capacity())
}

/// Remove every entry from the appservice-in-room cache.
pub fn clear_appservice_in_room_cache(&self) {
	let mut guard = self.db.appservice_in_room_cache.write().expect("locked");
	guard.clear();
}
}

View file

@ -1,8 +1,12 @@
use std::{collections::hash_map, mem::size_of, sync::Arc};
use std::{
collections::{hash_map, HashMap},
mem::size_of,
sync::{Arc, Mutex},
};
use conduit::{error, utils, Error, Result};
use database::{Database, Map};
use ruma::{api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
use ruma::{api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId};
use crate::{services, PduCount, PduEvent};
@ -12,11 +16,12 @@ pub(super) struct Data {
eventid_outlierpdu: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
db: Arc<Database>,
pub(super) lasttimelinecount_cache: LastTimelineCountCache,
}
type PdusIterItem = Result<(PduCount, PduEvent)>;
type PdusIterator<'a> = Box<dyn Iterator<Item = PdusIterItem> + 'a>;
type LastTimelineCountCache = Mutex<HashMap<OwnedRoomId, PduCount>>;
impl Data {
pub(super) fn new(db: &Arc<Database>) -> Self {
@ -26,16 +31,15 @@ impl Data {
eventid_outlierpdu: db["eventid_outlierpdu"].clone(),
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
db: db.clone(),
lasttimelinecount_cache: Mutex::new(HashMap::new()),
}
}
pub(super) fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
match self
.db
.lasttimelinecount_cache
.lock()
.unwrap()
.expect("locked")
.entry(room_id.to_owned())
{
hash_map::Entry::Vacant(v) => {
@ -162,10 +166,9 @@ impl Data {
&serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"),
)?;
self.db
.lasttimelinecount_cache
self.lasttimelinecount_cache
.lock()
.unwrap()
.expect("locked")
.insert(pdu.room_id.clone(), PduCount::Normal(count));
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?;

View file

@ -1241,6 +1241,19 @@ impl Service {
debug!("Prepended backfill pdu");
Ok(())
}
/// Report the last-timeline-count cache occupancy as `(entries, capacity)`.
pub fn get_lasttimelinecount_cache_usage(&self) -> (usize, usize) {
	let guard = self.db.lasttimelinecount_cache.lock().expect("locked");
	(guard.len(), guard.capacity())
}

/// Drop every entry from the last-timeline-count cache.
pub fn clear_lasttimelinecount_cache(&self) {
	let mut guard = self.db.lasttimelinecount_cache.lock().expect("locked");
	guard.clear();
}
}
#[cfg(test)]