make federation retry timer-based

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-04-17 22:14:30 -04:00
parent 395b466b4a
commit d5a9c98657

View file

@ -1,6 +1,6 @@
use std::{ use std::{
cmp, cmp,
collections::{BTreeMap, HashMap, HashSet}, collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
@ -25,7 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
}; };
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{oneshot, Mutex, Semaphore};
use tracing::{error, warn}; use tracing::{error, warn};
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -65,9 +65,26 @@ pub enum SendingEventType {
} }
enum TransactionStatus { enum TransactionStatus {
/// Currently running (for the first time)
Running, Running,
Failed(u32, Instant), // number of times failed, time of last failure /// Failed, backing off for a retry
Retrying(u32), // number of times failed Failed {
failures: u32,
waker: Option<oneshot::Sender<()>>,
},
/// Currently retrying
Retrying {
/// number of times failed
failures: u32,
},
}
/// A control-flow enum to dictate what the handler should do after (trying to)
/// prepare a transaction
enum TransactionPrepOutcome {
Send(Vec<SendingEventType>),
Wake(OutgoingDestination),
Nothing,
} }
impl Service { impl Service {
@ -274,9 +291,12 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let receiver = self.receiver.lock().await; let new_transactions = self.receiver.lock().await;
let (waking_sender, waking_receiver) = loole::unbounded();
let mut outgoing = FuturesUnordered::new();
let mut retrying = FuturesUnordered::new();
let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingDestination, TransactionStatus>::new(); let mut current_transaction_status = HashMap::<OutgoingDestination, TransactionStatus>::new();
// Retry requests we could not finish yet // Retry requests we could not finish yet
@ -300,13 +320,14 @@ impl Service {
for (outgoing_kind, events) in initial_transactions { for (outgoing_kind, events) in initial_transactions {
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(handle_events(outgoing_kind.clone(), events)); outgoing.push(handle_events(outgoing_kind.clone(), events));
} }
} }
loop { loop {
tokio::select! { tokio::select! {
Some(response) = futures.next() => { Some(response) = outgoing.next() => {
// Outgoing transaction succeeded
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
let _cork = services().globals.db.cork(); let _cork = services().globals.db.cork();
@ -322,35 +343,84 @@ impl Service {
if !new_events.is_empty() { if !new_events.is_empty() {
// Insert pdus we found // Insert pdus we found
self.db.mark_as_active(&new_events)?; self.db.mark_as_active(&new_events)?;
futures.push(handle_events(
outgoing_kind.clone(), // Clear retries
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
outgoing.push(handle_events(
outgoing_kind,
new_events.into_iter().map(|(event, _)| event).collect(), new_events.into_iter().map(|(event, _)| event).collect(),
)); ));
} else { } else {
current_transaction_status.remove(&outgoing_kind); current_transaction_status.remove(&outgoing_kind);
} }
} }
Err((outgoing_kind, _)) => { // Outgoing transaction failed
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { Err((destination, err)) => {
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), // Set status to Failed, create timer
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone());
TransactionStatus::Failed(_, _) => {
error!("Request that was not even running failed?!"); // Add timer to loop
return retrying.push(timer);
},
}); warn!("Outgoing request to {destination} failed: {err}");
} }
}; };
}, },
event = receiver.recv_async() => { // Transaction retry timers firing
if let Ok((outgoing_kind, event, key)) = event { Some(dest) = retrying.next() => {
if let Ok(Some(events)) = self.select_events( // Transition Failed => Retrying, return pending old transaction events
&outgoing_kind, match self.select_events(
&dest,
vec![], // will be ignored because fresh == false
&mut current_transaction_status,
false,
) {
Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
}
Ok(_) => {
// Unreachable because fresh == false
unreachable!("select_events on a stale transaction {} did not return ::Send", dest)
}
Err(err) => {
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err);
// transaction dropped, so drop destination as well.
current_transaction_status.remove(&dest);
}
}
},
// Explicit wakeups, makes a backoff timer return immediately
Ok(outgoing) = waking_receiver.recv_async() => {
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) {
if let Some(waker) = waker.take() {
_ = waker.send(());
}
}
},
// New transactions to be sent out (from server/user activity)
event = new_transactions.recv_async() => {
if let Ok((dest, event, key)) = event {
match self.select_events(
&dest,
vec![(event, key)], vec![(event, key)],
&mut current_transaction_status, &mut current_transaction_status,
) { true) {
futures.push(handle_events(outgoing_kind, events)); Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
},
Ok(TransactionPrepOutcome::Wake(dest)) => {
waking_sender.send(dest).expect("nothing closes this channel but ourselves");
},
Ok(TransactionPrepOutcome::Nothing) => {},
Err(err) => {
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err);
}
} }
} }
} }
@ -358,18 +428,70 @@ impl Service {
} }
} }
/// Generates timer/oneshot, alters status to reflect Failed
///
/// Returns timer/oneshot future to wake up loop for next retry
fn mark_failed_and_backoff(
status: &mut HashMap<OutgoingDestination, TransactionStatus>, dest: OutgoingDestination,
) -> impl std::future::Future<Output = OutgoingDestination> {
let now = Instant::now();
let entry = status
.get_mut(&dest)
.expect("guaranteed to be set before this function");
let failures = match entry {
// Running -> Failed
TransactionStatus::Running => 1,
// Retrying -> Failed
TransactionStatus::Retrying {
failures,
} => *failures + 1,
// The transition of Failed -> Retrying is handled by handle_events
TransactionStatus::Failed {
..
} => {
unreachable!(
"TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \
bailing..."
)
},
};
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
// Exponential backoff, clamp upper value to one day
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY);
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup);
*entry = TransactionStatus::Failed {
failures,
waker: Some(waker),
};
fut
}
/// This prepares a transaction, checks the transaction state, and selects
/// appropriate events.
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events( fn select_events(
&self, &self,
outgoing_kind: &OutgoingDestination, outgoing_kind: &OutgoingDestination,
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>,
) -> Result<Option<Vec<SendingEventType>>> { fresh: bool, // Wether or not this transaction came from server activity.
let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?; ) -> Result<TransactionPrepOutcome> {
let (allow, retry, wake_up) =
self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?;
// Nothing can be done for this remote, bail out. // Nothing can be done for this remote, bail out.
if !allow { if wake_up {
return Ok(None); return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone()));
} else if !allow {
return Ok(TransactionPrepOutcome::Nothing);
} }
let _cork = services().globals.db.cork(); let _cork = services().globals.db.cork();
@ -377,12 +499,14 @@ impl Service {
// Must retry any previous transaction for this remote. // Must retry any previous transaction for this remote.
if retry { if retry {
self.db // We retry the previous transaction
for (_, e) in self
.db
.active_requests_for(outgoing_kind) .active_requests_for(outgoing_kind)
.filter_map(Result::ok) .filter_map(Result::ok)
.for_each(|(_, e)| events.push(e)); {
events.push(e);
return Ok(Some(events)); }
} }
// Compose the next transaction // Compose the next transaction
@ -402,37 +526,72 @@ impl Service {
} }
} }
Ok(Some(events)) Ok(TransactionPrepOutcome::Send(events))
} }
#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
fn select_events_current( fn select_events_current(
&self, outgoing_kind: OutgoingDestination, &self, outgoing_kind: OutgoingDestination,
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, fresh: bool,
) -> Result<(bool, bool)> { ) -> Result<(bool, bool, bool)> {
let (mut allow, mut retry) = (true, false); let (mut allow, mut retry, mut wake_up) = (true, false, false);
current_transaction_status
.entry(outgoing_kind) let entry = current_transaction_status.entry(outgoing_kind);
if fresh {
// If its fresh, we initialise the status if we need to.
//
// We do nothing if it is already running or retrying.
//
// We return with a wake if it is in the Failed state.
entry
.and_modify(|e| match e { .and_modify(|e| match e {
TransactionStatus::Failed(tries, time) => { TransactionStatus::Running
// Fail if a request has failed recently (exponential backoff) | TransactionStatus::Retrying {
const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); ..
let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); } => {
min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION);
if time.elapsed() < min_elapsed_duration {
allow = false;
} else {
retry = true;
*e = TransactionStatus::Retrying(*tries);
}
},
TransactionStatus::Running | TransactionStatus::Retrying(_) => {
allow = false; // already running allow = false; // already running
}, },
TransactionStatus::Failed {
..
} => {
// currently sleeping
wake_up = true;
},
}) })
.or_insert(TransactionStatus::Running); .or_insert(TransactionStatus::Running);
} else {
// If it's not fresh, we expect an entry.
//
// We also expect us to be the only one who are touching this destination right
// now, and its a stale transaction, so it must be in the Failed state
match entry {
Entry::Occupied(mut e) => {
let e = e.get_mut();
match e {
TransactionStatus::Failed {
failures,
..
} => {
*e = TransactionStatus::Retrying {
failures: *failures,
};
retry = true;
},
Ok((allow, retry)) _ => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got \
Running or Retrying"
),
}
},
Entry::Vacant(_) => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"
),
}
}
Ok((allow, retry, wake_up))
} }
#[tracing::instrument(skip(self, server_name))] #[tracing::instrument(skip(self, server_name))]
@ -721,7 +880,7 @@ async fn handle_events_kind_push(
let Some(pusher) = services() let Some(pusher) = services()
.pusher .pusher
.get_pusher(userid, pushkey) .get_pusher(userid, pushkey)
.map_err(|e| (kind.clone(), e))? .map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))?
else { else {
continue; continue;
}; };
@ -858,4 +1017,40 @@ impl OutgoingDestination {
prefix prefix
} }
/// This wraps the OutgoingDestination key in an interruptible sleep future.
///
/// The first return value is the future, the second is the oneshot that
/// interrupts that future, and causes it to return instantly.
fn wrap_in_interruptible_sleep(
self, at: Instant,
) -> (impl std::future::Future<Output = Self>, oneshot::Sender<()>) {
let (tx, rx) = oneshot::channel();
let at = tokio::time::Instant::from_std(at);
(
async move {
_ = tokio::time::timeout_at(at, rx).await;
self
},
tx,
)
}
}
impl std::fmt::Display for OutgoingDestination {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingDestination::Appservice(appservice_id) => {
write!(f, "Appservice (ID {:?})", appservice_id)
},
OutgoingDestination::Push(user, push_key) => {
write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key)
},
OutgoingDestination::Normal(server) => {
write!(f, "Matrix Server ({:?})", server)
},
}
}
} }