move sync watcher from globals service to sync service
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
26c890d5ac
commit
3ed2c17f98
6 changed files with 170 additions and 148 deletions
|
@ -93,7 +93,7 @@ pub(crate) async fn sync_events_route(
|
|||
}
|
||||
|
||||
// Setup watchers, so if there's no response, we can wait for them
|
||||
let watcher = services.globals.watch(&sender_user, &sender_device);
|
||||
let watcher = services.sync.watch(&sender_user, &sender_device);
|
||||
|
||||
let next_batch = services.globals.current_count()?;
|
||||
let next_batchcount = PduCount::Normal(next_batch);
|
||||
|
|
|
@ -51,7 +51,7 @@ pub(crate) async fn sync_events_v4_route(
|
|||
let sender_device = body.sender_device.expect("user is authenticated");
|
||||
let mut body = body.body;
|
||||
// Setup watchers, so if there's no response, we can wait for them
|
||||
let watcher = services.globals.watch(sender_user, &sender_device);
|
||||
let watcher = services.sync.watch(sender_user, &sender_device);
|
||||
|
||||
let next_batch = services.globals.next_count()?;
|
||||
|
||||
|
|
|
@ -1,35 +1,12 @@
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use conduit::{trace, utils, Result, Server};
|
||||
use conduit::{utils, Result};
|
||||
use database::{Database, Deserialized, Map};
|
||||
use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
|
||||
use ruma::{DeviceId, UserId};
|
||||
|
||||
use crate::{rooms, Dep};
|
||||
|
||||
pub struct Data {
|
||||
global: Arc<Map>,
|
||||
todeviceid_events: Arc<Map>,
|
||||
userroomid_joined: Arc<Map>,
|
||||
userroomid_invitestate: Arc<Map>,
|
||||
userroomid_leftstate: Arc<Map>,
|
||||
userroomid_notificationcount: Arc<Map>,
|
||||
userroomid_highlightcount: Arc<Map>,
|
||||
pduid_pdu: Arc<Map>,
|
||||
keychangeid_userid: Arc<Map>,
|
||||
roomusertype_roomuserdataid: Arc<Map>,
|
||||
readreceiptid_readreceipt: Arc<Map>,
|
||||
userid_lastonetimekeyupdate: Arc<Map>,
|
||||
counter: RwLock<u64>,
|
||||
pub(super) db: Arc<Database>,
|
||||
services: Services,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
server: Arc<Server>,
|
||||
short: Dep<rooms::short::Service>,
|
||||
state_cache: Dep<rooms::state_cache::Service>,
|
||||
typing: Dep<rooms::typing::Service>,
|
||||
}
|
||||
|
||||
const COUNTER: &[u8] = b"c";
|
||||
|
@ -39,25 +16,8 @@ impl Data {
|
|||
let db = &args.db;
|
||||
Self {
|
||||
global: db["global"].clone(),
|
||||
todeviceid_events: db["todeviceid_events"].clone(),
|
||||
userroomid_joined: db["userroomid_joined"].clone(),
|
||||
userroomid_invitestate: db["userroomid_invitestate"].clone(),
|
||||
userroomid_leftstate: db["userroomid_leftstate"].clone(),
|
||||
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
|
||||
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
|
||||
pduid_pdu: db["pduid_pdu"].clone(),
|
||||
keychangeid_userid: db["keychangeid_userid"].clone(),
|
||||
roomusertype_roomuserdataid: db["roomusertype_roomuserdataid"].clone(),
|
||||
readreceiptid_readreceipt: db["readreceiptid_readreceipt"].clone(),
|
||||
userid_lastonetimekeyupdate: db["userid_lastonetimekeyupdate"].clone(),
|
||||
counter: RwLock::new(Self::stored_count(&db["global"]).expect("initialized global counter")),
|
||||
db: args.db.clone(),
|
||||
services: Services {
|
||||
server: args.server.clone(),
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
|
||||
typing: args.depend::<rooms::typing::Service>("rooms::typing"),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,104 +58,6 @@ impl Data {
|
|||
.map_or(Ok(0_u64), utils::u64_from_bytes)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> {
|
||||
let userid_bytes = user_id.as_bytes().to_vec();
|
||||
let mut userid_prefix = userid_bytes.clone();
|
||||
userid_prefix.push(0xFF);
|
||||
|
||||
let mut userdeviceid_prefix = userid_prefix.clone();
|
||||
userdeviceid_prefix.extend_from_slice(device_id.as_bytes());
|
||||
userdeviceid_prefix.push(0xFF);
|
||||
|
||||
let mut futures = FuturesUnordered::new();
|
||||
|
||||
// Return when *any* user changed their key
|
||||
// TODO: only send for user they share a room with
|
||||
futures.push(self.todeviceid_events.watch_prefix(&userdeviceid_prefix));
|
||||
|
||||
futures.push(self.userroomid_joined.watch_prefix(&userid_prefix));
|
||||
futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix));
|
||||
futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix));
|
||||
futures.push(
|
||||
self.userroomid_notificationcount
|
||||
.watch_prefix(&userid_prefix),
|
||||
);
|
||||
futures.push(self.userroomid_highlightcount.watch_prefix(&userid_prefix));
|
||||
|
||||
// Events for rooms we are in
|
||||
let rooms_joined = self.services.state_cache.rooms_joined(user_id);
|
||||
|
||||
pin_mut!(rooms_joined);
|
||||
while let Some(room_id) = rooms_joined.next().await {
|
||||
let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let roomid_bytes = room_id.as_bytes().to_vec();
|
||||
let mut roomid_prefix = roomid_bytes.clone();
|
||||
roomid_prefix.push(0xFF);
|
||||
|
||||
// Key changes
|
||||
futures.push(self.keychangeid_userid.watch_prefix(&roomid_prefix));
|
||||
|
||||
// Room account data
|
||||
let mut roomuser_prefix = roomid_prefix.clone();
|
||||
roomuser_prefix.extend_from_slice(&userid_prefix);
|
||||
|
||||
futures.push(
|
||||
self.roomusertype_roomuserdataid
|
||||
.watch_prefix(&roomuser_prefix),
|
||||
);
|
||||
|
||||
// PDUs
|
||||
let short_roomid = short_roomid.to_be_bytes().to_vec();
|
||||
futures.push(self.pduid_pdu.watch_prefix(&short_roomid));
|
||||
|
||||
// EDUs
|
||||
let typing_room_id = room_id.to_owned();
|
||||
let typing_wait_for_update = async move {
|
||||
self.services.typing.wait_for_update(&typing_room_id).await;
|
||||
};
|
||||
|
||||
futures.push(typing_wait_for_update.boxed());
|
||||
futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix));
|
||||
}
|
||||
|
||||
let mut globaluserdata_prefix = vec![0xFF];
|
||||
globaluserdata_prefix.extend_from_slice(&userid_prefix);
|
||||
|
||||
futures.push(
|
||||
self.roomusertype_roomuserdataid
|
||||
.watch_prefix(&globaluserdata_prefix),
|
||||
);
|
||||
|
||||
// More key changes (used when user is not joined to any rooms)
|
||||
futures.push(self.keychangeid_userid.watch_prefix(&userid_prefix));
|
||||
|
||||
// One time keys
|
||||
futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes));
|
||||
|
||||
// Server shutdown
|
||||
let server_shutdown = async move {
|
||||
while self.services.server.running() {
|
||||
self.services.server.signal.subscribe().recv().await.ok();
|
||||
}
|
||||
};
|
||||
|
||||
futures.push(server_shutdown.boxed());
|
||||
if !self.services.server.running() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Wait until one of them finds something
|
||||
trace!(futures = futures.len(), "watch started");
|
||||
futures.next().await;
|
||||
trace!(futures = futures.len(), "watch finished");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn database_version(&self) -> u64 {
|
||||
self.global
|
||||
.get(b"version")
|
||||
|
|
|
@ -12,7 +12,7 @@ use data::Data;
|
|||
use ipaddress::IPAddress;
|
||||
use regex::RegexSet;
|
||||
use ruma::{
|
||||
api::client::discovery::discover_support::ContactRole, DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedServerName,
|
||||
api::client::discovery::discover_support::ContactRole, OwnedEventId, OwnedRoomAliasId, OwnedServerName,
|
||||
OwnedUserId, RoomAliasId, RoomVersionId, ServerName, UserId,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
@ -163,10 +163,6 @@ impl Service {
|
|||
#[inline]
|
||||
pub fn current_count(&self) -> Result<u64> { Ok(self.db.current_count()) }
|
||||
|
||||
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> {
|
||||
self.db.watch(user_id, device_id).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn server_name(&self) -> &ServerName { self.config.server_name.as_ref() }
|
||||
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
mod watch;
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
sync::{Arc, Mutex, Mutex as StdMutex},
|
||||
};
|
||||
|
||||
use conduit::Result;
|
||||
use conduit::{Result, Server};
|
||||
use database::Map;
|
||||
use ruma::{
|
||||
api::client::sync::sync_events::{
|
||||
self,
|
||||
|
@ -12,10 +15,35 @@ use ruma::{
|
|||
OwnedDeviceId, OwnedRoomId, OwnedUserId,
|
||||
};
|
||||
|
||||
use crate::{rooms, Dep};
|
||||
|
||||
pub struct Service {
|
||||
db: Data,
|
||||
services: Services,
|
||||
connections: DbConnections,
|
||||
}
|
||||
|
||||
pub struct Data {
|
||||
todeviceid_events: Arc<Map>,
|
||||
userroomid_joined: Arc<Map>,
|
||||
userroomid_invitestate: Arc<Map>,
|
||||
userroomid_leftstate: Arc<Map>,
|
||||
userroomid_notificationcount: Arc<Map>,
|
||||
userroomid_highlightcount: Arc<Map>,
|
||||
pduid_pdu: Arc<Map>,
|
||||
keychangeid_userid: Arc<Map>,
|
||||
roomusertype_roomuserdataid: Arc<Map>,
|
||||
readreceiptid_readreceipt: Arc<Map>,
|
||||
userid_lastonetimekeyupdate: Arc<Map>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
server: Arc<Server>,
|
||||
short: Dep<rooms::short::Service>,
|
||||
state_cache: Dep<rooms::state_cache::Service>,
|
||||
typing: Dep<rooms::typing::Service>,
|
||||
}
|
||||
|
||||
struct SlidingSyncCache {
|
||||
lists: BTreeMap<String, SyncRequestList>,
|
||||
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
|
||||
|
@ -28,8 +56,27 @@ type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String);
|
|||
type DbConnectionsVal = Arc<Mutex<SlidingSyncCache>>;
|
||||
|
||||
impl crate::Service for Service {
|
||||
fn build(_args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
db: Data {
|
||||
todeviceid_events: args.db["todeviceid_events"].clone(),
|
||||
userroomid_joined: args.db["userroomid_joined"].clone(),
|
||||
userroomid_invitestate: args.db["userroomid_invitestate"].clone(),
|
||||
userroomid_leftstate: args.db["userroomid_leftstate"].clone(),
|
||||
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
||||
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
||||
pduid_pdu: args.db["pduid_pdu"].clone(),
|
||||
keychangeid_userid: args.db["keychangeid_userid"].clone(),
|
||||
roomusertype_roomuserdataid: args.db["roomusertype_roomuserdataid"].clone(),
|
||||
readreceiptid_readreceipt: args.db["readreceiptid_readreceipt"].clone(),
|
||||
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
|
||||
},
|
||||
services: Services {
|
||||
server: args.server.clone(),
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
|
||||
typing: args.depend::<rooms::typing::Service>("rooms::typing"),
|
||||
},
|
||||
connections: StdMutex::new(BTreeMap::new()),
|
||||
}))
|
||||
}
|
||||
|
|
117
src/service/sync/watch.rs
Normal file
117
src/service/sync/watch.rs
Normal file
|
@ -0,0 +1,117 @@
|
|||
use conduit::{implement, trace, Result};
|
||||
use futures::{pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
|
||||
use ruma::{DeviceId, UserId};
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result {
|
||||
let userid_bytes = user_id.as_bytes().to_vec();
|
||||
let mut userid_prefix = userid_bytes.clone();
|
||||
userid_prefix.push(0xFF);
|
||||
|
||||
let mut userdeviceid_prefix = userid_prefix.clone();
|
||||
userdeviceid_prefix.extend_from_slice(device_id.as_bytes());
|
||||
userdeviceid_prefix.push(0xFF);
|
||||
|
||||
let mut futures = FuturesUnordered::new();
|
||||
|
||||
// Return when *any* user changed their key
|
||||
// TODO: only send for user they share a room with
|
||||
futures.push(self.db.todeviceid_events.watch_prefix(&userdeviceid_prefix));
|
||||
|
||||
futures.push(self.db.userroomid_joined.watch_prefix(&userid_prefix));
|
||||
futures.push(self.db.userroomid_invitestate.watch_prefix(&userid_prefix));
|
||||
futures.push(self.db.userroomid_leftstate.watch_prefix(&userid_prefix));
|
||||
futures.push(
|
||||
self.db
|
||||
.userroomid_notificationcount
|
||||
.watch_prefix(&userid_prefix),
|
||||
);
|
||||
futures.push(
|
||||
self.db
|
||||
.userroomid_highlightcount
|
||||
.watch_prefix(&userid_prefix),
|
||||
);
|
||||
|
||||
// Events for rooms we are in
|
||||
let rooms_joined = self.services.state_cache.rooms_joined(user_id);
|
||||
|
||||
pin_mut!(rooms_joined);
|
||||
while let Some(room_id) = rooms_joined.next().await {
|
||||
let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let roomid_bytes = room_id.as_bytes().to_vec();
|
||||
let mut roomid_prefix = roomid_bytes.clone();
|
||||
roomid_prefix.push(0xFF);
|
||||
|
||||
// Key changes
|
||||
futures.push(self.db.keychangeid_userid.watch_prefix(&roomid_prefix));
|
||||
|
||||
// Room account data
|
||||
let mut roomuser_prefix = roomid_prefix.clone();
|
||||
roomuser_prefix.extend_from_slice(&userid_prefix);
|
||||
|
||||
futures.push(
|
||||
self.db
|
||||
.roomusertype_roomuserdataid
|
||||
.watch_prefix(&roomuser_prefix),
|
||||
);
|
||||
|
||||
// PDUs
|
||||
let short_roomid = short_roomid.to_be_bytes().to_vec();
|
||||
futures.push(self.db.pduid_pdu.watch_prefix(&short_roomid));
|
||||
|
||||
// EDUs
|
||||
let typing_room_id = room_id.to_owned();
|
||||
let typing_wait_for_update = async move {
|
||||
self.services.typing.wait_for_update(&typing_room_id).await;
|
||||
};
|
||||
|
||||
futures.push(typing_wait_for_update.boxed());
|
||||
futures.push(
|
||||
self.db
|
||||
.readreceiptid_readreceipt
|
||||
.watch_prefix(&roomid_prefix),
|
||||
);
|
||||
}
|
||||
|
||||
let mut globaluserdata_prefix = vec![0xFF];
|
||||
globaluserdata_prefix.extend_from_slice(&userid_prefix);
|
||||
|
||||
futures.push(
|
||||
self.db
|
||||
.roomusertype_roomuserdataid
|
||||
.watch_prefix(&globaluserdata_prefix),
|
||||
);
|
||||
|
||||
// More key changes (used when user is not joined to any rooms)
|
||||
futures.push(self.db.keychangeid_userid.watch_prefix(&userid_prefix));
|
||||
|
||||
// One time keys
|
||||
futures.push(
|
||||
self.db
|
||||
.userid_lastonetimekeyupdate
|
||||
.watch_prefix(&userid_bytes),
|
||||
);
|
||||
|
||||
// Server shutdown
|
||||
let server_shutdown = async move {
|
||||
while self.services.server.running() {
|
||||
self.services.server.signal.subscribe().recv().await.ok();
|
||||
}
|
||||
};
|
||||
|
||||
futures.push(server_shutdown.boxed());
|
||||
if !self.services.server.running() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Wait until one of them finds something
|
||||
trace!(futures = futures.len(), "watch started");
|
||||
futures.next().await;
|
||||
trace!(futures = futures.len(), "watch finished");
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue