diff --git a/src/core/metrics/mod.rs b/src/core/metrics/mod.rs index f2022166..8f7a5571 100644 --- a/src/core/metrics/mod.rs +++ b/src/core/metrics/mod.rs @@ -19,8 +19,6 @@ pub struct Metrics { runtime_intervals: std::sync::Mutex>, // TODO: move stats - pub requests_spawn_active: AtomicU32, - pub requests_spawn_finished: AtomicU32, pub requests_handle_active: AtomicU32, pub requests_handle_finished: AtomicU32, pub requests_panic: AtomicU32, @@ -48,8 +46,6 @@ impl Metrics { #[cfg(tokio_unstable)] runtime_intervals: std::sync::Mutex::new(runtime_intervals), - requests_spawn_active: AtomicU32::new(0), - requests_spawn_finished: AtomicU32::new(0), requests_handle_active: AtomicU32::new(0), requests_handle_finished: AtomicU32::new(0), requests_panic: AtomicU32::new(0), diff --git a/src/router/layers.rs b/src/router/layers.rs index 96bca4fd..c5227c22 100644 --- a/src/router/layers.rs +++ b/src/router/layers.rs @@ -5,7 +5,7 @@ use axum::{ Router, }; use axum_client_ip::SecureClientIpSource; -use conduwuit::{error, Result, Server}; +use conduwuit::{debug, error, Result, Server}; use conduwuit_api::router::state::Guard; use conduwuit_service::Services; use http::{ @@ -50,7 +50,6 @@ pub(crate) fn build(services: &Arc) -> Result<(Router, Guard)> { let layers = layers .layer(SetSensitiveHeadersLayer::new([header::AUTHORIZATION])) - .layer(axum::middleware::from_fn_with_state(Arc::clone(services), request::spawn)) .layer( TraceLayer::new_for_http() .make_span_with(tracing_span::<_>) @@ -196,20 +195,26 @@ fn catch_panic( } fn tracing_span(request: &http::Request) -> tracing::Span { - let path = request.extensions().get::().map_or_else( - || { - request - .uri() - .path_and_query() - .expect("all requests have a path") - .as_str() - }, - truncated_matched_path, - ); + let path = request + .extensions() + .get::() + .map_or_else(|| request_path_str(request), truncated_matched_path); - let method = request.method(); + tracing::span! { + parent: None, + debug::INFO_SPAN_LEVEL, + "router", + method = %request.method(), + %path, + } +} - tracing::debug_span!(parent: None, "router", %method, %path) +fn request_path_str(request: &http::Request) -> &str { + request + .uri() + .path_and_query() + .expect("all requests have a path") + .as_str() } fn truncated_matched_path(path: &MatchedPath) -> &str { diff --git a/src/router/request.rs b/src/router/request.rs index ca063338..f7b94417 100644 --- a/src/router/request.rs +++ b/src/router/request.rs @@ -8,48 +8,6 @@ use conduwuit::{debug, debug_error, debug_warn, err, error, trace, Result}; use conduwuit_service::Services; use http::{Method, StatusCode, Uri}; -#[tracing::instrument( - parent = None, - level = "trace", - skip_all, - fields( - handled = %services - .server - .metrics - .requests_spawn_finished - .fetch_add(1, Ordering::Relaxed), - active = %services - .server - .metrics - .requests_spawn_active - .fetch_add(1, Ordering::Relaxed), - ) -)] -pub(crate) async fn spawn( - State(services): State>, - req: http::Request, - next: axum::middleware::Next, -) -> Result { - let server = &services.server; - - #[cfg(debug_assertions)] - conduwuit::defer! {{ - _ = server - .metrics - .requests_spawn_active - .fetch_sub(1, Ordering::Relaxed); - }}; - - if !server.running() { - debug_warn!("unavailable pending shutdown"); - return Err(StatusCode::SERVICE_UNAVAILABLE); - } - - let fut = next.run(req); - let task = server.runtime().spawn(fut); - task.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) -} - #[tracing::instrument( level = "debug", skip_all, @@ -71,17 +29,15 @@ pub(crate) async fn handle( req: http::Request, next: axum::middleware::Next, ) -> Result { - let server = &services.server; - #[cfg(debug_assertions)] conduwuit::defer! {{ - _ = server + _ = services.server .metrics .requests_handle_active .fetch_sub(1, Ordering::Relaxed); }}; - if !server.running() { + if !services.server.running() { debug_warn!( method = %req.method(), uri = %req.uri(), @@ -91,10 +47,15 @@ pub(crate) async fn handle( return Err(StatusCode::SERVICE_UNAVAILABLE); } - let uri = req.uri().clone(); let method = req.method().clone(); - let result = next.run(req).await; - handle_result(&method, &uri, result) + let uri = req.uri().clone(); + services + .server + .runtime() + .spawn(next.run(req)) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) + .and_then(|result| handle_result(&method, &uri, result)) } fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result { diff --git a/src/router/run.rs b/src/router/run.rs index ea8a7666..605168b8 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -125,7 +125,6 @@ async fn handle_shutdown(server: Arc, tx: Sender<()>, handle: axum_serve let timeout = Duration::from_secs(36); debug!( ?timeout, - spawn_active = ?server.metrics.requests_spawn_active.load(Ordering::Relaxed), handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed), "Notifying for graceful shutdown" ); diff --git a/src/router/serve/plain.rs b/src/router/serve/plain.rs index 0e971f3c..535282b9 100644 --- a/src/router/serve/plain.rs +++ b/src/router/serve/plain.rs @@ -24,27 +24,20 @@ pub(super) async fn serve( info!("Listening on {addrs:?}"); while join_set.join_next().await.is_some() {} - let spawn_active = server.metrics.requests_spawn_active.load(Ordering::Relaxed); let handle_active = server .metrics .requests_handle_active .load(Ordering::Relaxed); debug_info!( - spawn_finished = server - .metrics - .requests_spawn_finished - .load(Ordering::Relaxed), handle_finished = server .metrics .requests_handle_finished .load(Ordering::Relaxed), panics = server.metrics.requests_panic.load(Ordering::Relaxed), - spawn_active, handle_active, "Stopped listening on {addrs:?}", ); - debug_assert!(spawn_active == 0, "active request tasks are not joined"); debug_assert!(handle_active == 0, "active request handles still pending"); Ok(()) diff --git a/src/router/serve/unix.rs b/src/router/serve/unix.rs index 6855b34c..6a030c30 100644 --- a/src/router/serve/unix.rs +++ b/src/router/serve/unix.rs @@ -159,7 +159,12 @@ async fn fini(server: &Arc, listener: UnixListener, mut tasks: JoinSet<( drop(listener); debug!("Waiting for requests to finish..."); - while server.metrics.requests_spawn_active.load(Ordering::Relaxed) > 0 { + while server + .metrics + .requests_handle_active + .load(Ordering::Relaxed) + .gt(&0) + { tokio::select! { task = tasks.join_next() => if task.is_none() { break; }, () = sleep(FINI_POLL_INTERVAL) => {},