refactor presence to not involve rooms.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-04-01 20:48:40 -07:00 committed by June
parent 885224ab76
commit ca1c77d76b
11 changed files with 263 additions and 281 deletions

View file

@ -16,18 +16,9 @@ pub async fn set_presence_route(body: Ruma<set_presence::v3::Request>) -> Result
} }
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
for room_id in services().rooms.state_cache.rooms_joined(sender_user) { services()
let room_id = room_id?; .presence
.set_presence(sender_user, &body.presence, None, None, body.status_msg.clone())?;
services().presence.set_presence(
&room_id,
sender_user,
body.presence.clone(),
None,
None,
body.status_msg.clone(),
)?;
}
Ok(set_presence::v3::Response {}) Ok(set_presence::v3::Response {})
} }
@ -46,14 +37,12 @@ pub async fn get_presence_route(body: Ruma<get_presence::v3::Request>) -> Result
let mut presence_event = None; let mut presence_event = None;
for room_id in services() for _room_id in services()
.rooms .rooms
.user .user
.get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])? .get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])?
{ {
let room_id = room_id?; if let Some(presence) = services().presence.get_presence(sender_user)? {
if let Some(presence) = services().presence.get_presence(&room_id, sender_user)? {
presence_event = Some(presence); presence_event = Some(presence);
break; break;
} }

View file

@ -86,10 +86,12 @@ pub async fn set_displayname_route(
.await; .await;
} }
// Presence update if services().globals.allow_local_presence() {
services() // Presence update
.presence services()
.ping_presence(sender_user, PresenceState::Online)?; .presence
.ping_presence(sender_user, &PresenceState::Online)?;
}
Ok(set_display_name::v3::Response {}) Ok(set_display_name::v3::Response {})
} }
@ -224,10 +226,12 @@ pub async fn set_avatar_url_route(body: Ruma<set_avatar_url::v3::Request>) -> Re
.await; .await;
} }
// Presence update if services().globals.allow_local_presence() {
services() // Presence update
.presence services()
.ping_presence(sender_user, PresenceState::Online)?; .presence
.ping_presence(sender_user, &PresenceState::Online)?;
}
Ok(set_avatar_url::v3::Response {}) Ok(set_avatar_url::v3::Response {})
} }

View file

