From 677316631a029fdc23fb48092a1af14284e26448 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 26 Jan 2025 06:15:01 +0000 Subject: [PATCH] pipeline prologue of handle_incoming_pdu simplify room_version/first_pdu_in_room argument passing Signed-off-by: Jason Volk --- .../fetch_and_handle_outliers.rs | 13 ++-- src/service/rooms/event_handler/fetch_prev.rs | 17 ++--- .../rooms/event_handler/fetch_state.rs | 5 +- .../event_handler/handle_incoming_pdu.rs | 72 +++++++++++-------- .../rooms/event_handler/handle_outlier_pdu.rs | 1 - .../rooms/event_handler/handle_prev_pdu.rs | 8 +-- .../event_handler/upgrade_outlier_pdu.rs | 2 +- 7 files changed, 62 insertions(+), 56 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 84d0edd0..540ebb64 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -10,10 +10,11 @@ use conduwuit::{ }; use futures::TryFutureExt; use ruma::{ - api::federation::event::get_event, CanonicalJsonValue, OwnedEventId, RoomId, RoomVersionId, - ServerName, + api::federation::event::get_event, CanonicalJsonValue, OwnedEventId, RoomId, ServerName, }; +use super::get_room_version_id; + /// Find the event and auth it. Once the event is validated (steps 1 - 8) /// it is appended to the outliers Tree. /// @@ -30,7 +31,6 @@ pub(super) async fn fetch_and_handle_outliers<'a>( events: &'a [OwnedEventId], create_event: &'a PduEvent, room_id: &'a RoomId, - room_version_id: &'a RoomVersionId, ) -> Vec<(Arc, Option>)> { let back_off = |id| match self .services @@ -113,8 +113,13 @@ pub(super) async fn fetch_and_handle_outliers<'a>( { | Ok(res) => { debug!("Got {next_id} over federation"); + let Ok(room_version_id) = get_room_version_id(create_event) else { + back_off((*next_id).to_owned()); + continue; + }; + let Ok((calculated_event_id, value)) = - pdu::gen_event_id_canonical_json(&res.pdu, room_version_id) + pdu::gen_event_id_canonical_json(&res.pdu, &room_version_id) else { back_off((*next_id).to_owned()); continue; diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 5966aeba..aea70739 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -8,8 +8,7 @@ use futures::{future, FutureExt}; use ruma::{ int, state_res::{self}, - uint, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, RoomVersionId, - ServerName, + uint, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, UInt, }; use super::check_room_id; @@ -26,7 +25,7 @@ pub(super) async fn fetch_prev( origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, - room_version_id: &RoomVersionId, + first_ts_in_room: UInt, initial_set: Vec, ) -> Result<( Vec, @@ -36,21 +35,13 @@ pub(super) async fn fetch_prev( let mut eventid_info = HashMap::new(); let mut todo_outlier_stack: VecDeque = initial_set.into(); - let first_pdu_in_room = self.services.timeline.first_pdu_in_room(room_id).await?; - let mut amount = 0; while let Some(prev_event_id) = todo_outlier_stack.pop_front() { self.services.server.check_running()?; if let Some((pdu, mut json_opt)) = self - .fetch_and_handle_outliers( - origin, - &[prev_event_id.clone()], - create_event, - room_id, - room_version_id, - ) + .fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id) .boxed() .await .pop() @@ -74,7 +65,7 @@ pub(super) async fn fetch_prev( } if let Some(json) = json_opt { - if pdu.origin_server_ts > first_pdu_in_room.origin_server_ts { + if pdu.origin_server_ts > first_ts_in_room { amount = amount.saturating_add(1); for prev_prev in &pdu.prev_events { if !graph.contains_key(prev_prev) { diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index 0892655e..4f2580db 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -4,7 +4,7 @@ use conduwuit::{debug, debug_warn, implement, Err, Error, PduEvent, Result}; use futures::FutureExt; use ruma::{ api::federation::event::get_room_state_ids, events::StateEventType, EventId, OwnedEventId, - RoomId, RoomVersionId, ServerName, + RoomId, ServerName, }; use crate::rooms::short::ShortStateKey; @@ -23,7 +23,6 @@ pub(super) async fn fetch_state( origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, - room_version_id: &RoomVersionId, event_id: &EventId, ) -> Result>> { let res = self @@ -38,7 +37,7 @@ pub(super) async fn fetch_state( debug!("Fetching state events"); let state_vec = self - .fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id, room_version_id) + .fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id) .boxed() .await; diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 94d4bcc7..7db71961 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -1,14 +1,15 @@ use std::{ collections::{hash_map, BTreeMap}, - sync::Arc, time::Instant, }; use conduwuit::{debug, err, implement, warn, Err, Result}; -use futures::{FutureExt, TryFutureExt}; +use futures::{ + future::{try_join5, OptionFuture}, + FutureExt, +}; use ruma::{events::StateEventType, CanonicalJsonValue, EventId, RoomId, ServerName, UserId}; -use super::{check_room_id, get_room_version_id}; use crate::rooms::timeline::RawPduId; /// When receiving an event one needs to: @@ -59,19 +60,13 @@ pub async fn handle_incoming_pdu<'a>( } // 1.1 Check the server is in the room - if !self.services.metadata.exists(room_id).await { - return Err!(Request(NotFound("Room is unknown to this server"))); - } + let meta_exists = self.services.metadata.exists(room_id).map(Ok); // 1.2 Check if the room is disabled - if self.services.metadata.is_disabled(room_id).await { - return Err!(Request(Forbidden( - "Federation of this room is currently disabled on this server." - ))); - } + let is_disabled = self.services.metadata.is_disabled(room_id).map(Ok); // 1.3.1 Check room ACL on origin field/server - self.acl_check(origin, room_id).await?; + let origin_acl_check = self.acl_check(origin, room_id); // 1.3.2 Check room ACL on sender's server name let sender: &UserId = value @@ -79,36 +74,53 @@ pub async fn handle_incoming_pdu<'a>( .try_into() .map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?; - if sender.server_name() != origin { - self.acl_check(sender.server_name(), room_id).await?; - } + let sender_acl_check: OptionFuture<_> = sender + .server_name() + .ne(origin) + .then(|| self.acl_check(sender.server_name(), room_id)) + .into(); // Fetch create event - let create_event = self - .services - .state_accessor - .room_state_get(room_id, &StateEventType::RoomCreate, "") - .map_ok(Arc::new) - .await?; + let create_event = + self.services + .state_accessor + .room_state_get(room_id, &StateEventType::RoomCreate, ""); - // Procure the room version - let room_version_id = get_room_version_id(&create_event)?; + let (meta_exists, is_disabled, (), (), create_event) = try_join5( + meta_exists, + is_disabled, + origin_acl_check, + sender_acl_check.map(|o| o.unwrap_or(Ok(()))), + create_event, + ) + .await?; - let first_pdu_in_room = self.services.timeline.first_pdu_in_room(room_id).await?; + if !meta_exists { + return Err!(Request(NotFound("Room is unknown to this server"))); + } + + if is_disabled { + return Err!(Request(Forbidden("Federation of this room is disabled by this server."))); + } let (incoming_pdu, val) = self .handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false) - .boxed() .await?; - check_room_id(room_id, &incoming_pdu)?; - // 8. if not timeline event: stop if !is_timeline_event { return Ok(None); } + // Skip old events - if incoming_pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { + let first_ts_in_room = self + .services + .timeline + .first_pdu_in_room(room_id) + .await? + .origin_server_ts; + + if incoming_pdu.origin_server_ts < first_ts_in_room { return Ok(None); } @@ -119,7 +131,7 @@ pub async fn handle_incoming_pdu<'a>( origin, &create_event, room_id, - &room_version_id, + first_ts_in_room, incoming_pdu.prev_events.clone(), ) .await?; @@ -134,7 +146,7 @@ pub async fn handle_incoming_pdu<'a>( room_id, &mut eventid_info, &create_event, - &first_pdu_in_room, + first_ts_in_room, &prev_id, ) .await diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 3ad73295..a35aabe0 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -84,7 +84,6 @@ pub(super) async fn handle_outlier_pdu<'a>( &incoming_pdu.auth_events, create_event, room_id, - &room_version_id, )) .await; } diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 2bec4eba..32ab505f 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -7,7 +7,7 @@ use std::{ use conduwuit::{ debug, implement, utils::continue_exponential_backoff_secs, Err, PduEvent, Result, }; -use ruma::{CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName}; +use ruma::{CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, UInt}; #[implement(super::Service)] #[allow(clippy::type_complexity)] @@ -27,8 +27,8 @@ pub(super) async fn handle_prev_pdu<'a>( OwnedEventId, (Arc, BTreeMap), >, - create_event: &Arc, - first_pdu_in_room: &PduEvent, + create_event: &PduEvent, + first_ts_in_room: UInt, prev_id: &EventId, ) -> Result { // Check for disabled again because it might have changed @@ -62,7 +62,7 @@ pub(super) async fn handle_prev_pdu<'a>( if let Some((pdu, json)) = eventid_info.remove(prev_id) { // Skip old events - if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { + if pdu.origin_server_ts < first_ts_in_room { return Ok(()); } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 8adf4246..f0c8f0c5 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -63,7 +63,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( if state_at_incoming_event.is_none() { state_at_incoming_event = self - .fetch_state(origin, create_event, room_id, &room_version_id, &incoming_pdu.event_id) + .fetch_state(origin, create_event, room_id, &incoming_pdu.event_id) .await?; }