From 3109c0daba1115a4c58f46c9192d6ade8f078885 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 3 Dec 2024 06:34:56 +0000 Subject: [PATCH] perform async shutdown for database pool after services stop Signed-off-by: Jason Volk --- src/database/engine.rs | 5 +++-- src/database/pool.rs | 21 ++++++++++----------- src/router/run.rs | 12 +++++++++++- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/database/engine.rs b/src/database/engine.rs index a6ed7d86..fe6602ae 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -148,6 +148,8 @@ impl Engine { .expect("column was created and exists") } + pub async fn shutdown_pool(&self) { self.pool.shutdown().await; } + pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) } pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) } @@ -325,8 +327,7 @@ impl Drop for Engine { fn drop(&mut self) { const BLOCKING: bool = true; - debug!("Shutting down request pool..."); - self.pool.close(); + debug_assert!(!self.pool.close(), "request pool was not closed"); debug!("Waiting for background tasks to finish..."); self.db.cancel_all_background_work(BLOCKING); diff --git a/src/database/pool.rs b/src/database/pool.rs index 136de67d..e7ffc807 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -76,10 +76,8 @@ pub(crate) async fn new(server: &Arc<Server>, opts: &Opts) -> Result<Arc<Self>> } #[implement(Pool)] -pub(crate) async fn _shutdown(self: &Arc<Self>) { - if !self.queue.is_closed() { - self.close(); - } +pub(crate) async fn shutdown(self: &Arc<Self>) { + self.close(); let workers = take(&mut *self.workers.lock().await); debug!(workers = workers.len(), "Waiting for workers to join..."); @@ -89,18 +87,19 @@ pub(crate) async fn _shutdown(self: &Arc<Self>) { } #[implement(Pool)] -pub(crate) fn close(&self) { - debug_assert!(!self.queue.is_closed(), "channel already closed"); +pub(crate) fn close(&self) -> bool { + if !self.queue.close() { + return false; + } + + 
std::thread::yield_now(); debug!( senders = self.queue.sender_count(), receivers = self.queue.receiver_count(), - "Closing pool channel" + "Closed pool channel" ); - let closing = self.queue.close(); - debug_assert!(closing, "channel is not closing"); - - std::thread::yield_now(); + true } #[implement(Pool)] diff --git a/src/router/run.rs b/src/router/run.rs index 395aa8c4..93b1339b 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -78,6 +78,11 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> { // unload and explode. services.stop().await; + // Check that Services and Database will drop as expected. The complex of Arcs + // used for various components can easily lead to references being held + // somewhere improperly; this can hang shutdowns. + debug!("Cleaning up..."); + let db = Arc::downgrade(&services.db); if let Err(services) = Arc::try_unwrap(services) { debug_error!( "{} dangling references to Services after shutdown", @@ -85,7 +90,12 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> { ); } - debug!("Cleaning up..."); + // The db threadpool requires async join if we use tokio/spawn_blocking to + // manage the threads. Without async-drop we have to wait here; for symmetry + // with Services construction it can't be done in services.stop(). + if let Some(db) = db.upgrade() { + db.db.shutdown_pool().await; + } #[cfg(feature = "systemd")] sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]).expect("failed to notify systemd of stopping state");