additional sync v3 refactoring/optimizations and tracing instruments

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-12-08 03:00:09 +00:00
parent aba88ccead
commit 34f9e3260f

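Note: the diff below adds #[tracing::instrument] spans around each sync stage (presence, joined, state, initial, incremental). As a rough standalone sketch of that attribute, assuming the tracing, tracing-subscriber, and tokio crates and a hypothetical handler name rather than code from this repository:

use tracing::instrument;

// Hypothetical handler standing in for the real sync stages: the span gets a
// short name, a debug level, skips all arguments, and records only the room id.
#[instrument(name = "joined", level = "debug", skip_all, fields(room_id = %room_id))]
async fn load_joined_room(room_id: &str, since: u64) -> u64 {
    // The real handler loads timeline and state here; this stub only advances the token.
    since + 1
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    let next = load_joined_room("!room:example.org", 42).await;
    println!("next batch: {next}");
}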

@@ -11,8 +11,9 @@ use conduit::{
result::{FlatOk, LogDebugErr},
utils,
utils::{
future::OptionExt,
math::ruma_from_u64,
stream::{BroadbandExt, Tools},
stream::{BroadbandExt, Tools, WidebandExt},
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
},
Error, PduCount, PduEvent, Result,
@@ -22,7 +23,7 @@ use conduit_service::{
Services,
};
use futures::{
future::{join, join3, join5, try_join, OptionFuture},
future::{join, join3, join4, join5, try_join, try_join3, OptionFuture},
FutureExt, StreamExt, TryFutureExt,
};
use ruma::{
@@ -122,7 +123,6 @@ pub(crate) async fn sync_events_route(
let watcher = services.sync.watch(sender_user, sender_device);
let next_batch = services.globals.current_count()?;
let next_batchcount = PduCount::Normal(next_batch);
let next_batch_string = next_batch.to_string();
// Load filter
@@ -153,9 +153,8 @@ pub(crate) async fn sync_events_route(
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let sincecount = PduCount::Normal(since);
let joined_related = services
let joined_rooms = services
.rooms
.state_cache
.rooms_joined(sender_user)
@@ -167,9 +166,7 @@ pub(crate) async fn sync_events_route(
sender_device,
room_id.clone(),
since,
sincecount,
next_batch,
next_batchcount,
lazy_load_enabled,
lazy_load_send_redundant,
full_state,
@@ -211,9 +208,11 @@ pub(crate) async fn sync_events_route(
.ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
.collect();
let invited_rooms = services.rooms.state_cache.rooms_invited(sender_user).fold(
BTreeMap::new(),
|mut invited_rooms, (room_id, invite_state)| async move {
let invited_rooms = services
.rooms
.state_cache
.rooms_invited(sender_user)
.fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move {
// Get and drop the lock to wait for remaining operations to finish
let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
drop(insert_lock);
@@ -238,8 +237,7 @@ pub(crate) async fn sync_events_route(
invited_rooms.insert(room_id, invited_room);
invited_rooms
},
);
});
let presence_updates: OptionFuture<_> = services
.globals
@@ -274,7 +272,7 @@ pub(crate) async fn sync_events_route(
.users
.remove_to_device_events(sender_user, sender_device, since);
let rooms = join3(joined_related, left_rooms, invited_rooms);
let rooms = join3(joined_rooms, left_rooms, invited_rooms);
let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms)
.boxed()
@@ -282,8 +280,8 @@ pub(crate) async fn sync_events_route(
let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
let ((), to_device_events, presence_updates) = ephemeral;
let (joined_related, left_rooms, invited_rooms) = rooms;
let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_related;
let (joined_rooms, left_rooms, invited_rooms) = rooms;
let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_rooms;
device_list_updates.extend(keys_changed);
// If the user doesn't share an encrypted room with the target anymore, we need
@@ -349,6 +347,7 @@ pub(crate) async fn sync_events_route(
Ok(response)
}
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
async fn process_presence_updates(services: &Services, since: u64, syncing_user: &UserId) -> PresenceUpdates {
services
.presence
@@ -411,17 +410,17 @@ async fn process_presence_updates(services: &Services, since: u64, syncing_user:
)]
#[allow(clippy::too_many_arguments)]
async fn handle_left_room(
services: &Services, since: u64, room_id: OwnedRoomId, sender_user: &UserId, next_batch_string: &str,
services: &Services, since: u64, ref room_id: OwnedRoomId, sender_user: &UserId, next_batch_string: &str,
full_state: bool, lazy_load_enabled: bool,
) -> Result<Option<LeftRoom>> {
// Get and drop the lock to wait for remaining operations to finish
let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
drop(insert_lock);
let left_count = services
.rooms
.state_cache
.get_left_count(&room_id, sender_user)
.get_left_count(room_id, sender_user)
.await
.ok();
@@ -430,7 +429,7 @@ async fn handle_left_room(
return Ok(None);
}
if !services.rooms.metadata.exists(&room_id).await {
if !services.rooms.metadata.exists(room_id).await {
// This is just a rejected invite, not a room we know
// Insert a leave event anyways
let event = PduEvent {
@@ -476,7 +475,7 @@ async fn handle_left_room(
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(&room_id, since)
.get_token_shortstatehash(room_id, since)
.await;
let since_state_ids = match since_shortstatehash {
@@ -487,7 +486,7 @@ async fn handle_left_room(
let Ok(left_event_id): Result<OwnedEventId> = services
.rooms
.state_accessor
.room_state_get_id(&room_id, &StateEventType::RoomMember, sender_user.as_str())
.room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str())
.await
else {
error!("Left room but no left state event");
@@ -557,30 +556,46 @@ async fn handle_left_room(
}))
}
#[tracing::instrument(
name = "joined",
level = "debug",
skip_all,
fields(
room_id = ?room_id,
),
)]
#[allow(clippy::too_many_arguments)]
async fn load_joined_room(
services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: OwnedRoomId, since: u64,
sincecount: PduCount, next_batch: u64, next_batchcount: PduCount, lazy_load_enabled: bool,
lazy_load_send_redundant: bool, full_state: bool,
services: &Services, sender_user: &UserId, sender_device: &DeviceId, ref room_id: OwnedRoomId, since: u64,
next_batch: u64, lazy_load_enabled: bool, lazy_load_send_redundant: bool, full_state: bool,
) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
let mut device_list_updates = HashSet::<OwnedUserId>::new();
let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
// Get and drop the lock to wait for remaining operations to finish
// This will make sure that we have all events until next_batch
let insert_lock = services.rooms.timeline.mutex_insert.lock(&room_id).await;
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
drop(insert_lock);
let (timeline_pdus, limited) =
load_timeline(services, sender_user, &room_id, sincecount, Some(next_batchcount), 10_usize).await?;
let sincecount = PduCount::Normal(since);
let next_batchcount = PduCount::Normal(next_batch);
let send_notification_counts = !timeline_pdus.is_empty()
|| services
.rooms
.user
.last_notification_read(sender_user, &room_id)
.await > since;
let current_shortstatehash = services
.rooms
.state
.get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, since)
.ok()
.map(Ok);
let timeline = load_timeline(services, sender_user, room_id, sincecount, Some(next_batchcount), 10_usize);
let (current_shortstatehash, since_shortstatehash, timeline) =
try_join3(current_shortstatehash, since_shortstatehash, timeline).await?;
let (timeline_pdus, limited) = timeline;
let timeline_users = timeline_pdus
.iter()
.fold(HashSet::new(), |mut timeline_users, (_, event)| {
@@ -588,43 +603,44 @@ async fn load_joined_room(
timeline_users
});
let last_notification_read: OptionFuture<_> = timeline_pdus
.is_empty()
.then(|| {
services
.rooms
.user
.last_notification_read(sender_user, room_id)
})
.into();
let send_notification_counts = last_notification_read
.is_none_or(|&count| count > since)
.await;
services
.rooms
.lazy_loading
.lazy_load_confirm_delivery(sender_user, sender_device, &room_id, sincecount);
.lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount);
let current_shortstatehash = services
.rooms
.state
.get_room_shortstatehash(&room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(&room_id, since)
.ok()
.map(Ok);
let (current_shortstatehash, since_shortstatehash) = try_join(current_shortstatehash, since_shortstatehash).await?;
let no_state_changes = timeline_pdus.is_empty()
&& (since_shortstatehash.is_none() || since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)));
let mut device_list_updates = HashSet::<OwnedUserId>::new();
let mut left_encrypted_users = HashSet::<OwnedUserId>::new();
let StateChanges {
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events,
} = if timeline_pdus.is_empty()
&& (since_shortstatehash.is_none() || since_shortstatehash.is_some_and(is_equal_to!(current_shortstatehash)))
{
// No state changes
} = if no_state_changes {
StateChanges::default()
} else {
calculate_state_changes(
services,
sender_user,
sender_device,
&room_id,
room_id,
next_batchcount,
lazy_load_enabled,
lazy_load_send_redundant,
@@ -636,22 +652,68 @@ async fn load_joined_room(
&timeline_pdus,
&timeline_users,
)
.boxed()
.await?
};
let prev_batch = timeline_pdus
.first()
.map(at!(0))
.as_ref()
.map(ToString::to_string);
let account_data_events = services
.account_data
.changes_since(Some(room_id), sender_user, since)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
// Look for device list updates in this room
let device_updates = services
.users
.room_keys_changed(room_id, since, None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let room_events = timeline_pdus
.iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item.clone(), sender_user))
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let receipt_events = services
.rooms
.read_receipt
.readreceipts_since(room_id, since)
.filter_map(|(read_user, _, edu)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.await
.or_some((read_user.to_owned(), edu))
})
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>();
let typing_events = services
.rooms
.typing
.last_typing_update(room_id)
.and_then(|count| async move {
if count <= since {
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
}
let typings = services
.rooms
.typing
.typings_all(room_id, sender_user)
.await?;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
})
.unwrap_or(Vec::new());
let notification_count: OptionFuture<_> = send_notification_counts
.then(|| {
services
.rooms
.user
.notification_count(sender_user, &room_id)
.notification_count(sender_user, room_id)
.map(TryInto::try_into)
.unwrap_or(uint!(0))
})
@@ -662,79 +724,33 @@ async fn load_joined_room(
services
.rooms
.user
.highlight_count(sender_user, &room_id)
.highlight_count(sender_user, room_id)
.map(TryInto::try_into)
.unwrap_or(uint!(0))
})
.into();
let room_events = timeline_pdus
.iter()
.stream()
.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let account_data_events = services
.account_data
.changes_since(Some(&room_id), sender_user, since)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
let receipt_events = services
.rooms
.read_receipt
.readreceipts_since(&room_id, since)
.filter_map(|(read_user, _, edu)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.await
.or_some((read_user.to_owned(), edu))
})
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>();
// Look for device list updates in this room
let device_updates = services
.users
.room_keys_changed(&room_id, since, None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let events = join3(room_events, account_data_events, receipt_events);
let events = join4(room_events, account_data_events, receipt_events, typing_events);
let unread_notifications = join(notification_count, highlight_count);
let (unread_notifications, events, device_updates) = join3(unread_notifications, events, device_updates)
.boxed()
.await;
let (room_events, account_data_events, receipt_events) = events;
let (room_events, account_data_events, receipt_events, typing_events) = events;
let (notification_count, highlight_count) = unread_notifications;
device_list_updates.extend(device_updates);
let mut edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events.into_values().collect();
if services.rooms.typing.last_typing_update(&room_id).await? > since {
edus.push(
serde_json::from_str(
&serde_json::to_string(
&services
.rooms
.typing
.typings_all(&room_id, sender_user)
.await?,
)
.expect("event is valid, we just created it"),
)
.expect("event is valid, we just created it"),
);
}
device_list_updates.extend(device_updates);
let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
.into_values()
.chain(typing_events.into_iter())
.collect();
// Save the state after this sync so we can send the correct state diff next
// sync
services
.rooms
.user
.associate_token_shortstatehash(&room_id, next_batch, current_shortstatehash)
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)
.await;
let joined_room = JoinedRoom {
@@ -757,8 +773,12 @@ async fn load_joined_room(
},
timeline: Timeline {
limited: limited || joined_since_last_sync,
prev_batch,
events: room_events,
prev_batch: timeline_pdus
.first()
.map(at!(0))
.as_ref()
.map(ToString::to_string),
},
state: RoomState {
events: state_events
@@ -775,6 +795,17 @@ async fn load_joined_room(
Ok((joined_room, device_list_updates, left_encrypted_users))
}
#[tracing::instrument(
name = "state",
level = "trace",
skip_all,
fields(
full = %full_state,
ll = ?(lazy_load_enabled, lazy_load_send_redundant),
cs = %current_shortstatehash,
ss = ?since_shortstatehash,
)
)]
#[allow(clippy::too_many_arguments)]
async fn calculate_state_changes(
services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount,
@@ -833,6 +864,7 @@ async fn calculate_state_changes(
}
}
#[tracing::instrument(name = "initial", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn calculate_state_initial(
services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount,
@@ -847,7 +879,7 @@ async fn calculate_state_initial(
.await?
.into_iter()
.stream()
.filter_map(|(shortstatekey, event_id): (ShortStateKey, OwnedEventId)| {
.broad_filter_map(|(shortstatekey, event_id): (ShortStateKey, OwnedEventId)| {
services
.rooms
.short
@@ -919,6 +951,7 @@ async fn calculate_state_initial(
})
}
#[tracing::instrument(name = "incremental", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn calculate_state_incremental(
services: &Services, sender_user: &UserId, sender_device: &DeviceId, room_id: &RoomId, next_batchcount: PduCount,
@@ -949,7 +982,7 @@ async fn calculate_state_incremental(
.iter()
.stream()
.ready_filter(|(key, id)| full_state || since_state_ids.get(key) != Some(id))
.filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok())
.wide_filter_map(|(_, id)| services.rooms.timeline.get_pdu(id).ok())
.ready_for_each(|pdu| delta_state_events.push(pdu))
.await;
}
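Note: two patterns carry most of this refactor: independent lookups are built as futures and awaited together (join3/join4/try_join3), and optional work is wrapped in OptionFuture so it only runs when its guard holds. A minimal, self-contained sketch with hypothetical helpers, assuming only the futures and tokio crates, not conduwuit's services:

use futures::future::{join3, OptionFuture};

// Hypothetical stand-ins for independent database lookups.
async fn fetch_count(key: &str) -> u64 { key.len() as u64 }
async fn fetch_flag(key: &str) -> bool { !key.is_empty() }
async fn fetch_name(key: &str) -> String { key.to_uppercase() }

#[tokio::main]
async fn main() {
    // Build the futures first (async fns are lazy), then poll them concurrently
    // on the same task; the output tuple preserves the argument order.
    let (count, flag, name) = join3(fetch_count("room"), fetch_flag("room"), fetch_name("room")).await;
    println!("{count} {flag} {name}");

    // Conditional work: only construct the future when the guard is true;
    // awaiting an OptionFuture built from None yields None without doing anything.
    let send_counts = count > 3;
    let notification_count: OptionFuture<_> = send_counts.then(|| fetch_count("notifications")).into();
    let notification_count: Option<u64> = notification_count.await;
    println!("{notification_count:?} (flag={flag}, name={name})");
}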