From 43ce46ff7e37a9e3b1f88774374dca796ae1391c Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Sun, 6 Apr 2025 21:59:18 +0000
Subject: [PATCH] increase snake sync asynchronicity

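Evaluate the independent metadata checks (exists / is_disabled /
is_banned) and the sync extensions (account_data, e2ee, to_device,
receipts) concurrently instead of awaiting them one after another,
using pin_mut! with FutureBoolExt::or and join3/try_join4. Collect
the joined/invited/knocked room lists concurrently as well.

Along the way, take &Services instead of crate::State in the v5
helpers, pass room lists as cloneable iterators/streams rather than
slices of room ids, and give v4 and v5 their own filter_rooms (the
v5 copy is stream-based), dropping the shared helper from
sync/mod.rs.

The general shape of the boolean checks, as a simplified sketch with
stand-in async fns rather than the real metadata calls; join3 stands
in here for the crate's FutureBoolExt::or combinator used in the
patch itself:

    use futures::future::join3;

    // Stand-ins for services.rooms.metadata.{exists,is_disabled,is_banned}.
    async fn exists() -> bool { true }
    async fn is_disabled() -> bool { false }
    async fn is_banned() -> bool { false }

    // Before: three sequential `.await`s; after: one concurrent poll.
    async fn should_skip_room() -> bool {
        let (found, disabled, banned) =
            join3(exists(), is_disabled(), is_banned()).await;
        !found || disabled || banned
    }
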
Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/api/client/sync/mod.rs |  36 +-----
 src/api/client/sync/v3.rs  |  17 ++-
 src/api/client/sync/v4.rs  |  38 +++++-
 src/api/client/sync/v5.rs  | 256 ++++++++++++++++++++++++-------------
 4 files changed, 212 insertions(+), 135 deletions(-)

diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs
index 14459acf..40370160 100644
--- a/src/api/client/sync/mod.rs
+++ b/src/api/client/sync/mod.rs
@@ -5,16 +5,12 @@ mod v5;
 use conduwuit::{
 	Error, PduCount, Result,
 	matrix::pdu::PduEvent,
-	utils::{
-		IterStream,
-		stream::{BroadbandExt, ReadyExt, TryIgnore},
-	},
+	utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
 };
 use conduwuit_service::Services;
 use futures::{StreamExt, pin_mut};
 use ruma::{
 	RoomId, UserId,
-	directory::RoomTypeFilter,
 	events::TimelineEventType::{
 		self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker,
 	},
@@ -87,33 +83,3 @@ async fn share_encrypted_room(
 		})
 		.await
 }
-
-pub(crate) async fn filter_rooms<'a>(
-	services: &Services,
-	rooms: &[&'a RoomId],
-	filter: &[RoomTypeFilter],
-	negate: bool,
-) -> Vec<&'a RoomId> {
-	rooms
-		.iter()
-		.stream()
-		.filter_map(|r| async move {
-			let room_type = services.rooms.state_accessor.get_room_type(r).await;
-
-			if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
-				return None;
-			}
-
-			let room_type_filter = RoomTypeFilter::from(room_type.ok());
-
-			let include = if negate {
-				!filter.contains(&room_type_filter)
-			} else {
-				filter.is_empty() || filter.contains(&room_type_filter)
-			};
-
-			include.then_some(r)
-		})
-		.collect()
-		.await
-}
diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs
index 24930941..8eac6b66 100644
--- a/src/api/client/sync/v3.rs
+++ b/src/api/client/sync/v3.rs
@@ -14,8 +14,8 @@ use conduwuit::{
 	pair_of, ref_at,
 	result::FlatOk,
 	utils::{
-		self, BoolExt, IterStream, ReadyExt, TryFutureExtExt,
-		future::OptionStream,
+		self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
+		future::{OptionStream, ReadyEqExt},
 		math::ruma_from_u64,
 		stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
 	},
@@ -32,6 +32,7 @@ use conduwuit_service::{
 use futures::{
 	FutureExt, StreamExt, TryFutureExt, TryStreamExt,
 	future::{OptionFuture, join, join3, join4, join5, try_join, try_join4},
+	pin_mut,
 };
 use ruma::{
 	DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
@@ -433,10 +434,14 @@ async fn handle_left_room(
 		return Ok(None);
 	}
 
-	if !services.rooms.metadata.exists(room_id).await
-		|| services.rooms.metadata.is_disabled(room_id).await
-		|| services.rooms.metadata.is_banned(room_id).await
-	{
+	let is_not_found = services.rooms.metadata.exists(room_id).eq(&false);
+
+	let is_disabled = services.rooms.metadata.is_disabled(room_id);
+
+	let is_banned = services.rooms.metadata.is_banned(room_id);
+
+	pin_mut!(is_not_found, is_disabled, is_banned);
+	if is_not_found.or(is_disabled).or(is_banned).await {
 		// This is just a rejected invite, not a room we know
 		// Insert a leave event anyways for the client
 		let event = PduEvent {
diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs
index 55faf420..f153b2da 100644
--- a/src/api/client/sync/v4.rs
+++ b/src/api/client/sync/v4.rs
@@ -7,6 +7,7 @@ use std::{
 use axum::extract::State;
 use conduwuit::{
 	Err, Error, PduCount, PduEvent, Result, debug, error, extract_variant,
+	matrix::TypeStateKey,
 	utils::{
 		BoolExt, IterStream, ReadyExt, TryFutureExtExt,
 		math::{ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
@@ -14,6 +15,7 @@ use conduwuit::{
 	warn,
 };
 use conduwuit_service::{
+	Services,
 	rooms::read_receipt::pack_receipts,
 	sync::{into_db_key, into_snake_key},
 };
@@ -24,6 +26,7 @@ use ruma::{
 		self, DeviceLists, UnreadNotificationsCount,
 		v4::{SlidingOp, SlidingSyncRoomHero},
 	},
+	directory::RoomTypeFilter,
 	events::{
 		AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
 		TimelineEventType::*,
@@ -36,10 +39,11 @@ use ruma::{
 use super::{load_timeline, share_encrypted_room};
 use crate::{
 	Ruma,
-	client::{DEFAULT_BUMP_TYPES, filter_rooms, ignored_filter, sync::v5::TodoRooms},
+	client::{DEFAULT_BUMP_TYPES, ignored_filter},
 };
 
-pub(crate) const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
+type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;
+const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
 
 /// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
 ///
@@ -802,3 +806,33 @@ pub(crate) async fn sync_events_v4_route(
 		delta_token: None,
 	})
 }
+
+async fn filter_rooms<'a>(
+	services: &Services,
+	rooms: &[&'a RoomId],
+	filter: &[RoomTypeFilter],
+	negate: bool,
+) -> Vec<&'a RoomId> {
+	rooms
+		.iter()
+		.stream()
+		.filter_map(|r| async move {
+			let room_type = services.rooms.state_accessor.get_room_type(r).await;
+
+			if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
+				return None;
+			}
+
+			let room_type_filter = RoomTypeFilter::from(room_type.ok());
+
+			let include = if negate {
+				!filter.contains(&room_type_filter)
+			} else {
+				filter.is_empty() || filter.contains(&room_type_filter)
+			};
+
+			include.then_some(r)
+		})
+		.collect()
+		.await
+}
diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs
index 00a2d18d..f3fc0f44 100644
--- a/src/api/client/sync/v5.rs
+++ b/src/api/client/sync/v5.rs
@@ -1,28 +1,35 @@
 use std::{
 	cmp::{self, Ordering},
 	collections::{BTreeMap, BTreeSet, HashMap, HashSet},
+	ops::Deref,
 	time::Duration,
 };
 
 use axum::extract::State;
 use conduwuit::{
-	Err, Error, Result, error, extract_variant,
+	Err, Error, Result, error, extract_variant, is_equal_to,
 	matrix::{
 		TypeStateKey,
 		pdu::{PduCount, PduEvent},
 	},
 	trace,
 	utils::{
-		BoolExt, IterStream, ReadyExt, TryFutureExtExt,
+		BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
+		future::ReadyEqExt,
 		math::{ruma_from_usize, usize_from_ruma},
 	},
 	warn,
 };
-use conduwuit_service::{rooms::read_receipt::pack_receipts, sync::into_snake_key};
-use futures::{FutureExt, StreamExt, TryFutureExt};
+use conduwuit_service::{Services, rooms::read_receipt::pack_receipts, sync::into_snake_key};
+use futures::{
+	FutureExt, Stream, StreamExt, TryFutureExt,
+	future::{OptionFuture, join3, try_join4},
+	pin_mut,
+};
 use ruma::{
 	DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
 	api::client::sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
+	directory::RoomTypeFilter,
 	events::{
 		AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType,
 		room::member::{MembershipState, RoomMemberEventContent},
@@ -31,13 +38,15 @@ use ruma::{
 	uint,
 };
 
-use super::{filter_rooms, share_encrypted_room};
+use super::share_encrypted_room;
 use crate::{
 	Ruma,
 	client::{DEFAULT_BUMP_TYPES, ignored_filter, sync::load_timeline},
 };
 
 type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request);
+type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;
+type KnownRooms = BTreeMap<String, BTreeMap<OwnedRoomId, u64>>;
 
 /// `POST /_matrix/client/unstable/org.matrix.simplified_msc3575/sync`
 /// ([MSC4186])
@@ -50,7 +59,7 @@ type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request
 /// [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575
 /// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
 pub(crate) async fn sync_events_v5_route(
-	State(services): State<crate::State>,
+	State(ref services): State<crate::State>,
 	body: Ruma<sync_events::v5::Request>,
 ) -> Result<sync_events::v5::Response> {
 	debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
@@ -89,65 +98,77 @@ pub(crate) async fn sync_events_v5_route(
 		.sync
 		.update_snake_sync_request_with_cache(&snake_key, &mut body);
 
-	let all_joined_rooms: Vec<_> = services
+	let all_joined_rooms = services
 		.rooms
 		.state_cache
 		.rooms_joined(sender_user)
 		.map(ToOwned::to_owned)
-		.collect()
-		.await;
+		.collect::<Vec<OwnedRoomId>>();
 
-	let all_invited_rooms: Vec<_> = services
+	let all_invited_rooms = services
 		.rooms
 		.state_cache
 		.rooms_invited(sender_user)
 		.map(|r| r.0)
-		.collect()
-		.await;
+		.collect::<Vec<OwnedRoomId>>();
 
-	let all_knocked_rooms: Vec<_> = services
+	let all_knocked_rooms = services
 		.rooms
 		.state_cache
 		.rooms_knocked(sender_user)
 		.map(|r| r.0)
-		.collect()
-		.await;
+		.collect::<Vec<OwnedRoomId>>();
 
-	let all_rooms: Vec<&RoomId> = all_joined_rooms
-		.iter()
-		.map(AsRef::as_ref)
-		.chain(all_invited_rooms.iter().map(AsRef::as_ref))
-		.chain(all_knocked_rooms.iter().map(AsRef::as_ref))
-		.collect();
+	let (all_joined_rooms, all_invited_rooms, all_knocked_rooms) =
+		join3(all_joined_rooms, all_invited_rooms, all_knocked_rooms).await;
 
-	let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect();
-	let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect();
+	let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref);
+	let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref);
+	let all_knocked_rooms = all_knocked_rooms.iter().map(AsRef::as_ref);
+	let all_rooms = all_joined_rooms
+		.clone()
+		.chain(all_invited_rooms.clone())
+		.chain(all_knocked_rooms.clone());
 
 	let pos = next_batch.clone().to_string();
 
 	let mut todo_rooms: TodoRooms = BTreeMap::new();
 
 	let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &body);
+
+	let account_data = collect_account_data(services, sync_info).map(Ok);
+
+	let e2ee = collect_e2ee(services, sync_info, all_joined_rooms.clone());
+
+	let to_device = collect_to_device(services, sync_info, next_batch).map(Ok);
+
+	let receipts = collect_receipts(services).map(Ok);
+
+	let (account_data, e2ee, to_device, receipts) =
+		try_join4(account_data, e2ee, to_device, receipts).await?;
+
+	let extensions = sync_events::v5::response::Extensions {
+		account_data,
+		e2ee,
+		to_device,
+		receipts,
+		typing: sync_events::v5::response::Typing::default(),
+	};
+
 	let mut response = sync_events::v5::Response {
 		txn_id: body.txn_id.clone(),
 		pos,
 		lists: BTreeMap::new(),
 		rooms: BTreeMap::new(),
-		extensions: sync_events::v5::response::Extensions {
-			account_data: collect_account_data(services, sync_info).await,
-			e2ee: collect_e2ee(services, sync_info, &all_joined_rooms).await?,
-			to_device: collect_to_device(services, sync_info, next_batch).await,
-			receipts: collect_receipts(services).await,
-			typing: sync_events::v5::response::Typing::default(),
-		},
+		extensions,
 	};
 
 	handle_lists(
 		services,
 		sync_info,
-		&all_invited_rooms,
-		&all_joined_rooms,
-		&all_rooms,
+		all_invited_rooms.clone(),
+		all_joined_rooms.clone(),
+		all_rooms,
 		&mut todo_rooms,
 		&known_rooms,
 		&mut response,
@@ -160,7 +181,7 @@ pub(crate) async fn sync_events_v5_route(
 		services,
 		sender_user,
 		next_batch,
-		&all_invited_rooms,
+		all_invited_rooms.clone(),
 		&todo_rooms,
 		&mut response,
 		&body,
@@ -185,31 +206,33 @@ pub(crate) async fn sync_events_v5_route(
 	}
 
 	trace!(
-		rooms=?response.rooms.len(),
-		account_data=?response.extensions.account_data.rooms.len(),
-		receipts=?response.extensions.receipts.rooms.len(),
+		rooms = ?response.rooms.len(),
+		account_data = ?response.extensions.account_data.rooms.len(),
+		receipts = ?response.extensions.receipts.rooms.len(),
 		"responding to request with"
 	);
 	Ok(response)
 }
 
-type KnownRooms = BTreeMap<String, BTreeMap<OwnedRoomId, u64>>;
-pub(crate) type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;
-
 async fn fetch_subscriptions(
-	services: crate::State,
+	services: &Services,
 	(sender_user, sender_device, globalsince, body): SyncInfo<'_>,
 	known_rooms: &KnownRooms,
 	todo_rooms: &mut TodoRooms,
 ) {
 	let mut known_subscription_rooms = BTreeSet::new();
 	for (room_id, room) in &body.room_subscriptions {
-		if !services.rooms.metadata.exists(room_id).await
-			|| services.rooms.metadata.is_disabled(room_id).await
-			|| services.rooms.metadata.is_banned(room_id).await
-		{
+		let not_exists = services.rooms.metadata.exists(room_id).eq(&false);
+
+		let is_disabled = services.rooms.metadata.is_disabled(room_id);
+
+		let is_banned = services.rooms.metadata.is_banned(room_id);
+
+		pin_mut!(not_exists, is_disabled, is_banned);
+		if not_exists.or(is_disabled).or(is_banned).await {
 			continue;
 		}
+
 		let todo_room =
 			todo_rooms
 				.entry(room_id.clone())
@@ -251,27 +274,39 @@ async fn fetch_subscriptions(
 }
 
 #[allow(clippy::too_many_arguments)]
-async fn handle_lists<'a>(
-	services: crate::State,
+async fn handle_lists<'a, Rooms, AllRooms>(
+	services: &Services,
 	(sender_user, sender_device, globalsince, body): SyncInfo<'_>,
-	all_invited_rooms: &Vec<&'a RoomId>,
-	all_joined_rooms: &Vec<&'a RoomId>,
-	all_rooms: &Vec<&'a RoomId>,
+	all_invited_rooms: Rooms,
+	all_joined_rooms: Rooms,
+	all_rooms: AllRooms,
 	todo_rooms: &'a mut TodoRooms,
 	known_rooms: &'a KnownRooms,
 	response: &'_ mut sync_events::v5::Response,
-) -> KnownRooms {
+) -> KnownRooms
+where
+	Rooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
+	AllRooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
+{
 	for (list_id, list) in &body.lists {
-		let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) {
-			| Some(true) => all_invited_rooms,
-			| Some(false) => all_joined_rooms,
-			| None => all_rooms,
+		let active_rooms: Vec<_> = match list.filters.as_ref().and_then(|f| f.is_invite) {
+			| None => all_rooms.clone().collect(),
+			| Some(true) => all_invited_rooms.clone().collect(),
+			| Some(false) => all_joined_rooms.clone().collect(),
 		};
 
-		let active_rooms = match list.filters.clone().map(|f| f.not_room_types) {
-			| Some(filter) if filter.is_empty() => active_rooms,
-			| Some(value) => &filter_rooms(&services, active_rooms, &value, true).await,
+		let active_rooms = match list.filters.as_ref().map(|f| &f.not_room_types) {
 			| None => active_rooms,
+			| Some(filter) if filter.is_empty() => active_rooms,
+			| Some(value) =>
+				filter_rooms(
+					services,
+					value,
+					&true,
+					active_rooms.iter().stream().map(Deref::deref),
+				)
+				.collect()
+				.await,
 		};
 
 		let mut new_known_rooms: BTreeSet<OwnedRoomId> = BTreeSet::new();
@@ -289,6 +324,7 @@ async fn handle_lists<'a>(
 
 			let new_rooms: BTreeSet<OwnedRoomId> =
 				room_ids.clone().into_iter().map(From::from).collect();
+
 			new_known_rooms.extend(new_rooms);
 			//new_known_rooms.extend(room_ids..cloned());
 			for room_id in room_ids {
@@ -334,18 +370,22 @@ async fn handle_lists<'a>(
 			);
 		}
 	}
+
 	BTreeMap::default()
 }
 
-async fn process_rooms(
-	services: crate::State,
+async fn process_rooms<'a, Rooms>(
+	services: &Services,
 	sender_user: &UserId,
 	next_batch: u64,
-	all_invited_rooms: &[&RoomId],
+	all_invited_rooms: Rooms,
 	todo_rooms: &TodoRooms,
 	response: &mut sync_events::v5::Response,
 	body: &sync_events::v5::Request,
-) -> Result<BTreeMap<OwnedRoomId, sync_events::v5::response::Room>> {
+) -> Result<BTreeMap<OwnedRoomId, sync_events::v5::response::Room>>
+where
+	Rooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
+{
 	let mut rooms = BTreeMap::new();
 	for (room_id, (required_state_request, timeline_limit, roomsince)) in todo_rooms {
 		let roomsincecount = PduCount::Normal(*roomsince);
@@ -354,7 +394,7 @@ async fn process_rooms(
 		let mut invite_state = None;
 		let (timeline_pdus, limited);
 		let new_room_id: &RoomId = (*room_id).as_ref();
-		if all_invited_rooms.contains(&new_room_id) {
+		if all_invited_rooms.clone().any(is_equal_to!(new_room_id)) {
 			// TODO: figure out a timestamp we can use for remote invites
 			invite_state = services
 				.rooms
@@ -366,7 +406,7 @@ async fn process_rooms(
 			(timeline_pdus, limited) = (Vec::new(), true);
 		} else {
 			(timeline_pdus, limited) = match load_timeline(
-				&services,
+				services,
 				sender_user,
 				room_id,
 				roomsincecount,
@@ -399,18 +439,17 @@ async fn process_rooms(
 			.rooms
 			.read_receipt
 			.last_privateread_update(sender_user, room_id)
-			.await > *roomsince;
+			.await;
 
-		let private_read_event = if last_privateread_update {
-			services
-				.rooms
-				.read_receipt
-				.private_read_get(room_id, sender_user)
-				.await
-				.ok()
-		} else {
-			None
-		};
+		let private_read_event: OptionFuture<_> = (last_privateread_update > *roomsince)
+			.then(|| {
+				services
+					.rooms
+					.read_receipt
+					.private_read_get(room_id, sender_user)
+					.ok()
+			})
+			.into();
 
 		let mut receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
 			.rooms
@@ -426,7 +465,7 @@ async fn process_rooms(
 			.collect()
 			.await;
 
-		if let Some(private_read_event) = private_read_event {
+		if let Some(private_read_event) = private_read_event.await.flatten() {
 			receipts.push(private_read_event);
 		}
 
@@ -475,7 +514,7 @@ async fn process_rooms(
 		let room_events: Vec<_> = timeline_pdus
 			.iter()
 			.stream()
-			.filter_map(|item| ignored_filter(&services, item.clone(), sender_user))
+			.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
 			.map(|(_, pdu)| pdu.to_sync_room_event())
 			.collect()
 			.await;
@@ -627,7 +666,7 @@ async fn process_rooms(
 	Ok(rooms)
 }
 async fn collect_account_data(
-	services: crate::State,
+	services: &Services,
 	(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
 ) -> sync_events::v5::response::AccountData {
 	let mut account_data = sync_events::v5::response::AccountData {
@@ -663,16 +702,19 @@ async fn collect_account_data(
 	account_data
 }
 
-async fn collect_e2ee<'a>(
-	services: crate::State,
+async fn collect_e2ee<'a, Rooms>(
+	services: &Services,
 	(sender_user, sender_device, globalsince, body): (
 		&UserId,
 		&DeviceId,
 		u64,
 		&sync_events::v5::Request,
 	),
-	all_joined_rooms: &'a Vec<&'a RoomId>,
-) -> Result<sync_events::v5::response::E2EE> {
+	all_joined_rooms: Rooms,
+) -> Result<sync_events::v5::response::E2EE>
+where
+	Rooms: Iterator<Item = &'a RoomId> + Send + 'a,
+{
 	if !body.extensions.e2ee.enabled.unwrap_or(false) {
 		return Ok(sync_events::v5::response::E2EE::default());
 	}
@@ -773,7 +815,7 @@ async fn collect_e2ee<'a>(
 									| MembershipState::Join => {
 										// A new user joined an encrypted room
 										if !share_encrypted_room(
-											&services,
+											services,
 											sender_user,
 											user_id,
 											Some(room_id),
@@ -806,7 +848,7 @@ async fn collect_e2ee<'a>(
 						// Only send keys if the sender doesn't share an encrypted room with the target
 						// already
 						.filter_map(|user_id| {
-							share_encrypted_room(&services, sender_user, user_id, Some(room_id))
+							share_encrypted_room(services, sender_user, user_id, Some(room_id))
 								.map(|res| res.or_some(user_id.to_owned()))
 						})
 						.collect::<Vec<_>>()
@@ -829,7 +871,7 @@ async fn collect_e2ee<'a>(
 
 	for user_id in left_encrypted_users {
 		let dont_share_encrypted_room =
-			!share_encrypted_room(&services, sender_user, &user_id, None).await;
+			!share_encrypted_room(services, sender_user, &user_id, None).await;
 
 		// If the user doesn't share an encrypted room with the target anymore, we need
 		// to tell them
@@ -839,20 +881,22 @@ async fn collect_e2ee<'a>(
 	}
 
 	Ok(sync_events::v5::response::E2EE {
-		device_lists: DeviceLists {
-			changed: device_list_changes.into_iter().collect(),
-			left: device_list_left.into_iter().collect(),
-		},
+		device_unused_fallback_key_types: None,
+
 		device_one_time_keys_count: services
 			.users
 			.count_one_time_keys(sender_user, sender_device)
 			.await,
-		device_unused_fallback_key_types: None,
+
+		device_lists: DeviceLists {
+			changed: device_list_changes.into_iter().collect(),
+			left: device_list_left.into_iter().collect(),
+		},
 	})
 }
 
 async fn collect_to_device(
-	services: crate::State,
+	services: &Services,
 	(sender_user, sender_device, globalsince, body): SyncInfo<'_>,
 	next_batch: u64,
 ) -> Option<sync_events::v5::response::ToDevice> {
@@ -875,7 +919,35 @@ async fn collect_to_device(
 	})
 }
 
-async fn collect_receipts(_services: crate::State) -> sync_events::v5::response::Receipts {
+async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts {
 	sync_events::v5::response::Receipts { rooms: BTreeMap::new() }
 	// TODO: get explicitly requested read receipts
 }
+
+fn filter_rooms<'a, Rooms>(
+	services: &'a Services,
+	filter: &'a [RoomTypeFilter],
+	negate: &'a bool,
+	rooms: Rooms,
+) -> impl Stream<Item = &'a RoomId> + Send + 'a
+where
+	Rooms: Stream<Item = &'a RoomId> + Send + 'a,
+{
+	rooms.filter_map(async |room_id| {
+		let room_type = services.rooms.state_accessor.get_room_type(room_id).await;
+
+		if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
+			return None;
+		}
+
+		let room_type_filter = RoomTypeFilter::from(room_type.ok());
+
+		let include = if *negate {
+			!filter.contains(&room_type_filter)
+		} else {
+			filter.is_empty() || filter.contains(&room_type_filter)
+		};
+
+		include.then_some(room_id)
+	})
+}