move signal handling out to main
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
1108235c63
commit
89ab687f16
3 changed files with 55 additions and 51 deletions
|
@ -6,7 +6,7 @@ use std::{
|
||||||
time::SystemTime,
|
time::SystemTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::runtime;
|
use tokio::{runtime, sync::broadcast};
|
||||||
|
|
||||||
use crate::{config::Config, log::LogLevelReloadHandles};
|
use crate::{config::Config, log::LogLevelReloadHandles};
|
||||||
|
|
||||||
|
@ -22,6 +22,9 @@ pub struct Server {
|
||||||
/// command to initiate shutdown.
|
/// command to initiate shutdown.
|
||||||
pub shutdown: Mutex<Option<axum_server::Handle>>,
|
pub shutdown: Mutex<Option<axum_server::Handle>>,
|
||||||
|
|
||||||
|
/// Reload/shutdown signal
|
||||||
|
pub signal: broadcast::Sender<&'static str>,
|
||||||
|
|
||||||
/// Reload/shutdown desired indicator; when false, shutdown is desired. This
|
/// Reload/shutdown desired indicator; when false, shutdown is desired. This
|
||||||
/// is an observable used on shutdown and modifying is not recommended.
|
/// is an observable used on shutdown and modifying is not recommended.
|
||||||
pub reload: AtomicBool,
|
pub reload: AtomicBool,
|
||||||
|
@ -51,6 +54,7 @@ impl Server {
|
||||||
config,
|
config,
|
||||||
started: SystemTime::now(),
|
started: SystemTime::now(),
|
||||||
shutdown: Mutex::new(None),
|
shutdown: Mutex::new(None),
|
||||||
|
signal: broadcast::channel::<&'static str>(1).0,
|
||||||
reload: AtomicBool::new(false),
|
reload: AtomicBool::new(false),
|
||||||
interrupt: AtomicBool::new(false),
|
interrupt: AtomicBool::new(false),
|
||||||
runtime,
|
runtime,
|
||||||
|
|
|
@ -6,9 +6,10 @@ extern crate conduit_core as conduit;
|
||||||
|
|
||||||
use std::{cmp, sync::Arc, time::Duration};
|
use std::{cmp, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use conduit::{debug_info, error, utils::available_parallelism, Error, Result};
|
use conduit::{debug_error, debug_info, error, utils::available_parallelism, warn, Error, Result};
|
||||||
use server::Server;
|
use server::Server;
|
||||||
use tokio::runtime;
|
use tokio::{runtime, signal};
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
const WORKER_NAME: &str = "conduwuit:worker";
|
const WORKER_NAME: &str = "conduwuit:worker";
|
||||||
const WORKER_MIN: usize = 2;
|
const WORKER_MIN: usize = 2;
|
||||||
|
@ -26,6 +27,7 @@ fn main() -> Result<(), Error> {
|
||||||
.expect("built runtime");
|
.expect("built runtime");
|
||||||
|
|
||||||
let server: Arc<Server> = Server::build(args, Some(runtime.handle()))?;
|
let server: Arc<Server> = Server::build(args, Some(runtime.handle()))?;
|
||||||
|
runtime.spawn(signal(server.clone()));
|
||||||
runtime.block_on(async_main(&server))?;
|
runtime.block_on(async_main(&server))?;
|
||||||
|
|
||||||
// explicit drop here to trace thread and tls dtors
|
// explicit drop here to trace thread and tls dtors
|
||||||
|
@ -94,3 +96,34 @@ async fn async_main(server: &Arc<Server>) -> Result<(), Error> {
|
||||||
debug_info!("Exit runtime");
|
debug_info!("Exit runtime");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn signal(server: Arc<Server>) {
|
||||||
|
let (mut term, mut quit);
|
||||||
|
#[cfg(unix)]
|
||||||
|
{
|
||||||
|
use signal::unix;
|
||||||
|
quit = unix::signal(unix::SignalKind::quit()).expect("SIGQUIT handler");
|
||||||
|
term = unix::signal(unix::SignalKind::terminate()).expect("SIGTERM handler");
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
debug!("Installed signal handlers");
|
||||||
|
let sig: &'static str;
|
||||||
|
#[cfg(unix)]
|
||||||
|
tokio::select! {
|
||||||
|
_ = term.recv() => { sig = "SIGTERM"; },
|
||||||
|
_ = quit.recv() => { sig = "Ctrl+\\"; },
|
||||||
|
_ = signal::ctrl_c() => { sig = "Ctrl+C"; },
|
||||||
|
}
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
tokio::select! {
|
||||||
|
_ = signal::ctrl_c() => { sig = "Ctrl+C"; },
|
||||||
|
}
|
||||||
|
|
||||||
|
warn!("Received signal {}", sig);
|
||||||
|
if let Err(e) = server.server.signal.send(sig) {
|
||||||
|
debug_error!("signal channel: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,11 +1,8 @@
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use axum_server::Handle as ServerHandle;
|
use axum_server::Handle as ServerHandle;
|
||||||
use tokio::{
|
use tokio::sync::broadcast::{self, Sender};
|
||||||
signal,
|
use tracing::{debug, error, info};
|
||||||
sync::broadcast::{self, Sender},
|
|
||||||
};
|
|
||||||
use tracing::{debug, info, warn};
|
|
||||||
|
|
||||||
extern crate conduit_admin as admin;
|
extern crate conduit_admin as admin;
|
||||||
extern crate conduit_core as conduit;
|
extern crate conduit_core as conduit;
|
||||||
|
@ -39,9 +36,7 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<(), Error> {
|
||||||
|
|
||||||
server.interrupt.store(false, Ordering::Release);
|
server.interrupt.store(false, Ordering::Release);
|
||||||
let (tx, _) = broadcast::channel::<()>(1);
|
let (tx, _) = broadcast::channel::<()>(1);
|
||||||
let sigs = server
|
let sigs = server.runtime().spawn(signal(server.clone(), tx.clone()));
|
||||||
.runtime()
|
|
||||||
.spawn(sighandle(server.clone(), tx.clone()));
|
|
||||||
|
|
||||||
// Serve clients
|
// Serve clients
|
||||||
let res = serve::serve(&server, app, handle, tx.subscribe()).await;
|
let res = serve::serve(&server, app, handle, tx.subscribe()).await;
|
||||||
|
@ -115,51 +110,25 @@ pub(crate) async fn stop(_server: Arc<Server>) -> Result<(), Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn sighandle(server: Arc<Server>, tx: Sender<()>) -> Result<(), Error> {
|
async fn signal(server: Arc<Server>, tx: Sender<()>) {
|
||||||
let ctrl_c = async {
|
let sig: &'static str = server
|
||||||
signal::ctrl_c()
|
.signal
|
||||||
|
.subscribe()
|
||||||
|
.recv()
|
||||||
.await
|
.await
|
||||||
.expect("failed to install Ctrl+C handler");
|
.expect("channel error");
|
||||||
|
|
||||||
|
debug!("Received signal {}", sig);
|
||||||
|
if sig == "Ctrl+C" {
|
||||||
let reload = cfg!(unix) && cfg!(debug_assertions);
|
let reload = cfg!(unix) && cfg!(debug_assertions);
|
||||||
server.reload.store(reload, Ordering::Release);
|
server.reload.store(reload, Ordering::Release);
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
let ctrl_bs = async {
|
|
||||||
signal::unix::signal(signal::unix::SignalKind::quit())
|
|
||||||
.expect("failed to install Ctrl+\\ handler")
|
|
||||||
.recv()
|
|
||||||
.await;
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
let terminate = async {
|
|
||||||
signal::unix::signal(signal::unix::SignalKind::terminate())
|
|
||||||
.expect("failed to install SIGTERM handler")
|
|
||||||
.recv()
|
|
||||||
.await;
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("Installed signal handlers");
|
|
||||||
let sig: &str;
|
|
||||||
#[cfg(unix)]
|
|
||||||
tokio::select! {
|
|
||||||
() = ctrl_c => { sig = "Ctrl+C"; },
|
|
||||||
() = ctrl_bs => { sig = "Ctrl+\\"; },
|
|
||||||
() = terminate => { sig = "SIGTERM"; },
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
|
||||||
tokio::select! {
|
|
||||||
_ = ctrl_c => { sig = "Ctrl+C"; },
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!("Received {}", sig);
|
|
||||||
server.interrupt.store(true, Ordering::Release);
|
server.interrupt.store(true, Ordering::Release);
|
||||||
services().globals.rotate.fire();
|
services().globals.rotate.fire();
|
||||||
tx.send(())
|
if let Err(e) = tx.send(()) {
|
||||||
.expect("failed sending shutdown transaction to oneshot channel");
|
error!("failed sending shutdown transaction to channel: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(handle) = server.shutdown.lock().expect("locked").as_ref() {
|
if let Some(handle) = server.shutdown.lock().expect("locked").as_ref() {
|
||||||
let pending = server.requests_spawn_active.load(Ordering::Relaxed);
|
let pending = server.requests_spawn_active.load(Ordering::Relaxed);
|
||||||
|
@ -172,6 +141,4 @@ async fn sighandle(server: Arc<Server>, tx: Sender<()>) -> Result<(), Error> {
|
||||||
handle.shutdown();
|
handle.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue