diff --git a/src/api/client/context.rs b/src/api/client/context.rs index 9a5c4e82..9bf0c467 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -1,14 +1,25 @@ -use std::collections::HashSet; +use std::iter::once; use axum::extract::State; -use conduit::{err, error, Err}; -use futures::StreamExt; +use conduit::{ + err, error, + utils::{future::TryExtExt, stream::ReadyExt, IterStream}, + Err, Result, +}; +use futures::{future::try_join, StreamExt, TryFutureExt}; use ruma::{ api::client::{context::get_context, filter::LazyLoadOptions}, - events::{StateEventType, TimelineEventType::*}, + events::StateEventType, + UserId, }; -use crate::{Result, Ruma}; +use crate::{ + client::message::{event_filter, ignored_filter, update_lazy, visibility_filter, LazySet}, + Ruma, +}; + +const LIMIT_MAX: usize = 100; +const LIMIT_DEFAULT: usize = 10; /// # `GET /_matrix/client/r0/rooms/{roomId}/context/{eventId}` /// @@ -19,33 +30,43 @@ use crate::{Result, Ruma}; pub(crate) async fn get_context_route( State(services): State, body: Ruma, ) -> Result { - let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let sender_device = body.sender_device.as_ref().expect("user is authenticated"); + let filter = &body.filter; + let sender = body.sender(); + let (sender_user, _) = sender; + + // Use limit or else 10, with maximum 100 + let limit: usize = body + .limit + .try_into() + .unwrap_or(LIMIT_DEFAULT) + .min(LIMIT_MAX); // some clients, at least element, seem to require knowledge of redundant // members for "inline" profiles on the timeline to work properly - let (lazy_load_enabled, lazy_load_send_redundant) = match &body.filter.lazy_load_options { - LazyLoadOptions::Enabled { - include_redundant_members, - } => (true, *include_redundant_members), - LazyLoadOptions::Disabled => (false, cfg!(feature = "element_hacks")), - }; + let lazy_load_enabled = matches!(filter.lazy_load_options, LazyLoadOptions::Enabled { .. }); - let mut lazy_loaded = HashSet::with_capacity(100); + let lazy_load_redundant = if let LazyLoadOptions::Enabled { + include_redundant_members, + } = filter.lazy_load_options + { + include_redundant_members + } else { + false + }; let base_token = services .rooms .timeline .get_pdu_count(&body.event_id) - .await - .map_err(|_| err!(Request(NotFound("Base event id not found."))))?; + .map_err(|_| err!(Request(NotFound("Event not found.")))); let base_event = services .rooms .timeline .get_pdu(&body.event_id) - .await - .map_err(|_| err!(Request(NotFound("Base event not found."))))?; + .map_err(|_| err!(Request(NotFound("Base event not found.")))); + + let (base_token, base_event) = try_join(base_token, base_event).await?; let room_id = &base_event.room_id; @@ -58,136 +79,50 @@ pub(crate) async fn get_context_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } - if !services - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, room_id, &base_event.sender) - .await || lazy_load_send_redundant - { - lazy_loaded.insert(base_event.sender.as_str().to_owned()); - } - - // Use limit or else 10, with maximum 100 - let limit = usize::try_from(body.limit).unwrap_or(10).min(100); - - let base_event = base_event.to_room_event(); - let events_before: Vec<_> = services .rooms .timeline .pdus_until(sender_user, room_id, base_token) .await? + .ready_filter_map(|item| event_filter(item, filter)) + .filter_map(|item| ignored_filter(&services, item, sender_user)) + .filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit / 2) - .filter_map(|(count, pdu)| async move { - // list of safe and common non-state events to ignore - if matches!( - &pdu.kind, - RoomMessage - | Sticker | CallInvite - | CallNotify | RoomEncrypted - | Image | File | Audio - | Voice | Video | UnstablePollStart - | PollStart | KeyVerificationStart - | Reaction | Emote - | Location - ) && services - .users - .user_is_ignored(&pdu.sender, sender_user) - .await - { - return None; - } - - services - .rooms - .state_accessor - .user_can_see_event(sender_user, room_id, &pdu.event_id) - .await - .then_some((count, pdu)) - }) .collect() .await; - for (_, event) in &events_before { - if !services - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, room_id, &event.sender) - .await || lazy_load_send_redundant - { - lazy_loaded.insert(event.sender.as_str().to_owned()); - } - } - - let start_token = events_before - .last() - .map_or_else(|| base_token.stringify(), |(count, _)| count.stringify()); - let events_after: Vec<_> = services .rooms .timeline .pdus_after(sender_user, room_id, base_token) .await? + .ready_filter_map(|item| event_filter(item, filter)) + .filter_map(|item| ignored_filter(&services, item, sender_user)) + .filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit / 2) - .filter_map(|(count, pdu)| async move { - // list of safe and common non-state events to ignore - if matches!( - &pdu.kind, - RoomMessage - | Sticker | CallInvite - | CallNotify | RoomEncrypted - | Image | File | Audio - | Voice | Video | UnstablePollStart - | PollStart | KeyVerificationStart - | Reaction | Emote - | Location - ) && services - .users - .user_is_ignored(&pdu.sender, sender_user) - .await - { - return None; - } - - services - .rooms - .state_accessor - .user_can_see_event(sender_user, room_id, &pdu.event_id) - .await - .then_some((count, pdu)) - }) .collect() .await; - for (_, event) in &events_after { - if !services - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, room_id, &event.sender) - .await || lazy_load_send_redundant - { - lazy_loaded.insert(event.sender.as_str().to_owned()); - } - } + let lazy = once(&(base_token, (*base_event).clone())) + .chain(events_before.iter()) + .chain(events_after.iter()) + .stream() + .fold(LazySet::new(), |lazy, item| { + update_lazy(&services, room_id, sender, lazy, item, lazy_load_redundant) + }) + .await; + + let state_id = events_after + .last() + .map_or(body.event_id.as_ref(), |(_, e)| e.event_id.as_ref()); let shortstatehash = services .rooms .state_accessor - .pdu_shortstatehash( - events_after - .last() - .map_or(&*body.event_id, |(_, e)| &*e.event_id), - ) + .pdu_shortstatehash(state_id) + .or_else(|_| services.rooms.state.get_room_shortstatehash(room_id)) .await - .map_or( - services - .rooms - .state - .get_room_shortstatehash(room_id) - .await - .expect("All rooms have state"), - |hash| hash, - ); + .map_err(|e| err!(Database("State hash not found: {e}")))?; let state_ids = services .rooms @@ -196,48 +131,61 @@ pub(crate) async fn get_context_route( .await .map_err(|e| err!(Database("State not found: {e}")))?; - let end_token = events_after - .last() - .map_or_else(|| base_token.stringify(), |(count, _)| count.stringify()); + let lazy = &lazy; + let state: Vec<_> = state_ids + .iter() + .stream() + .filter_map(|(shortstatekey, event_id)| { + services + .rooms + .short + .get_statekey_from_short(*shortstatekey) + .map_ok(move |(event_type, state_key)| (event_type, state_key, event_id)) + .ok() + }) + .filter_map(|(event_type, state_key, event_id)| async move { + if lazy_load_enabled && event_type == StateEventType::RoomMember { + let user_id: &UserId = state_key.as_str().try_into().ok()?; + if !lazy.contains(user_id) { + return None; + } + } - let mut state = Vec::with_capacity(state_ids.len()); - - for (shortstatekey, id) in state_ids { - let (event_type, state_key) = services - .rooms - .short - .get_statekey_from_short(shortstatekey) - .await?; - - if event_type != StateEventType::RoomMember { - let Ok(pdu) = services.rooms.timeline.get_pdu(&id).await else { - error!("Pdu in state not found: {id}"); - continue; - }; - - state.push(pdu.to_state_event()); - } else if !lazy_load_enabled || lazy_loaded.contains(&state_key) { - let Ok(pdu) = services.rooms.timeline.get_pdu(&id).await else { - error!("Pdu in state not found: {id}"); - continue; - }; - - state.push(pdu.to_state_event()); - } - } + services + .rooms + .timeline + .get_pdu(event_id) + .await + .inspect_err(|_| error!("Pdu in state not found: {event_id}")) + .map(|pdu| pdu.to_state_event()) + .ok() + }) + .collect() + .await; Ok(get_context::v3::Response { - start: Some(start_token), - end: Some(end_token), + event: Some(base_event.to_room_event()), + + start: events_before + .last() + .map_or_else(|| base_token.stringify(), |(count, _)| count.stringify()) + .into(), + + end: events_after + .last() + .map_or_else(|| base_token.stringify(), |(count, _)| count.stringify()) + .into(), + events_before: events_before - .iter() + .into_iter() .map(|(_, pdu)| pdu.to_room_event()) .collect(), - event: Some(base_event), + events_after: events_after - .iter() + .into_iter() .map(|(_, pdu)| pdu.to_room_event()) .collect(), + state, }) } diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 094daa30..4fc58d9f 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -1,111 +1,52 @@ -use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use axum::extract::State; use conduit::{ - err, - utils::{IterStream, ReadyExt}, - Err, PduCount, + at, is_equal_to, + utils::{ + result::{FlatOk, LogErr}, + IterStream, ReadyExt, + }, + Event, PduCount, Result, }; use futures::{FutureExt, StreamExt}; use ruma::{ - api::client::{ - filter::RoomEventFilter, - message::{get_message_events, send_message_event}, + api::{ + client::{filter::RoomEventFilter, message::get_message_events}, + Direction, }, - events::{MessageLikeEventType, StateEventType, TimelineEventType::*}, - UserId, + events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*}, + serde::Raw, + DeviceId, OwnedUserId, RoomId, UserId, }; -use serde_json::from_str; -use service::rooms::timeline::PdusIterItem; +use service::{rooms::timeline::PdusIterItem, Services}; -use crate::{ - service::{pdu::PduBuilder, Services}, - utils, Result, Ruma, -}; +use crate::Ruma; -/// # `PUT /_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}` -/// -/// Send a message event into the room. -/// -/// - Is a NOOP if the txn id was already used before and returns the same event -/// id again -/// - The only requirement for the content is that it has to be valid json -/// - Tries to send the event into the room, auth rules will determine if it is -/// allowed -pub(crate) async fn send_message_event_route( - State(services): State, body: Ruma, -) -> Result { - let sender_user = body.sender_user.as_deref().expect("user is authenticated"); - let sender_device = body.sender_device.as_deref(); - let appservice_info = body.appservice_info.as_ref(); +pub(crate) type LazySet = HashSet; - // Forbid m.room.encrypted if encryption is disabled - if MessageLikeEventType::RoomEncrypted == body.event_type && !services.globals.allow_encryption() { - return Err!(Request(Forbidden("Encryption has been disabled"))); - } +/// list of safe and common non-state events to ignore +const IGNORED_MESSAGE_TYPES: &[TimelineEventType] = &[ + RoomMessage, + Sticker, + CallInvite, + CallNotify, + RoomEncrypted, + Image, + File, + Audio, + Voice, + Video, + UnstablePollStart, + PollStart, + KeyVerificationStart, + Reaction, + Emote, + Location, +]; - let state_lock = services.rooms.state.mutex.lock(&body.room_id).await; - - if body.event_type == MessageLikeEventType::CallInvite - && services.rooms.directory.is_public_room(&body.room_id).await - { - return Err!(Request(Forbidden("Room call invites are not allowed in public rooms"))); - } - - // Check if this is a new transaction id - if let Ok(response) = services - .transaction_ids - .existing_txnid(sender_user, sender_device, &body.txn_id) - .await - { - // The client might have sent a txnid of the /sendToDevice endpoint - // This txnid has no response associated with it - if response.is_empty() { - return Err!(Request(InvalidParam( - "Tried to use txn id already used for an incompatible endpoint." - ))); - } - - return Ok(send_message_event::v3::Response { - event_id: utils::string_from_bytes(&response) - .map(TryInto::try_into) - .map_err(|e| err!(Database("Invalid event_id in txnid data: {e:?}")))??, - }); - } - - let mut unsigned = BTreeMap::new(); - unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into()); - - let content = - from_str(body.body.body.json().get()).map_err(|e| err!(Request(BadJson("Invalid JSON body: {e}"))))?; - - let event_id = services - .rooms - .timeline - .build_and_append_pdu( - PduBuilder { - event_type: body.event_type.clone().into(), - content, - unsigned: Some(unsigned), - timestamp: appservice_info.and(body.timestamp), - ..Default::default() - }, - sender_user, - &body.room_id, - &state_lock, - ) - .await?; - - services - .transaction_ids - .add_txnid(sender_user, sender_device, &body.txn_id, event_id.as_bytes()); - - drop(state_lock); - - Ok(send_message_event::v3::Response { - event_id: event_id.into(), - }) -} +const LIMIT_MAX: usize = 100; +const LIMIT_DEFAULT: usize = 10; /// # `GET /_matrix/client/r0/rooms/{roomId}/messages` /// @@ -116,209 +57,171 @@ pub(crate) async fn send_message_event_route( pub(crate) async fn get_message_events_route( State(services): State, body: Ruma, ) -> Result { - let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let sender_device = body.sender_device.as_ref().expect("user is authenticated"); - + let sender = body.sender(); + let (sender_user, sender_device) = sender; let room_id = &body.room_id; let filter = &body.filter; - let limit = usize::try_from(body.limit).unwrap_or(10).min(100); - let from = match body.from.as_ref() { - Some(from) => PduCount::try_from_string(from)?, - None => match body.dir { - ruma::api::Direction::Forward => PduCount::min(), - ruma::api::Direction::Backward => PduCount::max(), - }, + let from_default = match body.dir { + Direction::Forward => PduCount::min(), + Direction::Backward => PduCount::max(), }; - let to = body - .to - .as_ref() - .and_then(|t| PduCount::try_from_string(t).ok()); + let from = body + .from + .as_deref() + .map(PduCount::try_from_string) + .transpose()? + .unwrap_or(from_default); + + let to = body.to.as_deref().map(PduCount::try_from_string).flat_ok(); + + let limit: usize = body + .limit + .try_into() + .unwrap_or(LIMIT_DEFAULT) + .min(LIMIT_MAX); services .rooms .lazy_loading .lazy_load_confirm_delivery(sender_user, sender_device, room_id, from); - let mut resp = get_message_events::v3::Response::new(); - let mut lazy_loaded = HashSet::new(); - let next_token; - match body.dir { - ruma::api::Direction::Forward => { - let events_after: Vec = services - .rooms - .timeline - .pdus_after(sender_user, room_id, from) - .await? - .ready_filter_map(|item| event_filter(item, filter)) - .filter_map(|item| visibility_filter(&services, item, sender_user)) - .ready_take_while(|(count, _)| Some(*count) != to) // Stop at `to` - .take(limit) - .collect() - .boxed() - .await; - - for (_, event) in &events_after { - /* TODO: Remove the not "element_hacks" check when these are resolved: - * https://github.com/vector-im/element-android/issues/3417 - * https://github.com/vector-im/element-web/issues/21034 - */ - if !cfg!(feature = "element_hacks") - && !services - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, room_id, &event.sender) - .await - { - lazy_loaded.insert(event.sender.clone()); - } - - if cfg!(features = "element_hacks") { - lazy_loaded.insert(event.sender.clone()); - } - } - - next_token = events_after.last().map(|(count, _)| count).copied(); - - let events_after: Vec<_> = events_after - .into_iter() - .stream() - .filter_map(|(_, pdu)| async move { - // list of safe and common non-state events to ignore - if matches!( - &pdu.kind, - RoomMessage - | Sticker | CallInvite - | CallNotify | RoomEncrypted - | Image | File | Audio - | Voice | Video | UnstablePollStart - | PollStart | KeyVerificationStart - | Reaction | Emote | Location - ) && services - .users - .user_is_ignored(&pdu.sender, sender_user) - .await - { - return None; - } - - Some(pdu.to_room_event()) - }) - .collect() - .await; - - resp.start = from.stringify(); - resp.end = next_token.map(|count| count.stringify()); - resp.chunk = events_after; - }, - ruma::api::Direction::Backward => { - services - .rooms - .timeline - .backfill_if_required(room_id, from) - .boxed() - .await?; - - let events_before: Vec = services - .rooms - .timeline - .pdus_until(sender_user, room_id, from) - .await? - .ready_filter_map(|item| event_filter(item, filter)) - .filter_map(|(count, pdu)| async move { - // list of safe and common non-state events to ignore - if matches!( - &pdu.kind, - RoomMessage - | Sticker | CallInvite - | CallNotify | RoomEncrypted - | Image | File | Audio - | Voice | Video | UnstablePollStart - | PollStart | KeyVerificationStart - | Reaction | Emote | Location - ) && services - .users - .user_is_ignored(&pdu.sender, sender_user) - .await - { - return None; - } - - Some((count, pdu)) - }) - .filter_map(|item| visibility_filter(&services, item, sender_user)) - .ready_take_while(|(count, _)| Some(*count) != to) // Stop at `to` - .take(limit) - .collect() - .boxed() - .await; - - for (_, event) in &events_before { - /* TODO: Remove the not "element_hacks" check when these are resolved: - * https://github.com/vector-im/element-android/issues/3417 - * https://github.com/vector-im/element-web/issues/21034 - */ - if !cfg!(feature = "element_hacks") - && !services - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, room_id, &event.sender) - .await - { - lazy_loaded.insert(event.sender.clone()); - } - - if cfg!(features = "element_hacks") { - lazy_loaded.insert(event.sender.clone()); - } - } - - next_token = events_before.last().map(|(count, _)| count).copied(); - - let events_before: Vec<_> = events_before - .into_iter() - .map(|(_, pdu)| pdu.to_room_event()) - .collect(); - - resp.start = from.stringify(); - resp.end = next_token.map(|count| count.stringify()); - resp.chunk = events_before; - }, + if matches!(body.dir, Direction::Backward) { + services + .rooms + .timeline + .backfill_if_required(room_id, from) + .boxed() + .await + .log_err() + .ok(); } - resp.state = lazy_loaded - .iter() - .stream() - .filter_map(|ll_user_id| async move { - services - .rooms - .state_accessor - .room_state_get(room_id, &StateEventType::RoomMember, ll_user_id.as_str()) - .await - .map(|member_event| member_event.to_state_event()) - .ok() - }) + let it = match body.dir { + Direction::Forward => services + .rooms + .timeline + .pdus_after(sender_user, room_id, from) + .await? + .boxed(), + + Direction::Backward => services + .rooms + .timeline + .pdus_until(sender_user, room_id, from) + .await? + .boxed(), + }; + + let events: Vec<_> = it + .ready_take_while(|(count, _)| Some(*count) != to) + .ready_filter_map(|item| event_filter(item, filter)) + .filter_map(|item| ignored_filter(&services, item, sender_user)) + .filter_map(|item| visibility_filter(&services, item, sender_user)) + .take(limit) .collect() .await; - // remove the feature check when we are sure clients like element can handle it + let lazy = events + .iter() + .stream() + .fold(LazySet::new(), |lazy, item| { + update_lazy(&services, room_id, sender, lazy, item, false) + }) + .await; + + let state = lazy + .iter() + .stream() + .filter_map(|user_id| get_member_event(&services, room_id, user_id)) + .collect() + .await; + + let next_token = events.last().map(|(count, _)| count).copied(); + if !cfg!(feature = "element_hacks") { if let Some(next_token) = next_token { - services.rooms.lazy_loading.lazy_load_mark_sent( - sender_user, - sender_device, - room_id, - lazy_loaded, - next_token, - ); + services + .rooms + .lazy_loading + .lazy_load_mark_sent(sender_user, sender_device, room_id, lazy, next_token); } } - Ok(resp) + let chunk = events + .into_iter() + .map(at!(1)) + .map(|pdu| pdu.to_room_event()) + .collect(); + + Ok(get_message_events::v3::Response { + start: from.stringify(), + end: next_token.as_ref().map(PduCount::stringify), + chunk, + state, + }) } -async fn visibility_filter(services: &Services, item: PdusIterItem, user_id: &UserId) -> Option { +async fn get_member_event(services: &Services, room_id: &RoomId, user_id: &UserId) -> Option> { + services + .rooms + .state_accessor + .room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str()) + .await + .map(|member_event| member_event.to_state_event()) + .ok() +} + +pub(crate) async fn update_lazy( + services: &Services, room_id: &RoomId, sender: (&UserId, &DeviceId), mut lazy: LazySet, item: &PdusIterItem, + force: bool, +) -> LazySet { + let (_, event) = &item; + let (sender_user, sender_device) = sender; + + /* TODO: Remove the not "element_hacks" check when these are resolved: + * https://github.com/vector-im/element-android/issues/3417 + * https://github.com/vector-im/element-web/issues/21034 + */ + if force || cfg!(features = "element_hacks") { + lazy.insert(event.sender().into()); + return lazy; + } + + if !services + .rooms + .lazy_loading + .lazy_load_was_sent_before(sender_user, sender_device, room_id, event.sender()) + .await + { + lazy.insert(event.sender().into()); + } + + lazy +} + +pub(crate) async fn ignored_filter(services: &Services, item: PdusIterItem, user_id: &UserId) -> Option { + let (_, pdu) = &item; + + if pdu.kind.to_cow_str() == "org.matrix.dummy_event" { + return None; + } + + if !IGNORED_MESSAGE_TYPES.iter().any(is_equal_to!(&pdu.kind)) { + return Some(item); + } + + if !services.users.user_is_ignored(&pdu.sender, user_id).await { + return Some(item); + } + + None +} + +pub(crate) async fn visibility_filter( + services: &Services, item: PdusIterItem, user_id: &UserId, +) -> Option { let (_, pdu) = &item; services @@ -329,7 +232,7 @@ async fn visibility_filter(services: &Services, item: PdusIterItem, user_id: &Us .then_some(item) } -fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option { +pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option { let (_, pdu) = &item; pdu.matches(filter).then_some(item) } diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 2928be87..9ee88bec 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -23,6 +23,7 @@ pub(super) mod relations; pub(super) mod report; pub(super) mod room; pub(super) mod search; +pub(super) mod send; pub(super) mod session; pub(super) mod space; pub(super) mod state; @@ -65,6 +66,7 @@ pub(super) use relations::*; pub(super) use report::*; pub(super) use room::*; pub(super) use search::*; +pub(super) use send::*; pub(super) use session::*; pub(super) use space::*; pub(super) use state::*; diff --git a/src/api/client/send.rs b/src/api/client/send.rs new file mode 100644 index 00000000..ff011efa --- /dev/null +++ b/src/api/client/send.rs @@ -0,0 +1,92 @@ +use std::collections::BTreeMap; + +use axum::extract::State; +use conduit::{err, Err}; +use ruma::{api::client::message::send_message_event, events::MessageLikeEventType}; +use serde_json::from_str; + +use crate::{service::pdu::PduBuilder, utils, Result, Ruma}; + +/// # `PUT /_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}` +/// +/// Send a message event into the room. +/// +/// - Is a NOOP if the txn id was already used before and returns the same event +/// id again +/// - The only requirement for the content is that it has to be valid json +/// - Tries to send the event into the room, auth rules will determine if it is +/// allowed +pub(crate) async fn send_message_event_route( + State(services): State, body: Ruma, +) -> Result { + let sender_user = body.sender_user(); + let sender_device = body.sender_device.as_deref(); + let appservice_info = body.appservice_info.as_ref(); + + // Forbid m.room.encrypted if encryption is disabled + if MessageLikeEventType::RoomEncrypted == body.event_type && !services.globals.allow_encryption() { + return Err!(Request(Forbidden("Encryption has been disabled"))); + } + + let state_lock = services.rooms.state.mutex.lock(&body.room_id).await; + + if body.event_type == MessageLikeEventType::CallInvite + && services.rooms.directory.is_public_room(&body.room_id).await + { + return Err!(Request(Forbidden("Room call invites are not allowed in public rooms"))); + } + + // Check if this is a new transaction id + if let Ok(response) = services + .transaction_ids + .existing_txnid(sender_user, sender_device, &body.txn_id) + .await + { + // The client might have sent a txnid of the /sendToDevice endpoint + // This txnid has no response associated with it + if response.is_empty() { + return Err!(Request(InvalidParam( + "Tried to use txn id already used for an incompatible endpoint." + ))); + } + + return Ok(send_message_event::v3::Response { + event_id: utils::string_from_bytes(&response) + .map(TryInto::try_into) + .map_err(|e| err!(Database("Invalid event_id in txnid data: {e:?}")))??, + }); + } + + let mut unsigned = BTreeMap::new(); + unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into()); + + let content = + from_str(body.body.body.json().get()).map_err(|e| err!(Request(BadJson("Invalid JSON body: {e}"))))?; + + let event_id = services + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: body.event_type.clone().into(), + content, + unsigned: Some(unsigned), + timestamp: appservice_info.and(body.timestamp), + ..Default::default() + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; + + services + .transaction_ids + .add_txnid(sender_user, sender_device, &body.txn_id, event_id.as_bytes()); + + drop(state_lock); + + Ok(send_message_event::v3::Response { + event_id: event_id.into(), + }) +} diff --git a/src/api/router/args.rs b/src/api/router/args.rs index cefacac1..38236db3 100644 --- a/src/api/router/args.rs +++ b/src/api/router/args.rs @@ -3,7 +3,9 @@ use std::{mem, ops::Deref}; use axum::{async_trait, body::Body, extract::FromRequest}; use bytes::{BufMut, BytesMut}; use conduit::{debug, err, trace, utils::string::EMPTY, Error, Result}; -use ruma::{api::IncomingRequest, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, ServerName, UserId}; +use ruma::{ + api::IncomingRequest, CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedServerName, OwnedUserId, ServerName, UserId, +}; use service::Services; use super::{auth, auth::Auth, request, request::Request}; @@ -40,10 +42,28 @@ where T: IncomingRequest + Send + Sync + 'static, { #[inline] - pub(crate) fn sender_user(&self) -> &UserId { self.sender_user.as_deref().expect("user is authenticated") } + pub(crate) fn sender(&self) -> (&UserId, &DeviceId) { (self.sender_user(), self.sender_device()) } #[inline] - pub(crate) fn origin(&self) -> &ServerName { self.origin.as_deref().expect("server is authenticated") } + pub(crate) fn sender_user(&self) -> &UserId { + self.sender_user + .as_deref() + .expect("user must be authenticated for this handler") + } + + #[inline] + pub(crate) fn sender_device(&self) -> &DeviceId { + self.sender_device + .as_deref() + .expect("user must be authenticated and device identified") + } + + #[inline] + pub(crate) fn origin(&self) -> &ServerName { + self.origin + .as_deref() + .expect("server must be authenticated for this handler") + } } #[async_trait]