abort tasks for non-async pool shudown

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-18 03:29:42 +00:00 committed by strawberry
parent 7b8320e0eb
commit 4d46df2af5
2 changed files with 12 additions and 5 deletions

View file

@ -347,7 +347,8 @@ impl Drop for Engine {
fn drop(&mut self) { fn drop(&mut self) {
const BLOCKING: bool = true; const BLOCKING: bool = true;
debug_assert!(!self.pool.close(), "request pool was not closed"); debug!("Closing frontend pool");
self.pool.close();
debug!("Waiting for background tasks to finish..."); debug!("Waiting for background tasks to finish...");
self.db.cancel_all_background_work(BLOCKING); self.db.cancel_all_background_work(BLOCKING);

View file

@ -2,7 +2,7 @@ use std::{
mem::take, mem::take,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc, Mutex,
}, },
}; };
@ -11,7 +11,7 @@ use conduwuit::{debug, debug_warn, defer, err, implement, result::DebugInspect,
use futures::{channel::oneshot, TryFutureExt}; use futures::{channel::oneshot, TryFutureExt};
use oneshot::Sender as ResultSender; use oneshot::Sender as ResultSender;
use rocksdb::Direction; use rocksdb::Direction;
use tokio::{sync::Mutex, task::JoinSet}; use tokio::task::JoinSet;
use crate::{keyval::KeyBuf, stream, Handle, Map}; use crate::{keyval::KeyBuf, stream, Handle, Map};
@ -79,7 +79,7 @@ pub(crate) async fn new(server: &Arc<Server>, opts: &Opts) -> Result<Arc<Self>>
pub(crate) async fn shutdown(self: &Arc<Self>) { pub(crate) async fn shutdown(self: &Arc<Self>) {
self.close(); self.close();
let workers = take(&mut *self.workers.lock().await); let workers = take(&mut *self.workers.lock().expect("locked"));
debug!(workers = workers.len(), "Waiting for workers to join..."); debug!(workers = workers.len(), "Waiting for workers to join...");
workers.join_all().await; workers.join_all().await;
@ -92,7 +92,13 @@ pub(crate) fn close(&self) -> bool {
return false; return false;
} }
let mut workers = take(&mut *self.workers.lock().expect("locked"));
debug!(workers = workers.len(), "Waiting for workers to join...");
workers.abort_all();
drop(workers);
std::thread::yield_now(); std::thread::yield_now();
debug_assert!(self.queue.is_empty(), "channel is not empty");
debug!( debug!(
senders = self.queue.sender_count(), senders = self.queue.sender_count(),
receivers = self.queue.receiver_count(), receivers = self.queue.receiver_count(),
@ -104,7 +110,7 @@ pub(crate) fn close(&self) -> bool {
#[implement(Pool)] #[implement(Pool)]
async fn spawn_until(self: &Arc<Self>, recv: Receiver<Cmd>, max: usize) -> Result { async fn spawn_until(self: &Arc<Self>, recv: Receiver<Cmd>, max: usize) -> Result {
let mut workers = self.workers.lock().await; let mut workers = self.workers.lock().expect("locked");
while workers.len() < max { while workers.len() < max {
self.spawn_one(&mut workers, recv.clone())?; self.spawn_one(&mut workers, recv.clone())?;
} }