From ccf10c6b4705012f11dc025eab1729dbec925fd9 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 6 Apr 2025 20:30:15 +0000 Subject: [PATCH] modest cleanup of snake sync service related Signed-off-by: Jason Volk --- src/api/client/sync/v4.rs | 83 +++++++-------- src/api/client/sync/v5.rs | 55 ++++------ src/service/sync/mod.rs | 205 +++++++++++++++++--------------------- 3 files changed, 143 insertions(+), 200 deletions(-) diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index f7edb8c0..55faf420 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -6,22 +6,23 @@ use std::{ use axum::extract::State; use conduwuit::{ - Error, PduCount, PduEvent, Result, debug, error, extract_variant, + Err, Error, PduCount, PduEvent, Result, debug, error, extract_variant, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::{ruma_from_usize, usize_from_ruma, usize_from_u64_truncated}, }, warn, }; +use conduwuit_service::{ + rooms::read_receipt::pack_receipts, + sync::{into_db_key, into_snake_key}, +}; use futures::{FutureExt, StreamExt, TryFutureExt}; use ruma::{ MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, - api::client::{ - error::ErrorKind, - sync::sync_events::{ - self, DeviceLists, UnreadNotificationsCount, - v4::{SlidingOp, SlidingSyncRoomHero}, - }, + api::client::sync::sync_events::{ + self, DeviceLists, UnreadNotificationsCount, + v4::{SlidingOp, SlidingSyncRoomHero}, }, events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, @@ -31,7 +32,6 @@ use ruma::{ serde::Raw, uint, }; -use service::rooms::read_receipt::pack_receipts; use super::{load_timeline, share_encrypted_room}; use crate::{ @@ -50,10 +50,11 @@ pub(crate) async fn sync_events_v4_route( ) -> Result { debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let sender_device = body.sender_device.expect("user is authenticated"); + let sender_device = body.sender_device.as_ref().expect("user is authenticated"); let mut body = body.body; + // Setup watchers, so if there's no response, we can wait for them - let watcher = services.sync.watch(sender_user, &sender_device); + let watcher = services.sync.watch(sender_user, sender_device); let next_batch = services.globals.next_count()?; @@ -68,33 +69,21 @@ pub(crate) async fn sync_events_v4_route( .and_then(|string| string.parse().ok()) .unwrap_or(0); - if globalsince != 0 - && !services - .sync - .remembered(sender_user.clone(), sender_device.clone(), conn_id.clone()) - { + let db_key = into_db_key(sender_user, sender_device, conn_id.clone()); + if globalsince != 0 && !services.sync.remembered(&db_key) { debug!("Restarting sync stream because it was gone from the database"); - return Err(Error::Request( - ErrorKind::UnknownPos, - "Connection data lost since last time".into(), - http::StatusCode::BAD_REQUEST, - )); + return Err!(Request(UnknownPos("Connection data lost since last time"))); } if globalsince == 0 { - services.sync.forget_sync_request_connection( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - ); + services.sync.forget_sync_request_connection(&db_key); } // Get sticky parameters from cache - let known_rooms = services.sync.update_sync_request_with_cache( - sender_user.clone(), - sender_device.clone(), - &mut body, - ); + let snake_key = into_snake_key(sender_user, sender_device, conn_id.clone()); + let known_rooms = services + .sync + .update_sync_request_with_cache(&snake_key, &mut body); let all_joined_rooms: Vec<_> = services .rooms @@ -136,7 +125,7 @@ pub(crate) async fn sync_events_v4_route( if body.extensions.to_device.enabled.unwrap_or(false) { services .users - .remove_to_device_events(sender_user, &sender_device, globalsince) + .remove_to_device_events(sender_user, sender_device, globalsince) .await; } @@ -261,7 +250,7 @@ pub(crate) async fn sync_events_v4_route( if let Some(Ok(user_id)) = pdu.state_key.as_deref().map(UserId::parse) { - if user_id == *sender_user { + if user_id == sender_user { continue; } @@ -299,7 +288,7 @@ pub(crate) async fn sync_events_v4_route( .state_cache .room_members(room_id) // Don't send key updates from the sender to the sender - .ready_filter(|user_id| sender_user != user_id) + .ready_filter(|&user_id| sender_user != user_id) // Only send keys if the sender doesn't share an encrypted room with the target // already .filter_map(|user_id| { @@ -425,10 +414,9 @@ pub(crate) async fn sync_events_v4_route( }); if let Some(conn_id) = &body.conn_id { + let db_key = into_db_key(sender_user, sender_device, conn_id); services.sync.update_sync_known_rooms( - sender_user, - &sender_device, - conn_id.clone(), + &db_key, list_id.clone(), new_known_rooms, globalsince, @@ -478,23 +466,20 @@ pub(crate) async fn sync_events_v4_route( } if let Some(conn_id) = &body.conn_id { + let db_key = into_db_key(sender_user, sender_device, conn_id); services.sync.update_sync_known_rooms( - sender_user, - &sender_device, - conn_id.clone(), + &db_key, "subscriptions".to_owned(), known_subscription_rooms, globalsince, ); } - if let Some(conn_id) = &body.conn_id { - services.sync.update_sync_subscriptions( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - body.room_subscriptions, - ); + if let Some(conn_id) = body.conn_id.clone() { + let db_key = into_db_key(sender_user, sender_device, conn_id); + services + .sync + .update_sync_subscriptions(&db_key, body.room_subscriptions); } let mut rooms = BTreeMap::new(); @@ -648,7 +633,7 @@ pub(crate) async fn sync_events_v4_route( .rooms .state_cache .room_members(room_id) - .ready_filter(|member| member != sender_user) + .ready_filter(|&member| member != sender_user) .filter_map(|user_id| { services .rooms @@ -787,7 +772,7 @@ pub(crate) async fn sync_events_v4_route( .users .get_to_device_events( sender_user, - &sender_device, + sender_device, Some(globalsince), Some(next_batch), ) @@ -805,7 +790,7 @@ pub(crate) async fn sync_events_v4_route( }, device_one_time_keys_count: services .users - .count_one_time_keys(sender_user, &sender_device) + .count_one_time_keys(sender_user, sender_device) .await, // Fallback keys are not yet supported device_unused_fallback_key_types: None, diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 684752ec..00a2d18d 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -6,7 +6,7 @@ use std::{ use axum::extract::State; use conduwuit::{ - Error, Result, debug, error, extract_variant, + Err, Error, Result, error, extract_variant, matrix::{ TypeStateKey, pdu::{PduCount, PduEvent}, @@ -18,14 +18,11 @@ use conduwuit::{ }, warn, }; -use conduwuit_service::rooms::read_receipt::pack_receipts; +use conduwuit_service::{rooms::read_receipt::pack_receipts, sync::into_snake_key}; use futures::{FutureExt, StreamExt, TryFutureExt}; use ruma::{ DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, - api::client::{ - error::ErrorKind, - sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, - }, + api::client::sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType, room::member::{MembershipState, RoomMemberEventContent}, @@ -74,35 +71,23 @@ pub(crate) async fn sync_events_v5_route( .and_then(|string| string.parse().ok()) .unwrap_or(0); - if globalsince != 0 - && !services.sync.snake_connection_cached( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - ) { - debug!("Restarting sync stream because it was gone from the database"); - return Err(Error::Request( - ErrorKind::UnknownPos, - "Connection data lost since last time".into(), - http::StatusCode::BAD_REQUEST, - )); + let snake_key = into_snake_key(sender_user, sender_device, conn_id); + + if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) { + return Err!(Request(UnknownPos( + "Connection data unknown to server; restarting sync stream." + ))); } // Client / User requested an initial sync if globalsince == 0 { - services.sync.forget_snake_sync_connection( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - ); + services.sync.forget_snake_sync_connection(&snake_key); } // Get sticky parameters from cache - let known_rooms = services.sync.update_snake_sync_request_with_cache( - sender_user.clone(), - sender_device.clone(), - &mut body, - ); + let known_rooms = services + .sync + .update_snake_sync_request_with_cache(&snake_key, &mut body); let all_joined_rooms: Vec<_> = services .rooms @@ -254,11 +239,10 @@ async fn fetch_subscriptions( // body.room_subscriptions.remove(&r); //} - if let Some(conn_id) = &body.conn_id { + if let Some(conn_id) = body.conn_id.clone() { + let snake_key = into_snake_key(sender_user, sender_device, conn_id); services.sync.update_snake_sync_known_rooms( - sender_user, - sender_device, - conn_id.clone(), + &snake_key, "subscriptions".to_owned(), known_subscription_rooms, globalsince, @@ -340,11 +324,10 @@ async fn handle_lists<'a>( count: ruma_from_usize(active_rooms.len()), }); - if let Some(conn_id) = &body.conn_id { + if let Some(conn_id) = body.conn_id.clone() { + let snake_key = into_snake_key(sender_user, sender_device, conn_id); services.sync.update_snake_sync_known_rooms( - sender_user, - sender_device, - conn_id.clone(), + &snake_key, list_id.clone(), new_known_rooms, globalsince, diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index bf2bc142..b095d2c1 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -8,7 +8,7 @@ use std::{ use conduwuit::{Result, Server}; use database::Map; use ruma::{ - DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, UserId, + OwnedDeviceId, OwnedRoomId, OwnedUserId, api::client::sync::sync_events::{ self, v4::{ExtensionsConfig, SyncRequestList}, @@ -49,8 +49,8 @@ struct Services { struct SlidingSyncCache { lists: BTreeMap, subscriptions: BTreeMap, - known_rooms: BTreeMap>, /* For every room, the - * roomsince number */ + // For every room, the roomsince number + known_rooms: BTreeMap>, extensions: ExtensionsConfig, } @@ -98,79 +98,35 @@ impl crate::Service for Service { fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -/// load params from cache if body doesn't contain it, as long as it's allowed -/// in some cases we may need to allow an empty list as an actual value -fn list_or_sticky(target: &mut Vec, cached: &Vec) { - if target.is_empty() { - target.clone_from(cached); - } -} -fn some_or_sticky(target: &mut Option, cached: Option) { - if target.is_none() { - *target = cached; - } -} - impl Service { - pub fn snake_connection_cached( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: Option, - ) -> bool { - self.snake_connections - .lock() - .unwrap() - .contains_key(&(user_id, device_id, conn_id)) - } - - pub fn forget_snake_sync_connection( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: Option, - ) { + pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool { self.snake_connections .lock() .expect("locked") - .remove(&(user_id, device_id, conn_id)); + .contains_key(key) } - pub fn remembered( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, - ) -> bool { - self.connections - .lock() - .unwrap() - .contains_key(&(user_id, device_id, conn_id)) + pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) { + self.snake_connections.lock().expect("locked").remove(key); } - pub fn forget_sync_request_connection( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, - ) { - self.connections - .lock() - .expect("locked") - .remove(&(user_id, device_id, conn_id)); + pub fn remembered(&self, key: &DbConnectionsKey) -> bool { + self.connections.lock().expect("locked").contains_key(key) + } + + pub fn forget_sync_request_connection(&self, key: &DbConnectionsKey) { + self.connections.lock().expect("locked").remove(key); } pub fn update_snake_sync_request_with_cache( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, + snake_key: &SnakeConnectionsKey, request: &mut v5::Request, ) -> BTreeMap> { - let conn_id = request.conn_id.clone(); let mut cache = self.snake_connections.lock().expect("locked"); let cached = Arc::clone( cache - .entry((user_id, device_id, conn_id)) + .entry(snake_key.clone()) .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), ); let cached = &mut cached.lock().expect("locked"); @@ -268,25 +224,23 @@ impl Service { pub fn update_sync_request_with_cache( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, + key: &SnakeConnectionsKey, request: &mut sync_events::v4::Request, ) -> BTreeMap> { let Some(conn_id) = request.conn_id.clone() else { return BTreeMap::new(); }; + let key = into_db_key(key.0.clone(), key.1.clone(), conn_id); let mut cache = self.connections.lock().expect("locked"); - let cached = Arc::clone(cache.entry((user_id, device_id, conn_id)).or_insert_with( - || { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }, - )); + let cached = Arc::clone(cache.entry(key).or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + })); let cached = &mut cached.lock().expect("locked"); drop(cache); @@ -371,22 +325,18 @@ impl Service { pub fn update_sync_subscriptions( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, + key: &DbConnectionsKey, subscriptions: BTreeMap, ) { let mut cache = self.connections.lock().expect("locked"); - let cached = Arc::clone(cache.entry((user_id, device_id, conn_id)).or_insert_with( - || { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }, - )); + let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + })); let cached = &mut cached.lock().expect("locked"); drop(cache); @@ -395,90 +345,81 @@ impl Service { pub fn update_sync_known_rooms( &self, - user_id: &UserId, - device_id: &DeviceId, - conn_id: String, + key: &DbConnectionsKey, list_id: String, new_cached_rooms: BTreeSet, globalsince: u64, ) { let mut cache = self.connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry((user_id.to_owned(), device_id.to_owned(), conn_id)) - .or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); + let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + })); let cached = &mut cached.lock().expect("locked"); drop(cache); - for (roomid, lastsince) in cached + for (room_id, lastsince) in cached .known_rooms .entry(list_id.clone()) .or_default() .iter_mut() { - if !new_cached_rooms.contains(roomid) { + if !new_cached_rooms.contains(room_id) { *lastsince = 0; } } let list = cached.known_rooms.entry(list_id).or_default(); - for roomid in new_cached_rooms { - list.insert(roomid, globalsince); + for room_id in new_cached_rooms { + list.insert(room_id, globalsince); } } pub fn update_snake_sync_known_rooms( &self, - user_id: &UserId, - device_id: &DeviceId, - conn_id: String, + key: &SnakeConnectionsKey, list_id: String, new_cached_rooms: BTreeSet, globalsince: u64, ) { + assert!(key.2.is_some(), "Some(conn_id) required for this call"); let mut cache = self.snake_connections.lock().expect("locked"); let cached = Arc::clone( cache - .entry((user_id.to_owned(), device_id.to_owned(), Some(conn_id))) + .entry(key.clone()) .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), ); let cached = &mut cached.lock().expect("locked"); drop(cache); - for (roomid, lastsince) in cached + for (room_id, lastsince) in cached .known_rooms .entry(list_id.clone()) .or_default() .iter_mut() { - if !new_cached_rooms.contains(roomid) { + if !new_cached_rooms.contains(room_id) { *lastsince = 0; } } let list = cached.known_rooms.entry(list_id).or_default(); - for roomid in new_cached_rooms { - list.insert(roomid, globalsince); + for room_id in new_cached_rooms { + list.insert(room_id, globalsince); } } pub fn update_snake_sync_subscriptions( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: Option, + key: &SnakeConnectionsKey, subscriptions: BTreeMap, ) { let mut cache = self.snake_connections.lock().expect("locked"); let cached = Arc::clone( cache - .entry((user_id, device_id, conn_id)) + .entry(key.clone()) .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), ); let cached = &mut cached.lock().expect("locked"); @@ -487,3 +428,37 @@ impl Service { cached.subscriptions = subscriptions; } } + +#[inline] +pub fn into_snake_key(user_id: U, device_id: D, conn_id: C) -> SnakeConnectionsKey +where + U: Into, + D: Into, + C: Into>, +{ + (user_id.into(), device_id.into(), conn_id.into()) +} + +#[inline] +pub fn into_db_key(user_id: U, device_id: D, conn_id: C) -> DbConnectionsKey +where + U: Into, + D: Into, + C: Into, +{ + (user_id.into(), device_id.into(), conn_id.into()) +} + +/// load params from cache if body doesn't contain it, as long as it's allowed +/// in some cases we may need to allow an empty list as an actual value +fn list_or_sticky(target: &mut Vec, cached: &Vec) { + if target.is_empty() { + target.clone_from(cached); + } +} + +fn some_or_sticky(target: &mut Option, cached: Option) { + if target.is_none() { + *target = cached; + } +}