diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs deleted file mode 100644 index c06c8107..00000000 --- a/src/service/rooms/state_cache/data.rs +++ /dev/null @@ -1,179 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; - -use conduit::{utils::stream::TryIgnore, Result}; -use database::{serialize_to_vec, Deserialized, Interfix, Json, Map}; -use futures::{Stream, StreamExt}; -use ruma::{ - events::{AnyStrippedStateEvent, AnySyncStateEvent}, - serde::Raw, - OwnedRoomId, RoomId, UserId, -}; - -use crate::{globals, Dep}; - -type AppServiceInRoomCache = RwLock>>; -type StrippedStateEventItem = (OwnedRoomId, Vec>); -type SyncStateEventItem = (OwnedRoomId, Vec>); - -pub(super) struct Data { - pub(super) appservice_in_room_cache: AppServiceInRoomCache, - pub(super) roomid_invitedcount: Arc, - pub(super) roomid_inviteviaservers: Arc, - pub(super) roomid_joinedcount: Arc, - pub(super) roomserverids: Arc, - pub(super) roomuserid_invitecount: Arc, - pub(super) roomuserid_joined: Arc, - pub(super) roomuserid_leftcount: Arc, - pub(super) roomuseroncejoinedids: Arc, - pub(super) serverroomids: Arc, - pub(super) userroomid_invitestate: Arc, - pub(super) userroomid_joined: Arc, - pub(super) userroomid_leftstate: Arc, - services: Services, -} - -struct Services { - globals: Dep, -} - -impl Data { - pub(super) fn new(args: &crate::Args<'_>) -> Self { - let db = &args.db; - Self { - appservice_in_room_cache: RwLock::new(HashMap::new()), - roomid_invitedcount: db["roomid_invitedcount"].clone(), - roomid_inviteviaservers: db["roomid_inviteviaservers"].clone(), - roomid_joinedcount: db["roomid_joinedcount"].clone(), - roomserverids: db["roomserverids"].clone(), - roomuserid_invitecount: db["roomuserid_invitecount"].clone(), - roomuserid_joined: db["roomuserid_joined"].clone(), - roomuserid_leftcount: db["roomuserid_leftcount"].clone(), - roomuseroncejoinedids: db["roomuseroncejoinedids"].clone(), - serverroomids: db["serverroomids"].clone(), - userroomid_invitestate: db["userroomid_invitestate"].clone(), - userroomid_joined: db["userroomid_joined"].clone(), - userroomid_leftstate: db["userroomid_leftstate"].clone(), - services: Services { - globals: args.depend::("globals"), - }, - } - } - - pub(super) fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) { - let key = (user_id, room_id); - - self.roomuseroncejoinedids.put_raw(key, []); - } - - pub(super) fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { - let userroom_id = (user_id, room_id); - let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); - - let roomuser_id = (room_id, user_id); - let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); - - self.userroomid_joined.insert(&userroom_id, []); - self.roomuserid_joined.insert(&roomuser_id, []); - - self.userroomid_invitestate.remove(&userroom_id); - self.roomuserid_invitecount.remove(&roomuser_id); - - self.userroomid_leftstate.remove(&userroom_id); - self.roomuserid_leftcount.remove(&roomuser_id); - - self.roomid_inviteviaservers.remove(room_id); - } - - pub(super) fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { - let userroom_id = (user_id, room_id); - let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); - - let roomuser_id = (room_id, user_id); - let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); - - // (timo) TODO - let leftstate = Vec::>::new(); - let count = self.services.globals.next_count().unwrap(); - - self.userroomid_leftstate - .raw_put(&userroom_id, Json(leftstate)); - self.roomuserid_leftcount.raw_put(&roomuser_id, count); - - self.userroomid_joined.remove(&userroom_id); - self.roomuserid_joined.remove(&roomuser_id); - - self.userroomid_invitestate.remove(&userroom_id); - self.roomuserid_invitecount.remove(&roomuser_id); - - self.roomid_inviteviaservers.remove(room_id); - } - - /// Makes a user forget a room. - #[tracing::instrument(skip(self), level = "debug")] - pub(super) fn forget(&self, room_id: &RoomId, user_id: &UserId) { - let userroom_id = (user_id, room_id); - let roomuser_id = (room_id, user_id); - - self.userroomid_leftstate.del(userroom_id); - self.roomuserid_leftcount.del(roomuser_id); - } - - /// Returns an iterator over all rooms a user was invited to. - #[inline] - pub(super) fn rooms_invited<'a>( - &'a self, user_id: &'a UserId, - ) -> impl Stream + Send + 'a { - type Key<'a> = (&'a UserId, &'a RoomId); - type KeyVal<'a> = (Key<'a>, Raw>); - - let prefix = (user_id, Interfix); - self.userroomid_invitestate - .stream_prefix(&prefix) - .ignore_err() - .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) - .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) - .ignore_err() - } - - /// Returns an iterator over all rooms a user left. - #[inline] - pub(super) fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { - type Key<'a> = (&'a UserId, &'a RoomId); - type KeyVal<'a> = (Key<'a>, Raw>>); - - let prefix = (user_id, Interfix); - self.userroomid_leftstate - .stream_prefix(&prefix) - .ignore_err() - .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) - .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) - .ignore_err() - } - - #[tracing::instrument(skip(self), level = "debug")] - pub(super) async fn invite_state( - &self, user_id: &UserId, room_id: &RoomId, - ) -> Result>> { - let key = (user_id, room_id); - self.userroomid_invitestate - .qry(&key) - .await - .deserialized() - .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) - } - - #[tracing::instrument(skip(self), level = "debug")] - pub(super) async fn left_state( - &self, user_id: &UserId, room_id: &RoomId, - ) -> Result>> { - let key = (user_id, room_id); - self.userroomid_leftstate - .qry(&key) - .await - .deserialized() - .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) - } -} diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 077eee10..4f4ff264 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -1,14 +1,14 @@ -mod data; - -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, RwLock}, +}; use conduit::{ err, is_not_empty, utils::{stream::TryIgnore, ReadyExt, StreamTools}, warn, Result, }; -use data::Data; -use database::{serialize_to_vec, Deserialized, Ignore, Interfix, Json}; +use database::{serialize_to_vec, Deserialized, Ignore, Interfix, Json, Map}; use futures::{stream::iter, Stream, StreamExt}; use itertools::Itertools; use ruma::{ @@ -29,6 +29,7 @@ use ruma::{ use crate::{account_data, appservice::RegistrationInfo, globals, rooms, users, Dep}; pub struct Service { + appservice_in_room_cache: AppServiceInRoomCache, services: Services, db: Data, } @@ -40,16 +41,49 @@ struct Services { users: Dep, } +struct Data { + roomid_invitedcount: Arc, + roomid_inviteviaservers: Arc, + roomid_joinedcount: Arc, + roomserverids: Arc, + roomuserid_invitecount: Arc, + roomuserid_joined: Arc, + roomuserid_leftcount: Arc, + roomuseroncejoinedids: Arc, + serverroomids: Arc, + userroomid_invitestate: Arc, + userroomid_joined: Arc, + userroomid_leftstate: Arc, +} + +type AppServiceInRoomCache = RwLock>>; +type StrippedStateEventItem = (OwnedRoomId, Vec>); +type SyncStateEventItem = (OwnedRoomId, Vec>); + impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { + appservice_in_room_cache: RwLock::new(HashMap::new()), services: Services { account_data: args.depend::("account_data"), globals: args.depend::("globals"), state_accessor: args.depend::("rooms::state_accessor"), users: args.depend::("users"), }, - db: Data::new(&args), + db: Data { + roomid_invitedcount: args.db["roomid_invitedcount"].clone(), + roomid_inviteviaservers: args.db["roomid_inviteviaservers"].clone(), + roomid_joinedcount: args.db["roomid_joinedcount"].clone(), + roomserverids: args.db["roomserverids"].clone(), + roomuserid_invitecount: args.db["roomuserid_invitecount"].clone(), + roomuserid_joined: args.db["roomuserid_joined"].clone(), + roomuserid_leftcount: args.db["roomuserid_leftcount"].clone(), + roomuseroncejoinedids: args.db["roomuseroncejoinedids"].clone(), + serverroomids: args.db["serverroomids"].clone(), + userroomid_invitestate: args.db["userroomid_invitestate"].clone(), + userroomid_joined: args.db["userroomid_joined"].clone(), + userroomid_leftstate: args.db["userroomid_leftstate"].clone(), + }, })) } @@ -107,7 +141,7 @@ impl Service { // Check if the user never joined this room if !self.once_joined(user_id, room_id).await { // Add the user ID to the join list then - self.db.mark_as_once_joined(user_id, room_id); + self.mark_as_once_joined(user_id, room_id); // Check if the room has a predecessor if let Ok(Some(predecessor)) = self @@ -186,7 +220,7 @@ impl Service { } } - self.db.mark_as_joined(user_id, room_id); + self.mark_as_joined(user_id, room_id); }, MembershipState::Invite => { // We want to know if the sender is ignored by the receiver @@ -198,7 +232,7 @@ impl Service { .await; }, MembershipState::Leave | MembershipState::Ban => { - self.db.mark_as_left(user_id, room_id); + self.mark_as_left(user_id, room_id); }, _ => {}, } @@ -213,10 +247,9 @@ impl Service { #[tracing::instrument(skip(self, room_id, appservice), level = "debug")] pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> bool { let maybe = self - .db .appservice_in_room_cache .read() - .unwrap() + .expect("locked") .get(room_id) .and_then(|map| map.get(&appservice.registration.id)) .copied(); @@ -242,10 +275,9 @@ impl Service { .ready_any(|userid| appservice.users.is_match(userid.as_str())) .await; - self.db - .appservice_in_room_cache + self.appservice_in_room_cache .write() - .unwrap() + .expect("locked") .entry(room_id.to_owned()) .or_default() .insert(appservice.registration.id.clone(), in_room); @@ -254,21 +286,67 @@ impl Service { } } - /// Direct DB function to directly mark a user as left. It is not - /// recommended to use this directly. You most likely should use - /// `update_membership` instead - #[tracing::instrument(skip(self), level = "debug")] - pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { self.db.mark_as_left(user_id, room_id); } - /// Direct DB function to directly mark a user as joined. It is not /// recommended to use this directly. You most likely should use /// `update_membership` instead #[tracing::instrument(skip(self), level = "debug")] - pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { self.db.mark_as_joined(user_id, room_id); } + pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { + let userroom_id = (user_id, room_id); + let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); + + let roomuser_id = (room_id, user_id); + let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); + + self.db.userroomid_joined.insert(&userroom_id, []); + self.db.roomuserid_joined.insert(&roomuser_id, []); + + self.db.userroomid_invitestate.remove(&userroom_id); + self.db.roomuserid_invitecount.remove(&roomuser_id); + + self.db.userroomid_leftstate.remove(&userroom_id); + self.db.roomuserid_leftcount.remove(&roomuser_id); + + self.db.roomid_inviteviaservers.remove(room_id); + } + + /// Direct DB function to directly mark a user as left. It is not + /// recommended to use this directly. You most likely should use + /// `update_membership` instead + #[tracing::instrument(skip(self), level = "debug")] + pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { + let userroom_id = (user_id, room_id); + let userroom_id = serialize_to_vec(userroom_id).expect("failed to serialize userroom_id"); + + let roomuser_id = (room_id, user_id); + let roomuser_id = serialize_to_vec(roomuser_id).expect("failed to serialize roomuser_id"); + + // (timo) TODO + let leftstate = Vec::>::new(); + let count = self.services.globals.next_count().unwrap(); + + self.db + .userroomid_leftstate + .raw_put(&userroom_id, Json(leftstate)); + self.db.roomuserid_leftcount.raw_put(&roomuser_id, count); + + self.db.userroomid_joined.remove(&userroom_id); + self.db.roomuserid_joined.remove(&roomuser_id); + + self.db.userroomid_invitestate.remove(&userroom_id); + self.db.roomuserid_invitecount.remove(&roomuser_id); + + self.db.roomid_inviteviaservers.remove(room_id); + } /// Makes a user forget a room. #[tracing::instrument(skip(self), level = "debug")] - pub fn forget(&self, room_id: &RoomId, user_id: &UserId) { self.db.forget(room_id, user_id); } + pub fn forget(&self, room_id: &RoomId, user_id: &UserId) { + let userroom_id = (user_id, room_id); + let roomuser_id = (room_id, user_id); + + self.db.userroomid_leftstate.del(userroom_id); + self.db.roomuserid_leftcount.del(roomuser_id); + } /// Returns an iterator of all servers participating in this room. #[tracing::instrument(skip(self), level = "debug")] @@ -415,28 +493,56 @@ impl Service { /// Returns an iterator over all rooms a user was invited to. #[tracing::instrument(skip(self), level = "debug")] - pub fn rooms_invited<'a>( - &'a self, user_id: &'a UserId, - ) -> impl Stream>)> + Send + 'a { - self.db.rooms_invited(user_id) + pub fn rooms_invited<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { + type KeyVal<'a> = (Key<'a>, Raw>); + type Key<'a> = (&'a UserId, &'a RoomId); + + let prefix = (user_id, Interfix); + self.db + .userroomid_invitestate + .stream_prefix(&prefix) + .ignore_err() + .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) + .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) + .ignore_err() } #[tracing::instrument(skip(self), level = "debug")] pub async fn invite_state(&self, user_id: &UserId, room_id: &RoomId) -> Result>> { - self.db.invite_state(user_id, room_id).await + let key = (user_id, room_id); + self.db + .userroomid_invitestate + .qry(&key) + .await + .deserialized() + .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) } #[tracing::instrument(skip(self), level = "debug")] pub async fn left_state(&self, user_id: &UserId, room_id: &RoomId) -> Result>> { - self.db.left_state(user_id, room_id).await + let key = (user_id, room_id); + self.db + .userroomid_leftstate + .qry(&key) + .await + .deserialized() + .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) } /// Returns an iterator over all rooms a user left. #[tracing::instrument(skip(self), level = "debug")] - pub fn rooms_left<'a>( - &'a self, user_id: &'a UserId, - ) -> impl Stream>)> + Send + 'a { - self.db.rooms_left(user_id) + pub fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream + Send + 'a { + type KeyVal<'a> = (Key<'a>, Raw>>); + type Key<'a> = (&'a UserId, &'a RoomId); + + let prefix = (user_id, Interfix); + self.db + .userroomid_leftstate + .stream_prefix(&prefix) + .ignore_err() + .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) + .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) + .ignore_err() } #[tracing::instrument(skip(self), level = "debug")] @@ -515,13 +621,13 @@ impl Service { } pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) { - let cache = self.db.appservice_in_room_cache.read().expect("locked"); + let cache = self.appservice_in_room_cache.read().expect("locked"); + (cache.len(), cache.capacity()) } pub fn clear_appservice_in_room_cache(&self) { - self.db - .appservice_in_room_cache + self.appservice_in_room_cache .write() .expect("locked") .clear(); @@ -574,13 +680,17 @@ impl Service { self.db.serverroomids.put_raw(serverroom_id, []); } - self.db - .appservice_in_room_cache + self.appservice_in_room_cache .write() .expect("locked") .remove(room_id); } + fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) { + let key = (user_id, room_id); + self.db.roomuseroncejoinedids.put_raw(key, []); + } + pub async fn mark_as_invited( &self, user_id: &UserId, room_id: &RoomId, last_state: Option>>, invite_via: Option>,