continuwuity/src/service/mod.rs
Benjamin Lee 8a5599adf9 add optional support for tokio-console
This turned out to be quite hairy, mostly because we need to apply the
config's log level filter to the actual logs (stdout and, optionally
sentry), but do not want to filter out the tokio tracing events needed by
the console_subscriber. I hit several edge cases in tracing getting
this to work, and we now depend on a git version of tracing with a
backported patch :(
2024-05-03 01:52:29 -04:00

297 lines
7.2 KiB
Rust

use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex as StdMutex},
};
use lru_cache::LruCache;
use tokio::sync::{broadcast, Mutex, RwLock};
use crate::{Config, LogLevelReloadHandles, Result};
pub(crate) mod account_data;
pub(crate) mod admin;
pub(crate) mod appservice;
pub(crate) mod globals;
pub(crate) mod key_backups;
pub(crate) mod media;
pub(crate) mod pdu;
pub(crate) mod presence;
pub(crate) mod pusher;
pub(crate) mod rooms;
pub(crate) mod sending;
pub(crate) mod transaction_ids;
pub(crate) mod uiaa;
pub(crate) mod users;
pub(crate) struct Services<'a> {
pub(crate) appservice: appservice::Service,
pub(crate) pusher: pusher::Service,
pub(crate) rooms: rooms::Service,
pub(crate) transaction_ids: transaction_ids::Service,
pub(crate) uiaa: uiaa::Service,
pub(crate) users: users::Service,
pub(crate) account_data: account_data::Service,
pub(crate) presence: Arc<presence::Service>,
pub(crate) admin: Arc<admin::Service>,
pub(crate) globals: globals::Service<'a>,
pub(crate) key_backups: key_backups::Service,
pub(crate) media: media::Service,
pub(crate) sending: Arc<sending::Service>,
}
impl Services<'_> {
pub(crate) fn build<
D: appservice::Data
+ pusher::Data
+ rooms::Data
+ transaction_ids::Data
+ uiaa::Data
+ users::Data
+ account_data::Data
+ presence::Data
+ globals::Data
+ key_backups::Data
+ media::Data
+ sending::Data
+ 'static,
>(
db: &'static D, config: &Config, tracing_reload_handle: LogLevelReloadHandles,
) -> Result<Self> {
Ok(Self {
appservice: appservice::Service::build(db)?,
pusher: pusher::Service {
db,
},
rooms: rooms::Service {
alias: rooms::alias::Service {
db,
},
auth_chain: rooms::auth_chain::Service {
db,
},
directory: rooms::directory::Service {
db,
},
event_handler: rooms::event_handler::Service,
lazy_loading: rooms::lazy_loading::Service {
db,
lazy_load_waiting: Mutex::new(HashMap::new()),
},
metadata: rooms::metadata::Service {
db,
},
outlier: rooms::outlier::Service {
db,
},
pdu_metadata: rooms::pdu_metadata::Service {
db,
},
read_receipt: rooms::read_receipt::Service {
db,
},
search: rooms::search::Service {
db,
},
short: rooms::short::Service {
db,
},
state: rooms::state::Service {
db,
},
state_accessor: rooms::state_accessor::Service {
db,
server_visibility_cache: StdMutex::new(LruCache::new(
(f64::from(config.server_visibility_cache_capacity) * config.conduit_cache_capacity_modifier)
as usize,
)),
user_visibility_cache: StdMutex::new(LruCache::new(
(f64::from(config.user_visibility_cache_capacity) * config.conduit_cache_capacity_modifier)
as usize,
)),
},
state_cache: rooms::state_cache::Service {
db,
},
state_compressor: rooms::state_compressor::Service {
db,
stateinfo_cache: StdMutex::new(LruCache::new(
(f64::from(config.stateinfo_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
)),
},
timeline: rooms::timeline::Service {
db,
lasttimelinecount_cache: Mutex::new(HashMap::new()),
},
threads: rooms::threads::Service {
db,
},
typing: rooms::typing::Service {
typing: RwLock::new(BTreeMap::new()),
last_typing_update: RwLock::new(BTreeMap::new()),
typing_update_sender: broadcast::channel(100).0,
},
spaces: rooms::spaces::Service {
roomid_spacehierarchy_cache: Mutex::new(LruCache::new(
(f64::from(config.roomid_spacehierarchy_cache_capacity)
* config.conduit_cache_capacity_modifier) as usize,
)),
},
user: rooms::user::Service {
db,
},
},
transaction_ids: transaction_ids::Service {
db,
},
uiaa: uiaa::Service {
db,
},
users: users::Service {
db,
connections: StdMutex::new(BTreeMap::new()),
},
account_data: account_data::Service {
db,
},
presence: presence::Service::build(db, config),
admin: admin::Service::build(),
key_backups: key_backups::Service {
db,
},
media: media::Service {
db,
url_preview_mutex: RwLock::new(HashMap::new()),
},
sending: sending::Service::build(db, config),
globals: globals::Service::load(db, config, tracing_reload_handle)?,
})
}
async fn memory_usage(&self) -> String {
let lazy_load_waiting = self.rooms.lazy_loading.lazy_load_waiting.lock().await.len();
let server_visibility_cache = self
.rooms
.state_accessor
.server_visibility_cache
.lock()
.unwrap()
.len();
let user_visibility_cache = self
.rooms
.state_accessor
.user_visibility_cache
.lock()
.unwrap()
.len();
let stateinfo_cache = self
.rooms
.state_compressor
.stateinfo_cache
.lock()
.unwrap()
.len();
let lasttimelinecount_cache = self
.rooms
.timeline
.lasttimelinecount_cache
.lock()
.await
.len();
let roomid_spacehierarchy_cache = self
.rooms
.spaces
.roomid_spacehierarchy_cache
.lock()
.await
.len();
let resolver_overrides_cache = self.globals.resolver.overrides.read().unwrap().len();
let resolver_destinations_cache = self.globals.resolver.destinations.read().await.len();
let bad_event_ratelimiter = self.globals.bad_event_ratelimiter.read().await.len();
let bad_query_ratelimiter = self.globals.bad_query_ratelimiter.read().await.len();
let bad_signature_ratelimiter = self.globals.bad_signature_ratelimiter.read().await.len();
format!(
"\
lazy_load_waiting: {lazy_load_waiting}
server_visibility_cache: {server_visibility_cache}
user_visibility_cache: {user_visibility_cache}
stateinfo_cache: {stateinfo_cache}
lasttimelinecount_cache: {lasttimelinecount_cache}
roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}
resolver_overrides_cache: {resolver_overrides_cache}
resolver_destinations_cache: {resolver_destinations_cache}
bad_event_ratelimiter: {bad_event_ratelimiter}
bad_query_ratelimiter: {bad_query_ratelimiter}
bad_signature_ratelimiter: {bad_signature_ratelimiter}
"
)
}
async fn clear_caches(&self, amount: u32) {
if amount > 0 {
self.rooms
.lazy_loading
.lazy_load_waiting
.lock()
.await
.clear();
}
if amount > 1 {
self.rooms
.state_accessor
.server_visibility_cache
.lock()
.unwrap()
.clear();
}
if amount > 2 {
self.rooms
.state_accessor
.user_visibility_cache
.lock()
.unwrap()
.clear();
}
if amount > 3 {
self.rooms
.state_compressor
.stateinfo_cache
.lock()
.unwrap()
.clear();
}
if amount > 4 {
self.rooms
.timeline
.lasttimelinecount_cache
.lock()
.await
.clear();
}
if amount > 5 {
self.rooms
.spaces
.roomid_spacehierarchy_cache
.lock()
.await
.clear();
}
if amount > 6 {
self.globals.resolver.overrides.write().unwrap().clear();
self.globals.resolver.destinations.write().await.clear();
}
if amount > 7 {
self.globals.resolver.resolver.clear_cache();
}
if amount > 8 {
self.globals.bad_event_ratelimiter.write().await.clear();
}
if amount > 9 {
self.globals.bad_query_ratelimiter.write().await.clear();
}
if amount > 10 {
self.globals.bad_signature_ratelimiter.write().await.clear();
}
}
}