diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 9d702cd7..4429e912 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -6,7 +6,7 @@ use std::{ }; use conduit::{ - err, + at, err, result::FlatOk, utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt}, warn, PduEvent, Result, @@ -398,59 +398,52 @@ impl Service { return Ok(HashMap::new()); }; - let auth_events = state_res::auth_types_for_event(kind, sender, state_key, content)?; - - let mut sauthevents: HashMap<_, _> = auth_events + let mut sauthevents: HashMap<_, _> = state_res::auth_types_for_event(kind, sender, state_key, content)? .iter() .stream() .filter_map(|(event_type, state_key)| { self.services .short .get_shortstatekey(event_type, state_key) - .map_ok(move |s| (s, (event_type, state_key))) + .map_ok(move |ssk| (ssk, (event_type, state_key))) .map(Result::ok) }) + .map(|(ssk, (event_type, state_key))| (ssk, (event_type.to_owned(), state_key.to_owned()))) .collect() .await; - let full_state = self + let auth_state: Vec<_> = self .services - .state_compressor - .load_shortstatehash_info(shortstatehash) + .state_accessor + .state_full_shortids(shortstatehash) .await - .map_err(|e| { - err!(Database( - "Missing shortstatehash info for {room_id:?} at {shortstatehash:?}: {e:?}" - )) - })? - .pop() - .expect("there is always one layer") - .full_state; + .map_err(|e| err!(Database(error!(?room_id, ?shortstatehash, "{e:?}"))))? + .into_iter() + .filter_map(|(shortstatekey, shorteventid)| { + sauthevents + .remove(&shortstatekey) + .map(|(event_type, state_key)| ((event_type, state_key), shorteventid)) + }) + .collect(); - let mut ret = HashMap::new(); - for &compressed in full_state.iter() { - let (shortstatekey, shorteventid) = parse_compressed_state_event(compressed); + let auth_pdus: Vec<_> = self + .services + .short + .multi_get_eventid_from_short(auth_state.iter().map(at!(1))) + .await + .into_iter() + .stream() + .and_then(|event_id| async move { self.services.timeline.get_pdu(&event_id).await }) + .collect() + .await; - let Some((ty, state_key)) = sauthevents.remove(&shortstatekey) else { - continue; - }; + let auth_pdus = auth_state + .into_iter() + .map(at!(0)) + .zip(auth_pdus.into_iter()) + .filter_map(|((event_type, state_key), pdu)| Some(((event_type, state_key), pdu.ok()?))) + .collect(); - let Ok(event_id) = self - .services - .short - .get_eventid_from_short(shorteventid) - .await - else { - continue; - }; - - let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await else { - continue; - }; - - ret.insert((ty.to_owned(), state_key.to_owned()), pdu); - } - - Ok(ret) + Ok(auth_pdus) } } diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 8df0d8b0..80046d77 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,8 +1,8 @@ use std::{collections::HashMap, sync::Arc}; use conduit::{ - err, - utils::{future::TryExtExt, IterStream}, + at, err, + utils::stream::{IterStream, ReadyExt}, PduEvent, Result, }; use database::{Deserialized, Map}; @@ -49,52 +49,63 @@ impl Data { pub(super) async fn state_full( &self, shortstatehash: ShortStateHash, ) -> Result>> { - Ok(self + let state = self .state_full_pdus(shortstatehash) .await? .into_iter() .filter_map(|pdu| Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu))) - .collect()) + .collect(); + + Ok(state) } pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result>> { - Ok(self + let short_ids = self .state_full_shortids(shortstatehash) .await? - .iter() + .into_iter() + .map(at!(1)); + + let event_ids = self + .services + .short + .multi_get_eventid_from_short(short_ids) + .await; + + let full_pdus = event_ids + .into_iter() .stream() - .filter_map(|(_, shorteventid)| { - self.services - .short - .get_eventid_from_short(*shorteventid) - .ok() - }) - .filter_map(|eventid| async move { self.services.timeline.get_pdu(&eventid).await.ok() }) + .ready_filter_map(Result::ok) + .filter_map(|event_id| async move { self.services.timeline.get_pdu(&event_id).await.ok() }) .collect() - .await) + .await; + + Ok(full_pdus) } pub(super) async fn state_full_ids(&self, shortstatehash: ShortStateHash) -> Result>> { - Ok(self - .state_full_shortids(shortstatehash) - .await? - .iter() - .stream() - .filter_map(|(shortstatekey, shorteventid)| { - self.services - .short - .get_eventid_from_short(*shorteventid) - .map_ok(move |eventid| (*shortstatekey, eventid)) - .ok() - }) - .collect() - .await) + let short_ids = self.state_full_shortids(shortstatehash).await?; + + let event_ids = self + .services + .short + .multi_get_eventid_from_short(short_ids.iter().map(at!(1))) + .await; + + let full_ids = short_ids + .into_iter() + .map(at!(0)) + .zip(event_ids.into_iter()) + .filter_map(|(shortstatekey, event_id)| Some((shortstatekey, event_id.ok()?))) + .collect(); + + Ok(full_ids) } pub(super) async fn state_full_shortids( &self, shortstatehash: ShortStateHash, ) -> Result> { - Ok(self + let shortids = self .services .state_compressor .load_shortstatehash_info(shortstatehash) @@ -106,7 +117,9 @@ impl Data { .iter() .copied() .map(parse_compressed_state_event) - .collect()) + .collect(); + + Ok(shortids) } /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 89db88a6..e08fac66 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -41,7 +41,10 @@ use serde::Deserialize; use self::data::Data; use crate::{ rooms, - rooms::{short::ShortStateHash, state::RoomMutexGuard}, + rooms::{ + short::{ShortEventId, ShortStateHash, ShortStateKey}, + state::RoomMutexGuard, + }, Dep, }; @@ -102,6 +105,13 @@ impl Service { self.db.state_full_ids(shortstatehash).await } + #[inline] + pub async fn state_full_shortids( + &self, shortstatehash: ShortStateHash, + ) -> Result> { + self.db.state_full_shortids(shortstatehash).await + } + pub async fn state_full( &self, shortstatehash: ShortStateHash, ) -> Result>> {