split large notary requests into batches

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-09 01:09:09 +00:00
parent 1ce3db727f
commit 10be301646
5 changed files with 71 additions and 36 deletions

View file

@ -582,6 +582,12 @@ pub struct Config {
#[serde(default)]
pub only_query_trusted_key_servers: bool,
/// Maximum number of keys to request in each trusted server query.
///
/// default: 1024
#[serde(default = "default_trusted_server_batch_size")]
pub trusted_server_batch_size: usize,
/// max log level for conduwuit. allows debug, info, warn, or error
/// see also: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
/// **Caveat**:
@ -2062,3 +2068,5 @@ fn parallelism_scaled_u32(val: u32) -> u32 {
}
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
fn default_trusted_server_batch_size() -> usize { 256 }

View file

@ -110,6 +110,10 @@ where
{requested_servers} total servers; some events may not be verifiable"
);
}
for (server, key_ids) in missing {
debug_warn!(?server, ?key_ids, "missing");
}
}
#[implement(super::Service)]

View file

@ -89,8 +89,8 @@ pub async fn get_verify_key(&self, origin: &ServerName, key_id: &ServerSigningKe
async fn get_verify_key_from_notaries(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> Result<VerifyKey> {
for notary in self.services.globals.trusted_servers() {
if let Ok(server_keys) = self.notary_request(notary, origin).await {
for server_key in &server_keys {
self.add_signing_keys(server_key.clone()).await;
for server_key in server_keys.clone() {
self.add_signing_keys(server_key).await;
}
for server_key in server_keys {

View file

@ -7,7 +7,7 @@ mod verify;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use conduit::{implement, utils::time::timepoint_from_now, Result, Server};
use conduit::{implement, utils::timepoint_from_now, Result, Server};
use database::{Deserialized, Json, Map};
use ruma::{
api::federation::discovery::{ServerSigningKeys, VerifyKey},

View file

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, fmt::Debug};
use conduit::{implement, Err, Result};
use conduit::{debug, implement, Err, Result};
use ruma::{
api::federation::discovery::{
get_remote_server_keys,
@ -25,34 +25,57 @@ where
minimum_valid_until_ts: Some(self.minimum_valid_ts()),
};
let mut server_keys = RumaBatch::new();
for (server, key_ids) in batch {
let entry = server_keys.entry(server.into()).or_default();
for key_id in key_ids {
entry.insert(key_id.into(), criteria.clone());
}
}
let mut server_keys = batch.fold(RumaBatch::new(), |mut batch, (server, key_ids)| {
batch
.entry(server.into())
.or_default()
.extend(key_ids.map(|key_id| (key_id.into(), criteria.clone())));
batch
});
debug_assert!(!server_keys.is_empty(), "empty batch request to notary");
let mut results = Vec::new();
while let Some(batch) = server_keys
.keys()
.rev()
.take(self.services.server.config.trusted_server_batch_size)
.last()
.cloned()
{
let request = Request {
server_keys,
server_keys: server_keys.split_off(&batch),
};
self.services
debug!(
?notary,
?batch,
remaining = %server_keys.len(),
requesting = ?request.server_keys.keys(),
"notary request"
);
let response = self
.services
.sending
.send_federation_request(notary, request)
.await
.map(|response| response.server_keys)
.map(|keys| {
keys.into_iter()
.send_synapse_request(notary, request)
.await?
.server_keys
.into_iter()
.map(|key| key.deserialize())
.filter_map(Result::ok)
.collect()
})
.filter_map(Result::ok);
results.extend(response);
}
Ok(results)
}
#[implement(super::Service)]
pub async fn notary_request(&self, notary: &ServerName, target: &ServerName) -> Result<Vec<ServerSigningKeys>> {
pub async fn notary_request(
&self, notary: &ServerName, target: &ServerName,
) -> Result<impl Iterator<Item = ServerSigningKeys> + Clone + Debug + Send> {
use get_remote_server_keys::v2::Request;
let request = Request {
@ -60,17 +83,17 @@ pub async fn notary_request(&self, notary: &ServerName, target: &ServerName) ->
minimum_valid_until_ts: self.minimum_valid_ts(),
};
self.services
let response = self
.services
.sending
.send_federation_request(notary, request)
.await
.map(|response| response.server_keys)
.map(|keys| {
keys.into_iter()
.await?
.server_keys
.into_iter()
.map(|key| key.deserialize())
.filter_map(Result::ok)
.collect()
})
.filter_map(Result::ok);
Ok(response)
}
#[implement(super::Service)]