From e0508958b71a12fbbf7ae0ba554f4656bacd4ab8 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 6 Apr 2025 21:59:18 +0000 Subject: [PATCH] increase snake sync asynchronicity Signed-off-by: Jason Volk --- src/api/client/sync/mod.rs | 36 +----- src/api/client/sync/v3.rs | 17 ++- src/api/client/sync/v4.rs | 38 +++++- src/api/client/sync/v5.rs | 256 ++++++++++++++++++++++++------------- 4 files changed, 212 insertions(+), 135 deletions(-) diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 14459acf..40370160 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -5,16 +5,12 @@ mod v5; use conduwuit::{ Error, PduCount, Result, matrix::pdu::PduEvent, - utils::{ - IterStream, - stream::{BroadbandExt, ReadyExt, TryIgnore}, - }, + utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; use conduwuit_service::Services; use futures::{StreamExt, pin_mut}; use ruma::{ RoomId, UserId, - directory::RoomTypeFilter, events::TimelineEventType::{ self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker, }, @@ -87,33 +83,3 @@ async fn share_encrypted_room( }) .await } - -pub(crate) async fn filter_rooms<'a>( - services: &Services, - rooms: &[&'a RoomId], - filter: &[RoomTypeFilter], - negate: bool, -) -> Vec<&'a RoomId> { - rooms - .iter() - .stream() - .filter_map(|r| async move { - let room_type = services.rooms.state_accessor.get_room_type(r).await; - - if room_type.as_ref().is_err_and(|e| !e.is_not_found()) { - return None; - } - - let room_type_filter = RoomTypeFilter::from(room_type.ok()); - - let include = if negate { - !filter.contains(&room_type_filter) - } else { - filter.is_empty() || filter.contains(&room_type_filter) - }; - - include.then_some(r) - }) - .collect() - .await -} diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 24930941..8eac6b66 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -14,8 +14,8 @@ use conduwuit::{ pair_of, ref_at, result::FlatOk, utils::{ - self, BoolExt, IterStream, ReadyExt, TryFutureExtExt, - future::OptionStream, + self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, + future::{OptionStream, ReadyEqExt}, math::ruma_from_u64, stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, }, @@ -32,6 +32,7 @@ use conduwuit_service::{ use futures::{ FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::{OptionFuture, join, join3, join4, join5, try_join, try_join4}, + pin_mut, }; use ruma::{ DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, @@ -433,10 +434,14 @@ async fn handle_left_room( return Ok(None); } - if !services.rooms.metadata.exists(room_id).await - || services.rooms.metadata.is_disabled(room_id).await - || services.rooms.metadata.is_banned(room_id).await - { + let is_not_found = services.rooms.metadata.exists(room_id).eq(&false); + + let is_disabled = services.rooms.metadata.is_disabled(room_id); + + let is_banned = services.rooms.metadata.is_banned(room_id); + + pin_mut!(is_not_found, is_disabled, is_banned); + if is_not_found.or(is_disabled).or(is_banned).await { // This is just a rejected invite, not a room we know // Insert a leave event anyways for the client let event = PduEvent { diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index 55faf420..f153b2da 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -7,6 +7,7 @@ use std::{ use axum::extract::State; use conduwuit::{ Err, Error, PduCount, PduEvent, Result, debug, error, extract_variant, + matrix::TypeStateKey, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::{ruma_from_usize, usize_from_ruma, usize_from_u64_truncated}, @@ -14,6 +15,7 @@ use conduwuit::{ warn, }; use conduwuit_service::{ + Services, rooms::read_receipt::pack_receipts, sync::{into_db_key, into_snake_key}, }; @@ -24,6 +26,7 @@ use ruma::{ self, DeviceLists, UnreadNotificationsCount, v4::{SlidingOp, SlidingSyncRoomHero}, }, + directory::RoomTypeFilter, events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType::*, @@ -36,10 +39,11 @@ use ruma::{ use super::{load_timeline, share_encrypted_room}; use crate::{ Ruma, - client::{DEFAULT_BUMP_TYPES, filter_rooms, ignored_filter, sync::v5::TodoRooms}, + client::{DEFAULT_BUMP_TYPES, ignored_filter}, }; -pub(crate) const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync"; +type TodoRooms = BTreeMap, usize, u64)>; +const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync"; /// POST `/_matrix/client/unstable/org.matrix.msc3575/sync` /// @@ -802,3 +806,33 @@ pub(crate) async fn sync_events_v4_route( delta_token: None, }) } + +async fn filter_rooms<'a>( + services: &Services, + rooms: &[&'a RoomId], + filter: &[RoomTypeFilter], + negate: bool, +) -> Vec<&'a RoomId> { + rooms + .iter() + .stream() + .filter_map(|r| async move { + let room_type = services.rooms.state_accessor.get_room_type(r).await; + + if room_type.as_ref().is_err_and(|e| !e.is_not_found()) { + return None; + } + + let room_type_filter = RoomTypeFilter::from(room_type.ok()); + + let include = if negate { + !filter.contains(&room_type_filter) + } else { + filter.is_empty() || filter.contains(&room_type_filter) + }; + + include.then_some(r) + }) + .collect() + .await +} diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 00a2d18d..f3fc0f44 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1,28 +1,35 @@ use std::{ cmp::{self, Ordering}, collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + ops::Deref, time::Duration, }; use axum::extract::State; use conduwuit::{ - Err, Error, Result, error, extract_variant, + Err, Error, Result, error, extract_variant, is_equal_to, matrix::{ TypeStateKey, pdu::{PduCount, PduEvent}, }, trace, utils::{ - BoolExt, IterStream, ReadyExt, TryFutureExtExt, + BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, + future::ReadyEqExt, math::{ruma_from_usize, usize_from_ruma}, }, warn, }; -use conduwuit_service::{rooms::read_receipt::pack_receipts, sync::into_snake_key}; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use conduwuit_service::{Services, rooms::read_receipt::pack_receipts, sync::into_snake_key}; +use futures::{ + FutureExt, Stream, StreamExt, TryFutureExt, + future::{OptionFuture, join3, try_join4}, + pin_mut, +}; use ruma::{ DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, api::client::sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, + directory::RoomTypeFilter, events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType, room::member::{MembershipState, RoomMemberEventContent}, @@ -31,13 +38,15 @@ use ruma::{ uint, }; -use super::{filter_rooms, share_encrypted_room}; +use super::share_encrypted_room; use crate::{ Ruma, client::{DEFAULT_BUMP_TYPES, ignored_filter, sync::load_timeline}, }; type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request); +type TodoRooms = BTreeMap, usize, u64)>; +type KnownRooms = BTreeMap>; /// `POST /_matrix/client/unstable/org.matrix.simplified_msc3575/sync` /// ([MSC4186]) @@ -50,7 +59,7 @@ type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request /// [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575 /// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186 pub(crate) async fn sync_events_v5_route( - State(services): State, + State(ref services): State, body: Ruma, ) -> Result { debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); @@ -89,65 +98,77 @@ pub(crate) async fn sync_events_v5_route( .sync .update_snake_sync_request_with_cache(&snake_key, &mut body); - let all_joined_rooms: Vec<_> = services + let all_joined_rooms = services .rooms .state_cache .rooms_joined(sender_user) .map(ToOwned::to_owned) - .collect() - .await; + .collect::>(); - let all_invited_rooms: Vec<_> = services + let all_invited_rooms = services .rooms .state_cache .rooms_invited(sender_user) .map(|r| r.0) - .collect() - .await; + .collect::>(); - let all_knocked_rooms: Vec<_> = services + let all_knocked_rooms = services .rooms .state_cache .rooms_knocked(sender_user) .map(|r| r.0) - .collect() - .await; + .collect::>(); - let all_rooms: Vec<&RoomId> = all_joined_rooms - .iter() - .map(AsRef::as_ref) - .chain(all_invited_rooms.iter().map(AsRef::as_ref)) - .chain(all_knocked_rooms.iter().map(AsRef::as_ref)) - .collect(); + let (all_joined_rooms, all_invited_rooms, all_knocked_rooms) = + join3(all_joined_rooms, all_invited_rooms, all_knocked_rooms).await; - let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect(); - let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect(); + let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref); + let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref); + let all_knocked_rooms = all_knocked_rooms.iter().map(AsRef::as_ref); + let all_rooms = all_joined_rooms + .clone() + .chain(all_invited_rooms.clone()) + .chain(all_knocked_rooms.clone()); let pos = next_batch.clone().to_string(); let mut todo_rooms: TodoRooms = BTreeMap::new(); let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &body); + + let account_data = collect_account_data(services, sync_info).map(Ok); + + let e2ee = collect_e2ee(services, sync_info, all_joined_rooms.clone()); + + let to_device = collect_to_device(services, sync_info, next_batch).map(Ok); + + let receipts = collect_receipts(services).map(Ok); + + let (account_data, e2ee, to_device, receipts) = + try_join4(account_data, e2ee, to_device, receipts).await?; + + let extensions = sync_events::v5::response::Extensions { + account_data, + e2ee, + to_device, + receipts, + typing: sync_events::v5::response::Typing::default(), + }; + let mut response = sync_events::v5::Response { txn_id: body.txn_id.clone(), pos, lists: BTreeMap::new(), rooms: BTreeMap::new(), - extensions: sync_events::v5::response::Extensions { - account_data: collect_account_data(services, sync_info).await, - e2ee: collect_e2ee(services, sync_info, &all_joined_rooms).await?, - to_device: collect_to_device(services, sync_info, next_batch).await, - receipts: collect_receipts(services).await, - typing: sync_events::v5::response::Typing::default(), - }, + extensions, }; handle_lists( services, sync_info, - &all_invited_rooms, - &all_joined_rooms, - &all_rooms, + all_invited_rooms.clone(), + all_joined_rooms.clone(), + all_rooms, &mut todo_rooms, &known_rooms, &mut response, @@ -160,7 +181,7 @@ pub(crate) async fn sync_events_v5_route( services, sender_user, next_batch, - &all_invited_rooms, + all_invited_rooms.clone(), &todo_rooms, &mut response, &body, @@ -185,31 +206,33 @@ pub(crate) async fn sync_events_v5_route( } trace!( - rooms=?response.rooms.len(), - account_data=?response.extensions.account_data.rooms.len(), - receipts=?response.extensions.receipts.rooms.len(), + rooms = ?response.rooms.len(), + account_data = ?response.extensions.account_data.rooms.len(), + receipts = ?response.extensions.receipts.rooms.len(), "responding to request with" ); Ok(response) } -type KnownRooms = BTreeMap>; -pub(crate) type TodoRooms = BTreeMap, usize, u64)>; - async fn fetch_subscriptions( - services: crate::State, + services: &Services, (sender_user, sender_device, globalsince, body): SyncInfo<'_>, known_rooms: &KnownRooms, todo_rooms: &mut TodoRooms, ) { let mut known_subscription_rooms = BTreeSet::new(); for (room_id, room) in &body.room_subscriptions { - if !services.rooms.metadata.exists(room_id).await - || services.rooms.metadata.is_disabled(room_id).await - || services.rooms.metadata.is_banned(room_id).await - { + let not_exists = services.rooms.metadata.exists(room_id).eq(&false); + + let is_disabled = services.rooms.metadata.is_disabled(room_id); + + let is_banned = services.rooms.metadata.is_banned(room_id); + + pin_mut!(not_exists, is_disabled, is_banned); + if not_exists.or(is_disabled).or(is_banned).await { continue; } + let todo_room = todo_rooms .entry(room_id.clone()) @@ -251,27 +274,39 @@ async fn fetch_subscriptions( } #[allow(clippy::too_many_arguments)] -async fn handle_lists<'a>( - services: crate::State, +async fn handle_lists<'a, Rooms, AllRooms>( + services: &Services, (sender_user, sender_device, globalsince, body): SyncInfo<'_>, - all_invited_rooms: &Vec<&'a RoomId>, - all_joined_rooms: &Vec<&'a RoomId>, - all_rooms: &Vec<&'a RoomId>, + all_invited_rooms: Rooms, + all_joined_rooms: Rooms, + all_rooms: AllRooms, todo_rooms: &'a mut TodoRooms, known_rooms: &'a KnownRooms, response: &'_ mut sync_events::v5::Response, -) -> KnownRooms { +) -> KnownRooms +where + Rooms: Iterator + Clone + Send + 'a, + AllRooms: Iterator + Clone + Send + 'a, +{ for (list_id, list) in &body.lists { - let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) { - | Some(true) => all_invited_rooms, - | Some(false) => all_joined_rooms, - | None => all_rooms, + let active_rooms: Vec<_> = match list.filters.as_ref().and_then(|f| f.is_invite) { + | None => all_rooms.clone().collect(), + | Some(true) => all_invited_rooms.clone().collect(), + | Some(false) => all_joined_rooms.clone().collect(), }; - let active_rooms = match list.filters.clone().map(|f| f.not_room_types) { - | Some(filter) if filter.is_empty() => active_rooms, - | Some(value) => &filter_rooms(&services, active_rooms, &value, true).await, + let active_rooms = match list.filters.as_ref().map(|f| &f.not_room_types) { | None => active_rooms, + | Some(filter) if filter.is_empty() => active_rooms, + | Some(value) => + filter_rooms( + services, + value, + &true, + active_rooms.iter().stream().map(Deref::deref), + ) + .collect() + .await, }; let mut new_known_rooms: BTreeSet = BTreeSet::new(); @@ -289,6 +324,7 @@ async fn handle_lists<'a>( let new_rooms: BTreeSet = room_ids.clone().into_iter().map(From::from).collect(); + new_known_rooms.extend(new_rooms); //new_known_rooms.extend(room_ids..cloned()); for room_id in room_ids { @@ -334,18 +370,22 @@ async fn handle_lists<'a>( ); } } + BTreeMap::default() } -async fn process_rooms( - services: crate::State, +async fn process_rooms<'a, Rooms>( + services: &Services, sender_user: &UserId, next_batch: u64, - all_invited_rooms: &[&RoomId], + all_invited_rooms: Rooms, todo_rooms: &TodoRooms, response: &mut sync_events::v5::Response, body: &sync_events::v5::Request, -) -> Result> { +) -> Result> +where + Rooms: Iterator + Clone + Send + 'a, +{ let mut rooms = BTreeMap::new(); for (room_id, (required_state_request, timeline_limit, roomsince)) in todo_rooms { let roomsincecount = PduCount::Normal(*roomsince); @@ -354,7 +394,7 @@ async fn process_rooms( let mut invite_state = None; let (timeline_pdus, limited); let new_room_id: &RoomId = (*room_id).as_ref(); - if all_invited_rooms.contains(&new_room_id) { + if all_invited_rooms.clone().any(is_equal_to!(new_room_id)) { // TODO: figure out a timestamp we can use for remote invites invite_state = services .rooms @@ -366,7 +406,7 @@ async fn process_rooms( (timeline_pdus, limited) = (Vec::new(), true); } else { (timeline_pdus, limited) = match load_timeline( - &services, + services, sender_user, room_id, roomsincecount, @@ -399,18 +439,17 @@ async fn process_rooms( .rooms .read_receipt .last_privateread_update(sender_user, room_id) - .await > *roomsince; + .await; - let private_read_event = if last_privateread_update { - services - .rooms - .read_receipt - .private_read_get(room_id, sender_user) - .await - .ok() - } else { - None - }; + let private_read_event: OptionFuture<_> = (last_privateread_update > *roomsince) + .then(|| { + services + .rooms + .read_receipt + .private_read_get(room_id, sender_user) + .ok() + }) + .into(); let mut receipts: Vec> = services .rooms @@ -426,7 +465,7 @@ async fn process_rooms( .collect() .await; - if let Some(private_read_event) = private_read_event { + if let Some(private_read_event) = private_read_event.await.flatten() { receipts.push(private_read_event); } @@ -475,7 +514,7 @@ async fn process_rooms( let room_events: Vec<_> = timeline_pdus .iter() .stream() - .filter_map(|item| ignored_filter(&services, item.clone(), sender_user)) + .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) .map(|(_, pdu)| pdu.to_sync_room_event()) .collect() .await; @@ -627,7 +666,7 @@ async fn process_rooms( Ok(rooms) } async fn collect_account_data( - services: crate::State, + services: &Services, (sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request), ) -> sync_events::v5::response::AccountData { let mut account_data = sync_events::v5::response::AccountData { @@ -663,16 +702,19 @@ async fn collect_account_data( account_data } -async fn collect_e2ee<'a>( - services: crate::State, +async fn collect_e2ee<'a, Rooms>( + services: &Services, (sender_user, sender_device, globalsince, body): ( &UserId, &DeviceId, u64, &sync_events::v5::Request, ), - all_joined_rooms: &'a Vec<&'a RoomId>, -) -> Result { + all_joined_rooms: Rooms, +) -> Result +where + Rooms: Iterator + Send + 'a, +{ if !body.extensions.e2ee.enabled.unwrap_or(false) { return Ok(sync_events::v5::response::E2EE::default()); } @@ -773,7 +815,7 @@ async fn collect_e2ee<'a>( | MembershipState::Join => { // A new user joined an encrypted room if !share_encrypted_room( - &services, + services, sender_user, user_id, Some(room_id), @@ -806,7 +848,7 @@ async fn collect_e2ee<'a>( // Only send keys if the sender doesn't share an encrypted room with the target // already .filter_map(|user_id| { - share_encrypted_room(&services, sender_user, user_id, Some(room_id)) + share_encrypted_room(services, sender_user, user_id, Some(room_id)) .map(|res| res.or_some(user_id.to_owned())) }) .collect::>() @@ -829,7 +871,7 @@ async fn collect_e2ee<'a>( for user_id in left_encrypted_users { let dont_share_encrypted_room = - !share_encrypted_room(&services, sender_user, &user_id, None).await; + !share_encrypted_room(services, sender_user, &user_id, None).await; // If the user doesn't share an encrypted room with the target anymore, we need // to tell them @@ -839,20 +881,22 @@ async fn collect_e2ee<'a>( } Ok(sync_events::v5::response::E2EE { - device_lists: DeviceLists { - changed: device_list_changes.into_iter().collect(), - left: device_list_left.into_iter().collect(), - }, + device_unused_fallback_key_types: None, + device_one_time_keys_count: services .users .count_one_time_keys(sender_user, sender_device) .await, - device_unused_fallback_key_types: None, + + device_lists: DeviceLists { + changed: device_list_changes.into_iter().collect(), + left: device_list_left.into_iter().collect(), + }, }) } async fn collect_to_device( - services: crate::State, + services: &Services, (sender_user, sender_device, globalsince, body): SyncInfo<'_>, next_batch: u64, ) -> Option { @@ -875,7 +919,35 @@ async fn collect_to_device( }) } -async fn collect_receipts(_services: crate::State) -> sync_events::v5::response::Receipts { +async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts { sync_events::v5::response::Receipts { rooms: BTreeMap::new() } // TODO: get explicitly requested read receipts } + +fn filter_rooms<'a, Rooms>( + services: &'a Services, + filter: &'a [RoomTypeFilter], + negate: &'a bool, + rooms: Rooms, +) -> impl Stream + Send + 'a +where + Rooms: Stream + Send + 'a, +{ + rooms.filter_map(async |room_id| { + let room_type = services.rooms.state_accessor.get_room_type(room_id).await; + + if room_type.as_ref().is_err_and(|e| !e.is_not_found()) { + return None; + } + + let room_type_filter = RoomTypeFilter::from(room_type.ok()); + + let include = if *negate { + !filter.contains(&room_type_filter) + } else { + filter.is_empty() || filter.contains(&room_type_filter) + }; + + include.then_some(room_id) + }) +}