optimise resetting all user presences

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2025-01-10 23:51:08 -05:00
parent fde1b94e26
commit 8c18481d1d
No known key found for this signature in database
4 changed files with 40 additions and 28 deletions

View file

@ -32,7 +32,7 @@ pub(super) async fn process(
match subcommand { match subcommand {
| PresenceCommand::GetPresence { user_id } => { | PresenceCommand::GetPresence { user_id } => {
let timer = tokio::time::Instant::now(); let timer = tokio::time::Instant::now();
let results = services.presence.db.get_presence(&user_id).await; let results = services.presence.get_presence(&user_id).await;
let query_time = timer.elapsed(); let query_time = timer.elapsed();
Ok(RoomMessageEventContent::notice_markdown(format!( Ok(RoomMessageEventContent::notice_markdown(format!(

View file

@ -12,7 +12,7 @@ use ruma::{events::presence::PresenceEvent, presence::PresenceState, UInt, UserI
use super::Presence; use super::Presence;
use crate::{globals, users, Dep}; use crate::{globals, users, Dep};
pub struct Data { pub(crate) struct Data {
presenceid_presence: Arc<Map>, presenceid_presence: Arc<Map>,
userid_presenceid: Arc<Map>, userid_presenceid: Arc<Map>,
services: Services, services: Services,
@ -36,7 +36,7 @@ impl Data {
} }
} }
pub async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> { pub(super) async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> {
let count = self let count = self
.userid_presenceid .userid_presenceid
.get(user_id) .get(user_id)

View file

@ -4,7 +4,10 @@ mod presence;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use conduwuit::{checked, debug, error, result::LogErr, Error, Result, Server}; use conduwuit::{
checked, debug, debug_warn, error, result::LogErr, trace, Error, Result, Server,
};
use database::Database;
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use loole::{Receiver, Sender}; use loole::{Receiver, Sender};
use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
@ -18,12 +21,13 @@ pub struct Service {
timeout_remote_users: bool, timeout_remote_users: bool,
idle_timeout: u64, idle_timeout: u64,
offline_timeout: u64, offline_timeout: u64,
pub db: Data, db: Data,
services: Services, services: Services,
} }
struct Services { struct Services {
server: Arc<Server>, server: Arc<Server>,
db: Arc<Database>,
globals: Dep<globals::Service>, globals: Dep<globals::Service>,
users: Dep<users::Service>, users: Dep<users::Service>,
} }
@ -44,6 +48,7 @@ impl crate::Service for Service {
db: Data::new(&args), db: Data::new(&args),
services: Services { services: Services {
server: args.server.clone(), server: args.server.clone(),
db: args.db.clone(),
globals: args.depend::<globals::Service>("globals"), globals: args.depend::<globals::Service>("globals"),
users: args.depend::<users::Service>("users"), users: args.depend::<users::Service>("users"),
}, },
@ -171,7 +176,9 @@ impl Service {
} }
// Unset online/unavailable presence to offline on startup // Unset online/unavailable presence to offline on startup
pub async fn unset_all_presence(&self) -> Result<()> { pub async fn unset_all_presence(&self) {
let _cork = self.services.db.cork();
for user_id in &self for user_id in &self
.services .services
.users .users
@ -184,28 +191,36 @@ impl Service {
let presence = match presence { let presence = match presence {
| Ok((_, ref presence)) => &presence.content, | Ok((_, ref presence)) => &presence.content,
| _ => return Ok(()), | _ => continue,
}; };
let need_reset = match presence.presence { if !matches!(
| PresenceState::Unavailable | PresenceState::Online => true, presence.presence,
| _ => false, PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy
}; ) {
trace!(?user_id, ?presence, "Skipping user");
if !need_reset {
continue; continue;
} }
self.set_presence( trace!(?user_id, ?presence, "Resetting presence to offline");
_ = self
.set_presence(
user_id, user_id,
&PresenceState::Offline, &PresenceState::Offline,
Some(false), Some(false),
presence.last_active_ago, presence.last_active_ago,
presence.status_msg.clone(), presence.status_msg.clone(),
) )
.await?; .await
.inspect_err(|e| {
debug_warn!(
?presence,
"{user_id} has invalid presence in database and failed to reset it to \
offline: {e}"
);
});
} }
Ok(())
} }
/// Returns the most recent presence updates that happened after the event /// Returns the most recent presence updates that happened after the event

View file

@ -123,13 +123,10 @@ impl Services {
.start() .start()
.await?; .await?;
// clear online statuses // reset dormant online/away statuses to offline, and set the server user as
if self.server.config.allow_local_presence { // online
_ = self.presence.unset_all_presence().await;
}
// set the server user as online
if self.server.config.allow_local_presence && !self.db.is_read_only() { if self.server.config.allow_local_presence && !self.db.is_read_only() {
self.presence.unset_all_presence().await;
_ = self _ = self
.presence .presence
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Online) .ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Online)