From ef9b1c63036868f336894b86f00b29cad0f48673 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Mon, 2 Dec 2024 13:50:09 +0000
Subject: [PATCH] simplify sender shutdown; prevent launching any retries

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/service/sending/sender.rs | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs
index b1e909c9..5c00915b 100644
--- a/src/service/sending/sender.rs
+++ b/src/service/sending/sender.rs
@@ -7,7 +7,7 @@ use std::{
 
 use base64::{engine::general_purpose, Engine as _};
 use conduit::{
-	debug, debug_warn, err, error,
+	debug, err, error,
 	result::LogErr,
 	trace,
 	utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt},
@@ -35,7 +35,6 @@ use ruma::{
 	RoomVersionId, ServerName, UInt,
 };
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
-use tokio::time::sleep_until;
 
 use super::{appservice, data::QueueItem, Destination, Msg, SendingEvent, Service};
 
@@ -81,7 +80,7 @@ impl Service {
 				},
 			}
 		}
-		self.finish_responses(&mut futures, &mut statuses).await;
+		self.finish_responses(&mut futures).await;
 
 		Ok(())
 	}
@@ -146,22 +145,26 @@ impl Service {
 		}
 	}
 
-	async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
+	async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>) {
+		use tokio::{
+			select,
+			time::{sleep_until, Instant},
+		};
+
 		let now = Instant::now();
 		let timeout = Duration::from_millis(CLEANUP_TIMEOUT_MS);
 		let deadline = now.checked_add(timeout).unwrap_or(now);
 		loop {
 			trace!("Waiting for {} requests to complete...", futures.len());
-			tokio::select! {
-				() = sleep_until(deadline.into()) => break,
+			select! {
+				() = sleep_until(deadline) => return,
 				response = futures.next() => match response {
-					Some(response) => self.handle_response(response, futures, statuses).await,
+					Some(Ok(dest)) => self.db.delete_all_active_requests_for(&dest).await,
+					Some(_) => continue,
 					None => return,
-				}
+				},
 			}
 		}
-
-		debug_warn!("Leaving with {} unfinished requests...", futures.len());
 	}
 
 	#[allow(clippy::needless_pass_by_ref_mut)]