From 4ff1155bf0aefddd02e34ed9c709db25c0da3ecd Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 31 Jan 2025 01:23:27 +0000 Subject: [PATCH] reroll encrypted_room branch in incremental sync state Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 150 ++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 81 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index cd4dfc90..f5b612e4 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -6,7 +6,7 @@ use std::{ use axum::extract::State; use conduwuit::{ - at, err, error, extract_variant, is_equal_to, + at, err, error, extract_variant, is_equal_to, pair_of, pdu::EventHash, result::FlatOk, utils::{ @@ -16,7 +16,7 @@ use conduwuit::{ stream::{BroadbandExt, Tools, WidebandExt}, BoolExt, IterStream, ReadyExt, TryFutureExtExt, }, - Error, PduCount, PduEvent, Result, + PduCount, PduEvent, Result, }; use conduwuit_service::{ rooms::{ @@ -64,6 +64,8 @@ struct StateChanges { invited_member_count: Option, joined_since_last_sync: bool, state_events: Vec, + device_list_updates: HashSet, + left_encrypted_users: HashSet, } type PresenceUpdates = HashMap; @@ -325,18 +327,16 @@ pub(crate) async fn build_sync_events( // If the user doesn't share an encrypted room with the target anymore, we need // to tell them - let device_list_left = left_encrypted_users + let device_list_left: HashSet<_> = left_encrypted_users .into_iter() .stream() .broad_filter_map(|user_id| async move { - let no_shared_encrypted_room = - !share_encrypted_room(services, sender_user, &user_id, None).await; - no_shared_encrypted_room.then_some(user_id) - }) - .ready_fold(HashSet::new(), |mut device_list_left, user_id| { - device_list_left.insert(user_id); - device_list_left + share_encrypted_room(services, sender_user, &user_id, None) + .await + .eq(&false) + .then_some(user_id) }) + .collect() .await; let response = sync_events::v3::Response { @@ -730,14 +730,14 @@ async fn load_joined_room( .into(); let witness = witness.await; - let mut device_list_updates = HashSet::::new(); - let mut left_encrypted_users = HashSet::::new(); let StateChanges { heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events, + mut device_list_updates, + left_encrypted_users, } = if no_state_changes { StateChanges::default() } else { @@ -747,8 +747,6 @@ async fn load_joined_room( room_id, full_state, filter, - &mut device_list_updates, - &mut left_encrypted_users, since_shortstatehash, current_shortstatehash, joined_since_last_sync, @@ -919,8 +917,6 @@ async fn calculate_state_changes( room_id: &RoomId, full_state: bool, filter: &FilterDefinition, - device_list_updates: &mut HashSet, - left_encrypted_users: &mut HashSet, since_shortstatehash: Option, current_shortstatehash: ShortStateHash, joined_since_last_sync: bool, @@ -944,8 +940,6 @@ async fn calculate_state_changes( room_id, full_state, filter, - device_list_updates, - left_encrypted_users, since_shortstatehash, current_shortstatehash, joined_since_last_sync, @@ -1013,6 +1007,7 @@ async fn calculate_state_initial( invited_member_count, joined_since_last_sync: true, state_events, + ..Default::default() }) } @@ -1024,8 +1019,6 @@ async fn calculate_state_incremental( room_id: &RoomId, full_state: bool, _filter: &FilterDefinition, - device_list_updates: &mut HashSet, - left_encrypted_users: &mut HashSet, since_shortstatehash: Option, current_shortstatehash: ShortStateHash, joined_since_last_sync: bool, @@ -1063,79 +1056,72 @@ async fn calculate_state_incremental( .await; } - let encrypted_room = services - .rooms - .state_accessor - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); - let since_encryption = services .rooms .state_accessor .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") .is_ok(); - let (encrypted_room, since_encryption) = join(encrypted_room, since_encryption).await; + let encrypted_room = services + .rooms + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") + .is_ok() + .await; - // Calculations: - let new_encrypted_room = encrypted_room && !since_encryption; + let (mut device_list_updates, left_encrypted_users) = delta_state_events + .iter() + .stream() + .ready_filter(|_| encrypted_room) + .ready_filter(|state_event| state_event.kind == RoomMember) + .ready_filter_map(|state_event| { + let content = state_event.get_content().ok()?; + let user_id = state_event.state_key.as_ref()?.parse().ok()?; + Some((content, user_id)) + }) + .ready_filter(|(_, user_id): &(RoomMemberEventContent, OwnedUserId)| { + user_id != sender_user + }) + .fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move { + use MembershipState::*; + + let shares_encrypted_room = + |user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id)); + + match content.membership { + | Join if !shares_encrypted_room(&user_id).await => dlu.insert(user_id), + | Leave => leu.insert(user_id), + | _ => false, + }; + + (dlu, leu) + }) + .await; + + // If the user is in a new encrypted room, give them all joined users + let new_encrypted_room = encrypted_room && !since_encryption.await; + if joined_since_last_sync && encrypted_room || new_encrypted_room { + services + .rooms + .state_cache + .room_members(room_id) + .ready_filter(|&user_id| sender_user != user_id) + .map(ToOwned::to_owned) + .broad_filter_map(|user_id| async move { + share_encrypted_room(services, sender_user, &user_id, Some(room_id)) + .await + .or_some(user_id) + }) + .ready_for_each(|user_id| { + device_list_updates.insert(user_id); + }) + .await; + } let send_member_count = delta_state_events .iter() .any(|event| event.kind == RoomMember); - if encrypted_room { - for state_event in &delta_state_events { - if state_event.kind != RoomMember { - continue; - } - - if let Some(state_key) = &state_event.state_key { - let user_id = UserId::parse(state_key) - .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; - - if user_id == sender_user { - continue; - } - - let content: RoomMemberEventContent = state_event.get_content()?; - - match content.membership { - | MembershipState::Join => { - // A new user joined an encrypted room - if !share_encrypted_room(services, sender_user, user_id, Some(room_id)) - .await - { - device_list_updates.insert(user_id.into()); - } - }, - | MembershipState::Leave => { - // Write down users that have left encrypted rooms we are in - left_encrypted_users.insert(user_id.into()); - }, - | _ => {}, - } - } - } - } - - if joined_since_last_sync && encrypted_room || new_encrypted_room { - let updates: Vec = services - .rooms - .state_cache - .room_members(room_id) - .ready_filter(|user_id| sender_user != *user_id) - .filter_map(|user_id| { - share_encrypted_room(services, sender_user, user_id, Some(room_id)) - .map(|res| res.or_some(user_id.to_owned())) - }) - .collect() - .await; - - // If the user is in a new encrypted room, give them all joined users - device_list_updates.extend(updates); - } - let (joined_member_count, invited_member_count, heroes) = if send_member_count { calculate_counts(services, room_id, sender_user).await? } else { @@ -1148,6 +1134,8 @@ async fn calculate_state_incremental( invited_member_count, joined_since_last_sync, state_events: delta_state_events, + device_list_updates, + left_encrypted_users, }) }