add broad timeout on acquire_origins keys operation

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-03 18:23:35 +00:00
parent 4a94a4c945
commit 78aeb620bc

View file

@ -1,15 +1,17 @@
use std::{ use std::{
borrow::Borrow, borrow::Borrow,
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet},
time::Duration,
}; };
use conduit::{debug, debug_warn, error, implement, result::FlatOk, warn}; use conduit::{debug, debug_error, debug_warn, error, implement, result::FlatOk, trace, warn};
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use ruma::{ use ruma::{
api::federation::discovery::ServerSigningKeys, serde::Raw, CanonicalJsonObject, OwnedServerName, api::federation::discovery::ServerSigningKeys, serde::Raw, CanonicalJsonObject, OwnedServerName,
OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId,
}; };
use serde_json::value::RawValue as RawJsonValue; use serde_json::value::RawValue as RawJsonValue;
use tokio::time::{timeout_at, Instant};
use super::key_exists; use super::key_exists;
@ -136,8 +138,12 @@ async fn acquire_origins<I>(&self, batch: I) -> Batch
where where
I: Iterator<Item = (OwnedServerName, Vec<OwnedServerSigningKeyId>)> + Send, I: Iterator<Item = (OwnedServerName, Vec<OwnedServerSigningKeyId>)> + Send,
{ {
let timeout = Instant::now()
.checked_add(Duration::from_secs(45))
.expect("timeout overflows");
let mut requests: FuturesUnordered<_> = batch let mut requests: FuturesUnordered<_> = batch
.map(|(origin, key_ids)| self.acquire_origin(origin, key_ids)) .map(|(origin, key_ids)| self.acquire_origin(origin, key_ids, timeout))
.collect(); .collect();
let mut missing = Batch::new(); let mut missing = Batch::new();
@ -152,11 +158,22 @@ where
#[implement(super::Service)] #[implement(super::Service)]
async fn acquire_origin( async fn acquire_origin(
&self, origin: OwnedServerName, mut key_ids: Vec<OwnedServerSigningKeyId>, &self, origin: OwnedServerName, mut key_ids: Vec<OwnedServerSigningKeyId>, timeout: Instant,
) -> (OwnedServerName, Vec<OwnedServerSigningKeyId>) { ) -> (OwnedServerName, Vec<OwnedServerSigningKeyId>) {
if let Ok(server_keys) = self.server_request(&origin).await { match timeout_at(timeout, self.server_request(&origin)).await {
self.add_signing_keys(server_keys.clone()).await; Err(e) => debug_warn!(?origin, "timed out: {e}"),
key_ids.retain(|key_id| !key_exists(&server_keys, key_id)); Ok(Err(e)) => debug_error!(?origin, "{e}"),
Ok(Ok(server_keys)) => {
trace!(
%origin,
?key_ids,
?server_keys,
"received server_keys"
);
self.add_signing_keys(server_keys.clone()).await;
key_ids.retain(|key_id| !key_exists(&server_keys, key_id));
},
} }
(origin, key_ids) (origin, key_ids)