From 4c0ae8c2f708cc2d950f6a8269844ae42069d55a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 20 Jan 2025 09:02:50 +0000 Subject: [PATCH] parallelize get_auth_chain outer Signed-off-by: Jason Volk --- src/service/rooms/auth_chain/mod.rs | 112 ++++++++++++++-------------- 1 file changed, 55 insertions(+), 57 deletions(-) diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index 74064701..fb7b6163 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -7,11 +7,14 @@ use std::{ }; use conduwuit::{ - debug, debug_error, trace, - utils::{stream::ReadyExt, IterStream}, + at, debug, debug_error, trace, + utils::{ + stream::{ReadyExt, TryBroadbandExt}, + IterStream, + }, validated, warn, Err, Result, }; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use ruma::{EventId, OwnedEventId, RoomId}; use self::data::Data; @@ -112,66 +115,61 @@ impl Service { "start", ); - let mut hits: usize = 0; - let mut misses: usize = 0; - let mut full_auth_chain = Vec::with_capacity(buckets.len()); - for chunk in buckets { - if chunk.is_empty() { - continue; - } + let full_auth_chain: Vec<_> = buckets + .into_iter() + .try_stream() + .broad_and_then(|chunk| async move { + let chunk_key: Vec<ShortEventId> = chunk.iter().map(at!(0)).collect(); - let chunk_key: Vec<ShortEventId> = - chunk.iter().map(|(short, _)| short).copied().collect(); - if let Ok(cached) = self.get_cached_eventid_authchain(&chunk_key).await { - trace!("Found cache entry for whole chunk"); - full_auth_chain.extend(cached.iter().copied()); - hits = hits.saturating_add(1); - continue; - } + if chunk_key.is_empty() { + return Ok(Vec::new()); + } - let mut hits2: usize = 0; - let mut misses2: usize = 0; - let mut chunk_cache = Vec::with_capacity(chunk.len()); - for (sevent_id, event_id) in chunk { - if let Ok(cached) = self.get_cached_eventid_authchain(&[sevent_id]).await { - trace!(?event_id, "Found cache entry for event"); - 
chunk_cache.extend(cached.iter().copied()); - hits2 = hits2.saturating_add(1); - } else { - let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?; - self.cache_auth_chain(vec![sevent_id], &auth_chain); - chunk_cache.extend(auth_chain.iter()); - misses2 = misses2.saturating_add(1); - debug!( - event_id = ?event_id, - chain_length = ?auth_chain.len(), - chunk_cache_length = ?chunk_cache.len(), - elapsed = ?started.elapsed(), - "Cache missed event" - ); - }; - } + if let Ok(cached) = self.get_cached_eventid_authchain(&chunk_key).await { + return Ok(cached.to_vec()); + } - chunk_cache.sort_unstable(); - chunk_cache.dedup(); - self.cache_auth_chain_vec(chunk_key, &chunk_cache); - full_auth_chain.extend(chunk_cache.iter()); - misses = misses.saturating_add(1); - debug!( - chunk_cache_length = ?chunk_cache.len(), - hits = ?hits2, - misses = ?misses2, - elapsed = ?started.elapsed(), - "Chunk missed", - ); - } + let chunk_cache: Vec<_> = chunk + .into_iter() + .try_stream() + .broad_and_then(|(shortid, event_id)| async move { + if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await { + return Ok(cached.to_vec()); + } + let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?; + self.cache_auth_chain_vec(vec![shortid], auth_chain.as_slice()); + debug!( + ?event_id, + elapsed = ?started.elapsed(), + "Cache missed event" + ); + + Ok(auth_chain) + }) + .try_collect() + .await?; + + let mut chunk_cache: Vec<_> = chunk_cache.into_iter().flatten().collect(); + chunk_cache.sort_unstable(); + chunk_cache.dedup(); + self.cache_auth_chain_vec(chunk_key, chunk_cache.as_slice()); + debug!( + chunk_cache_length = ?chunk_cache.len(), + elapsed = ?started.elapsed(), + "Cache missed chunk", + ); + + Ok(chunk_cache) + }) + .try_collect() + .await?; + + let mut full_auth_chain: Vec<_> = full_auth_chain.into_iter().flatten().collect(); full_auth_chain.sort_unstable(); full_auth_chain.dedup(); debug!( chain_length = ?full_auth_chain.len(), - hits = 
?hits, - misses = ?misses, elapsed = ?started.elapsed(), "done", ); @@ -184,7 +182,7 @@ impl Service { &self, room_id: &RoomId, event_id: &EventId, - ) -> Result<HashSet<ShortEventId>> { + ) -> Result<Vec<ShortEventId>> { let mut todo: VecDeque<_> = [event_id.to_owned()].into(); let mut found = HashSet::new(); @@ -226,7 +224,7 @@ impl Service { } } - Ok(found) + Ok(found.into_iter().collect()) } #[inline]