cleanup pending transactions before sender worker completes

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-14 01:11:03 +00:00
parent 05efd9b044
commit d2fb6d04c9

View file

@ -3,12 +3,12 @@ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
sync::Arc, sync::Arc,
time::Instant, time::{Duration, Instant},
}; };
use async_trait::async_trait; use async_trait::async_trait;
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use conduit::{debug, error, utils::math::continue_exponential_backoff_secs, warn}; use conduit::{debug, debug_warn, error, trace, utils::math::continue_exponential_backoff_secs, warn};
use federation::transactions::send_transaction_message; use federation::transactions::send_transaction_message;
use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use ruma::{ use ruma::{
@ -24,7 +24,7 @@ use ruma::{
ServerName, UInt, ServerName, UInt,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::Mutex; use tokio::{sync::Mutex, time::sleep_until};
use super::{appservice, data::Data, send, Destination, Msg, SendingEvent, Service}; use super::{appservice, data::Data, send, Destination, Msg, SendingEvent, Service};
use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result};
@ -44,6 +44,7 @@ type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
const DEQUEUE_LIMIT: usize = 48; const DEQUEUE_LIMIT: usize = 48;
const SELECT_EDU_LIMIT: usize = 16; const SELECT_EDU_LIMIT: usize = 16;
const CLEANUP_TIMEOUT_MS: u64 = 3500;
#[async_trait] #[async_trait]
impl crate::Service for Service { impl crate::Service for Service {
@ -65,19 +66,22 @@ impl crate::Service for Service {
let mut futures: SendingFutures<'_> = FuturesUnordered::new(); let mut futures: SendingFutures<'_> = FuturesUnordered::new();
let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
self.initial_transactions(&futures, &mut statuses); self.initial_requests(&futures, &mut statuses);
loop { loop {
debug_assert!(!receiver.is_closed(), "channel error"); debug_assert!(!receiver.is_closed(), "channel error");
tokio::select! { tokio::select! {
request = receiver.recv_async() => match request { request = receiver.recv_async() => match request {
Ok(request) => self.handle_request(request, &futures, &mut statuses), Ok(request) => self.handle_request(request, &futures, &mut statuses),
Err(_) => return Ok(()), Err(_) => break,
}, },
Some(response) = futures.next() => { Some(response) = futures.next() => {
self.handle_response(response, &mut futures, &mut statuses); self.handle_response(response, &futures, &mut statuses);
}, },
} }
} }
self.finish_responses(&mut futures, &mut statuses).await;
Ok(())
} }
fn interrupt(&self) { fn interrupt(&self) {
@ -91,7 +95,7 @@ impl crate::Service for Service {
impl Service { impl Service {
fn handle_response( fn handle_response(
&self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, &self, response: SendingResult, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
) { ) {
match response { match response {
Ok(dest) => self.handle_response_ok(&dest, futures, statuses), Ok(dest) => self.handle_response_ok(&dest, futures, statuses),
@ -100,7 +104,7 @@ impl Service {
} }
fn handle_response_err( fn handle_response_err(
dest: Destination, _futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error, dest: Destination, _futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error,
) { ) {
debug!(dest = ?dest, "{e:?}"); debug!(dest = ?dest, "{e:?}");
statuses.entry(dest).and_modify(|e| { statuses.entry(dest).and_modify(|e| {
@ -151,7 +155,25 @@ impl Service {
} }
} }
fn initial_transactions(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) { async fn finish_responses(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
let now = Instant::now();
let timeout = Duration::from_millis(CLEANUP_TIMEOUT_MS);
let deadline = now.checked_add(timeout).unwrap_or(now);
loop {
trace!("Waiting for {} requests to complete...", futures.len());
tokio::select! {
() = sleep_until(deadline.into()) => break,
response = futures.next() => match response {
Some(response) => self.handle_response(response, futures, statuses),
None => return,
}
}
}
debug_warn!("Leaving with {} unfinished requests...", futures.len());
}
fn initial_requests(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX); let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX);
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new(); let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) { for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) {