From 10be3016466076a76ab0e9270dabb80e2acf1afa Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 9 Nov 2024 01:09:09 +0000 Subject: [PATCH] split large notary requests into batches Signed-off-by: Jason Volk --- src/core/config/mod.rs | 8 +++ src/service/server_keys/acquire.rs | 4 ++ src/service/server_keys/get.rs | 4 +- src/service/server_keys/mod.rs | 2 +- src/service/server_keys/request.rs | 89 +++++++++++++++++++----------- 5 files changed, 71 insertions(+), 36 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 43cca4b8..cd9c1b38 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -582,6 +582,12 @@ pub struct Config { #[serde(default)] pub only_query_trusted_key_servers: bool, + /// Maximum number of keys to request in each trusted server query. + /// + /// default: 1024 + #[serde(default = "default_trusted_server_batch_size")] + pub trusted_server_batch_size: usize, + /// max log level for conduwuit. allows debug, info, warn, or error /// see also: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives /// **Caveat**: @@ -2062,3 +2068,5 @@ fn parallelism_scaled_u32(val: u32) -> u32 { } fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } + +fn default_trusted_server_batch_size() -> usize { 256 } diff --git a/src/service/server_keys/acquire.rs b/src/service/server_keys/acquire.rs index cdaf28b4..190b4239 100644 --- a/src/service/server_keys/acquire.rs +++ b/src/service/server_keys/acquire.rs @@ -110,6 +110,10 @@ where {requested_servers} total servers; some events may not be verifiable" ); } + + for (server, key_ids) in missing { + debug_warn!(?server, ?key_ids, "missing"); + } } #[implement(super::Service)] diff --git a/src/service/server_keys/get.rs b/src/service/server_keys/get.rs index 441e33d4..dc4627f7 100644 --- a/src/service/server_keys/get.rs +++ b/src/service/server_keys/get.rs @@ -89,8 +89,8 @@ pub async fn get_verify_key(&self, origin: &ServerName, key_id: &ServerSigningKe async fn get_verify_key_from_notaries(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> Result { for notary in self.services.globals.trusted_servers() { if let Ok(server_keys) = self.notary_request(notary, origin).await { - for server_key in &server_keys { - self.add_signing_keys(server_key.clone()).await; + for server_key in server_keys.clone() { + self.add_signing_keys(server_key).await; } for server_key in server_keys { diff --git a/src/service/server_keys/mod.rs b/src/service/server_keys/mod.rs index dae45a51..333970df 100644 --- a/src/service/server_keys/mod.rs +++ b/src/service/server_keys/mod.rs @@ -7,7 +7,7 @@ mod verify; use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use conduit::{implement, utils::time::timepoint_from_now, Result, Server}; +use conduit::{implement, utils::timepoint_from_now, Result, Server}; use database::{Deserialized, Json, Map}; use ruma::{ api::federation::discovery::{ServerSigningKeys, VerifyKey}, diff --git a/src/service/server_keys/request.rs b/src/service/server_keys/request.rs index 84dd2871..7078f7cd 100644 --- a/src/service/server_keys/request.rs +++ b/src/service/server_keys/request.rs @@ -1,6 +1,6 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, fmt::Debug}; -use conduit::{implement, Err, Result}; +use conduit::{debug, implement, Err, Result}; use ruma::{ api::federation::discovery::{ get_remote_server_keys, @@ -25,34 +25,57 @@ where minimum_valid_until_ts: Some(self.minimum_valid_ts()), }; - let mut server_keys = RumaBatch::new(); - for (server, key_ids) in batch { - let entry = server_keys.entry(server.into()).or_default(); - for key_id in key_ids { - entry.insert(key_id.into(), criteria.clone()); - } - } + let mut server_keys = batch.fold(RumaBatch::new(), |mut batch, (server, key_ids)| { + batch + .entry(server.into()) + .or_default() + .extend(key_ids.map(|key_id| (key_id.into(), criteria.clone()))); + + batch + }); debug_assert!(!server_keys.is_empty(), "empty batch request to notary"); - let request = Request { - server_keys, - }; - self.services - .sending - .send_federation_request(notary, request) - .await - .map(|response| response.server_keys) - .map(|keys| { - keys.into_iter() - .map(|key| key.deserialize()) - .filter_map(Result::ok) - .collect() - }) + let mut results = Vec::new(); + while let Some(batch) = server_keys + .keys() + .rev() + .take(self.services.server.config.trusted_server_batch_size) + .last() + .cloned() + { + let request = Request { + server_keys: server_keys.split_off(&batch), + }; + + debug!( + ?notary, + ?batch, + remaining = %server_keys.len(), + requesting = ?request.server_keys.keys(), + "notary request" + ); + + let response = self + .services + .sending + .send_synapse_request(notary, request) + .await? + .server_keys + .into_iter() + .map(|key| key.deserialize()) + .filter_map(Result::ok); + + results.extend(response); + } + + Ok(results) } #[implement(super::Service)] -pub async fn notary_request(&self, notary: &ServerName, target: &ServerName) -> Result> { +pub async fn notary_request( + &self, notary: &ServerName, target: &ServerName, +) -> Result + Clone + Debug + Send> { use get_remote_server_keys::v2::Request; let request = Request { @@ -60,17 +83,17 @@ pub async fn notary_request(&self, notary: &ServerName, target: &ServerName) -> minimum_valid_until_ts: self.minimum_valid_ts(), }; - self.services + let response = self + .services .sending .send_federation_request(notary, request) - .await - .map(|response| response.server_keys) - .map(|keys| { - keys.into_iter() - .map(|key| key.deserialize()) - .filter_map(Result::ok) - .collect() - }) + .await? + .server_keys + .into_iter() + .map(|key| key.deserialize()) + .filter_map(Result::ok); + + Ok(response) } #[implement(super::Service)]