diff --git a/src/router/run.rs b/src/router/run.rs index 02cec781..b3ec2285 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -1,8 +1,10 @@ use std::{sync::Arc, time::Duration}; use axum_server::Handle as ServerHandle; -use tokio::sync::broadcast::{self, Sender}; -use tracing::{debug, error, info}; +use tokio::{ + sync::broadcast::{self, Sender}, + task::JoinHandle, +}; extern crate conduit_admin as admin; extern crate conduit_core as conduit; @@ -10,14 +12,14 @@ extern crate conduit_service as service; use std::sync::atomic::Ordering; -use conduit::{debug_info, trace, Error, Result, Server}; +use conduit::{debug, debug_info, error, info, trace, Error, Result, Server}; -use crate::{layers, serve}; +use crate::serve; /// Main loop base #[tracing::instrument(skip_all)] -pub(crate) async fn run(server: Arc<Server>) -> Result<(), Error> { - let app = layers::build(&server)?; +pub(crate) async fn run(server: Arc<Server>) -> Result<()> { + debug!("Start"); // Install the admin room callback here for now admin::init().await; @@ -29,8 +31,16 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<(), Error> { .runtime() .spawn(signal(server.clone(), tx.clone(), handle.clone())); - // Serve clients - let res = serve::serve(&server, app, handle, tx.subscribe()).await; + let mut listener = server + .runtime() + .spawn(serve::serve(server.clone(), handle.clone(), tx.subscribe())); + + // Focal point + debug!("Running"); + let res = tokio::select! { + res = &mut listener => res.map_err(Error::from).unwrap_or_else(Err), + res = service::services().poll() => handle_services_poll(&server, res, listener).await, + }; // Join the signal handler before we leave. 
sigs.abort(); @@ -39,16 +49,16 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<(), Error> { // Remove the admin room callback admin::fini().await; - debug_info!("Finished"); + debug_info!("Finish"); res } /// Async initializations #[tracing::instrument(skip_all)] -pub(crate) async fn start(server: Arc<Server>) -> Result<(), Error> { +pub(crate) async fn start(server: Arc<Server>) -> Result<()> { debug!("Starting..."); - service::init(&server).await?; + service::start(&server).await?; #[cfg(feature = "systemd")] sd_notify::notify(true, &[sd_notify::NotifyState::Ready]).expect("failed to notify systemd of ready state"); @@ -59,12 +69,12 @@ pub(crate) async fn start(server: Arc<Server>) -> Result<(), Error> { /// Async destructions #[tracing::instrument(skip_all)] -pub(crate) async fn stop(_server: Arc<Server>) -> Result<(), Error> { +pub(crate) async fn stop(_server: Arc<Server>) -> Result<()> { debug!("Shutting down..."); // Wait for all completions before dropping or we'll lose them to the module // unload and explode. - service::fini().await; + service::stop().await; debug!("Cleaning up..."); @@ -108,3 +118,21 @@ async fn handle_shutdown(server: &Arc<Server>, tx: &Sender<()>, handle: &axum_se handle.shutdown(); } } + +async fn handle_services_poll( + server: &Arc<Server>, result: Result<()>, listener: JoinHandle<Result<()>>, +) -> Result<()> { + debug!("Service manager finished: {result:?}"); + + if server.running() { + if let Err(e) = server.shutdown() { + error!("Failed to send shutdown signal: {e}"); + } + } + + if let Err(e) = listener.await { + error!("Client listener task finished with error: {e}"); + } + + result +} diff --git a/src/router/serve/mod.rs b/src/router/serve/mod.rs index 47f2fd43..4e923444 100644 --- a/src/router/serve/mod.rs +++ b/src/router/serve/mod.rs @@ -4,23 +4,23 @@ mod unix; use std::sync::Arc; -use axum::Router; use axum_server::Handle as ServerHandle; -use conduit::{Error, Result, Server}; +use conduit::{Result, Server}; use tokio::sync::broadcast; +use crate::layers; + /// Serve clients 
-pub(super) async fn serve( - server: &Arc<Server>, app: Router, handle: ServerHandle, shutdown: broadcast::Receiver<()>, -) -> Result<(), Error> { +pub(super) async fn serve(server: Arc<Server>, handle: ServerHandle, shutdown: broadcast::Receiver<()>) -> Result<()> { let config = &server.config; let addrs = config.get_bind_addrs(); + let app = layers::build(&server)?; if cfg!(unix) && config.unix_socket_path.is_some() { - unix::serve(server, app, shutdown).await + unix::serve(&server, app, shutdown).await } else if config.tls.is_some() { - tls::serve(server, app, handle, addrs).await + tls::serve(&server, app, handle, addrs).await } else { - plain::serve(server, app, handle, addrs).await + plain::serve(&server, app, handle, addrs).await } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 15c4cc35..3a1eaf57 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -37,7 +37,7 @@ conduit::mod_dtor! {} static SERVICES: RwLock<Option<&Services>> = RwLock::new(None); -pub async fn init(server: &Arc<Server>) -> Result<()> { +pub async fn start(server: &Arc<Server>) -> Result<()> { let d = Arc::new(Database::open(server).await?); let s = Box::new(Services::build(server.clone(), d)?); _ = SERVICES.write().expect("write locked").insert(Box::leak(s)); @@ -45,7 +45,7 @@ pub async fn init(server: &Arc<Server>) -> Result<()> { services().start().await } -pub async fn fini() { +pub async fn stop() { services().stop().await; // Deactivate services(). Any further use will panic the caller. 
diff --git a/src/service/services.rs b/src/service/services.rs index 13689008..e9629ef7 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -1,6 +1,6 @@ -use std::{collections::BTreeMap, fmt::Write, panic::AssertUnwindSafe, sync::Arc, time::Duration}; +use std::{collections::BTreeMap, fmt::Write, ops::DerefMut, panic::AssertUnwindSafe, sync::Arc, time::Duration}; -use conduit::{debug, debug_info, error, info, trace, utils::time, warn, Error, Result, Server}; +use conduit::{debug, debug_info, debug_warn, error, info, trace, utils::time, warn, Error, Result, Server}; use database::Database; use futures_util::FutureExt; use tokio::{ @@ -30,8 +30,8 @@ pub struct Services { pub media: Arc<media::Service>, pub sending: Arc<sending::Service>, - workers: Mutex<Workers>, manager: Mutex<Option<JoinHandle<Result<()>>>>, + manager: Mutex<Option<JoinHandle<Result<()>>>>, + workers: Mutex<Workers>, pub(crate) service: Map, pub server: Arc<Server>, pub db: Arc<Database>, @@ -93,15 +93,15 @@ impl Services { media: build!(media::Service), sending: build!(sending::Service), globals: build!(globals::Service), - workers: Mutex::new(JoinSet::new()), manager: Mutex::new(None), + manager: Mutex::new(None), + workers: Mutex::new(JoinSet::new()), service, server, db, }) } - pub async fn start(&self) -> Result<()> { + pub(super) async fn start(&self) -> Result<()> { debug_info!("Starting services..."); self.media.create_media_dir().await?; @@ -114,9 +114,7 @@ impl Services { } debug!("Starting service manager..."); - let manager = async move { crate::services().manager().await }; - let manager = self.server.runtime().spawn(manager); - _ = self.manager.lock().await.insert(manager); + self.manager_start().await?; if self.globals.allow_check_for_updates() { let handle = globals::updates::start_check_for_updates_task(); @@ -127,7 +125,7 @@ impl Services { Ok(()) } - pub async fn stop(&self) { + pub(super) async fn stop(&self) { info!("Shutting down services..."); self.interrupt(); @@ -138,15 +136,20 @@ impl Services { } debug!("Stopping service manager..."); - if let Some(manager) = self.manager.lock().await.take() { - if let Err(e) = manager.await { 
- error!("Manager shutdown error: {e:?}"); - } - } + self.manager_stop().await; debug_info!("Services shutdown complete."); } + pub async fn poll(&self) -> Result<()> { + if let Some(manager) = self.manager.lock().await.deref_mut() { + trace!("Polling service manager..."); + return manager.await?; + } + + Ok(()) + } + pub async fn clear_cache(&self) { for service in self.service.values() { service.clear_cache(); @@ -188,6 +191,26 @@ impl Services { } } + async fn manager_start(&self) -> Result<()> { + debug!("Starting service manager..."); + self.manager.lock().await.get_or_insert_with(|| { + self.server + .runtime() + .spawn(async move { crate::services().manager().await }) + }); + + Ok(()) + } + + async fn manager_stop(&self) { + if let Some(manager) = self.manager.lock().await.take() { + debug!("Waiting for service manager..."); + if let Err(e) = manager.await { + error!("Manager shutdown error: {e:?}"); + } + } + } + async fn manager(&self) -> Result<()> { loop { let mut workers = self.workers.lock().await; @@ -226,14 +249,15 @@ impl Services { &self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>, error: Error, ) -> Result<()> { let name = service.name(); - error!("service {name:?} worker error: {error}"); + error!("service {name:?} aborted: {error}"); - if !error.is_panic() { + if !self.server.running() { + debug_warn!("service {name:?} error ignored on shutdown."); return Ok(()); } - if !self.server.running() { - return Ok(()); + if !error.is_panic() { + return Err(error); } let delay = Duration::from_millis(RESTART_DELAY_MS);