implement lazy-loading for incremental sync

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-31 08:34:32 +00:00
parent a4ef04cd14
commit 6983798487

View file

@ -55,7 +55,10 @@ use ruma::{
};
use super::{load_timeline, share_encrypted_room};
use crate::{client::ignored_filter, Ruma, RumaResponse};
use crate::{
client::{ignored_filter, lazy_loading_witness},
Ruma, RumaResponse,
};
#[derive(Default)]
struct StateChanges {
@ -633,10 +636,6 @@ async fn load_joined_room(
})
.into();
let no_state_changes = timeline_pdus.is_empty()
&& (since_shortstatehash.is_none()
|| since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)));
let since_sender_member: OptionFuture<_> = since_shortstatehash
.map(|short| {
services
@ -658,11 +657,7 @@ async fn load_joined_room(
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
let generate_witness =
lazy_loading_enabled && (since_shortstatehash.is_none() || joined_since_last_sync);
let lazy_reset = lazy_loading_enabled && since_shortstatehash.is_none();
let lazy_reset = since_shortstatehash.is_none();
let lazy_loading_context = &lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
@ -677,24 +672,10 @@ async fn load_joined_room(
.into();
lazy_load_reset.await;
let witness: Option<Witness> = generate_witness.then(|| {
timeline_pdus
.iter()
.map(|(_, pdu)| pdu.sender.clone())
.chain(receipt_events.keys().cloned())
.collect()
});
let witness: OptionFuture<_> = witness
.map(|witness| {
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
})
let witness: OptionFuture<_> = lazy_loading_enabled
.then(|| lazy_loading_witness(services, lazy_loading_context, timeline_pdus.iter()))
.into();
let witness = witness.await;
let StateChanges {
heroes,
joined_member_count,
@ -703,23 +684,19 @@ async fn load_joined_room(
state_events,
mut device_list_updates,
left_encrypted_users,
} = if no_state_changes {
StateChanges::default()
} else {
calculate_state_changes(
services,
sender_user,
room_id,
full_state,
filter,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness.as_ref(),
)
.boxed()
.await?
};
} = calculate_state_changes(
services,
sender_user,
room_id,
full_state,
filter,
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness.await.as_ref(),
)
.boxed()
.await?;
let account_data_events = services
.account_data
@ -908,6 +885,7 @@ async fn calculate_state_changes(
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness,
)
.await
}
@ -920,7 +898,7 @@ async fn calculate_state_initial(
sender_user: &UserId,
room_id: &RoomId,
full_state: bool,
filter: &FilterDefinition,
_filter: &FilterDefinition,
current_shortstatehash: ShortStateHash,
witness: Option<&Witness>,
) -> Result<StateChanges> {
@ -938,20 +916,14 @@ async fn calculate_state_initial(
.zip(event_ids.into_iter().stream())
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| {
let lazy_load_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
if lazy_load_enabled
let lazy = !full_state
&& event_type == StateEventType::RoomMember
&& !full_state
&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
sender_user != user_id
&& witness.is_some_and(|witness| !witness.contains(user_id))
}) {
return None;
}
});
Some(event_id)
lazy.or_some(event_id)
})
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
@ -978,7 +950,7 @@ async fn calculate_state_initial(
#[tracing::instrument(name = "incremental", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn calculate_state_incremental(
async fn calculate_state_incremental<'a>(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
@ -987,39 +959,80 @@ async fn calculate_state_incremental(
since_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
witness: Option<&'a Witness>,
) -> Result<StateChanges> {
// Incremental /sync
let since_shortstatehash =
since_shortstatehash.expect("missing since_shortstatehash on incremental sync");
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
let mut delta_state_events = Vec::new();
let state_changed = since_shortstatehash != current_shortstatehash;
if since_shortstatehash != current_shortstatehash {
let current_state_ids = services
let state_get_id = |user_id: &'a UserId| {
services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.collect();
.state_get_id(current_shortstatehash, &StateEventType::RoomMember, user_id.as_str())
.ok()
};
let since_state_ids = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.collect();
let lazy_state_ids: OptionFuture<_> = witness
.map(|witness| {
witness
.iter()
.stream()
.broad_filter_map(|user_id| state_get_id(user_id))
.collect::<Vec<OwnedEventId>>()
})
.into();
let (current_state_ids, since_state_ids): (
HashMap<_, OwnedEventId>,
HashMap<_, OwnedEventId>,
) = join(current_state_ids, since_state_ids).await;
let current_state_ids: OptionFuture<_> = state_changed
.then(|| {
services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.collect::<Vec<(_, OwnedEventId)>>()
})
.into();
current_state_ids
.iter()
.stream()
.ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id))
.wide_filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok())
.ready_for_each(|pdu| delta_state_events.push(pdu))
.await;
}
let since_state_ids: OptionFuture<_> = (state_changed && !full_state)
.then(|| {
services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.collect::<HashMap<_, OwnedEventId>>()
})
.into();
let lazy_state_ids = lazy_state_ids
.map(Option::into_iter)
.map(|iter| iter.flat_map(Vec::into_iter))
.map(IterStream::stream)
.flatten_stream();
let ref since_state_ids = since_state_ids.shared();
let delta_state_events = current_state_ids
.map(Option::into_iter)
.map(|iter| iter.flat_map(Vec::into_iter))
.map(IterStream::stream)
.flatten_stream()
.filter_map(|(shortstatekey, event_id): (u64, OwnedEventId)| async move {
since_state_ids
.clone()
.await
.is_none_or(|since_state| since_state.get(&shortstatekey) != Some(&event_id))
.then_some(event_id)
})
.chain(lazy_state_ids)
.broad_filter_map(|event_id: OwnedEventId| async move {
services
.rooms
.timeline
.get_pdu(&event_id)
.await
.map(move |pdu| (event_id, pdu))
.ok()
})
.collect::<HashMap<_, _>>();
let since_encryption = services
.rooms
@ -1031,11 +1044,12 @@ async fn calculate_state_incremental(
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok()
.await;
.is_ok();
let (delta_state_events, encrypted_room) = join(delta_state_events, encrypted_room).await;
let (mut device_list_updates, left_encrypted_users) = delta_state_events
.iter()
.values()
.stream()
.ready_filter(|_| encrypted_room)
.ready_filter(|state_event| state_event.kind == RoomMember)
@ -1084,7 +1098,7 @@ async fn calculate_state_incremental(
}
let send_member_count = delta_state_events
.iter()
.values()
.any(|event| event.kind == RoomMember);
let (joined_member_count, invited_member_count, heroes) = if send_member_count {
@ -1098,9 +1112,9 @@ async fn calculate_state_incremental(
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events: delta_state_events,
device_list_updates,
left_encrypted_users,
state_events: delta_state_events.into_values().collect(),
})
}