increase snake sync asynchronicity

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-06 21:59:18 +00:00
parent ccf10c6b47
commit e0508958b7
4 changed files with 212 additions and 135 deletions

View file

@ -7,6 +7,7 @@ use std::{
use axum::extract::State;
use conduwuit::{
Err, Error, PduCount, PduEvent, Result, debug, error, extract_variant,
matrix::TypeStateKey,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
math::{ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
@ -14,6 +15,7 @@ use conduwuit::{
warn,
};
use conduwuit_service::{
Services,
rooms::read_receipt::pack_receipts,
sync::{into_db_key, into_snake_key},
};
@ -24,6 +26,7 @@ use ruma::{
self, DeviceLists, UnreadNotificationsCount,
v4::{SlidingOp, SlidingSyncRoomHero},
},
directory::RoomTypeFilter,
events::{
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
TimelineEventType::*,
@ -36,10 +39,11 @@ use ruma::{
use super::{load_timeline, share_encrypted_room};
use crate::{
Ruma,
client::{DEFAULT_BUMP_TYPES, filter_rooms, ignored_filter, sync::v5::TodoRooms},
client::{DEFAULT_BUMP_TYPES, ignored_filter},
};
pub(crate) const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;
const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
/// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
///
@ -802,3 +806,33 @@ pub(crate) async fn sync_events_v4_route(
delta_token: None,
})
}
async fn filter_rooms<'a>(
services: &Services,
rooms: &[&'a RoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<&'a RoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;
if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}
let room_type_filter = RoomTypeFilter::from(room_type.ok());
let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};
include.then_some(r)
})
.collect()
.await
}