diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index adb4d8da..1383f902 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -1011,15 +1011,27 @@ async fn load_joined_room( .rooms .read_receipt .readreceipts_since(room_id, since) - .map(|(_, _, v)| v) + .filter_map(|(read_user, _, v)| async move { + (!services + .users + .user_is_ignored(&read_user, sender_user) + .await) + .then_some(v) + }) .collect() .await; if services.rooms.typing.last_typing_update(room_id).await? > since { edus.push( serde_json::from_str( - &serde_json::to_string(&services.rooms.typing.typings_all(room_id).await?) - .expect("event is valid, we just created it"), + &serde_json::to_string( + &services + .rooms + .typing + .typings_all(room_id, sender_user) + .await?, + ) + .expect("event is valid, we just created it"), ) .expect("event is valid, we just created it"), ); @@ -1583,6 +1595,13 @@ pub(crate) async fn sync_events_v4_route( .rooms .read_receipt .readreceipts_since(room_id, *roomsince) + .filter_map(|(read_user, ts, v)| async move { + (!services + .users + .user_is_ignored(&read_user, sender_user) + .await) + .then_some((read_user, ts, v)) + }) .collect() .await; diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index dbe38561..b1a71caf 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -14,7 +14,6 @@ use itertools::Itertools; use ruma::{ events::{ direct::DirectEvent, - ignored_user_list::IgnoredUserListEvent, room::{ create::RoomCreateEventContent, member::{MembershipState, RoomMemberEventContent}, @@ -197,30 +196,7 @@ impl Service { }, MembershipState::Invite => { // We want to know if the sender is ignored by the receiver - let is_ignored = self - .services - .account_data - .get( - None, // Ignored users are in global account data - user_id, // Receiver - GlobalAccountDataEventType::IgnoredUserList - .to_string() - .into(), - ) - .await - .and_then(|event| { - serde_json::from_str::(event.get()) - .map_err(|e| err!(Database(warn!("Invalid account data event in db: {e:?}")))) - }) - .map_or(false, |ignored| { - ignored - .content - .ignored_users - .iter() - .any(|(user, _details)| user == sender) - }); - - if is_ignored { + if self.services.users.user_is_ignored(sender, user_id).await { return Ok(()); } diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs index bcfce616..8ee34f44 100644 --- a/src/service/rooms/typing/mod.rs +++ b/src/service/rooms/typing/mod.rs @@ -1,6 +1,11 @@ use std::{collections::BTreeMap, sync::Arc}; -use conduit::{debug_info, trace, utils, Result, Server}; +use conduit::{ + debug_info, trace, + utils::{self, IterStream}, + Result, Server, +}; +use futures::StreamExt; use ruma::{ api::federation::transactions::edu::{Edu, TypingContent}, events::SyncEphemeralRoomEvent, @@ -8,7 +13,7 @@ use ruma::{ }; use tokio::sync::{broadcast, RwLock}; -use crate::{globals, sending, Dep}; +use crate::{globals, sending, users, Dep}; pub struct Service { server: Arc, @@ -23,6 +28,7 @@ pub struct Service { struct Services { globals: Dep, sending: Dep, + users: Dep, } impl crate::Service for Service { @@ -32,6 +38,7 @@ impl crate::Service for Service { services: Services { globals: args.depend::("globals"), sending: args.depend::("sending"), + users: args.depend::("users"), }, typing: RwLock::new(BTreeMap::new()), last_typing_update: RwLock::new(BTreeMap::new()), @@ -170,17 +177,35 @@ impl Service { /// Returns a new typing EDU. pub async fn typings_all( - &self, room_id: &RoomId, + &self, room_id: &RoomId, sender_user: &UserId, ) -> Result> { + let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); + + let Some(typing_indicators) = room_typing_indicators else { + return Ok(SyncEphemeralRoomEvent { + content: ruma::events::typing::TypingEventContent { + user_ids: Vec::new(), + }, + }); + }; + + let user_ids: Vec<_> = typing_indicators + .into_keys() + .stream() + .filter_map(|typing_user_id| async move { + (!self + .services + .users + .user_is_ignored(&typing_user_id, sender_user) + .await) + .then_some(typing_user_id) + }) + .collect() + .await; + Ok(SyncEphemeralRoomEvent { content: ruma::events::typing::TypingEventContent { - user_ids: self - .typing - .read() - .await - .get(room_id) - .map(|m| m.keys().cloned().collect()) - .unwrap_or_default(), + user_ids, }, }) } diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 438c220b..1c079085 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -10,13 +10,13 @@ use futures::{pin_mut, FutureExt, Stream, StreamExt, TryFutureExt}; use ruma::{ api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, - events::{AnyToDeviceEvent, StateEventType}, + events::{ignored_user_list::IgnoredUserListEvent, AnyToDeviceEvent, GlobalAccountDataEventType, StateEventType}, serde::Raw, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri, OwnedUserId, UInt, UserId, }; -use crate::{admin, globals, rooms, Dep}; +use crate::{account_data, admin, globals, rooms, Dep}; pub struct Service { services: Services, @@ -25,6 +25,7 @@ pub struct Service { struct Services { server: Arc, + account_data: Dep, admin: Dep, globals: Dep, state_accessor: Dep, @@ -58,6 +59,7 @@ impl crate::Service for Service { Ok(Arc::new(Self { services: Services { server: args.server.clone(), + account_data: args.depend::("account_data"), admin: args.depend::("admin"), globals: args.depend::("globals"), state_accessor: args.depend::("rooms::state_accessor"), @@ -91,6 +93,32 @@ impl crate::Service for Service { } impl Service { + /// Returns true/false based on whether the recipient/receiving user has + /// blocked the sender + pub async fn user_is_ignored(&self, sender_user: &UserId, recipient_user: &UserId) -> bool { + self.services + .account_data + .get( + None, + recipient_user, + GlobalAccountDataEventType::IgnoredUserList + .to_string() + .into(), + ) + .await + .and_then(|event| { + serde_json::from_str::(event.get()) + .map_err(|e| err!(Database(warn!("Invalid account data event in db: {e:?}")))) + }) + .map_or(false, |ignored| { + ignored + .content + .ignored_users + .keys() + .any(|blocked_user| blocked_user == sender_user) + }) + } + /// Check if a user is an admin #[inline] pub async fn is_admin(&self, user_id: &UserId) -> bool { self.services.admin.user_is_admin(user_id).await }