pipeline pdu fetch for federation sending destination
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
b2a565b0b4
commit
ffd0fd4242
1 changed files with 64 additions and 71 deletions
|
@ -8,12 +8,12 @@ use std::{
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use base64::{engine::general_purpose, Engine as _};
|
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
debug, err, error,
|
debug, err, error,
|
||||||
result::LogErr,
|
result::LogErr,
|
||||||
trace,
|
trace,
|
||||||
utils::{calculate_hash, continue_exponential_backoff_secs, ReadyExt},
|
utils::{calculate_hash, continue_exponential_backoff_secs, stream::IterStream, ReadyExt},
|
||||||
warn, Error, Result,
|
warn, Error, Result,
|
||||||
};
|
};
|
||||||
use futures::{
|
use futures::{
|
||||||
|
@ -38,7 +38,9 @@ use ruma::{
|
||||||
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
|
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
|
||||||
GlobalAccountDataEventType,
|
GlobalAccountDataEventType,
|
||||||
},
|
},
|
||||||
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
|
push,
|
||||||
|
serde::Raw,
|
||||||
|
uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
|
||||||
OwnedUserId, RoomId, RoomVersionId, ServerName, UInt,
|
OwnedUserId, RoomId, RoomVersionId, ServerName, UInt,
|
||||||
};
|
};
|
||||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||||
|
@ -633,7 +635,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
|
fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
|
||||||
//debug_assert!(!events.is_empty(), "sending empty transaction");
|
debug_assert!(!events.is_empty(), "sending empty transaction");
|
||||||
match dest {
|
match dest {
|
||||||
| Destination::Federation(server) =>
|
| Destination::Federation(server) =>
|
||||||
self.send_events_dest_federation(server, events).boxed(),
|
self.send_events_dest_federation(server, events).boxed(),
|
||||||
|
@ -698,7 +700,7 @@ impl Service {
|
||||||
| SendingEvent::Flush => None,
|
| SendingEvent::Flush => None,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
|
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
|
||||||
|
|
||||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
||||||
// transaction");
|
// transaction");
|
||||||
|
@ -796,81 +798,72 @@ impl Service {
|
||||||
Ok(Destination::Push(user_id, pushkey))
|
Ok(Destination::Push(user_id, pushkey))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
|
||||||
name = "fed",
|
|
||||||
level = "debug",
|
|
||||||
skip(self, events),
|
|
||||||
fields(
|
|
||||||
events = %events.len(),
|
|
||||||
),
|
|
||||||
)]
|
|
||||||
async fn send_events_dest_federation(
|
async fn send_events_dest_federation(
|
||||||
&self,
|
&self,
|
||||||
server: OwnedServerName,
|
server: OwnedServerName,
|
||||||
events: Vec<SendingEvent>,
|
events: Vec<SendingEvent>,
|
||||||
) -> SendingResult {
|
) -> SendingResult {
|
||||||
let mut pdu_jsons = Vec::with_capacity(
|
let pdus: Vec<_> = events
|
||||||
events
|
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
|
.filter_map(|pdu| match pdu {
|
||||||
.count(),
|
| SendingEvent::Pdu(pdu) => Some(pdu),
|
||||||
);
|
| _ => None,
|
||||||
let mut edu_jsons = Vec::with_capacity(
|
})
|
||||||
events
|
.stream()
|
||||||
|
.then(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id))
|
||||||
|
.ready_filter_map(Result::ok)
|
||||||
|
.then(|pdu| self.convert_to_outgoing_federation_event(pdu))
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let edus: Vec<Raw<Edu>> = events
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|event| matches!(event, SendingEvent::Edu(_)))
|
.filter_map(|edu| match edu {
|
||||||
.count(),
|
| SendingEvent::Edu(edu) => Some(edu.as_ref()),
|
||||||
);
|
| _ => None,
|
||||||
|
})
|
||||||
|
.map(serde_json::from_slice)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.collect();
|
||||||
|
|
||||||
for event in &events {
|
if pdus.is_empty() && edus.is_empty() {
|
||||||
match event {
|
return Ok(Destination::Federation(server));
|
||||||
// TODO: check room version and remove event_id if needed
|
|
||||||
| SendingEvent::Pdu(pdu_id) => {
|
|
||||||
if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await {
|
|
||||||
pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
| SendingEvent::Edu(edu) =>
|
|
||||||
if let Ok(raw) = serde_json::from_slice(edu) {
|
|
||||||
edu_jsons.push(raw);
|
|
||||||
},
|
|
||||||
| SendingEvent::Flush => {}, // flush only; no new content
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
let preimage = pdus
|
||||||
// transaction");
|
.iter()
|
||||||
|
.map(|raw| raw.get().as_bytes())
|
||||||
let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
|
.chain(edus.iter().map(|raw| raw.json().get().as_bytes()));
|
||||||
| SendingEvent::Edu(b) => Some(&**b),
|
|
||||||
| SendingEvent::Pdu(b) => Some(b.as_ref()),
|
|
||||||
| SendingEvent::Flush => None,
|
|
||||||
}));
|
|
||||||
|
|
||||||
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
|
|
||||||
|
|
||||||
|
let txn_hash = calculate_hash(preimage);
|
||||||
|
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
|
||||||
let request = send_transaction_message::v1::Request {
|
let request = send_transaction_message::v1::Request {
|
||||||
origin: self.server.name.clone(),
|
|
||||||
pdus: pdu_jsons,
|
|
||||||
edus: edu_jsons,
|
|
||||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
|
||||||
transaction_id: txn_id.into(),
|
transaction_id: txn_id.into(),
|
||||||
|
origin: self.server.name.clone(),
|
||||||
|
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||||
|
pdus,
|
||||||
|
edus,
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = &self.services.client.sender;
|
let result = self
|
||||||
self.services.federation.execute_on(client, &server, request)
|
.services
|
||||||
.await
|
.federation
|
||||||
.inspect(|response| {
|
.execute_on(&self.services.client.sender, &server, request)
|
||||||
response
|
.await;
|
||||||
.pdus
|
|
||||||
.iter()
|
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
|
||||||
.filter(|(_, res)| res.is_err())
|
if let Err(e) = result {
|
||||||
.for_each(
|
warn!(
|
||||||
|(pdu_id, res)| warn!(%txn_id, %server, "error sending PDU {pdu_id} to remote server: {res:?}"),
|
%txn_id, %server,
|
||||||
|
"error sending PDU {event_id} to remote server: {e:?}"
|
||||||
);
|
);
|
||||||
})
|
}
|
||||||
.map_err(|e| (Destination::Federation(server.clone()), e))
|
}
|
||||||
.map(|_| Destination::Federation(server))
|
|
||||||
|
match result {
|
||||||
|
| Err(error) => Err((Destination::Federation(server), error)),
|
||||||
|
| Ok(_) => Ok(Destination::Federation(server)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
|
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue