diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs deleted file mode 100644 index 96b009f8..00000000 --- a/src/service/rooms/user/data.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::sync::Arc; - -use conduit::Result; -use database::{Deserialized, Map}; -use futures::{Stream, StreamExt}; -use ruma::{RoomId, UserId}; - -use crate::{globals, rooms, Dep}; - -pub(super) struct Data { - userroomid_notificationcount: Arc, - userroomid_highlightcount: Arc, - roomuserid_lastnotificationread: Arc, - roomsynctoken_shortstatehash: Arc, - services: Services, -} - -struct Services { - globals: Dep, - short: Dep, - state_cache: Dep, -} - -impl Data { - pub(super) fn new(args: &crate::Args<'_>) -> Self { - let db = &args.db; - Self { - userroomid_notificationcount: db["userroomid_notificationcount"].clone(), - userroomid_highlightcount: db["userroomid_highlightcount"].clone(), - roomuserid_lastnotificationread: db["userroomid_highlightcount"].clone(), //< NOTE: known bug from conduit - roomsynctoken_shortstatehash: db["roomsynctoken_shortstatehash"].clone(), - services: Services { - globals: args.depend::("globals"), - short: args.depend::("rooms::short"), - state_cache: args.depend::("rooms::state_cache"), - }, - } - } - - pub(super) fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) { - let userroom_id = (user_id, room_id); - self.userroomid_highlightcount.put(userroom_id, 0_u64); - self.userroomid_notificationcount.put(userroom_id, 0_u64); - - let roomuser_id = (room_id, user_id); - let count = self.services.globals.next_count().unwrap(); - self.roomuserid_lastnotificationread.put(roomuser_id, count); - } - - pub(super) async fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { - let key = (user_id, room_id); - self.userroomid_notificationcount - .qry(&key) - .await - .deserialized() - .unwrap_or(0) - } - - pub(super) async fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { - let key = (user_id, room_id); - self.userroomid_highlightcount - .qry(&key) - .await - .deserialized() - .unwrap_or(0) - } - - pub(super) async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> u64 { - let key = (room_id, user_id); - self.roomuserid_lastnotificationread - .qry(&key) - .await - .deserialized() - .unwrap_or(0) - } - - pub(super) async fn associate_token_shortstatehash(&self, room_id: &RoomId, token: u64, shortstatehash: u64) { - let shortroomid = self - .services - .short - .get_shortroomid(room_id) - .await - .expect("room exists"); - - let key: &[u64] = &[shortroomid, token]; - self.roomsynctoken_shortstatehash.put(key, shortstatehash); - } - - pub(super) async fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result { - let shortroomid = self.services.short.get_shortroomid(room_id).await?; - - let key: &[u64] = &[shortroomid, token]; - self.roomsynctoken_shortstatehash - .qry(key) - .await - .deserialized() - } - - //TODO: optimize; replace point-queries with dual iteration - pub(super) fn get_shared_rooms<'a>( - &'a self, user_a: &'a UserId, user_b: &'a UserId, - ) -> impl Stream + Send + 'a { - self.services - .state_cache - .rooms_joined(user_a) - .filter(|room_id| self.services.state_cache.is_joined(user_b, room_id)) - } -} diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index d9d90ecf..e484203d 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -1,71 +1,139 @@ -mod data; - use std::sync::Arc; -use conduit::Result; +use conduit::{implement, Result}; +use database::{Deserialized, Map}; use futures::{pin_mut, Stream, StreamExt}; use ruma::{RoomId, UserId}; -use self::data::Data; +use crate::{globals, rooms, Dep}; pub struct Service { db: Data, + services: Services, +} + +struct Data { + userroomid_notificationcount: Arc, + userroomid_highlightcount: Arc, + roomuserid_lastnotificationread: Arc, + roomsynctoken_shortstatehash: Arc, +} + +struct Services { + globals: Dep, + short: Dep, + state_cache: Dep, } impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { - db: Data::new(&args), + db: Data { + userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), + userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), + roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(), //< NOTE: known bug from conduit + roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(), + }, + + services: Services { + globals: args.depend::("globals"), + short: args.depend::("rooms::short"), + state_cache: args.depend::("rooms::state_cache"), + }, })) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - #[inline] - pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) { - self.db.reset_notification_counts(user_id, room_id); - } +#[implement(Service)] +pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) { + let userroom_id = (user_id, room_id); + self.db.userroomid_highlightcount.put(userroom_id, 0_u64); + self.db.userroomid_notificationcount.put(userroom_id, 0_u64); - #[inline] - pub async fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { - self.db.notification_count(user_id, room_id).await - } - - #[inline] - pub async fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { - self.db.highlight_count(user_id, room_id).await - } - - #[inline] - pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> u64 { - self.db.last_notification_read(user_id, room_id).await - } - - #[inline] - pub async fn associate_token_shortstatehash(&self, room_id: &RoomId, token: u64, shortstatehash: u64) { - self.db - .associate_token_shortstatehash(room_id, token, shortstatehash) - .await; - } - - #[inline] - pub async fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result { - self.db.get_token_shortstatehash(room_id, token).await - } - - #[inline] - pub fn get_shared_rooms<'a>( - &'a self, user_a: &'a UserId, user_b: &'a UserId, - ) -> impl Stream + Send + 'a { - self.db.get_shared_rooms(user_a, user_b) - } - - pub async fn has_shared_rooms<'a>(&'a self, user_a: &'a UserId, user_b: &'a UserId) -> bool { - let get_shared_rooms = self.get_shared_rooms(user_a, user_b); - - pin_mut!(get_shared_rooms); - get_shared_rooms.next().await.is_some() - } + let roomuser_id = (room_id, user_id); + let count = self.services.globals.next_count().unwrap(); + self.db + .roomuserid_lastnotificationread + .put(roomuser_id, count); +} + +#[implement(Service)] +pub async fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { + let key = (user_id, room_id); + self.db + .userroomid_notificationcount + .qry(&key) + .await + .deserialized() + .unwrap_or(0) +} + +#[implement(Service)] +pub async fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 { + let key = (user_id, room_id); + self.db + .userroomid_highlightcount + .qry(&key) + .await + .deserialized() + .unwrap_or(0) +} + +#[implement(Service)] +pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> u64 { + let key = (room_id, user_id); + self.db + .roomuserid_lastnotificationread + .qry(&key) + .await + .deserialized() + .unwrap_or(0) +} + +#[implement(Service)] +pub async fn associate_token_shortstatehash(&self, room_id: &RoomId, token: u64, shortstatehash: u64) { + let shortroomid = self + .services + .short + .get_shortroomid(room_id) + .await + .expect("room exists"); + + let key: &[u64] = &[shortroomid, token]; + self.db + .roomsynctoken_shortstatehash + .put(key, shortstatehash); +} + +#[implement(Service)] +pub async fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result { + let shortroomid = self.services.short.get_shortroomid(room_id).await?; + + let key: &[u64] = &[shortroomid, token]; + self.db + .roomsynctoken_shortstatehash + .qry(key) + .await + .deserialized() +} + +#[implement(Service)] +pub async fn has_shared_rooms<'a>(&'a self, user_a: &'a UserId, user_b: &'a UserId) -> bool { + let get_shared_rooms = self.get_shared_rooms(user_a, user_b); + + pin_mut!(get_shared_rooms); + get_shared_rooms.next().await.is_some() +} + +//TODO: optimize; replace point-queries with dual iteration +#[implement(Service)] +pub fn get_shared_rooms<'a>( + &'a self, user_a: &'a UserId, user_b: &'a UserId, +) -> impl Stream + Send + 'a { + self.services + .state_cache + .rooms_joined(user_a) + .filter(|room_id| self.services.state_cache.is_joined(user_b, room_id)) }