diff --git a/src/core/server.rs b/src/core/server.rs index 4bfff340..9e4497ae 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -6,7 +6,7 @@ use std::{ time::SystemTime, }; -use tokio::runtime; +use tokio::{runtime, sync::broadcast}; use crate::{config::Config, log::LogLevelReloadHandles}; @@ -22,6 +22,9 @@ pub struct Server { /// command to initiate shutdown. pub shutdown: Mutex>, + /// Reload/shutdown signal + pub signal: broadcast::Sender<&'static str>, + /// Reload/shutdown desired indicator; when false, shutdown is desired. This /// is an observable used on shutdown and modifying is not recommended. pub reload: AtomicBool, @@ -51,6 +54,7 @@ impl Server { config, started: SystemTime::now(), shutdown: Mutex::new(None), + signal: broadcast::channel::<&'static str>(1).0, reload: AtomicBool::new(false), interrupt: AtomicBool::new(false), runtime, diff --git a/src/main/main.rs b/src/main/main.rs index ba1cb2d3..73e6e690 100644 --- a/src/main/main.rs +++ b/src/main/main.rs @@ -6,9 +6,10 @@ extern crate conduit_core as conduit; use std::{cmp, sync::Arc, time::Duration}; -use conduit::{debug_info, error, utils::available_parallelism, Error, Result}; +use conduit::{debug_error, debug_info, error, utils::available_parallelism, warn, Error, Result}; use server::Server; -use tokio::runtime; +use tokio::{runtime, signal}; +use tracing::debug; const WORKER_NAME: &str = "conduwuit:worker"; const WORKER_MIN: usize = 2; @@ -26,6 +27,7 @@ fn main() -> Result<(), Error> { .expect("built runtime"); let server: Arc = Server::build(args, Some(runtime.handle()))?; + runtime.spawn(signal(server.clone())); runtime.block_on(async_main(&server))?; // explicit drop here to trace thread and tls dtors @@ -94,3 +96,34 @@ async fn async_main(server: &Arc) -> Result<(), Error> { debug_info!("Exit runtime"); Ok(()) } + +#[tracing::instrument(skip_all)] +async fn signal(server: Arc) { + let (mut term, mut quit); + #[cfg(unix)] + { + use signal::unix; + quit = unix::signal(unix::SignalKind::quit()).expect("SIGQUIT handler"); + term = unix::signal(unix::SignalKind::terminate()).expect("SIGTERM handler"); + }; + + loop { + debug!("Installed signal handlers"); + let sig: &'static str; + #[cfg(unix)] + tokio::select! { + _ = term.recv() => { sig = "SIGTERM"; }, + _ = quit.recv() => { sig = "Ctrl+\\"; }, + _ = signal::ctrl_c() => { sig = "Ctrl+C"; }, + } + #[cfg(not(unix))] + tokio::select! { + _ = signal::ctrl_c() => { sig = "Ctrl+C"; }, + } + + warn!("Received signal {}", sig); + if let Err(e) = server.server.signal.send(sig) { + debug_error!("signal channel: {e}"); + } + } +} diff --git a/src/router/run.rs b/src/router/run.rs index e6238853..8afb2dc3 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -1,11 +1,8 @@ use std::{sync::Arc, time::Duration}; use axum_server::Handle as ServerHandle; -use tokio::{ - signal, - sync::broadcast::{self, Sender}, -}; -use tracing::{debug, info, warn}; +use tokio::sync::broadcast::{self, Sender}; +use tracing::{debug, error, info}; extern crate conduit_admin as admin; extern crate conduit_core as conduit; @@ -39,9 +36,7 @@ pub(crate) async fn run(server: Arc) -> Result<(), Error> { server.interrupt.store(false, Ordering::Release); let (tx, _) = broadcast::channel::<()>(1); - let sigs = server - .runtime() - .spawn(sighandle(server.clone(), tx.clone())); + let sigs = server.runtime().spawn(signal(server.clone(), tx.clone())); // Serve clients let res = serve::serve(&server, app, handle, tx.subscribe()).await; @@ -115,51 +110,25 @@ pub(crate) async fn stop(_server: Arc) -> Result<(), Error> { } #[tracing::instrument(skip_all)] -async fn sighandle(server: Arc, tx: Sender<()>) -> Result<(), Error> { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); +async fn signal(server: Arc, tx: Sender<()>) { + let sig: &'static str = server + .signal + .subscribe() + .recv() + .await + .expect("channel error"); + debug!("Received signal {}", sig); + if sig == "Ctrl+C" { let reload = cfg!(unix) && cfg!(debug_assertions); server.reload.store(reload, Ordering::Release); - }; - - #[cfg(unix)] - let ctrl_bs = async { - signal::unix::signal(signal::unix::SignalKind::quit()) - .expect("failed to install Ctrl+\\ handler") - .recv() - .await; - }; - - #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install SIGTERM handler") - .recv() - .await; - }; - - debug!("Installed signal handlers"); - let sig: &str; - #[cfg(unix)] - tokio::select! { - () = ctrl_c => { sig = "Ctrl+C"; }, - () = ctrl_bs => { sig = "Ctrl+\\"; }, - () = terminate => { sig = "SIGTERM"; }, } - #[cfg(not(unix))] - tokio::select! { - _ = ctrl_c => { sig = "Ctrl+C"; }, - } - - warn!("Received {}", sig); server.interrupt.store(true, Ordering::Release); services().globals.rotate.fire(); - tx.send(()) - .expect("failed sending shutdown transaction to oneshot channel"); + if let Err(e) = tx.send(()) { + error!("failed sending shutdown transaction to channel: {e}"); + } if let Some(handle) = server.shutdown.lock().expect("locked").as_ref() { let pending = server.requests_spawn_active.load(Ordering::Relaxed); @@ -172,6 +141,4 @@ async fn sighandle(server: Arc, tx: Sender<()>) -> Result<(), Error> { handle.shutdown(); } } - - Ok(()) }