From 9ce95a703038e8603da62f15516f205ca70ad962 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 15 Mar 2025 04:07:53 +0000 Subject: [PATCH] make service memory_usage()/clear_cache() async trait Signed-off-by: Jason Volk --- src/service/globals/mod.rs | 6 ++- src/service/rooms/event_handler/mod.rs | 4 +- src/service/rooms/spaces/mod.rs | 14 +++++- src/service/rooms/state/mod.rs | 4 +- src/service/rooms/state_accessor/mod.rs | 6 ++- src/service/rooms/state_compressor/mod.rs | 6 ++- src/service/rooms/timeline/mod.rs | 4 +- src/service/service.rs | 4 +- src/service/services.rs | 57 ++++++++++------------- 9 files changed, 61 insertions(+), 44 deletions(-) diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 74f83228..1dd7db8e 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -7,6 +7,7 @@ use std::{ time::Instant, }; +use async_trait::async_trait; use conduwuit::{Result, Server, error, utils::bytes::pretty}; use data::Data; use regex::RegexSet; @@ -27,6 +28,7 @@ pub struct Service { type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let db = Data::new(&args); @@ -73,7 +75,7 @@ impl crate::Service for Service { })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result { + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { let (ber_count, ber_bytes) = self.bad_event_ratelimiter.read()?.iter().fold( (0_usize, 0_usize), |(mut count, mut bytes), (event_id, _)| { @@ -89,7 +91,7 @@ impl crate::Service for Service { Ok(()) } - fn clear_cache(&self) { + async fn clear_cache(&self) { self.bad_event_ratelimiter .write() .expect("locked for writing") diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index e9e79ce4..4944f3ec 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -17,6 +17,7 @@ use std::{ time::Instant, }; +use async_trait::async_trait; use conduwuit::{ Err, PduEvent, Result, RoomVersion, Server, utils::{MutexMap, TryFutureExtExt}, @@ -54,6 +55,7 @@ struct Services { type RoomMutexMap = MutexMap; type HandleTimeMap = HashMap; +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { @@ -79,7 +81,7 @@ impl crate::Service for Service { })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result<()> { + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { let mutex_federation = self.mutex_federation.len(); writeln!(out, "federation_mutex: {mutex_federation}")?; diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs index 1da38234..55897f9c 100644 --- a/src/service/rooms/spaces/mod.rs +++ b/src/service/rooms/spaces/mod.rs @@ -2,8 +2,9 @@ mod pagination_token; #[cfg(test)] mod tests; -use std::sync::Arc; +use std::{fmt::Write, sync::Arc}; +use async_trait::async_trait; use conduwuit::{ Err, Error, Result, implement, utils::{ @@ -70,6 +71,7 @@ pub enum Identifier<'a> { type Cache = LruCache>; +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let config = &args.server.config; @@ -90,6 +92,16 @@ impl crate::Service for Service { })) } + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { + let roomid_spacehierarchy_cache = self.roomid_spacehierarchy_cache.lock().await.len(); + + writeln!(out, "roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}")?; + + Ok(()) + } + + async fn clear_cache(&self) { self.roomid_spacehierarchy_cache.lock().await.clear(); } + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 8683a3be..56955497 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, fmt::Write, iter::once, sync::Arc}; +use async_trait::async_trait; use conduwuit::{ PduEvent, Result, err, result::FlatOk, @@ -56,6 +57,7 @@ struct Data { type RoomMutexMap = MutexMap; pub type RoomMutexGuard = MutexMapGuard; +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { @@ -79,7 +81,7 @@ impl crate::Service for Service { })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result { + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { let mutex = self.mutex.len(); writeln!(out, "state_mutex: {mutex}")?; diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 7004e35a..652fdbd7 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -8,6 +8,7 @@ use std::{ sync::{Arc, Mutex as StdMutex, Mutex}, }; +use async_trait::async_trait; use conduwuit::{ Result, err, utils, utils::math::{Expected, usize_from_f64}, @@ -57,6 +58,7 @@ struct Data { shorteventid_shortstatehash: Arc, } +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let config = &args.server.config; @@ -86,7 +88,7 @@ impl crate::Service for Service { })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result { + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { use utils::bytes::pretty; let (svc_count, svc_bytes) = self.server_visibility_cache.lock()?.iter().fold( @@ -119,7 +121,7 @@ impl crate::Service for Service { Ok(()) } - fn clear_cache(&self) { + async fn clear_cache(&self) { self.server_visibility_cache.lock().expect("locked").clear(); self.user_visibility_cache.lock().expect("locked").clear(); } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 305d3187..56a91d0e 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -5,6 +5,7 @@ use std::{ sync::{Arc, Mutex}, }; +use async_trait::async_trait; use conduwuit::{ Result, arrayvec::ArrayVec, @@ -65,6 +66,7 @@ type ParentStatesVec = Vec; pub type CompressedState = BTreeSet; pub type CompressedStateEvent = [u8; 2 * size_of::()]; +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let config = &args.server.config; @@ -82,7 +84,7 @@ impl crate::Service for Service { })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result { + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { let (cache_len, ents) = { let cache = self.stateinfo_cache.lock().expect("locked"); let ents = cache.iter().map(at!(1)).flat_map(|vec| vec.iter()).fold( @@ -108,7 +110,7 @@ impl crate::Service for Service { Ok(()) } - fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); } + async fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 826a1dae..dc359d22 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -9,6 +9,7 @@ use std::{ sync::Arc, }; +use async_trait::async_trait; use conduwuit::{ Err, Error, Result, Server, at, debug, debug_warn, err, error, implement, info, pdu::{EventHash, PduBuilder, PduCount, PduEvent, gen_event_id}, @@ -109,6 +110,7 @@ struct Services { type RoomMutexMap = MutexMap; pub type RoomMutexGuard = MutexMapGuard; +#[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { @@ -142,7 +144,7 @@ impl crate::Service for Service { })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result<()> { + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { let mutex_insert = self.mutex_insert.len(); writeln!(out, "insert_mutex: {mutex_insert}")?; diff --git a/src/service/service.rs b/src/service/service.rs index 2907a562..574efd8f 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -31,10 +31,10 @@ pub(crate) trait Service: Any + Send + Sync { fn interrupt(&self) {} /// Clear any caches or similar runtime state. - fn clear_cache(&self) {} + async fn clear_cache(&self) {} /// Memory usage report in a markdown string. - fn memory_usage(&self, _out: &mut dyn Write) -> Result<()> { Ok(()) } + async fn memory_usage(&self, _out: &mut (dyn Write + Send)) -> Result { Ok(()) } /// Return the name of the service. /// i.e. `crate::service::make_name(std::module_path!())` diff --git a/src/service/services.rs b/src/service/services.rs index 269a1f87..dc390054 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -1,12 +1,12 @@ use std::{ any::Any, collections::BTreeMap, - fmt::Write, sync::{Arc, RwLock}, }; -use conduwuit::{Result, Server, debug, debug_info, info, trace}; +use conduwuit::{Result, Server, debug, debug_info, info, trace, utils::stream::IterStream}; use database::Database; +use futures::{Stream, StreamExt, TryStreamExt}; use tokio::sync::Mutex; use crate::{ @@ -171,40 +171,21 @@ impl Services { } pub async fn clear_cache(&self) { - for (service, ..) in self.service.read().expect("locked for reading").values() { - if let Some(service) = service.upgrade() { - service.clear_cache(); - } - } - - //TODO - self.rooms - .spaces - .roomid_spacehierarchy_cache - .lock() - .await - .clear(); + self.services() + .for_each(|service| async move { + service.clear_cache().await; + }) + .await; } pub async fn memory_usage(&self) -> Result { - let mut out = String::new(); - for (service, ..) in self.service.read().expect("locked for reading").values() { - if let Some(service) = service.upgrade() { - service.memory_usage(&mut out)?; - } - } - - //TODO - let roomid_spacehierarchy_cache = self - .rooms - .spaces - .roomid_spacehierarchy_cache - .lock() + self.services() + .map(Ok) + .try_fold(String::new(), |mut out, service| async move { + service.memory_usage(&mut out).await?; + Ok(out) + }) .await - .len(); - writeln!(out, "roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}")?; - - Ok(out) } fn interrupt(&self) { @@ -217,6 +198,18 @@ impl Services { } } + /// Iterate from snapshot of the services map + fn services(&self) -> impl Stream> + Send { + self.service + .read() + .expect("locked for reading") + .values() + .filter_map(|val| val.0.upgrade()) + .collect::>() + .into_iter() + .stream() + } + #[inline] pub fn try_get(&self, name: &str) -> Result> where