From caddc656fba15ef3e13f65f21a7e6f43eb42e786 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Sun, 23 Jul 2023 21:57:11 +0200
Subject: [PATCH] slightly better sliding sync

---
 src/api/client_server/sync.rs           | 120 ++++++++++++++++++---
 src/service/mod.rs                      |   7 +-
 src/service/rooms/state_accessor/mod.rs |  15 +++
 src/service/users/mod.rs                | 137 +++++++++++++++++++++++-
 4 files changed, 260 insertions(+), 19 deletions(-)

diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index fed4fb73..8883c162 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -23,7 +23,7 @@ use ruma::{
     uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
 };
 use std::{
-    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
+    collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
     sync::Arc,
     time::Duration,
 };
@@ -1174,8 +1174,7 @@ pub async fn sync_events_v4_route(
 ) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
     let sender_user = body.sender_user.expect("user is authenticated");
     let sender_device = body.sender_device.expect("user is authenticated");
-    let body = dbg!(body.body);
-
+    let mut body = dbg!(body.body);
     // Setup watchers, so if there's no response, we can wait for them
     let watcher = services().globals.watch(&sender_user, &sender_device);
 
@@ -1188,7 +1187,21 @@ pub async fn sync_events_v4_route(
         .unwrap_or(0);
     let sincecount = PduCount::Normal(since);
 
-    let initial = since == 0;
+    if since == 0 {
+        if let Some(conn_id) = &body.conn_id {
+            services().users.forget_sync_request_connection(
+                sender_user.clone(),
+                sender_device.clone(),
+                conn_id.clone(),
+            )
+        }
+    }
+
+    let known_rooms = services().users.update_sync_request_with_cache(
+        sender_user.clone(),
+        sender_device.clone(),
+        &mut body,
+    );
 
     let all_joined_rooms = services()
         .rooms
@@ -1205,8 +1218,10 @@ pub async fn sync_events_v4_route(
             continue;
         }
 
+        let mut new_known_rooms = BTreeMap::new();
+
         lists.insert(
-            list_id,
+            list_id.clone(),
             sync_events::v4::SyncList {
                 ops: list
                     .ranges
@@ -1219,14 +1234,27 @@ pub async fn sync_events_v4_route(
                         let room_ids = all_joined_rooms
                             [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
                             .to_vec();
-                        todo_rooms.extend(room_ids.iter().cloned().map(|r| {
+                        new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true)));
+                        for room_id in &room_ids {
+                            let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
+                                BTreeSet::new(),
+                                0,
+                                true,
+                            ));
                             let limit = list
                                 .room_details
                                 .timeline_limit
                                 .map_or(10, u64::from)
                                 .min(100);
-                            (r, (list.room_details.required_state.clone(), limit))
-                        }));
+                            todo_room
+                                .0
+                                .extend(list.room_details.required_state.iter().cloned());
+                            todo_room.1 = todo_room.1.min(limit);
+                            if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true)
+                            {
+                                todo_room.2 = false;
+                            }
+                        }
                         sync_events::v4::SyncOp {
                             op: SlidingOp::Sync,
                             range: Some(r.clone()),
@@ -1239,12 +1267,36 @@ pub async fn sync_events_v4_route(
                 count: UInt::from(all_joined_rooms.len() as u32),
             },
         );
+
+        if let Some(conn_id) = &body.conn_id {
+            services().users.update_sync_known_rooms(
+                sender_user.clone(),
+                sender_device.clone(),
+                conn_id.clone(),
+                list_id,
+                new_known_rooms,
+            );
+        }
+    }
+
+    for (room_id, room) in body.room_subscriptions {
+        let todo_room = todo_rooms
+            .entry(room_id.clone())
+            .or_insert((BTreeSet::new(), 0, true));
+        let limit = room.timeline_limit.map_or(10, u64::from).min(100);
+        todo_room.0.extend(room.required_state.iter().cloned());
+        todo_room.1 = todo_room.1.min(limit);
+        todo_room.2 = false;
     }
 
     let mut rooms = BTreeMap::new();
-    for (room_id, (required_state_request, timeline_limit)) in todo_rooms {
+    for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms {
         let (timeline_pdus, limited) =
-            load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?;
+            load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?;
+
+        if *known && timeline_pdus.is_empty() {
+            continue;
+        }
 
         let prev_batch = timeline_pdus
             .first()
@@ -1256,7 +1308,14 @@ pub async fn sync_events_v4_route(
                     }
                     PduCount::Normal(c) => c.to_string(),
                 }))
