diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 89e47d4e..d4c9a57b 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -266,15 +266,15 @@ pub(super) async fn get_remote_pdu( #[admin_command] pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result { let room_id = self.services.rooms.alias.resolve(&room).await?; - let room_state = self + let room_state: Vec<_> = self .services .rooms .state_accessor .room_state_full(&room_id) .await? .values() - .map(|pdu| pdu.to_state_event()) - .collect::>(); + .map(PduEvent::to_state_event) + .collect(); if room_state.is_empty() { return Ok(RoomMessageEventContent::text_plain( diff --git a/src/api/client/context.rs b/src/api/client/context.rs index 5b6b516e..bf87f5e1 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -103,7 +103,7 @@ pub(crate) async fn get_context_route( .collect() .await; - let lazy = once(&(base_token, (*base_event).clone())) + let lazy = once(&(base_token, base_event.clone())) .chain(events_before.iter()) .chain(events_after.iter()) .stream() diff --git a/src/api/client/report.rs b/src/api/client/report.rs index a0133704..31667323 100644 --- a/src/api/client/report.rs +++ b/src/api/client/report.rs @@ -137,7 +137,7 @@ pub(crate) async fn report_event_route( /// check if reporting user is in the reporting room async fn is_event_report_valid( services: &Services, event_id: &EventId, room_id: &RoomId, sender_user: &UserId, reason: Option<&String>, - score: Option, pdu: &std::sync::Arc, + score: Option, pdu: &PduEvent, ) -> Result<()> { debug_info!("Checking if report from user {sender_user} for event {event_id} in room {room_id} is valid"); diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 0f44f25d..090c70a7 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route( event: services .rooms .timeline - .get_pdu_owned(&body.event_id) + .get_pdu(&body.event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", &body.event_id)))) .and_then(|event| async move { services diff --git a/src/api/client/search.rs b/src/api/client/search.rs index 1e5384fe..38468abb 100644 --- a/src/api/client/search.rs +++ b/src/api/client/search.rs @@ -181,11 +181,7 @@ async fn procure_room_state(services: &Services, room_id: &RoomId) -> Result( // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu_json checks both) - if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { + if let Ok(local_pdu) = self.services.timeline.get_pdu(id).map_ok(Arc::new).await { trace!("Found {id} in db"); events_with_auth_events.push((id, Some(local_pdu), vec![])); continue; diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 4d2d75d5..19367582 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -1,10 +1,11 @@ use std::{ collections::{hash_map, BTreeMap}, + sync::Arc, time::Instant, }; use conduit::{debug, err, implement, warn, Error, Result}; -use futures::FutureExt; +use futures::{FutureExt, TryFutureExt}; use ruma::{ api::client::error::ErrorKind, events::StateEventType, CanonicalJsonValue, EventId, RoomId, ServerName, UserId, }; @@ -79,6 +80,7 @@ pub async fn handle_incoming_pdu<'a>( .services .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "") + .map_ok(Arc::new) .await?; // Procure the room version diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 2d95ff63..21504b66 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -4,7 +4,7 @@ use std::{ }; use conduit::{debug, debug_info, err, implement, trace, warn, Err, Error, PduEvent, Result}; -use futures::future::ready; +use futures::{future::ready, TryFutureExt}; use ruma::{ api::client::error::ErrorKind, events::StateEventType, @@ -94,7 +94,7 @@ pub(super) async fn handle_outlier_pdu<'a>( // Build map of auth events let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); for id in &incoming_pdu.auth_events { - let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { + let Ok(auth_event) = self.services.timeline.get_pdu(id).map_ok(Arc::new).await else { warn!("Could not find auth event {id}"); continue; }; diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index f6440fe9..3fb7d5c4 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -17,7 +17,11 @@ use std::{ time::Instant, }; -use conduit::{utils::MutexMap, Err, PduEvent, Result, Server}; +use conduit::{ + utils::{MutexMap, TryFutureExtExt}, + Err, PduEvent, Result, Server, +}; +use futures::TryFutureExt; use ruma::{ events::room::create::RoomCreateEventContent, state_res::RoomVersion, EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, @@ -94,7 +98,12 @@ impl Service { async fn event_exists(&self, event_id: Arc) -> bool { self.services.timeline.pdu_exists(&event_id).await } async fn event_fetch(&self, event_id: Arc) -> Option> { - self.services.timeline.get_pdu(&event_id).await.ok() + self.services + .timeline + .get_pdu(&event_id) + .map_ok(Arc::new) + .ok() + .await } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 8bd5f7eb..03e2d2e8 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -445,7 +445,7 @@ impl Service { .into_iter() .map(at!(0)) .zip(auth_pdus.into_iter()) - .filter_map(|((event_type, state_key), pdu)| Some(((event_type, state_key), pdu.ok()?))) + .filter_map(|((event_type, state_key), pdu)| Some(((event_type, state_key), pdu.ok()?.into()))) .collect(); Ok(auth_pdus) diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index a6c2e429..6c67b856 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -46,7 +46,7 @@ impl Data { pub(super) async fn state_full( &self, shortstatehash: ShortStateHash, - ) -> Result>> { + ) -> Result> { let state = self .state_full_pdus(shortstatehash) .await? @@ -57,24 +57,27 @@ impl Data { Ok(state) } - pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result>> { + pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result> { let short_ids = self .state_full_shortids(shortstatehash) .await? .into_iter() .map(at!(1)); - let event_ids = self + let event_ids: Vec = self .services .short .multi_get_eventid_from_short(short_ids) - .await; + .await + .into_iter() + .filter_map(Result::ok) + .collect(); let full_pdus = event_ids - .into_iter() + .iter() .stream() + .then(|event_id| self.services.timeline.get_pdu(event_id)) .ready_filter_map(Result::ok) - .filter_map(|event_id: OwnedEventId| async move { self.services.timeline.get_pdu(&event_id).await.ok() }) .collect() .await; @@ -157,7 +160,7 @@ impl Data { /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). pub(super) async fn state_get( &self, shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, - ) -> Result> { + ) -> Result { self.state_get_id(shortstatehash, event_type, state_key) .and_then(|event_id| async move { self.services.timeline.get_pdu(&event_id).await }) .await @@ -181,7 +184,7 @@ impl Data { /// Returns the full room state. pub(super) async fn room_state_full( &self, room_id: &RoomId, - ) -> Result>> { + ) -> Result> { self.services .state .get_room_shortstatehash(room_id) @@ -192,7 +195,7 @@ impl Data { /// Returns the full room state's pdus. #[allow(unused_qualifications)] // async traits - pub(super) async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result>> { + pub(super) async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result> { self.services .state .get_room_shortstatehash(room_id) @@ -215,7 +218,7 @@ impl Data { /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). pub(super) async fn room_state_get( &self, room_id: &RoomId, event_type: &StateEventType, state_key: &str, - ) -> Result> { + ) -> Result { self.services .state .get_room_shortstatehash(room_id) diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index e08fac66..18f999b4 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -114,7 +114,7 @@ impl Service { pub async fn state_full( &self, shortstatehash: ShortStateHash, - ) -> Result>> { + ) -> Result> { self.db.state_full(shortstatehash).await } @@ -134,7 +134,7 @@ impl Service { #[inline] pub async fn state_get( &self, shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, - ) -> Result> { + ) -> Result { self.db .state_get(shortstatehash, event_type, state_key) .await @@ -311,13 +311,13 @@ impl Service { /// Returns the full room state. #[tracing::instrument(skip(self), level = "debug")] - pub async fn room_state_full(&self, room_id: &RoomId) -> Result>> { + pub async fn room_state_full(&self, room_id: &RoomId) -> Result> { self.db.room_state_full(room_id).await } /// Returns the full room state pdus #[tracing::instrument(skip(self), level = "debug")] - pub async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result>> { + pub async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result> { self.db.room_state_full_pdus(room_id).await } @@ -337,7 +337,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn room_state_get( &self, room_id: &RoomId, event_type: &StateEventType, state_key: &str, - ) -> Result> { + ) -> Result { self.db.room_state_get(room_id, event_type, state_key).await } diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index c15d8e7f..c394dc3b 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -126,14 +126,7 @@ impl Data { /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub(super) async fn get_pdu(&self, event_id: &EventId) -> Result> { - self.get_pdu_owned(event_id).await.map(Arc::new) - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub(super) async fn get_pdu_owned(&self, event_id: &EventId) -> Result { + pub(super) async fn get_pdu(&self, event_id: &EventId) -> Result { let accepted = self.get_non_outlier_pdu(event_id).boxed(); let outlier = self .eventid_outlierpdu diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index c0b48b9b..07b406c4 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -242,12 +242,7 @@ impl Service { /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub async fn get_pdu(&self, event_id: &EventId) -> Result> { self.db.get_pdu(event_id).await } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub async fn get_pdu_owned(&self, event_id: &EventId) -> Result { self.db.get_pdu_owned(event_id).await } + pub async fn get_pdu(&self, event_id: &EventId) -> Result { self.db.get_pdu(event_id).await } /// Checks if pdu exists /// @@ -327,11 +322,11 @@ impl Service { ); unsigned.insert( String::from("prev_sender"), - CanonicalJsonValue::String(prev_state.sender.clone().to_string()), + CanonicalJsonValue::String(prev_state.sender.to_string()), ); unsigned.insert( String::from("replaces_state"), - CanonicalJsonValue::String(prev_state.event_id.clone().to_string()), + CanonicalJsonValue::String(prev_state.event_id.to_string()), ); } }