use multiget for shortid conversions

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-22 16:01:27 +00:00
parent 3789d60b6a
commit c519a40cb8
3 changed files with 86 additions and 70 deletions

View file

@ -6,7 +6,7 @@ use std::{
}; };
use conduit::{ use conduit::{
err, at, err,
result::FlatOk, result::FlatOk,
utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt}, utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt},
warn, PduEvent, Result, warn, PduEvent, Result,
@ -398,59 +398,52 @@ impl Service {
return Ok(HashMap::new()); return Ok(HashMap::new());
}; };
let auth_events = state_res::auth_types_for_event(kind, sender, state_key, content)?; let mut sauthevents: HashMap<_, _> = state_res::auth_types_for_event(kind, sender, state_key, content)?
let mut sauthevents: HashMap<_, _> = auth_events
.iter() .iter()
.stream() .stream()
.filter_map(|(event_type, state_key)| { .filter_map(|(event_type, state_key)| {
self.services self.services
.short .short
.get_shortstatekey(event_type, state_key) .get_shortstatekey(event_type, state_key)
.map_ok(move |s| (s, (event_type, state_key))) .map_ok(move |ssk| (ssk, (event_type, state_key)))
.map(Result::ok) .map(Result::ok)
}) })
.map(|(ssk, (event_type, state_key))| (ssk, (event_type.to_owned(), state_key.to_owned())))
.collect() .collect()
.await; .await;
let full_state = self let auth_state: Vec<_> = self
.services .services
.state_compressor .state_accessor
.load_shortstatehash_info(shortstatehash) .state_full_shortids(shortstatehash)
.await .await
.map_err(|e| { .map_err(|e| err!(Database(error!(?room_id, ?shortstatehash, "{e:?}"))))?
err!(Database( .into_iter()
"Missing shortstatehash info for {room_id:?} at {shortstatehash:?}: {e:?}" .filter_map(|(shortstatekey, shorteventid)| {
)) sauthevents
})? .remove(&shortstatekey)
.pop() .map(|(event_type, state_key)| ((event_type, state_key), shorteventid))
.expect("there is always one layer") })
.full_state; .collect();
let mut ret = HashMap::new(); let auth_pdus: Vec<_> = self
for &compressed in full_state.iter() {
let (shortstatekey, shorteventid) = parse_compressed_state_event(compressed);
let Some((ty, state_key)) = sauthevents.remove(&shortstatekey) else {
continue;
};
let Ok(event_id) = self
.services .services
.short .short
.get_eventid_from_short(shorteventid) .multi_get_eventid_from_short(auth_state.iter().map(at!(1)))
.await .await
else { .into_iter()
continue; .stream()
}; .and_then(|event_id| async move { self.services.timeline.get_pdu(&event_id).await })
.collect()
.await;
let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await else { let auth_pdus = auth_state
continue; .into_iter()
}; .map(at!(0))
.zip(auth_pdus.into_iter())
.filter_map(|((event_type, state_key), pdu)| Some(((event_type, state_key), pdu.ok()?)))
.collect();
ret.insert((ty.to_owned(), state_key.to_owned()), pdu); Ok(auth_pdus)
}
Ok(ret)
} }
} }

View file

@ -1,8 +1,8 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use conduit::{ use conduit::{
err, at, err,
utils::{future::TryExtExt, IterStream}, utils::stream::{IterStream, ReadyExt},
PduEvent, Result, PduEvent, Result,
}; };
use database::{Deserialized, Map}; use database::{Deserialized, Map};
@ -49,52 +49,63 @@ impl Data {
pub(super) async fn state_full( pub(super) async fn state_full(
&self, shortstatehash: ShortStateHash, &self, shortstatehash: ShortStateHash,
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> { ) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
Ok(self let state = self
.state_full_pdus(shortstatehash) .state_full_pdus(shortstatehash)
.await? .await?
.into_iter() .into_iter()
.filter_map(|pdu| Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu))) .filter_map(|pdu| Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu)))
.collect()) .collect();
Ok(state)
} }
pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result<Vec<Arc<PduEvent>>> { pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result<Vec<Arc<PduEvent>>> {
Ok(self let short_ids = self
.state_full_shortids(shortstatehash) .state_full_shortids(shortstatehash)
.await? .await?
.iter() .into_iter()
.stream() .map(at!(1));
.filter_map(|(_, shorteventid)| {
self.services let event_ids = self
.services
.short .short
.get_eventid_from_short(*shorteventid) .multi_get_eventid_from_short(short_ids)
.ok() .await;
})
.filter_map(|eventid| async move { self.services.timeline.get_pdu(&eventid).await.ok() }) let full_pdus = event_ids
.into_iter()
.stream()
.ready_filter_map(Result::ok)
.filter_map(|event_id| async move { self.services.timeline.get_pdu(&event_id).await.ok() })
.collect() .collect()
.await) .await;
Ok(full_pdus)
} }
pub(super) async fn state_full_ids(&self, shortstatehash: ShortStateHash) -> Result<HashMap<u64, Arc<EventId>>> { pub(super) async fn state_full_ids(&self, shortstatehash: ShortStateHash) -> Result<HashMap<u64, Arc<EventId>>> {
Ok(self let short_ids = self.state_full_shortids(shortstatehash).await?;
.state_full_shortids(shortstatehash)
.await? let event_ids = self
.iter() .services
.stream()
.filter_map(|(shortstatekey, shorteventid)| {
self.services
.short .short
.get_eventid_from_short(*shorteventid) .multi_get_eventid_from_short(short_ids.iter().map(at!(1)))
.map_ok(move |eventid| (*shortstatekey, eventid)) .await;
.ok()
}) let full_ids = short_ids
.collect() .into_iter()
.await) .map(at!(0))
.zip(event_ids.into_iter())
.filter_map(|(shortstatekey, event_id)| Some((shortstatekey, event_id.ok()?)))
.collect();
Ok(full_ids)
} }
pub(super) async fn state_full_shortids( pub(super) async fn state_full_shortids(
&self, shortstatehash: ShortStateHash, &self, shortstatehash: ShortStateHash,
) -> Result<Vec<(ShortStateKey, ShortEventId)>> { ) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
Ok(self let shortids = self
.services .services
.state_compressor .state_compressor
.load_shortstatehash_info(shortstatehash) .load_shortstatehash_info(shortstatehash)
@ -106,7 +117,9 @@ impl Data {
.iter() .iter()
.copied() .copied()
.map(parse_compressed_state_event) .map(parse_compressed_state_event)
.collect()) .collect();
Ok(shortids)
} }
/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).

View file

@ -41,7 +41,10 @@ use serde::Deserialize;
use self::data::Data; use self::data::Data;
use crate::{ use crate::{
rooms, rooms,
rooms::{short::ShortStateHash, state::RoomMutexGuard}, rooms::{
short::{ShortEventId, ShortStateHash, ShortStateKey},
state::RoomMutexGuard,
},
Dep, Dep,
}; };
@ -102,6 +105,13 @@ impl Service {
self.db.state_full_ids(shortstatehash).await self.db.state_full_ids(shortstatehash).await
} }
#[inline]
pub async fn state_full_shortids(
&self, shortstatehash: ShortStateHash,
) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
self.db.state_full_shortids(shortstatehash).await
}
pub async fn state_full( pub async fn state_full(
&self, shortstatehash: ShortStateHash, &self, shortstatehash: ShortStateHash,
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> { ) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {