improve tracing attributes in sending stack.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-03-30 23:52:08 -07:00 committed by June
parent a87e7d8e17
commit 22b123de7b
2 changed files with 14 additions and 12 deletions

View file

@ -224,17 +224,17 @@ impl Service {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, destination, request))] #[tracing::instrument(skip(self, request), name = "request")]
pub async fn send_federation_request<T>(&self, destination: &ServerName, request: T) -> Result<T::IncomingResponse> pub async fn send_federation_request<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
where where
T: OutgoingRequest + Debug, T: OutgoingRequest + Debug,
{ {
let permit = self.maximum_requests.acquire().await; let permit = self.maximum_requests.acquire().await;
let timeout = Duration::from_secs(self.timeout); let timeout = Duration::from_secs(self.timeout);
let response = tokio::time::timeout(timeout, send::send_request(destination, request)) let response = tokio::time::timeout(timeout, send::send_request(dest, request))
.await .await
.map_err(|_| { .map_err(|_| {
warn!("Timeout after 300 seconds waiting for server response of {destination}"); warn!("Timeout after 300 seconds waiting for server response of {dest}");
Error::BadServerResponse("Timeout after 300 seconds waiting for server response") Error::BadServerResponse("Timeout after 300 seconds waiting for server response")
})?; })?;
drop(permit); drop(permit);
@ -269,6 +269,7 @@ impl Service {
}); });
} }
#[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let mut receiver = self.receiver.lock().await;
@ -583,7 +584,6 @@ pub fn select_edus_receipts(
Ok(true) Ok(true)
} }
#[tracing::instrument(skip(events, kind))]
async fn handle_events( async fn handle_events(
kind: OutgoingKind, events: Vec<SendingEventType>, kind: OutgoingKind, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingKind, (OutgoingKind, Error)> {
@ -743,9 +743,9 @@ async fn handle_events_kind_push(
Ok(kind.clone()) Ok(kind.clone())
} }
#[tracing::instrument(skip(kind, events))] #[tracing::instrument(skip(kind, events), name = "")]
async fn handle_events_kind_normal( async fn handle_events_kind_normal(
kind: &OutgoingKind, server: &OwnedServerName, events: Vec<SendingEventType>, kind: &OutgoingKind, dest: &OwnedServerName, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingKind, (OutgoingKind, Error)> {
let mut edu_jsons = Vec::new(); let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
@ -761,7 +761,7 @@ async fn handle_events_kind_normal(
.get_pdu_json_from_id(pdu_id) .get_pdu_json_from_id(pdu_id)
.map_err(|e| (kind.clone(), e))? .map_err(|e| (kind.clone(), e))?
.ok_or_else(|| { .ok_or_else(|| {
error!("event not found: {server} {pdu_id:?}"); error!("event not found: {dest} {pdu_id:?}");
( (
kind.clone(), kind.clone(),
Error::bad_database("[Normal] Event in servernamevent_datas not found in db."), Error::bad_database("[Normal] Event in servernamevent_datas not found in db."),
@ -784,7 +784,7 @@ async fn handle_events_kind_normal(
let permit = services().sending.maximum_requests.acquire().await; let permit = services().sending.maximum_requests.acquire().await;
let response = send::send_request( let response = send::send_request(
server, dest,
send_transaction_message::v1::Request { send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(), origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons, pdus: pdu_jsons,
@ -806,7 +806,7 @@ async fn handle_events_kind_normal(
.map(|response| { .map(|response| {
for pdu in response.pdus { for pdu in response.pdus {
if pdu.1.is_err() { if pdu.1.is_err() {
warn!("Failed to send to {}: {:?}", server, pdu); warn!("Failed to send to {}: {:?}", dest, pdu);
} }
} }
kind.clone() kind.clone()

View file

@ -43,6 +43,7 @@ pub enum FedDest {
Named(String, String), Named(String, String),
} }
#[tracing::instrument(skip_all, name = "send")]
pub(crate) async fn send_request<T>(destination: &ServerName, request: T) -> Result<T::IncomingResponse> pub(crate) async fn send_request<T>(destination: &ServerName, request: T) -> Result<T::IncomingResponse>
where where
T: OutgoingRequest + Debug, T: OutgoingRequest + Debug,
@ -100,7 +101,7 @@ where
} else { } else {
write_destination_to_cache = true; write_destination_to_cache = true;
let result = find_actual_destination(destination).await; let result = resolve_actual_destination(destination).await;
(result.0, result.1.into_uri_string()) (result.0, result.1.into_uri_string())
}; };
@ -338,7 +339,8 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest {
/// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names> /// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names>
/// Numbers in comments below refer to bullet points in linked section of /// Numbers in comments below refer to bullet points in linked section of
/// specification /// specification
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) { #[tracing::instrument(skip_all, name = "resolve")]
async fn resolve_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
debug!("Finding actual destination for {destination}"); debug!("Finding actual destination for {destination}");
let destination_str = destination.as_str().to_owned(); let destination_str = destination.as_str().to_owned();
let mut hostname = destination_str.clone(); let mut hostname = destination_str.clone();