From d2fb6d04c9cb6583814f86366de389fc02e3cf73 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 14 Jul 2024 01:11:03 +0000 Subject: [PATCH] cleanup pending transactions before sender worker completes Signed-off-by: Jason Volk --- src/service/sending/sender.rs | 40 +++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index cfd5b4bc..2f542dfe 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -3,12 +3,12 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, sync::Arc, - time::Instant, + time::{Duration, Instant}, }; use async_trait::async_trait; use base64::{engine::general_purpose, Engine as _}; -use conduit::{debug, error, utils::math::continue_exponential_backoff_secs, warn}; +use conduit::{debug, debug_warn, error, trace, utils::math::continue_exponential_backoff_secs, warn}; use federation::transactions::send_transaction_message; use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use ruma::{ @@ -24,7 +24,7 @@ use ruma::{ ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::sync::Mutex; +use tokio::{sync::Mutex, time::sleep_until}; use super::{appservice, data::Data, send, Destination, Msg, SendingEvent, Service}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; @@ -44,6 +44,7 @@ type CurTransactionStatus = HashMap; const DEQUEUE_LIMIT: usize = 48; const SELECT_EDU_LIMIT: usize = 16; +const CLEANUP_TIMEOUT_MS: u64 = 3500; #[async_trait] impl crate::Service for Service { @@ -65,19 +66,22 @@ impl crate::Service for Service { let mut futures: SendingFutures<'_> = FuturesUnordered::new(); let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); - self.initial_transactions(&futures, &mut statuses); + self.initial_requests(&futures, &mut statuses); loop { debug_assert!(!receiver.is_closed(), "channel error"); tokio::select! { request = receiver.recv_async() => match request { Ok(request) => self.handle_request(request, &futures, &mut statuses), - Err(_) => return Ok(()), + Err(_) => break, }, Some(response) = futures.next() => { - self.handle_response(response, &mut futures, &mut statuses); + self.handle_response(response, &futures, &mut statuses); }, } } + self.finish_responses(&mut futures, &mut statuses).await; + + Ok(()) } fn interrupt(&self) { @@ -91,7 +95,7 @@ impl crate::Service for Service { impl Service { fn handle_response( - &self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, + &self, response: SendingResult, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, ) { match response { Ok(dest) => self.handle_response_ok(&dest, futures, statuses), @@ -100,7 +104,7 @@ impl Service { } fn handle_response_err( - dest: Destination, _futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error, + dest: Destination, _futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error, ) { debug!(dest = ?dest, "{e:?}"); statuses.entry(dest).and_modify(|e| { @@ -151,7 +155,25 @@ impl Service { } } - fn initial_transactions(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) { + async fn finish_responses(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) { + let now = Instant::now(); + let timeout = Duration::from_millis(CLEANUP_TIMEOUT_MS); + let deadline = now.checked_add(timeout).unwrap_or(now); + loop { + trace!("Waiting for {} requests to complete...", futures.len()); + tokio::select! { + () = sleep_until(deadline.into()) => break, + response = futures.next() => match response { + Some(response) => self.handle_response(response, futures, statuses), + None => return, + } + } + } + + debug_warn!("Leaving with {} unfinished requests...", futures.len()); + } + + fn initial_requests(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) { let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX); let mut txns = HashMap::>::new(); for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) {