pipeline pdu fetch for federation sending destination

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-26 21:46:46 +00:00
parent b2a565b0b4
commit ffd0fd4242

View file

@ -8,12 +8,12 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use conduwuit::{ use conduwuit::{
debug, err, error, debug, err, error,
result::LogErr, result::LogErr,
trace, trace,
utils::{calculate_hash, continue_exponential_backoff_secs, ReadyExt}, utils::{calculate_hash, continue_exponential_backoff_secs, stream::IterStream, ReadyExt},
warn, Error, Result, warn, Error, Result,
}; };
use futures::{ use futures::{
@ -38,7 +38,9 @@ use ruma::{
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
GlobalAccountDataEventType, GlobalAccountDataEventType,
}, },
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, push,
serde::Raw,
uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
OwnedUserId, RoomId, RoomVersionId, ServerName, UInt, OwnedUserId, RoomId, RoomVersionId, ServerName, UInt,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
@ -633,7 +635,7 @@ impl Service {
} }
fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> { fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
//debug_assert!(!events.is_empty(), "sending empty transaction"); debug_assert!(!events.is_empty(), "sending empty transaction");
match dest { match dest {
| Destination::Federation(server) => | Destination::Federation(server) =>
self.send_events_dest_federation(server, events).boxed(), self.send_events_dest_federation(server, events).boxed(),
@ -698,7 +700,7 @@ impl Service {
| SendingEvent::Flush => None, | SendingEvent::Flush => None,
})); }));
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash); let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction"); // transaction");
@ -796,81 +798,72 @@ impl Service {
Ok(Destination::Push(user_id, pushkey)) Ok(Destination::Push(user_id, pushkey))
} }
#[tracing::instrument(
name = "fed",
level = "debug",
skip(self, events),
fields(
events = %events.len(),
),
)]
async fn send_events_dest_federation( async fn send_events_dest_federation(
&self, &self,
server: OwnedServerName, server: OwnedServerName,
events: Vec<SendingEvent>, events: Vec<SendingEvent>,
) -> SendingResult { ) -> SendingResult {
let mut pdu_jsons = Vec::with_capacity( let pdus: Vec<_> = events
events .iter()
.iter() .filter_map(|pdu| match pdu {
.filter(|event| matches!(event, SendingEvent::Pdu(_))) | SendingEvent::Pdu(pdu) => Some(pdu),
.count(), | _ => None,
); })
let mut edu_jsons = Vec::with_capacity( .stream()
events .then(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id))
.iter() .ready_filter_map(Result::ok)
.filter(|event| matches!(event, SendingEvent::Edu(_))) .then(|pdu| self.convert_to_outgoing_federation_event(pdu))
.count(), .collect()
); .await;
for event in &events { let edus: Vec<Raw<Edu>> = events
match event { .iter()
// TODO: check room version and remove event_id if needed .filter_map(|edu| match edu {
| SendingEvent::Pdu(pdu_id) => { | SendingEvent::Edu(edu) => Some(edu.as_ref()),
if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await { | _ => None,
pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await); })
} .map(serde_json::from_slice)
}, .filter_map(Result::ok)
| SendingEvent::Edu(edu) => .collect();
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw); if pdus.is_empty() && edus.is_empty() {
}, return Ok(Destination::Federation(server));
| SendingEvent::Flush => {}, // flush only; no new content }
let preimage = pdus
.iter()
.map(|raw| raw.get().as_bytes())
.chain(edus.iter().map(|raw| raw.json().get().as_bytes()));
let txn_hash = calculate_hash(preimage);
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
let request = send_transaction_message::v1::Request {
transaction_id: txn_id.into(),
origin: self.server.name.clone(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdus,
edus,
};
let result = self
.services
.federation
.execute_on(&self.services.client.sender, &server, request)
.await;
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
if let Err(e) = result {
warn!(
%txn_id, %server,
"error sending PDU {event_id} to remote server: {e:?}"
);
} }
} }
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty match result {
// transaction"); | Err(error) => Err((Destination::Federation(server), error)),
| Ok(_) => Ok(Destination::Federation(server)),
let txn_hash = calculate_hash(events.iter().filter_map(|e| match e { }
| SendingEvent::Edu(b) => Some(&**b),
| SendingEvent::Pdu(b) => Some(b.as_ref()),
| SendingEvent::Flush => None,
}));
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
let request = send_transaction_message::v1::Request {
origin: self.server.name.clone(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: txn_id.into(),
};
let client = &self.services.client.sender;
self.services.federation.execute_on(client, &server, request)
.await
.inspect(|response| {
response
.pdus
.iter()
.filter(|(_, res)| res.is_err())
.for_each(
|(pdu_id, res)| warn!(%txn_id, %server, "error sending PDU {pdu_id} to remote server: {res:?}"),
);
})
.map_err(|e| (Destination::Federation(server.clone()), e))
.map(|_| Destination::Federation(server))
} }
/// This does not return a full `Pdu` it is only to satisfy ruma's types. /// This does not return a full `Pdu` it is only to satisfy ruma's types.