diff --git a/src/api/client/message.rs b/src/api/client/message.rs index bab5fa54..d577e3c8 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -1,7 +1,11 @@ use std::collections::{BTreeMap, HashSet}; use axum::extract::State; -use conduit::{err, utils::ReadyExt, Err, PduCount}; +use conduit::{ + err, + utils::{IterStream, ReadyExt}, + Err, PduCount, +}; use futures::{FutureExt, StreamExt}; use ruma::{ api::client::{ @@ -9,7 +13,7 @@ use ruma::{ filter::{RoomEventFilter, UrlFilter}, message::{get_message_events, send_message_event}, }, - events::{MessageLikeEventType, StateEventType}, + events::{MessageLikeEventType, StateEventType, TimelineEventType::*}, UserId, }; use serde_json::{from_str, Value}; @@ -182,8 +186,30 @@ pub(crate) async fn get_message_events_route( let events_after: Vec<_> = events_after .into_iter() - .map(|(_, pdu)| pdu.to_room_event()) - .collect(); + .stream() + .filter_map(|(_, pdu)| async move { + // list of safe and common non-state events to ignore + if matches!( + &pdu.kind, + RoomMessage + | Sticker | CallInvite + | CallNotify | RoomEncrypted + | Image | File | Audio + | Voice | Video | UnstablePollStart + | PollStart | KeyVerificationStart + | Reaction | Emote | Location + ) && services + .users + .user_is_ignored(&pdu.sender, sender_user) + .await + { + return None; + } + + Some(pdu.to_room_event()) + }) + .collect() + .await; resp.start = from.stringify(); resp.end = next_token.map(|count| count.stringify()); @@ -203,6 +229,27 @@ pub(crate) async fn get_message_events_route( .pdus_until(sender_user, room_id, from) .await? .ready_filter_map(|item| contains_url_filter(item, filter)) + .filter_map(|(count, pdu)| async move { + // list of safe and common non-state events to ignore + if matches!( + &pdu.kind, + RoomMessage + | Sticker | CallInvite + | CallNotify | RoomEncrypted + | Image | File | Audio + | Voice | Video | UnstablePollStart + | PollStart | KeyVerificationStart + | Reaction | Emote | Location + ) && services + .users + .user_is_ignored(&pdu.sender, sender_user) + .await + { + return None; + } + + Some((count, pdu)) + }) .filter_map(|item| visibility_filter(&services, item, sender_user)) .ready_take_while(|(count, _)| Some(*count) != to) // Stop at `to` .take(limit) @@ -243,17 +290,20 @@ pub(crate) async fn get_message_events_route( }, } - resp.state = Vec::new(); - for ll_id in &lazy_loaded { - if let Ok(member_event) = services - .rooms - .state_accessor - .room_state_get(room_id, &StateEventType::RoomMember, ll_id.as_str()) - .await - { - resp.state.push(member_event.to_state_event()); - } - } + resp.state = lazy_loaded + .iter() + .stream() + .filter_map(|ll_user_id| async move { + services + .rooms + .state_accessor + .room_state_get(room_id, &StateEventType::RoomMember, ll_user_id.as_str()) + .await + .map(|member_event| member_event.to_state_event()) + .ok() + }) + .collect() + .await; // remove the feature check when we are sure clients like element can handle it if !cfg!(feature = "element_hacks") {