From 34f9e3260f44c8a9f642aeb3c9b94266431d5e25 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 8 Dec 2024 03:00:09 +0000 Subject: [PATCH] additional sync v3 refactoring/optimizations and tracing instruments Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 275 +++++++++++++++++++++----------------- 1 file changed, 154 insertions(+), 121 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 31179d3c..28ca1ea2 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -11,8 +11,9 @@ use conduit::{ result::{FlatOk, LogDebugErr}, utils, utils::{ + future::OptionExt, math::ruma_from_u64, - stream::{BroadbandExt, Tools}, + stream::{BroadbandExt, Tools, WidebandExt}, BoolExt, IterStream, ReadyExt, TryFutureExtExt, }, Error, PduCount, PduEvent, Result, @@ -22,7 +23,7 @@ use conduit_service::{ Services, }; use futures::{ - future::{join, join3, join5, try_join, OptionFuture}, + future::{join, join3, join4, join5, try_join, try_join3, OptionFuture}, FutureExt, StreamExt, TryFutureExt, }; use ruma::{ @@ -122,7 +123,6 @@ pub(crate) async fn sync_events_route( let watcher = services.sync.watch(sender_user, sender_device); let next_batch = services.globals.current_count()?; - let next_batchcount = PduCount::Normal(next_batch); let next_batch_string = next_batch.to_string(); // Load filter @@ -153,9 +153,8 @@ pub(crate) async fn sync_events_route( .as_ref() .and_then(|string| string.parse().ok()) .unwrap_or(0); - let sincecount = PduCount::Normal(since); - let joined_related = services + let joined_rooms = services .rooms .state_cache .rooms_joined(sender_user) @@ -167,9 +166,7 @@ pub(crate) async fn sync_events_route( sender_device, room_id.clone(), since, - sincecount, next_batch, - next_batchcount, lazy_load_enabled, lazy_load_send_redundant, full_state, @@ -211,9 +208,11 @@ pub(crate) async fn sync_events_route( .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room))) .collect(); - let invited_rooms = services.rooms.state_cache.rooms_invited(sender_user).fold( - BTreeMap::new(), - |mut invited_rooms, (room_id, invite_state)| async move { + let invited_rooms = services + .rooms + .state_cache + .rooms_invited(sender_user) + .fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move { // Get and drop the lock to wait for remaining operations to finish let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await; drop(insert_lock); @@ -238,8 +237,7 @@ pub(crate) async fn sync_events_route( invited_rooms.insert(room_id, invited_room); invited_rooms - }, - ); + }); let presence_updates: OptionFuture<_> = services .globals @@ -274,7 +272,7 @@ pub(crate) async fn sync_events_route( .users .remove_to_device_events(sender_user, sender_device, since); - let rooms = join3(joined_related, left_rooms, invited_rooms); + let rooms = join3(joined_rooms, left_rooms, invited_rooms); let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates); let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) .boxed() @@ -282,8 +280,8 @@ pub(crate) async fn sync_events_route( let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top; let ((), to_device_events, presence_updates) = ephemeral; - let (joined_related, left_rooms, invited_rooms) = rooms; - let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_related; + let (joined_rooms, left_rooms, invited_rooms) = rooms; + let (joined_rooms, mut 
device_list_updates, left_encrypted_users) = joined_rooms; device_list_updates.extend(keys_changed); // If the user doesn't share an encrypted room with the target anymore, we need @@ -349,6 +347,7 @@ pub(crate) async fn sync_events_route( Ok(response) } +#[tracing::instrument(name = "presence", level = "debug", skip_all)] async fn process_presence_updates(services: &Services, since: u64, syncing_user: &UserId) -> PresenceUpdates { services .presence @@ -411,17 +410,17 @@ async fn process_presence_updates(services: &Services, since: u64, syncing_user: )] #[allow(clippy::too_many_arguments)] async fn handle_left_room( - services: &Services, since: u64, room_id: OwnedRoomId, sender_user: &UserId, next_batch_string: &str, + services: &Services, since: u64, ref room_id: OwnedRoomId, sender_user: &UserId, next_batch_string: &str, full_state: bool, lazy_load_enabled: bool, ) -> Result> { // Get and drop the lock to wait for remaining operations to finish - let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await; + let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await; drop(insert_lock); let left_count = services .rooms .state_cache - .get_left_count(&room_id, sender_user) + .get_left_count(room_id, sender_user) .await .ok(); @@ -430,7 +429,7 @@ async fn handle_left_room( return Ok(None); } - if !services.rooms.metadata.exists(&room_id).await { + if !services.rooms.metadata.exists(room_id).await { // This is just a rejected invite, not a room we know // Insert a leave event anyways let event = PduEvent { @@ -476,7 +475,7 @@ async fn handle_left_room( let since_shortstatehash = services .rooms .user - .get_token_shortstatehash(&room_id, since) + .get_token_shortstatehash(room_id, since) .await; let since_state_ids = match since_shortstatehash { @@ -487,7 +486,7 @@ async fn handle_left_room( let Ok(left_event_id): Result = services .rooms .state_accessor - .room_state_get_id(&room_id, &StateEventType::RoomMember, sender_user.as_str()) + .room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str()) .await else { error!("Left room but no left state event"); @@ -557,30 +556,46 @@ async fn handle_left_room( })) } +#[tracing::instrument( + name = "joined", + level = "debug", + skip_all, + fields( + room_id = ?room_id, + ), +)] #[allow(clippy::too_many_arguments)] async fn load_joined_room( - services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: OwnedRoomId, since: u64, - sincecount: PduCount, next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool, - lazy_load_send_redundant: bool, full_state: bool, + services: &Services, sender_user: &UserId, sender_device: &DeviceId, ref room_id: OwnedRoomId, since: u64, + next_batch: u64, lazy_load_enabled: bool, lazy_load_send_redundant: bool, full_state: bool, ) -> Result<(JoinedRoom, HashSet, HashSet)> { - let mut device_list_updates = HashSet::::new(); - let mut left_encrypted_users = HashSet::::new(); - // Get and drop the lock to wait for remaining operations to finish // This will make sure the we have all events until next_batch - let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await; + let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await; drop(insert_lock); - let (timeline_pdus, limited) = - load_timeline(services, sender_user, &room_id, sincecount, Some(next_batchcount), 10_usize).await?; + let sincecount = PduCount::Normal(since); + let next_batchcount = PduCount::Normal(next_batch); - let send_notification_counts = 
!timeline_pdus.is_empty() - || services - .rooms - .user - .last_notification_read(sender_user, &room_id) - .await > since; + let current_shortstatehash = services + .rooms + .state + .get_room_shortstatehash(room_id) + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); + let since_shortstatehash = services + .rooms + .user + .get_token_shortstatehash(room_id, since) + .ok() + .map(Ok); + + let timeline = load_timeline(services, sender_user, room_id, sincecount, Some(next_batchcount), 10_usize); + + let (current_shortstatehash, since_shortstatehash, timeline) = + try_join3(current_shortstatehash, since_shortstatehash, timeline).await?; + + let (timeline_pdus, limited) = timeline; let timeline_users = timeline_pdus .iter() .fold(HashSet::new(), |mut timeline_users, (_, event)| { @@ -588,43 +603,44 @@ async fn load_joined_room( timeline_users }); + let last_notification_read: OptionFuture<_> = timeline_pdus + .is_empty() + .then(|| { + services + .rooms + .user + .last_notification_read(sender_user, room_id) + }) + .into(); + + let send_notification_counts = last_notification_read + .is_none_or(|&count| count > since) + .await; + services .rooms .lazy_loading - .lazy_load_confirm_delivery(sender_user, sender_device, &room_id, sincecount); + .lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount); - let current_shortstatehash = services - .rooms - .state - .get_room_shortstatehash(&room_id) - .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); - - let since_shortstatehash = services - .rooms - .user - .get_token_shortstatehash(&room_id, since) - .ok() - .map(Ok); - - let (current_shortstatehash, since_shortstatehash) = try_join(current_shortstatehash, since_shortstatehash).await?; + let no_state_changes = timeline_pdus.is_empty() + && (since_shortstatehash.is_none() || since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash))); + let mut device_list_updates = HashSet::::new(); + let mut left_encrypted_users = HashSet::::new(); let StateChanges { heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events, - } = if timeline_pdus.is_empty() - && (since_shortstatehash.is_none() || since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash))) - { - // No state changes + } = if no_state_changes { StateChanges::default() } else { calculate_state_changes( services, sender_user, sender_device, - &room_id, + room_id, next_batchcount, lazy_load_enabled, lazy_load_send_redundant, @@ -636,22 +652,68 @@ async fn load_joined_room( &timeline_pdus, &timeline_users, ) - .boxed() .await? 
}; - let prev_batch = timeline_pdus - .first() - .map(at!(0)) - .as_ref() - .map(ToString::to_string); + let account_data_events = services + .account_data + .changes_since(Some(room_id), sender_user, since) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect(); + + // Look for device list updates in this room + let device_updates = services + .users + .room_keys_changed(room_id, since, None) + .map(|(user_id, _)| user_id) + .map(ToOwned::to_owned) + .collect::>(); + + let room_events = timeline_pdus + .iter() + .stream() + .wide_filter_map(|item| ignored_filter(services, item.clone(), sender_user)) + .map(|(_, pdu)| pdu.to_sync_room_event()) + .collect(); + + let receipt_events = services + .rooms + .read_receipt + .readreceipts_since(room_id, since) + .filter_map(|(read_user, _, edu)| async move { + services + .users + .user_is_ignored(read_user, sender_user) + .await + .or_some((read_user.to_owned(), edu)) + }) + .collect::>>(); + + let typing_events = services + .rooms + .typing + .last_typing_update(room_id) + .and_then(|count| async move { + if count <= since { + return Ok(Vec::>::new()); + } + + let typings = services + .rooms + .typing + .typings_all(room_id, sender_user) + .await?; + + Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) + }) + .unwrap_or(Vec::new()); let notification_count: OptionFuture<_> = send_notification_counts .then(|| { services .rooms .user - .notification_count(sender_user, &room_id) + .notification_count(sender_user, room_id) .map(TryInto::try_into) .unwrap_or(uint!(0)) }) @@ -662,79 +724,33 @@ async fn load_joined_room( services .rooms .user - .highlight_count(sender_user, &room_id) + .highlight_count(sender_user, room_id) .map(TryInto::try_into) .unwrap_or(uint!(0)) }) .into(); - let room_events = timeline_pdus - .iter() - .stream() - .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) - .map(|(_, pdu)| pdu.to_sync_room_event()) - .collect(); - - let account_data_events = services - .account_data - .changes_since(Some(&room_id), sender_user, since) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect(); - - let receipt_events = services - .rooms - .read_receipt - .readreceipts_since(&room_id, since) - .filter_map(|(read_user, _, edu)| async move { - services - .users - .user_is_ignored(read_user, sender_user) - .await - .or_some((read_user.to_owned(), edu)) - }) - .collect::>>(); - - // Look for device list updates in this room - let device_updates = services - .users - .room_keys_changed(&room_id, since, None) - .map(|(user_id, _)| user_id) - .map(ToOwned::to_owned) - .collect::>(); - - let events = join3(room_events, account_data_events, receipt_events); + let events = join4(room_events, account_data_events, receipt_events, typing_events); let unread_notifications = join(notification_count, highlight_count); let (unread_notifications, events, device_updates) = join3(unread_notifications, events, device_updates) .boxed() .await; - let (room_events, account_data_events, receipt_events) = events; + let (room_events, account_data_events, receipt_events, typing_events) = events; let (notification_count, highlight_count) = unread_notifications; - device_list_updates.extend(device_updates); - let mut edus: Vec> = receipt_events.into_values().collect(); - if services.rooms.typing.last_typing_update(&room_id).await? 
> since { - edus.push( - serde_json::from_str( - &serde_json::to_string( - &services - .rooms - .typing - .typings_all(&room_id, sender_user) - .await?, - ) - .expect("event is valid, we just created it"), - ) - .expect("event is valid, we just created it"), - ); - } + device_list_updates.extend(device_updates); + let edus: Vec> = receipt_events + .into_values() + .chain(typing_events.into_iter()) + .collect(); // Save the state after this sync so we can send the correct state diff next // sync services .rooms .user - .associate_token_shortstatehash(&room_id, next_batch, current_shortstatehash) + .associate_token_shortstatehash(room_id, next_batch, current_shortstatehash) .await; let joined_room = JoinedRoom { @@ -757,8 +773,12 @@ async fn load_joined_room( }, timeline: Timeline { limited: limited || joined_since_last_sync, - prev_batch, events: room_events, + prev_batch: timeline_pdus + .first() + .map(at!(0)) + .as_ref() + .map(ToString::to_string), }, state: RoomState { events: state_events @@ -775,6 +795,17 @@ async fn load_joined_room( Ok((joined_room, device_list_updates, left_encrypted_users)) } +#[tracing::instrument( + name = "state", + level = "trace", + skip_all, + fields( + full = %full_state, + ll = ?(lazy_load_enabled, lazy_load_send_redundant), + cs = %current_shortstatehash, + ss = ?since_shortstatehash, + ) +)] #[allow(clippy::too_many_arguments)] async fn calculate_state_changes( services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount, @@ -833,6 +864,7 @@ async fn calculate_state_changes( } } +#[tracing::instrument(name = "initial", level = "trace", skip_all)] #[allow(clippy::too_many_arguments)] async fn calculate_state_initial( services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount, @@ -847,7 +879,7 @@ async fn calculate_state_initial( .await? .into_iter() .stream() - .filter_map(|(shortstatekey, event_id): (ShortStateKey, OwnedEventId)| { + .broad_filter_map(|(shortstatekey, event_id): (ShortStateKey, OwnedEventId)| { services .rooms .short @@ -919,6 +951,7 @@ async fn calculate_state_initial( }) } +#[tracing::instrument(name = "incremental", level = "trace", skip_all)] #[allow(clippy::too_many_arguments)] async fn calculate_state_incremental( services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount, @@ -949,7 +982,7 @@ async fn calculate_state_incremental( .iter() .stream() .ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id)) - .filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok()) + .wide_filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok()) .ready_for_each(|pdu| delta_state_events.push(pdu)) .await; }
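
Notes on the main patterns introduced by this patch follow; the snippets are minimal sketches against the plain futures crate, not conduit's actual service API, and every helper name in them is a stand-in.

The invited-rooms accumulator drops the explicit fold(BTreeMap::new(), ...) in favour of conduit's fold_default, and several filter_map calls move to the broad_/wide_ variants from the BroadbandExt/WidebandExt stream extensions, which process items concurrently. With plain futures (sequential, without those extension traits) the same accumulation looks roughly like this:

use std::collections::BTreeMap;

use futures::{stream, StreamExt};

// Stand-in for the invited-rooms fold: fold_default is the same fold, seeded
// with Default::default() instead of an explicit empty map.
async fn collect_invited(invites: Vec<(String, u64)>) -> BTreeMap<String, u64> {
    stream::iter(invites)
        .fold(BTreeMap::default(), |mut acc, (room_id, invite_state)| async move {
            acc.insert(room_id, invite_state);
            acc
        })
        .await
}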
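
load_joined_room no longer receives pre-computed PduCount arguments; it derives sincecount/next_batchcount locally and resolves the room's current state hash, the state hash at the since token, and the timeline slice concurrently with try_join3 instead of one after another. A sketch of that shape, with placeholder lookups standing in for the services.rooms calls:

use futures::future::try_join3;

async fn current_shortstatehash(room: &str) -> Result<u64, String> {
    Ok(room.len() as u64) // placeholder for the state lookup
}

async fn since_shortstatehash(_room: &str, since: u64) -> Result<Option<u64>, String> {
    Ok(Some(since)) // placeholder; None would mean "no snapshot for this token"
}

async fn load_timeline(_room: &str, since: u64) -> Result<(Vec<u64>, bool), String> {
    Ok((vec![since + 1], false)) // placeholder (events, limited) pair
}

async fn load_room(room: &str, since: u64) -> Result<(), String> {
    // The three lookups are independent, so they run concurrently and the
    // first error short-circuits the whole group.
    let (current, since_hash, (events, limited)) = try_join3(
        current_shortstatehash(room),
        since_shortstatehash(room, since),
        load_timeline(room, since),
    )
    .await?;

    println!("current={current} since={since_hash:?} events={} limited={limited}", events.len());
    Ok(())
}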
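
send_notification_counts is now driven by an OptionFuture: the last_notification_read query is only scheduled when the timeline slice came back empty, and the patch collapses the None/Some cases with conduit's OptionExt::is_none_or. A rough equivalent with plain futures, spelling those cases out with map_or (the lookup is a stand-in):

use futures::future::OptionFuture;

async fn last_notification_read(_room: &str) -> u64 {
    7 // placeholder read-marker position
}

async fn should_send_counts(room: &str, timeline_is_empty: bool, since: u64) -> bool {
    // Only schedule the lookup when there were no timeline events to report.
    let read: OptionFuture<_> = timeline_is_empty
        .then(|| last_notification_read(room))
        .into();

    // None => the timeline had events, always send the counts.
    // Some => only send them when something was read after `since`.
    read.await.map_or(true, |count| count > since)
}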
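
The per-room payload is gathered with a single join4: timeline events, room account data, read receipts, and the typing EDUs, which previously were awaited inline after the join and pushed with expect. The new typing_events future folds the count <= since check into the future itself; a stand-in sketch of that shape:

use futures::future::join4;

async fn room_payload(since: u64, last_typing_update: u64) -> (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>) {
    let room_events = async { vec![1] };
    let account_data = async { vec![2] };
    let receipts = async { vec![3] };

    // Mirrors the new typing_events future: produce nothing unless the typing
    // state changed after `since`, instead of branching after the join.
    let typing = async move {
        if last_typing_update <= since {
            return Vec::new();
        }
        vec![4]
    };

    join4(room_events, account_data, receipts, typing).await
}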
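
The added #[tracing::instrument] attributes all follow one pattern: a short span name, an explicit level, skip_all so the large argument structs are not recorded, and a fields(...) list for the handful of values worth indexing (% for Display, ? for Debug). A self-contained example of that attribute shape; the body is a stand-in:

use tracing::instrument;

#[instrument(
    name = "state",
    level = "trace",
    skip_all,
    fields(
        full = %full_state,
        cs = %current_shortstatehash,
        ss = ?since_shortstatehash,
    ),
)]
async fn calculate_state_changes(full_state: bool, current_shortstatehash: u64, since_shortstatehash: Option<u64>) {
    // the expensive state diff would run here
}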