-            })?;
+            })?
+            .or_else(|| {
+                if since != 0 {
+                    Some(since.to_string())
+                } else {
+                    None
+                }
+            });
 
         let room_events: Vec<_> = timeline_pdus
             .iter()
@@ -1279,8 +1338,41 @@ pub async fn sync_events_v4_route(
         rooms.insert(
             room_id.clone(),
             sync_events::v4::SlidingSyncRoom {
-                name: services().rooms.state_accessor.get_name(&room_id)?,
-                initial: Some(initial),
+                name: services()
+                    .rooms
+                    .state_accessor
+                    .get_name(&room_id)?
+                    .or_else(|| {
+                        // Heroes
+                        let mut names = services()
+                            .rooms
+                            .state_cache
+                            .room_members(&room_id)
+                            .filter_map(|r| r.ok())
+                            .filter(|member| member != &sender_user)
+                            .map(|member| {
+                                Ok::<_, Error>(
+                                    services()
+                                        .rooms
+                                        .state_accessor
+                                        .get_member(&room_id, &member)?
+                                        .and_then(|memberevent| memberevent.displayname)
+                                        .unwrap_or(member.to_string()),
+                                )
+                            })
+                            .filter_map(|r| r.ok())
+                            .take(5)
+                            .collect::<Vec<_>>();
+                        if names.len() > 1 {
+                            let last = names.pop().unwrap();
+                            Some(names.join(", ") + " and " + &last)
+                        } else if names.len() == 1 {
+                            Some(names.pop().unwrap())
+                        } else {
+                            None
+                        }
+                    }),
+                initial: Some(*known),
                 is_dm: None,
                 invite_state: None,
                 unread_notifications: UnreadNotificationsCount {
@@ -1326,7 +1418,7 @@ pub async fn sync_events_v4_route(
     }
 
     Ok(dbg!(sync_events::v4::Response {
-        initial: initial,
+        initial: since == 0,
         txn_id: body.txn_id.clone(),
         pos: next_batch.to_string(),
         lists,
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 56aed7fa..f85da788 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     sync::{Arc, Mutex},
 };
 
@@ -105,7 +105,10 @@ impl Services {
             },
             transaction_ids: transaction_ids::Service { db },
             uiaa: uiaa::Service { db },
-            users: users::Service { db },
+            users: users::Service {
+                db,
+                connections: Mutex::new(BTreeMap::new()),
+            },
             account_data: account_data::Service { db },
             admin: admin::Service::build(),
             key_backups: key_backups::Service { db },
diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs
index 9d071a53..435f4dff 100644
--- a/src/service/rooms/state_accessor/mod.rs
+++ b/src/service/rooms/state_accessor/mod.rs
@@ -282,4 +282,19 @@ impl Service {
                     .map_err(|_| Error::bad_database("Invalid room name event in database."))
             })
     }
+
+    pub fn get_member(
+        &self,
+        room_id: &RoomId,
+        user_id: &UserId,
+    ) -> Result<Option<RoomMemberEventContent>> {
+        services()
+            .rooms
+            .state_accessor
+            .room_state_get(&room_id, &StateEventType::RoomMember, user_id.as_str())?
+            .map_or(Ok(None), |s| {
+                serde_json::from_str(s.content.get())
+                    .map_err(|_| Error::bad_database("Invalid room member event in database."))
+            })
+    }
 }
diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs
index 6be5c895..63ab9b7d 100644
--- a/src/service/users/mod.rs
+++ b/src/service/users/mod.rs
@@ -1,20 +1,36 @@
 mod data;
-use std::{collections::BTreeMap, mem};
+use std::{
+    collections::BTreeMap,
+    mem,
+    sync::{Arc, Mutex},
+};
 
 pub use data::Data;
 use ruma::{
-    api::client::{device::Device, error::ErrorKind, filter::FilterDefinition},
+    api::client::{
+        device::Device,
+        error::ErrorKind,
+        filter::FilterDefinition,
+        sync::sync_events::{self, v4::SyncRequestList},
+    },
     encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
     events::AnyToDeviceEvent,
     serde::Raw,
     DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
-    OwnedUserId, RoomAliasId, UInt, UserId,
+    OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
 };
 
 use crate::{services, Error, Result};
 
+pub struct SlidingSyncCache {
+    lists: BTreeMap<String, SyncRequestList>,
+    known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, bool>>,
+}
+
 pub struct Service {
     pub db: &'static dyn Data,
+    pub connections:
+        Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
 }
 
 impl Service {
@@ -23,6 +39,121 @@ impl Service {
         self.db.exists(user_id)
     }
 
+    pub fn forget_sync_request_connection(
+        &self,
+        user_id: OwnedUserId,
+        device_id: OwnedDeviceId,
+        conn_id: String,
+    ) {
+        self.connections
+            .lock()
+            .unwrap()
+            .remove(&(user_id, device_id, conn_id));
+    }
+
+    pub fn update_sync_request_with_cache(
+        &self,
+        user_id: OwnedUserId,
+        device_id: OwnedDeviceId,
+        request: &mut sync_events::v4::Request,
+    ) -> BTreeMap<String, BTreeMap<OwnedRoomId, bool>> {
+        let Some(conn_id) = request.conn_id.clone() else { return BTreeMap::new(); };
+
+        let cache = &mut self.connections.lock().unwrap();
+        let cached = Arc::clone(
+            cache
+                .entry((user_id, device_id, conn_id))
+                .or_insert_with(|| {
+                    Arc::new(Mutex::new(SlidingSyncCache {
+                        lists: BTreeMap::new(),
+                        known_rooms: BTreeMap::new(),
+                    }))
+                }),
+        );
+        let cached = &mut cached.lock().unwrap();
+        drop(cache);
+
+        for (list_id, list) in &mut request.lists {
+            if let Some(cached_list) = cached.lists.remove(list_id) {
+                if list.sort.is_empty() {
+                    list.sort = cached_list.sort;
+                };
+                if list.room_details.required_state.is_empty() {
+                    list.room_details.required_state = cached_list.room_details.required_state;
+                };
+                list.room_details.timeline_limit = list
+                    .room_details
+                    .timeline_limit
+                    .or(cached_list.room_details.timeline_limit);
+                list.include_old_rooms = list
+                    .include_old_rooms
+                    .clone()
+                    .or(cached_list.include_old_rooms);
+                match (&mut list.filters, cached_list.filters) {
+                    (Some(list_filters), Some(cached_filters)) => {
+                        list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm);
+                        if list_filters.spaces.is_empty() {
+                            list_filters.spaces = cached_filters.spaces;
+                        }
+                        list_filters.is_encrypted =
+                            list_filters.is_encrypted.or(cached_filters.is_encrypted);
+                        list_filters.is_invite =
+                            list_filters.is_invite.or(cached_filters.is_invite);
+                        if list_filters.room_types.is_empty() {
+                            list_filters.room_types = cached_filters.room_types;
+                        }
+                        if list_filters.not_room_types.is_empty() {
+                            list_filters.not_room_types = cached_filters.not_room_types;
+                        }
+                        list_filters.room_name_like = list_filters
+                            .room_name_like
+                            .clone()
+                            .or(cached_filters.room_name_like);
+                        if list_filters.tags.is_empty() {
+                            list_filters.tags = cached_filters.tags;
+                        }
+                        if list_filters.not_tags.is_empty() {
+                            list_filters.not_tags = cached_filters.not_tags;
+                        }
+                    }
+                    (_, Some(cached_filters)) => list.filters = Some(cached_filters),
+                    (_, _) => {}
+                }
+                if list.bump_event_types.is_empty() {
+                    list.bump_event_types = cached_list.bump_event_types;
+                };
+            }
+            cached.lists.insert(list_id.clone(), list.clone());
+        }
+
+        cached.known_rooms.clone()
+    }
+
+    pub fn update_sync_known_rooms(
+        &self,
+        user_id: OwnedUserId,
+        device_id: OwnedDeviceId,
+        conn_id: String,
+        list_id: String,
+        new_cached_rooms: BTreeMap<OwnedRoomId, bool>,
+    ) {
+        let cache = &mut self.connections.lock().unwrap();
+        let cached = Arc::clone(
+            cache
+                .entry((user_id, device_id, conn_id))
+                .or_insert_with(|| {
+                    Arc::new(Mutex::new(SlidingSyncCache {
+                        lists: BTreeMap::new(),
+                        known_rooms: BTreeMap::new(),
+                    }))
+                }),
+        );
+        let cached = &mut cached.lock().unwrap();
+        drop(cache);
+
+        cached.known_rooms.insert(list_id, new_cached_rooms);
+    }
+
     /// Check if account is deactivated
     pub fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
         self.db.is_deactivated(user_id)