From 320b0680bdbbb2f08790a178ea2a75c5a0dee11f Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 30 Nov 2024 08:31:28 +0000 Subject: [PATCH] pipeline various loops Signed-off-by: Jason Volk --- src/api/client/context.rs | 39 ++++++++++++------------ src/api/client/message.rs | 7 +++-- src/service/rooms/search/mod.rs | 12 +++++--- src/service/rooms/state/mod.rs | 29 +++++++++++------- src/service/rooms/state_accessor/data.rs | 14 ++++----- 5 files changed, 56 insertions(+), 45 deletions(-) diff --git a/src/api/client/context.rs b/src/api/client/context.rs index 652e17f4..af4e26f0 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -1,9 +1,13 @@ -use std::{collections::HashMap, iter::once}; +use std::iter::once; use axum::extract::State; use conduit::{ - at, err, error, - utils::{future::TryExtExt, stream::ReadyExt, IterStream}, + at, err, + utils::{ + future::TryExtExt, + stream::{BroadbandExt, ReadyExt, WidebandExt}, + IterStream, + }, Err, Result, }; use futures::{future::try_join, StreamExt, TryFutureExt}; @@ -85,8 +89,8 @@ pub(crate) async fn get_context_route( .pdus_rev(Some(sender_user), room_id, Some(base_token)) .await? .ready_filter_map(|item| event_filter(item, filter)) - .filter_map(|item| ignored_filter(&services, item, sender_user)) - .filter_map(|item| visibility_filter(&services, item, sender_user)) + .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) + .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit / 2) .collect() .await; @@ -97,8 +101,8 @@ pub(crate) async fn get_context_route( .pdus(Some(sender_user), room_id, Some(base_token)) .await? .ready_filter_map(|item| event_filter(item, filter)) - .filter_map(|item| ignored_filter(&services, item, sender_user)) - .filter_map(|item| visibility_filter(&services, item, sender_user)) + .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) + .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit / 2) .collect() .await; @@ -124,7 +128,7 @@ pub(crate) async fn get_context_route( .await .map_err(|e| err!(Database("State hash not found: {e}")))?; - let state_ids: HashMap<_, OwnedEventId> = services + let state_ids = services .rooms .state_accessor .state_full_ids(shortstatehash) @@ -133,17 +137,17 @@ pub(crate) async fn get_context_route( let lazy = &lazy; let state: Vec<_> = state_ids - .iter() + .into_iter() .stream() - .filter_map(|(shortstatekey, event_id)| { + .broad_filter_map(|(shortstatekey, event_id)| { services .rooms .short - .get_statekey_from_short(*shortstatekey) + .get_statekey_from_short(shortstatekey) .map_ok(move |(event_type, state_key)| (event_type, state_key, event_id)) .ok() }) - .filter_map(|(event_type, state_key, event_id)| async move { + .ready_filter_map(|(event_type, state_key, event_id)| { if lazy_load_enabled && event_type == StateEventType::RoomMember { let user_id: &UserId = state_key.as_str().try_into().ok()?; if !lazy.contains(user_id) { @@ -151,15 +155,10 @@ pub(crate) async fn get_context_route( } } - services - .rooms - .timeline - .get_pdu(event_id) - .await - .inspect_err(|_| error!("Pdu in state not found: {event_id}")) - .map(|pdu| pdu.to_state_event()) - .ok() + Some(event_id) }) + .broad_filter_map(|event_id: OwnedEventId| async move { services.rooms.timeline.get_pdu(&event_id).await.ok() }) + .map(|pdu| pdu.to_state_event()) .collect() .await; diff --git a/src/api/client/message.rs b/src/api/client/message.rs index d8043855..242c1681 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -5,6 +5,7 @@ use conduit::{ at, is_equal_to, utils::{ result::{FlatOk, LogErr}, + stream::{BroadbandExt, WidebandExt}, IterStream, ReadyExt, }, Event, PduCount, Result, @@ -115,8 +116,8 @@ pub(crate) async fn get_message_events_route( let events: Vec<_> = it .ready_take_while(|(count, _)| Some(*count) != to) .ready_filter_map(|item| event_filter(item, filter)) - .filter_map(|item| ignored_filter(&services, item, sender_user)) - .filter_map(|item| visibility_filter(&services, item, sender_user)) + .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) + .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit) .collect() .await; @@ -132,7 +133,7 @@ pub(crate) async fn get_message_events_route( let state = lazy .iter() .stream() - .filter_map(|user_id| get_member_event(&services, room_id, user_id)) + .broad_filter_map(|user_id| get_member_event(&services, room_id, user_id)) .collect() .await; diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index d59d1d11..ae3567ce 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -3,7 +3,11 @@ use std::sync::Arc; use arrayvec::ArrayVec; use conduit::{ implement, - utils::{set, stream::TryIgnore, ArrayVecExt, IterStream, ReadyExt}, + utils::{ + set, + stream::{TryIgnore, WidebandExt}, + ArrayVecExt, IterStream, ReadyExt, + }, PduCount, PduEvent, Result, }; use database::{keyval::Val, Map}; @@ -107,7 +111,7 @@ pub async fn search_pdus<'a>( let pdus = pdu_ids .into_iter() .stream() - .filter_map(move |result_pdu_id: RawPduId| async move { + .wide_filter_map(move |result_pdu_id: RawPduId| async move { self.services .timeline .get_pdu_from_id(&result_pdu_id) @@ -116,7 +120,7 @@ pub async fn search_pdus<'a>( }) .ready_filter(|pdu| !pdu.is_redacted()) .ready_filter(|pdu| pdu.matches(&query.criteria.filter)) - .filter_map(move |pdu| async move { + .wide_filter_map(move |pdu| async move { self.services .state_accessor .user_can_see_event(query.user_id?, &pdu.room_id, &pdu.event_id) @@ -146,7 +150,7 @@ pub async fn search_pdu_ids(&self, query: &RoomQuery<'_>) -> Result, shortroomid: ShortRoomId) -> Vec> { tokenize(&query.criteria.search_term) .stream() - .then(|word| async move { + .wide_then(|word| async move { self.search_pdu_ids_query_words(shortroomid, &word) .collect::>() .await diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 03e2d2e8..838deacd 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -8,7 +8,11 @@ use std::{ use conduit::{ at, err, result::FlatOk, - utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt}, + utils::{ + calculate_hash, + stream::{BroadbandExt, TryIgnore}, + IterStream, MutexMap, MutexMapGuard, ReadyExt, + }, warn, PduEvent, Result, }; use database::{Deserialized, Ignore, Interfix, Map}; @@ -405,7 +409,7 @@ impl Service { let mut sauthevents: HashMap<_, _> = state_res::auth_types_for_event(kind, sender, state_key, content)? .iter() .stream() - .filter_map(|(event_type, state_key)| { + .broad_filter_map(|(event_type, state_key)| { self.services .short .get_shortstatekey(event_type, state_key) @@ -430,24 +434,27 @@ impl Service { }) .collect(); - let auth_pdus: Vec<_> = self + let auth_pdus = self .services .short .multi_get_eventid_from_short(auth_state.iter().map(at!(1))) .await .into_iter() .stream() - .and_then(|event_id: OwnedEventId| async move { self.services.timeline.get_pdu(&event_id).await }) + .zip(auth_state.into_iter().stream().map(at!(0))) + .ready_filter_map(|(event_id, tsk)| Some((tsk, event_id.ok()?))) + .broad_filter_map(|(tsk, event_id): (_, OwnedEventId)| async move { + self.services + .timeline + .get_pdu(&event_id) + .await + .map(Arc::new) + .map(move |pdu| (tsk, pdu)) + .ok() + }) .collect() .await; - let auth_pdus = auth_state - .into_iter() - .map(at!(0)) - .zip(auth_pdus.into_iter()) - .filter_map(|((event_type, state_key), pdu)| Some(((event_type, state_key), pdu.ok()?.into()))) - .collect(); - Ok(auth_pdus) } } diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 7760d5b6..2a670066 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -2,7 +2,7 @@ use std::{borrow::Borrow, collections::HashMap, sync::Arc}; use conduit::{ at, err, - utils::stream::{IterStream, ReadyExt}, + utils::stream::{BroadbandExt, IterStream}, PduEvent, Result, }; use database::{Deserialized, Map}; @@ -65,20 +65,20 @@ impl Data { .into_iter() .map(at!(1)); - let event_ids: Vec = self + let event_ids = self .services .short .multi_get_eventid_from_short(short_ids) .await .into_iter() - .filter_map(Result::ok) - .collect(); + .filter_map(Result::ok); let full_pdus = event_ids - .iter() + .into_iter() .stream() - .then(|event_id| self.services.timeline.get_pdu(event_id)) - .ready_filter_map(Result::ok) + .broad_filter_map( + |event_id: OwnedEventId| async move { self.services.timeline.get_pdu(&event_id).await.ok() }, + ) .collect() .await;