partially revert 9a9c071e82; use std threads for db pool.

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2025-01-05 23:33:27 +00:00
parent 925061b92d
commit d36167ab64
4 changed files with 80 additions and 67 deletions
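This commit replaces the tokio spawn_blocking-managed database pool workers with dedicated std threads, spawned with an explicit name and stack size and joined synchronously on close. The following is a minimal sketch of that spawn/join pattern, not the actual conduwuit code; the worker body and error handling here are placeholders (WORKER_NAME and WORKER_STACK_SIZE match the constants in the diff below).

use std::thread::{self, JoinHandle};

const WORKER_NAME: &str = "conduwuit:db";
const WORKER_STACK_SIZE: usize = 1_048_576;

// Spawn `count` dedicated worker threads; no tokio runtime is involved.
fn spawn_workers(count: usize) -> std::io::Result<Vec<JoinHandle<()>>> {
    (0..count)
        .map(|id| {
            thread::Builder::new()
                .name(WORKER_NAME.into())
                .stack_size(WORKER_STACK_SIZE)
                .spawn(move || worker(id))
        })
        .collect()
}

fn worker(_id: usize) {
    // ... block on the request queue and execute database work ...
}

// Joining is synchronous, so pool shutdown no longer requires an async join.
fn join_workers(workers: Vec<JoinHandle<()>>) {
    for handle in workers {
        if handle.join().is_err() {
            // A panicking worker surfaces here instead of in a tokio JoinSet.
            eprintln!("worker panicked");
        }
    }
}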


@@ -31,11 +31,11 @@ pub struct Engine {
opts: Options,
env: Env,
cfs: Mutex<BTreeSet<String>>,
pub(crate) pool: Arc<Pool>,
pub(crate) db: Db,
corks: AtomicU32,
pub(super) read_only: bool,
pub(super) secondary: bool,
pub(crate) pool: Arc<Pool>,
}
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
@@ -44,6 +44,8 @@ impl Engine {
#[tracing::instrument(skip_all)]
pub(crate) async fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
let config = &server.config;
let path = &config.database_path;
let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
@@ -64,30 +66,32 @@ impl Engine {
col_cache.get("primary").expect("primary cache exists"),
)?;
let load_time = std::time::Instant::now();
if config.rocksdb_repair {
repair(&db_opts, &config.database_path)?;
}
debug!("Listing column families in database");
let cfs = Db::list_cf(&db_opts, &config.database_path)
.unwrap_or_default()
.into_iter()
.collect::<BTreeSet<_>>();
debug!("Opening {} column family descriptors in database", cfs.len());
debug!("Configuring {} column families found in database", cfs.len());
let cfopts = cfs
.iter()
.map(|name| cf_options(config, name, db_opts.clone(), &mut col_cache))
.collect::<Result<Vec<_>>>()?;
debug!("Opening {} column family descriptors in database", cfs.len());
let cfds = cfs
.iter()
.zip(cfopts.into_iter())
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
.collect::<Vec<_>>();
let path = &config.database_path;
debug!("Starting frontend request pool");
let pool = Pool::new(server)?;
let load_time = std::time::Instant::now();
if config.rocksdb_repair {
repair(&db_opts, &config.database_path)?;
}
debug!("Opening database...");
let res = if config.rocksdb_read_only {
@@ -113,11 +117,11 @@ impl Engine {
opts: db_opts,
env: db_env,
cfs: Mutex::new(cfs),
db,
corks: AtomicU32::new(0),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
pool: Pool::new(server).await?,
pool,
db,
}))
}
@@ -146,8 +150,6 @@ impl Engine {
.expect("column was created and exists")
}
pub async fn shutdown_pool(&self) { self.pool.shutdown().await; }
pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) }
pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }


@@ -6,21 +6,22 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
thread,
thread::JoinHandle,
};
use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
use conduwuit::{
debug, debug_warn, defer, err, implement,
debug, debug_warn, defer, err, error, implement,
result::DebugInspect,
trace,
utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
Result, Server,
Error, Result, Server,
};
use futures::{channel::oneshot, TryFutureExt};
use oneshot::Sender as ResultSender;
use rocksdb::Direction;
use smallvec::SmallVec;
use tokio::task::JoinSet;
use self::configure::configure;
use crate::{keyval::KeyBuf, stream, Handle, Map};
@@ -29,9 +30,9 @@ use crate::{keyval::KeyBuf, stream, Handle, Map};
/// requests which are not cached. These thread-blocking requests are offloaded
/// from the tokio async workers and executed on this threadpool.
pub(crate) struct Pool {
server: Arc<Server>,
_server: Arc<Server>,
queues: Vec<Sender<Cmd>>,
workers: Mutex<JoinSet<()>>,
workers: Mutex<Vec<JoinHandle<()>>>,
topology: Vec<usize>,
busy: AtomicUsize,
queued_max: AtomicUsize,
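The doc comment above describes the pool's job: thread-blocking database requests are taken off the tokio async workers and executed on dedicated threads. A generic sketch of that offloading pattern, assuming plain async-channel and futures-oneshot APIs rather than the pool's actual Cmd/Handle types, looks roughly like this:

use std::thread::{self, JoinHandle};

use async_channel::{Receiver, Sender};
use futures::channel::oneshot;

type Job = Box<dyn FnOnce() + Send + 'static>;

// A worker blocks on the queue and runs each job on its own std thread.
fn spawn_worker(recv: Receiver<Job>) -> JoinHandle<()> {
    thread::Builder::new()
        .name("db-worker".into())
        .spawn(move || {
            // Runs until every Sender is dropped or the channel is closed.
            while let Ok(job) = recv.recv_blocking() {
                job();
            }
        })
        .expect("failed to spawn pool worker")
}

// An async caller queues a blocking closure and awaits its result over a oneshot.
async fn execute_blocking<T, F>(queue: &Sender<Job>, f: F) -> T
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (result_tx, result_rx) = oneshot::channel();
    let job: Job = Box::new(move || {
        // The caller may have been cancelled; ignore a closed oneshot.
        let _ = result_tx.send(f());
    });
    queue.send(job).await.expect("pool queue closed");
    result_rx.await.expect("worker dropped the request")
}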
@@ -69,39 +70,42 @@ const WORKER_LIMIT: (usize, usize) = (1, 1024);
const QUEUE_LIMIT: (usize, usize) = (1, 2048);
const BATCH_INLINE: usize = 1;
const WORKER_STACK_SIZE: usize = 1_048_576;
const WORKER_NAME: &str = "conduwuit:db";
#[implement(Pool)]
pub(crate) async fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
const CHAN_SCHED: (QueueStrategy, QueueStrategy) = (QueueStrategy::Fifo, QueueStrategy::Lifo);
let (total_workers, queue_sizes, topology) = configure(server);
let (senders, receivers) = queue_sizes
let (senders, receivers): (Vec<_>, Vec<_>) = queue_sizes
.into_iter()
.map(|cap| async_channel::bounded_with_queue_strategy(cap, CHAN_SCHED))
.unzip();
let pool = Arc::new(Self {
server: server.clone(),
_server: server.clone(),
queues: senders,
workers: JoinSet::new().into(),
workers: Vec::new().into(),
topology,
busy: AtomicUsize::default(),
queued_max: AtomicUsize::default(),
});
pool.spawn_until(receivers, total_workers).await?;
pool.spawn_until(&receivers, total_workers)?;
Ok(pool)
}
impl Drop for Pool {
fn drop(&mut self) {
debug_assert!(self.queues.iter().all(Sender::is_empty), "channel must be empty on drop");
self.close();
debug_assert!(
self.queues.iter().all(Sender::is_empty),
"channel must should not have requests queued on drop"
);
debug_assert!(
self.queues.iter().all(Sender::is_closed),
"channel should be closed on drop"
@@ -110,17 +114,10 @@ impl Drop for Pool {
}
#[implement(Pool)]
pub(crate) async fn shutdown(self: &Arc<Self>) {
self.close();
let workers = take(&mut *self.workers.lock().expect("locked"));
debug!(workers = workers.len(), "Waiting for workers to join...");
workers.join_all().await;
}
#[implement(Pool)]
#[tracing::instrument(skip_all)]
pub(crate) fn close(&self) {
let workers = take(&mut *self.workers.lock().expect("locked"));
let senders = self.queues.iter().map(Sender::sender_count).sum::<usize>();
let receivers = self
@@ -129,27 +126,40 @@ pub(crate) fn close(&self) {
.map(Sender::receiver_count)
.sum::<usize>();
debug!(
queues = self.queues.len(),
workers = self.workers.lock().expect("locked").len(),
?senders,
?receivers,
"Closing pool..."
);
for queue in &self.queues {
queue.close();
}
self.workers.lock().expect("locked").abort_all();
std::thread::yield_now();
if workers.is_empty() {
return;
}
debug!(
senders,
receivers,
queues = self.queues.len(),
workers = workers.len(),
"Closing pool. Waiting for workers to join..."
);
workers
.into_iter()
.map(JoinHandle::join)
.map(|result| result.map_err(Error::from_panic))
.enumerate()
.for_each(|(id, result)| {
match result {
| Ok(()) => trace!(?id, "worker joined"),
| Err(error) => error!(?id, "worker joined with error: {error}"),
};
});
}
#[implement(Pool)]
async fn spawn_until(self: &Arc<Self>, recv: Vec<Receiver<Cmd>>, count: usize) -> Result {
fn spawn_until(self: &Arc<Self>, recv: &[Receiver<Cmd>], count: usize) -> Result {
let mut workers = self.workers.lock().expect("locked");
while workers.len() < count {
self.spawn_one(&mut workers, &recv)?;
self.clone().spawn_one(&mut workers, recv)?;
}
Ok(())
@@ -162,23 +172,24 @@ async fn spawn_until(self: &Arc<Self>, recv: Vec<Receiver<Cmd>>, count: usize) -
skip_all,
fields(id = %workers.len())
)]
fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: &[Receiver<Cmd>]) -> Result {
fn spawn_one(
self: Arc<Self>,
workers: &mut Vec<JoinHandle<()>>,
recv: &[Receiver<Cmd>],
) -> Result {
debug_assert!(!self.queues.is_empty(), "Must have at least one queue");
debug_assert!(!recv.is_empty(), "Must have at least one receiver");
let id = workers.len();
let group = id.overflowing_rem(self.queues.len()).0;
let recv = recv[group].clone();
let self_ = self.clone();
#[cfg(not(tokio_unstable))]
let _abort = workers.spawn_blocking_on(move || self_.worker(id, recv), self.server.runtime());
let handle = thread::Builder::new()
.name(WORKER_NAME.into())
.stack_size(WORKER_STACK_SIZE)
.spawn(move || self.worker(id, recv))?;
#[cfg(tokio_unstable)]
let _abort = workers
.build_task()
.name("conduwuit:dbpool")
.spawn_blocking_on(move || self_.worker(id, recv), self.server.runtime());
workers.push(handle);
Ok(())
}
@@ -258,7 +269,7 @@ async fn execute(&self, queue: &Sender<Cmd>, cmd: Cmd) -> Result {
level = "debug",
skip(self, recv),
fields(
tid = ?std::thread::current().id(),
tid = ?thread::current().id(),
),
)]
fn worker(self: Arc<Self>, id: usize, recv: Receiver<Cmd>) {


@@ -19,7 +19,7 @@ use crate::clap::Args;
const WORKER_NAME: &str = "conduwuit:worker";
const WORKER_MIN: usize = 2;
const WORKER_KEEPALIVE: u64 = 36;
const MAX_BLOCKING_THREADS: usize = 2048;
const MAX_BLOCKING_THREADS: usize = 1024;
static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();
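MAX_BLOCKING_THREADS drops from 2048 to 1024, presumably because the db pool workers no longer occupy tokio's blocking-thread slots. These constants appear to feed the main tokio runtime; a rough sketch of how such constants typically map onto tokio's runtime builder follows, as assumed wiring rather than conduwuit's actual runtime setup:

use std::time::Duration;

const WORKER_NAME: &str = "conduwuit:worker";
const WORKER_MIN: usize = 2;
const WORKER_KEEPALIVE: u64 = 36;
const MAX_BLOCKING_THREADS: usize = 1024;

// Illustrative runtime construction; all builder methods are real tokio APIs.
fn build_runtime(worker_threads: usize) -> std::io::Result<tokio::runtime::Runtime> {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_name(WORKER_NAME)
        .worker_threads(worker_threads.max(WORKER_MIN))
        .max_blocking_threads(MAX_BLOCKING_THREADS)
        .thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE))
        .build()
}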


@@ -3,7 +3,7 @@ extern crate conduwuit_core as conduwuit;
extern crate conduwuit_service as service;
use std::{
sync::{atomic::Ordering, Arc},
sync::{atomic::Ordering, Arc, Weak},
time::Duration,
};
@@ -92,11 +92,11 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
);
}
// The db threadpool requires async join if we use tokio/spawn_blocking to
// manage the threads. Without async-drop we have to wait here; for symmetry
// with Services construction it can't be done in services.stop().
if let Some(db) = db.upgrade() {
db.db.shutdown_pool().await;
if Weak::strong_count(&db) > 0 {
debug_error!(
"{} dangling references to Database after shutdown",
Weak::strong_count(&db)
);
}
#[cfg(feature = "systemd")]
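With the pool threads joined synchronously inside close(), shutdown here only needs to verify that no strong references to the Database survive, which is what the Weak::strong_count check above does. A tiny illustrative sketch of that check, with a hypothetical Database type and eprintln! standing in for debug_error!:

use std::sync::{Arc, Weak};

struct Database;

fn check_shutdown(db: &Weak<Database>) {
    // std's Weak::strong_count reports how many Arcs to the value remain.
    let dangling = Weak::strong_count(db);
    if dangling > 0 {
        eprintln!("{dangling} dangling references to Database after shutdown");
    }
}

fn main() {
    let strong = Arc::new(Database);
    let weak = Arc::downgrade(&strong);
    drop(strong);
    check_shutdown(&weak); // prints nothing: count is 0 after the last Arc drops
}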