@ -171,9 +171,11 @@ async fn sync_helper(
// bool = caching allowed // bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> { ) -> Result<(sync_events::v3::Response, bool), Error> {
// Presence update // Presence update
services() if services().globals.allow_local_presence() {
.presence services()
.ping_presence(&sender_user, body.set_presence)?; .presence
.ping_presence(&sender_user, &body.set_presence)?;
}
// Setup watchers, so if there's no response, we can wait for them // Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device); let watcher = services().globals.watch(&sender_user, &sender_device);
@ -222,6 +224,10 @@ async fn sync_helper(
.filter_map(Result::ok), .filter_map(Result::ok),
); );
if services().globals.allow_local_presence() {
process_presence_updates(&mut presence_updates, since, &sender_user).await?;
}
let all_joined_rooms = services() let all_joined_rooms = services()
.rooms .rooms
.state_cache .state_cache
@ -252,10 +258,6 @@ async fn sync_helper(
if !joined_room.is_empty() { if !joined_room.is_empty() {
joined_rooms.insert(room_id.clone(), joined_room); joined_rooms.insert(room_id.clone(), joined_room);
} }
if services().globals.allow_local_presence() {
process_room_presence_updates(&mut presence_updates, &room_id, since).await?;
}
} }
} }
@ -522,11 +524,19 @@ async fn sync_helper(
} }
} }
async fn process_room_presence_updates( async fn process_presence_updates(
presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>, room_id: &RoomId, since: u64, presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>, since: u64, syncing_user: &OwnedUserId,
) -> Result<()> { ) -> Result<()> {
// Take presence updates from this room // Take presence updates
for (user_id, _, presence_event) in services().presence.presence_since(room_id, since) { for (user_id, _, presence_event) in services().presence.presence_since(since) {
if !services()
.rooms
.state_cache
.user_sees_user(syncing_user, &user_id)?
{
continue;
}
match presence_updates.entry(user_id) { match presence_updates.entry(user_id) {
Entry::Vacant(slot) => { Entry::Vacant(slot) => {
slot.insert(presence_event); slot.insert(presence_event);

View file

@ -338,16 +338,13 @@ pub async fn send_transaction_message_route(
} }
for update in presence.push { for update in presence.push {
for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) { services().presence.set_presence(
services().presence.set_presence( &update.user_id,
&room_id?, &update.presence,
&update.user_id, Some(update.currently_active),
update.presence.clone(), Some(update.last_active_ago),
Some(update.currently_active), update.status_msg.clone(),
Some(update.last_active_ago), )?;
update.status_msg.clone(),
)?;
}
} }
}, },
Edu::Receipt(receipt) => { Edu::Receipt(receipt) => {

View file

@ -213,6 +213,8 @@ pub struct Config {
pub presence_idle_timeout_s: u64, pub presence_idle_timeout_s: u64,
#[serde(default = "default_presence_offline_timeout_s")] #[serde(default = "default_presence_offline_timeout_s")]
pub presence_offline_timeout_s: u64, pub presence_offline_timeout_s: u64,
#[serde(default = "true_fn")]
pub presence_timeout_remote_users: bool,
#[serde(default = "true_fn")] #[serde(default = "true_fn")]
pub allow_incoming_read_receipts: bool, pub allow_incoming_read_receipts: bool,
@ -718,9 +720,9 @@ fn default_notification_push_path() -> String { "/_matrix/push/v1/notify".to_own
fn default_turn_ttl() -> u64 { 60 * 60 * 24 } fn default_turn_ttl() -> u64 { 60 * 60 * 24 }
fn default_presence_idle_timeout_s() -> u64 { 2 * 60 } fn default_presence_idle_timeout_s() -> u64 { 5 * 60 }
fn default_presence_offline_timeout_s() -> u64 { 15 * 60 } fn default_presence_offline_timeout_s() -> u64 { 30 * 60 }
fn default_typing_federation_timeout_s() -> u64 { 30 } fn default_typing_federation_timeout_s() -> u64 { 30 }

View file

@ -1,7 +1,5 @@
use std::time::Duration; use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
use tracing::debug;
use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId};
use tracing::error;
use crate::{ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
@ -12,149 +10,98 @@ use crate::{
}; };
impl service::presence::Data for KeyValueDatabase { impl service::presence::Data for KeyValueDatabase {
fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>> { fn get_presence(&self, user_id: &UserId) -> Result<Option<(u64, PresenceEvent)>> {
let key = presence_key(room_id, user_id); if let Some(count_bytes) = self.userid_presenceid.get(user_id.as_bytes())? {
let count = utils::u64_from_bytes(&count_bytes)
.map_err(|_e| Error::bad_database("No 'count' bytes in presence key"))?;
self.roomuserid_presence let key = presenceid_key(count, user_id);
.get(&key)? self.presenceid_presence
.map(|presence_bytes| -> Result<PresenceEvent> { .get(&key)?
Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id) .map(|presence_bytes| -> Result<(u64, PresenceEvent)> {
}) Ok((count, Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id)?))
.transpose() })
} .transpose()
fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> {
let Some(ref tx) = *self.presence_timer_sender else {
return Ok(());
};
let now = utils::millis_since_unix_epoch();
let mut state_changed = false;
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let key = presence_key(&room_id?, user_id);
let presence_bytes = self.roomuserid_presence.get(&key)?;
if let Some(presence_bytes) = presence_bytes {
let presence = Presence::from_json_bytes(&presence_bytes)?;
if presence.state != new_state {
state_changed = true;
break;
}
}
}
let count = if state_changed {
services().globals.next_count()?
} else { } else {
services().globals.current_count()? Ok(None)
};
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let key = presence_key(&room_id?, user_id);
let presence_bytes = self.roomuserid_presence.get(&key)?;
let new_presence = match presence_bytes {
Some(presence_bytes) => {
let mut presence = Presence::from_json_bytes(&presence_bytes)?;
presence.state = new_state.clone();
presence.currently_active = presence.state == PresenceState::Online;
presence.last_active_ts = now;
presence.last_count = count;
presence
},
None => Presence::new(new_state.clone(), new_state == PresenceState::Online, now, count, None),
};
self.roomuserid_presence
.insert(&key, &new_presence.to_json_bytes()?)?;
} }
let timeout = match new_state {
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
tx.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|e| {
error!("Failed to add presence timer: {}", e);
Error::bad_database("Failed to add presence timer")
})
} }
fn set_presence( fn set_presence(
&self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option<bool>, &self, user_id: &UserId, presence_state: &PresenceState, currently_active: Option<bool>,
last_active_ago: Option<UInt>, status_msg: Option<String>, last_active_ago: Option<UInt>, status_msg: Option<String>,
) -> Result<()> { ) -> Result<()> {
let Some(ref tx) = *self.presence_timer_sender else { let last_presence = self.get_presence(user_id)?;
return Ok(()); let state_changed = match last_presence {
None => true,
Some(ref presence) => presence.1.content.presence != *presence_state,
}; };
let now = utils::millis_since_unix_epoch(); let now = utils::millis_since_unix_epoch();
let last_active_ts = match last_active_ago { let last_last_active_ts = match last_presence {
Some(last_active_ago) => now.saturating_sub(last_active_ago.into()), None => 0,
None => now, Some((_, ref presence)) => now.saturating_sub(presence.content.last_active_ago.unwrap_or_default().into()),
}; };
let key = presence_key(room_id, user_id); let last_active_ts = match last_active_ago {
None => now,
Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
};
// tighten for state flicker?
if !state_changed && last_active_ts <= last_last_active_ts {
debug!(
"presence spam {:?} last_active_ts:{:?} <= {:?}",
user_id, last_active_ts, last_last_active_ts
);
return Ok(());
}
let presence = Presence::new( let presence = Presence::new(
presence_state, presence_state.to_owned(),
currently_active.unwrap_or(false), currently_active.unwrap_or(false),
last_active_ts, last_active_ts,
services().globals.next_count()?,
status_msg, status_msg,
); );
let count = services().globals.next_count()?;
let key = presenceid_key(count, user_id);
let timeout = match presence.state { self.presenceid_presence
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
tx.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|e| {
error!("Failed to add presence timer: {}", e);
Error::bad_database("Failed to add presence timer")
})?;
self.roomuserid_presence
.insert(&key, &presence.to_json_bytes()?)?; .insert(&key, &presence.to_json_bytes()?)?;
self.userid_presenceid
.insert(user_id.as_bytes(), &count.to_be_bytes())?;
if let Some((last_count, _)) = last_presence {
let key = presenceid_key(last_count, user_id);
self.presenceid_presence.remove(&key)?;
}
Ok(()) Ok(())
} }
fn remove_presence(&self, user_id: &UserId) -> Result<()> { fn remove_presence(&self, user_id: &UserId) -> Result<()> {
for room_id in services().rooms.state_cache.rooms_joined(user_id) { if let Some(count_bytes) = self.userid_presenceid.get(user_id.as_bytes())? {
let key = presence_key(&room_id?, user_id); let count = utils::u64_from_bytes(&count_bytes)
.map_err(|_e| Error::bad_database("No 'count' bytes in presence key"))?;
self.roomuserid_presence.remove(&key)?; let key = presenceid_key(count, user_id);
self.presenceid_presence.remove(&key)?;
self.userid_presenceid.remove(user_id.as_bytes())?;
} }
Ok(()) Ok(())
} }
fn presence_since<'a>( fn presence_since<'a>(&'a self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)> + 'a> {
&'a self, room_id: &RoomId, since: u64,
) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)> + 'a> {
let prefix = [room_id.as_bytes(), &[0xFF]].concat();
Box::new( Box::new(
self.roomuserid_presence self.presenceid_presence
.scan_prefix(prefix) .iter()
.flat_map(|(key, presence_bytes)| -> Result<(OwnedUserId, u64, PresenceEvent)> { .flat_map(|(key, presence_bytes)| -> Result<(OwnedUserId, u64, PresenceEvent)> {
let user_id = user_id_from_bytes( let (count, user_id) = presenceid_parse(&key)?;
key.rsplit(|byte| *byte == 0xFF)
.next()
.ok_or_else(|| Error::bad_database("No UserID bytes in presence key"))?,
)?;
let presence = Presence::from_json_bytes(&presence_bytes)?; let presence = Presence::from_json_bytes(&presence_bytes)?;
let presence_event = presence.to_presence_event(&user_id)?; let presence_event = presence.to_presence_event(&user_id)?;
Ok((user_id, presence.last_count, presence_event)) Ok((user_id, count, presence_event))
}) })
.filter(move |(_, count, _)| *count > since), .filter(move |(_, count, _)| *count > since),
) )
@ -162,6 +109,15 @@ impl service::presence::Data for KeyValueDatabase {
} }
#[inline] #[inline]
fn presence_key(room_id: &RoomId, user_id: &UserId) -> Vec<u8> { fn presenceid_key(count: u64, user_id: &UserId) -> Vec<u8> {
[room_id.as_bytes(), &[0xFF], user_id.as_bytes()].concat() [count.to_be_bytes().to_vec(), user_id.as_bytes().to_vec()].concat()
}
#[inline]
fn presenceid_parse(key: &[u8]) -> Result<(u64, OwnedUserId)> {
let (count, user_id) = key.split_at(8);
let user_id = user_id_from_bytes(user_id)?;
let count = utils::u64_from_bytes(count).unwrap();
Ok((count, user_id))
} }

View file

@ -28,16 +28,10 @@ use ruma::{
use serde::Deserialize; use serde::Deserialize;
#[cfg(unix)] #[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::{ use tokio::time::{interval, Instant};
sync::mpsc,
time::{interval, Instant},
};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::{ use crate::{service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services, SERVICES};
service::{presence::presence_handler, rooms::timeline::PduCount},
services, utils, Config, Error, PduEvent, Result, Services, SERVICES,
};
pub struct KeyValueDatabase { pub struct KeyValueDatabase {
db: Arc<dyn KeyValueDatabaseEngine>, db: Arc<dyn KeyValueDatabaseEngine>,
@ -65,8 +59,9 @@ pub struct KeyValueDatabase {
pub(super) userid_usersigningkeyid: Arc<dyn KvTree>, pub(super) userid_usersigningkeyid: Arc<dyn KvTree>,
pub(super) userfilterid_filter: Arc<dyn KvTree>, // UserFilterId = UserId + FilterId pub(super) userfilterid_filter: Arc<dyn KvTree>, // UserFilterId = UserId + FilterId
pub(super) todeviceid_events: Arc<dyn KvTree>, // ToDeviceId = UserId + DeviceId + Count
pub(super) todeviceid_events: Arc<dyn KvTree>, // ToDeviceId = UserId + DeviceId + Count pub(super) userid_presenceid: Arc<dyn KvTree>, // UserId => Count
pub(super) presenceid_presence: Arc<dyn KvTree>, // Count + UserId => Presence
//pub uiaa: uiaa::Uiaa, //pub uiaa: uiaa::Uiaa,
pub(super) userdevicesessionid_uiaainfo: Arc<dyn KvTree>, // User-interactive authentication pub(super) userdevicesessionid_uiaainfo: Arc<dyn KvTree>, // User-interactive authentication
@ -77,7 +72,6 @@ pub struct KeyValueDatabase {
pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId
pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
pub(super) roomuserid_presence: Arc<dyn KvTree>,
//pub rooms: rooms::Rooms, //pub rooms: rooms::Rooms,
pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
@ -185,7 +179,6 @@ pub struct KeyValueDatabase {
pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>, pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>, pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>, pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
pub(super) presence_timer_sender: Arc<Option<mpsc::UnboundedSender<(OwnedUserId, Duration)>>>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -275,14 +268,6 @@ impl KeyValueDatabase {
}, },
}; };
let presence_sender = if config.allow_local_presence {
let (presence_sender, presence_receiver) = mpsc::unbounded_channel();
Self::start_presence_handler(presence_receiver).await;
Some(presence_sender)
} else {
None
};
let db_raw = Box::new(Self { let db_raw = Box::new(Self {
db: builder.clone(), db: builder.clone(),
userid_password: builder.open_tree("userid_password")?, userid_password: builder.open_tree("userid_password")?,
@ -302,13 +287,14 @@ impl KeyValueDatabase {
userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?, userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?,
userfilterid_filter: builder.open_tree("userfilterid_filter")?, userfilterid_filter: builder.open_tree("userfilterid_filter")?,
todeviceid_events: builder.open_tree("todeviceid_events")?, todeviceid_events: builder.open_tree("todeviceid_events")?,
userid_presenceid: builder.open_tree("userid_presenceid")?,
presenceid_presence: builder.open_tree("presenceid_presence")?,
userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?, userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?,
userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()), userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?, readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?,
roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt
roomuserid_lastprivatereadupdate: builder.open_tree("roomuserid_lastprivatereadupdate")?, roomuserid_lastprivatereadupdate: builder.open_tree("roomuserid_lastprivatereadupdate")?,
roomuserid_presence: builder.open_tree("roomuserid_presence")?,
pduid_pdu: builder.open_tree("pduid_pdu")?, pduid_pdu: builder.open_tree("pduid_pdu")?,
eventid_pduid: builder.open_tree("eventid_pduid")?, eventid_pduid: builder.open_tree("eventid_pduid")?,
roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
@ -404,7 +390,6 @@ impl KeyValueDatabase {
our_real_users_cache: RwLock::new(HashMap::new()), our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()),
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
presence_timer_sender: Arc::new(presence_sender),
}); });
let db = Box::leak(db_raw); let db = Box::leak(db_raw);
@ -1059,6 +1044,10 @@ impl KeyValueDatabase {
services().sending.start_handler(); services().sending.start_handler();
if config.allow_local_presence {
services().presence.start_handler();
}
Self::start_cleanup_task().await; Self::start_cleanup_task().await;
if services().globals.allow_check_for_updates() { if services().globals.allow_check_for_updates() {
Self::start_check_for_updates_task().await; Self::start_check_for_updates_task().await;
@ -1180,15 +1169,6 @@ impl KeyValueDatabase {
} }
}); });
} }
async fn start_presence_handler(presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>) {
tokio::spawn(async move {
match presence_handler(presence_timer_receiver).await {
Ok(()) => warn!("Presence maintenance task finished"),
Err(e) => error!("Presence maintenance task finished with error: {e}"),
}
});
}
} }
/// Sets the emergency password and push rules for the @conduit account in case /// Sets the emergency password and push rules for the @conduit account in case

