mod data; use std::sync::Arc; use conduit::{error, warn, Error, Result, Server}; use data::Data; use database::Database; use itertools::Itertools; use ruma::{ events::{ direct::DirectEvent, ignored_user_list::IgnoredUserListEvent, room::{ create::RoomCreateEventContent, member::{MembershipState, RoomMemberEventContent}, power_levels::RoomPowerLevelsEventContent, }, AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, }, int, serde::Raw, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; use crate::{appservice::RegistrationInfo, services, user_is_local}; pub struct Service { db: Data, } impl Service { pub fn build(_server: &Arc, db: &Arc) -> Result { Ok(Self { db: Data::new(db), }) } /// Update current membership data. #[tracing::instrument(skip(self, last_state))] #[allow(clippy::too_many_arguments)] pub fn update_membership( &self, room_id: &RoomId, user_id: &UserId, membership_event: RoomMemberEventContent, sender: &UserId, last_state: Option>>, invite_via: Option>, update_joined_count: bool, ) -> Result<()> { let membership = membership_event.membership; // Keep track what remote users exist by adding them as "deactivated" users // // TODO: use futures to update remote profiles without blocking the membership // update #[allow(clippy::collapsible_if)] if !user_is_local(user_id) { if !services().users.exists(user_id)? { services().users.create(user_id, None)?; } /* // Try to update our local copy of the user if ours does not match if ((services().users.displayname(user_id)? != membership_event.displayname) || (services().users.avatar_url(user_id)? != membership_event.avatar_url) || (services().users.blurhash(user_id)? != membership_event.blurhash)) && (membership != MembershipState::Leave) { let response = services() .sending .send_federation_request( user_id.server_name(), federation::query::get_profile_information::v1::Request { user_id: user_id.into(), field: None, // we want the full user's profile to update locally too }, ) .await; services().users.set_displayname(user_id, response.displayname.clone()).await?; services().users.set_avatar_url(user_id, response.avatar_url).await?; services().users.set_blurhash(user_id, response.blurhash).await?; }; */ } match &membership { MembershipState::Join => { // Check if the user never joined this room if !self.once_joined(user_id, room_id)? { // Add the user ID to the join list then self.db.mark_as_once_joined(user_id, room_id)?; // Check if the room has a predecessor if let Some(predecessor) = services() .rooms .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "")? .and_then(|create| serde_json::from_str(create.content.get()).ok()) .and_then(|content: RoomCreateEventContent| content.predecessor) { // Copy user settings from predecessor to the current room: // - Push rules // // TODO: finish this once push rules are implemented. // // let mut push_rules_event_content: PushRulesEvent = account_data // .get( // None, // user_id, // EventType::PushRules, // )?; // // NOTE: find where `predecessor.room_id` match // and update to `room_id`. // // account_data // .update( // None, // user_id, // EventType::PushRules, // &push_rules_event_content, // globals, // ) // .ok(); // Copy old tags to new room if let Some(tag_event) = services() .account_data .get(Some(&predecessor.room_id), user_id, RoomAccountDataEventType::Tag)? .map(|event| { serde_json::from_str(event.get()).map_err(|e| { warn!("Invalid account data event in db: {e:?}"); Error::BadDatabase("Invalid account data event in db.") }) }) { services() .account_data .update(Some(room_id), user_id, RoomAccountDataEventType::Tag, &tag_event?) .ok(); }; // Copy direct chat flag if let Some(direct_event) = services() .account_data .get(None, user_id, GlobalAccountDataEventType::Direct.to_string().into())? .map(|event| { serde_json::from_str::(event.get()).map_err(|e| { warn!("Invalid account data event in db: {e:?}"); Error::BadDatabase("Invalid account data event in db.") }) }) { let mut direct_event = direct_event?; let mut room_ids_updated = false; for room_ids in direct_event.content.0.values_mut() { if room_ids.iter().any(|r| r == &predecessor.room_id) { room_ids.push(room_id.to_owned()); room_ids_updated = true; } } if room_ids_updated { services().account_data.update( None, user_id, GlobalAccountDataEventType::Direct.to_string().into(), &serde_json::to_value(&direct_event).expect("to json always works"), )?; } }; } } self.db.mark_as_joined(user_id, room_id)?; }, MembershipState::Invite => { // We want to know if the sender is ignored by the receiver let is_ignored = services() .account_data .get( None, // Ignored users are in global account data user_id, // Receiver GlobalAccountDataEventType::IgnoredUserList .to_string() .into(), )? .map(|event| { serde_json::from_str::(event.get()).map_err(|e| { warn!("Invalid account data event in db: {e:?}"); Error::BadDatabase("Invalid account data event in db.") }) }) .transpose()? .map_or(false, |ignored| { ignored .content .ignored_users .iter() .any(|(user, _details)| user == sender) }); if is_ignored { return Ok(()); } self.db .mark_as_invited(user_id, room_id, last_state, invite_via)?; }, MembershipState::Leave | MembershipState::Ban => { self.db.mark_as_left(user_id, room_id)?; }, _ => {}, } if update_joined_count { self.update_joined_count(room_id)?; } Ok(()) } #[tracing::instrument(skip(self, room_id))] pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { self.db.update_joined_count(room_id) } #[tracing::instrument(skip(self, room_id, appservice))] pub fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result { self.db.appservice_in_room(room_id, appservice) } /// Direct DB function to directly mark a user as left. It is not /// recommended to use this directly. You most likely should use /// `update_membership` instead #[tracing::instrument(skip(self))] pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { self.db.mark_as_left(user_id, room_id) } /// Direct DB function to directly mark a user as joined. It is not /// recommended to use this directly. You most likely should use /// `update_membership` instead #[tracing::instrument(skip(self))] pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { self.db.mark_as_joined(user_id, room_id) } /// Makes a user forget a room. #[tracing::instrument(skip(self))] pub fn forget(&self, room_id: &RoomId, user_id: &UserId) -> Result<()> { self.db.forget(room_id, user_id) } /// Returns an iterator of all servers participating in this room. #[tracing::instrument(skip(self))] pub fn room_servers(&self, room_id: &RoomId) -> impl Iterator> + '_ { self.db.room_servers(room_id) } #[tracing::instrument(skip(self))] pub fn server_in_room(&self, server: &ServerName, room_id: &RoomId) -> Result { self.db.server_in_room(server, room_id) } /// Returns an iterator of all rooms a server participates in (as far as we /// know). #[tracing::instrument(skip(self))] pub fn server_rooms(&self, server: &ServerName) -> impl Iterator> + '_ { self.db.server_rooms(server) } /// Returns true if server can see user by sharing at least one room. #[tracing::instrument(skip(self))] pub fn server_sees_user(&self, server: &ServerName, user_id: &UserId) -> Result { Ok(self .server_rooms(server) .filter_map(Result::ok) .any(|room_id: OwnedRoomId| self.is_joined(user_id, &room_id).unwrap_or(false))) } /// Returns true if user_a and user_b share at least one room. #[tracing::instrument(skip(self))] pub fn user_sees_user(&self, user_a: &UserId, user_b: &UserId) -> Result { // Minimize number of point-queries by iterating user with least nr rooms let (a, b) = if self.rooms_joined(user_a).count() < self.rooms_joined(user_b).count() { (user_a, user_b) } else { (user_b, user_a) }; Ok(self .rooms_joined(a) .filter_map(Result::ok) .any(|room_id| self.is_joined(b, &room_id).unwrap_or(false))) } /// Returns an iterator over all joined members of a room. #[tracing::instrument(skip(self))] pub fn room_members(&self, room_id: &RoomId) -> impl Iterator> + '_ { self.db.room_members(room_id) } #[tracing::instrument(skip(self))] pub fn room_joined_count(&self, room_id: &RoomId) -> Result> { self.db.room_joined_count(room_id) } #[tracing::instrument(skip(self))] /// Returns an iterator of all our local users in the room, even if they're /// deactivated/guests pub fn local_users_in_room<'a>(&'a self, room_id: &RoomId) -> impl Iterator + 'a { self.db.local_users_in_room(room_id) } #[tracing::instrument(skip(self))] /// Returns an iterator of all our local joined users in a room who are /// active (not deactivated, not guest) pub fn active_local_users_in_room<'a>(&'a self, room_id: &RoomId) -> impl Iterator + 'a { self.db.active_local_users_in_room(room_id) } #[tracing::instrument(skip(self))] pub fn room_invited_count(&self, room_id: &RoomId) -> Result> { self.db.room_invited_count(room_id) } /// Returns an iterator over all User IDs who ever joined a room. #[tracing::instrument(skip(self))] pub fn room_useroncejoined(&self, room_id: &RoomId) -> impl Iterator> + '_ { self.db.room_useroncejoined(room_id) } /// Returns an iterator over all invited members of a room. #[tracing::instrument(skip(self))] pub fn room_members_invited(&self, room_id: &RoomId) -> impl Iterator> + '_ { self.db.room_members_invited(room_id) } #[tracing::instrument(skip(self))] pub fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { self.db.get_invite_count(room_id, user_id) } #[tracing::instrument(skip(self))] pub fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { self.db.get_left_count(room_id, user_id) } /// Returns an iterator over all rooms this user joined. #[tracing::instrument(skip(self))] pub fn rooms_joined(&self, user_id: &UserId) -> impl Iterator> + '_ { self.db.rooms_joined(user_id) } /// Returns an iterator over all rooms a user was invited to. #[tracing::instrument(skip(self))] pub fn rooms_invited( &self, user_id: &UserId, ) -> impl Iterator>)>> + '_ { self.db.rooms_invited(user_id) } #[tracing::instrument(skip(self))] pub fn invite_state(&self, user_id: &UserId, room_id: &RoomId) -> Result>>> { self.db.invite_state(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn left_state(&self, user_id: &UserId, room_id: &RoomId) -> Result>>> { self.db.left_state(user_id, room_id) } /// Returns an iterator over all rooms a user left. #[tracing::instrument(skip(self))] pub fn rooms_left( &self, user_id: &UserId, ) -> impl Iterator>)>> + '_ { self.db.rooms_left(user_id) } #[tracing::instrument(skip(self))] pub fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { self.db.once_joined(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn is_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { self.db.is_joined(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> Result { self.db.is_invited(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> Result { self.db.is_left(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn servers_invite_via(&self, room_id: &RoomId) -> impl Iterator> + '_ { self.db.servers_invite_via(room_id) } /// Gets up to three servers that are likely to be in the room in the /// distant future. /// /// See #[tracing::instrument(skip(self))] pub fn servers_route_via(&self, room_id: &RoomId) -> Result> { let most_powerful_user_server = services() .rooms .state_accessor .room_state_get(room_id, &StateEventType::RoomPowerLevels, "")? .map(|pdu| { serde_json::from_str(pdu.content.get()).map(|conent: RoomPowerLevelsEventContent| { conent .users .iter() .max_by_key(|(_, power)| *power) .and_then(|x| { if x.1 >= &int!(50) { Some(x) } else { None } }) .map(|(user, _power)| user.server_name().to_owned()) }) }) .transpose() .map_err(|e| { error!("Invalid power levels event content in database: {e}"); Error::bad_database("Invalid power levels event content in database") })? .flatten(); let mut servers: Vec = self .room_members(room_id) .filter_map(Result::ok) .counts_by(|user| user.server_name().to_owned()) .iter() .sorted_by_key(|(_, users)| *users) .map(|(server, _)| server.to_owned()) .rev() .take(3) .collect_vec(); if let Some(server) = most_powerful_user_server { servers.insert(0, server); servers.truncate(3); } Ok(servers) } }