diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index e3f559f5..49246514 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -55,7 +55,10 @@ use ruma::{ }; use super::{load_timeline, share_encrypted_room}; -use crate::{client::ignored_filter, Ruma, RumaResponse}; +use crate::{ + client::{ignored_filter, lazy_loading_witness}, + Ruma, RumaResponse, +}; #[derive(Default)] struct StateChanges { @@ -633,10 +636,6 @@ async fn load_joined_room( }) .into(); - let no_state_changes = timeline_pdus.is_empty() - && (since_shortstatehash.is_none() - || since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash))); - let since_sender_member: OptionFuture<_> = since_shortstatehash .map(|short| { services @@ -658,11 +657,7 @@ async fn load_joined_room( let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() || filter.room.timeline.lazy_load_options.is_enabled(); - let generate_witness = - lazy_loading_enabled && (since_shortstatehash.is_none() || joined_since_last_sync); - - let lazy_reset = lazy_loading_enabled && since_shortstatehash.is_none(); - + let lazy_reset = since_shortstatehash.is_none(); let lazy_loading_context = &lazy_loading::Context { user_id: sender_user, device_id: sender_device, @@ -677,24 +672,10 @@ async fn load_joined_room( .into(); lazy_load_reset.await; - let witness: Option = generate_witness.then(|| { - timeline_pdus - .iter() - .map(|(_, pdu)| pdu.sender.clone()) - .chain(receipt_events.keys().cloned()) - .collect() - }); - - let witness: OptionFuture<_> = witness - .map(|witness| { - services - .rooms - .lazy_loading - .witness_retain(witness, lazy_loading_context) - }) + let witness: OptionFuture<_> = lazy_loading_enabled + .then(|| lazy_loading_witness(services, lazy_loading_context, timeline_pdus.iter())) .into(); - let witness = witness.await; let StateChanges { heroes, joined_member_count, @@ -703,23 +684,19 @@ async fn load_joined_room( state_events, mut device_list_updates, left_encrypted_users, - } = if no_state_changes { - StateChanges::default() - } else { - calculate_state_changes( - services, - sender_user, - room_id, - full_state, - filter, - since_shortstatehash, - current_shortstatehash, - joined_since_last_sync, - witness.as_ref(), - ) - .boxed() - .await? - }; + } = calculate_state_changes( + services, + sender_user, + room_id, + full_state, + filter, + since_shortstatehash, + current_shortstatehash, + joined_since_last_sync, + witness.await.as_ref(), + ) + .boxed() + .await?; let account_data_events = services .account_data @@ -908,6 +885,7 @@ async fn calculate_state_changes( since_shortstatehash, current_shortstatehash, joined_since_last_sync, + witness, ) .await } @@ -920,7 +898,7 @@ async fn calculate_state_initial( sender_user: &UserId, room_id: &RoomId, full_state: bool, - filter: &FilterDefinition, + _filter: &FilterDefinition, current_shortstatehash: ShortStateHash, witness: Option<&Witness>, ) -> Result { @@ -938,20 +916,14 @@ async fn calculate_state_initial( .zip(event_ids.into_iter().stream()) .ready_filter_map(|item| Some((item.0.ok()?, item.1))) .ready_filter_map(|((event_type, state_key), event_id)| { - let lazy_load_enabled = filter.room.state.lazy_load_options.is_enabled() - || filter.room.timeline.lazy_load_options.is_enabled(); - - if lazy_load_enabled + let lazy = !full_state && event_type == StateEventType::RoomMember - && !full_state && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { sender_user != user_id && witness.is_some_and(|witness| !witness.contains(user_id)) - }) { - return None; - } + }); - Some(event_id) + lazy.or_some(event_id) }) .broad_filter_map(|event_id: OwnedEventId| async move { services.rooms.timeline.get_pdu(&event_id).await.ok() @@ -978,7 +950,7 @@ async fn calculate_state_initial( #[tracing::instrument(name = "incremental", level = "trace", skip_all)] #[allow(clippy::too_many_arguments)] -async fn calculate_state_incremental( +async fn calculate_state_incremental<'a>( services: &Services, sender_user: &UserId, room_id: &RoomId, @@ -987,39 +959,80 @@ async fn calculate_state_incremental( since_shortstatehash: Option, current_shortstatehash: ShortStateHash, joined_since_last_sync: bool, + witness: Option<&'a Witness>, ) -> Result { - // Incremental /sync - let since_shortstatehash = - since_shortstatehash.expect("missing since_shortstatehash on incremental sync"); + let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); - let mut delta_state_events = Vec::new(); + let state_changed = since_shortstatehash != current_shortstatehash; - if since_shortstatehash != current_shortstatehash { - let current_state_ids = services + let state_get_id = |user_id: &'a UserId| { + services .rooms .state_accessor - .state_full_ids(current_shortstatehash) - .collect(); + .state_get_id(current_shortstatehash, &StateEventType::RoomMember, user_id.as_str()) + .ok() + }; - let since_state_ids = services - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .collect(); + let lazy_state_ids: OptionFuture<_> = witness + .map(|witness| { + witness + .iter() + .stream() + .broad_filter_map(|user_id| state_get_id(user_id)) + .collect::>() + }) + .into(); - let (current_state_ids, since_state_ids): ( - HashMap<_, OwnedEventId>, - HashMap<_, OwnedEventId>, - ) = join(current_state_ids, since_state_ids).await; + let current_state_ids: OptionFuture<_> = state_changed + .then(|| { + services + .rooms + .state_accessor + .state_full_ids(current_shortstatehash) + .collect::>() + }) + .into(); - current_state_ids - .iter() - .stream() - .ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id)) - .wide_filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok()) - .ready_for_each(|pdu| delta_state_events.push(pdu)) - .await; - } + let since_state_ids: OptionFuture<_> = (state_changed && !full_state) + .then(|| { + services + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .collect::>() + }) + .into(); + + let lazy_state_ids = lazy_state_ids + .map(Option::into_iter) + .map(|iter| iter.flat_map(Vec::into_iter)) + .map(IterStream::stream) + .flatten_stream(); + + let ref since_state_ids = since_state_ids.shared(); + let delta_state_events = current_state_ids + .map(Option::into_iter) + .map(|iter| iter.flat_map(Vec::into_iter)) + .map(IterStream::stream) + .flatten_stream() + .filter_map(|(shortstatekey, event_id): (u64, OwnedEventId)| async move { + since_state_ids + .clone() + .await + .is_none_or(|since_state| since_state.get(&shortstatekey) != Some(&event_id)) + .then_some(event_id) + }) + .chain(lazy_state_ids) + .broad_filter_map(|event_id: OwnedEventId| async move { + services + .rooms + .timeline + .get_pdu(&event_id) + .await + .map(move |pdu| (event_id, pdu)) + .ok() + }) + .collect::>(); let since_encryption = services .rooms @@ -1031,11 +1044,12 @@ async fn calculate_state_incremental( .rooms .state_accessor .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok() - .await; + .is_ok(); + + let (delta_state_events, encrypted_room) = join(delta_state_events, encrypted_room).await; let (mut device_list_updates, left_encrypted_users) = delta_state_events - .iter() + .values() .stream() .ready_filter(|_| encrypted_room) .ready_filter(|state_event| state_event.kind == RoomMember) @@ -1084,7 +1098,7 @@ async fn calculate_state_incremental( } let send_member_count = delta_state_events - .iter() + .values() .any(|event| event.kind == RoomMember); let (joined_member_count, invited_member_count, heroes) = if send_member_count { @@ -1098,9 +1112,9 @@ async fn calculate_state_incremental( joined_member_count, invited_member_count, joined_since_last_sync, - state_events: delta_state_events, device_list_updates, left_encrypted_users, + state_events: delta_state_events.into_values().collect(), }) }