Add state_get_content(shortid) for serde_json::from elim
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
da34b43302
commit
68315ac112
2 changed files with 48 additions and 69 deletions
|
@ -14,7 +14,7 @@ use conduit::{
|
||||||
},
|
},
|
||||||
warn, PduCount,
|
warn, PduCount,
|
||||||
};
|
};
|
||||||
use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt};
|
use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt, TryFutureExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::{
|
api::client::{
|
||||||
error::ErrorKind,
|
error::ErrorKind,
|
||||||
|
@ -681,20 +681,22 @@ async fn load_joined_room(
|
||||||
))
|
))
|
||||||
};
|
};
|
||||||
|
|
||||||
let since_sender_member: Option<RoomMemberEventContent> = if let Some(short) = since_shortstatehash {
|
let get_sender_member_content = |short| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.state_get(short, &StateEventType::RoomMember, sender_user.as_str())
|
.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
|
||||||
.await
|
|
||||||
.and_then(|pdu| serde_json::from_str(pdu.content.get()).map_err(Into::into))
|
|
||||||
.ok()
|
.ok()
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let joined_since_last_sync =
|
let since_sender_member: OptionFuture<_> = since_shortstatehash.map(get_sender_member_content).into();
|
||||||
since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
|
|
||||||
|
let joined_since_last_sync = since_sender_member
|
||||||
|
.await
|
||||||
|
.flatten()
|
||||||
|
.map_or(true, |content: RoomMemberEventContent| {
|
||||||
|
content.membership != MembershipState::Join
|
||||||
|
});
|
||||||
|
|
||||||
if since_shortstatehash.is_none() || joined_since_last_sync {
|
if since_shortstatehash.is_none() || joined_since_last_sync {
|
||||||
// Probably since = 0, we will do an initial sync
|
// Probably since = 0, we will do an initial sync
|
||||||
|
@ -1296,18 +1298,6 @@ pub(crate) async fn sync_events_v4_route(
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
let since_sender_member: Option<RoomMemberEventContent> = if let Some(short) = since_shortstatehash {
|
|
||||||
services
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.state_get(short, &StateEventType::RoomMember, sender_user.as_str())
|
|
||||||
.await
|
|
||||||
.and_then(|pdu| serde_json::from_str(pdu.content.get()).map_err(Into::into))
|
|
||||||
.ok()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let encrypted_room = services
|
let encrypted_room = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
|
@ -1327,6 +1317,13 @@ pub(crate) async fn sync_events_v4_route(
|
||||||
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
|
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
let since_sender_member: Option<RoomMemberEventContent> = services
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.state_get_content(since_shortstatehash, &StateEventType::RoomMember, sender_user.as_str())
|
||||||
|
.ok()
|
||||||
|
.await;
|
||||||
|
|
||||||
let joined_since_last_sync =
|
let joined_since_last_sync =
|
||||||
since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
|
since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
|
||||||
|
|
||||||
|
|
|
@ -33,8 +33,8 @@ use ruma::{
|
||||||
},
|
},
|
||||||
room::RoomType,
|
room::RoomType,
|
||||||
space::SpaceRoomJoinRule,
|
space::SpaceRoomJoinRule,
|
||||||
EventEncryptionAlgorithm, EventId, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
|
EventEncryptionAlgorithm, EventId, JsOption, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
|
||||||
UserId,
|
ServerName, UserId,
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::value::to_raw_value;
|
use serde_json::value::to_raw_value;
|
||||||
|
@ -125,16 +125,23 @@ impl Service {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
|
||||||
|
pub async fn state_get_content<T>(
|
||||||
|
&self, shortstatehash: u64, event_type: &StateEventType, state_key: &str,
|
||||||
|
) -> Result<T>
|
||||||
|
where
|
||||||
|
T: for<'de> Deserialize<'de> + Send,
|
||||||
|
{
|
||||||
|
self.state_get(shortstatehash, event_type, state_key)
|
||||||
|
.await
|
||||||
|
.and_then(|event| event.get_content())
|
||||||
|
}
|
||||||
|
|
||||||
/// Get membership for given user in state
|
/// Get membership for given user in state
|
||||||
async fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> MembershipState {
|
async fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> MembershipState {
|
||||||
self.state_get(shortstatehash, &StateEventType::RoomMember, user_id.as_str())
|
self.state_get_content(shortstatehash, &StateEventType::RoomMember, user_id.as_str())
|
||||||
.await
|
.await
|
||||||
.map_or(MembershipState::Leave, |s| {
|
.map_or(MembershipState::Leave, |c: RoomMemberEventContent| c.membership)
|
||||||
serde_json::from_str(s.content.get())
|
|
||||||
.map(|c: RoomMemberEventContent| c.membership)
|
|
||||||
.map_err(|_| Error::bad_database("Invalid room membership event in database."))
|
|
||||||
.unwrap()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The user was a joined member at this state (potentially in the past)
|
/// The user was a joined member at this state (potentially in the past)
|
||||||
|
@ -171,19 +178,10 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
let history_visibility = self
|
let history_visibility = self
|
||||||
.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")
|
.state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")
|
||||||
.await
|
.await
|
||||||
.map_or(HistoryVisibility::Shared, |s| {
|
.map_or(HistoryVisibility::Shared, |c: RoomHistoryVisibilityEventContent| {
|
||||||
serde_json::from_str(s.content.get())
|
c.history_visibility
|
||||||
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!(
|
|
||||||
"Invalid history visibility event in database for room {room_id}, assuming is \"shared\": \
|
|
||||||
{e}"
|
|
||||||
);
|
|
||||||
Error::bad_database("Invalid history visibility event in database.")
|
|
||||||
})
|
|
||||||
.unwrap()
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let current_server_members = self
|
let current_server_members = self
|
||||||
|
@ -240,19 +238,10 @@ impl Service {
|
||||||
let currently_member = self.services.state_cache.is_joined(user_id, room_id).await;
|
let currently_member = self.services.state_cache.is_joined(user_id, room_id).await;
|
||||||
|
|
||||||
let history_visibility = self
|
let history_visibility = self
|
||||||
.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")
|
.state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")
|
||||||
.await
|
.await
|
||||||
.map_or(HistoryVisibility::Shared, |s| {
|
.map_or(HistoryVisibility::Shared, |c: RoomHistoryVisibilityEventContent| {
|
||||||
serde_json::from_str(s.content.get())
|
c.history_visibility
|
||||||
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!(
|
|
||||||
"Invalid history visibility event in database for room {room_id}, assuming is \"shared\": \
|
|
||||||
{e}"
|
|
||||||
);
|
|
||||||
Error::bad_database("Invalid history visibility event in database.")
|
|
||||||
})
|
|
||||||
.unwrap()
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let visibility = match history_visibility {
|
let visibility = match history_visibility {
|
||||||
|
@ -284,25 +273,18 @@ impl Service {
|
||||||
/// the room's history_visibility at that event's state.
|
/// the room's history_visibility at that event's state.
|
||||||
#[tracing::instrument(skip(self, user_id, room_id))]
|
#[tracing::instrument(skip(self, user_id, room_id))]
|
||||||
pub async fn user_can_see_state_events(&self, user_id: &UserId, room_id: &RoomId) -> bool {
|
pub async fn user_can_see_state_events(&self, user_id: &UserId, room_id: &RoomId) -> bool {
|
||||||
let currently_member = self.services.state_cache.is_joined(user_id, room_id).await;
|
if self.services.state_cache.is_joined(user_id, room_id).await {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
let history_visibility = self
|
let history_visibility = self
|
||||||
.room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")
|
.room_state_get_content(room_id, &StateEventType::RoomHistoryVisibility, "")
|
||||||
.await
|
.await
|
||||||
.map_or(Ok(HistoryVisibility::Shared), |s| {
|
.map_or(HistoryVisibility::Shared, |c: RoomHistoryVisibilityEventContent| {
|
||||||
serde_json::from_str(s.content.get())
|
c.history_visibility
|
||||||
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
|
});
|
||||||
.map_err(|e| {
|
|
||||||
error!(
|
|
||||||
"Invalid history visibility event in database for room {room_id}, assuming is \"shared\": \
|
|
||||||
{e}"
|
|
||||||
);
|
|
||||||
Error::bad_database("Invalid history visibility event in database.")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.unwrap_or(HistoryVisibility::Shared);
|
|
||||||
|
|
||||||
currently_member || history_visibility == HistoryVisibility::WorldReadable
|
history_visibility == HistoryVisibility::WorldReadable
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the state hash for this pdu.
|
/// Returns the state hash for this pdu.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue