perform async shutdown for database pool after services stop

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-03 06:34:56 +00:00
parent ef9b1c6303
commit 3109c0daba
3 changed files with 24 additions and 14 deletions

View file

@ -148,6 +148,8 @@ impl Engine {
.expect("column was created and exists") .expect("column was created and exists")
} }
pub async fn shutdown_pool(&self) { self.pool.shutdown().await; }
pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) } pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) }
pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) } pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }
@ -325,8 +327,7 @@ impl Drop for Engine {
fn drop(&mut self) { fn drop(&mut self) {
const BLOCKING: bool = true; const BLOCKING: bool = true;
debug!("Shutting down request pool..."); debug_assert!(!self.pool.close(), "request pool was not closed");
self.pool.close();
debug!("Waiting for background tasks to finish..."); debug!("Waiting for background tasks to finish...");
self.db.cancel_all_background_work(BLOCKING); self.db.cancel_all_background_work(BLOCKING);

View file

@ -76,10 +76,8 @@ pub(crate) async fn new(server: &Arc<Server>, opts: &Opts) -> Result<Arc<Self>>
} }
#[implement(Pool)] #[implement(Pool)]
pub(crate) async fn _shutdown(self: &Arc<Self>) { pub(crate) async fn shutdown(self: &Arc<Self>) {
if !self.queue.is_closed() { self.close();
self.close();
}
let workers = take(&mut *self.workers.lock().await); let workers = take(&mut *self.workers.lock().await);
debug!(workers = workers.len(), "Waiting for workers to join..."); debug!(workers = workers.len(), "Waiting for workers to join...");
@ -89,18 +87,19 @@ pub(crate) async fn _shutdown(self: &Arc<Self>) {
} }
#[implement(Pool)] #[implement(Pool)]
pub(crate) fn close(&self) { pub(crate) fn close(&self) -> bool {
debug_assert!(!self.queue.is_closed(), "channel already closed"); if !self.queue.close() {
return false;
}
std::thread::yield_now();
debug!( debug!(
senders = self.queue.sender_count(), senders = self.queue.sender_count(),
receivers = self.queue.receiver_count(), receivers = self.queue.receiver_count(),
"Closing pool channel" "Closed pool channel"
); );
let closing = self.queue.close(); true
debug_assert!(closing, "channel is not closing");
std::thread::yield_now();
} }
#[implement(Pool)] #[implement(Pool)]

View file

@ -78,6 +78,11 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
// unload and explode. // unload and explode.
services.stop().await; services.stop().await;
// Check that Services and Database will drop as expected, The complex of Arc's
// used for various components can easily lead to references being held
// somewhere improperly; this can hang shutdowns.
debug!("Cleaning up...");
let db = Arc::downgrade(&services.db);
if let Err(services) = Arc::try_unwrap(services) { if let Err(services) = Arc::try_unwrap(services) {
debug_error!( debug_error!(
"{} dangling references to Services after shutdown", "{} dangling references to Services after shutdown",
@ -85,7 +90,12 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
); );
} }
debug!("Cleaning up..."); // The db threadpool requires async join if we use tokio/spawn_blocking to
// manage the threads. Without async-drop we have to wait here; for symmetry
// with Services construction it can't be done in services.stop().
if let Some(db) = db.upgrade() {
db.db.shutdown_pool().await;
}
#[cfg(feature = "systemd")] #[cfg(feature = "systemd")]
sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]).expect("failed to notify systemd of stopping state"); sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]).expect("failed to notify systemd of stopping state");