refactor incoming prev events loop; mitigate large future

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-02 09:53:42 +00:00
parent 6a073b4fa4
commit d036394ec7
2 changed files with 88 additions and 79 deletions

View file

@ -3,9 +3,12 @@ use std::{
time::Instant, time::Instant,
}; };
use conduwuit::{Err, Result, debug, debug::INFO_SPAN_LEVEL, err, implement, warn}; use conduwuit::{
Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, utils::stream::IterStream,
warn,
};
use futures::{ use futures::{
FutureExt, FutureExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, try_join5}, future::{OptionFuture, try_join5},
}; };
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType}; use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
@ -86,7 +89,7 @@ pub async fn handle_incoming_pdu<'a>(
.state_accessor .state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, ""); .room_state_get(room_id, &StateEventType::RoomCreate, "");
let (meta_exists, is_disabled, (), (), create_event) = try_join5( let (meta_exists, is_disabled, (), (), ref create_event) = try_join5(
meta_exists, meta_exists,
is_disabled, is_disabled,
origin_acl_check, origin_acl_check,
@ -104,7 +107,7 @@ pub async fn handle_incoming_pdu<'a>(
} }
let (incoming_pdu, val) = self let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false) .handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.await?; .await?;
// 8. if not timeline event: stop // 8. if not timeline event: stop
@ -129,66 +132,71 @@ pub async fn handle_incoming_pdu<'a>(
let (sorted_prev_events, mut eventid_info) = self let (sorted_prev_events, mut eventid_info) = self
.fetch_prev( .fetch_prev(
origin, origin,
&create_event, create_event,
room_id, room_id,
first_ts_in_room, first_ts_in_room,
incoming_pdu.prev_events.clone(), incoming_pdu.prev_events.clone(),
) )
.await?; .await?;
debug!(events = ?sorted_prev_events, "Got previous events"); debug!(
for prev_id in sorted_prev_events { events = ?sorted_prev_events,
self.services.server.check_running()?; "Handling previous events"
if let Err(e) = self );
.handle_prev_pdu(
sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin, origin,
event_id, event_id,
room_id, room_id,
&mut eventid_info, eventid_info.remove(prev_id),
&create_event, create_event,
first_ts_in_room, first_ts_in_room,
&prev_id, prev_id,
) )
.await .inspect_err(move |e| {
{ warn!("Prev {prev_id} failed: {e}");
use hash_map::Entry; match self
.services
let now = Instant::now(); .globals
warn!("Prev event {prev_id} failed: {e}"); .bad_event_ratelimiter
.write()
match self .expect("locked")
.services .entry(prev_id.into())
.globals {
.bad_event_ratelimiter | hash_map::Entry::Vacant(e) => {
.write() e.insert((Instant::now(), 1));
.expect("locked") },
.entry(prev_id) | hash_map::Entry::Occupied(mut e) => {
{ let tries = e.get().1.saturating_add(1);
| Entry::Vacant(e) => { *e.get_mut() = (Instant::now(), tries);
e.insert((now, 1)); },
}, }
| Entry::Occupied(mut e) => { })
*e.get_mut() = (now, e.get().1.saturating_add(1)); .map(|_| self.services.server.check_running())
}, })
} .boxed()
} .await?;
}
// Done with prev events, now handling the incoming event // Done with prev events, now handling the incoming event
let start_time = Instant::now(); let start_time = Instant::now();
self.federation_handletime self.federation_handletime
.write() .write()
.expect("locked") .expect("locked")
.insert(room_id.to_owned(), (event_id.to_owned(), start_time)); .insert(room_id.into(), (event_id.to_owned(), start_time));
let r = self defer! {{
.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id) self.federation_handletime
.await; .write()
.expect("locked")
.remove(room_id);
}};
self.federation_handletime self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.write() .boxed()
.expect("locked") .await
.remove(&room_id.to_owned());
r
} }

View file

@ -1,13 +1,10 @@
use std::{ use std::{collections::BTreeMap, time::Instant};
collections::{BTreeMap, HashMap},
time::Instant,
};
use conduwuit::{ use conduwuit::{
Err, PduEvent, Result, debug, debug::INFO_SPAN_LEVEL, implement, Err, PduEvent, Result, debug, debug::INFO_SPAN_LEVEL, defer, implement,
utils::continue_exponential_backoff_secs, utils::continue_exponential_backoff_secs,
}; };
use ruma::{CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, UInt}; use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UInt};
#[implement(super::Service)] #[implement(super::Service)]
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -23,10 +20,10 @@ pub(super) async fn handle_prev_pdu<'a>(
origin: &'a ServerName, origin: &'a ServerName,
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
eventid_info: &mut HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>, eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &PduEvent, create_event: &'a PduEvent,
first_ts_in_room: UInt, first_ts_in_room: UInt,
prev_id: &EventId, prev_id: &'a EventId,
) -> Result { ) -> Result {
// Check for disabled again because it might have changed // Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await { if self.services.metadata.is_disabled(room_id).await {
@ -57,31 +54,35 @@ pub(super) async fn handle_prev_pdu<'a>(
} }
} }
if let Some((pdu, json)) = eventid_info.remove(prev_id) { let Some((pdu, json)) = eventid_info else {
// Skip old events return Ok(());
if pdu.origin_server_ts < first_ts_in_room { };
return Ok(());
}
let start_time = Instant::now(); // Skip old events
self.federation_handletime if pdu.origin_server_ts < first_ts_in_room {
.write() return Ok(());
.expect("locked")
.insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time));
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
self.federation_handletime
.write()
.expect("locked")
.remove(&room_id.to_owned());
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
} }
let start_time = Instant::now();
self.federation_handletime
.write()
.expect("locked")
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
defer! {{
self.federation_handletime
.write()
.expect("locked")
.remove(room_id);
}};
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
Ok(()) Ok(())
} }