View file

@ -31,7 +31,7 @@ pub struct Services<'a> {
pub uiaa: uiaa::Service, pub uiaa: uiaa::Service,
pub users: users::Service, pub users: users::Service,
pub account_data: account_data::Service, pub account_data: account_data::Service,
pub presence: presence::Service, pub presence: Arc<presence::Service>,
pub admin: Arc<admin::Service>, pub admin: Arc<admin::Service>,
pub globals: globals::Service<'a>, pub globals: globals::Service<'a>,
pub key_backups: key_backups::Service, pub key_backups: key_backups::Service,
@ -155,9 +155,7 @@ impl Services<'_> {
account_data: account_data::Service { account_data: account_data::Service {
db, db,
}, },
presence: presence::Service { presence: presence::Service::build(db, config),
db,
},
admin: admin::Service::build(), admin: admin::Service::build(),
key_backups: key_backups::Service { key_backups: key_backups::Service {
db, db,

View file

@ -1,18 +1,14 @@
use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId}; use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
use crate::Result; use crate::Result;
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
/// Returns the latest presence event for the given user in the given room. /// Returns the latest presence event for the given user.
fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>>; fn get_presence(&self, user_id: &UserId) -> Result<Option<(u64, PresenceEvent)>>;
/// Pings the presence of the given user in the given room, setting the
/// specified state.
fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()>;
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
fn set_presence( fn set_presence(
&self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option<bool>, &self, user_id: &UserId, presence_state: &PresenceState, currently_active: Option<bool>,
last_active_ago: Option<UInt>, status_msg: Option<String>, last_active_ago: Option<UInt>, status_msg: Option<String>,
) -> Result<()>; ) -> Result<()>;
@ -21,7 +17,5 @@ pub trait Data: Send + Sync {
/// Returns the most recent presence updates that happened after the event /// Returns the most recent presence updates that happened after the event
/// with id `since`. /// with id `since`.
fn presence_since<'a>( fn presence_since<'a>(&'a self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)> + 'a>;
&'a self, room_id: &RoomId, since: u64,
) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)> + 'a>;
} }

