optimize state compressor I/O w/ batch operation

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-11-25 23:27:16 +00:00
parent 527494a34b
commit dd8c646b63
3 changed files with 69 additions and 35 deletions
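Note: the change below batches two kinds of I/O. In resolve_state and upgrade_outlier_to_timeline_pdu, the per-event compress_state_event(...).await calls are replaced by a single compress_state_events call over the whole state map, and inside the compressor the event IDs are resolved in bulk via multi_get_or_create_shorteventid instead of one get_or_create_shorteventid per event. A toy, self-contained illustration of the round-trip saving (placeholder names, not conduwuit code):

use std::collections::HashMap;

// One batched request stands in for multi_get_or_create_shorteventid /
// compress_state_events in the diff below.
fn multi_get(db: &HashMap<&str, u64>, keys: &[&str]) -> Vec<u64> {
	keys.iter().map(|k| *db.get(k).expect("known key")).collect()
}

fn main() {
	let db = HashMap::from([("$a", 1_u64), ("$b", 2), ("$c", 3)]);

	// Before: one lookup per event id (one round-trip each against a real database).
	let one_by_one: Vec<u64> = ["$a", "$b", "$c"].iter().map(|k| db[*k]).collect();

	// After: a single batched lookup covering every event id at once.
	let batched = multi_get(&db, &["$a", "$b", "$c"]);
	assert_eq!(one_by_one, batched);
}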


@@ -79,23 +79,30 @@ pub async fn resolve_state(
 	drop(lock);
 
-	debug!("State resolution done. Compressing state");
-	let mut new_room_state = HashSet::new();
-	for ((event_type, state_key), event_id) in state {
-		let shortstatekey = self
-			.services
-			.short
-			.get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)
-			.await;
+	debug!("State resolution done.");
+	let state_events: Vec<_> = state
+		.iter()
+		.stream()
+		.then(|((event_type, state_key), event_id)| {
+			self.services
+				.short
+				.get_or_create_shortstatekey(event_type, state_key)
+				.map(move |shortstatekey| (shortstatekey, event_id))
+		})
+		.collect()
+		.await;
 
-		let compressed = self
-			.services
-			.state_compressor
-			.compress_state_event(shortstatekey, &event_id)
-			.await;
-
-		new_room_state.insert(compressed);
-	}
+	debug!("Compressing state...");
+	let new_room_state: HashSet<_> = self
+		.services
+		.state_compressor
+		.compress_state_events(
+			state_events
+				.iter()
+				.map(|(ref ssk, eid)| (ssk, (*eid).borrow())),
+		)
+		.collect()
+		.await;
 
 	Ok(Arc::new(new_room_state))
 }


@@ -1,4 +1,5 @@
 use std::{
+	borrow::Borrow,
 	collections::{BTreeMap, HashSet},
 	sync::Arc,
 	time::Instant,
@@ -193,15 +194,16 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
 	extremities.retain(|id| retained.contains(id));
 	debug!("Retained {} extremities. Compressing state", extremities.len());
 
-	let mut state_ids_compressed = HashSet::new();
-	for (shortstatekey, id) in &state_at_incoming_event {
-		state_ids_compressed.insert(
-			self.services
-				.state_compressor
-				.compress_state_event(*shortstatekey, id)
-				.await,
-		);
-	}
+	let state_ids_compressed: HashSet<_> = self
+		.services
+		.state_compressor
+		.compress_state_events(
+			state_at_incoming_event
+				.iter()
+				.map(|(ssk, eid)| (ssk, eid.borrow())),
+		)
+		.collect()
+		.await;
 
 	let state_ids_compressed = Arc::new(state_ids_compressed);
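Note: the new borrow::Borrow import exists for the eid.borrow() calls above; the callers hold owned or shared event-id handles, and Borrow reborrows them as the &EventId items that compress_state_events expects. A minimal sketch, assuming OwnedEventId values (the concrete container type is not shown in these hunks):

use std::borrow::Borrow;

use ruma::{EventId, OwnedEventId};

fn main() {
	let eid: OwnedEventId = EventId::parse("$example:example.org").expect("valid event id");
	// Borrow<EventId> yields the borrowed form expected by the batched API.
	let borrowed: &EventId = eid.borrow();
	assert_eq!(borrowed, eid.as_ref());
}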


@@ -1,6 +1,6 @@
 use std::{
 	collections::{HashMap, HashSet},
-	fmt::Write,
+	fmt::{Debug, Write},
 	mem::size_of,
 	sync::{Arc, Mutex},
 };
@@ -8,10 +8,11 @@ use std::{
 use arrayvec::ArrayVec;
 use conduit::{
 	at, checked, debug, err, expected, utils,
-	utils::{bytes, math::usize_from_f64},
+	utils::{bytes, math::usize_from_f64, stream::IterStream},
 	Result,
 };
 use database::Map;
+use futures::{Stream, StreamExt};
 use lru_cache::LruCache;
 use ruma::{EventId, RoomId};
@@ -179,21 +180,32 @@ impl Service {
 		Ok(stack)
 	}
 
-	pub async fn compress_state_event(&self, shortstatekey: ShortStateKey, event_id: &EventId) -> CompressedStateEvent {
-		const SIZE: usize = size_of::<CompressedStateEvent>();
+	pub fn compress_state_events<'a, I>(&'a self, state: I) -> impl Stream<Item = CompressedStateEvent> + Send + 'a
+	where
+		I: Iterator<Item = (&'a ShortStateKey, &'a EventId)> + Clone + Debug + ExactSizeIterator + Send + 'a,
+	{
+		let event_ids = state.clone().map(at!(1));
+
+		let short_event_ids = self
+			.services
+			.short
+			.multi_get_or_create_shorteventid(event_ids);
 
+		state
+			.stream()
+			.map(at!(0))
+			.zip(short_event_ids)
+			.map(|(shortstatekey, shorteventid)| compress_state_event(*shortstatekey, shorteventid))
+	}
+
+	pub async fn compress_state_event(&self, shortstatekey: ShortStateKey, event_id: &EventId) -> CompressedStateEvent {
 		let shorteventid = self
 			.services
 			.short
 			.get_or_create_shorteventid(event_id)
 			.await;
 
-		let mut v = ArrayVec::<u8, SIZE>::new();
-		v.extend(shortstatekey.to_be_bytes());
-		v.extend(shorteventid.to_be_bytes());
-		v.as_ref()
-			.try_into()
-			.expect("failed to create CompressedStateEvent")
+		compress_state_event(shortstatekey, shorteventid)
 	}
 
 	/// Creates a new shortstatehash that often is just a diff to an already
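Note: compress_state_events above batches by splitting the input pairs, resolving every event ID through one multi_get_or_create_shorteventid call, and zipping the results back with the shortstatekeys in their original order. The same shape with plain iterators, all types being placeholders (not conduwuit code):

// `lookup` stands in for the database multi-get; one call replaces pairs.len() calls.
fn batch_pack(pairs: &[(u64, u64)], lookup: impl Fn(Vec<u64>) -> Vec<u64>) -> Vec<(u64, u64)> {
	let ids: Vec<u64> = pairs.iter().map(|(_, id)| *id).collect();
	let shorts = lookup(ids);
	pairs.iter().map(|(key, _)| *key).zip(shorts).collect()
}

fn main() {
	// Pretend the "database" assigns each id a short integer by adding 100.
	let packed = batch_pack(&[(1, 7), (2, 8)], |ids| ids.into_iter().map(|id| id + 100).collect());
	assert_eq!(packed, vec![(1, 107), (2, 108)]);
}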
@@ -470,6 +482,19 @@ impl Service {
 	}
 }
 
+#[inline]
+#[must_use]
+fn compress_state_event(shortstatekey: ShortStateKey, shorteventid: ShortEventId) -> CompressedStateEvent {
+	const SIZE: usize = size_of::<CompressedStateEvent>();
+
+	let mut v = ArrayVec::<u8, SIZE>::new();
+	v.extend(shortstatekey.to_be_bytes());
+	v.extend(shorteventid.to_be_bytes());
+	v.as_ref()
+		.try_into()
+		.expect("failed to create CompressedStateEvent")
+}
+
 #[inline]
 #[must_use]
 pub fn parse_compressed_state_event(compressed_event: CompressedStateEvent) -> (ShortStateKey, ShortEventId) {
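Note: the private compress_state_event helper added above packs the big-endian bytes of the shortstatekey followed by the big-endian bytes of the shorteventid, and the parse_compressed_state_event accessor shown above maps the bytes back to (ShortStateKey, ShortEventId). A standalone sketch of that round trip, assuming both short IDs are u64 (their definitions are outside this diff):

fn pack(shortstatekey: u64, shorteventid: u64) -> [u8; 16] {
	let mut out = [0_u8; 16];
	out[..8].copy_from_slice(&shortstatekey.to_be_bytes());
	out[8..].copy_from_slice(&shorteventid.to_be_bytes());
	out
}

fn unpack(compressed: [u8; 16]) -> (u64, u64) {
	let (ssk, sei) = compressed.split_at(8);
	(
		u64::from_be_bytes(ssk.try_into().expect("8 bytes")),
		u64::from_be_bytes(sei.try_into().expect("8 bytes")),
	)
}

fn main() {
	let compressed = pack(42, 7);
	assert_eq!(unpack(compressed), (42, 7));
}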