Sliding sync subscriptions, e2ee, to_device messages

This commit is contained in:
Timo Kösters 2023-07-24 10:41:50 +02:00
parent caddc656fb
commit d220641d64
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
2 changed files with 361 additions and 22 deletions

View file

@ -11,7 +11,10 @@ use ruma::{
device::Device,
error::ErrorKind,
filter::FilterDefinition,
sync::sync_events::{self, v4::SyncRequestList},
sync::sync_events::{
self,
v4::{ExtensionsConfig, SyncRequestList},
},
},
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent,
@ -24,7 +27,9 @@ use crate::{services, Error, Result};
pub struct SlidingSyncCache {
lists: BTreeMap<String, SyncRequestList>,
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, bool>>,
extensions: ExtensionsConfig,
}
pub struct Service {
@ -66,7 +71,9 @@ impl Service {
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
extensions: ExtensionsConfig::default(),
}))
}),
);
@ -74,12 +81,13 @@ impl Service {
drop(cache);
for (list_id, list) in &mut request.lists {
if let Some(cached_list) = cached.lists.remove(list_id) {
if let Some(cached_list) = cached.lists.get(list_id) {
if list.sort.is_empty() {
list.sort = cached_list.sort;
list.sort = cached_list.sort.clone();
};
if list.room_details.required_state.is_empty() {
list.room_details.required_state = cached_list.room_details.required_state;
list.room_details.required_state =
cached_list.room_details.required_state.clone();
};
list.room_details.timeline_limit = list
.room_details
@ -88,8 +96,8 @@ impl Service {
list.include_old_rooms = list
.include_old_rooms
.clone()
.or(cached_list.include_old_rooms);
match (&mut list.filters, cached_list.filters) {
.or(cached_list.include_old_rooms.clone());
match (&mut list.filters, cached_list.filters.clone()) {
(Some(list_filters), Some(cached_filters)) => {
list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm);
if list_filters.spaces.is_empty() {
@ -120,15 +128,80 @@ impl Service {
(_, _) => {}
}
if list.bump_event_types.is_empty() {
list.bump_event_types = cached_list.bump_event_types;
list.bump_event_types = cached_list.bump_event_types.clone();
};
}
cached.lists.insert(list_id.clone(), list.clone());
}
cached
.subscriptions
.extend(request.room_subscriptions.clone().into_iter());
request
.room_subscriptions
.extend(cached.subscriptions.clone().into_iter());
request.extensions.e2ee.enabled = request
.extensions
.e2ee
.enabled
.or(cached.extensions.e2ee.enabled);
request.extensions.to_device.enabled = request
.extensions
.to_device
.enabled
.or(cached.extensions.to_device.enabled);
request.extensions.account_data.enabled = request
.extensions
.account_data
.enabled
.or(cached.extensions.account_data.enabled);
request.extensions.account_data.lists = request
.extensions
.account_data
.lists
.clone()
.or(cached.extensions.account_data.lists.clone());
request.extensions.account_data.rooms = request
.extensions
.account_data
.rooms
.clone()
.or(cached.extensions.account_data.rooms.clone());
cached.extensions = request.extensions.clone();
cached.known_rooms.clone()
}
pub fn update_sync_subscriptions(
&self,
user_id: OwnedUserId,
device_id: OwnedDeviceId,
conn_id: String,
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
) {
let cache = &mut self.connections.lock().unwrap();
let cached = Arc::clone(
cache
.entry((user_id, device_id, conn_id))
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
extensions: ExtensionsConfig::default(),
}))
}),
);
let cached = &mut cached.lock().unwrap();
drop(cache);
cached.subscriptions = subscriptions;
}
pub fn update_sync_known_rooms(
&self,
user_id: OwnedUserId,
@ -144,7 +217,9 @@ impl Service {
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
extensions: ExtensionsConfig::default(),
}))
}),
);