diff --git a/src/service/manager.rs b/src/service/manager.rs
new file mode 100644
index 00000000..84a28a76
--- /dev/null
+++ b/src/service/manager.rs
@@ -0,0 +1,159 @@
+use std::{panic::AssertUnwindSafe, sync::Arc, time::Duration};
+
+use conduit::{debug, debug_warn, error, trace, utils::time, warn, Error, Result, Server};
+use futures_util::FutureExt;
+use tokio::{
+	sync::{Mutex, MutexGuard},
+	task::{JoinHandle, JoinSet},
+	time::sleep,
+};
+
+use crate::{service::Service, Services};
+
+pub(crate) struct Manager {
+	manager: Mutex<Option<JoinHandle<Result<()>>>>,
+	workers: Mutex<Workers>,
+	server: Arc<Server>,
+	services: &'static Services,
+}
+
+type Workers = JoinSet<WorkerResult>;
+type WorkerResult = (Arc<dyn Service>, Result<()>);
+type WorkersLocked<'a> = MutexGuard<'a, Workers>;
+
+const RESTART_DELAY_MS: u64 = 2500;
+
+impl Manager {
+	pub(super) fn new(services: &Services) -> Arc<Self> {
+		Arc::new(Self {
+			manager: Mutex::new(None),
+			workers: Mutex::new(JoinSet::new()),
+			server: services.server.clone(),
+			services: crate::services(),
+		})
+	}
+
+	pub(super) async fn poll(&self) -> Result<()> {
+		if let Some(manager) = &mut *self.manager.lock().await {
+			trace!("Polling service manager...");
+			return manager.await?;
+		}
+
+		Ok(())
+	}
+
+	pub(super) async fn start(self: Arc<Self>) -> Result<()> {
+		let mut workers = self.workers.lock().await;
+
+		debug!("Starting service manager...");
+		let self_ = self.clone();
+		_ = self.manager.lock().await.insert(
+			self.server
+				.runtime()
+				.spawn(async move { self_.worker().await }),
+		);
+
+		debug!("Starting service workers...");
+		for service in self.services.service.values() {
+			self.start_worker(&mut workers, service).await?;
+		}
+
+		Ok(())
+	}
+
+	pub(super) async fn stop(&self) {
+		if let Some(manager) = self.manager.lock().await.take() {
+			debug!("Waiting for service manager...");
+			if let Err(e) = manager.await {
+				error!("Manager shutdown error: {e:?}");
+			}
+		}
+	}
+
+	async fn worker(&self) -> Result<()> {
+		loop {
+			let mut workers = self.workers.lock().await;
+			tokio::select! {
+				result = workers.join_next() => match result {
+					Some(Ok(result)) => self.handle_result(&mut workers, result).await?,
+					Some(Err(error)) => self.handle_abort(&mut workers, Error::from(error)).await?,
+					None => break,
+				}
+			}
+		}
+
+		debug!("Worker manager finished");
+		Ok(())
+	}
+
+	async fn handle_abort(&self, _workers: &mut WorkersLocked<'_>, error: Error) -> Result<()> {
+		// not supported until service can be associated with abort
+		unimplemented!("unexpected worker task abort {error:?}");
+	}
+
+	async fn handle_result(&self, workers: &mut WorkersLocked<'_>, result: WorkerResult) -> Result<()> {
+		let (service, result) = result;
+		match result {
+			Ok(()) => self.handle_finished(workers, &service).await,
+			Err(error) => self.handle_error(workers, &service, error).await,
+		}
+	}
+
+	async fn handle_finished(&self, _workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
+		debug!("service {:?} worker finished", service.name());
+		Ok(())
+	}
+
+	async fn handle_error(
+		&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>, error: Error,
+	) -> Result<()> {
+		let name = service.name();
+		error!("service {name:?} aborted: {error}");
+
+		if !self.server.running() {
+			debug_warn!("service {name:?} error ignored on shutdown.");
+			return Ok(());
+		}
+
+		if !error.is_panic() {
+			return Err(error);
+		}
+
+		let delay = Duration::from_millis(RESTART_DELAY_MS);
+		warn!("service {name:?} worker restarting after {} delay", time::pretty(delay));
+		sleep(delay).await;
+
+		self.start_worker(workers, service).await
+	}
+
+	/// Start the worker in a task for the service.
+	async fn start_worker(&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
+		if !self.server.running() {
+			return Err(Error::Err(format!(
+				"Service {:?} worker not starting during server shutdown.",
+				service.name()
+			)));
+		}
+
+		debug!("Service {:?} worker starting...", service.name());
+		workers.spawn_on(worker(service.clone()), self.server.runtime());
+
+		Ok(())
+	}
+}
+
+/// Base frame for a service worker. This runs in a tokio::task. All errors
+/// and panics from the worker are caught and returned cleanly. The JoinHandle
+/// should never error with a panic; if it somehow does, the panic propagates.
+/// It may, however, error with an Abort, which the manager handles alongside
+/// normal results to determine whether the worker should be restarted.
+async fn worker(service: Arc<dyn Service>) -> WorkerResult {
+	let service_ = Arc::clone(&service);
+	let result = AssertUnwindSafe(service_.worker())
+		.catch_unwind()
+		.await
+		.map_err(Error::from_panic);
+
+	// Flattens the caught panic (already mapped into an Error) into the
+	// worker's own Result.
+	(service, result.unwrap_or_else(Err))
+}
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 3a1eaf57..ba68fae2 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -1,5 +1,6 @@
 #![allow(refining_impl_trait)]
 
+mod manager;
 mod service;
 pub mod services;
 
diff --git a/src/service/services.rs b/src/service/services.rs
index e9629ef7..62e73fd1 100644
--- a/src/service/services.rs
+++ b/src/service/services.rs
@@ -1,16 +1,13 @@
-use std::{collections::BTreeMap, fmt::Write, ops::DerefMut, panic::AssertUnwindSafe, sync::Arc, time::Duration};
+use std::{collections::BTreeMap, fmt::Write, sync::Arc};
 
-use conduit::{debug, debug_info, debug_warn, error, info, trace, utils::time, warn, Error, Result, Server};
+use conduit::{debug, debug_info, info, trace, Result, Server};
 use database::Database;
-use futures_util::FutureExt;
-use tokio::{
-	sync::{Mutex, MutexGuard},
-	task::{JoinHandle, JoinSet},
-	time::sleep,
-};
+use tokio::sync::Mutex;
 
 use crate::{
-	account_data, admin, appservice, globals, key_backups, media, presence, pusher, rooms, sending,
+	account_data, admin, appservice, globals, key_backups,
+	manager::Manager,
+	media, presence, pusher, rooms, sending,
 	service::{Args, Map, Service},
 	transaction_ids, uiaa, users,
 };
@@ -30,19 +27,12 @@ pub struct Services {
 	pub media: Arc<media::Service>,
 	pub sending: Arc<sending::Service>,
 
-	manager: Mutex<Option<JoinHandle<Result<()>>>>,
-	workers: Mutex<Workers>,
+	manager: Mutex<Option<Arc<Manager>>>,
 	pub(crate) service: Map,
 	pub server: Arc<Server>,
 	pub db: Arc<Database>,
 }
 
-type Workers = JoinSet<WorkerResult>;
-type WorkerResult = (Arc<dyn Service>, Result<()>);
-type WorkersLocked<'a> = MutexGuard<'a, Workers>;
-
-const RESTART_DELAY_MS: u64 = 2500;
-
 impl Services {
 	pub fn build(server: Arc<Server>, db: Arc<Database>) -> Result<Self> {
 		let mut service: Map = BTreeMap::new();
@@ -94,7 +84,6 @@ impl Services {
 			sending: build!(sending::Service),
 			globals: build!(globals::Service),
 			manager: Mutex::new(None),
-			workers: Mutex::new(JoinSet::new()),
 			service,
 			server,
 			db,
 		})
@@ -108,13 +97,13 @@ impl Services {
 		globals::migrations::migrations(&self.db, &self.globals.config).await?;
 		globals::emerg_access::init_emergency_access();
 
-		let mut workers = self.workers.lock().await;
-		for service in self.service.values() {
-			self.start_worker(&mut workers, service).await?;
-		}
-
-		debug!("Starting service manager...");
-		self.manager_start().await?;
+		self.manager
+			.lock()
+			.await
+			.insert(Manager::new(self))
+			.clone()
+			.start()
+			.await?;
 
 		if self.globals.allow_check_for_updates() {
 			let handle = globals::updates::start_check_for_updates_task();
@@ -135,16 +124,16 @@ impl Services {
 			_ = updates_handle.await;
 		}
 
-		debug!("Stopping service manager...");
-		self.manager_stop().await;
+		if let Some(manager) = self.manager.lock().await.as_ref() {
+			manager.stop().await;
+		}
 
 		debug_info!("Services shutdown complete.");
 	}
 
 	pub async fn poll(&self) -> Result<()> {
-		if let Some(manager) = self.manager.lock().await.deref_mut() {
-			trace!("Polling service manager...");
-			return manager.await?;
+		if let Some(manager) = self.manager.lock().await.as_ref() {
+			return manager.poll().await;
 		}
 
 		Ok(())
 	}
@@ -190,111 +179,4 @@ impl Services {
 			service.interrupt();
 		}
 	}
-
-	async fn manager_start(&self) -> Result<()> {
-		debug!("Starting service manager...");
-		self.manager.lock().await.get_or_insert_with(|| {
-			self.server
-				.runtime()
-				.spawn(async move { crate::services().manager().await })
-		});
-
-		Ok(())
-	}
-
-	async fn manager_stop(&self) {
-		if let Some(manager) = self.manager.lock().await.take() {
-			debug!("Waiting for service manager...");
-			if let Err(e) = manager.await {
-				error!("Manager shutdown error: {e:?}");
-			}
-		}
-	}
-
-	async fn manager(&self) -> Result<()> {
-		loop {
-			let mut workers = self.workers.lock().await;
-			tokio::select! {
-				result = workers.join_next() => match result {
-					Some(Ok(result)) => self.handle_result(&mut workers, result).await?,
-					Some(Err(error)) => self.handle_abort(&mut workers, Error::from(error)).await?,
-					None => break,
-				}
-			}
-		}
-
-		debug!("Worker manager finished");
-		Ok(())
-	}
-
-	async fn handle_abort(&self, _workers: &mut WorkersLocked<'_>, error: Error) -> Result<()> {
-		// not supported until service can be associated with abort
-		unimplemented!("unexpected worker task abort {error:?}");
-	}
-
-	async fn handle_result(&self, workers: &mut WorkersLocked<'_>, result: WorkerResult) -> Result<()> {
-		let (service, result) = result;
-		match result {
-			Ok(()) => self.handle_finished(workers, &service).await,
-			Err(error) => self.handle_error(workers, &service, error).await,
-		}
-	}
-
-	async fn handle_finished(&self, _workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
-		debug!("service {:?} worker finished", service.name());
-		Ok(())
-	}
-
-	async fn handle_error(
-		&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>, error: Error,
-	) -> Result<()> {
-		let name = service.name();
-		error!("service {name:?} aborted: {error}");
-
-		if !self.server.running() {
-			debug_warn!("service {name:?} error ignored on shutdown.");
-			return Ok(());
-		}
-
-		if !error.is_panic() {
-			return Err(error);
-		}
-
-		let delay = Duration::from_millis(RESTART_DELAY_MS);
-		warn!("service {name:?} worker restarting after {} delay", time::pretty(delay));
-		sleep(delay).await;
-
-		self.start_worker(workers, service).await
-	}
-
-	/// Start the worker in a task for the service.
-	async fn start_worker(&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
-		if !self.server.running() {
-			return Err(Error::Err(format!(
-				"Service {:?} worker not starting during server shutdown.",
-				service.name()
-			)));
-		}
-
-		debug!("Service {:?} worker starting...", service.name());
-		workers.spawn_on(worker(service.clone()), self.server.runtime());
-
-		Ok(())
-	}
-}
-
-/// Base frame for service worker. This runs in a tokio::task. All errors and
-/// panics from the worker are caught and returned cleanly. The JoinHandle
-/// should never error with a panic, and if so it should propagate, but it may
-/// error with an Abort which the manager should handle along with results to
-/// determine if the worker should be restarted.
-async fn worker(service: Arc<dyn Service>) -> WorkerResult {
-	let service_ = Arc::clone(&service);
-	let result = AssertUnwindSafe(service_.worker())
-		.catch_unwind()
-		.await
-		.map_err(Error::from_panic);
-
-	// flattens JoinError for panic into worker's Error
-	(service, result.unwrap_or_else(Err))
-}
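
The patch boils down to a small supervisor: every service worker runs as a task in a tokio JoinSet, the manager drains join_next() in a loop, and a worker that died from a panic is respawned after RESTART_DELAY_MS. Below is a minimal, self-contained sketch of that shape, with plain strings standing in for conduit's Service and Error types; note that unlike the patch, which restarts only on panics, this sketch restarts on any returned error:

use std::time::Duration;

use tokio::{task::JoinSet, time::sleep};

// Simplified stand-in for the patch's WorkerResult: (worker name, outcome).
type WorkerResult = (&'static str, Result<(), String>);

const RESTART_DELAY_MS: u64 = 2500;

async fn run_worker(name: &'static str) -> WorkerResult {
    // A real service's long-running loop would go here.
    (name, Ok(()))
}

// Drain the JoinSet; restart a worker that returned an error after a delay,
// and finish once the set is empty (the shape of `Manager::worker()`).
async fn supervise(mut workers: JoinSet<WorkerResult>) {
    while let Some(joined) = workers.join_next().await {
        match joined {
            Ok((name, Ok(()))) => println!("worker {name} finished"),
            Ok((name, Err(e))) => {
                eprintln!("worker {name} aborted: {e}; restarting");
                sleep(Duration::from_millis(RESTART_DELAY_MS)).await;
                workers.spawn(run_worker(name));
            }
            // The task itself panicked or was aborted before returning.
            Err(e) => eprintln!("worker task join error: {e}"),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut workers = JoinSet::new();
    workers.spawn(run_worker("example"));
    supervise(workers).await;
}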
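
The worker() base frame is what turns a panic inside a service into an ordinary Err, so the supervisor only ever sees a uniform WorkerResult rather than a panicking JoinHandle. A hedged sketch of that frame, with String again standing in for conduit's Error and a hypothetical describe_panic helper in place of Error::from_panic:

use std::panic::AssertUnwindSafe;

use futures_util::FutureExt;

// Run a fallible worker future, converting any panic into an `Err`.
async fn run_caught<F>(fut: F) -> Result<(), String>
where
    F: std::future::Future<Output = Result<(), String>>,
{
    AssertUnwindSafe(fut)
        .catch_unwind()
        .await
        .map_err(describe_panic) // panic payload -> String
        .unwrap_or_else(Err) // flatten Result<Result<(), _>, _>
}

// Hypothetical helper mirroring `Error::from_panic`: extract a message from
// the boxed panic payload.
fn describe_panic(p: Box<dyn std::any::Any + Send>) -> String {
    p.downcast_ref::<&str>()
        .map(|s| (*s).to_owned())
        .or_else(|| p.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic".to_owned())
}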
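
On the Services side, the new Mutex<Option<Arc<Manager>>> slot is filled via Option::insert, which returns &mut Arc<Manager>; cloning that Arc yields an owned handle on which start() can be awaited. A minimal sketch of the handoff with a stub Manager (here the guard is dropped before start() runs, whereas the patch's single-expression chain holds it across the call):

use std::sync::Arc;

use tokio::sync::Mutex;

struct Manager;

impl Manager {
    async fn start(self: Arc<Self>) {
        // Spawn the manager task and service workers here.
    }
}

async fn start_manager(slot: &Mutex<Option<Arc<Manager>>>) {
    // `Option::insert` stores the new manager and returns `&mut Arc<Manager>`;
    // the clone gives an owned Arc, and the guard drops at the end of the `let`.
    let manager = slot.lock().await.insert(Arc::new(Manager)).clone();
    manager.start().await;
}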