From 8fedc358e063cfc90a4f7598107d0e04685bb10a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 15 Nov 2024 22:23:42 +0000 Subject: [PATCH] typename additional shortids cleanup/split state_compressor load Signed-off-by: Jason Volk --- src/service/rooms/state/mod.rs | 19 ++++-- src/service/rooms/state_compressor/mod.rs | 83 ++++++++++++----------- 2 files changed, 58 insertions(+), 44 deletions(-) diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 29ffedfc..622b8325 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -23,8 +23,14 @@ use ruma::{ EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId, }; -use super::state_compressor::CompressedStateEvent; -use crate::{globals, rooms, Dep}; +use crate::{ + globals, rooms, + rooms::{ + short::{ShortEventId, ShortStateHash}, + state_compressor::CompressedStateEvent, + }, + Dep, +}; pub struct Service { pub mutex: RoomMutexMap, @@ -146,8 +152,9 @@ impl Service { #[tracing::instrument(skip(self, state_ids_compressed), level = "debug")] pub async fn set_event_state( &self, event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc>, - ) -> Result { - const BUFSIZE: usize = size_of::(); + ) -> Result { + const KEY_LEN: usize = size_of::(); + const VAL_LEN: usize = size_of::(); let shorteventid = self .services @@ -202,7 +209,7 @@ impl Service { self.db .shorteventid_shortstatehash - .aput::(shorteventid, shortstatehash); + .aput::(shorteventid, shortstatehash); Ok(shortstatehash) } @@ -343,7 +350,7 @@ impl Service { .map_err(|e| err!(Request(NotFound("No create event found: {e:?}")))) } - pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result { + pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result { self.db .roomid_shortstatehash .get(room_id) diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 0466fb12..f9db6f67 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -89,9 +89,10 @@ impl crate::Service for Service { .map(at!(1)) .flat_map(|vec| vec.iter()) .fold(HashMap::new(), |mut ents, ssi| { - ents.insert(Arc::as_ptr(&ssi.added), compressed_state_size(&ssi.added)); - ents.insert(Arc::as_ptr(&ssi.removed), compressed_state_size(&ssi.removed)); - ents.insert(Arc::as_ptr(&ssi.full_state), compressed_state_size(&ssi.full_state)); + for cs in &[&ssi.added, &ssi.removed, &ssi.full_state] { + ents.insert(Arc::as_ptr(cs), compressed_state_size(cs)); + } + ents }); @@ -125,51 +126,57 @@ impl Service { return Ok(r.clone()); } - let StateDiff { - parent, - added, - removed, - } = self.get_statediff(shortstatehash).await?; - - let response = if let Some(parent) = parent { - let mut response = Box::pin(self.load_shortstatehash_info(parent)).await?; - let mut state = (*response.last().expect("at least one response").full_state).clone(); - state.extend(added.iter().copied()); - let removed = (*removed).clone(); - for r in &removed { - state.remove(r); - } - - response.push(ShortStateInfo { - shortstatehash, - full_state: Arc::new(state), - added, - removed: Arc::new(removed), - }); - - response - } else { - vec![ShortStateInfo { - shortstatehash, - full_state: added.clone(), - added, - removed, - }] - }; + let stack = self.new_shortstatehash_info(shortstatehash).await?; debug!( - ?parent, ?shortstatehash, - vec_len = %response.len(), + len = %stack.len(), "cache update" ); self.stateinfo_cache .lock() .expect("locked") - .insert(shortstatehash, response.clone()); + .insert(shortstatehash, stack.clone()); - Ok(response) + Ok(stack) + } + + async fn new_shortstatehash_info(&self, shortstatehash: ShortStateHash) -> Result { + let StateDiff { + parent, + added, + removed, + } = self.get_statediff(shortstatehash).await?; + + let Some(parent) = parent else { + return Ok(vec![ShortStateInfo { + shortstatehash, + full_state: added.clone(), + added, + removed, + }]); + }; + + let mut stack = Box::pin(self.load_shortstatehash_info(parent)).await?; + let top = stack.last().expect("at least one frame"); + + let mut full_state = (*top.full_state).clone(); + full_state.extend(added.iter().copied()); + + let removed = (*removed).clone(); + for r in &removed { + full_state.remove(r); + } + + stack.push(ShortStateInfo { + shortstatehash, + added, + removed: Arc::new(removed), + full_state: Arc::new(full_state), + }); + + Ok(stack) } pub async fn compress_state_event(&self, shortstatekey: ShortStateKey, event_id: &EventId) -> CompressedStateEvent {