diff --git a/Cargo.lock b/Cargo.lock index f2036284..48d1a91f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -694,6 +694,7 @@ dependencies = [ "tikv-jemalloc-sys", "tikv-jemallocator", "tokio", + "tokio-metrics", "tracing", "tracing-core", "tracing-subscriber", @@ -4058,6 +4059,7 @@ checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" dependencies = [ "futures-util", "pin-project-lite", + "tokio", "tokio-stream", ] diff --git a/Cargo.toml b/Cargo.toml index 33e2b9d9..e24d84b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -200,6 +200,9 @@ features = [ "io-util", ] +[workspace.dependencies.tokio-metrics] +version = "0.3.1" + [workspace.dependencies.libloading] version = "0.8.3" @@ -382,10 +385,6 @@ version = "0.5.4" default-features = false features = ["use_std"] -[workspace.dependencies.tokio-metrics] -version = "0.3.1" -default-features = false - [workspace.dependencies.console-subscriber] version = "0.3" diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index a5087467..46f71622 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -653,3 +653,37 @@ pub(super) fn memory_stats() -> RoomMessageEventContent { html_body.expect("string result"), ) } + +#[cfg(tokio_unstable)] +pub(super) async fn runtime_metrics(_body: Vec<&str>) -> Result { + let out = services().server.metrics.runtime_metrics().map_or_else( + || "Runtime metrics are not available.".to_owned(), + |metrics| format!("```rs\n{metrics:#?}\n```"), + ); + + Ok(RoomMessageEventContent::text_markdown(out)) +} + +#[cfg(not(tokio_unstable))] +pub(super) async fn runtime_metrics(_body: Vec<&str>) -> Result { + Ok(RoomMessageEventContent::text_markdown( + "Runtime metrics require building with `tokio_unstable`.", + )) +} + +#[cfg(tokio_unstable)] +pub(super) async fn runtime_interval(_body: Vec<&str>) -> Result { + let out = services().server.metrics.runtime_interval().map_or_else( + || "Runtime metrics are not available.".to_owned(), + |metrics| format!("```rs\n{metrics:#?}\n```"), + ); + + Ok(RoomMessageEventContent::text_markdown(out)) +} + +#[cfg(not(tokio_unstable))] +pub(super) async fn runtime_interval(_body: Vec<&str>) -> Result { + Ok(RoomMessageEventContent::text_markdown( + "Runtime metrics require building with `tokio_unstable`.", + )) +} diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index eed3b633..7d6cafa7 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -160,6 +160,13 @@ pub(super) enum DebugCommand { /// - Print extended memory usage MemoryStats, + /// - Print general tokio runtime metric totals. + RuntimeMetrics, + + /// - Print detailed tokio runtime metrics accumulated since last command + /// invocation. + RuntimeInterval, + /// - Developer test stubs #[command(subcommand)] Tester(TesterCommand), @@ -213,6 +220,8 @@ pub(super) async fn process(command: DebugCommand, body: Vec<&str>) -> Result resolve_true_destination(body, server_name, no_cache).await?, DebugCommand::MemoryStats => memory_stats(), + DebugCommand::RuntimeMetrics => runtime_metrics(body).await?, + DebugCommand::RuntimeInterval => runtime_interval(body).await?, DebugCommand::Tester(command) => tester::process(command, body).await?, }) } diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index 73ee0152..453d7b13 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -82,6 +82,7 @@ tikv-jemalloc-ctl.workspace = true tikv-jemalloc-sys.optional = true tikv-jemalloc-sys.workspace = true tokio.workspace = true +tokio-metrics.workspace = true tracing-core.workspace = true tracing-subscriber.workspace = true tracing.workspace = true diff --git a/src/core/metrics/mod.rs b/src/core/metrics/mod.rs new file mode 100644 index 00000000..3ae139a8 --- /dev/null +++ b/src/core/metrics/mod.rs @@ -0,0 +1,72 @@ +use std::sync::atomic::AtomicU32; + +use tokio::runtime; +use tokio_metrics::TaskMonitor; +#[cfg(tokio_unstable)] +use tokio_metrics::{RuntimeIntervals, RuntimeMonitor}; + +pub struct Metrics { + _runtime: Option, + + runtime_metrics: Option, + + task_monitor: Option, + + #[cfg(tokio_unstable)] + _runtime_monitor: Option, + + #[cfg(tokio_unstable)] + runtime_intervals: std::sync::Mutex>, + + // TODO: move stats + pub requests_spawn_active: AtomicU32, + pub requests_spawn_finished: AtomicU32, + pub requests_handle_active: AtomicU32, + pub requests_handle_finished: AtomicU32, + pub requests_panic: AtomicU32, +} + +impl Metrics { + #[must_use] + pub fn new(runtime: Option) -> Self { + #[cfg(tokio_unstable)] + let runtime_monitor = runtime.as_ref().map(RuntimeMonitor::new); + + #[cfg(tokio_unstable)] + let runtime_intervals = runtime_monitor.as_ref().map(RuntimeMonitor::intervals); + + Self { + _runtime: runtime.clone(), + + runtime_metrics: runtime.as_ref().map(runtime::Handle::metrics), + + task_monitor: runtime.map(|_| TaskMonitor::new()), + + #[cfg(tokio_unstable)] + _runtime_monitor: runtime_monitor, + + #[cfg(tokio_unstable)] + runtime_intervals: std::sync::Mutex::new(runtime_intervals), + + requests_spawn_active: AtomicU32::new(0), + requests_spawn_finished: AtomicU32::new(0), + requests_handle_active: AtomicU32::new(0), + requests_handle_finished: AtomicU32::new(0), + requests_panic: AtomicU32::new(0), + } + } + + #[cfg(tokio_unstable)] + pub fn runtime_interval(&self) -> Option { + self.runtime_intervals + .lock() + .expect("locked") + .as_mut() + .map(Iterator::next) + .expect("next interval") + } + + pub fn task_root(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() } + + pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> { self.runtime_metrics.as_ref() } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index de8057fa..9716b46e 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -3,6 +3,7 @@ pub mod config; pub mod debug; pub mod error; pub mod log; +pub mod metrics; pub mod mods; pub mod pdu; pub mod server; diff --git a/src/core/server.rs b/src/core/server.rs index 752680ad..bf0ab99d 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -1,11 +1,11 @@ use std::{ - sync::atomic::{AtomicBool, AtomicU32, Ordering}, + sync::atomic::{AtomicBool, Ordering}, time::SystemTime, }; use tokio::{runtime, sync::broadcast}; -use crate::{config::Config, log, Err, Result}; +use crate::{config::Config, log::Log, metrics::Metrics, Err, Result}; /// Server runtime state; public portion pub struct Server { @@ -33,33 +33,25 @@ pub struct Server { pub signal: broadcast::Sender<&'static str>, /// Logging subsystem state - pub log: log::Log, + pub log: Log, - /// TODO: move stats - pub requests_spawn_active: AtomicU32, - pub requests_spawn_finished: AtomicU32, - pub requests_handle_active: AtomicU32, - pub requests_handle_finished: AtomicU32, - pub requests_panic: AtomicU32, + /// Metrics subsystem state + pub metrics: Metrics, } impl Server { #[must_use] - pub fn new(config: Config, runtime: Option, log: log::Log) -> Self { + pub fn new(config: Config, runtime: Option, log: Log) -> Self { Self { config, started: SystemTime::now(), stopping: AtomicBool::new(false), reloading: AtomicBool::new(false), restarting: AtomicBool::new(false), - runtime, + runtime: runtime.clone(), signal: broadcast::channel::<&'static str>(1).0, log, - requests_spawn_active: AtomicU32::new(0), - requests_spawn_finished: AtomicU32::new(0), - requests_handle_active: AtomicU32::new(0), - requests_handle_finished: AtomicU32::new(0), - requests_panic: AtomicU32::new(0), + metrics: Metrics::new(runtime), } } diff --git a/src/main/main.rs b/src/main/main.rs index 23f53b4e..959e8610 100644 --- a/src/main/main.rs +++ b/src/main/main.rs @@ -20,7 +20,7 @@ use tokio::runtime; const WORKER_NAME: &str = "conduwuit:worker"; const WORKER_MIN: usize = 2; -const WORKER_KEEPALIVE_MS: u64 = 2500; +const WORKER_KEEPALIVE: u64 = 36; fn main() -> Result<(), Error> { let args = clap::parse(); @@ -29,7 +29,7 @@ fn main() -> Result<(), Error> { .enable_time() .thread_name(WORKER_NAME) .worker_threads(cmp::max(WORKER_MIN, available_parallelism())) - .thread_keep_alive(Duration::from_millis(WORKER_KEEPALIVE_MS)) + .thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE)) .build() .expect("built runtime"); diff --git a/src/router/layers.rs b/src/router/layers.rs index 8c2e114b..073940f1 100644 --- a/src/router/layers.rs +++ b/src/router/layers.rs @@ -153,6 +153,7 @@ fn body_limit_layer(server: &Server) -> DefaultBodyLimit { DefaultBodyLimit::max fn catch_panic(err: Box) -> http::Response> { conduit_service::services() .server + .metrics .requests_panic .fetch_add(1, std::sync::atomic::Ordering::Release); diff --git a/src/router/request.rs b/src/router/request.rs index 851bd168..ae739984 100644 --- a/src/router/request.rs +++ b/src/router/request.rs @@ -17,11 +17,14 @@ pub(crate) async fn spawn( return Err(StatusCode::SERVICE_UNAVAILABLE); } - let active = server.requests_spawn_active.fetch_add(1, Ordering::Relaxed); + let active = server + .metrics + .requests_spawn_active + .fetch_add(1, Ordering::Relaxed); trace!(active, "enter"); defer! {{ - let active = server.requests_spawn_active.fetch_sub(1, Ordering::Relaxed); - let finished = server.requests_spawn_finished.fetch_add(1, Ordering::Relaxed); + let active = server.metrics.requests_spawn_active.fetch_sub(1, Ordering::Relaxed); + let finished = server.metrics.requests_spawn_finished.fetch_add(1, Ordering::Relaxed); trace!(active, finished, "leave"); }}; @@ -45,12 +48,13 @@ pub(crate) async fn handle( } let active = server + .metrics .requests_handle_active .fetch_add(1, Ordering::Relaxed); trace!(active, "enter"); defer! {{ - let active = server.requests_handle_active.fetch_sub(1, Ordering::Relaxed); - let finished = server.requests_handle_finished.fetch_add(1, Ordering::Relaxed); + let active = server.metrics.requests_handle_active.fetch_sub(1, Ordering::Relaxed); + let finished = server.metrics.requests_handle_finished.fetch_add(1, Ordering::Relaxed); trace!(active, finished, "leave"); }}; diff --git a/src/router/run.rs b/src/router/run.rs index b3ec2285..91507772 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -108,7 +108,7 @@ async fn handle_shutdown(server: &Arc, tx: &Sender<()>, handle: &axum_se error!("failed sending shutdown transaction to channel: {e}"); } - let pending = server.requests_spawn_active.load(Ordering::Relaxed); + let pending = server.metrics.requests_spawn_active.load(Ordering::Relaxed); if pending > 0 { let timeout = Duration::from_secs(36); trace!(pending, ?timeout, "Notifying for graceful shutdown"); diff --git a/src/router/serve/plain.rs b/src/router/serve/plain.rs index b79d342d..08263353 100644 --- a/src/router/serve/plain.rs +++ b/src/router/serve/plain.rs @@ -21,12 +21,21 @@ pub(super) async fn serve( info!("Listening on {addrs:?}"); while join_set.join_next().await.is_some() {} - let spawn_active = server.requests_spawn_active.load(Ordering::Relaxed); - let handle_active = server.requests_handle_active.load(Ordering::Relaxed); + let spawn_active = server.metrics.requests_spawn_active.load(Ordering::Relaxed); + let handle_active = server + .metrics + .requests_handle_active + .load(Ordering::Relaxed); debug_info!( - spawn_finished = server.requests_spawn_finished.load(Ordering::Relaxed), - handle_finished = server.requests_handle_finished.load(Ordering::Relaxed), - panics = server.requests_panic.load(Ordering::Relaxed), + spawn_finished = server + .metrics + .requests_spawn_finished + .load(Ordering::Relaxed), + handle_finished = server + .metrics + .requests_handle_finished + .load(Ordering::Relaxed), + panics = server.metrics.requests_panic.load(Ordering::Relaxed), spawn_active, handle_active, "Stopped listening on {addrs:?}",