View file

@ -1,19 +1,22 @@
mod data; mod data;
use std::time::Duration; use std::{sync::Arc, time::Duration};
pub use data::Data; pub use data::Data;
use futures_util::{stream::FuturesUnordered, StreamExt}; use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::{ use ruma::{
events::presence::{PresenceEvent, PresenceEventContent}, events::presence::{PresenceEvent, PresenceEventContent},
presence::PresenceState, presence::PresenceState,
OwnedUserId, RoomId, UInt, UserId, OwnedUserId, UInt, UserId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, time::sleep}; use tokio::{
use tracing::debug; sync::{mpsc, Mutex},
time::sleep,
};
use tracing::{debug, error};
use crate::{services, utils, Error, Result}; use crate::{services, utils, Config, Error, Result};
/// Represents data required to be kept in order to implement the presence /// Represents data required to be kept in order to implement the presence
/// specification. /// specification.
@ -22,19 +25,15 @@ pub struct Presence {
pub state: PresenceState, pub state: PresenceState,
pub currently_active: bool, pub currently_active: bool,
pub last_active_ts: u64, pub last_active_ts: u64,
pub last_count: u64,
pub status_msg: Option<String>, pub status_msg: Option<String>,
} }
impl Presence { impl Presence {
pub fn new( pub fn new(state: PresenceState, currently_active: bool, last_active_ts: u64, status_msg: Option<String>) -> Self {
state: PresenceState, currently_active: bool, last_active_ts: u64, last_count: u64, status_msg: Option<String>,
) -> Self {
Self { Self {
state, state,
currently_active, currently_active,
last_active_ts, last_active_ts,
last_count,
status_msg, status_msg,
} }
} }
@ -72,27 +71,94 @@ impl Presence {
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>,
timer_receiver: Mutex<mpsc::UnboundedReceiver<(OwnedUserId, Duration)>>,
timeout_remote_users: bool,
} }
impl Service { impl Service {
/// Returns the latest presence event for the given user in the given room. pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
pub fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>> { let (timer_sender, timer_receiver) = mpsc::unbounded_channel();
self.db.get_presence(room_id, user_id)
Arc::new(Self {
db,
timer_sender,
timer_receiver: Mutex::new(timer_receiver),
timeout_remote_users: config.presence_timeout_remote_users,
})
}
pub fn start_handler(self: &Arc<Self>) {
let self_ = Arc::clone(self);
tokio::spawn(async move {
self_
.handler()
.await
.expect("Failed to start presence handler");
});
}
/// Returns the latest presence event for the given user.
pub fn get_presence(&self, user_id: &UserId) -> Result<Option<PresenceEvent>> {
if let Some((_, presence)) = self.db.get_presence(user_id)? {
Ok(Some(presence))
} else {
Ok(None)
}
} }
/// Pings the presence of the given user in the given room, setting the /// Pings the presence of the given user in the given room, setting the
/// specified state. /// specified state.
pub fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> { pub fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
self.db.ping_presence(user_id, new_state) let last_presence = self.db.get_presence(user_id)?;
let state_changed = match last_presence {
None => true,
Some((_, ref presence)) => presence.content.presence != *new_state,
};
let last_last_active_ago = match last_presence {
None => 0_u64,
Some((_, ref presence)) => presence.content.last_active_ago.unwrap_or_default().into(),
};
const REFRESH_TIMEOUT: u64 = 60 * 25 * 1000;
if !state_changed && last_last_active_ago < REFRESH_TIMEOUT {
return Ok(());
}
let status_msg = match last_presence {
Some((_, ref presence)) => presence.content.status_msg.clone(),
None => Some(String::new()),
};
let last_active_ago = UInt::new(0);
let currently_active = *new_state == PresenceState::Online;
self.set_presence(user_id, new_state, Some(currently_active), last_active_ago, status_msg)
} }
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
pub fn set_presence( pub fn set_presence(
&self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option<bool>, &self, user_id: &UserId, presence_state: &PresenceState, currently_active: Option<bool>,
last_active_ago: Option<UInt>, status_msg: Option<String>, last_active_ago: Option<UInt>, status_msg: Option<String>,
) -> Result<()> { ) -> Result<()> {
self.db self.db
.set_presence(room_id, user_id, presence_state, currently_active, last_active_ago, status_msg) .set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)?;
if self.timeout_remote_users || user_id.server_name() == services().globals.server_name() {
let timeout = match presence_state {
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
self.timer_sender
.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|e| {
error!("Failed to add presence timer: {}", e);
Error::bad_database("Failed to add presence timer")
})?;
}
Ok(())
} }
/// Removes the presence record for the given user from the database. /// Removes the presence record for the given user from the database.
@ -100,29 +166,23 @@ impl Service {
/// Returns the most recent presence updates that happened after the event /// Returns the most recent presence updates that happened after the event
/// with id `since`. /// with id `since`.
pub fn presence_since( pub fn presence_since(&self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)>> {
&self, room_id: &RoomId, since: u64, self.db.presence_since(since)
) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)>> {
self.db.presence_since(room_id, since)
} }
}
pub async fn presence_handler( async fn handler(&self) -> Result<()> {
mut presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>, let mut presence_timers = FuturesUnordered::new();
) -> Result<()> { let mut receiver = self.timer_receiver.lock().await;
let mut presence_timers = FuturesUnordered::new(); loop {
tokio::select! {
Some((user_id, timeout)) = receiver.recv() => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout));
}
loop { Some(user_id) = presence_timers.next() => {
debug!("Number of presence timers: {}", presence_timers.len()); process_presence_timer(&user_id)?;
}
tokio::select! {
Some((user_id, timeout)) = presence_timer_receiver.recv() => {
debug!("Adding timer for user '{user_id}': Timeout {timeout:?}");
presence_timers.push(presence_timer(user_id, timeout));
}
Some(user_id) = presence_timers.next() => {
process_presence_timer(&user_id)?;
} }
} }
} }
@ -142,16 +202,12 @@ fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> {
let mut last_active_ago = None; let mut last_active_ago = None;
let mut status_msg = None; let mut status_msg = None;
for room_id in services().rooms.state_cache.rooms_joined(user_id) { let presence_event = services().presence.get_presence(user_id)?;
let presence_event = services().presence.get_presence(&room_id?, user_id)?;
if let Some(presence_event) = presence_event { if let Some(presence_event) = presence_event {
presence_state = presence_event.content.presence; presence_state = presence_event.content.presence;
last_active_ago = presence_event.content.last_active_ago; last_active_ago = presence_event.content.last_active_ago;
status_msg = presence_event.content.status_msg; status_msg = presence_event.content.status_msg;
break;
}
} }
let new_state = match (&presence_state, last_active_ago.map(u64::from)) { let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
@ -163,16 +219,9 @@ fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> {
debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}"); debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}");
if let Some(new_state) = new_state { if let Some(new_state) = new_state {
for room_id in services().rooms.state_cache.rooms_joined(user_id) { services()
services().presence.set_presence( .presence
&room_id?, .set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)?;
user_id,
new_state.clone(),
Some(false),
last_active_ago,
status_msg.clone(),
)?;
}
} }
Ok(()) Ok(())

