optimize auth_chain short_id to event_id translation step
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
ab06701ed0
commit
36677bb982
3 changed files with 46 additions and 17 deletions
|
@ -6,7 +6,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{debug, debug_error, trace, utils::IterStream, validated, warn, Err, Result};
|
use conduit::{debug, debug_error, trace, utils::IterStream, validated, warn, Err, Result};
|
||||||
use futures::{FutureExt, Stream, StreamExt};
|
use futures::Stream;
|
||||||
use ruma::{EventId, RoomId};
|
use ruma::{EventId, RoomId};
|
||||||
|
|
||||||
use self::data::Data;
|
use self::data::Data;
|
||||||
|
@ -40,15 +40,27 @@ impl Service {
|
||||||
pub async fn event_ids_iter(
|
pub async fn event_ids_iter(
|
||||||
&self, room_id: &RoomId, starting_events: &[&EventId],
|
&self, room_id: &RoomId, starting_events: &[&EventId],
|
||||||
) -> Result<impl Stream<Item = Arc<EventId>> + Send + '_> {
|
) -> Result<impl Stream<Item = Arc<EventId>> + Send + '_> {
|
||||||
let chain = self.get_auth_chain(room_id, starting_events).await?;
|
let stream = self
|
||||||
let iter = chain.into_iter().stream().filter_map(|sid| {
|
.get_event_ids(room_id, starting_events)
|
||||||
self.services
|
.await?
|
||||||
.short
|
.into_iter()
|
||||||
.get_eventid_from_short(sid)
|
.stream();
|
||||||
.map(Result::ok)
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(iter)
|
Ok(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_event_ids(&self, room_id: &RoomId, starting_events: &[&EventId]) -> Result<Vec<Arc<EventId>>> {
|
||||||
|
let chain = self.get_auth_chain(room_id, starting_events).await?;
|
||||||
|
let event_ids = self
|
||||||
|
.services
|
||||||
|
.short
|
||||||
|
.multi_get_eventid_from_short(&chain)
|
||||||
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(event_ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all, name = "auth_chain")]
|
#[tracing::instrument(skip_all, name = "auth_chain")]
|
||||||
|
|
|
@ -797,13 +797,13 @@ impl Service {
|
||||||
for state in &fork_states {
|
for state in &fork_states {
|
||||||
let starting_events: Vec<&EventId> = state.values().map(Borrow::borrow).collect();
|
let starting_events: Vec<&EventId> = state.values().map(Borrow::borrow).collect();
|
||||||
|
|
||||||
let auth_chain = self
|
let auth_chain: HashSet<Arc<EventId>> = self
|
||||||
.services
|
.services
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.event_ids_iter(room_id, &starting_events)
|
.get_event_ids(room_id, &starting_events)
|
||||||
.await?
|
.await?
|
||||||
.collect::<HashSet<Arc<EventId>>>()
|
.into_iter()
|
||||||
.await;
|
.collect();
|
||||||
|
|
||||||
auth_chain_sets.push(auth_chain);
|
auth_chain_sets.push(auth_chain);
|
||||||
}
|
}
|
||||||
|
@ -983,13 +983,13 @@ impl Service {
|
||||||
starting_events.push(id.borrow());
|
starting_events.push(id.borrow());
|
||||||
}
|
}
|
||||||
|
|
||||||
let auth_chain = self
|
let auth_chain: HashSet<Arc<EventId>> = self
|
||||||
.services
|
.services
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.event_ids_iter(room_id, &starting_events)
|
.get_event_ids(room_id, &starting_events)
|
||||||
.await?
|
.await?
|
||||||
.collect()
|
.into_iter()
|
||||||
.await;
|
.collect();
|
||||||
|
|
||||||
auth_chain_sets.push(auth_chain);
|
auth_chain_sets.push(auth_chain);
|
||||||
fork_states.push(state);
|
fork_states.push(state);
|
||||||
|
|
|
@ -141,6 +141,23 @@ pub async fn get_eventid_from_short(&self, shorteventid: u64) -> Result<Arc<Even
|
||||||
.map_err(|e| err!(Database("Failed to find EventId from short {shorteventid:?}: {e:?}")))
|
.map_err(|e| err!(Database("Failed to find EventId from short {shorteventid:?}: {e:?}")))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
pub async fn multi_get_eventid_from_short(&self, shorteventid: &[u64]) -> Vec<Result<Arc<EventId>>> {
|
||||||
|
const BUFSIZE: usize = size_of::<u64>();
|
||||||
|
|
||||||
|
let keys: Vec<[u8; BUFSIZE]> = shorteventid
|
||||||
|
.iter()
|
||||||
|
.map(|short| short.to_be_bytes())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.shorteventid_eventid
|
||||||
|
.get_batch_blocking(keys.iter())
|
||||||
|
.into_iter()
|
||||||
|
.map(Deserialized::deserialized)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub async fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(StateEventType, String)> {
|
pub async fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(StateEventType, String)> {
|
||||||
const BUFSIZE: usize = size_of::<u64>();
|
const BUFSIZE: usize = size_of::<u64>();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue