additional weak references where applicable

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-28 09:03:17 +00:00 committed by Jason Volk
parent e37ac56dba
commit 15126ee1b2
4 changed files with 70 additions and 44 deletions

View file

@ -5,7 +5,7 @@ mod grant;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::{Arc, RwLock as StdRwLock}, sync::{Arc, RwLock as StdRwLock, Weak},
}; };
use async_trait::async_trait; use async_trait::async_trait;
@ -41,7 +41,7 @@ struct Services {
timeline: Dep<rooms::timeline::Service>, timeline: Dep<rooms::timeline::Service>,
state: Dep<rooms::state::Service>, state: Dep<rooms::state::Service>,
state_cache: Dep<rooms::state_cache::Service>, state_cache: Dep<rooms::state_cache::Service>,
services: StdRwLock<Option<Arc<crate::Services>>>, services: StdRwLock<Option<Weak<crate::Services>>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -174,7 +174,14 @@ impl Service {
} }
async fn process_command(&self, command: CommandInput) -> CommandResult { async fn process_command(&self, command: CommandInput) -> CommandResult {
let Some(services) = self.services.services.read().expect("locked").clone() else { let Some(services) = self
.services
.services
.read()
.expect("locked")
.as_ref()
.and_then(Weak::upgrade)
else {
return Err!("Services self-reference not initialized."); return Err!("Services self-reference not initialized.");
}; };
@ -365,7 +372,9 @@ impl Service {
/// Sets the self-reference to crate::Services which will provide context to /// Sets the self-reference to crate::Services which will provide context to
/// the admin commands. /// the admin commands.
pub(super) fn set_services(&self, services: Option<Arc<crate::Services>>) { pub(super) fn set_services(&self, services: &Option<Arc<crate::Services>>) {
*self.services.services.write().expect("locked for writing") = services; let receiver = &mut *self.services.services.write().expect("locked for writing");
let weak = services.as_ref().map(Arc::downgrade);
*receiver = weak;
} }
} }

View file

@ -60,7 +60,8 @@ impl Manager {
.read() .read()
.expect("locked for reading") .expect("locked for reading")
.values() .values()
.map(|v| v.0.clone()) .map(|val| val.0.upgrade())
.map(|arc| arc.expect("services available for manager startup"))
.collect(); .collect();
debug!("Starting service workers..."); debug!("Starting service workers...");

View file

@ -57,8 +57,10 @@ pub(crate) struct Dep<T> {
name: &'static str, name: &'static str,
} }
pub(crate) type Map = RwLock<BTreeMap<String, MapVal>>; pub(crate) type Map = RwLock<MapType>;
pub(crate) type MapVal = (Arc<dyn Service>, Arc<dyn Any + Send + Sync>); pub(crate) type MapType = BTreeMap<MapKey, MapVal>;
pub(crate) type MapVal = (Weak<dyn Service>, Weak<dyn Any + Send + Sync>);
pub(crate) type MapKey = String;
impl<T: Send + Sync + 'static> Deref for Dep<T> { impl<T: Send + Sync + 'static> Deref for Dep<T> {
type Target = Arc<T>; type Target = Arc<T>;
@ -76,9 +78,9 @@ impl<T: Send + Sync + 'static> Deref for Dep<T> {
} }
} }
impl Args<'_> { impl<'a> Args<'a> {
/// Create a lazy-reference to a service when constructing another Service. /// Create a lazy-reference to a service when constructing another Service.
pub(crate) fn depend<T: Send + Sync + 'static>(&self, name: &'static str) -> Dep<T> { pub(crate) fn depend<T: Send + Sync + 'a + 'static>(&'a self, name: &'static str) -> Dep<T> {
Dep::<T> { Dep::<T> {
dep: OnceLock::new(), dep: OnceLock::new(),
service: Arc::downgrade(self.service), service: Arc::downgrade(self.service),
@ -88,48 +90,55 @@ impl Args<'_> {
/// Create a reference immediately to a service when constructing another /// Create a reference immediately to a service when constructing another
/// Service. The other service must be constructed. /// Service. The other service must be constructed.
pub(crate) fn require<T: Send + Sync + 'static>(&self, name: &str) -> Arc<T> { require::<T>(self.service, name) } pub(crate) fn require<T: Send + Sync + 'a + 'static>(&'a self, name: &'static str) -> Arc<T> {
require::<T>(self.service, name)
}
} }
/// Reference a Service by name. Panics if the Service does not exist or was /// Reference a Service by name. Panics if the Service does not exist or was
/// incorrectly cast. /// incorrectly cast.
pub(crate) fn require<T: Send + Sync + 'static>(map: &Map, name: &str) -> Arc<T> { pub(crate) fn require<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(map: &'b Map, name: &'a str) -> Arc<T> {
try_get::<T>(map, name) try_get::<T>(map, name)
.inspect_err(inspect_log) .inspect_err(inspect_log)
.expect("Failure to reference service required by another service.") .expect("Failure to reference service required by another service.")
} }
/// Reference a Service by name. Returns Err if the Service does not exist or
/// was incorrectly cast.
pub(crate) fn try_get<T: Send + Sync + 'static>(map: &Map, name: &str) -> Result<Arc<T>> {
map.read()
.expect("locked for reading")
.get(name)
.map_or_else(
|| Err!("Service {name:?} does not exist or has not been built yet."),
|(_, s)| {
s.clone()
.downcast::<T>()
.map_err(|_| err!("Service {name:?} must be correctly downcast."))
},
)
}
/// Reference a Service by name. Returns None if the Service does not exist, but /// Reference a Service by name. Returns None if the Service does not exist, but
/// panics if incorrectly cast. /// panics if incorrectly cast.
/// ///
/// # Panics /// # Panics
/// Incorrect type is not a silent failure (None) as the type never has a reason /// Incorrect type is not a silent failure (None) as the type never has a reason
/// to be incorrect. /// to be incorrect.
pub(crate) fn get<T: Send + Sync + 'static>(map: &Map, name: &str) -> Option<Arc<T>> { pub(crate) fn get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(map: &'b Map, name: &'a str) -> Option<Arc<T>> {
map.read() map.read()
.expect("locked for reading") .expect("locked for reading")
.get(name) .get(name)
.map(|(_, s)| { .map(|(_, s)| {
s.clone() s.upgrade().map(|s| {
.downcast::<T>() s.downcast::<T>()
.expect("Service must be correctly downcast.") .expect("Service must be correctly downcast.")
}) })
})?
}
/// Reference a Service by name. Returns Err if the Service does not exist or
/// was incorrectly cast.
pub(crate) fn try_get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(map: &'b Map, name: &'a str) -> Result<Arc<T>> {
map.read()
.expect("locked for reading")
.get(name)
.map_or_else(
|| Err!("Service {name:?} does not exist or has not been built yet."),
|(_, s)| {
s.upgrade().map_or_else(
|| Err!("Service {name:?} no longer exists."),
|s| {
s.downcast::<T>()
.map_err(|_| err!("Service {name:?} must be correctly downcast."))
},
)
},
)
} }
/// Utility for service implementations; see Service::name() in the trait. /// Utility for service implementations; see Service::name() in the trait.

View file

@ -109,7 +109,7 @@ impl Services {
pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> { pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
debug_info!("Starting services..."); debug_info!("Starting services...");
self.admin.set_services(Some(Arc::clone(self))); self.admin.set_services(&Some(Arc::clone(self)));
globals::migrations::migrations(self).await?; globals::migrations::migrations(self).await?;
self.manager self.manager
.lock() .lock()
@ -131,7 +131,7 @@ impl Services {
manager.stop().await; manager.stop().await;
} }
self.admin.set_services(None); self.admin.set_services(&None);
debug_info!("Services shutdown complete."); debug_info!("Services shutdown complete.");
} }
@ -146,7 +146,9 @@ impl Services {
pub async fn clear_cache(&self) { pub async fn clear_cache(&self) {
for (service, ..) in self.service.read().expect("locked for reading").values() { for (service, ..) in self.service.read().expect("locked for reading").values() {
service.clear_cache(); if let Some(service) = service.upgrade() {
service.clear_cache();
}
} }
//TODO //TODO
@ -161,7 +163,9 @@ impl Services {
pub async fn memory_usage(&self) -> Result<String> { pub async fn memory_usage(&self) -> Result<String> {
let mut out = String::new(); let mut out = String::new();
for (service, ..) in self.service.read().expect("locked for reading").values() { for (service, ..) in self.service.read().expect("locked for reading").values() {
service.memory_usage(&mut out)?; if let Some(service) = service.upgrade() {
service.memory_usage(&mut out)?;
}
} }
//TODO //TODO
@ -179,27 +183,30 @@ impl Services {
fn interrupt(&self) { fn interrupt(&self) {
debug!("Interrupting services..."); debug!("Interrupting services...");
for (name, (service, ..)) in self.service.read().expect("locked for reading").iter() { for (name, (service, ..)) in self.service.read().expect("locked for reading").iter() {
trace!("Interrupting {name}"); if let Some(service) = service.upgrade() {
service.interrupt(); trace!("Interrupting {name}");
service.interrupt();
}
} }
} }
pub fn try_get<T: Send + Sync + 'static>(&self, name: &str) -> Result<Arc<T>> { pub fn try_get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(&'b self, name: &'a str) -> Result<Arc<T>> {
service::try_get::<T>(&self.service, name) service::try_get::<T>(&self.service, name)
} }
pub fn get<T: Send + Sync + 'static>(&self, name: &str) -> Option<Arc<T>> { service::get::<T>(&self.service, name) } pub fn get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(&'b self, name: &'a str) -> Option<Arc<T>> {
service::get::<T>(&self.service, name)
}
} }
#[allow(clippy::needless_pass_by_value)]
fn add_service(map: &Arc<Map>, s: Arc<dyn Service>, a: Arc<dyn Any + Send + Sync>) { fn add_service(map: &Arc<Map>, s: Arc<dyn Service>, a: Arc<dyn Any + Send + Sync>) {
let name = s.name(); let name = s.name();
let len = map.read().expect("locked for reading").len(); let len = map.read().expect("locked for reading").len();
trace!("built service #{len}: {name:?}"); trace!("built service #{len}: {name:?}");
map.write() map.write()
.expect("locked for writing") .expect("locked for writing")
.insert(name.to_owned(), (s, a)); .insert(name.to_owned(), (Arc::downgrade(&s), Arc::downgrade(&a)));
} }