From ffd0fd42424a234d4fbd564b66b79521595b5b5b Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 26 Jan 2025 21:46:46 +0000 Subject: [PATCH] pipeline pdu fetch for federation sending destination Signed-off-by: Jason Volk --- src/service/sending/sender.rs | 135 ++++++++++++++++------------------ 1 file changed, 64 insertions(+), 71 deletions(-) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index c91e1d31..47be01f1 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -8,12 +8,12 @@ use std::{ time::{Duration, Instant}, }; -use base64::{engine::general_purpose, Engine as _}; +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; use conduwuit::{ debug, err, error, result::LogErr, trace, - utils::{calculate_hash, continue_exponential_backoff_secs, ReadyExt}, + utils::{calculate_hash, continue_exponential_backoff_secs, stream::IterStream, ReadyExt}, warn, Error, Result, }; use futures::{ @@ -38,7 +38,9 @@ use ruma::{ push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType, }, - push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, + push, + serde::Raw, + uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; @@ -633,7 +635,7 @@ impl Service { } fn send_events(&self, dest: Destination, events: Vec) -> SendingFuture<'_> { - //debug_assert!(!events.is_empty(), "sending empty transaction"); + debug_assert!(!events.is_empty(), "sending empty transaction"); match dest { | Destination::Federation(server) => self.send_events_dest_federation(server, events).boxed(), @@ -698,7 +700,7 @@ impl Service { | SendingEvent::Flush => None, })); - let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash); + let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash); //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty // transaction"); @@ -796,81 +798,72 @@ impl Service { Ok(Destination::Push(user_id, pushkey)) } - #[tracing::instrument( - name = "fed", - level = "debug", - skip(self, events), - fields( - events = %events.len(), - ), - )] async fn send_events_dest_federation( &self, server: OwnedServerName, events: Vec, ) -> SendingResult { - let mut pdu_jsons = Vec::with_capacity( - events - .iter() - .filter(|event| matches!(event, SendingEvent::Pdu(_))) - .count(), - ); - let mut edu_jsons = Vec::with_capacity( - events - .iter() - .filter(|event| matches!(event, SendingEvent::Edu(_))) - .count(), - ); + let pdus: Vec<_> = events + .iter() + .filter_map(|pdu| match pdu { + | SendingEvent::Pdu(pdu) => Some(pdu), + | _ => None, + }) + .stream() + .then(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id)) + .ready_filter_map(Result::ok) + .then(|pdu| self.convert_to_outgoing_federation_event(pdu)) + .collect() + .await; - for event in &events { - match event { - // TODO: check room version and remove event_id if needed - | SendingEvent::Pdu(pdu_id) => { - if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await { - pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await); - } - }, - | SendingEvent::Edu(edu) => - if let Ok(raw) = serde_json::from_slice(edu) { - edu_jsons.push(raw); - }, - | SendingEvent::Flush => {}, // flush only; no new content + let edus: Vec> = events + .iter() + .filter_map(|edu| match edu { + | SendingEvent::Edu(edu) => Some(edu.as_ref()), + | _ => None, + }) + .map(serde_json::from_slice) + .filter_map(Result::ok) + .collect(); + + if pdus.is_empty() && edus.is_empty() { + return Ok(Destination::Federation(server)); + } + + let preimage = pdus + .iter() + .map(|raw| raw.get().as_bytes()) + .chain(edus.iter().map(|raw| raw.json().get().as_bytes())); + + let txn_hash = calculate_hash(preimage); + let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash); + let request = send_transaction_message::v1::Request { + transaction_id: txn_id.into(), + origin: self.server.name.clone(), + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus, + edus, + }; + + let result = self + .services + .federation + .execute_on(&self.services.client.sender, &server, request) + .await; + + for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { + if let Err(e) = result { + warn!( + %txn_id, %server, + "error sending PDU {event_id} to remote server: {e:?}" + ); } } - //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty - // transaction"); - - let txn_hash = calculate_hash(events.iter().filter_map(|e| match e { - | SendingEvent::Edu(b) => Some(&**b), - | SendingEvent::Pdu(b) => Some(b.as_ref()), - | SendingEvent::Flush => None, - })); - - let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash); - - let request = send_transaction_message::v1::Request { - origin: self.server.name.clone(), - pdus: pdu_jsons, - edus: edu_jsons, - origin_server_ts: MilliSecondsSinceUnixEpoch::now(), - transaction_id: txn_id.into(), - }; - - let client = &self.services.client.sender; - self.services.federation.execute_on(client, &server, request) - .await - .inspect(|response| { - response - .pdus - .iter() - .filter(|(_, res)| res.is_err()) - .for_each( - |(pdu_id, res)| warn!(%txn_id, %server, "error sending PDU {pdu_id} to remote server: {res:?}"), - ); - }) - .map_err(|e| (Destination::Federation(server.clone()), e)) - .map(|_| Destination::Federation(server)) + match result { + | Err(error) => Err((Destination::Federation(server), error)), + | Ok(_) => Ok(Destination::Federation(server)), + } } /// This does not return a full `Pdu` it is only to satisfy ruma's types.