reroll encrypted_room branch in incremental sync state

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-31 01:23:27 +00:00
parent e161e5dd61
commit 4ff1155bf0

View file

@ -6,7 +6,7 @@ use std::{
use axum::extract::State; use axum::extract::State;
use conduwuit::{ use conduwuit::{
at, err, error, extract_variant, is_equal_to, at, err, error, extract_variant, is_equal_to, pair_of,
pdu::EventHash, pdu::EventHash,
result::FlatOk, result::FlatOk,
utils::{ utils::{
@ -16,7 +16,7 @@ use conduwuit::{
stream::{BroadbandExt, Tools, WidebandExt}, stream::{BroadbandExt, Tools, WidebandExt},
BoolExt, IterStream, ReadyExt, TryFutureExtExt, BoolExt, IterStream, ReadyExt, TryFutureExtExt,
}, },
Error, PduCount, PduEvent, Result, PduCount, PduEvent, Result,
}; };
use conduwuit_service::{ use conduwuit_service::{
rooms::{ rooms::{
@ -64,6 +64,8 @@ struct StateChanges {
invited_member_count: Option<u64>, invited_member_count: Option<u64>,
joined_since_last_sync: bool, joined_since_last_sync: bool,
state_events: Vec<PduEvent>, state_events: Vec<PduEvent>,
device_list_updates: HashSet<OwnedUserId>,
left_encrypted_users: HashSet<OwnedUserId>,
} }
type PresenceUpdates = HashMap<OwnedUserId, PresenceEvent>; type PresenceUpdates = HashMap<OwnedUserId, PresenceEvent>;
@ -325,18 +327,16 @@ pub(crate) async fn build_sync_events(
// If the user doesn't share an encrypted room with the target anymore, we need // If the user doesn't share an encrypted room with the target anymore, we need
// to tell them // to tell them
let device_list_left = left_encrypted_users let device_list_left: HashSet<_> = left_encrypted_users
.into_iter() .into_iter()
.stream() .stream()
.broad_filter_map(|user_id| async move { .broad_filter_map(|user_id| async move {
let no_shared_encrypted_room = share_encrypted_room(services, sender_user, &user_id, None)
!share_encrypted_room(services, sender_user, &user_id, None).await; .await
no_shared_encrypted_room.then_some(user_id) .eq(&false)
}) .then_some(user_id)
.ready_fold(HashSet::new(), |mut device_list_left, user_id| {
device_list_left.insert(user_id);
device_list_left
}) })
.collect()
.await; .await;
let response = sync_events::v3::Response { let response = sync_events::v3::Response {
@ -730,14 +730,14 @@ async fn load_joined_room(
.into(); .into();
let witness = witness.await; let witness = witness.await;
let mut device_list_updates = HashSet::<OwnedUserId>::new();
let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
let StateChanges { let StateChanges {
heroes, heroes,
joined_member_count, joined_member_count,
invited_member_count, invited_member_count,
joined_since_last_sync, joined_since_last_sync,
state_events, state_events,
mut device_list_updates,
left_encrypted_users,
} = if no_state_changes { } = if no_state_changes {
StateChanges::default() StateChanges::default()
} else { } else {
@ -747,8 +747,6 @@ async fn load_joined_room(
room_id, room_id,
full_state, full_state,
filter, filter,
&mut device_list_updates,
&mut left_encrypted_users,
since_shortstatehash, since_shortstatehash,
current_shortstatehash, current_shortstatehash,
joined_since_last_sync, joined_since_last_sync,
@ -919,8 +917,6 @@ async fn calculate_state_changes(
room_id: &RoomId, room_id: &RoomId,
full_state: bool, full_state: bool,
filter: &FilterDefinition, filter: &FilterDefinition,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
since_shortstatehash: Option<ShortStateHash>, since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash, current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool, joined_since_last_sync: bool,
@ -944,8 +940,6 @@ async fn calculate_state_changes(
room_id, room_id,
full_state, full_state,
filter, filter,
device_list_updates,
left_encrypted_users,
since_shortstatehash, since_shortstatehash,
current_shortstatehash, current_shortstatehash,
joined_since_last_sync, joined_since_last_sync,
@ -1013,6 +1007,7 @@ async fn calculate_state_initial(
invited_member_count, invited_member_count,
joined_since_last_sync: true, joined_since_last_sync: true,
state_events, state_events,
..Default::default()
}) })
} }
@ -1024,8 +1019,6 @@ async fn calculate_state_incremental(
room_id: &RoomId, room_id: &RoomId,
full_state: bool, full_state: bool,
_filter: &FilterDefinition, _filter: &FilterDefinition,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
since_shortstatehash: Option<ShortStateHash>, since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash, current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool, joined_since_last_sync: bool,
@ -1063,79 +1056,72 @@ async fn calculate_state_incremental(
.await; .await;
} }
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
let since_encryption = services let since_encryption = services
.rooms .rooms
.state_accessor .state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok(); .is_ok();
let (encrypted_room, since_encryption) = join(encrypted_room, since_encryption).await; let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok()
.await;
// Calculations: let (mut device_list_updates, left_encrypted_users) = delta_state_events
let new_encrypted_room = encrypted_room && !since_encryption; .iter()
.stream()
.ready_filter(|_| encrypted_room)
.ready_filter(|state_event| state_event.kind == RoomMember)
.ready_filter_map(|state_event| {
let content = state_event.get_content().ok()?;
let user_id = state_event.state_key.as_ref()?.parse().ok()?;
Some((content, user_id))
})
.ready_filter(|(_, user_id): &(RoomMemberEventContent, OwnedUserId)| {
user_id != sender_user
})
.fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
use MembershipState::*;
let shares_encrypted_room =
|user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));
match content.membership {
| Join if !shares_encrypted_room(&user_id).await => dlu.insert(user_id),
| Leave => leu.insert(user_id),
| _ => false,
};
(dlu, leu)
})
.await;
// If the user is in a new encrypted room, give them all joined users
let new_encrypted_room = encrypted_room && !since_encryption.await;
if joined_since_last_sync && encrypted_room || new_encrypted_room {
services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|&user_id| sender_user != user_id)
.map(ToOwned::to_owned)
.broad_filter_map(|user_id| async move {
share_encrypted_room(services, sender_user, &user_id, Some(room_id))
.await
.or_some(user_id)
})
.ready_for_each(|user_id| {
device_list_updates.insert(user_id);
})
.await;
}
let send_member_count = delta_state_events let send_member_count = delta_state_events
.iter() .iter()
.any(|event| event.kind == RoomMember); .any(|event| event.kind == RoomMember);
if encrypted_room {
for state_event in &delta_state_events {
if state_event.kind != RoomMember {
continue;
}
if let Some(state_key) = &state_event.state_key {
let user_id = UserId::parse(state_key)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
if user_id == sender_user {
continue;
}
let content: RoomMemberEventContent = state_event.get_content()?;
match content.membership {
| MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(services, sender_user, user_id, Some(room_id))
.await
{
device_list_updates.insert(user_id.into());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id.into());
},
| _ => {},
}
}
}
}
if joined_since_last_sync && encrypted_room || new_encrypted_room {
let updates: Vec<OwnedUserId> = services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|user_id| sender_user != *user_id)
.filter_map(|user_id| {
share_encrypted_room(services, sender_user, user_id, Some(room_id))
.map(|res| res.or_some(user_id.to_owned()))
})
.collect()
.await;
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(updates);
}
let (joined_member_count, invited_member_count, heroes) = if send_member_count { let (joined_member_count, invited_member_count, heroes) = if send_member_count {
calculate_counts(services, room_id, sender_user).await? calculate_counts(services, room_id, sender_user).await?
} else { } else {
@ -1148,6 +1134,8 @@ async fn calculate_state_incremental(
invited_member_count, invited_member_count,
joined_since_last_sync, joined_since_last_sync,
state_events: delta_state_events, state_events: delta_state_events,
device_list_updates,
left_encrypted_users,
}) })
} }