diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 2ac0bfea..00976c78 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -93,7 +93,7 @@ pub(crate) async fn sync_events_route( } // Setup watchers, so if there's no response, we can wait for them - let watcher = services.globals.watch(&sender_user, &sender_device); + let watcher = services.sync.watch(&sender_user, &sender_device); let next_batch = services.globals.current_count()?; let next_batchcount = PduCount::Normal(next_batch); diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index 11e3830c..91abd24e 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -51,7 +51,7 @@ pub(crate) async fn sync_events_v4_route( let sender_device = body.sender_device.expect("user is authenticated"); let mut body = body.body; // Setup watchers, so if there's no response, we can wait for them - let watcher = services.globals.watch(sender_user, &sender_device); + let watcher = services.sync.watch(sender_user, &sender_device); let next_batch = services.globals.next_count()?; diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index eea7597a..bcfe101e 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -1,35 +1,12 @@ use std::sync::{Arc, RwLock}; -use conduit::{trace, utils, Result, Server}; +use conduit::{utils, Result}; use database::{Database, Deserialized, Map}; -use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; -use ruma::{DeviceId, UserId}; - -use crate::{rooms, Dep}; pub struct Data { global: Arc, - todeviceid_events: Arc, - userroomid_joined: Arc, - userroomid_invitestate: Arc, - userroomid_leftstate: Arc, - userroomid_notificationcount: Arc, - userroomid_highlightcount: Arc, - pduid_pdu: Arc, - keychangeid_userid: Arc, - roomusertype_roomuserdataid: Arc, - readreceiptid_readreceipt: Arc, - userid_lastonetimekeyupdate: Arc, counter: RwLock, pub(super) db: Arc, - services: Services, -} - -struct Services { - server: Arc, - short: Dep, - state_cache: Dep, - typing: Dep, } const COUNTER: &[u8] = b"c"; @@ -39,25 +16,8 @@ impl Data { let db = &args.db; Self { global: db["global"].clone(), - todeviceid_events: db["todeviceid_events"].clone(), - userroomid_joined: db["userroomid_joined"].clone(), - userroomid_invitestate: db["userroomid_invitestate"].clone(), - userroomid_leftstate: db["userroomid_leftstate"].clone(), - userroomid_notificationcount: db["userroomid_notificationcount"].clone(), - userroomid_highlightcount: db["userroomid_highlightcount"].clone(), - pduid_pdu: db["pduid_pdu"].clone(), - keychangeid_userid: db["keychangeid_userid"].clone(), - roomusertype_roomuserdataid: db["roomusertype_roomuserdataid"].clone(), - readreceiptid_readreceipt: db["readreceiptid_readreceipt"].clone(), - userid_lastonetimekeyupdate: db["userid_lastonetimekeyupdate"].clone(), counter: RwLock::new(Self::stored_count(&db["global"]).expect("initialized global counter")), db: args.db.clone(), - services: Services { - server: args.server.clone(), - short: args.depend::("rooms::short"), - state_cache: args.depend::("rooms::state_cache"), - typing: args.depend::("rooms::typing"), - }, } } @@ -98,104 +58,6 @@ impl Data { .map_or(Ok(0_u64), utils::u64_from_bytes) } - #[tracing::instrument(skip(self), level = "debug")] - pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> { - let userid_bytes = user_id.as_bytes().to_vec(); - let mut userid_prefix = userid_bytes.clone(); - userid_prefix.push(0xFF); - - let mut userdeviceid_prefix = userid_prefix.clone(); - userdeviceid_prefix.extend_from_slice(device_id.as_bytes()); - userdeviceid_prefix.push(0xFF); - - let mut futures = FuturesUnordered::new(); - - // Return when *any* user changed their key - // TODO: only send for user they share a room with - futures.push(self.todeviceid_events.watch_prefix(&userdeviceid_prefix)); - - futures.push(self.userroomid_joined.watch_prefix(&userid_prefix)); - futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix)); - futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix)); - futures.push( - self.userroomid_notificationcount - .watch_prefix(&userid_prefix), - ); - futures.push(self.userroomid_highlightcount.watch_prefix(&userid_prefix)); - - // Events for rooms we are in - let rooms_joined = self.services.state_cache.rooms_joined(user_id); - - pin_mut!(rooms_joined); - while let Some(room_id) = rooms_joined.next().await { - let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else { - continue; - }; - - let roomid_bytes = room_id.as_bytes().to_vec(); - let mut roomid_prefix = roomid_bytes.clone(); - roomid_prefix.push(0xFF); - - // Key changes - futures.push(self.keychangeid_userid.watch_prefix(&roomid_prefix)); - - // Room account data - let mut roomuser_prefix = roomid_prefix.clone(); - roomuser_prefix.extend_from_slice(&userid_prefix); - - futures.push( - self.roomusertype_roomuserdataid - .watch_prefix(&roomuser_prefix), - ); - - // PDUs - let short_roomid = short_roomid.to_be_bytes().to_vec(); - futures.push(self.pduid_pdu.watch_prefix(&short_roomid)); - - // EDUs - let typing_room_id = room_id.to_owned(); - let typing_wait_for_update = async move { - self.services.typing.wait_for_update(&typing_room_id).await; - }; - - futures.push(typing_wait_for_update.boxed()); - futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix)); - } - - let mut globaluserdata_prefix = vec![0xFF]; - globaluserdata_prefix.extend_from_slice(&userid_prefix); - - futures.push( - self.roomusertype_roomuserdataid - .watch_prefix(&globaluserdata_prefix), - ); - - // More key changes (used when user is not joined to any rooms) - futures.push(self.keychangeid_userid.watch_prefix(&userid_prefix)); - - // One time keys - futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes)); - - // Server shutdown - let server_shutdown = async move { - while self.services.server.running() { - self.services.server.signal.subscribe().recv().await.ok(); - } - }; - - futures.push(server_shutdown.boxed()); - if !self.services.server.running() { - return Ok(()); - } - - // Wait until one of them finds something - trace!(futures = futures.len(), "watch started"); - futures.next().await; - trace!(futures = futures.len(), "watch finished"); - - Ok(()) - } - pub async fn database_version(&self) -> u64 { self.global .get(b"version") diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index bd956964..55dd10aa 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -12,7 +12,7 @@ use data::Data; use ipaddress::IPAddress; use regex::RegexSet; use ruma::{ - api::client::discovery::discover_support::ContactRole, DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedServerName, + api::client::discovery::discover_support::ContactRole, OwnedEventId, OwnedRoomAliasId, OwnedServerName, OwnedUserId, RoomAliasId, RoomVersionId, ServerName, UserId, }; use tokio::sync::Mutex; @@ -163,10 +163,6 @@ impl Service { #[inline] pub fn current_count(&self) -> Result { Ok(self.db.current_count()) } - pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> { - self.db.watch(user_id, device_id).await - } - #[inline] pub fn server_name(&self) -> &ServerName { self.config.server_name.as_ref() } diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index 1bf4610f..f1a6ae75 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -1,9 +1,12 @@ +mod watch; + use std::{ collections::{BTreeMap, BTreeSet}, sync::{Arc, Mutex, Mutex as StdMutex}, }; -use conduit::Result; +use conduit::{Result, Server}; +use database::Map; use ruma::{ api::client::sync::sync_events::{ self, @@ -12,10 +15,35 @@ use ruma::{ OwnedDeviceId, OwnedRoomId, OwnedUserId, }; +use crate::{rooms, Dep}; + pub struct Service { + db: Data, + services: Services, connections: DbConnections, } +pub struct Data { + todeviceid_events: Arc, + userroomid_joined: Arc, + userroomid_invitestate: Arc, + userroomid_leftstate: Arc, + userroomid_notificationcount: Arc, + userroomid_highlightcount: Arc, + pduid_pdu: Arc, + keychangeid_userid: Arc, + roomusertype_roomuserdataid: Arc, + readreceiptid_readreceipt: Arc, + userid_lastonetimekeyupdate: Arc, +} + +struct Services { + server: Arc, + short: Dep, + state_cache: Dep, + typing: Dep, +} + struct SlidingSyncCache { lists: BTreeMap, subscriptions: BTreeMap, @@ -28,8 +56,27 @@ type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String); type DbConnectionsVal = Arc>; impl crate::Service for Service { - fn build(_args: crate::Args<'_>) -> Result> { + fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { + db: Data { + todeviceid_events: args.db["todeviceid_events"].clone(), + userroomid_joined: args.db["userroomid_joined"].clone(), + userroomid_invitestate: args.db["userroomid_invitestate"].clone(), + userroomid_leftstate: args.db["userroomid_leftstate"].clone(), + userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), + userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), + pduid_pdu: args.db["pduid_pdu"].clone(), + keychangeid_userid: args.db["keychangeid_userid"].clone(), + roomusertype_roomuserdataid: args.db["roomusertype_roomuserdataid"].clone(), + readreceiptid_readreceipt: args.db["readreceiptid_readreceipt"].clone(), + userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(), + }, + services: Services { + server: args.server.clone(), + short: args.depend::("rooms::short"), + state_cache: args.depend::("rooms::state_cache"), + typing: args.depend::("rooms::typing"), + }, connections: StdMutex::new(BTreeMap::new()), })) } diff --git a/src/service/sync/watch.rs b/src/service/sync/watch.rs new file mode 100644 index 00000000..3eb663c1 --- /dev/null +++ b/src/service/sync/watch.rs @@ -0,0 +1,117 @@ +use conduit::{implement, trace, Result}; +use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; +use ruma::{DeviceId, UserId}; + +#[implement(super::Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { + let userid_bytes = user_id.as_bytes().to_vec(); + let mut userid_prefix = userid_bytes.clone(); + userid_prefix.push(0xFF); + + let mut userdeviceid_prefix = userid_prefix.clone(); + userdeviceid_prefix.extend_from_slice(device_id.as_bytes()); + userdeviceid_prefix.push(0xFF); + + let mut futures = FuturesUnordered::new(); + + // Return when *any* user changed their key + // TODO: only send for user they share a room with + futures.push(self.db.todeviceid_events.watch_prefix(&userdeviceid_prefix)); + + futures.push(self.db.userroomid_joined.watch_prefix(&userid_prefix)); + futures.push(self.db.userroomid_invitestate.watch_prefix(&userid_prefix)); + futures.push(self.db.userroomid_leftstate.watch_prefix(&userid_prefix)); + futures.push( + self.db + .userroomid_notificationcount + .watch_prefix(&userid_prefix), + ); + futures.push( + self.db + .userroomid_highlightcount + .watch_prefix(&userid_prefix), + ); + + // Events for rooms we are in + let rooms_joined = self.services.state_cache.rooms_joined(user_id); + + pin_mut!(rooms_joined); + while let Some(room_id) = rooms_joined.next().await { + let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else { + continue; + }; + + let roomid_bytes = room_id.as_bytes().to_vec(); + let mut roomid_prefix = roomid_bytes.clone(); + roomid_prefix.push(0xFF); + + // Key changes + futures.push(self.db.keychangeid_userid.watch_prefix(&roomid_prefix)); + + // Room account data + let mut roomuser_prefix = roomid_prefix.clone(); + roomuser_prefix.extend_from_slice(&userid_prefix); + + futures.push( + self.db + .roomusertype_roomuserdataid + .watch_prefix(&roomuser_prefix), + ); + + // PDUs + let short_roomid = short_roomid.to_be_bytes().to_vec(); + futures.push(self.db.pduid_pdu.watch_prefix(&short_roomid)); + + // EDUs + let typing_room_id = room_id.to_owned(); + let typing_wait_for_update = async move { + self.services.typing.wait_for_update(&typing_room_id).await; + }; + + futures.push(typing_wait_for_update.boxed()); + futures.push( + self.db + .readreceiptid_readreceipt + .watch_prefix(&roomid_prefix), + ); + } + + let mut globaluserdata_prefix = vec![0xFF]; + globaluserdata_prefix.extend_from_slice(&userid_prefix); + + futures.push( + self.db + .roomusertype_roomuserdataid + .watch_prefix(&globaluserdata_prefix), + ); + + // More key changes (used when user is not joined to any rooms) + futures.push(self.db.keychangeid_userid.watch_prefix(&userid_prefix)); + + // One time keys + futures.push( + self.db + .userid_lastonetimekeyupdate + .watch_prefix(&userid_bytes), + ); + + // Server shutdown + let server_shutdown = async move { + while self.services.server.running() { + self.services.server.signal.subscribe().recv().await.ok(); + } + }; + + futures.push(server_shutdown.boxed()); + if !self.services.server.running() { + return Ok(()); + } + + // Wait until one of them finds something + trace!(futures = futures.len(), "watch started"); + futures.next().await; + trace!(futures = futures.len(), "watch finished"); + + Ok(()) +}