Eliminate explicit parallel_fetches argument.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
bfb0a2b76a
commit
44302ce732
4 changed files with 38 additions and 78 deletions
|
@ -52,7 +52,6 @@ fn lexico_topo_sort(c: &mut test::Bencher) {
|
|||
#[cfg(conduwuit_bench)]
|
||||
#[cfg_attr(conduwuit_bench, bench)]
|
||||
fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
|
||||
let parallel_fetches = 32;
|
||||
let mut store = TestStore(hashmap! {});
|
||||
|
||||
// build up the DAG
|
||||
|
@ -78,7 +77,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
|
|||
&auth_chain_sets,
|
||||
&fetch,
|
||||
&exists,
|
||||
parallel_fetches,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
@ -91,7 +89,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
|
|||
#[cfg(conduwuit_bench)]
|
||||
#[cfg_attr(conduwuit_bench, bench)]
|
||||
fn resolve_deeper_event_set(c: &mut test::Bencher) {
|
||||
let parallel_fetches = 32;
|
||||
let mut inner = INITIAL_EVENTS();
|
||||
let ban = BAN_STATE_SET();
|
||||
|
||||
|
@ -153,7 +150,6 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
|
|||
&auth_chain_sets,
|
||||
&fetch,
|
||||
&exists,
|
||||
parallel_fetches,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
|
@ -69,9 +69,6 @@ type Result<T, E = Error> = crate::Result<T, E>;
|
|||
/// * `event_fetch` - Any event not found in the `event_map` will defer to this
|
||||
/// closure to find the event.
|
||||
///
|
||||
/// * `parallel_fetches` - The number of asynchronous fetch requests in-flight
|
||||
/// for any given operation.
|
||||
///
|
||||
/// ## Invariants
|
||||
///
|
||||
/// The caller of `resolve` must ensure that all the events are from the same
|
||||
|
@ -85,7 +82,6 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
|
|||
auth_chain_sets: &'a [HashSet<E::Id, Hasher>],
|
||||
event_fetch: &Fetch,
|
||||
event_exists: &Exists,
|
||||
parallel_fetches: usize,
|
||||
) -> Result<StateMap<E::Id>>
|
||||
where
|
||||
Fetch: Fn(E::Id) -> FetchFut + Sync,
|
||||
|
@ -147,13 +143,8 @@ where
|
|||
|
||||
// Sort the control events based on power_level/clock/event_id and
|
||||
// outgoing/incoming edges
|
||||
let sorted_control_levels = reverse_topological_power_sort(
|
||||
control_events,
|
||||
&all_conflicted,
|
||||
&event_fetch,
|
||||
parallel_fetches,
|
||||
)
|
||||
.await?;
|
||||
let sorted_control_levels =
|
||||
reverse_topological_power_sort(control_events, &all_conflicted, &event_fetch).await?;
|
||||
|
||||
debug!(count = sorted_control_levels.len(), "power events");
|
||||
trace!(list = ?sorted_control_levels, "sorted power events");
|
||||
|
@ -295,7 +286,6 @@ async fn reverse_topological_power_sort<E, F, Fut>(
|
|||
events_to_sort: Vec<E::Id>,
|
||||
auth_diff: &HashSet<E::Id>,
|
||||
fetch_event: &F,
|
||||
parallel_fetches: usize,
|
||||
) -> Result<Vec<E::Id>>
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
|
@ -311,26 +301,25 @@ where
|
|||
}
|
||||
|
||||
// This is used in the `key_fn` passed to the lexico_topo_sort fn
|
||||
let event_to_pl = graph
|
||||
let event_to_pl: HashMap<_, _> = graph
|
||||
.keys()
|
||||
.stream()
|
||||
.map(|event_id| {
|
||||
get_power_level_for_sender(event_id.clone(), fetch_event)
|
||||
.map(move |res| res.map(|pl| (event_id, pl)))
|
||||
.broad_filter_map(async |event_id| {
|
||||
let pl = get_power_level_for_sender(&event_id, fetch_event)
|
||||
.await
|
||||
.ok()?;
|
||||
Some((event_id, pl))
|
||||
})
|
||||
.buffer_unordered(parallel_fetches)
|
||||
.ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
|
||||
.inspect(|(event_id, pl)| {
|
||||
debug!(
|
||||
event_id = event_id.borrow().as_str(),
|
||||
power_level = i64::from(pl),
|
||||
event_id = event_id.as_str(),
|
||||
power_level = i64::from(*pl),
|
||||
"found the power level of an event's sender",
|
||||
);
|
||||
|
||||
event_to_pl.insert(event_id.clone(), pl);
|
||||
Ok(event_to_pl)
|
||||
})
|
||||
.collect()
|
||||
.boxed()
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
let event_to_pl = &event_to_pl;
|
||||
let fetcher = |event_id: E::Id| async move {
|
||||
|
@ -909,7 +898,7 @@ mod tests {
|
|||
|
||||
let fetcher = |id| ready(events.get(&id).cloned());
|
||||
let sorted_power_events =
|
||||
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher, 1)
|
||||
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1312,19 +1301,13 @@ mod tests {
|
|||
})
|
||||
.collect();
|
||||
|
||||
let resolved = match super::resolve(
|
||||
&RoomVersionId::V2,
|
||||
&state_sets,
|
||||
&auth_chain,
|
||||
&fetcher,
|
||||
&exists,
|
||||
1,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
let resolved =
|
||||
match super::resolve(&RoomVersionId::V2, &state_sets, &auth_chain, &fetcher, &exists)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
|
||||
assert_eq!(expected, resolved);
|
||||
}
|
||||
|
@ -1429,21 +1412,15 @@ mod tests {
|
|||
})
|
||||
.collect();
|
||||
|
||||
let fetcher = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).cloned());
|
||||
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).is_some());
|
||||
let resolved = match super::resolve(
|
||||
&RoomVersionId::V6,
|
||||
&state_sets,
|
||||
&auth_chain,
|
||||
&fetcher,
|
||||
&exists,
|
||||
1,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
let fetcher = |id: OwnedEventId| ready(ev_map.get(&id).cloned());
|
||||
let exists = |id: OwnedEventId| ready(ev_map.get(&id).is_some());
|
||||
let resolved =
|
||||
match super::resolve(&RoomVersionId::V6, &state_sets, &auth_chain, &fetcher, &exists)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
|
||||
debug!(
|
||||
resolved = ?resolved
|
||||
|
|
|
@ -133,17 +133,11 @@ pub(crate) async fn do_check(
|
|||
.collect();
|
||||
|
||||
let event_map = &event_map;
|
||||
let fetch = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).cloned());
|
||||
let exists = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).is_some());
|
||||
let resolved = super::resolve(
|
||||
&RoomVersionId::V6,
|
||||
state_sets,
|
||||
&auth_chain_sets,
|
||||
&fetch,
|
||||
&exists,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let fetch = |id: OwnedEventId| ready(event_map.get(&id).cloned());
|
||||
let exists = |id: OwnedEventId| ready(event_map.get(&id).is_some());
|
||||
let resolved =
|
||||
super::resolve(&RoomVersionId::V6, state_sets, &auth_chain_sets, &fetch, &exists)
|
||||
.await;
|
||||
|
||||
match resolved {
|
||||
| Ok(state) => state,
|
||||
|
|
|
@ -112,14 +112,7 @@ where
|
|||
{
|
||||
let event_fetch = |event_id| self.event_fetch(event_id);
|
||||
let event_exists = |event_id| self.event_exists(event_id);
|
||||
state_res::resolve(
|
||||
room_version,
|
||||
state_sets,
|
||||
auth_chain_sets,
|
||||
&event_fetch,
|
||||
&event_exists,
|
||||
automatic_width(),
|
||||
)
|
||||
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
|
||||
.await
|
||||
state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists)
|
||||
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
|
||||
.await
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue