From 59c073d0d86ca8a6b9606037e2278890b5b84821 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 6 Feb 2025 23:58:45 +0000 Subject: [PATCH] add unconstrained feature to service worker Signed-off-by: Jason Volk --- src/service/manager.rs | 9 +++++++-- src/service/sending/mod.rs | 13 +++++++++++-- src/service/service.rs | 5 +++++ 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/service/manager.rs b/src/service/manager.rs index ea33d285..e0d885c2 100644 --- a/src/service/manager.rs +++ b/src/service/manager.rs @@ -1,7 +1,7 @@ use std::{panic::AssertUnwindSafe, sync::Arc, time::Duration}; use conduwuit::{debug, debug_warn, error, trace, utils::time, warn, Err, Error, Result, Server}; -use futures::FutureExt; +use futures::{FutureExt, TryFutureExt}; use tokio::{ sync::{Mutex, MutexGuard}, task::{JoinHandle, JoinSet}, @@ -183,9 +183,14 @@ async fn worker(service: Arc) -> WorkerResult { let service_ = Arc::clone(&service); let result = AssertUnwindSafe(service_.worker()) .catch_unwind() - .await .map_err(Error::from_panic); + let result = if service.unconstrained() { + tokio::task::unconstrained(result).await + } else { + result.await + }; + // flattens JoinError for panic into worker's Error (service, result.unwrap_or_else(Err)) } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index b146ad49..86b219f7 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -22,7 +22,7 @@ use ruma::{ RoomId, ServerName, UserId, }; use smallvec::SmallVec; -use tokio::task::JoinSet; +use tokio::{task, task::JoinSet}; use self::data::Data; pub use self::{ @@ -111,8 +111,15 @@ impl crate::Service for Service { .enumerate() .fold(JoinSet::new(), |mut joinset, (id, _)| { let self_ = self.clone(); + let worker = self_.sender(id); + let worker = if self.unconstrained() { + task::unconstrained(worker).boxed() + } else { + worker.boxed() + }; + let runtime = self.server.runtime(); - let _abort = joinset.spawn_on(self_.sender(id).boxed(), runtime); + let _abort = joinset.spawn_on(worker, runtime); joinset }); @@ -139,6 +146,8 @@ impl crate::Service for Service { } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } + + fn unconstrained(&self) -> bool { true } } impl Service { diff --git a/src/service/service.rs b/src/service/service.rs index 7adb189e..cad01437 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -39,6 +39,11 @@ pub(crate) trait Service: Any + Send + Sync { /// Return the name of the service. /// i.e. `crate::service::make_name(std::module_path!())` fn name(&self) -> &str; + + /// Return true if the service worker opts out of the tokio cooperative + /// budgeting. This can reduce tail latency at the risk of event loop + /// starvation. + fn unconstrained(&self) -> bool { false } } /// Args are passed to `Service::build` when a service is constructed. This