significantly improve get_missing_events fed code

Signed-off-by: June Clementine Strawberry <june@3.dog>
This commit is contained in:
June Clementine Strawberry 2025-04-02 22:44:44 -04:00
parent ea246d91d9
commit 74012c5289
No known key found for this signature in database
2 changed files with 65 additions and 58 deletions

View file

@ -6,11 +6,17 @@ use conduwuit::{
utils::{IterStream, ReadyExt, stream::TryTools}, utils::{IterStream, ReadyExt, stream::TryTools},
}; };
use futures::{FutureExt, StreamExt, TryStreamExt}; use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{MilliSecondsSinceUnixEpoch, api::federation::backfill::get_backfill, uint}; use ruma::{MilliSecondsSinceUnixEpoch, api::federation::backfill::get_backfill};
use super::AccessCheck; use super::AccessCheck;
use crate::Ruma; use crate::Ruma;
/// arbitrary number but synapse's is 100 and we can handle lots of these
/// anyways
const LIMIT_MAX: usize = 150;
/// no spec defined number but we can handle a lot of these
const LIMIT_DEFAULT: usize = 50;
/// # `GET /_matrix/federation/v1/backfill/<room_id>` /// # `GET /_matrix/federation/v1/backfill/<room_id>`
/// ///
/// Retrieves events from before the sender joined the room, if the room's /// Retrieves events from before the sender joined the room, if the room's
@ -30,9 +36,9 @@ pub(crate) async fn get_backfill_route(
let limit = body let limit = body
.limit .limit
.min(uint!(100))
.try_into() .try_into()
.expect("UInt could not be converted to usize"); .unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
let from = body let from = body
.v .v

View file

@ -1,13 +1,19 @@
use axum::extract::State; use axum::extract::State;
use conduwuit::{Error, Result}; use conduwuit::{
use ruma::{ Result, debug, debug_info, debug_warn,
CanonicalJsonValue, EventId, RoomId, utils::{self},
api::{client::error::ErrorKind, federation::event::get_missing_events}, warn,
}; };
use ruma::api::federation::event::get_missing_events;
use super::AccessCheck; use super::AccessCheck;
use crate::Ruma; use crate::Ruma;
/// arbitrary number but synapse's is 20 and we can handle lots of these anyways
const LIMIT_MAX: usize = 50;
/// spec says default is 10
const LIMIT_DEFAULT: usize = 10;
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
/// ///
/// Retrieves events that the sender is missing. /// Retrieves events that the sender is missing.
@ -24,7 +30,11 @@ pub(crate) async fn get_missing_events_route(
.check() .check()
.await?; .await?;
let limit = body.limit.try_into()?; let limit = body
.limit
.try_into()
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
let mut queued_events = body.latest_events.clone(); let mut queued_events = body.latest_events.clone();
// the vec will never have more entries the limit // the vec will never have more entries the limit
@ -32,60 +42,51 @@ pub(crate) async fn get_missing_events_route(
let mut i: usize = 0; let mut i: usize = 0;
while i < queued_events.len() && events.len() < limit { while i < queued_events.len() && events.len() < limit {
if let Ok(pdu) = services let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else {
debug_info!(?body.origin, "Event {} does not exist locally, skipping", &queued_events[i]);
i = i.saturating_add(1);
continue;
};
if pdu.room_id != body.room_id {
warn!(?body.origin,
"Got an event for the wrong room in database. Found {:?} in {:?}, server requested events in {:?}. Skipping.",
pdu.event_id, pdu.room_id, body.room_id
);
i = i.saturating_add(1);
continue;
}
if body.earliest_events.contains(&queued_events[i]) {
i = i.saturating_add(1);
continue;
}
if !services
.rooms .rooms
.timeline .state_accessor
.get_pdu_json(&queued_events[i]) .server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
.await .await
{ {
let room_id_str = pdu debug!(?body.origin, "Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id);
.get("room_id") i = i.saturating_add(1);
.and_then(|val| val.as_str()) continue;
.ok_or_else(|| Error::bad_database("Invalid event in database."))?;
let event_room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room_id in event in database."))?;
if event_room_id != body.room_id {
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event from wrong room."));
}
if body.earliest_events.contains(&queued_events[i]) {
i = i.saturating_add(1);
continue;
}
if !services
.rooms
.state_accessor
.server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
.await
{
i = i.saturating_add(1);
continue;
}
let prev_events = pdu
.get("prev_events")
.and_then(CanonicalJsonValue::as_array)
.unwrap_or_default();
queued_events.extend(
prev_events
.iter()
.map(<&EventId>::try_from)
.filter_map(Result::ok)
.map(ToOwned::to_owned),
);
events.push(
services
.sending
.convert_to_outgoing_federation_event(pdu)
.await,
);
} }
i = i.saturating_add(1);
let Ok(pdu_json) = utils::to_canonical_object(&pdu) else {
debug_warn!(?body.origin, "Failed to convert PDU in database to canonical JSON: {pdu:?}");
i = i.saturating_add(1);
continue;
};
queued_events.extend(pdu.prev_events.iter().map(ToOwned::to_owned));
events.push(
services
.sending
.convert_to_outgoing_federation_event(pdu_json)
.await,
);
} }
Ok(get_missing_events::v1::Response { events }) Ok(get_missing_events::v1::Response { events })