diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index f0b26e80..65d62a78 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -14,7 +14,7 @@ use conduit::{ }, warn, PduCount, }; -use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt}; +use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt, TryFutureExt}; use ruma::{ api::client::{ error::ErrorKind, @@ -681,20 +681,22 @@ async fn load_joined_room( )) }; - let since_sender_member: Option = if let Some(short) = since_shortstatehash { + let get_sender_member_content = |short| { services .rooms .state_accessor - .state_get(short, &StateEventType::RoomMember, sender_user.as_str()) - .await - .and_then(|pdu| serde_json::from_str(pdu.content.get()).map_err(Into::into)) + .state_get_content(short, &StateEventType::RoomMember, sender_user.as_str()) .ok() - } else { - None }; - let joined_since_last_sync = - since_sender_member.map_or(true, |member| member.membership != MembershipState::Join); + let since_sender_member: OptionFuture<_> = since_shortstatehash.map(get_sender_member_content).into(); + + let joined_since_last_sync = since_sender_member + .await + .flatten() + .map_or(true, |content: RoomMemberEventContent| { + content.membership != MembershipState::Join + }); if since_shortstatehash.is_none() || joined_since_last_sync { // Probably since = 0, we will do an initial sync @@ -1296,18 +1298,6 @@ pub(crate) async fn sync_events_v4_route( .await .ok(); - let since_sender_member: Option = if let Some(short) = since_shortstatehash { - services - .rooms - .state_accessor - .state_get(short, &StateEventType::RoomMember, sender_user.as_str()) - .await - .and_then(|pdu| serde_json::from_str(pdu.content.get()).map_err(Into::into)) - .ok() - } else { - None - }; - let encrypted_room = services .rooms .state_accessor @@ -1327,6 +1317,13 @@ pub(crate) async fn sync_events_v4_route( .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") .await; + let since_sender_member: Option = services + .rooms + .state_accessor + .state_get_content(since_shortstatehash, &StateEventType::RoomMember, sender_user.as_str()) + .ok() + .await; + let joined_since_last_sync = since_sender_member.map_or(true, |member| member.membership != MembershipState::Join); diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 4c28483c..ece8679d 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -33,8 +33,8 @@ use ruma::{ }, room::RoomType, space::SpaceRoomJoinRule, - EventEncryptionAlgorithm, EventId, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, - UserId, + EventEncryptionAlgorithm, EventId, JsOption, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, + ServerName, UserId, }; use serde::Deserialize; use serde_json::value::to_raw_value; @@ -125,16 +125,23 @@ impl Service { .await } + /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). + pub async fn state_get_content( + &self, shortstatehash: u64, event_type: &StateEventType, state_key: &str, + ) -> Result + where + T: for<'de> Deserialize<'de> + Send, + { + self.state_get(shortstatehash, event_type, state_key) + .await + .and_then(|event| event.get_content()) + } + /// Get membership for given user in state async fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> MembershipState { - self.state_get(shortstatehash, &StateEventType::RoomMember, user_id.as_str()) + self.state_get_content(shortstatehash, &StateEventType::RoomMember, user_id.as_str()) .await - .map_or(MembershipState::Leave, |s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomMemberEventContent| c.membership) - .map_err(|_| Error::bad_database("Invalid room membership event in database.")) - .unwrap() - }) + .map_or(MembershipState::Leave, |c: RoomMemberEventContent| c.membership) } /// The user was a joined member at this state (potentially in the past) @@ -171,19 +178,10 @@ impl Service { } let history_visibility = self - .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "") + .state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "") .await - .map_or(HistoryVisibility::Shared, |s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomHistoryVisibilityEventContent| c.history_visibility) - .map_err(|e| { - error!( - "Invalid history visibility event in database for room {room_id}, assuming is \"shared\": \ - {e}" - ); - Error::bad_database("Invalid history visibility event in database.") - }) - .unwrap() + .map_or(HistoryVisibility::Shared, |c: RoomHistoryVisibilityEventContent| { + c.history_visibility }); let current_server_members = self @@ -240,19 +238,10 @@ impl Service { let currently_member = self.services.state_cache.is_joined(user_id, room_id).await; let history_visibility = self - .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "") + .state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "") .await - .map_or(HistoryVisibility::Shared, |s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomHistoryVisibilityEventContent| c.history_visibility) - .map_err(|e| { - error!( - "Invalid history visibility event in database for room {room_id}, assuming is \"shared\": \ - {e}" - ); - Error::bad_database("Invalid history visibility event in database.") - }) - .unwrap() + .map_or(HistoryVisibility::Shared, |c: RoomHistoryVisibilityEventContent| { + c.history_visibility }); let visibility = match history_visibility { @@ -284,25 +273,18 @@ impl Service { /// the room's history_visibility at that event's state. #[tracing::instrument(skip(self, user_id, room_id))] pub async fn user_can_see_state_events(&self, user_id: &UserId, room_id: &RoomId) -> bool { - let currently_member = self.services.state_cache.is_joined(user_id, room_id).await; + if self.services.state_cache.is_joined(user_id, room_id).await { + return true; + } let history_visibility = self - .room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "") + .room_state_get_content(room_id, &StateEventType::RoomHistoryVisibility, "") .await - .map_or(Ok(HistoryVisibility::Shared), |s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomHistoryVisibilityEventContent| c.history_visibility) - .map_err(|e| { - error!( - "Invalid history visibility event in database for room {room_id}, assuming is \"shared\": \ - {e}" - ); - Error::bad_database("Invalid history visibility event in database.") - }) - }) - .unwrap_or(HistoryVisibility::Shared); + .map_or(HistoryVisibility::Shared, |c: RoomHistoryVisibilityEventContent| { + c.history_visibility + }); - currently_member || history_visibility == HistoryVisibility::WorldReadable + history_visibility == HistoryVisibility::WorldReadable } /// Returns the state hash for this pdu.