From d36167ab64560221ac08e848e479b285d8c4dc48 Mon Sep 17 00:00:00 2001
From: Jason Volk
Date: Sun, 5 Jan 2025 23:33:27 +0000
Subject: [PATCH] partially revert 9a9c071e82; use std threads for db pool.

Signed-off-by: Jason Volk
---
 src/database/engine.rs |  26 +++++-----
 src/database/pool.rs   | 107 +++++++++++++++++++++++------------------
 src/main/runtime.rs    |   2 +-
 src/router/run.rs      |  12 ++---
 4 files changed, 80 insertions(+), 67 deletions(-)

diff --git a/src/database/engine.rs b/src/database/engine.rs
index faf5b131..670817b5 100644
--- a/src/database/engine.rs
+++ b/src/database/engine.rs
@@ -31,11 +31,11 @@ pub struct Engine {
 	opts: Options,
 	env: Env,
 	cfs: Mutex<BTreeSet<String>>,
+	pub(crate) pool: Arc<Pool>,
 	pub(crate) db: Db,
 	corks: AtomicU32,
 	pub(super) read_only: bool,
 	pub(super) secondary: bool,
-	pub(crate) pool: Arc<Pool>,
 }

 pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
@@ -44,6 +44,8 @@ impl Engine {
 	#[tracing::instrument(skip_all)]
 	pub(crate) async fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
 		let config = &server.config;
+		let path = &config.database_path;
+
 		let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;

 		#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
@@ -64,30 +66,32 @@ impl Engine {
 			col_cache.get("primary").expect("primary cache exists"),
 		)?;

-		let load_time = std::time::Instant::now();
-		if config.rocksdb_repair {
-			repair(&db_opts, &config.database_path)?;
-		}
-
 		debug!("Listing column families in database");
 		let cfs = Db::list_cf(&db_opts, &config.database_path)
 			.unwrap_or_default()
 			.into_iter()
 			.collect::<BTreeSet<_>>();

-		debug!("Opening {} column family descriptors in database", cfs.len());
+		debug!("Configuring {} column families found in database", cfs.len());
 		let cfopts = cfs
 			.iter()
 			.map(|name| cf_options(config, name, db_opts.clone(), &mut col_cache))
 			.collect::<Result<Vec<_>>>()?;

+		debug!("Opening {} column family descriptors in database", cfs.len());
 		let cfds = cfs
 			.iter()
 			.zip(cfopts.into_iter())
 			.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
 			.collect::<Vec<_>>();

-		let path = &config.database_path;
+		debug!("Starting frontend request pool");
+		let pool = Pool::new(server)?;
+
+		let load_time = std::time::Instant::now();
+		if config.rocksdb_repair {
+			repair(&db_opts, &config.database_path)?;
+		}

 		debug!("Opening database...");
 		let res = if config.rocksdb_read_only {
@@ -113,11 +117,11 @@ impl Engine {
 			opts: db_opts,
 			env: db_env,
 			cfs: Mutex::new(cfs),
-			db,
 			corks: AtomicU32::new(0),
 			read_only: config.rocksdb_read_only,
 			secondary: config.rocksdb_secondary,
-			pool: Pool::new(server).await?,
+			pool,
+			db,
 		}))
 	}
@@ -146,8 +150,6 @@ impl Engine {
 			.expect("column was created and exists")
 	}

-	pub async fn shutdown_pool(&self) { self.pool.shutdown().await; }
-
 	pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) }

 	pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }
diff --git a/src/database/pool.rs b/src/database/pool.rs
index 0295e0e9..1560c8b0 100644
--- a/src/database/pool.rs
+++ b/src/database/pool.rs
@@ -6,21 +6,22 @@ use std::{
 		atomic::{AtomicUsize, Ordering},
 		Arc, Mutex,
 	},
+	thread,
+	thread::JoinHandle,
 };

 use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
 use conduwuit::{
-	debug, debug_warn, defer, err, implement,
+	debug, debug_warn, defer, err, error, implement,
 	result::DebugInspect,
 	trace,
 	utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
-	Result, Server,
+	Error, Result, Server,
 };
 use futures::{channel::oneshot, TryFutureExt};
 use oneshot::Sender as ResultSender;
 use rocksdb::Direction;
 use smallvec::SmallVec;
-use tokio::task::JoinSet;

 use self::configure::configure;
 use crate::{keyval::KeyBuf, stream, Handle, Map};
@@ -29,9 +30,9 @@ use crate::{keyval::KeyBuf, stream, Handle, Map};
 /// requests which are not cached. These thread-blocking requests are offloaded
 /// from the tokio async workers and executed on this threadpool.
 pub(crate) struct Pool {
-	server: Arc<Server>,
+	_server: Arc<Server>,
 	queues: Vec<Sender<Cmd>>,
-	workers: Mutex<JoinSet<()>>,
+	workers: Mutex<Vec<JoinHandle<()>>>,
 	topology: Vec<usize>,
 	busy: AtomicUsize,
 	queued_max: AtomicUsize,
 }
@@ -69,39 +70,42 @@ const WORKER_LIMIT: (usize, usize) = (1, 1024);
 const QUEUE_LIMIT: (usize, usize) = (1, 2048);
 const BATCH_INLINE: usize = 1;

+const WORKER_STACK_SIZE: usize = 1_048_576;
+const WORKER_NAME: &str = "conduwuit:db";
+
 #[implement(Pool)]
-pub(crate) async fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
+pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
 	const CHAN_SCHED: (QueueStrategy, QueueStrategy) = (QueueStrategy::Fifo, QueueStrategy::Lifo);

 	let (total_workers, queue_sizes, topology) = configure(server);

-	let (senders, receivers) = queue_sizes
+	let (senders, receivers): (Vec<_>, Vec<_>) = queue_sizes
 		.into_iter()
 		.map(|cap| async_channel::bounded_with_queue_strategy(cap, CHAN_SCHED))
 		.unzip();

 	let pool = Arc::new(Self {
-		server: server.clone(),
-
+		_server: server.clone(),
 		queues: senders,
-
-		workers: JoinSet::new().into(),
-
+		workers: Vec::new().into(),
 		topology,
-
 		busy: AtomicUsize::default(),
-
 		queued_max: AtomicUsize::default(),
 	});

-	pool.spawn_until(receivers, total_workers).await?;
+	pool.spawn_until(&receivers, total_workers)?;

 	Ok(pool)
 }

 impl Drop for Pool {
 	fn drop(&mut self) {
-		debug_assert!(self.queues.iter().all(Sender::is_empty), "channel must be empty on drop");
+		self.close();
+
+		debug_assert!(
+			self.queues.iter().all(Sender::is_empty),
+			"channel should not have requests queued on drop"
+		);

 		debug_assert!(
 			self.queues.iter().all(Sender::is_closed),
 			"channel should be closed on drop"
 		);
@@ -110,17 +114,10 @@ impl Drop for Pool {
 }

 #[implement(Pool)]
-pub(crate) async fn shutdown(self: &Arc<Self>) {
-	self.close();
-
-	let workers = take(&mut *self.workers.lock().expect("locked"));
-	debug!(workers = workers.len(), "Waiting for workers to join...");
-
-	workers.join_all().await;
-}
-
-#[implement(Pool)]
+#[tracing::instrument(skip_all)]
 pub(crate) fn close(&self) {
+	let workers = take(&mut *self.workers.lock().expect("locked"));
+
 	let senders = self.queues.iter().map(Sender::sender_count).sum::<usize>();

 	let receivers = self
@@ -129,27 +126,40 @@ pub(crate) fn close(&self) {
 		.queues
 		.iter()
 		.map(Sender::receiver_count)
 		.sum::<usize>();

-	debug!(
-		queues = self.queues.len(),
-		workers = self.workers.lock().expect("locked").len(),
-		?senders,
-		?receivers,
-		"Closing pool..."
-	);
-
 	for queue in &self.queues {
 		queue.close();
 	}

-	self.workers.lock().expect("locked").abort_all();
-	std::thread::yield_now();
+	if workers.is_empty() {
+		return;
+	}
+
+	debug!(
+		senders,
+		receivers,
+		queues = self.queues.len(),
+		workers = workers.len(),
+		"Closing pool. Waiting for workers to join..."
+	);
+
+	workers
+		.into_iter()
+		.map(JoinHandle::join)
+		.map(|result| result.map_err(Error::from_panic))
+		.enumerate()
+		.for_each(|(id, result)| {
+			match result {
+				| Ok(()) => trace!(?id, "worker joined"),
+				| Err(error) => error!(?id, "worker joined with error: {error}"),
+			};
+		});
 }

 #[implement(Pool)]
-async fn spawn_until(self: &Arc<Self>, recv: Vec<Receiver<Cmd>>, count: usize) -> Result {
+fn spawn_until(self: &Arc<Self>, recv: &[Receiver<Cmd>], count: usize) -> Result {
 	let mut workers = self.workers.lock().expect("locked");
 	while workers.len() < count {
-		self.spawn_one(&mut workers, &recv)?;
+		self.clone().spawn_one(&mut workers, recv)?;
 	}

 	Ok(())
 }
@@ -162,23 +172,24 @@
 	skip_all,
 	fields(id = %workers.len())
 )]
-fn spawn_one(self: &Arc<Self>, workers: &mut JoinSet<()>, recv: &[Receiver<Cmd>]) -> Result {
+fn spawn_one(
+	self: Arc<Self>,
+	workers: &mut Vec<JoinHandle<()>>,
+	recv: &[Receiver<Cmd>],
+) -> Result {
 	debug_assert!(!self.queues.is_empty(), "Must have at least one queue");
 	debug_assert!(!recv.is_empty(), "Must have at least one receiver");

 	let id = workers.len();
 	let group = id.overflowing_rem(self.queues.len()).0;
 	let recv = recv[group].clone();
-	let self_ = self.clone();

-	#[cfg(not(tokio_unstable))]
-	let _abort = workers.spawn_blocking_on(move || self_.worker(id, recv), self.server.runtime());
+	let handle = thread::Builder::new()
+		.name(WORKER_NAME.into())
+		.stack_size(WORKER_STACK_SIZE)
+		.spawn(move || self.worker(id, recv))?;

-	#[cfg(tokio_unstable)]
-	let _abort = workers
-		.build_task()
-		.name("conduwuit:dbpool")
-		.spawn_blocking_on(move || self_.worker(id, recv), self.server.runtime());
+	workers.push(handle);

 	Ok(())
 }
@@ -258,7 +269,7 @@ async fn execute(&self, queue: &Sender<Cmd>, cmd: Cmd) -> Result {
 	level = "debug",
 	skip(self, recv),
 	fields(
-		tid = ?std::thread::current().id(),
+		tid = ?thread::current().id(),
 	),
 )]
 fn worker(self: Arc<Self>, id: usize, recv: Receiver<Cmd>) {
diff --git a/src/main/runtime.rs b/src/main/runtime.rs
index b9dfc866..3039ef1b 100644
--- a/src/main/runtime.rs
+++ b/src/main/runtime.rs
@@ -19,7 +19,7 @@ use crate::clap::Args;
 const WORKER_NAME: &str = "conduwuit:worker";
 const WORKER_MIN: usize = 2;
 const WORKER_KEEPALIVE: u64 = 36;
-const MAX_BLOCKING_THREADS: usize = 2048;
+const MAX_BLOCKING_THREADS: usize = 1024;

 static WORKER_AFFINITY: OnceLock = OnceLock::new();
diff --git a/src/router/run.rs b/src/router/run.rs
index 248f7052..1b4d7437 100644
--- a/src/router/run.rs
+++ b/src/router/run.rs
@@ -3,7 +3,7 @@ extern crate conduwuit_core as conduwuit;
 extern crate conduwuit_service as service;

 use std::{
-	sync::{atomic::Ordering, Arc},
+	sync::{atomic::Ordering, Arc, Weak},
 	time::Duration,
 };
@@ -92,11 +92,11 @@ pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
 		);
 	}

-	// The db threadpool requires async join if we use tokio/spawn_blocking to
-	// manage the threads. Without async-drop we have to wait here; for symmetry
-	// with Services construction it can't be done in services.stop().
-	if let Some(db) = db.upgrade() {
-		db.db.shutdown_pool().await;
+	if Weak::strong_count(&db) > 0 {
+		debug_error!(
+			"{} dangling references to Database after shutdown",
+			Weak::strong_count(&db)
+		);
 	}

 	#[cfg(feature = "systemd")]
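
For readers unfamiliar with the pattern this patch moves to, here is a minimal, self-contained Rust sketch of the same idea: worker threads spawned via std::thread::Builder with a name and stack size, their JoinHandles collected in a Vec, and a close() that shuts the queue and joins every handle synchronously. It is not conduwuit code; MiniPool, Job, the "minipool:N" thread names, and the std::sync::mpsc queue are illustrative stand-ins (the patch itself uses async_channel and the Pool/Cmd types above).

// Sketch only; names and sizes are assumptions, not conduwuit identifiers.
use std::{
	sync::{
		mpsc::{channel, Sender},
		Arc, Mutex,
	},
	thread::{self, JoinHandle},
};

type Job = Box<dyn FnOnce() + Send + 'static>;

struct MiniPool {
	sender: Option<Sender<Job>>,  // dropped in close() so workers see a closed queue
	workers: Vec<JoinHandle<()>>, // plain OS threads, joined synchronously
}

impl MiniPool {
	fn new(count: usize) -> Self {
		let (sender, receiver) = channel::<Job>();
		let receiver = Arc::new(Mutex::new(receiver));
		let workers = (0..count)
			.map(|id| {
				let receiver = Arc::clone(&receiver);
				thread::Builder::new()
					.name(format!("minipool:{id}")) // analogous to WORKER_NAME in the patch
					.stack_size(1_048_576)          // analogous to WORKER_STACK_SIZE
					.spawn(move || loop {
						// Take one job per lock acquisition; exit when the queue closes.
						let job = receiver.lock().expect("locked").recv();
						match job {
							Ok(job) => job(),
							Err(_) => break,
						}
					})
					.expect("failed to spawn worker thread")
			})
			.collect();
		Self { sender: Some(sender), workers }
	}

	fn execute(&self, job: impl FnOnce() + Send + 'static) {
		self.sender
			.as_ref()
			.expect("pool is open")
			.send(Box::new(job))
			.expect("a worker is alive");
	}

	fn close(&mut self) {
		drop(self.sender.take()); // close the queue; blocked recv() calls return Err
		for handle in self.workers.drain(..) {
			handle.join().expect("worker panicked");
		}
	}
}

fn main() {
	let mut pool = MiniPool::new(2);
	for n in 0..4 {
		pool.execute(move || println!("handled request {n}"));
	}
	pool.close(); // synchronous join, like Pool::close() in the patch
}

The design point mirrored here is that shutdown becomes a plain synchronous join, so no async runtime is needed to tear the pool down; that is why the patch removes Engine::shutdown_pool() and the awaited cleanup in router/run.rs above.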