View file

@ -1,4 +1,5 @@
use std::{ use std::{
cmp,
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
sync::Arc, sync::Arc,
@ -413,7 +414,7 @@ impl Service {
// Fail if a request has failed recently (exponential backoff) // Fail if a request has failed recently (exponential backoff)
const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries);
min_elapsed_duration = std::cmp::min(min_elapsed_duration, MAX_DURATION); min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION);
if time.elapsed() < min_elapsed_duration { if time.elapsed() < min_elapsed_duration {
allow = false; allow = false;
} else { } else {
@ -448,9 +449,6 @@ impl Service {
.filter_map(Result::ok) .filter_map(Result::ok)
.filter(|user_id| user_id.server_name() == services().globals.server_name()), .filter(|user_id| user_id.server_name() == services().globals.server_name()),
); );
if !select_edus_presence(&room_id, since, &mut max_edu_count, &mut events)? {
break;
}
if !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)? { if !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)? {
break; break;
} }
@ -472,29 +470,36 @@ impl Service {
events.push(serde_json::to_vec(&edu).expect("json can be serialized")); events.push(serde_json::to_vec(&edu).expect("json can be serialized"));
} }
if services().globals.allow_outgoing_presence() {
select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?;
}
Ok((events, max_edu_count)) Ok((events, max_edu_count))
} }
} }
/// Look for presence [in this room] <--- XXX /// Look for presence
#[tracing::instrument(skip(room_id, since, max_edu_count, events))] #[tracing::instrument(skip(server_name, since, max_edu_count, events))]
pub fn select_edus_presence( pub fn select_edus_presence(
room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
) -> Result<bool> { ) -> Result<bool> {
if !services().globals.allow_outgoing_presence() { // Look for presence updates for this server
return Ok(true);
}
// Look for presence updates in this room
let mut presence_updates = Vec::new(); let mut presence_updates = Vec::new();
for (user_id, count, presence_event) in services().presence.presence_since(room_id, since) { for (user_id, count, presence_event) in services().presence.presence_since(since) {
if count > *max_edu_count { *max_edu_count = cmp::max(count, *max_edu_count);
*max_edu_count = count;
}
if user_id.server_name() != services().globals.server_name() { if user_id.server_name() != services().globals.server_name() {
continue; continue;
} }
if !services()
.rooms
.state_cache
.server_sees_user(server_name, &user_id)?
{
continue;
}
presence_updates.push(PresenceUpdate { presence_updates.push(PresenceUpdate {
user_id, user_id,
presence: presence_event.content.presence, presence: presence_event.content.presence,
@ -524,10 +529,8 @@ pub fn select_edus_receipts(
.readreceipts_since(room_id, since) .readreceipts_since(room_id, since)
{ {
let (user_id, count, read_receipt) = r?; let (user_id, count, read_receipt) = r?;
*max_edu_count = cmp::max(count, *max_edu_count);
if count > *max_edu_count {
*max_edu_count = count;
}
if user_id.server_name() != services().globals.server_name() { if user_id.server_name() != services().globals.server_name() {
continue; continue;
} }