From 2aeee4f5095993c40463193dc491d8a0279c48dc Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 27 Nov 2024 05:57:20 +0000 Subject: [PATCH] parallel query for outlier/non-outlier pdu data Signed-off-by: Jason Volk --- src/service/rooms/timeline/data.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 22a6c1d0..c15d8e7f 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -12,7 +12,7 @@ use conduit::{ Err, PduCount, PduEvent, Result, }; use database::{Database, Deserialized, Json, KeyVal, Map}; -use futures::{Stream, StreamExt}; +use futures::{future::select_ok, FutureExt, Stream, StreamExt}; use ruma::{api::Direction, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId}; use tokio::sync::Mutex; @@ -82,11 +82,14 @@ impl Data { /// Returns the json of a pdu. pub(super) async fn get_pdu_json(&self, event_id: &EventId) -> Result { - if let Ok(pdu) = self.get_non_outlier_pdu_json(event_id).await { - return Ok(pdu); - } + let accepted = self.get_non_outlier_pdu_json(event_id).boxed(); + let outlier = self + .eventid_outlierpdu + .get(event_id) + .map(Deserialized::deserialized) + .boxed(); - self.eventid_outlierpdu.get(event_id).await.deserialized() + select_ok([accepted, outlier]).await.map(at!(0)) } /// Returns the json of a pdu. @@ -131,11 +134,14 @@ impl Data { /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. pub(super) async fn get_pdu_owned(&self, event_id: &EventId) -> Result { - if let Ok(pdu) = self.get_non_outlier_pdu(event_id).await { - return Ok(pdu); - } + let accepted = self.get_non_outlier_pdu(event_id).boxed(); + let outlier = self + .eventid_outlierpdu + .get(event_id) + .map(Deserialized::deserialized) + .boxed(); - self.eventid_outlierpdu.get(event_id).await.deserialized() + select_ok([accepted, outlier]).await.map(at!(0)) } /// Like get_non_outlier_pdu(), but without the expense of fetching and