merge rooms state service and data

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-10-15 09:34:43 +00:00 committed by strawberry
parent d0ee4b6d25
commit ed5b5d7877
2 changed files with 52 additions and 86 deletions

View file

@ -1,65 +0,0 @@
use std::sync::Arc;
use conduit::{
utils::{stream::TryIgnore, ReadyExt},
Result,
};
use database::{Database, Deserialized, Interfix, Map};
use ruma::{OwnedEventId, RoomId};
use super::RoomMutexGuard;
pub(super) struct Data {
shorteventid_shortstatehash: Arc<Map>,
roomid_shortstatehash: Arc<Map>,
pub(super) roomid_pduleaves: Arc<Map>,
}
impl Data {
pub(super) fn new(db: &Arc<Database>) -> Self {
Self {
shorteventid_shortstatehash: db["shorteventid_shortstatehash"].clone(),
roomid_shortstatehash: db["roomid_shortstatehash"].clone(),
roomid_pduleaves: db["roomid_pduleaves"].clone(),
}
}
pub(super) async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<u64> {
self.roomid_shortstatehash.get(room_id).await.deserialized()
}
#[inline]
pub(super) fn set_room_state(
&self,
room_id: &RoomId,
new_shortstatehash: u64,
_mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
) {
self.roomid_shortstatehash
.raw_put(room_id, new_shortstatehash);
}
pub(super) fn set_event_state(&self, shorteventid: u64, shortstatehash: u64) {
self.shorteventid_shortstatehash
.put(shorteventid, shortstatehash);
}
pub(super) async fn set_forward_extremities(
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
_mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
) {
let prefix = (room_id, Interfix);
self.roomid_pduleaves
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| self.roomid_pduleaves.remove(key))
.await;
for event_id in &event_ids {
let key = (room_id, event_id);
self.roomid_pduleaves.put_raw(key, event_id);
}
}
}

View file

