From dd8c646b633ff05e9f8e49a274712d6bf435e83a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 25 Nov 2024 23:27:16 +0000 Subject: [PATCH] optimize state compressor I/O w/ batch operation Signed-off-by: Jason Volk --- .../rooms/event_handler/resolve_state.rs | 39 +++++++++------- .../event_handler/upgrade_outlier_pdu.rs | 20 +++++---- src/service/rooms/state_compressor/mod.rs | 45 ++++++++++++++----- 3 files changed, 69 insertions(+), 35 deletions(-) diff --git a/src/service/rooms/event_handler/resolve_state.rs b/src/service/rooms/event_handler/resolve_state.rs index 4863e340..dc0edd13 100644 --- a/src/service/rooms/event_handler/resolve_state.rs +++ b/src/service/rooms/event_handler/resolve_state.rs @@ -79,23 +79,30 @@ pub async fn resolve_state( drop(lock); - debug!("State resolution done. Compressing state"); - let mut new_room_state = HashSet::new(); - for ((event_type, state_key), event_id) in state { - let shortstatekey = self - .services - .short - .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key) - .await; + debug!("State resolution done."); + let state_events: Vec<_> = state + .iter() + .stream() + .then(|((event_type, state_key), event_id)| { + self.services + .short + .get_or_create_shortstatekey(event_type, state_key) + .map(move |shortstatekey| (shortstatekey, event_id)) + }) + .collect() + .await; - let compressed = self - .services - .state_compressor - .compress_state_event(shortstatekey, &event_id) - .await; - - new_room_state.insert(compressed); - } + debug!("Compressing state..."); + let new_room_state: HashSet<_> = self + .services + .state_compressor + .compress_state_events( + state_events + .iter() + .map(|(ref ssk, eid)| (ssk, (*eid).borrow())), + ) + .collect() + .await; Ok(Arc::new(new_room_state)) } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 2a1e4662..13e2b281 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Borrow, collections::{BTreeMap, HashSet}, sync::Arc, time::Instant, @@ -193,15 +194,16 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( extremities.retain(|id| retained.contains(id)); debug!("Retained {} extremities. Compressing state", extremities.len()); - let mut state_ids_compressed = HashSet::new(); - for (shortstatekey, id) in &state_at_incoming_event { - state_ids_compressed.insert( - self.services - .state_compressor - .compress_state_event(*shortstatekey, id) - .await, - ); - } + let state_ids_compressed: HashSet<_> = self + .services + .state_compressor + .compress_state_events( + state_at_incoming_event + .iter() + .map(|(ssk, eid)| (ssk, eid.borrow())), + ) + .collect() + .await; let state_ids_compressed = Arc::new(state_ids_compressed); diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 52ad5437..8c6eccbe 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, HashSet}, - fmt::Write, + fmt::{Debug, Write}, mem::size_of, sync::{Arc, Mutex}, }; @@ -8,10 +8,11 @@ use std::{ use arrayvec::ArrayVec; use conduit::{ at, checked, debug, err, expected, utils, - utils::{bytes, math::usize_from_f64}, + utils::{bytes, math::usize_from_f64, stream::IterStream}, Result, }; use database::Map; +use futures::{Stream, StreamExt}; use lru_cache::LruCache; use ruma::{EventId, RoomId}; @@ -179,21 +180,32 @@ impl Service { Ok(stack) } - pub async fn compress_state_event(&self, shortstatekey: ShortStateKey, event_id: &EventId) -> CompressedStateEvent { - const SIZE: usize = size_of::(); + pub fn compress_state_events<'a, I>(&'a self, state: I) -> impl Stream + Send + 'a + where + I: Iterator + Clone + Debug + ExactSizeIterator + Send + 'a, + { + let event_ids = state.clone().map(at!(1)); + let short_event_ids = self + .services + .short + .multi_get_or_create_shorteventid(event_ids); + + state + .stream() + .map(at!(0)) + .zip(short_event_ids) + .map(|(shortstatekey, shorteventid)| compress_state_event(*shortstatekey, shorteventid)) + } + + pub async fn compress_state_event(&self, shortstatekey: ShortStateKey, event_id: &EventId) -> CompressedStateEvent { let shorteventid = self .services .short .get_or_create_shorteventid(event_id) .await; - let mut v = ArrayVec::::new(); - v.extend(shortstatekey.to_be_bytes()); - v.extend(shorteventid.to_be_bytes()); - v.as_ref() - .try_into() - .expect("failed to create CompressedStateEvent") + compress_state_event(shortstatekey, shorteventid) } /// Creates a new shortstatehash that often is just a diff to an already @@ -470,6 +482,19 @@ impl Service { } } +#[inline] +#[must_use] +fn compress_state_event(shortstatekey: ShortStateKey, shorteventid: ShortEventId) -> CompressedStateEvent { + const SIZE: usize = size_of::(); + + let mut v = ArrayVec::::new(); + v.extend(shortstatekey.to_be_bytes()); + v.extend(shorteventid.to_be_bytes()); + v.as_ref() + .try_into() + .expect("failed to create CompressedStateEvent") +} + #[inline] #[must_use] pub fn parse_compressed_state_event(compressed_event: CompressedStateEvent) -> (ShortStateKey, ShortEventId) {