diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs deleted file mode 100644 index 813f48ae..00000000 --- a/src/service/rooms/state/data.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::sync::Arc; - -use conduit::{ - utils::{stream::TryIgnore, ReadyExt}, - Result, -}; -use database::{Database, Deserialized, Interfix, Map}; -use ruma::{OwnedEventId, RoomId}; - -use super::RoomMutexGuard; - -pub(super) struct Data { - shorteventid_shortstatehash: Arc, - roomid_shortstatehash: Arc, - pub(super) roomid_pduleaves: Arc, -} - -impl Data { - pub(super) fn new(db: &Arc) -> Self { - Self { - shorteventid_shortstatehash: db["shorteventid_shortstatehash"].clone(), - roomid_shortstatehash: db["roomid_shortstatehash"].clone(), - roomid_pduleaves: db["roomid_pduleaves"].clone(), - } - } - - pub(super) async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result { - self.roomid_shortstatehash.get(room_id).await.deserialized() - } - - #[inline] - pub(super) fn set_room_state( - &self, - room_id: &RoomId, - new_shortstatehash: u64, - _mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex - ) { - self.roomid_shortstatehash - .raw_put(room_id, new_shortstatehash); - } - - pub(super) fn set_event_state(&self, shorteventid: u64, shortstatehash: u64) { - self.shorteventid_shortstatehash - .put(shorteventid, shortstatehash); - } - - pub(super) async fn set_forward_extremities( - &self, - room_id: &RoomId, - event_ids: Vec, - _mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex - ) { - let prefix = (room_id, Interfix); - self.roomid_pduleaves - .keys_prefix_raw(&prefix) - .ignore_err() - .ready_for_each(|key| self.roomid_pduleaves.remove(key)) - .await; - - for event_id in &event_ids { - let key = (room_id, event_id); - self.roomid_pduleaves.put_raw(key, event_id); - } - } -} diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index cfcb2da6..6abaa198 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -1,5 +1,3 @@ -mod data; - use std::{ collections::{HashMap, HashSet}, fmt::Write, @@ -10,11 +8,10 @@ use std::{ use conduit::{ err, result::FlatOk, - utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard}, + utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt}, warn, PduEvent, Result, }; -use data::Data; -use database::{Ignore, Interfix}; +use database::{Deserialized, Ignore, Interfix, Map}; use futures::{future::join_all, pin_mut, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use ruma::{ events::{ @@ -30,9 +27,9 @@ use super::state_compressor::CompressedStateEvent; use crate::{globals, rooms, Dep}; pub struct Service { + pub mutex: RoomMutexMap, services: Services, db: Data, - pub mutex: RoomMutexMap, } struct Services { @@ -45,12 +42,19 @@ struct Services { timeline: Dep, } +struct Data { + shorteventid_shortstatehash: Arc, + roomid_shortstatehash: Arc, + roomid_pduleaves: Arc, +} + type RoomMutexMap = MutexMap; pub type RoomMutexGuard = MutexMapGuard; impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { + mutex: RoomMutexMap::new(), services: Services { globals: args.depend::("globals"), short: args.depend::("rooms::short"), @@ -60,12 +64,15 @@ impl crate::Service for Service { state_compressor: args.depend::("rooms::state_compressor"), timeline: args.depend::("rooms::timeline"), }, - db: Data::new(args.db), - mutex: RoomMutexMap::new(), + db: Data { + shorteventid_shortstatehash: args.db["shorteventid_shortstatehash"].clone(), + roomid_shortstatehash: args.db["roomid_shortstatehash"].clone(), + roomid_pduleaves: args.db["roomid_pduleaves"].clone(), + }, })) } - fn memory_usage(&self, out: &mut dyn Write) -> Result<()> { + fn memory_usage(&self, out: &mut dyn Write) -> Result { let mutex = self.mutex.len(); writeln!(out, "state_mutex: {mutex}")?; @@ -84,7 +91,7 @@ impl Service { statediffnew: Arc>, _statediffremoved: Arc>, state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex - ) -> Result<()> { + ) -> Result { let event_ids = statediffnew.iter().stream().filter_map(|new| { self.services .state_compressor @@ -127,7 +134,7 @@ impl Service { self.services.state_cache.update_joined_count(room_id).await; - self.db.set_room_state(room_id, shortstatehash, state_lock); + self.set_room_state(room_id, shortstatehash, state_lock); Ok(()) } @@ -140,13 +147,15 @@ impl Service { pub async fn set_event_state( &self, event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc>, ) -> Result { + const BUFSIZE: usize = size_of::(); + let shorteventid = self .services .short .get_or_create_shorteventid(event_id) .await; - let previous_shortstatehash = self.db.get_room_shortstatehash(room_id).await; + let previous_shortstatehash = self.get_room_shortstatehash(room_id).await; let state_hash = calculate_hash( &state_ids_compressed @@ -196,7 +205,9 @@ impl Service { )?; } - self.db.set_event_state(shorteventid, shortstatehash); + self.db + .shorteventid_shortstatehash + .aput::(shorteventid, shortstatehash); Ok(shortstatehash) } @@ -207,6 +218,8 @@ impl Service { /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. #[tracing::instrument(skip(self, new_pdu), level = "debug")] pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result { + const BUFSIZE: usize = size_of::(); + let shorteventid = self .services .short @@ -216,7 +229,9 @@ impl Service { let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id).await; if let Ok(p) = previous_shortstatehash { - self.db.set_event_state(shorteventid, p); + self.db + .shorteventid_shortstatehash + .aput::(shorteventid, p); } if let Some(state_key) = &new_pdu.state_key { @@ -306,14 +321,18 @@ impl Service { } /// Set the state hash to a new version, but does not update state_cache. - #[tracing::instrument(skip(self, mutex_lock), level = "debug")] + #[tracing::instrument(skip(self, _mutex_lock), level = "debug")] pub fn set_room_state( &self, room_id: &RoomId, shortstatehash: u64, - mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex ) { - self.db.set_room_state(room_id, shortstatehash, mutex_lock); + const BUFSIZE: usize = size_of::(); + + self.db + .roomid_shortstatehash + .raw_aput::(room_id, shortstatehash); } /// Returns the room's version. @@ -327,9 +346,12 @@ impl Service { .map_err(|e| err!(Request(NotFound("No create event found: {e:?}")))) } - #[inline] pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result { - self.db.get_room_shortstatehash(room_id).await + self.db + .roomid_shortstatehash + .get(room_id) + .await + .deserialized() } pub fn get_forward_extremities<'a>(&'a self, room_id: &'a RoomId) -> impl Stream + Send + '_ { @@ -346,11 +368,20 @@ impl Service { &self, room_id: &RoomId, event_ids: Vec, - state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex + _state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex ) { + let prefix = (room_id, Interfix); self.db - .set_forward_extremities(room_id, event_ids, state_lock) + .roomid_pduleaves + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| self.db.roomid_pduleaves.remove(key)) .await; + + for event_id in &event_ids { + let key = (room_id, event_id); + self.db.roomid_pduleaves.put_raw(key, event_id); + } } /// This fetches auth events from the current state.