@ -1,5 +1,3 @@
mod data;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fmt::Write, fmt::Write,
@ -10,11 +8,10 @@ use std::{
use conduit::{ use conduit::{
err, err,
result::FlatOk, result::FlatOk,
utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard}, utils::{calculate_hash, stream::TryIgnore, IterStream, MutexMap, MutexMapGuard, ReadyExt},
warn, PduEvent, Result, warn, PduEvent, Result,
}; };
use data::Data; use database::{Deserialized, Ignore, Interfix, Map};
use database::{Ignore, Interfix};
use futures::{future::join_all, pin_mut, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use futures::{future::join_all, pin_mut, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{ use ruma::{
events::{ events::{
@ -30,9 +27,9 @@ use super::state_compressor::CompressedStateEvent;
use crate::{globals, rooms, Dep}; use crate::{globals, rooms, Dep};
pub struct Service { pub struct Service {
pub mutex: RoomMutexMap,
services: Services, services: Services,
db: Data, db: Data,
pub mutex: RoomMutexMap,
} }
struct Services { struct Services {
@ -45,12 +42,19 @@ struct Services {
timeline: Dep<rooms::timeline::Service>, timeline: Dep<rooms::timeline::Service>,
} }
struct Data {
shorteventid_shortstatehash: Arc<Map>,
roomid_shortstatehash: Arc<Map>,
roomid_pduleaves: Arc<Map>,
}
type RoomMutexMap = MutexMap<OwnedRoomId, ()>; type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>; pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>;
impl crate::Service for Service { impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> { fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self { Ok(Arc::new(Self {
mutex: RoomMutexMap::new(),
services: Services { services: Services {
globals: args.depend::<globals::Service>("globals"), globals: args.depend::<globals::Service>("globals"),
short: args.depend::<rooms::short::Service>("rooms::short"), short: args.depend::<rooms::short::Service>("rooms::short"),
@ -60,12 +64,15 @@ impl crate::Service for Service {
state_compressor: args.depend::<rooms::state_compressor::Service>("rooms::state_compressor"), state_compressor: args.depend::<rooms::state_compressor::Service>("rooms::state_compressor"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"), timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
}, },
db: Data::new(args.db), db: Data {
mutex: RoomMutexMap::new(), shorteventid_shortstatehash: args.db["shorteventid_shortstatehash"].clone(),
roomid_shortstatehash: args.db["roomid_shortstatehash"].clone(),
roomid_pduleaves: args.db["roomid_pduleaves"].clone(),
},
})) }))
} }
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> { fn memory_usage(&self, out: &mut dyn Write) -> Result {
let mutex = self.mutex.len(); let mutex = self.mutex.len();
writeln!(out, "state_mutex: {mutex}")?; writeln!(out, "state_mutex: {mutex}")?;
@ -84,7 +91,7 @@ impl Service {
statediffnew: Arc<HashSet<CompressedStateEvent>>, statediffnew: Arc<HashSet<CompressedStateEvent>>,
_statediffremoved: Arc<HashSet<CompressedStateEvent>>, _statediffremoved: Arc<HashSet<CompressedStateEvent>>,
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> { ) -> Result {
let event_ids = statediffnew.iter().stream().filter_map(|new| { let event_ids = statediffnew.iter().stream().filter_map(|new| {
self.services self.services
.state_compressor .state_compressor
@ -127,7 +134,7 @@ impl Service {
self.services.state_cache.update_joined_count(room_id).await; self.services.state_cache.update_joined_count(room_id).await;
self.db.set_room_state(room_id, shortstatehash, state_lock); self.set_room_state(room_id, shortstatehash, state_lock);
Ok(()) Ok(())
} }
@ -140,13 +147,15 @@ impl Service {
pub async fn set_event_state( pub async fn set_event_state(
&self, event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc<HashSet<CompressedStateEvent>>, &self, event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<u64> { ) -> Result<u64> {
const BUFSIZE: usize = size_of::<u64>();
let shorteventid = self let shorteventid = self
.services .services
.short .short
.get_or_create_shorteventid(event_id) .get_or_create_shorteventid(event_id)
.await; .await;
let previous_shortstatehash = self.db.get_room_shortstatehash(room_id).await; let previous_shortstatehash = self.get_room_shortstatehash(room_id).await;
let state_hash = calculate_hash( let state_hash = calculate_hash(
&state_ids_compressed &state_ids_compressed
@ -196,7 +205,9 @@ impl Service {
)?; )?;
} }
self.db.set_event_state(shorteventid, shortstatehash); self.db
.shorteventid_shortstatehash
.aput::<BUFSIZE, BUFSIZE, _, _>(shorteventid, shortstatehash);
Ok(shortstatehash) Ok(shortstatehash)
} }
@ -207,6 +218,8 @@ impl Service {
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, new_pdu), level = "debug")] #[tracing::instrument(skip(self, new_pdu), level = "debug")]
pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> { pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> {
const BUFSIZE: usize = size_of::<u64>();
let shorteventid = self let shorteventid = self
.services .services
.short .short
@ -216,7 +229,9 @@ impl Service {
let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id).await; let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id).await;
if let Ok(p) = previous_shortstatehash { if let Ok(p) = previous_shortstatehash {
self.db.set_event_state(shorteventid, p); self.db
.shorteventid_shortstatehash
.aput::<BUFSIZE, BUFSIZE, _, _>(shorteventid, p);
} }
if let Some(state_key) = &new_pdu.state_key { if let Some(state_key) = &new_pdu.state_key {
@ -306,14 +321,18 @@ impl Service {
} }
/// Set the state hash to a new version, but does not update state_cache. /// Set the state hash to a new version, but does not update state_cache.
#[tracing::instrument(skip(self, mutex_lock), level = "debug")] #[tracing::instrument(skip(self, _mutex_lock), level = "debug")]
pub fn set_room_state( pub fn set_room_state(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
shortstatehash: u64, shortstatehash: u64,
mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex _mutex_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
) { ) {
self.db.set_room_state(room_id, shortstatehash, mutex_lock); const BUFSIZE: usize = size_of::<u64>();
self.db
.roomid_shortstatehash
.raw_aput::<BUFSIZE, _, _>(room_id, shortstatehash);
} }
/// Returns the room's version. /// Returns the room's version.
@ -327,9 +346,12 @@ impl Service {
.map_err(|e| err!(Request(NotFound("No create event found: {e:?}")))) .map_err(|e| err!(Request(NotFound("No create event found: {e:?}"))))
} }
#[inline]
pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<u64> { pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<u64> {
self.db.get_room_shortstatehash(room_id).await self.db
.roomid_shortstatehash
.get(room_id)
.await
.deserialized()
} }
pub fn get_forward_extremities<'a>(&'a self, room_id: &'a RoomId) -> impl Stream<Item = &EventId> + Send + '_ { pub fn get_forward_extremities<'a>(&'a self, room_id: &'a RoomId) -> impl Stream<Item = &EventId> + Send + '_ {
@ -346,11 +368,20 @@ impl Service {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event_ids: Vec<OwnedEventId>, event_ids: Vec<OwnedEventId>,
state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex _state_lock: &RoomMutexGuard, // Take mutex guard to make sure users get the room state mutex
) { ) {
let prefix = (room_id, Interfix);
self.db self.db
.set_forward_extremities(room_id, event_ids, state_lock) .roomid_pduleaves
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| self.db.roomid_pduleaves.remove(key))
.await; .await;
for event_id in &event_ids {
let key = (room_id, event_id);
self.db.roomid_pduleaves.put_raw(key, event_id);
}
} }
/// This fetches auth events from the current state. /// This fetches auth events from the current state.