diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs
index 3a78c9ad..61a0ea5c 100644
--- a/src/api/client/sync/v3.rs
+++ b/src/api/client/sync/v3.rs
@@ -6,12 +6,25 @@ use std::{
 
 use axum::extract::State;
 use conduit::{
-	at, err, error, extract_variant, is_equal_to,
-	result::FlatOk,
-	utils::{math::ruma_from_u64, BoolExt, IterStream, ReadyExt, TryFutureExtExt},
-	PduCount,
+	at, err, error, extract_variant, is_equal_to, is_false,
+	pdu::EventHash,
+	result::{FlatOk, LogDebugErr},
+	utils,
+	utils::{
+		math::ruma_from_u64,
+		stream::{BroadbandExt, Tools},
+		BoolExt, IterStream, ReadyExt, TryFutureExtExt,
+	},
+	Error, PduCount, PduEvent, Result,
+};
+use conduit_service::{
+	rooms::short::{ShortStateHash, ShortStateKey},
+	Services,
+};
+use futures::{
+	future::{join, join3, join5, try_join, OptionFuture},
+	FutureExt, StreamExt, TryFutureExt,
 };
-use futures::{future::OptionFuture, pin_mut, FutureExt, StreamExt};
 use ruma::{
 	api::client::{
 		filter::{FilterDefinition, LazyLoadOptions},
@@ -34,14 +47,20 @@ use ruma::{
 	serde::Raw,
 	uint, DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
 };
-use tracing::{Instrument as _, Span};
 
 use super::{load_timeline, share_encrypted_room};
-use crate::{
-	client::ignored_filter,
-	service::{pdu::EventHash, Services},
-	utils, Error, PduEvent, Result, Ruma, RumaResponse,
-};
+use crate::{client::ignored_filter, Ruma, RumaResponse};
+
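+/// State-delta results for one joined room, produced by
+/// calculate_state_changes(); `Default` represents the "no state changes"
+/// case, with `heroes` and the member counts left as `None`.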
+#[derive(Default)]
+struct StateChanges {
+	heroes: Option<Vec<OwnedUserId>>,
+	joined_member_count: Option<u64>,
+	invited_member_count: Option<u64>,
+	joined_since_last_sync: bool,
+	state_events: Vec<PduEvent>,
+}
+
+type PresenceUpdates = HashMap<OwnedUserId, PresenceEvent>;
 
 /// # `GET /_matrix/client/r0/sync`
 ///
@@ -81,32 +100,30 @@ pub(crate) async fn sync_events_route(
 	State(services): State<crate::State>, body: Ruma<sync_events::v3::Request>,
 ) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
-	let sender_user = body.sender_user.expect("user is authenticated");
-	let sender_device = body.sender_device.expect("user is authenticated");
-	let body = body.body;
+	let (sender_user, sender_device) = body.sender();
 
 	// Presence update
 	if services.globals.allow_local_presence() {
 		services
 			.presence
-			.ping_presence(&sender_user, &body.set_presence)
+			.ping_presence(sender_user, &body.body.set_presence)
 			.await?;
 	}
 
 	// Setup watchers, so if there's no response, we can wait for them
-	let watcher = services.sync.watch(&sender_user, &sender_device);
+	let watcher = services.sync.watch(sender_user, sender_device);
 
 	let next_batch = services.globals.current_count()?;
 	let next_batchcount = PduCount::Normal(next_batch);
 	let next_batch_string = next_batch.to_string();
 
 	// Load filter
-	let filter = match body.filter {
+	let filter = match body.body.filter.as_ref() {
 		None => FilterDefinition::default(),
-		Some(Filter::FilterDefinition(filter)) => filter,
-		Some(Filter::FilterId(filter_id)) => services
+		Some(Filter::FilterDefinition(ref filter)) => filter.clone(),
+		Some(Filter::FilterId(ref filter_id)) => services
 			.users
-			.get_filter(&sender_user, &filter_id)
+			.get_filter(sender_user, filter_id)
 			.await
 			.unwrap_or_default(),
 	};
@@ -120,183 +137,190 @@ pub(crate) async fn sync_events_route(
 		LazyLoadOptions::Disabled => (false, cfg!(feature = "element_hacks")),
 	};
 
-	let full_state = body.full_state;
+	let full_state = body.body.full_state;
 
-	let mut joined_rooms = BTreeMap::new();
 	let since = body
+		.body
 		.since
 		.as_ref()
 		.and_then(|string| string.parse().ok())
 		.unwrap_or(0);
+
 	let sincecount = PduCount::Normal(since);
 
-	let mut presence_updates = HashMap::new();
-	let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
-	let mut device_list_updates = HashSet::new();
-	let mut device_list_left = HashSet::new();
-
-	// Look for device list updates of this account
-	device_list_updates.extend(
-		services
-			.users
-			.keys_changed(&sender_user, since, None)
-			.map(ToOwned::to_owned)
-			.collect::<Vec<_>>()
-			.await,
-	);
-
-	if services.globals.allow_local_presence() {
-		process_presence_updates(&services, &mut presence_updates, since, &sender_user).await?;
-	}
-
-	let all_joined_rooms: Vec<_> = services
+	let joined_related = services
 		.rooms
 		.state_cache
-		.rooms_joined(&sender_user)
+		.rooms_joined(sender_user)
 		.map(ToOwned::to_owned)
-		.collect()
-		.await;
+		.broad_filter_map(|room_id| {
+			load_joined_room(
+				&services,
+				sender_user,
+				sender_device,
+				room_id.clone(),
+				since,
+				sincecount,
+				next_batch,
+				next_batchcount,
+				lazy_load_enabled,
+				lazy_load_send_redundant,
+				full_state,
+			)
+			.map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
+			.ok()
+		})
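+		// Fold every joined room's results together, merging the device-list
+		// and left-encrypted-user sets and keeping only non-empty rooms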
+		.ready_fold(
+			(BTreeMap::new(), HashSet::new(), HashSet::new()),
+			|(mut joined_rooms, mut device_list_updates, mut left_encrypted_users),
+			 (room_id, joined_room, dlu, leu)| {
+				device_list_updates.extend(dlu);
+				left_encrypted_users.extend(leu);
+				if !joined_room.is_empty() {
+					joined_rooms.insert(room_id, joined_room);
+				}
 
-	// Coalesce database writes for the remainder of this scope.
-	let _cork = services.db.cork_and_flush();
-
-	for room_id in all_joined_rooms {
-		if let Ok(joined_room) = load_joined_room(
-			&services,
-			&sender_user,
-			&sender_device,
-			&room_id,
-			since,
-			sincecount,
-			next_batch,
-			next_batchcount,
-			lazy_load_enabled,
-			lazy_load_send_redundant,
-			full_state,
-			&mut device_list_updates,
-			&mut left_encrypted_users,
-		)
-		.await
-		{
-			if !joined_room.is_empty() {
-				joined_rooms.insert(room_id.clone(), joined_room);
-			}
-		}
-	}
-
-	let mut left_rooms = BTreeMap::new();
-	let all_left_rooms: Vec<_> = services
-		.rooms
-		.state_cache
-		.rooms_left(&sender_user)
-		.collect()
-		.await;
-
-	for result in all_left_rooms {
-		handle_left_room(
-			&services,
-			since,
-			&result.0,
-			&sender_user,
-			&mut left_rooms,
-			&next_batch_string,
-			full_state,
-			lazy_load_enabled,
-		)
-		.instrument(Span::current())
-		.await?;
-	}
-
-	let mut invited_rooms = BTreeMap::new();
-	let all_invited_rooms: Vec<_> = services
-		.rooms
-		.state_cache
-		.rooms_invited(&sender_user)
-		.collect()
-		.await;
-
-	for (room_id, invite_state_events) in all_invited_rooms {
-		// Get and drop the lock to wait for remaining operations to finish
-		let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
-		drop(insert_lock);
-
-		let invite_count = services
-			.rooms
-			.state_cache
-			.get_invite_count(&room_id, &sender_user)
-			.await
-			.ok();
-
-		// Invited before last sync
-		if Some(since) >= invite_count {
-			continue;
-		}
-
-		invited_rooms.insert(
-			room_id.clone(),
-			InvitedRoom {
-				invite_state: InviteState {
-					events: invite_state_events,
-				},
+				(joined_rooms, device_list_updates, left_encrypted_users)
 			},
 		);
-	}
 
-	for user_id in left_encrypted_users {
-		let dont_share_encrypted_room = !share_encrypted_room(&services, &sender_user, &user_id, None).await;
+	let left_rooms = services
+		.rooms
+		.state_cache
+		.rooms_left(sender_user)
+		.broad_filter_map(|(room_id, _)| {
+			handle_left_room(
+				&services,
+				since,
+				room_id.clone(),
+				sender_user,
+				&next_batch_string,
+				full_state,
+				lazy_load_enabled,
+			)
+			.map_ok(move |left_room| (room_id, left_room))
+			.ok()
+		})
+		.ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
+		.collect();
 
-	// If the user doesn't share an encrypted room with the target anymore, we need
-	// to tell them
-		if dont_share_encrypted_room {
-			device_list_left.insert(user_id);
-		}
-	}
+	let invited_rooms = services.rooms.state_cache.rooms_invited(sender_user).fold(
+		BTreeMap::new(),
+		|mut invited_rooms, (room_id, invite_state)| async move {
+			// Get and drop the lock to wait for remaining operations to finish
+			let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
+			drop(insert_lock);
+
+			let invite_count = services
+				.rooms
+				.state_cache
+				.get_invite_count(&room_id, sender_user)
+				.await
+				.ok();
+
+			// Invited before last sync
+			if Some(since) >= invite_count {
+				return invited_rooms;
+			}
+
+			let invited_room = InvitedRoom {
+				invite_state: InviteState {
+					events: invite_state,
+				},
+			};
+
+			invited_rooms.insert(room_id, invited_room);
+			invited_rooms
+		},
+	);
+
+	let presence_updates: OptionFuture<_> = services
+		.globals
+		.allow_local_presence()
+		.then(|| process_presence_updates(&services, since, sender_user))
+		.into();
+
+	let account_data = services
+		.account_data
+		.changes_since(None, sender_user, since)
+		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
+		.collect();
+
+	// Look for device list updates of this account
+	let keys_changed = services
+		.users
+		.keys_changed(sender_user, since, None)
+		.map(ToOwned::to_owned)
+		.collect::<Vec<_>>();
+
+	let to_device_events = services
+		.users
+		.get_to_device_events(sender_user, sender_device)
+		.collect::<Vec<_>>();
+
+	let device_one_time_keys_count = services
+		.users
+		.count_one_time_keys(sender_user, sender_device);
 
 	// Remove all to-device events the device received *last time*
-	services
+	let remove_to_device_events = services
 		.users
-		.remove_to_device_events(&sender_user, &sender_device, since)
+		.remove_to_device_events(sender_user, sender_device, since);
+
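+	// Everything above is still an unpolled future or stream; group them so
+	// the independent lookups run concurrently when awaited below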
+	let rooms = join3(joined_related, left_rooms, invited_rooms);
+	let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
+	let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms)
+		.boxed()
+		.await;
+
+	let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
+	let ((), to_device_events, presence_updates) = ephemeral;
+	let (joined_related, left_rooms, invited_rooms) = rooms;
+	let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_related;
+	device_list_updates.extend(keys_changed);
+
+	// If the user doesn't share an encrypted room with the target anymore, we need
+	// to tell them
+	let device_list_left = left_encrypted_users
+		.into_iter()
+		.stream()
+		.broad_filter_map(|user_id| async move {
+			let no_shared_encrypted_room = !share_encrypted_room(&services, sender_user, &user_id, None).await;
+			no_shared_encrypted_room.then_some(user_id)
+		})
+		.ready_fold(HashSet::new(), |mut device_list_left, user_id| {
+			device_list_left.insert(user_id);
+			device_list_left
+		})
 		.await;
 
 	let response = sync_events::v3::Response {
+		account_data: GlobalAccountData {
+			events: account_data,
+		},
+		device_lists: DeviceLists {
+			changed: device_list_updates.into_iter().collect(),
+			left: device_list_left.into_iter().collect(),
+		},
+		device_one_time_keys_count,
+		// Fallback keys are not yet supported
+		device_unused_fallback_key_types: None,
 		next_batch: next_batch_string,
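+		// presence_updates is None when local presence is disabled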
+		presence: Presence {
+			events: presence_updates
+				.unwrap_or_default()
+				.into_values()
+				.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
+				.collect(),
+		},
 		rooms: Rooms {
 			leave: left_rooms,
 			join: joined_rooms,
 			invite: invited_rooms,
 			knock: BTreeMap::new(), // TODO
 		},
-		presence: Presence {
-			events: presence_updates
-				.into_values()
-				.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
-				.collect(),
-		},
-		account_data: GlobalAccountData {
-			events: services
-				.account_data
-				.changes_since(None, &sender_user, since)
-				.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
-				.collect()
-				.await,
-		},
-		device_lists: DeviceLists {
-			changed: device_list_updates.into_iter().collect(),
-			left: device_list_left.into_iter().collect(),
-		},
-		device_one_time_keys_count: services
-			.users
-			.count_one_time_keys(&sender_user, &sender_device)
-			.await,
 		to_device: ToDevice {
-			events: services
-				.users
-				.get_to_device_events(&sender_user, &sender_device)
-				.collect()
-				.await,
+			events: to_device_events,
 		},
-		// Fallback keys are not yet supported
-		device_unused_fallback_key_types: None,
 	};
 
 	// TODO: Retry the endpoint instead of returning
@@ -310,37 +334,86 @@ pub(crate) async fn sync_events_route(
 		// Hang a few seconds so requests are not spammed
 		// Stop hanging if new info arrives
 		let default = Duration::from_secs(30);
-		let duration = cmp::min(body.timeout.unwrap_or(default), default);
+		let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
 		_ = tokio::time::timeout(duration, watcher).await;
 	}
 
 	Ok(response)
 }
 
+async fn process_presence_updates(services: &Services, since: u64, syncing_user: &UserId) -> PresenceUpdates {
+	services
+		.presence
+		.presence_since(since)
+		.filter(|(user_id, ..)| {
+			services
+				.rooms
+				.state_cache
+				.user_sees_user(syncing_user, user_id)
+		})
+		.filter_map(|(user_id, _, presence_bytes)| {
+			services
+				.presence
+				.from_json_bytes_to_event(presence_bytes, user_id)
+				.map_ok(move |event| (user_id, event))
+				.ok()
+		})
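+		// Coalesce multiple updates for the same user into a single event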
+		.ready_fold(PresenceUpdates::new(), |mut updates, (user_id, event)| {
+			match updates.entry(user_id.into()) {
+				Entry::Vacant(slot) => {
+					slot.insert(event);
+				},
+				Entry::Occupied(mut slot) => {
+					let curr_event = slot.get_mut();
+					let curr_content = &mut curr_event.content;
+					let new_content = event.content;
+
+					// Update existing presence event with more info
+					curr_content.presence = new_content.presence;
+					curr_content.status_msg = new_content
+						.status_msg
+						.or_else(|| curr_content.status_msg.take());
+					curr_content.last_active_ago = new_content.last_active_ago.or(curr_content.last_active_ago);
+					curr_content.displayname = new_content
+						.displayname
+						.or_else(|| curr_content.displayname.take());
+					curr_content.avatar_url = new_content
+						.avatar_url
+						.or_else(|| curr_content.avatar_url.take());
+					curr_content.currently_active = new_content
+						.currently_active
+						.or(curr_content.currently_active);
+				},
+			};
+
+			updates
+		})
+		.await
+}
+
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip_all, fields(user_id = %sender_user, room_id = %room_id), name = "left_room")]
 async fn handle_left_room(
-	services: &Services, since: u64, room_id: &RoomId, sender_user: &UserId,
-	left_rooms: &mut BTreeMap<OwnedRoomId, LeftRoom>, next_batch_string: &str, full_state: bool,
-	lazy_load_enabled: bool,
-) -> Result<()> {
+	services: &Services, since: u64, room_id: OwnedRoomId, sender_user: &UserId, next_batch_string: &str,
+	full_state: bool, lazy_load_enabled: bool,
+) -> Result<Option<LeftRoom>> {
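+	// Returning Ok(None) omits this room from the response entirely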
 	// Get and drop the lock to wait for remaining operations to finish
-	let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
+	let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
 	drop(insert_lock);
 
 	let left_count = services
 		.rooms
 		.state_cache
-		.get_left_count(room_id, sender_user)
+		.get_left_count(&room_id, sender_user)
 		.await
 		.ok();
 
 	// Left before last sync
 	if Some(since) >= left_count {
-		return Ok(());
+		return Ok(None);
 	}
 
-	if !services.rooms.metadata.exists(room_id).await {
+	if !services.rooms.metadata.exists(&room_id).await {
 		// This is just a rejected invite, not a room we know
 		// Insert a leave event anyways
 		let event = PduEvent {
@@ -355,7 +428,7 @@ async fn handle_left_room(
 			state_key: Some(sender_user.to_string()),
 			unsigned: None,
 			// The following keys are dropped on conversion
-			room_id: room_id.to_owned(),
+			room_id: room_id.clone(),
 			prev_events: vec![],
 			depth: uint!(1),
 			auth_events: vec![],
@@ -366,23 +439,19 @@ async fn handle_left_room(
 			signatures: None,
 		};
 
-		left_rooms.insert(
-			room_id.to_owned(),
-			LeftRoom {
-				account_data: RoomAccountData {
-					events: Vec::new(),
-				},
-				timeline: Timeline {
-					limited: false,
-					prev_batch: Some(next_batch_string.to_owned()),
-					events: Vec::new(),
-				},
-				state: RoomState {
-					events: vec![event.to_sync_state_event()],
-				},
+		return Ok(Some(LeftRoom {
+			account_data: RoomAccountData {
+				events: Vec::new(),
 			},
-		);
-		return Ok(());
+			timeline: Timeline {
+				limited: false,
+				prev_batch: Some(next_batch_string.to_owned()),
+				events: Vec::new(),
+			},
+			state: RoomState {
+				events: vec![event.to_sync_state_event()],
+			},
+		}));
 	}
 
 	let mut left_state_events = Vec::new();
@@ -390,7 +459,7 @@ async fn handle_left_room(
 	let since_shortstatehash = services
 		.rooms
 		.user
-		.get_token_shortstatehash(room_id, since)
+		.get_token_shortstatehash(&room_id, since)
 		.await;
 
 	let since_state_ids = match since_shortstatehash {
@@ -401,11 +470,11 @@ async fn handle_left_room(
 		let Ok(left_event_id): Result<OwnedEventId> = services
 			.rooms
 			.state_accessor
-			.room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str())
+			.room_state_get_id(&room_id, &StateEventType::RoomMember, sender_user.as_str())
 			.await
 		else {
 			error!("Left room but no left state event");
-			return Ok(());
+			return Ok(None);
 		};
 
 		let Ok(left_shortstatehash) = services
@@ -415,7 +484,7 @@ async fn handle_left_room(
 			.await
 		else {
 			error!(event_id = %left_event_id, "Leave event has no state");
-			return Ok(());
+			return Ok(None);
 		};
 
 		let mut left_state_ids = services
@@ -456,488 +525,102 @@
 		}
 	}
 
-	left_rooms.insert(
-		room_id.to_owned(),
-		LeftRoom {
-			account_data: RoomAccountData {
-				events: Vec::new(),
-			},
-			timeline: Timeline {
-				limited: false,
-				prev_batch: Some(next_batch_string.to_owned()),
-				events: Vec::new(),
-			},
-			state: RoomState {
-				events: left_state_events,
-			},
+	Ok(Some(LeftRoom {
+		account_data: RoomAccountData {
+			events: Vec::new(),
 		},
-	);
-	Ok(())
-}
-
-async fn process_presence_updates(
-	services: &Services, presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>, since: u64, syncing_user: &UserId,
-) -> Result<()> {
-	let presence_since = services.presence.presence_since(since);
-
-	// Take presence updates
-	pin_mut!(presence_since);
-	while let Some((user_id, _, presence_bytes)) = presence_since.next().await {
-		if !services
-			.rooms
-			.state_cache
-			.user_sees_user(syncing_user, user_id)
-			.await
-		{
-			continue;
-		}
-
-		let presence_event = services
-			.presence
-			.from_json_bytes_to_event(presence_bytes, user_id)
-			.await?;
-
-		match presence_updates.entry(user_id.into()) {
-			Entry::Vacant(slot) => {
-				slot.insert(presence_event);
-			},
-			Entry::Occupied(mut slot) => {
-				let curr_event = slot.get_mut();
-				let curr_content = &mut curr_event.content;
-				let new_content = presence_event.content;
-
-				// Update existing presence event with more info
-				curr_content.presence = new_content.presence;
-				curr_content.status_msg = new_content
-					.status_msg
-					.or_else(|| curr_content.status_msg.take());
-				curr_content.last_active_ago = new_content.last_active_ago.or(curr_content.last_active_ago);
-				curr_content.displayname = new_content
-					.displayname
-					.or_else(|| curr_content.displayname.take());
-				curr_content.avatar_url = new_content
-					.avatar_url
-					.or_else(|| curr_content.avatar_url.take());
-				curr_content.currently_active = new_content
-					.currently_active
-					.or(curr_content.currently_active);
-			},
-		};
-	}
-
-	Ok(())
+		timeline: Timeline {
+			limited: false,
+			prev_batch: Some(next_batch_string.to_owned()),
+			events: Vec::new(),
+		},
+		state: RoomState {
+			events: left_state_events,
+		},
+	}))
 }
 
 #[allow(clippy::too_many_arguments)]
 async fn load_joined_room(
-	services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, since: u64,
+	services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: OwnedRoomId, since: u64,
 	sincecount: PduCount, next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool,
-	lazy_load_send_redundant: bool, full_state: bool, device_list_updates: &mut HashSet<OwnedUserId>,
-	left_encrypted_users: &mut HashSet<OwnedUserId>,
-) -> Result<JoinedRoom> {
+	lazy_load_send_redundant: bool, full_state: bool,
+) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
+	let mut device_list_updates = HashSet::<OwnedUserId>::new();
+	let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
+
 	// Get and drop the lock to wait for remaining operations to finish
 	// This will make sure we have all events until next_batch
-	let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
+	let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
 	drop(insert_lock);
 
 	let (timeline_pdus, limited) =
-		load_timeline(services, sender_user, room_id, sincecount, Some(next_batchcount), 10_usize).await?;
+		load_timeline(services, sender_user, &room_id, sincecount, Some(next_batchcount), 10_usize).await?;
 
 	let send_notification_counts = !timeline_pdus.is_empty()
 		|| services
 			.rooms
 			.user
-			.last_notification_read(sender_user, room_id)
+			.last_notification_read(sender_user, &room_id)
 			.await > since;
 
-	let mut timeline_users = HashSet::new();
-	for (_, event) in &timeline_pdus {
-		timeline_users.insert(event.sender.as_str().to_owned());
-	}
+	let timeline_users = timeline_pdus
+		.iter()
+		.fold(HashSet::new(), |mut timeline_users, (_, event)| {
+			timeline_users.insert(event.sender.as_str().to_owned());
+			timeline_users
+		});
 
 	services
 		.rooms
 		.lazy_loading
-		.lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount);
+		.lazy_load_confirm_delivery(sender_user, sender_device, &room_id, sincecount);
 
 	let current_shortstatehash = services
 		.rooms
 		.state
-		.get_room_shortstatehash(room_id)
-		.await
-		.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))?;
+		.get_room_shortstatehash(&room_id)
+		.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
 
 	let since_shortstatehash = services
 		.rooms
 		.user
-		.get_token_shortstatehash(room_id, since)
-		.await
-		.ok();
+		.get_token_shortstatehash(&room_id, since)
+		.ok()
+		.map(Ok);
 
-	let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) = if timeline_pdus
-		.is_empty()
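+	// Resolve both statehashes concurrently; the since-hash may be absent on
+	// an initial sync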
+	let (current_shortstatehash, since_shortstatehash) = try_join(current_shortstatehash, since_shortstatehash).await?;
+
+	let StateChanges {
+		heroes,
+		joined_member_count,
+		invited_member_count,
+		joined_since_last_sync,
+		state_events,
+	} = if timeline_pdus.is_empty()
 		&& (since_shortstatehash.is_none() || since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)))
 	{
 		// No state changes
-		(Vec::new(), None, None, false, Vec::new())
+		StateChanges::default()
 	} else {
-		// Calculates joined_member_count, invited_member_count and heroes
-		let calculate_counts = || async {
-			let joined_member_count = services
-				.rooms
-				.state_cache
-				.room_joined_count(room_id)
-				.await
-				.unwrap_or(0);
-
-			let invited_member_count = services
-				.rooms
-				.state_cache
-				.room_invited_count(room_id)
-				.await
-				.unwrap_or(0);
-
-			if joined_member_count.saturating_add(invited_member_count) > 5 {
-				return Ok::<_, Error>((Some(joined_member_count), Some(invited_member_count), Vec::new()));
-			}
-
-			// Go through all PDUs and for each member event, check if the user is still
-			// joined or invited until we have 5 or we reach the end
-
-			// Recalculate heroes (first 5 members)
-			let heroes = services
-				.rooms
-				.timeline
-				.all_pdus(sender_user, room_id)
-				.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
-				.filter_map(|(_, pdu)| async move {
-					let content: RoomMemberEventContent = pdu.get_content().ok()?;
-					let user_id: &UserId = pdu.state_key.as_deref().map(TryInto::try_into).flat_ok()?;
-
-					if user_id == sender_user {
-						return None;
-					}
-
-					// The membership was and still is invite or join
-					if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
-						return None;
-					}
-
-					let is_invited = services.rooms.state_cache.is_invited(user_id, room_id);
-
-					let is_joined = services.rooms.state_cache.is_joined(user_id, room_id);
-
-					if !is_joined.await && is_invited.await {
-						return None;
-					}
-
-					Some(user_id.to_owned())
-				})
-				.collect::<HashSet<_>>()
-				.await;
-
-			Ok::<_, Error>((
-				Some(joined_member_count),
-				Some(invited_member_count),
-				heroes.into_iter().collect::<Vec<_>>(),
-			))
-		};
-
-		let get_sender_member_content = |short| {
-			services
-				.rooms
-				.state_accessor
-				.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
-				.ok()
-		};
-
-		let since_sender_member: OptionFuture<_> = since_shortstatehash.map(get_sender_member_content).into();
-
-		let joined_since_last_sync = since_sender_member
-			.await
-			.flatten()
-			.map_or(true, |content: RoomMemberEventContent| {
-				content.membership != MembershipState::Join
-			});
-
-		if since_shortstatehash.is_none() || joined_since_last_sync {
-			// Probably since = 0, we will do an initial sync
-
-			let (joined_member_count, invited_member_count, heroes) = calculate_counts().await?;
-
-			let current_state_ids: HashMap<_, OwnedEventId> = services
-				.rooms
-				.state_accessor
-				.state_full_ids(current_shortstatehash)
-				.await?;
-
-			let mut state_events = Vec::new();
-			let mut lazy_loaded = HashSet::new();
-
-			for (shortstatekey, event_id) in current_state_ids {
-				let (event_type, state_key) = services
-					.rooms
-					.short
-					.get_statekey_from_short(shortstatekey)
-					.await?;
-
-				if event_type != StateEventType::RoomMember {
-					let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else {
-						error!("Pdu in state not found: {event_id}");
-						continue;
-					};
-
-					state_events.push(pdu);
-					continue;
-				}
-
-				// TODO: Delete "element_hacks" when this is resolved: https://github.com/vector-im/element-web/issues/22565
-				if !lazy_load_enabled
-					|| full_state || timeline_users.contains(&state_key)
-					|| (cfg!(feature = "element_hacks") && *sender_user == state_key)
-				{
-					let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else {
-						error!("Pdu in state not found: {event_id}");
-						continue;
-					};
-
-					// This check is in case a bad user ID made it into the database
-					if let Ok(uid) = UserId::parse(&state_key) {
-						lazy_loaded.insert(uid);
-					}
-
-					state_events.push(pdu);
-				}
-			}
-
-			// Reset lazy loading because this is an initial sync
-			services
-				.rooms
-				.lazy_loading
-				.lazy_load_reset(sender_user, sender_device, room_id)
-				.await;
-
-			// The state_events above should contain all timeline_users, let's mark them as
-			// lazy loaded.
-			services.rooms.lazy_loading.lazy_load_mark_sent(
-				sender_user,
-				sender_device,
-				room_id,
-				lazy_loaded,
-				next_batchcount,
-			);
-
-			(heroes, joined_member_count, invited_member_count, true, state_events)
-		} else {
-			// Incremental /sync
-			let since_shortstatehash = since_shortstatehash.expect("missing since_shortstatehash on incremental sync");
-
-			let mut delta_state_events = Vec::new();
-
-			if since_shortstatehash != current_shortstatehash {
-				let current_state_ids: HashMap<_, OwnedEventId> = services
-					.rooms
-					.state_accessor
-					.state_full_ids(current_shortstatehash)
-					.await?;
-
-				let since_state_ids = services
-					.rooms
-					.state_accessor
-					.state_full_ids(since_shortstatehash)
-					.await?;
-
-				for (key, id) in current_state_ids {
-					if full_state || since_state_ids.get(&key) != Some(&id) {
-						let Ok(pdu) = services.rooms.timeline.get_pdu(&id).await else {
-							error!("Pdu in state not found: {id}");
-							continue;
-						};
-
-						delta_state_events.push(pdu);
-					}
-				}
-			}
-
-			let encrypted_room = services
-				.rooms
-				.state_accessor
-				.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
-				.await
-				.is_ok();
-
-			let since_encryption = services
-				.rooms
-				.state_accessor
-				.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
-				.await;
-
-			// Calculations:
-			let new_encrypted_room = encrypted_room && since_encryption.is_err();
-
-			let send_member_count = delta_state_events
-				.iter()
-				.any(|event| event.kind == RoomMember);
-
-			if encrypted_room {
-				for state_event in &delta_state_events {
-					if state_event.kind != RoomMember {
-						continue;
-					}
-
-					if let Some(state_key) = &state_event.state_key {
-						let user_id = UserId::parse(state_key.clone())
-							.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
-
-						if user_id == sender_user {
-							continue;
-						}
-
-						let content: RoomMemberEventContent = state_event.get_content()?;
-
-						match content.membership {
-							MembershipState::Join => {
-								// A new user joined an encrypted room
-								if !share_encrypted_room(services, sender_user, &user_id, Some(room_id)).await {
-									device_list_updates.insert(user_id);
-								}
-							},
-							MembershipState::Leave => {
-								// Write down users that have left encrypted rooms we are in
-								left_encrypted_users.insert(user_id);
-							},
-							_ => {},
-						}
-					}
-				}
-			}
-
-			if joined_since_last_sync && encrypted_room || new_encrypted_room {
-				// If the user is in a new encrypted room, give them all joined users
-				device_list_updates.extend(
-					services
-						.rooms
-						.state_cache
-						.room_members(room_id)
-						// Don't send key updates from the sender to the sender
-						.ready_filter(|user_id| sender_user != *user_id)
-						// Only send keys if the sender doesn't share an encrypted room with the target
-						// already
-						.filter_map(|user_id| {
-							share_encrypted_room(services, sender_user, user_id, Some(room_id))
-								.map(|res| res.or_some(user_id.to_owned()))
-						})
-						.collect::<Vec<_>>()
-						.await,
-				);
-			}
-
-			let (joined_member_count, invited_member_count, heroes) = if send_member_count {
-				calculate_counts().await?
-			} else {
-				(None, None, Vec::new())
-			};
-
-			let mut state_events = delta_state_events;
-			let mut lazy_loaded = HashSet::new();
-
-			// Mark all member events we're returning as lazy-loaded
-			for pdu in &state_events {
-				if pdu.kind == RoomMember {
-					match UserId::parse(
-						pdu.state_key
-							.as_ref()
-							.expect("State event has state key")
-							.clone(),
-					) {
-						Ok(state_key_userid) => {
-							lazy_loaded.insert(state_key_userid);
-						},
-						Err(e) => error!("Invalid state key for member event: {}", e),
-					}
-				}
-			}
-
-			// Fetch contextual member state events for events from the timeline, and
-			// mark them as lazy-loaded as well.
-			for (_, event) in &timeline_pdus {
-				if lazy_loaded.contains(&event.sender) {
-					continue;
-				}
-
-				if !services
-					.rooms
-					.lazy_loading
-					.lazy_load_was_sent_before(sender_user, sender_device, room_id, &event.sender)
-					.await || lazy_load_send_redundant
-				{
-					if let Ok(member_event) = services
-						.rooms
-						.state_accessor
-						.room_state_get(room_id, &StateEventType::RoomMember, event.sender.as_str())
-						.await
-					{
-						lazy_loaded.insert(event.sender.clone());
-						state_events.push(member_event);
-					}
-				}
-			}
-
-			services.rooms.lazy_loading.lazy_load_mark_sent(
-				sender_user,
-				sender_device,
-				room_id,
-				lazy_loaded,
-				next_batchcount,
-			);
-
-			(
-				heroes,
-				joined_member_count,
-				invited_member_count,
-				joined_since_last_sync,
-				state_events,
-			)
-		}
-	};
-
-	// Look for device list updates in this room
-	device_list_updates.extend(
-		services
-			.users
-			.room_keys_changed(room_id, since, None)
-			.map(|(user_id, _)| user_id)
-			.map(ToOwned::to_owned)
-			.collect::<Vec<_>>()
-			.await,
-	);
-
-	let notification_count = if send_notification_counts {
-		Some(
-			services
-				.rooms
-				.user
-				.notification_count(sender_user, room_id)
-				.await
-				.try_into()
-				.expect("notification count can't go that high"),
-		)
-	} else {
-		None
-	};
-
-	let highlight_count = if send_notification_counts {
-		Some(
-			services
-				.rooms
-				.user
-				.highlight_count(sender_user, room_id)
-				.await
-				.try_into()
-				.expect("highlight count can't go that high"),
-		)
-	} else {
-		None
+		calculate_state_changes(
+			services,
+			sender_user,
+			sender_device,
+			&room_id,
+			next_batchcount,
+			lazy_load_enabled,
+			lazy_load_send_redundant,
+			full_state,
+			&mut device_list_updates,
+			&mut left_encrypted_users,
+			since_shortstatehash,
+			current_shortstatehash,
+			&timeline_pdus,
+			&timeline_users,
		)
+		.boxed()
+		.await?
 	};
 
 	let prev_batch = timeline_pdus
@@ -946,18 +629,45 @@ async fn load_joined_room(
 		.first()
 		.map(at!(0))
 		.as_ref()
 		.map(ToString::to_string);
 
-	let room_events: Vec<_> = timeline_pdus
+	let notification_count: OptionFuture<_> = send_notification_counts
+		.then(|| {
+			services
+				.rooms
+				.user
+				.notification_count(sender_user, &room_id)
+				.map(TryInto::try_into)
+				.unwrap_or(uint!(0))
+		})
+		.into();
+
+	let highlight_count: OptionFuture<_> = send_notification_counts
+		.then(|| {
+			services
+				.rooms
+				.user
+				.highlight_count(sender_user, &room_id)
+				.map(TryInto::try_into)
+				.unwrap_or(uint!(0))
+		})
+		.into();
+
+	let room_events = timeline_pdus
 		.iter()
 		.stream()
 		.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
 		.map(|(_, pdu)| pdu.to_sync_room_event())
-		.collect()
-		.await;
+		.collect();
 
-	let edus: HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>> = services
+	let account_data_events = services
+		.account_data
+		.changes_since(Some(&room_id), sender_user, since)
+		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
+		.collect();
+
+	let receipt_events = services
 		.rooms
 		.read_receipt
-		.readreceipts_since(room_id, since)
+		.readreceipts_since(&room_id, since)
 		.filter_map(|(read_user, _, edu)| async move {
 			services
 				.users
@@ -965,19 +675,35 @@ async fn load_joined_room(
 				.await
 				.or_some((read_user.to_owned(), edu))
 		})
-		.collect()
+		.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>();
+
+	// Look for device list updates in this room
+	let device_updates = services
+		.users
+		.room_keys_changed(&room_id, since, None)
+		.map(|(user_id, _)| user_id)
+		.map(ToOwned::to_owned)
+		.collect::<Vec<_>>();
+
+	let events = join3(room_events, account_data_events, receipt_events);
+	let unread_notifications = join(notification_count, highlight_count);
+	let (unread_notifications, events, device_updates) = join3(unread_notifications, events, device_updates)
+		.boxed()
 		.await;
 
-	let mut edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = edus.into_values().collect();
+	let (room_events, account_data_events, receipt_events) = events;
+	let (notification_count, highlight_count) = unread_notifications;
+	device_list_updates.extend(device_updates);
 
-	if services.rooms.typing.last_typing_update(room_id).await? > since {
+	let mut edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events.into_values().collect();
+
+	if services.rooms.typing.last_typing_update(&room_id).await? > since {
 		edus.push(
 			serde_json::from_str(
 				&serde_json::to_string(
 					&services
 						.rooms
 						.typing
-						.typings_all(room_id, sender_user)
+						.typings_all(&room_id, sender_user)
 						.await?,
 				)
 				.expect("event is valid, we just created it"),
 			)
@@ -991,22 +717,22 @@ async fn load_joined_room(
 	services
 		.rooms
 		.user
-		.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)
+		.associate_token_shortstatehash(&room_id, next_batch, current_shortstatehash)
 		.await;
 
-	Ok(JoinedRoom {
+	let joined_room = JoinedRoom {
 		account_data: RoomAccountData {
-			events: services
-				.account_data
-				.changes_since(Some(room_id), sender_user, since)
-				.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
-				.collect()
-				.await,
+			events: account_data_events,
 		},
 		summary: RoomSummary {
-			heroes,
 			joined_member_count: joined_member_count.map(ruma_from_u64),
 			invited_member_count: invited_member_count.map(ruma_from_u64),
+			heroes: heroes
+				.into_iter()
+				.flatten()
+				.map(TryInto::try_into)
+				.filter_map(Result::ok)
+				.collect(),
 		},
 		unread_notifications: UnreadNotificationsCount {
 			highlight_count,
@@ -1027,5 +753,407 @@ async fn load_joined_room(
 			events: edus,
 		},
 		unread_thread_notifications: BTreeMap::new(),
-	})
-}
+	};
+
+	Ok((joined_room, device_list_updates, left_encrypted_users))
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn calculate_state_changes(
+	services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount,
+	lazy_load_enabled: bool, lazy_load_send_redundant: bool, full_state: bool,
+	device_list_updates: &mut HashSet<OwnedUserId>, left_encrypted_users: &mut HashSet<OwnedUserId>,
+	since_shortstatehash: Option<ShortStateHash>, current_shortstatehash: ShortStateHash,
+	timeline_pdus: &Vec<(PduCount, PduEvent)>, timeline_users: &HashSet<String>,
+) -> Result<StateChanges> {
+	let since_sender_member: OptionFuture<_> = since_shortstatehash
+		.map(|short| {
+			services
+				.rooms
+				.state_accessor
+				.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
+				.ok()
+		})
+		.into();
+
+	let joined_since_last_sync = since_sender_member
+		.await
+		.flatten()
+		.map_or(true, |content: RoomMemberEventContent| {
+			content.membership != MembershipState::Join
+		});
+
+	if since_shortstatehash.is_none() || joined_since_last_sync {
+		calculate_state_initial(
+			services,
+			sender_user,
+			sender_device,
+			room_id,
+			next_batchcount,
+			lazy_load_enabled,
+			full_state,
+			current_shortstatehash,
+			timeline_users,
+		)
+		.await
+	} else {
+		calculate_state_incremental(
+			services,
+			sender_user,
+			sender_device,
+			room_id,
+			next_batchcount,
+			lazy_load_send_redundant,
+			full_state,
+			device_list_updates,
+			left_encrypted_users,
+			since_shortstatehash,
+			current_shortstatehash,
+			timeline_pdus,
+			joined_since_last_sync,
+		)
+		.await
+	}
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn calculate_state_initial(
+	services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount,
+	lazy_load_enabled: bool, full_state: bool, current_shortstatehash: ShortStateHash,
+	timeline_users: &HashSet<String>,
+) -> Result<StateChanges> {
+	// Probably since = 0, we will do an initial sync
+	let state = services
+		.rooms
+		.state_accessor
+		.state_full_ids(current_shortstatehash)
+		.await?
+		.into_iter()
+		.stream()
+		.filter_map(|(shortstatekey, event_id): (ShortStateKey, OwnedEventId)| {
+			services
+				.rooms
+				.short
+				.get_statekey_from_short(shortstatekey)
+				.map_ok(move |(event_type, state_key)| ((event_type, state_key), event_id))
+				.ok()
+		})
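+		// Partition the full state into the events to return and the member
+		// IDs to mark as lazy-loaded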
+		.fold((Vec::new(), HashSet::new()), |a, item| async move {
+			let (mut state_events, mut lazy_loaded) = a;
+			let ((event_type, state_key), event_id) = item;
+
+			if event_type != StateEventType::RoomMember {
+				let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else {
+					error!("Pdu in state not found: {event_id}");
+					return (state_events, lazy_loaded);
+				};
+
+				state_events.push(pdu);
+				return (state_events, lazy_loaded);
+			}
+
+			// TODO: Delete "element_hacks" when this is resolved: https://github.com/vector-im/element-web/issues/22565
+			if !lazy_load_enabled
+				|| full_state
+				|| timeline_users.contains(&state_key)
+				|| (cfg!(feature = "element_hacks") && *sender_user == state_key)
+			{
+				let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else {
+					error!("Pdu in state not found: {event_id}");
+					return (state_events, lazy_loaded);
+				};
+
+				// This check is in case a bad user ID made it into the database
+				if let Ok(uid) = UserId::parse(&state_key) {
+					lazy_loaded.insert(uid);
+				}
+
+				state_events.push(pdu);
+			}
+
+			(state_events, lazy_loaded)
+		})
+		.map(Ok);
+
+	let counts = calculate_counts(services, room_id, sender_user);
+	let ((joined_member_count, invited_member_count, heroes), (state_events, lazy_loaded)) =
+		try_join(counts, state).boxed().await?;
+
+	// Reset lazy loading because this is an initial sync
+	services
+		.rooms
+		.lazy_loading
+		.lazy_load_reset(sender_user, sender_device, room_id)
+		.await;
+
+	// The state_events above should contain all timeline_users, let's mark them as
+	// lazy loaded.
+	services
+		.rooms
+		.lazy_loading
+		.lazy_load_mark_sent(sender_user, sender_device, room_id, lazy_loaded, next_batchcount);
+
+	Ok(StateChanges {
+		heroes,
+		joined_member_count,
+		invited_member_count,
+		joined_since_last_sync: true,
+		state_events,
+	})
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn calculate_state_incremental(
+	services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount,
+	lazy_load_send_redundant: bool, full_state: bool, device_list_updates: &mut HashSet<OwnedUserId>,
+	left_encrypted_users: &mut HashSet<OwnedUserId>, since_shortstatehash: Option<ShortStateHash>,
+	current_shortstatehash: ShortStateHash, timeline_pdus: &Vec<(PduCount, PduEvent)>, joined_since_last_sync: bool,
+) -> Result<StateChanges> {
+	// Incremental /sync
+	let since_shortstatehash = since_shortstatehash.expect("missing since_shortstatehash on incremental sync");
+
+	let mut delta_state_events = Vec::new();
+
+	if since_shortstatehash != current_shortstatehash {
+		let current_state_ids = services
+			.rooms
+			.state_accessor
+			.state_full_ids(current_shortstatehash);
+
+		let since_state_ids = services
+			.rooms
+			.state_accessor
+			.state_full_ids(since_shortstatehash);
+
+		let (current_state_ids, since_state_ids): (HashMap<_, OwnedEventId>, HashMap<_, OwnedEventId>) =
+			try_join(current_state_ids, since_state_ids).await?;
+
+		current_state_ids
+			.iter()
+			.stream()
+			.ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id))
+			.filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok())
+			.ready_for_each(|pdu| delta_state_events.push(pdu))
+			.await;
+	}
+
+	let encrypted_room = services
+		.rooms
+		.state_accessor
+		.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
+		.is_ok();
+
+	let since_encryption = services
+		.rooms
+		.state_accessor
+		.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
+		.is_ok();
+
+	let (encrypted_room, since_encryption) = join(encrypted_room, since_encryption).await;
+
+	// Calculations:
+	let new_encrypted_room = encrypted_room && !since_encryption;
+
+	let send_member_count = delta_state_events
+		.iter()
+		.any(|event| event.kind == RoomMember);
+
+	if encrypted_room {
+		for state_event in &delta_state_events {
+			if state_event.kind != RoomMember {
+				continue;
+			}
+
+			if let Some(state_key) = &state_event.state_key {
+				let user_id = UserId::parse(state_key.clone())
+					.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
+
+				if user_id == sender_user {
+					continue;
+				}
+
+				let content: RoomMemberEventContent = state_event.get_content()?;
+
+				match content.membership {
+					MembershipState::Join => {
+						// A new user joined an encrypted room
+						if !share_encrypted_room(services, sender_user, &user_id, Some(room_id)).await {
+							device_list_updates.insert(user_id);
+						}
+					},
+					MembershipState::Leave => {
+						// Write down users that have left encrypted rooms we are in
+						left_encrypted_users.insert(user_id);
+					},
+					_ => {},
+				}
+			}
+		}
+	}
+
+	if joined_since_last_sync && encrypted_room || new_encrypted_room {
+		let updates: Vec<OwnedUserId> = services
+			.rooms
+			.state_cache
+			.room_members(room_id)
+			.ready_filter(|user_id| sender_user != *user_id)
+			.filter_map(|user_id| {
+				share_encrypted_room(services, sender_user, user_id, Some(room_id))
+					.map(|res| res.or_some(user_id.to_owned()))
+			})
+			.collect()
+			.await;
+
+		// If the user is in a new encrypted room, give them all joined users
+		device_list_updates.extend(updates);
+	}
+
+	let (joined_member_count, invited_member_count, heroes) = if send_member_count {
+		calculate_counts(services, room_id, sender_user).await?
+	} else {
+		(None, None, None)
+	};
+
+	let mut state_events = delta_state_events;
+
+	// Mark all member events we're returning as lazy-loaded
+	let mut lazy_loaded = state_events
+		.iter()
+		.filter(|pdu| pdu.kind == RoomMember)
+		.filter_map(|pdu| {
+			pdu.state_key
+				.clone()
+				.map(TryInto::try_into)
+				.map(LogDebugErr::log_debug_err)
+				.flat_ok()
+		})
+		.fold(HashSet::new(), |mut lazy_loaded, user_id| {
+			lazy_loaded.insert(user_id);
+			lazy_loaded
+		});
+
+	// Fetch contextual member state events for events from the timeline, and
+	// mark them as lazy-loaded as well.
+	for (_, event) in timeline_pdus {
+		if lazy_loaded.contains(&event.sender) {
+			continue;
+		}
+
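+		// The prior-delivery check can be skipped entirely when redundant
+		// sends are enabled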
+		let sent_before: OptionFuture<_> = (!lazy_load_send_redundant)
+			.then(|| {
+				services.rooms.lazy_loading.lazy_load_was_sent_before(
+					sender_user,
+					sender_device,
+					room_id,
+					&event.sender,
+				)
+			})
+			.into();
+
+		let member_event: OptionFuture<_> = sent_before
+			.await
+			.is_none_or(is_false!())
+			.then(|| {
+				services.rooms.state_accessor.room_state_get(
+					room_id,
+					&StateEventType::RoomMember,
+					event.sender.as_str(),
+				)
+			})
+			.into();
+
+		let Some(Ok(member_event)) = member_event.await else {
+			continue;
+		};
+
+		lazy_loaded.insert(event.sender.clone());
+		state_events.push(member_event);
+	}
+
+	services
+		.rooms
+		.lazy_loading
+		.lazy_load_mark_sent(sender_user, sender_device, room_id, lazy_loaded, next_batchcount);
+
+	Ok(StateChanges {
+		heroes,
+		joined_member_count,
+		invited_member_count,
+		joined_since_last_sync,
+		state_events,
+	})
+}
+
+async fn calculate_counts(
+	services: &Services, room_id: &RoomId, sender_user: &UserId,
+) -> Result<(Option<u64>, Option<u64>, Option<Vec<OwnedUserId>>)> {
+	let joined_member_count = services
+		.rooms
+		.state_cache
+		.room_joined_count(room_id)
+		.unwrap_or(0);
+
+	let invited_member_count = services
+		.rooms
+		.state_cache
+		.room_invited_count(room_id)
+		.unwrap_or(0);
+
+	let (joined_member_count, invited_member_count) = join(joined_member_count, invited_member_count).await;
+
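+	// Heroes are recalculated only for small rooms (five or fewer members),
+	// matching the previous early-return threshold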
+	let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
+
+	let heroes: OptionFuture<_> = small_room
+		.then(|| calculate_heroes(services, room_id, sender_user))
+		.into();
+
+	Ok((Some(joined_member_count), Some(invited_member_count), heroes.await))
+}
+
+async fn calculate_heroes(services: &Services, room_id: &RoomId, sender_user: &UserId) -> Vec<OwnedUserId> {
+	services
+		.rooms
+		.timeline
+		.all_pdus(sender_user, room_id)
+		.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
+		.fold_default(|heroes: Vec<_>, (_, pdu)| fold_hero(heroes, services, room_id, sender_user, pdu))
+		.await
+}
+
+async fn fold_hero(
+	mut heroes: Vec<OwnedUserId>, services: &Services, room_id: &RoomId, sender_user: &UserId, pdu: PduEvent,
+) -> Vec<OwnedUserId> {
+	let Some(user_id): Option<&UserId> = pdu.state_key.as_deref().map(TryInto::try_into).flat_ok() else {
+		return heroes;
+	};
+
+	if user_id == sender_user {
+		return heroes;
+	}
+
+	let Ok(content): Result<RoomMemberEventContent> = pdu.get_content() else {
+		return heroes;
+	};
+
+	// The membership was and still is invite or join
+	if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
+		return heroes;
+	}
+
+	if heroes.iter().any(is_equal_to!(user_id)) {
+		return heroes;
+	}
+
+	let (is_invited, is_joined) = join(
+		services.rooms.state_cache.is_invited(user_id, room_id),
+		services.rooms.state_cache.is_joined(user_id, room_id),
+	)
+	.await;
+
+	if !is_joined && is_invited {
+		return heroes;
+	}
+
+	heroes.push(user_id.to_owned());
+	heroes
+}