diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index b1e909c9..5c00915b 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -7,7 +7,7 @@ use std::{ use base64::{engine::general_purpose, Engine as _}; use conduit::{ - debug, debug_warn, err, error, + debug, err, error, result::LogErr, trace, utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt}, @@ -35,7 +35,6 @@ use ruma::{ RoomVersionId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::time::sleep_until; use super::{appservice, data::QueueItem, Destination, Msg, SendingEvent, Service}; @@ -81,7 +80,7 @@ impl Service { }, } } - self.finish_responses(&mut futures, &mut statuses).await; + self.finish_responses(&mut futures).await; Ok(()) } @@ -146,22 +145,26 @@ impl Service { } } - async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus) { + async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>) { + use tokio::{ + select, + time::{sleep_until, Instant}, + }; + let now = Instant::now(); let timeout = Duration::from_millis(CLEANUP_TIMEOUT_MS); let deadline = now.checked_add(timeout).unwrap_or(now); loop { trace!("Waiting for {} requests to complete...", futures.len()); - tokio::select! { - () = sleep_until(deadline.into()) => break, + select! { + () = sleep_until(deadline) => return, response = futures.next() => match response { - Some(response) => self.handle_response(response, futures, statuses).await, + Some(Ok(dest)) => self.db.delete_all_active_requests_for(&dest).await, + Some(_) => continue, None => return, - } + }, } } - - debug_warn!("Leaving with {} unfinished requests...", futures.len()); } #[allow(clippy::needless_pass_by_ref_mut)]