From eb17ccbd2772b5d79bbd235cd40cf35747ae6991 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 22 Apr 2025 11:00:07 +0000 Subject: [PATCH] Eliminate explicit parallel_fetches argument. Signed-off-by: Jason Volk --- src/core/matrix/state_res/benches.rs | 4 - src/core/matrix/state_res/mod.rs | 83 +++++++------------ src/core/matrix/state_res/test_utils.rs | 16 ++-- .../rooms/event_handler/resolve_state.rs | 13 +-- 4 files changed, 38 insertions(+), 78 deletions(-) diff --git a/src/core/matrix/state_res/benches.rs b/src/core/matrix/state_res/benches.rs index 01218b01..1aa8552b 100644 --- a/src/core/matrix/state_res/benches.rs +++ b/src/core/matrix/state_res/benches.rs @@ -52,7 +52,6 @@ fn lexico_topo_sort(c: &mut test::Bencher) { #[cfg(conduwuit_bench)] #[cfg_attr(conduwuit_bench, bench)] fn resolution_shallow_auth_chain(c: &mut test::Bencher) { - let parallel_fetches = 32; let mut store = TestStore(hashmap! {}); // build up the DAG @@ -78,7 +77,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) { &auth_chain_sets, &fetch, &exists, - parallel_fetches, ) .await { @@ -91,7 +89,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) { #[cfg(conduwuit_bench)] #[cfg_attr(conduwuit_bench, bench)] fn resolve_deeper_event_set(c: &mut test::Bencher) { - let parallel_fetches = 32; let mut inner = INITIAL_EVENTS(); let ban = BAN_STATE_SET(); @@ -153,7 +150,6 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) { &auth_chain_sets, &fetch, &exists, - parallel_fetches, ) .await { diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index 2ab7cb64..d37368c9 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -69,9 +69,6 @@ type Result = crate::Result; /// * `event_fetch` - Any event not found in the `event_map` will defer to this /// closure to find the event. /// -/// * `parallel_fetches` - The number of asynchronous fetch requests in-flight -/// for any given operation. -/// /// ## Invariants /// /// The caller of `resolve` must ensure that all the events are from the same @@ -85,7 +82,6 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis auth_chain_sets: &'a [HashSet], event_fetch: &Fetch, event_exists: &Exists, - parallel_fetches: usize, ) -> Result> where Fetch: Fn(E::Id) -> FetchFut + Sync, @@ -147,13 +143,8 @@ where // Sort the control events based on power_level/clock/event_id and // outgoing/incoming edges - let sorted_control_levels = reverse_topological_power_sort( - control_events, - &all_conflicted, - &event_fetch, - parallel_fetches, - ) - .await?; + let sorted_control_levels = + reverse_topological_power_sort(control_events, &all_conflicted, &event_fetch).await?; debug!(count = sorted_control_levels.len(), "power events"); trace!(list = ?sorted_control_levels, "sorted power events"); @@ -295,7 +286,6 @@ async fn reverse_topological_power_sort( events_to_sort: Vec, auth_diff: &HashSet, fetch_event: &F, - parallel_fetches: usize, ) -> Result> where F: Fn(E::Id) -> Fut + Sync, @@ -311,26 +301,25 @@ where } // This is used in the `key_fn` passed to the lexico_topo_sort fn - let event_to_pl = graph + let event_to_pl: HashMap<_, _> = graph .keys() .stream() - .map(|event_id| { - get_power_level_for_sender(event_id.clone(), fetch_event) - .map(move |res| res.map(|pl| (event_id, pl))) + .broad_filter_map(async |event_id| { + let pl = get_power_level_for_sender(&event_id, fetch_event) + .await + .ok()?; + Some((event_id, pl)) }) - .buffer_unordered(parallel_fetches) - .ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| { + .inspect(|(event_id, pl)| { debug!( - event_id = event_id.borrow().as_str(), - power_level = i64::from(pl), + event_id = event_id.as_str(), + power_level = i64::from(*pl), "found the power level of an event's sender", ); - - event_to_pl.insert(event_id.clone(), pl); - Ok(event_to_pl) }) + .collect() .boxed() - .await?; + .await; let event_to_pl = &event_to_pl; let fetcher = |event_id: E::Id| async move { @@ -909,7 +898,7 @@ mod tests { let fetcher = |id| ready(events.get(&id).cloned()); let sorted_power_events = - super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher, 1) + super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher) .await .unwrap(); @@ -1312,19 +1301,13 @@ mod tests { }) .collect(); - let resolved = match super::resolve( - &RoomVersionId::V2, - &state_sets, - &auth_chain, - &fetcher, - &exists, - 1, - ) - .await - { - | Ok(state) => state, - | Err(e) => panic!("{e}"), - }; + let resolved = + match super::resolve(&RoomVersionId::V2, &state_sets, &auth_chain, &fetcher, &exists) + .await + { + | Ok(state) => state, + | Err(e) => panic!("{e}"), + }; assert_eq!(expected, resolved); } @@ -1429,21 +1412,15 @@ mod tests { }) .collect(); - let fetcher = |id: ::Id| ready(ev_map.get(&id).cloned()); - let exists = |id: ::Id| ready(ev_map.get(&id).is_some()); - let resolved = match super::resolve( - &RoomVersionId::V6, - &state_sets, - &auth_chain, - &fetcher, - &exists, - 1, - ) - .await - { - | Ok(state) => state, - | Err(e) => panic!("{e}"), - }; + let fetcher = |id: OwnedEventId| ready(ev_map.get(&id).cloned()); + let exists = |id: OwnedEventId| ready(ev_map.get(&id).is_some()); + let resolved = + match super::resolve(&RoomVersionId::V6, &state_sets, &auth_chain, &fetcher, &exists) + .await + { + | Ok(state) => state, + | Err(e) => panic!("{e}"), + }; debug!( resolved = ?resolved diff --git a/src/core/matrix/state_res/test_utils.rs b/src/core/matrix/state_res/test_utils.rs index a666748a..ff7b30d0 100644 --- a/src/core/matrix/state_res/test_utils.rs +++ b/src/core/matrix/state_res/test_utils.rs @@ -133,17 +133,11 @@ pub(crate) async fn do_check( .collect(); let event_map = &event_map; - let fetch = |id: ::Id| ready(event_map.get(&id).cloned()); - let exists = |id: ::Id| ready(event_map.get(&id).is_some()); - let resolved = super::resolve( - &RoomVersionId::V6, - state_sets, - &auth_chain_sets, - &fetch, - &exists, - 1, - ) - .await; + let fetch = |id: OwnedEventId| ready(event_map.get(&id).cloned()); + let exists = |id: OwnedEventId| ready(event_map.get(&id).is_some()); + let resolved = + super::resolve(&RoomVersionId::V6, state_sets, &auth_chain_sets, &fetch, &exists) + .await; match resolved { | Ok(state) => state, diff --git a/src/service/rooms/event_handler/resolve_state.rs b/src/service/rooms/event_handler/resolve_state.rs index b3a7a71b..a67ac3b7 100644 --- a/src/service/rooms/event_handler/resolve_state.rs +++ b/src/service/rooms/event_handler/resolve_state.rs @@ -112,14 +112,7 @@ where { let event_fetch = |event_id| self.event_fetch(event_id); let event_exists = |event_id| self.event_exists(event_id); - state_res::resolve( - room_version, - state_sets, - auth_chain_sets, - &event_fetch, - &event_exists, - automatic_width(), - ) - .map_err(|e| err!(error!("State resolution failed: {e:?}"))) - .await + state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists) + .map_err(|e| err!(error!("State resolution failed: {e:?}"))) + .await }