diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 67c34b55..aa46cbc3 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -83,21 +83,6 @@ port = 6167 # likely need this to be 0.0.0.0. address = "127.0.0.1" -# How many requests conduwuit sends to other servers at the same time concurrently. Default is 500 -# Note that because conduwuit is very fast unlike other homeserver implementations, setting this too -# high could inadvertently result in ratelimits kicking in, or overloading lower-end homeservers out there. -# -# A valid use-case for enabling this is if you have a significant amount of overall federation activity -# such as many rooms joined/tracked, and many servers in the true destination cache caused by that. Upon -# rebooting conduwuit, depending on how fast your resources are, client and incoming federation requests -# may timeout or be "stalled" for a period of time due to hitting the max concurrent requests limit from -# refreshing federation/destination caches and such. -# -# If you have a lot of active users on your homeserver, you will definitely need to raise this. -# -# No this will not speed up room joins. -#max_concurrent_requests = 500 - # Max request size for file uploads max_request_size = 20_000_000 # in bytes diff --git a/src/config/mod.rs b/src/config/mod.rs index ec954598..a8d1f977 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -106,8 +106,6 @@ pub(crate) struct Config { #[serde(default = "default_max_request_size")] pub(crate) max_request_size: u32, - #[serde(default = "default_max_concurrent_requests")] - pub(crate) max_concurrent_requests: u16, #[serde(default = "default_max_fetch_prev_events")] pub(crate) max_fetch_prev_events: u16, @@ -511,7 +509,6 @@ impl fmt::Display for Config { ("DNS fallback to TCP", &self.dns_tcp_fallback.to_string()), ("Query all nameservers", &self.query_all_nameservers.to_string()), ("Maximum request size (bytes)", &self.max_request_size.to_string()), - ("Maximum concurrent requests", &self.max_concurrent_requests.to_string()), ("Sender retry backoff limit", &self.sender_retry_backoff_limit.to_string()), ("Request connect timeout", &self.request_conn_timeout.to_string()), ("Request timeout", &self.request_timeout.to_string()), @@ -877,8 +874,6 @@ fn default_max_request_size() -> u32 { 20 * 1024 * 1024 // Default to 20 MB } -fn default_max_concurrent_requests() -> u16 { 500 } - fn default_request_conn_timeout() -> u64 { 10 } fn default_request_timeout() -> u64 { 35 } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 30f83e1a..e1be1e16 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -25,7 +25,7 @@ use ruma::{ events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; -use tokio::sync::{Mutex, Semaphore}; +use tokio::sync::Mutex; use tracing::{debug, error, warn}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -39,7 +39,6 @@ pub(crate) struct Service { pub(crate) db: &'static dyn Data, /// The state for a given state hash. - pub(crate) maximum_requests: Arc, sender: loole::Sender, receiver: Mutex>, startup_netburst: bool, @@ -91,7 +90,6 @@ impl Service { db, sender, receiver: Mutex::new(receiver), - maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), startup_netburst: config.startup_netburst, startup_netburst_keep: config.startup_netburst_keep, }) @@ -245,10 +243,7 @@ impl Service { T: OutgoingRequest + Debug, { let client = &services().globals.client.federation; - let permit = self.maximum_requests.acquire().await; - let response = send::send(client, dest, request).await; - drop(permit); - response + send::send(client, dest, request).await } /// Sends a request to an appservice @@ -261,11 +256,7 @@ impl Service { where T: OutgoingRequest + Debug, { - let permit = self.maximum_requests.acquire().await; - let response = appservice::send_request(registration, request).await; - drop(permit); - - response + appservice::send_request(registration, request).await } /// Cleanup event data @@ -670,10 +661,8 @@ async fn send_events_dest_appservice(dest: &Destination, id: &String, events: Ve } } - let permit = services().sending.maximum_requests.acquire().await; - debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction"); - let response = match appservice::send_request( + match appservice::send_request( services() .appservice .get_registration(id) @@ -702,11 +691,7 @@ async fn send_events_dest_appservice(dest: &Destination, id: &String, events: Ve { Ok(_) => Ok(dest.clone()), Err(e) => Err((dest.clone(), e)), - }; - - drop(permit); - - response + } } #[tracing::instrument(skip(dest, events))] @@ -772,16 +757,12 @@ async fn send_events_dest_push( .try_into() .expect("notification count can't go that high"); - let permit = services().sending.maximum_requests.acquire().await; - let _response = services() .pusher .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu) .await .map(|_response| dest.clone()) .map_err(|e| (dest.clone(), e)); - - drop(permit); } Ok(dest.clone()) @@ -830,10 +811,9 @@ async fn send_events_dest_normal( } } - let permit = services().sending.maximum_requests.acquire().await; let client = &services().globals.client.sender; debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty transaction"); - let response = send::send( + send::send( client, server_name, send_transaction_message::v1::Request { @@ -862,11 +842,7 @@ async fn send_events_dest_normal( } dest.clone() }) - .map_err(|e| (dest.clone(), e)); - - drop(permit); - - response + .map_err(|e| (dest.clone(), e)) } impl Destination {