federation incoming logging/tracing related

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-04-12 23:52:20 -07:00 committed by June
parent ad4e214d28
commit cdb2dff7dd
3 changed files with 47 additions and 27 deletions

View file

@ -49,7 +49,7 @@ use ruma::{
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, trace, warn};
use crate::{ use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper}, api::client_server::{self, claim_keys_helper, get_keys_helper},
@ -247,6 +247,7 @@ pub async fn send_transaction_message_route(
// maybe) all of the auth events that it references. // maybe) all of the auth events that it references.
// let mut auth_cache = EventMap::new(); // let mut auth_cache = EventMap::new();
let txn_start_time = Instant::now();
let mut parsed_pdus = Vec::with_capacity(body.pdus.len()); let mut parsed_pdus = Vec::with_capacity(body.pdus.len());
for pdu in &body.pdus { for pdu in &body.pdus {
parsed_pdus.push(match parse_incoming_pdu(pdu) { parsed_pdus.push(match parse_incoming_pdu(pdu) {
@ -261,6 +262,15 @@ pub async fn send_transaction_message_route(
// and hashes checks // and hashes checks
} }
trace!(
pdus = ?parsed_pdus.len(),
edus = ?body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin,
"Starting txn",
);
// We go through all the signatures we see on the PDUs and fetch the // We go through all the signatures we see on the PDUs and fetch the
// corresponding signing keys // corresponding signing keys
let pub_key_map = RwLock::new(BTreeMap::new()); let pub_key_map = RwLock::new(BTreeMap::new());
@ -273,10 +283,16 @@ pub async fn send_transaction_message_route(
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e); warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e);
}); });
debug!(
elapsed = ?txn_start_time.elapsed(),
"Fetched signing keys"
);
} }
let mut resolved_map = BTreeMap::new(); let mut resolved_map = BTreeMap::new();
for (event_id, value, room_id) in parsed_pdus { for (event_id, value, room_id) in parsed_pdus {
let pdu_start_time = Instant::now();
let mutex = Arc::clone( let mutex = Arc::clone(
services() services()
.globals .globals
@ -287,7 +303,6 @@ pub async fn send_transaction_message_route(
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let mutex_lock = mutex.lock().await;
let start_time = Instant::now();
resolved_map.insert( resolved_map.insert(
event_id.clone(), event_id.clone(),
services() services()
@ -300,8 +315,9 @@ pub async fn send_transaction_message_route(
drop(mutex_lock); drop(mutex_lock);
debug!( debug!(
elapsed = ?start_time.elapsed(), pdu_elapsed = ?pdu_start_time.elapsed(),
"Handled PDU {event_id}" txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
); );
} }
@ -483,6 +499,15 @@ pub async fn send_transaction_message_route(
} }
} }
debug!(
pdus = ?body.pdus.len(),
edus = ?body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin,
"Finished txn",
);
Ok(send_transaction_message::v1::Response { Ok(send_transaction_message::v1::Response {
pdus: resolved_map pdus: resolved_map
.into_iter() .into_iter()

View file

@ -411,7 +411,7 @@ impl Service {
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed")); return Err(Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed"));
} }
debug!("Validation successful."); trace!("Validation successful.");
// 7. Persist the event as an outlier. // 7. Persist the event as an outlier.
services() services()
@ -419,7 +419,7 @@ impl Service {
.outlier .outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val)?; .add_pdu_outlier(&incoming_pdu.event_id, &val)?;
debug!("Added pdu as outlier."); trace!("Added pdu as outlier.");
Ok((Arc::new(incoming_pdu), val)) Ok((Arc::new(incoming_pdu), val))
}) })
@ -526,14 +526,14 @@ impl Service {
.or_default(), .or_default(),
); );
debug!("Locking the room"); trace!("Locking the room");
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
// Now we calculate the set of extremities this room has after the incoming // Now we calculate the set of extremities this room has after the incoming
// event has been applied. We start with the previous extremities (aka leaves) // event has been applied. We start with the previous extremities (aka leaves)
debug!("Calculating extremities"); trace!("Calculating extremities");
let mut extremities = services().rooms.state.get_forward_extremities(room_id)?; let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
debug!("Calculated {} extremities", extremities.len()); trace!("Calculated {} extremities", extremities.len());
// Remove any forward extremities that are referenced by this incoming event's // Remove any forward extremities that are referenced by this incoming event's
// prev_events // prev_events
@ -623,7 +623,7 @@ impl Service {
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed")); return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed"));
} }
debug!("Appending pdu to timeline"); trace!("Appending pdu to timeline");
extremities.insert(incoming_pdu.event_id.clone()); extremities.insert(incoming_pdu.event_id.clone());
// Now that the event has passed all auth it is added into the timeline. // Now that the event has passed all auth it is added into the timeline.

View file

