From 52a561ff9e8196794995ccc2556c9976a636fb8b Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 7 Jul 2024 03:39:35 +0000 Subject: [PATCH] abstract expoential backoff to math utils. Signed-off-by: Jason Volk --- src/api/client/keys.rs | 21 +++++-------- src/api/client/membership.rs | 21 ++++++------- src/core/utils/math.rs | 18 +++++++++++ src/service/rooms/event_handler/mod.rs | 41 +++++++++++--------------- src/service/sending/sender.rs | 12 ++++---- 5 files changed, 60 insertions(+), 53 deletions(-) diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index 6f089875..7bb02a60 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -1,9 +1,9 @@ use std::{ - cmp, collections::{hash_map, BTreeMap, HashMap, HashSet}, - time::{Duration, Instant}, + time::Instant, }; +use conduit::{utils, utils::math::continue_exponential_backoff_secs, Error, Result}; use futures_util::{stream::FuturesUnordered, StreamExt}; use ruma::{ api::{ @@ -18,15 +18,11 @@ use ruma::{ DeviceKeyAlgorithm, OwnedDeviceId, OwnedUserId, UserId, }; use serde_json::json; +use service::user_is_local; use tracing::debug; use super::SESSION_ID_LENGTH; -use crate::{ - service::user_is_local, - services, - utils::{self}, - Error, Result, Ruma, -}; +use crate::{services, Ruma}; /// # `POST /_matrix/client/r0/keys/upload` /// @@ -357,11 +353,10 @@ pub(crate) async fn get_keys_helper bool + Send>( .get(server) { // Exponential backoff - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let min_elapsed_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries)); - - if time.elapsed() < min_elapsed_duration { - debug!("Backing off query from {:?}", server); + const MIN: u64 = 5 * 60; + const MAX: u64 = 60 * 60 * 24; + if continue_exponential_backoff_secs(MIN, MAX, time.elapsed(), *tries) { + debug!("Backing off query from {server:?}"); return (server, Err(Error::BadServerResponse("bad query, still backing off"))); } } diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 07a585fd..e2ba4c9a 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -1,13 +1,16 @@ use std::{ - cmp, collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, net::IpAddr, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; use axum_client_ip::InsecureClientIp; -use conduit::utils::mutex_map; +use conduit::{ + debug, error, info, trace, utils, + utils::{math::continue_exponential_backoff_secs, mutex_map}, + warn, Error, PduEvent, Result, +}; use ruma::{ api::{ client::{ @@ -35,7 +38,6 @@ use ruma::{ use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use service::sending::convert_to_outgoing_federation_event; use tokio::sync::RwLock; -use tracing::{debug, error, info, trace, warn}; use crate::{ client::{update_avatar_url, update_displayname}, @@ -43,7 +45,7 @@ use crate::{ pdu::{gen_event_id_canonical_json, PduBuilder}, server_is_ours, user_is_local, }, - services, utils, Error, PduEvent, Result, Ruma, + services, Ruma, }; /// Checks if the room is banned in any way possible and the sender user is not @@ -1363,11 +1365,10 @@ pub async fn validate_and_add_event_id( .get(&event_id) { // Exponential backoff - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let min_elapsed_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries)); - - if time.elapsed() < min_elapsed_duration { - debug!("Backing off from {}", event_id); + const MIN: u64 = 60 * 5; + const MAX: u64 = 60 * 60 * 24; + if continue_exponential_backoff_secs(MIN, MAX, time.elapsed(), *tries) { + debug!("Backing off from {event_id}"); return Err(Error::BadServerResponse("bad event, still backing off")); } } diff --git a/src/core/utils/math.rs b/src/core/utils/math.rs index d5dbf3a6..a77f8e26 100644 --- a/src/core/utils/math.rs +++ b/src/core/utils/math.rs @@ -24,3 +24,21 @@ macro_rules! validated { macro_rules! validated { ($($input:tt)*) => { $crate::checked!($($input)*) } } + +/// Returns false if the exponential backoff has expired based on the inputs +#[inline] +#[must_use] +pub fn continue_exponential_backoff_secs(min: u64, max: u64, elapsed: Duration, tries: u32) -> bool { + let min = Duration::from_secs(min); + let max = Duration::from_secs(max); + continue_exponential_backoff(min, max, elapsed, tries) +} + +/// Returns false if the exponential backoff has expired based on the inputs +#[inline] +#[must_use] +pub fn continue_exponential_backoff(min: Duration, max: Duration, elapsed: Duration, tries: u32) -> bool { + let min = min.saturating_mul(tries).saturating_mul(tries); + let min = cmp::min(min, max); + elapsed < min +} diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 9f50ef58..395b70f2 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -2,14 +2,16 @@ mod parse_incoming_pdu; mod signing_keys; use std::{ - cmp, collections::{hash_map, BTreeMap, HashMap, HashSet}, pin::Pin, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; -use conduit::{debug_error, debug_info, Error, Result}; +use conduit::{ + debug, debug_error, debug_info, error, info, trace, utils::math::continue_exponential_backoff_secs, warn, Error, + Result, +}; use futures_util::Future; pub use parse_incoming_pdu::parse_incoming_pdu; use ruma::{ @@ -29,7 +31,6 @@ use ruma::{ uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, RoomVersionId, ServerName, }; use tokio::sync::RwLock; -use tracing::{debug, error, info, trace, warn}; use super::state_compressor::CompressedStateEvent; use crate::{pdu, services, PduEvent}; @@ -252,14 +253,12 @@ impl Service { .get(prev_id) { // Exponential backoff - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let min_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries)); - let duration = time.elapsed(); - - if duration < min_duration { + const MIN_DURATION: u64 = 5 * 60; + const MAX_DURATION: u64 = 60 * 60 * 24; + if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { debug!( - duration = ?duration, - min_duration = ?min_duration, + ?tries, + duration = ?time.elapsed(), "Backing off from prev_event" ); return Ok(()); @@ -1083,12 +1082,10 @@ impl Service { .get(&*next_id) { // Exponential backoff - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let min_elapsed_duration = - cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries)); - - if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", next_id); + const MIN_DURATION: u64 = 5 * 60; + const MAX_DURATION: u64 = 60 * 60 * 24; + if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { + info!("Backing off from {next_id}"); continue; } } @@ -1191,12 +1188,10 @@ impl Service { .get(&**next_id) { // Exponential backoff - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let min_elapsed_duration = - cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries)); - - if time.elapsed() < min_elapsed_duration { - debug!("Backing off from {}", next_id); + const MIN_DURATION: u64 = 5 * 60; + const MAX_DURATION: u64 = 60 * 60 * 24; + if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { + debug!("Backing off from {next_id}"); continue; } } diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 54302fd5..0f4fa17a 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -3,10 +3,11 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; use base64::{engine::general_purpose, Engine as _}; +use conduit::{debug, error, utils::math::continue_exponential_backoff_secs, warn}; use federation::transactions::send_transaction_message; use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use ruma::{ @@ -22,7 +23,6 @@ use ruma::{ ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tracing::{debug, error, warn}; use super::{appservice, send, Destination, Msg, SendingEvent, Service}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; @@ -216,11 +216,9 @@ impl Service { .and_modify(|e| match e { TransactionStatus::Failed(tries, time) => { // Fail if a request has failed recently (exponential backoff) - let max_duration = Duration::from_secs(services().globals.config.sender_retry_backoff_limit); - let min_duration = Duration::from_secs(services().globals.config.sender_timeout); - let min_elapsed_duration = min_duration * (*tries) * (*tries); - let min_elapsed_duration = cmp::min(min_elapsed_duration, max_duration); - if time.elapsed() < min_elapsed_duration { + let min = services().globals.config.sender_timeout; + let max = services().globals.config.sender_retry_backoff_limit; + if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) { allow = false; } else { retry = true;