@ -34,9 +34,7 @@ impl super::Service {
E: IntoIterator<Item = &'a BTreeMap<String, CanonicalJsonValue>>, E: IntoIterator<Item = &'a BTreeMap<String, CanonicalJsonValue>>,
{ {
let mut server_key_ids = HashMap::new(); let mut server_key_ids = HashMap::new();
for event in events { for event in events {
debug!("Fetching keys for event: {event:?}");
for (signature_server, signature) in event for (signature_server, signature) in event
.get("signatures") .get("signatures")
.ok_or(Error::BadServerResponse("No signatures in server response pdu."))? .ok_or(Error::BadServerResponse("No signatures in server response pdu."))?
@ -62,7 +60,7 @@ impl super::Service {
return Ok(()); return Ok(());
} }
debug!( trace!(
"Fetch keys for {}", "Fetch keys for {}",
server_key_ids server_key_ids
.keys() .keys()
@ -76,8 +74,7 @@ impl super::Service {
.map(|(signature_server, signature_ids)| async { .map(|(signature_server, signature_ids)| async {
let fetch_res = self let fetch_res = self
.fetch_signing_keys_for_server( .fetch_signing_keys_for_server(
signature_server.as_str().try_into().map_err(|e| { signature_server.as_str().try_into().map_err(|_| {
info!("Invalid servername in signatures of server response pdu: {e}");
( (
signature_server.clone(), signature_server.clone(),
Error::BadServerResponse("Invalid servername in signatures of server response pdu."), Error::BadServerResponse("Invalid servername in signatures of server response pdu."),
@ -169,10 +166,8 @@ impl super::Service {
let contains_all_ids = let contains_all_ids =
|keys: &BTreeMap<String, Base64>| signature_ids.iter().all(|id| keys.contains_key(id)); |keys: &BTreeMap<String, Base64>| signature_ids.iter().all(|id| keys.contains_key(id));
let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|e| { let origin = <&ServerName>::try_from(signature_server.as_str())
info!("Invalid servername in signatures of server response pdu: {e}"); .map_err(|_| Error::BadServerResponse("Invalid servername in signatures of server response pdu."))?;
Error::BadServerResponse("Invalid servername in signatures of server response pdu.")
})?;
if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) { if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) {
continue; continue;
@ -205,7 +200,7 @@ impl super::Service {
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<()> { ) -> Result<()> {
for server in services().globals.trusted_servers() { for server in services().globals.trusted_servers() {
info!("Asking batch signing keys from trusted server {}", server); debug!("Asking batch signing keys from trusted server {}", server);
match services() match services()
.sending .sending
.send_federation_request( .send_federation_request(
@ -261,7 +256,7 @@ impl super::Service {
&self, servers: BTreeMap<OwnedServerName, BTreeMap<OwnedServerSigningKeyId, QueryCriteria>>, &self, servers: BTreeMap<OwnedServerName, BTreeMap<OwnedServerSigningKeyId, QueryCriteria>>,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<()> { ) -> Result<()> {
info!("Asking individual servers for signing keys: {servers:?}"); debug!("Asking individual servers for signing keys: {servers:?}");
let mut futures: FuturesUnordered<_> = servers let mut futures: FuturesUnordered<_> = servers
.into_keys() .into_keys()
.map(|server| async move { .map(|server| async move {
@ -278,7 +273,7 @@ impl super::Service {
while let Some(result) = futures.next().await { while let Some(result) = futures.next().await {
debug!("Received new Future result"); debug!("Received new Future result");
if let (Ok(get_keys_response), origin) = result { if let (Ok(get_keys_response), origin) = result {
info!("Result is from {origin}"); debug!("Result is from {origin}");
if let Ok(key) = get_keys_response.server_key.deserialize() { if let Ok(key) = get_keys_response.server_key.deserialize() {
let result: BTreeMap<_, _> = services() let result: BTreeMap<_, _> = services()
.globals .globals
@ -335,26 +330,26 @@ impl super::Service {
.await?; .await?;
if servers.is_empty() { if servers.is_empty() {
info!("Trusted server supplied all signing keys, no more keys to fetch"); debug!("Trusted server supplied all signing keys, no more keys to fetch");
return Ok(()); return Ok(());
} }
info!("Remaining servers left that the notary/trusted servers did not provide: {servers:?}"); debug!("Remaining servers left that the notary/trusted servers did not provide: {servers:?}");
self.request_signing_keys(servers.clone(), pub_key_map) self.request_signing_keys(servers.clone(), pub_key_map)
.await?; .await?;
} else { } else {
info!("query_trusted_key_servers_first is set to false, querying individual homeservers first"); debug!("query_trusted_key_servers_first is set to false, querying individual homeservers first");
self.request_signing_keys(servers.clone(), pub_key_map) self.request_signing_keys(servers.clone(), pub_key_map)
.await?; .await?;
if servers.is_empty() { if servers.is_empty() {
info!("Individual homeservers supplied all signing keys, no more keys to fetch"); debug!("Individual homeservers supplied all signing keys, no more keys to fetch");
return Ok(()); return Ok(());
} }
info!("Remaining servers left the individual homeservers did not provide: {servers:?}"); debug!("Remaining servers left the individual homeservers did not provide: {servers:?}");
self.batch_request_signing_keys(servers.clone(), pub_key_map) self.batch_request_signing_keys(servers.clone(), pub_key_map)
.await?; .await?;